From 3c501c026e622c4a2c361837b9bac9f853f2ec50 Mon Sep 17 00:00:00 2001 From: Michael Fitzgerald Date: Mon, 23 Dec 2024 08:33:55 +1100 Subject: [PATCH] better memory management for huge datasets --- astrosource/analyse.py | 33 ++++++++++++++++++++++++++++----- astrosource/comparison.py | 7 ++++--- astrosource/periodic.py | 2 +- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/astrosource/analyse.py b/astrosource/analyse.py index a317a02..a848b78 100644 --- a/astrosource/analyse.py +++ b/astrosource/analyse.py @@ -13,7 +13,7 @@ from tqdm import tqdm #import traceback import logging -from multiprocessing import Pool, cpu_count +from multiprocessing import Pool, cpu_count, shared_memory import traceback #from astrosource.utils import photometry_files_to_array, AstrosourceException from astrosource.utils import AstrosourceException @@ -25,7 +25,7 @@ matplotlib.use('Agg') import matplotlib.pyplot as plt import matplotlib.colors as colors - +from functools import partial logger = logging.getLogger('astrosource') @@ -58,7 +58,14 @@ def get_total_counts(photFileArray, compFile, loopLength, photCoords): #logger.debug(allCountsArray) return allCountsArray -def process_varsearch_target(target, photFileArray, allCountsArray, matchRadius, minimumNoOfObs): +#def process_varsearch_target(target, photFileArray, allCountsArray, matchRadius, minimumNoOfObs): +def process_varsearch_target(target, photFileArray_shape, photFileArray_dtype, shm_name, allCountsArray, matchRadius, minimumNoOfObs): + + # Attach to the shared memory for photFileArray + existing_shm = shared_memory.SharedMemory(name=shm_name) + photFileArray = np.ndarray(photFileArray_shape, dtype=photFileArray_dtype, buffer=existing_shm.buf) + + diffMagHolder = [] for allcountscount, photFile in enumerate(photFileArray): @@ -84,6 +91,8 @@ def process_varsearch_target(target, photFileArray, allCountsArray, matchRadius, if diffMagHolder.size == sizeBefore: break + existing_shm.close() + # Append to output if sufficient observations are available if diffMagHolder.size > minimumNoOfObs: return [target[0], target[1], np.median(diffMagHolder), np.std(diffMagHolder), diffMagHolder.size] @@ -91,11 +100,21 @@ def process_varsearch_target(target, photFileArray, allCountsArray, matchRadius, def process_varsearch_targets_multiprocessing(targetFile, photFileArray, allCountsArray, matchRadius, minimumNoOfObs): + # Create shared memory for photFileArray + # As this can lead to gigantic RAM use for large datasets + shm = shared_memory.SharedMemory(create=True, size=photFileArray.nbytes) + shared_photFileArray = np.ndarray(photFileArray.shape, dtype=photFileArray.dtype, buffer=shm.buf) + shared_photFileArray[:] = photFileArray[:] + + # Partial function to pass shared arguments - from functools import partial + worker = partial( process_varsearch_target, - photFileArray=photFileArray, + #photFileArray=photFileArray, + photFileArray_shape=photFileArray.shape, + photFileArray_dtype=photFileArray.dtype, + shm_name=shm.name, allCountsArray=allCountsArray, matchRadius=matchRadius, minimumNoOfObs=minimumNoOfObs, @@ -105,6 +124,10 @@ def process_varsearch_targets_multiprocessing(targetFile, photFileArray, allCoun with Pool(processes=max([cpu_count()-1,1])) as pool: results = pool.map(worker, targetFile) + # Clean up shared memory + shm.close() + shm.unlink() + # Filter out None results return [res for res in results if res is not None] diff --git a/astrosource/comparison.py b/astrosource/comparison.py index 813e9f7..a0ba96f 100644 --- a/astrosource/comparison.py +++ b/astrosource/comparison.py @@ -338,9 +338,10 @@ def ensemble_comparisons(photFileArray, compFile, parentPath, photSkyCoord): # return comp_diff_mag, instr_mag def process_phot_file_for_variation(args): - q, matchCoord, photSkyCoord, photFileArray, fileCount = args + #q, matchCoord, photSkyCoord, photFileArray, fileCount = args + q, matchCoord, photSkyCoord, photFile, fileCount = args idx, _, _ = matchCoord.match_to_catalog_sky(photSkyCoord[q]) - photFile = photFileArray[q] + #photFile = photFileArray[q] compDiffMags = 2.5 * np.log10(photFile[idx, 4] / fileCount[q]) instrMags = -2.5 * np.log10(photFile[idx, 4]) return compDiffMags, instrMags @@ -416,7 +417,7 @@ def calculate_comparison_variation(compFile, photFileArray, fileCount, parentPat # Case for single comparison star matchCoord = SkyCoord(ra=compFile[0] * degree, dec=compFile[1] * degree) - args = [(q, matchCoord, photSkyCoord, photFileArray, fileCount) for q in range(len(photFileArray))] + args = [(q, matchCoord, photSkyCoord, photFileArray[q], fileCount) for q in range(len(photFileArray))] with Pool(processes=max([cpu_count()-1,1])) as pool: results = pool.map(process_phot_file_for_variation, args) diff --git a/astrosource/periodic.py b/astrosource/periodic.py index 96e4801..54268c4 100644 --- a/astrosource/periodic.py +++ b/astrosource/periodic.py @@ -19,7 +19,7 @@ from astropy.timeseries import LombScargle logger = logging.getLogger('astrosource') -NCPUS = 1 +#NCPUS = 1 # Note that the functions that calculate the ANOVA periodograms have been adapted from the astrobase codeset # These are aov_theta, resort_by_time, get_frequency_grid, sigclip_magseries, phase_magseries, aov_periodfind, phase_magseries_with_errs, aovhm_theta, aovhm_periodfind