Skip to content

Commit faf8f92

Browse files
authored
Fix use of cc with make <4.3: Clear O_NONBLOCK after compilaton (#966)
* Fix use of cc with `make`: Clear `O_NONBLOCK` after compilaton Since make <4.3 cannot handle jobserver pipe with `O_NONBLOCK` set. Signed-off-by: Jiahao XU <[email protected]> * Linux optimization: Try converting pipe to fifo Signed-off-by: Jiahao XU <[email protected]> * Fix compilation on windows Signed-off-by: Jiahao XU <[email protected]> * Optimize unix job server impl All unix, not just linux, has /dev/fd --------- Signed-off-by: Jiahao XU <[email protected]>
1 parent 271d3d6 commit faf8f92

File tree

4 files changed

+124
-20
lines changed

4 files changed

+124
-20
lines changed

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1395,7 +1395,7 @@ impl Build {
13951395
}
13961396

13971397
// Limit our parallelism globally with a jobserver.
1398-
let tokens = parallel::job_token::JobTokenServer::new();
1398+
let tokens = parallel::job_token::ActiveJobTokenServer::new()?;
13991399

14001400
// When compiling objects in parallel we do a few dirty tricks to speed
14011401
// things up:

src/parallel/job_token/mod.rs

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ impl Drop for JobToken {
2121
}
2222
}
2323

24-
pub(crate) enum JobTokenServer {
24+
enum JobTokenServer {
2525
Inherited(inherited_jobserver::JobServer),
2626
InProcess(inprocess_jobserver::JobServer),
2727
}
@@ -35,7 +35,7 @@ impl JobTokenServer {
3535
/// present), we will create a global in-process only jobserver
3636
/// that has to be static so that it will be shared by all cc
3737
/// compilation.
38-
pub(crate) fn new() -> &'static Self {
38+
fn new() -> &'static Self {
3939
static INIT: Once = Once::new();
4040
static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();
4141

@@ -50,11 +50,35 @@ impl JobTokenServer {
5050
&*JOBSERVER.as_ptr()
5151
}
5252
}
53+
}
54+
55+
pub(crate) struct ActiveJobTokenServer(&'static JobTokenServer);
56+
57+
impl ActiveJobTokenServer {
58+
pub(crate) fn new() -> Result<Self, Error> {
59+
let jobserver = JobTokenServer::new();
60+
61+
#[cfg(unix)]
62+
if let JobTokenServer::Inherited(inherited_jobserver) = &jobserver {
63+
inherited_jobserver.enter_active()?;
64+
}
65+
66+
Ok(Self(jobserver))
67+
}
5368

5469
pub(crate) fn try_acquire(&self) -> Result<Option<JobToken>, Error> {
55-
match self {
56-
Self::Inherited(jobserver) => jobserver.try_acquire(),
57-
Self::InProcess(jobserver) => Ok(jobserver.try_acquire()),
70+
match &self.0 {
71+
JobTokenServer::Inherited(jobserver) => jobserver.try_acquire(),
72+
JobTokenServer::InProcess(jobserver) => Ok(jobserver.try_acquire()),
73+
}
74+
}
75+
}
76+
77+
impl Drop for ActiveJobTokenServer {
78+
fn drop(&mut self) {
79+
#[cfg(unix)]
80+
if let JobTokenServer::Inherited(inherited_jobserver) = &self.0 {
81+
inherited_jobserver.exit_active();
5882
}
5983
}
6084
}
@@ -70,6 +94,9 @@ mod inherited_jobserver {
7094
},
7195
};
7296

97+
#[cfg(unix)]
98+
use std::sync::{Mutex, MutexGuard, PoisonError};
99+
73100
pub(crate) struct JobServer {
74101
/// Implicit token for this process which is obtained and will be
75102
/// released in parent. Since JobTokens only give back what they got,
@@ -80,6 +107,10 @@ mod inherited_jobserver {
80107
/// the end of the process.
81108
global_implicit_token: AtomicBool,
82109
inner: sys::JobServerClient,
110+
/// number of active clients is required to know when it is safe to clear non-blocking
111+
/// flags
112+
#[cfg(unix)]
113+
active_clients_cnt: Mutex<usize>,
83114
}
84115

85116
impl JobServer {
@@ -117,9 +148,40 @@ mod inherited_jobserver {
117148
.map(|inner| Self {
118149
inner,
119150
global_implicit_token: AtomicBool::new(true),
151+
#[cfg(unix)]
152+
active_clients_cnt: Mutex::new(0),
120153
})
121154
}
122155

156+
#[cfg(unix)]
157+
fn get_locked_active_cnt(&self) -> MutexGuard<'_, usize> {
158+
self.active_clients_cnt
159+
.lock()
160+
.unwrap_or_else(PoisonError::into_inner)
161+
}
162+
163+
#[cfg(unix)]
164+
pub(super) fn enter_active(&self) -> Result<(), Error> {
165+
let mut active_cnt = self.get_locked_active_cnt();
166+
if *active_cnt == 0 {
167+
self.inner.prepare_for_acquires()?;
168+
}
169+
170+
*active_cnt += 1;
171+
172+
Ok(())
173+
}
174+
175+
#[cfg(unix)]
176+
pub(super) fn exit_active(&self) {
177+
let mut active_cnt = self.get_locked_active_cnt();
178+
*active_cnt -= 1;
179+
180+
if *active_cnt == 0 {
181+
self.inner.done_acquires();
182+
}
183+
}
184+
123185
pub(super) fn try_acquire(&self) -> Result<Option<JobToken>, Error> {
124186
if !self.global_implicit_token.swap(false, AcqRel) {
125187
// Cold path, no global implicit token, obtain one

src/parallel/job_token/unix.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
path::Path,
88
};
99

10-
use crate::parallel::stderr::set_non_blocking;
10+
use crate::parallel::stderr::{set_blocking, set_non_blocking};
1111

1212
pub(super) struct JobServerClient {
1313
read: File,
@@ -74,13 +74,16 @@ impl JobServerClient {
7474
Some(libc::O_RDONLY) | Some(libc::O_RDWR),
7575
Some(libc::O_WRONLY) | Some(libc::O_RDWR),
7676
) => {
77+
// Optimization: Try converting it to a fifo by using /dev/fd
78+
if let Some(jobserver) =
79+
Self::from_fifo(Path::new(&format!("/dev/fd/{}", read.as_raw_fd())))
80+
{
81+
return Some(jobserver);
82+
}
83+
7784
let read = read.try_clone().ok()?;
7885
let write = write.try_clone().ok()?;
7986

80-
// Set read and write end to nonblocking
81-
set_non_blocking(&read).ok()?;
82-
set_non_blocking(&write).ok()?;
83-
8487
Some(Self {
8588
read,
8689
write: Some(write),
@@ -90,6 +93,23 @@ impl JobServerClient {
9093
}
9194
}
9295

96+
pub(super) fn prepare_for_acquires(&self) -> Result<(), crate::Error> {
97+
if let Some(write) = self.write.as_ref() {
98+
set_non_blocking(&self.read)?;
99+
set_non_blocking(write)?;
100+
}
101+
102+
Ok(())
103+
}
104+
105+
pub(super) fn done_acquires(&self) {
106+
if let Some(write) = self.write.as_ref() {
107+
let _ = set_blocking(&self.read);
108+
let _ = set_blocking(write);
109+
}
110+
}
111+
112+
/// Must call `prepare_for_acquire` before using it.
93113
pub(super) fn try_acquire(&self) -> io::Result<Option<()>> {
94114
let mut fds = [libc::pollfd {
95115
fd: self.read.as_raw_fd(),

src/parallel/stderr.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,56 @@ use crate::{Error, ErrorKind};
77
compile_error!("Only unix and windows support non-blocking pipes! For other OSes, disable the parallel feature.");
88

99
#[cfg(unix)]
10-
pub fn set_non_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> {
11-
// On Unix, switch the pipe to non-blocking mode.
12-
// On Windows, we have a different way to be non-blocking.
13-
let fd = pipe.as_raw_fd();
10+
fn get_flags(fd: std::os::unix::io::RawFd) -> Result<i32, Error> {
1411
let flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) };
1512
if flags == -1 {
16-
return Err(Error::new(
13+
Err(Error::new(
1714
ErrorKind::IOError,
1815
format!(
1916
"Failed to get flags for pipe {}: {}",
2017
fd,
2118
std::io::Error::last_os_error()
2219
),
23-
));
20+
))
21+
} else {
22+
Ok(flags)
2423
}
24+
}
2525

26+
#[cfg(unix)]
27+
fn set_flags(fd: std::os::unix::io::RawFd, flags: std::os::raw::c_int) -> Result<(), Error> {
2628
if unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) } == -1 {
27-
return Err(Error::new(
29+
Err(Error::new(
2830
ErrorKind::IOError,
2931
format!(
3032
"Failed to set flags for pipe {}: {}",
3133
fd,
3234
std::io::Error::last_os_error()
3335
),
34-
));
36+
))
37+
} else {
38+
Ok(())
3539
}
40+
}
41+
42+
#[cfg(unix)]
43+
pub fn set_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> {
44+
// On Unix, switch the pipe to non-blocking mode.
45+
// On Windows, we have a different way to be non-blocking.
46+
let fd = pipe.as_raw_fd();
47+
48+
let flags = get_flags(fd)?;
49+
set_flags(fd, flags & (!libc::O_NONBLOCK))
50+
}
51+
52+
#[cfg(unix)]
53+
pub fn set_non_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> {
54+
// On Unix, switch the pipe to non-blocking mode.
55+
// On Windows, we have a different way to be non-blocking.
56+
let fd = pipe.as_raw_fd();
3657

37-
Ok(())
58+
let flags = get_flags(fd)?;
59+
set_flags(fd, flags | libc::O_NONBLOCK)
3860
}
3961

4062
pub fn bytes_available(stderr: &mut ChildStderr) -> Result<usize, Error> {

0 commit comments

Comments
 (0)