204 lines
7.3 KiB
Python
204 lines
7.3 KiB
Python
|
|
# import asyncio
|
|||
|
|
# import base64
|
|||
|
|
# import io
|
|||
|
|
# import json
|
|||
|
|
# import os
|
|||
|
|
# from datetime import datetime
|
|||
|
|
# from nonebot import logger
|
|||
|
|
# from PIL import Image, ImageOps, ImageDraw, ImageFont
|
|||
|
|
# from nonebot.internal.params import ArgPlainText
|
|||
|
|
# from nonebot.params import CommandArg
|
|||
|
|
# from nonebot.typing import T_State
|
|||
|
|
# from playwright.async_api import async_playwright
|
|||
|
|
# from nonebot import on_command
|
|||
|
|
# from nonebot.adapters.onebot.v11 import MessageEvent, Bot, MessageSegment, GroupMessageEvent, Message
|
|||
|
|
# from nonebot.plugin import PluginMetadata
|
|||
|
|
# from typing import Optional, Union
|
|||
|
|
# from nonebot.matcher import Matcher
|
|||
|
|
#
|
|||
|
|
# __plugin_meta__ = PluginMetadata(
|
|||
|
|
# name="DeadLock助手",
|
|||
|
|
# description="DeadLock助手,我要成为DeadLock高手",
|
|||
|
|
# usage="nekoscore:neko查分 [steam好友代码]\n"
|
|||
|
|
# "nekobind:neko绑定 [steam好友代码]\n"
|
|||
|
|
# "nekorank:neko排行[TODO]\n"
|
|||
|
|
# "nekomatch:dl比赛详情[TODO]\n",
|
|||
|
|
# type="application",
|
|||
|
|
# extra={
|
|||
|
|
#
|
|||
|
|
# }
|
|||
|
|
# )
|
|||
|
|
#
|
|||
|
|
# neko_score = on_command("nekoscore", aliases={"dl查分"})
|
|||
|
|
# neko_bind = on_command("nekobind", aliases={"dl绑定"})
|
|||
|
|
# neko_unbind = on_command("nekounbind", aliases={"取消dl绑定"})
|
|||
|
|
# neko_rank = on_command("nekorank", aliases={"dl排行"})
|
|||
|
|
# neko_match = on_command("nekomatch", aliases={"dl比赛详情"})
|
|||
|
|
#
|
|||
|
|
# basic_path = os.path.dirname(__file__)
|
|||
|
|
# save_path = os.path.join(basic_path, "temp")
|
|||
|
|
# img_path = os.path.join(basic_path, "img")
|
|||
|
|
# data_path = os.path.join(basic_path, "data")
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
# @neko_score.handle()
|
|||
|
|
# async def get_neko_score(ev: MessageEvent, rgs: Message = CommandArg()):
|
|||
|
|
# steam_id = rgs.extract_plain_text()
|
|||
|
|
# # 查询绑定情况
|
|||
|
|
# if steam_id is None or steam_id == "" or steam_id.isdigit() is False:
|
|||
|
|
# bind_steam_id = await get_neko_bind(ev.group_id, ev.user_id)
|
|||
|
|
# if bind_steam_id is None:
|
|||
|
|
# await neko_score.finish("请输入正确的steam好友代码!")
|
|||
|
|
# return
|
|||
|
|
# else:
|
|||
|
|
# steam_id = bind_steam_id
|
|||
|
|
# await neko_score.send(f"正在获取{steam_id}的数据")
|
|||
|
|
# url = f"https://deadlock.blast.tv/users/{steam_id}"
|
|||
|
|
# time_present1 = get_present_time()
|
|||
|
|
# result = await capture_screenshot(url, time_present1)
|
|||
|
|
# if result != "success":
|
|||
|
|
# await neko_score.finish(MessageSegment.reply(ev.message_id) + result)
|
|||
|
|
# img_path1 = os.path.join(save_path, f"{time_present1}.png")
|
|||
|
|
# images = gen_ms_img(Image.open(img_path1))
|
|||
|
|
# mes = (MessageSegment.reply(ev.message_id), images)
|
|||
|
|
# await neko_score.send(mes)
|
|||
|
|
# os.remove(img_path1)
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
# @neko_bind.handle()
|
|||
|
|
# async def play_neko_bind(ev: MessageEvent, rgs: Message = CommandArg()):
|
|||
|
|
# # 获取群号
|
|||
|
|
# group_id = ev.group_id
|
|||
|
|
# steam_id = rgs.extract_plain_text()
|
|||
|
|
# if steam_id is None or steam_id == "" or steam_id.isdigit() is False:
|
|||
|
|
# await neko_score.finish("请输入正确的steam好友代码!")
|
|||
|
|
# return
|
|||
|
|
# qq_id = ev.user_id
|
|||
|
|
# res = await to_neko_bind(group_id, steam_id, qq_id)
|
|||
|
|
# await neko_score.send(res)
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
# @neko_unbind.handle()
|
|||
|
|
# async def play_neko_unbind(ev: MessageEvent, rgs: Message = CommandArg()):
|
|||
|
|
# # 获取群号
|
|||
|
|
# group_id = ev.group_id
|
|||
|
|
# steam_id = rgs.extract_plain_text()
|
|||
|
|
# qq_id = ev.user_id
|
|||
|
|
# res = await to_neko_unbind(group_id, qq_id)
|
|||
|
|
# if res is None:
|
|||
|
|
# await neko_score.finish(f"{qq_id}未绑定过")
|
|||
|
|
# else:
|
|||
|
|
# await neko_score.send(f"{qq_id}已经取消与{res}的绑定")
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
# async def to_neko_bind(group_id, steam_id, qq_id):
|
|||
|
|
# if os.path.exists(f"{data_path}/{group_id}.json"):
|
|||
|
|
# with open(f"{data_path}/{group_id}.json", 'r') as file:
|
|||
|
|
# json_data = json.load(file)
|
|||
|
|
# else:
|
|||
|
|
# # 如果文件不存在,创建一个新的空字典
|
|||
|
|
# json_data = {}
|
|||
|
|
# # 检查指定的 key 是否存在
|
|||
|
|
# if str(qq_id) in json_data:
|
|||
|
|
# # 如果 key 存在,将对应键值更新
|
|||
|
|
# old_steam_id = json_data[str(qq_id)]
|
|||
|
|
# json_data[str(qq_id)] = steam_id
|
|||
|
|
# # 将更新后的数据写回 JSON 文件
|
|||
|
|
# with open(f"{data_path}/{group_id}.json", 'w') as file:
|
|||
|
|
# json.dump(json_data, file, indent=4)
|
|||
|
|
# return f"已经将[ {str(qq_id)} ]的绑定[ {old_steam_id} ]更新到了[ {str(steam_id)} ]"
|
|||
|
|
# else:
|
|||
|
|
# # 如果 key 不存在,添加新的 key-value 对
|
|||
|
|
# json_data[str(qq_id)] = steam_id
|
|||
|
|
#
|
|||
|
|
# # 将更新后的数据写回 JSON 文件
|
|||
|
|
# with open(f"{data_path}/{group_id}.json", 'w') as file:
|
|||
|
|
# json.dump(json_data, file, indent=4)
|
|||
|
|
# return f"[ {str(qq_id)} ]已经绑定[ {steam_id} ]"
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
# async def get_neko_bind(group_id, qq_id):
|
|||
|
|
# if os.path.exists(f"{data_path}/{group_id}.json"):
|
|||
|
|
# with open(f"{data_path}/{group_id}.json", 'r') as file:
|
|||
|
|
# json_data = json.load(file)
|
|||
|
|
# else:
|
|||
|
|
# # 如果文件不存在,创建一个新的空字典
|
|||
|
|
# json_data = {
|
|||
|
|
# "000000": 000000
|
|||
|
|
# }
|
|||
|
|
# # 检查指定的 key 是否存在
|
|||
|
|
# if str(qq_id) in json_data:
|
|||
|
|
# return json_data[str(qq_id)]
|
|||
|
|
# else:
|
|||
|
|
# return None
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
# async def to_neko_unbind(group_id, qq_id):
|
|||
|
|
# if os.path.exists(f"{data_path}/{group_id}.json"):
|
|||
|
|
# with open(f"{data_path}/{group_id}.json", 'r') as file:
|
|||
|
|
# json_data = json.load(file)
|
|||
|
|
# else:
|
|||
|
|
# # 如果文件不存在,创建一个新的空字典
|
|||
|
|
# json_data = {
|
|||
|
|
# "000000": 000000
|
|||
|
|
# }
|
|||
|
|
# # 检查指定的 key 是否存在
|
|||
|
|
# if str(qq_id) in json_data:
|
|||
|
|
# remove_value = json_data.pop(str(qq_id))
|
|||
|
|
# # 将更新后的数据写回 JSON 文件
|
|||
|
|
# with open(f"{data_path}/{group_id}.json", 'w') as file:
|
|||
|
|
# json.dump(json_data, file, indent=4)
|
|||
|
|
# return remove_value
|
|||
|
|
# else:
|
|||
|
|
# return None
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
# def get_present_time() -> int:
|
|||
|
|
# return int(datetime.timestamp(datetime.now()))
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
# async def capture_screenshot(url: str, time_present: int):
|
|||
|
|
# async with async_playwright() as p:
|
|||
|
|
# browser = await p.chromium.launch()
|
|||
|
|
# page = await browser.new_page()
|
|||
|
|
# try:
|
|||
|
|
# # 设置视口大小
|
|||
|
|
# await page.set_viewport_size({"width": 1600, "height": 900})
|
|||
|
|
# await page.goto(url)
|
|||
|
|
# await page.wait_for_load_state('networkidle')
|
|||
|
|
#
|
|||
|
|
# except Exception as e:
|
|||
|
|
# return f"访问网站异常{type(e)}`{e}`"
|
|||
|
|
#
|
|||
|
|
# await asyncio.sleep(1)
|
|||
|
|
# i_path = os.path.join(save_path, f'{time_present}.png')
|
|||
|
|
# await page.screenshot(
|
|||
|
|
# path=i_path,
|
|||
|
|
# full_page=True
|
|||
|
|
# )
|
|||
|
|
# logger.info("正在压缩图片...")
|
|||
|
|
# await asyncio.sleep(2)
|
|||
|
|
# img_convert = Image.open(i_path)
|
|||
|
|
# img_convert.save(i_path, quality=80)
|
|||
|
|
# logger.info("图片保存成功!")
|
|||
|
|
# await browser.close()
|
|||
|
|
# return "success"
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
# def gen_ms_img(image: Union[bytes, Image.Image]) -> MessageSegment:
|
|||
|
|
# if isinstance(image, bytes):
|
|||
|
|
# return MessageSegment.image(
|
|||
|
|
# pic2b64(Image.open(io.BytesIO(image)))
|
|||
|
|
# )
|
|||
|
|
# else:
|
|||
|
|
# return MessageSegment.image(
|
|||
|
|
# pic2b64(image)
|
|||
|
|
# )
|
|||
|
|
#
|
|||
|
|
#
|
|||
|
|
# def pic2b64(pic: Image) -> str:
|
|||
|
|
# buf = io.BytesIO()
|
|||
|
|
# pic.save(buf, format='PNG')
|
|||
|
|
# base64_str = base64.b64encode(buf.getvalue()).decode()
|
|||
|
|
# return 'base64://' + base64_str
|