Hook on_crashed not being called on WorkerProcess #16436

Open
williamjamir opened this issue Dec 18, 2024 · 1 comment
Labels
bug Something isn't working

Comments

@williamjamir
Contributor

williamjamir commented Dec 18, 2024

Bug summary

When running this code:

import os
from prefect import flow

def hi(*args, **kwargs):
    print("hi 2")


@flow(log_prints=True, on_crashed=[hi])
def my_flow():
    print("From my flow")
    os.kill(os.getpid(), 9)


if __name__ == "__main__":
    my_flow.serve()

And trigger it from a terminal:

prefect deployment run 'my-flow/my-flow'

I can see the following logs, and the hi hook runs as expected:

17:19:16.511 | INFO    | prefect.flow_runs.runner - Runner 'my-flow' submitting flow run '711ea4e5-a583-4735-b3b1-f8c63104c00a'
17:19:16.537 | INFO    | prefect.flow_runs.runner - Opening process...
17:19:16.547 | INFO    | prefect.flow_runs.runner - Completed submission of flow run '711ea4e5-a583-4735-b3b1-f8c63104c00a'
17:19:17.705 | INFO    | Flow run 'uncovered-serval' - Downloading flow code from storage at '.'
17:19:17.754 | INFO    | Flow run 'uncovered-serval' - Running flow
17:19:17.754 | INFO    | Flow run 'uncovered-serval' - From my flow
17:19:17.759 | INFO    | prefect.flow_runs.runner - Process for flow run 'uncovered-serval' exited with status code: -9; This indicates that the process exited due to a SIGKILL signal. Typically, this is either caused by manual cancellation or high memory usage causing the operating system to terminate the process.
17:19:17.775 | INFO    | prefect.flow_runs.runner - Reported flow run '711ea4e5-a583-4735-b3b1-f8c63104c00a' as crashed: Flow run process exited with non-zero status code -9.
17:19:17.806 | INFO    | Flow run 'uncovered-serval' - Downloading flow code from storage at '.'
17:19:17.809 | INFO    | Flow run 'uncovered-serval' - Running hook 'hi' in response to entering state 'Crashed'
17:19:17.809 | INFO    | Flow run 'uncovered-serval' - Hook 'hi' finished running successfully
hi 2
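
The sequence in these logs can be illustrated outside of Prefect with a minimal standard-library sketch (POSIX only). The child kills itself with SIGKILL, so it never gets a chance to run a hook; the parent sees the negative return code and has to run the hook on the child's behalf. The names run_child and on_crashed_hooks are made up for this illustration and are not Prefect APIs.

```python
import signal
import subprocess
import sys

# Child program: prints a line, then sends SIGKILL to itself (like the flow above).
CHILD_CODE = "import os; print('From my flow', flush=True); os.kill(os.getpid(), 9)"


def run_child(on_crashed_hooks):
    """Run the child and call each hook if it exited with a non-zero status.

    Hypothetical helper for illustration only; not part of Prefect.
    """
    proc = subprocess.run(
        [sys.executable, "-c", CHILD_CODE], capture_output=True, text=True
    )
    if proc.returncode != 0:
        # On POSIX, a return code of -N means the child was terminated by signal N.
        for hook in on_crashed_hooks:
            hook(proc.returncode)
    return proc.returncode


calls = []
returncode = run_child([calls.append])
print(returncode, calls)
```

The point of the sketch is only that the hook has to be invoked by whichever process supervises the killed one, which is what the runner appears to do in the logs above.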

However, if I execute the same code with a process worker (ProcessWorker), the hi hook is not called.

Assume my file is named acme.py

from pathlib import Path
my_flow.from_source(source=str(Path(__file__).parent), entrypoint="acme.py:my_flow").deploy(name="abc", work_pool_name="abc")

PREFECT_API_URL=http://127.0.0.1:4200/api prefect work-pool create abc # In process mode

Then, in terminal 1, I run:

PREFECT_API_URL=http://127.0.0.1:4200/api prefect worker start --pool abc

And in terminal 2, I run:

prefect deployment run 'my-flow/abc'

I can see the following logs:

17:26:43.206 | INFO    | prefect.flow_runs.worker - Worker 'ProcessWorker 39a16187-b5cb-4302-8331-32269007291a' submitting flow run 'dab6fb54-0500-42d0-b36a-8e3c61ffe1f4'
17:26:43.276 | INFO    | prefect.flow_runs.worker - Opening process...
17:26:43.291 | INFO    | prefect.flow_runs.worker - Completed submission of flow run 'dab6fb54-0500-42d0-b36a-8e3c61ffe1f4'
17:26:44.483 | INFO    | Flow run 'macho-dragon' - Running flow
17:26:44.484 | INFO    | Flow run 'macho-dragon' - From my flow
17:26:44.488 | ERROR   | prefect.flow_runs.worker - Process 63020 exited with status code: -9; This indicates that the process exited due to a SIGKILL signal. Typically, this is either caused by manual cancellation or high memory usage causing the operating system to terminate the process.
17:26:44.510 | INFO    | prefect.flow_runs.worker - Reported flow run 'dab6fb54-0500-42d0-b36a-8e3c61ffe1f4' as crashed: Flow run infrastructure exited with non-zero status code -9.

Version info

>>> PREFECT_API_URL=http://127.0.0.1:4200/api prefect version
Version:             3.1.6
API version:         0.8.4
Python version:      3.11.9
Git commit:          1695bd09
Built:               Mon, Dec 16, 2024 12:37 PM
OS/Arch:             darwin/arm64
Profile:             local
Server type:         server
Pydantic version:    2.9.2
Server:
  Database:          sqlite
  SQLite version:    3.43.2


I also tried on Linux, and it doesn't work there either.

Additional context

No response

williamjamir added the bug label Dec 18, 2024
@williamjamir
Contributor Author

williamjamir commented Dec 18, 2024

It seems that this feature has not been implemented yet, right?

Please correct me if I’m mistaken, but it looks like when the flow is run via serve, the runner calls the _run_process function here: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L1183-L1187.
Crashed processes are then handled here: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/runner/runner.py#L1234-L1237.

With the worker, however, the process is started here: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/process.py#L270-L278, but I don’t see any handling for the crash hooks: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/workers/process.py#L317-L319.

Edit:

Oh, I see now that the crash handling is done well before that point, here:

if state.is_crashed():

Is there any chance that we could call _run_on_crashed_hooks here, as the runner does?

Or am I missing something, and this hook is left out on purpose here?
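
To make the question concrete, here is a rough sketch of the behavior being asked about. It does not use Prefect's real classes: ToyState, report_crashed_and_run_hooks, and the hooks list are stand-ins invented for this example, and only the state.is_crashed() check mirrors the line quoted above.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class ToyState:
    """Stand-in for a flow run state; not Prefect's State class."""

    type: str

    def is_crashed(self) -> bool:
        return self.type == "CRASHED"


def report_crashed_and_run_hooks(
    exit_code: int, hooks: List[Callable[[ToyState], None]]
) -> ToyState:
    """Hypothetical worker-side handling: mark the run crashed, then run hooks."""
    state = ToyState("CRASHED" if exit_code != 0 else "COMPLETED")
    if state.is_crashed():
        for hook in hooks:
            try:
                hook(state)
            except Exception as exc:
                # A failing hook should not hide the crash itself.
                print(f"Hook {getattr(hook, '__name__', hook)!r} failed: {exc}")
    return state


seen = []
final_state = report_crashed_and_run_hooks(-9, [seen.append])
```

In this toy version the hooks run in the supervising process right after the crashed state is decided, which is the step that seems to be missing on the worker side.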
