diff --git a/singer_sdk/_singerlib/encoding/_base.py b/singer_sdk/_singerlib/encoding/_base.py index 62793776b..404c6ed6c 100644 --- a/singer_sdk/_singerlib/encoding/_base.py +++ b/singer_sdk/_singerlib/encoding/_base.py @@ -38,6 +38,10 @@ class SingerMessageType(str, enum.Enum): class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta): """Interface for all plugins reading Singer messages as strings or bytes.""" + def __init__(self) -> None: + super().__init__() + self._current_message: T | None = None + @t.final def listen(self, file_input: t.IO[T] | None = None) -> None: """Read from input until all messages are processed. @@ -45,7 +49,14 @@ def listen(self, file_input: t.IO[T] | None = None) -> None: Args: file_input: Readable stream of messages. Defaults to standard in. """ - self._process_lines(file_input or self.default_input) + try: + self._process_lines(file_input or self.default_input) + except Exception: + logger.debug( + "Failed while processing Singer message: %s", + self._current_message, + ) + raise self._process_endofpipe() def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: @@ -59,6 +70,8 @@ def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: """ stats: dict[str, int] = defaultdict(int) for line in file_input: + self._current_message = line + line_dict = self.deserialize_json(line) self._assert_line_requires(line_dict, requires={"type"}) diff --git a/tests/core/test_io.py b/tests/core/test_io.py index a48a785df..4f8b20199 100644 --- a/tests/core/test_io.py +++ b/tests/core/test_io.py @@ -6,6 +6,7 @@ import io import itertools import json +import logging from contextlib import nullcontext, redirect_stdout from textwrap import dedent @@ -83,6 +84,29 @@ def test_listen_unknown_message(): reader.listen(input_lines) +def test_listen_error(caplog: pytest.LogCaptureFixture): + class ErrorReader(DummyReader): + def _process_record_message(self, message_dict: dict) -> None: # noqa: ARG002 + msg = "Bad record" + raise ValueError(msg) + + message = RecordMessage( + stream="users", + record={"id": 1, "value": 1.23}, + ) + + input_lines = io.StringIO(json.dumps(message.to_dict()) + "\n") + + reader = ErrorReader() + with caplog.at_level(logging.DEBUG), pytest.raises(ValueError, match="Bad record"): + reader.listen(input_lines) + + assert caplog.records + + message = caplog.records[0].message + assert "Failed while processing Singer message" in message + + def test_write_message(): writer = SingerWriter() message = RecordMessage(