Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DIG-1778: Improve timing of query calls #52

Merged
merged 5 commits into from
Sep 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 50 additions & 69 deletions query_server/query_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ def add_or_increment(dict, key):
else:
dict[key] = 1

def get_summary_stats(donors, primary_diagnoses, headers):
def get_summary_stats(donors, primary_sites, treatments):
# Perform (and cache) summary statistics
age_at_diagnosis = {}
donors_by_id = {}
primary_site_count = {}
patients_per_cohort = {}
treatment_type_count = {}
for donor in donors:
# A donor's date of birth is defined as the (negative) interval between actual DOB and the date of first diagnosis
# So we just use that info
Expand All @@ -97,35 +98,19 @@ def get_summary_stats(donors, primary_diagnoses, headers):
add_or_increment(age_at_diagnosis, f'{age}-{age+9} Years')

program_id = donor['program_id']
if program_id in patients_per_cohort:
patients_per_cohort[program_id] += 1
elif program_id is not None:
patients_per_cohort[program_id] = 1

# primary sites
for primary_diagnosis in primary_diagnoses:
primary_site = primary_diagnosis['primary_site']
if primary_site in primary_site_count:
primary_site_count[primary_site] += 1
else:
primary_site_count[primary_site] = 1

# Treatment types
# http://candig.docker.internal:8008/v3/authorized/treatments/
treatments = requests.get(f"{config.KATSU_URL}/v3/authorized/treatments/?page_size={PAGE_SIZE}",
headers=headers)
treatments = safe_get_request_json(treatments, 'Katsu treatments')['items']
treatment_type_count = {}
for treatment in treatments:
if (treatment["submitter_donor_id"] in donors_by_id and
"treatment_type" in treatment and
treatment["treatment_type"] is not None):
try:
for treatment_type in treatment["treatment_type"]:
add_or_increment(treatment_type_count, treatment_type)
except TypeError as e:
logger.error(f"Could not grab summary treatment stats: {e}")
pass
add_or_increment(patients_per_cohort, program_id)

# primary sites
if donor['submitter_donor_id'] in primary_sites:
if primary_sites[donor['submitter_donor_id']] is not None:
for primary_site in primary_sites[donor['submitter_donor_id']]:
add_or_increment(primary_site_count, str(primary_site))
else:
add_or_increment(primary_site_count, 'None')

if donor['submitter_donor_id'] in treatments and treatments[donor['submitter_donor_id']] is not None:
for treatment_type in treatments[donor['submitter_donor_id']]:
add_or_increment(treatment_type_count, treatment_type)

return {
'age_at_diagnosis': age_at_diagnosis,
Expand Down Expand Up @@ -226,44 +211,44 @@ def query(treatment="", primary_site="", drug_name="", systemic_therapy_type="",
# We're grabbing (and storing in memory) all the donor data in Katsu with the below request

# Query the appropriate Katsu endpoint
params = { 'page_size': PAGE_SIZE }
url = f"{config.KATSU_URL}/v3/authorized/donors/"
r = safe_get_request_json(requests.get(f"{url}?{urllib.parse.urlencode(params, True)}",
# Reuse their bearer token
headers=headers), 'Katsu Donors')
donors = r['items']
url = f"{config.KATSU_URL}/v3/authorized/query/"
headers = {}
for k in request.headers.keys():
headers[k] = request.headers[k]
headers["X-Service-Token"] = config.SERVICE_TOKEN

param_mapping = [
(treatment, "treatment_type"),
(primary_site, "primary_site"),
(drug_name, "systemic_therapy_drug_name"),
(systemic_therapy_type, "systemic_therapy_type"),
(exclude_cohorts, "exclude_cohorts")
]
params = {}
for param in param_mapping:
if param[0] == "" or param[0] == []:
continue
params[param[1]] = param[0]

full_url = f"{url}?{urllib.parse.urlencode(params, doseq=True)}"
donors = safe_get_request_json(requests.get(full_url, headers=headers), 'Katsu explorer donors')['items']

# Filter on excluded cohorts
donors = [donor for donor in donors if donor['program_id'] not in exclude_cohorts]

# Will need to look into how to go about this -- ideally we implement this in the SQL on Katsu's side
filters = [
(treatment, f"{config.KATSU_URL}/v3/authorized/treatments/", 'treatment_type', None)
]
if type(systemic_therapy_type) is list:
for this_type in systemic_therapy_type:
filters.append((drug_name, f"{config.KATSU_URL}/v3/authorized/systemic_therapies/", 'drug_name', this_type))
else:
filters.append((drug_name, f"{config.KATSU_URL}/v3/authorized/systemic_therapies/", 'drug_name', None))

for (this_filter, url, param_name, therapy_type) in filters:
if this_filter != "":
permissible_donors, _ = get_donors_from_katsu(
url,
param_name,
this_filter,
headers,
therapy_type
)
donors = [donor for donor in donors if donor['submitter_donor_id'] in permissible_donors]
permissible_donors, primary_diagnoses = get_donors_from_katsu(
f"{config.KATSU_URL}/v3/authorized/primary_diagnoses/",
'primary_site',
primary_site if primary_site != "" else [],
headers,
keep_all=True
)
donors = [donor for donor in donors if donor['submitter_donor_id'] in permissible_donors]
# Note: We get three extra things from /authorized/query that aren't part of the Donors object:
# 1) submitter_sample_ids
# 2) primary_site
# 3) treatment_type
# These are used to build the summary information without needing to re-query Katsu
# For the purposes of the return value, let's move all three of these out of the donor objects and into their own variables
summary_info = {}
summary_headers = ['submitter_sample_ids', 'primary_site', 'treatment_type']
for header in summary_headers:
summary_info[header] = {}
for donor in donors:
summary_info[header][donor['submitter_donor_id']] = donor[header]
del donor[header]

# Now we combine this with HTSGet, if any
genomic_query = []
Expand Down Expand Up @@ -327,12 +312,8 @@ def query(treatment="", primary_site="", drug_name="", systemic_therapy_type="",
except Exception as ex:
logger.error(f"Error while reading HTSGet response: {ex}")

# We also need to cut down the list of primary diagnoses based on the filtered list of donors at this point
permissible_donors = set([donor['submitter_donor_id'] for donor in donors])
primary_diagnoses = [primary_diagnosis for primary_diagnosis in primary_diagnoses if primary_diagnosis['submitter_donor_id'] in permissible_donors]

# TODO: Cache the above list of donor IDs and summary statistics
summary_stats = get_summary_stats(donors, primary_diagnoses, headers)
summary_stats = get_summary_stats(donors, summary_info['primary_site'], summary_info['treatment_type'])

# Determine which part of the filtered donors to send back
full_data = {
Expand Down