首次提交

This commit is contained in:
sanse 2025-10-28 09:30:54 +08:00
commit 9e60caaeeb
65 changed files with 3214 additions and 0 deletions

38
.env Normal file
View File

@ -0,0 +1,38 @@
DEBUG=true
HOST=0.0.0.0 # 配置 NoneBot 监听的 IP / 主机名
PORT=43001 # 配置 NoneBot 监听的端口
COMMAND_START=["","/"] # 配置命令起始字符
COMMAND_SEP=["."] # 配置命令分割字符
DRIVER=~fastapi+~httpx
LOG_LEVEL=DEBUG
SUPERUSERS=[] # 超级管理员
NICKNAME=[] # 机器人昵称
# QQ官方机器人配置文件
# QQ_IS_SANDBOX=true
QQ_BOTS='[{
"id": "102084781",
"token": "TqIcQANJ9YcK4TnZqikOtPI7QAzmZumQ",
"secret": "eTI7wlbRH7xndULC3ulcUME6yqiaTMF8",
"intent": {
"c2c_group_at_messages": true
},
"use_websocket": false
}]'
APSCHEDULER_CONFIG={"apscheduler.timezone": "Asia/Shanghai"}
# .env.prod
savedata = data/ram_data # 保存路径,相对路径,此处为保存至运行目录下的 "Yuni/savedata/" 下,默认为 ""
ram_policy = 0 # 授权策略 0 为根据可用功能 1 为根据服务级别,默认为 0
ram_cmd = ram # 指令名,或者叫触发词,默认为 ram
ram_add = -a # 启用功能(根据可用功能),默认为 -a
ram_rm = -r # 禁用功能(根据可用功能),默认为 -r
ram_show = -s # 展示群功能状态(根据可用功能),默认为 -s
ram_available = -v # 展示全局可用功能(根据可用功能),默认为 -v

0
README.md Normal file
View File

22
bot.py Normal file
View File

@ -0,0 +1,22 @@
import nonebot
from nonebot.adapters.qq import Adapter as QQ
from nonebot.log import logger
# 初始化 NoneBot 以及 数据库
nonebot.init()
app = nonebot.get_asgi()
# 注册适配器
driver = nonebot.get_driver()
driver.register_adapter(QQ)
# 加载自定义插件
nonebot.load_plugins("src/plugins") # 加载bot自定义插件
# 加载配置文件中的插件
nonebot.load_from_toml("pyproject.toml")
if __name__ == "__main__":
logger.warning("Bot启动")
nonebot.run()

15
dpces.txt Normal file
View File

@ -0,0 +1,15 @@
pip install nonebot-plugin-rauthman
pip install nonebot-plugin-user
pip install nonebot_plugin_userinfo
pip install nonebot_plugin_session
pip install nonebot-plugin-send-anything-anywhere
pip install nonebot-plugin-apscheduler
pip install nonebot-adapter-qq
pip install nonebot2[fastapi]

73
pyproject.toml Normal file
View File

@ -0,0 +1,73 @@
[tool.poetry]
authors = ["sansenhoshi <sansenhoshi@outlook.com>"]
description = "基于 NoneBot2 的 BF查询机器人"
license = "MIT"
name = "BF_BOT"
readme = "README.md"
repository = ""
version = "0.0.1"
[tool.poetry.dependencies]
python = "^3.13"
eorzeaenv = "^2.2.8"
matplotlib = "^3.7.1"
expiringdict = "^1.2.2"
nonebot2 = { extras = ["httpx", "fastapi", "websockets"], version = "^2.1.0" }
nb-cli = "^1.2.3"
nonebot-adapter-onebot = "2.2.4"
nonebot-plugin-user = "^0.0.1"
nonebot-plugin-apscheduler = "^0.3.0"
nonebot-plugin-send-anything-anywhere = "^0.3.1"
nonebot-plugin-alconna = "^0.24.0"
nonebot-plugin-session = "^0.1.0"
nonebot-plugin-userinfo = "^0.1.0"
[tool.poetry.group.dev.dependencies]
nonebug = "^0.3.3"
pytest-cov = "^4.0.0"
pytest-mock = "^3.6.1"
pytest-xdist = "^3.0.2"
pytest-asyncio = "^0.21.0"
respx = "^0.20.1"
freezegun = "^1.2.2"
nonebug-saa = { git = "https://github.com/MountainDash/nonebug-saa.git" }
[tool.nonebot]
adapters = [
{ name = "QQ", module_name = "nonebot.adapter.qq" }
]
plugins = [
"nonebot_plugin_apscheduler",
"nonebot_plugin_saa",
"nonebot_plugin_session",
"nonebot_plugin_userinfo",
"nonebot_plugin_user",
"nonebot_plugin_rauthman",
]
[tool.black]
line-length = 88
[tool.isort]
profile = "black"
line_length = 88
skip_gitignore = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
[tool.pyright]
typeCheckingMode = "basic"
[tool.ruff]
select = ["E", "W", "F", "UP", "C", "T", "PYI", "Q"]
ignore = ["E402", "E501", "E711", "C901", "UP037"]
[build-system]
build-backend = "poetry.core.masonry.api"
requires = ["poetry-core>=1.0.0"]

View File

@ -0,0 +1,115 @@
# import time
from nonebot import on_command, require
from nonebot.adapters import Message
from nonebot.matcher import Matcher
from nonebot.params import CommandArg
from nonebot.plugin import PluginMetadata
from nonebot.rule import to_me
from nonebot.log import logger
import json
require("nonebot_plugin_alconna")
from nonebot_plugin_alconna import UniMessage
from .data import *
from .data_utils import *
from .image_builder import *
from .text_utils import *
__plugin_meta__ = PluginMetadata(
name="BF查询",
description="战地341520426",
usage="",
extra={
},
)
query = on_command("bft", rule=to_me(), aliases={"bf3", "bf4", "bfv", "bf1", "bf2042", "bf6"}, block=True)
bind = on_command("bind", rule=to_me(), aliases={"绑定"}, block=True)
bf_dict = {
"bf3": "战地3",
"bf4": "战地4",
"bf1": "战地1",
"bfv": "战地5",
"bf2042": "战地2042",
"bf6": "战地6",
}
@query.handle()
async def handle_function(matcher: Matcher, msg: Message = CommandArg()):
cmd = matcher.state["_prefix"]["command"][0]
game = cmd
content = msg.extract_plain_text()
play_stat = ""
if cmd == "bf3":
play_stat = await get_data_bf3(content, "pc")
elif cmd == "bf4":
play_stat = await get_data_bf4(content, "pc")
elif cmd == "bf1":
play_stat = await get_data_bf1(content, "pc")
elif cmd == "bfv":
play_stat = await get_data_bfv(content, "pc")
elif cmd == "bf2042":
play_stat = await get_data_bfv(content, "pc")
elif cmd == "bf6":
flag, play_stat = await get_data_bf6(content, 0)
if flag == 0:
msg = '检测到多个同名用户\n' + '\n'.join(
f'用户名:{info["name"]}-等级:{info["rank"]}-UID:{info["uid"]}' for info in play_stat)
await UniMessage.text(msg).finish()
elif flag == 2:
msg = "未找到该玩家名"
await UniMessage.text(msg).finish()
else:
await UniMessage.text("指令异常").finish()
if "errors" in play_stat:
logger.warning(play_stat['errors'][0])
msg = play_stat['errors'][0]
else:
if cmd == "bf6":
img = build_bf6_simple_card(play_stat)
else:
weapon, vehicle = await get_best_weapon_and_best_vehicle(play_stat)
player = play_stat['userName']
pid = play_stat['userId']
kd = play_stat['killDeath']
kpm = play_stat['killsPerMinute']
spm = play_stat['scorePerMinute']
acc = play_stat['accuracy']
# 战地3 特化
if cmd == 'bf3':
head_shots = play_stat['headShots']
else:
head_shots = play_stat['headshots']
rank = play_stat['rank']
time_play = convert_to_hours(play_stat['timePlayed'])
kills = int(play_stat['kills'])
kill_assists = int(play_stat['killAssists'])
revives = int(play_stat['revives'])
wins = int(play_stat['wins'])
loses = int(play_stat['loses'])
best_weapon = weapon
best_vehicle = vehicle
best_class = play_stat['bestClass']
longest_head_shot = play_stat['longestHeadShot']
highest_ill_streak = play_stat['highestKillStreak']
# await stats_calculator(play_stat)
stat_data, level_designation = await stats_calculator(play_stat, cmd)
destroyed = await get_vehicle_destroyed(play_stat["vehicles"])
img = await build_stats_card(game, player, pid, kd, kpm, spm, acc, head_shots, rank, time_play, kills,
kill_assists, revives, wins, loses, destroyed, best_weapon, best_vehicle,
best_class,
longest_head_shot, highest_ill_streak, stat_data, level_designation)
await UniMessage.image(raw=img.getvalue()).finish()
await UniMessage.text(f"\n玩家【{content}】的【{bf_dict[cmd]}】数据\n{msg}").send()
@bind.handle()
async def bind_user(matcher: Matcher, msg: Message = CommandArg()):
matcher.state.get()

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

View File

@ -0,0 +1,380 @@
import asyncio
import os
from pathlib import Path
from typing import List, Dict, Optional
from nonebot import logger
from curl_cffi import AsyncSession, CurlError
# ---------- 配置 ----------
url_search = "https://api.tracker.gg/api/v2/bf6/standard/search?platform={platform}&query={name}"
url_overview = "https://api.tracker.gg/api/v2/bf6/standard/profile/ign/{param}"
file_path = os.path.dirname(__file__).replace("\\", "/")
exported_cookie_path = Path(f"{file_path}/cookies/tracker.txt") # 你导出的 cookies.txt
CUSTOM_UA = """Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36"""
# 你从抓包复制下来的完整请求头(不需要改格式)
RAW_BROWSER_HEADERS = """
accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
accept-encoding: gzip, deflate, br, zstd
accept-language: zh-CN,zh;q=0.9
cache-control: max-age=0
referer: https://account.tracker.gg/
sec-ch-ua: "Google Chrome";v="141", "Not?A_Brand";v="8", "Chromium";v="141"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "Windows"
sec-fetch-dest: document
sec-fetch-mode: navigate
sec-fetch-site: same-origin
sec-fetch-user: ?1
upgrade-insecure-requests: 1
user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36
""" # ← 这里可以直接粘贴你浏览器的 headers不用转义
# =============================================================
def parse_raw_headers(raw: str) -> Dict[str, str]:
"""
将抓包的多行 headers 文本解析为 Python dict
自动去除 :authority:method:path:scheme 等伪头
"""
headers = {}
skip_prefix = (":authority", ":method", ":path", ":scheme")
for line in raw.strip().splitlines():
if not line.strip():
continue
if any(line.lower().startswith(p) for p in skip_prefix):
continue
if ":" not in line:
continue
key, value = line.split(":", 1)
headers[key.strip()] = value.strip()
return headers
def load_cookies_from_txt(path: Path) -> List[Dict[str, str]]:
"""
cookies.txt 文件读取 cookies支持 Netscape 格式 / name=value 格式
"""
cookies = []
if not path.exists():
return cookies
with path.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split("\t")
if len(parts) >= 7:
cookies.append({"name": parts[5], "value": parts[6]})
elif "=" in line:
k, v = line.split("=", 1)
cookies.append({"name": k.strip(), "value": v.strip()})
return cookies
def build_cookie_header(cookies: List[Dict[str, str]]) -> str:
return "; ".join(f"{c['name']}={c['value']}" for c in cookies)
def build_headers(
raw_headers: str,
cookies: Optional[List[Dict[str, str]]] = None,
ua_override: Optional[str] = None,
) -> Dict[str, str]:
"""
综合生成请求头
1. RAW_BROWSER_HEADERS 解析 headers
2. 若存在 cookies则替换 cookie 字段
3. 若存在 ua_override则覆盖 UA 字段
"""
headers = parse_raw_headers(raw_headers)
if cookies:
headers["cookie"] = build_cookie_header(cookies)
if ua_override:
headers["user-agent"] = ua_override
return headers
async def fetch_with_cookies(url: str, headers: dict, impersonate: str = "chrome110", timeout: int = 15):
async with AsyncSession() as session:
try:
response = await session.get(url, headers=headers, impersonate=impersonate, timeout=timeout)
return response
except CurlError as e:
return {"error": f"cURL 错误: {e}"}
except Exception as e:
return {"error": f"未知错误: {e}"}
def is_challenge_response(resp) -> bool:
if isinstance(resp, dict) and "error" in resp:
return True
try:
status = getattr(resp, "status_code", None)
text = getattr(resp, "text", "") or ""
if status in (429, 403):
return True
low = text.lower()
if any(k in low for k in ["cloudflare", "captcha", "cf-challenge", "just a moment"]):
return True
except Exception:
return True
return False
async def search_user_with_fallback(url: str):
cookies = load_cookies_from_txt(exported_cookie_path)
headers = build_headers(RAW_BROWSER_HEADERS, cookies=cookies, ua_override=CUSTOM_UA)
logger.info(f"请求 URL: {url}")
logger.info(f"使用 headers 字段数: {len(headers)}")
resp = await fetch_with_cookies(url, headers)
if is_challenge_response(resp):
logger.warning("⚠️ Cloudflare 拦截或 cookies 失效。")
if isinstance(resp, dict):
return resp
return {"status": resp.status_code, "preview": resp.text[:200]}
else:
logger.info("✅ 请求成功。")
return getattr(resp, "json", lambda: resp)() if hasattr(resp, "json") else resp
async def format_data(response, search_type):
status = ""
platform_info = response['data']['platformInfo']
segments = response['data']['segments']
overview = ""
weapons = []
vehicles = []
gamemodes = []
gadgets = []
kits = []
maps = []
for segment in segments:
if segment['type'] == 'overview':
overview = segment
elif segment['type'] == 'weapon':
weapons.append(segment)
elif segment['type'] == 'vehicle':
vehicles.append(segment)
elif segment['type'] == 'gamemode':
gamemodes.append(segment)
elif segment['type'] == 'gadget':
gadgets.append(segment)
elif segment['type'] == 'kit':
kits.append(segment)
elif segment['type'] == 'level':
maps.append(segment)
if search_type == 0:
status = await get_overview(platform_info, overview, weapons, vehicles, gamemodes, gadgets, kits, maps)
if search_type == 1:
status = await get_weapons(platform_info, overview, weapons, vehicles, gamemodes, gadgets, kits, maps)
return status
async def get_user_id_2_data(title_id, search_type):
url = url_overview.format(param=title_id)
info = await search_user_with_fallback(url)
info = await format_data(info, search_type)
return info
# 调用此方法获取格式化后的数据(调用此方法即可)
async def get_info(player, search_type):
# 先使用橘子搜索
url = url_search.format(platform="orign", name=player)
result = await search_user_with_fallback(url)
# 无结果再使用steam搜索
if "errors" in result:
url = url_search.format(platform="steam", name=player)
result = await search_user_with_fallback(url)
title_id_list = []
logger.warning(result)
if "errors" in result:
return 3, '查询异常'
for res in result['data']:
if res['status'] is not None:
name = res['platformUserHandle']
uid = res['titleUserId']
status = res['status'].strip().split('', 1)[0].replace("Rank", "")
user = {
'name': name,
'rank': status,
'uid': uid,
}
title_id_list.append(user)
if len(title_id_list) > 1:
user_list = sorted(title_id_list, key=lambda k: k['rank'], reverse=True)
msg = '\n'.join(f'用户名:{info["name"]}-等级:{info["rank"]}-UID:{info["uid"]}\n' for info in user_list)
logger.info(f"检测到多个同名用户{msg}")
return 0, title_id_list
elif len(title_id_list) == 1:
logger.info(f"单用户{title_id_list}")
info = await get_user_id_2_data(title_id_list[0]['uid'], search_type)
return 1, info
else:
logger.info(f"未查询到用户")
return 2, '未查询到用户'
async def get_overview(platform_info, overview, weapons, vehicles, gamemodes, gadgets, kits, maps):
top_kill_weapon = sorted(weapons, key=lambda k: k['stats']['kills']['value'], reverse=True)[0]
best_weapon = {
"名称": top_kill_weapon['metadata']['name'],
"击杀": top_kill_weapon['stats']['kills']['value'],
"KPM": top_kill_weapon['stats']['killsPerMinute']['value'],
"时长": top_kill_weapon['stats']['timePlayed']['displayValue'],
"爆头率": top_kill_weapon['stats']['headshotPercentage']['displayValue'],
"命中率": top_kill_weapon['stats']['shotsAccuracy']['displayValue']
}
top_kill_vehicle = sorted(vehicles, key=lambda k: k['stats']['kills']['value'], reverse=True)[0]
best_vehicle = {
"名称": top_kill_vehicle['metadata']['name'],
"击杀": top_kill_vehicle['stats']['kills']['value'],
"KPM": top_kill_vehicle['stats']['killsPerMinute']['value'],
"时长": top_kill_vehicle['stats']['timePlayed']['displayValue'],
"碾压": top_kill_vehicle['stats']['roadKills']['value'],
"摧毁": top_kill_vehicle['stats']['destroyedWith']['value'],
}
shot_count = overview['stats']['shotsFired']['value']
hit_count = overview['stats']['shotsHit']['value']
logger.info(f"开火数:{shot_count}")
logger.info(f"命中数:{hit_count}")
acc = round((hit_count / shot_count) * 100, 2)
logger.info(f"命中率:{acc}%")
player_info = {
'玩家名称': platform_info['platformUserHandle'],
'游玩平台': platform_info['platformSlug'],
'游戏等级': overview['stats']['careerPlayerRank']['displayValue'],
'游玩时长': overview['stats']['timePlayed']['displayValue'],
'游玩场次': overview['stats']['matchesPlayed']['displayValue'],
'对局胜率': overview['stats']['wlPercentage']['displayValue'],
'对局得分': overview['stats']['score']['displayValue'],
'玩家K/D': overview['stats']['kdRatio']['value'],
'玩家KPM': overview['stats']['killsPerMinute']['value'],
'玩家SPM': overview['stats']['scorePerMinute']['value'],
'真人击杀': overview['stats']['playerKills']['displayValue'],
'击杀总计': overview['stats']['kills']['displayValue'],
'目标占领': overview['stats']['objectivesCaptured']['value'],
'游戏助攻': overview['stats']['assists']['displayValue'],
'救援数量': overview['stats']['revives']['displayValue'],
'最高连杀': overview['stats']['multiKills']['value'],
'命中率': f"{acc}%",
'爆头率': overview['stats']['headshotPercentage']['displayValue'],
'最佳武器': best_weapon,
'最佳载具': best_vehicle,
}
return player_info
async def get_weapons(platform_info, overview, weapons, vehicles, gamemodes, gadgets, kits, maps):
top_kill_weapon = sorted(weapons, key=lambda k: k['stats']['kills']['value'], reverse=True)[0]
best_weapon = {
"名称": top_kill_weapon['metadata']['name'],
"击杀": top_kill_weapon['stats']['kills']['value'],
"KPM": top_kill_weapon['stats']['killsPerMinute']['value'],
"时长": top_kill_weapon['stats']['timePlayed']['displayValue'],
"爆头率": top_kill_weapon['stats']['headshotPercentage']['displayValue'],
"命中率": top_kill_weapon['stats']['shotsAccuracy']['displayValue']
}
top_kill_vehicle = sorted(vehicles, key=lambda k: k['stats']['kills']['value'], reverse=True)[0]
best_vehicle = {
"名称": top_kill_vehicle['metadata']['name'],
"击杀": top_kill_vehicle['stats']['kills']['value'],
"KPM": top_kill_vehicle['stats']['killsPerMinute']['value'],
"时长": top_kill_vehicle['stats']['timePlayed']['displayValue'],
"碾压": top_kill_vehicle['stats']['roadKills']['value'],
"摧毁": top_kill_vehicle['stats']['destroyedWith']['value'],
}
shot_count = overview['stats']['shotsFired']['value']
hit_count = overview['stats']['shotsHit']['value']
logger.info(f"开火数:{shot_count}")
logger.info(f"命中数:{hit_count}")
acc = round((hit_count / shot_count) * 100, 2)
logger.info(f"命中率:{acc}%")
player_info = {
'玩家名称': platform_info['platformUserHandle'],
'游玩平台': platform_info['platformSlug'],
'游戏等级': overview['stats']['careerPlayerRank']['displayValue'],
'游玩时长': overview['stats']['timePlayed']['displayValue'],
'游玩场次': overview['stats']['matchesPlayed']['displayValue'],
'对局胜率': overview['stats']['wlPercentage']['displayValue'],
'对局得分': overview['stats']['score']['displayValue'],
'玩家K/D': overview['stats']['kdRatio']['value'],
'玩家KPM': overview['stats']['killsPerMinute']['value'],
'玩家SPM': overview['stats']['scorePerMinute']['value'],
'真人击杀': overview['stats']['playerKills']['displayValue'],
'击杀总计': overview['stats']['kills']['displayValue'],
'目标占领': overview['stats']['objectivesCaptured']['value'],
'游戏助攻': overview['stats']['assists']['displayValue'],
'救援数量': overview['stats']['revives']['displayValue'],
'最高连杀': overview['stats']['multiKills']['value'],
'命中率': f"{acc}%",
'爆头率': overview['stats']['headshotPercentage']['displayValue'],
'最佳武器': best_weapon,
'最佳载具': best_vehicle,
}
return player_info
async def get_vehicles(platform_info, overview, weapons, vehicles, gamemodes, gadgets, kits, maps):
top_kill_weapon = sorted(weapons, key=lambda k: k['stats']['kills']['value'], reverse=True)[0]
best_weapon = {
"名称": top_kill_weapon['metadata']['name'],
"击杀": top_kill_weapon['stats']['kills']['value'],
"KPM": top_kill_weapon['stats']['killsPerMinute']['value'],
"时长": top_kill_weapon['stats']['timePlayed']['displayValue'],
"爆头率": top_kill_weapon['stats']['headshotPercentage']['displayValue'],
"命中率": top_kill_weapon['stats']['shotsAccuracy']['displayValue']
}
top_kill_vehicle = sorted(vehicles, key=lambda k: k['stats']['kills']['value'], reverse=True)[0]
best_vehicle = {
"名称": top_kill_vehicle['metadata']['name'],
"击杀": top_kill_vehicle['stats']['kills']['value'],
"KPM": top_kill_vehicle['stats']['killsPerMinute']['value'],
"时长": top_kill_vehicle['stats']['timePlayed']['displayValue'],
"碾压": top_kill_vehicle['stats']['roadKills']['value'],
"摧毁": top_kill_vehicle['stats']['destroyedWith']['value'],
}
shot_count = overview['stats']['shotsFired']['value']
hit_count = overview['stats']['shotsHit']['value']
logger.info(f"开火数:{shot_count}")
logger.info(f"命中数:{hit_count}")
acc = round((hit_count / shot_count) * 100, 2)
logger.info(f"命中率:{acc}%")
player_info = {
'玩家名称': platform_info['platformUserHandle'],
'游玩平台': platform_info['platformSlug'],
'游戏等级': overview['stats']['careerPlayerRank']['displayValue'],
'游玩时长': overview['stats']['timePlayed']['displayValue'],
'游玩场次': overview['stats']['matchesPlayed']['displayValue'],
'对局胜率': overview['stats']['wlPercentage']['displayValue'],
'对局得分': overview['stats']['score']['displayValue'],
'玩家K/D': overview['stats']['kdRatio']['value'],
'玩家KPM': overview['stats']['killsPerMinute']['value'],
'玩家SPM': overview['stats']['scorePerMinute']['value'],
'真人击杀': overview['stats']['playerKills']['displayValue'],
'击杀总计': overview['stats']['kills']['displayValue'],
'目标占领': overview['stats']['objectivesCaptured']['value'],
'游戏助攻': overview['stats']['assists']['displayValue'],
'救援数量': overview['stats']['revives']['displayValue'],
'最高连杀': overview['stats']['multiKills']['value'],
'命中率': f"{acc}%",
'爆头率': overview['stats']['headshotPercentage']['displayValue'],
'最佳武器': best_weapon,
'最佳载具': best_vehicle,
}
return player_info

View File

@ -0,0 +1,173 @@
import json
import asyncio
import os
from curl_cffi import requests, AsyncSession, CurlError
from nonebot import logger
url_search = "https://api.tracker.gg/api/v2/bf6/standard/search?platform=origin&query={name}&autocomplete=true"
url_overview = "https://api.tracker.gg/api/v2/bf6/standard/profile/ign/{user_id}?"
filepath = os.path.dirname(__file__).replace("\\", "/")
def load_cookies_from_txt(path: str):
cookies = []
with open(path, "r", encoding="utf-8") as f:
for line in f:
if line.startswith("#") or not line.strip():
continue
parts = line.strip().split("\t")
if len(parts) >= 7:
cookies.append({"name": parts[5], "value": parts[6]})
return cookies
def build_cookie_header(cookies):
return "; ".join([f"{c['name']}={c['value']}" for c in cookies])
async def fetch_url(url: str, headers: dict):
async with AsyncSession() as session:
try:
response = await session.get(url, headers=headers, impersonate="chrome110")
return response
except CurlError as e:
logger.warning("cURL 错误:", e)
return None
async def search_user(name: str):
cookies = load_cookies_from_txt(f"{filepath}/cookies/tracker.txt")
cookie_header = build_cookie_header(cookies)
headers = {
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
"user-agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/139.0.0.0 Safari/537.36"
),
"cookie": cookie_header,
"referer": "https://tracker.gg/",
}
search_url = url_search.format(name=name)
response = await fetch_url(search_url, headers=headers)
if not response:
logger.warning("请求失败")
return
if response.status_code == 200:
logger.info("搜索成功")
json_data = response.json()
title_id_list = []
for res in json_data['data']:
title_id_list.append(res['titleUserId'])
for title_id in title_id_list:
flag, info = await get_data(title_id)
if flag:
return info
else:
continue
else:
logger.warning(f"请求失败,状态码: {response.status_code}")
logger.warning(response.text)
async def get_data(user_id: str):
cookies = load_cookies_from_txt(f"{filepath}/cookies/tracker.txt")
cookie_header = build_cookie_header(cookies)
headers = {
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
"user-agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/139.0.0.0 Safari/537.36"
),
"cookie": cookie_header,
"referer": "https://tracker.gg/",
}
get_url = url_overview.format(user_id=user_id)
response = await fetch_url(get_url, headers=headers)
if not response:
logger.warning("获取失败")
return False
if response.status_code == 200:
logger.info("获取成功")
platform_info = response.json()['data']['platformInfo']
segments = response.json()['data']['segments']
overview = ""
weapons = []
vehicles = []
gamemodes = []
gadgets = []
kits = []
maps = []
for segment in segments:
if segment['type'] == 'overview':
overview = segment
elif segment['type'] == 'weapon':
weapons.append(segment)
elif segment['type'] == 'vehicle':
vehicles.append(segment)
elif segment['type'] == 'gamemode':
gamemodes.append(segment)
elif segment['type'] == 'gadget':
gadgets.append(segment)
elif segment['type'] == 'kit':
kits.append(segment)
elif segment['type'] == 'level':
maps.append(segment)
top_kill_weapon = sorted(weapons, key=lambda k: k['stats']['kills']['value'], reverse=True)[0]
best_weapon = {
"名称": top_kill_weapon['metadata']['name'],
"击杀": top_kill_weapon['stats']['kills']['value'],
"KPM": top_kill_weapon['stats']['killsPerMinute']['value'],
"时长": top_kill_weapon['stats']['timePlayed']['displayValue'],
"爆头率": top_kill_weapon['stats']['headshotPercentage']['displayValue'],
"命中率": top_kill_weapon['stats']['shotsAccuracy']['displayValue']
}
top_kill_vehicle = sorted(vehicles, key=lambda k: k['stats']['kills']['value'], reverse=True)[0]
best_vehicle = {
"名称": top_kill_vehicle['metadata']['name'],
"击杀": top_kill_vehicle['stats']['kills']['value'],
"KPM": top_kill_vehicle['stats']['killsPerMinute']['value'],
"时长": top_kill_vehicle['stats']['timePlayed']['displayValue'],
"碾压": top_kill_vehicle['stats']['roadKills']['value'],
}
player_info = {
'玩家': platform_info['platformUserHandle'],
'平台': platform_info['platformSlug'],
'等级': overview['stats']['careerPlayerRank']['displayValue'],
'时长': overview['stats']['timePlayed']['displayValue'],
'场次': overview['stats']['matchesPlayed']['displayValue'],
'胜率': overview['stats']['wlPercentage']['displayValue'],
'总得分': overview['stats']['score']['displayValue'],
'K/D': overview['stats']['kdRatio']['value'],
'KPM': overview['stats']['killsPerMinute']['value'],
'SPM': overview['stats']['scorePerMinute']['value'],
'击杀': overview['stats']['kills']['displayValue'],
'助攻': overview['stats']['assists']['displayValue'],
'救援': overview['stats']['revives']['displayValue'],
'命中率': f"{round(overview['stats']['shotsFired']['value'] / overview['stats']['shotsHit']['value'])}%",
'爆头率': overview['stats']['headshotPercentage']['displayValue'],
'最佳武器': best_weapon,
'最佳载具': best_vehicle,
}
return True, player_info
else:
logger.warning(f"请求失败,状态码: {response.status_code}")
logger.warning(response.text)
if __name__ == "__main__":
name = "HEIZI-HARUSAME"
asyncio.run(search_user(name))

View File

@ -0,0 +1,233 @@
import json
from nonebot import logger
import os
from cffi.model import voidp_type
from jinja2 import Environment, FileSystemLoader
from playwright.async_api import async_playwright, ViewportSize
import asyncio
from Core.Utils import get_root_path
from Core.BrowserCore import BrowserTab
from curl_cffi import requests, AsyncSession, CurlError
import base64
b = BrowserTab("https://battlefieldtracker.com/bf2042/profile/origin/Sansorano_Yume/overview")
url_zy = "https://battlefieldtracker.com/bf2042/profile/origin/{name}/overview"
url_over = "https://api.tracker.gg/api/v2/bf2042/standard/profile/origin/{name}"
url_weapon = "https://api.tracker.gg/api/v2/bf2042/standard/profile/origin/{name}/segments/weapon"
url_vehicle = "https://api.tracker.gg/api/v2/bf2042/standard/profile/origin/{name}/segments/vehicle"
url_soldier = "https://api.tracker.gg/api/v2/bf2042/standard/profile/origin/{name}/segments/soldier"
url_history = "https://api.tracker.gg/api/v2/bf2042/standard/profile/origin/{name}/history"
url_ban = "https://api.gametools.network/bfban/checkban/?names={name}"
url_dict = {
"overview": url_over,
"weapons": url_weapon,
"vehicle": url_vehicle,
"soldier": url_soldier,
"ban": url_ban,
}
headers = {
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,zh-TW;q=0.5",
"priority": "u=1, i",
"sec-ch-ua": '"Not;A=Brand";v="99", "Microsoft Edge";v="139", "Chromium";v="139"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0"
}
classesList = {
"Mackay": " 麦凯",
"Angel": " 天使",
"Falck": " 法尔克",
"Paik": " 白智秀",
"Sundance": " 日舞",
"Dozer": " 推土机",
"Rao": " 拉奥",
"Lis": " 莉丝",
"Irish": "爱尔兰佬",
"Crawford": "克劳福德",
"Boris": " 鲍里斯",
"Zain": " 扎因",
"Casper": " 卡斯帕",
"Blasco": "布拉斯科",
"BF3 Recon": "BF3 侦察",
"BF3 Support": "BF3 支援",
"BF3 Assault": "BF3 突击",
"BF3 Engineer": "BF3 工程",
"BC2 Recon": "BC2 侦察",
"BC2 Medic": "BC2 医疗",
"BC2 Assault": "BC2 突击",
"BC2 Engineer": "BC2 工程",
"1942 Anti-tank": "1942 反坦克",
"1942 Assault": "1942 突击",
"1942 Medic": "1942 医疗",
"1942 Engineer": "1942 工程",
"1942 Scout": "1942 侦察",
}
def get_base64(img_path: str, deafult: str = None):
path = f"{get_root_path()}/{img_path}"
if not os.path.exists(path):
if deafult is None:
return False, None
return get_base64(deafult)
with open(path, "rb") as f:
return True, base64.b64encode(f.read()).decode()
async def fetch_url(url: str, name: str, browser: str = "chrome110"):
async with AsyncSession() as session:
try:
response = await session.get(url, impersonate=browser)
return {
"name": name,
"status": response.status_code,
"content": response.json(),
}
except TimeoutError:
return {"name": name, "status": 0, "error": "请求超时"}
except ConnectionError:
return {"name": name, "status": 0, "error": "连接失败"}
except CurlError as e:
return {"name": name, "status": 0, "error": f"底层 cURL 错误: {str(e)}"}
except Exception as e:
return {"name": name, "status": 0, "error": f"未知错误: {str(e)}"}
async def bf2042_state(cmd):
name = cmd
raw = {}
logger.info("开始get数据")
task = [fetch_url(v.format(name=name), k) for k, v in url_dict.items()]
res = await asyncio.gather(*task)
logger.info("开始解析数据")
if raw is None or isinstance(raw, str):
await event.send("没有找到该玩家或者服务器请求失败,请等待")
return
flag_cf = False
has_player = False
for i in res:
if i["status"] == 404:
try:
json.loads(i["content"])
has_player = True
except json.JSONDecodeError:
flag_cf = True
return
if i["status"] == 0 or i["status"] == 400:
has_player = True
if flag_cf:
await event.send("当前数据获取失败,正在过盾,请等待响应")
await b.cf_bypasser.bypass_turnstile()
b.page.get_screenshot("img/hc.png")
if b.cf_bypasser.is_bypassed() or b.cf_bypasser.is_turnstile():
await event.send("已获取认证,请重新查询")
else:
await event.send(Message(TextSegment("获取失败已返回结果图")).append(ImgSegment("img/hc.png")))
return
if has_player:
await event.send("没有找到该玩家,,该文件的名字可能出错或者该玩家并未开启隐私设置")
return
logging.info(1)
overview = res[0]["content"]["data"]["segments"][0]["stats"]
logging.info(2)
res[1]["content"]["data"].sort(key=lambda x: x["stats"]["kills"]["value"], reverse=True)
weapons = res[1]["content"]["data"][:6]
# logging.info(weapons)
logging.info(3)
res[2]["content"]["data"].sort(key=lambda x: x["stats"]["kills"]["value"], reverse=True)
veh = res[2]["content"]["data"][:6]
logging.info(veh)
logging.info(4)
res[3]["content"]["data"].sort(key=lambda x: x["stats"]["kills"]["value"], reverse=True)
class_data = res[3]["content"]["data"][0]
# logging.info(class_data)
logging.info(5)
cheat_hack = res[4]["content"]["names"][name.lower()]["hacker"]
logging.info(6)
percentile_kills = overview["kills"]["percentile"]
kills_t = f"Top {round(100 - percentile_kills, 2)}" if percentile > 50 else f"Bottom {percentile}"
kills_t = f"{kills_t}%"
percentile_deaths = overview["deaths"]["percentile"]
death_t = f"Top {round(100 - percentile_deaths, 2)}" if percentile > 50 else f"Bottom {percentile}"
death_t = f"{death_t}%"
percentile_assists = overview["assists"]["percentile"]
zg_t = f"Top {round(100 - percentile_assists, 2)}" if percentile > 50 else f"Bottom {percentile}"
zg_t = f"{zg_t}%"
percentile_wins = overview["wins"]["percentile"]
slcs_t = f"Top {round(100 - percentile_wins, 2)}" if percentile > 50 else f"Bottom {percentile}"
slcs_t = f"{slcs_t}%"
percentile_losses = overview["losses"]["percentile"]
sbcs_t = f"Top {round(100 - percentile_losses, 2)}" if percentile > 50 else f"Bottom {percentile}"
sbcs_t = f"{sbcs_t}%"
percentile_revives = overview["revives"]["percentile"]
fhcs_t = f"Top {round(100 - percentile_revives, 2)}" if percentile > 50 else f"Bottom {percentile}"
fhcs_t = f"{fhcs_t}%"
data = {
"playerName": name,
"level": overview["level"]["displayValue"],
"time": overview["timePlayed"]["displayValue"],
"kd": overview["kdRatio"]["displayValue"],
"kd_real": overview["humanKdRatio"]["displayValue"],
"kpm": overview["killsPerMinute"]["displayValue"],
"win_acc": overview["wlPercentage"]["displayValue"],
"kills": overview["kills"]["displayValue"],
"kills_p": overview["kills"]["percentile"],
"kills_t": kills_t,
"death": overview["deaths"]["displayValue"],
"death_p": overview["deaths"]["percentile"],
"death_t": death_t,
"ai_kills": overview["aiKills"]["displayValue"],
"zr_kills": overview["humanKills"]["displayValue"],
"zk_kills": overview["vehicleKills"]["displayValue"],
"dc_kills": overview["multiKills"]["displayValue"],
"jz_kills": overview["meleeKills"]["displayValue"],
"zj_kills": overview["roadKills"]["displayValue"],
"zg": overview["assists"]["displayValue"],
"zg_p": overview["assists"]["percentile"],
"zg_t": zg_t,
"slcs": overview["wins"]["displayValue"],
"slcs_p": overview["wins"]["percentile"],
"slcs_t": slcs_t,
"sbcs": overview["losses"]["displayValue"],
"sbcs_p": overview["losses"]["percentile"],
"sbcs_t": sbcs_t,
"fhcs": overview["revives"]["displayValue"],
"fhcs_p": overview["revives"]["percentile"],
"fhcs_t": fhcs_t,
"zsh": overview["damageDealt"]["displayValue"],
"mfzsh": round(overview["damageDealt"]["value"] / (overview["timePlayed"]["value"] / 60), 2),
"chzj": overview["vehiclesDestroyed"]["displayValue"],
"kjjs": overview["scopedKills"]["displayValue"],
"mbzlsj": overview["objectiveTime"]["displayValue"],
"zlmb": overview["objectivesCaptured"]["displayValue"],
"fsmb": overview["defendedObjectives"]["displayValue"],
"a_c_c": f"{overview['objectivesArmed']['displayValue']}/{overview['objectivesDisarmed']['displayValue']}/{overview['objectivesDestroyed']['displayValue']}",
"fsqy": overview["defendedSectors"]["displayValue"],
"sqqb": overview["intelPickedUp"]["displayValue"],
"tqqb": overview["intelExtracted"]["displayValue"],
"btl": overview["headshotPercentage"]["displayValue"],
"bts": overview["headshotKills"]["displayValue"],
"w_data": weapons,
"v_data": veh,
"zjmz": classesList[class_data["metadata"]["name"].strip()] if class_data["metadata"][
"name"].strip() in classesList else
class_data["metadata"]["name"],
"zj_kd": class_data["stats"]["kdRatio"]["displayValue"],
"zj_kpm": class_data["stats"]["killsPerMinute"]["displayValue"],
"zjia_kills": class_data["stats"]["kills"]["displayValue"],
"zj_time": class_data["stats"]["timePlayed"]["displayValue"],
}
logger.info("解析数据完成")
return data

View File

@ -0,0 +1,25 @@
# Netscape HTTP Cookie File
# https://curl.haxx.se/rfc/cookie_spec.html
# This is a generated file! Do not edit.
tracker.gg FALSE / FALSE 1763712553 _lr_env_src_ats false
tracker.gg FALSE / FALSE 1766200813 nitro-uid %7B%22TDID%22%3A%2238381d61-f87b-42c2-9a52-7764b7e111ac%22%2C%22TDID_LOOKUP%22%3A%22TRUE%22%2C%22TDID_CREATED_AT%22%3A%222025-09-21T03%3A20%3A13%22%7D
tracker.gg FALSE / FALSE 1766200813 nitro-uid_cst znv0HA%3D%3D
.tracker.gg TRUE / TRUE 1794816553 _scor_uid d3fe4e72704a49548e3d944283183d16
.tracker.gg TRUE / FALSE 1792552814 ncmp.domain tracker.gg
.tracker.gg TRUE / TRUE 1792657486 __stripe_mid f3195526-098b-451a-a803-afe861f3546981d4b7
.tracker.gg TRUE / FALSE 1795681481 _ga GA1.1.1090555935.1761016835
.tracker.gg TRUE / FALSE 1795576862 _ga_4115T4MP2X GS2.1.s1761016839$o1$g1$t1761016862$j37$l0$h0
tracker.gg FALSE / FALSE 1766305486 pbjs-unifiedid %7B%22TDID%22%3A%2238381d61-f87b-42c2-9a52-7764b7e111ac%22%2C%22TDID_LOOKUP%22%3A%22TRUE%22%2C%22TDID_CREATED_AT%22%3A%222025-09-21T03%3A21%3A09%22%7D
tracker.gg FALSE / FALSE 1766305486 pbjs-unifiedid_cst YiwPLDosoA%3D%3D
.tracker.gg TRUE / FALSE 1784448555 _cc_id d7583525ab694e187a68c7c9adac9679
tracker.gg FALSE / FALSE 1761621671 _lr_sampling_rate 100
.tracker.gg TRUE / FALSE 1794757153 cto_bidid ZYK0w18xUFhlUWZoQ2ttd2Vlc0lnVjV1azlmSXI5ZlBVRUZ3QjVFazJUOHAlMkZnYWlFc0l2endrZGVDb1Z6dXdJdXA4V3Y1OVlEVGI4VUlNV1QxejgwWmxwJTJGRjByeGJFbWhFRTBQQnExU1luNTlvd1klM0Q
.tracker.gg TRUE / TRUE 1761122345 __cf_bm 0U2HKncz5gYebQMBxL_qtaeuqoAdy.5lnue9xCP_Lls-1761120544-1.0.1.1-vFt.KkK9agrVBKJiVR6KstByxEN86BEvyVVQxg5VhtRT40Oq6Bmaan1yzLbW9V0ixAlOLPpYcflX6CqpCjl0D9Lg.73D54Jnm3KLQ5tGxnEZhnr0ORp6W5HYsfpsNiRc
tracker.gg FALSE / FALSE 1761124153 _lr_retry_request true
.tracker.gg TRUE / FALSE 1761206955 panoramaId_expiry 1761206955341
.tracker.gg TRUE / TRUE 1761123286 __stripe_sid 99b864d5-840b-4cce-9c31-b4699e594ec16de245
.tracker.gg TRUE / FALSE 1795681481 _ga_HWSV72GK8X GS2.1.s1761120545$o13$g1$t1761121480$j60$l0$h0
tracker.gg FALSE / FALSE 1766305486 pbjs-unifiedid_last Wed%2C%2022%20Oct%202025%2008%3A24%3A46%20GMT
.tracker.gg TRUE / FALSE 1794817546 cto_bundle dv27kl9RN0JGSHVyZE9za1E5a1ZNZEo4R21rQVkxVUFtYSUyRjQxd0JaWDJtWGhOVmZnV2hxUEVzemZ2TE9XSVB0UXNMVkZySUtXaW0lMkZ4WTRnN3FhSmNFOE4lMkI2WXg4cW42UFpKd2I3QmViY1JuTkZYMG9FVUZ1STJESHc2NDUzMHBEMyUyQm92bUtEMzJlajF0bXMyZ3M1Z0YwJTJGRkR3JTNEJTNE
.tracker.gg TRUE / TRUE 1792657481 cf_clearance betnOuJnsRXlDgs9rKx_2GJXze5bdRFK3eVFFB9k610-1761121481-1.2.1.1-RDnj4R4HqOjhE1Sn5Dp5wKysllw6cVSYRPbbL4y_STOAnBlQEVRv_7xC1h9TleDg9Ecyy2neJCW1Xk6BNd51K4htSHptKFlQF8tv_JqMlDyFg9DxShAveOMXu4o9vkivWZmFvJRKv9ERqm33dfsYAQD5lkyYFtsurraCOPzu3TILLbsnjoi2v_kCILeFLPdEbeiABRHQUrsZdi1jhwKM9cDJ_XHs0bmPZrDOfVaf444

View File

@ -0,0 +1,57 @@
import json
import requests
from .bf6_data import *
async def get_data_bf3(user_name, platform):
url = f"https://api.gametools.network/bf3/all/?format_values=true&name={user_name}&platform={platform}"
payload = {}
headers = {
'accept': 'application/json'
}
response = requests.request("GET", url, headers=headers, data=payload)
data_json = json.loads(response.text)
return data_json
async def get_data_bf4(user_name, platform):
url = f"https://api.gametools.network/bf4/all/?format_values=true&name={user_name}&platform={platform}"
payload = {}
headers = {
'accept': 'application/json'
}
response = requests.request("GET", url, headers=headers, data=payload)
data_json = json.loads(response.text)
return data_json
async def get_data_bf1(user_name, platform):
url = f"https://api.gametools.network/bf1/all/?format_values=true&name={user_name}&platform={platform}&skip_battlelog=false&lang=en-us"
payload = {}
headers = {
'accept': 'application/json'
}
response = requests.request("GET", url, headers=headers, data=payload)
data_json = json.loads(response.text)
return data_json
async def get_data_bfv(user_name, platform):
url = f"https://api.gametools.network/bfv/all/?format_values=true&name={user_name}&platform={platform}&skip_battlelog=false&lang=en-us"
payload = {}
headers = {
'accept': 'application/json'
}
response = requests.request("GET", url, headers=headers, data=payload)
data_json = json.loads(response.text)
return data_json
async def get_data_bf6(user_name, search_type):
flag, data_json = await get_info(user_name, search_type)
return flag, data_json

View File

@ -0,0 +1,141 @@
import os
from PIL import Image, ImageDraw, ImageFont
from nonebot import logger
from functools import reduce
from .img_utils import png_resize
from .param import round_data, interval_table
filepath = os.path.dirname(__file__).replace("\\", "/")
# async def avatar_cache(avatar_url, player_id):
# if avatar_url:
# try:
# # 添加10s超时判断如果超时直接使用默认头像
# res = BytesIO(requests.get(avatar_url, timeout=10).content)
# avatar = Image.open(res).convert('RGBA')
# avatar.save(f'{filepath}/avatar_cache/{player_id}.png', 'PNG')
# return avatar
# except requests.exceptions.RequestException as e:
# logger.warning(f"请求异常:{e}")
# else:
# return None
#
#
# async def get_avatar(avatar_url, player_id):
# img = Image.open(filepath + "/avatar_cache/0.png").convert('RGBA')
# path = filepath + "/avatar_cache/"
# try:
# avatar_list = os.listdir(path)
# if player_id in str(avatar_list):
# logger.info(f"本地已存在{player_id}头像")
# img = Image.open(f"{path}{player_id}.png").convert('RGBA')
# else:
# logger.info(f"未检测到{player_id}头像,缓存至本地")
# out_put = filepath + f"/avatar_cache/{player_id}.png"
# img = await avatar_cache(avatar_url, out_put)
# except Exception as err:
# logger.error(f"获取头像失败:{str(err)}")
# return img
async def get_best_weapon_and_best_vehicle(player_stat):
weapon = sorted(player_stat["weapons"], key=lambda k: k['kills'], reverse=True)[0]
vehicle = sorted(player_stat["vehicles"], key=lambda k: k['kills'], reverse=True)[0]
return weapon, vehicle
async def get_vehicle_destroyed(player_stat):
vehicle = reduce(lambda acc, x: acc + x['destroyed'], player_stat, 0)
return vehicle
async def stats_calculator(player_stat, game):
per500_data = round_data[game]
# 场次(无直接数值反馈,采用胜利数+失败数)
match_played = player_stat["wins"] + player_stat["loses"]
kd = player_stat["killDeath"]
kpm = player_stat["killsPerMinute"]
kill_assists = player_stat["killAssists"]
acc = float(player_stat["accuracy"].replace("%", ""))
if game == "bf3":
headshots = float(0)
else:
headshots = float(player_stat["headshots"].replace("%", ""))
spm = player_stat["scorePerMinute"]
revives = player_stat["revives"]
# second_play = player_stat["secondsPlayed"]
apr = kill_assists / match_played
rpr = revives / match_played
# SPM*秒数/60/场数=场均得分
# dpr = score_perMinute * (second_play / 60) / match_played
# 计算比值
# kd
ratio_kd = round(kd / per500_data["kd"], 2)
# kpm
ratio_kpm = round(kpm / per500_data["kpm"], 2)
# 爆头率
ratio_hs = round(headshots / 100 / per500_data["hs"], 2)
# 场均救援
ratio_rpr = round(rpr / per500_data["rpr"], 2)
# 场均助攻
ratio_apr = round(apr / per500_data["apr"], 2)
# 分钟得分
ratio_spm = round(spm / per500_data["spm"], 2)
logger.info(f"雷达图数据")
logger.info(f"生涯KD比值:{ratio_kd}")
logger.info(f"生涯Kpm比值:{ratio_kpm}")
logger.info(f"生涯准度比值:{ratio_hs}")
logger.info(f"场均救援比值:{ratio_rpr}")
logger.info(f"场均助攻比值:{ratio_apr}")
logger.info(f"分钟得分比值:{ratio_spm}")
stat_data = [ratio_kd, ratio_apr, ratio_hs, ratio_spm, ratio_kpm, ratio_rpr]
# 血腥度 kd*0.5 + kpm *0.4 +hs*0.1
player_blood = kd * 0.5 + kpm * 0.4 + headshots * 0.1
ratio_blood = player_blood / per500_data["blood"]
# 贡献度 场均助攻*0.4 + 场均救援*0.4 + spm*0.2
player_dedication = rpr * 0.4 + apr * 0.4 + spm * 0.2
ratio_dedication = player_dedication / per500_data["dedication"]
# 全能度 血腥度*0.5 + 贡献度*0.5
player_all_round = player_blood * 0.5 + player_dedication * 0.5
ratio_all_round = player_all_round / per500_data["all_round"]
logger.info(f"称号评级数据")
logger.info(f"血腥度:{player_blood} 比值:{ratio_blood}")
logger.info(f"贡献度:{player_dedication} 比值:{ratio_dedication}")
logger.info(f"全能度:{player_all_round} 比值:{ratio_all_round}")
level_designation = get_level_designation(ratio_blood, ratio_dedication, ratio_all_round)
return stat_data, level_designation
def get_level_designation(blood, dedication, all_round):
feature_type = ""
final_max = max([blood, dedication, all_round])
if final_max == blood:
feature_type = "blood"
elif final_max == dedication:
feature_type = "dedication"
elif final_max == all_round:
feature_type = "all_round"
logger.info(f"最高能力:{feature_type}")
logger.info(f"最高能力值:{final_max}")
final_info = get_feature(final_max, feature_type)
return final_info
def get_feature(value, feature_type):
"""根据输入值和数值类别,返回对应特征值"""
for (low, high), features in interval_table.items():
if low <= value < high:
return features[feature_type]
raise ValueError(f"数值 {value} 不在任何区间内")

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,320 @@
from PIL import Image, ImageDraw, ImageFont
import math
import os
from io import BytesIO
import io
import textwrap
import cv2
import time
import datetime
import numpy as np
from .img_utils import *
from .data_utils import *
from .param import *
filepath = os.path.dirname(__file__).replace("\\", "/")
# 字体
font_XXL = ImageFont.truetype(f"{filepath}/font/演示创黑FLY.ttf", 72)
font_XL = ImageFont.truetype(f"{filepath}/font/演示创黑FLY.ttf", 64)
font_L = ImageFont.truetype(f"{filepath}/font/演示创黑FLY.ttf", 48)
font_M = ImageFont.truetype(f"{filepath}/font/演示创黑FLY.ttf", 36)
font_MS = ImageFont.truetype(f"{filepath}/font/演示创黑FLY.ttf", 32)
font_S = ImageFont.truetype(f"{filepath}/font/演示创黑FLY.ttf", 28)
font_XS = ImageFont.truetype(f"{filepath}/font/演示创黑FLY.ttf", 24)
font_XXS = ImageFont.truetype(f"{filepath}/font/演示创黑FLY.ttf", 18)
font_XXXS = ImageFont.truetype(f"{filepath}/font/演示创黑FLY.ttf", 16)
async def build_stats_card(game="bfv", player="Player001", pid=1145141919, kd=1.25, kpm=2.3, spm=500, acc=25.6,
head_shots=10, rank=100, time_play=100, kills=100, kill_assists=150, revives=114, wins=10,
loses=10, destroyed=514, best_weapon="M1907 SF", best_vehicle="坦克", best_class="Support",
longest_head_shot=1200, highest_ill_streak=100, stat_data=None, level_designation=None):
if stat_data is None:
stat_data = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
# 获取当前事件并且格式化
now = datetime.datetime.now()
formatted_time = now.strftime("%Y-%m-%d %H:%M:%S")
# ---------------- OpenCV 部分 ----------------
# 载入模板
img = cv2.imread(f"{filepath}/template/{game}.png", cv2.IMREAD_COLOR)
# 载入头像并缩放
avatar = cv2.imread(f"{filepath}/avatar_cache/0.png", cv2.IMREAD_UNCHANGED)
avatar = cv2.resize(avatar, (200, 200), interpolation=cv2.INTER_AREA)
# 创建圆形蒙版
mask = np.zeros((200, 200), dtype=np.uint8)
cv2.circle(mask, (100, 100), 100, 255, -1) # 绘制半径100的实心圆
# 处理透明通道
if avatar.shape[2] == 4:
# 合并圆形蒙版与原有alpha通道
alpha = cv2.bitwise_and(avatar[:, :, 3], mask)
avatar[:, :, 3] = alpha
else:
# 无透明通道时创建BGRA图像
avatar = cv2.cvtColor(avatar, cv2.COLOR_BGR2BGRA)
avatar[:, :, 3] = mask
# 粘贴头像(保持原有坐标)
y, x = 55, 426
h, w = avatar.shape[:2]
alpha = avatar[:, :, 3] / 255.0
for c in range(3):
img[y:y + h, x:x + w, c] = (alpha * avatar[:, :, c] +
(1 - alpha) * img[y:y + h, x:x + w, c])
# 公告栏
img = notice_paste(img)
# ---------------- PIL 部分 ----------------
pil_img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(pil_img)
# 玩家信息
draw.text((665, 65), f"{player}", fill="white", font=font_L)
draw.text((745, 133), f"{pid}", fill="white", font=font_M)
# 等级游玩时长
draw.text((740, 212), f"{rank}", fill="white", font=font_M)
draw.text((985, 212), f"{time_play}H", fill="white", font=font_M)
# draw_centered_text(draw=draw, text=rank, x_left=730, x_right=905, y=210, font=font_M, fill="white", )
# draw_centered_text(draw=draw, text=f"{time_play}H", x_left=965, x_right=1120, y=210, font=font_M, fill="white", )
# draw.text((730, 195), f"{rank}", fill="white", font=font_M)
# draw.text((1000, 195), f"{time_play}", fill="white", font=font_M)
# 生涯总览
# 场次
draw_centered_text(draw=draw, text=(wins + loses), x_left=45, x_right=265, y=350,
font=font_L,
fill="white")
# 命中率
draw_centered_text(draw=draw, text=round(float(acc.replace("%", "")), 1), x_left=415, x_right=495, y=343,
font=font_M,
fill="white")
# 爆头率
if head_shots:
head_shots = head_shots.replace("%", "")
else:
head_shots = 0
draw_centered_text(draw=draw, text=round(float(head_shots), 1), x_left=395, x_right=470, y=420, font=font_M,
fill="white")
# kd
draw_centered_text(draw=draw, text=kd, x_left=585, x_right=785, y=340, font=font_XL, fill="white")
# kpm
draw_centered_text(draw=draw, text=round(kpm, 1), x_left=825, x_right=1025, y=340, font=font_XL, fill="white")
# spm
draw_centered_text(draw=draw, text=round(spm, 1), x_left=1065, x_right=1265, y=340, font=font_XL, fill="white")
# 击杀
draw_centered_text(draw=draw, text=kills, x_left=870, x_right=1190, y=595, font=font_L, fill="white")
# 助攻
draw_centered_text(draw=draw, text=kill_assists, x_left=820, x_right=1145, y=770, font=font_L, fill="white")
# 急救
draw_centered_text(draw=draw, text=revives, x_left=770, x_right=1090, y=960, font=font_L, fill="white")
# 摧毁
draw_centered_text(draw=draw, text=destroyed, x_left=720, x_right=1040, y=1140, font=font_L, fill="white")
# 底部栏
# 评级
level = level_designation['level']
des = level_designation['designation']
color = level_designation['color']
draw_centered_text(draw=draw, text=level, x_left=70, x_right=200, y=1120, font=font_XL, fill=color)
draw_centered_text(draw=draw, text=des, x_left=200, x_right=550, y=1120, font=font_XL, fill="white")
# 最佳兵种
draw_centered_text(draw=draw, text=classes[best_class], x_left=140, x_right=310, y=1290, font=font_L, fill="white")
# 最远击杀
draw_centered_text(draw=draw, text=f"{longest_head_shot}m", x_left=455, x_right=625, y=1290, font=font_L,
fill="white")
# 最高连杀
draw_centered_text(draw=draw, text=highest_ill_streak, x_left=765, x_right=935, y=1290, font=font_L, fill="white")
# 最佳⭐
pil_img = build_best(draw, pil_img, best_weapon, best_vehicle, game)
# 生成时间
draw.text((1870, 1420), f"{formatted_time}", fill=(154, 132, 149), font=font_XXXS)
# ---------------- 转回 OpenCV ----------------
img = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
# ---------------- 雷达图 ----------------
center = (311, 796)
r = 185
scale_min = 0.25 if min(stat_data) < 0.25 else min(stat_data)
img = draw_radar(img, stat_data, center=center, r=r,
buffer=0.1, scale_min=scale_min, scale_max=1.10)
# 保存
success, buffer = cv2.imencode(".png", img)
if success:
b_io = io.BytesIO(buffer)
return b_io # 就可以直接 return 二进制流
else:
return None
def build_best(draw: ImageDraw, img: Image, weapons, vehicle, game: str):
# 最佳武器
weapon_name = weapons["weaponName"]
w_kills = weapons["kills"]
w_kpm = weapons["killsPerMinute"]
acc = weapons["accuracy"].replace("%", "")
headshots = weapons["headshots"].replace("%", "")
# 最佳载具
vehicle_name = vehicle["vehicleName"]
v_kills = vehicle["kills"]
v_kpm = vehicle["killsPerMinute"]
destroyed = vehicle["destroyed"]
# 绘制最佳⭐
# 武器
draw_centered_text(draw=draw, text=bf_item[game][weapon_name].upper(), x_left=1240, x_right=1750, y=1355,
font=font_MS, fill="white")
draw_centered_text(draw=draw, text=w_kills, x_left=1190, x_right=1390, y=1260, font=font_L, fill="white")
draw_centered_text(draw=draw, text=w_kpm, x_left=1510, x_right=1635, y=1260, font=font_L, fill="white")
draw_centered_text(draw=draw, text=acc, x_left=1735, x_right=1803, y=1030, font=font_M, fill="white")
draw_centered_text(draw=draw, text=headshots, x_left=1695, x_right=1760, y=1200, font=font_M, fill="white")
# 载具
draw_centered_text(draw=draw, text=bf_item[game][vehicle_name].upper(), x_left=1990, x_right=2440, y=1355,
font=font_MS, fill="white")
draw_centered_text(draw=draw, text=v_kills, x_left=1940, x_right=2150, y=1260, font=font_L, fill="white")
draw_centered_text(draw=draw, text=v_kpm, x_left=2295, x_right=2465, y=1055, font=font_XL, fill="white")
draw_centered_text(draw=draw, text=destroyed, x_left=2265, x_right=2440, y=1260, font=font_L, fill="white")
# 图片
weapon_url = weapons["image"]
vehicle_url = vehicle["image"]
wp_icon = get_save_icon(game, weapon_name, "weapon", weapon_url)
vc_icon = get_save_icon(game, vehicle_name, "vehicles", vehicle_url)
img = image_paste_center(wp_icon, img, (1165, 1110))
img = image_paste_center(vc_icon, img, (1920, 1110))
return img
def normalize_for_radar(data, buffer=0.1, scale_min=0.0, scale_max=1.0):
"""
将数据动态归一化到指定区间 [scale_min, scale_max]
并给最大值预留 buffer让图形更饱满
Args:
data (list/ndarray): 原始数据
buffer (float): 缓冲比例例如 0.1 表示最大值按 1.1 倍处理
scale_min (float): 映射区间下限
scale_max (float): 映射区间上限
Returns:
list: 归一化后的数据
"""
data = np.array(data, dtype=float)
d_min, d_max = data.min(), data.max()
# 给最大值留 buffer
d_max = d_max * (1 + buffer)
if d_max == d_min: # 避免除零
norm = np.ones_like(data) * (scale_min + scale_max) / 2
else:
norm = (data - d_min) / (d_max - d_min)
norm = scale_min + norm * (scale_max - scale_min)
return norm.tolist()
# ---------------- 雷达图示例 ----------------
def draw_radar(img, stat_data, center=(311, 796), r=185,
buffer=0.1, scale_min=0.1, scale_max=1.05):
# 动态归一化
scaled_data = normalize_for_radar(stat_data, buffer, scale_min, scale_max)
points = []
for i, v in enumerate(scaled_data):
angle = 2 * np.pi / len(scaled_data) * i - np.pi / 2
px = int(center[0] + r * v * np.cos(angle))
py = int(center[1] + r * v * np.sin(angle))
points.append([px, py])
points = np.array(points, np.int32)
# 绘制填充
overlay = img.copy()
cv2.fillPoly(overlay, [points], color=(255, 255, 255, 76))
img = cv2.addWeighted(overlay, 0.3, img, 0.7, 0)
# 绘制边框
cv2.polylines(img, [points], isClosed=True, color=(255, 255, 255), thickness=2)
return img
def build_bf6_simple_card(player_info):
# 1.创建黑色板块 1920*1080
new_img = Image.new('RGBA', (1080, 1920), (255, 255, 255, 1000))
draw = ImageDraw.Draw(new_img)
draw.text((50, 60), f"玩家名称:{player_info['玩家名称']}", fill='black', font=font_M)
draw.text((550, 60), f"游玩平台:{player_info['游玩平台']}", fill='black', font=font_M)
draw.text((50, 160), f"游戏等级:{player_info['游戏等级']}", fill='black', font=font_M)
draw.text((550, 160), f"游玩时长:{player_info['游玩时长']}", fill='black', font=font_M)
draw.text((50, 260), f"游玩场次:{player_info['游玩场次']}", fill='black', font=font_M)
draw.text((550, 260), f"对局胜率:{player_info['对局胜率']}", fill='black', font=font_M)
draw.text((50, 360), f"对局得分:{player_info['对局得分']}", fill='black', font=font_M)
draw.text((550, 360), f"玩家SPM{player_info['玩家SPM']}", fill='black', font=font_M)
draw.text((50, 460), f"玩家K/D{player_info['玩家K/D']}", fill='black', font=font_M)
draw.text((550, 460), f"玩家KPM{player_info['玩家KPM']}", fill='black', font=font_M)
draw.text((50, 560), f"击杀总计:{player_info['击杀总计']}", fill='black', font=font_M)
draw.text((550, 560), f"真人击杀:{player_info['真人击杀']}", fill='black', font=font_M)
draw.text((50,660), f"游戏助攻:{player_info['游戏助攻']}", fill='black', font=font_M)
draw.text((550, 660), f"目标占领:{player_info['目标占领']}", fill='black', font=font_M)
draw.text((50, 760), f"救援数量:{player_info['救援数量']}", fill='black', font=font_M)
draw.text((550, 760), f"最高连杀:{player_info['最高连杀']}", fill='black', font=font_M)
draw.text((50, 860), f"命中率:{player_info['命中率']}", fill='black', font=font_M)
draw.text((550, 860), f"爆头率:{player_info['爆头率']}", fill='black', font=font_M)
draw.text((150, 1060), f"最佳武器", fill='black', font=font_M)
draw.text((650, 1060), f"最佳载具", fill='black', font=font_M)
draw.text((50, 1160), f"名称:{player_info['最佳武器']['名称']}", fill='black', font=font_M)
draw.text((50, 1260), f"击杀:{player_info['最佳武器']['击杀']}", fill='black', font=font_M)
draw.text((50, 1360), f"KPM{player_info['最佳武器']['KPM']}", fill='black', font=font_M)
draw.text((50, 1460), f"时长:{player_info['最佳武器']['时长']}", fill='black', font=font_M)
draw.text((50, 1560), f"命中率:{player_info['最佳武器']['命中率']}", fill='black', font=font_M)
draw.text((50, 1660), f"爆头率:{player_info['最佳武器']['爆头率']}", fill='black', font=font_M)
draw.text((550, 1160), f"名称:{player_info['最佳载具']['名称']}", fill='black', font=font_M)
draw.text((550, 1260), f"击杀:{player_info['最佳载具']['击杀']}", fill='black', font=font_M)
draw.text((550, 1360), f"KPM{player_info['最佳载具']['KPM']}", fill='black', font=font_M)
draw.text((550, 1460), f"时长:{player_info['最佳载具']['时长']}", fill='black', font=font_M)
draw.text((550, 1560), f"碾压:{player_info['最佳载具']['碾压']}", fill='black', font=font_M)
draw.text((550, 1660), f"摧毁:{player_info['最佳载具']['摧毁']}", fill='black', font=font_M)
b_io = BytesIO()
new_img.save(b_io, format="PNG")
return b_io # 就可以直接 return 二进制流
def build_bf6_weapon_card(weapon_info):
# 1.创建黑色板块 1920*1080
new_img = Image.new('RGBA', (1080, 1920), (255, 255, 255, 1000))

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 58 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 51 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.6 KiB

View File

@ -0,0 +1,367 @@
import hashlib
import json
import os
import random
import time
from io import BytesIO
from typing import List, Tuple
import cv2
import numpy as np
from nonebot import logger
import aiohttp
import requests
import requests.exceptions
from PIL import Image, ImageDraw, ImageFont
filepath = os.path.dirname(__file__).replace("\\", "/")
# 圆角遮罩处理
def draw_rect(img, pos, radius, **kwargs):
trans = Image.new('RGBA', img.size, (0, 0, 0, 0))
alpha_draw = ImageDraw.Draw(trans, "RGBA")
alpha_draw.rounded_rectangle(pos, radius, **kwargs)
img.paste(Image.alpha_composite(img, trans))
return img
# 圆角处理
def circle_corner(img, radii):
"""
半透明圆角处理
:param img: 要修改的文件
:param radii: 圆角弧度
:return: 返回修改过的文件
"""
circle = Image.new('L', (radii * 2, radii * 2), 0) # 创建黑色方形
draw = ImageDraw.Draw(circle)
draw.ellipse((0, 0, radii * 2, radii * 2), fill=255) # 黑色方形内切白色圆形
img = img.convert("RGBA")
w, h = img.size
# 创建一个alpha层存放四个圆角使用透明度切除圆角外的图片
alpha = Image.new('L', img.size, 255)
alpha.paste(circle.crop((0, 0, radii, radii)), (0, 0)) # 左上角
alpha.paste(circle.crop((radii, 0, radii * 2, radii)),
(w - radii, 0)) # 右上角
alpha.paste(circle.crop((radii, radii, radii * 2, radii * 2)),
(w - radii, h - radii)) # 右下角
alpha.paste(circle.crop((0, radii, radii, radii * 2)),
(0, h - radii)) # 左下角
img.putalpha(alpha) # 白色区域透明可见,黑色区域不可见
# 添加圆角边框
draw = ImageDraw.Draw(img)
draw.rounded_rectangle(img.getbbox(), outline="white", width=3, radius=radii)
return img
# PNG重绘大小
def png_resize(source_file, new_width=0, new_height=0, resample="LANCZOS", ref_file=''):
"""
PNG缩放透明度处理
:param source_file: 源文件Image.open()
:param new_width: 设置的宽度
:param new_height: 设置的高度
:param resample: 抗锯齿
:param ref_file: 参考文件
:return:
"""
img = source_file
img = img.convert("RGBA")
width, height = img.size
if ref_file != '':
imgRef = Image.open(ref_file)
new_width, new_height = imgRef.size
else:
if new_height == 0:
new_height = new_width * width / height
bands = img.split()
resample_map = {
"NEAREST": Image.NEAREST,
"BILINEAR": Image.BILINEAR,
"BICUBIC": Image.BICUBIC,
"LANCZOS": Image.LANCZOS
}
resample_method = resample_map.get(resample, Image.LANCZOS) # 默认使用 LANCZOS
bands = [b.resize((new_width, new_height), resample=resample_method) for b in bands]
resized_file = Image.merge('RGBA', bands)
return resized_file
# 图片粘贴
def image_paste(paste_image, under_image, pos):
"""
:param paste_image: 需要粘贴的图片
:param under_image: 底图
:param pos: 位置x,y坐标
:return: 返回图片
"""
# 获取需要贴入图片的透明通道
r, g, b, alpha = paste_image.split()
# 粘贴时将alpha值传递至mask属性
under_image.paste(paste_image, pos, alpha)
return under_image
def image_paste_center(paste_image: Image, under_image: Image, pos):
"""
将一张图片粘贴到另一张图片上并使粘贴图片的Y轴中心对齐到指定y坐标
:param paste_image: 需要粘贴的图片 (RGBA)
:param under_image: 底图 (RGBA)
:param pos: 粘贴位置 (x, y)其中y表示目标对齐的中心位置
:return: 返回合成后的图片
"""
x, y = pos
# 计算上下居中偏移
paste_w, paste_h = paste_image.size
y_top = y - paste_h // 2 # 上边位置 = 中心点y - 半高
# 获取透明通道
r, g, b, alpha = paste_image.split()
under_image.paste(paste_image, (x, y_top), alpha)
return under_image
def download_icon(url):
data = download_url(url)
img = BytesIO(data)
return img
# 下载QQ头像
def download_avatar(user_id: str) -> bytes:
url = f"http://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640"
data = download_url(url)
if not data or hashlib.md5(data).hexdigest() == "acef72340ac0e914090bd35799f5594e":
url = f"http://q1.qlogo.cn/g?b=qq&nk={user_id}&s=100"
data = download_url(url)
return data
# 根据URL下载文件
def download_url(url: str) -> bytes:
for i in range(3):
try:
resp = requests.get(url)
if resp.status_code != 200:
continue
return resp.content
except Exception as e:
print(f"Error downloading {url}, retry {i}/3: {str(e)}")
# 图片裁剪
def cut_image(pic_data: bytes, target_ratio: float):
try:
pic_data = Image.open(BytesIO(pic_data))
w, h = pic_data.size
pic_ratio = w / h
if pic_ratio > target_ratio:
# 宽高比大于目标比例,按高度缩放。保持宽度不变
new_h = w / target_ratio
h_delta = (h - new_h) / 2
w_delta = 0
else:
# 宽高比小于目标比例,按宽度缩放。保持高度不变
new_w = h * target_ratio
w_delta = (w - new_w) / 2
h_delta = 0
cropped = pic_data.crop((w_delta, h_delta, w - w_delta, h - h_delta))
return cropped
except Exception as e:
raise Exception(f"图片剪裁失败{str(e)}")
def draw_centered_text(draw, text, x_left, x_right, y, font, fill):
"""
在指定区域水平居中绘制文本
:param draw: ImageDraw对象
:param text: 要绘制的文本
:param x_left: 区域左边界x坐标
:param x_right: 区域右边界x坐标
:param y: 文本基线y坐标
:param font: ImageFont对象
:param fill: 文本颜色
"""
text = str(text)
text_width = font.getlength(text) # 获取文本像素宽度
area_width = x_right - x_left
x_center = x_left + (area_width - text_width) / 2
draw.text((x_center, y), text, font=font, fill=fill)
def get_save_icon(game, name, icon_type, url):
name = name.replace("/", "")
icon = get_icon_from_cache(name, game, icon_type)
if not icon:
if game == "bf1":
icon = get_icon_from_url(url)
if icon_type == "weapon":
icon = png_resize(icon, 500, 125)
else:
icon = png_resize(icon, 315, 79)
elif game == "bf3":
icon = get_icon_from_url(url)
if icon_type == "weapon":
icon = png_resize(icon, 500, 300)
else:
icon = png_resize(icon, 315, 200)
elif game == "bf4":
icon = get_icon_from_url(url)
if icon_type == "weapon":
icon = png_resize(icon, 500, 165)
else:
icon = png_resize(icon, 315, 80)
elif game == "bfv":
icon = get_icon_from_url(url)
if icon_type == "weapon":
icon = png_resize(icon, 500, 166)
else:
icon = png_resize(icon, 315, 315)
icon.save(f"{filepath}/img/{game}/{icon_type}/{name}.png")
logger.info(f"文件已经缓存至:{filepath}/img/{game}/{icon_type}/{name}.png")
# 上述步骤处理完再执行一次icon判空
if icon:
return icon
else:
# 针对空图片处理
if icon_type == "weapon":
icon = Image.open(filepath + "/img/wp.png").convert('RGBA')
else:
icon = Image.open(filepath + "/img/vc.png").convert('RGBA')
return icon
# 获取icon
def get_icon_from_url(url):
if url:
try:
# 添加10s超时判断如果超时返回空
res = BytesIO(requests.get(url, timeout=10).content)
icon = Image.open(res).convert('RGBA')
return icon
except requests.exceptions.RequestException as e:
logger.error(f"请求异常:{e}")
def get_icon_from_cache(icon_name, game, icon_type):
path = f"{filepath}/img/{game}/{icon_type}"
try:
icon_list = os.listdir(path)
if icon_name in str(icon_list):
logger.info(f"本地存在{icon_name}物品")
img = Image.open(f"{path}/{icon_name}.png").convert('RGBA')
return img
except Exception as err:
logger.error(f"获取图标失败:{str(err)}")
return None
def cutout_region(img, mask, alpha=0):
"""
在图像指定区域设置透明度
:param img: 输入图像 (BGR BGRA)
:param mask: 区域定义
- 矩形: (x1, y1, x2, y2)
- 圆形: (cx, cy, r)
- 任意 mask: numpy.ndarray (0/255)
:param alpha: 设置的透明度 (0=透明, 255=不透明, 0-255之间半透明)
:return: 处理后的图像 (BGRA)
"""
# 转为 BGRA (确保有 alpha 通道)
if img.shape[2] == 3:
b, g, r = cv2.split(img)
a = np.ones(b.shape, dtype=np.uint8) * 255
img = cv2.merge((b, g, r, a))
h, w = img.shape[:2]
region_mask = np.zeros((h, w), dtype=np.uint8)
# 根据传入参数类型生成区域 mask
if isinstance(mask, tuple):
if len(mask) == 4: # 矩形
x1, y1, x2, y2 = mask
region_mask[y1:y2, x1:x2] = 255
elif len(mask) == 3: # 圆形
cx, cy, r = mask
cv2.circle(region_mask, (cx, cy), r, 255, -1)
else:
raise ValueError("不支持的 mask tuple 格式")
elif isinstance(mask, np.ndarray): # 自定义 mask
if mask.shape != (h, w):
raise ValueError("mask 尺寸必须与图像一致")
region_mask = mask
else:
raise TypeError("mask 必须是 tuple 或 numpy.ndarray")
# 修改 alpha 通道
img[region_mask > 0, 3] = alpha
return img
def paste_image_cv2(fg, bg, pos):
"""
将前景图粘贴到背景图上支持透明度 (Alpha 混合)
:param fg: 前景图 (必须是 BGRA)
:param bg: 背景图 (BGR BGRA)
:param pos: 粘贴位置 (x, y) 左上角坐标
:return: 合成后的图像 (BGRA)
"""
x, y = pos
fh, fw = fg.shape[:2]
# 确保背景有 alpha 通道
if bg.shape[2] == 3:
b, g, r = cv2.split(bg)
a = np.ones(b.shape, dtype=np.uint8) * 255
bg = cv2.merge((b, g, r, a))
# 截取 ROI 区域
roi = bg[y:y + fh, x:x + fw]
# 防止越界
if roi.shape[0] != fh or roi.shape[1] != fw:
raise ValueError("前景图超出背景范围")
# 提取 alpha 通道
alpha_fg = fg[:, :, 3] / 255.0
alpha_bg = 1.0 - alpha_fg
# 混合 RGB
for c in range(3):
roi[:, :, c] = (alpha_fg * fg[:, :, c] +
alpha_bg * roi[:, :, c])
# 更新 alpha 通道(取最大值,保证透明度正确)
roi[:, :, 3] = np.clip(fg[:, :, 3] + roi[:, :, 3], 0, 255)
# 放回背景
bg[y:y + fh, x:x + fw] = roi
return bg
def notice_paste(cv2_bg):
pos = (1176, 318)
# 获取公告
notice_path = f"{filepath}/notice/notice.png"
front_img = cv2.imread(notice_path)
mask = np.zeros((631, 1355), dtype=np.uint8)
pts = np.array([[0, 0], [167, 0], [0, 631]], np.int32)
cv2.fillPoly(mask, [pts], 255)
front_img = cutout_region(front_img, mask, alpha=0)
mix_bg = paste_image_cv2(front_img, cv2_bg, pos)
return mix_bg

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 MiB

1003
src/plugins/bf_bot/param.py Normal file

File diff suppressed because it is too large Load Diff

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.6 MiB

View File

@ -0,0 +1,5 @@
from bf6_data import *
if __name__ == "__main__":
name = "A.R.O.N.A"
asyncio.run(get_info(name, 0))

View File

@ -0,0 +1,33 @@
import re
def convert_to_hours(time_str):
"""
将各种时间格式字符串转换为小时数
支持格式示例
- "1 day, 1:49:15" (包含天数)
- "1:56:55" (小时:分钟:)
- "0:06:41" (分钟:)
- "14:11:36" (小时:分钟:)
"""
total_hours = 0.0
# 处理包含天数的部分
if "day" in time_str:
days_part, time_part = time_str.split(", ")
days = float(re.search(r"\d+", days_part).group())
total_hours += days * 24
time_str = time_part
# 分割时间部分
parts = list(map(float, time_str.split(":")))
# 根据时间部分长度计算
if len(parts) == 3: # 时:分:秒
total_hours += parts[0] + parts[1] / 60 + parts[2] / 3600
elif len(parts) == 2: # 分:秒
total_hours += parts[0] / 60 + parts[1] / 3600
elif len(parts) == 1: # 仅秒
total_hours += parts[0] / 3600
return round(total_hours, 2)

View File

@ -0,0 +1,92 @@
import sqlite3
from typing import List, Dict, Any, Optional
class TableManager:
def __init__(self, db_path: str = 'user_data.db'):
self.db_path = db_path
def _execute(self, sql: str, params: tuple = ()) -> sqlite3.Cursor:
"""执行SQL语句的通用方法"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(sql, params)
conn.commit()
return cursor
class UserManager(TableManager):
def add_user(self, qq_id: str, ea_id: str, dog_tag_list: str) -> int:
"""添加用户记录"""
cursor = self._execute(
"INSERT INTO users (qq_id, ea_id, dog_tag_list) VALUES (?, ?, ?)",
(qq_id, ea_id, dog_tag_list)
)
return cursor.lastrowid
def get_user_by_qq(self, qq_id: str) -> Optional[Dict[str, Any]]:
"""通过QQ号查询用户"""
cursor = self._execute(
"SELECT * FROM users WHERE qq_id = ?",
(qq_id,)
)
return dict(cursor.fetchone()) if cursor.fetchone() else None
def update_dog_tags(self, user_id: int, new_tags: str) -> bool:
"""更新用户的狗牌列表"""
self._execute(
"UPDATE users SET dog_tag_list = ? WHERE id = ?",
(new_tags, user_id)
)
return True
class DogTagManager(TableManager):
def create_tag(self, name: str) -> int:
"""创建新狗牌"""
cursor = self._execute(
"INSERT INTO dog_tag (name) VALUES (?)",
(name,)
)
return cursor.lastrowid
def get_all_tags(self) -> List[Dict[str, Any]]:
"""获取所有狗牌"""
cursor = self._execute("SELECT * FROM dog_tag")
return [dict(row) for row in cursor.fetchall()]
class QueryRecordManager(TableManager):
def log_query(self, user_id: str, target_id: str, status: str) -> int:
"""记录查询操作"""
cursor = self._execute(
"""INSERT INTO query_record
(user_id, target_id, status)
VALUES (?, ?, ?)""",
(user_id, target_id, status)
)
return cursor.lastrowid
def get_user_history(self, user_id: str) -> List[Dict[str, Any]]:
"""获取用户查询历史"""
cursor = self._execute(
"SELECT * FROM query_record WHERE user_id = ?",
(user_id,)
)
return [dict(row) for row in cursor.fetchall()]
if __name__ == "__main__":
# 使用示例
user_db = UserManager()
user_id = user_db.add_user("123456", "EA_001", "tag1,tag2")
print(f"Created user with ID: {user_id}")
tag_db = DogTagManager()
tag_id = tag_db.create_tag("新狗牌")
print(f"Created tag with ID: {tag_id}")
record_db = QueryRecordManager()
record_id = record_db.log_query("123", "456", "success")
print(f"Logged query with ID: {record_id}")

View File

@ -0,0 +1,114 @@
import os
import sqlite3
from nonebot import logger
from typing import Optional, List, Tuple
class DatabaseManager:
def __init__(self, db_path: str = 'user_data.db'):
self.db_path = db_path
self.expected_tables = {
'users': [
('id', 'INTEGER PRIMARY KEY AUTOINCREMENT'),
('qq_id', 'TEXT NOT NULL'),
('ea_id', 'TEXT UNIQUE'),
('dog_tag_list', 'TEXT NOT NULL'),
('create_time', 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP')
],
'dog_tag': [
('id', 'INTEGER PRIMARY KEY'),
('name', 'TEXT NOT NULL'),
('create_time', 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP')
]
,
'query_record': [
('id', 'INTEGER PRIMARY KEY'),
('user_id', 'TEXT NOT NULL'),
('target_id', 'TEXT NOT NULL'),
('status', 'TEXT NOT NULL'),
('create_time', 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP')
]
}
def initialize_database(self) -> bool:
"""初始化数据库并创建表结构"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 创建所有表
for table_name, columns in self.expected_tables.items():
columns_sql = ', '.join(f'{col} {typ}' for col, typ in columns)
cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {table_name} (
{columns_sql}
)
''')
conn.commit()
return True
except Exception as e:
logger.error(f"Database initialization failed: {str(e)}")
if os.path.exists(self.db_path):
conn.close() # 确保连接关闭
for _ in range(3): # 重试机制
try:
os.remove(self.db_path)
break
except PermissionError:
time.sleep(0.5)
return False
def _verify_database_integrity(self, conn: sqlite3.Connection) -> bool:
"""验证数据库完整性"""
cursor = conn.cursor()
# 检查表是否存在
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
existing_tables = {row[0] for row in cursor.fetchall()}
if existing_tables != set(self.expected_tables.keys()):
logger.warning(f"Missing tables: {set(self.expected_tables.keys()) - existing_tables}")
return False
# 检查每个表的结构
for table_name, expected_columns in self.expected_tables.items():
cursor.execute(f"PRAGMA table_info({table_name})")
actual_columns = [(row[1], row[2]) for row in cursor.fetchall()]
if not self._compare_columns(actual_columns, expected_columns):
logger.warning(f"Table {table_name} structure mismatch")
return False
# 执行完整性检查
cursor.execute("PRAGMA integrity_check")
integrity_result = cursor.fetchone()
if integrity_result[0] != 'ok':
logger.warning(f"Integrity check failed: {integrity_result}")
return False
return True
def _compare_columns(self, actual: List[Tuple], expected: List[Tuple]) -> bool:
"""比较实际列与预期列是否匹配"""
if len(actual) != len(expected):
return False
for (act_col, act_type), (exp_col, exp_type) in zip(actual, expected):
if act_col.lower() != exp_col.lower():
return False
if not act_type.upper().startswith(exp_type.upper()):
return False
return True
# 初始化
if __name__ == '__main__':
db_manager = DatabaseManager()
if db_manager.initialize_database():
print("Database initialized successfully")
print(f"Database file created at: {os.path.abspath(db_manager.db_path)}")
else:
print("Database initialization failed")

8
启动机器人.bat Normal file
View File

@ -0,0 +1,8 @@
@echo off
chcp 65001 > nul
echo 启动 Bot
call .venv\Scripts\activate.bat
python bot.py
deactivate
pause
exit