Skip to content

Commit

Permalink
Add tmp results
Browse files Browse the repository at this point in the history
  • Loading branch information
Чиков Александр Павлович committed Dec 26, 2024
1 parent 48921d4 commit aea8e6c
Show file tree
Hide file tree
Showing 8 changed files with 689 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/source/api_reference/loggers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Loggers:
LocalFileLogger
S3FileLogger
WandbLogger
ClearMLLogger

There is global object :code:`tslogger` that can be imported. It has a class:

Expand Down
29 changes: 28 additions & 1 deletion etna/loggers/clearml_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,21 @@ def __init__(
self.table = table
self.config = config

self._task = Task.init(
project_name=self.project_name,
task_name=self.task_name,
task_type=self.task_type,
tags=self.tags,
output_uri=self.output_uri,
auto_connect_frameworks=self.auto_connect_frameworks,
auto_resource_monitoring=self.auto_resource_monitoring,
auto_connect_streams=self.auto_connect_streams,
reuse_last_task_id=False,
)
# self._task.launch_multi_node(total_num_nodes=3)
if self.config is not None:
self._task.connect(mutable=self.config)

self._task: Optional[Task] = None

def log(self, msg: Union[str, Dict[str, Any]], **kwargs):
Expand Down Expand Up @@ -133,6 +148,8 @@ def log_backtest_metrics(
fold_info_df:
Fold information from backtest
"""

"""
from etna.analysis import plot_backtest_interactive
from etna.datasets import TSDataset
from etna.metrics.utils import aggregate_metrics_df
Expand All @@ -151,6 +168,7 @@ def log_backtest_metrics(
metrics_dict = aggregate_metrics_df(metrics_df)
for metric, value in metrics_dict.items():
self.task.logger.report_single_value(name=metric, value=value)
"""

def log_backtest_run(self, metrics: pd.DataFrame, forecast: pd.DataFrame, test: pd.DataFrame):
"""
Expand All @@ -165,6 +183,8 @@ def log_backtest_run(self, metrics: pd.DataFrame, forecast: pd.DataFrame, test:
test:
Dataframe with ground truth
"""

"""
from etna.datasets import TSDataset
from etna.metrics.utils import aggregate_metrics_df
Expand All @@ -179,8 +199,13 @@ def log_backtest_run(self, metrics: pd.DataFrame, forecast: pd.DataFrame, test:
self.task.logger.report_table(title="Test", series=self.job_type, iteration=self.fold_id, table_plot=TSDataset.to_flatten(test))
metrics_dict = aggregate_metrics_df(metrics)
#metrics_dict = pd.Series(metrics_dict).to_frame()
#self.task.logger.report_table(title="Metrics Summary", series=self.job_type, iteration=self.fold_id, table_plot=metrics_dict)
for metric, value in metrics_dict.items():
self.task.logger.report_scalar(title=metric, series=self.job_type, iteration=self.fold_id, value=value)
print(str(self.job_type) + str(self.fold_id))
#self.task.flush()
"""


def start_experiment(self, job_type: Optional[str] = None, group: Optional[str] = None, *args, **kwargs):
Expand All @@ -202,7 +227,8 @@ def start_experiment(self, job_type: Optional[str] = None, group: Optional[str]
self.fold_id = int(group)
except:
self.fold_id = group
#self.reinit_task()
if self._task is None:
self.reinit_task()

def reinit_task(self):
"""Reinit Task."""
Expand All @@ -217,6 +243,7 @@ def reinit_task(self):
auto_connect_streams=self.auto_connect_streams,
reuse_last_task_id=False,
)
#self._task.launch_multi_node(total_num_nodes=3)
if self.config is not None:
self._task.connect(mutable=self.config)

Expand Down
66 changes: 66 additions & 0 deletions examples/clearml/basic_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import random
from typing import Optional

import hydra
import hydra_slayer
import numpy as np
import pandas as pd
from omegaconf import DictConfig
from omegaconf import OmegaConf

from pathlib import Path

from etna.datasets import TSDataset
from etna.loggers import ClearMLLogger
from etna.loggers import tslogger
from etna.pipeline import Pipeline

OmegaConf.register_new_resolver("range", lambda x, y: list(range(x, y)))
OmegaConf.register_new_resolver("sum", lambda x, y: x + y)


FILE_PATH = Path(__file__)

def set_seed(seed: int = 42):
random.seed(seed)
np.random.seed(seed)


def init_logger(config: dict, project: str = "a.p.chikov/test/clearml_basic", tags: Optional[list] = ["test", "clearml", "concurrency"]):
tslogger.loggers = []
cml_logger = ClearMLLogger(project_name=project, tags=tags, config=config)
tslogger.add(cml_logger)


def dataloader(file_path: Path, freq: str) -> TSDataset:
df = pd.read_csv(file_path)
df = TSDataset.to_dataset(df)
ts = TSDataset(df=df, freq=freq)
return ts


@hydra.main(config_name="config.yaml")
def backtest(cfg: DictConfig):
config = OmegaConf.to_container(cfg, resolve=True)

# Set seed for reproducibility
set_seed(cfg.seed)

# Load data
ts = dataloader(file_path=cfg.dataset.file_path, freq=cfg.dataset.freq)

# Init pipeline
pipeline: Pipeline = hydra_slayer.get_from_params(**config["pipeline"])

# Init backtest parameters like metrics and e.t.c.
backtest_params = hydra_slayer.get_from_params(**config["backtest"])

# Init WandB logger
init_logger(pipeline.to_dict())

# Run backtest
_, _, _ = pipeline.backtest(ts, n_jobs=3, joblib_params=dict(backend="loky"), **backtest_params)


if __name__ == "__main__":
backtest()
61 changes: 61 additions & 0 deletions examples/clearml/concurancy_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import random
from typing import Optional

import hydra
import hydra_slayer
import numpy as np
import pandas as pd
from omegaconf import DictConfig
from omegaconf import OmegaConf

from pathlib import Path

from etna.datasets import TSDataset
from etna.loggers import ClearMLLogger
from etna.loggers import tslogger
from etna.pipeline import Pipeline
from etna.models.nn import MLPModel
from etna.transforms import LagTransform, StandardScalerTransform
from joblib import Parallel,delayed

FILE_PATH = Path(__file__)

OmegaConf.register_new_resolver("range", lambda x, y: list(range(x, y)))
OmegaConf.register_new_resolver("sum", lambda x, y: x + y)

def set_seed(seed: int = 42):
random.seed(seed)
np.random.seed(seed)


def init_logger(config: dict, project: str = "a.p.chikov/test/clearml_basic", tags: Optional[list] = ["test", "clearml", "nn"]):
tslogger.loggers = []
cml_logger = ClearMLLogger(project_name=project, tags=tags, config=config, auto_connect_frameworks=True)
tslogger.add(cml_logger)




@hydra.main(config_name="config.yaml")
def backtest(cfg: DictConfig):
config = OmegaConf.to_container(cfg, resolve=True)

# Set seed for reproducibility
set_seed(cfg.seed)

logger = ClearMLLogger(project_name="a.p.chikov/test/clearml_basic", task_name="conc_example",
tags=["test", "clearml", "nn"], auto_connect_frameworks=True)
def log(i, ob):
logger.task.logger.report_scalar(title="lol", series=f"lol_{ob}", iteration=i, value=i)

obj = np.random.randint(0, 3, size=1000)

Parallel(n_jobs=3)(
delayed(log)(i, ob) for i,ob in enumerate(obj)
)




if __name__ == "__main__":
backtest()
Loading

0 comments on commit aea8e6c

Please sign in to comment.