Skip to content

Commit

Permalink
Planner: Semaphore handling code removed
Browse files Browse the repository at this point in the history
  • Loading branch information
tangobravo62 committed Mar 14, 2024
1 parent ef2fa86 commit db33bb1
Show file tree
Hide file tree
Showing 17 changed files with 69 additions and 466 deletions.
73 changes: 0 additions & 73 deletions planner/src/main/java/de/dlr/proseo/planner/ProductionPlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,6 @@ public class ProductionPlanner implements CommandLineRunner {
@PersistenceContext
private EntityManager em;

/**
* Semaphore to avoid concurrent kubernetes job generation.
*/
private Semaphore releaseSemaphore = new Semaphore(1);

/**
* Semaphore to avoid concurrent db access of threads.
*/
private Semaphore threadSemaphore = new Semaphore(1);

/**
* Collect at planner start all orders of state SUSPENDING
*/
Expand Down Expand Up @@ -213,21 +203,6 @@ public EntityManager getEm() {
return em;
}

/**
 * Returns the semaphore that guards against concurrent Kubernetes job generation.
 *
 * @return the release semaphore
 */
public Semaphore getReleaseSemaphore() {
return this.releaseSemaphore;
}


/**
 * Returns the semaphore that guards against concurrent database access by planner threads.
 *
 * @return the thread semaphore
 */
public Semaphore getThreadSemaphore() {
return this.threadSemaphore;
}

/**
* Get the user/pw for processing order
*
Expand All @@ -238,54 +213,6 @@ public Map<String, String> getAuth(Long orderId) {
return orderPwCache.get(orderId);
}

/**
 * Acquires the release semaphore, not interruptible.
 *
 * NOTE(review): the actual acquire call is commented out below, so this method is
 * effectively a no-op apart from trace logging — it provides no mutual exclusion.
 * Callers relying on it for synchronization are unprotected.
 *
 * @param here caller identification used only for trace logging (may be null)
 */
public void acquireReleaseSemaphore(String here) {
if (logger.isTraceEnabled()) logger.trace(">>> acquireReleaseSemaphore({})", here == null ? "null" : here);
// getReleaseSemaphore().acquireUninterruptibly();
if (logger.isTraceEnabled()) logger.trace("<<< acquireReleaseSemaphore({})", here == null ? "null" : here);
}

/**
 * Releases the release semaphore.
 *
 * NOTE(review): the actual release call is commented out below, so this method only
 * emits trace output and never changes the semaphore's permit count. Because the
 * matching acquire is also disabled, availablePermits() stays at its initial value
 * and the "released" branch is normally never taken.
 *
 * @param here caller identification used only for trace logging (may be null)
 */
public void releaseReleaseSemaphore(String here) {
if (logger.isTraceEnabled()) logger.trace(">>> releaseReleaseSemaphore({})", here == null ? "null" : here);
if (getReleaseSemaphore().availablePermits() <= 0) {
if (logger.isTraceEnabled()) logger.trace(" released({})", here);
// getReleaseSemaphore().release();
} else {
if (logger.isTraceEnabled()) logger.trace(" nothing to release({})", here == null ? "null" : here);
}
}

/**
 * Acquires the thread semaphore, not interruptible.
 *
 * NOTE(review): the actual acquire call is commented out below, so this method is
 * effectively a no-op apart from trace logging — it provides no mutual exclusion
 * for database access despite its name.
 *
 * @param here caller identification used only for trace logging (may be null)
 */
public void acquireThreadSemaphore(String here) {
if (logger.isTraceEnabled()) logger.trace(">>> acquireThreadSemaphore({})", here == null ? "null" : here);
// getThreadSemaphore().acquireUninterruptibly();
if (logger.isTraceEnabled()) logger.trace("<<< acquireThreadSemaphore({})", here == null ? "null" : here);
}

/**
 * Releases the thread semaphore.
 *
 * NOTE(review): the actual release call is commented out below, so this method only
 * emits trace output and never changes the semaphore's permit count. Because the
 * matching acquire is also disabled, availablePermits() stays at its initial value
 * and the "released" branch is normally never taken.
 *
 * @param here caller identification used only for trace logging (may be null)
 */
public void releaseThreadSemaphore(String here) {
if (logger.isTraceEnabled()) logger.trace(">>> releaseThreadSemaphore({})", here == null ? "null" : here);
if (getThreadSemaphore().availablePermits() <= 0) {
if (logger.isTraceEnabled()) logger.trace(" released({})", here);
// getThreadSemaphore().release();
} else {
if (logger.isTraceEnabled()) logger.trace(" nothing to release({})", here == null ? "null" : here);
}
}

/**
* Set or update user/pw of a processing order
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,12 @@ public void run() {

if (kubeConfig != null) {
try {
// Acquire the thread semaphore for the kubeConfig's production planner
kubeConfig.getProductionPlanner().acquireThreadSemaphore("run");

// Check for job steps to run
UtilService.getJobStepUtil().checkForJobStepsToRun(kubeConfig, 0, onlyRun, true);

// Release the thread semaphore for the kubeConfig's production planner
kubeConfig.getProductionPlanner().releaseThreadSemaphore("run");
} catch (Exception e) {
if (logger.isDebugEnabled())
logger.debug("... exception in checkForJobStepsToRun(" + kubeConfig.getId() + ", " + 0 + ", " + onlyRun + ", true): ", e);

// Release the thread semaphore in case of an exception
kubeConfig.getProductionPlanner().releaseThreadSemaphore("run");
logger.log(GeneralMessage.EXCEPTION_ENCOUNTERED, e.getClass() + " - " + e.getMessage());
}
} else {
Expand All @@ -107,17 +99,9 @@ public void run() {
logger.log(PlannerMessage.KUBEDISPATCHER_RUN_ONCE);

try {
// Acquire the thread semaphore for the production planner
productionPlanner.acquireThreadSemaphore("run");

// Check for job steps to run using the JobStepUtil
UtilService.getJobStepUtil().checkForJobStepsToRun(kubeConfig, 0, onlyRun, true);

// Release the thread semaphore for the production planner
productionPlanner.releaseThreadSemaphore("run");
} catch (Exception e) {
// Release the thread semaphore in case of an exception
productionPlanner.releaseThreadSemaphore("run");
logger.log(GeneralMessage.EXCEPTION_ENCOUNTERED, e.getClass() + " - " + e.getMessage());

if (logger.isDebugEnabled()) logger.debug("... exception stack trace: ", e);
Expand All @@ -130,20 +114,12 @@ public void run() {
// Only check for job steps if there are no released threads in the production planner
if (productionPlanner.getReleaseThreads().size() == 0) {
try {
// Acquire the thread semaphore for the production planner
productionPlanner.acquireThreadSemaphore("run");

// Check for job steps to run using the JobStepUtil
UtilService.getJobStepUtil().checkForJobStepsToRun(kubeConfig, 0, onlyRun, true);

// Release the thread semaphore for the production planner
productionPlanner.releaseThreadSemaphore("run");
} catch (Exception e) {
if (logger.isDebugEnabled())
logger.debug("... exception in checkForJobStepsToRun(" + kubeConfig.getId() + ", " + 0 + ", " + onlyRun + ", true): ", e);

// Release the thread semaphore in case of an exception
productionPlanner.releaseThreadSemaphore("run");
logger.log(GeneralMessage.EXCEPTION_ENCOUNTERED, e.getClass() + " - " + e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,9 +764,6 @@ public PlannerResultMessage createJobSteps(long orderId, ProcessingFacility faci
final List<Job> jobList = new ArrayList<>();

try {
// Acquire the thread semaphore
productionPlanner.acquireThreadSemaphore("createJobSteps");

// If present, find the order with the given orderId, and retrieve its jobs
transactionTemplate.setReadOnly(true);
order = transactionTemplate.execute((status) -> {
Expand All @@ -780,12 +777,7 @@ public PlannerResultMessage createJobSteps(long orderId, ProcessingFacility faci
}
return null;
});

// Release the thread semaphore
productionPlanner.releaseThreadSemaphore("createJobSteps");
} catch (Exception e) {
productionPlanner.releaseThreadSemaphore("createJobSteps");

if (logger.isDebugEnabled()) logger.debug("... exception stack trace: ", e);

throw e;
Expand Down Expand Up @@ -819,9 +811,6 @@ public PlannerResultMessage createJobSteps(long orderId, ProcessingFacility faci
// Prepare for transaction retry, if "org.springframework.dao.CannotAcquireLockException" is thrown
for (int i = 0; i < ProseoUtil.DB_MAX_RETRY; i++) {
try {
// Acquire the thread semaphore
productionPlanner.acquireThreadSemaphore("createJobSteps");

transactionTemplate.setReadOnly(false);
PlannerResultMessage plannerResponse = transactionTemplate.execute((status) -> {
currentJobStepList.set(0, 0);
Expand Down Expand Up @@ -858,7 +847,6 @@ public PlannerResultMessage createJobSteps(long orderId, ProcessingFacility faci
if (!plannerResponse.getSuccess()) {
// If an interrupt message is received, set the answer, release the semaphore and return
answer = plannerResponse;
productionPlanner.releaseThreadSemaphore("createJobSteps");

PlannerResultMessage msg = new PlannerResultMessage(PlannerMessage.PLANNING_INTERRUPTED);
msg.setText(logger.log(msg.getMessage(), orderId));
Expand All @@ -874,9 +862,6 @@ public PlannerResultMessage createJobSteps(long orderId, ProcessingFacility faci
if (logger.isDebugEnabled()) logger.debug("... failing after {} attempts!", ProseoUtil.DB_MAX_RETRY);
throw e;
}
} finally {
// Release the thread semaphore
productionPlanner.releaseThreadSemaphore("createJobSteps");
}
}
}
Expand All @@ -888,8 +873,6 @@ public PlannerResultMessage createJobSteps(long orderId, ProcessingFacility faci
/*
* Exception above is recaught here (due to rethrow)
*/

productionPlanner.releaseThreadSemaphore("createJobSteps");
throw e;
}

Expand Down
140 changes: 66 additions & 74 deletions planner/src/main/java/de/dlr/proseo/planner/kubernetes/KubeJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -794,94 +794,86 @@ public void finish(KubeConfig kubeConfig, String jobName) {
// Search for pods associated with the job
searchPod();

try {
// Acquire thread semaphore for "finish" operation
kubeConfig.getProductionPlanner().acquireThreadSemaphore("finish");

TransactionTemplate transactionTemplate = new TransactionTemplate(kubeConfig.getProductionPlanner().getTxManager());
transactionTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_REPEATABLE_READ);
for (int i = 0; i < ProseoUtil.DB_MAX_RETRY; i++) {
try {
transactionTemplate.execute((status) -> {

// Retrieve the kube job information
V1Job job = kubeConfig.getV1Job(jobName);
if (job == null) {
return null;
}
TransactionTemplate transactionTemplate = new TransactionTemplate(kubeConfig.getProductionPlanner().getTxManager());
transactionTemplate.setIsolationLevel(TransactionDefinition.ISOLATION_REPEATABLE_READ);
for (int i = 0; i < ProseoUtil.DB_MAX_RETRY; i++) {
try {
transactionTemplate.execute((status) -> {

// Find the job step in the database
Long jobStepId = this.getJobId();
Optional<JobStep> jobStep = RepositoryService.getJobStepRepository().findById(jobStepId);
if (jobStep.isEmpty()) {
return null;
}
// Retrieve the kube job information
V1Job job = kubeConfig.getV1Job(jobName);
if (job == null) {
return null;
}

// Update the job log
updateJobLog(jobStep.get());
// Find the job step in the database
Long jobStepId = this.getJobId();
Optional<JobStep> jobStep = RepositoryService.getJobStepRepository().findById(jobStepId);
if (jobStep.isEmpty()) {
return null;
}

if (job.getStatus() != null) {
// Set the processing start time
OffsetDateTime startTime = job.getStatus().getStartTime();
if (startTime != null) {
jobStep.get().setProcessingStartTime(startTime.toInstant());
}
// Update the job log
updateJobLog(jobStep.get());

// Set the processing completion time
OffsetDateTime completionTime = job.getStatus().getCompletionTime();
if (completionTime != null) {
jobStep.get().setProcessingCompletionTime(completionTime.toInstant());
}
if (job.getStatus() != null) {
// Set the processing start time
OffsetDateTime startTime = job.getStatus().getStartTime();
if (startTime != null) {
jobStep.get().setProcessingStartTime(startTime.toInstant());
}

// Set the job conditions
if (job.getStatus().getConditions() != null) {
List<V1JobCondition> jobConditions = job.getStatus().getConditions();
// Set the processing completion time
OffsetDateTime completionTime = job.getStatus().getCompletionTime();
if (completionTime != null) {
jobStep.get().setProcessingCompletionTime(completionTime.toInstant());
}

for (V1JobCondition jobCondition : jobConditions) {
if ((jobCondition.getType().equalsIgnoreCase("complete")
|| jobCondition.getType().equalsIgnoreCase("completed"))
&& jobCondition.getStatus().equalsIgnoreCase("true")) {
jobStep.get().setJobStepState(JobStepState.COMPLETED);
UtilService.getJobStepUtil().checkCreatedProducts(jobStep.get());
jobStep.get().incrementVersion();
} else if (jobCondition.getType().equalsIgnoreCase("failed")
|| jobCondition.getType().equalsIgnoreCase("failure")) {
jobStep.get().setJobStepState(JobStepState.FAILED);
jobStep.get().incrementVersion();
}
// Set the job conditions
if (job.getStatus().getConditions() != null) {
List<V1JobCondition> jobConditions = job.getStatus().getConditions();

for (V1JobCondition jobCondition : jobConditions) {
if ((jobCondition.getType().equalsIgnoreCase("complete")
|| jobCondition.getType().equalsIgnoreCase("completed"))
&& jobCondition.getStatus().equalsIgnoreCase("true")) {
jobStep.get().setJobStepState(JobStepState.COMPLETED);
UtilService.getJobStepUtil().checkCreatedProducts(jobStep.get());
jobStep.get().incrementVersion();
} else if (jobCondition.getType().equalsIgnoreCase("failed")
|| jobCondition.getType().equalsIgnoreCase("failure")) {
jobStep.get().setJobStepState(JobStepState.FAILED);
jobStep.get().incrementVersion();
}
}
}
}

// Save the updated job step in the repository
RepositoryService.getJobStepRepository().save(jobStep.get());
// Save the updated job step in the repository
RepositoryService.getJobStepRepository().save(jobStep.get());

// Log the order state for the job step's processing order
UtilService.getOrderUtil().logOrderState(jobStep.get().getJob().getProcessingOrder());
// Log the order state for the job step's processing order
UtilService.getOrderUtil().logOrderState(jobStep.get().getJob().getProcessingOrder());

jobSteps.add(jobStep.get());
return jobStep.get();
});
break;
} catch (CannotAcquireLockException e) {
if (logger.isDebugEnabled()) logger.debug("... database concurrency issue detected: ", e);

if ((i + 1) < ProseoUtil.DB_MAX_RETRY) {
ProseoUtil.dbWait();
} else {
if (logger.isDebugEnabled()) logger.debug("... failing after {} attempts!", ProseoUtil.DB_MAX_RETRY);
throw e;
}
} catch (Exception e) {
logger.log(GeneralMessage.RUNTIME_EXCEPTION_ENCOUNTERED, e.getClass() + " - " + e.getMessage());
jobSteps.add(jobStep.get());
return jobStep.get();
});
break;
} catch (CannotAcquireLockException e) {
if (logger.isDebugEnabled()) logger.debug("... database concurrency issue detected: ", e);

if (logger.isDebugEnabled()) logger.debug("... exception stack trace: ", e);
if ((i + 1) < ProseoUtil.DB_MAX_RETRY) {
ProseoUtil.dbWait();
} else {
if (logger.isDebugEnabled()) logger.debug("... failing after {} attempts!", ProseoUtil.DB_MAX_RETRY);
throw e;
}
}
} finally {
// Release the thread semaphore for "finish" operation
kubeConfig.getProductionPlanner().releaseThreadSemaphore("finish");
}
} catch (Exception e) {
logger.log(GeneralMessage.RUNTIME_EXCEPTION_ENCOUNTERED, e.getClass() + " - " + e.getMessage());

if (logger.isDebugEnabled()) logger.debug("... exception stack trace: ", e);
}
}

// Configure and start a KubeJobFinish object to monitor the completion of the kube job
KubeJobFinish jobMonitor = new KubeJobFinish(this, kubeConfig.getProductionPlanner(), jobName);
Expand Down
Loading

0 comments on commit db33bb1

Please sign in to comment.