Skip to content

Commit

Permalink
Garbage collect task result queue when worker context exits (#2973)
Browse files Browse the repository at this point in the history
* Close our link to the task result queue when a worker exits

* Don't exit worker context until worker has run in test
  • Loading branch information
Steve Pletcher authored Sep 1, 2020
1 parent 5156828 commit 7d16f29
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
1 change: 1 addition & 0 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ def __exit__(self, type, value, traceback):
for task in self._running_tasks.values():
if task.is_alive():
task.terminate()
self._task_result_queue.close()
return False # Don't suppress exception

def _generate_worker_info(self):
Expand Down
6 changes: 3 additions & 3 deletions test/worker_external_task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ def test_external_task_complete_but_missing_dep_at_runtime(self):
# split up scheduling task and running to simulate runtime scenario
with self._make_worker() as w:
w.add(test_task)
# touch output so test_task should be considered complete at runtime
open(test_task.output_path, 'a').close()
success = w.run()
# touch output so test_task should be considered complete at runtime
open(test_task.output_path, 'a').close()
success = w.run()

self.assertTrue(success)
# upstream dependency output didn't exist at runtime
Expand Down

0 comments on commit 7d16f29

Please sign in to comment.