Skip to content

Add async/await create methods for Observable and Infallible#2673

Closed
Woollim wants to merge 3 commits intoReactiveX:mainfrom
Woollim:main
Closed

Add async/await create methods for Observable and Infallible#2673
Woollim wants to merge 3 commits intoReactiveX:mainfrom
Woollim:main

Conversation

@Woollim
Copy link
Copy Markdown

@Woollim Woollim commented Sep 5, 2025

Summary

This PR adds new create methods for both Observable and Infallible types that support Swift's async/await concurrency features, making it easier to create reactive streams from asynchronous operations.

Changes

New Methods Added

  • Observable.create with async/await support

    • Takes an async closure with an ElementObserver parameter
    • Supports throwing operations (emits error events on exceptions)
    • Provides task cancellation handling
  • Infallible.create with async/await support

    • Takes an async closure with an ElementObserver parameter
    • Non-throwing version for infallible operations
    • Provides task cancellation handling

Usage Examples

// Observable with async/await
let observable = Observable<String>.create { observer in
    let data = try await fetchDataFromAPI()
    observer(data)
    
    let moreData = try await fetchMoreData()
    observer(moreData)
}

// Infallible with async/await
let infallible = Infallible<Int>.create { observer in
    for i in 1...10 {
        await Task.yield() // Cooperative cancellation
        observer(i)
    }
}

This implementation was inspired by TCA's run function for creating Effects from async operations.
See: https://github.com/pointfreeco/swift-composable-architecture/blob/acd9bb8a7cf6e36a89d81a432c2e8eb3b1bb3771/Sources/ComposableArchitecture/Effect.swift#L87-L95

Woollim and others added 3 commits September 5, 2025 14:55
- Add Observable.create with async/await support
- Add Infallible.create with async/await support
- Both methods support detached tasks and custom priority
- Include proper Task cancellation handling

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Add test for Infallible.create with async support
- Add test for Observable.create with async support
- Tests verify proper value emission and completion

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Add documentation for Observable.create with async/await
- Add documentation for Infallible.create with async/await

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
@freak4pc
Copy link
Copy Markdown
Member

Hey @Woollim,
Just going through old PRs and trying to clean up the repo.

This is a nice quality-of-life improvement but I'm not convinced it provides too much value, vs.

Observable.create { observer in 
    Task { ... 
        observer.onNext(...)
    }
}

Or even using a subject/relay:

let subject = BehaviorSubject ... 

try await subject.onNext(myAsyncThing()) 

return subject.asObservable()

Or, using the already-existing convenience helpers:

asyncStream.asObservable()
Single.create { try await thing() }
// etc...

I'm closing this for now but if you want to continue the discussion feel free to comment.

@freak4pc freak4pc closed this Jan 21, 2026
@Woollim
Copy link
Copy Markdown
Author

Woollim commented Jan 23, 2026

Hey @freak4pc,

Thanks for taking the time to review this older PR.

TL;DR: Single.create in PrimitiveSequence+Concurrency.swift already provides async/await support with automatic Task cancellation. I'm proposing the same convenience for Observable and Infallible to maintain API consistency.


Regarding Observable.create with Task

I usually create Observables this way too, but in practice it's not as simple as the example suggests:

Observable.create { observer in 
    let task = Task {
        do {
            observer.onNext(try await fetchData())
            observer.onCompleted()    
        } catch {
            observer.onError(error)
        }
    }
    return Disposables.create {
        task.cancel()
    }
}

There are a few things developers need to handle manually every time:

  • Wrapping in do-catch to propagate errors properly
  • Returning a Disposables.create that cancels the Task
  • Ensuring Task cancellation propagates when the Observable is disposed

Missing the cancellation handling is particularly problematic—the Observable gets disposed, but the internal Task keeps running. This is important but tedious boilerplate that's easy to forget.

Why I proposed this

The main inspiration for this PR was actually Single.create in PrimitiveSequence+Concurrency.swift. That helper already handles everything I'm proposing:

  • Automatic Task cancellation in Disposables
  • Support for detached execution and custom priority
  • Clean async/await syntax

I was simply wondering: if Single has this convenience, why doesn't the foundational Observable type have it too?

(I also asked Claude to follow the same API style as Single.create when implementing this.)

Would you be open to reconsidering? Happy to discuss further or make any adjustments.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants