Skip to content

Commit a6d8c82

Browse files
hiroshihorieelesahich
authored andcommitted
Actor pattern for SignalClient's request / response queue (livekit#269)
* impl * doc
1 parent af7dc4e commit a6d8c82

File tree

4 files changed

+94
-79
lines changed

4 files changed

+94
-79
lines changed

Sources/LiveKit/Core/Room.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,9 @@ internal extension Room {
294294
)
295295
}
296296

297-
engine.signalClient.cleanUp(reason: reason)
298-
299-
return engine.cleanUpRTC().then(on: queue) {
297+
return promise(from: engine.signalClient.cleanUp, param1: reason).then(on: queue) {
298+
self.engine.cleanUpRTC()
299+
}.then(on: queue) {
300300
self.cleanUpParticipants()
301301
}.then(on: queue) {
302302
// reset state

Sources/LiveKit/Core/SignalClient.swift

Lines changed: 30 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -40,19 +40,9 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
4040

4141
// MARK: - Private
4242

43-
private enum QueueState {
44-
case resumed
45-
case suspended
46-
}
47-
48-
// queue to store requests while reconnecting
49-
private var requestQueue = [Livekit_SignalRequest]()
50-
private var responseQueue = [Livekit_SignalResponse]()
51-
52-
private let requestDispatchQueue = DispatchQueue(label: "LiveKitSDK.signalClient.requestQueue", qos: .default)
53-
private let responseDispatchQueue = DispatchQueue(label: "LiveKitSDK.signalClient.responseQueue", qos: .default)
54-
55-
private var responseQueueState: QueueState = .resumed
43+
// Queue to store requests while reconnecting
44+
private let _requestQueue = AsyncQueueActor<Livekit_SignalRequest>()
45+
private var _responseQueue = AsyncQueueActor<Livekit_SignalResponse>()
5646

5747
private var _webSocket: WebSocket?
5848
private var latestJoinResponse: Livekit_JoinResponse?
@@ -89,7 +79,7 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
8979
reconnectMode: ReconnectMode? = nil,
9080
adaptiveStream: Bool) async throws {
9181

92-
cleanUp()
82+
await cleanUp()
9383

9484
log("reconnectMode: \(String(describing: reconnectMode))")
9585

@@ -123,14 +113,17 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
123113
self.onWebSocketMessage(message: message)
124114
}
125115
} catch {
126-
//
127-
self.cleanUp(reason: .networkError(error))
116+
await self.cleanUp(reason: .networkError(error))
128117
}
129118
self.log("Did exit WebSocket message loop...")
130119
}
131120
} catch let error {
132121

133-
defer { cleanUp(reason: .networkError(error)) }
122+
defer {
123+
Task {
124+
await cleanUp(reason: .networkError(error))
125+
}
126+
}
134127

135128
// Skip validation if reconnect mode
136129
if reconnectMode != nil { throw error }
@@ -152,7 +145,7 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
152145
}
153146
}
154147

155-
func cleanUp(reason: DisconnectReason? = nil) {
148+
func cleanUp(reason: DisconnectReason? = nil) async {
156149

157150
log("reason: \(String(describing: reason))")
158151

@@ -180,16 +173,8 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
180173
$0 = State()
181174
}
182175

183-
requestDispatchQueue.async { [weak self] in
184-
guard let self = self else { return }
185-
self.requestQueue = []
186-
}
187-
188-
responseDispatchQueue.async { [weak self] in
189-
guard let self = self else { return }
190-
self.responseQueue = []
191-
self.responseQueueState = .resumed
192-
}
176+
await _requestQueue.clear()
177+
await _responseQueue.clear()
193178
}
194179

195180
func resumeCompleter(forAddTrackRequest trackCid: String, trackInfo: Livekit_TrackInfo) {
@@ -224,12 +209,9 @@ private extension SignalClient {
224209
// send request or enqueue while reconnecting
225210
func sendRequest(_ request: Livekit_SignalRequest, enqueueIfReconnecting: Bool = true) async throws {
226211

227-
// on: requestDispatchQueue
228-
229212
guard !(_state.connectionState.isReconnecting && request.canEnqueue() && enqueueIfReconnecting) else {
230-
log("queuing request while reconnecting, request: \(request)")
231-
requestQueue.append(request)
232-
// success
213+
log("Queuing request while reconnecting, request: \(request)")
214+
await _requestQueue.enqueue(request)
233215
return
234216
}
235217

@@ -267,17 +249,12 @@ private extension SignalClient {
267249
return
268250
}
269251

270-
responseDispatchQueue.async {
271-
if case .suspended = self.responseQueueState {
272-
self.log("Enqueueing response: \(response)")
273-
self.responseQueue.append(response)
274-
} else {
275-
self.onSignalResponse(response)
276-
}
252+
Task {
253+
await _responseQueue.enqueue(response) { await processSignalResponse($0) }
277254
}
278255
}
279256

280-
func onSignalResponse(_ response: Livekit_SignalResponse) {
257+
func processSignalResponse(_ response: Livekit_SignalResponse) async {
281258

282259
guard case .connected = connectionState else {
283260
log("Not connected", .warning)
@@ -291,7 +268,7 @@ private extension SignalClient {
291268

292269
switch message {
293270
case .join(let joinResponse):
294-
responseQueueState = .suspended
271+
await _responseQueue.suspend()
295272
latestJoinResponse = joinResponse
296273
restartPingTimer()
297274
notify { $0.signalClient(self, didReceive: joinResponse) }
@@ -370,23 +347,9 @@ internal extension SignalClient {
370347

371348
func resumeResponseQueue() async throws {
372349

373-
// on: responseDispatchQueue
374-
375-
defer { responseQueueState = .resumed }
376-
377-
// Quickly return if no queued requests
378-
guard !responseQueue.isEmpty else {
379-
self.log("No queued response")
380-
return
350+
await _responseQueue.resume { response in
351+
await processSignalResponse(response)
381352
}
382-
383-
// Run responses in sequence
384-
for response in responseQueue {
385-
onSignalResponse(response)
386-
}
387-
388-
// Clear the queue
389-
responseQueue = []
390353
}
391354
}
392355

@@ -396,25 +359,13 @@ internal extension SignalClient {
396359

397360
func sendQueuedRequests() async throws {
398361

399-
// on: requestDispatchQueue
400-
401-
// Return if no queued requests
402-
guard !requestQueue.isEmpty else {
403-
log("No queued requests")
404-
return
405-
}
406-
407-
// Send requests in sequential order
408-
for request in requestQueue {
362+
await _requestQueue.resume { element in
409363
do {
410-
try await sendRequest(request, enqueueIfReconnecting: false)
364+
try await sendRequest(element, enqueueIfReconnecting: false)
411365
} catch let error {
412-
log("Failed to send queued request \(request) with error: \(error)", .error)
366+
log("Failed to send queued request \(element) with error: \(error)", .error)
413367
}
414368
}
415-
416-
// Clear the queue
417-
requestQueue = []
418369
}
419370

420371
func sendOffer(offer: LKRTCSessionDescription) async throws {
@@ -623,7 +574,9 @@ internal extension SignalClient {
623574

624575
defer {
625576
if shouldDisconnect {
626-
cleanUp(reason: .networkError(NetworkError.disconnected(message: "Simulate scenario")))
577+
Task {
578+
await cleanUp(reason: .networkError(NetworkError.disconnected(message: "Simulate scenario")))
579+
}
627580
}
628581
}
629582

@@ -658,7 +611,9 @@ private extension SignalClient {
658611
timer.handler = { [weak self] in
659612
guard let self = self else { return }
660613
self.log("ping/pong timed out", .error)
661-
self.cleanUp(reason: .networkError(SignalClientError.serverPingTimedOut()))
614+
Task {
615+
await self.cleanUp(reason: .networkError(SignalClientError.serverPingTimedOut()))
616+
}
662617
}
663618
timer.resume()
664619
return timer

Sources/LiveKit/Support/AsyncCompleter.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616

1717
import Foundation
18-
import Promises
1918

2019
internal enum AsyncCompleterError: LiveKitError {
2120
case timedOut
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2023 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Foundation
18+
19+
internal actor AsyncQueueActor<T> {
20+
21+
public enum State {
22+
case resumed
23+
case suspended
24+
}
25+
26+
public private(set) var state: State = .resumed
27+
private var queue = [T]()
28+
29+
/// Mark as `.suspended`.
30+
func suspend() {
31+
state = .suspended
32+
}
33+
34+
func enqueue(_ value: T) {
35+
queue.append(value)
36+
}
37+
38+
/// Only enqueue if `.suspended` state, otherwise process immediately.
39+
func enqueue(_ value: T, ifResumed process: (T) async -> Void) async {
40+
if case .suspended = state {
41+
queue.append(value)
42+
} else {
43+
await process(value)
44+
}
45+
}
46+
47+
func clear() {
48+
queue.removeAll()
49+
state = .resumed
50+
}
51+
52+
/// Mark as `.resumed` and process each element with an async `block`.
53+
func resume(_ block: (T) async -> Void) async {
54+
state = .resumed
55+
if queue.isEmpty { return }
56+
for element in queue {
57+
await block(element)
58+
}
59+
queue.removeAll()
60+
}
61+
}

0 commit comments

Comments
 (0)