Skip to content

Commit

Permalink
Add file splitting to recover 5330 (#5367)
Browse files Browse the repository at this point in the history
* unrelated fixes

* make recover work for fileBased split. Fix #5330

* uniform naming
  • Loading branch information
belforte authored Dec 18, 2024
1 parent 5a98852 commit 8b8da2a
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 46 deletions.
152 changes: 106 additions & 46 deletions src/python/CRABClient/Commands/recover.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import tarfile
import datetime
import json

from CRABClient.Commands.SubCommand import SubCommand

Expand Down Expand Up @@ -30,8 +31,8 @@
from CRABClient.UserUtilities import getColumn
from ServerUtilities import SERVICE_INSTANCES

SPLITTING_RECOVER_LUMIBASED = set(("LumiBased", "Automatic", "EventAwareLumiBased"))
SPLITTING_RECOVER_FILEBASED = set(("FileBased"))
SPLITTING_RECOVER_LUMIBASED = set(["LumiBased", "Automatic", "EventAwareLumiBased"])
SPLITTING_RECOVER_FILEBASED = set(["FileBased"])

class recover(SubCommand):
"""
Expand Down Expand Up @@ -72,6 +73,7 @@ def __call__(self):
self.logger.debug("no need to run crab remake - self.restHostCommonname %s", self.restHostCommonname)
self.crabProjDir = self.requestarea

self.logger.info("Collecting information about original task ...")

retval = self.stepValidate()
if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
Expand All @@ -89,19 +91,36 @@ def __call__(self):
retval = self.stepExtractSandbox(retval["sandbox_paths"])
if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)

if self.failingTaskInfo["splitalgo"] in SPLITTING_RECOVER_LUMIBASED:
retval = self.stepReport()
if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
retval = self.stepReport()
if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
if "report" not in retval: return self.stepExit(retval)
report = retval["report"]

if "recoverLumimaskPath" not in retval:
return self.stepExit(retval)
self.logger.info("Prepare recovery task ...")

retval = self.stepSubmitLumiBased(retval["recoverLumimaskPath"])
retval = self.prepareSubmission()
if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
if "cmdArgs" not in retval: return self.stepExit(retval)
submitArgs = retval["cmdArgs"]

if self.failingTaskInfo["splitalgo"] in SPLITTING_RECOVER_LUMIBASED:
retval = self.stepBuildLumiRecoveryInfo(report)
if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
if "recoverLumimaskPath" not in retval: return self.stepExit(retval)
recoverLumimaskPath = retval["recoverLumimaskPath"]
submitArgs.append("Data.lumiMask={}".format(recoverLumimaskPath))

elif self.failingTaskInfo["splitalgo"] in SPLITTING_RECOVER_FILEBASED:
retval = self.stepSubmitFileBased()
retval = self.stepBuildFileRecoveryInfo()
if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)
if "filesToRecover" not in retval: return self.stepExit(retval)
filesToRecover = retval["filesToRecover"]
submitArgs.append("Data.userInputFiles={}".format(filesToRecover))

self.logger.info("Submit recovery task...")

retval = self.stepSubmit(submitArgs)
if retval["commandStatus"] != "SUCCESS": return self.stepExit(retval)

# no need for "else" here, the splitting algo should already be checked in
# stepRemakeAndValidate
Expand Down Expand Up @@ -440,7 +459,7 @@ def stepCheckKill(self):
# - [x] if all jobs failed, then exit. it is better to submit again the task than using crab recover :)
# check that "failed" is the only key of the jobsPerStatus dictionary
if set(self.failingTaskStatus["jobsPerStatus"].keys()) == set(("failed",)):
msg = "All the jobs of the original task failed. Better investigate and submit it again than recover."
msg = "All the jobs of the original task failed. Better to investigate and submit it again, rather than recover."
retval.update({"commandStatus": "NothingToDo", "msg": msg})
return retval

Expand All @@ -449,10 +468,6 @@ def stepCheckKill(self):

def stepReport(self):
"""
used to compute which lumisections have not been processed by the original task.
requires TW to have processed lumisection information for submitting the original task.
it does not support filebased splitting.
side effects:
- populates the directory "result" inside the workdir of the original failing task
with the output of crab report
Expand Down Expand Up @@ -494,23 +509,33 @@ def stepReport(self):
reportCmd = report(logger=self.logger, cmdargs=cmdargs)
with SilenceLogging(self.logger, "report") as _:
# FIXME - stays noisy because interference with getMutedStatusInfo()
retval.update(reportCmd())
retval.update({"report": reportCmd()})
self.logger.debug("stepReport() - report, after, self.configuration: %s", self.configuration)
self.logger.debug("stepReport() - report, retval: %s", retval)
retval.update({'commandStatus' : 'SUCCESS'})
return retval

def stepBuildLumiRecoveryInfo(self, report):
"""
used to compute which lumisections have not been processed by the original task.
requires TW to have processed lumisection information for submitting the original task.
returns the path of a file with the lumiMask listing the lumis to recover
"""

retval = {"step": "buildLumiRecoveryInfo"}
recoverLumimaskPath = ""
if failingTaskPublish == "T" and self.options.__dict__["strategy"] == "notPublished":
recoverLumimaskPath = os.path.join(self.crabProjDir, "results", "notPublishedLumis.json")
# print a proper error message if the original task+recovery task(s) have processed everything.
publishedAllLumis = True
for dataset, lumis in retval["outputDatasetsLumis"].items():
notPublishedLumis = BasicJobType.subtractLumis(retval["lumisToProcess"], lumis )
self.logger.debug("stepReport() - report, subtract: %s %s",
for dataset, lumis in report["outputDatasetsLumis"].items():
notPublishedLumis = BasicJobType.subtractLumis(report["lumisToProcess"], lumis )
self.logger.debug("stepBuildLumiReocoveryInfo() - report, subtract: %s %s",
dataset, notPublishedLumis)
if notPublishedLumis:
publishedAllLumis = False
if publishedAllLumis:
msg = "stepReport() - all lumis have been published in the output dataset. crab recover will exit"
msg = "stepBuildLumiReocoveryInfo() - all lumis have been published in the output dataset. crab recover will exit"
self.logger.info(msg)
retval.update({"commandStatus": "NothingToDo", "msg": msg})
return retval
Expand All @@ -523,20 +548,53 @@ def stepReport(self):
if not retval["notProcessedLumis"]:
# we will likely never reach this if, because in this case the status on the schedd
# should be COMPLETED, which is not accepted by stepCheckKill
self.logger.info("stepReport() - all lumis have been processed by original task. crab recover will exit")
self.logger.info("stepBuildLumiReocoveryInfo() - all lumis have been processed by original task. crab recover will exit")
retval.update({'commandStatus' : 'SUCCESS'})
return retval

self.logger.debug("crab report - recovery task will process lumis contained in file %s", recoverLumimaskPath)


if os.path.exists(recoverLumimaskPath):
retval.update({'commandStatus' : 'SUCCESS', 'recoverLumimaskPath': recoverLumimaskPath})
else:
msg = 'File {} does not exist. crab report could not produce it, the task can not be recovered'.format(recoverLumimaskPath)
self.logger.info(msg)
retval.update({'commandStatus' : 'FAILED', 'msg': msg})

self.logger.warning("recovery task will process lumis contained in file config.Data.lumiMask=%s",
recoverLumimaskPath)

return retval

def stepBuildFileRecoveryInfo(self):
"""
returns the list of files to be recovered
"""

retval = {"step": "buildFileRecoveryInfo"}

# files to process, processed, failed are in projectDir/results, care of report command
filesToProcessPath = os.path.join(self.crabProjDir, "results", "filesToProcess.json")
processedFilesPath = os.path.join(self.crabProjDir, "results", "processedFiles.json")
with open(filesToProcessPath, 'r') as fd:
filesToProcessDict = json.load(fd)
with open(processedFilesPath, 'r') as fd:
processedFilesDict = json.load(fd)

# turn {jobid:filelist,..} dictionaries into a simple set
filesToProcess = set()
processedFiles = set()
for job in filesToProcessDict:
filesToProcess = filesToProcess.union(filesToProcessDict[job])
for job in processedFilesDict:
processedFiles = processedFiles.union(processedFilesDict[job])

# build list of files to recover
filesToRecover = list(filesToProcess - processedFiles)
retval.update({'commandStatus' : 'SUCCESS', 'filesToRecover': filesToRecover})

self.logger.warning("recovery task will process files: %s", filesToRecover)

return retval

def stepGetsandbox(self):
Expand Down Expand Up @@ -588,17 +646,12 @@ def stepExtractSandbox(self, sandbox_paths):
retval.update({"commandStatus": "SUCCESS"})
return retval

def stepSubmitLumiBased(self, notFinishedJsonPath):
def prepareSubmission(self):
"""
Submit a recovery task in the case that the original failing task
- is of type Analysis
- used LumiBased splitting algorithm
side effect:
- submits a new task
returns common command arguments for a new submit command which will do the recover
"""

retval = {"step": "submitLumiBased"}
retval = {"step": "prepareSubmission"}

cmdargs = []
cmdargs.append("-c")
Expand All @@ -614,7 +667,6 @@ def stepSubmitLumiBased(self, notFinishedJsonPath):
# https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRAB3ConfigurationFile#Passing_CRAB_configuration_param
cmdargs.append("General.requestName=None")
cmdargs.append("General.workArea=.")
cmdargs.append("Data.lumiMask={}".format(notFinishedJsonPath))
cmdargs.append("JobType.pluginName=Recover")
cmdargs.append("JobType.copyCatTaskname={}".format(self.failingTaskName))
cmdargs.append("JobType.copyCatWorkdir={}".format(self.crabProjDir))
Expand All @@ -632,31 +684,39 @@ def stepSubmitLumiBased(self, notFinishedJsonPath):
if self.failingTaskInfo["username"] != username:
cmdargs.append("Data.outLFNDirBase=/store/user/{}".format(username))

self.logger.warning("crab submit - recovery task will process lumis contained in file config.Data.lumiMask=%s", notFinishedJsonPath)
self.logger.debug("stepSubmit() - cmdargs %s", cmdargs)
submitCmd = submit(logger=self.logger, cmdargs=cmdargs)

# with SilenceLogging(self.logger, "submit") as _:
retval.update(submitCmd())
self.logger.debug("stepSubmit() - retval %s", retval)
retval.update({"commandStatus": "SUCCESS", "cmdArgs": cmdargs})
return retval

def stepSubmitFileBased(self):
def stepSubmit(self, cmdargs):
"""
Submit a recovery task in the case that the original failing task
- is of type Analysis
- used FileBased splitting algorithm
- used FileBased splitting algorithm
what's missing?
- [ ] if the input is from DBS, then write info to runs_and_lumis.tar.gz
side effect:
- submits a new task
"""

retval = {"step": "submitFileBased"}
retval = {"step": "submit"}

self.logger.debug("stepSubmit() - cmdargs %s", cmdargs)
submitCmd = submit(logger=self.logger, cmdargs=cmdargs)
submitConfiguration = submitCmd.configuration
workArea = getattr(submitConfiguration.General, 'workArea', '.')

# submit !
submitInfo = submitCmd()

projDir = os.path.join(workArea, submitInfo['requestname'])
recoveryConfigFilePath = os.path.join(projDir,'inputs/recoveryConfig.py')
with open(recoveryConfigFilePath, 'w') as fd:
print(submitConfiguration, file=fd)
self.logger.info('Submission configuration saved in %s', recoveryConfigFilePath)

# TODO
# I will need to implement this!
raise NotImplementedError
return {'commandStatus': 'FAILED', 'error': 'not implemented yet'}
retval.update(submitInfo)

self.logger.debug("stepSubmit() - retval %s", retval)
return retval

def setOptions(self):
"""
Expand Down
6 changes: 6 additions & 0 deletions src/python/CRABClient/JobType/Recover.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ def run(self, filecacheurl = None):
configreq['lumis'] = [str(reduce(lambda x,y: x+y, lumi_mask[run]))[1:-1].replace(' ','') for run in configreq['runs']]
## RECOVER - set lumimask from crab report, not from original task - END

## RECOVER - set userInputFiles from crab report, not from original task - START
userInputFiles = getattr(self.config.Data, 'userInputFiles', None)
if userInputFiles:
configreq['userfiles'] = userInputFiles
## RECOVER - set userInputFiles from crab report, not from original task - START


# new filename
configreq['cachefilename'] = newCachefilename
Expand Down

0 comments on commit 8b8da2a

Please sign in to comment.