@@ -16,6 +16,8 @@ internal class CancellationTokenSourceHolder : ICancellationNotifier.ICancellati
1616
1717 public CancellationToken CancellationToken { get ; }
1818
19+ public bool IsCancellationRequestedByNotifier => _notifier . IsCancellationRequested ( _taskId ) ;
20+
1921 public CancellationTokenSourceHolder ( CancellationToken cancellationToken , string taskId , KafkaCancellationNotifier notifier )
2022 {
2123 CancellationToken = cancellationToken ;
@@ -26,10 +28,21 @@ public CancellationTokenSourceHolder(CancellationToken cancellationToken, string
2628 public void Dispose ( ) => _notifier . ClearTaskCts ( _taskId ) ;
2729 }
2830
31+ private class TaskCancellationInfo
32+ {
33+ public TaskCancellationInfo ( CancellationTokenSource cancellationTokenSource )
34+ {
35+ CancellationTokenSource = cancellationTokenSource ;
36+ }
37+
38+ public CancellationTokenSource CancellationTokenSource { get ; }
39+ public bool IsCancellationRequested { get ; set ; }
40+ }
41+
2942 private readonly HashSet < string > _tasks ;
3043 private readonly object _lock = new ( ) ;
3144 private readonly ILogger < KafkaCancellationNotifier > _logger ;
32- private readonly Dictionary < string , CancellationTokenSource > _taskIdToCtsMap = new ( ) ;
45+ private readonly Dictionary < string , TaskCancellationInfo > _taskIdToInfoMap = new ( ) ;
3346
3447 public KafkaCancellationNotifier ( IEnumerable < TaskToWorker > tasks , ILogger < KafkaCancellationNotifier > logger )
3548 {
@@ -39,8 +52,8 @@ public KafkaCancellationNotifier(IEnumerable<TaskToWorker> tasks, ILogger<KafkaC
3952
4053 public ICancellationNotifier . ICancellationTokenHolder GetCancellationToken ( string taskId , CancellationToken engineCancellationToken )
4154 {
42- var cts = CreateCts ( taskId , engineCancellationToken ) ;
43- return new CancellationTokenSourceHolder ( cts . Token , taskId , this ) ;
55+ var token = CreateTaskCancellationInfoAndGetToken ( taskId , engineCancellationToken ) ;
56+ return new CancellationTokenSourceHolder ( token , taskId , this ) ;
4457 }
4558
4659 public void HandleKafkaEvent ( TaskStatusModel taskStatusModel )
@@ -52,56 +65,65 @@ public void HandleKafkaEvent(TaskStatusModel taskStatusModel)
5265 )
5366 return ;
5467
55- var cts = GetCts ( taskStatusModel . TaskId ) ;
56- if ( cts is null )
68+ TryToCancelTask ( taskStatusModel ) ;
69+ }
70+
71+ private CancellationToken CreateTaskCancellationInfoAndGetToken ( string taskId , CancellationToken engineCancellationToken = default )
72+ {
73+ CancellationToken token ;
74+
75+ lock ( _lock )
5776 {
58- _logger . LogWarning (
59- "Unable to cancel task {TaskId} of workflow {WorkflowId}" ,
60- taskStatusModel . TaskId ,
61- taskStatusModel . WorkflowInstanceId
77+ var info = _taskIdToInfoMap [ taskId ] = new TaskCancellationInfo (
78+ CancellationTokenSource . CreateLinkedTokenSource ( engineCancellationToken )
6279 ) ;
63- return ;
80+ token = info . CancellationTokenSource . Token ;
6481 }
6582
66- cts . Cancel ( ) ;
83+ return token ;
6784 }
6885
69- private CancellationTokenSource CreateCts ( string taskId , CancellationToken engineCancellationToken = default )
86+ private void TryToCancelTask ( TaskStatusModel taskStatusModel )
7087 {
71- CancellationTokenSource cts ;
72- var stopwatch = Stopwatch . StartNew ( ) ;
88+ TaskCancellationInfo ? info ;
7389
7490 lock ( _lock )
7591 {
76- cts = _taskIdToCtsMap [ taskId ] = CancellationTokenSource . CreateLinkedTokenSource ( engineCancellationToken ) ;
92+ info = _taskIdToInfoMap . GetValueOrDefault ( taskStatusModel . TaskId ) ;
7793 }
78- _logger . LogDebug ( "CancellationTokenSource creation time {ElapsedMs}ms" , stopwatch . ElapsedMilliseconds ) ;
7994
80- return cts ;
95+ if ( info is null )
96+ {
97+ _logger . LogWarning (
98+ "Unable to cancel task {TaskId} of workflow {WorkflowId}" ,
99+ taskStatusModel . TaskId ,
100+ taskStatusModel . WorkflowInstanceId
101+ ) ;
102+ return ;
103+ }
104+
105+ lock ( _lock )
106+ {
107+ info . IsCancellationRequested = true ;
108+ info . CancellationTokenSource . Cancel ( ) ;
109+ }
81110 }
82111
83- private CancellationTokenSource ? GetCts ( string taskId )
112+ private void ClearTaskCts ( string taskId )
84113 {
85- CancellationTokenSource ? cts ;
86- var stopwatch = Stopwatch . StartNew ( ) ;
87-
88114 lock ( _lock )
89115 {
90- cts = _taskIdToCtsMap . GetValueOrDefault ( taskId ) ;
116+ _taskIdToInfoMap [ taskId ] . CancellationTokenSource . Dispose ( ) ;
117+ _taskIdToInfoMap . Remove ( taskId ) ;
91118 }
92- _logger . LogDebug ( "CancellationTokenSource get time {ElapsedMs}ms" , stopwatch . ElapsedMilliseconds ) ;
93-
94- return cts ;
95119 }
96120
97- private void ClearTaskCts ( string taskId )
121+ private bool IsCancellationRequested ( string taskId )
98122 {
99- var stopwatch = Stopwatch . StartNew ( ) ;
100123 lock ( _lock )
101124 {
102- _taskIdToCtsMap . Remove ( taskId ) ;
125+ return _taskIdToInfoMap [ taskId ] . IsCancellationRequested ;
103126 }
104- _logger . LogDebug ( "CancellationTokenSource removal time {ElapsedMs}ms" , stopwatch . ElapsedMilliseconds ) ;
105127 }
106128 }
107129}
0 commit comments