Skip to content

Commit e7e1bfc

Browse files
committed
Update the Load Balancer model to better align with the DEVS formalism - use Moore machine style outputs and decouple conditionality from state changes
1 parent 0de618b commit e7e1bfc

File tree

2 files changed

+127
-111
lines changed

2 files changed

+127
-111
lines changed

sim/src/models/load_balancer.rs

Lines changed: 127 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use serde::{Deserialize, Serialize};
55
use super::model_trait::{AsModel, SerializableModel};
66
use super::ModelMessage;
77
use crate::simulator::Services;
8+
use crate::utils::default_records_port_name;
89
use crate::utils::error::SimulationError;
9-
use crate::utils::{populate_history_port, populate_snapshot_port};
1010

1111
use sim_derive::SerializableModel;
1212

@@ -18,106 +18,155 @@ pub struct LoadBalancer {
1818
ports_in: PortsIn,
1919
ports_out: PortsOut,
2020
#[serde(default)]
21-
state: State,
22-
#[serde(default)]
23-
snapshot: Metrics,
21+
store_records: bool,
2422
#[serde(default)]
25-
history: Vec<Metrics>,
23+
state: State,
2624
}
2725

2826
#[derive(Debug, Clone, Serialize, Deserialize)]
2927
struct PortsIn {
3028
job: String,
31-
snapshot: Option<String>,
32-
history: Option<String>,
29+
#[serde(default = "default_records_port_name")]
30+
records: String,
3331
}
3432

3533
#[derive(Debug, Clone, Serialize, Deserialize)]
3634
#[serde(rename_all = "camelCase")]
3735
struct PortsOut {
3836
flow_paths: Vec<String>,
39-
snapshot: Option<String>,
40-
history: Option<String>,
37+
#[serde(default = "default_records_port_name")]
38+
records: String,
4139
}
4240

4341
#[derive(Debug, Clone, Serialize, Deserialize)]
4442
#[serde(rename_all = "camelCase")]
4543
struct State {
46-
event_list: Vec<ScheduledEvent>,
47-
jobs: Vec<String>,
44+
phase: Phase,
45+
until_next_event: f64,
4846
next_port_out: usize,
47+
jobs: Vec<Job>,
48+
records: Vec<Job>,
4949
}
5050

5151
impl Default for State {
5252
fn default() -> Self {
53-
let initalization_event = ScheduledEvent {
54-
time: 0.0,
55-
event: Event::Run,
56-
};
57-
State {
58-
event_list: vec![initalization_event],
59-
jobs: Vec::new(),
53+
Self {
54+
phase: Phase::LoadBalancing,
55+
until_next_event: 0.0,
6056
next_port_out: 0,
57+
jobs: Vec::new(),
58+
records: Vec::new(),
6159
}
6260
}
6361
}
6462

65-
#[derive(Debug, Clone, Serialize, Deserialize)]
66-
enum Event {
67-
Run,
68-
SendJob,
69-
}
70-
71-
#[derive(Debug, Clone, Serialize, Deserialize)]
72-
struct ScheduledEvent {
73-
time: f64,
74-
event: Event,
63+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
64+
enum Phase {
65+
LoadBalancing,
66+
RecordsFetch,
7567
}
7668

7769
#[derive(Debug, Clone, Serialize, Deserialize)]
7870
#[serde(rename_all = "camelCase")]
79-
struct Metrics {
80-
last_job: Option<(String, String, f64)>, // Port, message, time
81-
}
82-
83-
impl Default for Metrics {
84-
fn default() -> Self {
85-
Metrics { last_job: None }
86-
}
71+
struct Job {
72+
content: String,
73+
time: f64,
74+
port_out: String,
8775
}
8876

8977
impl LoadBalancer {
90-
pub fn new(
91-
job_port: String,
92-
flow_path_ports: Vec<String>,
93-
snapshot_metrics: bool,
94-
history_metrics: bool,
95-
) -> Self {
78+
pub fn new(job_port: String, flow_path_ports: Vec<String>, store_records: bool) -> Self {
9679
Self {
9780
ports_in: PortsIn {
9881
job: job_port,
99-
snapshot: populate_snapshot_port(snapshot_metrics),
100-
history: populate_history_port(history_metrics),
82+
records: default_records_port_name(),
10183
},
10284
ports_out: PortsOut {
10385
flow_paths: flow_path_ports,
104-
snapshot: populate_snapshot_port(snapshot_metrics),
105-
history: populate_history_port(history_metrics),
86+
records: default_records_port_name(),
10687
},
88+
store_records,
10789
state: Default::default(),
108-
snapshot: Default::default(),
109-
history: Default::default(),
11090
}
11191
}
11292

113-
fn need_snapshot_metrics(&self) -> bool {
114-
self.ports_in.snapshot.is_some() && self.ports_out.snapshot.is_some()
93+
fn request_records(
94+
&mut self,
95+
_incoming_message: &ModelMessage,
96+
_services: &mut Services,
97+
) -> Result<(), SimulationError> {
98+
self.state.phase = Phase::RecordsFetch;
99+
self.state.until_next_event = 0.0;
100+
Ok(())
115101
}
116102

117-
fn need_historical_metrics(&self) -> bool {
118-
self.need_snapshot_metrics()
119-
&& self.ports_in.history.is_some()
120-
&& self.ports_out.history.is_some()
103+
fn ignore_request(
104+
&mut self,
105+
_incoming_message: &ModelMessage,
106+
_services: &mut Services,
107+
) -> Result<(), SimulationError> {
108+
Ok(())
109+
}
110+
111+
fn pass_job(
112+
&mut self,
113+
incoming_message: &ModelMessage,
114+
services: &mut Services,
115+
) -> Result<(), SimulationError> {
116+
self.state.phase = Phase::LoadBalancing;
117+
self.state.until_next_event = 0.0;
118+
self.state.jobs.push(Job {
119+
content: incoming_message.content.clone(),
120+
time: services.global_time(),
121+
port_out: self.ports_out.flow_paths[self.state.next_port_out].clone(),
122+
});
123+
self.state.next_port_out = (self.state.next_port_out + 1) % self.ports_out.flow_paths.len();
124+
Ok(())
125+
}
126+
127+
fn save_job(
128+
&mut self,
129+
incoming_message: &ModelMessage,
130+
services: &mut Services,
131+
) -> Result<(), SimulationError> {
132+
self.state.phase = Phase::LoadBalancing;
133+
self.state.until_next_event = 0.0;
134+
self.state.jobs.push(Job {
135+
content: incoming_message.content.clone(),
136+
time: services.global_time(),
137+
port_out: self.ports_out.flow_paths[self.state.next_port_out].clone(),
138+
});
139+
self.state.records.push(Job {
140+
content: incoming_message.content.clone(),
141+
time: services.global_time(),
142+
port_out: self.ports_out.flow_paths[self.state.next_port_out].clone(),
143+
});
144+
self.state.next_port_out = (self.state.next_port_out + 1) % self.ports_out.flow_paths.len();
145+
Ok(())
146+
}
147+
148+
fn passivate(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
149+
self.state.phase = Phase::LoadBalancing;
150+
self.state.until_next_event = INFINITY;
151+
Ok(Vec::new())
152+
}
153+
154+
fn send_job(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
155+
self.state.until_next_event = 0.0;
156+
let job = self.state.jobs.remove(0);
157+
Ok(vec![ModelMessage {
158+
port_name: job.port_out,
159+
content: job.content,
160+
}])
161+
}
162+
163+
fn release_records(&mut self) -> Result<Vec<ModelMessage>, SimulationError> {
164+
self.state.phase = Phase::LoadBalancing;
165+
self.state.until_next_event = 0.0;
166+
Ok(vec![ModelMessage {
167+
port_name: self.ports_out.records.clone(),
168+
content: serde_json::to_string(&self.state.records).unwrap(),
169+
}])
121170
}
122171
}
123172

@@ -129,73 +178,42 @@ impl AsModel for LoadBalancer {
129178
fn events_ext(
130179
&mut self,
131180
incoming_message: &ModelMessage,
132-
_services: &mut Services,
181+
services: &mut Services,
133182
) -> Result<Vec<ModelMessage>, SimulationError> {
134-
self.state.jobs.push(incoming_message.content.clone());
135-
self.state.event_list.push(ScheduledEvent {
136-
time: 0.0,
137-
event: Event::SendJob,
138-
});
183+
if incoming_message.port_name == self.ports_in.records && self.store_records {
184+
self.request_records(incoming_message, services)?;
185+
} else if incoming_message.port_name == self.ports_in.records && !self.store_records {
186+
self.ignore_request(incoming_message, services)?;
187+
} else if incoming_message.port_name == self.ports_in.job && self.store_records {
188+
self.save_job(incoming_message, services)?;
189+
} else if incoming_message.port_name == self.ports_in.job && !self.store_records {
190+
self.pass_job(incoming_message, services)?;
191+
} else {
192+
return Err(SimulationError::InvalidModelState);
193+
}
139194
Ok(Vec::new())
140195
}
141196

142197
fn events_int(
143198
&mut self,
144-
services: &mut Services,
199+
_services: &mut Services,
145200
) -> Result<Vec<ModelMessage>, SimulationError> {
146-
let mut outgoing_messages: Vec<ModelMessage> = Vec::new();
147-
let events = self.state.event_list.clone();
148-
self.state.event_list = self
149-
.state
150-
.event_list
151-
.iter()
152-
.filter(|scheduled_event| scheduled_event.time != 0.0)
153-
.cloned()
154-
.collect();
155-
events
156-
.iter()
157-
.filter(|scheduled_event| scheduled_event.time == 0.0)
158-
.for_each(|scheduled_event| match scheduled_event.event {
159-
Event::Run => {}
160-
Event::SendJob => {
161-
// Possible metrics updates
162-
if self.need_snapshot_metrics() {
163-
self.snapshot.last_job = Some((
164-
self.ports_out.flow_paths[self.state.next_port_out].clone(),
165-
self.state.jobs[0].clone(),
166-
services.global_time(),
167-
));
168-
}
169-
if self.need_historical_metrics() {
170-
self.history.push(self.snapshot.clone());
171-
}
172-
// State changes
173-
outgoing_messages.push(ModelMessage {
174-
port_name: self.ports_out.flow_paths[self.state.next_port_out].clone(),
175-
content: self.state.jobs.remove(0),
176-
});
177-
self.state.next_port_out =
178-
(self.state.next_port_out + 1) % self.ports_out.flow_paths.len();
179-
}
180-
});
181-
Ok(outgoing_messages)
201+
if self.state.phase == Phase::RecordsFetch {
202+
self.release_records()
203+
} else if self.state.phase == Phase::LoadBalancing && self.state.jobs.is_empty() {
204+
self.passivate()
205+
} else if self.state.phase == Phase::LoadBalancing && !self.state.jobs.is_empty() {
206+
self.send_job()
207+
} else {
208+
Err(SimulationError::InvalidModelState)
209+
}
182210
}
183211

184212
fn time_advance(&mut self, time_delta: f64) {
185-
self.state
186-
.event_list
187-
.iter_mut()
188-
.for_each(|scheduled_event| {
189-
scheduled_event.time -= time_delta;
190-
});
213+
self.state.until_next_event -= time_delta;
191214
}
192215

193216
fn until_next_event(&self) -> f64 {
194-
self.state
195-
.event_list
196-
.iter()
197-
.fold(INFINITY, |until_next_event, event| {
198-
f64::min(until_next_event, event.time)
199-
})
217+
self.state.until_next_event
200218
}
201219
}

sim/tests/simulations.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,6 @@ fn load_balancer_round_robin_outputs() {
509509
String::from("server-3"),
510510
],
511511
false,
512-
false,
513512
)),
514513
),
515514
Model::new(
@@ -803,7 +802,6 @@ fn match_status_reporting() {
803802
String::from("delta"),
804803
],
805804
false,
806-
false,
807805
)),
808806
),
809807
];

0 commit comments

Comments
 (0)