Skip to content

Commit 2d44fcd

Browse files
committed
cancellable publish
1 parent cc6e480 commit 2d44fcd

File tree

4 files changed

+110
-99
lines changed

4 files changed

+110
-99
lines changed

Sources/LiveKit/Participant/LocalParticipant.swift

Lines changed: 105 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -58,129 +58,140 @@ public class LocalParticipant: Participant {
5858
throw TrackError.publish(message: "Unknown LocalTrack type")
5959
}
6060

61-
// Start the track
61+
// Try to start the Track
6262
try await track.start()
63+
// Starting the Track could be time consuming especially for camera etc.
64+
// Check cancellation after track starts.
65+
try Task.checkCancellation()
6366

64-
var dimensions: Dimensions? // Only for Video
67+
do {
68+
var dimensions: Dimensions? // Only for Video
6569

66-
if let track = track as? LocalVideoTrack {
67-
// Wait for Dimensions...
68-
log("[Publish] Waiting for dimensions to resolve...")
69-
dimensions = try await track.capturer.dimensionsCompleter.wait()
70-
}
70+
if let track = track as? LocalVideoTrack {
71+
// Wait for Dimensions...
72+
log("[Publish] Waiting for dimensions to resolve...")
73+
dimensions = try await track.capturer.dimensionsCompleter.wait()
74+
}
7175

72-
let populatorFunc: SignalClient.AddTrackRequestPopulator<LKRTCRtpTransceiverInit> = { populator in
76+
let populatorFunc: SignalClient.AddTrackRequestPopulator<LKRTCRtpTransceiverInit> = { populator in
7377

74-
let transInit = DispatchQueue.liveKitWebRTC.sync { LKRTCRtpTransceiverInit() }
75-
transInit.direction = .sendOnly
78+
let transInit = DispatchQueue.liveKitWebRTC.sync { LKRTCRtpTransceiverInit() }
79+
transInit.direction = .sendOnly
7680

77-
if let track = track as? LocalVideoTrack {
78-
guard let dimensions else {
79-
throw TrackError.publish(message: "VideoCapturer dimensions are unknown")
80-
}
81+
if let track = track as? LocalVideoTrack {
82+
guard let dimensions else {
83+
throw TrackError.publish(message: "VideoCapturer dimensions are unknown")
84+
}
8185

82-
self.log("[publish] computing encode settings with dimensions: \(dimensions)...")
86+
self.log("[publish] computing encode settings with dimensions: \(dimensions)...")
8387

84-
let publishOptions = (publishOptions as? VideoPublishOptions) ?? self.room._state.options.defaultVideoPublishOptions
88+
let publishOptions = (publishOptions as? VideoPublishOptions) ?? self.room._state.options.defaultVideoPublishOptions
8589

86-
let encodings = Utils.computeEncodings(dimensions: dimensions,
87-
publishOptions: publishOptions,
88-
isScreenShare: track.source == .screenShareVideo)
90+
let encodings = Utils.computeEncodings(dimensions: dimensions,
91+
publishOptions: publishOptions,
92+
isScreenShare: track.source == .screenShareVideo)
8993

90-
self.log("[publish] using encodings: \(encodings)")
91-
transInit.sendEncodings = encodings
94+
self.log("[publish] using encodings: \(encodings)")
95+
transInit.sendEncodings = encodings
9296

93-
let videoLayers = dimensions.videoLayers(for: encodings)
97+
let videoLayers = dimensions.videoLayers(for: encodings)
9498

95-
self.log("[publish] using layers: \(videoLayers.map { String(describing: $0) }.joined(separator: ", "))")
99+
self.log("[publish] using layers: \(videoLayers.map { String(describing: $0) }.joined(separator: ", "))")
96100

97-
populator.width = UInt32(dimensions.width)
98-
populator.height = UInt32(dimensions.height)
99-
populator.layers = videoLayers
101+
populator.width = UInt32(dimensions.width)
102+
populator.height = UInt32(dimensions.height)
103+
populator.layers = videoLayers
100104

101-
self.log("[publish] requesting add track to server with \(populator)...")
105+
self.log("[publish] requesting add track to server with \(populator)...")
102106

103-
} else if track is LocalAudioTrack {
104-
// additional params for Audio
105-
let publishOptions = (publishOptions as? AudioPublishOptions) ?? self.room._state.options.defaultAudioPublishOptions
107+
} else if track is LocalAudioTrack {
108+
// additional params for Audio
109+
let publishOptions = (publishOptions as? AudioPublishOptions) ?? self.room._state.options.defaultAudioPublishOptions
106110

107-
populator.disableDtx = !publishOptions.dtx
111+
populator.disableDtx = !publishOptions.dtx
108112

109-
let encoding = publishOptions.encoding ?? AudioEncoding.presetSpeech
113+
let encoding = publishOptions.encoding ?? AudioEncoding.presetSpeech
110114

111-
self.log("[publish] maxBitrate: \(encoding.maxBitrate)")
115+
self.log("[publish] maxBitrate: \(encoding.maxBitrate)")
112116

113-
transInit.sendEncodings = [
114-
Engine.createRtpEncodingParameters(encoding: encoding),
115-
]
117+
transInit.sendEncodings = [
118+
Engine.createRtpEncodingParameters(encoding: encoding),
119+
]
120+
}
121+
122+
return transInit
116123
}
117124

118-
return transInit
119-
}
125+
// Request a new track to the server
126+
let addTrackResult = try await room.engine.signalClient.sendAddTrack(cid: track.mediaTrack.trackId,
127+
name: track.name,
128+
type: track.kind.toPBType(),
129+
source: track.source.toPBType(),
130+
encryption: room.e2eeManager?.e2eeOptions.encryptionType.toPBType() ?? .none,
131+
populatorFunc)
120132

121-
// Request a new track to the server
122-
let addTrackResult = try await room.engine.signalClient.sendAddTrack(cid: track.mediaTrack.trackId,
123-
name: track.name,
124-
type: track.kind.toPBType(),
125-
source: track.source.toPBType(),
126-
encryption: room.e2eeManager?.e2eeOptions.encryptionType.toPBType() ?? .none,
127-
populatorFunc)
128-
129-
log("[Publish] server responded trackInfo: \(addTrackResult.trackInfo)")
130-
131-
// Add transceiver to pc
132-
let transceiver = try publisher.addTransceiver(with: track.mediaTrack, transceiverInit: addTrackResult.result)
133-
log("[Publish] Added transceiver: \(addTrackResult.trackInfo)...")
134-
135-
try await track.onPublish()
136-
137-
// Store publishOptions used for this track...
138-
track._publishOptions = publishOptions
139-
140-
// Attach sender to track...
141-
track.set(transport: publisher, rtpSender: transceiver.sender)
142-
143-
if track is LocalVideoTrack {
144-
let publishOptions = (publishOptions as? VideoPublishOptions) ?? room._state.options.defaultVideoPublishOptions
145-
// if screen share or simulcast is enabled,
146-
// degrade resolution by using server's layer switching logic instead of WebRTC's logic
147-
if track.source == .screenShareVideo || publishOptions.simulcast {
148-
log("[publish] set degradationPreference to .maintainResolution")
149-
let params = transceiver.sender.parameters
150-
params.degradationPreference = NSNumber(value: RTCDegradationPreference.maintainResolution.rawValue)
151-
// changing params directly doesn't work so we need to update params
152-
// and set it back to sender.parameters
153-
transceiver.sender.parameters = params
154-
}
155-
}
133+
log("[Publish] server responded trackInfo: \(addTrackResult.trackInfo)")
134+
135+
// Add transceiver to pc
136+
let transceiver = try publisher.addTransceiver(with: track.mediaTrack, transceiverInit: addTrackResult.result)
137+
log("[Publish] Added transceiver: \(addTrackResult.trackInfo)...")
156138

157-
try await room.engine.publisherShouldNegotiate()
139+
do {
140+
try await track.onPublish()
141+
142+
// Store publishOptions used for this track...
143+
track._publishOptions = publishOptions
144+
145+
// Attach sender to track...
146+
track.set(transport: publisher, rtpSender: transceiver.sender)
147+
148+
if track is LocalVideoTrack {
149+
let publishOptions = (publishOptions as? VideoPublishOptions) ?? room._state.options.defaultVideoPublishOptions
150+
// if screen share or simulcast is enabled,
151+
// degrade resolution by using server's layer switching logic instead of WebRTC's logic
152+
if track.source == .screenShareVideo || publishOptions.simulcast {
153+
log("[publish] set degradationPreference to .maintainResolution")
154+
let params = transceiver.sender.parameters
155+
params.degradationPreference = NSNumber(value: RTCDegradationPreference.maintainResolution.rawValue)
156+
// changing params directly doesn't work so we need to update params
157+
// and set it back to sender.parameters
158+
transceiver.sender.parameters = params
159+
}
160+
}
158161

159-
let publication = LocalTrackPublication(info: addTrackResult.trackInfo, track: track, participant: self)
162+
try await room.engine.publisherShouldNegotiate()
163+
try Task.checkCancellation()
160164

161-
addTrack(publication: publication)
165+
} catch {
166+
// Rollback
167+
track.set(transport: nil, rtpSender: nil)
168+
try publisher.remove(track: transceiver.sender)
169+
// Rethrow
170+
throw error
171+
}
162172

163-
// Notify didPublish
164-
delegates.notify(label: { "localParticipant.didPublish \(publication)" }) {
165-
$0.localParticipant?(self, didPublish: publication)
166-
}
167-
room.delegates.notify(label: { "localParticipant.didPublish \(publication)" }) {
168-
$0.room?(self.room, localParticipant: self, didPublish: publication)
169-
}
173+
let publication = LocalTrackPublication(info: addTrackResult.trackInfo, track: track, participant: self)
174+
175+
add(publication: publication)
170176

171-
log("[publish] success \(publication)", .info)
177+
// Notify didPublish
178+
delegates.notify(label: { "localParticipant.didPublish \(publication)" }) {
179+
$0.localParticipant?(self, didPublish: publication)
180+
}
181+
room.delegates.notify(label: { "localParticipant.didPublish \(publication)" }) {
182+
$0.room?(self.room, localParticipant: self, didPublish: publication)
183+
}
172184

173-
return publication
185+
log("[publish] success \(publication)", .info)
174186

175-
// }.catch(on: queue) { error in
176-
//
177-
// self.log("[publish] failed \(track), error: \(error)", .error)
178-
//
179-
// // stop the track
180-
// track.stop().catch(on: self.queue) { error in
181-
// self.log("[publish] failed to stop track, error: \(error)", .error)
182-
// }
183-
// }
187+
return publication
188+
} catch {
189+
log("[publish] failed \(track), error: \(error)", .error)
190+
// Stop track when publish fails
191+
try await track.stop()
192+
// Rethrow
193+
throw error
194+
}
184195
}
185196

186197
/// publish a new audio track to the Room

Sources/LiveKit/Participant/Participant.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public class Participant: NSObject, ObservableObject, Loggable {
162162
fatalError("Unimplemented")
163163
}
164164

165-
func addTrack(publication: TrackPublication) {
165+
func add(publication: TrackPublication) {
166166
_state.mutate { $0.tracks[publication.sid] = publication }
167167
publication.track?._state.mutate { $0.sid = publication.sid }
168168
}

Sources/LiveKit/Participant/RemoteParticipant.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class RemoteParticipant: Participant {
4343
if publication == nil {
4444
publication = RemoteTrackPublication(info: trackInfo, participant: self)
4545
newTrackPublications[trackInfo.sid] = publication
46-
addTrack(publication: publication!)
46+
add(publication: publication!)
4747
} else {
4848
publication!.updateFromInfo(info: trackInfo)
4949
}
@@ -117,7 +117,7 @@ public class RemoteParticipant: Participant {
117117
track.set(transport: transport, rtpReceiver: rtpReceiver)
118118
}
119119

120-
addTrack(publication: publication)
120+
add(publication: publication)
121121

122122
try await track.start()
123123

Sources/LiveKit/Track/Track.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,13 @@ public class Track: NSObject, Loggable {
182182
log("sid: \(String(describing: sid))")
183183
}
184184

185-
func set(transport: Transport, rtpSender: LKRTCRtpSender) {
185+
func set(transport: Transport?, rtpSender: LKRTCRtpSender?) {
186186
self.transport = transport
187187
self.rtpSender = rtpSender
188188
resumeOrSuspendStatisticsTimer()
189189
}
190190

191-
func set(transport: Transport, rtpReceiver: LKRTCRtpReceiver) {
191+
func set(transport: Transport?, rtpReceiver: LKRTCRtpReceiver?) {
192192
self.transport = transport
193193
self.rtpReceiver = rtpReceiver
194194
resumeOrSuspendStatisticsTimer()

0 commit comments

Comments
 (0)