1
- use std:: f64:: INFINITY ;
2
-
3
1
use serde:: { Deserialize , Serialize } ;
4
2
5
3
use super :: model_trait:: { AsModel , SerializableModel } ;
6
4
use super :: ModelMessage ;
7
5
use crate :: input_modeling:: random_variable:: ContinuousRandomVariable ;
8
6
use crate :: input_modeling:: Thinning ;
9
7
use crate :: simulator:: Services ;
8
+ use crate :: utils:: default_records_port_name;
10
9
use crate :: utils:: error:: SimulationError ;
11
- use crate :: utils:: { populate_history_port, populate_snapshot_port} ;
12
10
13
11
use sim_derive:: SerializableModel ;
14
12
@@ -31,109 +29,159 @@ pub struct Generator {
31
29
ports_in : PortsIn ,
32
30
ports_out : PortsOut ,
33
31
#[ serde( default ) ]
34
- state : State ,
35
- #[ serde( default ) ]
36
- snapshot : Metrics ,
32
+ store_records : bool ,
37
33
#[ serde( default ) ]
38
- history : Vec < Metrics > ,
34
+ state : State ,
39
35
}
40
36
41
37
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
42
38
struct PortsIn {
43
- snapshot : Option < String > ,
44
- history : Option < String > ,
39
+ # [ serde ( default = "default_records_port_name" ) ]
40
+ records : String ,
45
41
}
46
42
47
43
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
48
44
struct PortsOut {
49
45
job : String ,
50
- snapshot : Option < String > ,
51
- history : Option < String > ,
46
+ # [ serde ( default = "default_records_port_name" ) ]
47
+ records : String ,
52
48
}
53
49
54
50
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
55
51
#[ serde( rename_all = "camelCase" ) ]
56
52
struct State {
57
- event_list : Vec < ScheduledEvent > ,
58
- until_message_interdeparture : f64 ,
59
- job_counter : usize ,
53
+ phase : Phase ,
54
+ until_next_event : f64 ,
55
+ until_job : f64 ,
56
+ last_job : Job ,
57
+ records : Vec < Job > ,
60
58
}
61
59
62
60
impl Default for State {
63
61
fn default ( ) -> Self {
64
- let initalization_event = ScheduledEvent {
65
- time : 0.0 ,
66
- event : Event :: Run ,
67
- } ;
68
- State {
69
- event_list : vec ! [ initalization_event] ,
70
- until_message_interdeparture : INFINITY ,
71
- job_counter : 0 ,
62
+ Self {
63
+ phase : Phase :: Initializing ,
64
+ until_next_event : 0.0 ,
65
+ until_job : 0.0 ,
66
+ last_job : Job {
67
+ index : 0 ,
68
+ content : String :: from ( "job 0" ) ,
69
+ time : 0.0 ,
70
+ } ,
71
+ records : Vec :: new ( ) ,
72
72
}
73
73
}
74
74
}
75
75
76
- #[ derive( Debug , Clone , Serialize , Deserialize ) ]
77
- enum Event {
78
- Run ,
79
- BeginGeneration ,
80
- SendJob ,
81
- }
82
-
83
- #[ derive( Debug , Clone , Serialize , Deserialize ) ]
84
- struct ScheduledEvent {
85
- time : f64 ,
86
- event : Event ,
76
+ #[ derive( Debug , Clone , Serialize , Deserialize , PartialEq ) ]
77
+ enum Phase {
78
+ Initializing ,
79
+ RecordsFetch ,
80
+ Generating ,
81
+ Saved ,
87
82
}
88
83
89
84
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
90
85
#[ serde( rename_all = "camelCase" ) ]
91
- struct Metrics {
92
- last_generation : Option < ( String , f64 ) > ,
93
- }
94
-
95
- impl Default for Metrics {
96
- fn default ( ) -> Self {
97
- Metrics {
98
- last_generation : None ,
99
- }
100
- }
86
+ struct Job {
87
+ index : usize ,
88
+ content : String ,
89
+ time : f64 ,
101
90
}
102
91
103
92
impl Generator {
104
93
pub fn new (
105
94
message_interdeparture_time : ContinuousRandomVariable ,
106
95
thinning : Option < Thinning > ,
107
96
job_port : String ,
108
- snapshot_metrics : bool ,
109
- history_metrics : bool ,
97
+ store_records : bool ,
110
98
) -> Self {
111
99
Self {
112
100
message_interdeparture_time,
113
101
thinning,
114
102
ports_in : PortsIn {
115
- snapshot : populate_snapshot_port ( snapshot_metrics) ,
116
- history : populate_history_port ( history_metrics) ,
103
+ records : default_records_port_name ( ) ,
117
104
} ,
118
105
ports_out : PortsOut {
119
106
job : job_port,
120
- snapshot : populate_snapshot_port ( snapshot_metrics) ,
121
- history : populate_history_port ( history_metrics) ,
107
+ records : default_records_port_name ( ) ,
122
108
} ,
109
+ store_records,
123
110
state : Default :: default ( ) ,
124
- snapshot : Default :: default ( ) ,
125
- history : Default :: default ( ) ,
126
111
}
127
112
}
128
113
129
- fn need_snapshot_metrics ( & self ) -> bool {
130
- self . ports_in . snapshot . is_some ( ) && self . ports_out . snapshot . is_some ( )
114
+ fn request_records (
115
+ & mut self ,
116
+ _incoming_message : & ModelMessage ,
117
+ services : & mut Services ,
118
+ ) -> Result < ( ) , SimulationError > {
119
+ self . state . phase = Phase :: RecordsFetch ;
120
+ self . state . until_next_event = 0.0 ;
121
+ self . state . until_job -= services. global_time ( ) - self . state . last_job . time ;
122
+ Ok ( ( ) )
123
+ }
124
+
125
+ fn ignore_request (
126
+ & mut self ,
127
+ _incoming_message : & ModelMessage ,
128
+ _services : & mut Services ,
129
+ ) -> Result < ( ) , SimulationError > {
130
+ Ok ( ( ) )
131
+ }
132
+
133
+ fn save_job ( & mut self , services : & mut Services ) -> Result < Vec < ModelMessage > , SimulationError > {
134
+ self . state . phase = Phase :: Saved ;
135
+ self . state . until_next_event = 0.0 ;
136
+ self . state . records . push ( Job {
137
+ index : self . state . last_job . index + 1 ,
138
+ content : format ! [ "{} {}" , self . ports_out. job, self . state. last_job. index + 1 ] ,
139
+ time : services. global_time ( ) ,
140
+ } ) ;
141
+ Ok ( Vec :: new ( ) )
142
+ }
143
+
144
+ fn release_job (
145
+ & mut self ,
146
+ services : & mut Services ,
147
+ ) -> Result < Vec < ModelMessage > , SimulationError > {
148
+ let interdeparture = self
149
+ . message_interdeparture_time
150
+ . random_variate ( services. uniform_rng ( ) ) ?;
151
+ self . state . phase = Phase :: Generating ;
152
+ self . state . until_next_event = interdeparture;
153
+ self . state . until_job = interdeparture;
154
+ self . state . last_job = Job {
155
+ index : self . state . last_job . index + 1 ,
156
+ content : format ! [ "{} {}" , self . ports_out. job, self . state. last_job. index + 1 ] ,
157
+ time : services. global_time ( ) ,
158
+ } ;
159
+ Ok ( vec ! [ ModelMessage {
160
+ port_name: self . ports_out. job. clone( ) ,
161
+ content: self . state. last_job. content. clone( ) ,
162
+ } ] )
131
163
}
132
164
133
- fn need_historical_metrics ( & self ) -> bool {
134
- self . need_snapshot_metrics ( )
135
- && self . ports_in . history . is_some ( )
136
- && self . ports_out . history . is_some ( )
165
+ fn release_records ( & mut self ) -> Result < Vec < ModelMessage > , SimulationError > {
166
+ self . state . phase = Phase :: Generating ;
167
+ self . state . until_next_event = self . state . until_job ;
168
+ Ok ( vec ! [ ModelMessage {
169
+ port_name: self . ports_out. records. clone( ) ,
170
+ content: serde_json:: to_string( & self . state. records) . unwrap( ) ,
171
+ } ] )
172
+ }
173
+
174
+ fn initialize_generation (
175
+ & mut self ,
176
+ services : & mut Services ,
177
+ ) -> Result < Vec < ModelMessage > , SimulationError > {
178
+ let interdeparture = self
179
+ . message_interdeparture_time
180
+ . random_variate ( services. uniform_rng ( ) ) ?;
181
+ self . state . phase = Phase :: Generating ;
182
+ self . state . until_next_event = interdeparture;
183
+ self . state . until_job = interdeparture;
184
+ Ok ( Vec :: new ( ) )
137
185
}
138
186
}
139
187
@@ -144,105 +192,43 @@ impl AsModel for Generator {
144
192
145
193
fn events_ext (
146
194
& mut self ,
147
- _incoming_message : & ModelMessage ,
148
- _services : & mut Services ,
195
+ incoming_message : & ModelMessage ,
196
+ services : & mut Services ,
149
197
) -> Result < Vec < ModelMessage > , SimulationError > {
198
+ if self . store_records {
199
+ self . request_records ( incoming_message, services) ?;
200
+ } else if !self . store_records {
201
+ self . ignore_request ( incoming_message, services) ?;
202
+ } else {
203
+ return Err ( SimulationError :: InvalidModelState ) ;
204
+ }
150
205
Ok ( Vec :: new ( ) )
151
206
}
152
207
153
208
fn events_int (
154
209
& mut self ,
155
210
services : & mut Services ,
156
211
) -> Result < Vec < ModelMessage > , SimulationError > {
157
- let mut outgoing_messages: Vec < ModelMessage > = Vec :: new ( ) ;
158
- let events = self . state . event_list . clone ( ) ;
159
- self . state . event_list = self
160
- . state
161
- . event_list
162
- . iter ( )
163
- . filter ( |scheduled_event| scheduled_event. time != 0.0 )
164
- . cloned ( )
165
- . collect ( ) ;
166
- events
167
- . iter ( )
168
- . filter ( |scheduled_event| scheduled_event. time == 0.0 )
169
- . map (
170
- |scheduled_event| -> Result < Vec < ModelMessage > , SimulationError > {
171
- match scheduled_event. event {
172
- Event :: Run => {
173
- self . state . event_list . push ( ScheduledEvent {
174
- time : 0.0 ,
175
- event : Event :: BeginGeneration ,
176
- } ) ;
177
- }
178
- Event :: BeginGeneration => {
179
- self . state . until_message_interdeparture = self
180
- . message_interdeparture_time
181
- . random_variate ( services. uniform_rng ( ) ) ?;
182
- self . state . event_list . push ( ScheduledEvent {
183
- time : self . state . until_message_interdeparture ,
184
- event : Event :: BeginGeneration ,
185
- } ) ;
186
- if let Some ( thinning) = self . thinning . clone ( ) {
187
- let thinning_threshold =
188
- thinning. evaluate ( services. global_time ( ) ) ?;
189
- let uniform_rn = services. uniform_rng ( ) . rn ( ) ;
190
- if uniform_rn < thinning_threshold {
191
- self . state . event_list . push ( ScheduledEvent {
192
- time : self . state . until_message_interdeparture ,
193
- event : Event :: SendJob ,
194
- } ) ;
195
- }
196
- } else {
197
- self . state . event_list . push ( ScheduledEvent {
198
- time : self . state . until_message_interdeparture ,
199
- event : Event :: SendJob ,
200
- } ) ;
201
- }
202
- }
203
- Event :: SendJob => {
204
- self . state . job_counter += 1 ;
205
- let generated = format ! [
206
- "{job_type} {job_id}" ,
207
- job_type = self . ports_out. job,
208
- job_id = self . state. job_counter
209
- ] ;
210
- outgoing_messages. push ( ModelMessage {
211
- port_name : self . ports_out . job . clone ( ) ,
212
- content : generated. clone ( ) ,
213
- } ) ;
214
- // Possible metrics updates
215
- if self . need_snapshot_metrics ( ) {
216
- self . snapshot . last_generation =
217
- Some ( ( generated, services. global_time ( ) ) ) ;
218
- }
219
- if self . need_historical_metrics ( ) {
220
- self . history . push ( self . snapshot . clone ( ) ) ;
221
- }
222
- }
223
- }
224
- Ok ( Vec :: new ( ) )
225
- } ,
226
- )
227
- . find ( |result| result. is_err ( ) )
228
- . unwrap_or ( Ok ( outgoing_messages) )
212
+ if self . state . phase == Phase :: Generating && self . store_records {
213
+ self . save_job ( services)
214
+ } else if ( self . state . phase == Phase :: Generating && !self . store_records )
215
+ || self . state . phase == Phase :: Saved
216
+ {
217
+ self . release_job ( services)
218
+ } else if self . state . phase == Phase :: RecordsFetch {
219
+ self . release_records ( )
220
+ } else if self . state . phase == Phase :: Initializing {
221
+ self . initialize_generation ( services)
222
+ } else {
223
+ Err ( SimulationError :: InvalidModelState )
224
+ }
229
225
}
230
226
231
227
fn time_advance ( & mut self , time_delta : f64 ) {
232
- self . state
233
- . event_list
234
- . iter_mut ( )
235
- . for_each ( |scheduled_event| {
236
- scheduled_event. time -= time_delta;
237
- } ) ;
228
+ self . state . until_next_event -= time_delta;
238
229
}
239
230
240
231
fn until_next_event ( & self ) -> f64 {
241
- self . state
242
- . event_list
243
- . iter ( )
244
- . fold ( INFINITY , |until_next_event, event| {
245
- f64:: min ( until_next_event, event. time )
246
- } )
232
+ self . state . until_next_event
247
233
}
248
234
}
0 commit comments