HeXi/hexi/plugins/fuck_pilipili/__init__.py
sansenhoshi 275f05ee4a 重构
2026-01-04 17:15:40 +08:00

463 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import shutil
import requests
import json
import re
import sys
import os
import html
import tempfile
import asyncio
from pathlib import Path
from typing import Optional
import hashlib
from nonebot import on_message, logger
from nonebot.rule import to_me
from nonebot.adapters import Event
from yt_dlp import YoutubeDL
from httpx import AsyncClient
import msgspec
from .minio import upload_to_s3
# Regex that pulls every http(s) link out of a message.
URL_PATTERN = re.compile(r"(https?://[^\s]+)")
# Marker searched for in the raw message to decide whether it is a
# QQ mini-program card (see its use in handle_video_download).
pattern = r"QQ小程序(?:]|]|\])"
# Host substrings accepted for download: the original platforms plus the
# Douyin-related domains.
VALID_HOSTS = [
"b23.tv",
"bilibili.com",
"youtube.com",
"youtu.be",
# douyin
"douyin.com",
"v.douyin.com",
"iesdouyin.com",
"m.douyin.com",
"jingxuan.douyin.com",
]
# Matches a Douyin short link (v.douyin.com/<id>), without the scheme.
SHORT_LINK_PATTERN = re.compile(r"(v\.douyin\.com/[A-Za-z0-9_\-]+)")
# Captures the window._ROUTER_DATA payload embedded in a page's HTML.
DOUYIN_ROUTER_PATTERN = re.compile(r"window\._ROUTER_DATA\s*=\s*(.*?)</script>", re.DOTALL)
# Message matcher registered with priority 10, block=False and the to_me() rule.
video_handler = on_message(priority=10, block=False, rule=to_me())
@video_handler.handle()
async def handle_video_download(event: Event):
    """Download the video referenced by a message and reply with its S3 link.

    A message containing the mini-program marker has its link extracted with
    ``proc_xcx``. Any other message is scanned for URLs: Douyin links are
    resolved and uploaded immediately, and the first URL containing one of
    ``VALID_HOSTS`` is handed to ``download_video``. Messages without a usable
    link are ignored silently.
    """
    msg = str(event.get_message()).strip()
    logger.info(f"获取到的消息:{msg}")
    target_url: Optional[str] = None

    if re.search(pattern, msg):
        # Mini-program card: the link is embedded in the card payload.
        logger.info("检测到小程序")
        url = await proc_xcx(msg)
        logger.info(f"链接:{url}")
        if not (url and any(domain in url for domain in VALID_HOSTS)):
            return  # no usable link, ignore the message
        target_url = url
        await video_handler.send("检测到视频,正在尝试下载视频,请稍候...")
    else:
        douyin_markers = ("douyin.com", "v.douyin.com", "iesdouyin.com")
        for u in URL_PATTERN.findall(msg):
            logger.info(f"链接:{u}")

            # Douyin links take priority and are resolved separately.
            if any(marker in u for marker in douyin_markers):
                await video_handler.send("检测到链接,正在尝试下载视频,请稍候...")
                try:
                    parsed_path = await parse_douyin(u)
                    if not parsed_path:
                        await video_handler.send(f"抖音解析失败或未找到直链:{u}")
                        logger.warning(f"抖音解析失败或未找到直链:{u}")
                        # Resolution failed; move on to the next link.
                        continue
                    # The merged file is ready: upload it and reply with the link.
                    logger.info(f"文件路径:{parsed_path}")
                    await video_handler.send(f"视频:{parsed_path.name} 已缓存\n正在生成下载链接,请耐心等待!")
                    s3_url = upload_to_s3(parsed_path)
                    await video_handler.send(f"{s3_url}")
                    break
                except Exception as e:
                    await video_handler.send(f"解析抖音链接时出错:{e}")
                    logger.exception(f"解析抖音链接时出错:{e}")
                    continue

            # Every other platform goes through the generic download path.
            if any(domain in u for domain in VALID_HOSTS):
                target_url = u
                break

        if not target_url:
            return  # no usable link (or Douyin was already handled above)
        await video_handler.send("检测到链接,正在尝试下载视频,请稍候...")

    try:
        video_file = await download_video(target_url)
        if not video_file:
            await video_handler.send("视频下载失败。")
            return
        await video_handler.send(f"视频已缓存:{video_file.name}\n正在生成下载链接,请耐心等待!")
        logger.info(f"文件路径:{video_file}")
        s3_url = upload_to_s3(video_file)
        logger.info(f"文件上传完成,返回链接:{s3_url}")
        await video_handler.send(f"{s3_url}")
    except Exception as e:
        logger.exception(e)
        await video_handler.send("下载过程中出现错误。")
async def download_video(url: str) -> Optional[Path]:
    """Download the video behind ``url`` into a fresh temporary directory.

    A URL that looks like a direct media file (by extension, or by the
    content type of a HEAD probe) is streamed with httpx. Anything else, and
    any direct download that fails, goes through yt-dlp, which tries:

    1. cookies read from Firefox;
    2. the ``cookies.txt`` file next to this module, as a fallback.

    Args:
        url: Page or media URL to download.

    Returns:
        Path of the downloaded file (renamed through ``clean_filename``), or
        ``None`` when yt-dlp finished without producing a file.

    Raises:
        Exception: Whatever yt-dlp raised when the Firefox attempt failed and
            either ``cookies.txt`` is missing or the retry with it failed too.
    """
    # ---------- 1. Direct-link detection ----------
    # HLS manifests (.m3u8) are deliberately not treated as direct media:
    # fetching one only stores the playlist text, so they are left to yt-dlp.
    is_direct = bool(re.search(r"\.(mp4|ts|webm|mov|flv)(?:$|\?)", url, re.IGNORECASE))
    if not is_direct:
        try:
            async with AsyncClient(follow_redirects=True, timeout=30) as client:
                head = await client.head(url, follow_redirects=True)
            ctype = head.headers.get("content-type", "")
            is_direct = ctype.startswith("video/") or "application/octet-stream" in ctype
        except Exception:
            # Best-effort probe: any failure just means "use yt-dlp".
            is_direct = False

    if is_direct:
        temp_dir = tempfile.mkdtemp(prefix="direct_ytcache_")
        ext_match = re.search(r"\.([a-zA-Z0-9]{2,5})(?:$|\?)", url)
        ext = ext_match.group(1) if ext_match else "mp4"
        target = Path(temp_dir) / f"downloaded_video.{ext}"
        try:
            async with AsyncClient(follow_redirects=True, timeout=120) as client:
                async with client.stream("GET", url) as resp:
                    resp.raise_for_status()
                    with open(target, "wb") as fh:
                        async for chunk in resp.aiter_bytes(chunk_size=8192):
                            fh.write(chunk)
            new_path = target.with_name(clean_filename(target.name))
            if new_path != target:
                target.rename(new_path)
            logger.info(f"直接下载完成: {new_path}")
            return new_path
        except Exception:
            logger.exception("直接下载失败,回退 yt-dlp")
            # Remove the whole temporary directory, not just the partial file.
            shutil.rmtree(temp_dir, ignore_errors=True)

    # ---------- 2. yt-dlp download ----------
    temp_dir = tempfile.mkdtemp(prefix="ytcache_")
    base_opts = {
        "outtmpl": os.path.join(temp_dir, "%(title).80s.%(ext)s"),
        "format": "bestvideo+bestaudio/best",
        "noplaylist": True,
        "quiet": True,
        "ffmpeg_location": get_ffmpeg_path(),
        "extractor_args": {
            "youtube": {
                "player_client": ["android"],
            },
        },
    }
    loop = asyncio.get_running_loop()

    def _run_yt(opts: dict) -> None:
        # Blocking yt-dlp call; executed in the default executor below.
        with YoutubeDL(opts) as ydl:
            ydl.download([url])

    try:
        # ---------- 2.1 Prefer Firefox cookies ----------
        try:
            opts = dict(base_opts)
            opts["cookiesfrombrowser"] = ("firefox",)
            logger.info("尝试使用 Firefox cookies 下载")
            await loop.run_in_executor(None, _run_yt, opts)
        except Exception as e:
            logger.warning(f"Firefox cookies 下载失败,尝试 cookies.txt: {e}")
            # ---------- 2.2 Fall back to cookies.txt ----------
            cookie_path = Path(__file__).resolve().parent / "cookies.txt"
            if not cookie_path.exists():
                logger.error("未找到 cookies.txt, 无法通过登录验证")
                raise
            opts = dict(base_opts)
            opts["cookiefile"] = str(cookie_path)
            logger.info(f"使用 cookies 文件: {cookie_path}")
            await loop.run_in_executor(None, _run_yt, opts)
    except Exception:
        # Nothing usable was produced; do not leave the directory behind.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    # ---------- 3. Result handling ----------
    files = list(Path(temp_dir).glob("*.*"))
    if not files:
        shutil.rmtree(temp_dir, ignore_errors=True)
        return None
    original_file = files[0]
    new_path = original_file.with_name(clean_filename(original_file.name))
    if new_path != original_file:
        original_file.rename(new_path)
    logger.info(f"下载完成并重命名: {new_path}")
    return new_path
def get_ffmpeg_path() -> str:
    """Return the ffmpeg executable to use.

    An ``ffmpeg.exe`` sitting in the same directory as the running Python
    interpreter is preferred; otherwise the bare command name ``"ffmpeg"``
    is returned.
    """
    bundled = os.path.join(os.path.dirname(sys.executable), "ffmpeg.exe")
    return bundled if os.path.exists(bundled) else "ffmpeg"
# Characters that clean_filename turns into "_" and characters it removes.
# The ASCII entries come straight from the original replace() chain.
# NOTE(review): the original chain also listed nine non-ASCII characters that
# were lost (they had become empty-string replace() calls, and
# str.replace("", "_") inserts "_" between every character). The fullwidth
# and curly-quote entries below are a reconstruction mirroring the ASCII set;
# confirm against the upstream file.
_FILENAME_TO_UNDERSCORE = " &#:|/\\*<>" + "\uff1a\uff5c"
_FILENAME_TO_DROP = "'\"?" + "\u2018\u2019\u201c\u201d\uff1f\uff01\uff0c"
_FILENAME_TABLE = str.maketrans(
    _FILENAME_TO_UNDERSCORE,
    "_" * len(_FILENAME_TO_UNDERSCORE),
    _FILENAME_TO_DROP,
)


def clean_filename(filename: str) -> str:
    """Return ``filename`` with awkward characters replaced or removed.

    Spaces and ``& # : | / \\ * < >`` (plus fullwidth ``:`` and ``|``) become
    ``_``; straight and curly quotes, ``?`` and fullwidth ``? ! ,`` are
    dropped. All other characters are left untouched.

    Args:
        filename: File name (not a full path; separators are replaced).

    Returns:
        The sanitised file name.
    """
    return filename.translate(_FILENAME_TABLE)
async def proc_xcx(msg):
    """Extract the shared page URL from a QQ mini-program message.

    Looks for the ``"qqdocurl":"..."`` field in ``msg``, decodes HTML
    entities, undoes escaped slashes and drops the query string.

    Args:
        msg: Raw message text containing the mini-program payload.

    Returns:
        The URL without its query string, or ``None`` when the message has
        no ``qqdocurl`` field.
    """
    match = re.search(r'"qqdocurl":"(.*?)"', msg)
    if not match:
        print("未找到 qqdocurl 字段")
        return None

    unescaped = html.unescape(match.group(1))
    # JSON escapes "/" as "\/". Accept one or more backslashes so both the
    # plain and the doubly-escaped form collapse to a bare slash (the old
    # literal only matched the doubled form).
    cleaned_url = re.sub(r"\\+/", "/", unescaped)
    base_url = cleaned_url.split("?")[0]
    print("最终提取链接:", base_url)
    return base_url
# ----------------- Douyin parsing helpers -----------------
async def parse_douyin(url: str) -> Optional[Path]:
    """Download the video behind a Douyin short link.

    Only ``v.douyin.com/<id>`` short links (``SHORT_LINK_PATTERN``) are
    resolved; the download itself is done by ``fetch_douyin_mp4`` using the
    ``cookies.txt`` file next to this module.

    Args:
        url: Link taken from the chat message.

    Returns:
        Path of the downloaded MP4 file, or ``None`` when ``url`` contains no
        short link or the download failed (the failure is logged).
    """
    short = SHORT_LINK_PATTERN.search(url)
    if short:
        short_url = "https://" + short.group(1)
        cookies_path = Path(__file__).resolve().parent / "cookies.txt"
        try:
            return await fetch_douyin_mp4(short_url, str(cookies_path), 10)
        except Exception:
            logger.exception("获取直链失败")

    # No other resolution strategy is implemented; record the URL and give up.
    logger.info(f"获取到的信息:{url}")
    return None
import asyncio
import tempfile
import aiohttp
import os
from typing import List, Dict, Optional
from playwright.async_api import async_playwright
import ffmpeg
class DouyinFetchError(Exception):
    """Raised when a Douyin video cannot be located or downloaded."""
def parse_netscape_cookies(cookies_txt_path: str) -> List[Dict]:
    """Parse a Netscape HTTP cookie file into Playwright cookie dicts.

    Each data line holds seven tab-separated fields: domain, include-subdomains
    flag, path, secure flag, expiry, name, value. Blank lines, comments and
    lines with a different field count are skipped. Lines prefixed with
    ``#HttpOnly_`` are cookies, not comments, and are returned with
    ``httpOnly`` set to ``True``.

    Args:
        cookies_txt_path: Path of the cookie file (read as UTF-8).

    Returns:
        A list of dicts with ``name``, ``value``, ``domain``, ``path``,
        ``secure`` and ``httpOnly`` keys, plus ``expires`` when the expiry
        field is a positive integer.

    Raises:
        OSError: If the file cannot be opened.
    """
    http_only_prefix = "#HttpOnly_"
    cookies: List[Dict] = []
    with open(cookies_txt_path, "r", encoding="utf-8") as f:
        for raw_line in f:
            # Trim spaces and line endings only: a trailing tab marks an empty
            # cookie value and must survive so the line still has 7 fields.
            line = raw_line.strip(" \r\n")
            http_only = line.startswith(http_only_prefix)
            if http_only:
                line = line[len(http_only_prefix):]
            elif not line.strip() or line.startswith("#"):
                continue

            parts = line.split("\t")
            if len(parts) != 7:
                continue
            domain, _include_sub, path, secure, expiry, name, value = parts

            cookie = {
                "name": name,
                "value": value,
                "domain": domain,
                "path": path,
                "secure": secure.upper() == "TRUE",
                "httpOnly": http_only,
            }
            # An expiry of 0 (or a non-numeric one) is left out entirely.
            if expiry.isdigit() and int(expiry) > 0:
                cookie["expires"] = int(expiry)
            cookies.append(cookie)
    return cookies
async def download_url(url: str, filepath: str, headers: Optional[Dict] = None):
    """Stream the response body of ``url`` into the file at ``filepath``.

    The request uses a 300-second total timeout and the optional ``headers``.
    The body is written in 64 KiB chunks.

    Raises:
        DouyinFetchError: If the response status is anything other than 200.
    """
    request_headers = headers or {}
    chunk_size = 1024 * 64
    total_timeout = aiohttp.ClientTimeout(total=300)

    async with aiohttp.ClientSession(timeout=total_timeout) as http:
        async with http.get(url, headers=request_headers) as reply:
            if reply.status != 200:
                raise DouyinFetchError(f"下载失败 {reply.status}: {url}")
            with open(filepath, "wb") as sink:
                async for piece in reply.content.iter_chunked(chunk_size):
                    sink.write(piece)
def _mux_douyin_streams(video_path: Path, audio_path: Path, out_path: Path) -> None:
    """Merge a video file and an audio file into ``out_path`` with ffmpeg (blocking)."""
    video_in = ffmpeg.input(str(video_path))
    audio_in = ffmpeg.input(str(audio_path))
    (
        ffmpeg
        # Both inputs must be streams and the output name a str: ffmpeg-python
        # treats only the last positional argument as the file name.
        .output(
            video_in["v"],
            audio_in["a"],
            str(out_path),
            vcodec="copy",
            acodec="aac",
            movflags="faststart",
        )
        .overwrite_output()
        # Same ffmpeg lookup as the yt-dlp path in this module.
        .run(cmd=get_ffmpeg_path(), quiet=True)
    )


async def fetch_douyin_mp4(
    douyin_url: str,
    cookies_txt: str,
    wait_seconds: int = 15,
) -> Path:
    """Open a Douyin page in Chrome, capture its media URLs and download them.

    The page is loaded with the cookies from ``cookies_txt``. Responses whose
    URL looks like a video or audio stream are recorded for ``wait_seconds``
    seconds. The video is then downloaded and, when a separate audio stream
    was seen, merged with it through ffmpeg.

    Args:
        douyin_url: Douyin page URL to open.
        cookies_txt: Path of a Netscape-format cookie file.
        wait_seconds: How long to keep the page open while collecting URLs.

    Returns:
        Path of the resulting MP4 inside ``<system temp dir>/douyin``.

    Raises:
        DouyinFetchError: If the cookie file yields no cookies, no video
            stream URL was captured, or a download returned a non-200 status.
    """
    cookies = parse_netscape_cookies(cookies_txt)
    if not cookies:
        raise DouyinFetchError("cookies.txt 解析失败或为空")

    video_url: Optional[str] = None
    audio_url: Optional[str] = None

    async with async_playwright() as p:
        # NOTE(review): the Chrome path is hard-coded and the browser runs
        # with a visible window; both are kept exactly as they were.
        browser = await p.chromium.launch(
            executable_path="C:/Program Files/Google/Chrome/Application/chrome.exe",
            headless=False,
            args=[
                "--autoplay-policy=no-user-gesture-required",
                "--disable-features=AutoplayDisableSuppression",
                "--use-fake-ui-for-media-stream",
            ],
        )
        try:
            context = await browser.new_context(
                viewport={"width": 640, "height": 320},
                user_agent=(
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/122.0.0.0 Safari/537.36"
                ),
            )
            await context.add_cookies(cookies)
            page = await context.new_page()

            def handle_response(response):
                # Remember the first matching video URL and audio URL only.
                nonlocal video_url, audio_url
                url = response.url
                if video_url is None and ("video" in url and "mime_type=video_mp4" in url):
                    video_url = url
                elif audio_url is None and ("audio" in url and "mime_type=audio" in url):
                    audio_url = url

            page.on("response", handle_response)
            await page.goto(douyin_url, wait_until="domcontentloaded")
            await page.wait_for_timeout(wait_seconds * 1000)
        finally:
            # Close the browser even when navigation fails.
            await browser.close()

    if not video_url:
        raise DouyinFetchError("未捕获视频流 URL")

    # -----------------------------
    # Download into temporary files
    # -----------------------------
    work_dir = Path(tempfile.gettempdir()) / "douyin"
    # The sub-directory is not guaranteed to exist; opening a file in it
    # would otherwise fail.
    work_dir.mkdir(parents=True, exist_ok=True)
    timestamp = int(time.time() * 1000)
    tmp_video = work_dir / f"{timestamp}_video.mp4"
    tmp_audio = work_dir / f"{timestamp}_audio.m4a"
    tmp_out = work_dir / f"{timestamp}_out.mp4"

    try:
        await download_url(video_url, tmp_video)
        if audio_url:
            await download_url(audio_url, tmp_audio)
            # ffmpeg blocks until it exits; keep it off the event loop.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None, _mux_douyin_streams, tmp_video, tmp_audio, tmp_out
            )
        else:
            # No separate audio stream was seen: use the video file as-is.
            shutil.copy(tmp_video, tmp_out)
    except Exception:
        # Do not leave a partial output behind.
        tmp_out.unlink(missing_ok=True)
        raise
    finally:
        tmp_video.unlink(missing_ok=True)
        tmp_audio.unlink(missing_ok=True)
    return tmp_out