Skip to content

Commit a28e4f2

Browse files
committed
refactor: ensure tasks reach terminal state
1 parent 2ff8021 commit a28e4f2

22 files changed

Lines changed: 296 additions & 187 deletions

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ docker/fdb.cluster
2929

3030
# coverage
3131
merged.info
32-
coverage/
32+
coverage/
33+
.store/

DEVELOPMENT.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ clj -A:dev:doc:cljs
3636
# Tests
3737

3838
```shell
39-
bin/kaocha unit
40-
bin/kaocha unit-cljs
39+
bin/kaocha test
40+
bin/kaocha test-cljs
4141

4242
;; or run everything
4343
bin/run-coverage

bin/kaocha

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
[ -d "node_modules/ws" ] || npm install ws
44

5+
#JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 -Dio.netty.tryUnsafe=false"
56
JAVA_OPTS="-Dio.netty.tryUnsafe=false"
67
if [[ "$OSTYPE" == "darwin"* ]]; then
78
JAVA_OPTS="-Dio.netty.tryUnsafe=false -DFDB_LIBRARY_PATH_FDB_C=/usr/local/lib/libfdb_c.dylib -DFDB_LIBRARY_PATH_FDB_JAVA=/usr/local/lib/libfdb_java.jnilib"
89
fi
910

10-
1111
# bin/kaocha unit-cljs --reporter documentation --focus cljs:intemporal.internal-failures-test --no-capture-output
1212
JAVA_OPTS="$JAVA_OPTS" clojure -A:dev:test:jdbc:fdb -M:test "$@"

bin/run-coverage

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,18 @@ set -e
55
BLUE='\033[0;34m'
66
NC='\033[0m' # No Color
77

8-
98
rm -rf coverage
109
mkdir -p coverage
1110

11+
trap 'kill 0' SIGINT
12+
1213
printf "${BLUE}running clj tests...${NC}\n"
13-
CLOVERAGE_OUTPUT=target/coverage-clj bin/kaocha unit --reporter documentation
14+
CLOVERAGE_OUTPUT=target/coverage-clj bin/kaocha test --reporter documentation &
1415

1516
printf "${BLUE}running cljs tests...${NC}\n"
16-
CLOVERAGE_OUTPUT=target/coverage-cljs bin/kaocha unit-cljs --reporter documentation
17+
CLOVERAGE_OUTPUT=target/coverage-cljs bin/kaocha test-cljs --reporter documentation &
18+
19+
wait
1720

1821
printf "${BLUE}combining coverage reports...${NC}\n"
1922
lcov --add-tracefile target/coverage-clj/lcov.info --add-tracefile target/coverage-cljs/lcov.info -o coverage/lcov.merged.info

docker/fdb.cluster

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
docker:docker@172.19.0.2:4500
1+
docker:docker@172.19.0.3:4500

shadow-cljs.edn

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@
1313
:automata {:init-fn intemporal.automata/init
1414
:depends-on #{:shared}}}}
1515

16+
:node {:target :node-test
17+
:output-to "target/tests.js"
18+
:ns-regexp "-test$"
19+
:compiler-options {:source-map "true"}
20+
:autorun true}
21+
1622
:dev {:target :browser
1723
:asset-path "."
1824
:output-dir "target"

src/intemporal/error.cljc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
(ns intemporal.error)
2+
3+
(defn internal-error? [ex]
4+
(when-let [t (-> ex ex-data ::type)]
5+
(or (= :internal t)
6+
(= :panic t))))
7+
8+
(defn panic? [ex]
9+
(and (instance? #?(:clj Exception :cljs js/Error) ex)
10+
(= :panic (-> ex ex-data ::type))))
11+
12+
(defn internal-error [msg data]
13+
(ex-info msg (merge data {::type :internal})))
14+
15+
(defn panic [msg]
16+
(ex-info msg {::type :panic}))

src/intemporal/macros.cljc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
id# (or (:id i/*env*) (i/random-id))
7575
fvar# #'~wname
7676
task# (i/create-workflow-task ref# root# (symbol fvar#) (macros/case :cljs fvar# :clj (var-get fvar#)) ~argv id#)]
77-
(t/log! {:level :trace :_data {:env i/*env* :task task#}} ["Invoking task with id" (:id task#)])
77+
(t/log! {:level :debug :_data {:env i/*env* :task task#}} ["Invoking task with id" (:id task#)])
7878
(w/enqueue-and-wait i/*env* task#))))))
7979

8080
(defmacro stub-function
@@ -93,7 +93,7 @@
9393
ref# nil ;; no enqueued task => no ref
9494
task# (i/create-activity-task ref# root# (symbol fvar#) (macros/case :cljs fvar# :clj (var-get fvar#)) argv# id#)]
9595
;; an embedded workflow engine doesn't need to have a task per invocation
96-
(t/log! {:level :trace :_data {:env i/*env* :task task#}} ["Invoking task"])
96+
(t/log! {:level :debug :_data {:env i/*env* :task task#}} ["Invoking task with id " id#])
9797
(let [res# (i/resume-task i/*env* store# protos# task#)]
9898
(macros/case
9999
:cljs res#
@@ -148,7 +148,7 @@
148148
[~@args]
149149
id#)]
150150

151-
(t/log! {:level :trace :_data {:env i/*env* :task task#}} ["Invoking task"])
151+
(t/log! {:level :debug :_data {:env i/*env* :task task#}} ["Invoking task with id" id#])
152152
(i/resume-task i/*env* store# protos# task#))))))))
153153
;(w/enqueue-and-wait i/*env* task#))))))))
154154

@@ -191,7 +191,7 @@
191191
[~@args]
192192
id#)]
193193

194-
(t/log! {:level :trace :_data {:env i/*env* :task task#}} ["Invoking task"])
194+
(t/log! {:level :debug :_data {:env i/*env* :task task#}} ["Invoking task with id" id#])
195195
@(i/resume-task i/*env* store# protos# task#)))))))))
196196
;(w/enqueue-and-wait i/*env* task#)))))))))
197197

src/intemporal/store.cljc

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
(ns intemporal.store
22
(:require [clojure.tools.reader.edn :as edn]
3-
[intemporal.store.internal :refer [validate-task validate-event]]
3+
[intemporal.store.internal :as si]
44
[promesa.core :as p]
55
[taoensso.telemere :as t]
66
#?(:clj [clojure.java.io :as io]))
@@ -13,6 +13,8 @@
1313

1414
(defprotocol TaskStore
1515
(list-tasks [this] "Lists all tasks")
16+
(task<-panic [this task-id error]
17+
"Terminates the task via panic; events should not be stored")
1618
(task<-event [this task-id event-descr]
1719
"Transitions the task. The task should be dequeued beforehand. Returns the event.
1820
`event-descr` is one of:
@@ -122,7 +124,7 @@
122124
(when-let [w (find-task this id)]
123125
(maybe-fail!)
124126
(->> (apply assoc w kvs)
125-
(validate-task)
127+
(si/validate-task)
126128
(swap! tasks assoc id))))]
127129

128130
;; deser the db
@@ -156,7 +158,7 @@
156158
(apply concat (vals @history)))
157159
(save-event [this task-id event]
158160
(let [evt+id (assoc event :id (swap! counter inc))]
159-
(validate-event evt+id)
161+
(si/validate-event evt+id)
160162
(swap! history (fn [v]
161163
(assoc v task-id (-> (or (get v task-id) [])
162164
(conj evt+id)))))
@@ -171,6 +173,9 @@
171173
(list-tasks [this]
172174
(vals @tasks))
173175

176+
(task<-panic [this task-id error]
177+
(update-task this task-id :result error))
178+
174179
(task<-event [this task-id {:keys [id ref root type sym args result error] :as event-descr}]
175180
;; some redundancy between :result in task and event
176181
;; note that we save the event first, because update-task can trigger some watchers
@@ -221,25 +226,22 @@
221226
(await-task this id {:timeout-ms default-lease}))
222227

223228
(await-task [this id {:keys [timeout-ms] :as opts}]
224-
;; TODO handle internal errors? use a separate state?
225229
(maybe-fail!)
230+
;; TODO use owner
226231
(let [task (find-task this id)
227232
deferred (p/deferred)
228-
completed? (fn [{:keys [state]}]
229-
(or (= :success state)
230-
(= :failure state)))
231-
wrap-result (fn [{:keys [state result] :as task}]
233+
wrap-result (fn [{:keys [result] :as task}]
232234
(cond
233-
(= :success state) (p/resolved result)
234-
(= :failure state) (p/rejected result)
235+
(si/success? task) (p/resolved result)
236+
(si/failure? task) (p/rejected result)
235237
:else (p/rejected (ex-info "Unknown state" {:task task}))))]
236238

237-
(if (completed? task)
239+
(if (si/terminal? task)
238240
(wrap-result task)
239241
;;else
240242
(do
241-
(watch-task this id (fn [{:keys [state] :as task}]
242-
(when (#{:success :failure} state)
243+
(watch-task this id (fn [task]
244+
(when (si/terminal? task)
243245
(p/resolve! deferred task))))
244246
;; wait for resolution
245247
;; remember: js doesnt have blocking op so we need to chain
@@ -267,7 +269,7 @@
267269
(enqueue-task [this task]
268270
;; TODO use owner
269271
(maybe-fail!)
270-
(validate-task task)
272+
(si/validate-task task)
271273
(swap! tasks assoc
272274
(:id task) (assoc task :order (swap! tcounter inc)))
273275
#?(:cljs (register this (:sym task) (:fvar task)))

src/intemporal/store/foundationdb.clj

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(ns intemporal.store.foundationdb
22
(:require [intemporal.store :as store]
33
[intemporal.workflow.internal :as i]
4-
[intemporal.store.internal :refer [resolve-fvar serialize deserialize serializable? next-id validate-task validate-event]]
4+
[intemporal.store.internal :as si :refer [resolve-fvar serialize deserialize serializable? next-id validate-task validate-event]]
55
[me.vedang.clj-fdb.FDB :as cfdb]
66
[me.vedang.clj-fdb.core :as fc]
77
[me.vedang.clj-fdb.transaction :as ftr]
@@ -76,6 +76,14 @@
7676
(fc/get-range tx subspace-tasks {:valfn (comp resolve-fvar deserialize)}))
7777
(vals)))
7878

79+
(task<-panic [this task-id error]
80+
(with-tx [tx (open-db)]
81+
(let [task (fc/get tx subspace-tasks task-id {:valfn (comp resolve-fvar deserialize)})
82+
updated-task (assoc task :result error)]
83+
(when task
84+
(validate-task updated-task)
85+
(fc/set tx subspace-tasks task-id (serialize updated-task))))))
86+
7987
(task<-event [this task-id {:keys [id ref root type sym args result error] :as event-descr}]
8088
;; some redundancy between :result in task and event
8189
;; note that we save the event first, because update-task can trigger some watchers
@@ -123,25 +131,20 @@
123131

124132
(await-task [this id {:keys [timeout-ms] :as opts}]
125133
;; TODO use owner
126-
(let [task (with-tx [tx (open-db)]
127-
(fc/get tx subspace-tasks [id]))
134+
(let [task (store/find-task this id)
128135
deferred (p/deferred)
129-
completed? (fn [{:keys [state]}]
130-
(or (= :success state)
131-
(= :failure state)))
132136
wrap-result (fn [{:keys [state result] :as task}]
133137
(cond
134-
(= :success state) (p/resolved result)
135-
(= :failure state) (p/rejected result)
136-
;; TODO throw internal error
138+
(si/success? task) (p/resolved result)
139+
(si/failure? task) (p/rejected result)
137140
:else (p/rejected (ex-info "Unknown state" {:task task}))))]
138141

139-
(if (completed? task)
142+
(if (si/terminal? task)
140143
(wrap-result task)
141144
;;else
142145
(do
143146
(store/watch-task this id (fn [{:keys [state] :as task}]
144-
(when (#{:success :failure} state)
147+
(when (si/terminal? task)
145148
(p/resolve! deferred task))))
146149
;; wait for resolution
147150
(-> (p/timeout deferred timeout-ms ::timeout)

0 commit comments

Comments
 (0)