
Commit 2bdc879

Auto merge of #7838 - ehuss:fix-memory-rustc-output, r=alexcrichton
Avoid buffering large amounts of rustc output.

If `rustc` prints out a lot of information (such as with `RUSTC_LOG`, or a huge number of diagnostics), cargo would buffer up large amounts of it in memory. For normal builds, this would happen if the terminal does not print fast enough. For "fresh" replay, *everything* was being buffered.

There are two issues:

1. There is no back-pressure on the mpsc queue. If messages come in faster than they can be processed, it grows without bound.
2. The cache-replay code runs in the "fresh" code path, which does not spawn a thread. The main thread was therefore blocked and unable to process `Message`s while the replay was happening.

The solution here is to use a bounded queue, and to always spawn a thread for the "fresh" case.

The main concern here is performance. Previously the "fresh" jobs avoided spawning a thread to improve performance. I did a fair bit of profiling to understand the impact, using projects with anywhere from 100 to 500 units. On my macOS machine, I found spawning a thread to be slightly faster (1-5%). On Linux and Windows, it was generally about 0 to 5% slower. It might be helpful for others to profile it on their own systems.

I'm on the fence about the cost/benefit here. It seems generally good to reduce memory usage, but the slight performance hit is disappointing. I tried several other approaches to fix this, all with worse trade-offs (I can discuss them if interested).

Fixes #6197
2 parents b9b30c8 + 05a1f43 commit 2bdc879
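
The core idea of the fix is back-pressure: a bounded buffer blocks the sender once it is full, so a fast producer is paced by the consumer instead of filling memory. The commit implements this with its own `Queue` type (added below) rather than a channel, but a minimal runnable sketch using `std::sync::mpsc::sync_channel` illustrates the same behavior:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

fn main() {
    // A channel holding at most 4 in-flight messages. Once it is full,
    // `send` blocks the producer instead of letting memory grow without bound.
    let (tx, rx) = sync_channel::<String>(4);

    let producer = thread::spawn(move || {
        for i in 0..32 {
            // Blocks whenever the consumer falls more than 4 items behind.
            tx.send(format!("diagnostic line {}", i)).unwrap();
        }
        // Dropping `tx` here ends the consumer's iteration below.
    });

    for line in rx {
        // A deliberately slow consumer, standing in for terminal output.
        thread::sleep(Duration::from_millis(10));
        println!("{}", line);
    }

    producer.join().unwrap();
}
```

With an unbounded channel the producer would finish immediately and all 32 lines would sit in memory; with the bound, its progress is tied to the consumer's.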

5 files changed: +200 -46 lines

Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -23,7 +23,6 @@ atty = "0.2"
 bytesize = "1.0"
 cargo-platform = { path = "crates/cargo-platform", version = "0.1.1" }
 crates-io = { path = "crates/crates-io", version = "0.31" }
-crossbeam-channel = "0.4"
 crossbeam-utils = "0.7"
 crypto-hash = "0.3.1"
 curl = { version = "0.4.23", features = ["http2"] }

src/cargo/core/compiler/job_queue.rs

Lines changed: 64 additions & 45 deletions
@@ -58,7 +58,6 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::format_err;
-use crossbeam_channel::{unbounded, Receiver, Sender};
 use crossbeam_utils::thread::Scope;
 use jobserver::{Acquired, Client, HelperThread};
 use log::{debug, info, trace};
@@ -73,6 +72,7 @@ use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
 use crate::core::{PackageId, TargetKind};
 use crate::util;
 use crate::util::diagnostic_server::{self, DiagnosticPrinter};
+use crate::util::Queue;
 use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
 use crate::util::{Config, DependencyQueue};
 use crate::util::{Progress, ProgressStyle};
@@ -93,13 +93,34 @@ pub struct JobQueue<'a, 'cfg> {
 ///
 /// It is created from JobQueue when we have fully assembled the crate graph
 /// (i.e., all package dependencies are known).
+///
+/// # Message queue
+///
+/// Each thread running a process uses the message queue to send messages back
+/// to the main thread. The main thread coordinates everything, and handles
+/// printing output.
+///
+/// It is important to be careful which messages use `push` vs `push_bounded`.
+/// `push` is for priority messages (like tokens, or "finished") where the
+/// sender shouldn't block. We want to handle those so real work can proceed
+/// ASAP.
+///
+/// `push_bounded` is only for messages being printed to stdout/stderr. Being
+/// bounded prevents a flood of messages causing a large amount of memory
+/// being used.
+///
+/// `push` also avoids blocking which helps avoid deadlocks. For example, when
+/// the diagnostic server thread is dropped, it waits for the thread to exit.
+/// But if the thread is blocked on a full queue, and there is a critical
+/// error, the drop will deadlock. This should be fixed at some point in the
+/// future. The jobserver thread has a similar problem, though it will time
+/// out after 1 second.
 struct DrainState<'a, 'cfg> {
     // This is the length of the DependencyQueue when starting out
     total_units: usize,
 
     queue: DependencyQueue<Unit<'a>, Artifact, Job>,
-    tx: Sender<Message>,
-    rx: Receiver<Message>,
+    messages: Arc<Queue<Message>>,
     active: HashMap<JobId, Unit<'a>>,
     compiled: HashSet<PackageId>,
     documented: HashSet<PackageId>,
@@ -145,7 +166,7 @@ impl std::fmt::Display for JobId {
 
 pub struct JobState<'a> {
     /// Channel back to the main thread to coordinate messages and such.
-    tx: Sender<Message>,
+    messages: Arc<Queue<Message>>,
 
     /// The job id that this state is associated with, used when sending
     /// messages back to the main thread.
@@ -199,7 +220,7 @@ enum Message {
 
 impl<'a> JobState<'a> {
     pub fn running(&self, cmd: &ProcessBuilder) {
-        let _ = self.tx.send(Message::Run(self.id, cmd.to_string()));
+        self.messages.push(Message::Run(self.id, cmd.to_string()));
     }
 
     pub fn build_plan(
@@ -208,17 +229,16 @@ impl<'a> JobState<'a> {
         cmd: ProcessBuilder,
         filenames: Arc<Vec<OutputFile>>,
     ) {
-        let _ = self
-            .tx
-            .send(Message::BuildPlanMsg(module_name, cmd, filenames));
+        self.messages
+            .push(Message::BuildPlanMsg(module_name, cmd, filenames));
     }
 
     pub fn stdout(&self, stdout: String) {
-        drop(self.tx.send(Message::Stdout(stdout)));
+        self.messages.push_bounded(Message::Stdout(stdout));
     }
 
     pub fn stderr(&self, stderr: String) {
-        drop(self.tx.send(Message::Stderr(stderr)));
+        self.messages.push_bounded(Message::Stderr(stderr));
    }
 
     /// A method used to signal to the coordinator thread that the rmeta file
@@ -228,9 +248,8 @@ impl<'a> JobState<'a> {
     /// produced once!
     pub fn rmeta_produced(&self) {
         self.rmeta_required.set(false);
-        let _ = self
-            .tx
-            .send(Message::Finish(self.id, Artifact::Metadata, Ok(())));
+        self.messages
+            .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
     }
 
     /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
@@ -239,14 +258,14 @@
     /// This should arrange for the associated client to eventually get a token via
     /// `client.release_raw()`.
     pub fn will_acquire(&self) {
-        let _ = self.tx.send(Message::NeedsToken(self.id));
+        self.messages.push(Message::NeedsToken(self.id));
     }
 
     /// The rustc underlying this Job is informing us that it is done with a jobserver token.
     ///
     /// Note that it does *not* write that token back anywhere.
     pub fn release_token(&self) {
-        let _ = self.tx.send(Message::ReleaseToken(self.id));
+        self.messages.push(Message::ReleaseToken(self.id));
     }
 }
 
@@ -340,21 +359,22 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         let _p = profile::start("executing the job graph");
         self.queue.queue_finished();
 
-        let (tx, rx) = unbounded();
         let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
         let state = DrainState {
             total_units: self.queue.len(),
             queue: self.queue,
-            tx,
-            rx,
+            // 100 here is somewhat arbitrary. It is a few screenfulls of
+            // output, and hopefully at most a few megabytes of memory for
+            // typical messages. If you change this, please update the test
+            // caching_large_output, too.
+            messages: Arc::new(Queue::new(100)),
             active: HashMap::new(),
             compiled: HashSet::new(),
             documented: HashSet::new(),
             counts: self.counts,
             progress,
             next_id: 0,
             timings: self.timings,
-
             tokens: Vec::new(),
             rustc_tokens: HashMap::new(),
             to_send_clients: BTreeMap::new(),
@@ -364,25 +384,28 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         };
 
         // Create a helper thread for acquiring jobserver tokens
-        let tx = state.tx.clone();
+        let messages = state.messages.clone();
         let helper = cx
             .jobserver
             .clone()
             .into_helper_thread(move |token| {
-                drop(tx.send(Message::Token(token)));
+                drop(messages.push(Message::Token(token)));
             })
             .chain_err(|| "failed to create helper thread for jobserver management")?;
 
         // Create a helper thread to manage the diagnostics for rustfix if
         // necessary.
-        let tx = state.tx.clone();
+        let messages = state.messages.clone();
+        // It is important that this uses `push` instead of `push_bounded` for
+        // now. If someone wants to fix this to be bounded, the `drop`
+        // implementation needs to be changed to avoid possible deadlocks.
         let _diagnostic_server = cx
             .bcx
             .build_config
             .rustfix_diagnostic_server
             .borrow_mut()
             .take()
-            .map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
+            .map(move |srv| srv.start(move |msg| drop(messages.push(Message::FixDiagnostic(msg)))));
 
         crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
             .expect("child threads shouldn't panic")
@@ -584,7 +607,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         // to run above to calculate CPU usage over time. To do this we
         // listen for a message with a timeout, and on timeout we run the
         // previous parts of the loop again.
-        let events: Vec<_> = self.rx.try_iter().collect();
+        let mut events = self.messages.try_pop_all();
         info!(
             "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
             self.tokens.len(),
@@ -602,14 +625,16 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
             loop {
                 self.tick_progress();
                 self.tokens.truncate(self.active.len() - 1);
-                match self.rx.recv_timeout(Duration::from_millis(500)) {
-                    Ok(message) => break vec![message],
-                    Err(_) => continue,
+                match self.messages.pop(Duration::from_millis(500)) {
+                    Some(message) => {
+                        events.push(message);
+                        break;
+                    }
+                    None => continue,
                 }
             }
-        } else {
-            events
         }
+        return events;
     }
 
     fn drain_the_queue(
@@ -756,7 +781,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         assert!(self.active.insert(id, *unit).is_none());
         *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
 
-        let my_tx = self.tx.clone();
+        let messages = self.messages.clone();
         let fresh = job.freshness();
         let rmeta_required = cx.rmeta_required(unit);
@@ -768,13 +793,13 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         let doit = move || {
             let state = JobState {
                 id,
-                tx: my_tx.clone(),
+                messages: messages.clone(),
                 rmeta_required: Cell::new(rmeta_required),
                 _marker: marker::PhantomData,
             };
 
             let mut sender = FinishOnDrop {
-                tx: &my_tx,
+                messages: &messages,
                 id,
                 result: Err(format_err!("worker panicked")),
             };
@@ -793,39 +818,33 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
             // we need to make sure that the metadata is flagged as produced so
             // send a synthetic message here.
             if state.rmeta_required.get() && sender.result.is_ok() {
-                my_tx
-                    .send(Message::Finish(id, Artifact::Metadata, Ok(())))
-                    .unwrap();
+                messages.push(Message::Finish(id, Artifact::Metadata, Ok(())));
             }
 
             // Use a helper struct with a `Drop` implementation to guarantee
             // that a `Finish` message is sent even if our job panics. We
             // shouldn't panic unless there's a bug in Cargo, so we just need
             // to make sure nothing hangs by accident.
             struct FinishOnDrop<'a> {
-                tx: &'a Sender<Message>,
+                messages: &'a Queue<Message>,
                 id: JobId,
                 result: CargoResult<()>,
             }
 
             impl Drop for FinishOnDrop<'_> {
                 fn drop(&mut self) {
                     let msg = mem::replace(&mut self.result, Ok(()));
-                    drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg)));
+                    self.messages
+                        .push(Message::Finish(self.id, Artifact::All, msg));
                 }
             }
         };
 
         match fresh {
-            Freshness::Fresh => {
-                self.timings.add_fresh();
-                doit();
-            }
-            Freshness::Dirty => {
-                self.timings.add_dirty();
-                scope.spawn(move |_| doit());
-            }
+            Freshness::Fresh => self.timings.add_fresh(),
+            Freshness::Dirty => self.timings.add_dirty(),
         }
+        scope.spawn(move |_| doit());
 
         Ok(())
     }
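
The drain-loop changes above follow a common coordinator pattern: first drain everything already queued without blocking (`try_pop_all`), and only block, with a timeout so the progress display keeps updating, when nothing is pending. Below is a condensed sketch of that pattern, assuming the `Queue` type added in src/cargo/util/queue.rs further down, with hypothetical `tick_progress` and `handle` stand-ins for Cargo's real logic:

```rust
use std::sync::Arc;
use std::time::Duration;

// Assumes the `Queue<T>` type from src/cargo/util/queue.rs below.
fn coordinate(messages: Arc<Queue<String>>) {
    loop {
        // Grab everything that is already waiting, without blocking.
        let mut events = messages.try_pop_all();
        if events.is_empty() {
            // Nothing queued: update the progress display, then block for
            // up to 500ms waiting for the next message.
            tick_progress();
            match messages.pop(Duration::from_millis(500)) {
                Some(message) => events.push(message),
                None => continue, // timed out; redraw progress and try again
            }
        }
        for event in events {
            if !handle(event) {
                return; // `handle` signals when the build is done
            }
        }
    }
}

// Hypothetical stand-ins for Cargo's progress bar and event handling.
fn tick_progress() {}
fn handle(event: String) -> bool {
    println!("{}", event);
    event != "finished"
}
```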

src/cargo/util/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -18,6 +18,7 @@ pub use self::paths::{bytes2path, dylib_path, join_paths, path2bytes};
 pub use self::paths::{dylib_path_envvar, normalize_path};
 pub use self::process_builder::{process, ProcessBuilder};
 pub use self::progress::{Progress, ProgressStyle};
+pub use self::queue::Queue;
 pub use self::read2::read2;
 pub use self::restricted_names::validate_package_name;
 pub use self::rustc::Rustc;
@@ -51,6 +52,7 @@ pub mod paths;
 pub mod process_builder;
 pub mod profile;
 mod progress;
+mod queue;
 mod read2;
 pub mod restricted_names;
 pub mod rustc;

src/cargo/util/queue.rs

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
+use std::collections::VecDeque;
+use std::sync::{Condvar, Mutex};
+use std::time::Duration;
+
+/// A simple, threadsafe, queue of items of type `T`
+///
+/// This is a sort of channel where any thread can push to a queue and any
+/// thread can pop from a queue.
+///
+/// This supports both bounded and unbounded operations. `push` will never block,
+/// and allows the queue to grow without bounds. `push_bounded` will block if the
+/// queue is over capacity, and will resume once there is enough capacity.
+pub struct Queue<T> {
+    state: Mutex<State<T>>,
+    popper_cv: Condvar,
+    bounded_cv: Condvar,
+    bound: usize,
+}
+
+struct State<T> {
+    items: VecDeque<T>,
+}
+
+impl<T> Queue<T> {
+    pub fn new(bound: usize) -> Queue<T> {
+        Queue {
+            state: Mutex::new(State {
+                items: VecDeque::new(),
+            }),
+            popper_cv: Condvar::new(),
+            bounded_cv: Condvar::new(),
+            bound,
+        }
+    }
+
+    pub fn push(&self, item: T) {
+        self.state.lock().unwrap().items.push_back(item);
+        self.popper_cv.notify_one();
+    }
+
+    /// Pushes an item onto the queue, blocking if the queue is full.
+    pub fn push_bounded(&self, item: T) {
+        let locked_state = self.state.lock().unwrap();
+        let mut state = self
+            .bounded_cv
+            .wait_while(locked_state, |s| s.items.len() >= self.bound)
+            .unwrap();
+        state.items.push_back(item);
+        self.popper_cv.notify_one();
+    }
+
+    pub fn pop(&self, timeout: Duration) -> Option<T> {
+        let (mut state, result) = self
+            .popper_cv
+            .wait_timeout_while(self.state.lock().unwrap(), timeout, |s| s.items.is_empty())
+            .unwrap();
+        if result.timed_out() {
+            None
+        } else {
+            let value = state.items.pop_front()?;
+            if state.items.len() < self.bound {
+                // Assumes threads cannot be canceled.
+                self.bounded_cv.notify_one();
+            }
+            Some(value)
+        }
+    }
+
+    pub fn try_pop_all(&self) -> Vec<T> {
+        let mut state = self.state.lock().unwrap();
+        let result = state.items.drain(..).collect();
+        self.bounded_cv.notify_all();
+        result
+    }
+}
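
To make the new type's behavior concrete, here is a usage sketch (assuming the `Queue` above is in scope): a producer calling `push_bounded` blocks once the queue holds `bound` items, while each `pop` on the consumer side frees capacity and wakes it. Priority messages still use the non-blocking `push`:

```rust
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    // A small bound so the blocking behavior is easy to observe.
    let queue = Arc::new(Queue::new(4));

    let producer = {
        let queue = Arc::clone(&queue);
        thread::spawn(move || {
            for i in 0..16 {
                // Blocks whenever 4 or more items are already queued.
                queue.push_bounded(format!("stdout line {}", i));
            }
            // Priority messages use `push`, which never blocks.
            queue.push("finished".to_string());
        })
    };

    loop {
        match queue.pop(Duration::from_millis(500)) {
            Some(msg) if msg == "finished" => break,
            Some(msg) => println!("{}", msg),
            None => println!("(timed out; a real caller would tick progress)"),
        }
    }

    producer.join().unwrap();
}
```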
