Skip to content

Commit

Permalink
feat: Interruption and termination signals in taps and targets
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Aug 22, 2024
1 parent 0c079e2 commit 57ed344
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 2 deletions.
2 changes: 1 addition & 1 deletion samples/sample_target_csv/csv_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class SampleTargetCSV(Target):

name = "target-csv"
config_jsonschema = th.PropertiesList(
th.Property("target_folder", th.StringType, required=True),
th.Property("target_folder", th.StringType, default="output"),
th.Property("file_naming_scheme", th.StringType),
).to_dict()
default_sink_class = SampleCSVTargetSink
25 changes: 24 additions & 1 deletion singer_sdk/plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
import abc
import logging
import os
import signal
import sys
import time
import typing as t
from importlib import metadata
from pathlib import Path, PurePath
from types import MappingProxyType
from types import FrameType, MappingProxyType

import click

Expand Down Expand Up @@ -176,6 +177,10 @@ def __init__(
# Initialization timestamp
self.__initialized_at = int(time.time() * 1000)

# Signal handling
signal.signal(signal.SIGINT, self._handle_termination)
signal.signal(signal.SIGTERM, self._handle_termination)

def setup_mapper(self) -> None:
"""Initialize the plugin mapper for this tap."""
self._mapper = PluginMapper(
Expand Down Expand Up @@ -402,6 +407,24 @@ def _validate_config(self, *, raise_errors: bool = True) -> list[str]:

return errors

def _handle_termination( # pragma: no cover
self,
signum: int, # noqa: ARG002
frame: FrameType | None, # noqa: ARG002
) -> None:
"""Handle termination signal.
Args:
signum: Signal number.
frame: Frame.
Raises:
click.Abort: If the termination signal is received.
"""
self.logger.info("Gracefully shutting down...")
errmsg = "Received termination signal"
raise click.Abort(errmsg)

@classmethod
def print_version(
cls: type[PluginBase],
Expand Down
16 changes: 16 additions & 0 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

if t.TYPE_CHECKING:
from pathlib import PurePath
from types import FrameType

from singer_sdk.connectors import SQLConnector
from singer_sdk.mapper import PluginMapper
Expand Down Expand Up @@ -473,6 +474,21 @@ def sync_all(self) -> None:

# Command Line Execution

def _handle_termination( # pragma: no cover
self,
signum: int,
frame: FrameType | None,
) -> None:
"""Handle termination signal.
Args:
signum: Signal number.
frame: Frame.
"""
for stream in self.streams.values():
stream.finalize_state_progress_markers()
super()._handle_termination(signum, frame)

@classmethod
def invoke( # type: ignore[override]
cls: type[Tap],
Expand Down
19 changes: 19 additions & 0 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

if t.TYPE_CHECKING:
from pathlib import PurePath
from types import FrameType

from singer_sdk.connectors import SQLConnector
from singer_sdk.mapper import PluginMapper
Expand Down Expand Up @@ -540,6 +541,24 @@ def _write_state_message(self, state: dict) -> None:

# CLI handler

def _handle_termination( # pragma: no cover
self,
signum: int,
frame: FrameType | None,
) -> None:
"""Handle termination signals.
Args:
signum: Signal number.
frame: Frame object.
"""
self.logger.info(
"Received termination signal %d, draining all sinks...",
signum,
)
self.drain_all(is_endofpipe=True)
super()._handle_termination(signum, frame)

@classmethod
def invoke( # type: ignore[override]
cls: type[Target],
Expand Down

0 comments on commit 57ed344

Please sign in to comment.