Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use more EU sites #248

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 46 additions & 17 deletions outsource/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# Per-tier CPU counts for processing jobs, read from the [Outsource]
# section of the user config (each falls back to 1 CPU).
LOWER_CPUS = uconfig.getint("Outsource", "lower_cpus", fallback=1)
COMBINE_CPUS = uconfig.getint("Outsource", "combine_cpus", fallback=1)
UPPER_CPUS = uconfig.getint("Outsource", "upper_cpus", fallback=1)
# When True, jobs whose input lives at EU RSEs are *required* to run at EU
# sites; when False, EU sites are merely given a higher HTCondor rank.
EU_SEPARATE = uconfig.getboolean("Outsource", "eu_separate", fallback=False)

MAX_MEMORY = 30_000  # in MB
MIN_DISK = 200  # in MB
Expand Down Expand Up @@ -55,21 +56,44 @@ class RunConfig:
# Data availability to site selection map.
# This puts constraints on the sites that can be used for
# processing based on the input RSE for raw_records.

# HTCondor Rank expression that prefers EU sites: the NL pool and SURFsara
# get a large bonus (999), FR and IT a smaller one (9).
eu_high_rank = " + ".join(
    f'((GLIDEIN_Site == "{site}") * {bonus})'
    for site, bonus in (("NL", 999), ("SURFsara", 999), ("FR", 9), ("IT", 9))
)

if EU_SEPARATE:
    # Keep the pipelines separate: require that these jobs run in the EU
    # only. There is no dedicated EU flag, so match on the countries.
    # NOTE(review): "requriements" is a misspelling of "requirements"; the
    # name is kept as-is because other parts of this file reference it.
    eu_separate_requriements = "({})".format(
        " || ".join(f'(GLIDEIN_Country == "{c}")' for c in ("NL", "FR", "IT"))
    )
else:
    # Keep all sites in the pool; EU sites are only ranked higher.
    eu_separate_requriements = ""

# Maps each input RSE to the constraints ("expr"), desired-site hints
# ("site"), and rank bonuses ("rank") applied to jobs reading from it.
# NOTE(review): several keys appear twice below (e.g. UC_OSG_USERDISK,
# CCIN2P3_USERDISK, SURFSARA_USERDISK, CNAF_USERDISK). Python keeps the
# *later* entry silently; the earlier ones look like stale pre-change
# entries left behind by a merge — confirm and remove the duplicates.
rse_site_map = {
    "UC_OSG_USERDISK": {"expr": 'GLIDEIN_Country == "US"'},
    "UC_DALI_USERDISK": {"expr": 'GLIDEIN_Country == "US"'},
    "UC_MIDWAY_USERDISK": {"expr": 'GLIDEIN_Country == "US"'},
    "CCIN2P3_USERDISK": {"site": "CCIN2P3", "expr": 'GLIDEIN_Site == "CCIN2P3"'},
    "CNAF_TAPE_USERDISK": {},
    "CNAF_USERDISK": {},
    "LNGS_USERDISK": {},
    "NIKHEF2_USERDISK": {"site": "NIKHEF", "expr": 'GLIDEIN_Site == "NIKHEF"'},
    "NIKHEF_USERDISK": {"site": "NIKHEF", "expr": 'GLIDEIN_Site == "NIKHEF"'},
    "SURFSARA_USERDISK": {"site": "SURFsara", "expr": 'GLIDEIN_Site == "SURFsara"'},
    "SURFSARA2_USERDISK": {"site": "SURFsara", "expr": 'GLIDEIN_Site == "SURFsara"'},
    "WEIZMANN_USERDISK": {"site": "Weizmann", "expr": 'GLIDEIN_Site == "Weizmann"'},
    "SDSC_USERDISK": {"expr": 'GLIDEIN_ResourceName == "SDSC-Expanse"'},
    "SDSC_NSDF_USERDISK": {"expr": 'GLIDEIN_Country == "US"'},
    # These are US sites
    # We only send these to US sites
    # Chicago, IL
    "UC_OSG_USERDISK": {"expr": 'GLIDEIN_Country == "US"'},  # DISK
    "UC_DALI_USERDISK": {"expr": 'GLIDEIN_Country == "US"'},  # DISK
    "UC_MIDWAY_USERDISK": {"expr": 'GLIDEIN_Country == "US"'},  # DISK
    # San Diego, CA
    "SDSC_NSDF_USERDISK": {"expr": 'GLIDEIN_Country == "US"'},  # DISK
    # These are European sites
    # Paris, FR
    "CCIN2P3_USERDISK": {"rank": eu_high_rank, "expr": eu_separate_requriements},  # TAPE
    "CCIN2P32_USERDISK": {"rank": eu_high_rank, "expr": eu_separate_requriements},  # DISK
    # Amsterdam, NL
    "NIKHEF2_USERDISK": {"rank": eu_high_rank, "expr": eu_separate_requriements},  # DISK
    "SURFSARA_USERDISK": {"rank": eu_high_rank, "expr": eu_separate_requriements},  # TAPE
    "SURFSARA2_USERDISK": {"rank": eu_high_rank, "expr": eu_separate_requriements},  # DISK
    # Bologna, IT
    "CNAF_USERDISK": {"rank": eu_high_rank, "expr": eu_separate_requriements},  # DISK
    "CNAF_TAPE3_USERDISK": {"rank": eu_high_rank, "expr": eu_separate_requriements},  # TAPE
}

chunks_per_job = uconfig.getint("Outsource", "chunks_per_job", fallback=None)
Expand Down Expand Up @@ -504,26 +528,31 @@ def _determine_target_sites(self, rses):

exprs = []
sites = []
ranks = []
for rse in rses:
if rse in self.rse_site_map:
if "expr" in self.rse_site_map[rse]:
exprs.append(self.rse_site_map[rse]["expr"])
if "site" in self.rse_site_map[rse]:
sites.append(self.rse_site_map[rse]["site"])
if "rank" in self.rse_site_map[rse]:
ranks.append(self.rse_site_map[rse]["rank"])
exprs = list(set(exprs))
sites = list(set(sites))
ranks = list(set(ranks))

# make sure we do not request XENON1T sites we do not need
if len(sites) == 0:
sites.append("NONE")

final_expr = " || ".join(exprs)
desired_sites = ", ".join(sites)
return final_expr, desired_sites
ranks = " + ".join(ranks)
return final_expr, desired_sites, ranks

def get_requirements(self, rses):
# Determine the job requirements based on the data locations
sites_expression, desired_sites = self._determine_target_sites(rses)
sites_expression, desired_sites, ranks = self._determine_target_sites(rses)
if len(rses) > 0:
requirements = self.requirements_base
else:
Expand All @@ -534,4 +563,4 @@ def get_requirements(self, rses):
if self._exclude_sites:
requirements += f" && ({self._exclude_sites})"

return desired_sites, requirements
return desired_sites, requirements, ranks
29 changes: 11 additions & 18 deletions outsource/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,8 @@ def get_rse_sites(self, dbcfg, rses, per_chunk=False):
f"{rses} and the specified raw_records_rses {raw_records_rses}"
)

desired_sites, requirements = dbcfg.get_requirements(rses)
return desired_sites, requirements
desired_sites, requirements, site_ranks = dbcfg.get_requirements(rses)
return desired_sites, requirements, site_ranks

def get_key(self, dbcfg, level):
"""Get the key for the output files and check the file name."""
Expand Down Expand Up @@ -506,7 +506,7 @@ def add_higher_processing_job(
f"Hopefully those will be created by the workflow."
)

desired_sites, requirements = self.get_rse_sites(dbcfg, rses, per_chunk=False)
desired_sites, requirements, site_ranks = self.get_rse_sites(dbcfg, rses, per_chunk=False)

# High level data.. we do it all on one job
_key = self.get_key(dbcfg, level)
Expand Down Expand Up @@ -572,7 +572,10 @@ def add_lower_processing_job(
f"No data found as the dependency of {level['data_types'].not_processed}."
)

desired_sites, requirements = self.get_rse_sites(dbcfg, rses, per_chunk=True)
desired_sites, requirements, site_ranks = self.get_rse_sites(dbcfg, rses, per_chunk=True)
desired_sites_for_us, requirements_for_us, site_ranks_us = self.get_rse_sites(
dbcfg, ["UC_OSG_USERDISK"], per_chunk=False
)

# Set up the combine job first -
# we can then add to that job inside the chunk file loop
Expand All @@ -583,20 +586,8 @@ def add_lower_processing_job(
memory=level["combine_memory"],
disk=level["combine_disk"],
)
if desired_sites:
# Give a hint to glideinWMS for the sites we want
# (mostly useful for XENON VO in Europe).
# Glideinwms is the provisioning system.
# It starts pilot jobs (glideins) at sites when you
# have idle jobs in the queue.
# Most of the jobs you run to the OSPool (Open Science Pool),
# but you do have a few sites where you have allocations at,
# and those are labeled XENON VO (Virtual Organization).
# The "+" has to be used by non-standard HTCondor attributes.
# The attribute has to have double quotes,
# otherwise HTCondor will try to evaluate it as an expression.
combine_job.add_profiles(Namespace.CONDOR, "+XENON_DESIRED_Sites", f'"{desired_sites}"')
combine_job.add_profiles(Namespace.CONDOR, "requirements", requirements)

combine_job.add_profiles(Namespace.CONDOR, "requirements", requirements_for_us)
# priority is given in the order they were submitted
combine_job.add_profiles(Namespace.CONDOR, "priority", dbcfg.priority)
combine_job.add_inputs(installsh, combinepy, xenon_config, token, *tarballs)
Expand Down Expand Up @@ -674,6 +665,8 @@ def add_lower_processing_job(
job.add_profiles(Namespace.CONDOR, "+XENON_DESIRED_Sites", f'"{desired_sites}"')
job.add_profiles(Namespace.CONDOR, "requirements", requirements)
job.add_profiles(Namespace.CONDOR, "priority", dbcfg.priority)
# This allows us to set higher priority for EU sites when we have data in EU
job.add_profiles(Namespace.CONDOR, "rank", site_ranks)

job.add_args(
dbcfg.run_id,
Expand Down