Skip to content

Commit dfba35d

Browse files
authored
Send pre-connect audio (#622)
- Adds infrastructure and public API for the agent pre-connect feature based on data channels (livekit-examples/multimodal-agent-python#18).

### Building blocks

- `LocalAudioTrackRecorder` - a simple stream-based wrapper for `startLocalRecording`; can be used in isolation
- `PreConnectAudioBuffer` - handles the `Room` state automatically (if a `Room?` is passed) by setting participant attributes and flushing the recording into the data stream 🚰; can be used manually via the `start/stop` methods
- `Room.startCapturingBeforeConnecting` - a convenience one-liner to get the default behavior: start automatically, stop when the agent joins

### Usage

```swift
.onAppear {
    try? AudioManager.shared.setRecordingAlwaysPreparedMode(true)
}

+ private func connect() {
    Task(priority: .userInitiated) {
        isConnecting = true
        if enablePreConnect {
            try? await room.startCapturingBeforeConnecting()
            isConnecting = false
        }
```
1 parent e220c30 commit dfba35d

13 files changed

+822
-8
lines changed

.nanpa/pre-connect-audio.kdl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
minor type="added" "Added LocalAudioTrackRecorder"
2+
minor type="added" "Added the possibility to capture pre-connect audio and send it to agents via data streams"
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright 2025 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import AVFAudio
18+
import Foundation
19+
20+
/// A buffer that captures audio before connecting to the server,
/// and sends it on certain ``RoomDelegate`` events.
///
/// When a `Room` is supplied, the buffer registers itself as a delegate of that
/// room while recording; without a room it can be driven manually through
/// ``startRecording()`` and ``stopRecording(flush:)``.
@objc
public final class PreConnectAudioBuffer: NSObject, Loggable {
    /// The default participant attribute key used to indicate that the audio buffer is active.
    @objc
    public static let attributeKey = "lk.agent.pre-connect-audio"

    /// The default data topic used to send the audio buffer.
    @objc
    public static let dataTopic = "lk.agent.pre-connect-audio-buffer"

    /// The room instance to listen for events.
    @objc
    public let room: Room?

    /// The audio recorder instance.
    @objc
    public let recorder: LocalAudioTrackRecorder

    /// Synchronized storage for the stream returned by the recorder.
    private let state = StateSync<State>(State())
    private struct State {
        /// The stream of the current capture, `nil` until recording has started.
        var audioStream: LocalAudioTrackRecorder.Stream?
    }

    /// Initialize the audio buffer with a room instance.
    /// - Parameters:
    ///   - room: The room instance to listen for events.
    ///   - recorder: The audio recorder to use for capturing.
    @objc
    public init(room: Room?,
                recorder: LocalAudioTrackRecorder = LocalAudioTrackRecorder(
                    track: LocalAudioTrack.createTrack(),
                    format: .pcmFormatInt16, // supported by agent plugins
                    sampleRate: 24000, // supported by agent plugins
                    maxSize: 10 * 1024 * 1024 // arbitrary max recording size of 10MB
                ))
    {
        self.room = room
        self.recorder = recorder
        super.init()
    }

    deinit {
        stopRecording()
        room?.remove(delegate: self)
    }

    /// Start capturing audio and listening to ``RoomDelegate`` events.
    ///
    /// - Throws: Rethrows any error raised by the recorder when it fails to start.
    ///   In that case the buffer is unregistered from the room again (unless an
    ///   earlier capture is still active), so it does not react to room events
    ///   with nothing recorded.
    @objc
    public func startRecording() async throws {
        // Register first so room events arriving while the recorder starts are observed.
        room?.add(delegate: self)

        do {
            let stream = try await recorder.start()
            log("Started capturing audio", .info)
            state.mutate { state in
                state.audioStream = stream
            }
        } catch {
            // Do not stay registered for a capture that never started; keep the
            // registration if a previously started stream is still held.
            if state.audioStream == nil {
                room?.remove(delegate: self)
            }
            throw error
        }
    }

    /// Stop capturing audio.
    /// - Parameters:
    ///   - flush: If `true`, the audio stream will be flushed immediately without sending.
    @objc
    public func stopRecording(flush: Bool = false) {
        recorder.stop()
        log("Stopped capturing audio", .info)
        if flush, let stream = state.audioStream {
            // Drain the stream in the background, discarding every element.
            Task {
                for await _ in stream {}
            }
        }
    }
}
94+
95+
// MARK: - RoomDelegate
96+
97+
extension PreConnectAudioBuffer: RoomDelegate {
    /// Marks the local participant as having pre-connect audio once the room connects.
    public func roomDidConnect(_ room: Room) {
        Task {
            do {
                try await setParticipantAttribute(room: room)
            } catch {
                // Best effort: report the failure instead of silently dropping it.
                log("Failed to set participant attribute: \(error)", .error)
            }
        }
    }

    /// Stops capturing and sends the recorded audio when a local track publication
    /// is reported as subscribed by the remote side.
    public func room(_ room: Room, participant _: LocalParticipant, remoteDidSubscribeTrack _: LocalTrackPublication) {
        stopRecording()
        Task {
            do {
                try await sendAudioData(to: room)
            } catch {
                // Best effort: report the failure instead of silently dropping it.
                log("Failed to send audio data: \(error)", .error)
            }
        }
    }

    /// Set the participant attribute to indicate that the audio buffer is active.
    ///
    /// The existing local participant attributes are preserved; only the entry
    /// for `key` is added or overwritten with `"true"`.
    /// - Parameters:
    ///   - key: The key to set the attribute.
    ///   - room: The room instance to set the attribute.
    /// - Throws: Rethrows any error raised while updating the attributes.
    @objc
    public func setParticipantAttribute(key: String = attributeKey, room: Room) async throws {
        var attributes = room.localParticipant.attributes
        // Honor the caller-supplied key instead of always using the default one.
        attributes[key] = "true"
        try await room.localParticipant.set(attributes: attributes)
        log("Set participant attribute", .info)
    }

    /// Send the audio data to the room.
    ///
    /// Collects the recorded stream and writes it to a byte stream on `topic`,
    /// tagged with the recorder's `sampleRate` and `channels` as stream attributes.
    /// After a successful send, the buffer removes itself from the room's delegates.
    /// - Parameters:
    ///   - room: The room instance to send the audio data.
    ///   - topic: The topic to send the audio data.
    /// - Throws: `LiveKitError` with `.invalidState` when no audio stream is available
    ///   (recording was never started), or any error raised while opening, writing
    ///   or closing the byte stream.
    @objc
    public func sendAudioData(to room: Room, on topic: String = dataTopic) async throws {
        guard let audioStream = state.audioStream else {
            throw LiveKitError(.invalidState, message: "Audio stream is nil")
        }

        let streamOptions = StreamByteOptions(
            topic: topic,
            attributes: [
                "sampleRate": "\(recorder.sampleRate)",
                "channels": "\(recorder.channels)",
            ]
        )
        let writer = try await room.localParticipant.streamBytes(options: streamOptions)
        try await writer.write(audioStream.collect())
        try await writer.close()
        log("Sent audio data", .info)

        // The buffer has been delivered; stop listening to room events.
        room.remove(delegate: self)
    }
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2025 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Foundation
18+
19+
public extension Room {
    /// Begins recording microphone audio ahead of the server connection,
    /// so that nothing said while connecting is lost.
    ///
    /// The recording is delegated to the room's `preConnectBuffer`, which sends
    /// the captured audio via data stream on `PreConnectAudioBuffer.dataTopic`
    /// once the local track is subscribed.
    /// - Throws: Rethrows any error raised when the recording fails to start.
    /// - See: ``PreConnectAudioBuffer``
    /// - Note: Use ``AudioManager/setRecordingAlwaysPreparedMode(_:)`` to request microphone permissions early.
    func startCapturingBeforeConnecting() async throws {
        let buffer = preConnectBuffer
        try await buffer.startRecording()
    }
}

Sources/LiveKit/Core/Room.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,12 @@ public class Room: NSObject, ObservableObject, Loggable {
116116
try await self?.send(dataPacket: packet)
117117
}
118118

119+
// MARK: - PreConnect
120+
121+
lazy var preConnectBuffer = PreConnectAudioBuffer(room: self)
122+
123+
// MARK: - Queue
124+
119125
var _blockProcessQueue = DispatchQueue(label: "LiveKitSDK.engine.pendingBlocks",
120126
qos: .default)
121127

@@ -395,6 +401,10 @@ extension Room {
395401
e2eeManager.cleanUp()
396402
}
397403

404+
if disconnectError != nil {
405+
preConnectBuffer.stopRecording(flush: true)
406+
}
407+
398408
// Reset state
399409
_state.mutate {
400410
// if isFullReconnect, keep connection related states

Sources/LiveKit/DataStream/Outgoing/OutgoingStreamManager.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ actor OutgoingStreamManager: Loggable {
246246
}
247247

248248
/// Maximum number of bytes to send in a single chunk.
249-
private static let chunkSize = 15000
249+
private static let chunkSize = 15 * 1024
250250

251251
/// Default MIME type to use for text streams.
252252
fileprivate static let textMimeType = "text/plain"

Sources/LiveKit/Extensions/AVAudioPCMBuffer.swift

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,37 @@ public extension AVAudioPCMBuffer {
133133

134134
return outputBuffer
135135
}
136+
137+
/// Converts the buffer's samples into interleaved raw bytes.
///
/// Supports 16-bit integer, 32-bit integer and 32-bit float sample formats.
/// - Returns: The interleaved sample bytes, or `nil` when the format is not one
///   of the supported ones or the matching channel data is unavailable.
func toData() -> Data? {
    let sampleFormat = format.commonFormat

    if sampleFormat == .pcmFormatInt16 {
        guard let samples = int16ChannelData else { return nil }
        return interleave(channelData: samples)
    }
    if sampleFormat == .pcmFormatInt32 {
        guard let samples = int32ChannelData else { return nil }
        return interleave(channelData: samples)
    }
    if sampleFormat == .pcmFormatFloat32 {
        guard let samples = floatChannelData else { return nil }
        return interleave(channelData: samples)
    }

    // Any other sample format is not supported.
    return nil
}
152+
153+
/// Copies per-channel samples into one frame-major (interleaved) byte blob.
///
/// - Parameter channelData: One sample pointer per channel, as returned by
///   `int16ChannelData`, `int32ChannelData` or `floatChannelData`.
/// - Returns: `frameLength * channelCount` samples laid out frame by frame,
///   or empty data when the buffer holds no frames.
private func interleave<T>(channelData: UnsafePointer<UnsafeMutablePointer<T>>) -> Data {
    let channelCount = Int(format.channelCount)
    let frameCount = Int(self.frameLength)

    // Nothing to copy; also avoids reading a sample from an empty buffer below.
    guard channelCount > 0, frameCount > 0 else { return Data() }

    // Distance between consecutive samples of one channel: 1 for deinterleaved
    // buffers, the number of interleaved channels for interleaved ones.
    let sampleStride = self.stride

    // The first sample only serves as a placeholder; every slot is overwritten.
    var interleaved = [T](repeating: channelData[0][0], count: frameCount * channelCount)

    for frame in 0 ..< frameCount {
        let destination = frame * channelCount
        let source = frame * sampleStride
        for channel in 0 ..< channelCount {
            interleaved[destination + channel] = channelData[channel][source]
        }
    }

    return interleaved.withUnsafeBytes { Data($0) }
}
136169
}

Sources/LiveKit/Track/AudioManager.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,9 @@ public class AudioManager: Loggable {
244244
/// This will be persisted across Rooms and connections.
245245
public var isRecordingAlwaysPreparedMode: Bool { RTC.audioDeviceModule.isRecordingAlwaysPreparedMode }
246246

247+
/// Keep recording initialized (mic input) and pre-warm voice processing etc.
248+
/// Mic permission is required and dialog will appear if not already granted.
249+
/// This will be persisted across Rooms and connections.
247250
public func setRecordingAlwaysPreparedMode(_ enabled: Bool) throws {
248251
let result = RTC.audioDeviceModule.setRecordingAlwaysPreparedMode(enabled)
249252
try checkAdmResult(code: result)

0 commit comments

Comments
 (0)