Skip to content

Commit 17e7642

Browse files
authored
Task System for Bevy (#384)
Add bevy_tasks crate to replace rayon
1 parent db8ec7d commit 17e7642

File tree

22 files changed

+847
-67
lines changed

22 files changed

+847
-67
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ bevy_text = { path = "crates/bevy_text", version = "0.1" }
5959
bevy_ui = { path = "crates/bevy_ui", version = "0.1" }
6060
bevy_utils = { path = "crates/bevy_utils", version = "0.1" }
6161
bevy_window = { path = "crates/bevy_window", version = "0.1" }
62+
bevy_tasks = { path = "crates/bevy_tasks", version = "0.1" }
6263

6364
# bevy (optional)
6465
bevy_audio = { path = "crates/bevy_audio", optional = true, version = "0.1" }

crates/bevy_app/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ keywords = ["bevy"]
1313
# bevy
1414
bevy_derive = { path = "../bevy_derive", version = "0.1" }
1515
bevy_ecs = { path = "../bevy_ecs", version = "0.1" }
16+
bevy_tasks = { path = "../bevy_tasks", version = "0.1" }
17+
bevy_math = { path = "../bevy_math", version = "0.1" }
1618

1719
# other
1820
libloading = "0.6"
1921
log = { version = "0.4", features = ["release_max_level_info"] }
20-
serde = { version = "1.0", features = ["derive"]}
22+
serde = { version = "1.0", features = ["derive"]}

crates/bevy_app/src/app.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::app_builder::AppBuilder;
1+
use crate::{app_builder::AppBuilder, DefaultTaskPoolOptions};
22
use bevy_ecs::{ParallelExecutor, Resources, Schedule, World};
33

44
#[allow(clippy::needless_doctest_main)]
@@ -63,6 +63,12 @@ impl App {
6363
}
6464

6565
pub fn run(mut self) {
66+
// Setup the default bevy task pools
67+
self.resources
68+
.get_cloned::<DefaultTaskPoolOptions>()
69+
.unwrap_or_else(DefaultTaskPoolOptions::default)
70+
.create_default_pools(&mut self.resources);
71+
6672
self.startup_schedule.initialize(&mut self.resources);
6773
self.startup_executor.run(
6874
&mut self.startup_schedule,

crates/bevy_app/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ mod app_builder;
88
mod event;
99
mod plugin;
1010
mod schedule_runner;
11+
mod task_pool_options;
1112

1213
pub use app::*;
1314
pub use app_builder::*;
1415
pub use bevy_derive::DynamicPlugin;
1516
pub use event::*;
1617
pub use plugin::*;
1718
pub use schedule_runner::*;
19+
pub use task_pool_options::*;
1820

1921
pub mod prelude {
2022
pub use crate::{
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
use bevy_ecs::Resources;
2+
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder};
3+
4+
/// Defines a simple way to determine how many threads to use given the number of remaining cores
5+
/// and number of total cores
6+
#[derive(Clone)]
7+
pub struct TaskPoolThreadAssignmentPolicy {
8+
/// Force using at least this many threads
9+
pub min_threads: usize,
10+
/// Under no circumstance use more than this many threads for this pool
11+
pub max_threads: usize,
12+
/// Target using this percentage of total cores, clamped by min_threads and max_threads. It is
13+
/// permitted to use 1.0 to try to use all remaining threads
14+
pub percent: f32,
15+
}
16+
17+
impl TaskPoolThreadAssignmentPolicy {
18+
/// Determine the number of threads to use for this task pool
19+
fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize {
20+
assert!(self.percent >= 0.0);
21+
let mut desired = (total_threads as f32 * self.percent).round() as usize;
22+
23+
// Limit ourselves to the number of cores available
24+
desired = desired.min(remaining_threads);
25+
26+
// Clamp by min_threads, max_threads. (This may result in us using more threads than are
27+
// available, this is intended. An example case where this might happen is a device with
28+
// <= 2 threads.
29+
bevy_math::clamp(desired, self.min_threads, self.max_threads)
30+
}
31+
}
32+
33+
/// Helper for configuring and creating the default task pools. For end-users who want full control,
34+
/// insert the default task pools into the resource map manually. If the pools are already inserted,
35+
/// this helper will do nothing.
36+
#[derive(Clone)]
37+
pub struct DefaultTaskPoolOptions {
38+
/// If the number of logical cores is less than min_total_threads, force using min_total_threads
39+
pub min_total_threads: usize,
40+
/// If the number of logical cores is greater than max_total_threads, force using max_total_threads
41+
pub max_total_threads: usize,
42+
43+
/// Used to determine number of IO threads to allocate
44+
pub io: TaskPoolThreadAssignmentPolicy,
45+
/// Used to determine number of async compute threads to allocate
46+
pub async_compute: TaskPoolThreadAssignmentPolicy,
47+
/// Used to determine number of compute threads to allocate
48+
pub compute: TaskPoolThreadAssignmentPolicy,
49+
}
50+
51+
impl Default for DefaultTaskPoolOptions {
52+
fn default() -> Self {
53+
DefaultTaskPoolOptions {
54+
// By default, use however many cores are available on the system
55+
min_total_threads: 1,
56+
max_total_threads: std::usize::MAX,
57+
58+
// Use 25% of cores for IO, at least 1, no more than 4
59+
io: TaskPoolThreadAssignmentPolicy {
60+
min_threads: 1,
61+
max_threads: 4,
62+
percent: 0.25,
63+
},
64+
65+
// Use 25% of cores for async compute, at least 1, no more than 4
66+
async_compute: TaskPoolThreadAssignmentPolicy {
67+
min_threads: 1,
68+
max_threads: 4,
69+
percent: 0.25,
70+
},
71+
72+
// Use all remaining cores for compute (at least 1)
73+
compute: TaskPoolThreadAssignmentPolicy {
74+
min_threads: 1,
75+
max_threads: std::usize::MAX,
76+
percent: 1.0, // This 1.0 here means "whatever is left over"
77+
},
78+
}
79+
}
80+
}
81+
82+
impl DefaultTaskPoolOptions {
83+
/// Create a configuration that forces using the given number of threads.
84+
pub fn with_num_threads(thread_count: usize) -> Self {
85+
let mut options = Self::default();
86+
options.min_total_threads = thread_count;
87+
options.max_total_threads = thread_count;
88+
89+
options
90+
}
91+
92+
/// Inserts the default thread pools into the given resource map based on the configured values
93+
pub fn create_default_pools(&self, resources: &mut Resources) {
94+
let total_threads = bevy_math::clamp(
95+
bevy_tasks::logical_core_count(),
96+
self.min_total_threads,
97+
self.max_total_threads,
98+
);
99+
100+
let mut remaining_threads = total_threads;
101+
102+
if !resources.contains::<IOTaskPool>() {
103+
// Determine the number of IO threads we will use
104+
let io_threads = self
105+
.io
106+
.get_number_of_threads(remaining_threads, total_threads);
107+
remaining_threads -= io_threads;
108+
109+
resources.insert(IOTaskPool(
110+
TaskPoolBuilder::default()
111+
.num_threads(io_threads)
112+
.thread_name("IO Task Pool".to_string())
113+
.build(),
114+
));
115+
}
116+
117+
if !resources.contains::<AsyncComputeTaskPool>() {
118+
// Determine the number of async compute threads we will use
119+
let async_compute_threads = self
120+
.async_compute
121+
.get_number_of_threads(remaining_threads, total_threads);
122+
remaining_threads -= async_compute_threads;
123+
124+
resources.insert(AsyncComputeTaskPool(
125+
TaskPoolBuilder::default()
126+
.num_threads(async_compute_threads)
127+
.thread_name("Async Compute Task Pool".to_string())
128+
.build(),
129+
));
130+
}
131+
132+
if !resources.contains::<ComputeTaskPool>() {
133+
// Determine the number of compute threads we will use
134+
// This is intentionally last so that an end user can specify 1.0 as the percent
135+
let compute_threads = self
136+
.compute
137+
.get_number_of_threads(remaining_threads, total_threads);
138+
139+
resources.insert(ComputeTaskPool(
140+
TaskPoolBuilder::default()
141+
.num_threads(compute_threads)
142+
.thread_name("Compute Task Pool".to_string())
143+
.build(),
144+
));
145+
}
146+
}
147+
}

crates/bevy_ecs/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ profiler = []
1515

1616
[dependencies]
1717
bevy_hecs = { path = "hecs", features = ["macros", "serialize"], version = "0.1" }
18+
bevy_tasks = { path = "../bevy_tasks", version = "0.1" }
1819
bevy_utils = { path = "../bevy_utils", version = "0.1" }
1920
rand = "0.7.2"
20-
rayon = "1.3"
2121
crossbeam-channel = "0.4.2"
2222
fixedbitset = "0.3.0"
2323
downcast-rs = "1.1.1"

crates/bevy_ecs/src/resource/resources.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ impl Resources {
4343
self.get_resource_mut(ResourceIndex::Global)
4444
}
4545

46+
/// Returns a clone of the underlying resource, this is helpful when borrowing something
47+
/// cloneable (like a task pool) without taking a borrow on the resource map
48+
pub fn get_cloned<T: Resource + Clone>(&self) -> Option<T> {
49+
self.get::<T>().map(|r| (*r).clone())
50+
}
51+
4652
#[allow(clippy::needless_lifetimes)]
4753
pub fn get_local<'a, T: Resource>(&'a self, id: SystemId) -> Option<Ref<'a, T>> {
4854
self.get_resource(ResourceIndex::System(id))

crates/bevy_ecs/src/schedule/parallel_executor.rs

Lines changed: 18 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use bevy_hecs::{ArchetypesGeneration, World};
77
use crossbeam_channel::{Receiver, Sender};
88
use fixedbitset::FixedBitSet;
99
use parking_lot::Mutex;
10-
use rayon::ScopeFifo;
1110
use std::{ops::Range, sync::Arc};
1211

1312
/// Executes each schedule stage in parallel by analyzing system dependencies.
@@ -66,52 +65,6 @@ impl ParallelExecutor {
6665
}
6766
}
6867

69-
/// This can be added as an app resource to control the global `rayon::ThreadPool` used by ecs.
70-
// Dev internal note: We cannot directly expose a ThreadPoolBuilder here as it does not implement Send and Sync.
71-
#[derive(Debug, Default, Clone)]
72-
pub struct ParallelExecutorOptions {
73-
/// If some value, we'll set up the thread pool to use at most n threads. See `rayon::ThreadPoolBuilder::num_threads`.
74-
num_threads: Option<usize>,
75-
/// If some value, we'll set up the thread pool's' workers to the given stack size. See `rayon::ThreadPoolBuilder::stack_size`.
76-
stack_size: Option<usize>,
77-
// TODO: Do we also need/want to expose other features (*_handler, etc.)
78-
}
79-
80-
impl ParallelExecutorOptions {
81-
/// Creates a new ParallelExecutorOptions instance
82-
pub fn new() -> Self {
83-
Self::default()
84-
}
85-
86-
/// Sets the num_threads option, using the builder pattern
87-
pub fn with_num_threads(mut self, num_threads: Option<usize>) -> Self {
88-
self.num_threads = num_threads;
89-
self
90-
}
91-
92-
/// Sets the stack_size option, using the builder pattern. WARNING: Only use this if you know what you're doing,
93-
/// otherwise your application may run into stability and performance issues.
94-
pub fn with_stack_size(mut self, stack_size: Option<usize>) -> Self {
95-
self.stack_size = stack_size;
96-
self
97-
}
98-
99-
/// Creates a new ThreadPoolBuilder based on the current options.
100-
pub(crate) fn create_builder(&self) -> rayon::ThreadPoolBuilder {
101-
let mut builder = rayon::ThreadPoolBuilder::new();
102-
103-
if let Some(num_threads) = self.num_threads {
104-
builder = builder.num_threads(num_threads);
105-
}
106-
107-
if let Some(stack_size) = self.stack_size {
108-
builder = builder.stack_size(stack_size);
109-
}
110-
111-
builder
112-
}
113-
}
114-
11568
#[derive(Debug, Clone)]
11669
pub struct ExecutorStage {
11770
/// each system's set of dependencies
@@ -262,7 +215,7 @@ impl ExecutorStage {
262215
&mut self,
263216
systems: &[Arc<Mutex<Box<dyn System>>>],
264217
run_ready_type: RunReadyType,
265-
scope: &ScopeFifo<'run>,
218+
scope: &mut bevy_tasks::Scope<'run, ()>,
266219
world: &'run World,
267220
resources: &'run Resources,
268221
) -> RunReadyResult {
@@ -308,7 +261,8 @@ impl ExecutorStage {
308261
// handle multi-threaded system
309262
let sender = self.sender.clone();
310263
self.running_systems.insert(system_index);
311-
scope.spawn_fifo(move |_| {
264+
265+
scope.spawn(async move {
312266
let mut system = system.lock();
313267
system.run(world, resources);
314268
sender.send(system_index).unwrap();
@@ -328,6 +282,10 @@ impl ExecutorStage {
328282
systems: &[Arc<Mutex<Box<dyn System>>>],
329283
schedule_changed: bool,
330284
) {
285+
let compute_pool = resources
286+
.get_cloned::<bevy_tasks::ComputeTaskPool>()
287+
.unwrap();
288+
331289
// if the schedule has changed, clear executor state / fill it with new defaults
332290
if schedule_changed {
333291
self.system_dependencies.clear();
@@ -364,7 +322,8 @@ impl ExecutorStage {
364322
// if there are no upcoming thread local systems, run everything right now
365323
0..systems.len()
366324
};
367-
rayon::scope_fifo(|scope| {
325+
326+
compute_pool.scope(|scope| {
368327
run_ready_result = self.run_ready_systems(
369328
systems,
370329
RunReadyType::Range(run_ready_system_index_range),
@@ -373,6 +332,7 @@ impl ExecutorStage {
373332
resources,
374333
);
375334
});
335+
376336
loop {
377337
// if all systems in the stage are finished, break out of the loop
378338
if self.finished_systems.count_ones(..) == systems.len() {
@@ -393,7 +353,7 @@ impl ExecutorStage {
393353
run_ready_result = RunReadyResult::Ok;
394354
} else {
395355
// wait for a system to finish, then run its dependents
396-
rayon::scope_fifo(|scope| {
356+
compute_pool.scope(|scope| {
397357
loop {
398358
// if all systems in the stage are finished, break out of the loop
399359
if self.finished_systems.count_ones(..) == systems.len() {
@@ -410,7 +370,7 @@ impl ExecutorStage {
410370
resources,
411371
);
412372

413-
// if the next ready system is thread local, break out of this loop/rayon scope so it can be run
373+
// if the next ready system is thread local, break out of this loop/bevy_tasks scope so it can be run
414374
if let RunReadyResult::ThreadLocalReady(_) = run_ready_result {
415375
break;
416376
}
@@ -442,6 +402,7 @@ mod tests {
442402
Commands,
443403
};
444404
use bevy_hecs::{Entity, World};
405+
use bevy_tasks::{ComputeTaskPool, TaskPool};
445406
use fixedbitset::FixedBitSet;
446407
use parking_lot::Mutex;
447408
use std::sync::Arc;
@@ -455,6 +416,8 @@ mod tests {
455416
fn cross_stage_archetype_change_prepare() {
456417
let mut world = World::new();
457418
let mut resources = Resources::default();
419+
resources.insert(ComputeTaskPool(TaskPool::default()));
420+
458421
let mut schedule = Schedule::default();
459422
schedule.add_stage("PreArchetypeChange");
460423
schedule.add_stage("PostArchetypeChange");
@@ -484,6 +447,8 @@ mod tests {
484447
fn intra_stage_archetype_change_prepare() {
485448
let mut world = World::new();
486449
let mut resources = Resources::default();
450+
resources.insert(ComputeTaskPool(TaskPool::default()));
451+
487452
let mut schedule = Schedule::default();
488453
schedule.add_stage("update");
489454

@@ -512,6 +477,7 @@ mod tests {
512477
fn schedule() {
513478
let mut world = World::new();
514479
let mut resources = Resources::default();
480+
resources.insert(ComputeTaskPool(TaskPool::default()));
515481
resources.insert(Counter::default());
516482
resources.insert(1.0f64);
517483
resources.insert(2isize);

0 commit comments

Comments
 (0)