Skip to content

Commit b66d3b1

Browse files
authored
feat: initial connection retry (#245)
1 parent 4a0d511 commit b66d3b1

File tree

5 files changed

+64
-26
lines changed

5 files changed

+64
-26
lines changed

livekit-ffi/protocol/room.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ message RoomOptions {
183183
bool dynacast = 3;
184184
optional E2eeOptions e2ee = 4;
185185
optional RtcConfig rtc_config = 5; // allow to setup a custom RtcConfiguration
186+
uint32 join_retries = 6;
186187
}
187188

188189
//

livekit-ffi/src/conversion/room.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ impl From<proto::RoomOptions> for RoomOptions {
166166
dynacast: value.dynacast,
167167
e2ee,
168168
rtc_config,
169+
join_retries: value.join_retries,
169170
}
170171
}
171172
}

livekit-ffi/src/livekit.proto.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2299,6 +2299,8 @@ pub struct RoomOptions {
22992299
/// allow to setup a custom RtcConfiguration
23002300
#[prost(message, optional, tag="5")]
23012301
pub rtc_config: ::core::option::Option<RtcConfig>,
2302+
#[prost(uint32, tag="6")]
2303+
pub join_retries: u32,
23022304
}
23032305
#[allow(clippy::derive_partial_eq_without_eq)]
23042306
#[derive(Clone, PartialEq, ::prost::Message)]

livekit/src/room/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ pub struct RoomOptions {
172172
pub dynacast: bool,
173173
pub e2ee: Option<E2eeOptions>,
174174
pub rtc_config: RtcConfiguration,
175+
pub join_retries: u32,
175176
}
176177

177178
impl Default for RoomOptions {
@@ -188,6 +189,7 @@ impl Default for RoomOptions {
188189
continual_gathering_policy: ContinualGatheringPolicy::GatherContinually,
189190
ice_transport_type: IceTransportsType::All,
190191
},
192+
join_retries: 3,
191193
}
192194
}
193195
}
@@ -228,6 +230,7 @@ impl Room {
228230
auto_subscribe: options.auto_subscribe,
229231
adaptive_stream: options.adaptive_stream,
230232
},
233+
join_retries: options.join_retries,
231234
},
232235
)
233236
.await?;

livekit/src/rtc_engine/mod.rs

Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ pub enum EngineError {
7474
pub struct EngineOptions {
7575
pub rtc_config: RtcConfiguration,
7676
pub signal_options: SignalOptions,
77+
pub join_retries: u32,
7778
}
7879

7980
#[derive(Debug)]
@@ -258,35 +259,65 @@ impl EngineInner {
258259
options: EngineOptions,
259260
) -> EngineResult<(Arc<Self>, proto::JoinResponse, EngineEvents)> {
260261
let lk_runtime = LkRuntime::instance();
262+
let max_retries = options.join_retries;
263+
264+
let try_connect = {
265+
move || {
266+
let options = options.clone();
267+
let lk_runtime = lk_runtime.clone();
268+
async move {
269+
let (session, join_response, session_events) =
270+
RtcSession::connect(url, token, options.clone()).await?;
271+
session.wait_pc_connection().await?;
272+
273+
let (engine_tx, engine_rx) = mpsc::unbounded_channel();
274+
let inner = Arc::new(Self {
275+
lk_runtime,
276+
engine_tx,
277+
close_notifier: Arc::new(Notify::new()),
278+
running_handle: RwLock::new(EngineHandle {
279+
session: Arc::new(session),
280+
closed: false,
281+
reconnecting: false,
282+
full_reconnect: false,
283+
engine_task: None,
284+
}),
285+
options,
286+
reconnecting_lock: AsyncRwLock::default(),
287+
reconnecting_interval: AsyncMutex::new(interval(RECONNECT_INTERVAL)),
288+
});
261289

262-
let (session, join_response, session_events) =
263-
RtcSession::connect(url, token, options.clone()).await?;
264-
let (engine_tx, engine_rx) = mpsc::unbounded_channel();
265-
266-
session.wait_pc_connection().await?;
267-
268-
let inner = Arc::new(Self {
269-
lk_runtime,
270-
engine_tx,
271-
close_notifier: Arc::new(Notify::new()),
272-
running_handle: RwLock::new(EngineHandle {
273-
session: Arc::new(session),
274-
closed: false,
275-
reconnecting: false,
276-
full_reconnect: false,
277-
engine_task: None,
278-
}),
279-
options,
280-
reconnecting_lock: AsyncRwLock::default(),
281-
reconnecting_interval: AsyncMutex::new(interval(RECONNECT_INTERVAL)),
282-
});
290+
// Start initial tasks
291+
let (close_tx, close_rx) = oneshot::channel();
292+
let session_task =
293+
tokio::spawn(Self::engine_task(inner.clone(), session_events, close_rx));
294+
inner.running_handle.write().engine_task = Some((session_task, close_tx));
283295

284-
// Start initial tasks
285-
let (close_tx, close_rx) = oneshot::channel();
286-
let session_task = tokio::spawn(Self::engine_task(inner.clone(), session_events, close_rx));
287-
inner.running_handle.write().engine_task = Some((session_task, close_tx));
296+
Ok((inner, join_response, engine_rx))
297+
}
298+
}
299+
};
300+
301+
let mut last_error = None;
302+
for i in 0..(max_retries + 1) {
303+
match try_connect().await {
304+
Ok(res) => return Ok(res),
305+
Err(e) => {
306+
let attempt_i = i + 1;
307+
if i < max_retries {
308+
log::warn!(
309+
"failed to connect: {:?}, retrying... ({}/{})",
310+
e,
311+
attempt_i,
312+
max_retries
313+
);
314+
}
315+
last_error = Some(e)
316+
}
317+
}
318+
}
288319

289-
Ok((inner, join_response, engine_rx))
320+
Err(last_error.unwrap())
290321
}
291322

292323
async fn engine_task(

0 commit comments

Comments
 (0)