Fishjam Python Server SDK

Fishjam Python Server SDK

Python server SDK for the Fishjam.

Read the docs here

Installation

pip install fishjam-server-sdk

Usage

The SDK exports two main classes for interacting with Fishjam server: FishjamClient and FishjamNotifier.

FishjamClient wraps http REST api calls, while FishjamNotifier is responsible for receiving real-time updates from the server.

FishjamClient

Create a FishjamClient instance, providing the fishjam server address and api token

from fishjam import FishjamClient

fishjam_client = FishjamClient(fishjam_id="<fishjam_id>", management_token="<management_token>")

You can use it to interact with Fishjam to manage rooms and peers

# Create a room
options = RoomOptions(video_codec="h264", webhook_url="http://localhost:5000/webhook")
room = fishjam_client.create_room(options=options)

# Room(components=[], config=RoomConfig(max_peers=None, video_codec=<RoomConfigVideoCodec.H264: 'h264'>, webhook_url='http://localhost:5000/webhook'), id='1d905478-ccfc-44d6-a6e7-8ccb1b38d955', peers=[])

# Add peer to the room
peer, token = fishjam_client.create_peer(room.id)

# Peer(id='b1232c7e-c969-4450-acdf-ea24f3cdd7f6', status=<PeerStatus.DISCONNECTED: 'disconnected'>, type='webrtc'), 'M8TUGhj-L11KpyG-2zBPIo'

All methods in FishjamClient may raise one of the exceptions deriving from fishjam.errors.HTTPError. They are defined in fishjam.errors.

FishjamNotifier

FishjamNotifier allows for receiving real-time updates from the Fishjam Server.

You can read more about notifications in the Fishjam Docs.

Create FishjamNotifier instance

from fishjam import FishjamNotifier

fishjam_notifier = FishjamNotifier(fishjam_id='<fishjam_id>', management_token='<management_token>')

Then define a handler for incoming messages

@notifier.on_server_notification
def handle_notification(server_notification):
    print(f'Received a notification: {server_notification}')

After that you can start the notifier

async def test_notifier():
    notifier_task = asyncio.create_task(fishjam_notifier.connect())

    # Wait for notifier to be ready to receive messages
    await fishjam_notifier.wait_ready()

    # Create a room to trigger a server notification
    fishjam_client = FishjamClient()
    fishjam_client.create_room()

    await notifier_task

asyncio.run(test_notifier())

# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29')

License

Licensed under the Apache License, Version 2.0

Fishjam is created by Software Mansion

Since 2012 Software Mansion is a software agency with experience in building web and mobile apps. We are Core React Native Contributors and experts in dealing with all kinds of React Native issues. We can help you build your next dream product – Hire us.

Software Mansion

 1""".. include:: ../README.md"""
 2
 3# pylint: disable=locally-disabled, no-name-in-module, import-error
 4
 5# Exceptions and Server Messages
 6
 7# API
 8# pylint: disable=locally-disabled, no-name-in-module, import-error
 9
10# Exceptions and Server Messages
11from fishjam import agent, errors, events, integrations, peer, room, version
12from fishjam._openapi_client.models import PeerMetadata
13
14# API
15from fishjam._webhook_notifier import receive_binary
16from fishjam._ws_notifier import FishjamNotifier
17from fishjam.api._fishjam_client import (
18    AgentOptions,
19    AgentOutputOptions,
20    FishjamClient,
21    Peer,
22    PeerOptions,
23    PeerOptionsVapi,
24    Room,
25    RoomOptions,
26)
27
28__version__ = version.__version__
29
30__all__ = [
31    "FishjamClient",
32    "FishjamNotifier",
33    "receive_binary",
34    "PeerMetadata",
35    "PeerOptions",
36    "PeerOptionsVapi",
37    "RoomOptions",
38    "AgentOptions",
39    "AgentOutputOptions",
40    "Room",
41    "Peer",
42    "events",
43    "errors",
44    "room",
45    "peer",
46    "agent",
47    "integrations",
48]
49
50
51__docformat__ = "restructuredtext"
class FishjamClient(fishjam.api._client.Client):
148class FishjamClient(Client):
149    """Allows for managing rooms."""
150
151    def __init__(
152        self,
153        fishjam_id: str,
154        management_token: str,
155    ):
156        """Create a FishjamClient instance.
157
158        Args:
159            fishjam_id: The unique identifier for the Fishjam instance.
160            management_token: The token used for authenticating management operations.
161        """
162        super().__init__(fishjam_id=fishjam_id, management_token=management_token)
163
164    def create_peer(
165        self,
166        room_id: str,
167        options: PeerOptions | None = None,
168    ) -> tuple[Peer, str]:
169        """Creates a peer in the room.
170
171        Args:
172            room_id: The ID of the room where the peer will be created.
173            options: Configuration options for the peer. Defaults to None.
174
175        Returns:
176            A tuple containing:
177                - Peer: The created peer object.
178                - str: The peer token needed to authenticate to Fishjam.
179        """
180        options = options or PeerOptions()
181
182        peer_metadata = self.__parse_peer_metadata(options.metadata)
183        peer_options = PeerOptionsWebRTC(
184            metadata=peer_metadata,
185            subscribe_mode=SubscribeMode(options.subscribe_mode),
186        )
187        body = PeerConfig(type_=PeerType.WEBRTC, options=peer_options)
188
189        resp = cast(
190            PeerDetailsResponse,
191            self._request(room_add_peer, room_id=room_id, body=body),
192        )
193
194        return (resp.data.peer, resp.data.token)
195
196    def create_agent(self, room_id: str, options: AgentOptions | None = None):
197        """Creates an agent in the room.
198
199        Args:
200            room_id: The ID of the room where the agent will be created.
201            options: Configuration options for the agent. Defaults to None.
202
203        Returns:
204            Agent: The created agent instance initialized with peer ID, room ID, token,
205                and Fishjam URL.
206        """
207        options = options or AgentOptions()
208        body = PeerConfig(
209            type_=PeerType.AGENT,
210            options=PeerOptionsAgent(
211                output=AgentOutput(
212                    audio_format=AudioFormat(options.output.audio_format),
213                    audio_sample_rate=AudioSampleRate(options.output.audio_sample_rate),
214                ),
215                subscribe_mode=SubscribeMode(options.subscribe_mode),
216            ),
217        )
218
219        resp = cast(
220            PeerDetailsResponse,
221            self._request(room_add_peer, room_id=room_id, body=body),
222        )
223
224        return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)
225
226    def create_vapi_agent(
227        self,
228        room_id: str,
229        options: PeerOptionsVapi,
230    ) -> Peer:
231        """Creates a vapi agent in the room.
232
233        Args:
234            room_id: The ID of the room where the vapi agent will be created.
235            options: Configuration options for the vapi peer.
236
237        Returns:
238            - Peer: The created peer object.
239        """
240        body = PeerConfig(type_=PeerType.VAPI, options=options)
241
242        resp = cast(
243            PeerDetailsResponse,
244            self._request(room_add_peer, room_id=room_id, body=body),
245        )
246
247        return resp.data.peer
248
249    def create_room(self, options: RoomOptions | None = None) -> Room:
250        """Creates a new room.
251
252        Args:
253            options: Configuration options for the room. Defaults to None.
254
255        Returns:
256            Room: The created Room object.
257        """
258        options = options or RoomOptions()
259
260        if options.video_codec is None:
261            codec = UNSET
262        else:
263            codec = VideoCodec(options.video_codec)
264
265        config = RoomConfig(
266            max_peers=options.max_peers,
267            video_codec=codec,
268            webhook_url=options.webhook_url,
269            room_type=RoomType(options.room_type),
270            public=options.public,
271        )
272
273        room = cast(
274            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
275        ).data.room
276
277        return Room(config=room.config, id=room.id, peers=room.peers)
278
279    def get_all_rooms(self) -> list[Room]:
280        """Returns list of all rooms.
281
282        Returns:
283            list[Room]: A list of all available Room objects.
284        """
285        rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data
286
287        return [
288            Room(config=room.config, id=room.id, peers=room.peers) for room in rooms
289        ]
290
291    def get_room(self, room_id: str) -> Room:
292        """Returns room with the given id.
293
294        Args:
295            room_id: The ID of the room to retrieve.
296
297        Returns:
298            Room: The Room object corresponding to the given ID.
299        """
300        room = cast(
301            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
302        ).data
303
304        return Room(config=room.config, id=room.id, peers=room.peers)
305
306    def delete_peer(self, room_id: str, peer_id: str) -> None:
307        """Deletes a peer from a room.
308
309        Args:
310            room_id: The ID of the room the peer belongs to.
311            peer_id: The ID of the peer to delete.
312        """
313        return self._request(room_delete_peer, id=peer_id, room_id=room_id)
314
315    def delete_room(self, room_id: str) -> None:
316        """Deletes a room.
317
318        Args:
319            room_id: The ID of the room to delete.
320        """
321        return self._request(room_delete_room, room_id=room_id)
322
323    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
324        """Refreshes a peer token.
325
326        Args:
327            room_id: The ID of the room.
328            peer_id: The ID of the peer whose token needs refreshing.
329
330        Returns:
331            str: The new peer token.
332        """
333        response = cast(
334            PeerRefreshTokenResponse,
335            self._request(room_refresh_token, id=peer_id, room_id=room_id),
336        )
337
338        return response.data.token
339
340    def create_livestream_viewer_token(self, room_id: str) -> str:
341        """Generates a viewer token for livestream rooms.
342
343        Args:
344            room_id: The ID of the livestream room.
345
346        Returns:
347            str: The generated viewer token.
348        """
349        response = cast(
350            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
351        )
352
353        return response.token
354
355    def create_livestream_streamer_token(self, room_id: str) -> str:
356        """Generates a streamer token for livestream rooms.
357
358        Args:
359            room_id: The ID of the livestream room.
360
361        Returns:
362            str: The generated streamer token.
363        """
364        response = cast(
365            StreamerToken,
366            self._request(streamer_generate_streamer_token, room_id=room_id),
367        )
368
369        return response.token
370
371    def create_moq_token(
372        self,
373        publish_path: str | None = None,
374        subscribe_path: str | None = None,
375    ) -> str:
376        """Generates a MoQ token.
377
378        Args:
379            publish_path: Path the token grants publish access to.
380            subscribe_path: Path the token grants subscribe access to.
381
382        Returns:
383            str: The generated token.
384        """
385        config = MoqTokenConfig(
386            publish_path=publish_path, subscribe_path=subscribe_path
387        )
388        response = cast(
389            MoqToken,
390            self._request(moq_create_token, body=config),
391        )
392
393        return response.token
394
395    def subscribe_peer(self, room_id: str, peer_id: str, target_peer_id: str):
396        """Subscribes a peer to all tracks of another peer.
397
398        Args:
399            room_id: The ID of the room.
400            peer_id: The ID of the subscribing peer.
401            target_peer_id: The ID of the peer to subscribe to.
402        """
403        self._request(
404            room_subscribe_peer,
405            room_id=room_id,
406            id=peer_id,
407            peer_id=target_peer_id,
408        )
409
410    def subscribe_tracks(self, room_id: str, peer_id: str, track_ids: list[str]):
411        """Subscribes a peer to specific tracks of another peer.
412
413        Args:
414            room_id: The ID of the room.
415            peer_id: The ID of the subscribing peer.
416            track_ids: A list of track IDs to subscribe to.
417        """
418        self._request(
419            room_subscribe_tracks,
420            room_id=room_id,
421            id=peer_id,
422            body=SubscribeTracksBody(track_ids=track_ids),
423        )
424
425    def __parse_peer_metadata(self, metadata: dict | None) -> WebRTCMetadata:
426        peer_metadata = WebRTCMetadata()
427
428        if not metadata:
429            return peer_metadata
430
431        for key, value in metadata.items():
432            peer_metadata.additional_properties[key] = value
433
434        return peer_metadata

Allows for managing rooms.

FishjamClient(fishjam_id: str, management_token: str)
151    def __init__(
152        self,
153        fishjam_id: str,
154        management_token: str,
155    ):
156        """Create a FishjamClient instance.
157
158        Args:
159            fishjam_id: The unique identifier for the Fishjam instance.
160            management_token: The token used for authenticating management operations.
161        """
162        super().__init__(fishjam_id=fishjam_id, management_token=management_token)

Create a FishjamClient instance.

Args:

  • fishjam_id: The unique identifier for the Fishjam instance.
  • management_token: The token used for authenticating management operations.
def create_peer( self, room_id: str, options: PeerOptions | None = None) -> tuple[Peer, str]:
164    def create_peer(
165        self,
166        room_id: str,
167        options: PeerOptions | None = None,
168    ) -> tuple[Peer, str]:
169        """Creates a peer in the room.
170
171        Args:
172            room_id: The ID of the room where the peer will be created.
173            options: Configuration options for the peer. Defaults to None.
174
175        Returns:
176            A tuple containing:
177                - Peer: The created peer object.
178                - str: The peer token needed to authenticate to Fishjam.
179        """
180        options = options or PeerOptions()
181
182        peer_metadata = self.__parse_peer_metadata(options.metadata)
183        peer_options = PeerOptionsWebRTC(
184            metadata=peer_metadata,
185            subscribe_mode=SubscribeMode(options.subscribe_mode),
186        )
187        body = PeerConfig(type_=PeerType.WEBRTC, options=peer_options)
188
189        resp = cast(
190            PeerDetailsResponse,
191            self._request(room_add_peer, room_id=room_id, body=body),
192        )
193
194        return (resp.data.peer, resp.data.token)

Creates a peer in the room.

Args:

  • room_id: The ID of the room where the peer will be created.
  • options: Configuration options for the peer. Defaults to None.

Returns:

  • A tuple containing:
    • Peer: The created peer object.
    • str: The peer token needed to authenticate to Fishjam.
def create_agent( self, room_id: str, options: AgentOptions | None = None):
196    def create_agent(self, room_id: str, options: AgentOptions | None = None):
197        """Creates an agent in the room.
198
199        Args:
200            room_id: The ID of the room where the agent will be created.
201            options: Configuration options for the agent. Defaults to None.
202
203        Returns:
204            Agent: The created agent instance initialized with peer ID, room ID, token,
205                and Fishjam URL.
206        """
207        options = options or AgentOptions()
208        body = PeerConfig(
209            type_=PeerType.AGENT,
210            options=PeerOptionsAgent(
211                output=AgentOutput(
212                    audio_format=AudioFormat(options.output.audio_format),
213                    audio_sample_rate=AudioSampleRate(options.output.audio_sample_rate),
214                ),
215                subscribe_mode=SubscribeMode(options.subscribe_mode),
216            ),
217        )
218
219        resp = cast(
220            PeerDetailsResponse,
221            self._request(room_add_peer, room_id=room_id, body=body),
222        )
223
224        return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)

Creates an agent in the room.

Args:

  • room_id: The ID of the room where the agent will be created.
  • options: Configuration options for the agent. Defaults to None.

Returns:

  • Agent: The created agent instance initialized with peer ID, room ID, token, and Fishjam URL.
def create_vapi_agent( self, room_id: str, options: PeerOptionsVapi) -> Peer:
226    def create_vapi_agent(
227        self,
228        room_id: str,
229        options: PeerOptionsVapi,
230    ) -> Peer:
231        """Creates a vapi agent in the room.
232
233        Args:
234            room_id: The ID of the room where the vapi agent will be created.
235            options: Configuration options for the vapi peer.
236
237        Returns:
238            - Peer: The created peer object.
239        """
240        body = PeerConfig(type_=PeerType.VAPI, options=options)
241
242        resp = cast(
243            PeerDetailsResponse,
244            self._request(room_add_peer, room_id=room_id, body=body),
245        )
246
247        return resp.data.peer

Creates a vapi agent in the room.

Args:

  • room_id: The ID of the room where the vapi agent will be created.
  • options: Configuration options for the vapi peer.

- - Peer: The created peer object.

def create_room( self, options: RoomOptions | None = None) -> Room:
249    def create_room(self, options: RoomOptions | None = None) -> Room:
250        """Creates a new room.
251
252        Args:
253            options: Configuration options for the room. Defaults to None.
254
255        Returns:
256            Room: The created Room object.
257        """
258        options = options or RoomOptions()
259
260        if options.video_codec is None:
261            codec = UNSET
262        else:
263            codec = VideoCodec(options.video_codec)
264
265        config = RoomConfig(
266            max_peers=options.max_peers,
267            video_codec=codec,
268            webhook_url=options.webhook_url,
269            room_type=RoomType(options.room_type),
270            public=options.public,
271        )
272
273        room = cast(
274            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
275        ).data.room
276
277        return Room(config=room.config, id=room.id, peers=room.peers)

Creates a new room.

Args:

  • options: Configuration options for the room. Defaults to None.

Returns:

  • Room: The created Room object.
def get_all_rooms(self) -> list[Room]:
279    def get_all_rooms(self) -> list[Room]:
280        """Returns list of all rooms.
281
282        Returns:
283            list[Room]: A list of all available Room objects.
284        """
285        rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data
286
287        return [
288            Room(config=room.config, id=room.id, peers=room.peers) for room in rooms
289        ]

Returns list of all rooms.

Returns:

  • list[Room]: A list of all available Room objects.
def get_room(self, room_id: str) -> Room:
291    def get_room(self, room_id: str) -> Room:
292        """Returns room with the given id.
293
294        Args:
295            room_id: The ID of the room to retrieve.
296
297        Returns:
298            Room: The Room object corresponding to the given ID.
299        """
300        room = cast(
301            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
302        ).data
303
304        return Room(config=room.config, id=room.id, peers=room.peers)

Returns room with the given id.

Args:

  • room_id: The ID of the room to retrieve.

Returns:

  • Room: The Room object corresponding to the given ID.
def delete_peer(self, room_id: str, peer_id: str) -> None:
306    def delete_peer(self, room_id: str, peer_id: str) -> None:
307        """Deletes a peer from a room.
308
309        Args:
310            room_id: The ID of the room the peer belongs to.
311            peer_id: The ID of the peer to delete.
312        """
313        return self._request(room_delete_peer, id=peer_id, room_id=room_id)

Deletes a peer from a room.

Args:

  • room_id: The ID of the room the peer belongs to.
  • peer_id: The ID of the peer to delete.
def delete_room(self, room_id: str) -> None:
315    def delete_room(self, room_id: str) -> None:
316        """Deletes a room.
317
318        Args:
319            room_id: The ID of the room to delete.
320        """
321        return self._request(room_delete_room, room_id=room_id)

Deletes a room.

Args:

  • room_id: The ID of the room to delete.
def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
323    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
324        """Refreshes a peer token.
325
326        Args:
327            room_id: The ID of the room.
328            peer_id: The ID of the peer whose token needs refreshing.
329
330        Returns:
331            str: The new peer token.
332        """
333        response = cast(
334            PeerRefreshTokenResponse,
335            self._request(room_refresh_token, id=peer_id, room_id=room_id),
336        )
337
338        return response.data.token

Refreshes a peer token.

Args:

  • room_id: The ID of the room.
  • peer_id: The ID of the peer whose token needs refreshing.

Returns:

  • str: The new peer token.
def create_livestream_viewer_token(self, room_id: str) -> str:
340    def create_livestream_viewer_token(self, room_id: str) -> str:
341        """Generates a viewer token for livestream rooms.
342
343        Args:
344            room_id: The ID of the livestream room.
345
346        Returns:
347            str: The generated viewer token.
348        """
349        response = cast(
350            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
351        )
352
353        return response.token

Generates a viewer token for livestream rooms.

Args:

  • room_id: The ID of the livestream room.

Returns:

  • str: The generated viewer token.
def create_livestream_streamer_token(self, room_id: str) -> str:
355    def create_livestream_streamer_token(self, room_id: str) -> str:
356        """Generates a streamer token for livestream rooms.
357
358        Args:
359            room_id: The ID of the livestream room.
360
361        Returns:
362            str: The generated streamer token.
363        """
364        response = cast(
365            StreamerToken,
366            self._request(streamer_generate_streamer_token, room_id=room_id),
367        )
368
369        return response.token

Generates a streamer token for livestream rooms.

Args:

  • room_id: The ID of the livestream room.

Returns:

  • str: The generated streamer token.
def create_moq_token( self, publish_path: str | None = None, subscribe_path: str | None = None) -> str:
371    def create_moq_token(
372        self,
373        publish_path: str | None = None,
374        subscribe_path: str | None = None,
375    ) -> str:
376        """Generates a MoQ token.
377
378        Args:
379            publish_path: Path the token grants publish access to.
380            subscribe_path: Path the token grants subscribe access to.
381
382        Returns:
383            str: The generated token.
384        """
385        config = MoqTokenConfig(
386            publish_path=publish_path, subscribe_path=subscribe_path
387        )
388        response = cast(
389            MoqToken,
390            self._request(moq_create_token, body=config),
391        )
392
393        return response.token

Generates a MoQ token.

Args:

  • publish_path: Path the token grants publish access to.
  • subscribe_path: Path the token grants subscribe access to.

Returns:

  • str: The generated token.
def subscribe_peer(self, room_id: str, peer_id: str, target_peer_id: str):
395    def subscribe_peer(self, room_id: str, peer_id: str, target_peer_id: str):
396        """Subscribes a peer to all tracks of another peer.
397
398        Args:
399            room_id: The ID of the room.
400            peer_id: The ID of the subscribing peer.
401            target_peer_id: The ID of the peer to subscribe to.
402        """
403        self._request(
404            room_subscribe_peer,
405            room_id=room_id,
406            id=peer_id,
407            peer_id=target_peer_id,
408        )

Subscribes a peer to all tracks of another peer.

Args:

  • room_id: The ID of the room.
  • peer_id: The ID of the subscribing peer.
  • target_peer_id: The ID of the peer to subscribe to.
def subscribe_tracks(self, room_id: str, peer_id: str, track_ids: list[str]):
410    def subscribe_tracks(self, room_id: str, peer_id: str, track_ids: list[str]):
411        """Subscribes a peer to specific tracks of another peer.
412
413        Args:
414            room_id: The ID of the room.
415            peer_id: The ID of the subscribing peer.
416            track_ids: A list of track IDs to subscribe to.
417        """
418        self._request(
419            room_subscribe_tracks,
420            room_id=room_id,
421            id=peer_id,
422            body=SubscribeTracksBody(track_ids=track_ids),
423        )

Subscribes a peer to specific tracks of another peer.

Args:

  • room_id: The ID of the room.
  • peer_id: The ID of the subscribing peer.
  • track_ids: A list of track IDs to subscribe to.
Inherited Members
fishjam.api._client.Client
client
warnings_shown
class FishjamNotifier:
 33class FishjamNotifier:
 34    """Allows for receiving WebSocket messages from Fishjam."""
 35
 36    def __init__(
 37        self,
 38        fishjam_id: str,
 39        management_token: str,
 40    ):
 41        """Create a FishjamNotifier instance with an ID and management token."""
 42        websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws")
 43        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
 44        self._management_token: str = management_token
 45        self._websocket: client.ClientConnection | None = None
 46        self._ready: bool = False
 47
 48        self._ready_event: asyncio.Event | None = None
 49
 50        self._notification_handler: NotificationHandler | None = None
 51
 52    def on_server_notification(self, handler: NotificationHandler):
 53        """Decorator for defining a handler for Fishjam notifications.
 54
 55        Args:
 56            handler: The function to be registered as the notification handler.
 57
 58        Returns:
 59            NotificationHandler: The original handler function (unmodified).
 60        """
 61        self._notification_handler = handler
 62        return handler
 63
 64    async def connect(self):
 65        """Connects to Fishjam and listens for all incoming messages.
 66
 67        It runs until the connection isn't closed.
 68
 69        The incoming messages are handled by the functions defined using the
 70        `on_server_notification` decorator.
 71
 72        The handler have to be defined before calling `connect`,
 73        otherwise the messages won't be received.
 74        """
 75        async with client.connect(self._fishjam_url) as websocket:
 76            try:
 77                self._websocket = websocket
 78                await self._authenticate()
 79
 80                if self._notification_handler:
 81                    await self._subscribe_event(
 82                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
 83                    )
 84
 85                self._ready = True
 86                if self._ready_event:
 87                    self._ready_event.set()
 88
 89                await self._receive_loop()
 90            finally:
 91                self._websocket = None
 92
 93    async def wait_ready(self) -> None:
 94        """Waits until the notifier is connected and authenticated to Fishjam.
 95
 96        If already connected, returns immediately.
 97        """
 98        if self._ready:
 99            return
100
101        if self._ready_event is None:
102            self._ready_event = asyncio.Event()
103
104        await self._ready_event.wait()
105
106    async def _authenticate(self):
107        if not self._websocket:
108            raise RuntimeError("Websocket is not connected")
109
110        msg = ServerMessage(
111            auth_request=ServerMessageAuthRequest(token=self._management_token)
112        )
113        await self._websocket.send(bytes(msg))
114
115        try:
116            message = await self._websocket.recv(decode=False)
117        except ConnectionClosed as exception:
118            if "invalid token" in str(exception):
119                raise RuntimeError("Invalid management token") from exception
120            raise
121
122        message = ServerMessage().parse(message)
123
124        _type, message = betterproto.which_one_of(message, "content")
125        assert isinstance(message, ServerMessageAuthenticated)
126
127    async def _receive_loop(self):
128        if not self._websocket:
129            raise RuntimeError("Websocket is not connected")
130        if not self._notification_handler:
131            raise RuntimeError("Notification handler is not defined")
132
133        while True:
134            message = await self._websocket.recv(decode=False)
135            message = ServerMessage().parse(message)
136            _which, message = betterproto.which_one_of(message, "content")
137
138            if isinstance(message, ALLOWED_NOTIFICATIONS):
139                res = self._notification_handler(message)
140                if inspect.isawaitable(res):
141                    await res
142
143    async def _subscribe_event(self, event: ServerMessageEventType):
144        if not self._websocket:
145            raise RuntimeError("Websocket is not connected")
146
147        request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event))
148
149        await self._websocket.send(bytes(request))
150        message = cast(bytes, await self._websocket.recv())
151        message = ServerMessage().parse(message)
152        _which, message = betterproto.which_one_of(message, "content")
153        assert isinstance(message, ServerMessageSubscribeResponse)

Allows for receiving WebSocket messages from Fishjam.

FishjamNotifier(fishjam_id: str, management_token: str)
36    def __init__(
37        self,
38        fishjam_id: str,
39        management_token: str,
40    ):
41        """Create a FishjamNotifier instance with an ID and management token."""
42        websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws")
43        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
44        self._management_token: str = management_token
45        self._websocket: client.ClientConnection | None = None
46        self._ready: bool = False
47
48        self._ready_event: asyncio.Event | None = None
49
50        self._notification_handler: NotificationHandler | None = None

Create a FishjamNotifier instance with an ID and management token.

def on_server_notification( self, handler: Callable[[fishjam.events.ServerMessageRoomCreated | fishjam.events.ServerMessageRoomDeleted | fishjam.events.ServerMessageRoomCrashed | fishjam.events.ServerMessagePeerAdded | fishjam.events.ServerMessagePeerDeleted | fishjam.events.ServerMessagePeerConnected | fishjam.events.ServerMessagePeerDisconnected | fishjam.events.ServerMessagePeerMetadataUpdated | fishjam.events.ServerMessagePeerCrashed | fishjam.events.ServerMessageStreamerConnected | fishjam.events.ServerMessageStreamerDisconnected | fishjam.events.ServerMessageChannelAdded | fishjam.events.ServerMessageChannelRemoved | fishjam.events.ServerMessageViewerConnected | fishjam.events.ServerMessageViewerDisconnected | fishjam.events.ServerMessageTrackAdded | fishjam.events.ServerMessageTrackRemoved | fishjam.events.ServerMessageTrackMetadataUpdated], NoneType] | Callable[[fishjam.events.ServerMessageRoomCreated | fishjam.events.ServerMessageRoomDeleted | fishjam.events.ServerMessageRoomCrashed | fishjam.events.ServerMessagePeerAdded | fishjam.events.ServerMessagePeerDeleted | fishjam.events.ServerMessagePeerConnected | fishjam.events.ServerMessagePeerDisconnected | fishjam.events.ServerMessagePeerMetadataUpdated | fishjam.events.ServerMessagePeerCrashed | fishjam.events.ServerMessageStreamerConnected | fishjam.events.ServerMessageStreamerDisconnected | fishjam.events.ServerMessageChannelAdded | fishjam.events.ServerMessageChannelRemoved | fishjam.events.ServerMessageViewerConnected | fishjam.events.ServerMessageViewerDisconnected | fishjam.events.ServerMessageTrackAdded | fishjam.events.ServerMessageTrackRemoved | fishjam.events.ServerMessageTrackMetadataUpdated], Coroutine[Any, Any, None]]):
52    def on_server_notification(self, handler: NotificationHandler):
53        """Decorator for defining a handler for Fishjam notifications.
54
55        Args:
56            handler: The function to be registered as the notification handler.
57
58        Returns:
59            NotificationHandler: The original handler function (unmodified).
60        """
61        self._notification_handler = handler
62        return handler

Decorator for defining a handler for Fishjam notifications.

Args:

  • handler: The function to be registered as the notification handler.

Returns:

  • NotificationHandler: The original handler function (unmodified).
async def connect(self):
64    async def connect(self):
65        """Connects to Fishjam and listens for all incoming messages.
66
67        It runs until the connection isn't closed.
68
69        The incoming messages are handled by the functions defined using the
70        `on_server_notification` decorator.
71
72        The handler have to be defined before calling `connect`,
73        otherwise the messages won't be received.
74        """
75        async with client.connect(self._fishjam_url) as websocket:
76            try:
77                self._websocket = websocket
78                await self._authenticate()
79
80                if self._notification_handler:
81                    await self._subscribe_event(
82                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
83                    )
84
85                self._ready = True
86                if self._ready_event:
87                    self._ready_event.set()
88
89                await self._receive_loop()
90            finally:
91                self._websocket = None

Connects to Fishjam and listens for all incoming messages.

It runs until the connection isn't closed.

The incoming messages are handled by the functions defined using the on_server_notification decorator.

The handler have to be defined before calling connect, otherwise the messages won't be received.

async def wait_ready(self) -> None:
 93    async def wait_ready(self) -> None:
 94        """Waits until the notifier is connected and authenticated to Fishjam.
 95
 96        If already connected, returns immediately.
 97        """
 98        if self._ready:
 99            return
100
101        if self._ready_event is None:
102            self._ready_event = asyncio.Event()
103
104        await self._ready_event.wait()

Waits until the notifier is connected and authenticated to Fishjam.

If already connected, returns immediately.

15def receive_binary(binary: bytes) -> Union[AllowedNotification, None]:
16    """Transforms a received protobuf notification into a notification instance.
17
18    The available notifications are listed in `fishjam.events` module.
19
20    Args:
21        binary: The raw binary data received from the webhook.
22
23    Returns:
24        AllowedNotification | None: The parsed notification object, or None if
25            the message type is not supported.
26    """
27    message = ServerMessage().parse(binary)
28    _which, message = betterproto.which_one_of(message, "content")
29
30    if isinstance(message, ALLOWED_NOTIFICATIONS):
31        return message
32
33    return None

Transforms a received protobuf notification into a notification instance.

The available notifications are listed in fishjam.events module.

Args:

  • binary: The raw binary data received from the webhook.

Returns:

  • AllowedNotification | None: The parsed notification object, or None if the message type is not supported.
class PeerMetadata:
13@_attrs_define
14class PeerMetadata:
15    """Custom metadata set by the peer
16
17    Example:
18        {'name': 'FishjamUser'}
19
20    """
21
22    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)
23
24    def to_dict(self) -> dict[str, Any]:
25        field_dict: dict[str, Any] = {}
26        field_dict.update(self.additional_properties)
27
28        return field_dict
29
30    @classmethod
31    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
32        d = dict(src_dict)
33        peer_metadata = cls()
34
35        peer_metadata.additional_properties = d
36        return peer_metadata
37
38    @property
39    def additional_keys(self) -> list[str]:
40        return list(self.additional_properties.keys())
41
42    def __getitem__(self, key: str) -> Any:
43        return self.additional_properties[key]
44
45    def __setitem__(self, key: str, value: Any) -> None:
46        self.additional_properties[key] = value
47
48    def __delitem__(self, key: str) -> None:
49        del self.additional_properties[key]
50
51    def __contains__(self, key: str) -> bool:
52        return key in self.additional_properties

Custom metadata set by the peer

Example:

  • {'name': 'FishjamUser'}
PeerMetadata()
23def __init__(self, ):
24    self.additional_properties = __attr_factory_additional_properties()

Method generated by attrs for class PeerMetadata.

additional_properties: 'dict[str, Any]'
def to_dict(self) -> 'dict[str, Any]':
24    def to_dict(self) -> dict[str, Any]:
25        field_dict: dict[str, Any] = {}
26        field_dict.update(self.additional_properties)
27
28        return field_dict
@classmethod
def from_dict(cls: 'type[T]', src_dict: 'Mapping[str, Any]') -> 'T':
30    @classmethod
31    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
32        d = dict(src_dict)
33        peer_metadata = cls()
34
35        peer_metadata.additional_properties = d
36        return peer_metadata
additional_keys: 'list[str]'
38    @property
39    def additional_keys(self) -> list[str]:
40        return list(self.additional_properties.keys())
@dataclass
class PeerOptions:
106@dataclass
107class PeerOptions:
108    """Options specific to a WebRTC Peer.
109
110    Attributes:
111        metadata: Peer metadata.
112        subscribe_mode: Configuration of peer's subscribing policy.
113    """
114
115    metadata: dict[str, Any] | None = None
116    """Peer metadata"""
117    subscribe_mode: Literal["auto", "manual"] = "auto"
118    """Configuration of peer's subscribing policy"""

Options specific to a WebRTC Peer.

Attributes:

  • metadata: Peer metadata.
  • subscribe_mode: Configuration of peer's subscribing policy.
PeerOptions( metadata: dict[str, Any] | None = None, subscribe_mode: Literal['auto', 'manual'] = 'auto')
metadata: dict[str, Any] | None = None

Peer metadata

subscribe_mode: Literal['auto', 'manual'] = 'auto'

Configuration of peer's subscribing policy

class PeerOptionsVapi:
15@_attrs_define
16class PeerOptionsVapi:
17    """Options specific to the VAPI peer
18
19    Attributes:
20        api_key (str): VAPI API key
21        call_id (str): VAPI call ID
22        subscribe_mode (SubscribeMode | Unset): Configuration of peer's subscribing policy
23    """
24
25    api_key: str
26    call_id: str
27    subscribe_mode: SubscribeMode | Unset = UNSET
28
29    def to_dict(self) -> dict[str, Any]:
30        api_key = self.api_key
31
32        call_id = self.call_id
33
34        subscribe_mode: str | Unset = UNSET
35        if not isinstance(self.subscribe_mode, Unset):
36            subscribe_mode = self.subscribe_mode.value
37
38        field_dict: dict[str, Any] = {}
39
40        field_dict.update({
41            "apiKey": api_key,
42            "callId": call_id,
43        })
44        if subscribe_mode is not UNSET:
45            field_dict["subscribeMode"] = subscribe_mode
46
47        return field_dict
48
49    @classmethod
50    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
51        d = dict(src_dict)
52        api_key = d.pop("apiKey")
53
54        call_id = d.pop("callId")
55
56        _subscribe_mode = d.pop("subscribeMode", UNSET)
57        subscribe_mode: SubscribeMode | Unset
58        if isinstance(_subscribe_mode, Unset):
59            subscribe_mode = UNSET
60        else:
61            subscribe_mode = SubscribeMode(_subscribe_mode)
62
63        peer_options_vapi = cls(
64            api_key=api_key,
65            call_id=call_id,
66            subscribe_mode=subscribe_mode,
67        )
68
69        return peer_options_vapi

Options specific to the VAPI peer

Attributes:

  • api_key (str): VAPI API key
  • call_id (str): VAPI call ID
  • subscribe_mode (SubscribeMode | Unset): Configuration of peer's subscribing policy
PeerOptionsVapi( api_key: 'str', call_id: 'str', subscribe_mode: 'SubscribeMode | Unset' = <fishjam._openapi_client.types.Unset object>)
25def __init__(self, api_key, call_id, subscribe_mode=attr_dict['subscribe_mode'].default):
26    self.api_key = api_key
27    self.call_id = call_id
28    self.subscribe_mode = subscribe_mode

Method generated by attrs for class PeerOptionsVapi.

api_key: 'str'
call_id: 'str'
subscribe_mode: 'SubscribeMode | Unset'
def to_dict(self) -> 'dict[str, Any]':
29    def to_dict(self) -> dict[str, Any]:
30        api_key = self.api_key
31
32        call_id = self.call_id
33
34        subscribe_mode: str | Unset = UNSET
35        if not isinstance(self.subscribe_mode, Unset):
36            subscribe_mode = self.subscribe_mode.value
37
38        field_dict: dict[str, Any] = {}
39
40        field_dict.update({
41            "apiKey": api_key,
42            "callId": call_id,
43        })
44        if subscribe_mode is not UNSET:
45            field_dict["subscribeMode"] = subscribe_mode
46
47        return field_dict
@classmethod
def from_dict(cls: 'type[T]', src_dict: 'Mapping[str, Any]') -> 'T':
49    @classmethod
50    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
51        d = dict(src_dict)
52        api_key = d.pop("apiKey")
53
54        call_id = d.pop("callId")
55
56        _subscribe_mode = d.pop("subscribeMode", UNSET)
57        subscribe_mode: SubscribeMode | Unset
58        if isinstance(_subscribe_mode, Unset):
59            subscribe_mode = UNSET
60        else:
61            subscribe_mode = SubscribeMode(_subscribe_mode)
62
63        peer_options_vapi = cls(
64            api_key=api_key,
65            call_id=call_id,
66            subscribe_mode=subscribe_mode,
67        )
68
69        return peer_options_vapi
@dataclass
class RoomOptions:
 74@dataclass
 75class RoomOptions:
 76    """Description of a room options.
 77
 78    Attributes:
 79        max_peers: Maximum amount of peers allowed into the room.
 80        video_codec: Enforces video codec for each peer in the room.
 81        webhook_url: URL where Fishjam notifications will be sent.
 82        room_type: The use-case of the room. If not provided, this defaults
 83            to conference.
 84        public: True if livestream viewers can omit specifying a token.
 85    """
 86
 87    max_peers: int | None = None
 88    """Maximum amount of peers allowed into the room"""
 89    video_codec: Literal["h264", "vp8"] | None = None
 90    """Enforces video codec for each peer in the room"""
 91    webhook_url: str | None = None
 92    """URL where Fishjam notifications will be sent"""
 93    room_type: Literal[
 94        "conference",
 95        "audio_only",
 96        "livestream",
 97        "full_feature",
 98        "broadcaster",
 99        "audio_only_livestream",
100    ] = "conference"
101    """The use-case of the room. If not provided, this defaults to conference."""
102    public: bool = False
103    """True if livestream viewers can omit specifying a token."""

Description of a room options.

Attributes:

  • max_peers: Maximum amount of peers allowed into the room.
  • video_codec: Enforces video codec for each peer in the room.
  • webhook_url: URL where Fishjam notifications will be sent.
  • room_type: The use-case of the room. If not provided, this defaults to conference.
  • public: True if livestream viewers can omit specifying a token.
RoomOptions( max_peers: int | None = None, video_codec: Literal['h264', 'vp8'] | None = None, webhook_url: str | None = None, room_type: Literal['conference', 'audio_only', 'livestream', 'full_feature', 'broadcaster', 'audio_only_livestream'] = 'conference', public: bool = False)
max_peers: int | None = None

Maximum amount of peers allowed into the room

video_codec: Literal['h264', 'vp8'] | None = None

Enforces video codec for each peer in the room

webhook_url: str | None = None

URL where Fishjam notifications will be sent

room_type: Literal['conference', 'audio_only', 'livestream', 'full_feature', 'broadcaster', 'audio_only_livestream'] = 'conference'

The use-case of the room. If not provided, this defaults to conference.

public: bool = False

True if livestream viewers can omit specifying a token.

@dataclass
class AgentOptions:
134@dataclass
135class AgentOptions:
136    """Options specific to an Agent Peer.
137
138    Attributes:
139        output: Configuration for the agent's output options.
140        subscribe_mode: Configuration of peer's subscribing policy.
141    """
142
143    output: AgentOutputOptions = field(default_factory=AgentOutputOptions)
144
145    subscribe_mode: Literal["auto", "manual"] = "auto"

Options specific to an Agent Peer.

Attributes:

  • output: Configuration for the agent's output options.
  • subscribe_mode: Configuration of peer's subscribing policy.
AgentOptions( output: AgentOutputOptions = <factory>, subscribe_mode: Literal['auto', 'manual'] = 'auto')
subscribe_mode: Literal['auto', 'manual'] = 'auto'
@dataclass
class AgentOutputOptions:
121@dataclass
122class AgentOutputOptions:
123    """Options of the desired format of audio tracks going from Fishjam to the agent.
124
125    Attributes:
126        audio_format: The format of the audio stream (e.g., 'pcm16').
127        audio_sample_rate: The sample rate of the audio stream.
128    """
129
130    audio_format: Literal["pcm16"] = "pcm16"
131    audio_sample_rate: Literal[16000, 24000] = 16000

Options of the desired format of audio tracks going from Fishjam to the agent.

Attributes:

  • audio_format: The format of the audio stream (e.g., 'pcm16').
  • audio_sample_rate: The sample rate of the audio stream.
AgentOutputOptions( audio_format: Literal['pcm16'] = 'pcm16', audio_sample_rate: Literal[16000, 24000] = 16000)
audio_format: Literal['pcm16'] = 'pcm16'
audio_sample_rate: Literal[16000, 24000] = 16000
@dataclass
class Room:
56@dataclass
57class Room:
58    """Description of the room state.
59
60    Attributes:
61        config: Room configuration.
62        id: Room ID.
63        peers: List of all peers.
64    """
65
66    config: RoomConfig
67    """Room configuration"""
68    id: str
69    """Room ID"""
70    peers: list[Peer]
71    """List of all peers"""

Description of the room state.

Attributes:

  • config: Room configuration.
  • id: Room ID.
  • peers: List of all peers.
Room( config: fishjam._openapi_client.models.room_config.RoomConfig, id: str, peers: list[Peer])
config: fishjam._openapi_client.models.room_config.RoomConfig

Room configuration

id: str

Room ID

peers: list[Peer]

List of all peers

class Peer:
 23@_attrs_define
 24class Peer:
 25    """Describes peer status
 26
 27    Attributes:
 28        id (str): Assigned peer id Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
 29        metadata (None | PeerMetadata): Custom metadata set by the peer Example: {'name': 'FishjamUser'}.
 30        status (PeerStatus): Informs about the peer status Example: disconnected.
 31        subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
 32        subscriptions (Subscriptions): Describes peer's subscriptions in manual mode
 33        tracks (list[Track]): List of all peer's tracks
 34        type_ (PeerType): Peer type Example: webrtc.
 35    """
 36
 37    id: str
 38    metadata: None | PeerMetadata
 39    status: PeerStatus
 40    subscribe_mode: SubscribeMode
 41    subscriptions: Subscriptions
 42    tracks: list[Track]
 43    type_: PeerType
 44    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)
 45
 46    def to_dict(self) -> dict[str, Any]:
 47        from ..models.peer_metadata import PeerMetadata
 48
 49        id = self.id
 50
 51        metadata: dict[str, Any] | None
 52        if isinstance(self.metadata, PeerMetadata):
 53            metadata = self.metadata.to_dict()
 54        else:
 55            metadata = self.metadata
 56
 57        status = self.status.value
 58
 59        subscribe_mode = self.subscribe_mode.value
 60
 61        subscriptions = self.subscriptions.to_dict()
 62
 63        tracks = []
 64        for tracks_item_data in self.tracks:
 65            tracks_item = tracks_item_data.to_dict()
 66            tracks.append(tracks_item)
 67
 68        type_ = self.type_.value
 69
 70        field_dict: dict[str, Any] = {}
 71        field_dict.update(self.additional_properties)
 72        field_dict.update({
 73            "id": id,
 74            "metadata": metadata,
 75            "status": status,
 76            "subscribeMode": subscribe_mode,
 77            "subscriptions": subscriptions,
 78            "tracks": tracks,
 79            "type": type_,
 80        })
 81
 82        return field_dict
 83
 84    @classmethod
 85    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
 86        from ..models.peer_metadata import PeerMetadata
 87        from ..models.subscriptions import Subscriptions
 88        from ..models.track import Track
 89
 90        d = dict(src_dict)
 91        id = d.pop("id")
 92
 93        def _parse_metadata(data: object) -> None | PeerMetadata:
 94            if data is None:
 95                return data
 96            try:
 97                if not isinstance(data, dict):
 98                    raise TypeError()
 99                componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data)
100
101                return componentsschemas_peer_metadata_type_0
102            except (TypeError, ValueError, AttributeError, KeyError):
103                pass
104            return cast(None | PeerMetadata, data)
105
106        metadata = _parse_metadata(d.pop("metadata"))
107
108        status = PeerStatus(d.pop("status"))
109
110        subscribe_mode = SubscribeMode(d.pop("subscribeMode"))
111
112        subscriptions = Subscriptions.from_dict(d.pop("subscriptions"))
113
114        tracks = []
115        _tracks = d.pop("tracks")
116        for tracks_item_data in _tracks:
117            tracks_item = Track.from_dict(tracks_item_data)
118
119            tracks.append(tracks_item)
120
121        type_ = PeerType(d.pop("type"))
122
123        peer = cls(
124            id=id,
125            metadata=metadata,
126            status=status,
127            subscribe_mode=subscribe_mode,
128            subscriptions=subscriptions,
129            tracks=tracks,
130            type_=type_,
131        )
132
133        peer.additional_properties = d
134        return peer
135
136    @property
137    def additional_keys(self) -> list[str]:
138        return list(self.additional_properties.keys())
139
140    def __getitem__(self, key: str) -> Any:
141        return self.additional_properties[key]
142
143    def __setitem__(self, key: str, value: Any) -> None:
144        self.additional_properties[key] = value
145
146    def __delitem__(self, key: str) -> None:
147        del self.additional_properties[key]
148
149    def __contains__(self, key: str) -> bool:
150        return key in self.additional_properties

Describes peer status

Attributes:

  • id (str): Assigned peer id Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
  • metadata (None | PeerMetadata): Custom metadata set by the peer Example: {'name': 'FishjamUser'}.
  • status (PeerStatus): Informs about the peer status Example: disconnected.
  • subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
  • subscriptions (Subscriptions): Describes peer's subscriptions in manual mode
  • tracks (list[Track]): List of all peer's tracks
  • type_ (PeerType): Peer type Example: webrtc.
Peer( id: 'str', metadata: 'None | PeerMetadata', status: 'PeerStatus', subscribe_mode: 'SubscribeMode', subscriptions: 'Subscriptions', tracks: 'list[Track]', type_: 'PeerType')
30def __init__(self, id, metadata, status, subscribe_mode, subscriptions, tracks, type_):
31    self.id = id
32    self.metadata = metadata
33    self.status = status
34    self.subscribe_mode = subscribe_mode
35    self.subscriptions = subscriptions
36    self.tracks = tracks
37    self.type_ = type_
38    self.additional_properties = __attr_factory_additional_properties()

Method generated by attrs for class Peer.

id: 'str'
metadata: 'None | PeerMetadata'
status: 'PeerStatus'
subscribe_mode: 'SubscribeMode'
subscriptions: 'Subscriptions'
tracks: 'list[Track]'
type_: 'PeerType'
additional_properties: 'dict[str, Any]'
def to_dict(self) -> 'dict[str, Any]':
46    def to_dict(self) -> dict[str, Any]:
47        from ..models.peer_metadata import PeerMetadata
48
49        id = self.id
50
51        metadata: dict[str, Any] | None
52        if isinstance(self.metadata, PeerMetadata):
53            metadata = self.metadata.to_dict()
54        else:
55            metadata = self.metadata
56
57        status = self.status.value
58
59        subscribe_mode = self.subscribe_mode.value
60
61        subscriptions = self.subscriptions.to_dict()
62
63        tracks = []
64        for tracks_item_data in self.tracks:
65            tracks_item = tracks_item_data.to_dict()
66            tracks.append(tracks_item)
67
68        type_ = self.type_.value
69
70        field_dict: dict[str, Any] = {}
71        field_dict.update(self.additional_properties)
72        field_dict.update({
73            "id": id,
74            "metadata": metadata,
75            "status": status,
76            "subscribeMode": subscribe_mode,
77            "subscriptions": subscriptions,
78            "tracks": tracks,
79            "type": type_,
80        })
81
82        return field_dict
@classmethod
def from_dict(cls: 'type[T]', src_dict: 'Mapping[str, Any]') -> 'T':
 84    @classmethod
 85    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
 86        from ..models.peer_metadata import PeerMetadata
 87        from ..models.subscriptions import Subscriptions
 88        from ..models.track import Track
 89
 90        d = dict(src_dict)
 91        id = d.pop("id")
 92
 93        def _parse_metadata(data: object) -> None | PeerMetadata:
 94            if data is None:
 95                return data
 96            try:
 97                if not isinstance(data, dict):
 98                    raise TypeError()
 99                componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data)
100
101                return componentsschemas_peer_metadata_type_0
102            except (TypeError, ValueError, AttributeError, KeyError):
103                pass
104            return cast(None | PeerMetadata, data)
105
106        metadata = _parse_metadata(d.pop("metadata"))
107
108        status = PeerStatus(d.pop("status"))
109
110        subscribe_mode = SubscribeMode(d.pop("subscribeMode"))
111
112        subscriptions = Subscriptions.from_dict(d.pop("subscriptions"))
113
114        tracks = []
115        _tracks = d.pop("tracks")
116        for tracks_item_data in _tracks:
117            tracks_item = Track.from_dict(tracks_item_data)
118
119            tracks.append(tracks_item)
120
121        type_ = PeerType(d.pop("type"))
122
123        peer = cls(
124            id=id,
125            metadata=metadata,
126            status=status,
127            subscribe_mode=subscribe_mode,
128            subscriptions=subscriptions,
129            tracks=tracks,
130            type_=type_,
131        )
132
133        peer.additional_properties = d
134        return peer
additional_keys: 'list[str]'
136    @property
137    def additional_keys(self) -> list[str]:
138        return list(self.additional_properties.keys())