Data tracks support #586

Open
ladvoc wants to merge 32 commits into main from jacobgelman/bot-242-python-client-implementation

Conversation

@ladvoc
Contributor

@ladvoc ladvoc commented Mar 5, 2026

No description provided.

@pblazej

pblazej commented Mar 16, 2026

Some things from talking to Claude (maybe you'll find them useful):

  • SubscribeDataTrackError / PublishDataTrackError are not exported
  • existing streams use aclose() (async), while DataTrackSubscription uses close() (sync); all of them could probably leverage the async context manager protocol (__aenter__/__aexit__): https://www.geeksforgeeks.org/python/aenter-in-python/
  • there is no remote_data_track_unpublished event (just mirroring the Rust comment here)
  • try_push should probably be push in Python
  • remote_data_track_published should be data_track_published, matching the other tracks
  • DataTrackFrame(payload=...) could validate the input type (what happens if you pass a string, etc.?)
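The aclose()/close() inconsistency above could be resolved by implementing the async context manager protocol. A minimal sketch, assuming a hypothetical DataTrackSubscription with an async aclose() (names are illustrative, not the actual SDK API):

```python
import asyncio


class DataTrackSubscription:
    # Hypothetical sketch: the real SDK class differs; this only shows the
    # async context manager protocol delegating cleanup to aclose().
    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        # Async close, matching the convention of the existing streams.
        self.closed = True

    async def __aenter__(self) -> "DataTrackSubscription":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.aclose()


async def demo() -> "DataTrackSubscription":
    async with DataTrackSubscription() as sub:
        assert not sub.closed  # still open inside the block
    return sub


sub = asyncio.run(demo())
print(sub.closed)  # True: __aexit__ awaited aclose()
```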

Comment on lines +270 to +274
async def __anext__(self) -> DataTrackFrame:
    if self._closed:
        raise StopAsyncIteration

    event: proto_ffi.FfiEvent = await self._queue.get()
Contributor

question: Should the Python implementation of data tracks be updated to use the synchronous track.subscribe() behavior implemented in JavaScript, with any subscription errors cascading down into the first iterator __anext__ call?

Contributor

Discussed on Slack with @ladvoc; we decided this behavior makes sense to do here as well!
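A minimal sketch of the agreed behavior, with all names hypothetical: subscribe() returns immediately, and any setup failure is re-raised on the first __anext__ call:

```python
import asyncio


class SubscribeDataTrackError(Exception):
    pass


class Subscription:
    # Hypothetical sketch: subscribe() is synchronous, and the pending setup
    # task is awaited lazily, so failures surface on the first __anext__.
    def __init__(self, setup: "asyncio.Task[None]") -> None:
        self._setup = setup
        self._frames = iter([b"frame-0", b"frame-1"])

    def __aiter__(self) -> "Subscription":
        return self

    async def __anext__(self) -> bytes:
        await self._setup  # re-raises SubscribeDataTrackError on failure
        try:
            return next(self._frames)
        except StopIteration:
            raise StopAsyncIteration from None


def subscribe(fail: bool) -> Subscription:
    async def _setup() -> None:
        if fail:
            raise SubscribeDataTrackError("server rejected subscription")

    return Subscription(asyncio.ensure_future(_setup()))


async def demo() -> str:
    try:
        async for _ in subscribe(fail=True):  # note: no await on subscribe
            pass
    except SubscribeDataTrackError as e:
        return f"caught: {e}"
    return "no error"


result = asyncio.run(demo())
print(result)  # caught: server rejected subscription
```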

"""Subscribes to the data track to receive frames.

Args:
buffer_size: Maximum number of received frames to buffer internally.
Contributor

thought: I realize this could get out of date quickly if the default changes often, but it might be nice to actually include the default value here (I think it is 16?).

Or, to sidestep that duplication concern, another alternative could be to link to the future docs page (probably https://docs.livekit.io/transport/data/data-tracks/#buffer-size) where this is mentioned.

Contributor Author

Yeah, I left that out here to avoid stale documentation in the future. I'll make sure that's clear on the docs page.

Contributor

Is there a significant downside to including the docs page in the docstring?

Contributor Author

You mean a link to the docs? I'm on board with that once it's published! Ideally, links would be included for other APIs as well.

    track.info.name,
    track.publisher_identity,
)
subscription = await track.subscribe()
Member

Should we have the same API as AudioTrack/VideoTrack for subscription?

data_track_subscribed event and set_subscribed

Contributor Author

@ladvoc ladvoc Mar 27, 2026

We opted to diverge from media tracks in a few places to make the API more convenient for its target use cases. A few reasons in particular:

  • Auto-subscribe is not supported for data tracks.
  • Unlike media tracks, there are options when subscribing. For now the only option is buffer size, but v2 will introduce options for which there aren't necessarily reasonable defaults (e.g., requested FPS).
  • Data tracks support fan-out: a user can call track.subscribe() more than once on the same track to handle frames in multiple places in their application (e.g., displaying in a UI and writing to an MCAP file).
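The fan-out point can be illustrated with a toy model (not the SDK implementation; all names are illustrative): each subscribe() call gets its own queue, so every subscription independently receives every frame:

```python
import asyncio


class DataTrack:
    # Toy fan-out model: each subscribe() returns an independent queue,
    # and push() delivers the frame to all of them.
    def __init__(self) -> None:
        self._subs: list[asyncio.Queue] = []

    async def subscribe(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue()
        self._subs.append(q)
        return q

    def push(self, frame: bytes) -> None:
        for q in self._subs:
            q.put_nowait(frame)


async def demo() -> tuple[bytes, bytes]:
    track = DataTrack()
    ui = await track.subscribe()        # e.g. display in a UI
    recorder = await track.subscribe()  # e.g. write to an MCAP file
    track.push(b"frame-0")
    return await ui.get(), await recorder.get()


received = asyncio.run(demo())
print(received)  # (b'frame-0', b'frame-0')
```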



@dataclass
class DataTrackOptions:
Member

Let's use TypedDict

Contributor Author

Addressed in c1e9bfb.



@dataclass
class DataTrackFrame:
Member

For consistency, should we use dataclass on the other frames as well?

Contributor Author

I'm not too familiar with the tradeoffs, but semantically DataTrackFrame seems more like a simple value type, whereas VideoFrame has an initializer and methods. Happy to make the change here or in a follow-up PR if you think dataclass is a better fit, though.

Comment on lines +274 to +297
async def __anext__(self) -> DataTrackFrame:
    if self._closed:
        raise StopAsyncIteration

    self._send_read_request()
    event: proto_ffi.FfiEvent = await self._queue.get()
    sub_event = event.data_track_subscription_event
    detail = sub_event.WhichOneof("detail")

    if detail == "frame_received":
        proto_frame = sub_event.frame_received.frame
        user_ts: Optional[int] = None
        if proto_frame.HasField("user_timestamp"):
            user_ts = proto_frame.user_timestamp
        return DataTrackFrame(
            payload=proto_frame.payload,
            user_timestamp=user_ts,
        )
    elif detail == "eos":
        self._close()
        raise StopAsyncIteration
    else:
        self._close()
        raise StopAsyncIteration
Member

I don't think the user should be responsible for pulling here, can we have a main loop like VideoStream/AudioStream?

The risk is that it becomes easier for users to OOM their program

Contributor Author

@ladvoc ladvoc Mar 27, 2026

On the Rust side, each subscription uses a fixed-size receive buffer (configurable at subscribe time), and frames are dropped if the consumer can't keep up, so there's no risk of unbounded memory growth. The pull model here is intentional: the read request (L278) applies backpressure on the underlying Rust buffer, so frame events are only delivered as fast as they're processed in Python. This is different from VideoStream's approach, but the bounded buffer on the native side serves the same purpose as VideoStream's RingQueue. Please let me know if this addresses your concern.
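A toy model of the pull design described above, with the FFI plumbing replaced by a bounded queue and all names illustrative: each __anext__ first issues a read request, so the producer only delivers one frame per request:

```python
import asyncio


class PullSubscription:
    # Toy sketch of the pull model: _send_read_request() stands in for the
    # FFI read request that applies backpressure on the native-side buffer.
    def __init__(self, producer_frames: list[bytes]) -> None:
        self._pending = producer_frames
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=1)

    def _send_read_request(self) -> None:
        # Producer delivers exactly one frame per read request.
        if self._pending:
            self._queue.put_nowait(self._pending.pop(0))

    def __aiter__(self) -> "PullSubscription":
        return self

    async def __anext__(self) -> bytes:
        if not self._pending and self._queue.empty():
            raise StopAsyncIteration
        self._send_read_request()
        return await self._queue.get()


async def demo() -> list[bytes]:
    sub = PullSubscription([b"a", b"b", b"c"])
    return [frame async for frame in sub]


frames = asyncio.run(demo())
print(frames)  # [b'a', b'b', b'c']
```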


async def publish_data_track(
    self,
    options: Union[str, DataTrackOptions],
Member

Is the Union unnecessary complexity? Would it be OK if DataTrackOptions were a TypedDict?

Contributor Author

@ladvoc ladvoc Mar 27, 2026

Addressed in cad3c25. At the call site, this now looks like:

track = await room.local_participant.publish_data_track({"name": "my_sensor_data"})

Do you think keyword args would be cleaner here?
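For reference, a self-contained sketch of what a TypedDict-based options type might look like; the field names and the default of 16 are assumptions from this thread, not the actual SDK definition:

```python
from typing import TypedDict


class DataTrackOptions(TypedDict, total=False):
    # total=False makes every key optional; field names are illustrative
    # (only "name" appears in this discussion, and 16 is the guessed default).
    name: str
    buffer_size: int


def describe(options: DataTrackOptions) -> str:
    # A plain dict literal satisfies the TypedDict at the call site.
    return f"{options.get('name', 'unnamed')} (buffer={options.get('buffer_size', 16)})"


result = describe({"name": "my_sensor_data"})
print(result)  # my_sensor_data (buffer=16)
```

A keyword-argument variant would trade the dict literal for publish_data_track(name="my_sensor_data"), which type checkers also handle well; both avoid the Union.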

    self._close()
    self._ffi_handle.dispose()

def __del__(self) -> None:
Member

No need for a destructor if we follow the main task pattern:

async def _run(self) -> None:
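A minimal sketch of that main-task pattern, with illustrative names: a background task owns event handling, and an async close cancels it, removing the need for __del__:

```python
import asyncio


class Stream:
    # Toy sketch of the main-task pattern: __init__ spawns _run() as a
    # background task; aclose() cancels it and does cleanup in one place.
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task = asyncio.create_task(self._run())
        self.closed = False

    async def _run(self) -> None:
        while True:
            await self._queue.get()  # dispatch events here

    async def aclose(self) -> None:
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        self.closed = True  # unsubscription etc. would also happen here


async def demo() -> Stream:
    s = Stream()
    await s.aclose()
    return s


stream = asyncio.run(demo())
print(stream.closed)  # True
```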

    self._closed = True
    FfiClient.instance.queue.unsubscribe(self._queue)

def close(self) -> None:
Member

close becomes async

@ladvoc ladvoc marked this pull request as ready for review March 26, 2026 18:42
devin-ai-integration[bot]

This comment was marked as resolved.
