import hashlib import os import json from io import BytesIO from PIL import Image, ImageOps, ImageDraw, ImageFont import requests import math import numpy as np from time import localtime, strftime, time import cjk_textwrap from pilmoji import Pilmoji from pilmoji.source import MicrosoftEmojiSource # from pilmoji.sources.microsoft import MicrosoftEmojiSource from .Reply import Reply import httpx import base64 from nonebot.adapters.onebot.v11 import MessageEvent, Bot, MessageSegment from typing import List, Union, Dict, Any import tempfile LOCAL_AVATAR_URL = "/tmp/{}.jpg" FINAL_IMAGE_URL = "/tmp/{}-final.jpg" FONT_WIDTH = 32 SMALL_FONT_WIDTH = 24 TIME_FONT_SMALL = 18 MAX_LINES = 5 font = ImageFont.truetype("./data/font/SourceHanSansCN-Medium.otf", FONT_WIDTH, encoding="unic") font_small = ImageFont.truetype("./data/font/SourceHanSansCN-Bold.otf", SMALL_FONT_WIDTH, encoding="unic") font_time = ImageFont.truetype("./data/font/SourceHanSansCN-Bold.otf", TIME_FONT_SMALL, encoding="unic") emoji_font = ImageFont.truetype("./data/font/seguiemj.ttf", FONT_WIDTH) async def generate(reply: Reply) -> str: """ 生成图片 """ image = Image.new('RGBA', (1280, 640), (0, 0, 0, 255)) text_wrapped = wrap_text(reply.message, 18) y_min = 100 - FONT_WIDTH y_max = 512 # center vertically y_start = (y_max - y_min) / 2 - min(len(text_wrapped), MAX_LINES) * FONT_WIDTH / 2 y = int(y_start) x_start = 601.1 x_end = 1241.1 draw = ImageDraw.Draw(image) # 过长截断 if len(text_wrapped) > MAX_LINES: text_wrapped[MAX_LINES - 1] = text_wrapped[3][:-3] + "..." text_wrapped = text_wrapped[:MAX_LINES] # --- 优化点:只开一次 Pilmoji 上下文 --- with Pilmoji(image, source=MicrosoftEmojiSource, emoji_position_offset=(0, 10)) as emoji: for i, line in enumerate(text_wrapped, start=1): text_size = draw.textlength(line, font) # center the text x = int(x_start + (x_end - x_start - text_size) / 2) emoji.text((x, y), line, font=font, fill=(255, 255, 255, 255), embedded_color=True) y += FONT_WIDTH + 20 # --- 头像处理 --- img_content = await download_avatar(str(reply.user_id)) avatar = Image.open(img_content).resize((640, 640)) avatar = alpha_gradient(avatar, 320, 640) image.paste(avatar, (0, 0), avatar) # --- 用户卡片 --- x_center = 921.1 card_size = draw.textlength("- " + reply.user_card, font_small) x_card_start = x_center - card_size / 2 draw.text((x_card_start, y), "- " + reply.user_card, font=font_small, fill=(169, 172, 184, 255)) # --- 时间 --- fmt_time = strftime("%Y.%m.%d %H:%M", localtime(reply.time)) time_size = draw.textlength(fmt_time, font_small) x_time_start = x_end - time_size draw.text((x_time_start, 640 - FONT_WIDTH - 20), fmt_time, font=font_time, fill=(169, 172, 184, 255)) # --- 输出 --- bytes_io = BytesIO() image.convert('RGB').save(bytes_io, format="JPEG") temp_file_path = bytesio2path(bytes_io) return temp_file_path def wrap_text(text: str, width: int) -> list[str]: """ 按宽度自动换行,支持中日韩文本。 先按换行符分行,再用 cjk_textwrap.wrap 按宽度折行。 """ return [wrapped for line in text.splitlines() for wrapped in cjk_textwrap.wrap(line, width)] def alpha_gradient(image, x_s: int, x_e: int): """ 为图像添加从 x_s 到 x_e 的左右渐变透明度遮罩。 使用 numpy 加速,避免逐像素 putpixel 的低效操作。 """ from PIL import Image # 延迟导入,避免循环依赖 image = image.convert("RGBA") w, h = image.size # 默认全不透明 gradient = np.full((h, w), 255, dtype=np.uint8) # 生成渐变列 xs = np.linspace(0, 1, x_e - x_s, endpoint=False) alphas = 255 - (np.sin(xs * np.pi - np.pi / 2) / 2 + 0.5) * 255 alphas = alphas.astype(np.uint8) # 应用到对应范围 gradient[:, x_s:x_e] = alphas[np.newaxis, :] mask = Image.fromarray(gradient, mode="L") image.putalpha(mask) return image def bytesio2path(img: Union[BytesIO, bytes]) -> str: if isinstance(img, BytesIO): img = img.getvalue() with tempfile.NamedTemporaryFile(delete=False) as temp_file: temp_file.write(img) temp_file_path = temp_file.name return f"file://{temp_file_path}" def smooth01(x: float) -> float: return math.sin(x * math.pi - math.pi / 2) / 2 + 0.5 async def download_avatar(user_id: str) -> BytesIO: url = f"http://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640" data = await download_url(url) if not data or hashlib.md5(data).hexdigest() == "acef72340ac0e914090bd35799f5594e": url = f"http://q1.qlogo.cn/g?b=qq&nk={user_id}&s=100" data = await download_url(url) return BytesIO(data) async def download_url(url: str) -> bytes: """ 下载指定 URL 内容,带 3 次重试和超时。 如果失败,返回空字节串 b""。 """ async with httpx.AsyncClient(timeout=5.0) as client: for i in range(3): try: resp = await client.get(url) if resp.status_code == 200: return resp.content except Exception as e: print(f"Error downloading {url}, retry {i + 1}/3: {e}") return b""