import time import shutil import requests import json import re import sys import os import html import tempfile import asyncio from pathlib import Path from typing import Optional import hashlib from nonebot import on_message, logger from nonebot.rule import to_me from nonebot.adapters import Event from yt_dlp import YoutubeDL from httpx import AsyncClient import msgspec from .minio import upload_to_s3 # 匹配链接的正则 URL_PATTERN = re.compile(r"(https?://[^\s]+)") pattern = r"QQ小程序(?:&#93;|]|\])" # 保留原有平台识别,同时加入抖音相关域名 VALID_HOSTS = [ "b23.tv", "bilibili.com", "youtube.com", "youtu.be", # douyin "douyin.com", "v.douyin.com", "iesdouyin.com", "m.douyin.com", "jingxuan.douyin.com", ] # 抖音短链识别 SHORT_LINK_PATTERN = re.compile(r"(v\.douyin\.com/[A-Za-z0-9_\-]+)") # 从 HTML 提取路由数据 DOUYIN_ROUTER_PATTERN = re.compile(r"window\._ROUTER_DATA\s*=\s*(.*?)", re.DOTALL) video_handler = on_message(priority=10, block=False, rule=to_me()) @video_handler.handle() async def handle_video_download(event: Event): msg = str(event.get_message()).strip() logger.info(f"获取到的消息:{msg}") target_url: Optional[str] = None # 提取 URL if not re.search(pattern, msg): urls = URL_PATTERN.findall(msg) for u in urls: logger.info(f"链接:{u}") # 优先处理抖音(单独解析以获取直链) if "douyin.com" in u or "v.douyin.com" in u or "iesdouyin.com" in u: await video_handler.send("检测到链接,正在尝试下载视频,请稍候...") try: parsed_path = await parse_douyin(u) if parsed_path: # 抖音直接获取直链并且合并文件后返回文件地址后直接上传s3 logger.info(f"文件路径:{parsed_path}") # 构建path对象 await video_handler.send(f"视频:{parsed_path.name} 已缓存\n正在生成下载链接,请耐心等待!") s3_url = upload_to_s3(parsed_path) await video_handler.send(f"{s3_url}") break else: await video_handler.send(f"抖音解析失败或未找到直链:{u}") logger.warning(f"抖音解析失败或未找到直链:{u}") # 如果解析失败,继续尝试其他链接 continue except Exception as e: await video_handler.send(f"解析抖音链接时出错:{e}") logger.exception(f"解析抖音链接时出错:{e}") continue # 非抖音走原逻辑 if any(domain in u for domain in VALID_HOSTS): target_url = u break if not target_url: return # 不处理无效链接 await video_handler.send("检测到链接,正在尝试下载视频,请稍候...") else: target_url = None logger.info("检测到小程序") url = await proc_xcx(msg) logger.info(f"链接:{url}") if url and any(domain in url for domain in VALID_HOSTS): target_url = url if not target_url: return # 不处理无效链接 await video_handler.send("检测到视频,正在尝试下载视频,请稍候...") try: video_file = await download_video(target_url) if not video_file: await video_handler.send("视频下载失败。") return await video_handler.send(f"视频已缓存:{video_file.name}\n正在生成下载链接,请耐心等待!") logger.info(f"文件路径:{video_file}") s3_url = upload_to_s3(video_file) logger.info(f"文件上传完成,返回链接:{s3_url}") await video_handler.send(f"{s3_url}") except Exception as e: logger.exception(e) await video_handler.send("下载过程中出现错误。") async def download_video(url: str) -> Optional[Path]: """ 如果 url 看起来是直接的媒体直链(如 mp4),则使用 httpx 直接下载; 否则回退到 yt-dlp 下载(兼容多平台),并支持: 1) Firefox cookies 2) cookies.txt 兜底 """ # ---------- 1. 直链探测 ---------- direct_media_ext = re.search(r"\.(mp4|m3u8|ts|webm|mov|flv)(?:$|\?)", url, re.IGNORECASE) is_direct = bool(direct_media_ext) if not is_direct: try: async with AsyncClient(follow_redirects=True, timeout=30) as client: head = await client.head(url, follow_redirects=True) ctype = head.headers.get("content-type", "") if ctype.startswith("video/") or "application/octet-stream" in ctype: is_direct = True except Exception: is_direct = False if is_direct: temp_dir = tempfile.mkdtemp(prefix="direct_ytcache_") ext = "mp4" m = re.search(r"\.([a-zA-Z0-9]{2,5})(?:$|\?)", url) if m and len(m.group(1)) <= 5: ext = m.group(1) filename = os.path.join(temp_dir, f"downloaded_video.{ext}") try: async with AsyncClient(follow_redirects=True, timeout=120) as client: async with client.stream("GET", url) as resp: resp.raise_for_status() with open(filename, "wb") as fh: async for chunk in resp.aiter_bytes(chunk_size=8192): fh.write(chunk) p = Path(filename) new_path = p.with_name(clean_filename(p.name)) p.rename(new_path) logger.info(f"直接下载完成: {new_path}") return new_path except Exception: logger.exception("直接下载失败,回退 yt-dlp") try: if os.path.exists(filename): os.remove(filename) except Exception: pass # ---------- 2. yt-dlp 下载 ---------- temp_dir = tempfile.mkdtemp(prefix="ytcache_") output_path = os.path.join(temp_dir, "%(title).80s.%(ext)s") base_opts = { "outtmpl": output_path, "format": "bestvideo+bestaudio/best", "noplaylist": True, "quiet": True, "ffmpeg_location": get_ffmpeg_path(), "extractor_args": { "youtube": { "player_client": ["android"] } }, } loop = asyncio.get_event_loop() def _run_yt(opts: dict): with YoutubeDL(opts) as ydl: ydl.download([url]) # ---------- 2.1 优先:Firefox cookies ---------- try: opts = dict(base_opts) opts["cookiesfrombrowser"] = ("firefox",) logger.info("尝试使用 Firefox cookies 下载") await loop.run_in_executor(None, _run_yt, opts) except Exception as e: logger.warning(f"Firefox cookies 下载失败,尝试 cookies.txt:{e}") # ---------- 2.2 回退:cookies.txt ---------- cookie_path = Path(__file__).resolve().parent / "cookies.txt" if cookie_path.exists(): opts = dict(base_opts) opts["cookiefile"] = str(cookie_path) logger.info(f"使用 cookies 文件: {cookie_path}") await loop.run_in_executor(None, _run_yt, opts) else: logger.error("未找到 cookies.txt,无法通过登录验证") raise # ---------- 3. 结果处理 ---------- files = list(Path(temp_dir).glob("*.*")) if not files: return None original_file = files[0] new_path = original_file.with_name(clean_filename(original_file.name)) original_file.rename(new_path) logger.info(f"下载完成并重命名: {new_path}") return new_path def get_ffmpeg_path() -> str: scripts_dir = os.path.dirname(sys.executable) ffmpeg_path = os.path.join(scripts_dir, "ffmpeg.exe") if os.path.exists(ffmpeg_path): return ffmpeg_path return "ffmpeg" def clean_filename(filename: str) -> str: return ( filename.replace(" ", "_") .replace("&", "_") .replace("#", "_") .replace("'", "") .replace('"', "") .replace("?", "") .replace(":", "_") .replace("|", "_") .replace("/", "_") .replace("\\", "_") .replace("*", "_") .replace("<", "_") .replace(">", "_") .replace("【", "") .replace("】", "") .replace(":", "") .replace("。", "") .replace(",", "_") .replace("《", "") .replace("》", "") .replace("?", "_") .replace("|", "") ) async def proc_xcx(msg): p_pattern = r'"qqdocurl":"(.*?)"' match = re.search(p_pattern, msg) if match: raw_url = match.group(1) unescaped = html.unescape(raw_url) cleaned_url = unescaped.replace(r"\\/", "/") base_url = cleaned_url.split("?")[0] print("最终提取链接:", base_url) return base_url else: print("未找到 qqdocurl 字段") return None # ----------------- Douyin parsing helpers ----------------- async def parse_douyin(url: str) -> Optional[str]: """ 解析抖音链接,返回可直接下载的媒体直链(优先)或 None。 支持短链(v.douyin.com)、长链、iesdouyin、m.douyin 等。 """ # 处理短链 short = SHORT_LINK_PATTERN.search(url) if short: short_url = "https://" + short.group(1) try: cookies_path = os.path.dirname(__file__).replace("\\", "/") + "/cookies.txt" file_path = await fetch_douyin_mp4(short_url, cookies_path, 10) return file_path except Exception: logger.exception("获取直链失败") # 仍然尝试原始 url # 现在尝试从页面抓取 router data try: logger.info(f"获取到的信息:{url}") except Exception: logger.exception("抖音页面解析失败") return None import asyncio import tempfile import aiohttp import os from typing import List, Dict, Optional from playwright.async_api import async_playwright import ffmpeg class DouyinFetchError(Exception): pass def parse_netscape_cookies(cookies_txt_path: str) -> List[Dict]: """ 解析 Netscape HTTP Cookie File -> Playwright cookies """ cookies: List[Dict] = [] with open(cookies_txt_path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue parts = line.split("\t") if len(parts) != 7: continue domain, include_sub, path, secure, expiry, name, value = parts cookie = { "name": name, "value": value, "domain": domain, "path": path, "secure": secure.upper() == "TRUE", "httpOnly": False, } if expiry.isdigit() and int(expiry) > 0: cookie["expires"] = int(expiry) cookies.append(cookie) return cookies async def download_url(url: str, filepath: str, headers: Optional[Dict] = None): headers = headers or {} timeout = aiohttp.ClientTimeout(total=300) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url, headers=headers) as resp: if resp.status != 200: raise DouyinFetchError(f"下载失败 {resp.status}: {url}") with open(filepath, "wb") as f: async for chunk in resp.content.iter_chunked(1024 * 64): f.write(chunk) async def fetch_douyin_mp4( douyin_url: str, cookies_txt: str, wait_seconds: int = 15, ) -> str: """ 输入抖音 URL 输出:合并后的 MP4 临时文件路径 """ cookies = parse_netscape_cookies(cookies_txt) if not cookies: raise DouyinFetchError("cookies.txt 解析失败或为空") video_url: Optional[str] = None audio_url: Optional[str] = None async with async_playwright() as p: browser = await p.chromium.launch( executable_path="C:/Program Files/Google/Chrome/Application/chrome.exe", headless=False, args=[ "--autoplay-policy=no-user-gesture-required", "--disable-features=AutoplayDisableSuppression", "--use-fake-ui-for-media-stream", ] ) context = await browser.new_context( viewport={"width": 640, "height": 320}, user_agent=( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/122.0.0.0 Safari/537.36" ), ) await context.add_cookies(cookies) page = await context.new_page() def handle_response(response): nonlocal video_url, audio_url url = response.url # 分轨视频 if video_url is None and ("video" in url and "mime_type=video_mp4" in url): video_url = url elif audio_url is None and ("audio" in url and "mime_type=audio" in url): audio_url = url page.on("response", handle_response) await page.goto(douyin_url, wait_until="domcontentloaded") await page.wait_for_timeout(wait_seconds * 1000) await browser.close() if not video_url: raise DouyinFetchError("未捕获视频流 URL") # ----------------------------- # 下载到临时文件 # ----------------------------- tmp_dir = Path(tempfile.gettempdir()) timestamp = int(time.time() * 1000) tmp_video = tmp_dir / f"douyin/{timestamp}_video.mp4" tmp_out = tmp_dir / f"douyin/{timestamp}_out.mp4" # 下载视频 await download_url(video_url, tmp_video) if audio_url: tmp_audio = tmp_dir / f"{timestamp}_audio.m4a" try: await download_url(audio_url, tmp_audio) ( ffmpeg .input(tmp_video) .output(tmp_audio, tmp_out, vcodec="copy", acodec="aac", movflags="faststart") .overwrite_output() .run(quiet=True) ) finally: tmp_video.unlink(missing_ok=True) tmp_audio.unlink(missing_ok=True) else: # 视频自带音频,直接拷贝到输出 shutil.copy(tmp_video, tmp_out) tmp_video.unlink(missing_ok=True) return tmp_out