Skip to content

Commit dd0297a

Browse files
feiyang3catclaude
andcommitted
persistence: regenerate executions.pb.go with to_time api-linter comment
add timeSkippingEvent to workflow rebuilder fix bugs of taskRefersher, and mutable virtual time The proto source had the api-linter suppression comment for to_time but the generated .pb.go was not updated to reflect it. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent b959b31 commit dd0297a

7 files changed

Lines changed: 232 additions & 14 deletions

File tree

api/persistence/v1/executions.pb.go

Lines changed: 6 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/workflow/mutable_state_impl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ func NewMutableStateFromDB(
502502
mutableState.approximateSize += dbRecord.ExecutionInfo.Size() - mutableState.executionInfo.Size()
503503
mutableState.executionInfo = dbRecord.ExecutionInfo
504504

505-
if mutableState.executionInfo.GetTimeSkippingInfo().GetEnabled() {
505+
if mutableState.executionInfo.GetTimeSkippingInfo() != nil {
506506
mutableState.timeSource = clock.NewTimeSkippingTimeSource(
507507
mutableState.timeSource,
508508
mutableState.executionInfo.TimeSkippingInfo.TimeSkippedDetails,

service/history/workflow/mutable_state_impl_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6242,3 +6242,26 @@ func (s *mutableStateSuite) TestTimeSkippedDurationRoundtrip() {
62426242
s.Equal(d, clock.TimeSkippedDurationFromTimestamp(clock.TimeSkippedDurationToTimestamp(d)), "duration: %v", d)
62436243
}
62446244
}
6245+
6246+
func (s *mutableStateSuite) TestNewMutableStateFromDB_TimeSkippingDisabled_VirtualTimeUsed() {
6247+
// Verify that when TimeSkippingInfo exists but Enabled=false, the mutable state still uses
6248+
// a TimeSkippingTimeSource so that virtual time (real time + skipped offset) is returned.
6249+
skipDuration := 2 * time.Hour
6250+
dbState := s.buildWorkflowMutableState()
6251+
dbState.ExecutionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{
6252+
Enabled: false,
6253+
TimeSkippedDetails: []*persistencespb.TimeSkippedDetails{
6254+
{
6255+
DurationToSkip: clock.TimeSkippedDurationToTimestamp(skipDuration),
6256+
},
6257+
},
6258+
}
6259+
6260+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 123)
6261+
s.NoError(err)
6262+
6263+
// VirtualTimeNow should be ahead of real time by the accumulated skipped duration.
6264+
realNow := s.mockShard.GetTimeSource().Now()
6265+
virtualNow := ms.VirtualTimeNow()
6266+
s.Equal(skipDuration, virtualNow.Sub(realNow))
6267+
}

service/history/workflow/mutable_state_rebuilder.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,10 @@ func (b *MutableStateRebuilderImpl) applyEvents(
672672
if err := b.mutableState.ApplyWorkflowExecutionUnpausedEvent(event); err != nil {
673673
return nil, err
674674
}
675+
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPED:
676+
if err := b.mutableState.ApplyWorkflowExecutionTimeSkippedEvent(ctx, event); err != nil {
677+
return nil, err
678+
}
675679

676680
default:
677681
def, ok := b.shard.StateMachineRegistry().EventDefinition(event.GetEventType())

service/history/workflow/mutable_state_rebuilder_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2159,6 +2159,37 @@ func (s *stateBuilderSuite) TestApplyEvents_HSMRegistry() {
21592159
s.Equal(enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, sm.State())
21602160
}
21612161

2162+
func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTimeSkipped() {
2163+
version := int64(1)
2164+
requestID := uuid.NewString()
2165+
execution := &commonpb.WorkflowExecution{
2166+
WorkflowId: "some random workflow ID",
2167+
RunId: tests.RunID,
2168+
}
2169+
2170+
now := time.Now().UTC()
2171+
event := &historypb.HistoryEvent{
2172+
TaskId: rand.Int63(),
2173+
Version: version,
2174+
EventId: 130,
2175+
EventTime: timestamppb.New(now),
2176+
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIME_SKIPPED,
2177+
Attributes: &historypb.HistoryEvent_WorkflowExecutionTimeSkippedEventAttributes{
2178+
WorkflowExecutionTimeSkippedEventAttributes: &historypb.WorkflowExecutionTimeSkippedEventAttributes{
2179+
ToTime: timestamppb.New(now.Add(time.Hour)),
2180+
},
2181+
},
2182+
}
2183+
2184+
s.mockMutableState.EXPECT().ApplyWorkflowExecutionTimeSkippedEvent(gomock.Any(), protomock.Eq(event)).Return(nil)
2185+
s.mockUpdateVersion(event)
2186+
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
2187+
2188+
_, err := s.stateRebuilder.ApplyEvents(context.Background(), tests.NamespaceID, requestID, execution, s.toHistory(event), nil, "")
2189+
s.NoError(err)
2190+
s.Equal(event.TaskId, s.executionInfo.LastRunningClock)
2191+
}
2192+
21622193
func (p *testTaskGeneratorProvider) NewTaskGenerator(
21632194
shardContext historyi.ShardContext,
21642195
mutableState historyi.MutableState,

service/history/workflow/task_refresher.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
enumsspb "go.temporal.io/server/api/enums/v1"
1111
persistencespb "go.temporal.io/server/api/persistence/v1"
1212
"go.temporal.io/server/common"
13+
"go.temporal.io/server/common/clock"
1314
"go.temporal.io/server/common/persistence/transitionhistory"
1415
"go.temporal.io/server/common/primitives/timestamp"
1516
"go.temporal.io/server/service/history/hsm"
@@ -76,7 +77,7 @@ func (r *TaskRefresherImpl) Refresh(
7677
return err
7778
}
7879

79-
r.applyTimeSkippingOffsetToUserTimerTasks(mutableState)
80+
r.applyTimeSkippingOffsetToTimerTasks(mutableState)
8081

8182
if err := mutableState.ChasmTree().RefreshTasks(); err != nil {
8283
return err
@@ -692,13 +693,16 @@ func (r *TaskRefresherImpl) refreshTasksForSubStateMachines(
692693
return nil
693694
}
694695

695-
// applyTimeSkippingOffsetToUserTimerTasks shifts the VisibilityTimestamp of all user timer tasks
696+
// applyTimeSkippingOffsetToTimerTasks shifts the VisibilityTimestamp of all pending timer tasks
696697
// that were just generated by subtracting the total skipped duration. This ensures the timer queue
697698
// fires them at the correct real-clock time relative to the advanced virtual clock.
698699
//
699-
// skippedDuration = latestSkipToTimePoint - realNow
700-
// adjustedFireTime = virtualFireTime - skippedDuration
701-
func (r *TaskRefresherImpl) applyTimeSkippingOffsetToUserTimerTasks(mutableState historyi.MutableState) {
700+
// skippedDuration = sum of all DurationToSkip in TimeSkippedDetails
701+
// adjustedFireTime = max(now, virtualFireTime - skippedDuration)
702+
//
703+
// DeleteHistoryEventTask and TimeSkippingTimerTask are excluded: the former is a retention-based
704+
// cleanup task unrelated to virtual time; the latter is the time-skipping mechanism itself.
705+
func (r *TaskRefresherImpl) applyTimeSkippingOffsetToTimerTasks(mutableState historyi.MutableState) {
702706
timeSkippingInfo := mutableState.GetExecutionInfo().GetTimeSkippingInfo()
703707
if timeSkippingInfo == nil {
704708
return
@@ -708,9 +712,7 @@ func (r *TaskRefresherImpl) applyTimeSkippingOffsetToUserTimerTasks(mutableState
708712
return
709713
}
710714

711-
latestDetail := details[len(details)-1]
712-
latestTargetVirtualTime := latestDetail.GetToTime().AsTime()
713-
skippedDuration := latestTargetVirtualTime.Sub(r.shard.GetTimeSource().Now())
715+
skippedDuration := clock.ComputeTotalSkippedOffset(details)
714716
if skippedDuration <= 0 {
715717
return
716718
}
@@ -719,9 +721,18 @@ func (r *TaskRefresherImpl) applyTimeSkippingOffsetToUserTimerTasks(mutableState
719721
if !ok {
720722
return
721723
}
724+
now := r.shard.GetTimeSource().Now()
722725
for _, task := range ms.InsertTasks[tasks.CategoryTimer] {
723-
if userTimerTask, ok := task.(*tasks.UserTimerTask); ok {
724-
userTimerTask.VisibilityTimestamp = userTimerTask.VisibilityTimestamp.Add(-skippedDuration)
726+
switch task.GetType() {
727+
case enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT, enumsspb.TASK_TYPE_TIME_SKIPPING:
728+
// not virtual-time based, skip
729+
continue
730+
default:
731+
}
732+
adjusted := task.GetVisibilityTime().Add(-skippedDuration)
733+
if adjusted.Before(now) {
734+
adjusted = now
725735
}
736+
task.SetVisibilityTime(adjusted)
726737
}
727738
}

service/history/workflow/task_refresher_test.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
historyspb "go.temporal.io/server/api/history/v1"
1515
persistencespb "go.temporal.io/server/api/persistence/v1"
1616
"go.temporal.io/server/common"
17+
"go.temporal.io/server/common/clock"
1718
"go.temporal.io/server/common/cluster"
1819
"go.temporal.io/server/common/log"
1920
"go.temporal.io/server/common/namespace"
@@ -1434,6 +1435,151 @@ func (s *taskRefresherSuite) TestRefreshSubStateMachineTasks() {
14341435
s.False(hsmRoot.Dirty())
14351436
}
14361437

1438+
// buildMutableStateWithTimeSkipping creates a MutableStateImpl with TimeSkippingInfo having
1439+
// the given total skipped duration in a single entry.
1440+
func (s *taskRefresherSuite) buildMutableStateWithTimeSkipping(totalSkip time.Duration, enabled bool) *MutableStateImpl {
1441+
dbRecord := &persistencespb.WorkflowMutableState{
1442+
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
1443+
NamespaceId: tests.NamespaceID.String(),
1444+
WorkflowId: tests.WorkflowID,
1445+
TimeSkippingInfo: &persistencespb.TimeSkippingInfo{
1446+
Enabled: enabled,
1447+
TimeSkippedDetails: []*persistencespb.TimeSkippedDetails{
1448+
{DurationToSkip: clock.TimeSkippedDurationToTimestamp(totalSkip)},
1449+
},
1450+
},
1451+
},
1452+
ExecutionState: &persistencespb.WorkflowExecutionState{
1453+
RunId: tests.RunID,
1454+
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
1455+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
1456+
},
1457+
NextEventId: 1,
1458+
}
1459+
ms, err := NewMutableStateFromDB(
1460+
s.mockShard,
1461+
s.mockShard.GetEventsCache(),
1462+
log.NewTestLogger(),
1463+
tests.LocalNamespaceEntry,
1464+
dbRecord,
1465+
1,
1466+
)
1467+
s.NoError(err)
1468+
return ms
1469+
}
1470+
1471+
func (s *taskRefresherSuite) TestApplyTimeSkippingOffset_AdjustsAllTimerTaskTypes() {
1472+
skipDuration := 2 * time.Hour
1473+
ms := s.buildMutableStateWithTimeSkipping(skipDuration, true)
1474+
1475+
now := s.mockShard.GetTimeSource().Now()
1476+
fireTime := now.Add(3 * time.Hour)
1477+
wfKey := ms.GetWorkflowKey()
1478+
1479+
ms.InsertTasks[tasks.CategoryTimer] = []tasks.Task{
1480+
&tasks.UserTimerTask{WorkflowKey: wfKey, VisibilityTimestamp: fireTime},
1481+
&tasks.ActivityTimeoutTask{WorkflowKey: wfKey, VisibilityTimestamp: fireTime},
1482+
&tasks.WorkflowRunTimeoutTask{WorkflowKey: wfKey, VisibilityTimestamp: fireTime},
1483+
&tasks.WorkflowExecutionTimeoutTask{NamespaceID: wfKey.NamespaceID, WorkflowID: wfKey.WorkflowID, VisibilityTimestamp: fireTime},
1484+
}
1485+
1486+
s.taskRefresher.applyTimeSkippingOffsetToTimerTasks(ms)
1487+
1488+
expected := fireTime.Add(-skipDuration) // now + 1h
1489+
for _, task := range ms.InsertTasks[tasks.CategoryTimer] {
1490+
s.Equal(expected, task.GetVisibilityTime(), "task type %v should be adjusted", task.GetType())
1491+
}
1492+
}
1493+
1494+
func (s *taskRefresherSuite) TestApplyTimeSkippingOffset_ExcludesDeleteAndTimeSkippingTasks() {
1495+
skipDuration := 2 * time.Hour
1496+
ms := s.buildMutableStateWithTimeSkipping(skipDuration, true)
1497+
1498+
now := s.mockShard.GetTimeSource().Now()
1499+
fireTime := now.Add(3 * time.Hour)
1500+
wfKey := ms.GetWorkflowKey()
1501+
1502+
ms.InsertTasks[tasks.CategoryTimer] = []tasks.Task{
1503+
&tasks.DeleteHistoryEventTask{WorkflowKey: wfKey, VisibilityTimestamp: fireTime},
1504+
&tasks.TimeSkippingTimerTask{WorkflowKey: wfKey, VisibilityTimestamp: fireTime},
1505+
}
1506+
1507+
s.taskRefresher.applyTimeSkippingOffsetToTimerTasks(ms)
1508+
1509+
for _, task := range ms.InsertTasks[tasks.CategoryTimer] {
1510+
s.Equal(fireTime, task.GetVisibilityTime(), "task type %v should not be adjusted", task.GetType())
1511+
}
1512+
}
1513+
1514+
func (s *taskRefresherSuite) TestApplyTimeSkippingOffset_MultipleSkipEntries_UsesTotalOffset() {
1515+
// Two separate skip entries: 1h + 1h = 2h total. The old bug used
1516+
// latestTargetVirtualTime - realNow instead of summing DurationToSkip.
1517+
dbRecord := &persistencespb.WorkflowMutableState{
1518+
ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
1519+
NamespaceId: tests.NamespaceID.String(),
1520+
WorkflowId: tests.WorkflowID,
1521+
TimeSkippingInfo: &persistencespb.TimeSkippingInfo{
1522+
Enabled: true,
1523+
TimeSkippedDetails: []*persistencespb.TimeSkippedDetails{
1524+
{DurationToSkip: clock.TimeSkippedDurationToTimestamp(time.Hour)},
1525+
{DurationToSkip: clock.TimeSkippedDurationToTimestamp(time.Hour)},
1526+
},
1527+
},
1528+
},
1529+
ExecutionState: &persistencespb.WorkflowExecutionState{
1530+
RunId: tests.RunID,
1531+
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
1532+
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
1533+
},
1534+
NextEventId: 1,
1535+
}
1536+
ms, err := NewMutableStateFromDB(s.mockShard, s.mockShard.GetEventsCache(), log.NewTestLogger(), tests.LocalNamespaceEntry, dbRecord, 1)
1537+
s.NoError(err)
1538+
1539+
now := s.mockShard.GetTimeSource().Now()
1540+
fireTime := now.Add(3 * time.Hour)
1541+
ms.InsertTasks[tasks.CategoryTimer] = []tasks.Task{
1542+
&tasks.UserTimerTask{WorkflowKey: ms.GetWorkflowKey(), VisibilityTimestamp: fireTime},
1543+
}
1544+
1545+
s.taskRefresher.applyTimeSkippingOffsetToTimerTasks(ms)
1546+
1547+
// Total offset = 2h, so adjusted = now + 3h - 2h = now + 1h
1548+
s.Equal(now.Add(time.Hour), ms.InsertTasks[tasks.CategoryTimer][0].GetVisibilityTime())
1549+
}
1550+
1551+
func (s *taskRefresherSuite) TestApplyTimeSkippingOffset_ClampsToNow() {
1552+
skipDuration := 5 * time.Hour
1553+
ms := s.buildMutableStateWithTimeSkipping(skipDuration, true)
1554+
1555+
now := s.mockShard.GetTimeSource().Now()
1556+
fireTime := now.Add(3 * time.Hour) // adjusted = now + 3h - 5h = 2h in the past
1557+
ms.InsertTasks[tasks.CategoryTimer] = []tasks.Task{
1558+
&tasks.UserTimerTask{WorkflowKey: ms.GetWorkflowKey(), VisibilityTimestamp: fireTime},
1559+
}
1560+
1561+
s.taskRefresher.applyTimeSkippingOffsetToTimerTasks(ms)
1562+
1563+
s.Equal(now, ms.InsertTasks[tasks.CategoryTimer][0].GetVisibilityTime())
1564+
}
1565+
1566+
func (s *taskRefresherSuite) TestApplyTimeSkippingOffset_DisabledButHasSkips_StillAdjusts() {
1567+
// Even when Enabled=false, accumulated skips must be applied because virtual time
1568+
// has deviated from wall clock time and tasks must fire at the correct real time.
1569+
skipDuration := 2 * time.Hour
1570+
ms := s.buildMutableStateWithTimeSkipping(skipDuration, false)
1571+
1572+
now := s.mockShard.GetTimeSource().Now()
1573+
fireTime := now.Add(3 * time.Hour)
1574+
ms.InsertTasks[tasks.CategoryTimer] = []tasks.Task{
1575+
&tasks.UserTimerTask{WorkflowKey: ms.GetWorkflowKey(), VisibilityTimestamp: fireTime},
1576+
}
1577+
1578+
s.taskRefresher.applyTimeSkippingOffsetToTimerTasks(ms)
1579+
1580+
s.Equal(fireTime.Add(-skipDuration), ms.InsertTasks[tasks.CategoryTimer][0].GetVisibilityTime())
1581+
}
1582+
14371583
type mockTaskGeneratorProvider struct {
14381584
mockTaskGenerator *MockTaskGenerator
14391585
}

0 commit comments

Comments
 (0)