Skip to content

Commit 8b62c79

Browse files
committed
Fix: Immediate response when snapshot installation is unnecessary
When `Engine::handle_install_full_snapshot()` is called and the provided snapshot is not up-to-date, the snapshot should not be installed, and the response should be sent back immediately. Previously, the method might delay the response unnecessarily, waiting for an installation process that would not proceed. This commit adjusts the logic so that if the snapshot is recognized as outdated, it immediately returns a `None` `Condition`, ensuring the caller is informed straightaway that no installation will occur.
1 parent 81240b8 commit 8b62c79

File tree

5 files changed

+212
-17
lines changed

5 files changed

+212
-17
lines changed

openraft/src/engine/engine_impl.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use crate::engine::handler::server_state_handler::ServerStateHandler;
1818
use crate::engine::handler::snapshot_handler::SnapshotHandler;
1919
use crate::engine::handler::vote_handler::VoteHandler;
2020
use crate::engine::Command;
21-
use crate::engine::Condition;
2221
use crate::engine::EngineOutput;
2322
use crate::engine::Respond;
2423
use crate::entry::RaftPayload;
@@ -465,17 +464,16 @@ where C: RaftTypeConfig
465464
};
466465

467466
let mut fh = self.following_handler();
468-
fh.install_full_snapshot(snapshot);
467+
468+
// The condition to satisfy before running other command that depends on the snapshot.
469+
// In this case, the response can only be sent when the snapshot is installed.
470+
let cond = fh.install_full_snapshot(snapshot);
469471
let res = Ok(SnapshotResponse {
470472
vote: *self.state.vote_ref(),
471473
});
472474

473475
self.output.push_command(Command::Respond {
474-
// When there is an error, there may still be queued IO, we need to run them before sending back
475-
// response.
476-
when: Some(Condition::StateMachineCommand {
477-
command_seq: self.output.last_sm_seq(),
478-
}),
476+
when: cond,
479477
resp: Respond::new(res, tx),
480478
});
481479
}

openraft/src/engine/handler/following_handler/install_snapshot_test.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use pretty_assertions::assert_eq;
77
use crate::core::sm;
88
use crate::engine::testing::UTConfig;
99
use crate::engine::Command;
10+
use crate::engine::Condition;
1011
use crate::engine::Engine;
1112
use crate::engine::LogIdList;
1213
use crate::raft_state::LogStateReader;
@@ -56,7 +57,7 @@ fn test_install_snapshot_lt_last_snapshot() -> anyhow::Result<()> {
5657
// `snapshot_meta.last_log_id`.
5758
let mut eng = eng();
5859

59-
eng.following_handler().install_full_snapshot(Snapshot {
60+
let cond = eng.following_handler().install_full_snapshot(Snapshot {
6061
meta: SnapshotMeta {
6162
last_log_id: Some(log_id(2, 1, 2)),
6263
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
@@ -65,6 +66,8 @@ fn test_install_snapshot_lt_last_snapshot() -> anyhow::Result<()> {
6566
snapshot: Box::new(Cursor::new(vec![0u8])),
6667
});
6768

69+
assert_eq!(None, cond);
70+
6871
assert_eq!(
6972
SnapshotMeta {
7073
last_log_id: Some(log_id(2, 1, 2)),
@@ -86,7 +89,7 @@ fn test_install_snapshot_lt_committed() -> anyhow::Result<()> {
8689
// Although in this case the state machine is not affected.
8790
let mut eng = eng();
8891

89-
eng.following_handler().install_full_snapshot(Snapshot {
92+
let cond = eng.following_handler().install_full_snapshot(Snapshot {
9093
meta: SnapshotMeta {
9194
last_log_id: Some(log_id(4, 1, 5)),
9295
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
@@ -95,6 +98,8 @@ fn test_install_snapshot_lt_committed() -> anyhow::Result<()> {
9598
snapshot: Box::new(Cursor::new(vec![0u8])),
9699
});
97100

101+
assert_eq!(None, cond);
102+
98103
assert_eq!(
99104
SnapshotMeta {
100105
last_log_id: Some(log_id(2, 1, 2)),
@@ -113,7 +118,7 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> {
113118
// Snapshot will be installed and there are no conflicting logs.
114119
let mut eng = eng();
115120

116-
eng.following_handler().install_full_snapshot(Snapshot {
121+
let cond = eng.following_handler().install_full_snapshot(Snapshot {
117122
meta: SnapshotMeta {
118123
last_log_id: Some(log_id(4, 1, 6)),
119124
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
@@ -122,6 +127,8 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> {
122127
snapshot: Box::new(Cursor::new(vec![0u8])),
123128
});
124129

130+
assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond);
131+
125132
assert_eq!(
126133
SnapshotMeta {
127134
last_log_id: Some(log_id(4, 1, 6)),
@@ -187,7 +194,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
187194
eng
188195
};
189196

190-
eng.following_handler().install_full_snapshot(Snapshot {
197+
let cond = eng.following_handler().install_full_snapshot(Snapshot {
191198
meta: SnapshotMeta {
192199
last_log_id: Some(log_id(5, 1, 6)),
193200
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
@@ -196,6 +203,8 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
196203
snapshot: Box::new(Cursor::new(vec![0u8])),
197204
});
198205

206+
assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond);
207+
199208
assert_eq!(
200209
SnapshotMeta {
201210
last_log_id: Some(log_id(5, 1, 6)),
@@ -238,7 +247,7 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> {
238247
// Snapshot will be installed and there are no conflicting logs.
239248
let mut eng = eng();
240249

241-
eng.following_handler().install_full_snapshot(Snapshot {
250+
let cond = eng.following_handler().install_full_snapshot(Snapshot {
242251
meta: SnapshotMeta {
243252
last_log_id: Some(log_id(100, 1, 100)),
244253
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
@@ -247,6 +256,8 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> {
247256
snapshot: Box::new(Cursor::new(vec![0u8])),
248257
});
249258

259+
assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond);
260+
250261
assert_eq!(
251262
SnapshotMeta {
252263
last_log_id: Some(log_id(100, 1, 100)),
@@ -293,7 +304,7 @@ fn test_install_snapshot_update_accepted() -> anyhow::Result<()> {
293304
// Snapshot will be installed and `accepted` should be updated.
294305
let mut eng = eng();
295306

296-
eng.following_handler().install_full_snapshot(Snapshot {
307+
let cond = eng.following_handler().install_full_snapshot(Snapshot {
297308
meta: SnapshotMeta {
298309
last_log_id: Some(log_id(100, 1, 100)),
299310
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
@@ -302,6 +313,8 @@ fn test_install_snapshot_update_accepted() -> anyhow::Result<()> {
302313
snapshot: Box::new(Cursor::new(vec![0u8])),
303314
});
304315

316+
assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond);
317+
305318
assert_eq!(Some(&log_id(100, 1, 100)), eng.state.accepted());
306319

307320
Ok(())

openraft/src/engine/handler/following_handler/mod.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::engine::handler::log_handler::LogHandler;
77
use crate::engine::handler::server_state_handler::ServerStateHandler;
88
use crate::engine::handler::snapshot_handler::SnapshotHandler;
99
use crate::engine::Command;
10+
use crate::engine::Condition;
1011
use crate::engine::EngineConfig;
1112
use crate::engine::EngineOutput;
1213
use crate::entry::RaftPayload;
@@ -238,12 +239,26 @@ where C: RaftTypeConfig
238239
self.server_state_handler().update_server_state_if_changed();
239240
}
240241

241-
/// Follower/Learner handles install-full-snapshot.
242+
/// Installs a full snapshot on a follower or learner node.
242243
///
243244
/// Refer to [`snapshot_replication`](crate::docs::protocol::replication::snapshot_replication)
244245
/// for the reason the following workflow is needed.
246+
///
247+
/// The method processes the given `snapshot` and updates the internal state of the node based
248+
/// on the snapshot's metadata. It checks if the `snapshot` is newer than the currently
249+
/// committed snapshot. If not, it does nothing.
250+
///
251+
/// It returns the condition about when the snapshot is installed and can proceed the commands
252+
/// that depends on the state of snapshot.
253+
///
254+
/// It returns an `Option<Condition<C>>` indicating the next action:
255+
/// - `Some(Condition::StateMachineCommand { command_seq })` if the snapshot will be installed.
256+
/// Further commands that depend on snapshot state should use this condition so that these
257+
/// command block until the condition is satisfied(`RaftCore` receives a `Notify`).
258+
/// - Otherwise `None` if the snapshot will not be installed (e.g., if it is not newer than the
259+
/// current state).
245260
#[tracing::instrument(level = "debug", skip_all)]
246-
pub(crate) fn install_full_snapshot(&mut self, snapshot: Snapshot<C>) {
261+
pub(crate) fn install_full_snapshot(&mut self, snapshot: Snapshot<C>) -> Option<Condition<C::NodeId>> {
247262
let meta = &snapshot.meta;
248263
tracing::info!("install_full_snapshot: meta:{:?}", meta);
249264

@@ -255,7 +270,7 @@ where C: RaftTypeConfig
255270
snap_last_log_id.summary(),
256271
self.state.committed().summary()
257272
);
258-
return;
273+
return None;
259274
}
260275

261276
// snapshot_last_log_id can not be None
@@ -268,7 +283,7 @@ where C: RaftTypeConfig
268283
let mut snap_handler = self.snapshot_handler();
269284
let updated = snap_handler.update_snapshot(meta.clone());
270285
if !updated {
271-
return;
286+
return None;
272287
}
273288

274289
let local = self.state.get_log_id(snap_last_log_id.index);
@@ -286,9 +301,14 @@ where C: RaftTypeConfig
286301
));
287302

288303
self.output.push_command(Command::from(sm::Command::install_full_snapshot(snapshot)));
304+
let last_sm_seq = self.output.last_sm_seq();
289305

290306
self.state.purge_upto = Some(snap_last_log_id);
291307
self.log_handler().purge_log();
308+
309+
Some(Condition::StateMachineCommand {
310+
command_seq: last_sm_seq,
311+
})
292312
}
293313

294314
/// Find the last 2 membership entries in a list of entries.

openraft/src/engine/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ mod tests {
4444
mod handle_vote_req_test;
4545
mod handle_vote_resp_test;
4646
mod initialize_test;
47+
mod install_full_snapshot_test;
4748
mod log_id_list_test;
4849
mod startup_test;
4950
mod trigger_purge_log_test;
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
use std::io::Cursor;
2+
3+
use maplit::btreeset;
4+
use pretty_assertions::assert_eq;
5+
6+
use crate::core::sm;
7+
use crate::engine::testing::UTConfig;
8+
use crate::engine::Command;
9+
use crate::engine::Condition;
10+
use crate::engine::Engine;
11+
use crate::engine::LogIdList;
12+
use crate::engine::Respond;
13+
use crate::raft::SnapshotResponse;
14+
use crate::testing::log_id;
15+
use crate::type_config::alias::AsyncRuntimeOf;
16+
use crate::AsyncRuntime;
17+
use crate::Membership;
18+
use crate::Snapshot;
19+
use crate::SnapshotMeta;
20+
use crate::StoredMembership;
21+
use crate::TokioInstant;
22+
use crate::Vote;
23+
24+
fn m12() -> Membership<u64, ()> {
25+
Membership::<u64, ()>::new(vec![btreeset! {1,2}], None)
26+
}
27+
28+
fn m1234() -> Membership<u64, ()> {
29+
Membership::<u64, ()>::new(vec![btreeset! {1,2,3,4}], None)
30+
}
31+
32+
fn eng() -> Engine<UTConfig> {
33+
let mut eng = Engine::default();
34+
eng.state.enable_validation(false); // Disable validation for incomplete state
35+
36+
eng.state.vote.update(TokioInstant::now(), Vote::new_committed(2, 1));
37+
eng.state.committed = Some(log_id(4, 1, 5));
38+
eng.state.log_ids = LogIdList::new(vec![
39+
//
40+
log_id(2, 1, 2),
41+
log_id(3, 1, 5),
42+
log_id(4, 1, 6),
43+
log_id(4, 1, 8),
44+
]);
45+
eng.state.snapshot_meta = SnapshotMeta {
46+
last_log_id: Some(log_id(2, 1, 2)),
47+
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m12()),
48+
snapshot_id: "1-2-3-4".to_string(),
49+
};
50+
eng.state.server_state = eng.calc_server_state();
51+
52+
eng
53+
}
54+
55+
#[test]
56+
fn test_handle_install_full_snapshot_lt_last_snapshot() -> anyhow::Result<()> {
57+
// Snapshot will not be installed because new `last_log_id` is less or equal current
58+
// `snapshot_meta.last_log_id`.
59+
//
60+
// It should respond at once.
61+
62+
let mut eng = eng();
63+
64+
let curr_vote = *eng.state.vote_ref();
65+
66+
let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
67+
68+
eng.handle_install_full_snapshot(
69+
curr_vote,
70+
Snapshot {
71+
meta: SnapshotMeta {
72+
last_log_id: Some(log_id(1, 1, 2)),
73+
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
74+
snapshot_id: "1-2-3-4".to_string(),
75+
},
76+
snapshot: Box::new(Cursor::new(vec![0u8])),
77+
},
78+
tx,
79+
);
80+
81+
assert_eq!(
82+
SnapshotMeta {
83+
last_log_id: Some(log_id(2, 1, 2)),
84+
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m12()),
85+
snapshot_id: "1-2-3-4".to_string(),
86+
},
87+
eng.state.snapshot_meta
88+
);
89+
90+
let (dummy_tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
91+
assert_eq!(
92+
vec![
93+
//
94+
Command::Respond {
95+
when: None,
96+
resp: Respond::new(Ok(SnapshotResponse::new(curr_vote)), dummy_tx),
97+
},
98+
],
99+
eng.output.take_commands()
100+
);
101+
102+
Ok(())
103+
}
104+
105+
#[test]
106+
fn test_handle_install_full_snapshot_no_conflict() -> anyhow::Result<()> {
107+
// Snapshot will be installed and there are no conflicting logs.
108+
// The response should be sent after the snapshot is installed.
109+
110+
let mut eng = eng();
111+
112+
let curr_vote = *eng.state.vote_ref();
113+
114+
let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
115+
116+
eng.handle_install_full_snapshot(
117+
curr_vote,
118+
Snapshot {
119+
meta: SnapshotMeta {
120+
last_log_id: Some(log_id(4, 1, 6)),
121+
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
122+
snapshot_id: "1-2-3-4".to_string(),
123+
},
124+
snapshot: Box::new(Cursor::new(vec![0u8])),
125+
},
126+
tx,
127+
);
128+
129+
assert_eq!(
130+
SnapshotMeta {
131+
last_log_id: Some(log_id(4, 1, 6)),
132+
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
133+
snapshot_id: "1-2-3-4".to_string(),
134+
},
135+
eng.state.snapshot_meta
136+
);
137+
138+
let (dummy_tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
139+
assert_eq!(
140+
vec![
141+
//
142+
Command::from(
143+
sm::Command::install_full_snapshot(Snapshot {
144+
meta: SnapshotMeta {
145+
last_log_id: Some(log_id(4, 1, 6)),
146+
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
147+
snapshot_id: "1-2-3-4".to_string(),
148+
},
149+
snapshot: Box::new(Cursor::new(vec![0u8])),
150+
})
151+
.with_seq(1)
152+
),
153+
Command::PurgeLog { upto: log_id(4, 1, 6) },
154+
Command::Respond {
155+
when: Some(Condition::StateMachineCommand { command_seq: 1 }),
156+
resp: Respond::new(Ok(SnapshotResponse::new(curr_vote)), dummy_tx),
157+
},
158+
],
159+
eng.output.take_commands()
160+
);
161+
162+
Ok(())
163+
}

0 commit comments

Comments
 (0)