Skip to content

Commit 6e10648

Browse files
authored
feat: Implement Resource Coordinator (Issue #8) and Task Orchestrator (Issue #9) (#10)
1 parent 2379347 commit 6e10648

4 files changed

Lines changed: 416 additions & 0 deletions

File tree

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import Foundation
2+
3+
/// Represents a unit of work in a dependency graph.
4+
public struct DAGTask: Identifiable, Sendable {
5+
public let id: String
6+
public let operation: @Sendable () async throws -> Void
7+
public let dependencies: Set<String>
8+
9+
public init(id: String, dependencies: Set<String> = [], operation: @escaping @Sendable () async throws -> Void) {
10+
self.id = id
11+
self.dependencies = dependencies
12+
self.operation = operation
13+
}
14+
}
15+
16+
public enum OrchestrationError: Error {
17+
case cycleDetected
18+
case dependencyNotFound(taskId: String, dependencyId: String)
19+
case taskFailure(taskId: String, error: Error)
20+
}
21+
22+
/// A Task Orchestrator that executes tasks based on a Directed Acyclic Graph (DAG).
23+
/// It ensures that a task only runs when all its dependencies have successfully completed.
24+
@available(macOS 12.0, iOS 15.0, *)
25+
public actor TaskOrchestrator {
26+
27+
private var tasks: [String: DAGTask] = [:]
28+
29+
public init() {}
30+
31+
/// Adds a task to the orchestrator.
32+
public func addTask(_ task: DAGTask) {
33+
tasks[task.id] = task
34+
}
35+
36+
/// Validates the graph for cycles and missing dependencies.
37+
public func validate() throws {
38+
// Check for missing dependencies
39+
for task in tasks.values {
40+
for dep in task.dependencies {
41+
guard tasks[dep] != nil else {
42+
throw OrchestrationError.dependencyNotFound(taskId: task.id, dependencyId: dep)
43+
}
44+
}
45+
}
46+
47+
// Detect Cycles (DFS)
48+
var visited: Set<String> = []
49+
var recursionStack: Set<String> = []
50+
51+
func hasCycle(_ nodeId: String) -> Bool {
52+
visited.insert(nodeId)
53+
recursionStack.insert(nodeId)
54+
55+
if let node = tasks[nodeId] {
56+
for dep in node.dependencies {
57+
if !visited.contains(dep) {
58+
if hasCycle(dep) { return true }
59+
} else if recursionStack.contains(dep) {
60+
return true
61+
}
62+
}
63+
}
64+
65+
recursionStack.remove(nodeId)
66+
return false
67+
}
68+
69+
for nodeId in tasks.keys {
70+
if !visited.contains(nodeId) {
71+
if hasCycle(nodeId) {
72+
throw OrchestrationError.cycleDetected
73+
}
74+
}
75+
}
76+
}
77+
78+
/// Executes all tasks in the graph, respecting dependencies.
79+
/// Runs independent tasks in parallel where possible.
80+
public func execute() async throws {
81+
try validate()
82+
83+
// Build adjacency list for "dependents" (upstream -> [downstream])
84+
var dependents: [String: [String]] = [:]
85+
var inDegree: [String: Int] = [:]
86+
87+
for task in tasks.values {
88+
inDegree[task.id] = task.dependencies.count
89+
for dep in task.dependencies {
90+
dependents[dep, default: []].append(task.id)
91+
}
92+
}
93+
94+
// Queue of tasks ready to run (in-degree 0)
95+
let readyQueue: [String] = tasks.values.filter { ($0.dependencies.isEmpty) }.map { $0.id }
96+
97+
// We use a task group to run ready tasks concurrently
98+
try await withThrowingTaskGroup(of: String.self) { group in
99+
100+
// Initial batch
101+
for taskId in readyQueue {
102+
guard let task = tasks[taskId] else { continue }
103+
group.addTask {
104+
try await task.operation()
105+
return taskId
106+
}
107+
}
108+
109+
// Loop as tasks finish
110+
var remainingTasks = tasks.count
111+
112+
while remainingTasks > 0 {
113+
// Wait for any task to finish
114+
guard let finishedTaskId = try await group.next() else {
115+
break
116+
}
117+
118+
remainingTasks -= 1
119+
120+
// Unlock downstream
121+
if let downstreamNodes = dependents[finishedTaskId] {
122+
for downstreamId in downstreamNodes {
123+
inDegree[downstreamId, default: 0] -= 1
124+
if inDegree[downstreamId] == 0 {
125+
// Ready!
126+
if let task = tasks[downstreamId] {
127+
group.addTask {
128+
try await task.operation()
129+
return downstreamId
130+
}
131+
}
132+
}
133+
}
134+
}
135+
}
136+
}
137+
}
138+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import Foundation
2+
3+
/// A coordinator that manages safe concurrent access to resources identified by keys (e.g., file paths).
4+
/// It implements a Read-Write Lock semantics where multiple readers can access a resource simultaneously,
5+
/// but writers require exclusive access.
6+
@available(macOS 12.0, iOS 15.0, *)
7+
public actor ResourceCoordinator {
8+
9+
private var locks: [String: PathLock] = [:]
10+
11+
public init() {}
12+
13+
/// Executes the given block with a shared (read) lock on the specified resource.
14+
/// Other readers can execute concurrently, but writers will block.
15+
public func withReadLock<T>(for key: String, operation: () async throws -> T) async throws -> T {
16+
return try await access(path: key, type: .read, operation: operation)
17+
}
18+
19+
/// Helper to just acquire and return a token?
20+
/// No, closures are safer.
21+
22+
public func access<T>(path: String, type: AccessType, operation: () async throws -> T) async throws -> T {
23+
let lock = getLock(for: path)
24+
switch type {
25+
case .read:
26+
await lock.lockRead()
27+
// We use standard do/defer pattern here since we are inside an async function
28+
defer { Task { await lock.unlockRead() } }
29+
return try await operation()
30+
case .write:
31+
await lock.lockWrite()
32+
defer { Task { await lock.unlockWrite() } }
33+
return try await operation()
34+
}
35+
}
36+
37+
private func getLock(for key: String) -> PathLock {
38+
if let lock = locks[key] {
39+
return lock
40+
}
41+
let newLock = PathLock()
42+
locks[key] = newLock
43+
return newLock
44+
}
45+
46+
public enum AccessType {
47+
case read
48+
case write
49+
}
50+
}
51+
52+
/// A standard Read-Write lock implemented as an Actor.
53+
@available(macOS 12.0, iOS 15.0, *)
54+
actor PathLock {
55+
private var readers: Int = 0
56+
private var writers: Int = 0 // Should be 0 or 1
57+
private var writeWaiters: [CheckedContinuation<Void, Never>] = []
58+
private var readWaiters: [CheckedContinuation<Void, Never>] = []
59+
60+
func lockRead() async {
61+
if writers > 0 || !writeWaiters.isEmpty {
62+
// Writer has priority or active
63+
await withCheckedContinuation { continuation in
64+
readWaiters.append(continuation)
65+
}
66+
} else {
67+
readers += 1
68+
}
69+
}
70+
71+
func unlockRead() {
72+
readers -= 1
73+
if readers == 0 {
74+
// If no more readers, wake one writer if any
75+
if !writeWaiters.isEmpty {
76+
let writer = writeWaiters.removeFirst()
77+
writers = 1 // Pass ownership
78+
writer.resume()
79+
}
80+
}
81+
}
82+
83+
func lockWrite() async {
84+
if readers > 0 || writers > 0 {
85+
await withCheckedContinuation { continuation in
86+
writeWaiters.append(continuation)
87+
}
88+
} else {
89+
writers = 1
90+
}
91+
}
92+
93+
func unlockWrite() {
94+
writers = 0
95+
// Prefer writers? Or strict FIFO?
96+
// Simple implementation: Wake one writer if present, else wake all readers.
97+
98+
if !writeWaiters.isEmpty {
99+
let nextWriter = writeWaiters.removeFirst()
100+
writers = 1
101+
nextWriter.resume()
102+
} else {
103+
// Wake ALL readers
104+
let currentReaders = readWaiters
105+
readWaiters.removeAll()
106+
readers += currentReaders.count
107+
for reader in currentReaders {
108+
reader.resume()
109+
}
110+
}
111+
}
112+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import XCTest
2+
@testable import DesignAlgorithmsKit
3+
4+
@available(macOS 12.0, iOS 15.0, *)
5+
final class TaskOrchestratorTests: XCTestCase {
6+
7+
func testLinearDependency() async throws {
8+
let orchestrator = TaskOrchestrator()
9+
let result = ResultCollector()
10+
11+
let taskA = DAGTask(id: "A") {
12+
await result.append("A")
13+
}
14+
15+
// B depends on A
16+
let taskB = DAGTask(id: "B", dependencies: ["A"]) {
17+
await result.append("B")
18+
}
19+
20+
// C depends on B
21+
let taskC = DAGTask(id: "C", dependencies: ["B"]) {
22+
await result.append("C")
23+
}
24+
25+
await orchestrator.addTask(taskA)
26+
await orchestrator.addTask(taskB)
27+
await orchestrator.addTask(taskC)
28+
29+
try await orchestrator.execute()
30+
31+
let order = await result.items
32+
XCTAssertEqual(order, ["A", "B", "C"])
33+
}
34+
35+
func testParallelExecution() async throws {
36+
let orchestrator = TaskOrchestrator()
37+
let result = ResultCollector()
38+
39+
// A and B independent. C depends on both.
40+
let taskA = DAGTask(id: "A") {
41+
try? await Task.sleep(nanoseconds: 10_000_000)
42+
await result.append("A")
43+
}
44+
45+
let taskB = DAGTask(id: "B") {
46+
try? await Task.sleep(nanoseconds: 10_000_000)
47+
await result.append("B")
48+
}
49+
50+
let taskC = DAGTask(id: "C", dependencies: ["A", "B"]) {
51+
await result.append("C")
52+
}
53+
54+
await orchestrator.addTask(taskA)
55+
await orchestrator.addTask(taskB)
56+
await orchestrator.addTask(taskC)
57+
58+
try await orchestrator.execute()
59+
60+
let order = await result.items
61+
// A and B can be in any order, but both must be before C
62+
XCTAssertTrue(order.contains("A"))
63+
XCTAssertTrue(order.contains("B"))
64+
XCTAssertEqual(order.last, "C")
65+
XCTAssertEqual(order.count, 3)
66+
}
67+
68+
func testCycleDetection() async {
69+
let orchestrator = TaskOrchestrator()
70+
71+
let taskA = DAGTask(id: "A", dependencies: ["B"]) { }
72+
let taskB = DAGTask(id: "B", dependencies: ["A"]) { }
73+
74+
await orchestrator.addTask(taskA)
75+
await orchestrator.addTask(taskB)
76+
77+
do {
78+
try await orchestrator.execute()
79+
XCTFail("Should have thrown cycle detected error")
80+
} catch OrchestrationError.cycleDetected {
81+
// Success
82+
} catch {
83+
XCTFail("Wrong error: \(error)")
84+
}
85+
}
86+
87+
// MARK: - Helpers
88+
actor ResultCollector {
89+
var items: [String] = []
90+
func append(_ item: String) { items.append(item) }
91+
}
92+
}

0 commit comments

Comments
 (0)