Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4205 batch execution job changes process instance ID #4577

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ protected void postProcessJob(RestartProcessInstancesBatchConfiguration configur
.findDeployedProcessDefinitionById(configuration.getProcessDefinitionId());
job.setDeploymentId(processDefinitionEntity.getDeploymentId());
}
job.setProcessDefinitionId(configuration.getProcessDefinitionId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.camunda.bpm.engine.impl.batch.externaltask.SetExternalTaskRetriesJobHandler;
import org.camunda.bpm.engine.impl.batch.job.SetJobRetriesJobHandler;
import org.camunda.bpm.engine.impl.batch.removaltime.DecisionSetRemovalTimeJobHandler;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.context.Context;
import org.camunda.bpm.engine.impl.db.DbEntity;
import org.camunda.bpm.engine.impl.db.entitymanager.OptimisticLockingListener;
import org.camunda.bpm.engine.impl.db.entitymanager.OptimisticLockingResult;
import org.camunda.bpm.engine.impl.db.entitymanager.operation.DbEntityOperation;
import org.camunda.bpm.engine.impl.db.entitymanager.operation.DbOperation;
import org.camunda.bpm.engine.impl.dmn.batch.DeleteHistoricDecisionInstancesJobHandler;
import org.camunda.bpm.engine.impl.interceptor.CommandContext;
import org.camunda.bpm.engine.impl.jobexecutor.JobDeclaration;
import org.camunda.bpm.engine.impl.persistence.entity.ByteArrayEntity;
Expand Down Expand Up @@ -169,6 +174,17 @@ protected void createJobEntities(BatchEntity batch, T configuration, String depl
ByteArrayEntity configurationEntity = saveConfiguration(byteArrayManager, jobConfiguration);

JobEntity job = createBatchJob(batch, configurationEntity);

if (jobConfiguration.getIds() != null && jobConfiguration.getIds().size() == 1
&& !(this instanceof DecisionSetRemovalTimeJobHandler)
&& !(this instanceof DeleteHistoricDecisionInstancesJobHandler)
&& !(this instanceof SetJobRetriesJobHandler)
&& !(this instanceof SetExternalTaskRetriesJobHandler)
// && !(this instanceof BatchSetRemovalTimeJobHandler)
) {
job.setProcessInstanceId(jobConfiguration.getIds().get(0));
}

job.setDeploymentId(deploymentId);
postProcessJob(configuration, job, jobConfiguration);
jobManager.insertAndHintJobExecutor(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ protected MessageCorrelationBatchConfiguration createJobConfiguration(MessageCor

@Override
protected void postProcessJob(MessageCorrelationBatchConfiguration configuration, JobEntity job, MessageCorrelationBatchConfiguration jobConfiguration) {
// if there is only one process instance to adjust, set its ID to the job so exclusive scheduling is possible
if (jobConfiguration.getIds() != null && jobConfiguration.getIds().size() == 1) {
job.setProcessInstanceId(jobConfiguration.getIds().get(0));
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,7 @@ public String getType() {

@Override
protected void postProcessJob(BatchConfiguration configuration, JobEntity job, BatchConfiguration jobConfiguration) {
// if there is only one process instance to adjust, set its ID to the job so exclusive scheduling is possible
if (jobConfiguration.getIds() != null && jobConfiguration.getIds().size() == 1) {
job.setProcessInstanceId(jobConfiguration.getIds().get(0));
}

}

protected ByteArrayEntity findByteArrayById(String byteArrayId, CommandContext commandContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ protected HistoricBatchEntity loadBatchEntity(BatchEntity batch) {

// Implementation ////////////////////////////////

@Override
public HistoryEvent createProcessInstanceStartEvt(DelegateExecution execution) {
final ExecutionEntity executionEntity = (ExecutionEntity) execution;

Expand Down Expand Up @@ -583,6 +584,7 @@ public HistoryEvent createProcessInstanceStartEvt(DelegateExecution execution) {
return evt;
}

@Override
public HistoryEvent createProcessInstanceUpdateEvt(DelegateExecution execution) {
final ExecutionEntity executionEntity = (ExecutionEntity) execution;

Expand Down Expand Up @@ -620,6 +622,7 @@ public HistoryEvent createProcessInstanceMigrateEvt(DelegateExecution execution)
return evt;
}

@Override
public HistoryEvent createProcessInstanceEndEvt(DelegateExecution execution) {
final ExecutionEntity executionEntity = (ExecutionEntity) execution;

Expand Down Expand Up @@ -690,6 +693,7 @@ protected void determineEndState(ExecutionEntity executionEntity, HistoricProces
}
}

@Override
public HistoryEvent createActivityInstanceStartEvt(DelegateExecution execution) {
final ExecutionEntity executionEntity = (ExecutionEntity) execution;

Expand Down Expand Up @@ -744,6 +748,7 @@ public HistoryEvent createActivityInstanceMigrateEvt(MigratingActivityInstance a
}


@Override
public HistoryEvent createActivityInstanceEndEvt(DelegateExecution execution) {
final ExecutionEntity executionEntity = (ExecutionEntity) execution;

Expand All @@ -762,6 +767,7 @@ public HistoryEvent createActivityInstanceEndEvt(DelegateExecution execution) {
return evt;
}

@Override
public HistoryEvent createTaskInstanceCreateEvt(DelegateTask task) {

// create event instance
Expand All @@ -775,6 +781,7 @@ public HistoryEvent createTaskInstanceCreateEvt(DelegateTask task) {
return evt;
}

@Override
public HistoryEvent createTaskInstanceUpdateEvt(DelegateTask task) {

// create event instance
Expand All @@ -797,6 +804,7 @@ public HistoryEvent createTaskInstanceMigrateEvt(DelegateTask task) {
return evt;
}

@Override
public HistoryEvent createTaskInstanceCompleteEvt(DelegateTask task, String deleteReason) {

// create event instance
Expand All @@ -819,6 +827,7 @@ public HistoryEvent createTaskInstanceCompleteEvt(DelegateTask task, String dele

// User Operation Logs ///////////////////////////

@Override
public List<HistoryEvent> createUserOperationLogEvents(UserOperationLogContext context) {
List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();

Expand All @@ -840,14 +849,17 @@ public List<HistoryEvent> createUserOperationLogEvents(UserOperationLogContext c

// variables /////////////////////////////////

@Override
public HistoryEvent createHistoricVariableCreateEvt(VariableInstanceEntity variableInstance, VariableScope sourceVariableScope) {
return createHistoricVariableEvent(variableInstance, sourceVariableScope, HistoryEventTypes.VARIABLE_INSTANCE_CREATE);
}

@Override
public HistoryEvent createHistoricVariableDeleteEvt(VariableInstanceEntity variableInstance, VariableScope sourceVariableScope) {
return createHistoricVariableEvent(variableInstance, sourceVariableScope, HistoryEventTypes.VARIABLE_INSTANCE_DELETE);
}

@Override
public HistoryEvent createHistoricVariableUpdateEvt(VariableInstanceEntity variableInstance, VariableScope sourceVariableScope) {
return createHistoricVariableEvent(variableInstance, sourceVariableScope, HistoryEventTypes.VARIABLE_INSTANCE_UPDATE);
}
Expand All @@ -859,6 +871,7 @@ public HistoryEvent createHistoricVariableMigrateEvt(VariableInstanceEntity vari

// form Properties ///////////////////////////

@Override
public HistoryEvent createFormPropertyUpdateEvt(ExecutionEntity execution, String propertyId, String propertyValue, String taskId) {

final IdGenerator idGenerator = Context.getProcessEngineConfiguration().getIdGenerator();
Expand Down Expand Up @@ -903,22 +916,27 @@ public HistoryEvent createFormPropertyUpdateEvt(ExecutionEntity execution, Strin

// Incidents //////////////////////////////////

@Override
public HistoryEvent createHistoricIncidentCreateEvt(Incident incident) {
return createHistoricIncidentEvt(incident, HistoryEventTypes.INCIDENT_CREATE);
}

@Override
public HistoryEvent createHistoricIncidentUpdateEvt(Incident incident) {
return createHistoricIncidentEvt(incident, HistoryEventTypes.INCIDENT_UPDATE);
}

@Override
public HistoryEvent createHistoricIncidentResolveEvt(Incident incident) {
return createHistoricIncidentEvt(incident, HistoryEventTypes.INCIDENT_RESOLVE);
}

@Override
public HistoryEvent createHistoricIncidentDeleteEvt(Incident incident) {
return createHistoricIncidentEvt(incident, HistoryEventTypes.INCIDENT_DELETE);
}

@Override
public HistoryEvent createHistoricIncidentMigrateEvt(Incident incident) {
return createHistoricIncidentEvt(incident, HistoryEventTypes.INCIDENT_MIGRATE);
}
Expand Down Expand Up @@ -1078,10 +1096,12 @@ protected HistoryEvent createBatchEvent(BatchEntity batch, HistoryEventTypes eve

// Job Log

@Override
public HistoryEvent createHistoricJobLogCreateEvt(Job job) {
return createHistoricJobLogEvt(job, HistoryEventTypes.JOB_CREATE);
}

@Override
public HistoryEvent createHistoricJobLogFailedEvt(Job job, Throwable exception) {
HistoricJobLogEventEntity event = (HistoricJobLogEventEntity) createHistoricJobLogEvt(job, HistoryEventTypes.JOB_FAIL);

Expand All @@ -1104,10 +1124,12 @@ public HistoryEvent createHistoricJobLogFailedEvt(Job job, Throwable exception)
return event;
}

@Override
public HistoryEvent createHistoricJobLogSuccessfulEvt(Job job) {
return createHistoricJobLogEvt(job, HistoryEventTypes.JOB_SUCCESS);
}

@Override
public HistoryEvent createHistoricJobLogDeleteEvt(Job job) {
return createHistoricJobLogEvt(job, HistoryEventTypes.JOB_DELETE);
}
Expand All @@ -1124,6 +1146,8 @@ protected void initHistoricJobLogEvent(HistoricJobLogEventEntity evt, Job job, H

JobEntity jobEntity = (JobEntity) job;
evt.setJobId(jobEntity.getId());

System.out.println("In history id: "+ jobEntity.getId());
evt.setJobDueDate(jobEntity.getDuedate());
evt.setJobRetries(jobEntity.getRetries());
evt.setJobPriority(jobEntity.getPriority());
Expand Down Expand Up @@ -1169,6 +1193,7 @@ protected void initHistoricJobLogEvent(HistoricJobLogEventEntity evt, Job job, H
evt.setFailedActivityId(jobEntity.getFailedActivityId());
evt.setExecutionId(jobEntity.getExecutionId());
evt.setProcessInstanceId(jobEntity.getProcessInstanceId());
System.out.println("In history: "+ jobEntity.getProcessInstanceId());
evt.setProcessDefinitionId(jobEntity.getProcessDefinitionId());
evt.setProcessDefinitionKey(jobEntity.getProcessDefinitionKey());
evt.setDeploymentId(jobEntity.getDeploymentId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ public Object getPersistentState() {
persistentState.put("exceptionMessage", exceptionMessage);
persistentState.put("suspensionState", suspensionState);
persistentState.put("processDefinitionId", processDefinitionId);
// persistentState.put("processInstanceId", processInstanceId);
persistentState.put("jobDefinitionId", jobDefinitionId);
persistentState.put("deploymentId", deploymentId);
persistentState.put("jobHandlerConfiguration", jobHandlerConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.camunda.bpm.engine.test.ProcessEngineRule;
import org.camunda.bpm.engine.test.RequiredHistoryLevel;
import org.camunda.bpm.engine.test.api.runtime.BatchHelper;
import org.camunda.bpm.engine.test.util.AssertUtil;
import org.camunda.bpm.engine.test.util.ProcessEngineTestRule;
import org.camunda.bpm.engine.test.util.ProvidedProcessEngineRule;
import org.camunda.bpm.engine.variable.VariableMap;
Expand Down Expand Up @@ -666,6 +665,7 @@ public BatchDeletionHelper(ProcessEngineRule engineRule) {
super(engineRule);
}

@Override
public JobDefinition getExecutionJobDefinition(Batch batch) {
return engineRule.getManagementService().createJobDefinitionQuery()
.jobDefinitionId(batch.getBatchJobDefinitionId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.camunda.bpm.engine.test.api.history;

import static org.assertj.core.api.Assertions.assertThat;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.not;
Expand Down Expand Up @@ -109,6 +111,21 @@ public void testDeleteHistoryProcessInstancesAsyncWithList() throws Exception {
assertAllHistoricProcessInstancesAreDeleted();
}

@Test
public void testDeleteHistoryProcessInstances_shouldCreateProcessInstanceRelatedBatchJobsForSingleInvocations() {
//when
Batch batch = historyService.deleteHistoricProcessInstancesAsync(historicProcessInstances, TEST_REASON);

completeSeedJobs(batch);
List<Job> executionJobs = managementService.createJobQuery().jobDefinitionId(batch.getBatchJobDefinitionId()).list();

// then
//Making sure that processInstanceId is set in execution jobs #4205
assertThat(executionJobs)
.extracting("processInstanceId")
.containsExactlyInAnyOrder(historicProcessInstances.toArray());
}

@Test
public void testDeleteHistoryProcessInstancesAsyncWithListForDeletedDeployment() throws Exception {
// given a second deployment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,34 @@ public void shouldSetRemovalTimeForBatch_MultipleInvocationsPerBatchJob() {
assertThat(historicBatches.get(1).getRemovalTime()).isEqualTo(REMOVAL_TIME);
}

@Test
public void testRemovalTimeProcess_shouldCreateProcessInstanceRelatedBatchJobsForSingleInvocations() {
// given
testRule.getProcessEngineConfiguration().setInvocationsPerBatchJob(1);

String processInstanceIdOne = testRule.process().userTask().deploy().start();
String processInstanceIdTwo = testRule.process().userTask().deploy().start();

// when
HistoricProcessInstanceQuery query = historyService.createHistoricProcessInstanceQuery();
Batch batch = historyService.setRemovalTimeToHistoricProcessInstances()
.absoluteRemovalTime(REMOVAL_TIME)
.byQuery(query)
.executeAsync();

testRule.executeSeedJobs(batch);

// then
//Making sure that processInstanceId is set in execution jobs #4205
List<Job> executionJobs = testRule.getExecutionJobs(batch);
assertThat(executionJobs)
.extracting("processInstanceId")
.containsExactlyInAnyOrder(processInstanceIdOne, processInstanceIdTwo);

// clear
managementService.deleteBatch(batch.getId(), true);
}

@Test
public void shouldSetRemovalTime_SingleInvocationPerBatchJob() {
// given
Expand Down Expand Up @@ -2835,6 +2863,7 @@ public void shouldSetExecutionStartTimeInBatchAndHistoryForDecisions() {
.putValue("temperature", 32)
.putValue("dayType", "Weekend")
).evaluate();

Batch batch = historyService.setRemovalTimeToHistoricDecisionInstances()
.absoluteRemovalTime(CURRENT_DATE)
.byQuery(historyService.createHistoricDecisionInstanceQuery())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ public void shouldCreateProcessInstanceRelatedBatchJobsForSingleInvocations() {
rule.executeSeedJobs(batch);

// then
//Making sure that processInstanceId is set in execution jobs #4205
List<Job> executionJobs = rule.getExecutionJobs(batch);
assertThat(executionJobs)
.extracting("processInstanceId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ public void createBatchModification() {
Batch batch = runtimeService.createModification(processDefinition.getId()).startAfterActivity("user2").processInstanceIds(processInstanceIds).executeAsync();

assertBatchCreated(batch, 2);

//Making sure that processInstanceId is set in execution jobs #4205
helper.executeSeedJob(batch);
List<Job> executionJobs = helper.getExecutionJobs(batch);
assertThat(executionJobs)
.extracting("processInstanceId")
.containsExactlyInAnyOrder(processInstanceIds.toArray());
}

@Test
Expand Down Expand Up @@ -312,7 +319,7 @@ public void createModificationJobs() {
assertEquals(currentTime, modificationJob.getDuedate());
assertNull(modificationJob.getProcessDefinitionId());
assertNull(modificationJob.getProcessDefinitionKey());
assertNull(modificationJob.getProcessInstanceId());
// assertNull(modificationJob.getProcessInstanceId());
assertNull(modificationJob.getExecutionId());
}

Expand Down
Loading