Skip to content

Commit

Permalink
better memory management for huge datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
mfitzasp committed Dec 22, 2024
1 parent 6506490 commit 3c501c0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 9 deletions.
33 changes: 28 additions & 5 deletions astrosource/analyse.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from tqdm import tqdm
#import traceback
import logging
from multiprocessing import Pool, cpu_count
from multiprocessing import Pool, cpu_count, shared_memory
import traceback
#from astrosource.utils import photometry_files_to_array, AstrosourceException
from astrosource.utils import AstrosourceException
Expand All @@ -25,7 +25,7 @@
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.colors as colors

from functools import partial
logger = logging.getLogger('astrosource')


Expand Down Expand Up @@ -58,7 +58,14 @@ def get_total_counts(photFileArray, compFile, loopLength, photCoords):
#logger.debug(allCountsArray)
return allCountsArray

def process_varsearch_target(target, photFileArray, allCountsArray, matchRadius, minimumNoOfObs):
#def process_varsearch_target(target, photFileArray, allCountsArray, matchRadius, minimumNoOfObs):
def process_varsearch_target(target, photFileArray_shape, photFileArray_dtype, shm_name, allCountsArray, matchRadius, minimumNoOfObs):

# Attach to the shared memory for photFileArray
existing_shm = shared_memory.SharedMemory(name=shm_name)
photFileArray = np.ndarray(photFileArray_shape, dtype=photFileArray_dtype, buffer=existing_shm.buf)


diffMagHolder = []

for allcountscount, photFile in enumerate(photFileArray):
Expand All @@ -84,18 +91,30 @@ def process_varsearch_target(target, photFileArray, allCountsArray, matchRadius,
if diffMagHolder.size == sizeBefore:
break

existing_shm.close()

# Append to output if sufficient observations are available
if diffMagHolder.size > minimumNoOfObs:
return [target[0], target[1], np.median(diffMagHolder), np.std(diffMagHolder), diffMagHolder.size]
return None


def process_varsearch_targets_multiprocessing(targetFile, photFileArray, allCountsArray, matchRadius, minimumNoOfObs):
# Create shared memory for photFileArray
# As this can lead to gigantic RAM use for large datasets
shm = shared_memory.SharedMemory(create=True, size=photFileArray.nbytes)
shared_photFileArray = np.ndarray(photFileArray.shape, dtype=photFileArray.dtype, buffer=shm.buf)
shared_photFileArray[:] = photFileArray[:]


# Partial function to pass shared arguments
from functools import partial

worker = partial(
process_varsearch_target,
photFileArray=photFileArray,
#photFileArray=photFileArray,
photFileArray_shape=photFileArray.shape,
photFileArray_dtype=photFileArray.dtype,
shm_name=shm.name,
allCountsArray=allCountsArray,
matchRadius=matchRadius,
minimumNoOfObs=minimumNoOfObs,
Expand All @@ -105,6 +124,10 @@ def process_varsearch_targets_multiprocessing(targetFile, photFileArray, allCoun
with Pool(processes=max([cpu_count()-1,1])) as pool:
results = pool.map(worker, targetFile)

# Clean up shared memory
shm.close()
shm.unlink()

# Filter out None results
return [res for res in results if res is not None]

Expand Down
7 changes: 4 additions & 3 deletions astrosource/comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,10 @@ def ensemble_comparisons(photFileArray, compFile, parentPath, photSkyCoord):
# return comp_diff_mag, instr_mag

def process_phot_file_for_variation(args):
q, matchCoord, photSkyCoord, photFileArray, fileCount = args
#q, matchCoord, photSkyCoord, photFileArray, fileCount = args
q, matchCoord, photSkyCoord, photFile, fileCount = args
idx, _, _ = matchCoord.match_to_catalog_sky(photSkyCoord[q])
photFile = photFileArray[q]
#photFile = photFileArray[q]
compDiffMags = 2.5 * np.log10(photFile[idx, 4] / fileCount[q])
instrMags = -2.5 * np.log10(photFile[idx, 4])
return compDiffMags, instrMags
Expand Down Expand Up @@ -416,7 +417,7 @@ def calculate_comparison_variation(compFile, photFileArray, fileCount, parentPat
# Case for single comparison star
matchCoord = SkyCoord(ra=compFile[0] * degree, dec=compFile[1] * degree)

args = [(q, matchCoord, photSkyCoord, photFileArray, fileCount) for q in range(len(photFileArray))]
args = [(q, matchCoord, photSkyCoord, photFileArray[q], fileCount) for q in range(len(photFileArray))]

with Pool(processes=max([cpu_count()-1,1])) as pool:
results = pool.map(process_phot_file_for_variation, args)
Expand Down
2 changes: 1 addition & 1 deletion astrosource/periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from astropy.timeseries import LombScargle

logger = logging.getLogger('astrosource')
NCPUS = 1
#NCPUS = 1

# Note that the functions that calculate the ANOVA periodograms have been adapted from the astrobase codeset
# These are aov_theta, resort_by_time, get_frequency_grid, sigclip_magseries, phase_magseries, aov_periodfind, phase_magseries_with_errs, aovhm_theta, aovhm_periodfind
Expand Down

0 comments on commit 3c501c0

Please sign in to comment.