Skip to content

Commit 175bfda

Browse files
authored
feat: datachannels (#3)
1 parent ce18c35 commit 175bfda

File tree

9 files changed

+104
-30
lines changed

9 files changed

+104
-30
lines changed

examples/basic_room/room.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,6 @@ async def main():
1313
audio_stream = None
1414
video_stream = None
1515

16-
logging.info("connecting to %s", URL)
17-
try:
18-
await room.connect(URL, TOKEN)
19-
logging.info("connected to room %s", room.name)
20-
except livekit.ConnectError as e:
21-
logging.error("failed to connect to the room: %s", e)
22-
return False
23-
2416
@room.on("participant_connected")
2517
def on_participant_connected(participant: livekit.RemoteParticipant):
2618
logging.info(
@@ -65,8 +57,20 @@ def on_audio_frame(frame: livekit.AudioFrame):
6557
def on_track_unsubscribed(track: livekit.Track, publication: livekit.RemoteTrackPublication, participant: livekit.RemoteParticipant):
6658
logging.info("track unsubscribed: %s", publication.sid)
6759

60+
@room.on("data_received")
61+
def on_data_received(data: bytes, kind: livekit.DataPacketKind, participant: livekit.Participant):
62+
logging.info("received data from %s: %s", participant.identity, data)
63+
6864
try:
65+
logging.info("connecting to %s", URL)
66+
await room.connect(URL, TOKEN)
67+
logging.info("connected to room %s", room.name)
68+
69+
await room.local_participant.publish_data("hello world")
70+
6971
await room.run()
72+
except livekit.ConnectError as e:
73+
logging.error("failed to connect to the room: %s", e)
7074
except asyncio.CancelledError:
7175
logging.info("closing the room")
7276
await room.close()

livekit/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from ._proto.video_frame_pb2 import (
77
VideoRotation, VideoFormatType, VideoFrameBufferType)
88
from ._proto.track_pb2 import (TrackKind, TrackSource, StreamState)
9-
from ._proto.room_pb2 import (TrackPublishOptions)
9+
from ._proto.room_pb2 import (TrackPublishOptions, DataPacketKind)
1010

1111
from .room import (Room, ConnectError)
1212
from .participant import (Participant, LocalParticipant, RemoteParticipant)
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:3506be9e3e8fc713fa1fc425a48dbed71a86cfddea6beab68a7721d59b4fa792
3-
size 27984241
2+
oid sha256:c4910845710a43ba065bf4e697b4f99c908167b636080f56f1ad2a5337cbc82f
3+
size 27965777
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:698dad5c7e83f01f5421805293c2335365dd40b3d5629d548ae82e4dbee71b8e
3-
size 32685760
2+
oid sha256:ffd4891c955d4743f5689ee5242a4c4b572a9c7a38e277bd035071d1a30463f4
3+
size 32682136
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:aa2fdeeb7fbf1ffd001b6324f94e4e60fe5e2eb708f52445e0bf19c3b89bff4d
3-
size 36215176
2+
oid sha256:d9d16748f346ea8f607e0d6df6f0dece90d9284ee0d50e8eacdd66f5051ddeb0
3+
size 36168880
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
version https://git-lfs.github.com/spec/v1
2+
oid sha256:35446a0fda6ec3b9f3f85200d3ed6ffbc588a531e3bd6ca0da431c5b4bb1c89e
3+
size 16523776
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:942a965d09f7a59e357ef6f72b402e4637a257281125a38c82f373252bc6db9f
3-
size 22181376
2+
oid sha256:3db42643671dee8f37be783e92308eb027c000736cb580fb17155951e24239b9
3+
size 22211584

livekit/participant.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
from ._ffi_client import (FfiClient, FfiHandle)
77
from typing import TYPE_CHECKING
88
import weakref
9+
import ctypes
910
import asyncio
10-
from livekit import TrackPublishOptions
11+
from livekit import (TrackPublishOptions, DataPacketKind)
1112

1213
if TYPE_CHECKING:
1314
from livekit import (Room, Participant)
@@ -18,6 +19,11 @@ def __init__(self, message: str):
1819
self.message = message
1920

2021

22+
class PublishDataError(Exception):
23+
def __init__(self, message: str):
24+
self.message = message
25+
26+
2127
class Participant():
2228
def __init__(self, info: proto_participant.ParticipantInfo):
2329
self._info = info
@@ -45,6 +51,52 @@ def __init__(self, info: proto_participant.ParticipantInfo, room: weakref.ref['R
4551
super().__init__(info)
4652
self._room = room
4753

54+
async def publish_data(self,
55+
# TODO(theomonnom): Allow ctypes.Array as payload?
56+
payload: bytes or str,
57+
kind: DataPacketKind = DataPacketKind.KIND_RELIABLE,
58+
destination_sids: list[str] or list['RemoteParticipant'] = []):
59+
60+
room = self._room()
61+
if room is None:
62+
raise Exception('room is closed')
63+
64+
if isinstance(payload, str):
65+
payload = payload.encode('utf-8')
66+
67+
data_len = len(payload)
68+
69+
cdata = (ctypes.c_byte * data_len)(*payload)
70+
71+
sids = []
72+
for p in destination_sids:
73+
if isinstance(p, RemoteParticipant):
74+
sids.append(p.sid)
75+
else:
76+
sids.append(p)
77+
78+
req = proto_ffi.FfiRequest()
79+
req.publish_data.room_handle.id = room._ffi_handle.handle
80+
req.publish_data.data_ptr = ctypes.addressof(cdata)
81+
req.publish_data.data_size = data_len
82+
req.publish_data.kind = kind
83+
req.publish_data.destination_sids.extend(sids)
84+
85+
ffi_client = FfiClient()
86+
resp = ffi_client.request(req)
87+
future = asyncio.Future()
88+
89+
@ffi_client.on('publish_data')
90+
def on_publish_callback(cb: proto_room.PublishDataCallback):
91+
if cb.async_id == resp.publish_data.async_id:
92+
future.set_result(cb)
93+
ffi_client.remove_listener(
94+
'publish_data', on_publish_callback)
95+
96+
resp: proto_room.PublishDataCallback = await future
97+
if resp.error:
98+
raise PublishDataError(resp.error)
99+
48100
async def publish_track(self, track: Track, options: TrackPublishOptions) -> TrackPublication:
49101
if not isinstance(track, LocalAudioTrack) and not isinstance(track, LocalVideoTrack):
50102
raise Exception('cannot publish a remote track')

livekit/room.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from .track import (RemoteAudioTrack, RemoteVideoTrack)
1010
from livekit import TrackKind
1111
import weakref
12+
import ctypes
1213

1314

1415
class ConnectError(Exception):
@@ -31,6 +32,21 @@ def __del__(self):
3132
ffi_client = FfiClient()
3233
ffi_client.remove_listener('room_event', self._on_room_event)
3334

35+
@property
36+
def sid(self) -> str:
37+
return self._room_info.sid
38+
39+
@property
40+
def name(self) -> str:
41+
return self._room_info.name
42+
43+
@property
44+
def metadata(self) -> str:
45+
return self._room_info.metadata
46+
47+
def isconnected(self) -> bool:
48+
return self._ffi_handle is not None
49+
3450
async def connect(self, url: str, token: str):
3551
# TODO(theomonnom): We should be more flexible about the event loop
3652
ffi_client = FfiClient()
@@ -65,6 +81,9 @@ def on_connect_callback(cb: proto_room.ConnectCallback):
6581
self._create_remote_participant(participant_info)
6682

6783
async def close(self):
84+
if not self.isconnected():
85+
return
86+
6887
ffi_client = FfiClient()
6988

7089
req = proto_ffi.FfiRequest()
@@ -134,6 +153,14 @@ def _on_room_event(self, event: proto_room.RoomEvent):
134153
track = publication._track
135154
publication._track = None
136155
self.emit('track_unsubscribed', track, publication, participant)
156+
elif which == 'data_received':
157+
participant = self.participants[event.data_received.participant_sid]
158+
data = ctypes.cast(event.data_received.data_ptr,
159+
ctypes.POINTER(ctypes.c_byte * event.data_received.data_size)).contents
160+
data = bytes(data)
161+
FfiHandle(event.data_received.handle.id)
162+
self.emit('data_received', data,
163+
event.data_received.kind, participant)
137164

138165
def _create_remote_participant(self, info: proto_participant.ParticipantInfo) -> RemoteParticipant:
139166
if info.sid in self.participants:
@@ -147,15 +174,3 @@ def _create_remote_participant(self, info: proto_participant.ParticipantInfo) ->
147174
participant.tracks[publication.sid] = publication
148175

149176
return participant
150-
151-
@property
152-
def sid(self) -> str:
153-
return self._room_info.sid
154-
155-
@property
156-
def name(self) -> str:
157-
return self._room_info.name
158-
159-
@property
160-
def metadata(self) -> str:
161-
return self._room_info.metadata

0 commit comments

Comments
 (0)