From a8fdd3ce93839359d11091f5e5b7ff1184a8036a Mon Sep 17 00:00:00 2001 From: Maxim Zherelo <60392282+brsnw250@users.noreply.github.com> Date: Thu, 21 Sep 2023 17:16:50 +0300 Subject: [PATCH] Implement `BasePredictionIntervals` (#86) * added implementation * added tests * updated documentation * updated `fit` signature * updated changelog * changed tests * moved intervals to experimental * updated documentation * fixed tests * removed duplications * reworked `params_to_tune` * reworked tests * updated changelog * updated test * reformatted tests --- CHANGELOG.md | 1 + docs/source/api_reference/experimental.rst | 8 + .../prediction_intervals/__init__.py | 1 + .../experimental/prediction_intervals/base.py | 199 ++++++++++++++++ .../test_prediction_intervals/__init__.py | 0 .../test_prediction_intervals/common.py | 51 ++++ .../test_prediction_intervals/test_base.py | 217 ++++++++++++++++++ .../test_prediction_intervals/utils.py | 28 +++ 8 files changed, 505 insertions(+) create mode 100644 etna/experimental/prediction_intervals/__init__.py create mode 100644 etna/experimental/prediction_intervals/base.py create mode 100644 tests/test_experimental/test_prediction_intervals/__init__.py create mode 100644 tests/test_experimental/test_prediction_intervals/common.py create mode 100644 tests/test_experimental/test_prediction_intervals/test_base.py create mode 100644 tests/test_experimental/test_prediction_intervals/utils.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e30f5ef0..11a39df43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased ### Added +- Add base class `BasePredictionIntervals` for prediction intervals to the experimental module.
([#86](https://github.com/etna-team/etna/pull/86)) - Add `fit_params` parameter to `etna.models.sarimax.SARIMAXModel` ([#69](https://github.com/etna-team/etna/pull/69)) - Add `quickstart` notebook, add `mechanics_of_forecasting` notebook ([#1343](https://github.com/tinkoff-ai/etna/pull/1343)) - Add gallery of tutorials divided by level ([#46](https://github.com/etna-team/etna/pull/46)) diff --git a/docs/source/api_reference/experimental.rst b/docs/source/api_reference/experimental.rst index 8c7c8d6b5..8621d6c98 100644 --- a/docs/source/api_reference/experimental.rst +++ b/docs/source/api_reference/experimental.rst @@ -26,3 +26,11 @@ Classification of time-series: classification.PredictabilityAnalyzer classification.feature_extraction.TSFreshFeatureExtractor classification.feature_extraction.WEASELFeatureExtractor + +Prediction Intervals: + +.. autosummary:: + :toctree: api/ + :template: class.rst + + prediction_intervals.BasePredictionIntervals diff --git a/etna/experimental/prediction_intervals/__init__.py b/etna/experimental/prediction_intervals/__init__.py new file mode 100644 index 000000000..9330ab65e --- /dev/null +++ b/etna/experimental/prediction_intervals/__init__.py @@ -0,0 +1 @@ +from etna.experimental.prediction_intervals.base import BasePredictionIntervals diff --git a/etna/experimental/prediction_intervals/base.py b/etna/experimental/prediction_intervals/base.py new file mode 100644 index 000000000..98a5342eb --- /dev/null +++ b/etna/experimental/prediction_intervals/base.py @@ -0,0 +1,199 @@ +import pathlib +from abc import abstractmethod +from typing import Dict +from typing import Optional +from typing import Sequence + +import pandas as pd + +from etna.datasets import TSDataset +from etna.distributions import BaseDistribution +from etna.pipeline.base import BasePipeline + + +class BasePredictionIntervals(BasePipeline): + """Base class for prediction intervals methods. 
+ + This class implements a wrapper interface for pipelines and ensembles that provides the ability to + estimate prediction intervals. + + To implement a particular method, one must inherit from this class and provide an implementation for the + abstract method ``_forecast_prediction_interval``. This method should estimate and store prediction + intervals for out-of-sample forecasts. + + In-sample prediction is not supported by default and will raise a corresponding error while attempting to do so. + This functionality could be implemented if needed by overriding ``_predict`` method. This method is responsible + for building an in-sample point forecast and adding prediction intervals. + """ + + def __init__(self, pipeline: BasePipeline): + """Initialize instance of ``BasePredictionIntervals`` with given parameters. + + Parameters + ---------- + pipeline: + Base pipeline or ensemble for prediction intervals estimation. + """ + ts = pipeline.ts + self.pipeline = pipeline + super().__init__(pipeline.horizon) + self.pipeline.ts = ts + + def fit(self, ts: TSDataset, save_ts: bool = True) -> "BasePredictionIntervals": + """Fit the pipeline or ensemble of pipelines. + + Fit and apply given transforms to the data, then fit the model on the transformed data. + + Parameters + ---------- + ts: + Dataset with timeseries data. + save_ts: + Whether to save ``ts`` in the pipeline during ``fit``. + + Returns + ------- + : + Fitted instance. 
+ """ + self.pipeline.fit(ts=ts, save_ts=save_ts) + return self + + @property + def ts(self) -> Optional[TSDataset]: + """Access internal pipeline dataset.""" + return self.pipeline.ts + + @ts.setter + def ts(self, ts: Optional[TSDataset]): + """Set internal pipeline dataset.""" + self.pipeline.ts = ts + + def _predict( + self, + ts: TSDataset, + start_timestamp: Optional[pd.Timestamp], + end_timestamp: Optional[pd.Timestamp], + prediction_interval: bool, + quantiles: Sequence[float], + return_components: bool, + ) -> TSDataset: + """Make in-sample predictions on dataset in a given range. + + This method is not implemented by default. A custom implementation could be added by overriding if needed. + + Parameters + ---------- + ts: + Dataset to make predictions on. + start_timestamp: + First timestamp of prediction range to return, should be >= the first timestamp in ``ts``; + expected that beginning of each segment <= ``start_timestamp``; + if it isn't set, the first timestamp where each segment began is taken. + end_timestamp: + Last timestamp of prediction range to return; if it isn't set, the last timestamp of ``ts`` is taken. + The value is expected to be less than or equal to the last timestamp in ``ts``. + prediction_interval: + If ``True`` returns prediction interval. + quantiles: + Levels of prediction distribution. By default 2.5% and 97.5% are taken to form a 95% prediction interval. + return_components: + If ``True`` additionally returns forecast components. + + Returns + ------- + : + Dataset with predictions in ``[start_timestamp, end_timestamp]`` range. + """ + raise NotImplementedError( + "In-sample sample prediction is not supported! See documentation on how it could be implemented."
+ ) + + def _forecast(self, ts: TSDataset, return_components: bool) -> TSDataset: + """Make point forecasts using base pipeline or ensemble.""" + return self.pipeline._forecast(ts=ts, return_components=return_components) + + def save(self, path: pathlib.Path): + """Implement in SavePredictionIntervalsMixin.""" + pass + + @classmethod + def load(cls, path: pathlib.Path): + """Implement in SavePredictionIntervalsMixin.""" + pass + + def forecast( + self, + ts: Optional[TSDataset] = None, + prediction_interval: bool = False, + quantiles: Sequence[float] = (0.025, 0.975), + n_folds: int = 3, + return_components: bool = False, + ) -> TSDataset: + """Make a forecast of the next points of a dataset. + + The result of forecasting starts from the last point of ``ts``, not including it. + + Parameters + ---------- + ts: + Dataset to forecast. + prediction_interval: + If ``True`` returns prediction interval for the forecast. + quantiles: + Levels of prediction distribution. By default 2.5% and 97.5% are taken to form a 95% prediction interval. + If the method doesn't use or estimate quantiles, this parameter will be ignored. + n_folds: + Number of folds to use in the backtest for prediction interval estimation. + return_components: + If ``True`` additionally returns forecast components. + + Returns + ------- + : + Dataset with predictions. + """ + predictions = super().forecast( + ts=ts, + prediction_interval=prediction_interval, + quantiles=quantiles, + n_folds=n_folds, + return_components=return_components, + ) + return predictions + + def params_to_tune(self) -> Dict[str, BaseDistribution]: + """Get hyperparameter grid of the base pipeline to tune. + + Returns + ------- + : + Grid with hyperparameters.
+ """ + pipeline_params = self.pipeline.params_to_tune() + pipeline_params = {f"pipeline.{key}": value for key, value in pipeline_params.items()} + return pipeline_params + + @abstractmethod + def _forecast_prediction_interval( + self, ts: TSDataset, predictions: TSDataset, quantiles: Sequence[float], n_folds: int + ) -> TSDataset: + """Estimate and store prediction intervals. + + Parameters + ---------- + ts: + Dataset to forecast. + predictions: + Dataset with point predictions. + quantiles: + Levels of prediction distribution. + n_folds: + Number of folds to use in the backtest for prediction interval estimation. + + Returns + ------- + : + Dataset with predictions. + """ + pass diff --git a/tests/test_experimental/test_prediction_intervals/__init__.py b/tests/test_experimental/test_prediction_intervals/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/test_experimental/test_prediction_intervals/common.py b/tests/test_experimental/test_prediction_intervals/common.py new file mode 100644 index 000000000..047554f64 --- /dev/null +++ b/tests/test_experimental/test_prediction_intervals/common.py @@ -0,0 +1,51 @@ +from typing import Dict +from typing import Sequence + +import pandas as pd + +from etna.datasets import TSDataset +from etna.distributions import BaseDistribution +from etna.distributions import FloatDistribution +from etna.experimental.prediction_intervals import BasePredictionIntervals +from etna.models import NaiveModel +from etna.pipeline import BasePipeline +from etna.pipeline import Pipeline +from etna.transforms import AddConstTransform +from etna.transforms import DateFlagsTransform + + +def get_naive_pipeline(horizon): + return Pipeline(model=NaiveModel(), transforms=[], horizon=horizon) + + +def get_naive_pipeline_with_transforms(horizon): + transforms = [AddConstTransform(in_column="target", value=1e6), DateFlagsTransform()] + return Pipeline(model=NaiveModel(), transforms=transforms, horizon=horizon) + + +class 
DummyPredictionIntervals(BasePredictionIntervals): + """Dummy class for testing.""" + + def __init__(self, pipeline: BasePipeline, width: float = 0.0): + self.width = width + super().__init__(pipeline=pipeline) + + def _forecast_prediction_interval( + self, ts: TSDataset, predictions: TSDataset, quantiles: Sequence[float], n_folds: int + ) -> TSDataset: + """Set interval borders to the point forecast shifted by half of ``width``.""" + borders = [] + for segment in ts.segments: + target_df = (predictions[:, segment, "target"]).to_frame() + borders.append(target_df.rename({"target": f"target_lower"}, axis=1) - self.width / 2) + borders.append(target_df.rename({"target": f"target_upper"}, axis=1) + self.width / 2) + + # directly store borders in predictions.df + predictions.df = pd.concat([predictions.df] + borders, axis=1).sort_index(axis=1, level=(0, 1)) + + return predictions + + def params_to_tune(self) -> Dict[str, BaseDistribution]: + params = super().params_to_tune() + params["width"] = FloatDistribution(low=-5.0, high=5.0) + return params diff --git a/tests/test_experimental/test_prediction_intervals/test_base.py b/tests/test_experimental/test_prediction_intervals/test_base.py new file mode 100644 index 000000000..a7ae95a01 --- /dev/null +++ b/tests/test_experimental/test_prediction_intervals/test_base.py @@ -0,0 +1,217 @@ +import numpy as np +import pandas as pd +import pytest + +from etna.distributions import CategoricalDistribution +from etna.distributions import FloatDistribution +from etna.distributions import IntDistribution +from etna.ensembles import DirectEnsemble +from etna.ensembles import StackingEnsemble +from etna.ensembles import VotingEnsemble +from etna.models import CatBoostPerSegmentModel +from etna.models import LinearPerSegmentModel +from etna.models import NaiveModel +from etna.models import SeasonalMovingAverageModel +from etna.pipeline import AutoRegressivePipeline +from etna.pipeline import HierarchicalPipeline +from etna.pipeline import Pipeline +from etna.reconciliation
import BottomUpReconciliator +from etna.transforms import DateFlagsTransform +from etna.transforms import DeseasonalityTransform +from tests.test_experimental.test_prediction_intervals.common import DummyPredictionIntervals +from tests.test_experimental.test_prediction_intervals.common import get_naive_pipeline +from tests.test_experimental.test_prediction_intervals.common import get_naive_pipeline_with_transforms +from tests.test_experimental.test_prediction_intervals.utils import assert_sampling_is_valid + + +def run_base_pipeline_compat_check(ts, pipeline, expected_columns): + intervals_pipeline = DummyPredictionIntervals(pipeline=pipeline) + intervals_pipeline.fit(ts=ts) + + intervals_pipeline_pred = intervals_pipeline.forecast(prediction_interval=True) + columns = intervals_pipeline_pred.df.columns.get_level_values("feature") + + assert len(expected_columns - set(columns)) == 0 + assert np.sum(intervals_pipeline_pred.df.isna().values) == 0 + + +@pytest.fixture() +def naive_pipeline(): + return get_naive_pipeline(horizon=5) + + +@pytest.fixture() +def naive_pipeline_with_transforms(): + return get_naive_pipeline_with_transforms(horizon=5) + + +def test_pipeline_ref_initialized(naive_pipeline): + intervals_pipeline = DummyPredictionIntervals(pipeline=naive_pipeline) + + assert hasattr(intervals_pipeline, "pipeline") + assert intervals_pipeline.pipeline is naive_pipeline + + +def test_ts_property(naive_pipeline): + intervals_pipeline = DummyPredictionIntervals(pipeline=naive_pipeline) + + assert hasattr(intervals_pipeline, "ts") + assert intervals_pipeline.ts is naive_pipeline.ts + + +def test_predict_default_error(example_tsds, naive_pipeline): + intervals_pipeline = DummyPredictionIntervals(pipeline=naive_pipeline) + intervals_pipeline.fit(ts=example_tsds) + + with pytest.raises(NotImplementedError, match="In-sample sample prediction is not supported"): + _ = intervals_pipeline.predict(ts=example_tsds) + + +@pytest.mark.parametrize("pipeline_name", 
("naive_pipeline", "naive_pipeline_with_transforms")) +def test_pipeline_fit_forecast(example_tsds, pipeline_name, request): + pipeline = request.getfixturevalue(pipeline_name) + + intervals_pipeline = DummyPredictionIntervals(pipeline=pipeline) + + intervals_pipeline.fit(ts=example_tsds) + + intervals_pipeline_pred = intervals_pipeline.forecast(prediction_interval=False) + pipeline_pred = pipeline.forecast(prediction_interval=False) + + pd.testing.assert_frame_equal(intervals_pipeline_pred.df, pipeline_pred.df) + + +@pytest.mark.parametrize("pipeline_name", ("naive_pipeline", "naive_pipeline_with_transforms")) +def test_forecast_with_fitted_pipeline(example_tsds, pipeline_name, request): + pipeline = request.getfixturevalue(pipeline_name) + + pipeline.fit(ts=example_tsds) + pipeline_pred = pipeline.forecast(prediction_interval=False) + + intervals_pipeline = DummyPredictionIntervals(pipeline=pipeline) + intervals_pipeline_pred = intervals_pipeline.forecast(prediction_interval=False) + + pd.testing.assert_frame_equal(intervals_pipeline_pred.df, pipeline_pred.df) + + +@pytest.mark.parametrize( + "expected_columns", + ({"target", "target_lower", "target_upper"},), +) +@pytest.mark.parametrize( + "pipeline", + ( + get_naive_pipeline(horizon=1), + get_naive_pipeline_with_transforms(horizon=1), + AutoRegressivePipeline(model=NaiveModel(), horizon=1), + HierarchicalPipeline( + model=NaiveModel(), + horizon=1, + reconciliator=BottomUpReconciliator(target_level="market", source_level="product"), + ), + ), +) +def test_pipelines_forecast_intervals(product_level_constant_hierarchical_ts, pipeline, expected_columns): + run_base_pipeline_compat_check( + ts=product_level_constant_hierarchical_ts, pipeline=pipeline, expected_columns=expected_columns + ) + + +@pytest.mark.parametrize( + "expected_columns", + ({"target", "target_lower", "target_upper"},), +) +@pytest.mark.parametrize( + "ensemble", + ( + DirectEnsemble(pipelines=[get_naive_pipeline(horizon=1), 
get_naive_pipeline_with_transforms(horizon=2)]), + VotingEnsemble(pipelines=[get_naive_pipeline(horizon=1), get_naive_pipeline_with_transforms(horizon=1)]), + StackingEnsemble(pipelines=[get_naive_pipeline(horizon=1), get_naive_pipeline_with_transforms(horizon=1)]), + ), +) +def test_ensembles_forecast_intervals(example_tsds, ensemble, expected_columns): + run_base_pipeline_compat_check(ts=example_tsds, pipeline=ensemble, expected_columns=expected_columns) + + +@pytest.mark.parametrize( + "pipeline,expected_params_to_tune", + ( + ( + Pipeline( + model=SeasonalMovingAverageModel(), transforms=[DeseasonalityTransform(in_column="target", period=7)] + ), + { + "pipeline.model.window": IntDistribution(low=1, high=10), + "pipeline.transforms.0.model": CategoricalDistribution(["additive", "multiplicative"]), + "width": FloatDistribution(low=-5.0, high=5.0), + }, + ), + ( + AutoRegressivePipeline(model=CatBoostPerSegmentModel(), transforms=[DateFlagsTransform()], horizon=1), + { + "pipeline.model.learning_rate": FloatDistribution(low=1e-4, high=0.5, log=True), + "pipeline.model.depth": IntDistribution(low=1, high=11, step=1), + "pipeline.model.l2_leaf_reg": FloatDistribution(low=0.1, high=200.0, log=True), + "pipeline.model.random_strength": FloatDistribution(low=1e-05, high=10.0, log=True), + "pipeline.transforms.0.day_number_in_week": CategoricalDistribution([False, True]), + "pipeline.transforms.0.day_number_in_month": CategoricalDistribution([False, True]), + "pipeline.transforms.0.day_number_in_year": CategoricalDistribution([False, True]), + "pipeline.transforms.0.week_number_in_month": CategoricalDistribution([False, True]), + "pipeline.transforms.0.week_number_in_year": CategoricalDistribution([False, True]), + "pipeline.transforms.0.month_number_in_year": CategoricalDistribution([False, True]), + "pipeline.transforms.0.season_number": CategoricalDistribution([False, True]), + "pipeline.transforms.0.year_number": CategoricalDistribution([False, True]), + 
"pipeline.transforms.0.is_weekend": CategoricalDistribution([False, True]), + "width": FloatDistribution(low=-5.0, high=5.0), + }, + ), + ( + HierarchicalPipeline( + model=SeasonalMovingAverageModel(), + transforms=[DeseasonalityTransform(in_column="target", period=7)], + horizon=1, + reconciliator=BottomUpReconciliator(target_level="market", source_level="product"), + ), + { + "pipeline.model.window": IntDistribution(low=1, high=10), + "pipeline.transforms.0.model": CategoricalDistribution(["additive", "multiplicative"]), + "width": FloatDistribution(low=-5.0, high=5.0), + }, + ), + ), +) +def test_params_to_tune(pipeline, expected_params_to_tune): + intervals_pipeline = DummyPredictionIntervals(pipeline=pipeline) + + params_to_tune = intervals_pipeline.params_to_tune() + + assert params_to_tune == expected_params_to_tune + + +@pytest.mark.parametrize( + "pipeline", + ( + Pipeline(model=LinearPerSegmentModel(), transforms=[DateFlagsTransform()]), + AutoRegressivePipeline(model=LinearPerSegmentModel(), transforms=[DateFlagsTransform()], horizon=1), + HierarchicalPipeline( + model=LinearPerSegmentModel(), + transforms=[DateFlagsTransform()], + horizon=1, + reconciliator=BottomUpReconciliator(target_level="market", source_level="product"), + ), + ), +) +def test_valid_params_sampling(product_level_constant_hierarchical_ts, pipeline): + intervals_pipeline = DummyPredictionIntervals(pipeline=pipeline) + assert_sampling_is_valid(intervals_pipeline=intervals_pipeline, ts=product_level_constant_hierarchical_ts) + + +@pytest.mark.parametrize( + "pipeline", + (VotingEnsemble(pipelines=[get_naive_pipeline(horizon=1), get_naive_pipeline_with_transforms(horizon=1)]),), +) +def test_default_params_to_tune_error(pipeline): + intervals_pipeline = DummyPredictionIntervals(pipeline=pipeline) + + with pytest.raises(NotImplementedError, match=f"{pipeline.__class__.__name__} doesn't support"): + _ = intervals_pipeline.params_to_tune() diff --git 
a/tests/test_experimental/test_prediction_intervals/utils.py b/tests/test_experimental/test_prediction_intervals/utils.py new file mode 100644 index 000000000..25e32e4d8 --- /dev/null +++ b/tests/test_experimental/test_prediction_intervals/utils.py @@ -0,0 +1,28 @@ +from typing import Callable +from typing import Optional + +import optuna + +from etna.auto.utils import suggest_parameters +from etna.datasets import TSDataset +from etna.experimental.prediction_intervals import BasePredictionIntervals + + +def assert_sampling_is_valid( + intervals_pipeline: BasePredictionIntervals, + ts: TSDataset, + seed: int = 0, + n_trials: int = 3, + skip_parameters: Optional[Callable] = None, +): + params_to_tune = intervals_pipeline.params_to_tune() + + def _objective(trial: optuna.Trial) -> float: + parameters = suggest_parameters(trial, params_to_tune) + if skip_parameters is None or not skip_parameters(parameters): + new_intervals_pipeline = intervals_pipeline.set_params(**parameters) + new_intervals_pipeline.fit(ts) + return 0.0 + + study = optuna.create_study(sampler=optuna.samplers.RandomSampler(seed=seed)) + study.optimize(_objective, n_trials=n_trials)