-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: enable remote pilot logging system #269
base: main
Are you sure you want to change the base?
Conversation
@chrisburr I think we need a pilot logging router, even in its basic form. One of the tests fails because it is missing. |
@chrisburr What is a reason of test failures here ? The demo runs fine locally, can be contacted with a client etc. The test fails with |
@chrisburr @chaen This type of error occurs (in the demo) where charts don't list a relevant DB Both local and CI tests reach the same point - server installation failure. |
@chaen I fixed the database problem, now integration tests are passing, however I'm getting gubbins errors in unit tests. Is it an internal gubbins problem ? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would rename pilot_logging
into pilots
, and we will have everything related to the pilots in this directory. We tend to go in that direction (see the jobs
directory).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, could do this.
|
||
import logging | ||
|
||
logger = logging.getLogger(__name__) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't you need to add the remote_logger
router here?
What about renaming remote_logger.py
as logging.py
?
logger = logging.getLogger(__name__) | |
from .logging import router as logging_router | |
logger = logging.getLogger(__name__) | |
router = DiracxRouter() | |
router.include_router(logging_router) |
@@ -48,6 +48,7 @@ types = [ | |||
] | |||
|
|||
[project.entry-points."diracx.services"] | |||
pilotlogs = "diracx.routers.pilot_logging.remote_logger:router" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following my previous comments:
pilotlogs = "diracx.routers.pilot_logging.remote_logger:router" | |
pilots = "diracx.routers.pilots.router" |
@@ -190,6 +191,13 @@ async def upsert(self, doc_id, document) -> None: | |||
) | |||
print(f"{response=}") | |||
|
|||
async def bulk_insert(self, index_name: str, docs: list[dict[str, Any]]) -> None: | |||
# bulk inserting to database |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# bulk inserting to database | |
"""bulk inserting to database.""" |
|
||
# Delete multiple documents by query. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# Delete multiple documents by query. | |
"""Delete multiple documents by query.""" |
# here, users with privileged properties will see logs from all VOs. Is it what we want ? | ||
search_params = [{"parameter": "PilotID", "operator": "eq", "value": pilot_id}] | ||
if _non_privileged(user_info): | ||
search_params.append( | ||
{"parameter": "VO", "operator": "eq", "value": user_info.vo} | ||
) | ||
result = await db.search( | ||
["Message"], | ||
search_params, | ||
[{"parameter": "LineNumber", "direction": "asc"}], | ||
) | ||
if not result: | ||
return [{"Message": f"No logs for pilot ID = {pilot_id}"}] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, this bloc of code should be a method of diracx-dbs/src/diracx/db/sql/pilots/pilot_agents.py PilotAgentsDB
"""Delete either logs for a specific PilotID or a creation date range. | ||
Non-privileged users can only delete log files within their own VO. | ||
""" | ||
message = "no-op" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you initialize message
as "no-op"
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, this is a return message to the client. As far as I can see, the delete by query operation is silent. It does not complain when you delete something which doesn't not exist etc. Or I might have missed something in the docs.
def _non_privileged(user_info: AuthorizedUserInfo): | ||
return ( | ||
SERVICE_ADMINISTRATOR not in user_info.properties | ||
and OPERATOR not in user_info.properties | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like this should be moved in the access_policies.py
function? May be by adding an action type? Or by adding some logic like in:
diracx/diracx-routers/src/diracx/routers/jobs/access_policies.py
Lines 79 to 82 in 320c554
job_owners = await job_db.summary( | |
["Owner", "VO"], | |
[{"parameter": "JobID", "operator": "in", "values": job_ids}], | |
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a logical problem what to do here. I would like to search only once (and delete by query for a delete operation w/o prior searching). This is a reason why I removed the DB argument from a policy call. The problem is more visible in the delete case:
- I could search in the policy for a given client VO and if pilot IDs are not from the VO throw an exception (not sure what to do if some IDs are and some are not from the VO the client has - for a bulk PilotID search)
- then delete by query (which searches again)
Or could I use the AgentsDB to match the VO ? Still an extra DB query, this time and SQL one.
I opted for a restricted silent delete and and a restricted search by adding the VO, but outside the policy.
|
||
@router.delete("/logs") | ||
async def delete( | ||
pilot_id: int, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also tend to make bulk requests to the db to improve the performances of the system. I would take a list of pilot_ids
as inputs instead of a single pilot_id
.
Then you can return sucess
and failures
as it is done for the jobs for instance:
diracx/diracx-routers/src/diracx/routers/jobs/status.py
Lines 35 to 62 in 320c554
@router.delete("/") | |
async def remove_bulk_jobs( | |
job_ids: Annotated[list[int], Query()], | |
config: Config, | |
job_db: JobDB, | |
job_logging_db: JobLoggingDB, | |
sandbox_metadata_db: SandboxMetadataDB, | |
task_queue_db: TaskQueueDB, | |
background_task: BackgroundTasks, | |
check_permissions: CheckWMSPolicyCallable, | |
): | |
"""Fully remove a list of jobs from the WMS databases. | |
WARNING: This endpoint has been implemented for the compatibility with the legacy DIRAC WMS | |
and the JobCleaningAgent. However, once this agent is ported to diracx, this endpoint should | |
be removed, and a status change to Deleted (PATCH /jobs/status) should be used instead for any other purpose. | |
""" | |
await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=job_ids) | |
return await remove_jobs( | |
job_ids, | |
config, | |
job_db, | |
job_logging_db, | |
sandbox_metadata_db, | |
task_queue_db, | |
background_task, | |
) |
|
||
@router.get("/logs") | ||
async def get_logs( | ||
pilot_id: int, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly,
pilot_id: int, | |
pilot_ids: List[int], |
Enable remote logging system with open search. This PR only shows a router in its preliminary form.
The router defines following operations:
@router.post("/")
- bulk insert log record sent by a pilot. The method returns apilotID
, so it can use it next time to insert messaged w/o consultingPilotAgentsDB
to get PilotID fromPilotStamp
.@router.get("/logs")
- to retrieve logs for a given pilot_id@router.delete("/logs")
- to delete logs by:- pilotID
- from a given minimum date till now.
- for a date range - not yet implemented
Policies allow pilots to store VO-aware logs and operators/admins to perform delete operations for all VOs. Pilots and normal users can only see (get) logs for their own VO.