diff --git a/diracx-db/pyproject.toml b/diracx-db/pyproject.toml index f7a145fc5..1371856cc 100644 --- a/diracx-db/pyproject.toml +++ b/diracx-db/pyproject.toml @@ -36,6 +36,7 @@ TaskQueueDB = "diracx.db.sql:TaskQueueDB" [project.entry-points."diracx.db.os"] JobParametersDB = "diracx.db.os:JobParametersDB" +PilotLogsDB = "diracx.db.os:PilotLogsDB" [tool.setuptools.packages.find] where = ["src"] diff --git a/diracx-db/src/diracx/db/os/__init__.py b/diracx-db/src/diracx/db/os/__init__.py index 535e2a954..c1ce89bcb 100644 --- a/diracx-db/src/diracx/db/os/__init__.py +++ b/diracx-db/src/diracx/db/os/__init__.py @@ -1,5 +1,9 @@ from __future__ import annotations -__all__ = ("JobParametersDB",) +__all__ = ( + "JobParametersDB", + "PilotLogsDB", +) from .job_parameters import JobParametersDB +from .pilot_logs import PilotLogsDB diff --git a/diracx-db/src/diracx/db/os/pilot_logs.py b/diracx-db/src/diracx/db/os/pilot_logs.py new file mode 100644 index 000000000..afaf7ab19 --- /dev/null +++ b/diracx-db/src/diracx/db/os/pilot_logs.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from diracx.db.os.utils import BaseOSDB + + +class PilotLogsDB(BaseOSDB): + fields = { + "PilotStamp": {"type": "keyword"}, + "LineNumber": {"type": "long"}, + "Message": {"type": "text"}, + "VO": {"type": "keyword"}, + "timestamp": {"type": "date"}, + } + index_prefix = "pilot_logs" + + def index_name(self, doc_id: int) -> str: + # TODO decide how to define the index name + return f"{self.index_prefix}_0" diff --git a/diracx-db/src/diracx/db/os/utils.py b/diracx-db/src/diracx/db/os/utils.py index c14899432..7254e87ac 100644 --- a/diracx-db/src/diracx/db/os/utils.py +++ b/diracx-db/src/diracx/db/os/utils.py @@ -12,6 +12,7 @@ from typing import Any, AsyncIterator, Self from opensearchpy import AsyncOpenSearch +from opensearchpy.helpers import async_bulk from diracx.core.exceptions import InvalidQueryError from diracx.core.extensions import select_from_extension @@ -189,6 +190,13 @@ async def upsert(self, doc_id, document) -> None: ) print(f"{response=}") + async def bulk_insert(self, index_name: str, docs: list[dict[str, Any]]) -> None: + # bulk inserting to database + n_inserted = await async_bulk( + self.client, actions=[doc | {"_index": index_name} for doc in docs] + ) + logger.info("Inserted %s documents to %s", n_inserted, index_name) + async def search( self, parameters, search, sorts, *, per_page: int = 100, page: int | None = None ) -> list[dict[str, Any]]: diff --git a/diracx-routers/pyproject.toml b/diracx-routers/pyproject.toml index 97fdccb31..89df7ab72 100644 --- a/diracx-routers/pyproject.toml +++ b/diracx-routers/pyproject.toml @@ -47,6 +47,7 @@ types = [ ] [project.entry-points."diracx.services"] +pilotlogs = "diracx.routers.pilot_logging.remote_logger:router" jobs = "diracx.routers.job_manager:router" config = "diracx.routers.configuration:router" auth = "diracx.routers.auth:router" @@ -55,6 +56,7 @@ auth = "diracx.routers.auth:router" [project.entry-points."diracx.access_policies"] WMSAccessPolicy = "diracx.routers.job_manager.access_policies:WMSAccessPolicy" SandboxAccessPolicy = "diracx.routers.job_manager.access_policies:SandboxAccessPolicy" +PilotLogsAccessPolicy = "diracx.routers.pilot_logging.access_policies:PilotLogsAccessPolicy" [tool.setuptools.packages.find] diff --git a/diracx-routers/src/diracx/routers/dependencies.py b/diracx-routers/src/diracx/routers/dependencies.py index 7a67b94f4..6401978fe 100644 --- a/diracx-routers/src/diracx/routers/dependencies.py +++ b/diracx-routers/src/diracx/routers/dependencies.py @@ -7,6 +7,8 @@ "JobLoggingDB", "SandboxMetadataDB", "TaskQueueDB", + "JobParametersDB", + "PilotLogsDB", "add_settings_annotation", "AvailableSecurityProperties", ) @@ -18,6 +20,8 @@ from diracx.core.config import Config as _Config from diracx.core.config import ConfigSource from diracx.core.properties import SecurityProperty +from diracx.db.os import JobParametersDB as _JobParametersDB +from diracx.db.os import PilotLogsDB as _PilotLogsDB from diracx.db.sql import AuthDB as _AuthDB from diracx.db.sql import JobDB as _JobDB from diracx.db.sql import JobLoggingDB as _JobLoggingDB @@ -32,7 +36,7 @@ def add_settings_annotation(cls: T) -> T: return Annotated[cls, Depends(cls.create)] # type: ignore -# Databases +# SQL Databases AuthDB = Annotated[_AuthDB, Depends(_AuthDB.transaction)] JobDB = Annotated[_JobDB, Depends(_JobDB.transaction)] JobLoggingDB = Annotated[_JobLoggingDB, Depends(_JobLoggingDB.transaction)] @@ -41,6 +45,10 @@ def add_settings_annotation(cls: T) -> T: ] TaskQueueDB = Annotated[_TaskQueueDB, Depends(_TaskQueueDB.transaction)] +# OpenSearch Databases +JobParametersDB = Annotated[_JobParametersDB, Depends(_JobParametersDB.session)] +PilotLogsDB = Annotated[_PilotLogsDB, Depends(_PilotLogsDB.session)] + # Miscellaneous Config = Annotated[_Config, Depends(ConfigSource.create)] AvailableSecurityProperties = Annotated[ diff --git a/diracx-routers/src/diracx/routers/pilot_logging/__init__.py b/diracx-routers/src/diracx/routers/pilot_logging/__init__.py new file mode 100644 index 000000000..a51555a40 --- /dev/null +++ b/diracx-routers/src/diracx/routers/pilot_logging/__init__.py @@ -0,0 +1,5 @@ +from __future__ import annotations + +import logging + +logger = logging.getLogger(__name__) diff --git a/diracx-routers/src/diracx/routers/pilot_logging/access_policies.py b/diracx-routers/src/diracx/routers/pilot_logging/access_policies.py new file mode 100644 index 000000000..9473c455b --- /dev/null +++ b/diracx-routers/src/diracx/routers/pilot_logging/access_policies.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from enum import StrEnum, auto +from typing import Annotated, Callable + +from fastapi import Depends, HTTPException, status + +from diracx.core.properties import GENERIC_PILOT, OPERATOR, PILOT, SERVICE_ADMINISTRATOR +from diracx.db.os import PilotLogsDB +from diracx.routers.access_policies import BaseAccessPolicy + +from ..utils.users import AuthorizedUserInfo + + +class ActionType(StrEnum): + #: Create/update pilot log records + CREATE = auto() + #: download pilot logs + READ = auto() + #: delete pilot logs + DELETE = auto() + #: Search + QUERY = auto() + + +class PilotLogsAccessPolicy(BaseAccessPolicy): + """ToDo + ---- + + """ + + @staticmethod + async def policy( + policy_name: str, + user_info: AuthorizedUserInfo, + /, + *, + action: ActionType | None = None, + pilot_db: PilotLogsDB | None = None, + pilot_ids: list[int] | None = None, # or pilot stamp list ? + ): + print("user_info.properties:", user_info.properties) + assert action, "action is a mandatory parameter" + assert pilot_db, "pilot_db is a mandatory parameter" + + if GENERIC_PILOT in user_info.properties: + return + if PILOT in user_info.properties: + return + if SERVICE_ADMINISTRATOR in user_info.properties: + return + if OPERATOR in user_info.properties: + return + + raise HTTPException(status.HTTP_403_FORBIDDEN, detail=user_info.properties) + + +CheckPilotLogsPolicyCallable = Annotated[Callable, Depends(PilotLogsAccessPolicy.check)] diff --git a/diracx-routers/src/diracx/routers/pilot_logging/remote_logger.py b/diracx-routers/src/diracx/routers/pilot_logging/remote_logger.py new file mode 100644 index 000000000..03f7f3674 --- /dev/null +++ b/diracx-routers/src/diracx/routers/pilot_logging/remote_logger.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from pydantic import BaseModel + +from ..dependencies import PilotLogsDB +from ..fastapi_classes import DiracxRouter +from ..pilot_logging import logger +from .access_policies import ActionType, CheckPilotLogsPolicyCallable + +router = DiracxRouter() + + +class LogLine(BaseModel): + line_no: int + line: str + + +class LogMessage(BaseModel): + pilot_stamp: str + lines: list[LogLine] + vo: str + + +@router.post("/") +async def send_message( + data: LogMessage, + pilot_logs_db: PilotLogsDB, + check_permissions: CheckPilotLogsPolicyCallable, +): + logger.warning(f"Message received '{data}'") + await check_permissions(action=ActionType.CREATE, pilot_db=pilot_logs_db) + + pilot_id = 1234 # need to get pilot id from pilot_stamp (via pilot DB) + + docs = [] + for line in data.lines: + docs.append( + { + "PilotStamp": data.pilot_stamp, + "VO": data.vo, + "LineNumber": line.line_no, + "Message": line.line, + } + ) + await pilot_logs_db.bulk_insert(pilot_logs_db.index_name(pilot_id), docs) + return data