HeXi/hexi/plugins/nonebot_plugin_steam_info/__init__.py
sansenhoshi 275f05ee4a 重构
2026-01-04 17:15:40 +08:00

525 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import httpx
import nonebot
from io import BytesIO
from pathlib import Path
from nonebot.log import logger
from PIL import Image as PILImage
from nonebot.params import Depends
from nonebot.params import CommandArg
from nonebot import on_command, require
from typing import Union, Optional, List, Dict
from nonebot.adapters import Message, Event, Bot
from nonebot.plugin import PluginMetadata, inherit_supported_adapters
require("nonebot_plugin_alconna")
require("nonebot_plugin_localstore")
require("nonebot_plugin_apscheduler")
import nonebot_plugin_localstore as store
from nonebot_plugin_apscheduler import scheduler
from nonebot_plugin_alconna import Text, Image, UniMessage, Target, At
from nonebot.adapters.onebot.v11 import MessageEvent, Bot, MessageSegment
from .config import Config
from .models import ProcessedPlayer
from .data_source import BindData, SteamInfoData, ParentData, DisableParentData
from .steam import (
get_steam_id,
get_user_data,
STEAM_ID_OFFSET,
get_steam_users_info,
)
from .draw import (
check_font,
set_font_paths,
draw_start_gaming,
draw_player_status,
draw_friends_status,
vertically_concatenate_images,
)
from .utils import (
fetch_avatar,
image_to_bytes,
simplize_steam_player_data,
convert_player_name_to_nickname,
)
# Plugin metadata registered with NoneBot. The `usage` text is also what the
# steamhelp command replies with, so keep it in sync with the matchers
# declared in this module.
__plugin_meta__ = PluginMetadata(
    name="Steam Info",
    description="播报绑定的 Steam 好友状态",
    usage="""
steamhelp: 查看帮助
steambind [Steam ID 或 Steam 好友代码]: 绑定 Steam ID
steamunbind: 解绑 Steam ID
steaminfo (可选)[@某人 或 Steam ID 或 Steam好友代码]: 查看 Steam 主页
steamcheck: 查看 Steam 好友状态
steamenable: 启用 Steam 播报
steamdisable: 禁用 Steam 播报
steamupdate [名称] [图片]: 更新群信息
steamnickname [昵称]: 设置玩家昵称
""".strip(),
    type="application",
    homepage="https://github.com/zhaomaoniu/nonebot-plugin-steam-info",
    config=Config,
    # Supported adapters are inherited from nonebot_plugin_alconna.
    supported_adapters=inherit_supported_adapters("nonebot_plugin_alconna"),
)
# Command matchers, one per user-facing command, all registered at priority 10.
# Each matcher also answers to the Chinese aliases listed next to it.
# NOTE: `help` shadows the Python builtin of the same name inside this module.
help = on_command("steamhelp", aliases={"steam帮助"}, priority=10)
bind = on_command("steambind", aliases={"绑定steam"}, priority=10)
unbind = on_command("steamunbind", aliases={"解绑steam"}, priority=10)
info = on_command("steaminfo", aliases={"steam信息"}, priority=10)
check = on_command("steamcheck", aliases={"查看steam", "查steam", "谁在玩游戏"}, priority=10)
enable = on_command("steamenable", aliases={"启用steam"}, priority=10)
disable = on_command("steamdisable", aliases={"禁用steam"}, priority=10)
update_parent_info = on_command("steamupdate", aliases={"更新群信息"}, priority=10)
set_nickname = on_command("steamnickname", aliases={"steam昵称"}, priority=10)
# NOTE(review): no handler for this matcher is attached in the visible part of
# this file, and the command is not listed in the usage text. Confirm whether
# it is implemented elsewhere.
get_play_time = on_command("steamplaytime", aliases={"游玩时长表"}, priority=10)
# Load the plugin configuration. `nonebot.get_plugin_config` is used when the
# installed NoneBot exposes it; otherwise the driver's global config is parsed
# into the plugin's Config model.
if hasattr(nonebot, "get_plugin_config"):
    config = nonebot.get_plugin_config(Config)
else:
    from nonebot import get_driver

    config = Config.parse_obj(get_driver().config)

# Pass the configured regular / light / bold font paths to the drawing module.
set_font_paths(
    config.steam_font_regular_path,
    config.steam_font_light_path,
    config.steam_font_bold_path,
)
# Locations of the plugin's JSON data files, resolved through
# nonebot_plugin_localstore.
bind_data_path = store.get_data_file("nonebot_plugin_steam_info", "bind_data.json")
steam_info_data_path = store.get_data_file(
    "nonebot_plugin_steam_info", "steam_info.json"
)
parent_data_path = store.get_data_file("nonebot_plugin_steam_info", "parent_data.json")
disable_parent_data_path = store.get_data_file(
    "nonebot_plugin_steam_info", "disable_parent_data.json"
)

# Cache directory; `avatar_path` and `cache_path` are two names for the same
# directory.
avatar_path = store.get_cache_dir("nonebot_plugin_steam_info")
cache_path = avatar_path

# Module-level data stores shared by every handler and by the scheduled job.
bind_data = BindData(bind_data_path)
steam_info_data = SteamInfoData(steam_info_data_path)
parent_data = ParentData(parent_data_path)
disable_parent_data = DisableParentData(disable_parent_data_path)
# Verify the font files at import time. A missing font is only logged, so the
# module still finishes loading and the remaining definitions are registered.
try:
    check_font()
except FileNotFoundError as e:
    logger.error(
        f"{e}, nonebot_plugin_steam_info 无法使用,请参照 `https://github.com/zhaomaoniu/nonebot-plugin-steam-info` 配置字体文件"
    )
async def get_target(event: Event, bot: Bot) -> Optional[Target]:
    """Resolve the chat an event came from; used as a handler dependency.

    Args:
        event: The incoming event.
        bot: The bot that received the event.

    Returns:
        The ``Target`` built by ``UniMessage.get_target`` for non-private chats.

    Private chats are not supported. Instead of returning ``None`` (which every
    handler in this module would dereference via ``target.parent_id`` and crash
    on with ``AttributeError``), the handler that depends on this function is
    skipped through ``Matcher.skip()``.
    """
    # Local import keeps this block self-contained; nonebot is already a
    # dependency of this module.
    from nonebot.matcher import Matcher

    target = UniMessage.get_target(event, bot, bot.adapter.get_name())
    if target.private:
        # Private messages are not supported: skip the dependent handler.
        Matcher.skip()
    return target
async def to_image_data(image: Image) -> Union[BytesIO, bytes]:
    """Return the binary content of an image segment as ``bytes``.

    The sources are tried in order: in-memory ``raw`` data, a local ``path``,
    then a remote ``url`` fetched over HTTP.

    Args:
        image: Image segment exposing ``raw``, ``path`` and ``url`` attributes.

    Returns:
        The image data. It is always plain ``bytes``: a ``BytesIO`` held in
        ``image.raw`` is unwrapped, because the caller in this module passes
        the result straight to ``BytesIO(...)``, which rejects a ``BytesIO``.

    Raises:
        ValueError: If the download does not answer with HTTP 200, or if the
            segment carries none of the three sources.
    """
    raw = image.raw
    if raw is not None:
        return raw.getvalue() if isinstance(raw, BytesIO) else raw
    if image.path is not None:
        return Path(image.path).read_bytes()
    if image.url is not None:
        async with httpx.AsyncClient() as client:
            response = await client.get(image.url)
        if response.status_code != 200:
            raise ValueError(f"无法获取图片数据: {response.status_code}")
        return response.content
    raise ValueError("无法获取图片数据")
def _format_play_duration(seconds: float) -> str:
    """Render elapsed seconds as "H 小时 M 分钟", or "M 分钟" when under one hour."""
    hours = int(seconds / 3600)
    minutes = int(seconds % 3600 / 60)
    return f"{hours} 小时 {minutes} 分钟" if hours > 0 else f"{minutes} 分钟"


def _build_broadcast_lines(play_data) -> List[str]:
    """Turn the entries returned by ``steam_info_data.compare`` into text lines."""
    lines: List[str] = []
    for entry in play_data:
        player: ProcessedPlayer = entry["player"]
        old_player: Optional[ProcessedPlayer] = entry.get("old_player")
        kind = entry["type"]
        if kind == "start":
            lines.append(f"{player['personaname']} 开始玩 {player['gameextrainfo']}")
        elif kind == "stop":
            elapsed = time.time() - old_player["game_start_time"]
            time_str = _format_play_duration(elapsed)
            lines.append(
                f"{player['personaname']} 玩了 {time_str} {old_player['gameextrainfo']} 后不玩了"
            )
        elif kind == "change":
            lines.append(
                f"{player['personaname']} 停止玩 {old_player['gameextrainfo']},开始玩 {player['gameextrainfo']}"
            )
        elif kind == "error":
            # This message used to be built and then thrown away; log it so the
            # inconsistency is visible. `old_player` may be absent, so guard it.
            old_game = (old_player or {}).get("gameextrainfo")
            logger.error(
                f"出现错误!{player['personaname']}\nNew: {player.get('gameextrainfo')}\nOld: {old_game}"
            )
        else:
            logger.error(f"未知的播报类型: {entry['type']}")
    return lines


async def broadcast_steam_info(
    parent_id: str,
    old_players: List[ProcessedPlayer],
    new_players: List[ProcessedPlayer],
):
    """Announce play-state changes of one chat's bound players.

    Args:
        parent_id: Identifier of the chat to broadcast to.
        old_players: Player records cached before the latest refresh.
        new_players: Player records after the latest refresh.

    Returns:
        None. Nothing is sent when broadcasting is disabled for the chat, when
        no change produced a text line, or when ``config.steam_broadcast_type``
        is not one of "all", "part" or "none". Send failures are logged, not
        raised.
    """
    if disable_parent_data.is_disabled(parent_id):
        return None
    bot = nonebot.get_bot()
    play_data = steam_info_data.compare(old_players, new_players)
    msg = _build_broadcast_lines(play_data)
    if not msg:
        return None
    text = "\n".join(msg)

    if config.steam_broadcast_type == "all":
        # Text plus one image showing the status of every bound player.
        steam_status_data = [
            convert_player_name_to_nickname(
                (await simplize_steam_player_data(player, config.proxy, avatar_path)),
                parent_id,
                bind_data,
            )
            for player in new_players
        ]
        parent_avatar, parent_name = parent_data.get(parent_id)
        image = draw_friends_status(parent_avatar, parent_name, steam_status_data)
        uni_msg = UniMessage([Text(text), Image(raw=image_to_bytes(image))])
    elif config.steam_broadcast_type == "part":
        # Text plus one card per player that just started a game.
        images = [
            draw_start_gaming(
                (await fetch_avatar(entry["player"], avatar_path, config.proxy)),
                entry["player"]["personaname"],
                entry["player"]["gameextrainfo"],
                bind_data.get_by_steam_id(parent_id, entry["player"]["steamid"])[
                    "nickname"
                ],
            )
            for entry in play_data
            if entry["type"] == "start"
        ]
        if not images:
            uni_msg = UniMessage([Text(text)])
        else:
            image = (
                vertically_concatenate_images(images) if len(images) > 1 else images[0]
            )
            uni_msg = UniMessage([Text(text), Image(raw=image_to_bytes(image))])
    elif config.steam_broadcast_type == "none":
        uni_msg = UniMessage([Text(text)])
    else:
        logger.error(f"未知的播报类型: {config.steam_broadcast_type}")
        return None

    try:
        logger.info(f"主动消息触发:{uni_msg}")
        await uni_msg.send(
            Target(parent_id, parent_id, True, False, "", bot.adapter.get_name()), bot
        )
    except Exception as e:
        # Best effort: one failed send must not abort the scheduled job.
        logger.error(f"UniMessage异常{e}")
# Round-robin cursor into the configured Steam API keys (used by key_select_2).
current_key_index = 0


def key_select_2():
    """Pick the next Steam API key in round-robin order.

    Reads ``config.steam_api_key`` and advances the module-level
    ``current_key_index`` cursor.

    Returns:
        A one-element list holding the selected key.

    Raises:
        IndexError: If no API key is configured.
    """
    global current_key_index
    keys = config.steam_api_key
    if isinstance(keys, str):
        # A single key given as a plain string must not be indexed by character.
        keys = [keys]
    if not keys:
        raise IndexError("steam_api_key is empty: no Steam API key to select")
    # Wrap the cursor first so a key list that became shorter cannot overrun.
    index = current_key_index % len(keys)
    current_key: str = keys[index]
    current_key_index = (index + 1) % len(keys)
    return [current_key]
async def update_steam_info():
    """Refresh the cached Steam player data for every bound Steam ID.

    Returns:
        A tuple ``(bind_data, old_players_dict)``. ``old_players_dict`` maps
        each key of ``bind_data.content`` to the player records that were
        cached before this refresh was applied.

    When no API key is configured, a warning is logged and the Steam request is
    skipped (it previously crashed while selecting a key). The cache is left
    untouched and the snapshot is still returned, so callers can unpack the
    result as usual.
    """
    if config.steam_api_key:
        steam_ids = bind_data.get_all_steam_id()
        current_key = key_select_2()
        # NOTE(review): this writes the API key to the log in plain text.
        logger.info(f"当前调用的key{current_key}")
        steam_info = await get_steam_users_info(
            steam_ids, current_key, config.proxy
        )
    else:
        logger.warning("没有可用的 key。")
        steam_info = None

    # Snapshot the cached state after the request and before it is overwritten.
    old_players_dict: Dict[str, List[ProcessedPlayer]] = {}
    for parent_id in bind_data.content.keys():
        old_players_dict[parent_id] = steam_info_data.get_players(
            bind_data.get_all(parent_id)
        )

    if steam_info is not None:
        steam_info_data.update_by_players(steam_info["response"]["players"])
        steam_info_data.save()
    return bind_data, old_players_dict
@scheduler.scheduled_job(
    "interval", minutes=config.steam_request_interval / 60, id="update_steam_info"
)
async def fetch_and_broadcast_steam_info():
    """Scheduled job: refresh the Steam data, then broadcast changes per chat.

    The interval is ``config.steam_request_interval`` divided by 60, passed to
    the scheduler as minutes.
    """
    bindings, old_players_dict = await update_steam_info()
    # Iterate the snapshot returned above rather than the live
    # ``bind_data.content``: the loop awaits, so a bind command handled in the
    # meantime could otherwise change the mapping during iteration or introduce
    # a chat that has no snapshot entry.
    for parent_id, old_players in old_players_dict.items():
        new_players = steam_info_data.get_players(bindings.get_all(parent_id))
        await broadcast_steam_info(parent_id, old_players, new_players)
# Register update_steam_info to run whenever a bot connects, unless the
# steam_disable_broadcast_on_startup option is set. update_steam_info only
# refreshes the cached player data; it does not send anything itself.
# NOTE(review): priming the cache at connect presumably changes what the first
# scheduled run reports. Confirm this condition is not inverted relative to
# the option's name.
if not config.steam_disable_broadcast_on_startup:
    nonebot.get_driver().on_bot_connect(update_steam_info)
else:
    logger.info("已禁用启动时的 Steam 播报")
@help.handle()
async def help_handle():
    """Reply to the steamhelp command with the plugin's usage text."""
    usage_text = __plugin_meta__.usage
    await help.finish(usage_text)
@bind.handle()
async def bind_handle(
    event: Event, target: Target = Depends(get_target), cmd_arg: Message = CommandArg()
):
    """Bind (or re-bind) the sender's Steam ID within the current chat.

    Args:
        event: The triggering event; supplies the sender's user ID.
        target: The chat the command was sent in.
        cmd_arg: Command argument, expected to be a numeric Steam ID or Steam
            friend code.

    The argument must consist of digits only; otherwise a format hint is sent.
    An existing binding has its ``steam_id`` replaced, a missing one is created
    with no nickname. The bind data is saved in both cases.
    """
    parent_id = target.parent_id or target.id
    # Strip surrounding whitespace, as the other handlers in this module do, so
    # a trailing space or newline does not make a valid ID fail the digit check.
    arg = cmd_arg.extract_plain_text().strip()
    if not arg.isdigit():
        await bind.finish(
            "请输入正确的 Steam ID 或 Steam好友代码格式: steambind [Steam ID 或 Steam好友代码]"
        )
    steam_id = get_steam_id(arg)
    user_id = event.get_user_id()
    if user_data := bind_data.get(parent_id, user_id):
        user_data["steam_id"] = steam_id
        bind_data.save()
        await bind.finish(f"已更新你的 Steam ID 为 {steam_id}")
    else:
        bind_data.add(
            parent_id,
            {"user_id": user_id, "steam_id": steam_id, "nickname": None},
        )
        bind_data.save()
        await bind.finish(f"已绑定你的 Steam ID 为 {steam_id}")
@unbind.handle()
async def unbind_handle(event: Event, target: Target = Depends(get_target)):
    """Remove the sender's Steam binding in the current chat, if one exists."""
    group_id = target.parent_id or target.id
    sender_id = event.get_user_id()
    has_binding = bind_data.get(group_id, sender_id) is not None
    if has_binding:
        bind_data.remove(group_id, sender_id)
        bind_data.save()
        reply = "已解绑 Steam ID"
    else:
        reply = "未绑定 Steam ID"
    await unbind.finish(reply)
@info.handle()
async def info_handle(
    bot: Bot,
    event: Event,
    target: Target = Depends(get_target),
    arg: Message = CommandArg(),
):
    """Render and send a Steam profile card.

    The profile is chosen in this order:
      1. the first @-mentioned user, who must have a binding in this chat;
      2. a numeric Steam ID or friend code given as plain text;
      3. the sender's own binding.

    Args:
        bot: The bot handling the event.
        event: The triggering event.
        target: The chat the command was sent in.
        arg: The command argument message.
    """
    parent_id = target.parent_id or target.id
    uni_arg = await UniMessage.generate(message=arg, event=event, bot=bot)
    at = uni_arg[At]
    plain_arg = arg.extract_plain_text().strip()

    if len(at) != 0:
        user_id: str = at[0].target
        user_data = bind_data.get(parent_id, user_id)
        if user_data is None:
            await info.finish("该用户未绑定 Steam ID")
        steam_id = user_data["steam_id"]
        steam_friend_code = str(int(steam_id) - STEAM_ID_OFFSET)
    elif plain_arg != "":
        # Reject non-numeric input with a reply instead of letting int() raise
        # an unhandled ValueError.
        if not plain_arg.isdecimal():
            await info.finish(
                "请输入正确的 Steam ID 或 Steam好友代码格式: steaminfo [@某人 或 Steam ID 或 Steam好友代码]"
            )
        steam_id = int(plain_arg)
        # Values below the offset are treated as friend codes; anything else
        # is treated as a full Steam ID.
        if steam_id < STEAM_ID_OFFSET:
            steam_friend_code = steam_id
            steam_id += STEAM_ID_OFFSET
        else:
            steam_friend_code = steam_id - STEAM_ID_OFFSET
    else:
        user_data = bind_data.get(parent_id, event.get_user_id())
        if user_data is None:
            await info.finish(
                "未绑定 Steam ID, 请使用 “steambind [Steam ID 或 Steam好友代码]” 绑定 Steam ID"
            )
        steam_id = user_data["steam_id"]
        steam_friend_code = str(int(steam_id) - STEAM_ID_OFFSET)

    player_data = await get_user_data(steam_id, cache_path, config.proxy)
    draw_data = [
        {
            "game_header": game["game_image"],
            "game_name": game["game_name"],
            "game_time": f"{game['play_time']} 小时",
            "last_play_time": game["last_played"],
            "achievements": game["achievements"],
            "completed_achievement_number": game.get("completed_achievement_number"),
            "total_achievement_number": game.get("total_achievement_number"),
        }
        for game in player_data["game_data"]
    ]
    image = draw_player_status(
        player_data["background"],
        player_data["avatar"],
        player_data["player_name"],
        str(steam_friend_code),
        player_data["description"],
        player_data["recent_2_week_play_time"],
        draw_data,
    )
    await info.finish(
        await UniMessage(
            Image(raw=image_to_bytes(image)),
        ).export(bot)
    )
# Round-robin cursor into the configured Steam API keys (used by key_select).
current_key_index_1 = 0


def key_select():
    """Pick the next Steam API key in round-robin order.

    Reads ``config.steam_api_key`` and advances the module-level
    ``current_key_index_1`` cursor.

    Returns:
        A one-element list holding the selected key.

    Raises:
        IndexError: If no API key is configured.
    """
    global current_key_index_1
    keys = config.steam_api_key
    if isinstance(keys, str):
        # A single key given as a plain string must not be indexed by character.
        keys = [keys]
    if not keys:
        raise IndexError("steam_api_key is empty: no Steam API key to select")
    # Wrap the cursor first so a key list that became shorter cannot overrun.
    index = current_key_index_1 % len(keys)
    current_key: str = keys[index]
    current_key_index_1 = (index + 1) % len(keys)
    return [current_key]
@check.handle()
async def check_handle(
    target: Target = Depends(get_target), arg: Message = CommandArg()
):
    """Send an image showing the current Steam status of the chat's bound players.

    The command is ignored when any plain-text argument follows it.
    """
    if arg.extract_plain_text().strip():
        return None

    parent_id = target.parent_id or target.id
    steam_ids = bind_data.get_all(parent_id)

    current_key = key_select()
    logger.info(f"当前调用的key{current_key}")
    steam_info = await get_steam_users_info(steam_ids, current_key, config.proxy)
    logger.debug(f"{parent_id} Players info: {steam_info}")

    parent_avatar, parent_name = parent_data.get(parent_id)

    # Simplify each player record, then swap in the chat-specific nickname.
    steam_status_data = []
    for player in steam_info["response"]["players"]:
        simplified = await simplize_steam_player_data(
            player, config.proxy, avatar_path
        )
        steam_status_data.append(
            convert_player_name_to_nickname(simplified, parent_id, bind_data)
        )

    status_image = draw_friends_status(parent_avatar, parent_name, steam_status_data)
    reply = UniMessage(Image(raw=image_to_bytes(status_image)))
    await target.send(reply)
@update_parent_info.handle()
async def update_parent_info_handle(
    bot: Bot,
    event: Event,
    target: Target = Depends(get_target),
    arg: Message = CommandArg(),
):
    """Update the display name and avatar stored for the current chat.

    The command argument must contain both an image (the avatar) and
    non-blank text (the name); otherwise an explanatory reply is sent.

    Args:
        bot: The bot handling the event.
        event: The triggering event.
        target: The chat the command was sent in.
        arg: The command argument message.
    """
    msg = await UniMessage.generate(message=arg, event=event, bot=bot)
    # Named `parsed` rather than `info` to avoid shadowing the module-level
    # `info` matcher.
    parsed = {}
    for seg in msg:
        if isinstance(seg, Image):
            data = await to_image_data(seg)
            # to_image_data is annotated as returning bytes or BytesIO;
            # BytesIO() cannot wrap another BytesIO, so only wrap raw bytes.
            stream = data if isinstance(data, BytesIO) else BytesIO(data)
            parsed["avatar"] = PILImage.open(stream)
        elif isinstance(seg, Text):
            # Ignore whitespace-only text (e.g. the gap after the image) so it
            # cannot overwrite the real name, and trim the name that is kept.
            name = seg.text.strip()
            if name:
                parsed["name"] = name
    if "avatar" not in parsed or "name" not in parsed:
        await update_parent_info.finish("文本中应包含图片和文字")
    parent_data.update(target.parent_id or target.id, parsed["avatar"], parsed["name"])
    await update_parent_info.finish("更新成功")
@enable.handle()
async def enable_handle(target: Target = Depends(get_target)):
    """Re-enable Steam broadcasts for the current chat and save the setting."""
    group_id = target.parent_id or target.id
    disable_parent_data.remove(group_id)
    disable_parent_data.save()
    confirmation = "已启用 Steam 播报"
    await enable.finish(confirmation)
@disable.handle()
async def disable_handle(target: Target = Depends(get_target)):
    """Disable Steam broadcasts for the current chat and save the setting."""
    group_id = target.parent_id or target.id
    disable_parent_data.add(group_id)
    disable_parent_data.save()
    confirmation = "已禁用 Steam 播报"
    await disable.finish(confirmation)
@set_nickname.handle()
async def set_nickname_handle(
    event: Event, target: Target = Depends(get_target), cmd_arg: Message = CommandArg()
):
    """Store the nickname used for the sender in this chat's Steam broadcasts.

    Replies with a hint when no nickname is given or when the sender has no
    Steam binding in the current chat.
    """
    parent_id = target.parent_id or target.id
    nickname = cmd_arg.extract_plain_text().strip()
    if not nickname:
        await set_nickname.finish("请输入昵称，格式: steamnickname [昵称]")

    binding = bind_data.get(parent_id, event.get_user_id())
    if binding is None:
        await set_nickname.finish(
            "未绑定 Steam ID请先使用 steambind 绑定 Steam ID 后再设置昵称"
        )

    binding["nickname"] = nickname
    bind_data.save()
    await set_nickname.finish(f"已设置你的昵称为 {nickname}，将在 Steam 播报中显示")