Merge pull request #4 from wlatanowicz/wl/redis-serializer
Adds a Redis store serializer, an Immediate (in-process) backend, and a purge() operation for the AWS Batch backend.
wlatanowicz authored Mar 5, 2024
2 parents 53412b6 + 0b768b6 commit a28e6fb
Showing 6 changed files with 174 additions and 3 deletions.
1 change: 1 addition & 0 deletions requirements/extras/redis.txt
@@ -0,0 +1 @@
redis
1 change: 1 addition & 0 deletions setup.py
@@ -54,6 +54,7 @@ def get_about():
BUNDLES = (
"aws_batch",
"docker",
"redis",
)


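The new requirements/extras/redis.txt file together with the "redis" entry in BUNDLES suggests that each bundle is published as an optional extra built from its requirements file. A minimal sketch of that mapping, using a hypothetical read_extra() helper and not taken from the project's actual setup.py (which only shows the BUNDLES tuple here):

    from pathlib import Path

    BUNDLES = (
        "aws_batch",
        "docker",
        "redis",
    )

    def read_extra(name):
        # Hypothetical helper: one requirement per line from
        # requirements/extras/<name>.txt, skipping blanks and comments.
        lines = Path("requirements", "extras", f"{name}.txt").read_text().splitlines()
        return [ln.strip() for ln in lines if ln.strip() and not ln.startswith("#")]

    # Assumed mapping of bundles to setuptools extras, e.g. `themule[redis]`.
    extras_require = {name: read_extra(name) for name in BUNDLES}
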
80 changes: 77 additions & 3 deletions themule/backends.py
@@ -1,9 +1,13 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING
from dataclasses import dataclass
from typing import TYPE_CHECKING, Generator
from uuid import uuid4

from .conf import NOTSET, settings
from .exceptions import ConfigurationError
from .import_helpers import import_by_path
from .job import StartedJob

if TYPE_CHECKING:
@@ -23,6 +27,9 @@ def __init__(self, **options) -> None:
def submit_job(self, job: Job, serializer: BaseSerializer) -> StartedJob:
raise NotImplementedError()

def purge(self):
raise NotImplementedError()

def get_path(self):
return f"{self.__module__}.{self.__class__.__name__}"

@@ -39,15 +46,25 @@ def get_option_value(self, options, option, default=NOTSET, cast=None):
class AwsBatchBackend(BaseBackend):
OPTION_PREFIX = "aws_batch"

@dataclass
class _QueuedJob:
job_id: str

def __init__(self, **options) -> None:
self.queue_name = self.get_option_value(options, "queue_name")
self.job_definition = self.get_option_value(options, "job_definition")

def purge(self):
statuses = ("SUBMITTED", "PENDING", "RUNNABLE", "STARTING", "RUNNING")
for status in statuses:
for job in self._list_jobs(status):
self._terminate_job(job, "Queue purged")

def submit_job(self, job: Job, serializer: BaseSerializer) -> StartedJob:
try:
import boto3
except ImportError:
raise ValueError("AWS support not installed")
raise ConfigurationError("AWS support not installed")

serialized_job = serializer.serialize(job)

@@ -75,6 +92,49 @@ def submit_job(self, job: Job, serializer: BaseSerializer) -> StartedJob:
job_id,
)

def _list_jobs(
self, status: str | None = None
) -> Generator[_QueuedJob, None, None]:
try:
import boto3
except ImportError:
raise ConfigurationError("AWS support not installed")

is_first = True
next_token = None
client = boto3.client("batch")

while is_first or next_token:
is_first = False
kwargs = {}

if next_token:
kwargs["nextToken"] = next_token

if status:
kwargs["jobStatus"] = status

result = client.list_jobs(
jobQueue=self.queue_name,
**kwargs,
)
next_token = result.get("nextToken")
for job in result.get("jobSummaryList", []):
yield self._QueuedJob(
job_id=job["jobId"],
)

def _terminate_job(self, job: _QueuedJob, reason: str):
try:
import boto3
except ImportError:
raise ConfigurationError("AWS support not installed")

client = boto3.client("batch")
client.terminate_job(
jobId=job.job_id,
reason=reason,
)


class LocalDockerBackend(BaseBackend):
OPTION_PREFIX = "docker"
@@ -101,7 +161,7 @@ def submit_job(self, job: Job, serializer: BaseSerializer) -> StartedJob:
try:
import docker
except ImportError:
raise ValueError("Docker support not installed")
raise ConfigurationError("Docker support not installed")

serialized_job = serializer.serialize(job)

@@ -168,3 +228,17 @@ def submit_job(self, job: Job, serializer: BaseSerializer) -> StartedJob:
job,
job_id,
)


class Immediate(BaseBackend):
def submit_job(self, job: Job, serializer: BaseSerializer) -> StartedJob:
func = import_by_path(job.func)

func(*job.args, **job.kwargs)
job_id = str(uuid4())

return StartedJob(
self.get_path(),
job,
job_id,
)
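
A usage sketch for the two backend additions above: draining an AWS Batch queue with the new purge(), and running a job in the current process with the Immediate backend. The queue name, job definition, Redis URL, and job function path are placeholders, and passing these options as constructor keywords assumes that they take precedence over settings, as get_option_value's signature suggests:

    from uuid import uuid4

    from themule.backends import AwsBatchBackend, Immediate
    from themule.job import Job
    from themule.serializers import RedisStoreSerializer

    # Drain every non-terminal job from the configured queue.
    # Requires boto3 and AWS credentials; names here are placeholders.
    batch = AwsBatchBackend(queue_name="my-queue", job_definition="my-job-def")
    batch.purge()

    # Run a job synchronously in the current process. Immediate.submit_job()
    # imports job.func by path and calls it before returning a StartedJob,
    # so the serializer argument is not actually consulted.
    job = Job(
        id=uuid4(),
        func="myapp.tasks.send_email",  # placeholder import path
        args=[],
        kwargs={"to": "user@example.com"},
    )
    serializer = RedisStoreSerializer(url="redis://localhost:6379/0")
    started = Immediate().submit_job(job, serializer)
    print(started)
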
2 changes: 2 additions & 0 deletions themule/cli.py
@@ -37,3 +37,5 @@ def execute_job_cli(serializer_path, job_spec):
job = serializer.unserialize(job_spec)

execute_job(job)

serializer.cleanup(job)
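
The worker now calls serializer.cleanup(job) once the job body returns. The hook is a no-op on BaseSerializer (see serializers.py below) and is overridden by RedisStoreSerializer to shorten the stored payload's TTL. A hypothetical subclass that adds logging on top of that behaviour might look roughly like:

    from themule.serializers import RedisStoreSerializer

    class VerboseRedisSerializer(RedisStoreSerializer):
        # Hypothetical subclass: report when a finished job's payload will expire.
        def cleanup(self, job):
            super().cleanup(job)  # shortens the Redis TTL to cleanup_ttl
            print(f"payload for job {job.id} expires in {self.cleanup_ttl}s")
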
2 changes: 2 additions & 0 deletions themule/exceptions.py
@@ -0,0 +1,2 @@
class ConfigurationError(Exception):
pass
91 changes: 91 additions & 0 deletions themule/serializers.py
@@ -6,6 +6,7 @@
from uuid import UUID

from .conf import NOTSET, settings
from .exceptions import ConfigurationError

if TYPE_CHECKING:
from .job import Job
@@ -15,6 +16,8 @@


class BaseSerializer:
OPTION_PREFIX = None

def __init__(self, **options) -> None:
pass

@@ -24,6 +27,9 @@ def serialize(self, job: Job) -> str:
def unserialize(self, data: str) -> Job:
raise NotImplementedError()

def cleanup(self, job: Job):
pass

def get_path(self):
return f"{self.__module__}.{self.__class__.__name__}"

@@ -66,3 +72,88 @@ def _json_serializer(self, obj):
return obj.isoformat()

raise TypeError(f"Type {type(obj)} is not JSON serializable")


class RedisStoreSerializer(BaseSerializer):
OPTION_PREFIX = "redis_store"

DEFAULT_PREFIX = "themule_job/"
DEFAULT_TTL = 60 * 60 * 24 * 30 # 30 days
DEFAULT_CLEANUP_TTL = 600 # 10 minutes

def __init__(self, **options) -> None:
self.redis_url = self.get_option_value(options, "url")
self.ttl = self.get_option_value(
options, "ttl", default=self.DEFAULT_TTL, cast=int
)
self.cleanup_ttl = self.get_option_value(
options, "cleanup_ttl", default=self.DEFAULT_CLEANUP_TTL, cast=int
)
self.prefix = self.get_option_value(
options, "prefix", default=self.DEFAULT_PREFIX, cast=str
)

def _make_key(self, job: Job) -> str:
return f"{self.prefix}{job.id}"

def serialize(self, job: Job) -> str:
try:
import redis
except ImportError:
raise ConfigurationError("Redis support not installed")

conn = redis.from_url(self.redis_url)

payload = {
"id": str(job.id),
"func": job.func,
"args": job.args,
"kwargs": job.kwargs,
}
json_payload = json.dumps(
payload,
default=self._json_serializer,
)

key = self._make_key(job)
conn.setex(key, self.ttl, json_payload)

return key

def unserialize(self, data: str) -> Job:
try:
import redis
except ImportError:
raise ConfigurationError("Redis support not installed")
from .job import Job

conn = redis.from_url(self.redis_url)

key = data
payload = conn.get(key)

json_payload = json.loads(payload)
job = Job(
id=UUID(json_payload["id"]),
func=json_payload["func"],
args=json_payload["args"],
kwargs=json_payload["kwargs"],
)

key_check = self._make_key(job)
assert key == key_check
return job

def cleanup(self, job: Job):
try:
import redis
except ImportError:
raise ConfigurationError("Redis support not installed")

conn = redis.from_url(self.redis_url)
key = self._make_key(job)
conn.expire(key, self.cleanup_ttl)

def _json_serializer(self, obj):
if isinstance(obj, (datetime, date)):
return obj.isoformat()

raise TypeError(f"Type {type(obj)} is not JSON serializable")
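
A round-trip sketch of the new serializer: serialize() stores the JSON payload under a prefixed key and hands back that key, the worker rebuilds the Job with unserialize(), and cleanup() shortens the key's TTL to cleanup_ttl rather than deleting it. The Redis URL and job function path below are placeholders, and the constructor keyword assumes that options passed directly take precedence over settings:

    from uuid import uuid4

    from themule.job import Job
    from themule.serializers import RedisStoreSerializer

    serializer = RedisStoreSerializer(url="redis://localhost:6379/0")

    job = Job(
        id=uuid4(),
        func="myapp.tasks.resize_image",  # placeholder import path
        args=["s3://bucket/raw.png"],
        kwargs={"width": 640},
    )

    # Producer side: the payload goes into Redis; only the key travels to the worker.
    key = serializer.serialize(job)  # e.g. "themule_job/<uuid>"
    assert key == f"{serializer.prefix}{job.id}"

    # Worker side: fetch the payload back and rebuild the Job.
    restored = serializer.unserialize(key)
    assert restored.func == job.func

    # After execution the CLI calls cleanup(), which shrinks the TTL so the
    # payload lingers briefly (10 minutes by default) instead of a full 30 days.
    serializer.cleanup(restored)

Shrinking the TTL instead of deleting the key outright presumably keeps the payload around briefly for inspecting finished or failed runs, while the 30-day default TTL caps how long unprocessed payloads can accumulate.
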
