169 lines
6.0 KiB
Python
169 lines
6.0 KiB
Python
import asyncio
|
||
import io
|
||
import json
|
||
import os
|
||
import re
|
||
from datetime import datetime
|
||
from typing import Optional, Union
|
||
import base64
|
||
|
||
from PIL import Image
|
||
from playwright.async_api import async_playwright
|
||
from nonebot.adapters.onebot.v11 import MessageEvent, MessageSegment
|
||
from nonebot import logger
|
||
|
||
basic_path = os.path.dirname(__file__)
|
||
save_path = os.path.join(basic_path, "temp")
|
||
|
||
headers = {
|
||
"User-Agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1.6) ",
|
||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||
"Accept-Language": "zh-cn"
|
||
}
|
||
|
||
|
||
def gen_ms_img(image: Union[bytes, Image.Image]) -> MessageSegment:
|
||
if isinstance(image, bytes):
|
||
return MessageSegment.image(
|
||
pic2b64(Image.open(io.BytesIO(image)))
|
||
)
|
||
else:
|
||
return MessageSegment.image(
|
||
pic2b64(image)
|
||
)
|
||
|
||
|
||
def get_present_time() -> int:
|
||
return int(datetime.timestamp(datetime.now()))
|
||
|
||
|
||
async def screen_shot(url: str, time_present: int) -> Optional[str or bool]:
|
||
async with async_playwright() as p:
|
||
browser = await p.chromium.launch()
|
||
page = await browser.new_page()
|
||
try:
|
||
# 访问页面
|
||
await page.goto(url)
|
||
# 等待页面加载完成
|
||
await page.wait_for_load_state('domcontentloaded')
|
||
await page.wait_for_load_state('networkidle')
|
||
|
||
# 1. 获取卡片数量
|
||
card_selector = "div.flex.cursor-pointer[style*='width: 320px;']"
|
||
# 等待至少一个卡片元素出现
|
||
await page.wait_for_selector(card_selector, state='visible')
|
||
|
||
# 获取卡片数量
|
||
card_count = await page.eval_on_selector_all(card_selector, "els => els.length")
|
||
logger.info(f"获取到的星球卡片数量:{card_count}")
|
||
|
||
# 2. 单个卡片高度(固定为 270)
|
||
card_height = 270
|
||
extra_padding = 100 # 防止 margin、padding、gap
|
||
|
||
# 3. 计算所需 viewport 高度
|
||
if card_count == 0 or card_count <= 15:
|
||
logger.warning("卡片元素数量未超过设定值,使用默认视口高度。")
|
||
required_height = 1080 # 默认高度
|
||
else:
|
||
required_height = 1080 + (card_count - 15) / 5 * card_height + extra_padding
|
||
|
||
viewport_width = 1920 # 固定宽度
|
||
await page.set_viewport_size({"width": viewport_width, "height": required_height})
|
||
|
||
# 记录时间
|
||
time_start = get_present_time()
|
||
# 4. 加载替换文本的脚本
|
||
with open(f'{basic_path}/data/plantes_mix.json', 'r', encoding='utf-8') as file:
|
||
replacements = json.load(file)
|
||
|
||
# 构建替换脚本
|
||
replacement_script = ""
|
||
for keyword, replacement in replacements.items():
|
||
escaped_keyword = json.dumps(keyword)
|
||
escaped_replacement = json.dumps(replacement)
|
||
replacement_script += f"""
|
||
document.body.outerHTML = document.body.outerHTML.replace(new RegExp({escaped_keyword}, 'g'), {escaped_replacement});
|
||
"""
|
||
|
||
# 执行替换脚本
|
||
await page.evaluate(replacement_script)
|
||
|
||
# 结束时间
|
||
time_end = get_present_time()
|
||
duration = time_end - time_start
|
||
logger.info(f"截图文本替换耗时:{duration}s")
|
||
|
||
# 等待页面更新
|
||
await asyncio.sleep(1)
|
||
|
||
# 保存截图
|
||
logger.info("正在保存图片...")
|
||
img_path = os.path.join(save_path, f'{time_present}.png')
|
||
await page.screenshot(
|
||
path=img_path,
|
||
full_page=True
|
||
)
|
||
|
||
# 压缩图片
|
||
logger.info("正在压缩图片...")
|
||
img_convert = Image.open(img_path)
|
||
img_convert.save(img_path, quality=80)
|
||
logger.info("图片保存成功!")
|
||
|
||
except Exception as e:
|
||
logger.error(f"访问网站异常:{type(e)} `{e}`")
|
||
return f"访问网站异常:{type(e)} `{e}`"
|
||
|
||
finally:
|
||
await browser.close()
|
||
|
||
return "success"
|
||
|
||
|
||
async def screen_shot_2(url: str, time_present: int) -> Optional[str or bool]:
|
||
async with async_playwright() as p:
|
||
browser = await p.chromium.launch()
|
||
page = await browser.new_page()
|
||
try:
|
||
# 设置视口大小
|
||
await page.set_viewport_size({"width": 1920, "height": 1080})
|
||
await page.goto(url)
|
||
await page.wait_for_load_state('networkidle')
|
||
with open(f'{basic_path}/data/plantes_mix.json', 'r', encoding='utf-8') as file:
|
||
replacements = json.load(file)
|
||
# 遍历字典,构建替换脚本
|
||
replacement_script = ""
|
||
for keyword, replacement in replacements.items():
|
||
escaped_keyword = json.dumps(keyword)
|
||
escaped_replacement = json.dumps(replacement)
|
||
replacement_script += f"""
|
||
document.body.outerHTML = document.body.outerHTML.replace(new RegExp({escaped_keyword}, 'g'), {escaped_replacement});
|
||
"""
|
||
|
||
# 在页面上执行替换脚本
|
||
await page.evaluate(replacement_script)
|
||
|
||
except Exception as e:
|
||
return f"访问网站异常{type(e)}`{e}`"
|
||
await asyncio.sleep(1)
|
||
logger.info("正在保存图片...")
|
||
img_path = os.path.join(save_path, f'{time_present}.png')
|
||
await page.screenshot(
|
||
path=img_path,
|
||
full_page=True
|
||
)
|
||
logger.info("正在压缩图片...")
|
||
img_convert = Image.open(img_path)
|
||
img_convert.save(img_path, quality=80)
|
||
logger.info("图片保存成功!")
|
||
await browser.close()
|
||
return "success"
|
||
|
||
|
||
def pic2b64(pic: Image) -> str:
|
||
buf = io.BytesIO()
|
||
pic.save(buf, format='PNG')
|
||
base64_str = base64.b64encode(buf.getvalue()).decode()
|
||
return 'base64://' + base64_str
|