Fishjam Python Server SDK
Python server SDK for the Fishjam.
Read the docs here
Installation
pip install fishjam-server-sdk
Usage
The SDK exports two main classes for interacting with Fishjam server:
FishjamClient and FishjamNotifier.
FishjamClient wraps http REST api calls, while FishjamNotifier is responsible for receiving real-time updates from the server.
FishjamClient
Create a FishjamClient instance, providing the fishjam server address and api token
from fishjam import FishjamClient
fishjam_client = FishjamClient(fishjam_id="<fishjam_id>", management_token="<management_token>")
You can use it to interact with Fishjam to manage rooms and peers
# Create a room
options = RoomOptions(video_codec="h264", webhook_url="http://localhost:5000/webhook")
room = fishjam_client.create_room(options=options)
# Room(components=[], config=RoomConfig(max_peers=None, video_codec=<RoomConfigVideoCodec.H264: 'h264'>, webhook_url='http://localhost:5000/webhook'), id='1d905478-ccfc-44d6-a6e7-8ccb1b38d955', peers=[])
# Add peer to the room
peer, token = fishjam_client.create_peer(room.id)
# Peer(id='b1232c7e-c969-4450-acdf-ea24f3cdd7f6', status=<PeerStatus.DISCONNECTED: 'disconnected'>, type='webrtc'), 'M8TUGhj-L11KpyG-2zBPIo'
All methods in FishjamClient may raise one of the exceptions deriving from fishjam.errors.HTTPError. They are defined in fishjam.errors.
FishjamNotifier
FishjamNotifier allows for receiving real-time updates from the Fishjam Server.
You can read more about notifications in the Fishjam Docs.
Create FishjamNotifier instance
from fishjam import FishjamNotifier
fishjam_notifier = FishjamNotifier(fishjam_id='<fishjam_id>', management_token='<management_token>')
Then define a handler for incoming messages
@notifier.on_server_notification
def handle_notification(server_notification):
print(f'Received a notification: {server_notification}')
After that you can start the notifier
async def test_notifier():
notifier_task = asyncio.create_task(fishjam_notifier.connect())
# Wait for notifier to be ready to receive messages
await fishjam_notifier.wait_ready()
# Create a room to trigger a server notification
fishjam_client = FishjamClient()
fishjam_client.create_room()
await notifier_task
asyncio.run(test_notifier())
# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29')
License
Licensed under the Apache License, Version 2.0
Fishjam is created by Software Mansion
Since 2012 Software Mansion is a software agency with experience in building web and mobile apps. We are Core React Native Contributors and experts in dealing with all kinds of React Native issues. We can help you build your next dream product – Hire us.
1""".. include:: ../README.md""" 2 3# pylint: disable=locally-disabled, no-name-in-module, import-error 4 5# Exceptions and Server Messages 6 7# API 8# pylint: disable=locally-disabled, no-name-in-module, import-error 9 10# Exceptions and Server Messages 11from fishjam import agent, errors, events, integrations, peer, room, version 12from fishjam._openapi_client.models import PeerMetadata 13 14# API 15from fishjam._webhook_notifier import receive_binary 16from fishjam._ws_notifier import FishjamNotifier 17from fishjam.api._fishjam_client import ( 18 AgentOptions, 19 AgentOutputOptions, 20 FishjamClient, 21 Peer, 22 PeerOptions, 23 Room, 24 RoomOptions, 25) 26 27__version__ = version.__version__ 28 29__all__ = [ 30 "FishjamClient", 31 "FishjamNotifier", 32 "receive_binary", 33 "PeerMetadata", 34 "PeerOptions", 35 "RoomOptions", 36 "AgentOptions", 37 "AgentOutputOptions", 38 "Room", 39 "Peer", 40 "events", 41 "errors", 42 "room", 43 "peer", 44 "agent", 45 "integrations", 46] 47 48 49__docformat__ = "restructuredtext"
142class FishjamClient(Client): 143 """Allows for managing rooms.""" 144 145 def __init__( 146 self, 147 fishjam_id: str, 148 management_token: str, 149 ): 150 """Create a FishjamClient instance. 151 152 Args: 153 fishjam_id: The unique identifier for the Fishjam instance. 154 management_token: The token used for authenticating management operations. 155 """ 156 super().__init__(fishjam_id=fishjam_id, management_token=management_token) 157 158 def create_peer( 159 self, 160 room_id: str, 161 options: PeerOptions | None = None, 162 ) -> tuple[Peer, str]: 163 """Creates a peer in the room. 164 165 Args: 166 room_id: The ID of the room where the peer will be created. 167 options: Configuration options for the peer. Defaults to None. 168 169 Returns: 170 A tuple containing: 171 - Peer: The created peer object. 172 - str: The peer token needed to authenticate to Fishjam. 173 """ 174 options = options or PeerOptions() 175 176 peer_metadata = self.__parse_peer_metadata(options.metadata) 177 peer_options = PeerOptionsWebRTC( 178 metadata=peer_metadata, 179 subscribe_mode=SubscribeMode(options.subscribe_mode), 180 ) 181 body = AddPeerBody(type_=PeerType.WEBRTC, options=peer_options) 182 183 resp = cast( 184 PeerDetailsResponse, 185 self._request(room_add_peer, room_id=room_id, body=body), 186 ) 187 188 return (resp.data.peer, resp.data.token) 189 190 def create_agent(self, room_id: str, options: AgentOptions | None = None): 191 """Creates an agent in the room. 192 193 Args: 194 room_id: The ID of the room where the agent will be created. 195 options: Configuration options for the agent. Defaults to None. 196 197 Returns: 198 Agent: The created agent instance initialized with peer ID, room ID, token, 199 and Fishjam URL. 200 """ 201 options = options or AgentOptions() 202 body = AddPeerBody( 203 type_=PeerType.AGENT, 204 options=PeerOptionsAgent( 205 output=AgentOutput( 206 audio_format=AudioFormat(options.output.audio_format), 207 audio_sample_rate=AudioSampleRate(options.output.audio_sample_rate), 208 ), 209 subscribe_mode=SubscribeMode(options.subscribe_mode), 210 ), 211 ) 212 213 resp = cast( 214 PeerDetailsResponse, 215 self._request(room_add_peer, room_id=room_id, body=body), 216 ) 217 218 return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url) 219 220 def create_room(self, options: RoomOptions | None = None) -> Room: 221 """Creates a new room. 222 223 Args: 224 options: Configuration options for the room. Defaults to None. 225 226 Returns: 227 Room: The created Room object. 228 """ 229 options = options or RoomOptions() 230 231 if options.video_codec is None: 232 codec = UNSET 233 else: 234 codec = VideoCodec(options.video_codec) 235 236 config = RoomConfig( 237 max_peers=options.max_peers, 238 video_codec=codec, 239 webhook_url=options.webhook_url, 240 room_type=RoomType(options.room_type), 241 public=options.public, 242 ) 243 244 room = cast( 245 RoomCreateDetailsResponse, self._request(room_create_room, body=config) 246 ).data.room 247 248 return Room(config=room.config, id=room.id, peers=room.peers) 249 250 def get_all_rooms(self) -> list[Room]: 251 """Returns list of all rooms. 252 253 Returns: 254 list[Room]: A list of all available Room objects. 255 """ 256 rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data 257 258 return [ 259 Room(config=room.config, id=room.id, peers=room.peers) for room in rooms 260 ] 261 262 def get_room(self, room_id: str) -> Room: 263 """Returns room with the given id. 264 265 Args: 266 room_id: The ID of the room to retrieve. 267 268 Returns: 269 Room: The Room object corresponding to the given ID. 270 """ 271 room = cast( 272 RoomDetailsResponse, self._request(room_get_room, room_id=room_id) 273 ).data 274 275 return Room(config=room.config, id=room.id, peers=room.peers) 276 277 def delete_peer(self, room_id: str, peer_id: str) -> None: 278 """Deletes a peer from a room. 279 280 Args: 281 room_id: The ID of the room the peer belongs to. 282 peer_id: The ID of the peer to delete. 283 """ 284 return self._request(room_delete_peer, id=peer_id, room_id=room_id) 285 286 def delete_room(self, room_id: str) -> None: 287 """Deletes a room. 288 289 Args: 290 room_id: The ID of the room to delete. 291 """ 292 return self._request(room_delete_room, room_id=room_id) 293 294 def refresh_peer_token(self, room_id: str, peer_id: str) -> str: 295 """Refreshes a peer token. 296 297 Args: 298 room_id: The ID of the room. 299 peer_id: The ID of the peer whose token needs refreshing. 300 301 Returns: 302 str: The new peer token. 303 """ 304 response = cast( 305 PeerRefreshTokenResponse, 306 self._request(room_refresh_token, id=peer_id, room_id=room_id), 307 ) 308 309 return response.data.token 310 311 def create_livestream_viewer_token(self, room_id: str) -> str: 312 """Generates a viewer token for livestream rooms. 313 314 Args: 315 room_id: The ID of the livestream room. 316 317 Returns: 318 str: The generated viewer token. 319 """ 320 response = cast( 321 ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id) 322 ) 323 324 return response.token 325 326 def create_livestream_streamer_token(self, room_id: str) -> str: 327 """Generates a streamer token for livestream rooms. 328 329 Args: 330 room_id: The ID of the livestream room. 331 332 Returns: 333 str: The generated streamer token. 334 """ 335 response = cast( 336 StreamerToken, 337 self._request(streamer_generate_streamer_token, room_id=room_id), 338 ) 339 340 return response.token 341 342 def subscribe_peer(self, room_id: str, peer_id: str, target_peer_id: str): 343 """Subscribes a peer to all tracks of another peer. 344 345 Args: 346 room_id: The ID of the room. 347 peer_id: The ID of the subscribing peer. 348 target_peer_id: The ID of the peer to subscribe to. 349 """ 350 self._request( 351 room_subscribe_peer, 352 room_id=room_id, 353 id=peer_id, 354 peer_id=target_peer_id, 355 ) 356 357 def subscribe_tracks(self, room_id: str, peer_id: str, track_ids: list[str]): 358 """Subscribes a peer to specific tracks of another peer. 359 360 Args: 361 room_id: The ID of the room. 362 peer_id: The ID of the subscribing peer. 363 track_ids: A list of track IDs to subscribe to. 364 """ 365 self._request( 366 room_subscribe_tracks, 367 room_id=room_id, 368 id=peer_id, 369 body=SubscribeTracksBody(track_ids=track_ids), 370 ) 371 372 def __parse_peer_metadata(self, metadata: dict | None) -> WebRTCMetadata: 373 peer_metadata = WebRTCMetadata() 374 375 if not metadata: 376 return peer_metadata 377 378 for key, value in metadata.items(): 379 peer_metadata.additional_properties[key] = value 380 381 return peer_metadata
Allows for managing rooms.
145 def __init__( 146 self, 147 fishjam_id: str, 148 management_token: str, 149 ): 150 """Create a FishjamClient instance. 151 152 Args: 153 fishjam_id: The unique identifier for the Fishjam instance. 154 management_token: The token used for authenticating management operations. 155 """ 156 super().__init__(fishjam_id=fishjam_id, management_token=management_token)
Create a FishjamClient instance.
Args:
- fishjam_id: The unique identifier for the Fishjam instance.
- management_token: The token used for authenticating management operations.
158 def create_peer( 159 self, 160 room_id: str, 161 options: PeerOptions | None = None, 162 ) -> tuple[Peer, str]: 163 """Creates a peer in the room. 164 165 Args: 166 room_id: The ID of the room where the peer will be created. 167 options: Configuration options for the peer. Defaults to None. 168 169 Returns: 170 A tuple containing: 171 - Peer: The created peer object. 172 - str: The peer token needed to authenticate to Fishjam. 173 """ 174 options = options or PeerOptions() 175 176 peer_metadata = self.__parse_peer_metadata(options.metadata) 177 peer_options = PeerOptionsWebRTC( 178 metadata=peer_metadata, 179 subscribe_mode=SubscribeMode(options.subscribe_mode), 180 ) 181 body = AddPeerBody(type_=PeerType.WEBRTC, options=peer_options) 182 183 resp = cast( 184 PeerDetailsResponse, 185 self._request(room_add_peer, room_id=room_id, body=body), 186 ) 187 188 return (resp.data.peer, resp.data.token)
Creates a peer in the room.
Args:
- room_id: The ID of the room where the peer will be created.
- options: Configuration options for the peer. Defaults to None.
Returns:
- A tuple containing:
- Peer: The created peer object.
- str: The peer token needed to authenticate to Fishjam.
190 def create_agent(self, room_id: str, options: AgentOptions | None = None): 191 """Creates an agent in the room. 192 193 Args: 194 room_id: The ID of the room where the agent will be created. 195 options: Configuration options for the agent. Defaults to None. 196 197 Returns: 198 Agent: The created agent instance initialized with peer ID, room ID, token, 199 and Fishjam URL. 200 """ 201 options = options or AgentOptions() 202 body = AddPeerBody( 203 type_=PeerType.AGENT, 204 options=PeerOptionsAgent( 205 output=AgentOutput( 206 audio_format=AudioFormat(options.output.audio_format), 207 audio_sample_rate=AudioSampleRate(options.output.audio_sample_rate), 208 ), 209 subscribe_mode=SubscribeMode(options.subscribe_mode), 210 ), 211 ) 212 213 resp = cast( 214 PeerDetailsResponse, 215 self._request(room_add_peer, room_id=room_id, body=body), 216 ) 217 218 return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)
Creates an agent in the room.
Args:
- room_id: The ID of the room where the agent will be created.
- options: Configuration options for the agent. Defaults to None.
Returns:
- Agent: The created agent instance initialized with peer ID, room ID, token, and Fishjam URL.
220 def create_room(self, options: RoomOptions | None = None) -> Room: 221 """Creates a new room. 222 223 Args: 224 options: Configuration options for the room. Defaults to None. 225 226 Returns: 227 Room: The created Room object. 228 """ 229 options = options or RoomOptions() 230 231 if options.video_codec is None: 232 codec = UNSET 233 else: 234 codec = VideoCodec(options.video_codec) 235 236 config = RoomConfig( 237 max_peers=options.max_peers, 238 video_codec=codec, 239 webhook_url=options.webhook_url, 240 room_type=RoomType(options.room_type), 241 public=options.public, 242 ) 243 244 room = cast( 245 RoomCreateDetailsResponse, self._request(room_create_room, body=config) 246 ).data.room 247 248 return Room(config=room.config, id=room.id, peers=room.peers)
Creates a new room.
Args:
- options: Configuration options for the room. Defaults to None.
Returns:
- Room: The created Room object.
250 def get_all_rooms(self) -> list[Room]: 251 """Returns list of all rooms. 252 253 Returns: 254 list[Room]: A list of all available Room objects. 255 """ 256 rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data 257 258 return [ 259 Room(config=room.config, id=room.id, peers=room.peers) for room in rooms 260 ]
Returns list of all rooms.
Returns:
- list[Room]: A list of all available Room objects.
262 def get_room(self, room_id: str) -> Room: 263 """Returns room with the given id. 264 265 Args: 266 room_id: The ID of the room to retrieve. 267 268 Returns: 269 Room: The Room object corresponding to the given ID. 270 """ 271 room = cast( 272 RoomDetailsResponse, self._request(room_get_room, room_id=room_id) 273 ).data 274 275 return Room(config=room.config, id=room.id, peers=room.peers)
Returns room with the given id.
Args:
- room_id: The ID of the room to retrieve.
Returns:
- Room: The Room object corresponding to the given ID.
277 def delete_peer(self, room_id: str, peer_id: str) -> None: 278 """Deletes a peer from a room. 279 280 Args: 281 room_id: The ID of the room the peer belongs to. 282 peer_id: The ID of the peer to delete. 283 """ 284 return self._request(room_delete_peer, id=peer_id, room_id=room_id)
Deletes a peer from a room.
Args:
- room_id: The ID of the room the peer belongs to.
- peer_id: The ID of the peer to delete.
286 def delete_room(self, room_id: str) -> None: 287 """Deletes a room. 288 289 Args: 290 room_id: The ID of the room to delete. 291 """ 292 return self._request(room_delete_room, room_id=room_id)
Deletes a room.
Args:
- room_id: The ID of the room to delete.
294 def refresh_peer_token(self, room_id: str, peer_id: str) -> str: 295 """Refreshes a peer token. 296 297 Args: 298 room_id: The ID of the room. 299 peer_id: The ID of the peer whose token needs refreshing. 300 301 Returns: 302 str: The new peer token. 303 """ 304 response = cast( 305 PeerRefreshTokenResponse, 306 self._request(room_refresh_token, id=peer_id, room_id=room_id), 307 ) 308 309 return response.data.token
Refreshes a peer token.
Args:
- room_id: The ID of the room.
- peer_id: The ID of the peer whose token needs refreshing.
Returns:
- str: The new peer token.
311 def create_livestream_viewer_token(self, room_id: str) -> str: 312 """Generates a viewer token for livestream rooms. 313 314 Args: 315 room_id: The ID of the livestream room. 316 317 Returns: 318 str: The generated viewer token. 319 """ 320 response = cast( 321 ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id) 322 ) 323 324 return response.token
Generates a viewer token for livestream rooms.
Args:
- room_id: The ID of the livestream room.
Returns:
- str: The generated viewer token.
326 def create_livestream_streamer_token(self, room_id: str) -> str: 327 """Generates a streamer token for livestream rooms. 328 329 Args: 330 room_id: The ID of the livestream room. 331 332 Returns: 333 str: The generated streamer token. 334 """ 335 response = cast( 336 StreamerToken, 337 self._request(streamer_generate_streamer_token, room_id=room_id), 338 ) 339 340 return response.token
Generates a streamer token for livestream rooms.
Args:
- room_id: The ID of the livestream room.
Returns:
- str: The generated streamer token.
342 def subscribe_peer(self, room_id: str, peer_id: str, target_peer_id: str): 343 """Subscribes a peer to all tracks of another peer. 344 345 Args: 346 room_id: The ID of the room. 347 peer_id: The ID of the subscribing peer. 348 target_peer_id: The ID of the peer to subscribe to. 349 """ 350 self._request( 351 room_subscribe_peer, 352 room_id=room_id, 353 id=peer_id, 354 peer_id=target_peer_id, 355 )
Subscribes a peer to all tracks of another peer.
Args:
- room_id: The ID of the room.
- peer_id: The ID of the subscribing peer.
- target_peer_id: The ID of the peer to subscribe to.
357 def subscribe_tracks(self, room_id: str, peer_id: str, track_ids: list[str]): 358 """Subscribes a peer to specific tracks of another peer. 359 360 Args: 361 room_id: The ID of the room. 362 peer_id: The ID of the subscribing peer. 363 track_ids: A list of track IDs to subscribe to. 364 """ 365 self._request( 366 room_subscribe_tracks, 367 room_id=room_id, 368 id=peer_id, 369 body=SubscribeTracksBody(track_ids=track_ids), 370 )
Subscribes a peer to specific tracks of another peer.
Args:
- room_id: The ID of the room.
- peer_id: The ID of the subscribing peer.
- track_ids: A list of track IDs to subscribe to.
Inherited Members
- fishjam.api._client.Client
- client
33class FishjamNotifier: 34 """Allows for receiving WebSocket messages from Fishjam.""" 35 36 def __init__( 37 self, 38 fishjam_id: str, 39 management_token: str, 40 ): 41 """Create a FishjamNotifier instance with an ID and management token.""" 42 websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws") 43 self._fishjam_url = f"{websocket_url}/socket/server/websocket" 44 self._management_token: str = management_token 45 self._websocket: client.ClientConnection | None = None 46 self._ready: bool = False 47 48 self._ready_event: asyncio.Event | None = None 49 50 self._notification_handler: NotificationHandler | None = None 51 52 def on_server_notification(self, handler: NotificationHandler): 53 """Decorator for defining a handler for Fishjam notifications. 54 55 Args: 56 handler: The function to be registered as the notification handler. 57 58 Returns: 59 NotificationHandler: The original handler function (unmodified). 60 """ 61 self._notification_handler = handler 62 return handler 63 64 async def connect(self): 65 """Connects to Fishjam and listens for all incoming messages. 66 67 It runs until the connection isn't closed. 68 69 The incoming messages are handled by the functions defined using the 70 `on_server_notification` decorator. 71 72 The handler have to be defined before calling `connect`, 73 otherwise the messages won't be received. 74 """ 75 async with client.connect(self._fishjam_url) as websocket: 76 try: 77 self._websocket = websocket 78 await self._authenticate() 79 80 if self._notification_handler: 81 await self._subscribe_event( 82 event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION 83 ) 84 85 self._ready = True 86 if self._ready_event: 87 self._ready_event.set() 88 89 await self._receive_loop() 90 finally: 91 self._websocket = None 92 93 async def wait_ready(self) -> None: 94 """Waits until the notifier is connected and authenticated to Fishjam. 95 96 If already connected, returns immediately. 97 """ 98 if self._ready: 99 return 100 101 if self._ready_event is None: 102 self._ready_event = asyncio.Event() 103 104 await self._ready_event.wait() 105 106 async def _authenticate(self): 107 if not self._websocket: 108 raise RuntimeError("Websocket is not connected") 109 110 msg = ServerMessage( 111 auth_request=ServerMessageAuthRequest(token=self._management_token) 112 ) 113 await self._websocket.send(bytes(msg)) 114 115 try: 116 message = await self._websocket.recv(decode=False) 117 except ConnectionClosed as exception: 118 if "invalid token" in str(exception): 119 raise RuntimeError("Invalid management token") from exception 120 raise 121 122 message = ServerMessage().parse(message) 123 124 _type, message = betterproto.which_one_of(message, "content") 125 assert isinstance(message, ServerMessageAuthenticated) 126 127 async def _receive_loop(self): 128 if not self._websocket: 129 raise RuntimeError("Websocket is not connected") 130 if not self._notification_handler: 131 raise RuntimeError("Notification handler is not defined") 132 133 while True: 134 message = cast(bytes, await self._websocket.recv()) 135 message = ServerMessage().parse(message) 136 _which, message = betterproto.which_one_of(message, "content") 137 138 if isinstance(message, ALLOWED_NOTIFICATIONS): 139 res = self._notification_handler(message) 140 if inspect.isawaitable(res): 141 await res 142 143 async def _subscribe_event(self, event: ServerMessageEventType): 144 if not self._websocket: 145 raise RuntimeError("Websocket is not connected") 146 147 request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event)) 148 149 await self._websocket.send(bytes(request)) 150 message = cast(bytes, await self._websocket.recv()) 151 message = ServerMessage().parse(message) 152 _which, message = betterproto.which_one_of(message, "content") 153 assert isinstance(message, ServerMessageSubscribeResponse)
Allows for receiving WebSocket messages from Fishjam.
36 def __init__( 37 self, 38 fishjam_id: str, 39 management_token: str, 40 ): 41 """Create a FishjamNotifier instance with an ID and management token.""" 42 websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws") 43 self._fishjam_url = f"{websocket_url}/socket/server/websocket" 44 self._management_token: str = management_token 45 self._websocket: client.ClientConnection | None = None 46 self._ready: bool = False 47 48 self._ready_event: asyncio.Event | None = None 49 50 self._notification_handler: NotificationHandler | None = None
Create a FishjamNotifier instance with an ID and management token.
52 def on_server_notification(self, handler: NotificationHandler): 53 """Decorator for defining a handler for Fishjam notifications. 54 55 Args: 56 handler: The function to be registered as the notification handler. 57 58 Returns: 59 NotificationHandler: The original handler function (unmodified). 60 """ 61 self._notification_handler = handler 62 return handler
Decorator for defining a handler for Fishjam notifications.
Args:
- handler: The function to be registered as the notification handler.
Returns:
- NotificationHandler: The original handler function (unmodified).
64 async def connect(self): 65 """Connects to Fishjam and listens for all incoming messages. 66 67 It runs until the connection isn't closed. 68 69 The incoming messages are handled by the functions defined using the 70 `on_server_notification` decorator. 71 72 The handler have to be defined before calling `connect`, 73 otherwise the messages won't be received. 74 """ 75 async with client.connect(self._fishjam_url) as websocket: 76 try: 77 self._websocket = websocket 78 await self._authenticate() 79 80 if self._notification_handler: 81 await self._subscribe_event( 82 event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION 83 ) 84 85 self._ready = True 86 if self._ready_event: 87 self._ready_event.set() 88 89 await self._receive_loop() 90 finally: 91 self._websocket = None
Connects to Fishjam and listens for all incoming messages.
It runs until the connection isn't closed.
The incoming messages are handled by the functions defined using the
on_server_notification decorator.
The handler have to be defined before calling connect,
otherwise the messages won't be received.
93 async def wait_ready(self) -> None: 94 """Waits until the notifier is connected and authenticated to Fishjam. 95 96 If already connected, returns immediately. 97 """ 98 if self._ready: 99 return 100 101 if self._ready_event is None: 102 self._ready_event = asyncio.Event() 103 104 await self._ready_event.wait()
Waits until the notifier is connected and authenticated to Fishjam.
If already connected, returns immediately.
15def receive_binary(binary: bytes) -> Union[AllowedNotification, None]: 16 """Transforms a received protobuf notification into a notification instance. 17 18 The available notifications are listed in `fishjam.events` module. 19 20 Args: 21 binary: The raw binary data received from the webhook. 22 23 Returns: 24 AllowedNotification | None: The parsed notification object, or None if 25 the message type is not supported. 26 """ 27 message = ServerMessage().parse(binary) 28 _which, message = betterproto.which_one_of(message, "content") 29 30 if isinstance(message, ALLOWED_NOTIFICATIONS): 31 return message 32 33 return None
Transforms a received protobuf notification into a notification instance.
The available notifications are listed in fishjam.events module.
Args:
- binary: The raw binary data received from the webhook.
Returns:
- AllowedNotification | None: The parsed notification object, or None if the message type is not supported.
11@_attrs_define 12class PeerMetadata: 13 """Custom metadata set by the peer 14 15 Example: 16 {'name': 'FishjamUser'} 17 18 """ 19 20 additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict) 21 22 def to_dict(self) -> dict[str, Any]: 23 field_dict: dict[str, Any] = {} 24 field_dict.update(self.additional_properties) 25 26 return field_dict 27 28 @classmethod 29 def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T: 30 d = dict(src_dict) 31 peer_metadata = cls() 32 33 peer_metadata.additional_properties = d 34 return peer_metadata 35 36 @property 37 def additional_keys(self) -> list[str]: 38 return list(self.additional_properties.keys()) 39 40 def __getitem__(self, key: str) -> Any: 41 return self.additional_properties[key] 42 43 def __setitem__(self, key: str, value: Any) -> None: 44 self.additional_properties[key] = value 45 46 def __delitem__(self, key: str) -> None: 47 del self.additional_properties[key] 48 49 def __contains__(self, key: str) -> bool: 50 return key in self.additional_properties
Custom metadata set by the peer
Example:
- {'name': 'FishjamUser'}
100@dataclass 101class PeerOptions: 102 """Options specific to a WebRTC Peer. 103 104 Attributes: 105 metadata: Peer metadata. 106 subscribe_mode: Configuration of peer's subscribing policy. 107 """ 108 109 metadata: dict[str, Any] | None = None 110 """Peer metadata""" 111 subscribe_mode: Literal["auto", "manual"] = "auto" 112 """Configuration of peer's subscribing policy"""
Options specific to a WebRTC Peer.
Attributes:
- metadata: Peer metadata.
- subscribe_mode: Configuration of peer's subscribing policy.
68@dataclass 69class RoomOptions: 70 """Description of a room options. 71 72 Attributes: 73 max_peers: Maximum amount of peers allowed into the room. 74 video_codec: Enforces video codec for each peer in the room. 75 webhook_url: URL where Fishjam notifications will be sent. 76 room_type: The use-case of the room. If not provided, this defaults 77 to conference. 78 public: True if livestream viewers can omit specifying a token. 79 """ 80 81 max_peers: int | None = None 82 """Maximum amount of peers allowed into the room""" 83 video_codec: Literal["h264", "vp8"] | None = None 84 """Enforces video codec for each peer in the room""" 85 webhook_url: str | None = None 86 """URL where Fishjam notifications will be sent""" 87 room_type: Literal[ 88 "conference", 89 "audio_only", 90 "livestream", 91 "full_feature", 92 "broadcaster", 93 "audio_only_livestream", 94 ] = "conference" 95 """The use-case of the room. If not provided, this defaults to conference.""" 96 public: bool = False 97 """True if livestream viewers can omit specifying a token."""
Description of a room options.
Attributes:
- max_peers: Maximum amount of peers allowed into the room.
- video_codec: Enforces video codec for each peer in the room.
- webhook_url: URL where Fishjam notifications will be sent.
- room_type: The use-case of the room. If not provided, this defaults to conference.
- public: True if livestream viewers can omit specifying a token.
128@dataclass 129class AgentOptions: 130 """Options specific to a WebRTC Peer. 131 132 Attributes: 133 output: Configuration for the agent's output options. 134 subscribe_mode: Configuration of peer's subscribing policy. 135 """ 136 137 output: AgentOutputOptions = field(default_factory=AgentOutputOptions) 138 139 subscribe_mode: Literal["auto", "manual"] = "auto"
Options specific to a WebRTC Peer.
Attributes:
- output: Configuration for the agent's output options.
- subscribe_mode: Configuration of peer's subscribing policy.
115@dataclass 116class AgentOutputOptions: 117 """Options of the desired format of audio tracks going from Fishjam to the agent. 118 119 Attributes: 120 audio_format: The format of the audio stream (e.g., 'pcm16'). 121 audio_sample_rate: The sample rate of the audio stream. 122 """ 123 124 audio_format: Literal["pcm16"] = "pcm16" 125 audio_sample_rate: Literal[16000, 24000] = 16000
Options of the desired format of audio tracks going from Fishjam to the agent.
Attributes:
- audio_format: The format of the audio stream (e.g., 'pcm16').
- audio_sample_rate: The sample rate of the audio stream.
50@dataclass 51class Room: 52 """Description of the room state. 53 54 Attributes: 55 config: Room configuration. 56 id: Room ID. 57 peers: List of all peers. 58 """ 59 60 config: RoomConfig 61 """Room configuration""" 62 id: str 63 """Room ID""" 64 peers: list[Peer] 65 """List of all peers"""
Description of the room state.
Attributes:
- config: Room configuration.
- id: Room ID.
- peers: List of all peers.
27@_attrs_define 28class Peer: 29 """Describes peer status 30 31 Attributes: 32 id (str): Assigned peer id Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf. 33 metadata (Union['PeerMetadata', None]): Custom metadata set by the peer Example: {'name': 'FishjamUser'}. 34 status (PeerStatus): Informs about the peer status Example: disconnected. 35 subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy 36 subscriptions (Subscriptions): Describes peer's subscriptions in manual mode 37 tracks (list['Track']): List of all peer's tracks 38 type_ (PeerType): Peer type Example: webrtc. 39 """ 40 41 id: str 42 metadata: Union["PeerMetadata", None] 43 status: PeerStatus 44 subscribe_mode: SubscribeMode 45 subscriptions: "Subscriptions" 46 tracks: list["Track"] 47 type_: PeerType 48 additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict) 49 50 def to_dict(self) -> dict[str, Any]: 51 from ..models.peer_metadata import PeerMetadata 52 53 id = self.id 54 55 metadata: Union[None, dict[str, Any]] 56 if isinstance(self.metadata, PeerMetadata): 57 metadata = self.metadata.to_dict() 58 else: 59 metadata = self.metadata 60 61 status = self.status.value 62 63 subscribe_mode = self.subscribe_mode.value 64 65 subscriptions = self.subscriptions.to_dict() 66 67 tracks = [] 68 for tracks_item_data in self.tracks: 69 tracks_item = tracks_item_data.to_dict() 70 tracks.append(tracks_item) 71 72 type_ = self.type_.value 73 74 field_dict: dict[str, Any] = {} 75 field_dict.update(self.additional_properties) 76 field_dict.update({ 77 "id": id, 78 "metadata": metadata, 79 "status": status, 80 "subscribeMode": subscribe_mode, 81 "subscriptions": subscriptions, 82 "tracks": tracks, 83 "type": type_, 84 }) 85 86 return field_dict 87 88 @classmethod 89 def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T: 90 from ..models.peer_metadata import PeerMetadata 91 from ..models.subscriptions import Subscriptions 92 from ..models.track import Track 93 94 d = dict(src_dict) 95 id = d.pop("id") 96 97 def _parse_metadata(data: object) -> Union["PeerMetadata", None]: 98 if data is None: 99 return data 100 try: 101 if not isinstance(data, dict): 102 raise TypeError() 103 componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data) 104 105 return componentsschemas_peer_metadata_type_0 106 except: # noqa: E722 107 pass 108 return cast(Union["PeerMetadata", None], data) 109 110 metadata = _parse_metadata(d.pop("metadata")) 111 112 status = PeerStatus(d.pop("status")) 113 114 subscribe_mode = SubscribeMode(d.pop("subscribeMode")) 115 116 subscriptions = Subscriptions.from_dict(d.pop("subscriptions")) 117 118 tracks = [] 119 _tracks = d.pop("tracks") 120 for tracks_item_data in _tracks: 121 tracks_item = Track.from_dict(tracks_item_data) 122 123 tracks.append(tracks_item) 124 125 type_ = PeerType(d.pop("type")) 126 127 peer = cls( 128 id=id, 129 metadata=metadata, 130 status=status, 131 subscribe_mode=subscribe_mode, 132 subscriptions=subscriptions, 133 tracks=tracks, 134 type_=type_, 135 ) 136 137 peer.additional_properties = d 138 return peer 139 140 @property 141 def additional_keys(self) -> list[str]: 142 return list(self.additional_properties.keys()) 143 144 def __getitem__(self, key: str) -> Any: 145 return self.additional_properties[key] 146 147 def __setitem__(self, key: str, value: Any) -> None: 148 self.additional_properties[key] = value 149 150 def __delitem__(self, key: str) -> None: 151 del self.additional_properties[key] 152 153 def __contains__(self, key: str) -> bool: 154 return key in self.additional_properties
Describes peer status
Attributes:
- id (str): Assigned peer id Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
- metadata (Union['PeerMetadata', None]): Custom metadata set by the peer Example: {'name': 'FishjamUser'}.
- status (PeerStatus): Informs about the peer status Example: disconnected.
- subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
- subscriptions (Subscriptions): Describes peer's subscriptions in manual mode
- tracks (list['Track']): List of all peer's tracks
- type_ (PeerType): Peer type Example: webrtc.
30def __init__(self, id, metadata, status, subscribe_mode, subscriptions, tracks, type_): 31 self.id = id 32 self.metadata = metadata 33 self.status = status 34 self.subscribe_mode = subscribe_mode 35 self.subscriptions = subscriptions 36 self.tracks = tracks 37 self.type_ = type_ 38 self.additional_properties = __attr_factory_additional_properties()
Method generated by attrs for class Peer.
50 def to_dict(self) -> dict[str, Any]: 51 from ..models.peer_metadata import PeerMetadata 52 53 id = self.id 54 55 metadata: Union[None, dict[str, Any]] 56 if isinstance(self.metadata, PeerMetadata): 57 metadata = self.metadata.to_dict() 58 else: 59 metadata = self.metadata 60 61 status = self.status.value 62 63 subscribe_mode = self.subscribe_mode.value 64 65 subscriptions = self.subscriptions.to_dict() 66 67 tracks = [] 68 for tracks_item_data in self.tracks: 69 tracks_item = tracks_item_data.to_dict() 70 tracks.append(tracks_item) 71 72 type_ = self.type_.value 73 74 field_dict: dict[str, Any] = {} 75 field_dict.update(self.additional_properties) 76 field_dict.update({ 77 "id": id, 78 "metadata": metadata, 79 "status": status, 80 "subscribeMode": subscribe_mode, 81 "subscriptions": subscriptions, 82 "tracks": tracks, 83 "type": type_, 84 }) 85 86 return field_dict
88 @classmethod 89 def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T: 90 from ..models.peer_metadata import PeerMetadata 91 from ..models.subscriptions import Subscriptions 92 from ..models.track import Track 93 94 d = dict(src_dict) 95 id = d.pop("id") 96 97 def _parse_metadata(data: object) -> Union["PeerMetadata", None]: 98 if data is None: 99 return data 100 try: 101 if not isinstance(data, dict): 102 raise TypeError() 103 componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data) 104 105 return componentsschemas_peer_metadata_type_0 106 except: # noqa: E722 107 pass 108 return cast(Union["PeerMetadata", None], data) 109 110 metadata = _parse_metadata(d.pop("metadata")) 111 112 status = PeerStatus(d.pop("status")) 113 114 subscribe_mode = SubscribeMode(d.pop("subscribeMode")) 115 116 subscriptions = Subscriptions.from_dict(d.pop("subscriptions")) 117 118 tracks = [] 119 _tracks = d.pop("tracks") 120 for tracks_item_data in _tracks: 121 tracks_item = Track.from_dict(tracks_item_data) 122 123 tracks.append(tracks_item) 124 125 type_ = PeerType(d.pop("type")) 126 127 peer = cls( 128 id=id, 129 metadata=metadata, 130 status=status, 131 subscribe_mode=subscribe_mode, 132 subscriptions=subscriptions, 133 tracks=tracks, 134 type_=type_, 135 ) 136 137 peer.additional_properties = d 138 return peer