From c3f87c144c533deccd5b721517e14bc021582cc2 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Fri, 20 Dec 2024 18:06:45 -0800 Subject: [PATCH] Handle system time move backward case in timer task processing --- service/history/queues/queue_scheduled.go | 7 +- .../timer_queue_active_task_executor.go | 18 +- .../timer_queue_active_task_executor_test.go | 292 +++++++++++------- .../timer_queue_standby_task_executor.go | 16 +- .../timer_queue_standby_task_executor_test.go | 56 +++- .../history/timer_queue_task_executor_base.go | 13 +- .../transfer_queue_standby_task_executor.go | 3 + ...ansfer_queue_standby_task_executor_test.go | 4 + .../history/workflow/cache/export_testing.go | 12 +- 9 files changed, 275 insertions(+), 146 deletions(-) diff --git a/service/history/queues/queue_scheduled.go b/service/history/queues/queue_scheduled.go index 8534b793ae3..ef855ee4d1d 100644 --- a/service/history/queues/queue_scheduled.go +++ b/service/history/queues/queue_scheduled.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/timer" + "go.temporal.io/server/common/util" hshard "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" ) @@ -303,10 +304,14 @@ func (p *scheduledQueue) lookAheadTask() { // IsTimeExpired checks if the testing time is equal or before // the reference time. The precision of the comparison is millisecond. func IsTimeExpired( + task tasks.Task, referenceTime time.Time, testingTime time.Time, ) bool { - referenceTime = referenceTime.Truncate(persistence.ScheduledTaskMinPrecision) + // NOTE: Persistence layer may lose precision when persisting the task, which essentially moves + // task fire time backward. But we are already performing truncation here, so doesn't need to + // account for that. + referenceTime = util.MaxTime(referenceTime, task.GetKey().FireTime).Truncate(persistence.ScheduledTaskMinPrecision) testingTime = testingTime.Truncate(persistence.ScheduledTaskMinPrecision) return !testingTime.After(referenceTime) } diff --git a/service/history/timer_queue_active_task_executor.go b/service/history/timer_queue_active_task_executor.go index cdf7d95aa76..0a268b9c203 100644 --- a/service/history/timer_queue_active_task_executor.go +++ b/service/history/timer_queue_active_task_executor.go @@ -171,9 +171,8 @@ func (t *timerQueueActiveTaskExecutor) executeUserTimerTimeoutTask( } timerSequence := t.getTimerSequence(mutableState) - referenceTime := t.shardContext.GetTimeSource().Now() + referenceTime := t.Now() timerFired := false - Loop: for _, timerSequenceID := range timerSequence.LoadAndSortUserTimers() { timerInfo, ok := mutableState.GetUserTimerInfoByEventID(timerSequenceID.EventID) @@ -183,7 +182,7 @@ Loop: return serviceerror.NewInternal(errString) } - if !queues.IsTimeExpired(referenceTime, timerSequenceID.Timestamp) { + if !queues.IsTimeExpired(task, referenceTime, timerSequenceID.Timestamp) { // Timer sequence IDs are sorted; once we encounter a timer whose // sequence ID has not expired, all subsequent timers will not have // expired. @@ -231,7 +230,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask( } timerSequence := t.getTimerSequence(mutableState) - referenceTime := t.shardContext.GetTimeSource().Now() + referenceTime := t.Now() updateMutableState := false scheduleWorkflowTask := false @@ -242,7 +241,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask( // created. isHeartBeatTask := task.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID) - if isHeartBeatTask && ok && queues.IsTimeExpired(task.GetVisibilityTime(), heartbeatTimeoutVis) { + if isHeartBeatTask && ok && queues.IsTimeExpired(task, task.GetVisibilityTime(), heartbeatTimeoutVis) { if err := mutableState.UpdateActivityTaskStatusWithTimerHeartbeat( ai.ScheduledEventId, ai.TimerTaskStatus&^workflow.TimerTaskStatusCreatedHeartbeat, nil); err != nil { return err @@ -252,7 +251,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask( Loop: for _, timerSequenceID := range timerSequence.LoadAndSortActivityTimers() { - if !queues.IsTimeExpired(referenceTime, timerSequenceID.Timestamp) { + if !queues.IsTimeExpired(task, referenceTime, timerSequenceID.Timestamp) { // timer sequence IDs are sorted, once there is one timer // sequence ID not expired, all after that wil not expired break Loop @@ -618,7 +617,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowRunTimeoutTask( return err } - if !t.isValidWorkflowRunTimeoutTask(mutableState) { + if !t.isValidWorkflowRunTimeoutTask(mutableState, task) { return nil } @@ -628,7 +627,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowRunTimeoutTask( initiator := enumspb.CONTINUE_AS_NEW_INITIATOR_UNSPECIFIED wfExpTime := mutableState.GetExecutionInfo().WorkflowExecutionExpirationTime - if wfExpTime == nil || wfExpTime.AsTime().IsZero() || wfExpTime.AsTime().After(t.shardContext.GetTimeSource().Now()) { + if wfExpTime == nil || wfExpTime.AsTime().IsZero() || wfExpTime.AsTime().After(t.Now()) { backoffInterval, retryState = mutableState.GetRetryBackoffDuration(timeoutFailure) if backoffInterval != backoff.NoBackoff { // We have a retry policy and we should retry. @@ -678,7 +677,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowRunTimeoutTask( mutableState.GetNamespaceEntry(), mutableState.GetWorkflowKey().WorkflowID, newRunID, - t.shardContext.GetTimeSource().Now(), + t.Now(), mutableState, ) if err != nil { @@ -802,6 +801,7 @@ func (t *timerQueueActiveTaskExecutor) executeStateMachineTimerTask( ctx, wfCtx, ms, + task, func(node *hsm.Node, task hsm.Task) error { return t.shardContext.StateMachineRegistry().ExecuteTimerTask(t, node, task) }, diff --git a/service/history/timer_queue_active_task_executor_test.go b/service/history/timer_queue_active_task_executor_test.go index f6cd2bbc364..85243a9ba15 100644 --- a/service/history/timer_queue_active_task_executor_test.go +++ b/service/history/timer_queue_active_task_executor_test.go @@ -215,6 +215,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Fire() { WorkflowId: "some random workflow ID", RunId: uuid.New(), } + workflowKey := definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ) workflowType := "some random workflow type" taskQueueName := "some random task queue" @@ -258,26 +263,31 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Fire() { task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.UserTimerTask{ - WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), - ), + WorkflowKey: workflowKey, TaskID: s.mustGenerateTaskID(), VisibilityTimestamp: task.(*tasks.UserTimerTask).VisibilityTimestamp, EventID: event.EventId, } persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) - s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) - s.timeSource.Update(s.now.Add(2 * timerTimeout)) - resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) - s.NoError(resp.ExecutionErr) + for _, currentTime := range []time.Time{ + s.now.Add(-timerTimeout), + s.now.Add(2 * timerTimeout), + } { + getWorkflowExecutionResponse := &persistence.GetWorkflowExecutionResponse{State: common.CloneProto(persistenceMutableState)} + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(getWorkflowExecutionResponse, nil) + s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) - _, ok := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetUserTimerInfo(timerID) - s.False(ok) + s.timeSource.Update(currentTime) + resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.NoError(resp.ExecutionErr) + + _, ok := s.getMutableStateFromCache(workflowKey).GetUserTimerInfo(timerID) + s.False(ok) + + s.clearMutableStateFromCache(workflowKey) + } } func (s *timerQueueActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Noop() { @@ -477,6 +487,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo WorkflowId: "some random workflow ID", RunId: uuid.New(), } + workflowKey := definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ) workflowType := "some random workflow type" taskQueueName := "some random task queue" @@ -526,11 +541,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.ActivityTimeoutTask{ - WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), - ), + WorkflowKey: workflowKey, Attempt: 1, TaskID: s.mustGenerateTaskID(), TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, @@ -539,15 +550,24 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPo } persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion()) - s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) - s.timeSource.Update(s.now.Add(2 * timerTimeout)) - resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) - s.NoError(resp.ExecutionErr) + for _, currentTime := range []time.Time{ + s.now.Add(-timerTimeout), + s.now.Add(2 * timerTimeout), + } { + getWorkflowExecutionResponse := &persistence.GetWorkflowExecutionResponse{State: common.CloneProto(persistenceMutableState)} + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(getWorkflowExecutionResponse, nil) + s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) - _, ok := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetActivityInfo(scheduledEvent.GetEventId()) - s.False(ok) + s.timeSource.Update(currentTime) + resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.NoError(resp.ExecutionErr) + + _, ok := s.getMutableStateFromCache(workflowKey).GetActivityInfo(scheduledEvent.GetEventId()) + s.False(ok) + + s.clearMutableStateFromCache(workflowKey) + } } func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_NoRetryPolicy_Noop() { @@ -635,6 +655,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli WorkflowId: "some random workflow ID", RunId: uuid.New(), } + workflowKey := definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ) workflowType := "some random workflow type" taskQueueName := "some random task queue" @@ -694,11 +719,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.ActivityTimeoutTask{ - WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), - ), + WorkflowKey: workflowKey, Attempt: 1, TaskID: s.mustGenerateTaskID(), TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, @@ -714,7 +735,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.NoError(resp.ExecutionErr) - activityInfo, ok := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetActivityInfo(scheduledEvent.GetEventId()) + activityInfo, ok := s.getMutableStateFromCache(workflowKey).GetActivityInfo(scheduledEvent.GetEventId()) s.True(ok) s.Equal(scheduledEvent.GetEventId(), activityInfo.ScheduledEventId) s.Equal(common.EmptyEventID, activityInfo.StartedEventId) @@ -828,6 +849,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli WorkflowId: "some random workflow ID", RunId: uuid.New(), } + workflowKey := definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ) workflowType := "some random workflow type" taskQueueName := "some random task queue" @@ -884,11 +910,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli task := mutableState.InsertTasks[tasks.CategoryTimer][0] timerTask := &tasks.ActivityTimeoutTask{ - WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), - ), + WorkflowKey: workflowKey, Attempt: 1, TaskID: s.mustGenerateTaskID(), TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, @@ -904,7 +926,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_RetryPoli resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.NoError(resp.ExecutionErr) - _, ok := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetActivityInfo(scheduledEvent.GetEventId()) + _, ok := s.getMutableStateFromCache(workflowKey).GetActivityInfo(scheduledEvent.GetEventId()) s.False(ok) } @@ -1086,6 +1108,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTaskTimeout_Fire() { WorkflowId: "some random workflow ID", RunId: uuid.New(), } + workflowKey := definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ) workflowType := "some random workflow type" taskQueueName := "some random task queue" @@ -1109,11 +1136,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTaskTimeout_Fire() { startedEvent := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.New()) timerTask := &tasks.WorkflowTaskTimeoutTask{ - WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), - ), + WorkflowKey: workflowKey, ScheduleAttempt: 1, Version: s.version, TaskID: s.mustGenerateTaskID(), @@ -1129,7 +1152,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowTaskTimeout_Fire() { resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.NoError(resp.ExecutionErr) - workflowTask := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetPendingWorkflowTask() + workflowTask := s.getMutableStateFromCache(workflowKey).GetPendingWorkflowTask() s.NotNil(workflowTask) s.True(workflowTask.ScheduledEventID != common.EmptyEventID) s.Equal(common.EmptyEventID, workflowTask.StartedEventID) @@ -1189,6 +1212,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowBackoffTimer_Fire() { WorkflowId: "some random workflow ID", RunId: uuid.New(), } + workflowKey := definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ) workflowType := "some random workflow type" taskQueueName := "some random task queue" @@ -1209,11 +1237,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowBackoffTimer_Fire() { s.Nil(err) timerTask := &tasks.WorkflowBackoffTimerTask{ - WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), - ), + WorkflowKey: workflowKey, Version: s.version, TaskID: s.mustGenerateTaskID(), WorkflowBackoffType: enumsspb.WORKFLOW_BACKOFF_TYPE_RETRY, @@ -1227,7 +1251,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowBackoffTimer_Fire() { resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.NoError(resp.ExecutionErr) - workflowTask := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetPendingWorkflowTask() + workflowTask := s.getMutableStateFromCache(workflowKey).GetPendingWorkflowTask() s.NotNil(workflowTask) s.True(workflowTask.ScheduledEventID != common.EmptyEventID) s.Equal(common.EmptyEventID, workflowTask.StartedEventID) @@ -1455,6 +1479,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowRunTimeout_Fire() { WorkflowId: "some random workflow ID", RunId: uuid.New(), } + workflowKey := definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ) workflowType := "some random workflow type" taskQueueName := "some random task queue" @@ -1483,27 +1512,31 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowRunTimeout_Fire() { completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerTask := &tasks.WorkflowRunTimeoutTask{ - WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), - ), + WorkflowKey: workflowKey, Version: s.version, TaskID: s.mustGenerateTaskID(), - VisibilityTimestamp: s.now, + VisibilityTimestamp: s.now.Add(expirationTime), } persistenceMutableState := s.createPersistenceMutableState(mutableState, completionEvent.GetEventId(), completionEvent.GetVersion()) - s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) - // advance timer past run expiration time - s.timeSource.Advance(expirationTime + 1*time.Second) - resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) - s.NoError(resp.ExecutionErr) + for _, currrentTime := range []time.Time{ + s.now.Add(expirationTime - 1*time.Second), + s.now.Add(expirationTime + 1*time.Second), + } { + getWorkflowExecutionResponse := &persistence.GetWorkflowExecutionResponse{State: common.CloneProto(persistenceMutableState)} + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(getWorkflowExecutionResponse, nil) + s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) + + s.timeSource.Update(currrentTime) + resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.NoError(resp.ExecutionErr) - running := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).IsWorkflowExecutionRunning() - s.False(running) + running := s.getMutableStateFromCache(workflowKey).IsWorkflowExecutionRunning() + s.False(running) + + s.clearMutableStateFromCache(workflowKey) + } } func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowRunTimeout_Retry() { @@ -1511,6 +1544,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowRunTimeout_Retry() { WorkflowId: "some random workflow ID", RunId: uuid.New(), } + workflowKey := definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ) workflowType := "some random workflow type" taskQueueName := "some random task queue" @@ -1548,11 +1586,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowRunTimeout_Retry() { completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerTask := &tasks.WorkflowRunTimeoutTask{ - WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), - ), + WorkflowKey: workflowKey, Version: s.version, TaskID: s.mustGenerateTaskID(), VisibilityTimestamp: s.now, @@ -1568,7 +1602,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowRunTimeout_Retry() { resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.NoError(resp.ExecutionErr) - state, status := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetWorkflowStateStatus() + state, status := s.getMutableStateFromCache(workflowKey).GetWorkflowStateStatus() s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, state) s.EqualValues(enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, status) } @@ -1578,6 +1612,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowRunTimeout_Cron() { WorkflowId: "some random workflow ID", RunId: uuid.New(), } + workflowKey := definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ) workflowType := "some random workflow type" taskQueueName := "some random task queue" @@ -1608,11 +1647,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowRunTimeout_Cron() { completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerTask := &tasks.WorkflowRunTimeoutTask{ - WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), - ), + WorkflowKey: workflowKey, Version: s.version, TaskID: s.mustGenerateTaskID(), VisibilityTimestamp: s.now, @@ -1628,7 +1663,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowRunTimeout_Cron() { resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.NoError(resp.ExecutionErr) - state, status := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetWorkflowStateStatus() + state, status := s.getMutableStateFromCache(workflowKey).GetWorkflowStateStatus() s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, state) s.EqualValues(enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, status) } @@ -1638,6 +1673,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowRunTimeout_WorkflowExpir WorkflowId: "some random workflow ID", RunId: uuid.New(), } + workflowKey := definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ) workflowType := "some random workflow type" taskQueueName := "some random task queue" @@ -1665,11 +1705,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowRunTimeout_WorkflowExpir completionEvent := addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") timerTask := &tasks.WorkflowRunTimeoutTask{ - WorkflowKey: definition.NewWorkflowKey( - s.namespaceID.String(), - execution.GetWorkflowId(), - execution.GetRunId(), - ), + WorkflowKey: workflowKey, Version: s.version, TaskID: s.mustGenerateTaskID(), VisibilityTimestamp: s.now, @@ -1682,7 +1718,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowRunTimeout_WorkflowExpir resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.NoError(resp.ExecutionErr) - state, status := s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()).GetWorkflowStateStatus() + state, status := s.getMutableStateFromCache(workflowKey).GetWorkflowStateStatus() s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, state) s.EqualValues(enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, status) } @@ -1693,6 +1729,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowExecutionTimeout_Fire() WorkflowId: "some random workflow ID", RunId: uuid.New(), } + workflowKey := definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ) workflowType := "some random workflow type" taskQueueName := "some random task queue" @@ -1721,36 +1762,44 @@ func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowExecutionTimeout_Fire() NamespaceID: s.namespaceID.String(), WorkflowID: execution.GetWorkflowId(), FirstRunID: firstRunID, - VisibilityTimestamp: s.now, + VisibilityTimestamp: s.now.Add(10 * time.Second), TaskID: s.mustGenerateTaskID(), } persistenceMutableState := s.createPersistenceMutableState(mutableState, startedEvent.GetEventId(), startedEvent.GetVersion()) - persistenceExecutionState := persistenceMutableState.ExecutionState - s.mockExecutionMgr.EXPECT().GetCurrentExecution(gomock.Any(), &persistence.GetCurrentExecutionRequest{ - ShardID: s.mockShard.GetShardID(), - NamespaceID: s.namespaceID.String(), - WorkflowID: execution.GetWorkflowId(), - }).Return(&persistence.GetCurrentExecutionResponse{ - StartRequestID: persistenceExecutionState.CreateRequestId, - RunID: persistenceExecutionState.RunId, - State: persistenceExecutionState.State, - Status: persistenceExecutionState.Status, - }, nil).Times(1) - s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) - - // advance the clock to be sure workflow is expired - s.timeSource.Update(s.now.Add(15 * time.Second)) - - resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) - s.NoError(resp.ExecutionErr) - mutableState = s.getMutableStateFromCache(s.namespaceID, execution.GetWorkflowId(), execution.GetRunId()) - s.False(mutableState.IsWorkflowExecutionRunning()) - state, status := mutableState.GetWorkflowStateStatus() - s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, state) - s.EqualValues(enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, status) + for _, currentTime := range []time.Time{ + s.now.Add(5 * time.Second), + s.now.Add(15 * time.Second), + } { + getWorkflowExecutionResponse := &persistence.GetWorkflowExecutionResponse{State: common.CloneProto(persistenceMutableState)} + persistenceExecutionState := getWorkflowExecutionResponse.State.ExecutionState + s.mockExecutionMgr.EXPECT().GetCurrentExecution(gomock.Any(), &persistence.GetCurrentExecutionRequest{ + ShardID: s.mockShard.GetShardID(), + NamespaceID: s.namespaceID.String(), + WorkflowID: execution.GetWorkflowId(), + }).Return(&persistence.GetCurrentExecutionResponse{ + StartRequestID: persistenceExecutionState.CreateRequestId, + RunID: persistenceExecutionState.RunId, + State: persistenceExecutionState.State, + Status: persistenceExecutionState.Status, + }, nil).Times(1) + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(getWorkflowExecutionResponse, nil) + s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) + + s.timeSource.Update(currentTime) + + resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.NoError(resp.ExecutionErr) + + mutableState = s.getMutableStateFromCache(workflowKey) + s.False(mutableState.IsWorkflowExecutionRunning()) + state, status := mutableState.GetWorkflowStateStatus() + s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, state) + s.EqualValues(enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT, status) + + s.clearMutableStateFromCache(workflowKey) + } } func (s *timerQueueActiveTaskExecutorSuite) TestWorkflowExecutionTimeout_Noop() { @@ -1913,11 +1962,11 @@ func (s *timerQueueActiveTaskExecutorSuite) TestExecuteStateMachineTimerTask_Exe } // Past deadline, should get executed. - workflow.TrackStateMachineTimer(ms, s.mockShard.GetTimeSource().Now().Add(-time.Hour), invalidTask) - workflow.TrackStateMachineTimer(ms, s.mockShard.GetTimeSource().Now().Add(-time.Hour), validTask) - workflow.TrackStateMachineTimer(ms, s.mockShard.GetTimeSource().Now().Add(-time.Minute), validTask) + workflow.TrackStateMachineTimer(ms, s.now.Add(-time.Hour), invalidTask) + workflow.TrackStateMachineTimer(ms, s.now.Add(-time.Hour), validTask) + workflow.TrackStateMachineTimer(ms, s.now.Add(-time.Minute), validTask) // Future deadline, new task should be scheduled. - futureDeadline := s.mockShard.GetTimeSource().Now().Add(time.Hour) + futureDeadline := s.now.Add(time.Hour) workflow.TrackStateMachineTimer(ms, futureDeadline, validTask) wfCtx := workflow.NewMockContext(s.controller) @@ -1930,10 +1979,14 @@ func (s *timerQueueActiveTaskExecutorSuite) TestExecuteStateMachineTimerTask_Exe ).Return(wfCtx, wcache.NoopReleaseFn, nil) task := &tasks.StateMachineTimerTask{ - WorkflowKey: tests.WorkflowKey, - Version: 2, + WorkflowKey: tests.WorkflowKey, + VisibilityTimestamp: s.now, + Version: 2, } + // change now to a value earilier than task's visibility timestamp to test the case where system wall clock go backwards. + s.timeSource.Update(s.now.Add(-30 * time.Minute)) + //nolint:revive // unchecked-type-assertion timerQueueActiveTaskExecutor := newTimerQueueActiveTaskExecutor( s.mockShard, @@ -1967,17 +2020,24 @@ func (s *timerQueueActiveTaskExecutorSuite) createPersistenceMutableState( } func (s *timerQueueActiveTaskExecutorSuite) getMutableStateFromCache( - namespaceID namespace.ID, - workflowID string, - runID string, + workflowKey definition.WorkflowKey, ) workflow.MutableState { key := wcache.Key{ - WorkflowKey: definition.NewWorkflowKey(namespaceID.String(), workflowID, runID), + WorkflowKey: workflowKey, ShardUUID: s.mockShard.GetOwner(), } return wcache.GetMutableState(s.workflowCache, key) } +func (s *timerQueueActiveTaskExecutorSuite) clearMutableStateFromCache( + workflowKey definition.WorkflowKey, +) { + wcache.ClearMutableState(s.workflowCache, wcache.Key{ + WorkflowKey: workflowKey, + ShardUUID: s.mockShard.GetOwner(), + }) +} + func (s *timerQueueActiveTaskExecutorSuite) newTaskExecutable( task tasks.Task, ) queues.Executable { @@ -2002,7 +2062,7 @@ func (s *timerQueueActiveTaskExecutorSuite) mustGenerateTaskID() int64 { return taskID } -func (s *timerQueueActiveTaskExecutorSuite) TestProcessTimeoutTask() { +func (s *timerQueueActiveTaskExecutorSuite) TestProcessSingleActivityTimeoutTask() { ms := workflow.NewMockMutableState(s.controller) testCases := []struct { diff --git a/service/history/timer_queue_standby_task_executor.go b/service/history/timer_queue_standby_task_executor.go index 76e692e3172..dffeec8f71e 100644 --- a/service/history/timer_queue_standby_task_executor.go +++ b/service/history/timer_queue_standby_task_executor.go @@ -139,6 +139,7 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( ctx context.Context, timerTask *tasks.UserTimerTask, ) error { + referenceTime := t.Now() actionFn := func(_ context.Context, wfContext workflow.Context, mutableState workflow.MutableState) (interface{}, error) { if !mutableState.IsWorkflowExecutionRunning() { // workflow already finished, no need to process the timer @@ -157,7 +158,8 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( } if queues.IsTimeExpired( - timerTask.GetVisibilityTime(), + timerTask, + referenceTime, timerSequenceID.Timestamp, ) { return &struct{}{}, nil @@ -197,6 +199,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( // // the overall solution is to attempt to generate a new activity timer task whenever the // task passed in is safe to be throw away. + referenceTime := t.Now() actionFn := func(ctx context.Context, wfContext workflow.Context, mutableState workflow.MutableState) (interface{}, error) { if !mutableState.IsWorkflowExecutionRunning() { // workflow already finished, no need to process the timer @@ -216,7 +219,8 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( } if queues.IsTimeExpired( - timerTask.GetVisibilityTime(), + timerTask, + referenceTime, timerSequenceID.Timestamp, ) { return &struct{}{}, nil @@ -236,7 +240,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( // created. isHeartBeatTask := timerTask.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID) - if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask.GetVisibilityTime(), heartbeatTimeoutVis) { + if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask, timerTask.GetVisibilityTime(), heartbeatTimeoutVis) { if err := mutableState.UpdateActivityTaskStatusWithTimerHeartbeat(ai.ScheduledEventId, ai.TimerTaskStatus&^workflow.TimerTaskStatusCreatedHeartbeat, nil); err != nil { return nil, err } @@ -424,7 +428,7 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowRunTimeoutTask( ) error { actionFn := func(_ context.Context, wfContext workflow.Context, mutableState workflow.MutableState) (interface{}, error) { - if !t.isValidWorkflowRunTimeoutTask(mutableState) { + if !t.isValidWorkflowRunTimeoutTask(mutableState, timerTask) { return nil, nil } @@ -501,6 +505,7 @@ func (t *timerQueueStandbyTaskExecutor) executeStateMachineTimerTask( ctx, wfContext, mutableState, + timerTask, func(node *hsm.Node, task hsm.Task) error { // If this line of code is reached, the task's Validate() function returned no error, which indicates // that it is still expected to run. Return ErrTaskRetry to wait the machine to transition on the active @@ -652,6 +657,9 @@ func (t *timerQueueStandbyTaskExecutor) pushActivity( ) } +// TODO: deprecate this function and always use t.Now() +// Only test code sets t.clusterName to be non-current cluster name +// and advance the time by setting calling shardContext.SetCurrentTime. func (t *timerQueueStandbyTaskExecutor) getCurrentTime() time.Time { return t.shardContext.GetCurrentTime(t.clusterName) } diff --git a/service/history/timer_queue_standby_task_executor_test.go b/service/history/timer_queue_standby_task_executor_test.go index 0cec758101f..25dc655a335 100644 --- a/service/history/timer_queue_standby_task_executor_test.go +++ b/service/history/timer_queue_standby_task_executor_test.go @@ -75,6 +75,10 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +// TODO: remove all SetCurrentTime usage in this test suite +// after clusterName & getCurrentTime() method are deprecated +// from timerQueueStandbyTaskExecutor + type ( timerQueueStandbyTaskExecutorSuite struct { suite.Suite @@ -276,14 +280,22 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_Pending persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockShard.SetCurrentTime(s.clusterName, s.now) + s.timeSource.Update(s.now.Add(-time.Second)) + s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(-time.Second)) resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) + s.timeSource.Update(s.now.Add(2 * timerTimeout)) + s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(2*timerTimeout)) + resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) + + s.timeSource.Update(s.now.Add(s.fetchHistoryDuration)) s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration)) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) + s.timeSource.Update(s.now.Add(s.discardDuration)) s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.discardDuration)) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskDiscarded, resp.ExecutionErr) @@ -483,14 +495,22 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Pending( persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion()) s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockShard.SetCurrentTime(s.clusterName, s.now) + s.timeSource.Update(s.now.Add(-time.Second)) + s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(-time.Second)) resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) + s.timeSource.Update(s.now.Add(2 * timerTimeout)) + s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(2*timerTimeout)) + resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) + + s.timeSource.Update(s.now.Add(s.fetchHistoryDuration)) s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration)) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) + s.timeSource.Update(s.now.Add(s.discardDuration)) s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.discardDuration)) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskDiscarded, resp.ExecutionErr) @@ -1041,21 +1061,28 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowRunTimeout_Pendi ), Version: s.version, TaskID: s.mustGenerateTaskID(), - VisibilityTimestamp: s.now, + VisibilityTimestamp: s.now.Add(workflowRunTimeout), } persistenceMutableState := s.createPersistenceMutableState(mutableState, completionEvent.GetEventId(), completionEvent.GetVersion()) s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockShard.SetCurrentTime(s.clusterName, s.now) - s.timeSource.Advance(workflowRunTimeout + 1*time.Second) + s.timeSource.Update(s.now.Add(-time.Second)) + s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(-time.Second)) resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) + s.timeSource.Update(s.now) + s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(2*workflowRunTimeout)) + resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) + + s.timeSource.Update(s.now.Add(s.fetchHistoryDuration)) s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration)) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) + s.timeSource.Update(s.now.Add(s.discardDuration)) s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.discardDuration)) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskDiscarded, resp.ExecutionErr) @@ -1121,6 +1148,8 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowExecutionTimeout workflowType := "some random workflow type" taskQueueName := "some random task queue" + workflowRunTimeout := 200 * time.Second + mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetWorkflowId(), execution.GetRunId()) startedEvent, err := mutableState.AddWorkflowExecutionStartedEventWithOptions( execution, @@ -1130,7 +1159,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowExecutionTimeout StartRequest: &workflowservice.StartWorkflowExecutionRequest{ WorkflowType: &commonpb.WorkflowType{Name: workflowType}, TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName}, - WorkflowRunTimeout: durationpb.New(200 * time.Second), + WorkflowRunTimeout: durationpb.New(workflowRunTimeout), WorkflowTaskTimeout: durationpb.New(1 * time.Second), }, }, @@ -1147,7 +1176,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowExecutionTimeout NamespaceID: s.namespaceID.String(), WorkflowID: execution.GetWorkflowId(), FirstRunID: firstRunID, - VisibilityTimestamp: s.now, + VisibilityTimestamp: s.now.Add(workflowRunTimeout), TaskID: s.mustGenerateTaskID(), } @@ -1162,18 +1191,25 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowExecutionTimeout RunID: persistenceExecutionState.RunId, State: persistenceExecutionState.State, Status: persistenceExecutionState.Status, - }, nil).Times(3) + }, nil).Times(4) s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockShard.SetCurrentTime(s.clusterName, s.now) + s.timeSource.Update(s.now.Add(-time.Second)) + s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(-time.Second)) resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) - s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration)) + s.timeSource.Update(s.now.Add(2 * workflowRunTimeout)) + s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(2*workflowRunTimeout)) + resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) + s.timeSource.Update(s.now.Add(s.fetchHistoryDuration)) + s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.fetchHistoryDuration)) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskRetry, resp.ExecutionErr) + s.timeSource.Update(s.now.Add(s.discardDuration)) s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(s.discardDuration)) resp = s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) s.Equal(consts.ErrTaskDiscarded, resp.ExecutionErr) diff --git a/service/history/timer_queue_task_executor_base.go b/service/history/timer_queue_task_executor_base.go index b5fb3811082..5c79dc47f82 100644 --- a/service/history/timer_queue_task_executor_base.go +++ b/service/history/timer_queue_task_executor_base.go @@ -173,27 +173,29 @@ func (t *timerQueueTaskExecutorBase) deleteHistoryBranch( func (t *timerQueueTaskExecutorBase) isValidExpirationTime( mutableState workflow.MutableState, + task tasks.Task, expirationTime *timestamppb.Timestamp, ) bool { if !mutableState.IsWorkflowExecutionRunning() { return false } - now := t.shardContext.GetTimeSource().Now() + now := t.Now() taskShouldTriggerAt := expirationTime.AsTime() - expired := queues.IsTimeExpired(now, taskShouldTriggerAt) + expired := queues.IsTimeExpired(task, now, taskShouldTriggerAt) return expired } func (t *timerQueueTaskExecutorBase) isValidWorkflowRunTimeoutTask( mutableState workflow.MutableState, + task *tasks.WorkflowRunTimeoutTask, ) bool { executionInfo := mutableState.GetExecutionInfo() // Check if workflow execution timeout is not expired // This can happen if the workflow is reset but old timer task is still fired. - return t.isValidExpirationTime(mutableState, executionInfo.WorkflowRunExpirationTime) + return t.isValidExpirationTime(mutableState, task, executionInfo.WorkflowRunExpirationTime) } func (t *timerQueueTaskExecutorBase) isValidWorkflowExecutionTimeoutTask( @@ -210,7 +212,7 @@ func (t *timerQueueTaskExecutorBase) isValidWorkflowExecutionTimeoutTask( // Check if workflow execution timeout is not expired // This can happen if the workflow is reset since reset re-calculates // the execution timeout but shares the same firstRunID as the base run - return t.isValidExpirationTime(mutableState, executionInfo.WorkflowExecutionExpirationTime) + return t.isValidExpirationTime(mutableState, task, executionInfo.WorkflowExecutionExpirationTime) // NOTE: we don't need to do version check here because if we were to do it, we need to compare the task version // and the start version in the first run. However, failover & conflict resolution will never change @@ -269,6 +271,7 @@ func (t *timerQueueTaskExecutorBase) executeStateMachineTimers( ctx context.Context, workflowContext workflow.Context, ms workflow.MutableState, + task *tasks.StateMachineTimerTask, execute func(node *hsm.Node, task hsm.Task) error, ) (int, error) { @@ -285,7 +288,7 @@ func (t *timerQueueTaskExecutorBase) executeStateMachineTimers( // StateMachineTimers are sorted by Deadline, iterate through them as long as the deadline is expired. for len(timers) > 0 { group := timers[0] - if !queues.IsTimeExpired(t.Now(), group.Deadline.AsTime()) { + if !queues.IsTimeExpired(task, t.Now(), group.Deadline.AsTime()) { break } diff --git a/service/history/transfer_queue_standby_task_executor.go b/service/history/transfer_queue_standby_task_executor.go index 4deeb55bdc6..b42a8205a9e 100644 --- a/service/history/transfer_queue_standby_task_executor.go +++ b/service/history/transfer_queue_standby_task_executor.go @@ -556,6 +556,9 @@ func (t *transferQueueStandbyTaskExecutor) pushWorkflowTask( ) } +// TODO: deprecate this function and always use t.Now() +// Only test code sets t.clusterName to be non-current cluster name +// and advance the time by setting calling shardContext.SetCurrentTime. func (t *transferQueueStandbyTaskExecutor) getCurrentTime() time.Time { return t.shardContext.GetCurrentTime(t.clusterName) } diff --git a/service/history/transfer_queue_standby_task_executor_test.go b/service/history/transfer_queue_standby_task_executor_test.go index 89c4275dc87..3d196b4f7fd 100644 --- a/service/history/transfer_queue_standby_task_executor_test.go +++ b/service/history/transfer_queue_standby_task_executor_test.go @@ -76,6 +76,10 @@ import ( "google.golang.org/protobuf/types/known/durationpb" ) +// TODO: remove all SetCurrentTime usage in this test suite +// after clusterName & getCurrentTime() method are deprecated +// from transferQueueStandbyTaskExecutor + type ( transferQueueStandbyTaskExecutorSuite struct { suite.Suite diff --git a/service/history/workflow/cache/export_testing.go b/service/history/workflow/cache/export_testing.go index c93270de337..d0ef44deec6 100644 --- a/service/history/workflow/cache/export_testing.go +++ b/service/history/workflow/cache/export_testing.go @@ -31,7 +31,7 @@ import ( // GetMutableState returns the MutableState for the given key from the cache. // Exported for testing purposes. func GetMutableState(cache Cache, key Key) workflow.MutableState { - return cache.(*cacheImpl).Get(key).(*cacheItem).wfContext.(*workflow.ContextImpl).MutableState + return getWorkflowContext(cache, key).(*workflow.ContextImpl).MutableState } // PutContextIfNotExist puts the given workflow Context into the cache, if it doens't already exist. @@ -40,3 +40,13 @@ func PutContextIfNotExist(cache Cache, key Key, value workflow.Context) error { _, err := cache.(*cacheImpl).PutIfNotExist(key, &cacheItem{wfContext: value}) return err } + +// ClearMutableState clears cached mutable state for the given key to +// force a reload from persistence on the next access. +func ClearMutableState(cache Cache, key Key) { + getWorkflowContext(cache, key).Clear() +} + +func getWorkflowContext(cache Cache, key Key) workflow.Context { + return cache.(*cacheImpl).Get(key).(*cacheItem).wfContext +}