diff --git a/zhenxun/builtin_plugins/web_ui/__init__.py b/zhenxun/builtin_plugins/web_ui/__init__.py index d05e55f97..0369daf6a 100644 --- a/zhenxun/builtin_plugins/web_ui/__init__.py +++ b/zhenxun/builtin_plugins/web_ui/__init__.py @@ -1,4 +1,5 @@ import asyncio +import secrets import nonebot from fastapi import APIRouter, FastAPI @@ -6,7 +7,7 @@ from nonebot.plugin import PluginMetadata from zhenxun.configs.config import Config as gConfig -from zhenxun.configs.utils import PluginExtraData +from zhenxun.configs.utils import PluginExtraData, RegisterConfig from zhenxun.services.log import logger, logger_ from zhenxun.utils.enum import PluginType @@ -28,15 +29,40 @@ usage=""" """.strip(), extra=PluginExtraData( - author="HibiKier", version="0.1", plugin_type=PluginType.HIDDEN + author="HibiKier", + version="0.1", + plugin_type=PluginType.HIDDEN, + Configs=[ + RegisterConfig( + module="web-ui", + key="username", + value="admin", + help="前端管理用户名", + type=str, + default_value="admin", + ), + RegisterConfig( + module="web-ui", + key="password", + value=None, + help="前端管理密码", + type=str, + default_value=None, + ), + RegisterConfig( + module="web-ui", + key="secret", + value=secrets.token_urlsafe(32), + help="JWT密钥", + type=str, + default_value=None, + ), + ], ).dict(), ) driver = nonebot.get_driver() -gConfig.add_plugin_config("web-ui", "username", "admin", help="前端管理用户名") - -gConfig.add_plugin_config("web-ui", "password", None, help="前端管理密码") gConfig.set_name("web-ui", "web-ui") diff --git a/zhenxun/builtin_plugins/web_ui/utils.py b/zhenxun/builtin_plugins/web_ui/utils.py index 8d4fd25a9..6fee53885 100644 --- a/zhenxun/builtin_plugins/web_ui/utils.py +++ b/zhenxun/builtin_plugins/web_ui/utils.py @@ -28,8 +28,6 @@ token_data = json.load(open(token_file, "r", encoding="utf8")) except json.JSONDecodeError: pass -if not token_data.get("secret"): - token_data["secret"] = secrets.token_hex(64) def get_user(uname: str) -> User | None: @@ -57,7 +55,7 @@ def create_token(user: User, expires_delta: timedelta | None = None): expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15)) return jwt.encode( claims={"sub": user.username, "exp": expire}, - key=token_data["secret"], + key=Config.get_config("web-ui", "secret"), algorithm=ALGORITHM, ) @@ -73,7 +71,7 @@ def authentication(): # if token not in token_data["token"]: def inner(token: str = Depends(oauth2_scheme)): try: - payload = jwt.decode(token, token_data["secret"], algorithms=[ALGORITHM]) + payload = jwt.decode(token, Config.get_config("web-ui", "secret"), algorithms=[ALGORITHM]) username, expire = payload.get("sub"), payload.get("exp") user = get_user(username) # type: ignore if user is None: diff --git a/zhenxun/plugins/word_bank/_config.py b/zhenxun/plugins/word_bank/_config.py index d003b1394..d4d22b2b2 100644 --- a/zhenxun/plugins/word_bank/_config.py +++ b/zhenxun/plugins/word_bank/_config.py @@ -1,5 +1,4 @@ from enum import Enum -from pickle import GLOBAL from zhenxun.configs.path_config import DATA_PATH data_dir = DATA_PATH / "word_bank" diff --git a/zhenxun/plugins/word_bank/_data_source.py b/zhenxun/plugins/word_bank/_data_source.py index c4ed9b124..201c3bc5c 100644 --- a/zhenxun/plugins/word_bank/_data_source.py +++ b/zhenxun/plugins/word_bank/_data_source.py @@ -5,6 +5,7 @@ from nonebot_plugin_alconna import Text as alcText from nonebot_plugin_alconna import UniMessage, UniMsg +from zhenxun.plugins.word_bank._config import ScopeType from zhenxun.utils.image_utils import ImageTemplate from zhenxun.utils.message import MessageUtils @@ -97,7 +98,7 @@ async def update_word( problem: str = "", index: int | None = None, group_id: str | None = None, - word_scope: int = 1, + word_scope: ScopeType = ScopeType.GROUP, ) -> tuple[str, str]: """修改群词条 @@ -120,7 +121,7 @@ async def delete_word( index: int | None = None, aid: int | None = None, group_id: str | None = None, - word_scope: int = 1, + word_scope: ScopeType = ScopeType.GROUP, ) -> tuple[str, str]: """删除群词条 @@ -146,7 +147,7 @@ async def __word_handle( handle_type: str, index: int | None = None, aid: int | None = None, - word_scope: int = 0, + word_scope: ScopeType = ScopeType.GLOBAL, replace_problem: str = "", ) -> tuple[str, str]: """词条操作 @@ -186,7 +187,7 @@ async def __word_handle( @classmethod async def __get_problem_str( - cls, idx: int, group_id: str | None = None, word_scope: int = 1 + cls, idx: int, group_id: str | None = None, word_scope: ScopeType = ScopeType.GROUP ) -> tuple[str, int]: """通过id获取问题字符串 @@ -195,7 +196,7 @@ async def __get_problem_str( group_id: 群号 word_scope: 获取类型 """ - if word_scope in [0, 2]: + if word_scope in [ScopeType.GLOBAL, ScopeType.PRIVATE]: all_problem = await WordBank.get_problem_by_scope(word_scope) elif group_id: all_problem = await WordBank.get_group_all_problem(group_id) @@ -211,7 +212,7 @@ async def show_word( problem: str | None, index: int | None = None, group_id: str | None = None, - word_scope: int | None = 1, + word_scope: ScopeType | None = ScopeType.GROUP, ) -> UniMessage: """获取群词条 @@ -266,7 +267,7 @@ async def show_word( _problem_list = await WordBank.get_problem_by_scope(word_scope) else: raise Exception("群组id和词条范围不能都为空") - global_problem_list = await WordBank.get_problem_by_scope(0) + global_problem_list = await WordBank.get_problem_by_scope(ScopeType.GLOBAL) if not _problem_list and not global_problem_list: return MessageUtils.build_message("未收录任何词条...") column_name = ["序号", "关键词", "匹配类型", "收录用户"] diff --git a/zhenxun/plugins/word_bank/_model.py b/zhenxun/plugins/word_bank/_model.py index abccdc618..d33cf8935 100644 --- a/zhenxun/plugins/word_bank/_model.py +++ b/zhenxun/plugins/word_bank/_model.py @@ -32,9 +32,9 @@ class WordBank(Model): """用户id""" group_id = fields.CharField(255, null=True) """群聊id""" - word_scope = fields.IntField(default=0) + word_scope = fields.IntField(default=ScopeType.GLOBAL.value) """生效范围 0: 全局 1: 群聊 2: 私聊""" - word_type = fields.IntField(default=0) + word_type = fields.IntField(default=WordType.EXACT.value) """词条类型 0: 完全匹配 1: 模糊 2: 正则 3: 图片""" status = fields.BooleanField() """词条状态""" @@ -68,8 +68,8 @@ async def exists( group_id: str | None, problem: str, answer: str | None, - word_scope: int | None = None, - word_type: int | None = None, + word_scope: ScopeType | None = None, + word_type: WordType | None = None, ) -> bool: """检测问题是否存在 @@ -99,8 +99,8 @@ async def add_problem_answer( cls, user_id: str, group_id: str | None, - word_scope: int, - word_type: int, + word_scope: ScopeType, + word_type: WordType, problem: str, answer: list[str | alcText | alcAt | alcImage], to_me_nickname: str | None = None, @@ -122,7 +122,7 @@ async def add_problem_answer( """ # 对图片做额外处理 image_path = None - if word_type == 3: + if word_type == WordType.IMAGE: _uuid = uuid.uuid1() _file = path / "problem" / f"{group_id}" / f"{user_id}_{_uuid}.jpg" _file.parent.mkdir(exist_ok=True, parents=True) @@ -248,8 +248,8 @@ async def check_problem( cls, group_id: str | None, problem: str, - word_scope: int | None = None, - word_type: int | None = None, + word_scope: ScopeType | None = None, + word_type: WordType | None = None, ) -> Any: """检测是否包含该问题并获取所有回答 @@ -264,14 +264,14 @@ async def check_problem( if word_scope: query = query.filter(word_scope=word_scope) else: - query = query.filter(Q(group_id=group_id) | Q(word_scope=0)) + query = query.filter(Q(group_id=group_id) | Q(word_scope=WordType.EXACT)) else: - query = query.filter(Q(word_scope=2) | Q(word_scope=0)) + query = query.filter(Q(word_scope=ScopeType.PRIVATE) | Q(word_scope=ScopeType.GLOBAL)) if word_type: query = query.filter(word_scope=word_type) # 完全匹配 if data_list := await query.filter( - Q(Q(word_type=0) | Q(word_type=3)), Q(problem=problem) + Q(Q(word_type=WordType.EXACT) | Q(word_type=WordType.IMAGE)), Q(problem=problem) ).all(): return data_list db = Tortoise.get_connection("default") @@ -282,7 +282,7 @@ async def check_problem( return [cls(**data) for data in data_list] # 正则 sql = ( - query.filter(word_type=2, word_scope__not=999).sql() + " and $1 ~ problem;" + query.filter(word_type=WordType.REGEX, word_scope__not=999).sql() + " and $1 ~ problem;" ) data_list = await db.execute_query_dict(sql, [problem]) if data_list: @@ -294,8 +294,8 @@ async def get_answer( cls, group_id: str | None, problem: str, - word_scope: int | None = None, - word_type: int | None = None, + word_scope: ScopeType | None = None, + word_type: WordType | None = None, ) -> UniMessage | None: """根据问题内容获取随机回答 @@ -333,7 +333,7 @@ async def get_problem_all_answer( problem: str, index: int | None = None, group_id: str | None = None, - word_scope: int | None = 0, + word_scope: ScopeType | None = ScopeType.GLOBAL, ) -> tuple[str, list[UniMessage]]: """获取指定问题所有回答 @@ -385,7 +385,7 @@ async def delete_group_problem( problem: str, group_id: str | None, index: int | None = None, - word_scope: int = 1, + word_scope: ScopeType = ScopeType.GROUP, ): """删除指定问题全部或指定回答 @@ -425,7 +425,7 @@ async def update_group_problem( replace_str: str, group_id: str | None, index: int | None = None, - word_scope: int = 1, + word_scope: ScopeType = ScopeType.GROUP, ) -> str: """修改词条问题 @@ -532,8 +532,8 @@ async def _move( answer: 回答 placeholder: 占位符 """ - word_scope = 0 - word_type = 0 + word_scope = ScopeType.GLOBAL + word_type = WordType.EXACT # 对图片做额外处理 if not await cls.exists( user_id, group_id, problem, answer, word_scope, word_type