Skip to content

Commit

Permalink
Update grsofun_collect to use LON in filenames
Browse files Browse the repository at this point in the history
  • Loading branch information
fabern committed Aug 23, 2024
1 parent 5cb0625 commit 1a74733
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 49 deletions.
2 changes: 1 addition & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

export(grsofun)
export(grsofun_collect)
export(grsofun_collect_byilon)
export(grsofun_collect_byLON)
export(grsofun_run)
export(grsofun_run_byLON)
export(grsofun_tidy)
Expand Down
86 changes: 38 additions & 48 deletions R/grsofun_collect.R
Original file line number Diff line number Diff line change
@@ -1,36 +1,35 @@

# Reads daily output and aggregates temporally. Returning the data is optional.
# By default, the aggregated data is written to tidy files by longitudinal bands.
#' Reads daily output and aggregates temporally. Returning the data is optional.
#' By default, the aggregated data is written to tidy files by longitudinal bands.
#'
#' @export
#'
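#' @param settings List of run settings. Fields read in this file include
#' \code{ncores_max}, \code{dir_out}, \code{fileprefix} and \code{save}.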
#' @param list_of_LON_str Longitudinal bands to process, given as a character
#' vector of band labels as used in the file names,
#' e.g. c("LON_+046.250", "LON_+046.750")
#' @param return_data Flag whether to return loaded data in the R session. If FALSE
#' only RDS files are written to disk, if TRUE RDS files are
#' written to disk AND the data.frame is returned.
#'
#' @export
grsofun_collect <- function(
settings,
list_of_LON_str, # list_of_LON_str = c("LON_+046.250", "LON_+046.750")
return_data = FALSE
){

if (settings$ncores_max == 1){

# un-parallel alternative
len_ilon <- settings$grid$len_ilon
df <- dplyr::tibble(ilon = seq(len_ilon)) |>
df <- dplyr::tibble(LON_str = list_of_LON_str) |>
dplyr::mutate(out = purrr::map(
ilon,
~grsofun_collect_byilon(
LON_str,
~grsofun_collect_byLON(
.,
settings,
return_data = return_data
))
)

if (return_data){
df <- df |>
dplyr::mutate(len = purrr::map_int(out, ~nrow(.))) |>
dplyr::filter(len > 0) |>
dplyr::select(-len) |>
tidyr::unnest(out)

}


} else {
# Parallelise by longitudinal bands on multiple cores of a single node
# number of cores to use for this thread
Expand All @@ -51,35 +50,35 @@ grsofun_collect <- function(
"here"
)) |>
multidplyr::cluster_assign(
grsofun_collect_byilon = grsofun_collect_byilon # make the function known for each core
grsofun_collect_byLON = grsofun_collect_byLON # make the function known for each core
)

# distribute computation across the cores, calculating for all longitudinal
# indices of this chunk
len_ilon <- settings$grid$len_ilon
df <- dplyr::tibble(ilon = seq(len_ilon)) |>
df <- dplyr::tibble(LON_str = list_of_LON_str) |>
multidplyr::partition(cl) |>
dplyr::mutate(out = purrr::map(
ilon,
~grsofun_collect_byilon(
LON_str,
~grsofun_collect_byLON(
.,
settings,
return_data = return_data
))
)

if (return_data){
df <- df |>
dplyr::collect() |>
dplyr::mutate(len = purrr::map_int(out, ~nrow(.))) |>
dplyr::filter(len > 0) |>
dplyr::select(-len) |>
tidyr::unnest(out)
}
) |>
dplyr::collect()

}

if (return_data){
df <- df |>
# filter out empty outputs
dplyr::mutate(len = purrr::map_int(out, ~nrow(.))) |>
dplyr::filter(len > 0) |>
dplyr::select(-len) |>
# unnest
tidyr::unnest(out) |>
dplyr::select(-LON_str)

return(df)
} else {
return(NULL)
Expand All @@ -88,13 +87,13 @@ grsofun_collect <- function(


#' @export
grsofun_collect_byilon <- function(
ilon,
grsofun_collect_byLON <- function(
LON_string, # e.g LON_string = "LON_+047.750"
settings,
return_data = FALSE
){

outpath <- paste0(settings$dir_out, settings$fileprefix, "_ilon_", ilon, ".rds")
outpath <- paste0(settings$dir_out, settings$fileprefix, "_", LON_string, ".rds")
message(paste("Reading file", outpath, "..."))
ddf <- readr::read_rds(file = outpath)

Expand All @@ -107,25 +106,16 @@ grsofun_collect_byilon <- function(
if (nrow(ddf) > 0){
# aggregate temporally to monthly means
vars <- names(settings$save[settings$save == "mon"])
vars <- names(purrr::keep(settings$save, ~(.x == "mon")))
mdf <- ddf |>
tidyr::unnest(data) |>
dplyr::mutate(
year = lubridate::year(date),
month = lubridate::month(date)
) |>
dplyr::group_by(sitename, year, month) |>
dplyr::group_by(sitename, site_info, year, month) |>
dplyr::summarise(dplyr::across(dplyr::all_of(vars), \(x) mean(x, na.rm = TRUE)), .groups = "drop") |>

# add lon and lat to data frame
dplyr::left_join(
ddf |>
dplyr::select(sitename, site_info) |>
tidyr::unnest(site_info) |>
dplyr::select(sitename, lon, lat),
by = "sitename"
)

group_by(sitename, site_info) |> tidyr::nest(.key = "monthly_data")
} else {
mdf <- dplyr::tibble()
}
Expand All @@ -134,7 +124,7 @@ grsofun_collect_byilon <- function(
return(mdf)
} else {
# write to file
outpath <- paste0(settings$dir_out, settings$fileprefix, "_mon_ilon_", ilon, ".rds")
outpath <- paste0(settings$dir_out, settings$fileprefix, "_mon_", LON_string, ".rds")
message(paste("Writing file", outpath, "..."))
readr::write_rds(file = outpath)
return(NULL)
Expand Down
23 changes: 23 additions & 0 deletions man/grsofun_collect.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 1a74733

Please sign in to comment.