From 61ab7ef3d3573e5847d00ed71b2280f0410b1661 Mon Sep 17 00:00:00 2001 From: HibiKier <45528451+HibiKier@users.noreply.github.com> Date: Tue, 27 Aug 2024 09:24:32 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=A8=E4=BB=A3=E7=A0=81=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../auto_update/_data_source.py | 76 +++++++++++-------- zhenxun/builtin_plugins/auto_update/config.py | 20 +++-- zhenxun/builtin_plugins/web_ui/__init__.py | 26 +++---- zhenxun/plugins/word_bank/_data_source.py | 38 ++++++---- zhenxun/plugins/word_bank/_model.py | 62 ++++++++------- zhenxun/plugins/word_bank/_rule.py | 12 +-- zhenxun/plugins/word_bank/word_handle.py | 49 ++++++------ 7 files changed, 155 insertions(+), 128 deletions(-) diff --git a/zhenxun/builtin_plugins/auto_update/_data_source.py b/zhenxun/builtin_plugins/auto_update/_data_source.py index 6d93255cd..8bdb349f9 100644 --- a/zhenxun/builtin_plugins/auto_update/_data_source.py +++ b/zhenxun/builtin_plugins/auto_update/_data_source.py @@ -1,9 +1,8 @@ import os import shutil -import subprocess import tarfile import zipfile -from pathlib import Path +import subprocess from nonebot.adapters import Bot from nonebot.utils import run_sync @@ -13,24 +12,28 @@ from zhenxun.utils.platform import PlatformUtils from .config import ( - BACKUP_PATH, - BASE_PATH, DEV_URL, - DOWNLOAD_GZ_FILE, - DOWNLOAD_ZIP_FILE, MAIN_URL, - PYPROJECT_FILE, - PYPROJECT_LOCK_FILE, + TMP_PATH, + BASE_PATH, + BACKUP_PATH, RELEASE_URL, - REPLACE_FOLDERS, REQ_TXT_FILE, - TMP_PATH, VERSION_FILE, + PYPROJECT_FILE, + REPLACE_FOLDERS, + BASE_PATH_STRING, + DOWNLOAD_GZ_FILE, + DOWNLOAD_ZIP_FILE, + PYPROJECT_LOCK_FILE, + REQ_TXT_FILE_STRING, + PYPROJECT_FILE_STRING, + PYPROJECT_LOCK_FILE_STRING, ) def install_requirement(): - requirement_path = (Path() / "requirements.txt").absolute() + requirement_path = (REQ_TXT_FILE).absolute() if not requirement_path.exists(): logger.debug( @@ -41,8 +44,7 @@ def install_requirement(): result = subprocess.run( ["pip", "install", "-r", str(requirement_path)], check=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + capture_output=True, text=True, ) logger.debug(f"成功安装真寻依赖,日志:\n{result.stdout}", "插件管理") @@ -67,32 +69,32 @@ def _file_handle(latest_version: str | None): tf = zipfile.ZipFile(DOWNLOAD_ZIP_FILE) tf.extractall(TMP_PATH) logger.debug("解压文件压缩包完成...", "检查更新") - download_file_path = ( - TMP_PATH / [x for x in os.listdir(TMP_PATH) if (TMP_PATH / x).is_dir()][0] + download_file_path = TMP_PATH / next( + x for x in os.listdir(TMP_PATH) if (TMP_PATH / x).is_dir() ) - _pyproject = download_file_path / "pyproject.toml" - _lock_file = download_file_path / "poetry.lock" - _req_file = download_file_path / "requirements.txt" - extract_path = download_file_path / "zhenxun" + _pyproject = download_file_path / PYPROJECT_FILE_STRING + _lock_file = download_file_path / PYPROJECT_LOCK_FILE_STRING + _req_file = download_file_path / REQ_TXT_FILE_STRING + extract_path = download_file_path / BASE_PATH_STRING target_path = BASE_PATH if PYPROJECT_FILE.exists(): logger.debug(f"移除备份文件: {PYPROJECT_FILE}", "检查更新") - shutil.move(PYPROJECT_FILE, BACKUP_PATH / "pyproject.toml") + shutil.move(PYPROJECT_FILE, BACKUP_PATH / PYPROJECT_FILE_STRING) if PYPROJECT_LOCK_FILE.exists(): logger.debug(f"移除备份文件: {PYPROJECT_LOCK_FILE}", "检查更新") - shutil.move(PYPROJECT_LOCK_FILE, BACKUP_PATH / "poetry.lock") + shutil.move(PYPROJECT_LOCK_FILE, BACKUP_PATH / PYPROJECT_LOCK_FILE_STRING) if REQ_TXT_FILE.exists(): logger.debug(f"移除备份文件: {REQ_TXT_FILE}", "检查更新") - shutil.move(REQ_TXT_FILE, BACKUP_PATH / "requirements.txt") + shutil.move(REQ_TXT_FILE, BACKUP_PATH / REQ_TXT_FILE_STRING) if _pyproject.exists(): logger.debug("移动文件: pyproject.toml", "检查更新") - shutil.move(_pyproject, Path() / "pyproject.toml") + shutil.move(_pyproject, PYPROJECT_FILE) if _lock_file.exists(): logger.debug("移动文件: poetry.lock", "检查更新") - shutil.move(_lock_file, Path() / "poetry.lock") + shutil.move(_lock_file, PYPROJECT_LOCK_FILE) if _req_file.exists(): logger.debug("移动文件: requirements.txt", "检查更新") - shutil.move(_req_file, Path() / "requirements.txt") + shutil.move(_req_file, REQ_TXT_FILE) for folder in REPLACE_FOLDERS: """移动指定文件夹""" _dir = BASE_PATH / folder @@ -132,7 +134,6 @@ def _file_handle(latest_version: str | None): class UpdateManage: - @classmethod async def check_version(cls) -> str: """检查更新版本 @@ -144,7 +145,13 @@ async def check_version(cls) -> str: data = await cls.__get_latest_data() if not data: return "检查更新获取版本失败..." - return f"检测到当前版本更新\n当前版本:{cur_version}\n最新版本:{data.get('name')}\n创建日期:{data.get('created_at')}\n更新内容:\n{data.get('body')}" + return ( + "检测到当前版本更新\n" + f"当前版本:{cur_version}\n" + f"最新版本:{data.get('name')}\n" + f"创建日期:{data.get('created_at')}\n" + f"更新内容:\n{data.get('body')}" + ) @classmethod async def update(cls, bot: Bot, user_id: str, version_type: str) -> str | None: @@ -158,8 +165,10 @@ async def update(cls, bot: Bot, user_id: str, version_type: str) -> str | None: 返回: str | None: 返回消息 """ - logger.info(f"开始下载真寻最新版文件....", "检查更新") + logger.info("开始下载真寻最新版文件....", "检查更新") cur_version = cls.__get_version() + url = None + new_version = None if version_type == "dev": url = DEV_URL new_version = await cls.__get_version_from_branch("dev") @@ -197,7 +206,11 @@ async def update(cls, bot: Bot, user_id: str, version_type: str) -> str | None: if await AsyncHttpx.download_file(url, download_file): logger.debug("下载真寻最新版文件完成...", "检查更新") await _file_handle(new_version) - return f"版本更新完成\n版本: {cur_version} -> {new_version}\n请重新启动真寻以完成更新!" + return ( + f"版本更新完成\n" + f"版本: {cur_version} -> {new_version}\n" + "请重新启动真寻以完成更新!" + ) else: logger.debug("下载真寻最新版文件失败...", "检查更新") return None @@ -211,8 +224,7 @@ def __get_version(cls) -> str: """ _version = "v0.0.0" if VERSION_FILE.exists(): - text = VERSION_FILE.open(encoding="utf8").readline() - if text: + if text := VERSION_FILE.open(encoding="utf8").readline(): _version = text.split(":")[-1].strip() return _version @@ -231,7 +243,7 @@ async def __get_latest_data(cls) -> dict: except TimeoutError: pass except Exception as e: - logger.error(f"检查更新真寻获取版本失败", e=e) + logger.error("检查更新真寻获取版本失败", e=e) return {} @classmethod diff --git a/zhenxun/builtin_plugins/auto_update/config.py b/zhenxun/builtin_plugins/auto_update/config.py index db6c880c6..99cb7a197 100644 --- a/zhenxun/builtin_plugins/auto_update/config.py +++ b/zhenxun/builtin_plugins/auto_update/config.py @@ -6,14 +6,18 @@ MAIN_URL = "https://ghproxy.cc/https://github.com/HibiKier/zhenxun_bot/archive/refs/heads/main.zip" RELEASE_URL = "https://api.github.com/repos/HibiKier/zhenxun_bot/releases/latest" - -VERSION_FILE = Path() / "__version__" - -PYPROJECT_FILE = Path() / "pyproject.toml" -PYPROJECT_LOCK_FILE = Path() / "poetry.lock" -REQ_TXT_FILE = Path() / "requirements.txt" - -BASE_PATH = Path() / "zhenxun" +VERSION_FILE_STRING = "__version__" +VERSION_FILE = Path() / VERSION_FILE_STRING + +PYPROJECT_FILE_STRING = "pyproject.toml" +PYPROJECT_FILE = Path() / PYPROJECT_FILE_STRING +PYPROJECT_LOCK_FILE_STRING = "poetry.lock" +PYPROJECT_LOCK_FILE = Path() / PYPROJECT_LOCK_FILE_STRING +REQ_TXT_FILE_STRING = "requirements.txt" +REQ_TXT_FILE = Path() / REQ_TXT_FILE_STRING + +BASE_PATH_STRING = "zhenxun" +BASE_PATH = Path() / BASE_PATH_STRING TMP_PATH = TEMP_PATH / "auto_update" diff --git a/zhenxun/builtin_plugins/web_ui/__init__.py b/zhenxun/builtin_plugins/web_ui/__init__.py index 0369daf6a..a1beb51c2 100644 --- a/zhenxun/builtin_plugins/web_ui/__init__.py +++ b/zhenxun/builtin_plugins/web_ui/__init__.py @@ -2,26 +2,26 @@ import secrets import nonebot -from fastapi import APIRouter, FastAPI -from nonebot.log import default_filter, default_format +from fastapi import FastAPI, APIRouter from nonebot.plugin import PluginMetadata +from nonebot.log import default_filter, default_format -from zhenxun.configs.config import Config as gConfig -from zhenxun.configs.utils import PluginExtraData, RegisterConfig -from zhenxun.services.log import logger, logger_ from zhenxun.utils.enum import PluginType +from zhenxun.services.log import logger, logger_ +from zhenxun.configs.config import Config as gConfig +from zhenxun.configs.utils import RegisterConfig, PluginExtraData +from .public import init_public +from .auth import router as auth_router from .api.logs import router as ws_log_routes from .api.logs.log_manager import LOG_STORAGE -from .api.tabs.database import router as database_router from .api.tabs.main import router as main_router -from .api.tabs.main import ws_router as status_routes from .api.tabs.manage import router as manage_router +from .api.tabs.system import router as system_router +from .api.tabs.main import ws_router as status_routes +from .api.tabs.database import router as database_router from .api.tabs.manage.chat import ws_router as chat_routes from .api.tabs.plugin_manage import router as plugin_router -from .api.tabs.system import router as system_router -from .auth import router as auth_router -from .public import init_public __plugin_meta__ = PluginMetadata( name="WebUi", @@ -32,7 +32,7 @@ author="HibiKier", version="0.1", plugin_type=PluginType.HIDDEN, - Configs=[ + configs=[ RegisterConfig( module="web-ui", key="username", @@ -57,7 +57,7 @@ type=str, default_value=None, ), - ], + ], ).dict(), ) @@ -98,7 +98,7 @@ async def log_sink(message: str): logger.warning("Web Ui log_sink", e=e) if not loop: loop = asyncio.new_event_loop() - loop.create_task(LOG_STORAGE.add(message.rstrip("\n"))) + loop.create_task(LOG_STORAGE.add(message.rstrip("\n"))) # noqa: RUF006 logger_.add( log_sink, colorize=True, filter=default_filter, format=default_format diff --git a/zhenxun/plugins/word_bank/_data_source.py b/zhenxun/plugins/word_bank/_data_source.py index 201c3bc5c..dded404c0 100644 --- a/zhenxun/plugins/word_bank/_data_source.py +++ b/zhenxun/plugins/word_bank/_data_source.py @@ -1,13 +1,13 @@ from nonebot_plugin_alconna import At -from nonebot_plugin_alconna import At as alcAt from nonebot_plugin_alconna import Image -from nonebot_plugin_alconna import Image as alcImage +from nonebot_plugin_alconna import At as alcAt from nonebot_plugin_alconna import Text as alcText -from nonebot_plugin_alconna import UniMessage, UniMsg +from nonebot_plugin_alconna import Image as alcImage +from nonebot_plugin_alconna import UniMsg, UniMessage -from zhenxun.plugins.word_bank._config import ScopeType -from zhenxun.utils.image_utils import ImageTemplate from zhenxun.utils.message import MessageUtils +from zhenxun.utils.image_utils import ImageTemplate +from zhenxun.plugins.word_bank._config import ScopeType from ._model import WordBank @@ -42,9 +42,9 @@ def get_problem(message: UniMsg) -> str: problem = "" a, b = True, True for msg in message: - if isinstance(msg, alcText) or isinstance(msg, str): + if isinstance(msg, alcText | str): msg = str(msg) - if "问" in str(msg) and a: + if "问" in msg and a: a = False split_text = msg.split("问") if len(split_text) > 1: @@ -53,7 +53,7 @@ def get_problem(message: UniMsg) -> str: if "答" in problem: b = False problem = problem.split("答")[0] - elif "答" in msg and b: + elif "答" in msg: b = False # problem += "答".join(msg.split("答")[:-1]) problem += msg.split("答")[0] @@ -78,7 +78,7 @@ def get_answer(message: UniMsg) -> UniMessage | None: index = 0 for msg in message: index += 1 - if isinstance(msg, alcText) or isinstance(msg, str): + if isinstance(msg, alcText | str): msg = str(msg) if "答" in msg: answer += "答".join(msg.split("答")[1:]) @@ -90,7 +90,6 @@ def get_answer(message: UniMsg) -> UniMessage | None: class WordBankManage: - @classmethod async def update_word( cls, @@ -175,10 +174,14 @@ async def __word_handle( ) if not _problem_list: return problem, "" - if await WordBank.delete_group_problem(problem, group_id, aid, word_scope): # type: ignore - return "删除词条成功!", "" - return "词条不存在", "" - if handle_type == "update": + return ( + ("删除词条成功!", "") + if await WordBank.delete_group_problem( + problem, group_id, aid, word_scope + ) + else ("词条不存在", "") + ) + elif handle_type == "update": old_problem = await WordBank.update_group_problem( problem, replace_problem, group_id, word_scope=word_scope ) @@ -187,7 +190,10 @@ async def __word_handle( @classmethod async def __get_problem_str( - cls, idx: int, group_id: str | None = None, word_scope: ScopeType = ScopeType.GROUP + cls, + idx: int, + group_id: str | None = None, + word_scope: ScopeType = ScopeType.GROUP, ) -> tuple[str, int]: """通过id获取问题字符串 @@ -222,7 +228,7 @@ async def show_word( word_scope: 词条范围 index: 指定回答下标 """ - if problem or index != None: + if problem or index is not None: msg_list = [] problem, _problem_list = await WordBank.get_problem_all_answer( problem, # type: ignore diff --git a/zhenxun/plugins/word_bank/_model.py b/zhenxun/plugins/word_bank/_model.py index 0d9be0c08..62d1b8608 100644 --- a/zhenxun/plugins/word_bank/_model.py +++ b/zhenxun/plugins/word_bank/_model.py @@ -1,31 +1,29 @@ -import random import re -import time import uuid -from datetime import datetime +import random from typing import Any +from datetime import datetime +from typing_extensions import Self +from tortoise.expressions import Q +from tortoise import Tortoise, fields +from nonebot_plugin_alconna import UniMessage from nonebot_plugin_alconna import At as alcAt -from nonebot_plugin_alconna import Image as alcImage from nonebot_plugin_alconna import Text as alcText -from nonebot_plugin_alconna import UniMessage -from tortoise import Tortoise, fields -from tortoise.expressions import Q -from typing_extensions import Self +from nonebot_plugin_alconna import Image as alcImage -from zhenxun.configs.path_config import DATA_PATH from zhenxun.services.db_context import Model +from zhenxun.utils.message import MessageUtils from zhenxun.utils.http_utils import AsyncHttpx +from zhenxun.configs.path_config import DATA_PATH from zhenxun.utils.image_utils import get_img_hash -from zhenxun.utils.message import MessageUtils -from ._config import ScopeType, WordType, int2type +from ._config import WordType, ScopeType, int2type path = DATA_PATH / "word_bank" class WordBank(Model): - id = fields.IntField(pk=True, generated=True, auto_increment=True) """自增id""" user_id = fields.CharField(255) @@ -57,12 +55,12 @@ class WordBank(Model): author = fields.CharField(255, null=True, default="") """收录人""" - class Meta: + class Meta: # type: ignore table = "word_bank2" table_description = "词条数据库" @classmethod - async def exists( + async def exists( # type: ignore cls, user_id: str | None, group_id: str | None, @@ -217,7 +215,6 @@ async def _format2answer( user_id: 用户id group_id: 群组id """ - result_list = [] if not query: query = await cls.get_or_none( problem=problem, @@ -228,18 +225,19 @@ async def _format2answer( if not answer: answer = str(query.answer) # type: ignore if query and query.placeholder: - type_list = re.findall(rf"\[(.*?):placeholder_.*?]", answer) - answer_split = re.split(rf"\[.*:placeholder_.*?]", answer) + type_list = re.findall(r"\[(.*?):placeholder_.*?]", answer) + answer_split = re.split(r"\[.*:placeholder_.*?]", answer) placeholder_split = query.placeholder.split(",") + result_list = [] for index, ans in enumerate(answer_split): result_list.append(ans) if index < len(type_list): t = type_list[index] p = placeholder_split[index] - if t == "image": - result_list.append(path / p) - elif t == "at": + if t == "at": result_list.append(alcAt(flag="user", target=p)) + elif t == "image": + result_list.append(path / p) return MessageUtils.build_message(result_list) return MessageUtils.build_message(answer) @@ -282,7 +280,10 @@ async def check_problem( return data_list db = Tortoise.get_connection("default") # 模糊匹配 - sql = query.filter(word_type=1).sql() + " and POSITION(problem in $1) > 0" + sql = ( + query.filter(word_type=WordType.FUZZY.value).sql() + + " and POSITION(problem in $1) > 0" + ) data_list = await db.execute_query_dict(sql, [problem]) if data_list: return [cls(**data) for data in data_list] @@ -292,9 +293,7 @@ async def check_problem( + " and $1 ~ problem;" ) data_list = await db.execute_query_dict(sql, [problem]) - if data_list: - return [cls(**data) for data in data_list] - return None + return [cls(**data) for data in data_list] if data_list else None @classmethod async def get_answer( @@ -318,7 +317,7 @@ async def get_answer( random_answer = random.choice(data_list) if random_answer.word_type == WordType.REGEX: r = re.search(random_answer.problem, problem) - has_placeholder = re.search(rf"\$(\d)", random_answer.answer) + has_placeholder = re.search(r"\$(\d)", random_answer.answer) if r and r.groups() and has_placeholder: pats = re.sub(r"\$(\d)", r"\\\1", random_answer.answer) random_answer.answer = re.sub(random_answer.problem, pats, problem) @@ -575,9 +574,16 @@ async def _move( async def _run_script(cls): return [ "ALTER TABLE word_bank2 ADD to_me varchar(255);", # 添加 to_me 字段 - "ALTER TABLE word_bank2 ALTER COLUMN create_time TYPE timestamp with time zone USING create_time::timestamp with time zone;", - "ALTER TABLE word_bank2 ALTER COLUMN update_time TYPE timestamp with time zone USING update_time::timestamp with time zone;", - "ALTER TABLE word_bank2 RENAME COLUMN user_qq TO user_id;", # 将user_qq改为user_id + ( + "ALTER TABLE word_bank2 ALTER COLUMN create_time TYPE timestamp" + " with time zone USING create_time::timestamp with time zone;" + ), + ( + "ALTER TABLE word_bank2 ALTER COLUMN update_time TYPE timestamp" + " with time zone USING update_time::timestamp with time zone;" + ), + "ALTER TABLE word_bank2 RENAME COLUMN user_qq TO user_id;", + # 将user_qq改为user_id "ALTER TABLE word_bank2 ALTER COLUMN user_id TYPE character varying(255);", "ALTER TABLE word_bank2 ALTER COLUMN group_id TYPE character varying(255);", "ALTER TABLE word_bank2 ADD platform varchar(255) DEFAULT 'qq';", diff --git a/zhenxun/plugins/word_bank/_rule.py b/zhenxun/plugins/word_bank/_rule.py index 3ce032ca4..f64daa75f 100644 --- a/zhenxun/plugins/word_bank/_rule.py +++ b/zhenxun/plugins/word_bank/_rule.py @@ -1,19 +1,19 @@ from io import BytesIO import imagehash -from nonebot.adapters import Bot, Event +from PIL import Image from nonebot.typing import T_State -from nonebot_plugin_alconna import At as alcAt -from nonebot_plugin_alconna import Text as alcText +from nonebot.adapters import Bot, Event from nonebot_plugin_alconna import UniMsg +from nonebot_plugin_alconna import At as alcAt from nonebot_plugin_session import EventSession -from PIL import Image +from nonebot_plugin_alconna import Text as alcText from zhenxun.services.log import logger from zhenxun.utils.http_utils import AsyncHttpx -from ._data_source import get_img_and_at_list from ._model import WordBank +from ._data_source import get_img_and_at_list async def check( @@ -31,7 +31,7 @@ async def check( r = await AsyncHttpx.get(img_list[0]) problem = str(imagehash.average_hash(Image.open(BytesIO(r.content)))) except Exception as e: - logger.warning(f"获取图片失败", "词条检测", session=session, e=e) + logger.warning("获取图片失败", "词条检测", session=session, e=e) if at_list: temp = "" # TODO: 支持更多消息类型 diff --git a/zhenxun/plugins/word_bank/word_handle.py b/zhenxun/plugins/word_bank/word_handle.py index 1fa28958c..0bda50c28 100644 --- a/zhenxun/plugins/word_bank/word_handle.py +++ b/zhenxun/plugins/word_bank/word_handle.py @@ -2,27 +2,26 @@ from typing import Any from nonebot.adapters import Bot -from nonebot.adapters.onebot.v11 import unescape -from nonebot.exception import FinishedException -from nonebot.internal.params import Arg, ArgStr -from nonebot.params import RegexGroup -from nonebot.plugin import PluginMetadata from nonebot.typing import T_State -from nonebot_plugin_alconna import AlconnaQuery, Arparma +from nonebot.params import RegexGroup from nonebot_plugin_alconna import Image -from nonebot_plugin_alconna import Image as alcImage -from nonebot_plugin_alconna import Match, Query, UniMsg +from nonebot.plugin import PluginMetadata +from nonebot.exception import FinishedException +from nonebot.internal.params import Arg, ArgStr from nonebot_plugin_session import EventSession +from nonebot.adapters.onebot.v11 import unescape +from nonebot_plugin_alconna import Image as alcImage +from nonebot_plugin_alconna import Match, Query, UniMsg, Arparma, AlconnaQuery -from zhenxun.configs.config import Config -from zhenxun.configs.utils import PluginExtraData from zhenxun.services.log import logger +from zhenxun.configs.config import Config from zhenxun.utils.message import MessageUtils +from zhenxun.configs.utils import PluginExtraData -from ._config import ScopeType, WordType, scope2int, type2int -from ._data_source import WordBankManage, get_answer, get_img_and_at_list, get_problem from ._model import WordBank +from ._config import WordType, ScopeType, type2int, scope2int from .command import _add_matcher, _del_matcher, _show_matcher, _update_matcher +from ._data_source import WordBankManage, get_answer, get_problem, get_img_and_at_list base_config = Config.get("word_bank") @@ -30,7 +29,7 @@ __plugin_meta__ = PluginMetadata( name="词库问答", description="自定义词条内容随机回复", - usage=""" + usage=r""" usage: 对指定问题的随机回答,对相同问题可以设置多个不同回答 删除词条后每个词条的id可能会变化,请查看后再删除 @@ -63,11 +62,11 @@ 查看词条 --id 2 : 查看词条序号为2的全部回答 查看词条 谁是萝莉 --all: 查看全局词条 谁是萝莉 的全部回答 查看词条 --id 2 --all: 查看全局词条序号为2的全部回答 - """.strip(), + """.strip(), # noqa: E501 extra=PluginExtraData( author="HibiKier & yajiwa", version="0.1", - superuser_help=""" + superuser_help=r""" 在私聊中超级用户额外设置 指令: (全局|私聊)?添加词条\s*?(模糊|正则|图片)?问\s*?(\S*\s?\S*)\s*?答\s?(\S*):添加问答词条,可重复添加相同问题的不同回答 @@ -149,7 +148,7 @@ async def _( group_id = session.id3 or session.id2 try: if word_type == "图片": - problem = [m for m in message if isinstance(m, alcImage)][0].url + problem = next(m for m in message if isinstance(m, alcImage)).url elif word_type == "正则" and problem: problem = unescape(problem) try: @@ -158,7 +157,9 @@ async def _( await MessageUtils.build_message( f"添加词条失败,正则表达式 {problem} 非法!" ).finish(reply_to=True) - # if str(event.user_id) in bot.config.superusers and isinstance(event, PrivateMessageEvent): + # if str(event.user_id) in bot.config.superusers and isinstance( + # event, PrivateMessageEvent + # ): # word_scope = "私聊" nickname = None if problem and bot.config.nickname: @@ -300,17 +301,15 @@ async def _( word_scope = ScopeType.GROUP if session.id3 or session.id2 else ScopeType.PRIVATE group_id = session.id3 or session.id2 if all.result: - word_scope = 0 + word_scope = ScopeType.GLOBAL if gid.available: group_id = gid.result if problem.available: - if index.available: - if index.result < 0 or index.result > len( - await WordBank.get_problem_by_scope(word_scope) - ): - await MessageUtils.build_message("id必须在范围内...").finish( - reply_to=True - ) + if index.available and ( + index.result < 0 + or index.result > len(await WordBank.get_problem_by_scope(word_scope)) + ): + await MessageUtils.build_message("id必须在范围内...").finish(reply_to=True) result = await WordBankManage.show_word( problem.result, index.result if index.available else None,