import asyncio import os from pathlib import Path from typing import List, Dict, Optional from nonebot import logger from curl_cffi import AsyncSession, CurlError import random try: import browser_cookie3 except ImportError: browser_cookie3 = None # 可选,如果没安装则 fallback 到 cookies.txt # ---------- 配置 ---------- file_path = os.path.dirname(__file__).replace("\\", "/") exported_cookie_path = Path(f"{file_path}/cookies/tracker.txt") # 你导出的 cookies.txt CUSTOM_UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36" RAW_BROWSER_HEADERS = """ sec-ch-ua-platform: "Windows" sec-ch-ua: "Google Chrome";v="141", "Not?A_Brand";v="8", "Chromium";v="141" sec-ch-ua-mobile: ?0 user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36 """ # ====================== def parse_raw_headers(raw: str) -> Dict[str, str]: headers = {} skip_prefix = (":authority", ":method", ":path", ":scheme") for line in raw.strip().splitlines(): if not line.strip(): continue if any(line.lower().startswith(p) for p in skip_prefix): continue if ":" not in line: continue key, value = line.split(":", 1) headers[key.strip()] = value.strip() return headers def load_cookies_from_txt(path: Path) -> List[Dict[str, str]]: cookies = [] if not path.exists(): return cookies with path.open("r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue parts = line.split("\t") if len(parts) >= 7: cookies.append({"name": parts[5], "value": parts[6]}) elif "=" in line: k, v = line.split("=", 1) cookies.append({"name": k.strip(), "value": v.strip()}) return cookies def load_browser_cookies(domain="tracker.gg") -> List[Dict[str, str]]: if not browser_cookie3: return [] try: return [{"name": c.name, "value": c.value} for c in browser_cookie3.chrome(domain_name=domain)] except Exception: return [] def build_cookie_header(cookies: List[Dict[str, str]]) -> str: return "; ".join(f"{c['name']}={c['value']}" for c in cookies) def build_headers( raw_headers: str, cookies: Optional[List[Dict[str, str]]] = None, ua_override: Optional[str] = None, ) -> Dict[str, str]: headers = parse_raw_headers(raw_headers) # 强制改成 XHR 请求风格 headers.update({ "accept": "application/json, text/plain, */*", "origin": "https://tracker.gg", "referer": "https://tracker.gg/", "sec-fetch-mode": "cors", "sec-fetch-site": "same-site", }) if cookies: headers["cookie"] = build_cookie_header(cookies) if ua_override: headers["user-agent"] = ua_override return headers async def fetch_with_cookies(url: str, headers: dict, impersonate: str = "chrome110", timeout: int = 15): async with AsyncSession() as session: try: await asyncio.sleep(random.uniform(0.5, 1.2)) # 模拟浏览器请求间隔 response = await session.get(url, headers=headers, impersonate=impersonate, timeout=timeout) # 重试一次,防止偶尔 CF 拦截 if getattr(response, "status_code", 0) in (403, 429): await asyncio.sleep(2) response = await session.get(url, headers=headers, impersonate=impersonate, timeout=timeout) return response except CurlError as e: return {"error": f"cURL 错误: {e}"} except Exception as e: return {"error": f"未知错误: {e}"} def is_challenge_response(resp) -> bool: if isinstance(resp, dict) and "error" in resp: return True try: status = getattr(resp, "status_code", None) text = getattr(resp, "text", "") or "" if status in (429, 403): return True low = text.lower() if any(k in low for k in ["cloudflare", "captcha", "cf-challenge", "just a moment"]): return True except Exception: return True return False async def search_user_with_fallback(url: str): # 1️⃣ 优先尝试浏览器 cookie cookies = load_browser_cookies() # 2️⃣ 如果浏览器 cookie 不存在,再 fallback 到 cookies.txt if not cookies: cookies = load_cookies_from_txt(exported_cookie_path) headers = build_headers(RAW_BROWSER_HEADERS, cookies=cookies, ua_override=CUSTOM_UA) logger.info(f"请求 URL: {url}") logger.info(f"使用 headers 字段数: {len(headers)}") resp = await fetch_with_cookies(url, headers) if is_challenge_response(resp): logger.warning("⚠️ Cloudflare 拦截或 cookies 失效。") if isinstance(resp, dict): return resp return {"status": getattr(resp, "status_code", None), "preview": getattr(resp, "text", "")[:200]} else: logger.info("✅ 请求成功。") return getattr(resp, "json", lambda: resp)() if hasattr(resp, "json") else resp