diff --git a/README.md b/README.md index 3def420..ae93736 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,6 @@ It also leverages Swift concurrency features to provide a more expressive and in - [x] Simple Swift API for SSE - [x] Supports data-only mode - [x] Data race safety with Swift 6 -- [ ] Broadcast event stream to multiple consumers (WIP) ## Installation @@ -57,9 +56,9 @@ Using EventSource is easy. Simply create a new data task from an instance of Eve import EventSource let eventSource = EventSource() -let dataTask = await eventSource.dataTask(for: urlRequest) +let dataTask = eventSource.dataTask(for: urlRequest) -for await event in await dataTask.events() { +for await event in dataTask.events() { switch event { case .open: print("Connection was opened.") @@ -96,11 +95,11 @@ urlRequest.httpBody = """ """.data(using: .utf8)! let eventSource = EventSource(mode: .dataOnly) -let dataTask = await eventSource.dataTask(for: urlRequest) +let dataTask = eventSource.dataTask(for: urlRequest) var response: String = "" -for await event in await dataTask.events() { +for await event in dataTask.events() { switch event { case .event(let event): if let data = eventDevent.data?.data(using: .utf8) { @@ -132,6 +131,10 @@ No dependencies. Contributions to are always welcomed! For more details see [CONTRIBUTING.md](CONTRIBUTING.md). +## Credits + +* Mutex backport from [swift-sharing](https://github.com/pointfreeco/swift-sharing) + ## License EventSource is released under the MIT License. See [LICENSE](LICENSE) for more information. diff --git a/Sources/EventSource/EventSource.swift b/Sources/EventSource/EventSource.swift index 77e2a68..b0419de 100644 --- a/Sources/EventSource/EventSource.swift +++ b/Sources/EventSource/EventSource.swift @@ -11,11 +11,6 @@ import Foundation import FoundationNetworking #endif -/// The global actor used for isolating ``EventSource/EventSource/DataTask``. -@globalActor public actor EventSourceActor: GlobalActor { - public static let shared = EventSourceActor() -} - /// /// An `EventSource` instance opens a persistent connection to an HTTP server, /// which sends events in `text/event-stream` format. @@ -63,7 +58,6 @@ public struct EventSource: Sendable { self.timeoutInterval = timeoutInterval } - @EventSourceActor public func dataTask(for urlRequest: URLRequest) -> DataTask { DataTask( urlRequest: urlRequest, @@ -79,29 +73,70 @@ public extension EventSource { /// Creation of a task is exclusively handled by ``EventSource``. A new task can be created by calling /// ``EventSource/EventSource/dataTask(for:)`` method on the EventSource instance. After creating a task, /// it can be started by iterating event stream returned by ``DataTask/events()``. - @EventSourceActor final class DataTask { + final class DataTask: Sendable { + private let _readyState: Mutex = Mutex(.none) + /// A value representing the state of the connection. - public private(set) var readyState: ReadyState = .none + public var readyState: ReadyState { + get { + _readyState.withLock { $0 } + } + set { + _readyState.withLock { $0 = newValue } + } + } + + private let _lastMessageId: Mutex = Mutex("") /// Last event's ID string value. /// /// Sent in a HTTP request header and used when a user is to reestablish the connection. - public private(set) var lastMessageId: String = "" + public var lastMessageId: String { + get { + _lastMessageId.withLock { $0 } + } + set { + _lastMessageId.withLock { $0 = newValue } + } + } /// A URLRequest of the events source. public let urlRequest: URLRequest - private var eventParser: EventParser + private let _eventParser: Mutex + + private var eventParser: EventParser { + get { + _eventParser.withLock { $0 } + } + set { + _eventParser.withLock { $0 = newValue } + } + } private let timeoutInterval: TimeInterval - private var continuation: AsyncStream.Continuation? + private let _httpResponseErrorStatusCode: Mutex = Mutex(nil) - private var urlSession: URLSession? + private var httpResponseErrorStatusCode: Int? { + get { + _httpResponseErrorStatusCode.withLock { $0 } + } + set { + _httpResponseErrorStatusCode.withLock { $0 = newValue } + } + } - private var urlSessionDataTask: URLSessionDataTask? + private let _consumed: Mutex = Mutex(false) - private var httpResponseErrorStatusCode: Int? + private var consumed: Bool { + get { + _consumed.withLock { $0 } + } + set { + _consumed.withLock { $0 = newValue } + } + } private var urlSessionConfiguration: URLSessionConfiguration { let configuration = URLSessionConfiguration.default @@ -121,13 +156,13 @@ public extension EventSource { timeoutInterval: TimeInterval ) { self.urlRequest = urlRequest - self.eventParser = eventParser + self._eventParser = Mutex(eventParser) self.timeoutInterval = timeoutInterval } /// Creates and returns event stream. public func events() -> AsyncStream { - if urlSessionDataTask != nil { + if consumed { return AsyncStream { continuation in continuation.yield(.error(EventSourceError.alreadyConsumed)) continuation.finish() @@ -136,17 +171,29 @@ public extension EventSource { return AsyncStream { continuation in let sessionDelegate = SessionDelegate() + let urlSession = URLSession( + configuration: urlSessionConfiguration, + delegate: sessionDelegate, + delegateQueue: nil + ) + let urlSessionDataTask = urlSession.dataTask(with: urlRequest) + let sessionDelegateTask = Task { [weak self] in for await event in sessionDelegate.eventStream { guard let self else { return } switch event { case let .didCompleteWithError(error): - handleSessionError(error) + handleSessionError(error, stream: continuation, urlSession: urlSession) case let .didReceiveResponse(response, completionHandler): - handleSessionResponse(response, completionHandler: completionHandler) + handleSessionResponse( + response, + stream: continuation, + urlSession: urlSession, + completionHandler: completionHandler + ) case let .didReceiveData(data): - parseMessages(from: data) + parseMessages(from: data, stream: continuation, urlSession: urlSession) } } } @@ -154,49 +201,46 @@ public extension EventSource { #if compiler(>=6.0) continuation.onTermination = { @Sendable [weak self] _ in sessionDelegateTask.cancel() - Task { await self?.close() } + Task { self?.close(stream: continuation, urlSession: urlSession) } } #else continuation.onTermination = { @Sendable _ in sessionDelegateTask.cancel() Task { [weak self] in - await self?.close() + await self?.close(stream: continuation, urlSession: urlSession) } } #endif - self.continuation = continuation - - - urlSession = URLSession( - configuration: urlSessionConfiguration, - delegate: sessionDelegate, - delegateQueue: nil - ) - - urlSessionDataTask = urlSession!.dataTask(with: urlRequest) - urlSessionDataTask!.resume() + urlSessionDataTask.resume() readyState = .connecting + consumed = true } } - private func handleSessionError(_ error: Error?) { + private func handleSessionError( + _ error: Error?, + stream continuation: AsyncStream.Continuation, + urlSession: URLSession + ) { guard readyState != .closed else { - close() + close(stream: continuation, urlSession: urlSession) return } // Send error event if let error { - sendErrorEvent(with: error) + sendErrorEvent(with: error, stream: continuation) } // Close connection - close() + close(stream: continuation, urlSession: urlSession) } private func handleSessionResponse( _ response: URLResponse, + stream continuation: AsyncStream.Continuation, + urlSession: URLSession, completionHandler: @escaping (URLSession.ResponseDisposition) -> Void ) { guard readyState != .closed else { @@ -212,13 +256,13 @@ public extension EventSource { // Stop connection when 204 response code, otherwise keep open guard httpResponse.statusCode != 204 else { completionHandler(.cancel) - close() + close(stream: continuation, urlSession: urlSession) return } if 200...299 ~= httpResponse.statusCode { if readyState != .open { - setOpen() + setOpen(stream: continuation) } } else { httpResponseErrorStatusCode = httpResponse.statusCode @@ -230,20 +274,26 @@ public extension EventSource { /// Closes the connection, if one was made, /// and sets the `readyState` property to `.closed`. /// - Returns: State before closing. - private func close() { + private func close(stream continuation: AsyncStream.Continuation, urlSession: URLSession) { let previousState = self.readyState if previousState != .closed { - continuation?.yield(.closed) - continuation?.finish() + continuation.yield(.closed) + continuation.finish() } - cancel() + cancel(urlSession: urlSession) } - private func parseMessages(from data: Data) { + private func parseMessages( + from data: Data, + stream continuation: AsyncStream.Continuation, + urlSession: URLSession + ) { if let httpResponseErrorStatusCode { self.httpResponseErrorStatusCode = nil handleSessionError( - EventSourceError.connectionError(statusCode: httpResponseErrorStatusCode, response: data) + EventSourceError.connectionError(statusCode: httpResponseErrorStatusCode, response: data), + stream: continuation, + urlSession: urlSession ) return } @@ -256,17 +306,17 @@ public extension EventSource { } events.forEach { - continuation?.yield(.event($0)) + continuation.yield(.event($0)) } } - private func setOpen() { + private func setOpen(stream continuation: AsyncStream.Continuation) { readyState = .open - continuation?.yield(.open) + continuation.yield(.open) } - private func sendErrorEvent(with error: Error) { - continuation?.yield(.error(error)) + private func sendErrorEvent(with error: Error, stream continuation: AsyncStream.Continuation) { + continuation.yield(.error(error)) } /// Cancels the task. @@ -275,11 +325,10 @@ public extension EventSource { /// The event stream supports cooperative task cancellation. However, it should be noted that /// canceling the parent Task only cancels the underlying `URLSessionDataTask` of /// ``EventSource/EventSource/DataTask``; this does not actually stop the ongoing request. - public func cancel() { + public func cancel(urlSession: URLSession) { readyState = .closed lastMessageId = "" - urlSessionDataTask?.cancel() - urlSession?.invalidateAndCancel() + urlSession.invalidateAndCancel() } } } diff --git a/Sources/EventSource/Mutex.swift b/Sources/EventSource/Mutex.swift new file mode 100644 index 0000000..9a36361 --- /dev/null +++ b/Sources/EventSource/Mutex.swift @@ -0,0 +1,107 @@ +// +// Mutex.swift +// EventSource +// +// Created by Firdavs Khaydarov on 12/03/2025. +// + +import Foundation + +#if compiler(>=6) +/// A synchronization primitive that protects shared mutable state via mutual exclusion. +/// +/// A back-port of Swift's `Mutex` type for wider platform availability. +#if hasFeature(StaticExclusiveOnly) +@_staticExclusiveOnly +#endif +package struct Mutex: ~Copyable { + private let _lock = NSLock() + private let _box: Box + + /// Initializes a value of this mutex with the given initial state. + /// + /// - Parameter initialValue: The initial value to give to the mutex. + package init(_ initialValue: consuming sending Value) { + _box = Box(initialValue) + } + + private final class Box { + var value: Value + init(_ initialValue: consuming sending Value) { + value = initialValue + } + } +} + +extension Mutex: @unchecked Sendable where Value: ~Copyable {} + +extension Mutex where Value: ~Copyable { + /// Calls the given closure after acquiring the lock and then releases ownership. + borrowing package func withLock( + _ body: (inout sending Value) throws(E) -> sending Result + ) throws(E) -> sending Result { + _lock.lock() + defer { _lock.unlock() } + return try body(&_box.value) + } + + /// Attempts to acquire the lock and then calls the given closure if successful. + borrowing package func withLockIfAvailable( + _ body: (inout sending Value) throws(E) -> sending Result + ) throws(E) -> sending Result? { + guard _lock.try() else { return nil } + defer { _lock.unlock() } + return try body(&_box.value) + } +} +#else +package struct Mutex { + private let _lock = NSLock() + private let _box: Box + + package init(_ initialValue: consuming Value) { + _box = Box(initialValue) + } + + private final class Box { + var value: Value + init(_ initialValue: consuming Value) { + value = initialValue + } + } +} + +extension Mutex: @unchecked Sendable {} + +extension Mutex { + borrowing package func withLock( + _ body: (inout Value) throws -> Result + ) rethrows -> Result { + _lock.lock() + defer { _lock.unlock() } + return try body(&_box.value) + } + + borrowing package func withLockIfAvailable( + _ body: (inout Value) throws -> Result + ) rethrows -> Result? { + guard _lock.try() else { return nil } + defer { _lock.unlock() } + return try body(&_box.value) + } +} +#endif + +extension Mutex where Value == Void { + borrowing package func _unsafeLock() { + _lock.lock() + } + + borrowing package func _unsafeTryLock() -> Bool { + _lock.try() + } + + borrowing package func _unsafeUnlock() { + _lock.unlock() + } +}