From 5956ec1148593639b3131ce8b13926783e6bda97 Mon Sep 17 00:00:00 2001 From: HibiKier <45528451+HibiKier@users.noreply.github.com> Date: Mon, 16 Dec 2024 22:56:17 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20=20=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E7=BE=A4=E6=AC=A2=E8=BF=8E=E6=B6=88=E6=81=AF=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=EF=BC=8C=E6=94=AF=E6=8C=81=E5=A4=9A=E6=9D=A1=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=9A=8F=E6=9C=BA=E5=8F=91=E9=80=81=20(#1768)=20(#1774)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../builtin_plugins/admin/welcome_message.py | 145 ---------- .../admin/welcome_message/__init__.py | 132 +++++++++ .../admin/welcome_message/data_source.py | 262 ++++++++++++++++++ .../platform/qq/group_handle/data_source.py | 81 ++++-- 4 files changed, 446 insertions(+), 174 deletions(-) delete mode 100644 zhenxun/builtin_plugins/admin/welcome_message.py create mode 100644 zhenxun/builtin_plugins/admin/welcome_message/__init__.py create mode 100644 zhenxun/builtin_plugins/admin/welcome_message/data_source.py diff --git a/zhenxun/builtin_plugins/admin/welcome_message.py b/zhenxun/builtin_plugins/admin/welcome_message.py deleted file mode 100644 index 47097ff0..00000000 --- a/zhenxun/builtin_plugins/admin/welcome_message.py +++ /dev/null @@ -1,145 +0,0 @@ -from pathlib import Path -import shutil -from typing import Annotated - -from nonebot import on_command -from nonebot.params import Command -from nonebot.plugin import PluginMetadata -from nonebot_plugin_alconna import Image, Text, UniMsg -from nonebot_plugin_session import EventSession -import ujson as json - -from zhenxun.configs.config import Config -from zhenxun.configs.path_config import DATA_PATH -from zhenxun.configs.utils import PluginExtraData, RegisterConfig -from zhenxun.services.log import logger -from zhenxun.utils.enum import PluginType -from zhenxun.utils.http_utils import AsyncHttpx -from zhenxun.utils.rules import admin_check, ensure_group - -base_config = Config.get("admin_bot_manage") - -__plugin_meta__ = PluginMetadata( - name="自定义群欢迎消息", - description="自定义群欢迎消息", - usage=""" - 设置群欢迎消息,当消息中包含 -at 时会at入群用户 - 设置欢迎消息 欢迎新人![图片] - 设置欢迎消息 欢迎你 -at - """.strip(), - extra=PluginExtraData( - author="HibiKier", - version="0.1", - plugin_type=PluginType.ADMIN, - admin_level=base_config.get("SET_GROUP_WELCOME_MESSAGE_LEVEL", 2), - configs=[ - RegisterConfig( - module="admin_bot_manage", - key="SET_GROUP_WELCOME_MESSAGE_LEVEL", - value=2, - help="设置群欢迎消息所需要的管理员权限等级", - default_value=2, - ) - ], - ).dict(), -) - -_matcher = on_command( - "设置欢迎消息", - rule=admin_check("admin_bot_manage", "SET_GROUP_WELCOME_MESSAGE_LEVEL") - & ensure_group, - priority=5, - block=True, -) - - -BASE_PATH = DATA_PATH / "welcome_message" -BASE_PATH.mkdir(parents=True, exist_ok=True) - -# 旧数据迁移 -old_file = DATA_PATH / "custom_welcome_msg" / "custom_welcome_msg.json" -if old_file.exists(): - try: - old_data: dict[str, str] = json.load(old_file.open(encoding="utf8")) - for group_id, message in old_data.items(): - file = BASE_PATH / "qq" / f"{group_id}" / "text.json" - file.parent.mkdir(parents=True, exist_ok=True) - json.dump( - {"at": "[at]" in message, "message": message.replace("[at]", "")}, - file.open("w", encoding="utf8"), - ensure_ascii=False, - indent=4, - ) - logger.debug("群欢迎消息数据迁移", group_id=group_id) - shutil.rmtree(old_file.parent.absolute()) - except Exception as e: - logger.error("群欢迎消息数据迁移失败...", e=e) - - -def get_path(session: EventSession) -> Path: - """根据Session获取存储路径 - - 参数: - session: EventSession: - - 返回: - Path: 存储路径 - """ - path = BASE_PATH / f"{session.platform or session.bot_type}" / f"{session.id2}" - if session.id3: - path = ( - BASE_PATH - / f"{session.platform or session.bot_type}" - / f"{session.id3}" - / f"{session.id2}" - ) - path.mkdir(parents=True, exist_ok=True) - for f in path.iterdir(): - f.unlink() - return path - - -async def save(path: Path, message: UniMsg) -> str: - """保存群欢迎消息 - - 参数: - path: 存储路径 - message: 消息内容 - - 返回: - str: 消息内容文本格式 - """ - idx = 0 - text = "" - file = path / "text.json" - for msg in message: - if isinstance(msg, Text): - text += msg.text - elif isinstance(msg, Image): - if msg.url: - text += f"[image:{idx}]" - if await AsyncHttpx.download_file(msg.url, path / f"{idx}.png"): - idx += 1 - else: - logger.warning("图片 URL 为空...", "设置欢迎消息") - json.dump( - {"at": "-at" in text, "message": text.replace("-at", "", 1)}, - file.open("w", encoding="utf-8"), - ensure_ascii=False, - indent=4, - ) - return text - - -@_matcher.handle() -async def _( - session: EventSession, - message: UniMsg, - command: Annotated[tuple[str, ...], Command()], -): - path = get_path(session) - message[0].text = message[0].text.replace(command[0], "").strip() - text = await save(path, message) - uni_msg = Text("设置欢迎消息成功: \n") + message - await uni_msg.send() - logger.info(f"设置群欢迎消息成功: {text}", command[0], session=session) diff --git a/zhenxun/builtin_plugins/admin/welcome_message/__init__.py b/zhenxun/builtin_plugins/admin/welcome_message/__init__.py new file mode 100644 index 00000000..35b3fee1 --- /dev/null +++ b/zhenxun/builtin_plugins/admin/welcome_message/__init__.py @@ -0,0 +1,132 @@ +from typing import Annotated + +from nonebot import on_command +from nonebot.params import Command +from nonebot.plugin import PluginMetadata +from nonebot_plugin_alconna import ( + Alconna, + AlconnaMatcher, + Args, + Arparma, + Field, + Match, + Text, + UniMsg, + on_alconna, +) +from nonebot_plugin_uninfo import Uninfo + +from zhenxun.configs.config import Config +from zhenxun.configs.utils import PluginExtraData, RegisterConfig +from zhenxun.services.log import logger +from zhenxun.utils.enum import PluginType +from zhenxun.utils.message import MessageUtils +from zhenxun.utils.rules import admin_check, ensure_group + +from .data_source import Manager + +base_config = Config.get("admin_bot_manage") + +__plugin_meta__ = PluginMetadata( + name="自定义群欢迎消息", + description="自定义群欢迎消息", + usage=""" + 设置群欢迎消息,当消息中包含 -at 时会at入群用户 + 可以设置多条欢迎消息,包含多条欢迎消息时将随机发送 + 指令: + 设置欢迎消息 + 查看欢迎消息 ?[id]: 存在id时查看指定欢迎消息内容 + 删除欢迎消息 [id] + 示例: + 设置欢迎消息 欢迎新人![图片] + 设置欢迎消息 欢迎你 -at + 查看欢迎消息 + 查看欢迎消息 2 + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1", + plugin_type=PluginType.ADMIN, + admin_level=base_config.get("SET_GROUP_WELCOME_MESSAGE_LEVEL", 2), + configs=[ + RegisterConfig( + module="admin_bot_manage", + key="SET_GROUP_WELCOME_MESSAGE_LEVEL", + value=2, + help="设置群欢迎消息所需要的管理员权限等级", + default_value=2, + ) + ], + ).dict(), +) + +_matcher = on_command( + "设置欢迎消息", + rule=admin_check("admin_bot_manage", "SET_GROUP_WELCOME_MESSAGE_LEVEL") + & ensure_group, + priority=5, + block=True, +) + +_show_matcher = on_alconna( + Alconna("查看欢迎消息", Args["idx?", int]), + rule=admin_check("admin_bot_manage", "SET_GROUP_WELCOME_MESSAGE_LEVEL") + & ensure_group, + priority=5, + block=True, +) + +_del_matcher: type[AlconnaMatcher] = on_alconna( + Alconna( + "删除欢迎消息", + Args[ + "idx", + int, + Field( + missing_tips=lambda: "请在命令后跟随指定id!", + unmatch_tips=lambda _: "删除指定id必须为数字!", + ), + ], + ), + skip_for_unmatch=False, + rule=admin_check("admin_bot_manage", "SET_GROUP_WELCOME_MESSAGE_LEVEL") + & ensure_group, + priority=5, + block=True, +) + + +@_matcher.handle() +async def _( + session: Uninfo, + message: UniMsg, + command: Annotated[tuple[str, ...], Command()], +): + path = Manager.get_path(session) + if not path: + await MessageUtils.build_message("群组不存在...").finish() + message[0].text = message[0].text.replace(command[0], "").strip() + await Manager.save(path, message) + uni_msg = Text("设置欢迎消息成功: \n") + message + await uni_msg.send() + logger.info(f"设置群欢迎消息成功: {message}", command[0], session=session) + + +@_show_matcher.handle() +async def _(session: Uninfo, arparma: Arparma, idx: Match[int]): + result = await Manager.get_group_message( + session, idx.result if idx.available else None + ) + if not result: + await MessageUtils.build_message("当前还未设置群组欢迎消息哦...").finish() + await MessageUtils.build_message(result).send() + logger.info("查看群组欢迎信息", arparma.header_result, session=session) + + +@_del_matcher.handle() +async def _(session: Uninfo, arparma: Arparma, idx: int): + result = await Manager.delete_group_message(session, int(idx)) + if not result: + await MessageUtils.build_message("未查找到指定id的群组欢迎消息...").finish() + await MessageUtils.build_message(result).send() + logger.info(f"删除群组欢迎信息: {result}", arparma.header_result, session=session) diff --git a/zhenxun/builtin_plugins/admin/welcome_message/data_source.py b/zhenxun/builtin_plugins/admin/welcome_message/data_source.py new file mode 100644 index 00000000..99f18033 --- /dev/null +++ b/zhenxun/builtin_plugins/admin/welcome_message/data_source.py @@ -0,0 +1,262 @@ +import os +from pathlib import Path +import re +import shutil +import uuid + +import nonebot +from nonebot_plugin_alconna import UniMessage, UniMsg +from nonebot_plugin_uninfo import Uninfo +import ujson as json + +from zhenxun.configs.path_config import DATA_PATH +from zhenxun.services.log import logger +from zhenxun.utils._build_image import BuildImage +from zhenxun.utils._image_template import ImageTemplate +from zhenxun.utils.http_utils import AsyncHttpx +from zhenxun.utils.platform import PlatformUtils + +BASE_PATH = DATA_PATH / "welcome_message" +BASE_PATH.mkdir(parents=True, exist_ok=True) + +driver = nonebot.get_driver() + + +old_file = DATA_PATH / "custom_welcome_msg" / "custom_welcome_msg.json" +if old_file.exists(): + try: + old_data: dict[str, str] = json.load(old_file.open(encoding="utf8")) + for group_id, message in old_data.items(): + file = BASE_PATH / "qq" / f"{group_id}" / "text.json" + file.parent.mkdir(parents=True, exist_ok=True) + json.dump( + { + uuid.uuid4(): { + "at": "[at]" in message, + "status": True, + "message": message.replace("[at]", ""), + } + }, + file.open("w", encoding="utf8"), + ensure_ascii=False, + indent=4, + ) + logger.debug("群欢迎消息数据迁移", group_id=group_id) + shutil.rmtree(old_file.parent.absolute()) + except Exception as e: + logger.error("群欢迎消息数据迁移失败...", e=e) + + +def migrate(path: Path): + """数据迁移 + + 参数: + path: 路径 + """ + text_file = path / "text.json" + with text_file.open(encoding="utf8") as f: + json_data = json.load(f) + new_data = {} + if "at" in json_data: + split_msg = re.split(r"\[image:\d\]", str(json_data["message"])) + data = [] + for i in range(len(split_msg)): + msg = split_msg[i] + data.append( + { + "type": "text", + "text": msg, + } + ) + image_file = path / f"{i}.png" + if image_file.exists(): + data.append( + { + "type": "image", + "path": str(image_file), + } + ) + new_data[uuid.uuid4()] = { + "at": json_data.get("at", False), + "status": json_data.get("status", True), + "message": data, + } + with text_file.open("w", encoding="utf8") as f: + json.dump(new_data, f, ensure_ascii=False, indent=4) + + +@driver.on_startup +def _(): + """数据迁移 + + 参数: + path: 存储路径 + json_data: 存储数据 + """ + flag_file = BASE_PATH / "flag.txt" + if flag_file.exists(): + return + logger.info("开始迁移群欢迎消息数据...") + base_path = BASE_PATH + path_list = [] + for platform in os.listdir(BASE_PATH): + base_path = base_path / platform + for group_id in os.listdir(base_path): + group_path = base_path / group_id + is_channel = False + for file in os.listdir(group_path): + inner_file = group_path / file + if inner_file.is_dir(): + path_list.append(inner_file) + is_channel = True + if not is_channel: + path_list.append(group_path) + if path_list: + for path in path_list: + migrate(path) + if not flag_file.exists(): + flag_file.touch() + logger.success("迁移群欢迎消息数据完成!", "") + + +class Manager: + @classmethod + def __get_data(cls, session: Uninfo) -> dict | None: + """获取存储数据 + + 参数: + session: Uninfo + + 返回: + dict | None: 欢迎消息数据 + """ + if not session.group: + return None + path = cls.get_path(session) + if not path: + return None + file = path / "text.json" + if not file.exists(): + return None + with file.open(encoding="utf8") as f: + return json.load(f) + + @classmethod + def get_path(cls, session: Uninfo) -> Path | None: + """根据Session获取存储路径 + + 参数: + session: Uninfo: + + 返回: + Path: 存储路径 + """ + if not session.group: + return None + platform = PlatformUtils.get_platform(session) + path = BASE_PATH / f"{platform}" / f"{session.group.id}" + if session.group.parent: + path = ( + BASE_PATH + / f"{platform}" + / f"{session.group.parent.id}" + / f"{session.group.id}" + ) + path.mkdir(parents=True, exist_ok=True) + return path + + @classmethod + async def save(cls, path: Path, message: UniMsg): + """保存群欢迎消息 + + 参数: + path: 存储路径 + message: 消息内容 + """ + file = path / "text.json" + json_data = {} + if file.exists(): + with file.open(encoding="utf8") as f: + json_data = json.load(f) + data = [] + is_at = False + for msg in message.dump(True): + if msg["type"] == "image": + image_file = path / f"{uuid.uuid4()}.png" + await AsyncHttpx.download_file(msg["url"], image_file) + msg["path"] = str(image_file) + if not is_at and msg["type"] == "text" and "-at" in msg["text"]: + msg["text"] = msg["text"].replace("-at", "", 1).strip() + is_at = True + data.append(msg) + json_data[str(uuid.uuid4())] = {"at": is_at, "status": True, "message": data} + with file.open("w", encoding="utf8") as f: + json.dump(json_data, f, ensure_ascii=False, indent=4) + + @classmethod + async def get_group_message( + cls, session: Uninfo, idx: int | None + ) -> BuildImage | UniMessage | None: + """获取群欢迎消息 + + 参数: + session: Uninfo + idx: 指定id + + 返回: + list: 消息内容 + """ + json_data = cls.__get_data(session) + if not json_data: + return None + if idx is not None: + key_list = list(json_data.keys()) + if idx < 0 or idx > len(key_list): + return None + return UniMessage().load(json_data[key_list[idx]]["message"]) + else: + msg_list = [] + for i, uid in enumerate(json_data): + msg_data = json_data[uid] + msg_list.append( + [ + i, + "开启" if msg_data["status"] else "关闭", + "是" if msg_data["at"] else "否", + str(UniMessage().load(msg_data["message"])), + ] + ) + if not msg_list: + return None + column_name = ["ID", "状态", "是否@", "消息"] + return await ImageTemplate.table_page( + "群欢迎消息", session.group.id, column_name, msg_list + ) + + @classmethod + async def delete_group_message(cls, session: Uninfo, idx: int) -> str | None: + """获取群欢迎消息 + + 参数: + session: EventSession: + id: 消息ID + + 返回: + list: 消息内容 + """ + json_data = cls.__get_data(session) + if not json_data: + return None + key_list = list(json_data.keys()) + if idx < 0 or idx >= len(key_list): + return None + old_msg = str(UniMessage().load(json_data[key_list[idx]]["message"])) + for msg in json_data[key_list[idx]]["message"]: + if msg["type"] == "image" and msg["path"]: + image_path = Path(msg["path"]) + if image_path.exists(): + image_path.unlink() + del json_data[key_list[idx]] + with file.open("w", encoding="utf8") as f: + json.dump(json_data, f, ensure_ascii=False, indent=4) + return f"删除群组欢迎消息成功!消息内容: {old_msg}" diff --git a/zhenxun/builtin_plugins/platform/qq/group_handle/data_source.py b/zhenxun/builtin_plugins/platform/qq/group_handle/data_source.py index 9ffde102..fc188eb1 100644 --- a/zhenxun/builtin_plugins/platform/qq/group_handle/data_source.py +++ b/zhenxun/builtin_plugins/platform/qq/group_handle/data_source.py @@ -2,10 +2,9 @@ import os from pathlib import Path import random -import re from nonebot.adapters import Bot -from nonebot_plugin_alconna import At +from nonebot_plugin_alconna import At, UniMessage from nonebot_plugin_uninfo import Uninfo import ujson as json @@ -28,7 +27,7 @@ limit_cd = base_config.get("welcome_msg_cd") -WELCOME_PATH = DATA_PATH / "welcome_message" / "qq" +WELCOME_PATH = DATA_PATH / "welcome_message" DEFAULT_IMAGE_PATH = IMAGE_PATH / "qxz" @@ -153,44 +152,68 @@ async def add_bot( await cls.__refresh_level(bot, group_id) @classmethod - def __build_welcome_message(cls, user_id: str, path: Path) -> list[At | Path | str]: - """构造群欢迎消息 + def get_path(cls, session: Uninfo) -> Path | None: + """根据Session获取存储路径 参数: - user_id: 用户id - path: 群欢迎消息存储路径 + session: Uninfo: + + 返回: + Path: 存储路径 + """ + if not session.group: + return None + platform = PlatformUtils.get_platform(session) + path = WELCOME_PATH / f"{platform}" / f"{session.group.id}" + if session.group.parent: + path = ( + WELCOME_PATH + / f"{platform}" + / f"{session.group.parent.id}" + / f"{session.group.id}" + ) + path.mkdir(parents=True, exist_ok=True) + return path + + @classmethod + def __get_welcome_data(cls, session: Uninfo) -> dict | None: + """获取存储数据 + + 参数: + session: Uninfo 返回: - list[At | Path | str]: 消息列表 + dict | None: 欢迎消息数据 """ + if not session.group: + return None + path = cls.get_path(session) + if not path: + return None file = path / "text.json" - data = json.load(file.open(encoding="utf-8")) - message = data["message"] - msg_split = re.split(r"\[image:\d+\]", message) - msg_list = [] - if data["at"]: - msg_list.append(At(flag="user", target=user_id)) - for i, text in enumerate(msg_split): - msg_list.append(text) - img_file = path / f"{i}.png" - if img_file.exists(): - msg_list.append(img_file) - return msg_list + if not file.exists(): + return None + with file.open(encoding="utf8") as f: + return json.load(f) @classmethod - async def __send_welcome_message(cls, user_id: str, group_id: str): + async def __send_welcome_message(cls, session: Uninfo, user_id: str): """发送群欢迎消息 参数: user_id: 用户id group_id: 群组id """ - cls._flmt.start_cd(group_id) - path = WELCOME_PATH / f"{group_id}" - file = path / "text.json" - if file.exists(): - msg_list = cls.__build_welcome_message(user_id, path) - logger.info("发送群欢迎消息...", "入群检测", group_id=group_id) + if not session.group: + return + cls._flmt.start_cd(session.group.id) + if json_data := cls.__get_welcome_data(session): + key = random.choice([k for k in json_data.keys() if json_data[k]["status"]]) + welcome_data = json_data[key] + msg_list = UniMessage().load(welcome_data["message"]) + if welcome_data["at"]: + msg_list.insert(0, At("user", user_id)) + logger.info("发送群欢迎消息...", "入群检测", session=session) if msg_list: await MessageUtils.build_message(msg_list).send() # type: ignore else: @@ -199,7 +222,7 @@ async def __send_welcome_message(cls, user_id: str, group_id: str): ) await MessageUtils.build_message( [ - "新人快跑啊!!本群现状↓(快使用自定义!)", + "新人快跑啊!!本群现状↓(快使用自定义群欢迎消息!)", image, ] ).send() @@ -224,7 +247,7 @@ async def add_user(cls, session: Uninfo, bot: Bot, user_id: str, group_id: str): if not await CommonUtils.task_is_block( session, "group_welcome" ) and cls._flmt.check(group_id): - await cls.__send_welcome_message(user_id, group_id) + await cls.__send_welcome_message(session, user_id) @classmethod async def kick_bot(cls, bot: Bot, group_id: str, operator_id: str):