HeXi/hexi/plugins/makeaquote/make_a_qoute.py
sansenhoshi 275f05ee4a 重构
2026-01-04 17:15:40 +08:00

161 lines
5.3 KiB
Python

import hashlib
import os
import json
from io import BytesIO
from PIL import Image, ImageOps, ImageDraw, ImageFont
import requests
import math
import numpy as np
from time import localtime, strftime, time
import cjk_textwrap
from pilmoji import Pilmoji
from pilmoji.source import MicrosoftEmojiSource
# from pilmoji.sources.microsoft import MicrosoftEmojiSource
from .Reply import Reply
import httpx
import base64
from nonebot.adapters.onebot.v11 import MessageEvent, Bot, MessageSegment
from typing import List, Union, Dict, Any
import tempfile
LOCAL_AVATAR_URL = "/tmp/{}.jpg"
FINAL_IMAGE_URL = "/tmp/{}-final.jpg"
FONT_WIDTH = 32
SMALL_FONT_WIDTH = 24
TIME_FONT_SMALL = 18
MAX_LINES = 5
font = ImageFont.truetype("./data/font/SourceHanSansCN-Medium.otf", FONT_WIDTH, encoding="unic")
font_small = ImageFont.truetype("./data/font/SourceHanSansCN-Bold.otf", SMALL_FONT_WIDTH, encoding="unic")
font_time = ImageFont.truetype("./data/font/SourceHanSansCN-Bold.otf", TIME_FONT_SMALL, encoding="unic")
emoji_font = ImageFont.truetype("./data/font/seguiemj.ttf", FONT_WIDTH)
async def generate(reply: Reply) -> str:
"""
生成图片
"""
image = Image.new('RGBA', (1280, 640), (0, 0, 0, 255))
text_wrapped = wrap_text(reply.message, 18)
y_min = 100 - FONT_WIDTH
y_max = 512
# center vertically
y_start = (y_max - y_min) / 2 - min(len(text_wrapped), MAX_LINES) * FONT_WIDTH / 2
y = int(y_start)
x_start = 601.1
x_end = 1241.1
draw = ImageDraw.Draw(image)
# 过长截断
if len(text_wrapped) > MAX_LINES:
text_wrapped[MAX_LINES - 1] = text_wrapped[3][:-3] + "..."
text_wrapped = text_wrapped[:MAX_LINES]
# --- 优化点:只开一次 Pilmoji 上下文 ---
with Pilmoji(image, source=MicrosoftEmojiSource, emoji_position_offset=(0, 10)) as emoji:
for i, line in enumerate(text_wrapped, start=1):
text_size = draw.textlength(line, font)
# center the text
x = int(x_start + (x_end - x_start - text_size) / 2)
emoji.text((x, y), line, font=font, fill=(255, 255, 255, 255), embedded_color=True)
y += FONT_WIDTH + 20
# --- 头像处理 ---
img_content = await download_avatar(str(reply.user_id))
avatar = Image.open(img_content).resize((640, 640))
avatar = alpha_gradient(avatar, 320, 640)
image.paste(avatar, (0, 0), avatar)
# --- 用户卡片 ---
x_center = 921.1
card_size = draw.textlength("- " + reply.user_card, font_small)
x_card_start = x_center - card_size / 2
draw.text((x_card_start, y), "- " + reply.user_card, font=font_small, fill=(169, 172, 184, 255))
# --- 时间 ---
fmt_time = strftime("%Y.%m.%d %H:%M", localtime(reply.time))
time_size = draw.textlength(fmt_time, font_small)
x_time_start = x_end - time_size
draw.text((x_time_start, 640 - FONT_WIDTH - 20), fmt_time, font=font_time, fill=(169, 172, 184, 255))
# --- 输出 ---
bytes_io = BytesIO()
image.convert('RGB').save(bytes_io, format="JPEG")
temp_file_path = bytesio2path(bytes_io)
return temp_file_path
def wrap_text(text: str, width: int) -> list[str]:
"""
按宽度自动换行,支持中日韩文本。
先按换行符分行,再用 cjk_textwrap.wrap 按宽度折行。
"""
return [wrapped for line in text.splitlines() for wrapped in cjk_textwrap.wrap(line, width)]
def alpha_gradient(image, x_s: int, x_e: int):
"""
为图像添加从 x_s 到 x_e 的左右渐变透明度遮罩。
使用 numpy 加速,避免逐像素 putpixel 的低效操作。
"""
from PIL import Image # 延迟导入,避免循环依赖
image = image.convert("RGBA")
w, h = image.size
# 默认全不透明
gradient = np.full((h, w), 255, dtype=np.uint8)
# 生成渐变列
xs = np.linspace(0, 1, x_e - x_s, endpoint=False)
alphas = 255 - (np.sin(xs * np.pi - np.pi / 2) / 2 + 0.5) * 255
alphas = alphas.astype(np.uint8)
# 应用到对应范围
gradient[:, x_s:x_e] = alphas[np.newaxis, :]
mask = Image.fromarray(gradient, mode="L")
image.putalpha(mask)
return image
def bytesio2path(img: Union[BytesIO, bytes]) -> str:
if isinstance(img, BytesIO):
img = img.getvalue()
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(img)
temp_file_path = temp_file.name
return f"file://{temp_file_path}"
def smooth01(x: float) -> float:
return math.sin(x * math.pi - math.pi / 2) / 2 + 0.5
async def download_avatar(user_id: str) -> BytesIO:
url = f"http://q1.qlogo.cn/g?b=qq&nk={user_id}&s=640"
data = await download_url(url)
if not data or hashlib.md5(data).hexdigest() == "acef72340ac0e914090bd35799f5594e":
url = f"http://q1.qlogo.cn/g?b=qq&nk={user_id}&s=100"
data = await download_url(url)
return BytesIO(data)
async def download_url(url: str) -> bytes:
"""
下载指定 URL 内容,带 3 次重试和超时。
如果失败,返回空字节串 b""
"""
async with httpx.AsyncClient(timeout=5.0) as client:
for i in range(3):
try:
resp = await client.get(url)
if resp.status_code == 200:
return resp.content
except Exception as e:
print(f"Error downloading {url}, retry {i + 1}/3: {e}")
return b""