Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added __pycache__/__init__.cpython-311.pyc
Binary file not shown.
Binary file added __pycache__/forward_handler.cpython-311.pyc
Binary file not shown.
Binary file added __pycache__/link_parser_adapter.cpython-311.pyc
Binary file not shown.
Binary file added __pycache__/main.cpython-311.pyc
Binary file not shown.
Binary file added __pycache__/message_parser.cpython-311.pyc
Binary file not shown.
Binary file not shown.
Binary file added lite_link_parser/__pycache__/base.cpython-311.pyc
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added lite_link_parser/__pycache__/data.cpython-311.pyc
Binary file not shown.
Binary file not shown.
Binary file not shown.
15 changes: 15 additions & 0 deletions lite_link_parser/cookie.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,23 @@ def load_from_file(self) -> None:
try:
jar.load(ignore_discard=True, ignore_expires=True)
except Exception:
# ---- 新增容错逻辑:发现不是标准格式,尝试作为普通字符串读取 ----
try:
with open(self.cookie_file, "r", encoding="utf-8") as f:
raw_text = f.read().strip()
if raw_text and not raw_text.startswith("# Netscape"):
# 使用类内置的方法解析普通字符串
self._load_from_cookies_str(raw_text)
# 解析完后,自动帮你转换并保存为标准的 Netscape 格式文件
self.save_to_file()
logger.info(f"[link_parser] 成功将普通字符串 Cookie 转换为标准格式: {self.cookie_file}")
return
except Exception:
pass
# -------------------------------------------------------------
logger.warning(f"[link_parser] failed to load cookies: {self.cookie_file}")
return

self.cookies = [
Cookie(
domain=item.domain,
Expand Down
6 changes: 6 additions & 0 deletions lite_link_parser/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@
from .ncm import NCMLiteParser
from .xiaoheihe import XiaoheiheLiteParser
from .xhs import XHSLiteParser
from .lofter import LofterLiteParser
from .qqmusic import QQMusicLiteParser
from .zhihu import ZhihuLiteParser

# Public API of the parsers package: the names exported by
# `from lite_link_parser.parsers import *` and the canonical registry
# of available lite parsers. Keep in sync with the imports above.
__all__ = [
"BilibiliLiteParser",
"NCMLiteParser",
"XiaoheiheLiteParser",
"XHSLiteParser",
"LofterLiteParser",
"QQMusicLiteParser",
"ZhihuLiteParser",
]
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
14 changes: 14 additions & 0 deletions lite_link_parser/parsers/bilibili.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@ def __init__(self, config: dict):
"Origin": "https://www.bilibili.com",
}
)

# --- 新增:支持从 bilibili_cookies.txt 读取普通字符串 ---
cookie_str = self.site_config.get("cookies", "")
if not cookie_str:
cookie_dir = self.ensure_cookie_dir(self.config.get("cookie_dir", "data/cookies"))
cookie_file = cookie_dir / "bilibili_cookies.txt"
if cookie_file.exists():
try:
with open(cookie_file, "r", encoding="utf-8") as f:
cookie_str = f.read().strip()
except Exception as e:
pass
# --------------------------------------------------------

self.credential = self._build_credential(self.site_config.get("cookies", ""))

@handle("b23.tv", r"b23\.tv/[A-Za-z\d\._?%&+\-=/#]+")
Expand Down
65 changes: 65 additions & 0 deletions lite_link_parser/parsers/lofter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations
import re
import json
from typing import Any, ClassVar
from bs4 import BeautifulSoup
from curl_cffi import requests as curl_requests
from astrbot.api import logger
from ..base import BaseLiteParser, handle
from ..data import Platform
from ..exception import ParseException

class LofterLiteParser(BaseLiteParser):
    """Lightweight parser for LOFTER blog posts (<blog>.lofter.com/post/<id>)."""

    platform: ClassVar[Platform] = Platform(name="lofter", display_name="LOFTER")

    def __init__(self, config: dict[str, Any]):
        super().__init__(config)
        # Browser-like headers; LOFTER rejects obviously non-browser clients.
        self.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
            "Referer": "https://www.lofter.com/"
        })

        # Best effort: attach a raw cookie string if the user dropped one
        # into <cookie_dir>/lofter_cookies.txt; ignore unreadable files.
        cookie_path = self.ensure_cookie_dir(self.config.get("cookie_dir", "data/cookies")) / "lofter_cookies.txt"
        if cookie_path.exists():
            try:
                self.headers["Cookie"] = cookie_path.read_text(encoding="utf-8").strip()
            except Exception:
                pass

    @handle("lofter.com", r"(?P<username>[a-zA-Z0-9-]+)\.lofter\.com/post/(?P<post_id>[a-zA-Z0-9_]+)")
    async def _parse_post(self, searched: re.Match[str]):
        """Fetch a post page and extract its title, body text and images."""
        blog = searched.group("username")
        post = searched.group("post_id")
        page_url = f"https://{blog}.lofter.com/post/{post}"

        async with curl_requests.AsyncSession(impersonate="chrome110") as session:
            resp = await session.get(page_url, headers=self.headers)
            page_html = resp.text

        soup = BeautifulSoup(page_html, "html.parser")

        # Post body usually sits in a "content" container; some themes use "text".
        # Length limiting is deliberately left to the adapter layer.
        body = soup.find("div", class_="content") or soup.find("div", class_="text")
        body_text = body.get_text(separator="\n").strip() if body else ""

        heading = soup.find("h2") or soup.find("title")
        heading_text = heading.get_text().strip() if heading else ""

        # Collect full-size images from LOFTER's CDN (nosdn.127.net),
        # preferring the original-resolution `bigimgsrc` and dropping the
        # resize query string.
        pictures = []
        for tag in soup.find_all("img"):
            candidate = tag.get("bigimgsrc") or tag.get("src")
            if candidate and "nosdn.127.net" in candidate:
                pictures.append(candidate.split("?")[0])

        if not body_text and not pictures:
            raise ParseException("无法解析 LOFTER 内容")

        return self.result(
            title=heading_text or f"LOFTER 笔记 - {blog}",
            text=body_text,
            author=self.create_author(name=blog),
            contents=self.create_image_contents(pictures[:9]),
            url=page_url,
        )
91 changes: 81 additions & 10 deletions lite_link_parser/parsers/ncm.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,51 @@
from __future__ import annotations

import json
import re
from typing import ClassVar

from aiohttp import ClientError

from ..base import BaseLiteParser, handle
from ..cookie import CookieJar
from ..data import Platform
from ..exception import ParseException


class NCMLiteParser(BaseLiteParser):
platform: ClassVar[Platform] = Platform(name="ncm", display_name="网易云音乐")

def __init__(self, config: dict):
    """Set up spoofed headers and resolve cookies for music.163.com."""
    super().__init__(config)

    # Minimal disguise headers with an HTTP (not HTTPS) referer, mirroring
    # the approach of astrbot_plugin_ncm_get.
    self.headers.update({
        "Referer": "http://music.163.com/",
        "User-Agent": "Mozilla/5.0"
    })

    cookie_dir = self.ensure_cookie_dir(self.config.get("cookie_dir", "data/cookies"))
    raw_cookies = self.site_config.get("cookies", "")

    # Primary cookie source: the persistent jar seeded from site config.
    self.cookiejar = CookieJar(
        cookie_dir,
        name="ncm",
        domain="music.163.com",
        raw_cookies=raw_cookies,
    )

    if self.cookiejar.cookies_str:
        self.headers["cookie"] = self.cookiejar.cookies_str
        return

    # Fallback: a plain cookie string dropped into ncm_cookies.txt.
    # Netscape-format jar files are skipped (the CookieJar owns those);
    # unreadable files are silently ignored (best effort).
    fallback = cookie_dir / "ncm_cookies.txt"
    if fallback.exists():
        try:
            content = fallback.read_text(encoding="utf-8").strip()
        except Exception:
            return
        if content and not content.startswith("# Netscape"):
            self.headers["cookie"] = content

@handle("163cn.tv", r"163cn\.tv/(?P<short_key>\w+)")
async def _parse_short(self, searched):
Expand All @@ -35,41 +56,91 @@ async def _parse_short(self, searched):
@handle("music.163.com/#/song", r"music\.163\.com/#/song\?.*id=(?P<song_id>\d+)")
async def _parse_song(self, searched):
song_id = searched.group("song_id")
detail_url = f"https://music.163.com/api/song/detail/?id={song_id}&ids=[{song_id}]"
play_url = f"https://music.163.com/api/song/enhance/player/url?ids=[{song_id}]&br=320000"

# 参考 ncm_get 插件,使用 http 协议 API
detail_url = f"http://music.163.com/api/song/detail/?id={song_id}&ids=[{song_id}]"
play_url = f"http://music.163.com/api/song/enhance/player/url?ids=[{song_id}]&br=320000"

# 歌词接口:lv/kv/tv 使用 -1 以获取最新版本,避免新歌返回空值
lyric_url = f"http://music.163.com/api/song/lyric?id={song_id}&lv=-1&kv=-1&tv=-1"

# 1. 获取歌曲基础信息
async with self.session.get(detail_url, headers=self.headers, proxy=self.proxy) as response:
if response.status >= 400:
raise ClientError(f"ncm detail failed {response.status}")
detail_json = json.loads(await response.text())

song = detail_json.get("songs", [{}])[0]
songs = detail_json.get("songs", [])
if not songs:
raise ParseException("歌曲信息获取失败,可能是VIP专属、无版权或已下架")
song = songs[0]

title = song.get("name", "")
sub_title = song.get("alias", [""])[0]
sub_title = song.get("alias", [""])[0] if song.get("alias") else ""
album_name = song.get("album", {}).get("name", "")
cover_url = song.get("album", {}).get("picUrl", "") + "?param=640y640"
duration_ms = song.get("duration", 0)
artists = song.get("artists", [])
author_name = " / ".join(item.get("name", "") for item in artists)
author_avatar = artists[0].get("img1v1Url", "") if artists else ""

# 2. 获取歌曲播放直链
async with self.session.get(play_url, headers=self.headers, proxy=self.proxy) as response:
if response.status >= 400:
raise ClientError(f"ncm play failed {response.status}")
play_json = json.loads(await response.text())

play_info = play_json.get("data", [{}])[0]
play_data = play_json.get("data", [])
if not play_data:
raise ParseException("音频播放链接获取失败")
play_info = play_data[0]

audio_url = play_info.get("url", "")
if not audio_url:
audio_url = f"https://music.163.com/song/media/outer/url?id={song_id}.mp3"

audio = self.create_audio_content(
audio_url,
cover_url=cover_url,
duration=duration_ms // 1000,
name=title,
)

# 3. 歌词抓取与清洗 (参考 ncm_get 的正则但增加更严谨的判断)
lyrics_text = "(未能获取到歌词)"
try:
async with self.session.get(lyric_url, headers=self.headers, proxy=self.proxy) as response:
if response.status == 200:
data = json.loads(await response.text())

if data.get("nolyric"):
lyrics_text = "(纯音乐,无歌词)"
elif data.get("uncollected"):
lyrics_text = "(网易云暂未收录歌词)"
elif 'lrc' in data and 'lyric' in data['lrc']:
raw_lyric = data['lrc']['lyric']
if raw_lyric and raw_lyric.strip():
# 过滤 [by:xxx] 等标签
clean_l = re.sub(r'\[[a-zA-Z]+:[^\]]*\]', '', raw_lyric).strip()
# 过滤时间轴:支持 [00:00]、[00:00.00]、[00:00.000]
clean_l = re.sub(r'\[\d{2,}:\d{2}(?:[:\.]\d{1,3})?\]', '', clean_l).strip()
# 合并多余换行
lyrics_text = re.sub(r'\n+', '\n', clean_l).strip()

if not lyrics_text:
lyrics_text = "(暂无有效歌词内容)"
else:
lyrics_text = "(暂无歌词文本)"
except Exception:
pass

# 拼装返回文本
display_title = f"{title}({sub_title})" if sub_title else title
display_text = f"专辑:{album_name}\n\n【歌词】\n{lyrics_text}"

return self.result(
title=display_title,
text=f"专辑:{album_name}",
text=display_text,
author=self.create_author(author_name, author_avatar),
contents=[audio],
url=f"https://music.163.com/song?id={song_id}",
Expand All @@ -96,4 +167,4 @@ async def _parse_outer(self, searched):
text="直链音频",
contents=[self.create_audio_content(url)],
url=url,
)
)
104 changes: 104 additions & 0 deletions lite_link_parser/parsers/qqmusic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from __future__ import annotations

import json
import re
from typing import Any, ClassVar

from curl_cffi import requests as curl_requests
from astrbot.api import logger

from ..base import BaseLiteParser, handle
from ..cookie import CookieJar
from ..data import Platform
from ..exception import ParseException

class QQMusicLiteParser(BaseLiteParser):
    """Lite parser for QQ Music song links.

    Handles three URL families: app share short links (c6.y.qq.com),
    mobile/QQ chat-card links (i.y.qq.com/v8/playsong.html) and desktop
    song-detail pages (y.qq.com/n/ryqq/songDetail, y.qq.com/n/yqq/song).
    """

    platform: ClassVar[Platform] = Platform(name="qqmusic", display_name="QQ音乐")

    def __init__(self, config: dict[str, Any]):
        super().__init__(config)
        self.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
            "Referer": "https://y.qq.com/"
        })

        # Best effort: attach a raw cookie string saved by the user as
        # <cookie_dir>/qqmusic_cookies.txt; an unreadable file just means
        # we proceed unauthenticated.
        cookie_dir = self.ensure_cookie_dir(self.config.get("cookie_dir", "data/cookies"))
        cookie_file = cookie_dir / "qqmusic_cookies.txt"
        if cookie_file.exists():
            try:
                with open(cookie_file, "r", encoding="utf-8") as f:
                    self.headers["Cookie"] = f.read().strip()
            except Exception:
                pass

    # 1. QQ Music short links shared from the app: resolve via redirect.
    @handle("c6.y.qq.com", r"c6\.y\.qq\.com/[A-Za-z0-9._?%&+=/#@-]+")
    async def _parse_short_link(self, searched: re.Match[str]):
        return await self.parse_with_redirect(f"https://{searched.group(0)}", self.headers)

    # 2. Mobile / QQ chat-card links (i.y.qq.com) and
    # 3. desktop song-detail pages — every pattern captures the song `mid`.
    @handle("i.y.qq.com", r"i\.y\.qq\.com/v8/playsong\.html\?[^>\s]*songmid=(?P<mid>[A-Za-z0-9]+)")
    @handle("y.qq.com/n/ryqq/songDetail", r"y\.qq\.com/n/ryqq/songDetail/(?P<mid>[A-Za-z0-9]+)")
    @handle("y.qq.com/n/yqq/song", r"y\.qq\.com/n/yqq/song/(?P<mid>[A-Za-z0-9]+)\.html")
    async def _parse_song(self, searched: re.Match[str]):
        """Resolve a song mid to title/artist/album plus lyrics and an audio card.

        Raises ParseException when the mid is stale or the response schema
        does not contain the expected song detail.
        """
        mid = searched.group("mid")
        # musicu.fcg batch endpoint with a URL-encoded get_song_detail_yqq request.
        api_url = f"https://u.y.qq.com/cgi-bin/musicu.fcg?data=%7B%22songinfo%22%3A%7B%22method%22%3A%22get_song_detail_yqq%22%2C%22module%22%3A%22music.pf_song_detail_svr%22%2C%22param%22%3A%7B%22song_mid%22%3A%22{mid}%22%7D%7D%7D"

        async with curl_requests.AsyncSession(impersonate="chrome110") as session:
            resp = await session.get(api_url, headers=self.headers)
            data = resp.json()

        try:
            # Defensive lookups: the response structure can change server-side.
            songinfo_data = data.get("songinfo", {}).get("data", {})
            track_info = songinfo_data.get("track_info")
            if not track_info:
                raise ParseException("QQ音乐接口未返回歌曲详情,可能是MID已失效")

            title = track_info.get("name", "未知歌曲")
            album = track_info.get("album", {}).get("name", "未知专辑")
            singers = [s.get("name", "") for s in track_info.get("singer", [])]
            author_name = " / ".join(singers)

            album_mid = track_info.get("album", {}).get("mid", "")
            # Album art follows a fixed CDN naming scheme keyed by the album mid.
            cover_url = f"https://y.gtimg.cn/music/photo_new/T002R300x300M000{album_mid}.jpg" if album_mid else None

            # No direct audio stream is available without authentication, so
            # link the mobile player page as the playable URL.
            audio_url = f"https://i.y.qq.com/v8/playsong.html?songmid={mid}&ADTAG=myqq&from=myqq&channel=10007100"

            lyrics_text = await self._fetch_lyrics(mid)
            display_text = f"专辑:{album}\n\n【歌词】\n{lyrics_text}"

            return self.result(
                title=title,
                text=display_text,
                author=self.create_author(name=author_name),
                contents=[self.create_audio_content(audio_url, cover_url=cover_url, name=title)],
                url=f"https://y.qq.com/n/ryqq/songDetail/{mid}",
            )
        except (KeyError, TypeError) as e:
            # Chain the original error (PEP 3134) so the real cause is
            # preserved in tracebacks instead of being flattened into text.
            raise ParseException(f"QQ音乐数据解析失败: {e}") from e

    async def _fetch_lyrics(self, mid: str) -> str:
        """Fetch and clean LRC lyrics for *mid*.

        Never raises: any failure yields a human-readable placeholder string.
        """
        lyric_api = f"https://c.y.qq.com/lyric/fcgi-bin/fcg_query_lyric_new.fcg?songmid={mid}&format=json&nobase64=1"
        headers = self.headers.copy()
        headers["Referer"] = "https://y.qq.com/n/ryqq/player"

        try:
            async with curl_requests.AsyncSession(impersonate="chrome110") as session:
                resp = await session.get(lyric_api, headers=headers)
                text = resp.text.strip()
                # The endpoint may wrap the JSON payload in a JSONP callback;
                # strip "MusicJsonCallback( ... )" before parsing.
                if text.startswith("MusicJsonCallback("):
                    text = text[len("MusicJsonCallback("):-1]
                data = json.loads(text)

            lyric = data.get("lyric", "")
            if lyric:
                # Strip LRC timestamps ([00:00], [00:00.00], [00:00.000]),
                # then metadata tags ([ti:...], [by:...]), then collapse
                # consecutive blank lines.
                clean_l = re.sub(r'\[\d{2,}:\d{2}(?:[:\.]\d{1,3})?\]', '', lyric).strip()
                clean_l = re.sub(r'\[[a-zA-Z]+:[^\]]*\]', '', clean_l).strip()
                return re.sub(r'\n+', '\n', clean_l) or "(暂无歌词内容)"
        except Exception:
            # Lyrics are non-essential; fall through to the placeholder.
            pass
        return "(未能获取到歌词)"
Loading