inital commit

This commit is contained in:
shaonianzhentan 2022-11-15 11:12:27 +08:00
parent f0832c3c26
commit e237b7a6e0
15 changed files with 1878 additions and 2 deletions

View File

@ -1,2 +1,52 @@
# cloud_music
新版云音乐(测试)
# 云音乐(新版测试)
在Home Assistant里使用的网易云音乐插件
[![hacs_badge](https://img.shields.io/badge/Home-Assistant-%23049cdb)](https://www.home-assistant.io/)
[![hacs_badge](https://img.shields.io/badge/HACS-Custom-41BDF5.svg)](https://github.com/hacs/integration)
![visit](https://visitor-badge.laobi.icu/badge?page_id=shaonianzhentan.ha_cloud_music&left_text=visit)
## 安装
安装完成重启HA刷新一下页面在集成里搜索`云音乐`
[![Add Integration](https://my.home-assistant.io/badges/config_flow_start.svg)](https://my.home-assistant.io/redirect/config_flow_start?domain=ha_cloud_music)
## 使用
播放网易云音乐歌单 `cloudmusic://163/playlist?id=25724904`
- cloudmusic://163/playlist?id=歌单ID
播放网易云音乐电台 `cloudmusic://163/radio/playlist?id=1008`
- cloudmusic://163/radio/playlist?id=电台ID
播放网易云音乐歌手 `cloudmusic://163/artist/playlist?id=2116`
- cloudmusic://163/artist/playlist?id=歌手ID
播放喜马拉雅专辑 `cloudmusic://xmly/playlist?id=258244`
- cloudmusic://xmly/playlist?id=专辑ID
全网音乐搜索播放 `cloudmusic://search/name?kv=倒影 周杰伦`
- cloudmusic://search/name?kv=关键词
configuration.yaml
```yaml
homeassistant:
customize: !include customize.yaml
```
customize.yaml
```yaml
media_player.yun_yin_le:
media_player: media_player.源实体
```
## 如果这个项目对你有帮助,请我喝杯<del style="font-size: 14px;">咖啡</del>奶茶吧😘
|支付宝|微信|
|---|---|
<img src="https://ha.jiluxinqing.com/img/alipay.png" align="left" height="160" width="160" alt="支付宝" title="支付宝"> | <img src="https://ha.jiluxinqing.com/img/wechat.png" align="left" height="160" width="160" alt="微信支付" title="微信">
#### 关注我的微信订阅号了解更多HomeAssistant相关知识
<img src="https://ha.jiluxinqing.com/img/wechat-channel.png" height="160" alt="HomeAssistant家庭助理" title="HomeAssistant家庭助理">

View File

@ -0,0 +1,34 @@
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.const import CONF_URL
from .intent_script import async_register
from .const import PLATFORMS
from .manifest import manifest
from .http import HttpView
from .cloud_music import CloudMusic
DOMAIN = manifest.domain
CONFIG_SCHEMA = cv.deprecated(DOMAIN)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
data = entry.data
api_url = data.get(CONF_URL)
hass.data['cloud_music'] = CloudMusic(hass, api_url)
hass.http.register_view(HttpView)
hass.config_entries.async_setup_platforms(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(update_listener))
async_register(hass, entry.options.get('conversation', True))
return True
async def update_listener(hass, entry):
await async_unload_entry(hass, entry)
await async_setup_entry(hass, entry)
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@ -0,0 +1,749 @@
"""Support for media browsing."""
from enum import Enum
import logging, os, random
from urllib.parse import urlparse, parse_qs, parse_qsl, quote
from custom_components.ha_cloud_music.http_api import http_get
from .utils import parse_query
from homeassistant.components import media_source
from homeassistant.components.media_player import (
BrowseError, BrowseMedia,
async_process_play_media_url
)
from homeassistant.components.media_player.const import (
MEDIA_CLASS_ALBUM,
MEDIA_CLASS_ARTIST,
MEDIA_CLASS_CHANNEL,
MEDIA_CLASS_DIRECTORY,
MEDIA_CLASS_EPISODE,
MEDIA_CLASS_MOVIE,
MEDIA_CLASS_MUSIC,
MEDIA_CLASS_PLAYLIST,
MEDIA_CLASS_SEASON,
MEDIA_CLASS_TRACK,
MEDIA_CLASS_TV_SHOW,
MEDIA_TYPE_ALBUM,
MEDIA_TYPE_ARTIST,
MEDIA_TYPE_CHANNEL,
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_MUSIC,
MEDIA_TYPE_MOVIE,
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_SEASON,
MEDIA_TYPE_TRACK,
MEDIA_TYPE_TVSHOW,
)
PLAYABLE_MEDIA_TYPES = [
MEDIA_TYPE_ALBUM,
MEDIA_TYPE_ARTIST,
MEDIA_TYPE_TRACK,
]
CONTAINER_TYPES_SPECIFIC_MEDIA_CLASS = {
MEDIA_TYPE_ALBUM: MEDIA_CLASS_ALBUM,
MEDIA_TYPE_ARTIST: MEDIA_CLASS_ARTIST,
MEDIA_TYPE_PLAYLIST: MEDIA_CLASS_PLAYLIST,
MEDIA_TYPE_SEASON: MEDIA_CLASS_SEASON,
MEDIA_TYPE_TVSHOW: MEDIA_CLASS_TV_SHOW,
}
CHILD_TYPE_MEDIA_CLASS = {
MEDIA_TYPE_SEASON: MEDIA_CLASS_SEASON,
MEDIA_TYPE_ALBUM: MEDIA_CLASS_ALBUM,
MEDIA_TYPE_MUSIC: MEDIA_CLASS_MUSIC,
MEDIA_TYPE_ARTIST: MEDIA_CLASS_ARTIST,
MEDIA_TYPE_MOVIE: MEDIA_CLASS_MOVIE,
MEDIA_TYPE_PLAYLIST: MEDIA_CLASS_PLAYLIST,
MEDIA_TYPE_TRACK: MEDIA_CLASS_TRACK,
MEDIA_TYPE_TVSHOW: MEDIA_CLASS_TV_SHOW,
MEDIA_TYPE_CHANNEL: MEDIA_CLASS_CHANNEL,
MEDIA_TYPE_EPISODE: MEDIA_CLASS_EPISODE,
}
_LOGGER = logging.getLogger(__name__)
protocol = 'cloudmusic://'
cloudmusic_protocol = 'cloudmusic://163/'
xmly_protocol = 'cloudmusic://xmly/'
fm_protocol = 'cloudmusic://fm/'
qq_protocol = 'cloudmusic://qq/'
ting_protocol = 'cloudmusic://ting/'
search_protocol = 'cloudmusic://search/'
# 云音乐路由表
class CloudMusicRouter():
media_source = 'media-source://'
local_playlist = f'{protocol}local/playlist'
toplist = f'{cloudmusic_protocol}toplist'
playlist = f'{cloudmusic_protocol}playlist'
radio_playlist = f'{cloudmusic_protocol}radio/playlist'
artist_playlist = f'{cloudmusic_protocol}artist/playlist'
my_daily = f'{cloudmusic_protocol}my/daily'
my_recommend_resource = f'{cloudmusic_protocol}my/recommend_resource'
my_cloud = f'{cloudmusic_protocol}my/cloud'
my_created = f'{cloudmusic_protocol}my/created'
my_radio = f'{cloudmusic_protocol}my/radio'
my_artist = f'{cloudmusic_protocol}my/artist'
# 乐听头条
ting_homepage = f'{ting_protocol}homepage'
ting_playlist = f'{ting_protocol}playlist'
# 喜马拉雅
xmly_playlist = f'{xmly_protocol}playlist'
# FM
fm_channel = f'{fm_protocol}channel'
fm_playlist = f'{fm_protocol}playlist'
# 搜索名称
search_name = f'{search_protocol}name'
async def async_browse_media(media_player, media_content_type, media_content_id):
print(media_content_type, media_content_id)
hass = media_player.hass
cloud_music = hass.data['cloud_music']
# 媒体库
if media_content_id is not None and media_content_id.startswith(CloudMusicRouter.media_source):
if media_content_id.startswith(CloudMusicRouter.media_source + '?title='):
media_content_id = None
return await media_source.async_browse_media(
hass,
media_content_id,
content_filter=lambda item: item.media_content_type.startswith("audio/"),
)
# 主界面
if media_content_id in [None, protocol]:
children = [
{
'title': '播放列表',
'path': CloudMusicRouter.local_playlist,
'type': MEDIA_TYPE_PLAYLIST
},
{
'title': '媒体库',
'path': CloudMusicRouter.media_source,
'type': MEDIA_TYPE_PLAYLIST,
'thumbnail': 'https://brands.home-assistant.io/_/media_source/icon.png'
},
{
'title': '榜单',
'path': CloudMusicRouter.toplist,
'type': MEDIA_TYPE_ALBUM,
'thumbnail': 'http://p2.music.126.net/pcYHpMkdC69VVvWiynNklA==/109951166952713766.jpg'
}
]
# 当前登录用户
if cloud_music.userinfo.get('uid') is not None:
children.extend([
{
'title': '每日推荐歌曲',
'path': CloudMusicRouter.my_daily,
'type': MEDIA_TYPE_MUSIC
},{
'title': '每日推荐歌单',
'path': CloudMusicRouter.my_recommend_resource,
'type': MEDIA_TYPE_ALBUM
},{
'title': '我的云盘',
'path': CloudMusicRouter.my_cloud,
'type': MEDIA_TYPE_ALBUM,
'thumbnail': 'http://p3.music.126.net/ik8RFcDiRNSV2wvmTnrcbA==/3435973851857038.jpg'
},{
'title': '我的歌单',
'path': CloudMusicRouter.my_created,
'type': MEDIA_TYPE_ALBUM,
'thumbnail': 'https://p2.music.126.net/tGHU62DTszbFQ37W9qPHcg==/2002210674180197.jpg'
},{
'title': '我的电台',
'path': CloudMusicRouter.my_radio,
'type': MEDIA_TYPE_SEASON
},{
'title': '我的歌手',
'path': CloudMusicRouter.my_artist,
'type': MEDIA_TYPE_ARTIST,
#'thumbnail': 'http://p1.music.126.net/9M-U5gX1gccbuBXZ6JnTUg==/109951165264087991.jpg'
}
])
# 扩展资源
children.extend([
{
'title': '新闻快讯',
'path': CloudMusicRouter.ting_homepage,
'type': MEDIA_TYPE_ALBUM,
'thumbnail': 'https://p1.music.126.net/ilcqG4jS0GJgAlLs9BCz0g==/109951166709733089.jpg'
},{
'title': 'FM电台',
'path': CloudMusicRouter.fm_channel,
'type': MEDIA_TYPE_CHANNEL
}
])
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=protocol,
media_content_type=MEDIA_TYPE_CHANNEL,
title="云音乐",
can_play=False,
can_expand=True,
children=[],
)
for item in children:
title = item['title']
media_content_type = item['type']
media_content_id = item['path'] + f'?title={quote(title)}'
thumbnail = item.get('thumbnail')
if thumbnail is not None:
thumbnail = cloud_music.netease_image_url(thumbnail)
library_info.children.append(
BrowseMedia(
title=title,
media_class=CHILD_TYPE_MEDIA_CLASS[media_content_type],
media_content_type=media_content_type,
media_content_id=media_content_id,
can_play=False,
can_expand=True,
thumbnail=thumbnail
)
)
return library_info
# 判断是否云音乐协议
if media_content_id.startswith(protocol) == False:
return None
# 协议转换
url = urlparse(media_content_id)
query = parse_query(url.query)
title = query['title']
id = query.get('id')
if media_content_id.startswith(CloudMusicRouter.local_playlist):
# 本地播放列表
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_TYPE_PLAYLIST,
title=title,
can_play=False,
can_expand=False,
children=[],
)
playlist = [] if hasattr(media_player, 'playlist') == False else media_player.playlist
for index, item in enumerate(playlist):
title = item.song
if not item.singer:
title = f'{title} - {item.singer}'
library_info.children.append(
BrowseMedia(
title=title,
media_class=MEDIA_CLASS_MUSIC,
media_content_type=MEDIA_TYPE_PLAYLIST,
media_content_id=f"{media_content_id}&index={index}",
can_play=True,
can_expand=False,
thumbnail=item.thumbnail
)
)
return library_info
if media_content_id.startswith(CloudMusicRouter.my_daily):
# 每日推荐
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_TYPE_PLAYLIST,
title=title,
can_play=True,
can_expand=False,
children=[],
)
playlist = await cloud_music.async_get_dailySongs()
for index, music_info in enumerate(playlist):
library_info.children.append(
BrowseMedia(
title=music_info.song,
media_class=MEDIA_CLASS_MUSIC,
media_content_type=MEDIA_TYPE_PLAYLIST,
media_content_id=f"{media_content_id}&index={index}",
can_play=True,
can_expand=False,
thumbnail=music_info.thumbnail
)
)
return library_info
if media_content_id.startswith(CloudMusicRouter.my_cloud):
# 我的云盘
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_TYPE_PLAYLIST,
title=title,
can_play=True,
can_expand=False,
children=[],
)
playlist = await cloud_music.async_get_cloud()
for index, music_info in enumerate(playlist):
library_info.children.append(
BrowseMedia(
title=music_info.song,
media_class=MEDIA_CLASS_MUSIC,
media_content_type=MEDIA_TYPE_PLAYLIST,
media_content_id=f"{media_content_id}&index={index}",
can_play=True,
can_expand=False,
thumbnail=music_info.thumbnail
)
)
return library_info
if media_content_id.startswith(CloudMusicRouter.my_created):
# 我创建的歌单
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_TYPE_PLAYLIST,
title=title,
can_play=False,
can_expand=False,
children=[],
)
uid = cloud_music.userinfo.get('uid')
res = await cloud_music.netease_cloud_music(f'/user/playlist?uid={uid}')
for item in res['playlist']:
library_info.children.append(
BrowseMedia(
title=item.get('name'),
media_class=MEDIA_CLASS_DIRECTORY,
media_content_type=MEDIA_TYPE_MUSIC,
media_content_id=f"{CloudMusicRouter.playlist}?title={quote(item['name'])}&id={item['id']}",
can_play=False,
can_expand=True,
thumbnail=cloud_music.netease_image_url(item['coverImgUrl'])
)
)
return library_info
if media_content_id.startswith(CloudMusicRouter.my_radio):
# 收藏的电台
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_TYPE_PLAYLIST,
title=title,
can_play=False,
can_expand=False,
children=[],
)
res = await cloud_music.netease_cloud_music('/dj/sublist')
for item in res['djRadios']:
library_info.children.append(
BrowseMedia(
title=item.get('name'),
media_class=MEDIA_CLASS_DIRECTORY,
media_content_type=MEDIA_TYPE_PLAYLIST,
media_content_id=f"{CloudMusicRouter.radio_playlist}?title={quote(item['name'])}&id={item['id']}",
can_play=False,
can_expand=True,
thumbnail=cloud_music.netease_image_url(item['picUrl'])
)
)
return library_info
if media_content_id.startswith(CloudMusicRouter.radio_playlist):
# 电台音乐列表
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_TYPE_PLAYLIST,
title=title,
can_play=True,
can_expand=False,
children=[],
)
playlist = await cloud_music.async_get_djradio(id)
for index, music_info in enumerate(playlist):
library_info.children.append(
BrowseMedia(
title=music_info.song,
media_class=MEDIA_CLASS_MUSIC,
media_content_type=MEDIA_TYPE_PLAYLIST,
media_content_id=f"{media_content_id}&index={index}",
can_play=True,
can_expand=False,
thumbnail=music_info.thumbnail
)
)
return library_info
if media_content_id.startswith(CloudMusicRouter.my_artist):
# 收藏的歌手
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_TYPE_PLAYLIST,
title=title,
can_play=False,
can_expand=False,
children=[],
)
res = await cloud_music.netease_cloud_music('/artist/sublist')
for item in res['data']:
library_info.children.append(
BrowseMedia(
title=item['name'],
media_class=MEDIA_CLASS_ARTIST,
media_content_type=MEDIA_TYPE_PLAYLIST,
media_content_id=f"{cloudmusic_protocol}my/artist/playlist?title={quote(item['name'])}&id={item['id']}",
can_play=False,
can_expand=True,
thumbnail=cloud_music.netease_image_url(item['picUrl'])
)
)
return library_info
if media_content_id.startswith(CloudMusicRouter.artist_playlist):
# 歌手音乐列表
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_TYPE_PLAYLIST,
title=title,
can_play=True,
can_expand=False,
children=[],
)
playlist = await cloud_music.async_get_artists(id)
for index, music_info in enumerate(playlist):
library_info.children.append(
BrowseMedia(
title=music_info.song,
media_class=MEDIA_CLASS_MUSIC,
media_content_type=MEDIA_TYPE_PLAYLIST,
media_content_id=f"{media_content_id}&index={index}",
can_play=True,
can_expand=False,
thumbnail=music_info.thumbnail
)
)
return library_info
if media_content_id.startswith(CloudMusicRouter.my_recommend_resource):
# 每日推荐歌单
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_CLASS_TRACK,
title=title,
can_play=False,
can_expand=True,
children=[],
)
res = await cloud_music.netease_cloud_music('/recommend/resource')
for item in res['recommend']:
library_info.children.append(
BrowseMedia(
title=item['name'],
media_class=MEDIA_CLASS_PLAYLIST,
media_content_type=MEDIA_TYPE_PLAYLIST,
media_content_id=f"{CloudMusicRouter.playlist}?title={quote(item['name'])}&id={item['id']}",
can_play=False,
can_expand=True,
thumbnail=cloud_music.netease_image_url(item['picUrl'])
)
)
return library_info
if media_content_id.startswith(CloudMusicRouter.toplist):
# 排行榜
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_CLASS_TRACK,
title=title,
can_play=False,
can_expand=True,
children=[],
)
res = await cloud_music.netease_cloud_music('/toplist')
for item in res['list']:
library_info.children.append(
BrowseMedia(
title=item['name'],
media_class=MEDIA_CLASS_PLAYLIST,
media_content_type=MEDIA_TYPE_PLAYLIST,
media_content_id=f"{CloudMusicRouter.playlist}?title={quote(item['name'])}&id={item['id']}",
can_play=False,
can_expand=True,
thumbnail=cloud_music.netease_image_url(item['coverImgUrl'])
)
)
return library_info
if media_content_id.startswith(CloudMusicRouter.playlist):
# 歌单列表
library_info = BrowseMedia(
media_class=MEDIA_CLASS_PLAYLIST,
media_content_id=media_content_id,
media_content_type=MEDIA_TYPE_PLAYLIST,
title=title,
can_play=True,
can_expand=False,
children=[],
)
playlist = await cloud_music.async_get_playlist(id)
for index, music_info in enumerate(playlist):
library_info.children.append(
BrowseMedia(
title=f'{music_info.song} - {music_info.singer}',
media_class=MEDIA_CLASS_MUSIC,
media_content_type=MEDIA_TYPE_PLAYLIST,
media_content_id=f"{media_content_id}&index={index}",
can_play=True,
can_expand=False,
thumbnail=music_info.thumbnail
)
)
return library_info
#================= 乐听头条
if media_content_id.startswith(CloudMusicRouter.ting_homepage):
children = [
{
'id': 'f3f5a6d2-5557-4555-be8e-1da281f97c22',
'title': '热点'
},
{
'id': 'd8e89746-1e66-47ad-8998-1a41ada3beee',
'title': '社会'
},
{
'id': '4905d954-5a85-494a-bd8c-7bc3e1563299',
'title': '国际'
},
{
'id': 'fc583bff-e803-44b6-873a-50743ce7a1e9',
'title': '国内'
},
{
'id': 'c7467c00-463d-4c93-b999-7bbfc86ec2d4',
'title': '体育'
},
{
'id': '75564ed6-7b68-4922-b65b-859ea552422c',
'title': '娱乐'
},
{
'id': 'c6bc8af2-e1cc-4877-ac26-bac1e15e0aa9',
'title': '财经'
},
{
'id': 'f5cff467-2d78-4656-9b72-8e064c373874',
'title': '科技'
},
{
'id': 'ba89c581-7b16-4d25-a7ce-847a04bc9d91',
'title': '军事'
},
{
'id': '40f31d9d-8af8-4b28-a773-2e8837924e2e',
'title': '生活'
},
{
'id': '0dee077c-4956-41d3-878f-f2ab264dc379',
'title': '教育'
},
{
'id': '5c930af2-5c8a-4a12-9561-82c5e1c41e48',
'title': '汽车'
},
{
'id': 'f463180f-7a49-415e-b884-c6832ba876f0',
'title': '人文'
},
{
'id': '8cae0497-4878-4de9-b3fe-30518e2b6a9f',
'title': '旅游'
}
]
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_TYPE_CHANNEL,
title=title,
can_play=False,
can_expand=False,
children=[],
)
for item in children:
title = item['title']
library_info.children.append(
BrowseMedia(
title=title,
media_class=CHILD_TYPE_MEDIA_CLASS[MEDIA_TYPE_EPISODE],
media_content_type=MEDIA_TYPE_EPISODE,
media_content_id=f'{CloudMusicRouter.ting_playlist}?title={quote(title)}&id=' + item['id'],
can_play=True,
can_expand=False
)
)
return library_info
#================= FM
if media_content_id.startswith(CloudMusicRouter.fm_channel):
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_TYPE_CHANNEL,
title=title,
can_play=False,
can_expand=False,
children=[],
)
result = await http_get('https://rapi.qingting.fm/categories?type=channel')
data = result['Data']
for item in data:
title = item['title']
library_info.children.append(
BrowseMedia(
title=title,
media_class=CHILD_TYPE_MEDIA_CLASS[MEDIA_TYPE_CHANNEL],
media_content_type=MEDIA_TYPE_CHANNEL,
media_content_id=f'{CloudMusicRouter.fm_playlist}?title={quote(title)}&id={item["id"]}',
can_play=False,
can_expand=True
)
)
return library_info
if media_content_id.startswith(CloudMusicRouter.fm_playlist):
library_info = BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=media_content_id,
media_content_type=MEDIA_TYPE_PLAYLIST,
title=title,
can_play=True,
can_expand=False,
children=[],
)
playlist = await cloud_music.async_fm_playlist(id)
for index, music_info in enumerate(playlist):
library_info.children.append(
BrowseMedia(
title=f'{music_info.song} - {music_info.singer}',
media_class=MEDIA_CLASS_MUSIC,
media_content_type=MEDIA_TYPE_PLAYLIST,
media_content_id=f"{media_content_id}&index={index}",
can_play=True,
can_expand=False,
thumbnail=music_info.thumbnail
)
)
return library_info
#================= 喜马拉雅
''' ================== 播放音乐 ================== '''
async def async_play_media(media_player, cloud_music, media_content_id):
print(media_content_id)
hass = media_player.hass
# 媒体库
if media_source.is_media_source_id(media_content_id):
play_item = await media_source.async_resolve_media(
hass, media_content_id, media_player.entity_id
)
return async_process_play_media_url(hass, play_item.url)
# 判断是否云音乐协议
if media_content_id.startswith(protocol) == False:
return
# 协议转换
url = urlparse(media_content_id)
query = parse_query(url.query)
playlist = None
# 通用索引
playindex = int(query.get('index', 0))
# 通用ID
id = query.get('id')
# 通用搜索关键词
keywords = query.get('kv')
if media_content_id.startswith(CloudMusicRouter.local_playlist):
media_player.playindex = playindex
return 'index'
if media_content_id.startswith(CloudMusicRouter.playlist):
playlist = await cloud_music.async_get_playlist(id)
elif media_content_id.startswith(CloudMusicRouter.my_daily):
playlist = await cloud_music.async_get_dailySongs()
elif media_content_id.startswith(CloudMusicRouter.my_cloud):
playlist = await cloud_music.async_get_cloud()
elif media_content_id.startswith(CloudMusicRouter.artist_playlist):
playlist = await cloud_music.async_get_artists(id)
elif media_content_id.startswith(CloudMusicRouter.radio_playlist):
playlist = await cloud_music.async_get_djradio(id)
elif media_content_id.startswith(CloudMusicRouter.ting_playlist):
playlist = await cloud_music.async_ting_playlist(id)
elif media_content_id.startswith(CloudMusicRouter.xmly_playlist):
page = query.get('page', 1)
size = query.get('size', 50)
asc = query.get('asc', 1)
playlist = await cloud_music.async_xmly_playlist(id, page, size, asc)
elif media_content_id.startswith(CloudMusicRouter.fm_playlist):
page = query.get('page', 1)
size = query.get('size', 200)
playlist = await cloud_music.async_fm_playlist(id, page, size)
elif media_content_id.startswith(CloudMusicRouter.search_name):
playlist = await cloud_music.async_search_song(keywords)
if playlist is not None:
media_player.playindex = playindex
media_player.playlist = playlist
return 'playlist'
# 上一曲
async def async_media_previous_track(media_player, shuffle=False):
if hasattr(media_player, 'playlist') == False:
return
playlist = media_player.playlist
count = len(playlist)
# 随机
if shuffle:
playindex = random.randint(0, count - 1)
else:
if count <= 1:
return
playindex = media_player.playindex - 1
if playindex < 0:
playindex = count - 1
media_player.playindex = playindex
await media_player.async_play_media(MEDIA_TYPE_MUSIC, playlist[playindex].url)
# 下一曲
async def async_media_next_track(media_player, shuffle=False):
if hasattr(media_player, 'playlist') == False:
return
playindex = media_player.playindex + 1
playlist = media_player.playlist
count = len(playlist)
# 随机
if shuffle:
playindex = random.randint(0, count - 1)
else:
if playindex >= len(playlist):
playindex = 0
media_player.playindex = playindex
await media_player.async_play_media(MEDIA_TYPE_MUSIC, playlist[playindex].url)

View File

@ -0,0 +1,371 @@
from unittest import result
import uuid, time, json, os, random, aiohttp
from urllib.parse import quote
from homeassistant.helpers.network import get_url
from .http_api import http_get, http_cookie
from .models.music_info import MusicInfo, MusicSource
from homeassistant.helpers.storage import STORAGE_DIR
from homeassistant.util.json import load_json, save_json
from .browse_media import (
async_browse_media,
async_play_media,
async_media_previous_track,
async_media_next_track
)
class CloudMusic():
def __init__(self, hass, url) -> None:
self.hass = hass
self.api_url = url.strip('/')
# 媒体资源
self.async_browse_media = async_browse_media
self.async_play_media = async_play_media
self.async_media_previous_track = async_media_previous_track
self.async_media_next_track = async_media_next_track
self.userinfo = {}
# 读取用户信息
self.userinfo_filepath = self.get_storage_dir('cloud_music.userinfo')
if os.path.exists(self.userinfo_filepath):
self.userinfo = load_json(self.userinfo_filepath)
def get_storage_dir(self, file_name):
return os.path.abspath(f'{STORAGE_DIR}/{file_name}')
def netease_image_url(self, url, size=200):
return f'{url}?param={size}y{size}'
# 登录
async def login(self, username, password):
login_url = f'{self.api_url}/login'
if username.count('@') > 0:
login_url = login_url + '?email='
else:
login_url = login_url + '/cellphone?phone='
data = await http_cookie(login_url + f'{quote(username)}&password={quote(password)}')
res_data = data.get('data', {})
# 登录成功
if res_data.get('code') == 200:
# 写入cookie
uid = res_data['account']['id']
cookie = data.get('cookie')
self.userinfo = {
'uid': uid,
'cookie': cookie
}
save_json(self.userinfo_filepath, self.userinfo)
return res_data
else:
print(res_data)
# 获取播放链接
def get_play_url(self, id, song, singer, source):
base_url = get_url(self.hass, prefer_external=True)
if singer is None:
singer = ''
return f'{base_url}/cloud_music/url?id={id}&song={quote(song)}&singer={quote(singer)}&source={source}'
# 网易云音乐接口
async def netease_cloud_music(self, url):
return await http_get(self.api_url + url, self.userinfo.get('cookie', {}))
# 获取音乐链接
async def song_url(self, id):
res = await self.netease_cloud_music(f'/song/url/v1?id={id}&level=standard')
data = res['data'][0]
url = data['url']
# 0免费
# 1收费
fee = 0 if data['freeTrialInfo'] is None else 1
return url, fee
# 获取云盘音乐链接
async def cloud_song_url(self, id):
res = await self.netease_cloud_music(f'/user/cloud')
filter_list = list(filter(lambda x:x['simpleSong']['id'] == id, res['data']))
if len(filter_list) > 0:
songId = filter_list[0]['songId']
url, fee = await self.song_url(songId)
return url
# 获取歌单列表
async def async_get_playlist(self, playlist_id):
res = await self.netease_cloud_music(f'/playlist/track/all?id={playlist_id}')
def format_playlist(item):
id = item['id']
song = item['name']
singer = item['ar'][0].get('name', '')
album = item['al']['name']
duration = item['dt']
url = self.get_play_url(id, song, singer, MusicSource.PLAYLIST.value)
picUrl = item['al'].get('picUrl', 'https://p2.music.126.net/fL9ORyu0e777lppGU3D89A==/109951167206009876.jpg')
music_info = MusicInfo(id, song, singer, album, duration, url, picUrl, MusicSource.PLAYLIST.value)
return music_info
return list(map(format_playlist, res['songs']))
# 获取电台列表
async def async_get_djradio(self, rid):
res = await self.netease_cloud_music(f'/dj/program?rid={rid}&limit=200')
def format_playlist(item):
mainSong = item['mainSong']
id = mainSong['id']
song = mainSong['name']
singer = mainSong['artists'][0]['name']
album = item['dj']['brand']
duration = mainSong['duration']
url = self.get_play_url(id, song, singer, MusicSource.DJRADIO.value)
picUrl = item['coverUrl']
music_info = MusicInfo(id, song, singer, album, duration, url, picUrl, MusicSource.DJRADIO.value)
return music_info
return list(map(format_playlist, res['programs']))
# 获取歌手列表
async def async_get_artists(self, aid):
res = await self.netease_cloud_music(f'/artists?id={aid}')
def format_playlist(item):
id = item['id']
song = item['name']
singer = item['ar'][0]['name']
album = item['al']['name']
duration = item['dt']
url = self.get_play_url(id, song, singer, MusicSource.ARTISTS.value)
picUrl = res['artist']['picUrl']
music_info = MusicInfo(id, song, singer, album, duration, url, picUrl, MusicSource.ARTISTS.value)
return music_info
return list(map(format_playlist, res['hotSongs']))
# 获取云盘音乐
async def async_get_cloud(self):
res = await self.netease_cloud_music('/user/cloud')
def format_playlist(item):
id = item['songId']
song = ''
singer = ''
duration = ''
album = ''
picUrl = 'http://p3.music.126.net/ik8RFcDiRNSV2wvmTnrcbA==/3435973851857038.jpg'
simpleSong = item.get('simpleSong')
if simpleSong is not None:
song = simpleSong.get("name")
duration = simpleSong.get("dt")
al = simpleSong.get('al')
if al is not None:
picUrl = al.get('picUrl')
album = al.get('name')
ar = simpleSong.get('ar')
if ar is not None and len(ar) > 0:
singer = ar[0].get('name', '')
if singer is None:
singer = ''
url = self.get_play_url(id, song, singer, MusicSource.CLOUD.value)
music_info = MusicInfo(id, song, singer, album, duration, url, picUrl, MusicSource.CLOUD.value)
return music_info
return list(map(format_playlist, res['data']))
# 获取每日推荐歌曲
async def async_get_dailySongs(self):
res = await self.netease_cloud_music('/recommend/songs')
def format_playlist(item):
id = item['id']
song = item['name']
singer = item['ar'][0]['name']
album = item['al']['name']
duration = item['dt']
url = self.get_play_url(id, song, singer, MusicSource.PLAYLIST.value)
picUrl = item['al'].get('picUrl', 'https://p2.music.126.net/fL9ORyu0e777lppGU3D89A==/109951167206009876.jpg')
music_info = MusicInfo(id, song, singer, album, duration, url, picUrl, MusicSource.PLAYLIST.value)
return music_info
return list(map(format_playlist, res['data']['dailySongs']))
# 乐听头条
async def async_ting_playlist(self, catalog_id):
now = int(time.time())
if hasattr(self, 'letingtoutiao') == False:
uid = uuid.uuid4().hex
self.letingtoutiao = {
'time': now,
'headers': {"uid": uid, "logid": uid, "token": ''}
}
headers = self.letingtoutiao['headers']
async with aiohttp.ClientSession() as session:
# 获取token
if headers['token'] == '' or now > self.letingtoutiao['time']:
async with session.get('https://app.leting.io/app/auth?uid=' +
uid + '&appid=a435325b8662a4098f615a7d067fe7b8&ts=1628297581496&sign=4149682cf40c2bf2efcec8155c48b627&v=v9&channel=huawei',
headers=headers) as res:
r = await res.json()
token = r['data']['token']
headers['token'] = token
# 保存时间10分钟重新获取token
self.letingtoutiao['time'] = now + 60 * 10
self.letingtoutiao['headers']['token'] = token
# 获取播放列表
async with session.get('https://app.leting.io/app/url/channel?catalog_id=' +
catalog_id + '&size=100&distinct=1&v=v8&channel=xiaomi', headers=headers) as res:
r = await res.json()
def format_playlist(item):
id = item['sid']
song = item['title']
singer = item['source']
album = item['catalog_name']
duration = item['duration']
url = item['audio']
picUrl = item['source_icon']
music_info = MusicInfo(id, song, singer, album, duration, url, picUrl, MusicSource.URL.value)
return music_info
return list(map(format_playlist, r['data']['data']))
# 喜马拉雅
async def async_xmly_playlist(self, id, page=1, size=50, asc=1):
if page < 1:
page = 1
isAsc = 'true' if asc != 1 else 'false'
url = f'https://mobile.ximalaya.com/mobile/v1/album/track?albumId={id}&isAsc={isAsc}&pageId={page}&pageSize={size}'
result = await http_get(url)
if result['ret'] == 0:
_list = result['data']['list']
_totalCount = result['data']['totalCount']
if len(_list) > 0:
# 获取专辑名称
trackId = _list[0]['trackId']
url = f'http://mobile.ximalaya.com/v1/track/baseInfo?trackId={trackId}'
album_result = await http_get(url)
# 格式化列表
def format_playlist(item):
id = item['trackId']
song = item['title']
singer = item['nickname']
album = album_result['albumTitle']
duration = item['duration']
url = item['playUrl64']
picUrl = item['coverLarge']
music_info = MusicInfo(id, song, singer, album, duration, url, picUrl, MusicSource.XIMALAYA.value)
return music_info
return list(map(format_playlist, _list))
# FM
async def async_fm_playlist(self, id, page=1, size=100):
result = await http_get(f'https://rapi.qingting.fm/categories/{id}/channels?with_total=true&page={page}&pagesize={size}')
data = result['Data']
# 格式化列表
def format_playlist(item):
id = item['content_id']
song = item['title']
album = item['categories'][0]['title']
singer = album
duration = item['audience_count']
url = f'http://lhttp.qingting.fm/live/{id}/64k.mp3'
picUrl = item['cover']
nowplaying = item.get('nowplaying')
if nowplaying is not None:
singer = nowplaying.get('title', song)
music_info = MusicInfo(id, song, singer, album, duration, url, picUrl, MusicSource.URL.value)
return music_info
return list(map(format_playlist, data['items']))
# 音乐搜索
async def async_search_song(self, name):
ha_music_source = self.hass.data.get('ha_music_source')
if ha_music_source is not None:
music_list = await ha_music_source.async_search_all(name)
# 格式化列表
def format_playlist(item):
id = item['id']
song = item['song']
album = item['album']
singer = item['singer']
duration = 0
url = item['url']
picUrl = self.netease_image_url('http://p1.music.126.net/6nuYK0CVBFE3aslWtsmCkQ==/109951165472872790.jpg')
music_info = MusicInfo(id, song, singer, album, duration, url, picUrl, MusicSource.URL.value)
return music_info
return list(map(format_playlist, music_list))
# 电台
async def async_search_djradio(self, name):
_list = []
res = await self.netease_cloud_music(f'/search?keywords={name}&type=1009')
if res['code'] == 200:
_list = list(map(lambda item: {
"id": item['id'],
"name": item['name'],
"cover": item['picUrl'],
"intro": item['dj']['signature'],
"creator": item['dj']['nickname'],
"source": MusicSource.DJRADIO.value
}, res['result']['djRadios']))
return _list
# 喜马拉雅
async def async_search_xmly(self, name):
_list = []
url = f'https://m.ximalaya.com/m-revision/page/search?kw={name}&core=all&page=1&rows=5'
res = await http_get(url)
if res['ret'] == 0:
result = res['data']['albumViews']
if result['total'] > 0:
_list = list(map(lambda item: {
"id": item['albumInfo']['id'],
"name": item['albumInfo']['title'],
"cover": item['albumInfo'].get('cover_path', 'https://imagev2.xmcdn.com/group79/M02/77/6C/wKgPEF6masWTCICAAAA7qPQDtNY545.jpg!strip=1&quality=7&magick=webp&op_type=5&upload_type=cover&name=web_large&device_type=ios'),
"intro": item['albumInfo']['intro'],
"creator": item['albumInfo']['nickname'],
"source": MusicSource.XIMALAYA.value
}, result['albums']))
return _list
# 歌单
async def async_search_playlist(self, name):
_list = []
res = await self.netease_cloud_music(f'/search?keywords={name}&type=1000')
if res['code'] == 200:
_list = list(map(lambda item: {
"id": item['id'],
"name": item['name'],
"cover": item['coverImgUrl'],
"intro": item['description'],
"creator": item['creator']['nickname'],
"source": MusicSource.PLAYLIST.value
}, res['result']['playlists']))
return _list
# 歌手
async def async_search_singer(self, name):
_list = []
res = await self.netease_cloud_music(f'/search?keywords={name}&type=100')
if res['code'] == 200:
_list = list(map(lambda item: {
"id": item['id'],
"name": item['name'],
"cover": item['picUrl'],
"intro": ''.join(item['alias']),
"creator": item['name'],
"source": MusicSource.ARTISTS.value
}, res['result']['artists']))
return _list

View File

@ -0,0 +1,77 @@
from __future__ import annotations
from typing import Any
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, OptionsFlow, ConfigEntry
from homeassistant.data_entry_flow import FlowResult
from homeassistant.const import CONF_URL, CONF_USERNAME, CONF_PASSWORD
from homeassistant.helpers.storage import STORAGE_DIR
from urllib.parse import quote
from homeassistant.core import callback
import os
from .manifest import manifest
from .http_api import http_cookie
from homeassistant.util.json import save_json
DOMAIN = manifest.domain
class SimpleConfigFlow(ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle the initial step."""
if self._async_current_entries():
return self.async_abort(reason="single_instance_allowed")
errors = {}
if user_input is not None:
url = user_input.get(CONF_URL).strip('/')
user_input[CONF_URL] = url
return self.async_create_entry(title=DOMAIN, data=user_input)
else:
user_input = {}
DATA_SCHEMA = vol.Schema({
vol.Required(CONF_URL, default=user_input.get(CONF_URL)): str
})
return self.async_show_form(step_id="user", data_schema=DATA_SCHEMA, errors=errors)
@staticmethod
@callback
def async_get_options_flow(entry: ConfigEntry):
return OptionsFlowHandler(entry)
class OptionsFlowHandler(OptionsFlow):
def __init__(self, config_entry: ConfigEntry):
self.config_entry = config_entry
async def async_step_init(self, user_input=None):
return await self.async_step_user(user_input)
async def async_step_user(self, user_input=None):
options = self.config_entry.options
errors = {}
if user_input is not None:
username = user_input.get(CONF_USERNAME)
password = user_input.get(CONF_PASSWORD)
cloud_music = self.hass.data['cloud_music']
result = await cloud_music.login(username, password)
if result is not None:
return self.async_create_entry(title='', data=user_input)
else:
errors['base'] = 'login_failed'
DATA_SCHEMA = vol.Schema({
vol.Required(CONF_USERNAME, default=options.get(CONF_USERNAME)): str,
vol.Required(CONF_PASSWORD, default=options.get(CONF_PASSWORD)): str,
vol.Required("conversation", default=options.get("conversation", True)): bool
})
return self.async_show_form(step_id="user", data_schema=DATA_SCHEMA, errors=errors)

View File

@ -0,0 +1 @@
PLATFORMS = ["media_player"]

View File

@ -0,0 +1,67 @@
import os, json
from urllib.parse import parse_qsl, quote
from homeassistant.components.http import HomeAssistantView
from aiohttp import web
from .models.music_info import MusicSource
from .manifest import manifest
DOMAIN = manifest.domain
class HttpView(HomeAssistantView):
url = "/cloud_music/url"
name = f"cloud_music:url"
requires_auth = False
async def get(self, request):
hass = request.app["hass"]
cloud_music = hass.data['cloud_music']
query = request.query
print(query)
id = query.get('id')
source = query.get('source')
song = query.get('song')
singer = query.get('singer')
not_found_tips = quote(f'当前没有找到编号是{id},歌名为{song},作者是{singer}的播放链接')
play_url = f'https://fanyi.baidu.com/gettts?lan=zh&text={not_found_tips}&spd=5&source=web'
if id is None or source is None:
return web.HTTPFound(play_url)
source = int(source)
if source == MusicSource.PLAYLIST.value \
or source == MusicSource.ARTISTS.value \
or source == MusicSource.DJRADIO.value \
or source == MusicSource.CLOUD.value:
# 获取播放链接
url, fee = await cloud_music.song_url(id)
if url is not None:
# 收费音乐
if fee == 1:
result = await self.async_music_source(hass, song, singer)
if result is not None:
url = result
play_url = url
else:
# 从云盘里获取
url = await cloud_music.cloud_song_url(id)
if url is not None:
play_url = url
else:
result = await self.async_music_source(hass, song, singer)
if result is not None:
play_url = result
print(play_url)
# 重定向到可播放链接
return web.HTTPFound(play_url)
async def async_music_source(self, hass, song, singer):
# 使用全网音乐搜索
ha_music_source = hass.data['ha_music_source']
if ha_music_source is not None:
return await ha_music_source.async_song_url(song, singer)

View File

@ -0,0 +1,42 @@
import json, aiohttp
from urllib.parse import urlparse
# 全局请求头
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36 Edg/105.0.1343.50'
}
# 获取cookie
async def http_cookie(url):
COOKIES = {'os': 'osx'}
jar = aiohttp.CookieJar(unsafe=True)
location = urlparse(url)
location_orgin = f'{location.scheme}://{location.netloc}'
async with aiohttp.ClientSession(headers=HEADERS, cookies=COOKIES, cookie_jar=jar) as session:
async with session.get(url) as resp:
cookies = session.cookie_jar.filter_cookies(location_orgin)
for key, cookie in cookies.items():
COOKIES[key] = cookie.value
result = await resp.json()
return {
'cookie': COOKIES,
'data': result
}
async def http_get(url, COOKIES={}):
print(url)
headers = {'Referer': url, **HEADERS}
jar = aiohttp.CookieJar(unsafe=True)
async with aiohttp.ClientSession(headers=headers, cookies=COOKIES, cookie_jar=jar) as session:
async with session.get(url) as resp:
# 喜马拉雅返回的是文本内容
if 'https://mobile.ximalaya.com/mobile/' in url:
result = json.loads(await resp.text())
else:
result = await resp.json()
return result
async def http_code(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return response.status

View File

@ -0,0 +1,93 @@
import asyncio
from homeassistant.helpers import intent
import homeassistant.helpers.config_validation as cv
import re
def async_register(hass, conversation):
if conversation:
intent.async_register(hass, CloudMusicIntent())
hass.components.conversation.async_register(
"CloudMusicIntent",
[
"播放专辑{album}",
"播放电台{radio}",
"播放歌单{playlist}",
"播放歌曲{song}",
"播放{singer}的歌"
]
)
class CloudMusicIntent(intent.IntentHandler):
intent_type = "CloudMusicIntent"
slot_schema = {
'playlist': cv.string,
'album': cv.string,
'radio': cv.string,
'singer': cv.string,
'song': cv.string
}
@asyncio.coroutine
async def async_handle(self, intent_obj):
hass = intent_obj.hass
slots = intent_obj.slots
cloud_music = hass.data['cloud_music']
playlist = slots.get('playlist')
album = slots.get('album')
radio = slots.get('radio')
singer = slots.get('singer')
song = slots.get('song')
media_content_id = None
# 专辑搜索
if album is not None:
_list = await cloud_music.async_search_xmly(album["value"])
if len(_list) > 0:
media_content_id = f'cloudmusic://xmly/playlist?id={_list[0]["id"]}'
# 电台搜索
if radio is not None:
_list = await cloud_music.async_search_djradio(radio["value"])
if len(_list) > 0:
media_content_id = f'cloudmusic://163/radio/playlist?id={_list[0]["id"]}'
# 歌单搜索
if playlist is not None:
_list = await cloud_music.async_search_playlist(playlist["value"])
if len(_list) > 0:
media_content_id = f'cloudmusic://163/playlist?id={_list[0]["id"]}'
# 歌曲
if song is not None:
media_content_id = f'cloudmusic://search/name?kv={song["value"]}'
# 歌手搜索
if singer is not None:
_list = await cloud_music.async_search_singer(singer["value"])
if len(_list) > 0:
media_content_id = f'cloudmusic://163/artist/playlist?id={_list[0]["id"]}'
if media_content_id is None:
message = '没有找到对应的资源'
states = hass.states.async_all()
for state in states:
domain = state.domain
platform = state.attributes.get('platform')
friendly_name = state.attributes.get('friendly_name')
if state.domain == 'media_player' and platform == 'cloud_music':
hass.async_create_task(hass.services.async_call(domain, 'play_media', {
'entity_id': state.entity_id,
'media_content_id': media_content_id,
'media_content_type': 'music'
}))
message = f"正在{friendly_name}{intent_obj.text_input}"
break
response = intent_obj.create_response()
response.async_set_speech(message)
return response

View File

@ -0,0 +1,12 @@
{
"domain": "ha_cloud_music",
"name": "\u4E91\u97F3\u4E50",
"version": "2022.11.7",
"config_flow": true,
"documentation": "https://github.com/shaonianzhentan/ha_cloud_music",
"requirements": [],
"codeowners": [
"@shaonianzhentan"
],
"iot_class": "cloud_polling"
}

View File

@ -0,0 +1,25 @@
import os
from homeassistant.util.json import load_json
def custom_components_path(file_path):
return os.path.abspath('./custom_components/' + file_path)
class Manifest():
def __init__(self, domain):
self.domain = domain
self.manifest_path = custom_components_path(f'{domain}/manifest.json')
self.update()
@property
def remote_url(self):
return 'https://gitee.com/shaonianzhentan/ha_cloud_music/raw/dev/custom_components/ha_cloud_music/manifest.json'
def update(self):
data = load_json(self.manifest_path, {})
self.domain = data.get('domain')
self.name = data.get('name')
self.version = data.get('version')
self.documentation = data.get('documentation')
manifest = Manifest('ha_cloud_music')

View File

@ -0,0 +1,243 @@
import logging, time, datetime
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.config_entries import ConfigEntry
from homeassistant.helpers.event import track_time_interval
from homeassistant.components.media_player import MediaPlayerEntity, MediaPlayerDeviceClass
from homeassistant.components.media_player.const import (
SUPPORT_BROWSE_MEDIA,
SUPPORT_TURN_OFF,
SUPPORT_TURN_ON,
SUPPORT_VOLUME_STEP,
SUPPORT_VOLUME_SET,
SUPPORT_VOLUME_MUTE,
SUPPORT_SELECT_SOURCE,
SUPPORT_SELECT_SOUND_MODE,
SUPPORT_PLAY_MEDIA,
SUPPORT_PLAY,
SUPPORT_PAUSE,
SUPPORT_SEEK,
SUPPORT_CLEAR_PLAYLIST,
SUPPORT_SHUFFLE_SET,
SUPPORT_REPEAT_SET,
SUPPORT_NEXT_TRACK,
SUPPORT_PREVIOUS_TRACK,
MEDIA_TYPE_ALBUM,
MEDIA_TYPE_ARTIST,
MEDIA_TYPE_CHANNEL,
MEDIA_TYPE_EPISODE,
MEDIA_TYPE_MOVIE,
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_SEASON,
MEDIA_TYPE_TRACK,
MEDIA_TYPE_TVSHOW,
)
from homeassistant.const import (
CONF_TOKEN,
CONF_URL,
CONF_NAME,
STATE_OFF,
STATE_ON,
STATE_PLAYING,
STATE_PAUSED,
STATE_IDLE,
STATE_UNAVAILABLE
)
from .manifest import manifest
DOMAIN = manifest.domain
_LOGGER = logging.getLogger(__name__)
SUPPORT_FEATURES = SUPPORT_VOLUME_STEP | SUPPORT_VOLUME_MUTE | SUPPORT_VOLUME_SET | \
SUPPORT_SELECT_SOURCE | SUPPORT_SELECT_SOUND_MODE | \
SUPPORT_PLAY_MEDIA | SUPPORT_PLAY | SUPPORT_PAUSE | SUPPORT_PREVIOUS_TRACK | SUPPORT_NEXT_TRACK | \
SUPPORT_BROWSE_MEDIA | SUPPORT_SEEK | SUPPORT_CLEAR_PLAYLIST | SUPPORT_SHUFFLE_SET | SUPPORT_REPEAT_SET
# 定时器时间
TIME_BETWEEN_UPDATES = datetime.timedelta(seconds=2)
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
media_player = CloudMusicMediaPlayer(hass)
await hass.async_add_executor_job(track_time_interval, hass, media_player.interval, TIME_BETWEEN_UPDATES)
async_add_entities([ media_player ], True)
class CloudMusicMediaPlayer(MediaPlayerEntity):
def __init__(self, hass):
self.hass = hass
self._attributes = {
'platform': 'cloud_music'
}
# fixed attribute
self._attr_media_image_remotely_accessible = True
self._attr_device_class = MediaPlayerDeviceClass.TV.value
self._attr_supported_features = SUPPORT_FEATURES
# default attribute
self._attr_source_list = []
self._attr_sound_mode_list = []
self._attr_name = manifest.name
self._attr_unique_id = manifest.documentation
self._attr_state = STATE_ON
self._attr_volume_level = 1
self._attr_repeat = 'all'
self._attr_shuffle = False
self.cloud_music = hass.data['cloud_music']
self.before_state = None
self.current_state = None
def interval(self, now):
# 暂停时不更新
if self._attr_state == STATE_PAUSED:
return
media_player = self.media_player
if media_player is not None:
attrs = media_player.attributes
self._attr_media_position = attrs.get('media_position', 0)
self._attr_media_duration = attrs.get('media_duration', 0)
self._attr_media_position_updated_at = datetime.datetime.now()
# 判断是否下一曲
if self.before_state is not None:
# 判断音乐总时长
if self.before_state['media_duration'] > 0 and self.before_state['media_duration'] - self.before_state['media_duration'] <= 5:
# 判断源音乐播放器状态
if self.before_state['state'] == STATE_PLAYING and self.current_state == STATE_IDLE:
self.hass.async_create_task(self.async_media_next_track())
self.before_state = None
return
self.before_state = {
'media_position': self._attr_media_position,
'media_duration': self._attr_media_duration,
'state': self.current_state
}
self.current_state = media_player.state
if hasattr(self, 'playlist'):
music_info = self.playlist[self.playindex]
self._attr_app_name = music_info.singer
self._attr_media_image_url = music_info.thumbnail
self._attr_media_album_name = music_info.album
self._attr_media_title = music_info.song
self._attr_media_artist = music_info.singer
@property
def media_player(self):
if self.entity_id is not None:
state = self.hass.states.get(self.entity_id)
entity_id = state.attributes.get('media_player')
if entity_id is not None and entity_id != self.entity_id and entity_id.startswith('media_player.'):
return self.hass.states.get(entity_id)
@property
def device_info(self):
return {
'identifiers': {
(DOMAIN, manifest.documentation)
},
'name': self.name,
'manufacturer': 'shaonianzhentan',
'model': 'CloudMusic',
'sw_version': manifest.version
}
@property
def extra_state_attributes(self):
return self._attributes
async def async_browse_media(self, media_content_type=None, media_content_id=None):
return await self.cloud_music.async_browse_media(self, media_content_type, media_content_id)
async def async_select_source(self, source):
if self._attr_source_list.count(source) > 0:
self._attr_source = source
async def async_select_sound_mode(self, mode):
if self._attr_sound_mode_list.count(mode) > 0:
self._attr_sound_mode = mode
async def async_volume_up(self):
await self.async_call('volume_up')
async def async_volume_down(self):
await self.async_call('volume_down')
async def async_mute_volume(self, mute):
await self.async_call('mute_volume', { 'mute': mute })
async def async_set_volume_level(self, volume: float):
await self.async_call('volume_set', { 'volume_level': volume })
async def async_play_media(self, media_type, media_id, **kwargs):
self._attr_state = STATE_PAUSED
media_content_id = media_id
result = await self.cloud_music.async_play_media(self, self.cloud_music, media_id)
if result is not None:
if result == 'index':
# 播放当前列表指定项
media_content_id = self.playlist[self.playindex].url
elif result.startswith('http'):
# HTTP播放链接
media_content_id = result
else:
# 添加播放列表到播放器
media_content_id = self.playlist[self.playindex].url
self._attr_media_content_id = media_content_id
await self.async_call('play_media', {
'media_content_id': media_content_id,
'media_content_type': 'music'
})
self._attr_state = STATE_PLAYING
self.before_state = None
async def async_media_play(self):
self._attr_state = STATE_PLAYING
await self.async_call('media_play')
async def async_media_pause(self):
self._attr_state = STATE_PAUSED
await self.async_call('media_pause')
async def async_set_repeat(self, repeat):
self._attr_repeat = repeat
async def async_set_shuffle(self, shuffle):
self._attr_shuffle = shuffle
async def async_media_next_track(self):
self._attr_state = STATE_PAUSED
await self.cloud_music.async_media_next_track(self, self._attr_shuffle)
async def async_media_previous_track(self):
self._attr_state = STATE_PAUSED
await self.cloud_music.async_media_previous_track(self, self._attr_shuffle)
async def async_media_seek(self, position):
await self.async_call('media_seek', { 'position': position })
async def async_media_stop(self):
await self.async_call('media_stop')
# 更新属性
async def async_update(self):
pass
# 调用服务
async def async_call(self, service, service_data={}):
media_player = self.media_player
if media_player is not None:
service_data.update({ 'entity_id': media_player.entity_id })
await self.hass.services.async_call('media_player', service, service_data)

View File

@ -0,0 +1,70 @@
import enum
class MusicSource(enum.Enum):
URL = 1
XIMALAYA = 2
PLAYLIST = 3
DJRADIO = 4
ARTISTS = 5
CLOUD = 6
class MusicInfo:
def __init__(self, id, song, singer, album, duration, url, picUrl, source) -> None:
self._id = id
self._song = song
self._singer = singer
self._duration = duration
self._album = album
self._url = url
self._picUrl = picUrl
self._source = source
@property
def id(self):
return self._id
@property
def song(self):
return self._song
@property
def singer(self):
return self._singer
@property
def duration(self):
return self._duration
@property
def album(self):
return self._album
@property
def url(self):
return self._url
@property
def picUrl(self):
return self._picUrl
@property
def thumbnail(self):
return self._picUrl + '?param=200y200'
@property
def source(self) -> MusicSource:
return self._source
def to_dict(self):
return {
'id': self.id,
'song': self.song,
'singer': self.singer,
'album': self.album,
'duration': self.duration,
'url': self.url,
'picUrl': self.picUrl,
'source': self.source
}

View File

@ -0,0 +1,34 @@
{
"title": "云音乐",
"config": {
"abort": {
"single_instance_allowed": "仅允许单个配置"
},
"step": {
"user": {
"title": "接口配置",
"description": "为防止你的账号密码泄露建议自行部署API接口服务 \nhttps://neteasecloudmusicapi.vercel.app",
"data": {
"url": "网易云音乐API"
}
}
},
"error": {
"login_failed": "登录失败"
}
},
"options": {
"step": {
"user": {
"title": "网易云音乐",
"description": "登录后会将cookie保存在HomeAssistant存储目录之中",
"data": {
"username": "邮箱/手机号",
"password": "网易云音乐密码",
"conversation": "启用语音控制"
}
}
},
"error": {}
}
}

View File

@ -0,0 +1,8 @@
from urllib.parse import parse_qsl, quote
def parse_query(url_query):
query = parse_qsl(url_query)
data = {}
for item in query:
data[item[0]] = item[1]
return data