feat: Expose metric dictionary to make logging metrics as JSON easier #2162

Draft · wants to merge 1 commit into base: `main`
35 changes: 20 additions & 15 deletions docs/implementation/logging.md
@@ -44,38 +44,43 @@ Users of a tap can configure the SDK logging by setting the `SINGER_SDK_LOG_CONFIG`
environment variable. The value of this variable should be a path to a YAML file in the
[Python logging dict format](https://docs.python.org/3/library/logging.config.html#dictionary-schema-details).

For example, to send [metrics](./metrics.md) (with logger name `singer_sdk.metrics`) to a file, you could use the following config:
### Metrics logging

The Singer SDK provides a logger named `singer_sdk.metrics` for logging [Singer metrics](./metrics.md). Metric log records contain an extra field, `point`, which is a dictionary containing the metric data. By default, the `point` field is formatted as JSON.

To send metrics to a file in JSON format, you could use the following config:

```yaml
version: 1
disable_existing_loggers: false
formatters:
  metrics:
    format: "{asctime} {levelname} {message}"
    (): pythonjsonlogger.jsonlogger.JsonFormatter
    format: "{created} {message} {point}"
    style: "{"
handlers:
  metrics:
    class: logging.FileHandler
    formatter: metrics
    filename: metrics.log
    filename: metrics.jsonl
loggers:
  singer_sdk.metrics:
    level: INFO
    handlers: [ metrics ]
    propagate: yes
    propagate: no
```

This will send metrics to a `metrics.log`:

```
2022-09-29 00:48:52,746 INFO METRIC: {"metric_type": "timer", "metric": "http_request_duration", "value": 0.501743, "tags": {"stream": "continents", "endpoint": "", "http_status_code": 200, "status": "succeeded"}}
2022-09-29 00:48:52,775 INFO METRIC: {"metric_type": "counter", "metric": "http_request_count", "value": 1, "tags": {"stream": "continents", "endpoint": ""}}
2022-09-29 00:48:52,776 INFO METRIC: {"metric_type": "timer", "metric": "sync_duration", "value": 0.7397160530090332, "tags": {"stream": "continents", "context": {}, "status": "succeeded"}}
2022-09-29 00:48:52,776 INFO METRIC: {"metric_type": "counter", "metric": "record_count", "value": 7, "tags": {"stream": "continents", "context": {}}}
2022-09-29 00:48:53,225 INFO METRIC: {"metric_type": "timer", "metric": "http_request_duration", "value": 0.392148, "tags": {"stream": "countries", "endpoint": "", "http_status_code": 200, "status": "succeeded"}}
2022-09-29 00:48:53,302 INFO METRIC: {"metric_type": "counter", "metric": "http_request_count", "value": 1, "tags": {"stream": "countries", "endpoint": ""}}
2022-09-29 00:48:53,302 INFO METRIC: {"metric_type": "timer", "metric": "sync_duration", "value": 0.5258760452270508, "tags": {"stream": "countries", "context": {}, "status": "succeeded"}}
2022-09-29 00:48:53,303 INFO METRIC: {"metric_type": "counter", "metric": "record_count", "value": 250, "tags": {"stream": "countries", "context": {}}}
This will send metrics to a `metrics.jsonl` file:

```json
{"created": 1705709074.883021, "message": "METRIC", "point": {"type": "timer", "metric": "http_request_duration", "value": 0.501743, "tags": {"stream": "continents", "endpoint": "", "http_status_code": 200, "status": "succeeded"}}}
{"created": 1705709074.897184, "message": "METRIC", "point": {"type": "counter", "metric": "http_request_count", "value": 1, "tags": {"stream": "continents", "endpoint": ""}}}
{"created": 1705709074.897256, "message": "METRIC", "point": {"type": "timer", "metric": "sync_duration", "value": 0.7397160530090332, "tags": {"stream": "continents", "context": {}, "status": "succeeded"}}}
{"created": 1705709074.897292, "message": "METRIC", "point": {"type": "counter", "metric": "record_count", "value": 7, "tags": {"stream": "continents", "context": {}}}}
{"created": 1705709075.397254, "message": "METRIC", "point": {"type": "timer", "metric": "http_request_duration", "value": 0.392148, "tags": {"stream": "countries", "endpoint": "", "http_status_code": 200, "status": "succeeded"}}}
{"created": 1705709075.421888, "message": "METRIC", "point": {"type": "counter", "metric": "http_request_count", "value": 1, "tags": {"stream": "countries", "endpoint": ""}}}
{"created": 1705709075.422001, "message": "METRIC", "point": {"type": "timer", "metric": "sync_duration", "value": 0.5258760452270508, "tags": {"stream": "countries", "context": {}, "status": "succeeded"}}}
{"created": 1705709075.422047, "message": "METRIC", "point": {"type": "counter", "metric": "record_count", "value": 250, "tags": {"stream": "countries", "context": {}}}}
```
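
Purely as an illustration (not part of this change), the JSON Lines output above is easy to post-process. The sketch below assumes the file name `metrics.jsonl` and the field names shown in the example output (`point`, `metric`, `tags`, `value`) and sums `record_count` per stream:

```python
# Illustrative sketch only: sum record_count metrics per stream from the
# JSON Lines file produced by the config above.
import json
from collections import Counter

record_counts: Counter[str] = Counter()

with open("metrics.jsonl") as metrics_file:
    for line in metrics_file:
        entry = json.loads(line)   # one metric log record per line
        point = entry["point"]     # the metric dictionary
        if point["metric"] == "record_count":
            record_counts[point["tags"]["stream"]] += point["value"]

for stream, count in record_counts.items():
    print(f"{stream}: {count} records")
```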

## For package developers
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -51,6 +51,7 @@ jsonpath-ng = ">=1.5.3"
jsonschema = ">=4.16.0"
packaging = ">=23.1"
python-dotenv = ">=0.20"
python-json-logger = ">=2"
PyYAML = ">=6.0"
referencing = ">=0.30.0"
requests = ">=2.25.1"
13 changes: 13 additions & 0 deletions singer_sdk/default_logging.yml
@@ -4,11 +4,24 @@ formatters:
  console:
    format: "{asctime:23s} | {levelname:8s} | {name:20s} | {message}"
    style: "{"
  metrics:
    (): singer_sdk.metrics.SingerMetricsFormatter
    format: "{asctime:23s} | {levelname:8s} | {name:20s} | {message}: {metric_json}"
    style: "{"
handlers:
  default:
    class: logging.StreamHandler
    formatter: console
    stream: ext://sys.stderr
  metrics:
    class: logging.StreamHandler
    formatter: metrics
    stream: ext://sys.stderr
root:
  level: INFO
  handlers: [default]
loggers:
  singer_sdk.metrics:
    level: INFO
    handlers: [ metrics ]
    propagate: no
40 changes: 23 additions & 17 deletions singer_sdk/metrics.py
@@ -69,29 +69,35 @@ class Point(t.Generic[_TVal]):
    value: _TVal
    tags: dict[str, t.Any] = field(default_factory=dict)

    def __str__(self) -> str:
        """Get string representation of this measurement.
    def to_dict(self) -> dict[str, t.Any]:
        """Convert this measure to a dictionary.

        Returns:
            A string representation of this measurement.
            A dictionary.
        """
        return self.to_json()
        return {
            "type": self.metric_type,
            "metric": self.metric.value,
            "value": self.value,
            "tags": self.tags,
        }

    def to_json(self) -> str:
        """Convert this measure to a JSON object.


class SingerMetricsFormatter(logging.Formatter):
    """Logging formatter that adds a ``metric_json`` field to the log record."""

    def format(self, record: logging.LogRecord) -> str:
        """Format a log record.

        Args:
            record: A log record.

        Returns:
            A JSON object.
            A formatted log record.
        """
        return json.dumps(
            {
                "type": self.metric_type,
                "metric": self.metric.value,
                "value": self.value,
                "tags": self.tags,
            },
            default=str,
        )
        point = record.__dict__.get("point")
        record.__dict__["metric_json"] = json.dumps(point, default=str) if point else ""
        return super().format(record)


def log(logger: logging.Logger, point: Point) -> None:
@@ -101,7 +107,7 @@ def log(logger: logging.Logger, point: Point) -> None:
        logger: A logger instance.
        point: A measurement.
    """
    logger.info("METRIC: %s", point)
    logger.info("METRIC", extra={"point": point.to_dict()})


class Meter(metaclass=abc.ABCMeta):
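
To make the new logging path concrete, here is a small hypothetical sketch (not part of the diff) that wires `SingerMetricsFormatter` to a handler and emits a record the same way the updated `log()` helper does, i.e. with the metric dictionary passed via `extra={"point": ...}`; the example point values are made up for illustration:

```python
# Hypothetical usage sketch based on the changes above; not part of the PR.
import logging
import sys

from singer_sdk.metrics import SingerMetricsFormatter

handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(
    SingerMetricsFormatter(fmt="{levelname} {message}: {metric_json}", style="{")
)

logger = logging.getLogger("singer_sdk.metrics")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# Mirrors what the updated log() helper does with point.to_dict().
logger.info(
    "METRIC",
    extra={
        "point": {
            "type": "counter",
            "metric": "record_count",
            "value": 7,
            "tags": {"stream": "continents"},
        }
    },
)
# Expected output (the formatter fills metric_json from the "point" dict):
# INFO METRIC: {"type": "counter", "metric": "record_count", "value": 7, "tags": {"stream": "continents"}}
```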
60 changes: 47 additions & 13 deletions tests/core/test_metrics.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import json
import logging

import pytest
@@ -17,6 +18,32 @@ def __str__(self) -> str:
        return f"{self.name}={self.value}"


def test_singer_metrics_formatter():
    formatter = metrics.SingerMetricsFormatter(fmt="{metric_json}", style="{")

    record = logging.LogRecord(
        name="test",
        level=logging.INFO,
        pathname="test.py",
        lineno=1,
        msg="METRIC",
        args=(),
        exc_info=None,
    )

    assert formatter.format(record) == ""

    metric_dict = {
        "type": "counter",
        "metric": "test_metric",
        "tags": {"test_tag": "test_value"},
        "value": 1,
    }
    record.__dict__["point"] = metric_dict

    assert formatter.format(record) == json.dumps(metric_dict)


def test_meter():
    class _MyMeter(metrics.Meter):
        def __enter__(self):
@@ -38,6 +65,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):


def test_record_counter(caplog: pytest.LogCaptureFixture):
    metrics_logger = logging.getLogger(metrics.METRICS_LOGGER_NAME)
    metrics_logger.propagate = True

    caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME)
    custom_object = CustomObject("test", 1)

@@ -56,29 +86,33 @@ def test_record_counter(caplog: pytest.LogCaptureFixture):
    total = 0

    assert len(caplog.records) == 100 + 1
    # raise AssertionError

    for record in caplog.records:
        assert record.levelname == "INFO"
        assert record.msg == "METRIC: %s"
        assert "test=1" in record.message
        assert record.msg.startswith("METRIC")

        point: metrics.Point[int] = record.args[0]
        assert point.metric_type == "counter"
        assert point.metric == "record_count"
        assert point.tags == {
        point = record.__dict__["point"]
        assert point["type"] == "counter"
        assert point["metric"] == "record_count"
        assert point["tags"] == {
            metrics.Tag.STREAM: "test_stream",
            metrics.Tag.ENDPOINT: "test_endpoint",
            "custom_tag": "pytest",
            "custom_obj": custom_object,
        }

        total += point.value
        total += point["value"]

    assert total == 100


def test_sync_timer(caplog: pytest.LogCaptureFixture):
    metrics_logger = logging.getLogger(metrics.METRICS_LOGGER_NAME)
    metrics_logger.propagate = True

    caplog.set_level(logging.INFO, logger=metrics.METRICS_LOGGER_NAME)

    traveler = time_machine.travel(0, tick=False)
    traveler.start()

@@ -92,15 +126,15 @@ def test_sync_timer(caplog: pytest.LogCaptureFixture):

    record = caplog.records[0]
    assert record.levelname == "INFO"
    assert record.msg == "METRIC: %s"
    assert record.msg.startswith("METRIC")

    point: metrics.Point[float] = record.args[0]
    assert point.metric_type == "timer"
    assert point.metric == "sync_duration"
    assert point.tags == {
    point = record.__dict__["point"]
    assert point["type"] == "timer"
    assert point["metric"] == "sync_duration"
    assert point["tags"] == {
        metrics.Tag.STREAM: "test_stream",
        metrics.Tag.STATUS: "succeeded",
        "custom_tag": "pytest",
    }

    assert pytest.approx(point.value, rel=0.001) == 10.0
    assert pytest.approx(point["value"], rel=0.001) == 10.0