@@ -334,7 +334,12 @@ pub enum SimNetError {
334
334
}
335
335
336
336
struct State {
337
+ // The simnet is allowed to advance to the time of the earliest event in this queue at any time
337
338
scheduled_events : BTreeMap < SimulatorTimeInstant , Vec < ScheduledEvent > > ,
339
+ // The simnet is allowed to advance to the time of the earliest event in this queue at any time
340
+ // only if the earliest event in `scheduled_events` occurs after the earliest event in this queue
341
+ // or some debounce period has passed where there are only events in this queue.
342
+ unadvanceable_scheduled_events : BTreeMap < SimulatorTimeInstant , Vec < ScheduledEvent > > ,
338
343
}
339
344
340
345
/// The state of the python training script.
@@ -349,8 +354,7 @@ pub enum TrainingScriptState {
349
354
/// A handle to a running [`SimNet`] instance.
350
355
pub struct SimNetHandle {
351
356
join_handles : Arc < Mutex < Vec < JoinHandle < ( ) > > > > ,
352
- event_tx : UnboundedSender < Box < dyn Event > > ,
353
- scheduled_event_tx : UnboundedSender < ScheduledEvent > ,
357
+ event_tx : UnboundedSender < ( Box < dyn Event > , bool , Option < SimulatorTimeInstant > ) > ,
354
358
config : Arc < Mutex < SimNetConfig > > ,
355
359
records : Option < Arc < Mutex < Vec < SimulatorEventRecord > > > > ,
356
360
pending_event_count : Arc < AtomicUsize > ,
@@ -370,23 +374,38 @@ impl SimNetHandle {
370
374
/// Sends an event to be scheduled onto the simnet's event loop
371
375
#[ allow( clippy:: result_large_err) ] // TODO: Consider reducing the size of `SimNetError`.
372
376
pub fn send_event ( & self , event : Box < dyn Event > ) -> Result < ( ) , SimNetError > {
377
+ self . send_event_impl ( event, true )
378
+ }
379
+
380
+ #[ allow( clippy:: result_large_err) ] // TODO: Consider reducing the size of `SimNetError`.
381
+ fn send_event_impl ( & self , event : Box < dyn Event > , advanceable : bool ) -> Result < ( ) , SimNetError > {
373
382
self . pending_event_count
374
383
. fetch_add ( 1 , std:: sync:: atomic:: Ordering :: SeqCst ) ;
375
384
self . event_tx
376
- . send ( event)
385
+ . send ( ( event, advanceable , None ) )
377
386
. map_err ( |err| SimNetError :: Closed ( err. to_string ( ) ) )
378
387
}
379
388
389
+ /// Sends an non-advanceable event to be scheduled onto the simnet's event loop
390
+ /// A non-advanceable event is an event that cannot advance the simnet's time unless
391
+ /// the earliest event in the simnet's advancing event queue occurs after the earliest
392
+ /// event in the simnet's non-advancing event queue, or some debounce period has passed
393
+ /// where there are only events in the simnet's non-advancing event queue.
394
+ #[ allow( clippy:: result_large_err) ] // TODO: Consider reducing the size of `SimNetError`.
395
+ pub fn send_nonadvanceable_event ( & self , event : Box < dyn Event > ) -> Result < ( ) , SimNetError > {
396
+ self . send_event_impl ( event, false )
397
+ }
398
+
380
399
/// Sends an event that already has a scheduled time onto the simnet's event loop
381
400
#[ allow( clippy:: result_large_err) ] // TODO: Consider reducing the size of `SimNetError`.
382
401
pub ( crate ) fn send_scheduled_event (
383
402
& self ,
384
- scheduled_event : ScheduledEvent ,
403
+ ScheduledEvent { event , time } : ScheduledEvent ,
385
404
) -> Result < ( ) , SimNetError > {
386
405
self . pending_event_count
387
406
. fetch_add ( 1 , std:: sync:: atomic:: Ordering :: SeqCst ) ;
388
- self . scheduled_event_tx
389
- . send ( scheduled_event )
407
+ self . event_tx
408
+ . send ( ( event , true , Some ( time ) ) )
390
409
. map_err ( |err| SimNetError :: Closed ( err. to_string ( ) ) )
391
410
}
392
411
@@ -402,9 +421,13 @@ impl SimNetHandle {
402
421
self . pending_event_count
403
422
. fetch_add ( 1 , std:: sync:: atomic:: Ordering :: SeqCst ) ;
404
423
self . event_tx
405
- . send ( Box :: new ( NodeJoinEvent {
406
- channel_addr : address,
407
- } ) )
424
+ . send ( (
425
+ Box :: new ( NodeJoinEvent {
426
+ channel_addr : address,
427
+ } ) ,
428
+ true ,
429
+ None ,
430
+ ) )
408
431
. map_err ( |err| SimNetError :: Closed ( err. to_string ( ) ) )
409
432
}
410
433
@@ -652,7 +675,7 @@ impl ProxyHandle {
652
675
/// to_event: a function that specifies how to generate an Event from a forward message
653
676
async fn start (
654
677
proxy_addr : ChannelAddr ,
655
- event_tx : UnboundedSender < Box < dyn Event > > ,
678
+ event_tx : UnboundedSender < ( Box < dyn Event > , bool , Option < SimulatorTimeInstant > ) > ,
656
679
pending_event_count : Arc < AtomicUsize > ,
657
680
operational_message_tx : UnboundedSender < OperationalMessage > ,
658
681
) -> anyhow:: Result < Self > {
@@ -683,7 +706,7 @@ impl ProxyHandle {
683
706
}
684
707
} ;
685
708
686
- if let Err ( e) = event_tx. send ( event) {
709
+ if let Err ( e) = event_tx. send ( ( event, true , None ) ) {
687
710
tracing:: error!( "error sending message to simnet: {:?}" , e) ;
688
711
} else {
689
712
pending_event_count. fetch_add ( 1 , std:: sync:: atomic:: Ordering :: SeqCst ) ;
@@ -744,8 +767,8 @@ pub fn start(
744
767
745
768
let ( training_script_state_tx, training_script_state_rx) =
746
769
tokio:: sync:: watch:: channel ( TrainingScriptState :: Running ) ;
747
- let ( event_tx, event_rx) = mpsc :: unbounded_channel :: < Box < dyn Event > > ( ) ;
748
- let ( scheduled_event_tx , scheduled_event_rx ) = mpsc:: unbounded_channel :: < ScheduledEvent > ( ) ;
770
+ let ( event_tx, event_rx) =
771
+ mpsc:: unbounded_channel :: < ( Box < dyn Event > , bool , Option < SimulatorTimeInstant > ) > ( ) ;
749
772
let records = Some ( Arc :: new ( Mutex :: new ( vec ! [ ] ) ) ) ; // TODO remove optional
750
773
let pending_event_count = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
751
774
let stop_signal = Arc :: new ( AtomicBool :: new ( false ) ) ;
@@ -754,24 +777,19 @@ pub fn start(
754
777
let config = config. clone ( ) ;
755
778
let pending_event_count = pending_event_count. clone ( ) ;
756
779
let stop_signal = stop_signal. clone ( ) ;
757
- tokio:: spawn ( async move {
780
+ tokio:: task :: spawn_blocking ( move || {
758
781
let mut net = SimNet {
759
782
config,
760
783
address_book,
761
784
state : State {
762
785
scheduled_events : BTreeMap :: new ( ) ,
786
+ unadvanceable_scheduled_events : BTreeMap :: new ( ) ,
763
787
} ,
764
788
max_latency : Duration :: from_millis ( max_duration_ms) ,
765
789
records,
766
790
pending_event_count,
767
791
} ;
768
- net. run (
769
- event_rx,
770
- scheduled_event_rx,
771
- training_script_state_rx,
772
- stop_signal,
773
- )
774
- . await ;
792
+ block_on ( net. run ( event_rx, training_script_state_rx, stop_signal) ) ;
775
793
} )
776
794
} ;
777
795
let join_handles = Arc :: new ( Mutex :: new ( vec ! [ simnet_join_handle] ) ) ;
@@ -789,7 +807,6 @@ pub fn start(
789
807
HANDLE . get_or_init ( || SimNetHandle {
790
808
join_handles,
791
809
event_tx,
792
- scheduled_event_tx,
793
810
config,
794
811
records,
795
812
pending_event_count,
@@ -850,60 +867,57 @@ impl SimNet {
850
867
}
851
868
852
869
/// Schedule the event into the network.
853
- async fn schedule_event ( & mut self , scheduled_event : ScheduledEvent ) {
870
+ async fn schedule_event ( & mut self , scheduled_event : ScheduledEvent , advanceable : bool ) {
854
871
if let Some ( records) = & self . records {
855
872
records. lock ( ) . await . push ( SimulatorEventRecord {
856
873
summary : scheduled_event. event . summary ( ) ,
857
874
start_at : SimClock . millis_since_start ( SimClock . now ( ) ) ,
858
875
end_at : scheduled_event. time ,
859
876
} ) ;
860
877
}
861
- self . state
862
- . scheduled_events
863
- . entry ( scheduled_event. time )
864
- . or_insert_with ( Vec :: new)
865
- . push ( scheduled_event) ;
878
+ if advanceable {
879
+ self . state
880
+ . scheduled_events
881
+ . entry ( scheduled_event. time )
882
+ . or_insert_with ( Vec :: new)
883
+ . push ( scheduled_event) ;
884
+ } else {
885
+ self . state
886
+ . unadvanceable_scheduled_events
887
+ . entry ( scheduled_event. time )
888
+ . or_insert_with ( Vec :: new)
889
+ . push ( scheduled_event) ;
890
+ }
866
891
}
867
892
868
893
/// Run the simulation. This will dispatch all the messages in the network.
869
894
/// And wait for new ones.
870
895
async fn run (
871
896
& mut self ,
872
- mut event_rx : UnboundedReceiver < Box < dyn Event > > ,
873
- mut scheduled_event_rx : UnboundedReceiver < ScheduledEvent > ,
897
+ mut event_rx : UnboundedReceiver < ( Box < dyn Event > , bool , Option < SimulatorTimeInstant > ) > ,
874
898
training_script_state_rx : tokio:: sync:: watch:: Receiver < TrainingScriptState > ,
875
899
stop_signal : Arc < AtomicBool > ,
876
900
) {
877
901
// The simulated number of milliseconds the training script
878
902
// has spent waiting for the backend to resolve a future
879
903
let mut training_script_waiting_time: u64 = 0 ;
904
+ // Duration elapsed while only non_advanceable_events has events
905
+ let mut debounce_timer: Option < tokio:: time:: Instant > = None ;
880
906
' outer: loop {
881
907
// Check if we should stop
882
908
if stop_signal. load ( Ordering :: SeqCst ) {
883
909
break ' outer;
884
910
}
885
911
886
- // TODO: Find a way to drain all needed messages with better guarantees.
887
- //
888
- // Allow tiny grace period for messages to stop coming in before we actually advance time
889
- // to handle inflight events
890
- while let Ok ( event) =
891
- tokio:: time:: timeout ( tokio:: time:: Duration :: from_millis ( 10 ) , event_rx. recv ( ) ) . await
892
- {
893
- if let Some ( event) = event {
894
- let scheduled_event = self . create_scheduled_event ( event) . await ;
895
- self . schedule_event ( scheduled_event) . await ;
896
- } else {
897
- break ' outer;
898
- }
899
- }
900
-
901
- while let Ok ( ScheduledEvent { time, event } ) = scheduled_event_rx. try_recv ( ) {
902
- self . schedule_event ( ScheduledEvent {
903
- time : time + training_script_waiting_time,
904
- event,
905
- } )
906
- . await ;
912
+ while let Ok ( ( event, advanceable, time) ) = event_rx. try_recv ( ) {
913
+ let scheduled_event = match time {
914
+ Some ( time) => ScheduledEvent {
915
+ time : time + training_script_waiting_time,
916
+ event,
917
+ } ,
918
+ None => self . create_scheduled_event ( event) . await ,
919
+ } ;
920
+ self . schedule_event ( scheduled_event, advanceable) . await ;
907
921
}
908
922
909
923
{
@@ -924,10 +938,62 @@ impl SimNet {
924
938
{
925
939
continue ;
926
940
}
941
+ match (
942
+ self . state . scheduled_events . first_key_value ( ) ,
943
+ self . state . unadvanceable_scheduled_events . first_key_value ( ) ,
944
+ ) {
945
+ ( None , Some ( _) ) if debounce_timer. is_none ( ) => {
946
+ // Start debounce timer when only the non-advancedable
947
+ // queue has events and the timer has not already started
948
+ debounce_timer = Some ( RealClock . now ( ) ) ;
949
+ }
950
+ // Timer already active
951
+ ( None , Some ( _) ) => { }
952
+ // Reset timer when non-advanceable queue is not the only queue with events
953
+ _ => {
954
+ debounce_timer = None ;
955
+ }
956
+ }
927
957
// process for next delivery time.
928
- let Some ( ( scheduled_time, scheduled_events) ) =
929
- self . state . scheduled_events . pop_first ( )
930
- else {
958
+ let Some ( ( scheduled_time, scheduled_events) ) = ( match (
959
+ self . state . scheduled_events . first_key_value ( ) ,
960
+ self . state . unadvanceable_scheduled_events . first_key_value ( ) ,
961
+ ) {
962
+ ( Some ( ( advanceable_time, _) ) , Some ( ( unadvanceable_time, _) ) ) => {
963
+ if unadvanceable_time < advanceable_time {
964
+ self . state . unadvanceable_scheduled_events . pop_first ( )
965
+ } else {
966
+ self . state . scheduled_events . pop_first ( )
967
+ }
968
+ }
969
+ ( Some ( _) , None ) => self . state . scheduled_events . pop_first ( ) ,
970
+ ( None , Some ( _) ) => match debounce_timer {
971
+ Some ( time) => {
972
+ if time. elapsed ( ) > tokio:: time:: Duration :: from_millis ( 1000 ) {
973
+ // debounce interval has elapsed, reset timer
974
+ debounce_timer = None ;
975
+ self . state . unadvanceable_scheduled_events . pop_first ( )
976
+ } else {
977
+ None
978
+ }
979
+ }
980
+ None => None ,
981
+ } ,
982
+ ( None , None ) => None ,
983
+ } ) else {
984
+ tokio:: select! {
985
+ Some ( ( event, advanceable, time) ) = event_rx. recv( ) => {
986
+ let scheduled_event = match time {
987
+ Some ( time) => ScheduledEvent {
988
+ time: time + training_script_waiting_time,
989
+ event,
990
+ } ,
991
+ None => self . create_scheduled_event( event) . await ,
992
+ } ;
993
+ self . schedule_event( scheduled_event, advanceable) . await ;
994
+ } ,
995
+ _ = RealClock . sleep( Duration :: from_millis( 10 ) ) => { }
996
+ }
931
997
continue ;
932
998
} ;
933
999
if training_script_state_rx. borrow ( ) . is_waiting ( ) {
0 commit comments