Skip to content

Commit

Permalink
Add a stress test (#110)
Browse files Browse the repository at this point in the history
* add stress test

* show pytest durations

* Fix the warning.

* Shorter snapshot test.

* Fix a bug in manage_running_pips

* Use new test file instead.
  • Loading branch information
PeterKraus authored Nov 20, 2024
1 parent b334c93 commit 27e8ff5
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/workflow-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ jobs:
run: tomato --version
- name: Run pytest
shell: bash
run: pytest -vv
run: pytest -vvv --durations=0
6 changes: 4 additions & 2 deletions src/tomato/daemon/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import psutil

from tomato.daemon.io import merge_netcdfs, data_to_pickle
from tomato.models import Pipeline, Daemon, Component, Device, Driver, Job
from tomato.models import Pipeline, Daemon, Component, Device, Driver, Job, CompletedJob
from dgbowl_schemas.tomato import to_payload
from dgbowl_schemas.tomato.payload import Task

Expand Down Expand Up @@ -100,7 +100,9 @@ def manage_running_pips(daemon: Daemon, req):
logger.debug(f"{running=}")
for pip in running:
job = daemon.jobs[pip.jobid]
if job.pid is None:
if isinstance(job, CompletedJob):
continue
elif job.pid is None:
continue
pidexists = psutil.pid_exists(job.pid)
logger.debug(f"{pidexists=}")
Expand Down
2 changes: 1 addition & 1 deletion src/tomato/ketchup/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def snapshot(
return Reply(success=False, msg=f"job {jobid} is still queued")

for jobid in jobids:
jobs[jobid].snappath = Path(f"snapshot.{jobid}.nc")
jobs[jobid].snappath = f"snapshot.{jobid}.nc"
merge_netcdfs(jobs[jobid], snapshot=True)
if len(jobids) > 1:
msg = f"snapshot for jobs {jobids} created successfully"
Expand Down
10 changes: 10 additions & 0 deletions tests/common/counter_20_1.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: "0.2"
sample:
name: counter_20_1
method:
- device: "counter"
technique: "count"
time: 20.0
delay: 1.0
tomato:
verbosity: "DEBUG"
11 changes: 11 additions & 0 deletions tests/common/counter_stresstest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: "1.0"
sample:
name: counter_stresstest
method:
- component_tag: "counter"
technique_name: "count"
max_duration: 0.2
sampling_interval: 0.1
settings:
unlock_when_done: true
verbosity: DEBUG
2 changes: 1 addition & 1 deletion tests/test_99_example_counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_counter_cancel(casename, datadir, start_tomato_daemon, stop_tomato_daem
@pytest.mark.parametrize(
"casename, external",
[
("counter_60_0.1", True),
("counter_20_1", True),
("counter_snapshot", False),
],
)
Expand Down
42 changes: 42 additions & 0 deletions tests/test_99_stresstest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import pytest
import os
import subprocess
import xarray as xr
from datetime import datetime

from tomato.models import Job
from . import utils

PORT = 12345


@pytest.mark.parametrize(
"case, nreps",
[
("counter_stresstest", 5),
],
)
def test_stresstest(case, nreps, datadir, stop_tomato_daemon):
os.chdir(datadir)
subprocess.run(["tomato", "init", "-p", f"{PORT}", "-A", ".", "-D", "."])
subprocess.run(["tomato", "start", "-p", f"{PORT}", "-A", ".", "-L", "."])
utils.wait_until_tomato_running(port=PORT, timeout=3000)

subprocess.run(["tomato", "pipeline", "load", "-p", f"{PORT}", "pip-counter", case])
for i in range(nreps):
subprocess.run(["ketchup", "submit", "-p", f"{PORT}", f"{case}.yml"])

subprocess.run(["tomato", "pipeline", "ready", "-p", f"{PORT}", "pip-counter"])

utils.wait_until_ketchup_status(jobid=nreps, status="c", port=PORT, timeout=40000)

prev = None
for i in range(nreps):
i += 1
assert os.path.exists(f"results.{i}.nc")
dt = xr.open_datatree(f"results.{i}.nc")
completed_at = Job.model_validate_json(dt.attrs["tomato_Job"]).completed_at
ti = datetime.fromisoformat(completed_at)
if prev is not None:
assert ti > prev
prev = ti

0 comments on commit 27e8ff5

Please sign in to comment.