Skip to content

fix(#2553) : AsyncSequence.asObservable() runs on background thread #2662

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
* Provides `Infallible` versions of `combineLatest` without `resultSelector` requirement.
* Provides `Infallible` versions of `CombineLatest+Collection` helpers.
* Explicitly declare `APPLICATION_EXTENSION_API_ONLY` for CocoaPods
* Ensure `AsyncSequence.asObservable()` runs on background thread using `Task.detached`.

## 6.5.0

Expand Down
2 changes: 1 addition & 1 deletion RxSwift/Observable+Concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public extension AsyncSequence {
/// - returns: An `Observable` of the async sequence's type
func asObservable() -> Observable<Element> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about adding a new param to decide whether to use Task or Task.detached?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think adding a parameter is a good idea to provide flexibility. I'm considering implementing it like this:

func asObservable(detached: Bool = true) -> Observable<Element> {
    // Use Task.detached or Task based on the parameter
}

With true as the default value, it would still solve the original issue by running on a background thread, while giving users the option to use regular Task when they want to maintain the current context's priority.

What do you think about this approach and the default value being true?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds good. I’ve also seen a similar implementation in another library, which takes the same approach

About the default value of detached, I think false should be used to avoid introducing a breaking change.

Copy link
Author

@suojae suojae Apr 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented the parameter as suggested, with false as the default value to maintain backward compatibility.

func asObservable(detached: Bool = false) -> Observable<Element> {
    // Implementation uses Task.detached when detached=true, otherwise uses Task
}

All tests are passing. Thanks!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 🙏

Observable.create { observer in
let task = Task {
let task = Task.detached {
do {
for try await value in self {
observer.onNext(value)
Expand Down
106 changes: 106 additions & 0 deletions Tests/RxSwiftTests/Observable+ConcurrencyTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,111 @@ extension ObservableConcurrencyTests {
task.cancel()
}

func testAsyncSequenceToObservableRunsOnBackgroundThread() async throws {

let asyncSequence = AsyncStream<Int> { continuation in
for i in 1...5 {
continuation.yield(i)
}
continuation.finish()
}

let expectation = XCTestExpectation(description: "Observable completes")

DispatchQueue.main.async {
let observable = asyncSequence.asObservable()

var threadIsNotMain = false
var values = [Int]()

_ = observable.subscribe(
onNext: { value in
values.append(value)
threadIsNotMain = !Thread.isMainThread
},
onCompleted: {
XCTAssertEqual(values, [1, 2, 3, 4, 5])
XCTAssertTrue(threadIsNotMain, "AsyncSequence.asObservable should not run on main thread")
expectation.fulfill()
}
)
}

await fulfillment(of: [expectation], timeout: 5.0)
}

func testAsyncSequenceToObservableWithSleep() async throws {
let asyncSequence = AsyncStream<Int> { continuation in
Task {
for i in 1...3 {
try? await Task.sleep(nanoseconds: 100_000_000)
continuation.yield(i)
}
continuation.finish()
}
}

let expectation = XCTestExpectation(description: "Observable with sleep completes")

DispatchQueue.main.async {
let startTime = Date()
var values = [Int]()
var executionThreads = Set<String>()

_ = asyncSequence.asObservable().subscribe(
onNext: { value in
values.append(value)
let threadName = Thread.current.description
executionThreads.insert(threadName)
},
onCompleted: {
let duration = Date().timeIntervalSince(startTime)
XCTAssertGreaterThanOrEqual(duration, 0.3)
XCTAssertEqual(values, [1, 2, 3])
XCTAssertFalse(executionThreads.contains(where: { $0.contains("main") }))

expectation.fulfill()
}
)
}

await fulfillment(of: [expectation], timeout: 5.0)
}

func testAsyncSequenceToObservableWithError() async throws {
struct TestError: Error {}

let asyncSequence = AsyncThrowingStream<Int, Error> { continuation in
for i in 1...3 {
continuation.yield(i)
}
continuation.finish(throwing: TestError())
}

let expectation = XCTestExpectation(description: "Observable with error completes")

var values = [Int]()
var receivedError: Error?
var threadIsNotMain = false

DispatchQueue.main.async {
_ = asyncSequence.asObservable().subscribe(
onNext: { value in
values.append(value)
threadIsNotMain = !Thread.isMainThread
},
onError: { error in
receivedError = error
XCTAssertEqual(values, [1, 2, 3])
XCTAssertTrue(threadIsNotMain, "Error handler should not run on main thread")
expectation.fulfill()
}
)
}

await fulfillment(of: [expectation], timeout: 5.0)
XCTAssertTrue(receivedError is TestError)
}

}
#endif