169 lines
6.0 KiB
Python
169 lines
6.0 KiB
Python
|
|
import asyncio
|
|||
|
|
import io
|
|||
|
|
import json
|
|||
|
|
import os
|
|||
|
|
import re
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import Optional, Union
|
|||
|
|
import base64
|
|||
|
|
|
|||
|
|
from PIL import Image
|
|||
|
|
from playwright.async_api import async_playwright
|
|||
|
|
from nonebot.adapters.onebot.v11 import MessageEvent, MessageSegment
|
|||
|
|
from nonebot import logger
|
|||
|
|
|
|||
|
|
# Directory containing this module; bundled data files and the screenshot
# folder are resolved relative to it.
basic_path = os.path.dirname(__file__)

# Folder that receives the generated screenshot files.
save_path = os.path.join(basic_path, "temp")

# Browser-like HTTP request headers (old Firefox user agent, Chinese locale).
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1.6) "
    ),
    "Accept": (
        "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
    ),
    "Accept-Language": "zh-cn",
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def gen_ms_img(image: Union[bytes, Image.Image]) -> MessageSegment:
    """Wrap an image in an image ``MessageSegment``.

    Args:
        image: Either encoded image bytes or an already opened PIL image.

    Returns:
        An image message segment whose payload is the ``base64://`` string
        produced by ``pic2b64``.
    """
    # Raw bytes are decoded into a PIL image first so that both input kinds
    # go through the same ``pic2b64`` encoding step.
    picture = Image.open(io.BytesIO(image)) if isinstance(image, bytes) else image
    return MessageSegment.image(pic2b64(picture))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_present_time() -> int:
    """Return the current time as a Unix timestamp in whole seconds."""
    now = datetime.now()
    # ``int`` drops the fractional (sub-second) part of the timestamp.
    return int(now.timestamp())
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def screen_shot(url: str, time_present: int) -> str:
    """Open ``url`` in Chromium and save a full-page screenshot of it.

    The viewport is enlarged according to the number of cards found on the
    page, the text substitutions listed in ``data/plantes_mix.json`` are
    applied to the page body, and the screenshot is written to
    ``<save_path>/<time_present>.png``.

    Args:
        url: Address of the page to capture.
        time_present: Value used as the screenshot's file name (without the
            ``.png`` extension).

    Returns:
        ``"success"`` when the screenshot was saved; otherwise a message
        describing the exception raised while loading, rewriting or capturing
        the page. Failures while launching the browser or opening the page
        are not caught here and propagate to the caller.
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        try:
            # Open the page and wait until it has finished loading.
            await page.goto(url)
            await page.wait_for_load_state('domcontentloaded')
            await page.wait_for_load_state('networkidle')

            # 1. Count the cards; wait until at least one of them is visible.
            card_selector = "div.flex.cursor-pointer[style*='width: 320px;']"
            await page.wait_for_selector(card_selector, state='visible')
            card_count = await page.eval_on_selector_all(card_selector, "els => els.length")
            logger.info(f"获取到的星球卡片数量:{card_count}")

            # 2. Height of a single card (fixed at 270) plus slack for
            #    margins, padding and gaps.
            card_height = 270
            extra_padding = 100

            # 3. Work out the viewport height that is needed.
            if card_count <= 15:
                logger.warning("卡片元素数量未超过设定值,使用默认视口高度。")
                required_height = 1080  # default height
            else:
                # Integer arithmetic keeps the height an ``int``: the previous
                # ``/ 5`` produced a float for a pixel dimension.
                # NOTE(review): the divisor presumably reflects five cards per
                # row - confirm against the page layout.
                required_height = 1080 + (card_count - 15) * card_height // 5 + extra_padding

            viewport_width = 1920  # fixed width
            await page.set_viewport_size({"width": viewport_width, "height": required_height})

            # Start timing the text replacement.
            time_start = get_present_time()

            # 4. Load the keyword -> replacement table.
            with open(f'{basic_path}/data/plantes_mix.json', 'r', encoding='utf-8') as file:
                replacements = json.load(file)

            # One JavaScript statement per entry; ``json.dumps`` turns both
            # strings into safely quoted JavaScript string literals.
            statements = [
                "document.body.outerHTML = document.body.outerHTML.replace("
                f"new RegExp({json.dumps(keyword)}, 'g'), {json.dumps(replacement)});"
                for keyword, replacement in replacements.items()
            ]

            # Run the replacements in the page; nothing to do for an empty table.
            if statements:
                await page.evaluate("\n".join(statements))

            # Stop timing.
            time_end = get_present_time()
            duration = time_end - time_start
            logger.info(f"截图文本替换耗时:{duration}s")

            # Give the page a moment to re-render.
            await asyncio.sleep(1)

            # Save the screenshot.
            logger.info("正在保存图片...")
            img_path = os.path.join(save_path, f'{time_present}.png')
            await page.screenshot(
                path=img_path,
                full_page=True
            )

            # Re-save the image through Pillow ("compression" step).
            # NOTE(review): Pillow's documented PNG save options do not
            # include ``quality``, so this may not shrink the file - confirm.
            logger.info("正在压缩图片...")
            with Image.open(img_path) as img_convert:  # closes the file handle
                img_convert.save(img_path, quality=80)
            logger.info("图片保存成功!")

        except Exception as e:
            error_message = f"访问网站异常:{type(e)} `{e}`"
            logger.error(error_message)
            return error_message

        finally:
            # Runs on success and on the error return alike.
            await browser.close()

    return "success"
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def screen_shot_2(url: str, time_present: int) -> str:
    """Open ``url`` in Chromium at 1920x1080 and save a full-page screenshot.

    The text substitutions listed in ``data/plantes_mix.json`` are applied to
    the page body before the screenshot is written to
    ``<save_path>/<time_present>.png``.

    Args:
        url: Address of the page to capture.
        time_present: Value used as the screenshot's file name (without the
            ``.png`` extension).

    Returns:
        ``"success"`` when the screenshot was saved; otherwise a message
        describing the exception raised while loading or rewriting the page.
        Exceptions raised while taking or re-saving the screenshot are not
        caught and propagate to the caller.
    """
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        # The outer ``try``/``finally`` closes the browser on every exit path,
        # including the early error return and a failing screenshot.
        try:
            page = await browser.new_page()
            try:
                # Set the viewport size.
                await page.set_viewport_size({"width": 1920, "height": 1080})
                await page.goto(url)
                await page.wait_for_load_state('networkidle')

                # Load the keyword -> replacement table.
                with open(f'{basic_path}/data/plantes_mix.json', 'r', encoding='utf-8') as file:
                    replacements = json.load(file)

                # One JavaScript statement per entry; ``json.dumps`` turns both
                # strings into safely quoted JavaScript string literals.
                statements = [
                    "document.body.outerHTML = document.body.outerHTML.replace("
                    f"new RegExp({json.dumps(keyword)}, 'g'), {json.dumps(replacement)});"
                    for keyword, replacement in replacements.items()
                ]

                # Run the replacements in the page; nothing to do for an
                # empty table.
                if statements:
                    await page.evaluate("\n".join(statements))

            except Exception as e:
                error_message = f"访问网站异常{type(e)}`{e}`"
                # Logged as well as returned, matching ``screen_shot``.
                logger.error(error_message)
                return error_message

            # Give the page a moment to re-render.
            await asyncio.sleep(1)

            logger.info("正在保存图片...")
            img_path = os.path.join(save_path, f'{time_present}.png')
            await page.screenshot(
                path=img_path,
                full_page=True
            )

            # Re-save the image through Pillow ("compression" step).
            # NOTE(review): Pillow's documented PNG save options do not
            # include ``quality``, so this may not shrink the file - confirm.
            logger.info("正在压缩图片...")
            with Image.open(img_path) as img_convert:  # closes the file handle
                img_convert.save(img_path, quality=80)
            logger.info("图片保存成功!")
        finally:
            await browser.close()

    return "success"
|
|||
|
|
|
|||
|
|
|
|||
|
|
def pic2b64(pic: "Image.Image") -> str:
    """Encode an image as a ``base64://`` PNG payload string.

    Args:
        pic: Image object to encode; it is serialised through its
            ``save(buffer, format='PNG')`` method.

    Returns:
        The text ``base64://`` followed by the Base64 encoding of the PNG
        bytes.
    """
    # The annotation names the ``Image.Image`` class (``Image`` alone is the
    # PIL module); it is quoted so it is not evaluated at definition time.
    buf = io.BytesIO()
    pic.save(buf, format='PNG')
    base64_str = base64.b64encode(buf.getvalue()).decode()
    return 'base64://' + base64_str
|