@@ -96,7 +96,7 @@ impl Display for EncoderCrash {
96
96
97
97
impl < ' a > Broker < ' a > {
98
98
/// Main encoding loop. set_thread_affinity may be ignored if the value is invalid.
99
- pub fn encoding_loop ( self , tx : Sender < ( ) > , mut set_thread_affinity : Option < usize > ) {
99
+ pub fn encoding_loop ( self , tx : Sender < ( ) > , set_thread_affinity : Option < usize > ) {
100
100
assert ! ( !self . chunk_queue. is_empty( ) ) ;
101
101
102
102
if !self . chunk_queue . is_empty ( ) {
@@ -107,26 +107,6 @@ impl<'a> Broker<'a> {
107
107
}
108
108
drop ( sender) ;
109
109
110
- cfg_if ! {
111
- if #[ cfg( any( target_os = "linux" , target_os = "windows" ) ) ] {
112
- if let Some ( threads) = set_thread_affinity {
113
- let available_threads = available_parallelism( ) . expect( "Unrecoverable: Failed to get thread count" ) . get( ) ;
114
- let requested_threads = threads. saturating_mul( self . project. args. workers) ;
115
- if requested_threads > available_threads {
116
- warn!(
117
- "ignoring set_thread_affinity: requested more threads than available ({}/{})" ,
118
- requested_threads, available_threads
119
- ) ;
120
- set_thread_affinity = None ;
121
- } else if requested_threads == 0 {
122
- warn!( "ignoring set_thread_affinity: requested 0 threads" ) ;
123
-
124
- set_thread_affinity = None ;
125
- }
126
- }
127
- }
128
- }
129
-
130
110
crossbeam_utils:: thread:: scope ( |s| {
131
111
let consumers: Vec < _ > = ( 0 ..self . project . args . workers )
132
112
. map ( |idx| ( receiver. clone ( ) , & self , idx) )
@@ -136,13 +116,26 @@ impl<'a> Broker<'a> {
136
116
cfg_if ! {
137
117
if #[ cfg( any( target_os = "linux" , target_os = "windows" ) ) ] {
138
118
if let Some ( threads) = set_thread_affinity {
139
- let mut cpu_set = SmallVec :: <[ usize ; 16 ] >:: new( ) ;
140
- cpu_set. extend( ( threads * worker_id..) . take( threads) ) ;
141
- if let Err ( e) = affinity:: set_thread_affinity( & cpu_set) {
142
- warn!(
143
- "failed to set thread affinity for worker {}: {}" ,
144
- worker_id, e
145
- ) ;
119
+ if threads == 0 {
120
+ warn!( "Ignoring set_thread_affinity: Requested 0 threads" ) ;
121
+ } else {
122
+ match available_parallelism( ) {
123
+ Ok ( parallelism) => {
124
+ let available_threads = parallelism. get( ) ;
125
+ let mut cpu_set = SmallVec :: <[ usize ; 16 ] >:: new( ) ;
126
+ let start_thread = ( threads * worker_id) % available_threads;
127
+ cpu_set. extend( ( start_thread..start_thread + threads) . map( |t| t % available_threads) ) ;
128
+ if let Err ( e) = affinity:: set_thread_affinity( & cpu_set) {
129
+ warn!(
130
+ "Failed to set thread affinity for worker {}: {}" ,
131
+ worker_id, e
132
+ ) ;
133
+ }
134
+ } ,
135
+ Err ( e) => {
136
+ warn!( "Failed to get thread count: {}. Thread affinity will not be set" , e) ;
137
+ }
138
+ }
146
139
}
147
140
}
148
141
}
0 commit comments