Skip to content

Commit f5c20ff

Browse files
committed
fixup! add test
1 parent e0c8922 commit f5c20ff

9 files changed

Lines changed: 224 additions & 154 deletions

File tree

src/intemporal/core.clj

Lines changed: 71 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -26,54 +26,60 @@
2626
effective-timeout (or timeout-ms (:timeout-ms activity-info))
2727
effective-retry (or retry-policy (:retry-policy activity-info))]
2828
(fn [& args]
29-
(log/with-mdc {:activity activity-name}
30-
31-
(ctx/check-cancelled!)
32-
(let [ctx (ctx/current-context)
33-
seq-num (ctx/next-seq!)
34-
store (ctx/current-store)
35-
workflow-id (ctx/current-workflow-id)
36-
existing (p/find-event store workflow-id :activity-completed seq-num)
37-
existing-failed (p/find-event store workflow-id :activity-failed seq-num)
38-
err (some-> (:error existing-failed) (error/map->exception))
39-
interrupted? (boolean (some-> err (error/interruption?)))]
40-
(cond
41-
;; Replay: return cached result
42-
existing
43-
(do
44-
(log/infof "Found existing result for activity")
45-
(:result existing))
46-
47-
;; Replay: throw cached error
48-
;; TODO decide how to handle interruptions, interrupt policy?
49-
(and existing-failed #_(not interrupted?))
50-
(do
51-
(log/infof "Found existing error for activity")
52-
(throw err))
53-
54-
;; Execute: need to run the activity
55-
:else
56-
(let [scheduled-event {:event-type :activity-scheduled
57-
:seq seq-num
58-
:activity-name activity-name
59-
:args (vec args)
60-
:timeout-ms effective-timeout
61-
:retry-policy (when effective-retry
62-
{:max-attempts (:max-attempts effective-retry)
63-
:backoff-ms (:backoff-ms effective-retry)})
64-
:timestamp (System/currentTimeMillis)}]
65-
(when interrupted?
66-
(log/infof "Activity was forcefully interrupted"))
67-
68-
(ctx/add-pending-event! scheduled-event)
69-
(ctx/notify-observer p/on-activity-scheduled
70-
(:workflow-id ctx) seq-num activity-name (vec args))
71-
(log/infof "Scheduling activity with sequence number %d and suspending" seq-num)
72-
(throw (error/make-suspension :activity {:seq seq-num
73-
:activity-name activity-name
74-
:args (vec args)
75-
:timeout-ms effective-timeout
76-
:retry-policy effective-retry})))))))))
29+
(let [seq-num (ctx/next-seq!)]
30+
(log/with-mdc {:activity activity-name :seqnum seq-num}
31+
32+
(ctx/check-cancelled!)
33+
(let [ctx (ctx/current-context)
34+
store (ctx/current-store)
35+
workflow-id (ctx/current-workflow-id)
36+
existing (p/find-event store workflow-id :activity-completed seq-num)
37+
existing-failed (p/find-event store workflow-id :activity-failed seq-num)
38+
err (some-> (:error existing-failed) (error/map->exception))
39+
interrupted? (boolean (some-> err (error/interruption?)))
40+
rejected? (boolean (some-> err (error/rejection?)))]
41+
(cond
42+
;; Replay: return cached result
43+
existing
44+
(do
45+
(log/infof "Found existing result for activity: %s" (pr-str (:result existing)))
46+
(:result existing))
47+
48+
;; Replay: throw cached error
49+
;; TODO decide how to handle interruptions, interrupt policy?
50+
(and existing-failed (not interrupted?) (not rejected?))
51+
(do
52+
(log/infof "Found existing error for activity")
53+
(throw err))
54+
55+
;; Execute: need to run the activity
56+
;; either due to rejection or interruption
57+
:else
58+
(let [scheduled-event {:event-type :activity-scheduled
59+
:seq seq-num
60+
:activity-name activity-name
61+
:args (vec args)
62+
:timeout-ms effective-timeout
63+
:retry-policy (when effective-retry
64+
{:max-attempts (:max-attempts effective-retry)
65+
:backoff-ms (:backoff-ms effective-retry)})
66+
:timestamp (System/currentTimeMillis)}]
67+
;; interruptions are scheduled just the same
68+
(when interrupted?
69+
(log/infof "Activity was interrupted: rescheduling"))
70+
;; rejections are scheduled just the same
71+
(when rejected?
72+
(log/infof "Activity execution was rejected: rescheduling"))
73+
74+
(ctx/add-pending-event! scheduled-event)
75+
(ctx/notify-observer p/on-activity-scheduled
76+
(:workflow-id ctx) seq-num activity-name (vec args))
77+
(log/infof "Scheduling activity suspension")
78+
(throw (error/make-suspension :activity {:seq seq-num
79+
:activity-name activity-name
80+
:args (vec args)
81+
:timeout-ms effective-timeout
82+
:retry-policy effective-retry}))))))))))
7783

7884
;; ============================================================================
7985
;; Async Support
@@ -92,28 +98,31 @@
9298
workflow-id (ctx/current-workflow-id)
9399
existing-completed (p/find-event store workflow-id :async-completed seq-num)
94100
existing-failed (p/find-event store workflow-id :async-failed seq-num)
95-
existing-started (p/find-event store workflow-id :async-started seq-num)]
101+
existing-started (p/find-event store workflow-id :async-started seq-num)
102+
err (some-> (:error existing-failed) (error/map->exception))
103+
interrupted? (boolean (some-> err (error/interruption?)))]
96104
(cond
97105
;; Already completed - advance seq past consumed numbers during replay
98106
existing-completed
99107
(do
100108
;; Advance seq counter to skip past all seqs consumed by this async
101109
(ctx/update-seq! existing-completed)
102-
(log/tracef "Async already succeeded with sequence number %d, advancing sequence number" seq-num)
110+
(log/tracef "Async already succeeded advancing sequence number")
103111
(->AsyncHandle seq-num))
104112

105113
;; Already failed - advance seq past consumed numbers during replay
106-
existing-failed
114+
;; TODO decide how to handle interruptions, interrupt policy?
115+
existing-failed #_(not interrupted?)
107116
(do
108117
(ctx/update-seq! existing-failed)
109-
(log/infof "Async already failed with sequence number %d, advancing sequence number" seq-num)
118+
(log/infof "Async already failed advancing sequence number")
110119
(->AsyncHandle seq-num))
111120

112121
;; Already started but not completed - return handle (will block on join)
113122
;; During replay, don't re-execute the thunk - just wait for completion event
114123
existing-started
115124
(do
116-
(log/infof "Async already started with sequence number %d" seq-num)
125+
(log/infof "Async already started")
117126
(->AsyncHandle seq-num))
118127

119128
;; Need to start - record and try to capture what activity it needs
@@ -126,7 +135,7 @@
126135
(ctx/notify-observer p/on-async-started (:workflow-id ctx) seq-num)
127136
;; Try to execute the thunk to see what activity it wants
128137
(try
129-
(log/tracef "Invoking Async thunk with sequence number %d" seq-num)
138+
(log/tracef "Invoking Async thunk")
130139
(let [result (thunk)
131140
;; Capture the last seq number after thunk execution
132141
end-seq (dec @(:seq-counter (ctx/current-context)))]
@@ -138,14 +147,14 @@
138147
:result result
139148
:timestamp (System/currentTimeMillis)})
140149
(ctx/notify-observer p/on-async-completed (:workflow-id ctx) start-seq result)
141-
(log/tracef "Async completed successfully with sequence number %d and result %s" seq-num result)
150+
(log/tracef "Async completed successfully with result %s" result)
142151
(->AsyncHandle start-seq))
143152
(catch Throwable e
144153
(if (error/suspension? e)
145154
;; The thunk suspended on an activity - capture it for parallel execution
146155
(let [suspension-info (error/suspension-data e)
147156
activity-name (:activity-name suspension-info)]
148-
(log/tracef "Async suspended with sequence number %d for activity %s" seq-num activity-name)
157+
(log/tracef "Async suspended activity %s" activity-name)
149158
(ctx/add-pending-async! {:handle-seq start-seq
150159
:activity-name (:activity-name suspension-info)
151160
:activity-seq (:seq suspension-info)
@@ -156,7 +165,7 @@
156165
(->AsyncHandle start-seq))
157166
;; else
158167
(do
159-
(log/tracef e "Async failed with sequence number %d" seq-num)
168+
(log/tracef e "Async failed")
160169
(throw e)))))))))
161170

162171
(defn join
@@ -300,12 +309,15 @@
300309
store (ctx/current-store)
301310
workflow-id (ctx/current-workflow-id)
302311
existing (p/find-event store workflow-id :child-workflow-completed seq-num)
303-
existing-failed (p/find-event store workflow-id :child-workflow-failed seq-num)]
312+
existing-failed (p/find-event store workflow-id :child-workflow-failed seq-num)
313+
err (some-> (:error existing-failed) (error/map->exception))
314+
interrupted? (boolean (some-> err (error/interruption?)))]
304315
(cond
305316
existing
306317
(:result existing)
307318

308-
existing-failed
319+
;; TODO decide how to handle interruptions, interrupt policy?
320+
existing-failed #_(not interrupted?)
309321
(throw (error/map->exception (:error existing-failed)))
310322

311323
:else

src/intemporal/internal/error.clj

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
(and (instance? IExceptionInfo e)
3131
(::activity-interrupted (ex-data e))))
3232

33+
(defn rejection? [e]
34+
(and (instance? IExceptionInfo e)
35+
(::rejected (ex-data e))))
36+
3337
(defn suspension-type [e]
3438
(-> e ex-data :type))
3539

@@ -44,6 +48,13 @@
4448
(instance? IExceptionInfo e)
4549
(::cancelled (ex-data e))))
4650

51+
(defn activity-rejected-exception [activity-name cause]
52+
(ex-info "Execution rejected"
53+
{::rejected true
54+
:cause cause
55+
:activity-name activity-name}))
56+
57+
4758
(defn activity-timeout-exception [activity-name timeout-ms]
4859
(ex-info "Activity timed out"
4960
{::activity-timeout true
@@ -53,8 +64,8 @@
5364
(defn activity-interrupted-exception [activity-name cause]
5465
(ex-info "Activity interrupted"
5566
{::activity-interrupted true
56-
:cause cause
57-
:activity-name activity-name}))
67+
:cause cause
68+
:activity-name activity-name}))
5869

5970
(defn activity-failed-exception [activity-name cause]
6071
(ex-info "Activity failed"

0 commit comments

Comments
 (0)