Skip to content

feat: datachannels #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions examples/basic_room/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@ async def main():
audio_stream = None
video_stream = None

logging.info("connecting to %s", URL)
try:
await room.connect(URL, TOKEN)
logging.info("connected to room %s", room.name)
except livekit.ConnectError as e:
logging.error("failed to connect to the room: %s", e)
return False

@room.on("participant_connected")
def on_participant_connected(participant: livekit.RemoteParticipant):
logging.info(
Expand Down Expand Up @@ -65,8 +57,20 @@ def on_audio_frame(frame: livekit.AudioFrame):
def on_track_unsubscribed(track: livekit.Track, publication: livekit.RemoteTrackPublication, participant: livekit.RemoteParticipant):
logging.info("track unsubscribed: %s", publication.sid)

@room.on("data_received")
def on_data_received(data: bytes, kind: livekit.DataPacketKind, participant: livekit.Participant):
logging.info("received data from %s: %s", participant.identity, data)

try:
logging.info("connecting to %s", URL)
await room.connect(URL, TOKEN)
logging.info("connected to room %s", room.name)

await room.local_participant.publish_data("hello world")

await room.run()
except livekit.ConnectError as e:
logging.error("failed to connect to the room: %s", e)
except asyncio.CancelledError:
logging.info("closing the room")
await room.close()
Expand Down
2 changes: 1 addition & 1 deletion livekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ._proto.video_frame_pb2 import (
VideoRotation, VideoFormatType, VideoFrameBufferType)
from ._proto.track_pb2 import (TrackKind, TrackSource, StreamState)
from ._proto.room_pb2 import (TrackPublishOptions)
from ._proto.room_pb2 import (TrackPublishOptions, DataPacketKind)

from .room import (Room, ConnectError)
from .participant import (Participant, LocalParticipant, RemoteParticipant)
Expand Down
4 changes: 2 additions & 2 deletions livekit/lib/darwin/arm64/liblivekit_ffi.dylib
Git LFS file not shown
4 changes: 2 additions & 2 deletions livekit/lib/darwin/x86_64/liblivekit_ffi.dylib
Git LFS file not shown
4 changes: 2 additions & 2 deletions livekit/lib/linux/x86_64/liblivekit_ffi.so
Git LFS file not shown
3 changes: 3 additions & 0 deletions livekit/lib/windows/arm64/livekit_ffi.dll
Git LFS file not shown
4 changes: 2 additions & 2 deletions livekit/lib/windows/x86_64/livekit_ffi.dll
Git LFS file not shown
54 changes: 53 additions & 1 deletion livekit/participant.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from ._ffi_client import (FfiClient, FfiHandle)
from typing import TYPE_CHECKING
import weakref
import ctypes
import asyncio
from livekit import TrackPublishOptions
from livekit import (TrackPublishOptions, DataPacketKind)

if TYPE_CHECKING:
from livekit import (Room, Participant)
Expand All @@ -18,6 +19,11 @@ def __init__(self, message: str):
self.message = message


class PublishDataError(Exception):
def __init__(self, message: str):
self.message = message


class Participant():
def __init__(self, info: proto_participant.ParticipantInfo):
self._info = info
Expand Down Expand Up @@ -45,6 +51,52 @@ def __init__(self, info: proto_participant.ParticipantInfo, room: weakref.ref['R
super().__init__(info)
self._room = room

async def publish_data(self,
# TODO(theomonnom): Allow ctypes.Array as payload?
payload: bytes or str,
kind: DataPacketKind = DataPacketKind.KIND_RELIABLE,
destination_sids: list[str] or list['RemoteParticipant'] = []):

room = self._room()
if room is None:
raise Exception('room is closed')

if isinstance(payload, str):
payload = payload.encode('utf-8')

data_len = len(payload)

cdata = (ctypes.c_byte * data_len)(*payload)

sids = []
for p in destination_sids:
if isinstance(p, RemoteParticipant):
sids.append(p.sid)
else:
sids.append(p)

req = proto_ffi.FfiRequest()
req.publish_data.room_handle.id = room._ffi_handle.handle
req.publish_data.data_ptr = ctypes.addressof(cdata)
req.publish_data.data_size = data_len
req.publish_data.kind = kind
req.publish_data.destination_sids.extend(sids)

ffi_client = FfiClient()
resp = ffi_client.request(req)
future = asyncio.Future()

@ffi_client.on('publish_data')
def on_publish_callback(cb: proto_room.PublishDataCallback):
if cb.async_id == resp.publish_data.async_id:
future.set_result(cb)
ffi_client.remove_listener(
'publish_data', on_publish_callback)

resp: proto_room.PublishDataCallback = await future
if resp.error:
raise PublishDataError(resp.error)

async def publish_track(self, track: Track, options: TrackPublishOptions) -> TrackPublication:
if not isinstance(track, LocalAudioTrack) and not isinstance(track, LocalVideoTrack):
raise Exception('cannot publish a remote track')
Expand Down
39 changes: 27 additions & 12 deletions livekit/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .track import (RemoteAudioTrack, RemoteVideoTrack)
from livekit import TrackKind
import weakref
import ctypes


class ConnectError(Exception):
Expand All @@ -31,6 +32,21 @@ def __del__(self):
ffi_client = FfiClient()
ffi_client.remove_listener('room_event', self._on_room_event)

@property
def sid(self) -> str:
return self._room_info.sid

@property
def name(self) -> str:
return self._room_info.name

@property
def metadata(self) -> str:
return self._room_info.metadata

def isconnected(self) -> bool:
return self._ffi_handle is not None

async def connect(self, url: str, token: str):
# TODO(theomonnom): We should be more flexible about the event loop
ffi_client = FfiClient()
Expand Down Expand Up @@ -65,6 +81,9 @@ def on_connect_callback(cb: proto_room.ConnectCallback):
self._create_remote_participant(participant_info)

async def close(self):
if not self.isconnected():
return

ffi_client = FfiClient()

req = proto_ffi.FfiRequest()
Expand Down Expand Up @@ -134,6 +153,14 @@ def _on_room_event(self, event: proto_room.RoomEvent):
track = publication._track
publication._track = None
self.emit('track_unsubscribed', track, publication, participant)
elif which == 'data_received':
participant = self.participants[event.data_received.participant_sid]
data = ctypes.cast(event.data_received.data_ptr,
ctypes.POINTER(ctypes.c_byte * event.data_received.data_size)).contents
data = bytes(data)
FfiHandle(event.data_received.handle.id)
self.emit('data_received', data,
event.data_received.kind, participant)

def _create_remote_participant(self, info: proto_participant.ParticipantInfo) -> RemoteParticipant:
if info.sid in self.participants:
Expand All @@ -147,15 +174,3 @@ def _create_remote_participant(self, info: proto_participant.ParticipantInfo) ->
participant.tracks[publication.sid] = publication

return participant

@property
def sid(self) -> str:
return self._room_info.sid

@property
def name(self) -> str:
return self._room_info.name

@property
def metadata(self) -> str:
return self._room_info.metadata