Skip to content

Commit 2c6e2da

Browse files
committed
intemporal next
1 parent db7f44c commit 2c6e2da

10 files changed

Lines changed: 2473 additions & 0 deletions

File tree

src/detflow/activity.clj

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
(ns detflow.activity
2+
(:require [detflow.runtime :as rt]))
3+
4+
(defmacro defactivity [name args & body]
5+
`(swap! rt/activity-registry assoc ~(keyword (str *ns*) (str name))
6+
(fn ~args ~@body)))

src/detflow/common.clj

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
(ns detflow.common
2+
(:import (clojure.lang IExceptionInfo)))
3+
4+
(def ^:private suspend-type ::suspend)
5+
6+
;; The special exception used to pause execution without blocking threads.
7+
(defn suspend!
8+
"Throws a data-carrying exception to pause the workflow."
9+
[wait-id]
10+
(throw (ex-info "Workflow Suspended"
11+
{:type suspend-type
12+
:wait-id wait-id})))
13+
14+
(defn suspend?
15+
"Checks if an exception is a valid workflow suspension."
16+
[e]
17+
(and (instance? clojure.lang.ExceptionInfo e)
18+
(= (:type (ex-data e)) suspend-type)))
19+
20+
(defn get-suspend-id
21+
"Extracts the wait-id from a suspension exception."
22+
[e]
23+
(:wait-id (ex-data e)))
24+
25+
26+
;; A wrapper for internal async task identifiers
27+
(defrecord AsyncHandle [id]
28+
Object
29+
(toString [_]
30+
(str "<AsyncHandle task-id:" id ">")))
31+
32+
(defn async-handle? [obj]
33+
(instance? AsyncHandle obj))
34+
35+
;; Protocol for the storage layer
36+
(defprotocol IHistoryStore
37+
(append-event [this workflow-id event])
38+
(get-history [this workflow-id]))
39+
40+
;; Protocol for the activity executor
41+
(defprotocol IActivityExecutor
42+
(execute-activity [this activity-name args]))
43+
44+

src/detflow/core.clj

Lines changed: 315 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,315 @@
1+
(ns detflow.core
2+
(:import [clojure.lang ExceptionInfo]))
3+
4+
;; =========================================================
5+
;; 1. SUSPENSION
6+
;; =========================================================
7+
8+
(def ^:private suspend-type ::suspend)
9+
10+
(defn suspend! [id]
11+
(throw (ex-info "Workflow Suspended"
12+
{:type suspend-type :wait-id id})))
13+
14+
(defn suspend? [e]
15+
(and (instance? ExceptionInfo e)
16+
(= (:type (ex-data e)) suspend-type)))
17+
18+
;; =========================================================
19+
;; 2. RUNTIME CONTEXT
20+
;; =========================================================
21+
22+
(def ^:dynamic *ctx* nil)
23+
24+
(defn ctx []
25+
(or *ctx*
26+
(throw (ex-info "No active workflow context" {}))))
27+
28+
(defn next-id! []
29+
(swap! (:id-counter (ctx)) inc))
30+
31+
(defn record-event! [evt]
32+
(swap! (:events-out (ctx)) conj evt))
33+
34+
(defn peek-history []
35+
(get (:history (ctx)) @(:cursor (ctx))))
36+
37+
(defn consume-history! []
38+
(swap! (:cursor (ctx)) inc))
39+
40+
;; =========================================================
41+
;; 3. UNIFIED EFFECT ABSTRACTION
42+
;; =========================================================
43+
;; This is the heart of determinism.
44+
;; Every effect:
45+
;; 1. Schedules exactly once
46+
;; 2. Awaits exactly one matching completion
47+
;; 3. Consumes exactly the events it observes
48+
49+
(defn effect!
50+
[{:keys [schedule complete-type result-xform]}]
51+
52+
;; ---- SCHEDULE PHASE (never suspends) ----
53+
(let [evt (peek-history)
54+
id (if (and evt (= (:type evt) (:type schedule)))
55+
(:id evt)
56+
(let [id (next-id!)]
57+
(record-event! (assoc schedule :id id))
58+
id))]
59+
60+
;; consume schedule if replaying
61+
(when (and evt (= (:type evt) (:type schedule)))
62+
(consume-history!))
63+
64+
;; ---- AWAIT PHASE (may suspend) ----
65+
(let [evt2 (peek-history)]
66+
(if (and evt2
67+
(= (:type evt2) complete-type)
68+
(= (:id evt2) id))
69+
(do
70+
(consume-history!)
71+
((or result-xform identity) (:result evt2)))
72+
(suspend! id)))))
73+
74+
;; =========================================================
75+
;; 4. WORKFLOW API
76+
;; =========================================================
77+
78+
(defn activity [name args]
79+
(effect!
80+
{:schedule {:type :activity-scheduled
81+
:name name
82+
:args args}
83+
:complete-type :activity-completed
84+
:result-xform :value}))
85+
86+
(defn sleep [ms]
87+
(effect!
88+
{:schedule {:type :timer-scheduled
89+
:duration ms}
90+
:complete-type :timer-fired
91+
:result-xform :value}))
92+
93+
;; =========================================================
94+
;; 5. ASYNC
95+
;; =========================================================
96+
97+
(defrecord AsyncHandle [id])
98+
99+
(defn async [f]
100+
(let [evt (peek-history)]
101+
(if (and evt (= (:type evt) :async-started))
102+
;; replay
103+
(do
104+
(consume-history!)
105+
(swap! (:tasks (ctx)) assoc (:id evt) f)
106+
(->AsyncHandle (:id evt)))
107+
108+
;; first execution
109+
(let [id (next-id!)]
110+
(record-event! {:type :async-started :id id})
111+
(swap! (:tasks (ctx)) assoc id f)
112+
;; 🔑 IMPORTANT: block parent until replay aligns
113+
(suspend! id)))))
114+
115+
116+
(defn await [^AsyncHandle handle]
117+
(let [evt (peek-history)]
118+
(if (and evt
119+
(= (:type evt) :async-completed)
120+
(= (:id evt) (:id handle)))
121+
(do
122+
(consume-history!)
123+
(:result evt))
124+
(suspend! (:id handle)))))
125+
126+
;; =========================================================
127+
;; 6. ACTIVITY REGISTRY
128+
;; =========================================================
129+
130+
(defonce activity-registry (atom {}))
131+
132+
(defmacro defactivity [aname args & body]
133+
`(swap! activity-registry
134+
assoc ~(keyword (str *ns*) (name aname))
135+
(fn ~args ~@body)))
136+
137+
(defmacro defworkflow [name args & body]
138+
`(defn ~name ~args ~@body))
139+
140+
;; =========================================================
141+
;; 7. HISTORY STORE
142+
;; =========================================================
143+
144+
(defprotocol IHistoryStore
145+
(append-event [this wf-id evt])
146+
(get-history [this wf-id]))
147+
148+
(defrecord InMemoryStore [store]
149+
IHistoryStore
150+
(append-event [_ id evt]
151+
(swap! store update id (fnil conj []) evt))
152+
(get-history [_ id]
153+
(get @store id [])))
154+
155+
;; =========================================================
156+
;; 8. ACTIVITY EXECUTOR
157+
;; =========================================================
158+
159+
(defprotocol IActivityExecutor
160+
(execute-activity [this name args]))
161+
162+
;; =========================================================
163+
;; 9. INTERPRETER
164+
;; =========================================================
165+
166+
(defn run-interpreter-pass [workflow runtime]
167+
(binding [*ctx* runtime]
168+
169+
;; register main task
170+
(when (empty? @(:tasks runtime))
171+
(swap! (:tasks runtime) assoc 0 workflow))
172+
173+
(loop []
174+
(let [progress? (atom false)
175+
initial-task-count (count @(:tasks runtime))]
176+
177+
(doseq [[tid task] @(:tasks runtime)]
178+
(when-not (contains? @(:results runtime) tid)
179+
(try
180+
(let [res (task)]
181+
(swap! (:results runtime) assoc tid res)
182+
(record-event! {:type :async-completed
183+
:id tid
184+
:result res})
185+
(reset! progress? true))
186+
(catch Exception e
187+
(when-not (suspend? e)
188+
(throw e))))))
189+
190+
(let [new-task-count (count @(:tasks runtime))]
191+
(cond
192+
(> new-task-count initial-task-count)
193+
(recur)
194+
195+
@progress?
196+
(recur)
197+
198+
(contains? @(:results runtime) 0)
199+
{:status :completed
200+
:result (get @(:results runtime) 0)}
201+
202+
:else
203+
{:status :blocked}))))))
204+
205+
;; =========================================================
206+
;; 10. SIDE EFFECT PROCESSING
207+
;; =========================================================
208+
209+
(defn process-side-effects [executor events]
210+
(keep
211+
(fn [evt]
212+
(println "> processing side effect" evt)
213+
(case (:type evt)
214+
:activity-scheduled
215+
{:type :activity-completed
216+
:id (:id evt)
217+
:result {:value
218+
(execute-activity executor
219+
(:name evt)
220+
(:args evt))}}
221+
222+
:timer-scheduled
223+
{:type :timer-fired
224+
:id (:id evt)
225+
:result {:value :time-up}}
226+
227+
nil))
228+
events))
229+
230+
;; =========================================================
231+
;; 11. ENGINE
232+
;; =========================================================
233+
234+
(defn start-workflow
235+
[{:keys [store executor]}
236+
{:keys [workflow args id]}]
237+
238+
(when (empty? (get-history store id))
239+
(append-event store id {:type :workflow-start :args args}))
240+
241+
(loop []
242+
(println "xxx workflow loop")
243+
(let [history (get-history store id)
244+
start-idx (if (= (:type (first history)) :workflow-start) 1 0)
245+
max-id (reduce #(max %1 (:id %2 0)) 0 history)
246+
247+
runtime {:history history
248+
:cursor (atom start-idx)
249+
:events-out (atom [])
250+
:id-counter (atom (inc max-id))
251+
:tasks (atom {})
252+
:results (atom {})}
253+
254+
result (run-interpreter-pass #(apply workflow args) runtime)
255+
256+
new-events @(:events-out runtime)
257+
commands (filterv #(#{:activity-scheduled
258+
:timer-scheduled}
259+
(:type %))
260+
new-events)
261+
completions (doall (process-side-effects executor commands))]
262+
263+
(doseq [e new-events]
264+
(println "TRACE new event" e)
265+
(append-event store id e))
266+
(doseq [e completions]
267+
(println "TRACE new completion event" e)
268+
(append-event store id e))
269+
270+
(cond
271+
(= (:status result) :completed)
272+
(:result result)
273+
274+
(or (seq new-events) (seq completions))
275+
(recur)
276+
277+
:else
278+
(do
279+
(throw (ex-info "Deadlock"
280+
{:history history})))))))
281+
282+
;; =========================================================
283+
;; 12. EXAMPLE
284+
;; =========================================================
285+
286+
(defactivity fetch-order [id]
287+
(println ">> Fetch order" id)
288+
{:id id :price 100})
289+
290+
(defactivity charge-card [amount]
291+
(println ">> Charge card" amount)
292+
:paid)
293+
294+
(defworkflow my-flow [id]
295+
(println "Workflow start")
296+
(let [order (activity :detflow.core/fetch-order [id])]
297+
;task (async #(activity :detflow.core/charge-card [(:price order)]))]
298+
;(await task)
299+
{:status :shipped
300+
:order order}))
301+
302+
(defn -main []
303+
(let [engine {:store (->InMemoryStore (atom {}))
304+
:executor (reify IActivityExecutor
305+
(execute-activity [_ n args]
306+
(let [act (get @activity-registry n)]
307+
(apply act args))))}]
308+
(println "RESULT:"
309+
(start-workflow engine
310+
{:workflow my-flow
311+
:id "wf-1"
312+
:args [123]}))))
313+
314+
(comment)
315+
(-main)

0 commit comments

Comments
 (0)