Skip to content

Commit 0435901

Browse files
committed
Async blocking task support
Added the `BlockingTaskQueue` type BlockingTaskQueue allows a Rust closure to be scheduled on a foreign thread where blocking operations are okay. The closure runs inside the parent future, which is nice because it allows the closure to reference its outside scope. On the foreign side, a `BlockingTaskQueue` is a native type that runs a task in some sort of thread queue (`DispatchQueue`, `CoroutineContext`, `futures.Executor`, etc.). Updated handlemaps to always start at 1 rather than 0, which is reserved for a NULL handle. Added new tests for this in the futures fixtures. Updated the tests to check that handles are being released properly.
1 parent 6b09f11 commit 0435901

File tree

42 files changed

+1029
-155
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1029
-155
lines changed

docs/manual/src/futures.md

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,62 @@ In this case, we need an event loop to run the Python async function, but there'
9393
Use `uniffi_set_event_loop()` to handle this case.
9494
It should be called before the Rust code makes the async call and passed an eventloop to use.
9595

96+
## Blocking tasks
97+
98+
Rust executors are designed around an assumption that the `Future::poll` function will return quickly.
99+
This assumption, combined with cooperative scheduling, allows for a large number of futures to be handled by a small number of threads.
100+
Foreign executors make similar assumptions and sometimes more extreme ones.
101+
For example, the Python eventloop is single threaded -- if any task spends a long time between `await` points, then it will block all other tasks from progressing.
102+
103+
This raises the question of how async code can interact with blocking code that performs blocking IO, long-running computations without `await` breaks, etc.
104+
UniFFI defines the `BlockingTaskQueue` type, which is a foreign object that schedules work on a thread where it's okay to block.
105+
106+
On Rust, `BlockingTaskQueue` is a UniFFI type that can safely run blocking code.
107+
It's `execute` method works like tokio's [block_in_place](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html) function.
108+
It inputs a closure and runs it in the `BlockingTaskQueue`.
109+
This closure can reference the outside scope (i.e. it does not need to be `'static`).
110+
For example:
111+
112+
```rust
113+
#[derive(uniffi::Object)]
114+
struct DataStore {
115+
// Used to run blocking tasks
116+
queue: uniffi::BlockingTaskQueue,
117+
// Low-level DB object with blocking methods
118+
db: Mutex<Database>,
119+
}
120+
121+
#[uniffi::export]
122+
impl DataStore {
123+
#[uniffi::constructor]
124+
fn new(queue: uniffi::BlockingTaskQueue) -> Self {
125+
Self {
126+
queue,
127+
db: Mutex::new(Database::new())
128+
}
129+
}
130+
131+
async fn fetch_all_items(&self) -> Vec<DbItem> {
132+
self.queue.execute(|| self.db.lock().fetch_all_items()).await
133+
}
134+
}
135+
```
136+
137+
On the foreign side `BlockingTaskQueue` corresponds to a language-dependent class.
138+
139+
### Kotlin
140+
Kotlin uses `CoroutineContext` for its `BlockingTaskQueue`.
141+
Any `CoroutineContext` will work, but `Dispatchers.IO` is usually a good choice.
142+
A DataStore from the example above can be created with `DataStore(Dispatchers.IO)`.
143+
144+
### Swift
145+
Swift uses `DispatchQueue` for its `BlockingTaskQueue`.
146+
The user-initiated global queue is normally a good choice.
147+
A DataStore from the example above can be created with `DataStore(queue: DispatchQueue.global(qos: .userInitiated)`.
148+
The `DispatchQueue` should be concurrent.
149+
150+
### Python
151+
152+
Python uses a `futures.Executor` for its `BlockingTaskQueue`.
153+
`ThreadPoolExecutor` is typically a good choice.
154+
A DataStore from the example above can be created with `DataStore(ThreadPoolExecutor())`.

fixtures/futures/src/lib.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ use std::{
1111
time::Duration,
1212
};
1313

14-
use futures::future::{AbortHandle, Abortable, Aborted};
14+
use futures::{
15+
future::{AbortHandle, Abortable, Aborted},
16+
stream::{FuturesUnordered, StreamExt},
17+
};
1518

1619
/// Non-blocking timer future.
1720
pub struct TimerFuture {
@@ -456,4 +459,58 @@ async fn cancel_delay_using_trait(obj: Arc<dyn AsyncParser>, delay_ms: i32) {
456459
assert_eq!(future.await, Err(Aborted));
457460
}
458461

462+
/// Async function that uses a blocking task queue to do its work
463+
#[uniffi::export]
464+
pub async fn calc_square(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
465+
queue.execute(|| value * value).await
466+
}
467+
468+
/// Same as before, but this one runs multiple tasks
469+
#[uniffi::export]
470+
pub async fn calc_squares(queue: uniffi::BlockingTaskQueue, items: Vec<i32>) -> Vec<i32> {
471+
// Use `FuturesUnordered` to test our blocking task queue code which is known to be a tricky API to work with.
472+
// In particular, if we don't notify the waker then FuturesUnordered will not poll again.
473+
let mut futures: FuturesUnordered<_> = (0..items.len())
474+
.map(|i| {
475+
// Test that we can use references from the surrounding scope
476+
let items = &items;
477+
queue.execute(move || items[i] * items[i])
478+
})
479+
.collect();
480+
let mut results = vec![];
481+
while let Some(result) = futures.next().await {
482+
results.push(result);
483+
}
484+
results.sort();
485+
results
486+
}
487+
488+
/// ...and this one uses multiple BlockingTaskQueues
489+
#[uniffi::export]
490+
pub async fn calc_squares_multi_queue(
491+
queues: Vec<uniffi::BlockingTaskQueue>,
492+
items: Vec<i32>,
493+
) -> Vec<i32> {
494+
let mut futures: FuturesUnordered<_> = (0..items.len())
495+
.map(|i| {
496+
// Test that we can use references from the surrounding scope
497+
let items = &items;
498+
queues[i].execute(move || items[i] * items[i])
499+
})
500+
.collect();
501+
let mut results = vec![];
502+
while let Some(result) = futures.next().await {
503+
results.push(result);
504+
}
505+
results.sort();
506+
results
507+
}
508+
509+
/// Like calc_square, but it clones the BlockingTaskQueue first then drops both copies. Used to
510+
/// test that a) the clone works and b) we correctly drop the references.
511+
#[uniffi::export]
512+
pub async fn calc_square_with_clone(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
513+
queue.clone().execute(|| value * value).await
514+
}
515+
459516
uniffi::include_scaffolding!("futures");

fixtures/futures/tests/bindings/test_futures.kts

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,22 @@
11
import uniffi.fixture.futures.*
2+
import java.util.concurrent.Executors
23
import kotlinx.coroutines.*
34
import kotlin.system.*
45

6+
fun runAsyncTest(test: suspend CoroutineScope.() -> Unit) {
7+
val initialBlockingTaskQueueHandleCount = uniffiBlockingTaskQueueHandleCount()
8+
val initialPollHandleCount = uniffiPollHandleCount()
9+
val time = runBlocking {
10+
measureTimeMillis {
11+
test()
12+
}
13+
}
14+
assert(uniffiBlockingTaskQueueHandleCount() == initialBlockingTaskQueueHandleCount)
15+
assert(uniffiPollHandleCount() == initialPollHandleCount)
16+
}
17+
518
// init UniFFI to get good measurements after that
6-
runBlocking {
19+
runAsyncTest {
720
val time = measureTimeMillis {
821
alwaysReady()
922
}
@@ -24,7 +37,7 @@ fun assertApproximateTime(actualTime: Long, expectedTime: Int, testName: String
2437
}
2538

2639
// Test `always_ready`.
27-
runBlocking {
40+
runAsyncTest {
2841
val time = measureTimeMillis {
2942
val result = alwaysReady()
3043

@@ -35,7 +48,7 @@ runBlocking {
3548
}
3649

3750
// Test `void`.
38-
runBlocking {
51+
runAsyncTest {
3952
val time = measureTimeMillis {
4053
val result = void()
4154

@@ -46,7 +59,7 @@ runBlocking {
4659
}
4760

4861
// Test `sleep`.
49-
runBlocking {
62+
runAsyncTest {
5063
val time = measureTimeMillis {
5164
sleep(200U)
5265
}
@@ -55,7 +68,7 @@ runBlocking {
5568
}
5669

5770
// Test sequential futures.
58-
runBlocking {
71+
runAsyncTest {
5972
val time = measureTimeMillis {
6073
val resultAlice = sayAfter(100U, "Alice")
6174
val resultBob = sayAfter(200U, "Bob")
@@ -68,7 +81,7 @@ runBlocking {
6881
}
6982

7083
// Test concurrent futures.
71-
runBlocking {
84+
runAsyncTest {
7285
val time = measureTimeMillis {
7386
val resultAlice = async { sayAfter(100U, "Alice") }
7487
val resultBob = async { sayAfter(200U, "Bob") }
@@ -81,7 +94,7 @@ runBlocking {
8194
}
8295

8396
// Test async methods.
84-
runBlocking {
97+
runAsyncTest {
8598
val megaphone = newMegaphone()
8699
val time = measureTimeMillis {
87100
val resultAlice = megaphone.sayAfter(200U, "Alice")
@@ -92,7 +105,7 @@ runBlocking {
92105
assertApproximateTime(time, 200, "async methods")
93106
}
94107

95-
runBlocking {
108+
runAsyncTest {
96109
val megaphone = newMegaphone()
97110
val time = measureTimeMillis {
98111
val resultAlice = sayAfterWithMegaphone(megaphone, 200U, "Alice")
@@ -104,7 +117,7 @@ runBlocking {
104117
}
105118

106119
// Test async method returning optional object
107-
runBlocking {
120+
runAsyncTest {
108121
val megaphone = asyncMaybeNewMegaphone(true)
109122
assert(megaphone != null)
110123

@@ -213,7 +226,7 @@ runBlocking {
213226

214227

215228
// Test with the Tokio runtime.
216-
runBlocking {
229+
runAsyncTest {
217230
val time = measureTimeMillis {
218231
val resultAlice = sayAfterWithTokio(200U, "Alice")
219232

@@ -224,7 +237,7 @@ runBlocking {
224237
}
225238

226239
// Test fallible function/method.
227-
runBlocking {
240+
runAsyncTest {
228241
val time1 = measureTimeMillis {
229242
try {
230243
fallibleMe(false)
@@ -289,7 +302,7 @@ runBlocking {
289302
}
290303

291304
// Test record.
292-
runBlocking {
305+
runAsyncTest {
293306
val time = measureTimeMillis {
294307
val result = newMyRecord("foo", 42U)
295308

@@ -303,7 +316,7 @@ runBlocking {
303316
}
304317

305318
// Test a broken sleep.
306-
runBlocking {
319+
runAsyncTest {
307320
val time = measureTimeMillis {
308321
brokenSleep(100U, 0U) // calls the waker twice immediately
309322
sleep(100U) // wait for possible failure
@@ -317,7 +330,7 @@ runBlocking {
317330

318331

319332
// Test a future that uses a lock and that is cancelled.
320-
runBlocking {
333+
runAsyncTest {
321334
val time = measureTimeMillis {
322335
val job = launch {
323336
useSharedResource(SharedResourceOptions(releaseAfterMs=5000U, timeoutMs=100U))
@@ -336,11 +349,41 @@ runBlocking {
336349
}
337350

338351
// Test a future that uses a lock and that is not cancelled.
339-
runBlocking {
352+
runAsyncTest {
340353
val time = measureTimeMillis {
341354
useSharedResource(SharedResourceOptions(releaseAfterMs=100U, timeoutMs=1000U))
342355

343356
useSharedResource(SharedResourceOptions(releaseAfterMs=0U, timeoutMs=1000U))
344357
}
345358
println("useSharedResource (not canceled): ${time}ms")
346359
}
360+
361+
// Test blocking task queues
362+
runAsyncTest {
363+
withTimeout(1000) {
364+
assert(calcSquare(Dispatchers.IO, 20) == 400)
365+
}
366+
367+
withTimeout(1000) {
368+
assert(calcSquares(Dispatchers.IO, listOf(1, -2, 3)) == listOf(1, 4, 9))
369+
}
370+
371+
val executors = listOf(
372+
Executors.newSingleThreadExecutor(),
373+
Executors.newSingleThreadExecutor(),
374+
Executors.newSingleThreadExecutor(),
375+
)
376+
withTimeout(1000) {
377+
assert(calcSquaresMultiQueue(executors.map { it.asCoroutineDispatcher() }, listOf(1, -2, 3)) == listOf(1, 4, 9))
378+
}
379+
for (executor in executors) {
380+
executor.shutdown()
381+
}
382+
}
383+
384+
// Test blocking task queue cloning
385+
runAsyncTest {
386+
withTimeout(1000) {
387+
assert(calcSquareWithClone(Dispatchers.IO, 20) == 400)
388+
}
389+
}

0 commit comments

Comments
 (0)