Fishjam Python Server SDK

Fishjam Python Server SDK

Python server SDK for Fishjam.

Read the docs here

Installation

pip install fishjam-server-sdk

Usage

The SDK exports two main classes for interacting with the Fishjam server: FishjamClient and FishjamNotifier.

FishjamClient wraps HTTP REST API calls, while FishjamNotifier is responsible for receiving real-time updates from the server.

FishjamClient

Create a FishjamClient instance, providing your Fishjam ID and management token

from fishjam import FishjamClient, RoomOptions

fishjam_client = FishjamClient(fishjam_id="<fishjam_id>", management_token="<management_token>")

You can use it to interact with Fishjam to manage rooms and peers

# Create a room
options = RoomOptions(video_codec="h264", webhook_url="http://localhost:5000/webhook")
room = fishjam_client.create_room(options=options)

# Room(components=[], config=RoomConfig(max_peers=None, video_codec=<RoomConfigVideoCodec.H264: 'h264'>, webhook_url='http://localhost:5000/webhook'), id='1d905478-ccfc-44d6-a6e7-8ccb1b38d955', peers=[])

# Add peer to the room
peer, token = fishjam_client.create_peer(room.id)

# Peer(id='b1232c7e-c969-4450-acdf-ea24f3cdd7f6', status=<PeerStatus.DISCONNECTED: 'disconnected'>, type='webrtc'), 'M8TUGhj-L11KpyG-2zBPIo'

All methods in FishjamClient may raise one of the exceptions deriving from fishjam.errors.HTTPError. They are defined in fishjam.errors.

FishjamNotifier

FishjamNotifier allows for receiving real-time updates from the Fishjam Server.

You can read more about notifications in the Fishjam Docs.

Create a FishjamNotifier instance

from fishjam import FishjamNotifier

fishjam_notifier = FishjamNotifier(fishjam_id='<fishjam_id>', management_token='<management_token>')

Then define a handler for incoming messages

@fishjam_notifier.on_server_notification
def handle_notification(server_notification):
    print(f'Received a notification: {server_notification}')

After that you can start the notifier

async def test_notifier():
    notifier_task = asyncio.create_task(fishjam_notifier.connect())

    # Wait for notifier to be ready to receive messages
    await fishjam_notifier.wait_ready()

    # Create a room to trigger a server notification
    fishjam_client = FishjamClient(fishjam_id='<fishjam_id>', management_token='<management_token>')
    fishjam_client.create_room()

    await notifier_task

asyncio.run(test_notifier())

# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29')

License

Licensed under the Apache License, Version 2.0

Fishjam is created by Software Mansion

Since 2012, Software Mansion has been a software agency with experience in building web and mobile apps. We are Core React Native Contributors and experts in dealing with all kinds of React Native issues. We can help you build your next dream product – Hire us.

Software Mansion

 1""".. include:: ../README.md"""
 2
 3# pylint: disable=locally-disabled, no-name-in-module, import-error
 4
 5# Exceptions and Server Messages
 6
 7# API
 8# pylint: disable=locally-disabled, no-name-in-module, import-error
 9
10# Exceptions and Server Messages
11from fishjam import agent, errors, events, integrations, peer, room, version
12from fishjam._openapi_client.models import PeerMetadata
13
14# API
15from fishjam._webhook_notifier import receive_binary
16from fishjam._ws_notifier import FishjamNotifier
17from fishjam.api._fishjam_client import (
18    AgentOptions,
19    AgentOutputOptions,
20    FishjamClient,
21    Peer,
22    PeerOptions,
23    PeerOptionsVapi,
24    Room,
25    RoomOptions,
26)
27
28__version__ = version.__version__
29
30__all__ = [
31    "FishjamClient",
32    "FishjamNotifier",
33    "receive_binary",
34    "PeerMetadata",
35    "PeerOptions",
36    "PeerOptionsVapi",
37    "RoomOptions",
38    "AgentOptions",
39    "AgentOutputOptions",
40    "Room",
41    "Peer",
42    "events",
43    "errors",
44    "room",
45    "peer",
46    "agent",
47    "integrations",
48]
49
50
51__docformat__ = "restructuredtext"
class FishjamClient(fishjam.api._client.Client):
143class FishjamClient(Client):
144    """Allows for managing rooms."""
145
146    def __init__(
147        self,
148        fishjam_id: str,
149        management_token: str,
150    ):
151        """Create a FishjamClient instance.
152
153        Args:
154            fishjam_id: The unique identifier for the Fishjam instance.
155            management_token: The token used for authenticating management operations.
156        """
157        super().__init__(fishjam_id=fishjam_id, management_token=management_token)
158
159    def create_peer(
160        self,
161        room_id: str,
162        options: PeerOptions | None = None,
163    ) -> tuple[Peer, str]:
164        """Creates a peer in the room.
165
166        Args:
167            room_id: The ID of the room where the peer will be created.
168            options: Configuration options for the peer. Defaults to None.
169
170        Returns:
171            A tuple containing:
172                - Peer: The created peer object.
173                - str: The peer token needed to authenticate to Fishjam.
174        """
175        options = options or PeerOptions()
176
177        peer_metadata = self.__parse_peer_metadata(options.metadata)
178        peer_options = PeerOptionsWebRTC(
179            metadata=peer_metadata,
180            subscribe_mode=SubscribeMode(options.subscribe_mode),
181        )
182        body = PeerConfig(type_=PeerType.WEBRTC, options=peer_options)
183
184        resp = cast(
185            PeerDetailsResponse,
186            self._request(room_add_peer, room_id=room_id, body=body),
187        )
188
189        return (resp.data.peer, resp.data.token)
190
191    def create_agent(self, room_id: str, options: AgentOptions | None = None):
192        """Creates an agent in the room.
193
194        Args:
195            room_id: The ID of the room where the agent will be created.
196            options: Configuration options for the agent. Defaults to None.
197
198        Returns:
199            Agent: The created agent instance initialized with peer ID, room ID, token,
200                and Fishjam URL.
201        """
202        options = options or AgentOptions()
203        body = PeerConfig(
204            type_=PeerType.AGENT,
205            options=PeerOptionsAgent(
206                output=AgentOutput(
207                    audio_format=AudioFormat(options.output.audio_format),
208                    audio_sample_rate=AudioSampleRate(options.output.audio_sample_rate),
209                ),
210                subscribe_mode=SubscribeMode(options.subscribe_mode),
211            ),
212        )
213
214        resp = cast(
215            PeerDetailsResponse,
216            self._request(room_add_peer, room_id=room_id, body=body),
217        )
218
219        return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)
220
221    def create_vapi_agent(
222        self,
223        room_id: str,
224        options: PeerOptionsVapi,
225    ) -> Peer:
226        """Creates a vapi agent in the room.
227
228        Args:
229            room_id: The ID of the room where the vapi agent will be created.
230            options: Configuration options for the vapi peer.
231
232        Returns:
233            - Peer: The created peer object.
234        """
235        body = PeerConfig(type_=PeerType.VAPI, options=options)
236
237        resp = cast(
238            PeerDetailsResponse,
239            self._request(room_add_peer, room_id=room_id, body=body),
240        )
241
242        return resp.data.peer
243
244    def create_room(self, options: RoomOptions | None = None) -> Room:
245        """Creates a new room.
246
247        Args:
248            options: Configuration options for the room. Defaults to None.
249
250        Returns:
251            Room: The created Room object.
252        """
253        options = options or RoomOptions()
254
255        if options.video_codec is None:
256            codec = UNSET
257        else:
258            codec = VideoCodec(options.video_codec)
259
260        config = RoomConfig(
261            max_peers=options.max_peers,
262            video_codec=codec,
263            webhook_url=options.webhook_url,
264            room_type=RoomType(options.room_type),
265            public=options.public,
266        )
267
268        room = cast(
269            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
270        ).data.room
271
272        return Room(config=room.config, id=room.id, peers=room.peers)
273
274    def get_all_rooms(self) -> list[Room]:
275        """Returns list of all rooms.
276
277        Returns:
278            list[Room]: A list of all available Room objects.
279        """
280        rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data
281
282        return [
283            Room(config=room.config, id=room.id, peers=room.peers) for room in rooms
284        ]
285
286    def get_room(self, room_id: str) -> Room:
287        """Returns room with the given id.
288
289        Args:
290            room_id: The ID of the room to retrieve.
291
292        Returns:
293            Room: The Room object corresponding to the given ID.
294        """
295        room = cast(
296            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
297        ).data
298
299        return Room(config=room.config, id=room.id, peers=room.peers)
300
301    def delete_peer(self, room_id: str, peer_id: str) -> None:
302        """Deletes a peer from a room.
303
304        Args:
305            room_id: The ID of the room the peer belongs to.
306            peer_id: The ID of the peer to delete.
307        """
308        return self._request(room_delete_peer, id=peer_id, room_id=room_id)
309
310    def delete_room(self, room_id: str) -> None:
311        """Deletes a room.
312
313        Args:
314            room_id: The ID of the room to delete.
315        """
316        return self._request(room_delete_room, room_id=room_id)
317
318    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
319        """Refreshes a peer token.
320
321        Args:
322            room_id: The ID of the room.
323            peer_id: The ID of the peer whose token needs refreshing.
324
325        Returns:
326            str: The new peer token.
327        """
328        response = cast(
329            PeerRefreshTokenResponse,
330            self._request(room_refresh_token, id=peer_id, room_id=room_id),
331        )
332
333        return response.data.token
334
335    def create_livestream_viewer_token(self, room_id: str) -> str:
336        """Generates a viewer token for livestream rooms.
337
338        Args:
339            room_id: The ID of the livestream room.
340
341        Returns:
342            str: The generated viewer token.
343        """
344        response = cast(
345            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
346        )
347
348        return response.token
349
350    def create_livestream_streamer_token(self, room_id: str) -> str:
351        """Generates a streamer token for livestream rooms.
352
353        Args:
354            room_id: The ID of the livestream room.
355
356        Returns:
357            str: The generated streamer token.
358        """
359        response = cast(
360            StreamerToken,
361            self._request(streamer_generate_streamer_token, room_id=room_id),
362        )
363
364        return response.token
365
366    def subscribe_peer(self, room_id: str, peer_id: str, target_peer_id: str):
367        """Subscribes a peer to all tracks of another peer.
368
369        Args:
370            room_id: The ID of the room.
371            peer_id: The ID of the subscribing peer.
372            target_peer_id: The ID of the peer to subscribe to.
373        """
374        self._request(
375            room_subscribe_peer,
376            room_id=room_id,
377            id=peer_id,
378            peer_id=target_peer_id,
379        )
380
381    def subscribe_tracks(self, room_id: str, peer_id: str, track_ids: list[str]):
382        """Subscribes a peer to specific tracks of another peer.
383
384        Args:
385            room_id: The ID of the room.
386            peer_id: The ID of the subscribing peer.
387            track_ids: A list of track IDs to subscribe to.
388        """
389        self._request(
390            room_subscribe_tracks,
391            room_id=room_id,
392            id=peer_id,
393            body=SubscribeTracksBody(track_ids=track_ids),
394        )
395
396    def __parse_peer_metadata(self, metadata: dict | None) -> WebRTCMetadata:
397        peer_metadata = WebRTCMetadata()
398
399        if not metadata:
400            return peer_metadata
401
402        for key, value in metadata.items():
403            peer_metadata.additional_properties[key] = value
404
405        return peer_metadata

Allows for managing rooms.

FishjamClient(fishjam_id: str, management_token: str)
146    def __init__(
147        self,
148        fishjam_id: str,
149        management_token: str,
150    ):
151        """Create a FishjamClient instance.
152
153        Args:
154            fishjam_id: The unique identifier for the Fishjam instance.
155            management_token: The token used for authenticating management operations.
156        """
157        super().__init__(fishjam_id=fishjam_id, management_token=management_token)

Create a FishjamClient instance.

Args:

  • fishjam_id: The unique identifier for the Fishjam instance.
  • management_token: The token used for authenticating management operations.
def create_peer( self, room_id: str, options: PeerOptions | None = None) -> tuple[Peer, str]:
159    def create_peer(
160        self,
161        room_id: str,
162        options: PeerOptions | None = None,
163    ) -> tuple[Peer, str]:
164        """Creates a peer in the room.
165
166        Args:
167            room_id: The ID of the room where the peer will be created.
168            options: Configuration options for the peer. Defaults to None.
169
170        Returns:
171            A tuple containing:
172                - Peer: The created peer object.
173                - str: The peer token needed to authenticate to Fishjam.
174        """
175        options = options or PeerOptions()
176
177        peer_metadata = self.__parse_peer_metadata(options.metadata)
178        peer_options = PeerOptionsWebRTC(
179            metadata=peer_metadata,
180            subscribe_mode=SubscribeMode(options.subscribe_mode),
181        )
182        body = PeerConfig(type_=PeerType.WEBRTC, options=peer_options)
183
184        resp = cast(
185            PeerDetailsResponse,
186            self._request(room_add_peer, room_id=room_id, body=body),
187        )
188
189        return (resp.data.peer, resp.data.token)

Creates a peer in the room.

Args:

  • room_id: The ID of the room where the peer will be created.
  • options: Configuration options for the peer. Defaults to None.

Returns:

  • A tuple containing:
    • Peer: The created peer object.
    • str: The peer token needed to authenticate to Fishjam.
def create_agent( self, room_id: str, options: AgentOptions | None = None):
191    def create_agent(self, room_id: str, options: AgentOptions | None = None):
192        """Creates an agent in the room.
193
194        Args:
195            room_id: The ID of the room where the agent will be created.
196            options: Configuration options for the agent. Defaults to None.
197
198        Returns:
199            Agent: The created agent instance initialized with peer ID, room ID, token,
200                and Fishjam URL.
201        """
202        options = options or AgentOptions()
203        body = PeerConfig(
204            type_=PeerType.AGENT,
205            options=PeerOptionsAgent(
206                output=AgentOutput(
207                    audio_format=AudioFormat(options.output.audio_format),
208                    audio_sample_rate=AudioSampleRate(options.output.audio_sample_rate),
209                ),
210                subscribe_mode=SubscribeMode(options.subscribe_mode),
211            ),
212        )
213
214        resp = cast(
215            PeerDetailsResponse,
216            self._request(room_add_peer, room_id=room_id, body=body),
217        )
218
219        return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)

Creates an agent in the room.

Args:

  • room_id: The ID of the room where the agent will be created.
  • options: Configuration options for the agent. Defaults to None.

Returns:

  • Agent: The created agent instance initialized with peer ID, room ID, token, and Fishjam URL.
def create_vapi_agent( self, room_id: str, options: PeerOptionsVapi) -> Peer:
221    def create_vapi_agent(
222        self,
223        room_id: str,
224        options: PeerOptionsVapi,
225    ) -> Peer:
226        """Creates a vapi agent in the room.
227
228        Args:
229            room_id: The ID of the room where the vapi agent will be created.
230            options: Configuration options for the vapi peer.
231
232        Returns:
233            - Peer: The created peer object.
234        """
235        body = PeerConfig(type_=PeerType.VAPI, options=options)
236
237        resp = cast(
238            PeerDetailsResponse,
239            self._request(room_add_peer, room_id=room_id, body=body),
240        )
241
242        return resp.data.peer

Creates a vapi agent in the room.

Args:

  • room_id: The ID of the room where the vapi agent will be created.
  • options: Configuration options for the vapi peer.

Returns:

  • Peer: The created peer object.

def create_room( self, options: RoomOptions | None = None) -> Room:
244    def create_room(self, options: RoomOptions | None = None) -> Room:
245        """Creates a new room.
246
247        Args:
248            options: Configuration options for the room. Defaults to None.
249
250        Returns:
251            Room: The created Room object.
252        """
253        options = options or RoomOptions()
254
255        if options.video_codec is None:
256            codec = UNSET
257        else:
258            codec = VideoCodec(options.video_codec)
259
260        config = RoomConfig(
261            max_peers=options.max_peers,
262            video_codec=codec,
263            webhook_url=options.webhook_url,
264            room_type=RoomType(options.room_type),
265            public=options.public,
266        )
267
268        room = cast(
269            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
270        ).data.room
271
272        return Room(config=room.config, id=room.id, peers=room.peers)

Creates a new room.

Args:

  • options: Configuration options for the room. Defaults to None.

Returns:

  • Room: The created Room object.
def get_all_rooms(self) -> list[Room]:
274    def get_all_rooms(self) -> list[Room]:
275        """Returns list of all rooms.
276
277        Returns:
278            list[Room]: A list of all available Room objects.
279        """
280        rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data
281
282        return [
283            Room(config=room.config, id=room.id, peers=room.peers) for room in rooms
284        ]

Returns list of all rooms.

Returns:

  • list[Room]: A list of all available Room objects.
def get_room(self, room_id: str) -> Room:
286    def get_room(self, room_id: str) -> Room:
287        """Returns room with the given id.
288
289        Args:
290            room_id: The ID of the room to retrieve.
291
292        Returns:
293            Room: The Room object corresponding to the given ID.
294        """
295        room = cast(
296            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
297        ).data
298
299        return Room(config=room.config, id=room.id, peers=room.peers)

Returns room with the given id.

Args:

  • room_id: The ID of the room to retrieve.

Returns:

  • Room: The Room object corresponding to the given ID.
def delete_peer(self, room_id: str, peer_id: str) -> None:
301    def delete_peer(self, room_id: str, peer_id: str) -> None:
302        """Deletes a peer from a room.
303
304        Args:
305            room_id: The ID of the room the peer belongs to.
306            peer_id: The ID of the peer to delete.
307        """
308        return self._request(room_delete_peer, id=peer_id, room_id=room_id)

Deletes a peer from a room.

Args:

  • room_id: The ID of the room the peer belongs to.
  • peer_id: The ID of the peer to delete.
def delete_room(self, room_id: str) -> None:
310    def delete_room(self, room_id: str) -> None:
311        """Deletes a room.
312
313        Args:
314            room_id: The ID of the room to delete.
315        """
316        return self._request(room_delete_room, room_id=room_id)

Deletes a room.

Args:

  • room_id: The ID of the room to delete.
def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
318    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
319        """Refreshes a peer token.
320
321        Args:
322            room_id: The ID of the room.
323            peer_id: The ID of the peer whose token needs refreshing.
324
325        Returns:
326            str: The new peer token.
327        """
328        response = cast(
329            PeerRefreshTokenResponse,
330            self._request(room_refresh_token, id=peer_id, room_id=room_id),
331        )
332
333        return response.data.token

Refreshes a peer token.

Args:

  • room_id: The ID of the room.
  • peer_id: The ID of the peer whose token needs refreshing.

Returns:

  • str: The new peer token.
def create_livestream_viewer_token(self, room_id: str) -> str:
335    def create_livestream_viewer_token(self, room_id: str) -> str:
336        """Generates a viewer token for livestream rooms.
337
338        Args:
339            room_id: The ID of the livestream room.
340
341        Returns:
342            str: The generated viewer token.
343        """
344        response = cast(
345            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
346        )
347
348        return response.token

Generates a viewer token for livestream rooms.

Args:

  • room_id: The ID of the livestream room.

Returns:

  • str: The generated viewer token.
def create_livestream_streamer_token(self, room_id: str) -> str:
350    def create_livestream_streamer_token(self, room_id: str) -> str:
351        """Generates a streamer token for livestream rooms.
352
353        Args:
354            room_id: The ID of the livestream room.
355
356        Returns:
357            str: The generated streamer token.
358        """
359        response = cast(
360            StreamerToken,
361            self._request(streamer_generate_streamer_token, room_id=room_id),
362        )
363
364        return response.token

Generates a streamer token for livestream rooms.

Args:

  • room_id: The ID of the livestream room.

Returns:

  • str: The generated streamer token.
def subscribe_peer(self, room_id: str, peer_id: str, target_peer_id: str):
366    def subscribe_peer(self, room_id: str, peer_id: str, target_peer_id: str):
367        """Subscribes a peer to all tracks of another peer.
368
369        Args:
370            room_id: The ID of the room.
371            peer_id: The ID of the subscribing peer.
372            target_peer_id: The ID of the peer to subscribe to.
373        """
374        self._request(
375            room_subscribe_peer,
376            room_id=room_id,
377            id=peer_id,
378            peer_id=target_peer_id,
379        )

Subscribes a peer to all tracks of another peer.

Args:

  • room_id: The ID of the room.
  • peer_id: The ID of the subscribing peer.
  • target_peer_id: The ID of the peer to subscribe to.
def subscribe_tracks(self, room_id: str, peer_id: str, track_ids: list[str]):
381    def subscribe_tracks(self, room_id: str, peer_id: str, track_ids: list[str]):
382        """Subscribes a peer to specific tracks of another peer.
383
384        Args:
385            room_id: The ID of the room.
386            peer_id: The ID of the subscribing peer.
387            track_ids: A list of track IDs to subscribe to.
388        """
389        self._request(
390            room_subscribe_tracks,
391            room_id=room_id,
392            id=peer_id,
393            body=SubscribeTracksBody(track_ids=track_ids),
394        )

Subscribes a peer to specific tracks of another peer.

Args:

  • room_id: The ID of the room.
  • peer_id: The ID of the subscribing peer.
  • track_ids: A list of track IDs to subscribe to.
Inherited Members
fishjam.api._client.Client
client
warnings_shown
class FishjamNotifier:
 33class FishjamNotifier:
 34    """Allows for receiving WebSocket messages from Fishjam."""
 35
 36    def __init__(
 37        self,
 38        fishjam_id: str,
 39        management_token: str,
 40    ):
 41        """Create a FishjamNotifier instance with an ID and management token."""
 42        websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws")
 43        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
 44        self._management_token: str = management_token
 45        self._websocket: client.ClientConnection | None = None
 46        self._ready: bool = False
 47
 48        self._ready_event: asyncio.Event | None = None
 49
 50        self._notification_handler: NotificationHandler | None = None
 51
 52    def on_server_notification(self, handler: NotificationHandler):
 53        """Decorator for defining a handler for Fishjam notifications.
 54
 55        Args:
 56            handler: The function to be registered as the notification handler.
 57
 58        Returns:
 59            NotificationHandler: The original handler function (unmodified).
 60        """
 61        self._notification_handler = handler
 62        return handler
 63
 64    async def connect(self):
 65        """Connects to Fishjam and listens for all incoming messages.
 66
 67        It runs until the connection is closed.
 68
 69        The incoming messages are handled by the functions defined using the
 70        `on_server_notification` decorator.
 71
 72        The handler has to be defined before calling `connect`,
 73        otherwise the messages won't be received.
 74        """
 75        async with client.connect(self._fishjam_url) as websocket:
 76            try:
 77                self._websocket = websocket
 78                await self._authenticate()
 79
 80                if self._notification_handler:
 81                    await self._subscribe_event(
 82                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
 83                    )
 84
 85                self._ready = True
 86                if self._ready_event:
 87                    self._ready_event.set()
 88
 89                await self._receive_loop()
 90            finally:
 91                self._websocket = None
 92
 93    async def wait_ready(self) -> None:
 94        """Waits until the notifier is connected and authenticated to Fishjam.
 95
 96        If already connected, returns immediately.
 97        """
 98        if self._ready:
 99            return
100
101        if self._ready_event is None:
102            self._ready_event = asyncio.Event()
103
104        await self._ready_event.wait()
105
106    async def _authenticate(self):
107        if not self._websocket:
108            raise RuntimeError("Websocket is not connected")
109
110        msg = ServerMessage(
111            auth_request=ServerMessageAuthRequest(token=self._management_token)
112        )
113        await self._websocket.send(bytes(msg))
114
115        try:
116            message = await self._websocket.recv(decode=False)
117        except ConnectionClosed as exception:
118            if "invalid token" in str(exception):
119                raise RuntimeError("Invalid management token") from exception
120            raise
121
122        message = ServerMessage().parse(message)
123
124        _type, message = betterproto.which_one_of(message, "content")
125        assert isinstance(message, ServerMessageAuthenticated)
126
127    async def _receive_loop(self):
128        if not self._websocket:
129            raise RuntimeError("Websocket is not connected")
130        if not self._notification_handler:
131            raise RuntimeError("Notification handler is not defined")
132
133        while True:
134            message = await self._websocket.recv(decode=False)
135            message = ServerMessage().parse(message)
136            _which, message = betterproto.which_one_of(message, "content")
137
138            if isinstance(message, ALLOWED_NOTIFICATIONS):
139                res = self._notification_handler(message)
140                if inspect.isawaitable(res):
141                    await res
142
143    async def _subscribe_event(self, event: ServerMessageEventType):
144        if not self._websocket:
145            raise RuntimeError("Websocket is not connected")
146
147        request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event))
148
149        await self._websocket.send(bytes(request))
150        message = cast(bytes, await self._websocket.recv())
151        message = ServerMessage().parse(message)
152        _which, message = betterproto.which_one_of(message, "content")
153        assert isinstance(message, ServerMessageSubscribeResponse)

Allows for receiving WebSocket messages from Fishjam.

FishjamNotifier(fishjam_id: str, management_token: str)
36    def __init__(
37        self,
38        fishjam_id: str,
39        management_token: str,
40    ):
41        """Create a FishjamNotifier instance with an ID and management token."""
42        websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws")
43        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
44        self._management_token: str = management_token
45        self._websocket: client.ClientConnection | None = None
46        self._ready: bool = False
47
48        self._ready_event: asyncio.Event | None = None
49
50        self._notification_handler: NotificationHandler | None = None

Create a FishjamNotifier instance with an ID and management token.

def on_server_notification( self, handler: Callable[[fishjam.events.ServerMessageRoomCreated | fishjam.events.ServerMessageRoomDeleted | fishjam.events.ServerMessageRoomCrashed | fishjam.events.ServerMessagePeerAdded | fishjam.events.ServerMessagePeerDeleted | fishjam.events.ServerMessagePeerConnected | fishjam.events.ServerMessagePeerDisconnected | fishjam.events.ServerMessagePeerMetadataUpdated | fishjam.events.ServerMessagePeerCrashed | fishjam.events.ServerMessageStreamConnected | fishjam.events.ServerMessageStreamDisconnected | fishjam.events.ServerMessageViewerConnected | fishjam.events.ServerMessageViewerDisconnected | fishjam.events.ServerMessageTrackAdded | fishjam.events.ServerMessageTrackRemoved | fishjam.events.ServerMessageTrackMetadataUpdated], NoneType] | Callable[[fishjam.events.ServerMessageRoomCreated | fishjam.events.ServerMessageRoomDeleted | fishjam.events.ServerMessageRoomCrashed | fishjam.events.ServerMessagePeerAdded | fishjam.events.ServerMessagePeerDeleted | fishjam.events.ServerMessagePeerConnected | fishjam.events.ServerMessagePeerDisconnected | fishjam.events.ServerMessagePeerMetadataUpdated | fishjam.events.ServerMessagePeerCrashed | fishjam.events.ServerMessageStreamConnected | fishjam.events.ServerMessageStreamDisconnected | fishjam.events.ServerMessageViewerConnected | fishjam.events.ServerMessageViewerDisconnected | fishjam.events.ServerMessageTrackAdded | fishjam.events.ServerMessageTrackRemoved | fishjam.events.ServerMessageTrackMetadataUpdated], Coroutine[Any, Any, None]]):
52    def on_server_notification(self, handler: NotificationHandler):
53        """Decorator for defining a handler for Fishjam notifications.
54
55        Args:
56            handler: The function to be registered as the notification handler.
57
58        Returns:
59            NotificationHandler: The original handler function (unmodified).
60        """
61        self._notification_handler = handler
62        return handler

Decorator for defining a handler for Fishjam notifications.

Args:

  • handler: The function to be registered as the notification handler.

Returns:

  • NotificationHandler: The original handler function (unmodified).
async def connect(self):
64    async def connect(self):
65        """Connects to Fishjam and listens for all incoming messages.
66
67        It runs until the connection isn't closed.
68
69        The incoming messages are handled by the functions defined using the
70        `on_server_notification` decorator.
71
72        The handler have to be defined before calling `connect`,
73        otherwise the messages won't be received.
74        """
75        async with client.connect(self._fishjam_url) as websocket:
76            try:
77                self._websocket = websocket
78                await self._authenticate()
79
80                if self._notification_handler:
81                    await self._subscribe_event(
82                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
83                    )
84
85                self._ready = True
86                if self._ready_event:
87                    self._ready_event.set()
88
89                await self._receive_loop()
90            finally:
91                self._websocket = None

Connects to Fishjam and listens for all incoming messages.

It runs until the connection is closed.

The incoming messages are handled by the functions defined using the on_server_notification decorator.

The handler has to be defined before calling connect, otherwise the messages won't be received.

async def wait_ready(self) -> None:
 93    async def wait_ready(self) -> None:
 94        """Waits until the notifier is connected and authenticated to Fishjam.
 95
 96        If already connected, returns immediately.
 97        """
 98        if self._ready:
 99            return
100
101        if self._ready_event is None:
102            self._ready_event = asyncio.Event()
103
104        await self._ready_event.wait()

Waits until the notifier is connected and authenticated to Fishjam.

If already connected, returns immediately.

def receive_binary(binary: bytes) -> Union[AllowedNotification, None]:
    """Decode a protobuf-encoded webhook payload into a notification object.

    The supported notification classes live in the `fishjam.events` module.

    Args:
        binary: Raw bytes received from the webhook.

    Returns:
        AllowedNotification | None: The decoded notification, or None when the
            decoded content is not one of the allowed notification types.
    """
    envelope = ServerMessage().parse(binary)
    _, payload = betterproto.which_one_of(envelope, "content")
    return payload if isinstance(payload, ALLOWED_NOTIFICATIONS) else None

Transforms a received protobuf notification into a notification instance.

The available notifications are listed in fishjam.events module.

Args:

  • binary: The raw binary data received from the webhook.

Returns:

  • AllowedNotification | None: The parsed notification object, or None if the message type is not supported.
class PeerMetadata:
@_attrs_define
class PeerMetadata:
    """Custom metadata set by the peer.

    The class declares no fixed fields; every key/value pair is kept in
    ``additional_properties`` and exposed through item access.

    Example:
        {'name': 'FishjamUser'}

    """

    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return a new dict holding a shallow copy of the stored properties."""
        return dict(self.additional_properties)

    @classmethod
    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
        """Build an instance whose properties are a copy of ``src_dict``."""
        properties = dict(src_dict)
        instance = cls()
        instance.additional_properties = properties
        return instance

    @property
    def additional_keys(self) -> list[str]:
        """Names of all stored properties."""
        return [*self.additional_properties.keys()]

    def __getitem__(self, key: str) -> Any:
        stored = self.additional_properties
        return stored[key]

    def __setitem__(self, key: str, value: Any) -> None:
        stored = self.additional_properties
        stored[key] = value

    def __delitem__(self, key: str) -> None:
        stored = self.additional_properties
        del stored[key]

    def __contains__(self, key: str) -> bool:
        stored = self.additional_properties
        return key in stored

Custom metadata set by the peer

Example:

  • {'name': 'FishjamUser'}
PeerMetadata()
def __init__(self, ):
    # attrs-generated initializer: it takes no arguments and fills
    # additional_properties by calling the attribute's factory.
    self.additional_properties = __attr_factory_additional_properties()

Method generated by attrs for class PeerMetadata.

additional_properties: 'dict[str, Any]'
def to_dict(self) -> 'dict[str, Any]':
24    def to_dict(self) -> dict[str, Any]:
25        field_dict: dict[str, Any] = {}
26        field_dict.update(self.additional_properties)
27
28        return field_dict
@classmethod
def from_dict(cls: 'type[T]', src_dict: 'Mapping[str, Any]') -> 'T':
30    @classmethod
31    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
32        d = dict(src_dict)
33        peer_metadata = cls()
34
35        peer_metadata.additional_properties = d
36        return peer_metadata
additional_keys: 'list[str]'
38    @property
39    def additional_keys(self) -> list[str]:
40        return list(self.additional_properties.keys())
@dataclass
class PeerOptions:
@dataclass
class PeerOptions:
    """Options specific to a WebRTC Peer.

    Both fields are optional when constructing the object.

    Attributes:
        metadata: Peer metadata as a string-keyed dict, or None (the default).
        subscribe_mode: Configuration of peer's subscribing policy; either
            "auto" (the default) or "manual".
    """

    metadata: dict[str, Any] | None = None
    """Peer metadata"""
    subscribe_mode: Literal["auto", "manual"] = "auto"
    """Configuration of peer's subscribing policy"""

Options specific to a WebRTC Peer.

Attributes:

  • metadata: Peer metadata.
  • subscribe_mode: Configuration of peer's subscribing policy.
PeerOptions( metadata: dict[str, Any] | None = None, subscribe_mode: Literal['auto', 'manual'] = 'auto')
metadata: dict[str, Any] | None = None

Peer metadata

subscribe_mode: Literal['auto', 'manual'] = 'auto'

Configuration of peer's subscribing policy

class PeerOptionsVapi:
@_attrs_define
class PeerOptionsVapi:
    """Options specific to the VAPI peer

    Attributes:
        api_key (str): VAPI API key
        call_id (str): VAPI call ID
        subscribe_mode (SubscribeMode | Unset): Configuration of peer's
            subscribing policy; left out of the serialized form while unset
    """

    api_key: str
    call_id: str
    subscribe_mode: SubscribeMode | Unset = UNSET

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a dict keyed by ``apiKey``, ``callId`` and, when set, ``subscribeMode``."""
        payload: dict[str, Any] = {
            "apiKey": self.api_key,
            "callId": self.call_id,
        }

        mode = self.subscribe_mode
        if not isinstance(mode, Unset):
            payload["subscribeMode"] = mode.value

        return payload

    @classmethod
    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
        """Build an instance from a dict using the ``to_dict`` key names."""
        data = dict(src_dict)
        api_key = data.pop("apiKey")
        call_id = data.pop("callId")

        raw_mode = data.pop("subscribeMode", UNSET)
        # An absent key stays UNSET; anything else is converted to the enum.
        mode: SubscribeMode | Unset = (
            UNSET if isinstance(raw_mode, Unset) else SubscribeMode(raw_mode)
        )

        return cls(api_key=api_key, call_id=call_id, subscribe_mode=mode)

Options specific to the VAPI peer

Attributes:

  • api_key (str): VAPI API key
  • call_id (str): VAPI call ID
  • subscribe_mode (SubscribeMode | Unset): Configuration of peer's subscribing policy
PeerOptionsVapi( api_key: 'str', call_id: 'str', subscribe_mode: 'SubscribeMode | Unset' = <fishjam._openapi_client.types.Unset object>)
def __init__(self, api_key, call_id, subscribe_mode=attr_dict['subscribe_mode'].default):
    # attrs-generated initializer: stores each argument on the instance.
    # subscribe_mode falls back to the attribute's declared default.
    self.api_key = api_key
    self.call_id = call_id
    self.subscribe_mode = subscribe_mode

Method generated by attrs for class PeerOptionsVapi.

api_key: 'str'
call_id: 'str'
subscribe_mode: 'SubscribeMode | Unset'
def to_dict(self) -> 'dict[str, Any]':
29    def to_dict(self) -> dict[str, Any]:
30        api_key = self.api_key
31
32        call_id = self.call_id
33
34        subscribe_mode: str | Unset = UNSET
35        if not isinstance(self.subscribe_mode, Unset):
36            subscribe_mode = self.subscribe_mode.value
37
38        field_dict: dict[str, Any] = {}
39
40        field_dict.update({
41            "apiKey": api_key,
42            "callId": call_id,
43        })
44        if subscribe_mode is not UNSET:
45            field_dict["subscribeMode"] = subscribe_mode
46
47        return field_dict
@classmethod
def from_dict(cls: 'type[T]', src_dict: 'Mapping[str, Any]') -> 'T':
49    @classmethod
50    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
51        d = dict(src_dict)
52        api_key = d.pop("apiKey")
53
54        call_id = d.pop("callId")
55
56        _subscribe_mode = d.pop("subscribeMode", UNSET)
57        subscribe_mode: SubscribeMode | Unset
58        if isinstance(_subscribe_mode, Unset):
59            subscribe_mode = UNSET
60        else:
61            subscribe_mode = SubscribeMode(_subscribe_mode)
62
63        peer_options_vapi = cls(
64            api_key=api_key,
65            call_id=call_id,
66            subscribe_mode=subscribe_mode,
67        )
68
69        return peer_options_vapi
@dataclass
class RoomOptions:
@dataclass
class RoomOptions:
    """Options describing how a room should be configured.

    Every field has a default, so the object can be created without arguments.

    Attributes:
        max_peers: Maximum amount of peers allowed into the room.
        video_codec: Enforces video codec for each peer in the room.
        webhook_url: URL where Fishjam notifications will be sent.
        room_type: The use-case of the room. If not provided, this defaults
            to conference.
        public: True if livestream viewers can omit specifying a token.
    """

    max_peers: int | None = None
    """Maximum amount of peers allowed into the room"""
    video_codec: Literal["h264", "vp8"] | None = None
    """Enforces video codec for each peer in the room"""
    webhook_url: str | None = None
    """URL where Fishjam notifications will be sent"""
    room_type: Literal[
        "conference",
        "audio_only",
        "livestream",
        "full_feature",
        "broadcaster",
        "audio_only_livestream",
    ] = "conference"
    """The use-case of the room. If not provided, this defaults to conference."""
    public: bool = False
    """True if livestream viewers can omit specifying a token."""

Description of room options.

Attributes:

  • max_peers: Maximum amount of peers allowed into the room.
  • video_codec: Enforces video codec for each peer in the room.
  • webhook_url: URL where Fishjam notifications will be sent.
  • room_type: The use-case of the room. If not provided, this defaults to conference.
  • public: True if livestream viewers can omit specifying a token.
RoomOptions( max_peers: int | None = None, video_codec: Literal['h264', 'vp8'] | None = None, webhook_url: str | None = None, room_type: Literal['conference', 'audio_only', 'livestream', 'full_feature', 'broadcaster', 'audio_only_livestream'] = 'conference', public: bool = False)
max_peers: int | None = None

Maximum amount of peers allowed into the room

video_codec: Literal['h264', 'vp8'] | None = None

Enforces video codec for each peer in the room

webhook_url: str | None = None

URL where Fishjam notifications will be sent

room_type: Literal['conference', 'audio_only', 'livestream', 'full_feature', 'broadcaster', 'audio_only_livestream'] = 'conference'

The use-case of the room. If not provided, this defaults to conference.

public: bool = False

True if livestream viewers can omit specifying a token.

@dataclass
class AgentOptions:
@dataclass
class AgentOptions:
    """Options specific to an Agent Peer.

    Attributes:
        output: Configuration for the agent's output options. A fresh
            ``AgentOutputOptions`` with its defaults is created when omitted.
        subscribe_mode: Configuration of peer's subscribing policy; either
            "auto" (the default) or "manual".
    """

    output: AgentOutputOptions = field(default_factory=AgentOutputOptions)
    """Configuration for the agent's output options"""

    subscribe_mode: Literal["auto", "manual"] = "auto"
    """Configuration of peer's subscribing policy"""

Options specific to an Agent Peer.

Attributes:

  • output: Configuration for the agent's output options.
  • subscribe_mode: Configuration of peer's subscribing policy.
AgentOptions( output: AgentOutputOptions = <factory>, subscribe_mode: Literal['auto', 'manual'] = 'auto')
subscribe_mode: Literal['auto', 'manual'] = 'auto'
@dataclass
class AgentOutputOptions:
@dataclass
class AgentOutputOptions:
    """Options of the desired format of audio tracks going from Fishjam to the agent.

    Attributes:
        audio_format: The format of the audio stream; 'pcm16' is the only
            accepted value.
        audio_sample_rate: The sample rate of the audio stream; either 16000
            (the default) or 24000.
    """

    audio_format: Literal["pcm16"] = "pcm16"
    """The format of the audio stream"""
    audio_sample_rate: Literal[16000, 24000] = 16000
    """The sample rate of the audio stream"""

Options of the desired format of audio tracks going from Fishjam to the agent.

Attributes:

  • audio_format: The format of the audio stream (e.g., 'pcm16').
  • audio_sample_rate: The sample rate of the audio stream.
AgentOutputOptions( audio_format: Literal['pcm16'] = 'pcm16', audio_sample_rate: Literal[16000, 24000] = 16000)
audio_format: Literal['pcm16'] = 'pcm16'
audio_sample_rate: Literal[16000, 24000] = 16000
@dataclass
class Room:
@dataclass
class Room:
    """Description of the room state.

    All three fields are required; the dataclass declares no defaults.

    Attributes:
        config: Room configuration.
        id: Room ID.
        peers: List of all peers.
    """

    config: RoomConfig
    """Room configuration"""
    id: str
    """Room ID"""
    peers: list[Peer]
    """List of all peers"""

Description of the room state.

Attributes:

  • config: Room configuration.
  • id: Room ID.
  • peers: List of all peers.
Room( config: fishjam._openapi_client.models.room_config.RoomConfig, id: str, peers: list[Peer])
config: fishjam._openapi_client.models.room_config.RoomConfig

Room configuration

id: str

Room ID

peers: list[Peer]

List of all peers

class Peer:
 23@_attrs_define
 24class Peer:
 25    """Describes peer status
 26
 27    Attributes:
 28        id (str): Assigned peer id Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
 29        metadata (None | PeerMetadata): Custom metadata set by the peer Example: {'name': 'FishjamUser'}.
 30        status (PeerStatus): Informs about the peer status Example: disconnected.
 31        subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
 32        subscriptions (Subscriptions): Describes peer's subscriptions in manual mode
 33        tracks (list[Track]): List of all peer's tracks
 34        type_ (PeerType): Peer type Example: webrtc.
 35    """
 36
 37    id: str
 38    metadata: None | PeerMetadata
 39    status: PeerStatus
 40    subscribe_mode: SubscribeMode
 41    subscriptions: Subscriptions
 42    tracks: list[Track]
 43    type_: PeerType
 44    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)
 45
 46    def to_dict(self) -> dict[str, Any]:
 47        from ..models.peer_metadata import PeerMetadata
 48
 49        id = self.id
 50
 51        metadata: dict[str, Any] | None
 52        if isinstance(self.metadata, PeerMetadata):
 53            metadata = self.metadata.to_dict()
 54        else:
 55            metadata = self.metadata
 56
 57        status = self.status.value
 58
 59        subscribe_mode = self.subscribe_mode.value
 60
 61        subscriptions = self.subscriptions.to_dict()
 62
 63        tracks = []
 64        for tracks_item_data in self.tracks:
 65            tracks_item = tracks_item_data.to_dict()
 66            tracks.append(tracks_item)
 67
 68        type_ = self.type_.value
 69
 70        field_dict: dict[str, Any] = {}
 71        field_dict.update(self.additional_properties)
 72        field_dict.update({
 73            "id": id,
 74            "metadata": metadata,
 75            "status": status,
 76            "subscribeMode": subscribe_mode,
 77            "subscriptions": subscriptions,
 78            "tracks": tracks,
 79            "type": type_,
 80        })
 81
 82        return field_dict
 83
 84    @classmethod
 85    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
 86        from ..models.peer_metadata import PeerMetadata
 87        from ..models.subscriptions import Subscriptions
 88        from ..models.track import Track
 89
 90        d = dict(src_dict)
 91        id = d.pop("id")
 92
 93        def _parse_metadata(data: object) -> None | PeerMetadata:
 94            if data is None:
 95                return data
 96            try:
 97                if not isinstance(data, dict):
 98                    raise TypeError()
 99                componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data)
100
101                return componentsschemas_peer_metadata_type_0
102            except (TypeError, ValueError, AttributeError, KeyError):
103                pass
104            return cast(None | PeerMetadata, data)
105
106        metadata = _parse_metadata(d.pop("metadata"))
107
108        status = PeerStatus(d.pop("status"))
109
110        subscribe_mode = SubscribeMode(d.pop("subscribeMode"))
111
112        subscriptions = Subscriptions.from_dict(d.pop("subscriptions"))
113
114        tracks = []
115        _tracks = d.pop("tracks")
116        for tracks_item_data in _tracks:
117            tracks_item = Track.from_dict(tracks_item_data)
118
119            tracks.append(tracks_item)
120
121        type_ = PeerType(d.pop("type"))
122
123        peer = cls(
124            id=id,
125            metadata=metadata,
126            status=status,
127            subscribe_mode=subscribe_mode,
128            subscriptions=subscriptions,
129            tracks=tracks,
130            type_=type_,
131        )
132
133        peer.additional_properties = d
134        return peer
135
136    @property
137    def additional_keys(self) -> list[str]:
138        return list(self.additional_properties.keys())
139
140    def __getitem__(self, key: str) -> Any:
141        return self.additional_properties[key]
142
143    def __setitem__(self, key: str, value: Any) -> None:
144        self.additional_properties[key] = value
145
146    def __delitem__(self, key: str) -> None:
147        del self.additional_properties[key]
148
149    def __contains__(self, key: str) -> bool:
150        return key in self.additional_properties

Describes peer status

Attributes:

  • id (str): Assigned peer id Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
  • metadata (None | PeerMetadata): Custom metadata set by the peer Example: {'name': 'FishjamUser'}.
  • status (PeerStatus): Informs about the peer status Example: disconnected.
  • subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
  • subscriptions (Subscriptions): Describes peer's subscriptions in manual mode
  • tracks (list[Track]): List of all peer's tracks
  • type_ (PeerType): Peer type Example: webrtc.
Peer( id: 'str', metadata: 'None | PeerMetadata', status: 'PeerStatus', subscribe_mode: 'SubscribeMode', subscriptions: 'Subscriptions', tracks: 'list[Track]', type_: 'PeerType')
def __init__(self, id, metadata, status, subscribe_mode, subscriptions, tracks, type_):
    # attrs-generated initializer: every declared field is a required
    # argument and is stored on the instance as given.
    self.id = id
    self.metadata = metadata
    self.status = status
    self.subscribe_mode = subscribe_mode
    self.subscriptions = subscriptions
    self.tracks = tracks
    self.type_ = type_
    # Not an argument; filled by calling the attribute's factory.
    self.additional_properties = __attr_factory_additional_properties()

Method generated by attrs for class Peer.

id: 'str'
metadata: 'None | PeerMetadata'
status: 'PeerStatus'
subscribe_mode: 'SubscribeMode'
subscriptions: 'Subscriptions'
tracks: 'list[Track]'
type_: 'PeerType'
additional_properties: 'dict[str, Any]'
def to_dict(self) -> 'dict[str, Any]':
46    def to_dict(self) -> dict[str, Any]:
47        from ..models.peer_metadata import PeerMetadata
48
49        id = self.id
50
51        metadata: dict[str, Any] | None
52        if isinstance(self.metadata, PeerMetadata):
53            metadata = self.metadata.to_dict()
54        else:
55            metadata = self.metadata
56
57        status = self.status.value
58
59        subscribe_mode = self.subscribe_mode.value
60
61        subscriptions = self.subscriptions.to_dict()
62
63        tracks = []
64        for tracks_item_data in self.tracks:
65            tracks_item = tracks_item_data.to_dict()
66            tracks.append(tracks_item)
67
68        type_ = self.type_.value
69
70        field_dict: dict[str, Any] = {}
71        field_dict.update(self.additional_properties)
72        field_dict.update({
73            "id": id,
74            "metadata": metadata,
75            "status": status,
76            "subscribeMode": subscribe_mode,
77            "subscriptions": subscriptions,
78            "tracks": tracks,
79            "type": type_,
80        })
81
82        return field_dict
@classmethod
def from_dict(cls: 'type[T]', src_dict: 'Mapping[str, Any]') -> 'T':
 84    @classmethod
 85    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
 86        from ..models.peer_metadata import PeerMetadata
 87        from ..models.subscriptions import Subscriptions
 88        from ..models.track import Track
 89
 90        d = dict(src_dict)
 91        id = d.pop("id")
 92
 93        def _parse_metadata(data: object) -> None | PeerMetadata:
 94            if data is None:
 95                return data
 96            try:
 97                if not isinstance(data, dict):
 98                    raise TypeError()
 99                componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data)
100
101                return componentsschemas_peer_metadata_type_0
102            except (TypeError, ValueError, AttributeError, KeyError):
103                pass
104            return cast(None | PeerMetadata, data)
105
106        metadata = _parse_metadata(d.pop("metadata"))
107
108        status = PeerStatus(d.pop("status"))
109
110        subscribe_mode = SubscribeMode(d.pop("subscribeMode"))
111
112        subscriptions = Subscriptions.from_dict(d.pop("subscriptions"))
113
114        tracks = []
115        _tracks = d.pop("tracks")
116        for tracks_item_data in _tracks:
117            tracks_item = Track.from_dict(tracks_item_data)
118
119            tracks.append(tracks_item)
120
121        type_ = PeerType(d.pop("type"))
122
123        peer = cls(
124            id=id,
125            metadata=metadata,
126            status=status,
127            subscribe_mode=subscribe_mode,
128            subscriptions=subscriptions,
129            tracks=tracks,
130            type_=type_,
131        )
132
133        peer.additional_properties = d
134        return peer
additional_keys: 'list[str]'
136    @property
137    def additional_keys(self) -> list[str]:
138        return list(self.additional_properties.keys())