Fishjam Python Server SDK

Fishjam Python Server SDK

Python server SDK for Fishjam.

Read the docs here

Installation

pip install fishjam-server-sdk

Usage

The SDK exports two main classes for interacting with the Fishjam server: FishjamClient and FishjamNotifier.

FishjamClient wraps HTTP REST API calls, while FishjamNotifier is responsible for receiving real-time updates from the server.

FishjamClient

Create a FishjamClient instance, providing your Fishjam ID and management token

from fishjam import FishjamClient

fishjam_client = FishjamClient(fishjam_id="<fishjam_id>", management_token="<management_token>")

You can use it to interact with Fishjam to manage rooms and peers

# Create a room
options = RoomOptions(video_codec="h264", webhook_url="http://localhost:5000/webhook")
room = fishjam_client.create_room(options=options)

# Room(components=[], config=RoomConfig(max_peers=None, video_codec=<RoomConfigVideoCodec.H264: 'h264'>, webhook_url='http://localhost:5000/webhook'), id='1d905478-ccfc-44d6-a6e7-8ccb1b38d955', peers=[])

# Add peer to the room
peer, token = fishjam_client.create_peer(room.id)

# Peer(id='b1232c7e-c969-4450-acdf-ea24f3cdd7f6', status=<PeerStatus.DISCONNECTED: 'disconnected'>, type='webrtc'), 'M8TUGhj-L11KpyG-2zBPIo'

All methods in FishjamClient may raise one of the exceptions deriving from fishjam.errors.HTTPError. They are defined in fishjam.errors.

FishjamNotifier

FishjamNotifier allows for receiving real-time updates from the Fishjam Server.

You can read more about notifications in the Fishjam Docs.

Create a FishjamNotifier instance

from fishjam import FishjamNotifier

fishjam_notifier = FishjamNotifier(fishjam_id='<fishjam_id>', management_token='<management_token>')

Then define a handler for incoming messages

@fishjam_notifier.on_server_notification
def handle_notification(server_notification):
    print(f'Received a notification: {server_notification}')

After that you can start the notifier

async def test_notifier():
    notifier_task = asyncio.create_task(fishjam_notifier.connect())

    # Wait for notifier to be ready to receive messages
    await fishjam_notifier.wait_ready()

    # Create a room to trigger a server notification
    fishjam_client = FishjamClient(fishjam_id="<fishjam_id>", management_token="<management_token>")
    fishjam_client.create_room()

    await notifier_task

asyncio.run(test_notifier())

# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29')

License

Licensed under the Apache License, Version 2.0

Fishjam is created by Software Mansion

Since 2012, Software Mansion has been a software agency with experience in building web and mobile apps. We are Core React Native Contributors and experts in dealing with all kinds of React Native issues. We can help you build your next dream product – Hire us.

Software Mansion

 1""".. include:: ../README.md"""
 2
 3# pylint: disable=locally-disabled, no-name-in-module, import-error
 4
 5# Exceptions and Server Messages
 6
 7# API
 8# pylint: disable=locally-disabled, no-name-in-module, import-error
 9
10# Exceptions and Server Messages
11from fishjam import agent, errors, events, integrations, peer, room, version
12from fishjam._openapi_client.models import PeerMetadata
13
14# API
15from fishjam._webhook_notifier import receive_binary
16from fishjam._ws_notifier import FishjamNotifier
17from fishjam.api._fishjam_client import (
18    AgentOptions,
19    AgentOutputOptions,
20    FishjamClient,
21    Peer,
22    PeerOptions,
23    Room,
24    RoomOptions,
25)
26
27__version__ = version.__version__
28
29__all__ = [
30    "FishjamClient",
31    "FishjamNotifier",
32    "receive_binary",
33    "PeerMetadata",
34    "PeerOptions",
35    "RoomOptions",
36    "AgentOptions",
37    "AgentOutputOptions",
38    "Room",
39    "Peer",
40    "events",
41    "errors",
42    "room",
43    "peer",
44    "agent",
45    "integrations",
46]
47
48
49__docformat__ = "restructuredtext"
class FishjamClient(fishjam.api._client.Client):
142class FishjamClient(Client):
143    """Allows for managing rooms."""
144
145    def __init__(
146        self,
147        fishjam_id: str,
148        management_token: str,
149    ):
150        """Create a FishjamClient instance.
151
152        Args:
153            fishjam_id: The unique identifier for the Fishjam instance.
154            management_token: The token used for authenticating management operations.
155        """
156        super().__init__(fishjam_id=fishjam_id, management_token=management_token)
157
158    def create_peer(
159        self,
160        room_id: str,
161        options: PeerOptions | None = None,
162    ) -> tuple[Peer, str]:
163        """Creates a peer in the room.
164
165        Args:
166            room_id: The ID of the room where the peer will be created.
167            options: Configuration options for the peer. Defaults to None.
168
169        Returns:
170            A tuple containing:
171                - Peer: The created peer object.
172                - str: The peer token needed to authenticate to Fishjam.
173        """
174        options = options or PeerOptions()
175
176        peer_metadata = self.__parse_peer_metadata(options.metadata)
177        peer_options = PeerOptionsWebRTC(
178            metadata=peer_metadata,
179            subscribe_mode=SubscribeMode(options.subscribe_mode),
180        )
181        body = AddPeerBody(type_=PeerType.WEBRTC, options=peer_options)
182
183        resp = cast(
184            PeerDetailsResponse,
185            self._request(room_add_peer, room_id=room_id, body=body),
186        )
187
188        return (resp.data.peer, resp.data.token)
189
190    def create_agent(self, room_id: str, options: AgentOptions | None = None):
191        """Creates an agent in the room.
192
193        Args:
194            room_id: The ID of the room where the agent will be created.
195            options: Configuration options for the agent. Defaults to None.
196
197        Returns:
198            Agent: The created agent instance initialized with peer ID, room ID, token,
199                and Fishjam URL.
200        """
201        options = options or AgentOptions()
202        body = AddPeerBody(
203            type_=PeerType.AGENT,
204            options=PeerOptionsAgent(
205                output=AgentOutput(
206                    audio_format=AudioFormat(options.output.audio_format),
207                    audio_sample_rate=AudioSampleRate(options.output.audio_sample_rate),
208                ),
209                subscribe_mode=SubscribeMode(options.subscribe_mode),
210            ),
211        )
212
213        resp = cast(
214            PeerDetailsResponse,
215            self._request(room_add_peer, room_id=room_id, body=body),
216        )
217
218        return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)
219
220    def create_room(self, options: RoomOptions | None = None) -> Room:
221        """Creates a new room.
222
223        Args:
224            options: Configuration options for the room. Defaults to None.
225
226        Returns:
227            Room: The created Room object.
228        """
229        options = options or RoomOptions()
230
231        if options.video_codec is None:
232            codec = UNSET
233        else:
234            codec = VideoCodec(options.video_codec)
235
236        config = RoomConfig(
237            max_peers=options.max_peers,
238            video_codec=codec,
239            webhook_url=options.webhook_url,
240            room_type=RoomType(options.room_type),
241            public=options.public,
242        )
243
244        room = cast(
245            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
246        ).data.room
247
248        return Room(config=room.config, id=room.id, peers=room.peers)
249
250    def get_all_rooms(self) -> list[Room]:
251        """Returns list of all rooms.
252
253        Returns:
254            list[Room]: A list of all available Room objects.
255        """
256        rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data
257
258        return [
259            Room(config=room.config, id=room.id, peers=room.peers) for room in rooms
260        ]
261
262    def get_room(self, room_id: str) -> Room:
263        """Returns room with the given id.
264
265        Args:
266            room_id: The ID of the room to retrieve.
267
268        Returns:
269            Room: The Room object corresponding to the given ID.
270        """
271        room = cast(
272            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
273        ).data
274
275        return Room(config=room.config, id=room.id, peers=room.peers)
276
277    def delete_peer(self, room_id: str, peer_id: str) -> None:
278        """Deletes a peer from a room.
279
280        Args:
281            room_id: The ID of the room the peer belongs to.
282            peer_id: The ID of the peer to delete.
283        """
284        return self._request(room_delete_peer, id=peer_id, room_id=room_id)
285
286    def delete_room(self, room_id: str) -> None:
287        """Deletes a room.
288
289        Args:
290            room_id: The ID of the room to delete.
291        """
292        return self._request(room_delete_room, room_id=room_id)
293
294    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
295        """Refreshes a peer token.
296
297        Args:
298            room_id: The ID of the room.
299            peer_id: The ID of the peer whose token needs refreshing.
300
301        Returns:
302            str: The new peer token.
303        """
304        response = cast(
305            PeerRefreshTokenResponse,
306            self._request(room_refresh_token, id=peer_id, room_id=room_id),
307        )
308
309        return response.data.token
310
311    def create_livestream_viewer_token(self, room_id: str) -> str:
312        """Generates a viewer token for livestream rooms.
313
314        Args:
315            room_id: The ID of the livestream room.
316
317        Returns:
318            str: The generated viewer token.
319        """
320        response = cast(
321            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
322        )
323
324        return response.token
325
326    def create_livestream_streamer_token(self, room_id: str) -> str:
327        """Generates a streamer token for livestream rooms.
328
329        Args:
330            room_id: The ID of the livestream room.
331
332        Returns:
333            str: The generated streamer token.
334        """
335        response = cast(
336            StreamerToken,
337            self._request(streamer_generate_streamer_token, room_id=room_id),
338        )
339
340        return response.token
341
342    def subscribe_peer(self, room_id: str, peer_id: str, target_peer_id: str):
343        """Subscribes a peer to all tracks of another peer.
344
345        Args:
346            room_id: The ID of the room.
347            peer_id: The ID of the subscribing peer.
348            target_peer_id: The ID of the peer to subscribe to.
349        """
350        self._request(
351            room_subscribe_peer,
352            room_id=room_id,
353            id=peer_id,
354            peer_id=target_peer_id,
355        )
356
357    def subscribe_tracks(self, room_id: str, peer_id: str, track_ids: list[str]):
358        """Subscribes a peer to specific tracks of another peer.
359
360        Args:
361            room_id: The ID of the room.
362            peer_id: The ID of the subscribing peer.
363            track_ids: A list of track IDs to subscribe to.
364        """
365        self._request(
366            room_subscribe_tracks,
367            room_id=room_id,
368            id=peer_id,
369            body=SubscribeTracksBody(track_ids=track_ids),
370        )
371
372    def __parse_peer_metadata(self, metadata: dict | None) -> WebRTCMetadata:
373        peer_metadata = WebRTCMetadata()
374
375        if not metadata:
376            return peer_metadata
377
378        for key, value in metadata.items():
379            peer_metadata.additional_properties[key] = value
380
381        return peer_metadata

Allows for managing rooms.

FishjamClient(fishjam_id: str, management_token: str)
145    def __init__(
146        self,
147        fishjam_id: str,
148        management_token: str,
149    ):
150        """Create a FishjamClient instance.
151
152        Args:
153            fishjam_id: The unique identifier for the Fishjam instance.
154            management_token: The token used for authenticating management operations.
155        """
156        super().__init__(fishjam_id=fishjam_id, management_token=management_token)

Create a FishjamClient instance.

Args:

  • fishjam_id: The unique identifier for the Fishjam instance.
  • management_token: The token used for authenticating management operations.
def create_peer( self, room_id: str, options: PeerOptions | None = None) -> tuple[Peer, str]:
158    def create_peer(
159        self,
160        room_id: str,
161        options: PeerOptions | None = None,
162    ) -> tuple[Peer, str]:
163        """Creates a peer in the room.
164
165        Args:
166            room_id: The ID of the room where the peer will be created.
167            options: Configuration options for the peer. Defaults to None.
168
169        Returns:
170            A tuple containing:
171                - Peer: The created peer object.
172                - str: The peer token needed to authenticate to Fishjam.
173        """
174        options = options or PeerOptions()
175
176        peer_metadata = self.__parse_peer_metadata(options.metadata)
177        peer_options = PeerOptionsWebRTC(
178            metadata=peer_metadata,
179            subscribe_mode=SubscribeMode(options.subscribe_mode),
180        )
181        body = AddPeerBody(type_=PeerType.WEBRTC, options=peer_options)
182
183        resp = cast(
184            PeerDetailsResponse,
185            self._request(room_add_peer, room_id=room_id, body=body),
186        )
187
188        return (resp.data.peer, resp.data.token)

Creates a peer in the room.

Args:

  • room_id: The ID of the room where the peer will be created.
  • options: Configuration options for the peer. Defaults to None.

Returns:

  • A tuple containing:
    • Peer: The created peer object.
    • str: The peer token needed to authenticate to Fishjam.
def create_agent( self, room_id: str, options: AgentOptions | None = None):
190    def create_agent(self, room_id: str, options: AgentOptions | None = None):
191        """Creates an agent in the room.
192
193        Args:
194            room_id: The ID of the room where the agent will be created.
195            options: Configuration options for the agent. Defaults to None.
196
197        Returns:
198            Agent: The created agent instance initialized with peer ID, room ID, token,
199                and Fishjam URL.
200        """
201        options = options or AgentOptions()
202        body = AddPeerBody(
203            type_=PeerType.AGENT,
204            options=PeerOptionsAgent(
205                output=AgentOutput(
206                    audio_format=AudioFormat(options.output.audio_format),
207                    audio_sample_rate=AudioSampleRate(options.output.audio_sample_rate),
208                ),
209                subscribe_mode=SubscribeMode(options.subscribe_mode),
210            ),
211        )
212
213        resp = cast(
214            PeerDetailsResponse,
215            self._request(room_add_peer, room_id=room_id, body=body),
216        )
217
218        return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)

Creates an agent in the room.

Args:

  • room_id: The ID of the room where the agent will be created.
  • options: Configuration options for the agent. Defaults to None.

Returns:

  • Agent: The created agent instance initialized with peer ID, room ID, token, and Fishjam URL.
def create_room( self, options: RoomOptions | None = None) -> Room:
220    def create_room(self, options: RoomOptions | None = None) -> Room:
221        """Creates a new room.
222
223        Args:
224            options: Configuration options for the room. Defaults to None.
225
226        Returns:
227            Room: The created Room object.
228        """
229        options = options or RoomOptions()
230
231        if options.video_codec is None:
232            codec = UNSET
233        else:
234            codec = VideoCodec(options.video_codec)
235
236        config = RoomConfig(
237            max_peers=options.max_peers,
238            video_codec=codec,
239            webhook_url=options.webhook_url,
240            room_type=RoomType(options.room_type),
241            public=options.public,
242        )
243
244        room = cast(
245            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
246        ).data.room
247
248        return Room(config=room.config, id=room.id, peers=room.peers)

Creates a new room.

Args:

  • options: Configuration options for the room. Defaults to None.

Returns:

  • Room: The created Room object.
def get_all_rooms(self) -> list[Room]:
250    def get_all_rooms(self) -> list[Room]:
251        """Returns list of all rooms.
252
253        Returns:
254            list[Room]: A list of all available Room objects.
255        """
256        rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data
257
258        return [
259            Room(config=room.config, id=room.id, peers=room.peers) for room in rooms
260        ]

Returns list of all rooms.

Returns:

  • list[Room]: A list of all available Room objects.
def get_room(self, room_id: str) -> Room:
262    def get_room(self, room_id: str) -> Room:
263        """Returns room with the given id.
264
265        Args:
266            room_id: The ID of the room to retrieve.
267
268        Returns:
269            Room: The Room object corresponding to the given ID.
270        """
271        room = cast(
272            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
273        ).data
274
275        return Room(config=room.config, id=room.id, peers=room.peers)

Returns room with the given id.

Args:

  • room_id: The ID of the room to retrieve.

Returns:

  • Room: The Room object corresponding to the given ID.
def delete_peer(self, room_id: str, peer_id: str) -> None:
277    def delete_peer(self, room_id: str, peer_id: str) -> None:
278        """Deletes a peer from a room.
279
280        Args:
281            room_id: The ID of the room the peer belongs to.
282            peer_id: The ID of the peer to delete.
283        """
284        return self._request(room_delete_peer, id=peer_id, room_id=room_id)

Deletes a peer from a room.

Args:

  • room_id: The ID of the room the peer belongs to.
  • peer_id: The ID of the peer to delete.
def delete_room(self, room_id: str) -> None:
286    def delete_room(self, room_id: str) -> None:
287        """Deletes a room.
288
289        Args:
290            room_id: The ID of the room to delete.
291        """
292        return self._request(room_delete_room, room_id=room_id)

Deletes a room.

Args:

  • room_id: The ID of the room to delete.
def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
294    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
295        """Refreshes a peer token.
296
297        Args:
298            room_id: The ID of the room.
299            peer_id: The ID of the peer whose token needs refreshing.
300
301        Returns:
302            str: The new peer token.
303        """
304        response = cast(
305            PeerRefreshTokenResponse,
306            self._request(room_refresh_token, id=peer_id, room_id=room_id),
307        )
308
309        return response.data.token

Refreshes a peer token.

Args:

  • room_id: The ID of the room.
  • peer_id: The ID of the peer whose token needs refreshing.

Returns:

  • str: The new peer token.
def create_livestream_viewer_token(self, room_id: str) -> str:
311    def create_livestream_viewer_token(self, room_id: str) -> str:
312        """Generates a viewer token for livestream rooms.
313
314        Args:
315            room_id: The ID of the livestream room.
316
317        Returns:
318            str: The generated viewer token.
319        """
320        response = cast(
321            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
322        )
323
324        return response.token

Generates a viewer token for livestream rooms.

Args:

  • room_id: The ID of the livestream room.

Returns:

  • str: The generated viewer token.
def create_livestream_streamer_token(self, room_id: str) -> str:
326    def create_livestream_streamer_token(self, room_id: str) -> str:
327        """Generates a streamer token for livestream rooms.
328
329        Args:
330            room_id: The ID of the livestream room.
331
332        Returns:
333            str: The generated streamer token.
334        """
335        response = cast(
336            StreamerToken,
337            self._request(streamer_generate_streamer_token, room_id=room_id),
338        )
339
340        return response.token

Generates a streamer token for livestream rooms.

Args:

  • room_id: The ID of the livestream room.

Returns:

  • str: The generated streamer token.
def subscribe_peer(self, room_id: str, peer_id: str, target_peer_id: str):
342    def subscribe_peer(self, room_id: str, peer_id: str, target_peer_id: str):
343        """Subscribes a peer to all tracks of another peer.
344
345        Args:
346            room_id: The ID of the room.
347            peer_id: The ID of the subscribing peer.
348            target_peer_id: The ID of the peer to subscribe to.
349        """
350        self._request(
351            room_subscribe_peer,
352            room_id=room_id,
353            id=peer_id,
354            peer_id=target_peer_id,
355        )

Subscribes a peer to all tracks of another peer.

Args:

  • room_id: The ID of the room.
  • peer_id: The ID of the subscribing peer.
  • target_peer_id: The ID of the peer to subscribe to.
def subscribe_tracks(self, room_id: str, peer_id: str, track_ids: list[str]):
357    def subscribe_tracks(self, room_id: str, peer_id: str, track_ids: list[str]):
358        """Subscribes a peer to specific tracks of another peer.
359
360        Args:
361            room_id: The ID of the room.
362            peer_id: The ID of the subscribing peer.
363            track_ids: A list of track IDs to subscribe to.
364        """
365        self._request(
366            room_subscribe_tracks,
367            room_id=room_id,
368            id=peer_id,
369            body=SubscribeTracksBody(track_ids=track_ids),
370        )

Subscribes a peer to specific tracks of another peer.

Args:

  • room_id: The ID of the room.
  • peer_id: The ID of the subscribing peer.
  • track_ids: A list of track IDs to subscribe to.
Inherited Members
fishjam.api._client.Client
client
class FishjamNotifier:
 33class FishjamNotifier:
 34    """Allows for receiving WebSocket messages from Fishjam."""
 35
 36    def __init__(
 37        self,
 38        fishjam_id: str,
 39        management_token: str,
 40    ):
 41        """Create a FishjamNotifier instance with an ID and management token."""
 42        websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws")
 43        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
 44        self._management_token: str = management_token
 45        self._websocket: client.ClientConnection | None = None
 46        self._ready: bool = False
 47
 48        self._ready_event: asyncio.Event | None = None
 49
 50        self._notification_handler: NotificationHandler | None = None
 51
 52    def on_server_notification(self, handler: NotificationHandler):
 53        """Decorator for defining a handler for Fishjam notifications.
 54
 55        Args:
 56            handler: The function to be registered as the notification handler.
 57
 58        Returns:
 59            NotificationHandler: The original handler function (unmodified).
 60        """
 61        self._notification_handler = handler
 62        return handler
 63
 64    async def connect(self):
 65        """Connects to Fishjam and listens for all incoming messages.
 66
 67        It runs until the connection is closed.
 68
 69        The incoming messages are handled by the functions defined using the
 70        `on_server_notification` decorator.
 71
 72        The handler has to be defined before calling `connect`,
 73        otherwise the messages won't be received.
 74        """
 75        async with client.connect(self._fishjam_url) as websocket:
 76            try:
 77                self._websocket = websocket
 78                await self._authenticate()
 79
 80                if self._notification_handler:
 81                    await self._subscribe_event(
 82                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
 83                    )
 84
 85                self._ready = True
 86                if self._ready_event:
 87                    self._ready_event.set()
 88
 89                await self._receive_loop()
 90            finally:
 91                self._websocket = None
 92
 93    async def wait_ready(self) -> None:
 94        """Waits until the notifier is connected and authenticated to Fishjam.
 95
 96        If already connected, returns immediately.
 97        """
 98        if self._ready:
 99            return
100
101        if self._ready_event is None:
102            self._ready_event = asyncio.Event()
103
104        await self._ready_event.wait()
105
106    async def _authenticate(self):
107        if not self._websocket:
108            raise RuntimeError("Websocket is not connected")
109
110        msg = ServerMessage(
111            auth_request=ServerMessageAuthRequest(token=self._management_token)
112        )
113        await self._websocket.send(bytes(msg))
114
115        try:
116            message = await self._websocket.recv(decode=False)
117        except ConnectionClosed as exception:
118            if "invalid token" in str(exception):
119                raise RuntimeError("Invalid management token") from exception
120            raise
121
122        message = ServerMessage().parse(message)
123
124        _type, message = betterproto.which_one_of(message, "content")
125        assert isinstance(message, ServerMessageAuthenticated)
126
127    async def _receive_loop(self):
128        if not self._websocket:
129            raise RuntimeError("Websocket is not connected")
130        if not self._notification_handler:
131            raise RuntimeError("Notification handler is not defined")
132
133        while True:
134            message = cast(bytes, await self._websocket.recv())
135            message = ServerMessage().parse(message)
136            _which, message = betterproto.which_one_of(message, "content")
137
138            if isinstance(message, ALLOWED_NOTIFICATIONS):
139                res = self._notification_handler(message)
140                if inspect.isawaitable(res):
141                    await res
142
143    async def _subscribe_event(self, event: ServerMessageEventType):
144        if not self._websocket:
145            raise RuntimeError("Websocket is not connected")
146
147        request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event))
148
149        await self._websocket.send(bytes(request))
150        message = cast(bytes, await self._websocket.recv())
151        message = ServerMessage().parse(message)
152        _which, message = betterproto.which_one_of(message, "content")
153        assert isinstance(message, ServerMessageSubscribeResponse)

Allows for receiving WebSocket messages from Fishjam.

FishjamNotifier(fishjam_id: str, management_token: str)
36    def __init__(
37        self,
38        fishjam_id: str,
39        management_token: str,
40    ):
41        """Create a FishjamNotifier instance with an ID and management token."""
42        websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws")
43        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
44        self._management_token: str = management_token
45        self._websocket: client.ClientConnection | None = None
46        self._ready: bool = False
47
48        self._ready_event: asyncio.Event | None = None
49
50        self._notification_handler: NotificationHandler | None = None

Create a FishjamNotifier instance with an ID and management token.

def on_server_notification( self, handler: Callable[[fishjam.events.ServerMessageRoomCreated | fishjam.events.ServerMessageRoomDeleted | fishjam.events.ServerMessageRoomCrashed | fishjam.events.ServerMessagePeerAdded | fishjam.events.ServerMessagePeerDeleted | fishjam.events.ServerMessagePeerConnected | fishjam.events.ServerMessagePeerDisconnected | fishjam.events.ServerMessagePeerMetadataUpdated | fishjam.events.ServerMessagePeerCrashed | fishjam.events.ServerMessageStreamConnected | fishjam.events.ServerMessageStreamDisconnected | fishjam.events.ServerMessageViewerConnected | fishjam.events.ServerMessageViewerDisconnected | fishjam.events.ServerMessageTrackAdded | fishjam.events.ServerMessageTrackRemoved | fishjam.events.ServerMessageTrackMetadataUpdated], NoneType] | Callable[[fishjam.events.ServerMessageRoomCreated | fishjam.events.ServerMessageRoomDeleted | fishjam.events.ServerMessageRoomCrashed | fishjam.events.ServerMessagePeerAdded | fishjam.events.ServerMessagePeerDeleted | fishjam.events.ServerMessagePeerConnected | fishjam.events.ServerMessagePeerDisconnected | fishjam.events.ServerMessagePeerMetadataUpdated | fishjam.events.ServerMessagePeerCrashed | fishjam.events.ServerMessageStreamConnected | fishjam.events.ServerMessageStreamDisconnected | fishjam.events.ServerMessageViewerConnected | fishjam.events.ServerMessageViewerDisconnected | fishjam.events.ServerMessageTrackAdded | fishjam.events.ServerMessageTrackRemoved | fishjam.events.ServerMessageTrackMetadataUpdated], Coroutine[Any, Any, None]]):
52    def on_server_notification(self, handler: NotificationHandler):
53        """Decorator for defining a handler for Fishjam notifications.
54
55        Args:
56            handler: The function to be registered as the notification handler.
57
58        Returns:
59            NotificationHandler: The original handler function (unmodified).
60        """
61        self._notification_handler = handler
62        return handler

Decorator for defining a handler for Fishjam notifications.

Args:

  • handler: The function to be registered as the notification handler.

Returns:

  • NotificationHandler: The original handler function (unmodified).
async def connect(self):
64    async def connect(self):
65        """Connects to Fishjam and listens for all incoming messages.
66
67        It runs until the connection is closed.
68
69        The incoming messages are handled by the functions defined using the
70        `on_server_notification` decorator.
71
72        The handler has to be defined before calling `connect`,
73        otherwise the messages won't be received.
74        """
75        async with client.connect(self._fishjam_url) as websocket:
76            try:
77                self._websocket = websocket
78                await self._authenticate()
79
80                if self._notification_handler:
81                    await self._subscribe_event(
82                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
83                    )
84
85                self._ready = True
86                if self._ready_event:
87                    self._ready_event.set()
88
89                await self._receive_loop()
90            finally:
91                self._websocket = None

Connects to Fishjam and listens for all incoming messages.

It runs until the connection is closed.

The incoming messages are handled by the functions defined using the on_server_notification decorator.

The handler has to be defined before calling connect, otherwise the messages won't be received.

async def wait_ready(self) -> None:
 93    async def wait_ready(self) -> None:
 94        """Waits until the notifier is connected and authenticated to Fishjam.
 95
 96        If already connected, returns immediately.
 97        """
 98        if self._ready:
 99            return
100
101        if self._ready_event is None:
102            self._ready_event = asyncio.Event()
103
104        await self._ready_event.wait()

Waits until the notifier is connected and authenticated to Fishjam.

If already connected, returns immediately.

def receive_binary(binary: bytes) -> Union[AllowedNotification, None]:
    """Parses a binary protobuf message into a notification instance.

    The available notifications are listed in the `fishjam.events` module.

    Args:
        binary: The raw binary data received from the webhook.

    Returns:
        AllowedNotification | None: The notification carried in the message's
            "content" field, or None if it is not one of the allowed
            notification types.
    """
    envelope = ServerMessage().parse(binary)
    # Only the payload of the "content" one-of is needed; its field name is not.
    _field_name, payload = betterproto.which_one_of(envelope, "content")
    return payload if isinstance(payload, ALLOWED_NOTIFICATIONS) else None

Transforms a received protobuf notification into a notification instance.

The available notifications are listed in the fishjam.events module.

Args:

  • binary: The raw binary data received from the webhook.

Returns:

  • AllowedNotification | None: The parsed notification object, or None if the message type is not supported.
class PeerMetadata:
@_attrs_define
class PeerMetadata:
    """Custom metadata set by the peer

    The model declares no fixed fields; every key lives in
    `additional_properties`.

    Example:
        {'name': 'FishjamUser'}

    """

    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return a new dict with a shallow copy of the stored properties."""
        return dict(self.additional_properties)

    @classmethod
    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
        """Build an instance holding a shallow copy of `src_dict`."""
        properties = dict(src_dict)
        instance = cls()
        instance.additional_properties = properties
        return instance

    @property
    def additional_keys(self) -> list[str]:
        """Names of all stored properties."""
        return list(self.additional_properties)

    def __getitem__(self, key: str) -> Any:
        """Return the property stored under `key`."""
        return self.additional_properties[key]

    def __setitem__(self, key: str, value: Any) -> None:
        """Store `value` under `key`."""
        self.additional_properties[key] = value

    def __delitem__(self, key: str) -> None:
        """Remove the property stored under `key`."""
        del self.additional_properties[key]

    def __contains__(self, key: str) -> bool:
        """Tell whether a property is stored under `key`."""
        return key in self.additional_properties

Custom metadata set by the peer

Example:

  • {'name': 'FishjamUser'}
PeerMetadata()
def __init__(self, ):
    # Generated by attrs. It accepts no arguments; the only attribute is
    # filled from its factory.
    self.additional_properties = __attr_factory_additional_properties()

Method generated by attrs for class PeerMetadata.

additional_properties: dict[str, typing.Any]
def to_dict(self) -> dict[str, typing.Any]:
22    def to_dict(self) -> dict[str, Any]:
23        field_dict: dict[str, Any] = {}
24        field_dict.update(self.additional_properties)
25
26        return field_dict
@classmethod
def from_dict(cls: type[~T], src_dict: Mapping[str, typing.Any]) -> ~T:
28    @classmethod
29    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
30        d = dict(src_dict)
31        peer_metadata = cls()
32
33        peer_metadata.additional_properties = d
34        return peer_metadata
additional_keys: list[str]
36    @property
37    def additional_keys(self) -> list[str]:
38        return list(self.additional_properties.keys())
@dataclass
class PeerOptions:
@dataclass
class PeerOptions:
    """Options specific to a WebRTC Peer.

    Attributes:
        metadata: Peer metadata as a string-keyed dictionary. Defaults to
            None.
        subscribe_mode: Configuration of peer's subscribing policy, either
            "auto" or "manual". Defaults to "auto".
    """

    metadata: dict[str, Any] | None = None
    """Peer metadata"""
    subscribe_mode: Literal["auto", "manual"] = "auto"
    """Configuration of peer's subscribing policy"""

Options specific to a WebRTC Peer.

Attributes:

  • metadata: Peer metadata.
  • subscribe_mode: Configuration of peer's subscribing policy.
PeerOptions( metadata: dict[str, Any] | None = None, subscribe_mode: Literal['auto', 'manual'] = 'auto')
metadata: dict[str, Any] | None = None

Peer metadata

subscribe_mode: Literal['auto', 'manual'] = 'auto'

Configuration of peer's subscribing policy

@dataclass
class RoomOptions:
@dataclass
class RoomOptions:
    """Description of room options.

    Every field has a default, so an instance can be created without
    arguments.

    Attributes:
        max_peers: Maximum amount of peers allowed into the room. Defaults
            to None.
        video_codec: Enforces video codec for each peer in the room, either
            "h264" or "vp8". Defaults to None.
        webhook_url: URL where Fishjam notifications will be sent. Defaults
            to None.
        room_type: The use-case of the room. If not provided, this defaults
            to conference.
        public: True if livestream viewers can omit specifying a token.
            Defaults to False.
    """

    max_peers: int | None = None
    """Maximum amount of peers allowed into the room"""
    video_codec: Literal["h264", "vp8"] | None = None
    """Enforces video codec for each peer in the room"""
    webhook_url: str | None = None
    """URL where Fishjam notifications will be sent"""
    room_type: Literal[
        "conference",
        "audio_only",
        "livestream",
        "full_feature",
        "broadcaster",
        "audio_only_livestream",
    ] = "conference"
    """The use-case of the room. If not provided, this defaults to conference."""
    public: bool = False
    """True if livestream viewers can omit specifying a token."""

Description of room options.

Attributes:

  • max_peers: Maximum amount of peers allowed into the room.
  • video_codec: Enforces video codec for each peer in the room.
  • webhook_url: URL where Fishjam notifications will be sent.
  • room_type: The use-case of the room. If not provided, this defaults to conference.
  • public: True if livestream viewers can omit specifying a token.
RoomOptions( max_peers: int | None = None, video_codec: Literal['h264', 'vp8'] | None = None, webhook_url: str | None = None, room_type: Literal['conference', 'audio_only', 'livestream', 'full_feature', 'broadcaster', 'audio_only_livestream'] = 'conference', public: bool = False)
max_peers: int | None = None

Maximum amount of peers allowed into the room

video_codec: Literal['h264', 'vp8'] | None = None

Enforces video codec for each peer in the room

webhook_url: str | None = None

URL where Fishjam notifications will be sent

room_type: Literal['conference', 'audio_only', 'livestream', 'full_feature', 'broadcaster', 'audio_only_livestream'] = 'conference'

The use-case of the room. If not provided, this defaults to conference.

public: bool = False

True if livestream viewers can omit specifying a token.

@dataclass
class AgentOptions:
@dataclass
class AgentOptions:
    """Options specific to an agent.

    Attributes:
        output: Configuration for the agent's output options. Defaults to a
            fresh `AgentOutputOptions()` per instance.
        subscribe_mode: Configuration of the agent's subscribing policy,
            either "auto" or "manual". Defaults to "auto".
    """

    # A default factory gives every AgentOptions its own output options
    # object instead of one shared instance.
    output: AgentOutputOptions = field(default_factory=AgentOutputOptions)

    # Subscribing policy; "auto" unless the caller asks for "manual".
    subscribe_mode: Literal["auto", "manual"] = "auto"

Options specific to an agent.

Attributes:

  • output: Configuration for the agent's output options.
  • subscribe_mode: Configuration of peer's subscribing policy.
AgentOptions( output: AgentOutputOptions = <factory>, subscribe_mode: Literal['auto', 'manual'] = 'auto')
subscribe_mode: Literal['auto', 'manual'] = 'auto'
@dataclass
class AgentOutputOptions:
@dataclass
class AgentOutputOptions:
    """Options of the desired format of audio tracks going from Fishjam to the agent.

    Attributes:
        audio_format: The format of the audio stream. 'pcm16' is the only
            accepted value and the default.
        audio_sample_rate: The sample rate of the audio stream, either 16000
            or 24000. Defaults to 16000.
    """

    # Only one format is currently expressible through the type.
    audio_format: Literal["pcm16"] = "pcm16"
    audio_sample_rate: Literal[16000, 24000] = 16000

Options of the desired format of audio tracks going from Fishjam to the agent.

Attributes:

  • audio_format: The format of the audio stream (e.g., 'pcm16').
  • audio_sample_rate: The sample rate of the audio stream.
AgentOutputOptions( audio_format: Literal['pcm16'] = 'pcm16', audio_sample_rate: Literal[16000, 24000] = 16000)
audio_format: Literal['pcm16'] = 'pcm16'
audio_sample_rate: Literal[16000, 24000] = 16000
@dataclass
class Room:
@dataclass
class Room:
    """Description of the room state.

    All three fields are required; none of them has a default.

    Attributes:
        config: Room configuration.
        id: Room ID.
        peers: List of all peers.
    """

    config: RoomConfig
    """Room configuration"""
    id: str
    """Room ID"""
    peers: list[Peer]
    """List of all peers"""

Description of the room state.

Attributes:

  • config: Room configuration.
  • id: Room ID.
  • peers: List of all peers.
Room( config: fishjam._openapi_client.models.room_config.RoomConfig, id: str, peers: list[Peer])
config: fishjam._openapi_client.models.room_config.RoomConfig

Room configuration

id: str

Room ID

peers: list[Peer]

List of all peers

class Peer:
 27@_attrs_define
 28class Peer:
 29    """Describes peer status
 30
 31    Attributes:
 32        id (str): Assigned peer id Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
 33        metadata (Union['PeerMetadata', None]): Custom metadata set by the peer Example: {'name': 'FishjamUser'}.
 34        status (PeerStatus): Informs about the peer status Example: disconnected.
 35        subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
 36        subscriptions (Subscriptions): Describes peer's subscriptions in manual mode
 37        tracks (list['Track']): List of all peer's tracks
 38        type_ (PeerType): Peer type Example: webrtc.
 39    """
 40
 41    id: str
 42    metadata: Union["PeerMetadata", None]
 43    status: PeerStatus
 44    subscribe_mode: SubscribeMode
 45    subscriptions: "Subscriptions"
 46    tracks: list["Track"]
 47    type_: PeerType
 48    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)
 49
 50    def to_dict(self) -> dict[str, Any]:
 51        from ..models.peer_metadata import PeerMetadata
 52
 53        id = self.id
 54
 55        metadata: Union[None, dict[str, Any]]
 56        if isinstance(self.metadata, PeerMetadata):
 57            metadata = self.metadata.to_dict()
 58        else:
 59            metadata = self.metadata
 60
 61        status = self.status.value
 62
 63        subscribe_mode = self.subscribe_mode.value
 64
 65        subscriptions = self.subscriptions.to_dict()
 66
 67        tracks = []
 68        for tracks_item_data in self.tracks:
 69            tracks_item = tracks_item_data.to_dict()
 70            tracks.append(tracks_item)
 71
 72        type_ = self.type_.value
 73
 74        field_dict: dict[str, Any] = {}
 75        field_dict.update(self.additional_properties)
 76        field_dict.update({
 77            "id": id,
 78            "metadata": metadata,
 79            "status": status,
 80            "subscribeMode": subscribe_mode,
 81            "subscriptions": subscriptions,
 82            "tracks": tracks,
 83            "type": type_,
 84        })
 85
 86        return field_dict
 87
 88    @classmethod
 89    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
 90        from ..models.peer_metadata import PeerMetadata
 91        from ..models.subscriptions import Subscriptions
 92        from ..models.track import Track
 93
 94        d = dict(src_dict)
 95        id = d.pop("id")
 96
 97        def _parse_metadata(data: object) -> Union["PeerMetadata", None]:
 98            if data is None:
 99                return data
100            try:
101                if not isinstance(data, dict):
102                    raise TypeError()
103                componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data)
104
105                return componentsschemas_peer_metadata_type_0
106            except:  # noqa: E722
107                pass
108            return cast(Union["PeerMetadata", None], data)
109
110        metadata = _parse_metadata(d.pop("metadata"))
111
112        status = PeerStatus(d.pop("status"))
113
114        subscribe_mode = SubscribeMode(d.pop("subscribeMode"))
115
116        subscriptions = Subscriptions.from_dict(d.pop("subscriptions"))
117
118        tracks = []
119        _tracks = d.pop("tracks")
120        for tracks_item_data in _tracks:
121            tracks_item = Track.from_dict(tracks_item_data)
122
123            tracks.append(tracks_item)
124
125        type_ = PeerType(d.pop("type"))
126
127        peer = cls(
128            id=id,
129            metadata=metadata,
130            status=status,
131            subscribe_mode=subscribe_mode,
132            subscriptions=subscriptions,
133            tracks=tracks,
134            type_=type_,
135        )
136
137        peer.additional_properties = d
138        return peer
139
140    @property
141    def additional_keys(self) -> list[str]:
142        return list(self.additional_properties.keys())
143
144    def __getitem__(self, key: str) -> Any:
145        return self.additional_properties[key]
146
147    def __setitem__(self, key: str, value: Any) -> None:
148        self.additional_properties[key] = value
149
150    def __delitem__(self, key: str) -> None:
151        del self.additional_properties[key]
152
153    def __contains__(self, key: str) -> bool:
154        return key in self.additional_properties

Describes peer status

Attributes:

  • id (str): Assigned peer id Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
  • metadata (Union['PeerMetadata', None]): Custom metadata set by the peer Example: {'name': 'FishjamUser'}.
  • status (PeerStatus): Informs about the peer status Example: disconnected.
  • subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
  • subscriptions (Subscriptions): Describes peer's subscriptions in manual mode
  • tracks (list['Track']): List of all peer's tracks
  • type_ (PeerType): Peer type Example: webrtc.
Peer( id: str, metadata: ForwardRef('PeerMetadata') | None, status: fishjam._openapi_client.models.peer_status.PeerStatus, subscribe_mode: fishjam._openapi_client.models.subscribe_mode.SubscribeMode, subscriptions: 'Subscriptions', tracks: list['Track'], type_: fishjam._openapi_client.models.peer_type.PeerType)
def __init__(self, id, metadata, status, subscribe_mode, subscriptions, tracks, type_):
    # Generated by attrs: each argument is stored on the attribute of the
    # same name.
    self.id = id
    self.metadata = metadata
    self.status = status
    self.subscribe_mode = subscribe_mode
    self.subscriptions = subscriptions
    self.tracks = tracks
    self.type_ = type_
    # Not a constructor argument; the value comes from the field's factory.
    self.additional_properties = __attr_factory_additional_properties()

Method generated by attrs for class Peer.

id: str
metadata: ForwardRef('PeerMetadata') | None
status: fishjam._openapi_client.models.peer_status.PeerStatus
subscribe_mode: fishjam._openapi_client.models.subscribe_mode.SubscribeMode
subscriptions: 'Subscriptions'
tracks: list['Track']
type_: fishjam._openapi_client.models.peer_type.PeerType
additional_properties: dict[str, typing.Any]
def to_dict(self) -> dict[str, typing.Any]:
50    def to_dict(self) -> dict[str, Any]:
51        from ..models.peer_metadata import PeerMetadata
52
53        id = self.id
54
55        metadata: Union[None, dict[str, Any]]
56        if isinstance(self.metadata, PeerMetadata):
57            metadata = self.metadata.to_dict()
58        else:
59            metadata = self.metadata
60
61        status = self.status.value
62
63        subscribe_mode = self.subscribe_mode.value
64
65        subscriptions = self.subscriptions.to_dict()
66
67        tracks = []
68        for tracks_item_data in self.tracks:
69            tracks_item = tracks_item_data.to_dict()
70            tracks.append(tracks_item)
71
72        type_ = self.type_.value
73
74        field_dict: dict[str, Any] = {}
75        field_dict.update(self.additional_properties)
76        field_dict.update({
77            "id": id,
78            "metadata": metadata,
79            "status": status,
80            "subscribeMode": subscribe_mode,
81            "subscriptions": subscriptions,
82            "tracks": tracks,
83            "type": type_,
84        })
85
86        return field_dict
@classmethod
def from_dict(cls: type[~T], src_dict: Mapping[str, typing.Any]) -> ~T:
 88    @classmethod
 89    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
 90        from ..models.peer_metadata import PeerMetadata
 91        from ..models.subscriptions import Subscriptions
 92        from ..models.track import Track
 93
 94        d = dict(src_dict)
 95        id = d.pop("id")
 96
 97        def _parse_metadata(data: object) -> Union["PeerMetadata", None]:
 98            if data is None:
 99                return data
100            try:
101                if not isinstance(data, dict):
102                    raise TypeError()
103                componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data)
104
105                return componentsschemas_peer_metadata_type_0
106            except:  # noqa: E722
107                pass
108            return cast(Union["PeerMetadata", None], data)
109
110        metadata = _parse_metadata(d.pop("metadata"))
111
112        status = PeerStatus(d.pop("status"))
113
114        subscribe_mode = SubscribeMode(d.pop("subscribeMode"))
115
116        subscriptions = Subscriptions.from_dict(d.pop("subscriptions"))
117
118        tracks = []
119        _tracks = d.pop("tracks")
120        for tracks_item_data in _tracks:
121            tracks_item = Track.from_dict(tracks_item_data)
122
123            tracks.append(tracks_item)
124
125        type_ = PeerType(d.pop("type"))
126
127        peer = cls(
128            id=id,
129            metadata=metadata,
130            status=status,
131            subscribe_mode=subscribe_mode,
132            subscriptions=subscriptions,
133            tracks=tracks,
134            type_=type_,
135        )
136
137        peer.additional_properties = d
138        return peer
additional_keys: list[str]
140    @property
141    def additional_keys(self) -> list[str]:
142        return list(self.additional_properties.keys())