|
7 | 7 |
|
8 | 8 | import Foundation |
9 | 9 |
|
10 | | -/// Reads values from an `AsyncSequence` until a condition is met or an external signal resolves. |
11 | | -/// |
12 | | -/// - Parameters: |
13 | | -/// - sequence: The `AsyncSequence` to read from. |
14 | | -/// - isDone: Asynchronously evaluates whether reading should stop. |
15 | | -/// - waitFor: A `Task<Void, Never>` that triggers an external stop signal when completed. |
16 | | -/// Optional. Defaults to `nil`. |
17 | | -/// - maxLength: The maximum number of elements to collect before stopping. Defaults to `.max`. |
18 | | -/// |
19 | | -/// - Returns: An array containing the collected elements from the sequence. |
20 | | -func readFromAsyncSequence<S: AsyncSequence>( |
21 | | - _ sequence: S, |
22 | | - isDone: @escaping (S.Element?) async -> Bool, |
23 | | - waitFor: Task<Void, Never>? = nil, |
24 | | - maxLength: Int = .max |
25 | | -) async -> [S.Element] { |
26 | | - var results: [S.Element] = [] |
27 | | - var iterator = sequence.makeAsyncIterator() |
28 | | - var lastValue: S.Element? |
| 10 | +/// Errors that can occur with certain Ayncronous buffer tasks. |
| 11 | +public enum AsyncBufferFullError: Error, LocalizedError, CustomStringConvertible { |
29 | 12 |
|
30 | | - // Define an early stop mechanism |
31 | | - func shouldStop() async -> Bool { |
32 | | - return await isDone(lastValue) || results.count >= maxLength |
33 | | - } |
34 | | - |
35 | | - // Handle external stop signal (if provided) |
36 | | - var hasStoppedExternally = false |
37 | | - let externalStopTask = Task { |
38 | | - await waitFor?.value |
39 | | - hasStoppedExternally = true |
40 | | - } |
41 | | - |
42 | | - defer { externalStopTask.cancel() } // Cleanup on exit |
43 | | - |
44 | | - // Read from sequence |
45 | | - while !hasStoppedExternally, let value = try? await iterator.next() { |
46 | | - lastValue = value |
47 | | - results.append(value) |
48 | | - |
49 | | - if await shouldStop() { |
50 | | - break |
51 | | - } |
52 | | - } |
53 | | - |
54 | | - return results |
55 | | -} |
56 | | - |
57 | | -/// A structure that defines a deferrable, asynchronous event stream. |
58 | | -/// |
59 | | -/// This allows you to manually trigger an event that can be awaited in an `AsyncStream`. |
60 | | -struct DeferrableStream { |
61 | | - |
62 | | - /// An asynchronous stream of void events. |
63 | | - public let stream: AsyncStream<Void> |
64 | | - |
65 | | - /// The continuation that allows externally sending values to the stream. |
66 | | - private let continuation: AsyncStream<Void>.Continuation |
67 | | - |
68 | | - /// Initializes a new `DeferrableStream` instance. |
69 | | - public init() { |
70 | | - var cont: AsyncStream<Void>.Continuation! |
71 | | - self.stream = AsyncStream { continuation in |
72 | | - cont = continuation |
73 | | - } |
74 | | - |
75 | | - self.continuation = cont |
76 | | - } |
77 | | - |
78 | | - /// Resolves the deferrable by yielding a signal to the stream. |
79 | | - public func resolve() { |
80 | | - continuation.yield(()) |
81 | | - } |
82 | | -} |
83 | | - |
84 | | -/// An asynchronous buffer that stores elements and allows streaming them asynchronously. |
85 | | -/// This actor ensures thread-safe operations and can be used to buffer elements |
86 | | -/// before processing them via an `AsyncStream`. |
87 | | -actor AsyncBuffer<T: Sendable> { |
88 | | - |
89 | | - /// Returns the current size of the buffer. |
90 | | - public var size: Int { |
91 | | - buffer.count |
92 | | - } |
93 | | - |
94 | | - /// Indicates whether the buffer has been closed. |
95 | | - public var isBufferClosed: Bool { |
96 | | - isClosed |
97 | | - } |
98 | | - |
99 | | - /// The internal storage for buffered elements. |
100 | | - private var buffer: [T] = [] |
101 | | - |
102 | | - /// The continuation for yielding elements to the `AsyncStream`. Optional. |
103 | | - private var continuation: AsyncStream<T>.Continuation? |
104 | | - |
105 | | - /// Indicating whether the buffer has been closed. Defaults to `false`. |
106 | | - private var isClosed = false |
107 | | - |
108 | | - /// An error to be thrown when the buffer is closed. Optional. |
109 | | - private var toThrow: Error? |
110 | | - |
111 | | - /// The maximum size of the buffer. Optional. |
112 | | - private let maxSize: Int? |
113 | | - |
114 | | - /// Initializes the `AsyncBuffer`. |
115 | | - /// |
116 | | - /// - Parameter maxSize: The maximum number of elements that can be stored in the buffer. |
117 | | - /// Optional. Defaults to `nil`. |
118 | | - public init(maxSize: Int? = nil) { |
119 | | - self.maxSize = maxSize |
120 | | - } |
121 | | - |
122 | | - /// Pushes a single item into the buffer. |
123 | | - /// |
124 | | - /// If the buffer is closed, the operation will have no effect. |
125 | | - /// |
126 | | - /// - Parameter item: The item to be added to the buffer. |
127 | | - public func push(_ item: T) { |
128 | | - guard !isClosed else { return } |
129 | | - buffer.append(item) |
130 | | - yieldItem(item) |
131 | | - } |
132 | | - |
133 | | - /// Pushes multiple items into the buffer. |
134 | | - /// |
135 | | - /// If the buffer is closed, the operation will have no effect. |
136 | | - /// |
137 | | - /// - Parameter items: An array of items to be added to the buffer. |
138 | | - public func pushMany(_ items: [T]) { |
139 | | - guard !isClosed else { return } |
140 | | - buffer.append(contentsOf: items) |
141 | | - for item in items { |
142 | | - yieldItem(item) |
143 | | - } |
144 | | - } |
145 | | - |
146 | | - /// Closes the buffer, preventing further additions and ending the stream. |
147 | | - public func close() { |
148 | | - isClosed = true |
149 | | - continuation?.finish() |
150 | | - } |
151 | | - |
152 | | - /// Closes the buffer and throws an error. |
| 13 | + /// The buffer size has been reached. |
153 | 14 | /// |
154 | | - /// - Parameter error: The error to be associated with the buffer closure. |
155 | | - public func throwError(_ error: Error) { |
156 | | - toThrow = error |
157 | | - isClosed = true |
158 | | - continuation?.finish() |
159 | | - } |
| 15 | + /// - Parameter maxSize: The maximum byte size that can be reached. |
| 16 | + case bufferSizeTooLarge(maxSize: Int) |
160 | 17 |
|
161 | | - /// Returns an `AsyncStream` for consuming buffered elements asynchronously. |
162 | | - /// |
163 | | - /// - Returns: An `AsyncStream<T>` that provides elements from the buffer. |
164 | | - public func stream() -> AsyncStream<T> { |
165 | | - return AsyncStream { continuation in |
166 | | - self.continuation = continuation |
167 | | - Task { await self.drainBuffer() } |
| 18 | + public var errorDescription: String? { |
| 19 | + switch self { |
| 20 | + case .bufferSizeTooLarge(let maxSize): |
| 21 | + return "Buffer size too large. Max size is \(maxSize) bytes." |
168 | 22 | } |
169 | 23 | } |
170 | 24 |
|
171 | | - /// Drains the buffer by yielding all remaining elements to the stream. |
172 | | - private func drainBuffer() async { |
173 | | - while !buffer.isEmpty { |
174 | | - let item = buffer.removeFirst() |
175 | | - yieldItem(item) |
176 | | - } |
177 | | - |
178 | | - if toThrow != nil || isClosed { |
179 | | - continuation?.finish() |
180 | | - } |
181 | | - } |
182 | | - |
183 | | - /// Yields an item to the `AsyncStream`. |
184 | | - /// |
185 | | - /// - Parameter item: The item to yield. |
186 | | - private func yieldItem(_ item: T) { |
187 | | - continuation?.yield(item) |
| 25 | + public var description: String { |
| 26 | + return errorDescription ?? String(describing: self) |
188 | 27 | } |
189 | 28 | } |
0 commit comments