Rework missing values validation in etna metrics (#514)
d-a-bunin authored Dec 4, 2024
1 parent a1647bb commit 0b1b29d
Showing 7 changed files with 175 additions and 28 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Add `load_dataset` to public API ([#484](https://github.com/etna-team/etna/pull/484))
- Add example of using custom pipeline pools in `Auto` ([#504](https://github.com/etna-team/etna/pull/504))
-
- Add `MetricWithMissingHandling` base class, change signature of `etna.metrics.Metric` to return `None` values ([#514](https://github.com/etna-team/etna/pull/514))
-
-
-
2 changes: 2 additions & 0 deletions docs/source/api_reference/metrics.rst
@@ -19,6 +19,7 @@ Base:
:template: class.rst

Metric
MetricWithMissingHandling

Enums:

@@ -27,6 +28,7 @@ Enums:
:template: class.rst

MetricAggregationMode
MetricMissingMode

Scalar metrics:

2 changes: 2 additions & 0 deletions etna/metrics/__init__.py
@@ -8,6 +8,8 @@

from etna.metrics.base import Metric
from etna.metrics.base import MetricAggregationMode
from etna.metrics.base import MetricMissingMode
from etna.metrics.base import MetricWithMissingHandling
from etna.metrics.functional_metrics import mae
from etna.metrics.functional_metrics import mape
from etna.metrics.functional_metrics import max_deviation
186 changes: 164 additions & 22 deletions etna/metrics/base.py
@@ -1,9 +1,14 @@
import reprlib
import warnings
from abc import ABC
from abc import abstractmethod
from enum import Enum
from typing import Callable
from typing import Dict
from typing import Optional
from typing import Sequence
from typing import Union
from typing import cast

import numpy as np
import pandas as pd
@@ -32,6 +37,22 @@ def _missing_(cls, value):
)


class MetricMissingMode(str, Enum):
"""Enum for different metric modes of working with missing values."""

#: The error is raised on missing values in y_true or y_pred.
error = "error"

#: Missing values in y_true are ignored, the error is raised on missing values in y_pred.
ignore = "ignore"

@classmethod
def _missing_(cls, value):
raise NotImplementedError(
f"{value} is not a valid {cls.__name__}. Only {', '.join([repr(m.value) for m in cls])} modes allowed"
)
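As a standalone illustration (not part of the diff) of how the new enum behaves, the sketch below assumes a version of etna that contains this change:

```python
from etna.metrics import MetricMissingMode

# The two supported modes resolve to enum members.
assert MetricMissingMode("error") is MetricMissingMode.error
assert MetricMissingMode("ignore") is MetricMissingMode.ignore

# Any other value goes through the overridden _missing_ hook, which raises
# NotImplementedError and lists the allowed modes.
try:
    MetricMissingMode("drop")
except NotImplementedError as error:
    print(error)  # e.g. "drop is not a valid MetricMissingMode. Only 'error', 'ignore' modes allowed"
```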


class MetricFunctionSignature(str, Enum):
"""Enum for different metric function signatures."""

@@ -61,7 +82,7 @@ class AbstractMetric(ABC):
"""Abstract class for metric."""

@abstractmethod
def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[str, float]]:
def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[Optional[float], Dict[str, Optional[float]]]:
"""
Compute metric's value with ``y_true`` and ``y_pred``.
@@ -137,6 +158,9 @@ def __init__(
NotImplementedError:
If non-existent ``metric_fn_signature`` is used.
"""
self._aggregate_metrics: Callable[
[Dict[str, Optional[float]]], Union[Optional[float], Dict[str, Optional[float]]]
]
if MetricAggregationMode(mode) is MetricAggregationMode.macro:
self._aggregate_metrics = self._macro_average
elif MetricAggregationMode(mode) is MetricAggregationMode.per_segment:
@@ -230,8 +254,7 @@ def _validate_index(y_true: TSDataset, y_pred: TSDataset):
if not y_true.index.equals(y_pred.index):
raise ValueError("y_true and y_pred have different timestamps")

@staticmethod
def _validate_nans(y_true: TSDataset, y_pred: TSDataset):
def _validate_nans(self, y_true: TSDataset, y_pred: TSDataset):
"""Check that ``y_true`` and ``y_pred`` doesn't have NaNs.
Parameters
@@ -249,49 +272,55 @@ def _validate_nans(y_true: TSDataset, y_pred: TSDataset):
df_true = y_true.df.loc[:, pd.IndexSlice[:, "target"]]
df_pred = y_pred.df.loc[:, pd.IndexSlice[:, "target"]]

df_true_isna = df_true.isna().any().any()
if df_true_isna > 0:
raise ValueError("There are NaNs in y_true")
df_true_isna_sum = df_true.isna().sum()
if (df_true_isna_sum > 0).any():
error_segments = set(df_true_isna_sum[df_true_isna_sum > 0].index.droplevel("feature").tolist())
raise ValueError(f"There are NaNs in y_true! Segments with NaNs: {reprlib.repr(error_segments)}.")

df_pred_isna = df_pred.isna().any().any()
if df_pred_isna > 0:
raise ValueError("There are NaNs in y_pred")
df_pred_isna_sum = df_pred.isna().sum()
if (df_pred_isna_sum > 0).any():
error_segments = set(df_pred_isna_sum[df_pred_isna_sum > 0].index.droplevel("feature").tolist())
raise ValueError(f"There are NaNs in y_pred Segments with NaNs: {reprlib.repr(error_segments)}.")

@staticmethod
def _macro_average(metrics_per_segments: Dict[str, float]) -> Union[float, Dict[str, float]]:
def _macro_average(metrics_per_segments: Dict[str, Optional[float]]) -> Optional[float]:
"""
Compute macro averaging of metrics over segment.
Parameters
----------
metrics_per_segments: dict of {segment: metric_value} for segments to aggregate
metrics_per_segments:
dict of {segment: metric_value} for segments to aggregate
Returns
-------
aggregated value of metric
:
aggregated value of metric
"""
return np.mean(list(metrics_per_segments.values())).item()
return np.mean(list(metrics_per_segments.values())).item() # type: ignore

@staticmethod
def _per_segment_average(metrics_per_segments: Dict[str, float]) -> Union[float, Dict[str, float]]:
def _per_segment_average(metrics_per_segments: Dict[str, Optional[float]]) -> Dict[str, Optional[float]]:
"""
Compute per-segment averaging of metrics over segment.
Parameters
----------
metrics_per_segments: dict of {segment: metric_value} for segments to aggregate
metrics_per_segments:
dict of {segment: metric_value} for segments to aggregate
Returns
-------
aggregated dict of metric
:
aggregated dict of metric
"""
return metrics_per_segments

def _log_start(self):
"""Log metric computation."""
tslogger.log(f"Metric {self.__repr__()} is calculated on dataset")

def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[str, float]]:
def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[Optional[float], Dict[str, Optional[float]]]:
"""
Compute metric's value with ``y_true`` and ``y_pred``.
@@ -322,21 +351,134 @@ def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[st

segments = df_true.columns.get_level_values("segment").unique()

metrics_per_segment: Dict[str, float]
metrics_per_segment: Dict[str, Optional[float]]
if self._metric_fn_signature is MetricFunctionSignature.array_to_scalar:
metrics_per_segment = {}
for i, segment in enumerate(segments):
for i, cur_segment in enumerate(segments):
cur_y_true = df_true.iloc[:, i].values
cur_y_pred = df_pred.iloc[:, i].values
metrics_per_segment[segment] = self.metric_fn(y_true=cur_y_true, y_pred=cur_y_pred, **self.kwargs) # type: ignore
cur_value = self.metric_fn(y_true=cur_y_true, y_pred=cur_y_pred, **self.kwargs)
cur_value = cast(float, cur_value)
if np.isnan(cur_value):
metrics_per_segment[cur_segment] = None
else:
metrics_per_segment[cur_segment] = cur_value
elif self._metric_fn_signature is MetricFunctionSignature.matrix_to_array:
values = self.metric_fn(y_true=df_true.values, y_pred=df_pred.values, **self.kwargs)
metrics_per_segment = dict(zip(segments, values)) # type: ignore
values = cast(Sequence[float], values)
metrics_per_segment = {}
for cur_segment, cur_value in zip(segments, values):
if np.isnan(cur_value):
metrics_per_segment[cur_segment] = None
else:
metrics_per_segment[cur_segment] = cur_value
else:
assert_never(self._metric_fn_signature)

metrics = self._aggregate_metrics(metrics_per_segment)
return metrics
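The NaN-to-None conversion that ``__call__`` now performs per segment can be shown in isolation; the helper below is hypothetical and only mirrors the loop above:

```python
from typing import Dict, Optional, Sequence

import numpy as np


def nan_to_none(segments: Sequence[str], values: Sequence[float]) -> Dict[str, Optional[float]]:
    """Replace NaN per-segment metric values with None, mirroring the reworked __call__."""
    metrics_per_segment: Dict[str, Optional[float]] = {}
    for cur_segment, cur_value in zip(segments, values):
        metrics_per_segment[cur_segment] = None if np.isnan(cur_value) else cur_value
    return metrics_per_segment


print(nan_to_none(["segment_0", "segment_1"], [0.5, float("nan")]))
# {'segment_0': 0.5, 'segment_1': None}
```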


__all__ = ["Metric", "MetricAggregationMode"]
class MetricWithMissingHandling(Metric):
"""Base class for all the multi-segment metrics that can handle missing values."""

def __init__(
self,
metric_fn: MetricFunction,
mode: str = MetricAggregationMode.per_segment,
metric_fn_signature: str = "array_to_scalar",
missing_mode: str = "error",
**kwargs,
):
"""
Init Metric.
Parameters
----------
metric_fn:
functional metric
mode:
"macro" or "per-segment", way to aggregate metric values over segments:
* if "macro" computes average value
* if "per-segment" -- does not aggregate metrics
metric_fn_signature:
type of signature of ``metric_fn`` (see :py:class:`~etna.metrics.base.MetricFunctionSignature`)
missing_mode:
mode of handling missing values (see :py:class:`~etna.metrics.base.MetricMissingMode`)
kwargs:
functional metric's params
Raises
------
NotImplementedError:
If non-existent ``mode`` is used.
NotImplementedError:
If non-existent ``metric_fn_signature`` is used.
NotImplementedError:
If non-existent ``missing_mode`` is used.
"""
super().__init__(metric_fn=metric_fn, mode=mode, metric_fn_signature=metric_fn_signature, **kwargs)
self.missing_mode = MetricMissingMode(missing_mode)
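A hedged usage sketch of the new class (assuming aligned ``y_true``/``y_pred`` TSDataset objects prepared elsewhere, and using the functional ``mae`` exported in ``etna/metrics/__init__.py``):

```python
from etna.metrics import MetricWithMissingHandling, mae

# Per-segment MAE that tolerates NaNs in y_true but still rejects NaNs in y_pred.
metric = MetricWithMissingHandling(metric_fn=mae, mode="per-segment", missing_mode="ignore")

# With missing_mode="error" the behaviour matches the base Metric: any NaN in y_true raises.
# With missing_mode="ignore", a segment whose value cannot be computed may come back as None
# (depending on whether the functional metric itself yields NaN for that segment):
# per_segment_values = metric(y_true=y_true, y_pred=y_pred)
```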

def _validate_nans(self, y_true: TSDataset, y_pred: TSDataset):
"""Check that ``y_true`` and ``y_pred`` doesn't have NaNs depending on ``missing_mode``.
Parameters
----------
y_true:
y_true dataset
y_pred:
y_pred dataset
Raises
------
ValueError:
If there are NaNs in ``y_true`` or ``y_pred``
"""
df_true = y_true.df.loc[:, pd.IndexSlice[:, "target"]]
df_pred = y_pred.df.loc[:, pd.IndexSlice[:, "target"]]

df_true_isna_sum = df_true.isna().sum()
if self.missing_mode is MetricMissingMode.error and (df_true_isna_sum > 0).any():
error_segments = set(df_true_isna_sum[df_true_isna_sum > 0].index.droplevel("feature").tolist())
raise ValueError(f"There are NaNs in y_true! Segments with NaNs: {reprlib.repr(error_segments)}.")

df_pred_isna_sum = df_pred.isna().sum()
if (df_pred_isna_sum > 0).any():
error_segments = set(df_pred_isna_sum[df_pred_isna_sum > 0].index.droplevel("feature").tolist())
raise ValueError(f"There are NaNs in y_pred Segments with NaNs: {reprlib.repr(error_segments)}.")

@staticmethod
def _macro_average(metrics_per_segments: Dict[str, Optional[float]]) -> Optional[float]:
"""
Compute macro averaging of metrics over segment.
None values are ignored during computation.
Parameters
----------
metrics_per_segments: dict of {segment: metric_value} for segments to aggregate
Returns
-------
:
aggregated value of metric
"""
with warnings.catch_warnings():
# this helps to prevent warning in case of all nans
warnings.filterwarnings(
message="Mean of empty slice",
action="ignore",
)
# dtype=float is used to cast None to np.nan
value = np.nanmean(np.fromiter(metrics_per_segments.values(), dtype=float)).item()
if np.isnan(value):
return None
else:
return value
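A small worked example (standalone, not part of the diff) of the averaging trick above: ``np.fromiter(..., dtype=float)`` turns ``None`` into ``nan``, and ``np.nanmean`` then skips it:

```python
import warnings

import numpy as np

metrics_per_segment = {"segment_0": 1.0, "segment_1": None, "segment_2": 3.0}

with warnings.catch_warnings():
    # Suppress "Mean of empty slice", which appears when every value is None.
    warnings.filterwarnings(message="Mean of empty slice", action="ignore")
    value = np.nanmean(np.fromiter(metrics_per_segment.values(), dtype=float)).item()

print(value)  # 2.0: the None segment is ignored
# If all values were None, np.nanmean would yield nan and the metric would report None.
```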


__all__ = ["Metric", "MetricWithMissingHandling", "MetricAggregationMode", "MetricMissingMode"]
4 changes: 2 additions & 2 deletions etna/metrics/intervals_metrics.py
@@ -96,7 +96,7 @@ def __init__(
self.upper_name = upper_name
self.lower_name = lower_name

def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[str, float]]:
def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[Optional[float], Dict[str, Optional[float]]]:
"""
Compute metric's value with y_true and y_pred.
@@ -209,7 +209,7 @@ def __init__(
self.upper_name = upper_name
self.lower_name = lower_name

def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[str, float]]:
def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[Optional[float], Dict[str, Optional[float]]]:
"""
Compute metric's value with y_true and y_pred.
3 changes: 2 additions & 1 deletion etna/metrics/utils.py
@@ -1,6 +1,7 @@
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import numpy as np
@@ -13,7 +14,7 @@

def compute_metrics(
metrics: List[Metric], y_true: TSDataset, y_pred: TSDataset
) -> Dict[str, Union[float, Dict[str, float]]]:
) -> Dict[str, Union[Optional[float], Dict[str, Optional[float]]]]:
"""
Compute metrics for given y_true, y_pred.
4 changes: 2 additions & 2 deletions etna/pipeline/base.py
@@ -408,9 +408,9 @@ def _compute_metric(y_true: ArrayLike, y_pred: ArrayLike) -> float:
def greater_is_better(self) -> bool:
return False

def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[str, float]]:
def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[Optional[float], Dict[str, Optional[float]]]:
segments = set(y_true.df.columns.get_level_values("segment"))
metrics_per_segment = {}
metrics_per_segment: Dict[str, Optional[float]] = {}
for segment in segments:
metrics_per_segment[segment] = 0.0
metrics = self._aggregate_metrics(metrics_per_segment)