Skip to content

Commit 7a50169

Browse files
committed
Add a Stopwatch model
1 parent 593d8e9 commit 7a50169

File tree

4 files changed

+473
-0
lines changed

4 files changed

+473
-0
lines changed

sim/src/models/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod model;
1212
pub mod parallel_gateway;
1313
pub mod processor;
1414
pub mod stochastic_gate;
15+
pub mod stopwatch;
1516
pub mod storage;
1617

1718
pub mod model_factory;
@@ -28,6 +29,7 @@ pub use self::model_trait::AsModel;
2829
pub use self::parallel_gateway::ParallelGateway;
2930
pub use self::processor::Processor;
3031
pub use self::stochastic_gate::StochasticGate;
32+
pub use self::stopwatch::Stopwatch;
3133
pub use self::storage::Storage;
3234

3335
pub use self::model_repr::ModelRepr;

sim/src/models/model_factory.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ lazy_static! {
3737
"StochasticGate",
3838
super::StochasticGate::from_value as ModelConstructor,
3939
);
40+
m.insert(
41+
"Stopwatch",
42+
super::Stopwatch::from_value as ModelConstructor,
43+
);
4044
m.insert("Storage", super::Storage::from_value as ModelConstructor);
4145
Mutex::new(m)
4246
};

sim/src/models/stopwatch.rs

Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
use std::f64::{INFINITY, NEG_INFINITY};
2+
use std::iter::once;
3+
4+
use serde::{Deserialize, Serialize};
5+
6+
use super::model_trait::{AsModel, SerializableModel};
7+
use super::ModelMessage;
8+
use crate::simulator::Services;
9+
use crate::utils::default_records_port_name;
10+
use crate::utils::error::SimulationError;
11+
12+
use sim_derive::SerializableModel;
13+
14+
#[derive(Debug, Clone, Serialize, Deserialize, SerializableModel)]
15+
#[serde(rename_all = "camelCase")]
16+
pub struct Stopwatch {
17+
ports_in: PortsIn,
18+
ports_out: PortsOut,
19+
#[serde(default)]
20+
metric: Metric,
21+
#[serde(default)]
22+
store_records: bool,
23+
#[serde(default)]
24+
state: State,
25+
}
26+
27+
#[derive(Debug, Clone, Serialize, Deserialize)]
28+
struct PortsIn {
29+
start: String,
30+
stop: String,
31+
metric: String,
32+
#[serde(default = "default_records_port_name")]
33+
records: String,
34+
}
35+
36+
#[derive(Debug, Clone, Serialize, Deserialize)]
37+
struct PortsOut {
38+
job: String,
39+
#[serde(default = "default_records_port_name")]
40+
records: String,
41+
}
42+
43+
#[derive(Debug, Clone, Serialize, Deserialize)]
44+
pub enum Metric {
45+
Minimum,
46+
Maximum,
47+
}
48+
49+
impl Default for Metric {
50+
fn default() -> Self {
51+
Metric::Minimum
52+
}
53+
}
54+
55+
#[derive(Debug, Clone, Serialize, Deserialize)]
56+
#[serde(rename_all = "camelCase")]
57+
struct State {
58+
phase: Phase,
59+
until_next_event: f64,
60+
jobs: Vec<Job>,
61+
records: Vec<Record>,
62+
}
63+
64+
impl Default for State {
65+
fn default() -> Self {
66+
State {
67+
phase: Phase::Passive,
68+
until_next_event: INFINITY,
69+
jobs: Vec::new(),
70+
records: Vec::new(),
71+
}
72+
}
73+
}
74+
75+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
76+
enum Phase {
77+
Passive,
78+
JobFetch,
79+
RecordsFetch,
80+
}
81+
82+
#[derive(Debug, Clone, Serialize, Deserialize)]
83+
#[serde(rename_all = "camelCase")]
84+
pub struct Job {
85+
name: String,
86+
start: Option<f64>,
87+
stop: Option<f64>,
88+
}
89+
90+
#[derive(Debug, Clone, Serialize, Deserialize)]
91+
#[serde(rename_all = "camelCase")]
92+
pub struct Record {
93+
timestamp: f64,
94+
name: String,
95+
start: Option<f64>,
96+
stop: Option<f64>,
97+
}
98+
99+
impl Stopwatch {
100+
pub fn new(
101+
start_port: String,
102+
stop_port: String,
103+
metric_port: String,
104+
job_port: String,
105+
metric: Metric,
106+
store_records: bool,
107+
) -> Self {
108+
Self {
109+
ports_in: PortsIn {
110+
start: start_port,
111+
stop: stop_port,
112+
metric: metric_port,
113+
records: default_records_port_name(),
114+
},
115+
ports_out: PortsOut {
116+
job: job_port,
117+
records: default_records_port_name(),
118+
},
119+
metric,
120+
store_records,
121+
state: Default::default(),
122+
}
123+
}
124+
125+
fn matching_or_new_job(&mut self, incoming_message: &ModelMessage) -> &mut Job {
126+
if self
127+
.state
128+
.jobs
129+
.iter()
130+
.find(|job| job.name == incoming_message.content)
131+
.is_none()
132+
{
133+
self.state.jobs.push(Job {
134+
name: incoming_message.content.clone(),
135+
start: None,
136+
stop: None,
137+
});
138+
}
139+
self.state
140+
.jobs
141+
.iter_mut()
142+
.find(|job| job.name == incoming_message.content)
143+
.unwrap()
144+
}
145+
146+
fn some_duration(&self, job: &Job) -> Option<(String, f64)> {
147+
match (job.start, job.stop) {
148+
(Some(start), Some(stop)) => Some((job.name.to_string(), stop - start)),
149+
_ => None,
150+
}
151+
}
152+
153+
fn minimum_duration_job(&self) -> Option<String> {
154+
self.state
155+
.jobs
156+
.iter()
157+
.filter_map(|job| self.some_duration(job))
158+
.fold((None, INFINITY), |minimum, (job_name, job_duration)| {
159+
if job_duration < minimum.1 {
160+
(Some(job_name), job_duration)
161+
} else {
162+
minimum
163+
}
164+
})
165+
.0
166+
}
167+
168+
fn maximum_duration_job(&self) -> Option<String> {
169+
self.state
170+
.jobs
171+
.iter()
172+
.filter_map(|job| self.some_duration(job))
173+
.fold((None, NEG_INFINITY), |maximum, (job_name, job_duration)| {
174+
if job_duration > maximum.1 {
175+
(Some(job_name), job_duration)
176+
} else {
177+
maximum
178+
}
179+
})
180+
.0
181+
}
182+
183+
fn calculate_job(
184+
&mut self,
185+
incoming_message: &ModelMessage,
186+
services: &mut Services,
187+
) -> Result<(), SimulationError> {
188+
if incoming_message.port_name == self.ports_in.start {
189+
self.matching_or_new_job(incoming_message).start = Some(services.global_time())
190+
} else if incoming_message.port_name == self.ports_in.stop {
191+
self.matching_or_new_job(incoming_message).stop = Some(services.global_time())
192+
} else {
193+
return Err(SimulationError::InvalidModelState);
194+
}
195+
Ok(())
196+
}
197+
198+
fn calculate_and_save_job(
199+
&mut self,
200+
incoming_message: &ModelMessage,
201+
services: &mut Services,
202+
) -> Result<(), SimulationError> {
203+
if incoming_message.port_name == self.ports_in.start {
204+
self.matching_or_new_job(incoming_message).start = Some(services.global_time())
205+
} else if incoming_message.port_name == self.ports_in.stop {
206+
self.matching_or_new_job(incoming_message).stop = Some(services.global_time())
207+
} else {
208+
return Err(SimulationError::InvalidModelState);
209+
}
210+
let job = self.matching_or_new_job(incoming_message).clone();
211+
self.state.records.push(Record {
212+
timestamp: services.global_time(),
213+
name: job.name,
214+
start: job.start,
215+
stop: job.stop,
216+
});
217+
Ok(())
218+
}
219+
220+
fn get_job(&mut self) -> Result<(), SimulationError> {
221+
self.state.phase = Phase::JobFetch;
222+
self.state.until_next_event = 0.0;
223+
Ok(())
224+
}
225+
226+
fn get_records(&mut self) -> Result<(), SimulationError> {
227+
self.state.phase = Phase::RecordsFetch;
228+
self.state.until_next_event = 0.0;
229+
Ok(())
230+
}
231+
232+
fn release_records(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
233+
self.state.phase = Phase::Passive;
234+
self.state.until_next_event = INFINITY;
235+
Ok(vec![ModelMessage {
236+
port_name: self.ports_out.records.clone(),
237+
content: serde_json::to_string(&self.state.records).unwrap(),
238+
}])
239+
}
240+
241+
fn release_job(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
242+
self.state.phase = Phase::Passive;
243+
self.state.until_next_event = INFINITY;
244+
Ok(match &self.metric {
245+
Metric::Minimum => once(self.minimum_duration_job())
246+
.flatten()
247+
.map(|job| ModelMessage {
248+
content: job,
249+
port_name: self.ports_out.job.clone(),
250+
})
251+
.collect(),
252+
Metric::Maximum => once(self.maximum_duration_job())
253+
.flatten()
254+
.map(|job| ModelMessage {
255+
content: job,
256+
port_name: self.ports_out.job.clone(),
257+
})
258+
.collect(),
259+
})
260+
}
261+
262+
fn passivate(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
263+
self.state.phase = Phase::Passive;
264+
self.state.until_next_event = INFINITY;
265+
Ok(Vec::new())
266+
}
267+
}
268+
269+
impl AsModel for Stopwatch {
270+
fn status(&self) -> String {
271+
let durations: Vec<f64> = self
272+
.state
273+
.jobs
274+
.iter()
275+
.filter_map(|job| self.some_duration(job))
276+
.map(|(_, duration)| duration)
277+
.collect();
278+
format![
279+
"Average {:.3} sec",
280+
durations.iter().sum::<f64>() / durations.len() as f64
281+
]
282+
}
283+
284+
fn events_ext(
285+
&mut self,
286+
incoming_message: &ModelMessage,
287+
services: &mut Services,
288+
) -> Result<(), SimulationError> {
289+
if (self.ports_in.start == incoming_message.port_name
290+
|| self.ports_in.stop == incoming_message.port_name)
291+
&& self.store_records
292+
{
293+
self.calculate_and_save_job(incoming_message, services)
294+
} else if (self.ports_in.start == incoming_message.port_name
295+
|| self.ports_in.stop == incoming_message.port_name)
296+
&& !self.store_records
297+
{
298+
self.calculate_job(incoming_message, services)
299+
} else if incoming_message.port_name == self.ports_in.metric {
300+
self.get_job()
301+
} else if incoming_message.port_name == self.ports_in.records {
302+
self.get_records()
303+
} else {
304+
Err(SimulationError::InvalidModelState)
305+
}
306+
}
307+
308+
fn events_int(
309+
&mut self,
310+
_services: &mut Services,
311+
) -> Result<Vec<ModelMessage>, SimulationError> {
312+
if self.state.phase == Phase::RecordsFetch {
313+
self.release_records()
314+
} else if self.state.phase == Phase::JobFetch {
315+
self.release_job()
316+
} else if self.state.phase == Phase::Passive {
317+
self.passivate()
318+
} else {
319+
Err(SimulationError::InvalidModelState)
320+
}
321+
}
322+
323+
fn time_advance(&mut self, time_delta: f64) {
324+
self.state.until_next_event -= time_delta;
325+
}
326+
327+
fn until_next_event(&self) -> f64 {
328+
self.state.until_next_event
329+
}
330+
}

0 commit comments

Comments
 (0)