HeXi/hexi/plugins/deadlock/__init__.py
sansenhoshi 275f05ee4a 重构
2026-01-04 17:15:40 +08:00

204 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# import asyncio
# import base64
# import io
# import json
# import os
# from datetime import datetime
# from nonebot import logger
# from PIL import Image, ImageOps, ImageDraw, ImageFont
# from nonebot.internal.params import ArgPlainText
# from nonebot.params import CommandArg
# from nonebot.typing import T_State
# from playwright.async_api import async_playwright
# from nonebot import on_command
# from nonebot.adapters.onebot.v11 import MessageEvent, Bot, MessageSegment, GroupMessageEvent, Message
# from nonebot.plugin import PluginMetadata
# from typing import Optional, Union
# from nonebot.matcher import Matcher
#
# __plugin_meta__ = PluginMetadata(
# name="DeadLock助手",
# description="DeadLock助手我要成为DeadLock高手",
# usage="nekoscoreneko查分 [steam好友代码]\n"
# "nekobindneko绑定 [steam好友代码]\n"
# "nekorankneko排行[TODO]\n"
# "nekomatchdl比赛详情[TODO]\n",
# type="application",
# extra={
#
# }
# )
#
# neko_score = on_command("nekoscore", aliases={"dl查分"})
# neko_bind = on_command("nekobind", aliases={"dl绑定"})
# neko_unbind = on_command("nekounbind", aliases={"取消dl绑定"})
# neko_rank = on_command("nekorank", aliases={"dl排行"})
# neko_match = on_command("nekomatch", aliases={"dl比赛详情"})
#
# basic_path = os.path.dirname(__file__)
# save_path = os.path.join(basic_path, "temp")
# img_path = os.path.join(basic_path, "img")
# data_path = os.path.join(basic_path, "data")
#
#
# @neko_score.handle()
# async def get_neko_score(ev: MessageEvent, rgs: Message = CommandArg()):
# steam_id = rgs.extract_plain_text()
# # 查询绑定情况
# if steam_id is None or steam_id == "" or steam_id.isdigit() is False:
# bind_steam_id = await get_neko_bind(ev.group_id, ev.user_id)
# if bind_steam_id is None:
# await neko_score.finish("请输入正确的steam好友代码")
# return
# else:
# steam_id = bind_steam_id
# await neko_score.send(f"正在获取{steam_id}的数据")
# url = f"https://deadlock.blast.tv/users/{steam_id}"
# time_present1 = get_present_time()
# result = await capture_screenshot(url, time_present1)
# if result != "success":
# await neko_score.finish(MessageSegment.reply(ev.message_id) + result)
# img_path1 = os.path.join(save_path, f"{time_present1}.png")
# images = gen_ms_img(Image.open(img_path1))
# mes = (MessageSegment.reply(ev.message_id), images)
# await neko_score.send(mes)
# os.remove(img_path1)
#
#
# @neko_bind.handle()
# async def play_neko_bind(ev: MessageEvent, rgs: Message = CommandArg()):
# # 获取群号
# group_id = ev.group_id
# steam_id = rgs.extract_plain_text()
# if steam_id is None or steam_id == "" or steam_id.isdigit() is False:
# await neko_score.finish("请输入正确的steam好友代码")
# return
# qq_id = ev.user_id
# res = await to_neko_bind(group_id, steam_id, qq_id)
# await neko_score.send(res)
#
#
# @neko_unbind.handle()
# async def play_neko_unbind(ev: MessageEvent, rgs: Message = CommandArg()):
# # 获取群号
# group_id = ev.group_id
# steam_id = rgs.extract_plain_text()
# qq_id = ev.user_id
# res = await to_neko_unbind(group_id, qq_id)
# if res is None:
# await neko_score.finish(f"{qq_id}未绑定过")
# else:
# await neko_score.send(f"{qq_id}已经取消与{res}的绑定")
#
#
# async def to_neko_bind(group_id, steam_id, qq_id):
# if os.path.exists(f"{data_path}/{group_id}.json"):
# with open(f"{data_path}/{group_id}.json", 'r') as file:
# json_data = json.load(file)
# else:
# # 如果文件不存在,创建一个新的空字典
# json_data = {}
# # 检查指定的 key 是否存在
# if str(qq_id) in json_data:
# # 如果 key 存在,将对应键值更新
# old_steam_id = json_data[str(qq_id)]
# json_data[str(qq_id)] = steam_id
# # 将更新后的数据写回 JSON 文件
# with open(f"{data_path}/{group_id}.json", 'w') as file:
# json.dump(json_data, file, indent=4)
# return f"已经将[ {str(qq_id)} ]的绑定[ {old_steam_id} ]更新到了[ {str(steam_id)} ]"
# else:
# # 如果 key 不存在,添加新的 key-value 对
# json_data[str(qq_id)] = steam_id
#
# # 将更新后的数据写回 JSON 文件
# with open(f"{data_path}/{group_id}.json", 'w') as file:
# json.dump(json_data, file, indent=4)
# return f"[ {str(qq_id)} ]已经绑定[ {steam_id} ]"
#
#
# async def get_neko_bind(group_id, qq_id):
# if os.path.exists(f"{data_path}/{group_id}.json"):
# with open(f"{data_path}/{group_id}.json", 'r') as file:
# json_data = json.load(file)
# else:
# # 如果文件不存在,创建一个新的空字典
# json_data = {
# "000000": 000000
# }
# # 检查指定的 key 是否存在
# if str(qq_id) in json_data:
# return json_data[str(qq_id)]
# else:
# return None
#
#
# async def to_neko_unbind(group_id, qq_id):
# if os.path.exists(f"{data_path}/{group_id}.json"):
# with open(f"{data_path}/{group_id}.json", 'r') as file:
# json_data = json.load(file)
# else:
# # 如果文件不存在,创建一个新的空字典
# json_data = {
# "000000": 000000
# }
# # 检查指定的 key 是否存在
# if str(qq_id) in json_data:
# remove_value = json_data.pop(str(qq_id))
# # 将更新后的数据写回 JSON 文件
# with open(f"{data_path}/{group_id}.json", 'w') as file:
# json.dump(json_data, file, indent=4)
# return remove_value
# else:
# return None
#
#
# def get_present_time() -> int:
# return int(datetime.timestamp(datetime.now()))
#
#
# async def capture_screenshot(url: str, time_present: int):
# async with async_playwright() as p:
# browser = await p.chromium.launch()
# page = await browser.new_page()
# try:
# # 设置视口大小
# await page.set_viewport_size({"width": 1600, "height": 900})
# await page.goto(url)
# await page.wait_for_load_state('networkidle')
#
# except Exception as e:
# return f"访问网站异常{type(e)}`{e}`"
#
# await asyncio.sleep(1)
# i_path = os.path.join(save_path, f'{time_present}.png')
# await page.screenshot(
# path=i_path,
# full_page=True
# )
# logger.info("正在压缩图片...")
# await asyncio.sleep(2)
# img_convert = Image.open(i_path)
# img_convert.save(i_path, quality=80)
# logger.info("图片保存成功!")
# await browser.close()
# return "success"
#
#
# def gen_ms_img(image: Union[bytes, Image.Image]) -> MessageSegment:
# if isinstance(image, bytes):
# return MessageSegment.image(
# pic2b64(Image.open(io.BytesIO(image)))
# )
# else:
# return MessageSegment.image(
# pic2b64(image)
# )
#
#
# def pic2b64(pic: Image) -> str:
# buf = io.BytesIO()
# pic.save(buf, format='PNG')
# base64_str = base64.b64encode(buf.getvalue()).decode()
# return 'base64://' + base64_str