[DRAFT] ClearMLLogger #546

Open · wants to merge 4 commits into master
1 change: 1 addition & 0 deletions docs/source/api_reference/loggers.rst
@@ -22,6 +22,7 @@ Loggers:
LocalFileLogger
S3FileLogger
WandbLogger
ClearMLLogger

There is global object :code:`tslogger` that can be imported. It has a class:

2 changes: 2 additions & 0 deletions etna/loggers/__init__.py
@@ -23,5 +23,7 @@

if SETTINGS.wandb_required:
from etna.loggers.wandb_logger import WandbLogger
if SETTINGS.clearml_required:
from etna.loggers.clearml_logger import ClearMLLogger

tslogger = _Logger()
259 changes: 259 additions & 0 deletions etna/loggers/clearml_logger.py
@@ -0,0 +1,259 @@
import base64
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import Mapping
from typing import Optional
from typing import Sequence
from typing import Union
from uuid import uuid4

import pandas as pd

from etna import SETTINGS
from etna.loggers.base import BaseLogger

if TYPE_CHECKING:
from etna.datasets import TSDataset

if SETTINGS.clearml_required:
from clearml import Task
from clearml import TaskTypes


class ClearMLLogger(BaseLogger):
"""ClearML logger.

Note
----
This logger requires ``clearml`` extension to be installed.
Read more about this at :ref:`installation page <installation>`.
"""

def __init__(
self,
project_name: Optional[str] = None,
task_name: Optional[str] = None,
task_name_prefix: str = "",
task_type: Task.TaskTypes = TaskTypes.training,
tags: Optional[Sequence[str]] = None,
output_uri: Optional[Union[str, bool]] = None,
auto_connect_frameworks: Union[bool, Mapping[str, Union[bool, str, list]]] = False,
auto_resource_monitoring: Union[bool, Mapping[str, Any]] = True,
auto_connect_streams: Union[bool, Mapping[str, bool]] = True,
plot: bool = True,
table: bool = True,
config: Optional[Dict[str, Any]] = None,
):
"""
Create instance of ClearMLLogger.

Parameters
----------
project_name:
The name of the project in which the experiment will be created.
task_name:
The name of Task (experiment).
task_name_prefix:
Prefix for the Task name field.
task_type:
The task type.
tags:
Add a list of tags (str) to the created Task.
output_uri:
The default location for output models and other artifacts.
auto_connect_frameworks:
Automatically connect frameworks.
auto_resource_monitoring:
Automatically create machine resource monitoring plots.
auto_connect_streams:
Control the automatic logging of stdout and stderr.
plot:
Indicator for making and sending plots.
table:
Indicator for making and sending tables.
config:
A dictionary-like object for saving inputs to your job,
like hyperparameters for a model or settings for a data preprocessing job.

Notes
-----
For more details see <https://clear.ml/docs/latest/docs/references/sdk/task/#taskinit>

"""
super().__init__()
self.project_name = project_name
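# Build a short unique task name from the prefix and a UUID when no explicit task_name is given.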
self.task_name = (
task_name_prefix + base64.urlsafe_b64encode(uuid4().bytes).decode("utf8").rstrip("=\n")[:8]
if task_name is None
else task_name
)
self.task_name_prefix = task_name_prefix
self.task_type = task_type
self.tags = tags
self.output_uri = output_uri
self.auto_connect_frameworks = auto_connect_frameworks
self.auto_resource_monitoring = auto_resource_monitoring
self.auto_connect_streams = auto_connect_streams
self.plot = plot
self.table = table
self.config = config

self.job_type: Optional[str] = None
self.fold_id: Union[int, str, None] = None

# The ClearML Task is created lazily in ``start_experiment`` / the ``task``
# property via ``reinit_task``; creating it eagerly here would open a duplicate task.
self._task: Optional[Task] = None

def log(self, msg: Union[str, Dict[str, Any]], **kwargs):
"""
Log any event.

This class does nothing with it, use other loggers to do it.

Parameters
----------
msg:
Message or dict to log
kwargs:
Additional parameters for particular implementation
"""
pass

def log_backtest_metrics(
self, ts: "TSDataset", metrics_df: pd.DataFrame, forecast_df: pd.DataFrame, fold_info_df: pd.DataFrame
):
"""
Write metrics to logger.

Parameters
----------
ts:
TSDataset with the backtest data
metrics_df:
Dataframe produced with :py:meth:`etna.pipeline.Pipeline._get_backtest_metrics`
forecast_df:
Forecast from backtest
fold_info_df:
Fold information from backtest
"""

"""
from etna.analysis import plot_backtest_interactive
from etna.datasets import TSDataset
from etna.metrics.utils import aggregate_metrics_df

if self.table:
self.task.logger.report_table(title="Metrics", series=self.job_type, table_plot=metrics_df)
self.task.logger.report_table(
title="Forecast", series=self.job_type, table_plot=TSDataset.to_flatten(forecast_df)
)
self.task.logger.report_table(title="Fold info", series=self.job_type, table_plot=fold_info_df)

if self.plot:
fig = plot_backtest_interactive(forecast_df, ts, history_len=100)
self.task.logger.report_plotly(title="Plot backtest forecast", series=self.job_type, figure=fig)

metrics_dict = aggregate_metrics_df(metrics_df)
for metric, value in metrics_dict.items():
self.task.logger.report_single_value(name=metric, value=value)
"""

def log_backtest_run(self, metrics: pd.DataFrame, forecast: pd.DataFrame, test: pd.DataFrame):
"""
Write backtest metrics from one fold to logger.

Parameters
----------
metrics:
Dataframe with metrics from backtest fold
forecast:
Dataframe with forecast
test:
Dataframe with ground truth
"""

"""
from etna.datasets import TSDataset
from etna.metrics.utils import aggregate_metrics_df

columns_name = list(metrics.columns)
metrics = metrics.reset_index()
metrics.columns = ["segment"] + columns_name
if self.table:
self.task.logger.report_table(title="Metrics", series=self.job_type, iteration=self.fold_id, table_plot=metrics)
self.task.logger.report_table(
title="Forecast", series=self.job_type, iteration=self.fold_id, table_plot=TSDataset.to_flatten(forecast)
)
self.task.logger.report_table(title="Test", series=self.job_type, iteration=self.fold_id, table_plot=TSDataset.to_flatten(test))

metrics_dict = aggregate_metrics_df(metrics)
#metrics_dict = pd.Series(metrics_dict).to_frame()
#self.task.logger.report_table(title="Metrics Summary", series=self.job_type, iteration=self.fold_id, table_plot=metrics_dict)
for metric, value in metrics_dict.items():
self.task.logger.report_scalar(title=metric, series=self.job_type, iteration=self.fold_id, value=value)
print(str(self.job_type) + str(self.fold_id))
#self.task.flush()
"""


def start_experiment(self, job_type: Optional[str] = None, group: Optional[str] = None, *args, **kwargs):
"""Start Task.

Complete logger initialization or reinitialize it before the next experiment with the same name.

Parameters
----------
job_type:
Specify the type of task, which is useful when you're grouping runs together
into larger experiments using group.
group:
Specify a group to organize individual tasks into a larger experiment.
"""

self.job_type = job_type
try:
self.fold_id = int(group)
except (TypeError, ValueError):
self.fold_id = group
if self._task is None:
self.reinit_task()

def reinit_task(self):
"""Reinit Task."""
self._task = Task.init(
project_name=self.project_name,
task_name=self.task_name,
task_type=self.task_type,
tags=self.tags,
output_uri=self.output_uri,
auto_connect_frameworks=self.auto_connect_frameworks,
auto_resource_monitoring=self.auto_resource_monitoring,
auto_connect_streams=self.auto_connect_streams,
reuse_last_task_id=False,
)
#self._task.launch_multi_node(total_num_nodes=3)
if self.config is not None:
self._task.connect(mutable=self.config)

def finish_experiment(self, *args, **kwargs):
"""Finish Task."""
if self._task is not None:
self._task.flush(wait_for_uploads=True)

@property
def task(self):
"""Init Task."""
if self._task is None:
self.reinit_task()
return self._task
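A minimal usage sketch of the logger defined above, assuming the lazy Task creation in ``reinit_task``; the project name, tags and config values below are placeholders and not part of this PR:

from etna.loggers import ClearMLLogger
from etna.loggers import tslogger

# Placeholder values for illustration only.
cml_logger = ClearMLLogger(
    project_name="my_project",
    task_name_prefix="backtest_",
    tags=["example"],
    config={"horizon": 14},
)
tslogger.add(cml_logger)
# etna is expected to call start_experiment() on registered loggers during backtest,
# so the ClearML Task is created on first use rather than at construction time.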
14 changes: 14 additions & 0 deletions etna/settings.py
@@ -80,6 +80,14 @@ def _is_statsforecast_available():
return False


def _is_clearml_available():
if _module_available("clearml"):
return True
else:
warnings.warn("etna[clearml] is not available, to install it, run `pip install etna[clearml]`")
return False


def _get_optional_value(is_required: Optional[bool], is_available_fn: Callable, assert_msg: str) -> bool:
if is_required is None:
return is_available_fn()
@@ -102,6 +110,7 @@ def __init__( # noqa: D107
classification_required: Optional[bool] = None,
auto_required: Optional[bool] = None,
statsforecast_required: Optional[bool] = None,
clearml_required: Optional[bool] = None,
):
# True – use the package
# None – use the package if available
@@ -134,6 +143,11 @@ def __init__( # noqa: D107
_is_statsforecast_available,
"etna[statsforecast] is not available, to install it, run `pip install etna[statsforecast]`.",
)
self.clearml_required: bool = _get_optional_value(
clearml_required,
_is_clearml_available,
"etna[clearml] is not available, to install it, run `pip install etna[clearml]`.",
)

@staticmethod
def parse() -> "Settings":
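A short sketch of how the new flag gates the optional import, mirroring the existing wandb handling (behaviour inferred from the settings change above and from ``etna/loggers/__init__.py``; the project name is a placeholder):

from etna import SETTINGS

if SETTINGS.clearml_required:
    # ClearMLLogger is only exported when the clearml package is installed.
    from etna.loggers import ClearMLLogger

    logger = ClearMLLogger(project_name="my_project")
else:
    print("clearml extra is missing, install it with `pip install etna[clearml]`")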
66 changes: 66 additions & 0 deletions examples/clearml/basic_example.py
@@ -0,0 +1,66 @@
import random
from typing import Optional

import hydra
import hydra_slayer
import numpy as np
import pandas as pd
from omegaconf import DictConfig
from omegaconf import OmegaConf

from pathlib import Path

from etna.datasets import TSDataset
from etna.loggers import ClearMLLogger
from etna.loggers import tslogger
from etna.pipeline import Pipeline

OmegaConf.register_new_resolver("range", lambda x, y: list(range(x, y)))
OmegaConf.register_new_resolver("sum", lambda x, y: x + y)


FILE_PATH = Path(__file__)

def set_seed(seed: int = 42):
random.seed(seed)
np.random.seed(seed)


def init_logger(config: dict, project: str = "a.p.chikov/test/clearml_basic", tags: Optional[list] = ["test", "clearml", "concurrency"]):
tslogger.loggers = []
cml_logger = ClearMLLogger(project_name=project, tags=tags, config=config)
tslogger.add(cml_logger)


def dataloader(file_path: Path, freq: str) -> TSDataset:
df = pd.read_csv(file_path)
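# Convert the long-format dataframe to etna's wide format before constructing TSDataset.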
df = TSDataset.to_dataset(df)
ts = TSDataset(df=df, freq=freq)
return ts


@hydra.main(config_name="config.yaml")
def backtest(cfg: DictConfig):
config = OmegaConf.to_container(cfg, resolve=True)

# Set seed for reproducibility
set_seed(cfg.seed)

# Load data
ts = dataloader(file_path=cfg.dataset.file_path, freq=cfg.dataset.freq)

# Init pipeline
pipeline: Pipeline = hydra_slayer.get_from_params(**config["pipeline"])

# Init backtest parameters like metrics, etc.
backtest_params = hydra_slayer.get_from_params(**config["backtest"])

# Init ClearML logger
init_logger(pipeline.to_dict())

# Run backtest
_, _, _ = pipeline.backtest(ts, n_jobs=3, joblib_params=dict(backend="loky"), **backtest_params)


if __name__ == "__main__":
backtest()
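The ``config.yaml`` consumed by this script is not included in the diff. A hypothetical minimal configuration of the shape the script appears to expect, written here as the equivalent Python dict (all values are placeholders):

config = {
    "seed": 42,
    "dataset": {"file_path": "data/example_dataset.csv", "freq": "D"},
    "pipeline": {
        "_target_": "etna.pipeline.Pipeline",
        "model": {"_target_": "etna.models.NaiveModel", "lag": 1},
        "horizon": 14,
    },
    "backtest": {
        "n_folds": 3,
        "metrics": [{"_target_": "etna.metrics.MAE"}],
    },
}
# hydra_slayer.get_from_params(**config["pipeline"]) builds the Pipeline from the
# "_target_" entries, and the resolved config["backtest"] is passed as keyword
# arguments to pipeline.backtest().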