diff --git a/CHANGELOG.md b/CHANGELOG.md
index b3904008e..e44d0ff32 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 - Add `load_dataset` to public API ([#484](https://github.com/etna-team/etna/pull/484))
 - Add example of using custom pipeline pools in `Auto` ([#504](https://github.com/etna-team/etna/pull/504))
--
+- Add `MetricWithMissingHandling` base class, change signature of `etna.metrics.Metric` to return `None` values ([#514](https://github.com/etna-team/etna/pull/514))
 -
 -
 -
diff --git a/docs/source/api_reference/metrics.rst b/docs/source/api_reference/metrics.rst
index 39a728b9d..fa5607010 100644
--- a/docs/source/api_reference/metrics.rst
+++ b/docs/source/api_reference/metrics.rst
@@ -19,6 +19,7 @@ Base:
     :template: class.rst
 
     Metric
+    MetricWithMissingHandling
 
 Enums:
 
@@ -27,6 +28,7 @@ Enums:
     :template: class.rst
 
     MetricAggregationMode
+    MetricMissingMode
 
 Scalar metrics:
 
diff --git a/etna/metrics/__init__.py b/etna/metrics/__init__.py
index 479b25489..24f2e679a 100644
--- a/etna/metrics/__init__.py
+++ b/etna/metrics/__init__.py
@@ -8,6 +8,8 @@
 from etna.metrics.base import Metric
 from etna.metrics.base import MetricAggregationMode
+from etna.metrics.base import MetricMissingMode
+from etna.metrics.base import MetricWithMissingHandling
 from etna.metrics.functional_metrics import mae
 from etna.metrics.functional_metrics import mape
 from etna.metrics.functional_metrics import max_deviation
diff --git a/etna/metrics/base.py b/etna/metrics/base.py
index 244fde011..0dfc0304e 100644
--- a/etna/metrics/base.py
+++ b/etna/metrics/base.py
@@ -1,9 +1,14 @@
+import reprlib
+import warnings
 from abc import ABC
 from abc import abstractmethod
 from enum import Enum
+from typing import Callable
 from typing import Dict
 from typing import Optional
+from typing import Sequence
 from typing import Union
+from typing import cast
 
 import numpy as np
 import pandas as pd
@@ -32,6 +37,22 @@ def _missing_(cls, value):
         )
 
 
+class MetricMissingMode(str, Enum):
+    """Enum for different metric modes of working with missing values."""
+
+    #: The error is raised on missing values in y_true or y_pred.
+    error = "error"
+
+    #: Missing values in y_true are ignored, the error is raised on missing values in y_pred.
+    ignore = "ignore"
+
+    @classmethod
+    def _missing_(cls, value):
+        raise NotImplementedError(
+            f"{value} is not a valid {cls.__name__}. Only {', '.join([repr(m.value) for m in cls])} modes allowed"
+        )
+
+
 class MetricFunctionSignature(str, Enum):
     """Enum for different metric function signatures."""
 
@@ -61,7 +82,7 @@ class AbstractMetric(ABC):
     """Abstract class for metric."""
 
     @abstractmethod
-    def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[str, float]]:
+    def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[Optional[float], Dict[str, Optional[float]]]:
         """
         Compute metric's value with ``y_true`` and ``y_pred``.
 
@@ -137,6 +158,9 @@ def __init__(
             NotImplementedError:
                 If non-existent ``metric_fn_signature`` is used.
         """
+        self._aggregate_metrics: Callable[
+            [Dict[str, Optional[float]]], Union[Optional[float], Dict[str, Optional[float]]]
+        ]
         if MetricAggregationMode(mode) is MetricAggregationMode.macro:
             self._aggregate_metrics = self._macro_average
         elif MetricAggregationMode(mode) is MetricAggregationMode.per_segment:
@@ -230,8 +254,7 @@ def _validate_index(y_true: TSDataset, y_pred: TSDataset):
         if not y_true.index.equals(y_pred.index):
             raise ValueError("y_true and y_pred have different timestamps")
 
-    @staticmethod
-    def _validate_nans(y_true: TSDataset, y_pred: TSDataset):
+    def _validate_nans(self, y_true: TSDataset, y_pred: TSDataset):
         """Check that ``y_true`` and ``y_pred`` doesn't have NaNs.
 
         Parameters
@@ -249,41 +272,47 @@ def _validate_nans(y_true: TSDataset, y_pred: TSDataset):
         df_true = y_true.df.loc[:, pd.IndexSlice[:, "target"]]
         df_pred = y_pred.df.loc[:, pd.IndexSlice[:, "target"]]
 
-        df_true_isna = df_true.isna().any().any()
-        if df_true_isna > 0:
-            raise ValueError("There are NaNs in y_true")
+        df_true_isna_sum = df_true.isna().sum()
+        if (df_true_isna_sum > 0).any():
+            error_segments = set(df_true_isna_sum[df_true_isna_sum > 0].index.droplevel("feature").tolist())
+            raise ValueError(f"There are NaNs in y_true! Segments with NaNs: {reprlib.repr(error_segments)}.")
 
-        df_pred_isna = df_pred.isna().any().any()
-        if df_pred_isna > 0:
-            raise ValueError("There are NaNs in y_pred")
+        df_pred_isna_sum = df_pred.isna().sum()
+        if (df_pred_isna_sum > 0).any():
+            error_segments = set(df_pred_isna_sum[df_pred_isna_sum > 0].index.droplevel("feature").tolist())
+            raise ValueError(f"There are NaNs in y_pred! Segments with NaNs: {reprlib.repr(error_segments)}.")
 
     @staticmethod
-    def _macro_average(metrics_per_segments: Dict[str, float]) -> Union[float, Dict[str, float]]:
+    def _macro_average(metrics_per_segments: Dict[str, Optional[float]]) -> Optional[float]:
         """
         Compute macro averaging of metrics over segment.
 
         Parameters
         ----------
-        metrics_per_segments: dict of {segment: metric_value} for segments to aggregate
+        metrics_per_segments:
+            dict of {segment: metric_value} for segments to aggregate
 
         Returns
         -------
-        aggregated value of metric
+        :
+            aggregated value of metric
         """
-        return np.mean(list(metrics_per_segments.values())).item()
+        return np.mean(list(metrics_per_segments.values())).item()  # type: ignore
 
     @staticmethod
-    def _per_segment_average(metrics_per_segments: Dict[str, float]) -> Union[float, Dict[str, float]]:
+    def _per_segment_average(metrics_per_segments: Dict[str, Optional[float]]) -> Dict[str, Optional[float]]:
         """
         Compute per-segment averaging of metrics over segment.
 
         Parameters
         ----------
-        metrics_per_segments: dict of {segment: metric_value} for segments to aggregate
+        metrics_per_segments:
+            dict of {segment: metric_value} for segments to aggregate
 
         Returns
         -------
-        aggregated dict of metric
+        :
+            aggregated dict of metric
         """
         return metrics_per_segments
 
@@ -291,7 +320,7 @@ def _log_start(self):
         """Log metric computation."""
         tslogger.log(f"Metric {self.__repr__()} is calculated on dataset")
 
-    def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[str, float]]:
+    def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[Optional[float], Dict[str, Optional[float]]]:
         """
         Compute metric's value with ``y_true`` and ``y_pred``.
 
@@ -322,16 +351,27 @@ def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[st
 
         segments = df_true.columns.get_level_values("segment").unique()
 
-        metrics_per_segment: Dict[str, float]
+        metrics_per_segment: Dict[str, Optional[float]]
         if self._metric_fn_signature is MetricFunctionSignature.array_to_scalar:
             metrics_per_segment = {}
-            for i, segment in enumerate(segments):
+            for i, cur_segment in enumerate(segments):
                 cur_y_true = df_true.iloc[:, i].values
                 cur_y_pred = df_pred.iloc[:, i].values
-                metrics_per_segment[segment] = self.metric_fn(y_true=cur_y_true, y_pred=cur_y_pred, **self.kwargs)  # type: ignore
+                cur_value = self.metric_fn(y_true=cur_y_true, y_pred=cur_y_pred, **self.kwargs)
+                cur_value = cast(float, cur_value)
+                if np.isnan(cur_value):
+                    metrics_per_segment[cur_segment] = None
+                else:
+                    metrics_per_segment[cur_segment] = cur_value
         elif self._metric_fn_signature is MetricFunctionSignature.matrix_to_array:
             values = self.metric_fn(y_true=df_true.values, y_pred=df_pred.values, **self.kwargs)
-            metrics_per_segment = dict(zip(segments, values))  # type: ignore
+            values = cast(Sequence[float], values)
+            metrics_per_segment = {}
+            for cur_segment, cur_value in zip(segments, values):
+                if np.isnan(cur_value):
+                    metrics_per_segment[cur_segment] = None
+                else:
+                    metrics_per_segment[cur_segment] = cur_value
         else:
             assert_never(self._metric_fn_signature)
 
@@ -339,4 +379,106 @@ def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[st
         return metrics
 
 
-__all__ = ["Metric", "MetricAggregationMode"]
+class MetricWithMissingHandling(Metric):
+    """Base class for all the multi-segment metrics that can handle missing values."""
+
+    def __init__(
+        self,
+        metric_fn: MetricFunction,
+        mode: str = MetricAggregationMode.per_segment,
+        metric_fn_signature: str = "array_to_scalar",
+        missing_mode: str = "error",
+        **kwargs,
+    ):
+        """
+        Init MetricWithMissingHandling.
+
+        Parameters
+        ----------
+        metric_fn:
+            functional metric
+        mode:
+            "macro" or "per-segment", way to aggregate metric values over segments:
+
+            * if "macro" computes average value
+
+            * if "per-segment" -- does not aggregate metrics
+
+        metric_fn_signature:
+            type of signature of ``metric_fn`` (see :py:class:`~etna.metrics.base.MetricFunctionSignature`)
+        missing_mode:
+            mode of handling missing values (see :py:class:`~etna.metrics.base.MetricMissingMode`)
+        kwargs:
+            functional metric's params
+
+        Raises
+        ------
+        NotImplementedError:
+            If non-existent ``mode`` is used.
+        NotImplementedError:
+            If non-existent ``metric_fn_signature`` is used.
+        NotImplementedError:
+            If non-existent ``missing_mode`` is used.
+        """
+        super().__init__(metric_fn=metric_fn, mode=mode, metric_fn_signature=metric_fn_signature, **kwargs)
+        self.missing_mode = MetricMissingMode(missing_mode)
+
+    def _validate_nans(self, y_true: TSDataset, y_pred: TSDataset):
+        """Check that ``y_true`` and ``y_pred`` don't have NaNs depending on ``missing_mode``.
+
+        Parameters
+        ----------
+        y_true:
+            y_true dataset
+        y_pred:
+            y_pred dataset
+
+        Raises
+        ------
+        ValueError:
+            If there are NaNs in ``y_true`` or ``y_pred``
+        """
+        df_true = y_true.df.loc[:, pd.IndexSlice[:, "target"]]
+        df_pred = y_pred.df.loc[:, pd.IndexSlice[:, "target"]]
+
+        df_true_isna_sum = df_true.isna().sum()
+        if self.missing_mode is MetricMissingMode.error and (df_true_isna_sum > 0).any():
+            error_segments = set(df_true_isna_sum[df_true_isna_sum > 0].index.droplevel("feature").tolist())
+            raise ValueError(f"There are NaNs in y_true! Segments with NaNs: {reprlib.repr(error_segments)}.")
+
+        df_pred_isna_sum = df_pred.isna().sum()
+        if (df_pred_isna_sum > 0).any():
+            error_segments = set(df_pred_isna_sum[df_pred_isna_sum > 0].index.droplevel("feature").tolist())
+            raise ValueError(f"There are NaNs in y_pred! Segments with NaNs: {reprlib.repr(error_segments)}.")
+
+    @staticmethod
+    def _macro_average(metrics_per_segments: Dict[str, Optional[float]]) -> Optional[float]:
+        """
+        Compute macro averaging of metrics over segment.
+
+        None values are ignored during computation.
+
+        Parameters
+        ----------
+        metrics_per_segments: dict of {segment: metric_value} for segments to aggregate
+
+        Returns
+        -------
+        :
+            aggregated value of metric
+        """
+        with warnings.catch_warnings():
+            # this helps to prevent warning in case of all nans
+            warnings.filterwarnings(
+                message="Mean of empty slice",
+                action="ignore",
+            )
+            # dtype=float is used to cast None to np.nan
+            value = np.nanmean(np.fromiter(metrics_per_segments.values(), dtype=float)).item()
+            if np.isnan(value):
+                return None
+            else:
+                return value
+
+
+__all__ = ["Metric", "MetricWithMissingHandling", "MetricAggregationMode", "MetricMissingMode"]
diff --git a/etna/metrics/intervals_metrics.py b/etna/metrics/intervals_metrics.py
index 38553f5be..6e70525af 100644
--- a/etna/metrics/intervals_metrics.py
+++ b/etna/metrics/intervals_metrics.py
@@ -96,7 +96,7 @@ def __init__(
         self.upper_name = upper_name
         self.lower_name = lower_name
 
-    def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[str, float]]:
+    def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[Optional[float], Dict[str, Optional[float]]]:
         """
         Compute metric's value with y_true and y_pred.
 
@@ -209,7 +209,7 @@ def __init__(
         self.upper_name = upper_name
         self.lower_name = lower_name
 
-    def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[str, float]]:
+    def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[Optional[float], Dict[str, Optional[float]]]:
         """
         Compute metric's value with y_true and y_pred.
 
diff --git a/etna/metrics/utils.py b/etna/metrics/utils.py
index 761257d11..5e31c5d78 100644
--- a/etna/metrics/utils.py
+++ b/etna/metrics/utils.py
@@ -1,6 +1,7 @@
 from typing import Callable
 from typing import Dict
 from typing import List
+from typing import Optional
 from typing import Union
 
 import numpy as np
@@ -13,7 +14,7 @@
 def compute_metrics(
     metrics: List[Metric], y_true: TSDataset, y_pred: TSDataset
-) -> Dict[str, Union[float, Dict[str, float]]]:
+) -> Dict[str, Union[Optional[float], Dict[str, Optional[float]]]]:
     """
     Compute metrics for given y_true, y_pred.
 
diff --git a/etna/pipeline/base.py b/etna/pipeline/base.py
index 888434cdf..d6728ee6c 100644
--- a/etna/pipeline/base.py
+++ b/etna/pipeline/base.py
@@ -408,9 +408,9 @@ def _compute_metric(y_true: ArrayLike, y_pred: ArrayLike) -> float:
     def greater_is_better(self) -> bool:
         return False
 
-    def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[float, Dict[str, float]]:
+    def __call__(self, y_true: TSDataset, y_pred: TSDataset) -> Union[Optional[float], Dict[str, Optional[float]]]:
         segments = set(y_true.df.columns.get_level_values("segment"))
-        metrics_per_segment: Dict[str, Optional[float]] = {}
+        metrics_per_segment: Dict[str, Optional[float]] = {}
         for segment in segments:
             metrics_per_segment[segment] = 0.0
         metrics = self._aggregate_metrics(metrics_per_segment)
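
As a usage illustration (not part of the diff): a minimal sketch of how the new base class might be subclassed. The names `nan_mae` and `MissingMAE` are hypothetical; the sketch assumes the functional metric itself is NaN-aware and that, as for other etna metrics, the subclass defines `greater_is_better`.

import numpy as np

from etna.metrics import MetricWithMissingHandling


def nan_mae(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    # Hypothetical NaN-aware MAE: averages over observed points only,
    # returns nan when the whole segment is missing.
    return float(np.nanmean(np.abs(y_true - y_pred)))


class MissingMAE(MetricWithMissingHandling):
    # Hypothetical metric built on the new base class.
    def __init__(self, mode: str = "macro", **kwargs):
        super().__init__(metric_fn=nan_mae, mode=mode, missing_mode="ignore", **kwargs)

    @property
    def greater_is_better(self) -> bool:
        return False


# metric = MissingMAE(mode="macro")
# metric(y_true=ts_with_gaps, y_pred=forecast_ts)
# With missing_mode="ignore", NaNs in y_true pass validation; a segment whose
# metric value is NaN becomes None and is skipped by the macro average, so the
# call returns None only if every segment is entirely missing.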