Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable remote pilot logging system #269

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

martynia
Copy link
Contributor

@martynia martynia commented Jul 1, 2024

Enable remote logging system with open search. This PR only shows a router in its preliminary form.
The router defines following operations:

  1. @router.post("/") - bulk insert log record sent by a pilot. The method returns a pilotID, so it can use it next time to insert messaged w/o consulting PilotAgentsDB to get PilotID from PilotStamp.
  2. @router.get("/logs") - to retrieve logs for a given pilot_id
  3. @router.delete("/logs") - to delete logs by:
    - pilotID
    - from a given minimum date till now.
    - for a date range - not yet implemented

Policies allow pilots to store VO-aware logs and operators/admins to perform delete operations for all VOs. Pilots and normal users can only see (get) logs for their own VO.

@martynia
Copy link
Contributor Author

martynia commented Jul 1, 2024

@chrisburr I think we need a pilot logging router, even in its basic form. One of the tests fails because it is missing.

@martynia
Copy link
Contributor Author

@chrisburr What is a reason of test failures here ? The demo runs fine locally, can be contacted with a client etc. The test fails with dirac login, waiting for diracX to be installed. Could it be that I missed a setting somewhere ? The failure is persistent.

@martynia
Copy link
Contributor Author

martynia commented Nov 1, 2024

@chrisburr @chaen
I managed to reproduce the problem locally. There is the same failure on GitHub and locally. Since the demo runs fine (the DB is there), is it possible that integration tests fail (NotImplementedError: Cannot enable system_name='pilotlogs' as it requires missing_os_dbs={<class 'diracx.db.os.pilot_logs.PilotLogsDB'>} because of charts being no discoverable by the CI workflow ?

This type of error occurs (in the demo) where charts don't list a relevant DB

Both local and CI tests reach the same point - server installation failure.

@chrisburr
Copy link
Member

@martynia
Copy link
Contributor Author

@chaen I fixed the database problem, now integration tests are passing, however I'm getting gubbins errors in unit tests. Is it an internal gubbins problem ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rename pilot_logging into pilots, and we will have everything related to the pilots in this directory. We tend to go in that direction (see the jobs directory).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, could do this.


import logging

logger = logging.getLogger(__name__)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need to add the remote_logger router here?
What about renaming remote_logger.py as logging.py?

Suggested change
logger = logging.getLogger(__name__)
from .logging import router as logging_router
logger = logging.getLogger(__name__)
router = DiracxRouter()
router.include_router(logging_router)

@@ -48,6 +48,7 @@ types = [
]

[project.entry-points."diracx.services"]
pilotlogs = "diracx.routers.pilot_logging.remote_logger:router"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following my previous comments:

Suggested change
pilotlogs = "diracx.routers.pilot_logging.remote_logger:router"
pilots = "diracx.routers.pilots.router"

@@ -190,6 +191,13 @@ async def upsert(self, doc_id, document) -> None:
)
print(f"{response=}")

async def bulk_insert(self, index_name: str, docs: list[dict[str, Any]]) -> None:
# bulk inserting to database
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# bulk inserting to database
"""bulk inserting to database."""

Comment on lines +243 to +245

# Delete multiple documents by query.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Delete multiple documents by query.
"""Delete multiple documents by query."""

Comment on lines +96 to +108
# here, users with privileged properties will see logs from all VOs. Is it what we want ?
search_params = [{"parameter": "PilotID", "operator": "eq", "value": pilot_id}]
if _non_privileged(user_info):
search_params.append(
{"parameter": "VO", "operator": "eq", "value": user_info.vo}
)
result = await db.search(
["Message"],
search_params,
[{"parameter": "LineNumber", "direction": "asc"}],
)
if not result:
return [{"Message": f"No logs for pilot ID = {pilot_id}"}]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, this bloc of code should be a method of diracx-dbs/src/diracx/db/sql/pilots/pilot_agents.py PilotAgentsDB

"""Delete either logs for a specific PilotID or a creation date range.
Non-privileged users can only delete log files within their own VO.
"""
message = "no-op"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you initialize message as "no-op"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this is a return message to the client. As far as I can see, the delete by query operation is silent. It does not complain when you delete something which doesn't not exist etc. Or I might have missed something in the docs.

Comment on lines +152 to +156
def _non_privileged(user_info: AuthorizedUserInfo):
return (
SERVICE_ADMINISTRATOR not in user_info.properties
and OPERATOR not in user_info.properties
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this should be moved in the access_policies.py function? May be by adding an action type? Or by adding some logic like in:

job_owners = await job_db.summary(
["Owner", "VO"],
[{"parameter": "JobID", "operator": "in", "values": job_ids}],
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a logical problem what to do here. I would like to search only once (and delete by query for a delete operation w/o prior searching). This is a reason why I removed the DB argument from a policy call. The problem is more visible in the delete case:

  • I could search in the policy for a given client VO and if pilot IDs are not from the VO throw an exception (not sure what to do if some IDs are and some are not from the VO the client has - for a bulk PilotID search)
  • then delete by query (which searches again)

Or could I use the AgentsDB to match the VO ? Still an extra DB query, this time and SQL one.
I opted for a restricted silent delete and and a restricted search by adding the VO, but outside the policy.


@router.delete("/logs")
async def delete(
pilot_id: int,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also tend to make bulk requests to the db to improve the performances of the system. I would take a list of pilot_ids as inputs instead of a single pilot_id.

Then you can return sucess and failures as it is done for the jobs for instance:

@router.delete("/")
async def remove_bulk_jobs(
job_ids: Annotated[list[int], Query()],
config: Config,
job_db: JobDB,
job_logging_db: JobLoggingDB,
sandbox_metadata_db: SandboxMetadataDB,
task_queue_db: TaskQueueDB,
background_task: BackgroundTasks,
check_permissions: CheckWMSPolicyCallable,
):
"""Fully remove a list of jobs from the WMS databases.
WARNING: This endpoint has been implemented for the compatibility with the legacy DIRAC WMS
and the JobCleaningAgent. However, once this agent is ported to diracx, this endpoint should
be removed, and a status change to Deleted (PATCH /jobs/status) should be used instead for any other purpose.
"""
await check_permissions(action=ActionType.MANAGE, job_db=job_db, job_ids=job_ids)
return await remove_jobs(
job_ids,
config,
job_db,
job_logging_db,
sandbox_metadata_db,
task_queue_db,
background_task,
)


@router.get("/logs")
async def get_logs(
pilot_id: int,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly,

Suggested change
pilot_id: int,
pilot_ids: List[int],

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants