Skip to content

Commit

Permalink
Merge pull request #97 from AllenNeuralDynamics/han_fix_docDB_query
Browse files Browse the repository at this point in the history
fix (docDB): fetching dynamic foraging sessions by software name
  • Loading branch information
hanhou authored Nov 28, 2024
2 parents 37b259c + 5ffa67b commit dae1773
Showing 1 changed file with 36 additions and 27 deletions.
63 changes: 36 additions & 27 deletions code/util/fetch_data_docDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
@st.cache_data(ttl=3600*12) # Cache the df_docDB up to 12 hours
def load_data_from_docDB():
client = load_client()
df = fetch_fip_data(client)
df = fetch_dynamic_foraging_data(client)
return df

@st.cache_resource
Expand All @@ -39,14 +39,14 @@ def fetch_individual_procedures(r):
else: # pre Surgery
sub_procs = (r.get('procedures') or {}).get('subject_procedures') or {}
yield from sub_procs

def fetch_fiber_probes(r):
probes = []
for sp in fetch_individual_procedures(r):
if sp['procedure_type'] == 'Fiber implant':
if sp.get('procedure_type') == 'Fiber implant':
probes += sp['probes']
return probes

def fetch_injections(r):
injections=[]
for sp in fetch_individual_procedures(r):
Expand All @@ -60,7 +60,7 @@ def fetch_injections(r):
})

return injections


def get_viruses(injections):
virus_names = []
Expand All @@ -86,7 +86,7 @@ def get_viruses(injections):
else:
NM_recorded.append(NM)
return virus_names, NM_recorded


def strip_dict_for_id(co_asset_id_dict_list):
result_list = []
Expand All @@ -109,32 +109,32 @@ def strip_dict_for_id(co_asset_id_dict_list):

return result_list

def fetch_fip_data(client):
# search for records that have the "fib" (for fiber photometry) modality in data_description
logger.warning("fetching 'fib' records...")
modality_results = client.retrieve_docdb_records(
filter_query={"data_description.modality.abbreviation": "fib"},
def fetch_dynamic_foraging_data(client):
# To compare the new FIP pipeline with my temporary pipeline for all dynamic foraging sessions,
# let's directly query the software name
logger.warning("fetching 'dynamic foraging' in software name...")
software_name_results = client.retrieve_docdb_records(
filter_query={"session.data_streams.software.name": "dynamic-foraging-task",
"name": {"$not": {"$regex": ".*processed.*"}}, # only raw data
},
paginate_batch_size=500
)
logger.warning(f"found {len(modality_results)} results")
logger.warning(f"found {len(software_name_results)} results")

# there are more from the past that didn't specify modality correctly.
# until this is fixed, need to guess by asset name
logger.warning("fetching FIP records by name...")
name_results = client.retrieve_docdb_records(
name_FIP_results = client.retrieve_docdb_records(
filter_query={"name": {"$regex": "^FIP.*"}},
paginate_batch_size=500
)
logger.warning(f"found {len(name_results)} results")
logger.warning(f"found {len(name_FIP_results)} results")

# in case there is overlap between these two queries, filter down to a single list with unique IDs
unique_results_by_id = {**{ r['_id']: r for r in modality_results }, **{ r['_id']: r for r in name_results }}
unique_results_by_id = {**{ r['_id']: r for r in software_name_results }, **{ r['_id']: r for r in name_FIP_results }}
results = list(unique_results_by_id.values())
logger.warning(f"found {len(results)} unique results")

# filter out results with 'processed' in the name because I can't rely on data_description.data_level :(
results = [ r for r in results if not 'processed' in r['name'] ]


# make a dataframe
records_df = pd.DataFrame.from_records([map_record_to_dict(d) for d in results ])

Expand Down Expand Up @@ -166,25 +166,35 @@ def fetch_fip_data(client):
return records_df




def map_record_to_dict(record):
""" function to map a metadata dictionary to a simpler dictionary with the fields we care about """
dd = record.get('data_description', {}) or {}
co_data_asset_id_raw = record.get('external_links')
creation_time = dd.get('creation_time', '') or ''
subject = record.get('subject', {}) or {}
subject_id = subject.get('subject_id') or ''
subject_genotype = subject.get('genotype') or ''
session = record.get('session') or {}
task_type = session.get('session_type') or ''
subject_genotype = subject.get("genotype") or ""
session = record.get("session") or {}
task_type = session.get("session_type") or ""

# -- Check whether FIP exists
# per this issue, https://github.com/AllenNeuralDynamics/dynamic-foraging-task/issues/1056
# here I check whether fib exists in the data streams
data_streams = session.get("data_streams") or []
has_fib_in_data_streams = any(
[
"fib" in (mod.get("abbreviation") or "")
for ds in (session.get("data_streams") or [])
for mod in (ds.get("stream_modalities") or [])
]
)

try:
injections = fetch_injections(record)
virus_names, NM_recorded = get_viruses(injections)
except:
injections, virus_names, NM_recorded = [], [], []

return {
'session_loc': record['location'],
'session_name': record['name'],
Expand All @@ -196,6 +206,7 @@ def map_record_to_dict(record):
'injections': str(injections),
'task_type': task_type,
'virus':virus_names,
'has_fib_in_data_streams': has_fib_in_data_streams,
# 'NM_recorded':NM_recorded

}
Expand All @@ -206,5 +217,3 @@ def find_result(x, lookup):
if result_name.startswith(x):
return result
return {}


0 comments on commit dae1773

Please sign in to comment.