Skip to content

Commit 618b0d4

Browse files
bors[bot] and joboet authored
Merge #534
534: Add futex syscalls r=mkroening a=joboet [Futexes](https://man7.org/linux/man-pages/man2/futex.2.html) greatly simplify the creation of synchronization primitives (see [`std`'s mutex](https://github.com/rust-lang/rust/blob/master/library/std/src/sys/unix/locks/futex_mutex.rs)). They are implemented here using a hashmap that stores the handles of waiting threads, keyed by the address of the futex they are waiting on. I prioritised readability over performance in this PR; faster implementations would use finer-grained locking and would not need to allocate the queues. It should, however, be performant enough for most purposes. Co-authored-by: joboet <[email protected]>
2 parents b5a5dae + 2de4047 commit 618b0d4

File tree

10 files changed

+278
-3
lines changed

10 files changed

+278
-3
lines changed

Cargo.lock

Lines changed: 25 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ dhcpv4 = [
6767
]
6868

6969
[dependencies]
70+
ahash = { version = "0.8", default-features = false }
7071
bitflags = "1.3"
7172
crossbeam-utils = { version = "0.8", default-features = false }
73+
hashbrown = { version = "0.12", default-features = false }
7274
hermit-entry = { version = "0.9", features = ["kernel"] }
7375
include-transformed = { version = "0.2", optional = true }
7476
log = { version = "0.4", default-features = false }

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#![allow(incomplete_features)]
1010
#![feature(abi_x86_interrupt)]
1111
#![feature(allocator_api)]
12+
#![feature(atomic_mut_ptr)]
1213
#![feature(asm_const)]
1314
#![feature(asm_sym)]
1415
#![feature(const_btree_new)]
@@ -23,6 +24,8 @@
2324
#![feature(alloc_error_handler)]
2425
#![feature(vec_into_raw_parts)]
2526
#![feature(drain_filter)]
27+
#![feature(strict_provenance)]
28+
#![feature(is_some_with)]
2629
#![no_std]
2730
#![cfg_attr(target_os = "none", feature(custom_test_frameworks))]
2831
#![cfg_attr(target_os = "none", cfg_attr(test, test_runner(crate::test_runner)))]

src/scheduler/task.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ impl PartialEq for TaskHandle {
140140
impl Eq for TaskHandle {}
141141

142142
/// Realize a priority queue for task handles
143+
#[derive(Default)]
143144
pub struct TaskHandlePriorityQueue {
144145
queues: [Option<VecDeque<TaskHandle>>; NO_PRIORITIES],
145146
prio_bitmap: u64,
@@ -158,6 +159,11 @@ impl TaskHandlePriorityQueue {
158159
}
159160
}
160161

162+
/// Checks if the queue is empty.
163+
pub fn is_empty(&self) -> bool {
164+
self.prio_bitmap == 0
165+
}
166+
161167
/// Add a task handle by its priority to the queue
162168
pub fn push(&mut self, task: TaskHandle) {
163169
let i = task.priority.into() as usize;
@@ -196,16 +202,19 @@ impl TaskHandlePriorityQueue {
196202
None
197203
}
198204

199-
/// Remove a specific task handle from the priority queue.
200-
pub fn remove(&mut self, task: TaskHandle) {
205+
/// Remove a specific task handle from the priority queue. Returns `true` if
206+
/// the handle was in the queue.
207+
pub fn remove(&mut self, task: TaskHandle) -> bool {
201208
let queue_index = task.priority.into() as usize;
202209
//assert!(queue_index < NO_PRIORITIES, "Priority {} is too high", queue_index);
203210

211+
let mut success = false;
204212
if let Some(queue) = &mut self.queues[queue_index] {
205213
let mut i = 0;
206214
while i != queue.len() {
207215
if queue[i].id == task.id {
208216
queue.remove(i);
217+
success = true;
209218
} else {
210219
i += 1;
211220
}
@@ -215,6 +224,8 @@ impl TaskHandlePriorityQueue {
215224
self.prio_bitmap &= !(1 << queue_index as u64);
216225
}
217226
}
227+
228+
success
218229
}
219230
}
220231

src/synch/futex.rs

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use ahash::RandomState;
2+
use core::sync::atomic::{AtomicU32, Ordering::SeqCst};
3+
use hashbrown::{hash_map::Entry, HashMap};
4+
5+
use crate::{
6+
arch::kernel::{percore::core_scheduler, processor::get_timer_ticks},
7+
errno::{EAGAIN, EINVAL, ETIMEDOUT},
8+
scheduler::task::TaskHandlePriorityQueue,
9+
};
10+
11+
use super::spinlock::SpinlockIrqSave;
12+
13+
// Global table of parked tasks: maps the address of a futex word (as `usize`) to the priority
// queue of task handles waiting on that address. All accesses go through the surrounding
// `SpinlockIrqSave`.
// The hasher is built from fixed all-zero seeds (presumably so the map can be constructed in a
// `static` initializer -- TODO confirm).
// TODO: Replace with a concurrent hashmap.
14+
static PARKING_LOT: SpinlockIrqSave<HashMap<usize, TaskHandlePriorityQueue, RandomState>> =
15+
SpinlockIrqSave::new(HashMap::with_hasher(RandomState::with_seeds(0, 0, 0, 0)));
16+
17+
bitflags! {
18+
/// Option bits accepted by `futex_wait`.
pub struct Flags: u32 {
19+
/// Interpret the timeout as relative to the current time instead of as an absolute time.
20+
const RELATIVE = 0b01;
21+
}
22+
}
23+
24+
/// If the value at address matches the expected value, park the current thread until it is either
25+
/// woken up with `futex_wake` (returns 0) or the specified timeout elapses (returns -ETIMEDOUT).
26+
///
27+
/// The timeout is given in microseconds. If [`Flags::RELATIVE`] is given, it is interpreted as
28+
/// relative to the current time. Otherwise it is understood to be an absolute time
29+
/// (see `get_timer_ticks`).
30+
pub fn futex_wait(address: &AtomicU32, expected: u32, timeout: Option<u64>, flags: Flags) -> i32 {
31+
let mut parking_lot = PARKING_LOT.lock();
32+
// Check the futex value after locking the parking lot so that all changes are observed.
33+
if address.load(SeqCst) != expected {
34+
return -EAGAIN;
35+
}
36+
37+
let wakeup_time = if flags.contains(Flags::RELATIVE) {
38+
timeout.and_then(|t| get_timer_ticks().checked_add(t))
39+
} else {
40+
timeout
41+
};
42+
43+
let scheduler = core_scheduler();
44+
scheduler.block_current_task(wakeup_time);
45+
let handle = scheduler.get_current_task_handle();
46+
parking_lot
47+
.entry(address.as_mut_ptr().addr())
48+
.or_default()
49+
.push(handle);
50+
drop(parking_lot);
51+
52+
loop {
53+
scheduler.reschedule();
54+
55+
// Try to remove ourselves from the waiting queue.
56+
let mut parking_lot = PARKING_LOT.lock();
57+
let mut wakeup = true;
58+
if let Entry::Occupied(mut queue) = parking_lot.entry(address.as_mut_ptr().addr()) {
59+
// If we are not in the waking queue, this must have been a wakeup.
60+
wakeup = !queue.get_mut().remove(handle);
61+
if queue.get().is_empty() {
62+
queue.remove();
63+
}
64+
};
65+
66+
if wakeup {
67+
return 0;
68+
} else if wakeup_time.is_some_and(|&t| t <= get_timer_ticks()) {
69+
// If the current time is past the wakeup time, the operation timed out.
70+
return -ETIMEDOUT;
71+
}
72+
73+
// A spurious wakeup occurred, sleep again.
74+
scheduler.block_current_task(wakeup_time);
75+
}
76+
}
77+
78+
/// Wake `count` threads waiting on the futex at address. Returns the number of threads
79+
/// woken up (saturates to `i32::MAX`). If `count` is `i32::MAX`, wake up all matching
80+
/// waiting threads. If `count` is negative, returns -EINVAL.
81+
pub fn futex_wake(address: &AtomicU32, count: i32) -> i32 {
82+
if count < 0 {
83+
return -EINVAL;
84+
}
85+
86+
let mut parking_lot = PARKING_LOT.lock();
87+
let mut queue = match parking_lot.entry(address.as_mut_ptr().addr()) {
88+
Entry::Occupied(entry) => entry,
89+
Entry::Vacant(_) => return 0,
90+
};
91+
92+
let scheduler = core_scheduler();
93+
let mut woken = 0;
94+
while woken != count || count == i32::MAX {
95+
match queue.get_mut().pop() {
96+
Some(handle) => scheduler.custom_wakeup(handle),
97+
None => break,
98+
}
99+
woken = woken.saturating_add(1);
100+
}
101+
102+
if queue.get().is_empty() {
103+
queue.remove();
104+
}
105+
106+
woken
107+
}

src/synch/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Synchronization primitives
22
3+
pub mod futex;
34
pub mod recmutex;
45
pub mod semaphore;
56
pub mod spinlock;

src/syscalls/futex.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use core::sync::atomic::AtomicU32;
2+
3+
use crate::{
4+
errno::EINVAL,
5+
synch::futex::{self as synch, Flags},
6+
timespec, timespec_to_microseconds,
7+
};
8+
9+
/// Like `synch::futex_wait`, but does extra sanity checks and takes a `timespec`.
10+
///
11+
/// Returns -EINVAL if
12+
/// * `address` is null
13+
/// * `timeout` is negative
14+
/// * `flags` contains unknown flags
15+
extern "C" fn __sys_futex_wait(
16+
address: *mut u32,
17+
expected: u32,
18+
timeout: *const timespec,
19+
flags: u32,
20+
) -> i32 {
21+
if address.is_null() {
22+
return -EINVAL;
23+
}
24+
25+
let address = unsafe { &*(address as *const AtomicU32) };
26+
let timeout = if timeout.is_null() {
27+
None
28+
} else {
29+
match timespec_to_microseconds(unsafe { timeout.read() }) {
30+
t @ Some(_) => t,
31+
None => return -EINVAL,
32+
}
33+
};
34+
let flags = match Flags::from_bits(flags) {
35+
Some(flags) => flags,
36+
None => return -EINVAL,
37+
};
38+
39+
synch::futex_wait(address, expected, timeout, flags)
40+
}
41+
42+
/// Exported C ABI entry point for the futex wait operation.
///
/// Hands `address`, `expected`, `timeout` and `flags` unchanged to `__sys_futex_wait` via the
/// `kernel_function!` macro (defined elsewhere; its expansion is not visible here) and yields the
/// resulting `i32`.
#[no_mangle]
43+
pub extern "C" fn sys_futex_wait(
44+
address: *mut u32,
45+
expected: u32,
46+
timeout: *const timespec,
47+
flags: u32,
48+
) -> i32 {
49+
kernel_function!(__sys_futex_wait(address, expected, timeout, flags))
50+
}
51+
52+
/// Like `synch::futex_wake`, but does extra sanity checks.
53+
///
54+
/// Returns -EINVAL if `address` is null.
55+
extern "C" fn __sys_futex_wake(address: *mut u32, count: i32) -> i32 {
56+
if address.is_null() {
57+
return -EINVAL;
58+
}
59+
60+
let address = unsafe { &*(address as *const AtomicU32) };
61+
synch::futex_wake(address, count)
62+
}
63+
64+
/// Exported C ABI entry point for the futex wake operation.
///
/// Hands `address` and `count` unchanged to `__sys_futex_wake` via the `kernel_function!` macro
/// (defined elsewhere; its expansion is not visible here) and yields the resulting `i32`.
#[no_mangle]
65+
pub extern "C" fn sys_futex_wake(address: *mut u32, count: i32) -> i32 {
66+
kernel_function!(__sys_futex_wake(address, count))
67+
}

src/syscalls/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::syscalls::interfaces::SyscallInterface;
88
use crate::{__sys_free, __sys_malloc, __sys_realloc};
99

1010
pub use self::condvar::*;
11+
pub use self::futex::*;
1112
pub use self::processor::*;
1213
pub use self::random::*;
1314
pub use self::recmutex::*;
@@ -19,6 +20,7 @@ pub use self::timer::*;
1920

2021
mod condvar;
2122
pub(crate) mod fs;
23+
mod futex;
2224
mod interfaces;
2325
#[cfg(feature = "newlib")]
2426
mod lwip;

src/syscalls/timer.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ fn microseconds_to_timeval(microseconds: u64, result: &mut timeval) {
3939
result.tv_usec = (microseconds % 1_000_000) as i64;
4040
}
4141

42+
pub(crate) fn timespec_to_microseconds(time: timespec) -> Option<u64> {
43+
u64::try_from(time.tv_sec)
44+
.ok()
45+
.and_then(|secs| secs.checked_mul(1_000_000))
46+
.and_then(|millions| millions.checked_add(u64::try_from(time.tv_nsec).ok()? / 1000))
47+
}
48+
4249
extern "C" fn __sys_clock_getres(clock_id: u64, res: *mut timespec) -> i32 {
4350
assert!(
4451
!res.is_null(),

0 commit comments

Comments
 (0)