Merge pull request #4 from statisticsnorway/kildomat
Add kildomat processing to pre-inndata
arneso-ssb authored Nov 22, 2024
2 parents 0023b1a + afa5c61 commit de82954
Showing 12 changed files with 1,018 additions and 517 deletions.
14 changes: 8 additions & 6 deletions config/settings.toml

@@ -8,17 +8,19 @@

 [default]
 dapla_team = "tip-tutorials"
-kildedata_root_dir = "@format gs://ssb-{this.dapla_team}-data-kilde-prod/frost"
-product_root_dir = "@format gs://ssb-{this.dapla_team}-data-produkt-prod"
+short_name = "metstat" # short name of the statistic; metstat for meteorological statistics
+kildedata_root_dir = "@format gs://ssb-{this.dapla_team}-data-kilde-prod/{this.short_name}/frost"
+product_root_dir = "@format gs://ssb-{this.dapla_team}-data-produkt-prod/{this.short_name}"
+pre_inndata_dir = "@format {this.product_root_dir}/inndata/temp/pre-inndata/frost"
 weather_stations_kildedata_file = "@format {this.kildedata_root_dir}/weather_stations_v1.json"
 collect_from_date = "2011-01-01"
 collect_to_date = "2012-01-01"
 weather_station_names = ["OSLO - BLINDERN", "KONGSVINGER"]

 [daplalab_files]
-kildedata_root_dir = "/buckets/kilde/frost"
-product_root_dir = "/buckets/produkt"
+kildedata_root_dir = "@format /buckets/kilde/{this.short_name}/frost"
+product_root_dir = "@format /buckets/produkt/{this.short_name}"

 [local_files] # Relative paths to the config directory
-kildedata_root_dir = "../data/kildedata/frost"
-product_root_dir = "../data"
+kildedata_root_dir = "@format ../data/{this.short_name}/kildedata/frost"
+product_root_dir = "@format ../data/{this.short_name}"
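The `@format` tokens above are Dynaconf interpolation: `{this.short_name}` and `{this.dapla_team}` are substituted from the same settings file at load time, so every environment now gets a per-statistic subdirectory. A minimal sketch of how the resolved values look when the settings are loaded standalone (the file path and `env` argument here are assumptions for this sketch; the repo's actual loading lives in `src/functions/config.py` below):

```python
from dynaconf import Dynaconf

# Load the settings file directly; path and env are assumptions for this sketch.
settings = Dynaconf(
    settings_files=["config/settings.toml"],
    environments=True,
    env="default",
)

# The @format interpolation resolves {this.dapla_team} and {this.short_name}:
print(settings.kildedata_root_dir)
# gs://ssb-tip-tutorials-data-kilde-prod/metstat/frost
print(settings.pre_inndata_dir)
# gs://ssb-tip-tutorials-data-produkt-prod/metstat/inndata/temp/pre-inndata/frost
```

Note that `pre_inndata_dir` interpolates `product_root_dir`, which is itself an `@format` value; Dynaconf resolves these lazily, so the nesting works.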
915 changes: 455 additions & 460 deletions poetry.lock

Large diffs are not rendered by default.

18 changes: 10 additions & 8 deletions src/functions/config.py

@@ -4,7 +4,7 @@
 them available in the settings variable.

 The default config environment is working with buckets on Dapla. You can change the
-environment py setting the env variable to `daplalab_files` or `local_files`.
+environment by setting the env variable to `daplalab_files` or `local_files`.
 See the file `config/settings.toml` for details.
 """
@@ -38,16 +38,10 @@ def absolute_path(relative_path: str) -> Path:
     environments=True,
     env="default",  # Change this to switch environment: daplalab_files or local_files
     validators=[
-        Validator(
-            "dapla_team",
-            "collect_from_date",
-            "collect_to_date",
-            "weather_station_names",
-            must_exist=True,
-        ),
         Validator(
             "kildedata_root_dir",
             "product_root_dir",
+            "pre_inndata_dir",
             "weather_stations_kildedata_file",
             must_exist=True,
             cast=Path,
@@ -56,10 +50,18 @@
         Validator(
             "kildedata_root_dir",
             "product_root_dir",
+            "pre_inndata_dir",
             "weather_stations_kildedata_file",
             must_exist=True,
             cast=absolute_path,
             env="local_files",
         ),
+        Validator(
+            "dapla_team",
+            "collect_from_date",
+            "collect_to_date",
+            "weather_station_names",
+            must_exist=True,
+        ),
     ],
 )
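The reordering above also highlights the env-scoped cast pattern: the same path keys are validated twice, cast to a plain `Path` in one environment and through the module's `absolute_path` helper for `local_files`, so the relative paths in `settings.toml` become absolute at load time. A rough standalone sketch of that mechanism, with a simplified `absolute_path` (the real helper is defined earlier in this file, and `CONFIG_DIR` is an assumption for this sketch):

```python
from pathlib import Path

from dynaconf import Dynaconf, Validator

CONFIG_DIR = Path("config")  # assumption: run from the repository root


def absolute_path(relative_path: str) -> Path:
    """Simplified stand-in for the helper defined earlier in config.py."""
    return (CONFIG_DIR / relative_path).resolve()


settings = Dynaconf(
    settings_files=[str(CONFIG_DIR / "settings.toml")],
    environments=True,
    env="local_files",  # the environment whose paths are relative
    validators=[
        # Only evaluated against the local_files env; casts str -> absolute Path.
        Validator(
            "kildedata_root_dir",
            must_exist=True,
            cast=absolute_path,
            env="local_files",
        ),
    ],
)

print(settings.kildedata_root_dir)
# e.g. /home/user/repo/data/metstat/kildedata/frost
```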
73 changes: 73 additions & 0 deletions src/functions/file_abstraction.py

@@ -3,6 +3,8 @@
 from typing import Any
 from typing import cast

+import dapla as dp
+import pandas as pd
 from dapla import FileClient


@@ -50,6 +52,49 @@ def read_json_file(filepath: Path | str) -> list[dict[str, Any]]:
     return cast(list[dict[str, Any]], json.load(file))


+def write_parquet_file(filepath: Path | str, df: pd.DataFrame) -> None:
+    """Write a dataframe to a parquet file stored in a GCS bucket or in a local file system.
+
+    Args:
+        filepath: The path to the file where the data should be written.
+            Use the `pathlib.Path` type if it is a file on a file system.
+            Use the `str` type if it is a file stored in a GCS bucket.
+        df: The dataframe to be written to the file.
+
+    Raises:
+        TypeError: If the `filepath` is not of type `Path` or `str`.
+    """
+    _validate_filepath(filepath)
+    if isinstance(filepath, Path):
+        df.to_parquet(filepath)
+    elif isinstance(filepath, str):
+        dp.write_pandas(df=df, gcs_path=filepath)
+
+
+def read_parquet_file(filepath: Path | str) -> pd.DataFrame:
+    """Read a parquet file stored in a GCS bucket or in a local file system into a dataframe.
+
+    Args:
+        filepath: The path to the file which should be read.
+            Use the `pathlib.Path` type if it is a file on a file system.
+            Use the `str` type if it is a file stored in a GCS bucket.
+
+    Returns:
+        The content of the parquet file.
+
+    Raises:
+        TypeError: If the `filepath` is not of type `Path` or `str`.
+    """
+    _validate_filepath(filepath)
+    if isinstance(filepath, Path):
+        return pd.read_parquet(filepath)
+    elif isinstance(filepath, str):
+        result = dp.read_pandas(gcs_path=filepath)
+        if not isinstance(result, pd.DataFrame):
+            raise TypeError("Expected a pandas DataFrame but got a different type")
+        return result
+
+
 def add_filename_to_path(filepath: Path | str, filename: str) -> Path | str:
     """Add a filename to a filepath, handling both filepath as Path and str.
@@ -72,6 +117,34 @@ def add_filename_to_path(filepath: Path | str, filename: str) -> Path | str:
     return f"{filepath}/{filename}"


+def create_dir_if_not_exist(directory: Path | str) -> None:
+    """Create directory if it does not exist, handling both directory as Path and str.
+
+    The function handles the case on DaplaLab where the first two levels of the
+    directory path are read-only, for example `/buckets/produkt`.
+
+    If the type is `str`, meaning it represents a path in a GCS bucket, there is
+    nothing to do, since directories do not exist in a bucket.
+
+    Args:
+        directory: The directory to check or create.
+            Use the `pathlib.Path` type if it is a directory on a file system.
+            Use the `str` type if it is a path in a GCS bucket.
+    """
+    if isinstance(directory, Path):
+        if str(directory).startswith("/buckets"):
+            parts = directory.parts
+            if len(parts) < 3:
+                raise ValueError("The provided path must have at least three levels.")
+
+            # Construct the writable path starting from the third level
+            writable_path = Path(*parts[:2]) / Path(*parts[2:])
+            if not writable_path.exists():
+                writable_path.mkdir(parents=True, exist_ok=True)
+        else:
+            directory.mkdir(parents=True, exist_ok=True)
+
+
+def _validate_filepath(filepath: Path | str) -> None:
+    if not isinstance(filepath, Path | str):
+        raise TypeError("Expected filepath to be of type Path or str.")
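Together these helpers keep the calling code storage-agnostic: a `pathlib.Path` routes to the local (or DaplaLab-mounted) file system, while a plain `str` is treated as a GCS object path and handled through the `dapla` toolbelt. A usage sketch under that convention (the bucket URL and dataframe contents are illustrative only):

```python
from pathlib import Path

import pandas as pd

from functions.file_abstraction import (
    create_dir_if_not_exist,
    read_parquet_file,
    write_parquet_file,
)

df = pd.DataFrame({"station": ["OSLO - BLINDERN"], "temp_c": [3.2]})  # dummy data

# Path -> local file system: the directory may need to be created first.
local_dir = Path("../data/metstat")
create_dir_if_not_exist(local_dir)
write_parquet_file(local_dir / "weather.parquet", df)
print(read_parquet_file(local_dir / "weather.parquet"))

# str -> GCS bucket: nothing to create, dapla handles the object path.
gcs_file = "gs://ssb-tip-tutorials-data-produkt-prod/metstat/weather.parquet"
write_parquet_file(gcs_file, df)
```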
36 changes: 4 additions & 32 deletions src/notebooks/collect_data.py

@@ -5,7 +5,6 @@
 # FROST_CLIENT_ID="5dc4-mange-nummer-e71cc"

 import os
-from pathlib import Path
 from typing import Any
 from typing import cast

@@ -14,6 +13,7 @@

 from functions.config import settings
 from functions.file_abstraction import add_filename_to_path
+from functions.file_abstraction import create_dir_if_not_exist
 from functions.file_abstraction import read_json_file
 from functions.file_abstraction import write_json_file
 from functions.versions import get_latest_file_version
@@ -172,36 +172,8 @@ def get_weather_stations_ids(
     return [name_to_id[name] for name in weather_stations_names]


-def create_dir_if_not_exist(directory: Path | str) -> None:
-    """Create directory if it does not exist, handling both directory as Path and str.
-
-    The function handles the case on DaplaLab where the first two levels of the
-    directory path are read-only, for example `/buckets/produkt`.
-
-    If the type is `str`, meaning it represents a path in a GCS bucket, there is
-    nothing to do, since directories do not exist in a bucket.
-
-    Args:
-        directory: The directory to check or create.
-            Use the `pathlib.Path` type if it is a directory on a file system.
-            Use the `str` type if it is a path in a GCS bucket.
-    """
-    if isinstance(directory, Path):
-        if str(directory).startswith("/buckets"):
-            parts = directory.parts
-            if len(parts) < 3:
-                raise ValueError("The provided path must have at least three levels.")
-
-            # Construct the writable path starting from the third level
-            writable_path = Path(*parts[:2]) / Path(*parts[2:])
-            if not writable_path.exists():
-                writable_path.mkdir(parents=True, exist_ok=True)
-        else:
-            directory.mkdir(parents=True, exist_ok=True)
-
-
-def run() -> None:
-    """Run functions in this module."""
+def run_all() -> None:
+    """Run the code in this module."""
     create_dir_if_not_exist(settings.kildedata_root_dir)

     print("Start collecting data.")
@@ -213,4 +185,4 @@


 if __name__ == "__main__":
-    run()
+    run_all()
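With `create_dir_if_not_exist` moved into `file_abstraction`, the renamed `run_all` is the whole entry point. A hypothetical way to drive it from a notebook or another script, assuming `FROST_CLIENT_ID` is set as described in the comment at the top of the file and that `src/` is on `sys.path`:

```python
import os

# Placeholder credential; the real client id comes from frost.met.no.
os.environ.setdefault("FROST_CLIENT_ID", "your-frost-client-id")

from notebooks.collect_data import run_all

run_all()  # ensures kildedata_root_dir exists, then collects and stores the data
```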