Skip to content

Commit ba7738e

Browse files
authored
bug: terminate threads, fix blocking poll in input (#262)
Bug fix for improper use of threads, where they were not properly terminated (not really too bad) and the input thread code actually blocked.
1 parent a5b95ae commit ba7738e

File tree

4 files changed

+142
-67
lines changed

4 files changed

+142
-67
lines changed

.vscode/settings.json

+4
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
"cSpell.words": [
33
"Artem",
44
"COPR",
5+
"Condvar",
56
"DWORD",
67
"Deque",
78
"EINVAL",
89
"EPERM",
910
"ESRCH",
11+
"Fini",
1012
"GIBI",
1113
"GIBIBYTE",
1214
"GIGA",
@@ -42,6 +44,8 @@
4244
"concat",
4345
"crossterm",
4446
"curr",
47+
"cvar",
48+
"cvars",
4549
"czvf",
4650
"denylist",
4751
"fedoracentos",

src/app/data_harvester/processes.rs

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::collections::{hash_map::RandomState, HashMap};
1212
use sysinfo::{ProcessExt, ProcessorExt, System, SystemExt};
1313

1414
/// Maximum character length of a /proc/<PID>/stat process name.
15+
#[cfg(target_os = "linux")]
1516
const MAX_STAT_NAME_LEN: usize = 15;
1617

1718
// TODO: Add value so we know if it's sorted ascending or descending by default?

src/bin/main.rs

+57-23
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@ use bottom::{canvas, constants::*, data_conversion::*, options::*, *};
77

88
use std::{
99
boxed::Box,
10-
ffi::OsStr,
1110
io::{stdout, Write},
1211
panic,
1312
sync::{
1413
atomic::{AtomicBool, Ordering},
15-
mpsc, Arc,
14+
mpsc, Arc, Condvar, Mutex,
1615
},
1716
thread,
1817
time::Duration,
@@ -36,7 +35,10 @@ fn main() -> Result<()> {
3635
} else {
3736
#[cfg(debug_assertions)]
3837
{
39-
utils::logging::init_logger(log::LevelFilter::Debug, OsStr::new("debug.log"))?;
38+
utils::logging::init_logger(
39+
log::LevelFilter::Debug,
40+
std::ffi::OsStr::new("debug.log"),
41+
)?;
4042
}
4143
}
4244

@@ -70,32 +72,53 @@ fn main() -> Result<()> {
7072
&config,
7173
)?;
7274

75+
// Create termination mutex and cvar
76+
#[allow(clippy::mutex_atomic)]
77+
let thread_termination_lock = Arc::new(Mutex::new(false));
78+
let thread_termination_cvar = Arc::new(Condvar::new());
79+
7380
// Set up input handling
7481
let (sender, receiver) = mpsc::channel();
75-
create_input_thread(sender.clone());
82+
let input_thread = create_input_thread(sender.clone(), thread_termination_lock.clone());
7683

7784
// Cleaning loop
78-
{
85+
let cleaning_thread = {
86+
let lock = thread_termination_lock.clone();
87+
let cvar = thread_termination_cvar.clone();
7988
let cleaning_sender = sender.clone();
8089
trace!("Initializing cleaning thread...");
81-
thread::spawn(move || loop {
82-
thread::sleep(Duration::from_millis(
83-
constants::STALE_MAX_MILLISECONDS + 5000,
84-
));
85-
trace!("Sending cleaning signal...");
86-
if cleaning_sender.send(BottomEvent::Clean).is_err() {
87-
trace!("Failed to send cleaning signal. Halting cleaning thread loop.");
88-
break;
90+
thread::spawn(move || {
91+
loop {
92+
let result = cvar.wait_timeout(
93+
lock.lock().unwrap(),
94+
Duration::from_millis(constants::STALE_MAX_MILLISECONDS + 5000),
95+
);
96+
if let Ok(result) = result {
97+
if *(result.0) {
98+
trace!("Received termination lock in cleaning thread from cvar!");
99+
break;
100+
}
101+
} else {
102+
trace!("Sending cleaning signal...");
103+
if cleaning_sender.send(BottomEvent::Clean).is_err() {
104+
trace!("Failed to send cleaning signal. Halting cleaning thread loop.");
105+
break;
106+
}
107+
trace!("Cleaning signal sent without errors.");
108+
}
89109
}
90-
trace!("Cleaning signal sent without errors.");
91-
});
92-
}
110+
111+
trace!("Cleaning thread loop has closed.");
112+
})
113+
};
93114

94115
// Event loop
95-
let (reset_sender, reset_receiver) = mpsc::channel();
96-
create_collection_thread(
116+
let (collection_thread_ctrl_sender, collection_thread_ctrl_receiver) = mpsc::channel();
117+
let collection_thread = create_collection_thread(
97118
sender,
98-
reset_receiver,
119+
collection_thread_ctrl_receiver,
120+
thread_termination_lock.clone(),
121+
thread_termination_cvar.clone(),
99122
&app.app_config_fields,
100123
app.used_widgets.clone(),
101124
);
@@ -117,7 +140,6 @@ fn main() -> Result<()> {
117140
let ist_clone = is_terminated.clone();
118141
ctrlc::set_handler(move || {
119142
ist_clone.store(true, Ordering::SeqCst);
120-
termination_hook();
121143
})?;
122144
let mut first_run = true;
123145

@@ -127,12 +149,12 @@ fn main() -> Result<()> {
127149
if let BottomEvent::Update(_) = recv {
128150
trace!("Main/drawing thread received Update event.");
129151
} else {
130-
trace!("Main/drawing thread received event: {:#?}", recv);
152+
trace!("Main/drawing thread received event: {:?}", recv);
131153
}
132154
}
133155
match recv {
134156
BottomEvent::KeyInput(event) => {
135-
if handle_key_event_or_break(event, &mut app, &reset_sender) {
157+
if handle_key_event_or_break(event, &mut app, &collection_thread_ctrl_sender) {
136158
break;
137159
}
138160
handle_force_redraws(&mut app);
@@ -227,7 +249,19 @@ fn main() -> Result<()> {
227249
try_drawing(&mut terminal, &mut app, &mut painter, is_debug)?;
228250
}
229251

230-
trace!("Main/drawing thread is cleaning up.");
252+
// I think doing it in this order is safe...
253+
trace!("Send termination thread locks.");
254+
*thread_termination_lock.lock().unwrap() = true;
255+
trace!("Notifying all cvars.");
256+
thread_termination_cvar.notify_all();
257+
231258
cleanup_terminal(&mut terminal, is_debug)?;
259+
260+
trace!("Main/drawing thread is cleaning up.");
261+
262+
cleaning_thread.join().unwrap();
263+
input_thread.join().unwrap();
264+
collection_thread.join().unwrap();
265+
trace!("Fini.");
232266
Ok(())
233267
}

src/lib.rs

+80-44
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ use std::{
99
io::{stdout, Write},
1010
panic::PanicInfo,
1111
path::PathBuf,
12+
sync::Arc,
13+
sync::Condvar,
14+
sync::Mutex,
1215
thread,
1316
time::{Duration, Instant},
1417
};
@@ -57,7 +60,7 @@ pub enum BottomEvent<I, J> {
5760
}
5861

5962
#[derive(Debug)]
60-
pub enum CollectionThreadEvent {
63+
pub enum ThreadControlEvent {
6164
Reset,
6265
UpdateConfig(Box<app::AppConfigFields>),
6366
UpdateUsedWidgets(Box<UsedWidgets>),
@@ -87,7 +90,7 @@ pub fn handle_mouse_event(event: MouseEvent, app: &mut App) {
8790
}
8891

8992
pub fn handle_key_event_or_break(
90-
event: KeyEvent, app: &mut App, reset_sender: &std::sync::mpsc::Sender<CollectionThreadEvent>,
93+
event: KeyEvent, app: &mut App, reset_sender: &std::sync::mpsc::Sender<ThreadControlEvent>,
9194
) -> bool {
9295
// debug!("KeyEvent: {:?}", event);
9396

@@ -144,7 +147,7 @@ pub fn handle_key_event_or_break(
144147
KeyCode::Up => app.move_widget_selection(&WidgetDirection::Up),
145148
KeyCode::Down => app.move_widget_selection(&WidgetDirection::Down),
146149
KeyCode::Char('r') => {
147-
if reset_sender.send(CollectionThreadEvent::Reset).is_ok() {
150+
if reset_sender.send(ThreadControlEvent::Reset).is_ok() {
148151
app.reset();
149152
}
150153
}
@@ -262,12 +265,6 @@ pub fn cleanup_terminal(
262265
Ok(())
263266
}
264267

265-
pub fn termination_hook() {
266-
let mut stdout = stdout();
267-
disable_raw_mode().unwrap();
268-
execute!(stdout, DisableMouseCapture, LeaveAlternateScreen).unwrap();
269-
}
270-
271268
/// Based on https://github.com/Rigellute/spotify-tui/blob/master/src/main.rs
272269
pub fn panic_hook(panic_info: &PanicInfo<'_>) {
273270
let mut stdout = stdout();
@@ -564,48 +561,60 @@ pub fn create_input_thread(
564561
sender: std::sync::mpsc::Sender<
565562
BottomEvent<crossterm::event::KeyEvent, crossterm::event::MouseEvent>,
566563
>,
567-
) {
564+
termination_ctrl_lock: Arc<Mutex<bool>>,
565+
) -> std::thread::JoinHandle<()> {
568566
trace!("Creating input thread.");
569567
thread::spawn(move || {
570568
trace!("Spawned input thread.");
571569
let mut mouse_timer = Instant::now();
572570
let mut keyboard_timer = Instant::now();
573571

574572
loop {
575-
trace!("Waiting for an input event...");
576-
if poll(Duration::from_millis(20)).is_ok() {
577-
if let Ok(event) = read() {
578-
trace!("Input thread received an event: {:?}", event);
579-
if let Event::Key(key) = event {
580-
if Instant::now().duration_since(keyboard_timer).as_millis() >= 20 {
581-
if sender.send(BottomEvent::KeyInput(key)).is_err() {
582-
break;
573+
if let Ok(is_terminated) = termination_ctrl_lock.try_lock() {
574+
// We don't block.
575+
if *is_terminated {
576+
trace!("Received termination lock in input thread!");
577+
drop(is_terminated);
578+
break;
579+
}
580+
}
581+
if let Ok(poll) = poll(Duration::from_millis(20)) {
582+
if poll {
583+
if let Ok(event) = read() {
584+
trace!("Input thread received an event: {:?}", event);
585+
if let Event::Key(key) = event {
586+
if Instant::now().duration_since(keyboard_timer).as_millis() >= 20 {
587+
if sender.send(BottomEvent::KeyInput(key)).is_err() {
588+
break;
589+
}
590+
trace!("Input thread sent keyboard data.");
591+
keyboard_timer = Instant::now();
583592
}
584-
trace!("Input thread sent data.");
585-
keyboard_timer = Instant::now();
586-
}
587-
} else if let Event::Mouse(mouse) = event {
588-
if Instant::now().duration_since(mouse_timer).as_millis() >= 20 {
589-
if sender.send(BottomEvent::MouseInput(mouse)).is_err() {
590-
break;
593+
} else if let Event::Mouse(mouse) = event {
594+
if Instant::now().duration_since(mouse_timer).as_millis() >= 20 {
595+
if sender.send(BottomEvent::MouseInput(mouse)).is_err() {
596+
break;
597+
}
598+
trace!("Input thread sent mouse data.");
599+
mouse_timer = Instant::now();
591600
}
592-
trace!("Input thread sent data.");
593-
mouse_timer = Instant::now();
594601
}
595602
}
596603
}
597604
}
598605
}
599-
});
606+
trace!("Input thread loop has closed.");
607+
})
600608
}
601609

602610
pub fn create_collection_thread(
603611
sender: std::sync::mpsc::Sender<
604612
BottomEvent<crossterm::event::KeyEvent, crossterm::event::MouseEvent>,
605613
>,
606-
reset_receiver: std::sync::mpsc::Receiver<CollectionThreadEvent>,
614+
control_receiver: std::sync::mpsc::Receiver<ThreadControlEvent>,
615+
termination_ctrl_lock: Arc<Mutex<bool>>, termination_ctrl_cvar: Arc<Condvar>,
607616
app_config_fields: &app::AppConfigFields, used_widget_set: UsedWidgets,
608-
) {
617+
) -> std::thread::JoinHandle<()> {
609618
trace!("Creating collection thread.");
610619
let temp_type = app_config_fields.temperature_type.clone();
611620
let use_current_cpu_total = app_config_fields.use_current_cpu_total;
@@ -617,50 +626,77 @@ pub fn create_collection_thread(
617626
let mut data_state = data_harvester::DataCollector::default();
618627
trace!("Created initial data state.");
619628
data_state.set_collected_data(used_widget_set);
620-
trace!("Set collected data.");
621629
data_state.set_temperature_type(temp_type);
622-
trace!("Set initial temp type.");
623630
data_state.set_use_current_cpu_total(use_current_cpu_total);
624-
trace!("Set current CPU total.");
625631
data_state.set_show_average_cpu(show_average_cpu);
626-
trace!("Set showing average CPU.");
627632

628633
data_state.init();
629634
trace!("Data state is now fully initialized.");
630635
loop {
631-
trace!("Collecting...");
636+
// Check once at the very top...
637+
if let Ok(is_terminated) = termination_ctrl_lock.try_lock() {
638+
// We don't block here.
639+
if *is_terminated {
640+
trace!("Received termination lock in collection thread!");
641+
drop(is_terminated);
642+
break;
643+
}
644+
}
645+
646+
trace!("Checking for collection control receiver event...");
632647
let mut update_time = update_rate_in_milliseconds;
633-
if let Ok(message) = reset_receiver.try_recv() {
648+
if let Ok(message) = control_receiver.try_recv() {
634649
trace!("Received message in collection thread: {:?}", message);
635650
match message {
636-
CollectionThreadEvent::Reset => {
651+
ThreadControlEvent::Reset => {
637652
data_state.data.cleanup();
638653
}
639-
CollectionThreadEvent::UpdateConfig(app_config_fields) => {
654+
ThreadControlEvent::UpdateConfig(app_config_fields) => {
640655
data_state.set_temperature_type(app_config_fields.temperature_type.clone());
641656
data_state
642657
.set_use_current_cpu_total(app_config_fields.use_current_cpu_total);
643658
data_state.set_show_average_cpu(app_config_fields.show_average_cpu);
644659
}
645-
CollectionThreadEvent::UpdateUsedWidgets(used_widget_set) => {
660+
ThreadControlEvent::UpdateUsedWidgets(used_widget_set) => {
646661
data_state.set_collected_data(*used_widget_set);
647662
}
648-
CollectionThreadEvent::UpdateUpdateTime(new_time) => {
663+
ThreadControlEvent::UpdateUpdateTime(new_time) => {
649664
update_time = new_time;
650665
}
651666
}
652667
}
653668
futures::executor::block_on(data_state.update_data());
654-
trace!("Collection thread is updating...");
669+
670+
// Yet another check to bail if needed...
671+
if let Ok(is_terminated) = termination_ctrl_lock.try_lock() {
672+
// We don't block here.
673+
if *is_terminated {
674+
trace!("Received termination lock in collection thread!");
675+
drop(is_terminated);
676+
break;
677+
}
678+
}
679+
680+
trace!("Collection thread is updating and sending...");
655681
let event = BottomEvent::Update(Box::from(data_state.data));
656-
trace!("Collection thread done updating. Sending data now...");
657682
data_state.data = data_harvester::Data::default();
658683
if sender.send(event).is_err() {
659684
trace!("Error sending from collection thread...");
660685
break;
661686
}
662687
trace!("No problem sending from collection thread!");
663-
thread::sleep(Duration::from_millis(update_time));
688+
689+
if let Ok((is_terminated, _wait_timeout_result)) = termination_ctrl_cvar.wait_timeout(
690+
termination_ctrl_lock.lock().unwrap(),
691+
Duration::from_millis(update_time),
692+
) {
693+
if *is_terminated {
694+
trace!("Received termination lock in collection thread from cvar!");
695+
drop(is_terminated);
696+
break;
697+
}
698+
}
664699
}
665-
});
700+
trace!("Collection thread loop has closed.");
701+
})
666702
}

0 commit comments

Comments
 (0)