Skip to content

Commit 2f6cb58

Browse files
hiroshihorieelesahich
authored andcommitted
WebSocket: Migrate to async/await from Promises lib (livekit#245)
* progress * format * progress * clean up * ref * optimize * optimize
1 parent 8b9cefe commit 2f6cb58

File tree

3 files changed

+191
-146
lines changed

3 files changed

+191
-146
lines changed

Sources/LiveKit/Core/SignalClient.swift

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -108,42 +108,59 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
108108
$0.connectionState = .connecting
109109
}
110110

111-
return WebSocket.connect(url: url,
112-
onMessage: self.onWebSocketMessage,
113-
onDisconnect: { reason in
114-
self.webSocket = nil
115-
self.cleanUp(reason: reason)
116-
})
117-
.then(on: queue) { (webSocket: WebSocket) -> Void in
118-
self.webSocket = webSocket
119-
self._state.mutate { $0.connectionState = .connected }
120-
}.recover(on: queue) { error -> Promise<Void> in
121-
// Skip validation if reconnect mode
122-
if reconnectMode != nil { throw error }
123-
// Catch first, then throw again after getting validation response
124-
// Re-build url with validate mode
125-
guard let validateUrl = Utils.buildUrl(urlString,
126-
token,
127-
connectOptions: connectOptions,
128-
adaptiveStream: adaptiveStream,
129-
validate: true) else {
130-
131-
return Promise(InternalError.parse(message: "Failed to parse validation url"))
111+
let socket = WebSocket(url: url)
112+
113+
return Promise<Void> { resolve, reject in
114+
Task {
115+
do {
116+
try await socket.connect()
117+
self.webSocket = socket
118+
self._state.mutate { $0.connectionState = .connected }
119+
resolve(())
120+
121+
Task.detached {
122+
self.log("Did enter WebSocket message loop...")
123+
do {
124+
for try await message in socket {
125+
self.onWebSocketMessage(message: message)
126+
}
127+
} catch {
128+
//
129+
self.cleanUp(reason: .networkError(error))
130+
}
131+
self.log("Did exit WebSocket message loop...")
132+
}
133+
} catch {
134+
reject(error)
132135
}
136+
}
137+
}.recover(on: queue) { error -> Promise<Void> in
138+
// Skip validation if reconnect mode
139+
if reconnectMode != nil { throw error }
140+
// Catch first, then throw again after getting validation response
141+
// Re-build url with validate mode
142+
guard let validateUrl = Utils.buildUrl(urlString,
143+
token,
144+
connectOptions: connectOptions,
145+
adaptiveStream: adaptiveStream,
146+
validate: true) else {
147+
148+
return Promise(InternalError.parse(message: "Failed to parse validation url"))
149+
}
133150

134-
self.log("Validating with url: \(validateUrl)")
151+
self.log("Validating with url: \(validateUrl)")
135152

136-
return HTTP().get(on: self.queue, url: validateUrl).then(on: self.queue) { data in
137-
guard let string = String(data: data, encoding: .utf8) else {
138-
throw SignalClientError.connect(message: "Failed to decode string")
139-
}
140-
self.log("validate response: \(string)")
141-
// re-throw with validation response
142-
throw SignalClientError.connect(message: "Validation response: \"\(string)\"")
153+
return HTTP().get(on: self.queue, url: validateUrl).then(on: self.queue) { data in
154+
guard let string = String(data: data, encoding: .utf8) else {
155+
throw SignalClientError.connect(message: "Failed to decode string")
143156
}
144-
}.catch(on: queue) { error in
145-
self.cleanUp(reason: .networkError(error))
157+
self.log("validate response: \(string)")
158+
// re-throw with validation response
159+
throw SignalClientError.connect(message: "Validation response: \"\(string)\"")
146160
}
161+
}.catch(on: queue) { error in
162+
self.cleanUp(reason: .networkError(error))
163+
}
147164
}
148165

149166
func cleanUp(reason: DisconnectReason? = nil) {
@@ -156,9 +173,11 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
156173
pingTimeoutTimer = nil
157174

158175
if let socket = webSocket {
159-
socket.cleanUp(reason: reason, notify: false)
160-
socket.onMessage = nil
161-
socket.onDisconnect = nil
176+
// socket.cleanUp(reason: reason, notify: false)
177+
// socket.onMessage = nil
178+
// socket.onDisconnect = nil
179+
// self.webSocket?.cancel()
180+
socket.reset()
162181
self.webSocket = nil
163182
}
164183

Lines changed: 79 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 LiveKit
2+
* Copyright 2022-2023 LiveKit
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,163 +17,130 @@
1717
import Foundation
1818
import Promises
1919

20-
internal class WebSocket: NSObject, URLSessionWebSocketDelegate, Loggable {
20+
internal typealias WebSocketStream = AsyncThrowingStream<URLSessionWebSocketTask.Message, Error>
2121

22-
private let queue = DispatchQueue(label: "LiveKitSDK.webSocket", qos: .default)
22+
internal class WebSocket: NSObject, Loggable, AsyncSequence, URLSessionWebSocketDelegate {
2323

24-
typealias OnMessage = (URLSessionWebSocketTask.Message) -> Void
25-
typealias OnDisconnect = (_ reason: DisconnectReason?) -> Void
24+
typealias AsyncIterator = WebSocketStream.Iterator
25+
typealias Element = URLSessionWebSocketTask.Message
2626

27-
public var onMessage: OnMessage?
28-
public var onDisconnect: OnDisconnect?
27+
private var streamContinuation: WebSocketStream.Continuation?
28+
private var connectContinuation: CheckedContinuation<Void, Error>?
2929

30-
private let operationQueue = OperationQueue()
3130
private let request: URLRequest
3231

33-
private var disconnected = false
34-
private var connectPromise: Promise<WebSocket>?
35-
36-
private lazy var session: URLSession = {
32+
private lazy var urlSession: URLSession = {
3733
let config = URLSessionConfiguration.default
3834
// explicitly set timeout intervals
3935
config.timeoutIntervalForRequest = TimeInterval(60)
4036
config.timeoutIntervalForResource = TimeInterval(604_800)
41-
log("URLSessionConfiguration.timeoutIntervalForRequest: \(config.timeoutIntervalForRequest)")
42-
log("URLSessionConfiguration.timeoutIntervalForResource: \(config.timeoutIntervalForResource)")
43-
return URLSession(configuration: config,
44-
delegate: self,
45-
delegateQueue: operationQueue)
37+
return URLSession(configuration: config, delegate: self, delegateQueue: nil)
4638
}()
4739

4840
private lazy var task: URLSessionWebSocketTask = {
49-
session.webSocketTask(with: request)
41+
urlSession.webSocketTask(with: request)
5042
}()
5143

52-
static func connect(url: URL,
53-
onMessage: OnMessage? = nil,
54-
onDisconnect: OnDisconnect? = nil) -> Promise<WebSocket> {
55-
56-
return WebSocket(url: url,
57-
onMessage: onMessage,
58-
onDisconnect: onDisconnect).connect()
59-
}
44+
private lazy var stream: WebSocketStream = {
45+
return WebSocketStream { continuation in
46+
streamContinuation = continuation
47+
waitForNextValue()
48+
}
49+
}()
6050

61-
private init(url: URL,
62-
onMessage: OnMessage? = nil,
63-
onDisconnect: OnDisconnect? = nil) {
51+
init(url: URL) {
6452

6553
request = URLRequest(url: url,
6654
cachePolicy: .useProtocolCachePolicy,
6755
timeoutInterval: .defaultSocketConnect)
68-
69-
self.onMessage = onMessage
70-
self.onDisconnect = onDisconnect
71-
super.init()
72-
task.resume()
7356
}
7457

7558
deinit {
76-
log()
59+
reset()
7760
}
7861

79-
private func connect() -> Promise<WebSocket> {
80-
connectPromise = Promise<WebSocket>.pending()
81-
return connectPromise!
82-
}
83-
84-
internal func cleanUp(reason: DisconnectReason?, notify: Bool = true) {
85-
86-
log("reason: \(String(describing: reason))")
62+
public func connect() async throws {
8763

88-
guard !disconnected else {
89-
log("dispose can be called only once", .warning)
90-
return
91-
}
92-
93-
// mark as disconnected, this instance cannot be re-used
94-
disconnected = true
95-
96-
task.cancel()
97-
session.invalidateAndCancel()
98-
99-
if let promise = connectPromise {
100-
let sdkError = NetworkError.disconnected(message: "WebSocket disconnected")
101-
promise.reject(sdkError)
102-
connectPromise = nil
103-
}
104-
105-
if notify {
106-
onDisconnect?(reason)
64+
try await withCheckedThrowingContinuation { continuation in
65+
connectContinuation = continuation
66+
task.resume()
10767
}
10868
}
10969

110-
public func send(data: Data) -> Promise<Void> {
111-
let message = URLSessionWebSocketTask.Message.data(data)
112-
return Promise(on: queue) { resolve, fail in
113-
self.task.send(message) { error in
114-
if let error = error {
115-
fail(error)
116-
return
117-
}
118-
resolve(())
119-
}
120-
}
70+
func reset() {
71+
task.cancel(with: .goingAway, reason: nil)
72+
connectContinuation?.resume(throwing: SignalClientError.socketError(rawError: nil))
73+
connectContinuation = nil
74+
streamContinuation?.finish()
75+
streamContinuation = nil
12176
}
12277

123-
private func receive(task: URLSessionWebSocketTask,
124-
result: Result<URLSessionWebSocketTask.Message, Error>) {
125-
switch result {
126-
case .failure(let error):
127-
log("Failed to receive \(error)", .error)
78+
// MARK: - AsyncSequence
12879

129-
case .success(let message):
130-
onMessage?(message)
131-
queue.async { task.receive { self.receive(task: task, result: $0) } }
132-
}
80+
func makeAsyncIterator() -> AsyncIterator {
81+
return stream.makeAsyncIterator()
13382
}
13483

135-
// MARK: - URLSessionWebSocketDelegate
136-
137-
internal func urlSession(_ session: URLSession,
138-
webSocketTask: URLSessionWebSocketTask,
139-
didOpenWithProtocol protocol: String?) {
140-
141-
guard !disconnected else {
84+
private func waitForNextValue() {
85+
guard task.closeCode == .invalid else {
86+
streamContinuation?.finish()
87+
streamContinuation = nil
14288
return
14389
}
14490

145-
if let promise = connectPromise {
146-
promise.fulfill(self)
147-
connectPromise = nil
148-
}
91+
task.receive(completionHandler: { [weak self] result in
92+
guard let continuation = self?.streamContinuation else {
93+
return
94+
}
14995

150-
queue.async { webSocketTask.receive { self.receive(task: webSocketTask, result: $0) } }
96+
do {
97+
let message = try result.get()
98+
continuation.yield(message)
99+
self?.waitForNextValue()
100+
} catch {
101+
continuation.finish(throwing: error)
102+
self?.streamContinuation = nil
103+
}
104+
})
151105
}
152106

153-
internal func urlSession(_ session: URLSession,
154-
webSocketTask: URLSessionWebSocketTask,
155-
didCloseWith closeCode: URLSessionWebSocketTask.CloseCode,
156-
reason: Data?) {
107+
// MARK: - Send
157108

158-
guard !disconnected else {
159-
return
160-
}
109+
public func send(data: Data) async throws {
110+
let message = URLSessionWebSocketTask.Message.data(data)
111+
try await task.send(message)
112+
}
161113

162-
let sdkError = NetworkError.disconnected(message: "WebSocket did close with code: \(closeCode) reason: \(String(describing: reason))")
114+
// MARK: - URLSessionWebSocketDelegate
163115

164-
cleanUp(reason: .networkError(sdkError))
116+
func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) {
117+
connectContinuation?.resume()
118+
connectContinuation = nil
165119
}
166120

167-
internal func urlSession(_ session: URLSession,
168-
task: URLSessionTask,
169-
didCompleteWithError error: Error?) {
170-
171-
guard !disconnected else {
172-
return
173-
}
121+
func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
122+
log("didCompleteWithError: \(String(describing: error))", .error)
123+
let error = error ?? NetworkError.disconnected(message: "WebSocket didCompleteWithError")
124+
connectContinuation?.resume(throwing: error)
125+
connectContinuation = nil
126+
streamContinuation?.finish()
127+
streamContinuation = nil
128+
}
129+
}
174130

175-
let sdkError = NetworkError.disconnected(message: "WebSocket disconnected", rawError: error)
131+
internal extension WebSocket {
176132

177-
cleanUp(reason: .networkError(sdkError))
133+
// Deprecate
134+
func send(data: Data) -> Promise<Void> {
135+
Promise { [self] resolve, fail in
136+
Task {
137+
do {
138+
try await self.send(data: data)
139+
resolve(())
140+
} catch {
141+
fail(error)
142+
}
143+
}
144+
}
178145
}
179146
}

0 commit comments

Comments
 (0)