Fishjam Python Server SDK
Python server SDK for Fishjam.
Read the docs here
Installation
pip install fishjam-server-sdk
Usage
The SDK exports two main classes for interacting with Fishjam server:
FishjamClient and FishjamNotifier.
FishjamClient wraps HTTP REST API calls, while FishjamNotifier is responsible for receiving real-time updates from the server.
FishjamClient
Create a FishjamClient instance, providing your Fishjam ID and management token
from fishjam import FishjamClient, RoomOptions
fishjam_client = FishjamClient(fishjam_id="<fishjam_id>", management_token="<management_token>")
You can use it to interact with Fishjam to manage rooms and peers
# Create a room
options = RoomOptions(video_codec="h264", webhook_url="http://localhost:5000/webhook")
room = fishjam_client.create_room(options=options)
# Room(components=[], config=RoomConfig(max_peers=None, video_codec=<RoomConfigVideoCodec.H264: 'h264'>, webhook_url='http://localhost:5000/webhook'), id='1d905478-ccfc-44d6-a6e7-8ccb1b38d955', peers=[])
# Add peer to the room
peer, token = fishjam_client.create_peer(room.id)
# Peer(id='b1232c7e-c969-4450-acdf-ea24f3cdd7f6', status=<PeerStatus.DISCONNECTED: 'disconnected'>, type='webrtc'), 'M8TUGhj-L11KpyG-2zBPIo'
All methods in FishjamClient may raise one of the exceptions deriving from fishjam.errors.HTTPError. They are defined in fishjam.errors.
FishjamNotifier
FishjamNotifier allows for receiving real-time updates from the Fishjam Server.
You can read more about notifications in the Fishjam Docs.
Create FishjamNotifier instance
import asyncio
from fishjam import FishjamClient, FishjamNotifier
fishjam_notifier = FishjamNotifier(fishjam_id='<fishjam_id>', management_token='<management_token>')
Then define a handler for incoming messages
@fishjam_notifier.on_server_notification
def handle_notification(server_notification):
    """Print every server notification delivered by the notifier."""
    print(f'Received a notification: {server_notification}')
After that you can start the notifier
async def test_notifier():
    """Run the notifier and create a room so that a notification is emitted."""
    notifier_task = asyncio.create_task(fishjam_notifier.connect())

    # Wait for notifier to be ready to receive messages
    await fishjam_notifier.wait_ready()

    # Create a room to trigger a server notification
    fishjam_client = FishjamClient(
        fishjam_id="<fishjam_id>", management_token="<management_token>"
    )
    fishjam_client.create_room()

    await notifier_task
asyncio.run(test_notifier())
# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29')
License
Licensed under the Apache License, Version 2.0
Fishjam is created by Software Mansion
Since 2012, Software Mansion has been a software agency with experience in building web and mobile apps. We are Core React Native Contributors and experts in dealing with all kinds of React Native issues. We can help you build your next dream product – Hire us.
""".. include:: ../README.md"""

# pylint: disable=locally-disabled, no-name-in-module, import-error

# Exceptions and Server Messages
from fishjam import agent, errors, events, integrations, peer, room, version
from fishjam._openapi_client.models import PeerMetadata

# API
from fishjam._webhook_notifier import receive_binary
from fishjam._ws_notifier import FishjamNotifier
from fishjam.api._fishjam_client import (
    AgentOptions,
    AgentOutputOptions,
    FishjamClient,
    Peer,
    PeerOptions,
    PeerOptionsVapi,
    Room,
    RoomOptions,
)

# Package version, taken from the `version` submodule.
__version__ = version.__version__

# Names exported by `from fishjam import *`.
__all__ = [
    "FishjamClient",
    "FishjamNotifier",
    "receive_binary",
    "PeerMetadata",
    "PeerOptions",
    "PeerOptionsVapi",
    "RoomOptions",
    "AgentOptions",
    "AgentOutputOptions",
    "Room",
    "Peer",
    "events",
    "errors",
    "room",
    "peer",
    "agent",
    "integrations",
]


__docformat__ = "restructuredtext"
class FishjamClient(Client):
    """Allows for managing rooms, peers, agents and tokens.

    Every public method sends its request through ``self._request``. Per the
    package README, any of them may raise an exception deriving from
    ``fishjam.errors.HTTPError``.
    """

    def __init__(
        self,
        fishjam_id: str,
        management_token: str,
    ):
        """Create a FishjamClient instance.

        Args:
            fishjam_id: The unique identifier for the Fishjam instance.
            management_token: The token used for authenticating management operations.
        """
        super().__init__(fishjam_id=fishjam_id, management_token=management_token)

    def create_peer(
        self,
        room_id: str,
        options: PeerOptions | None = None,
    ) -> tuple[Peer, str]:
        """Creates a peer in the room.

        Args:
            room_id: The ID of the room where the peer will be created.
            options: Configuration options for the peer. Defaults to None,
                in which case a default ``PeerOptions()`` is used.

        Returns:
            A tuple containing:
                - Peer: The created peer object.
                - str: The peer token needed to authenticate to Fishjam.
        """
        options = options or PeerOptions()

        peer_metadata = self.__parse_peer_metadata(options.metadata)
        peer_options = PeerOptionsWebRTC(
            metadata=peer_metadata,
            subscribe_mode=SubscribeMode(options.subscribe_mode),
        )
        body = PeerConfig(type_=PeerType.WEBRTC, options=peer_options)

        resp = cast(
            PeerDetailsResponse,
            self._request(room_add_peer, room_id=room_id, body=body),
        )

        return (resp.data.peer, resp.data.token)

    def create_agent(
        self, room_id: str, options: AgentOptions | None = None
    ) -> Agent:
        """Creates an agent in the room.

        Args:
            room_id: The ID of the room where the agent will be created.
            options: Configuration options for the agent. Defaults to None,
                in which case a default ``AgentOptions()`` is used.

        Returns:
            Agent: The created agent instance initialized with peer ID, room ID, token,
                and Fishjam URL.
        """
        options = options or AgentOptions()
        body = PeerConfig(
            type_=PeerType.AGENT,
            options=PeerOptionsAgent(
                output=AgentOutput(
                    audio_format=AudioFormat(options.output.audio_format),
                    audio_sample_rate=AudioSampleRate(options.output.audio_sample_rate),
                ),
                subscribe_mode=SubscribeMode(options.subscribe_mode),
            ),
        )

        resp = cast(
            PeerDetailsResponse,
            self._request(room_add_peer, room_id=room_id, body=body),
        )

        return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)

    def create_vapi_agent(
        self,
        room_id: str,
        options: PeerOptionsVapi,
    ) -> Peer:
        """Creates a vapi agent in the room.

        Args:
            room_id: The ID of the room where the vapi agent will be created.
            options: Configuration options for the vapi peer.

        Returns:
            Peer: The created peer object.
        """
        body = PeerConfig(type_=PeerType.VAPI, options=options)

        resp = cast(
            PeerDetailsResponse,
            self._request(room_add_peer, room_id=room_id, body=body),
        )

        return resp.data.peer

    def create_room(self, options: RoomOptions | None = None) -> Room:
        """Creates a new room.

        Args:
            options: Configuration options for the room. Defaults to None,
                in which case a default ``RoomOptions()`` is used.

        Returns:
            Room: The created Room object.
        """
        options = options or RoomOptions()

        # A missing codec is sent as UNSET rather than as an explicit value.
        if options.video_codec is None:
            codec = UNSET
        else:
            codec = VideoCodec(options.video_codec)

        config = RoomConfig(
            max_peers=options.max_peers,
            video_codec=codec,
            webhook_url=options.webhook_url,
            room_type=RoomType(options.room_type),
            public=options.public,
        )

        room = cast(
            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
        ).data.room

        return Room(config=room.config, id=room.id, peers=room.peers)

    def get_all_rooms(self) -> list[Room]:
        """Returns list of all rooms.

        Returns:
            list[Room]: A list of all available Room objects.
        """
        rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data

        return [
            Room(config=room.config, id=room.id, peers=room.peers) for room in rooms
        ]

    def get_room(self, room_id: str) -> Room:
        """Returns room with the given id.

        Args:
            room_id: The ID of the room to retrieve.

        Returns:
            Room: The Room object corresponding to the given ID.
        """
        room = cast(
            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
        ).data

        return Room(config=room.config, id=room.id, peers=room.peers)

    def delete_peer(self, room_id: str, peer_id: str) -> None:
        """Deletes a peer from a room.

        Args:
            room_id: The ID of the room the peer belongs to.
            peer_id: The ID of the peer to delete.
        """
        return self._request(room_delete_peer, id=peer_id, room_id=room_id)

    def delete_room(self, room_id: str) -> None:
        """Deletes a room.

        Args:
            room_id: The ID of the room to delete.
        """
        return self._request(room_delete_room, room_id=room_id)

    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
        """Refreshes a peer token.

        Args:
            room_id: The ID of the room.
            peer_id: The ID of the peer whose token needs refreshing.

        Returns:
            str: The new peer token.
        """
        response = cast(
            PeerRefreshTokenResponse,
            self._request(room_refresh_token, id=peer_id, room_id=room_id),
        )

        return response.data.token

    def create_livestream_viewer_token(self, room_id: str) -> str:
        """Generates a viewer token for livestream rooms.

        Args:
            room_id: The ID of the livestream room.

        Returns:
            str: The generated viewer token.
        """
        response = cast(
            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
        )

        return response.token

    def create_livestream_streamer_token(self, room_id: str) -> str:
        """Generates a streamer token for livestream rooms.

        Args:
            room_id: The ID of the livestream room.

        Returns:
            str: The generated streamer token.
        """
        response = cast(
            StreamerToken,
            self._request(streamer_generate_streamer_token, room_id=room_id),
        )

        return response.token

    def create_moq_token(
        self,
        publish_path: str | None = None,
        subscribe_path: str | None = None,
    ) -> str:
        """Generates a MoQ token.

        Args:
            publish_path: Path the token grants publish access to.
            subscribe_path: Path the token grants subscribe access to.

        Returns:
            str: The generated token.
        """
        config = MoqTokenConfig(
            publish_path=publish_path, subscribe_path=subscribe_path
        )
        response = cast(
            MoqToken,
            self._request(moq_create_token, body=config),
        )

        return response.token

    def subscribe_peer(
        self, room_id: str, peer_id: str, target_peer_id: str
    ) -> None:
        """Subscribes a peer to all tracks of another peer.

        Args:
            room_id: The ID of the room.
            peer_id: The ID of the subscribing peer.
            target_peer_id: The ID of the peer to subscribe to.
        """
        # In the request, `id` is the subscriber and `peer_id` is the target.
        self._request(
            room_subscribe_peer,
            room_id=room_id,
            id=peer_id,
            peer_id=target_peer_id,
        )

    def subscribe_tracks(
        self, room_id: str, peer_id: str, track_ids: list[str]
    ) -> None:
        """Subscribes a peer to specific tracks of another peer.

        Args:
            room_id: The ID of the room.
            peer_id: The ID of the subscribing peer.
            track_ids: A list of track IDs to subscribe to.
        """
        self._request(
            room_subscribe_tracks,
            room_id=room_id,
            id=peer_id,
            body=SubscribeTracksBody(track_ids=track_ids),
        )

    def __parse_peer_metadata(self, metadata: dict | None) -> WebRTCMetadata:
        """Copy a plain metadata dict into a new ``WebRTCMetadata`` object.

        Args:
            metadata: Key/value pairs to copy; None or an empty dict yields
                an empty ``WebRTCMetadata``.

        Returns:
            WebRTCMetadata: The object with every pair stored in its
                ``additional_properties``.
        """
        peer_metadata = WebRTCMetadata()

        if not metadata:
            return peer_metadata

        for key, value in metadata.items():
            peer_metadata.additional_properties[key] = value

        return peer_metadata
Allows for managing rooms.
def __init__(
    self,
    fishjam_id: str,
    management_token: str,
):
    """Set up a client for one Fishjam instance.

    Both arguments are forwarded unchanged to the parent initializer.

    Args:
        fishjam_id: The unique identifier for the Fishjam instance.
        management_token: The token used for authenticating management operations.
    """
    super().__init__(
        fishjam_id=fishjam_id,
        management_token=management_token,
    )
Create a FishjamClient instance.
Args:
- fishjam_id: The unique identifier for the Fishjam instance.
- management_token: The token used for authenticating management operations.
def create_peer(
    self,
    room_id: str,
    options: PeerOptions | None = None,
) -> tuple[Peer, str]:
    """Add a WebRTC peer to a room.

    Args:
        room_id: The ID of the room where the peer will be created.
        options: Configuration options for the peer. When omitted, a default
            ``PeerOptions()`` is used.

    Returns:
        A tuple containing:
            - Peer: The created peer object.
            - str: The peer token needed to authenticate to Fishjam.
    """
    if not options:
        options = PeerOptions()

    webrtc_options = PeerOptionsWebRTC(
        metadata=self.__parse_peer_metadata(options.metadata),
        subscribe_mode=SubscribeMode(options.subscribe_mode),
    )
    request_body = PeerConfig(type_=PeerType.WEBRTC, options=webrtc_options)

    raw_response = self._request(room_add_peer, room_id=room_id, body=request_body)
    details = cast(PeerDetailsResponse, raw_response)

    return (details.data.peer, details.data.token)
Creates a peer in the room.
Args:
- room_id: The ID of the room where the peer will be created.
- options: Configuration options for the peer. Defaults to None.
Returns:
- A tuple containing:
- Peer: The created peer object.
- str: The peer token needed to authenticate to Fishjam.
def create_agent(
    self, room_id: str, options: AgentOptions | None = None
) -> Agent:
    """Creates an agent in the room.

    Args:
        room_id: The ID of the room where the agent will be created.
        options: Configuration options for the agent. Defaults to None,
            in which case a default ``AgentOptions()`` is used.

    Returns:
        Agent: The created agent instance initialized with peer ID, room ID, token,
            and Fishjam URL.
    """
    options = options or AgentOptions()
    body = PeerConfig(
        type_=PeerType.AGENT,
        options=PeerOptionsAgent(
            output=AgentOutput(
                audio_format=AudioFormat(options.output.audio_format),
                audio_sample_rate=AudioSampleRate(options.output.audio_sample_rate),
            ),
            subscribe_mode=SubscribeMode(options.subscribe_mode),
        ),
    )

    resp = cast(
        PeerDetailsResponse,
        self._request(room_add_peer, room_id=room_id, body=body),
    )

    return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)
Creates an agent in the room.
Args:
- room_id: The ID of the room where the agent will be created.
- options: Configuration options for the agent. Defaults to None.
Returns:
- Agent: The created agent instance initialized with peer ID, room ID, token, and Fishjam URL.
def create_vapi_agent(
    self,
    room_id: str,
    options: PeerOptionsVapi,
) -> Peer:
    """Creates a vapi agent in the room.

    Args:
        room_id: The ID of the room where the vapi agent will be created.
        options: Configuration options for the vapi peer.

    Returns:
        Peer: The created peer object.
    """
    body = PeerConfig(type_=PeerType.VAPI, options=options)

    resp = cast(
        PeerDetailsResponse,
        self._request(room_add_peer, room_id=room_id, body=body),
    )

    return resp.data.peer
Creates a vapi agent in the room.
Args:
- room_id: The ID of the room where the vapi agent will be created.
- options: Configuration options for the vapi peer.
Returns:
- Peer: The created peer object.
def create_room(self, options: RoomOptions | None = None) -> Room:
    """Create a room and return it.

    Args:
        options: Configuration options for the room. When omitted, a default
            ``RoomOptions()`` is used.

    Returns:
        Room: The created Room object.
    """
    if not options:
        options = RoomOptions()

    # A missing codec is sent as UNSET rather than as an explicit value.
    requested_codec = options.video_codec
    codec = UNSET if requested_codec is None else VideoCodec(requested_codec)

    room_config = RoomConfig(
        max_peers=options.max_peers,
        video_codec=codec,
        webhook_url=options.webhook_url,
        room_type=RoomType(options.room_type),
        public=options.public,
    )

    raw_response = self._request(room_create_room, body=room_config)
    created = cast(RoomCreateDetailsResponse, raw_response).data.room

    return Room(config=created.config, id=created.id, peers=created.peers)
Creates a new room.
Args:
- options: Configuration options for the room. Defaults to None.
Returns:
- Room: The created Room object.
def get_all_rooms(self) -> list[Room]:
    """Fetch every room.

    Returns:
        list[Room]: A list of all available Room objects.
    """
    listing = cast(RoomsListingResponse, self._request(room_get_all_rooms))

    result = []
    for entry in listing.data:
        result.append(Room(config=entry.config, id=entry.id, peers=entry.peers))
    return result
Returns list of all rooms.
Returns:
- list[Room]: A list of all available Room objects.
def get_room(self, room_id: str) -> Room:
    """Fetch a single room by its ID.

    Args:
        room_id: The ID of the room to retrieve.

    Returns:
        Room: The Room object corresponding to the given ID.
    """
    raw_response = self._request(room_get_room, room_id=room_id)
    found = cast(RoomDetailsResponse, raw_response).data

    return Room(config=found.config, id=found.id, peers=found.peers)
Returns room with the given id.
Args:
- room_id: The ID of the room to retrieve.
Returns:
- Room: The Room object corresponding to the given ID.
def delete_peer(self, room_id: str, peer_id: str) -> None:
    """Remove a peer from a room.

    Args:
        room_id: The ID of the room the peer belongs to.
        peer_id: The ID of the peer to delete.
    """
    # Whatever the request helper returns is passed straight through.
    outcome = self._request(room_delete_peer, id=peer_id, room_id=room_id)
    return outcome
Deletes a peer from a room.
Args:
- room_id: The ID of the room the peer belongs to.
- peer_id: The ID of the peer to delete.
def delete_room(self, room_id: str) -> None:
    """Remove a room.

    Args:
        room_id: The ID of the room to delete.
    """
    # Whatever the request helper returns is passed straight through.
    outcome = self._request(room_delete_room, room_id=room_id)
    return outcome
Deletes a room.
Args:
- room_id: The ID of the room to delete.
def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
    """Request a new token for a peer.

    Args:
        room_id: The ID of the room.
        peer_id: The ID of the peer whose token needs refreshing.

    Returns:
        str: The new peer token.
    """
    raw_response = self._request(room_refresh_token, id=peer_id, room_id=room_id)
    refreshed = cast(PeerRefreshTokenResponse, raw_response)

    return refreshed.data.token
Refreshes a peer token.
Args:
- room_id: The ID of the room.
- peer_id: The ID of the peer whose token needs refreshing.
Returns:
- str: The new peer token.
def create_livestream_viewer_token(self, room_id: str) -> str:
    """Request a viewer token for a livestream room.

    Args:
        room_id: The ID of the livestream room.

    Returns:
        str: The generated viewer token.
    """
    raw_response = self._request(viewer_generate_viewer_token, room_id=room_id)
    viewer_token = cast(ViewerToken, raw_response)

    return viewer_token.token
Generates a viewer token for livestream rooms.
Args:
- room_id: The ID of the livestream room.
Returns:
- str: The generated viewer token.
def create_livestream_streamer_token(self, room_id: str) -> str:
    """Request a streamer token for a livestream room.

    Args:
        room_id: The ID of the livestream room.

    Returns:
        str: The generated streamer token.
    """
    raw_response = self._request(streamer_generate_streamer_token, room_id=room_id)
    streamer_token = cast(StreamerToken, raw_response)

    return streamer_token.token
Generates a streamer token for livestream rooms.
Args:
- room_id: The ID of the livestream room.
Returns:
- str: The generated streamer token.
def create_moq_token(
    self,
    publish_path: str | None = None,
    subscribe_path: str | None = None,
) -> str:
    """Request a MoQ token.

    Args:
        publish_path: Path the token grants publish access to.
        subscribe_path: Path the token grants subscribe access to.

    Returns:
        str: The generated token.
    """
    token_config = MoqTokenConfig(
        publish_path=publish_path,
        subscribe_path=subscribe_path,
    )
    raw_response = self._request(moq_create_token, body=token_config)
    moq_token = cast(MoqToken, raw_response)

    return moq_token.token
Generates a MoQ token.
Args:
- publish_path: Path the token grants publish access to.
- subscribe_path: Path the token grants subscribe access to.
Returns:
- str: The generated token.
def subscribe_peer(self, room_id: str, peer_id: str, target_peer_id: str) -> None:
    """Subscribes a peer to all tracks of another peer.

    Args:
        room_id: The ID of the room.
        peer_id: The ID of the subscribing peer.
        target_peer_id: The ID of the peer to subscribe to.
    """
    # In the request, `id` is the subscriber and `peer_id` is the target.
    self._request(
        room_subscribe_peer,
        room_id=room_id,
        id=peer_id,
        peer_id=target_peer_id,
    )
Subscribes a peer to all tracks of another peer.
Args:
- room_id: The ID of the room.
- peer_id: The ID of the subscribing peer.
- target_peer_id: The ID of the peer to subscribe to.
def subscribe_tracks(self, room_id: str, peer_id: str, track_ids: list[str]) -> None:
    """Subscribes a peer to specific tracks of another peer.

    Args:
        room_id: The ID of the room.
        peer_id: The ID of the subscribing peer.
        track_ids: A list of track IDs to subscribe to.
    """
    self._request(
        room_subscribe_tracks,
        room_id=room_id,
        id=peer_id,
        body=SubscribeTracksBody(track_ids=track_ids),
    )
Subscribes a peer to specific tracks of another peer.
Args:
- room_id: The ID of the room.
- peer_id: The ID of the subscribing peer.
- track_ids: A list of track IDs to subscribe to.
Inherited Members
- fishjam.api._client.Client
- client
- warnings_shown
class FishjamNotifier:
    """Allows for receiving WebSocket messages from Fishjam."""

    def __init__(
        self,
        fishjam_id: str,
        management_token: str,
    ):
        """Create a FishjamNotifier instance with an ID and management token.

        Args:
            fishjam_id: Identifier passed to ``get_fishjam_url`` to build the
                WebSocket address.
            management_token: Token sent in the authentication request.
        """
        # Rewrite only the first "http" (presumably the scheme, http -> ws /
        # https -> wss); an unbounded replace would also alter any later
        # "http" in the URL.
        websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws", 1)
        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
        self._management_token: str = management_token
        self._websocket: client.ClientConnection | None = None
        self._ready: bool = False

        self._ready_event: asyncio.Event | None = None

        self._notification_handler: NotificationHandler | None = None

    def on_server_notification(self, handler: NotificationHandler):
        """Decorator for defining a handler for Fishjam notifications.

        Args:
            handler: The function to be registered as the notification handler.

        Returns:
            NotificationHandler: The original handler function (unmodified).
        """
        self._notification_handler = handler
        return handler

    async def connect(self):
        """Connects to Fishjam and listens for all incoming messages.

        It runs until the connection is closed.

        The incoming messages are handled by the function registered with the
        `on_server_notification` decorator.

        The handler has to be defined before calling `connect`.

        Raises:
            RuntimeError: If no notification handler is registered (raised by
                the receive loop, after authentication), or if the connection
                is closed with an "invalid token" reason while authenticating.
        """
        async with client.connect(self._fishjam_url) as websocket:
            try:
                self._websocket = websocket
                await self._authenticate()

                if self._notification_handler:
                    await self._subscribe_event(
                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
                    )

                self._ready = True
                if self._ready_event:
                    self._ready_event.set()

                await self._receive_loop()
            finally:
                self._websocket = None
                # Drop the ready state so wait_ready() does not report a
                # connection that no longer exists.
                self._ready = False
                if self._ready_event:
                    self._ready_event.clear()

    async def wait_ready(self) -> None:
        """Waits until the notifier is connected and authenticated to Fishjam.

        If already connected, returns immediately.
        """
        if self._ready:
            return

        # The event is created lazily, on the first call that has to wait.
        if self._ready_event is None:
            self._ready_event = asyncio.Event()

        await self._ready_event.wait()

    async def _authenticate(self):
        """Send the management token and expect an authenticated reply.

        Raises:
            RuntimeError: If the websocket is not connected, or if the
                connection is closed with an "invalid token" reason.
        """
        if not self._websocket:
            raise RuntimeError("Websocket is not connected")

        msg = ServerMessage(
            auth_request=ServerMessageAuthRequest(token=self._management_token)
        )
        await self._websocket.send(bytes(msg))

        try:
            message = await self._websocket.recv(decode=False)
        except ConnectionClosed as exception:
            if "invalid token" in str(exception):
                raise RuntimeError("Invalid management token") from exception
            raise

        message = ServerMessage().parse(message)

        _type, message = betterproto.which_one_of(message, "content")
        assert isinstance(message, ServerMessageAuthenticated)

    async def _receive_loop(self):
        """Receive messages forever and pass allowed ones to the handler.

        Raises:
            RuntimeError: If the websocket is not connected or no notification
                handler is defined.
        """
        if not self._websocket:
            raise RuntimeError("Websocket is not connected")
        if not self._notification_handler:
            raise RuntimeError("Notification handler is not defined")

        while True:
            message = await self._websocket.recv(decode=False)
            message = ServerMessage().parse(message)
            _which, message = betterproto.which_one_of(message, "content")

            if isinstance(message, ALLOWED_NOTIFICATIONS):
                res = self._notification_handler(message)
                # The handler may be sync or async; await only when needed.
                if inspect.isawaitable(res):
                    await res

    async def _subscribe_event(self, event: ServerMessageEventType):
        """Send a subscribe request for ``event`` and expect a subscribe response.

        Raises:
            RuntimeError: If the websocket is not connected.
        """
        if not self._websocket:
            raise RuntimeError("Websocket is not connected")

        request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event))

        await self._websocket.send(bytes(request))
        # decode=False keeps the payload as bytes, as in the other receive sites.
        message = await self._websocket.recv(decode=False)
        message = ServerMessage().parse(message)
        _which, message = betterproto.which_one_of(message, "content")
        assert isinstance(message, ServerMessageSubscribeResponse)
Allows for receiving WebSocket messages from Fishjam.
def __init__(
    self,
    fishjam_id: str,
    management_token: str,
):
    """Create a FishjamNotifier instance with an ID and management token.

    Args:
        fishjam_id: Identifier passed to ``get_fishjam_url`` to build the
            WebSocket address.
        management_token: Token stored for the later authentication request.
    """
    # Rewrite only the first "http" (presumably the scheme, http -> ws /
    # https -> wss); an unbounded replace would also alter any later
    # "http" in the URL.
    websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws", 1)
    self._fishjam_url = f"{websocket_url}/socket/server/websocket"
    self._management_token: str = management_token
    self._websocket: client.ClientConnection | None = None
    self._ready: bool = False

    self._ready_event: asyncio.Event | None = None

    self._notification_handler: NotificationHandler | None = None
Create a FishjamNotifier instance with an ID and management token.
def on_server_notification(self, handler: NotificationHandler):
    """Register ``handler`` as the notification handler; usable as a decorator.

    A later registration replaces the earlier one, since a single handler
    slot is assigned.

    Args:
        handler: The function to be registered as the notification handler.

    Returns:
        NotificationHandler: The original handler function (unmodified).
    """
    registered = handler
    self._notification_handler = registered
    return registered
Decorator for defining a handler for Fishjam notifications.
Args:
- handler: The function to be registered as the notification handler.
Returns:
- NotificationHandler: The original handler function (unmodified).
async def connect(self):
    """Connects to Fishjam and listens for all incoming messages.

    It runs until the connection is closed.

    The incoming messages are handled by the function registered with the
    `on_server_notification` decorator.

    The handler has to be defined before calling `connect`,
    otherwise the messages won't be received.
    """
    async with client.connect(self._fishjam_url) as websocket:
        try:
            self._websocket = websocket
            await self._authenticate()

            # Subscribe only when there is a handler to deliver to.
            if self._notification_handler:
                await self._subscribe_event(
                    event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
                )

            self._ready = True
            if self._ready_event:
                self._ready_event.set()

            await self._receive_loop()
        finally:
            self._websocket = None
            # Drop the ready state so wait_ready() does not report a
            # connection that no longer exists.
            self._ready = False
            if self._ready_event:
                self._ready_event.clear()
Connects to Fishjam and listens for all incoming messages.
It runs until the connection is closed.
The incoming messages are handled by the functions defined using the
on_server_notification decorator.
The handler has to be defined before calling connect,
otherwise the messages won't be received.
async def wait_ready(self) -> None:
    """Block until the notifier is connected and authenticated to Fishjam.

    Returns at once when the ready flag is already set. Otherwise waits on
    the ready event, creating it first if it does not exist yet.
    """
    if not self._ready:
        if self._ready_event is None:
            self._ready_event = asyncio.Event()

        await self._ready_event.wait()
Waits until the notifier is connected and authenticated to Fishjam.
If already connected, returns immediately.
def receive_binary(binary: bytes) -> Union[AllowedNotification, None]:
    """Parse a protobuf-encoded notification into a notification instance.

    The available notifications are listed in the `fishjam.events` module.

    Args:
        binary: The raw binary data received from the webhook.

    Returns:
        AllowedNotification | None: The parsed notification object, or None if
            the message type is not supported.
    """
    envelope = ServerMessage().parse(binary)
    _field_name, payload = betterproto.which_one_of(envelope, "content")

    return payload if isinstance(payload, ALLOWED_NOTIFICATIONS) else None
Transforms a received protobuf notification into a notification instance.
The available notifications are listed in the fishjam.events module.
Args:
- binary: The raw binary data received from the webhook.
Returns:
- AllowedNotification | None: The parsed notification object, or None if the message type is not supported.
@_attrs_define
class PeerMetadata:
    """Custom metadata set by the peer

    All entries live in ``additional_properties``; the item-access methods
    below read and write that dict directly.

    Example:
        {'name': 'FishjamUser'}

    """

    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)

    def to_dict(self) -> dict[str, Any]:
        # Return a new dict so callers cannot mutate the stored properties.
        return dict(self.additional_properties)

    @classmethod
    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
        # Copy the mapping before storing it on the new instance.
        copied = dict(src_dict)
        instance = cls()
        instance.additional_properties = copied
        return instance

    @property
    def additional_keys(self) -> list[str]:
        return list(self.additional_properties)

    def __getitem__(self, key: str) -> Any:
        return self.additional_properties[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self.additional_properties[key] = value

    def __delitem__(self, key: str) -> None:
        del self.additional_properties[key]

    def __contains__(self, key: str) -> bool:
        return key in self.additional_properties
Custom metadata set by the peer
Example:
- {'name': 'FishjamUser'}
@dataclass
class PeerOptions:
    """Settings applied when creating a WebRTC peer.

    Attributes:
        metadata: Peer metadata; None when no metadata is supplied.
        subscribe_mode: Configuration of peer's subscribing policy,
            either "auto" (the default) or "manual".
    """

    metadata: dict[str, Any] | None = None
    """Peer metadata"""
    subscribe_mode: Literal["auto", "manual"] = "auto"
    """Configuration of peer's subscribing policy"""
Options specific to a WebRTC Peer.
Attributes:
- metadata: Peer metadata.
- subscribe_mode: Configuration of peer's subscribing policy.
@_attrs_define
class PeerOptionsVapi:
    """Options specific to the VAPI peer

    Attributes:
        api_key (str): VAPI API key
        call_id (str): VAPI call ID
        subscribe_mode (SubscribeMode | Unset): Configuration of peer's subscribing policy
    """

    api_key: str
    call_id: str
    subscribe_mode: SubscribeMode | Unset = UNSET

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a dict with camelCase keys, omitting an unset subscribe mode."""
        payload: dict[str, Any] = {
            "apiKey": self.api_key,
            "callId": self.call_id,
        }
        if not isinstance(self.subscribe_mode, Unset):
            payload["subscribeMode"] = self.subscribe_mode.value

        return payload

    @classmethod
    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
        """Build an instance from a camelCase mapping; "subscribeMode" is optional."""
        data = dict(src_dict)
        api_key = data.pop("apiKey")
        call_id = data.pop("callId")

        raw_mode = data.pop("subscribeMode", UNSET)
        subscribe_mode: SubscribeMode | Unset = (
            UNSET if isinstance(raw_mode, Unset) else SubscribeMode(raw_mode)
        )

        return cls(
            api_key=api_key,
            call_id=call_id,
            subscribe_mode=subscribe_mode,
        )
Options specific to the VAPI peer
Attributes:
- api_key (str): VAPI API key
- call_id (str): VAPI call ID
- subscribe_mode (SubscribeMode | Unset): Configuration of peer's subscribing policy
def __init__(self, api_key, call_id, subscribe_mode=attr_dict['subscribe_mode'].default):
    """Method generated by attrs for class PeerOptionsVapi."""
    # Assignment targets are bound left to right, matching the field order.
    self.api_key, self.call_id, self.subscribe_mode = api_key, call_id, subscribe_mode
Method generated by attrs for class PeerOptionsVapi.
def to_dict(self) -> dict[str, Any]:
    """Serialize to a dict with camelCase keys.

    ``subscribeMode`` is included only when ``subscribe_mode`` is set.
    """
    payload: dict[str, Any] = {
        "apiKey": self.api_key,
        "callId": self.call_id,
    }

    if not isinstance(self.subscribe_mode, Unset):
        mode_value = self.subscribe_mode.value
        if mode_value is not UNSET:
            payload["subscribeMode"] = mode_value

    return payload
@classmethod
def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
    """Build an instance from a dict with camelCase keys.

    ``apiKey`` and ``callId`` are required (a missing key raises
    ``KeyError``); ``subscribeMode`` is optional.
    """
    remaining = dict(src_dict)
    api_key = remaining.pop("apiKey")
    call_id = remaining.pop("callId")

    raw_mode = remaining.pop("subscribeMode", UNSET)
    subscribe_mode: SubscribeMode | Unset = (
        UNSET if isinstance(raw_mode, Unset) else SubscribeMode(raw_mode)
    )

    return cls(
        api_key=api_key,
        call_id=call_id,
        subscribe_mode=subscribe_mode,
    )
74@dataclass 75class RoomOptions: 76 """Description of a room options. 77 78 Attributes: 79 max_peers: Maximum amount of peers allowed into the room. 80 video_codec: Enforces video codec for each peer in the room. 81 webhook_url: URL where Fishjam notifications will be sent. 82 room_type: The use-case of the room. If not provided, this defaults 83 to conference. 84 public: True if livestream viewers can omit specifying a token. 85 """ 86 87 max_peers: int | None = None 88 """Maximum amount of peers allowed into the room""" 89 video_codec: Literal["h264", "vp8"] | None = None 90 """Enforces video codec for each peer in the room""" 91 webhook_url: str | None = None 92 """URL where Fishjam notifications will be sent""" 93 room_type: Literal[ 94 "conference", 95 "audio_only", 96 "livestream", 97 "full_feature", 98 "broadcaster", 99 "audio_only_livestream", 100 ] = "conference" 101 """The use-case of the room. If not provided, this defaults to conference.""" 102 public: bool = False 103 """True if livestream viewers can omit specifying a token."""
Description of room options.
Attributes:
- max_peers: Maximum number of peers allowed into the room.
- video_codec: Enforces video codec for each peer in the room.
- webhook_url: URL where Fishjam notifications will be sent.
- room_type: The use-case of the room. If not provided, this defaults to conference.
- public: True if livestream viewers can omit specifying a token.
@dataclass
class AgentOptions:
    """Settings used when creating an Agent peer.

    Attributes:
        output: Configuration for the agent's output options.
        subscribe_mode: Configuration of peer's subscribing policy.
    """

    # A fresh AgentOutputOptions is created per instance via default_factory.
    output: AgentOutputOptions = field(default_factory=AgentOutputOptions)

    # Either "auto" (the default) or "manual".
    subscribe_mode: Literal["auto", "manual"] = "auto"
Options specific to an Agent Peer.
Attributes:
- output: Configuration for the agent's output options.
- subscribe_mode: Configuration of peer's subscribing policy.
@dataclass
class AgentOutputOptions:
    """Desired format of the audio tracks sent from Fishjam to the agent.

    Attributes:
        audio_format: The format of the audio stream (e.g., 'pcm16').
        audio_sample_rate: The sample rate of the audio stream.
    """

    # Only "pcm16" is accepted by the type annotation.
    audio_format: Literal["pcm16"] = "pcm16"
    # Allowed values are 16000 (the default) and 24000.
    audio_sample_rate: Literal[16000, 24000] = 16000
Options of the desired format of audio tracks going from Fishjam to the agent.
Attributes:
- audio_format: The format of the audio stream (e.g., 'pcm16').
- audio_sample_rate: The sample rate of the audio stream.
@dataclass
class Room:
    """Snapshot of a room's state.

    Attributes:
        config: Room configuration.
        id: Room ID.
        peers: List of all peers.
    """

    config: RoomConfig
    """Room configuration"""
    id: str
    """Room ID"""
    peers: list[Peer]
    """List of all peers"""
Description of the room state.
Attributes:
- config: Room configuration.
- id: Room ID.
- peers: List of all peers.
@_attrs_define
class Peer:
    """Describes peer status.

    Attributes:
        id (str): Assigned peer id. Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
        metadata (None | PeerMetadata): Custom metadata set by the peer. Example: {'name': 'FishjamUser'}.
        status (PeerStatus): Informs about the peer status. Example: disconnected.
        subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
        subscriptions (Subscriptions): Describes peer's subscriptions in manual mode
        tracks (list[Track]): List of all peer's tracks
        type_ (PeerType): Peer type. Example: webrtc.
    """

    id: str
    metadata: None | PeerMetadata
    status: PeerStatus
    subscribe_mode: SubscribeMode
    subscriptions: Subscriptions
    tracks: list[Track]
    type_: PeerType
    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize the peer to a dict with camelCase keys.

        Entries from ``additional_properties`` are written first; the
        declared fields are applied afterwards and therefore win on
        key collisions.
        """
        from ..models.peer_metadata import PeerMetadata

        metadata: dict[str, Any] | None = (
            self.metadata.to_dict()
            if isinstance(self.metadata, PeerMetadata)
            else self.metadata
        )

        known_fields: dict[str, Any] = {
            "id": self.id,
            "metadata": metadata,
            "status": self.status.value,
            "subscribeMode": self.subscribe_mode.value,
            "subscriptions": self.subscriptions.to_dict(),
            "tracks": [track.to_dict() for track in self.tracks],
            "type": self.type_.value,
        }

        serialized: dict[str, Any] = dict(self.additional_properties)
        serialized.update(known_fields)
        return serialized

    @classmethod
    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
        """Build a peer from a dict with camelCase keys.

        All declared fields are required (a missing key raises
        ``KeyError``). Keys left over after the declared fields are
        consumed are kept in ``additional_properties``.
        """
        from ..models.peer_metadata import PeerMetadata
        from ..models.subscriptions import Subscriptions
        from ..models.track import Track

        remaining = dict(src_dict)
        peer_id = remaining.pop("id")

        def _parse_metadata(data: object) -> None | PeerMetadata:
            # None passes through; dicts are converted to PeerMetadata;
            # anything else (or a failed conversion) is returned unchanged.
            if data is None:
                return None
            if isinstance(data, dict):
                try:
                    return PeerMetadata.from_dict(data)
                except (TypeError, ValueError, AttributeError, KeyError):
                    pass
            return cast(None | PeerMetadata, data)

        metadata = _parse_metadata(remaining.pop("metadata"))
        status = PeerStatus(remaining.pop("status"))
        subscribe_mode = SubscribeMode(remaining.pop("subscribeMode"))
        subscriptions = Subscriptions.from_dict(remaining.pop("subscriptions"))
        tracks = [Track.from_dict(raw_track) for raw_track in remaining.pop("tracks")]
        type_ = PeerType(remaining.pop("type"))

        peer = cls(
            id=peer_id,
            metadata=metadata,
            status=status,
            subscribe_mode=subscribe_mode,
            subscriptions=subscriptions,
            tracks=tracks,
            type_=type_,
        )

        peer.additional_properties = remaining
        return peer

    @property
    def additional_keys(self) -> list[str]:
        """List of the keys currently stored in ``additional_properties``."""
        return list(self.additional_properties.keys())

    def __getitem__(self, key: str) -> Any:
        """Return the additional property stored under ``key``."""
        return self.additional_properties[key]

    def __setitem__(self, key: str, value: Any) -> None:
        """Store ``value`` as an additional property under ``key``."""
        self.additional_properties[key] = value

    def __delitem__(self, key: str) -> None:
        """Remove the additional property stored under ``key``."""
        del self.additional_properties[key]

    def __contains__(self, key: str) -> bool:
        """Report whether ``key`` is present in ``additional_properties``."""
        return key in self.additional_properties
Describes peer status
Attributes:
- id (str): Assigned peer id. Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
- metadata (None | PeerMetadata): Custom metadata set by the peer. Example: {'name': 'FishjamUser'}.
- status (PeerStatus): Informs about the peer status. Example: disconnected.
- subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
- subscriptions (Subscriptions): Describes peer's subscriptions in manual mode
- tracks (list[Track]): List of all peer's tracks
- type_ (PeerType): Peer type. Example: webrtc.
def __init__(self, id, metadata, status, subscribe_mode, subscriptions, tracks, type_):
    """Method generated by attrs for class Peer."""
    # Assignment targets are bound left to right, matching the field order.
    (
        self.id,
        self.metadata,
        self.status,
        self.subscribe_mode,
        self.subscriptions,
        self.tracks,
        self.type_,
    ) = (id, metadata, status, subscribe_mode, subscriptions, tracks, type_)
    # additional_properties is not an init argument; it comes from its factory.
    self.additional_properties = __attr_factory_additional_properties()
Method generated by attrs for class Peer.
def to_dict(self) -> dict[str, Any]:
    """Serialize the peer to a dict with camelCase keys.

    Entries from ``additional_properties`` are written first; the
    declared fields are applied afterwards and therefore win on key
    collisions.
    """
    from ..models.peer_metadata import PeerMetadata

    metadata: dict[str, Any] | None = (
        self.metadata.to_dict()
        if isinstance(self.metadata, PeerMetadata)
        else self.metadata
    )

    known_fields: dict[str, Any] = {
        "id": self.id,
        "metadata": metadata,
        "status": self.status.value,
        "subscribeMode": self.subscribe_mode.value,
        "subscriptions": self.subscriptions.to_dict(),
        "tracks": [track.to_dict() for track in self.tracks],
        "type": self.type_.value,
    }

    serialized: dict[str, Any] = dict(self.additional_properties)
    serialized.update(known_fields)
    return serialized
@classmethod
def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
    """Build a peer from a dict with camelCase keys.

    All declared fields are required (a missing key raises ``KeyError``).
    Keys left over after the declared fields are consumed are kept in
    ``additional_properties``.
    """
    from ..models.peer_metadata import PeerMetadata
    from ..models.subscriptions import Subscriptions
    from ..models.track import Track

    remaining = dict(src_dict)
    peer_id = remaining.pop("id")

    def _parse_metadata(data: object) -> None | PeerMetadata:
        # None passes through; dicts are converted to PeerMetadata;
        # anything else (or a failed conversion) is returned unchanged.
        if data is None:
            return None
        if isinstance(data, dict):
            try:
                return PeerMetadata.from_dict(data)
            except (TypeError, ValueError, AttributeError, KeyError):
                pass
        return cast(None | PeerMetadata, data)

    metadata = _parse_metadata(remaining.pop("metadata"))
    status = PeerStatus(remaining.pop("status"))
    subscribe_mode = SubscribeMode(remaining.pop("subscribeMode"))
    subscriptions = Subscriptions.from_dict(remaining.pop("subscriptions"))
    tracks = [Track.from_dict(raw_track) for raw_track in remaining.pop("tracks")]
    type_ = PeerType(remaining.pop("type"))

    peer = cls(
        id=peer_id,
        metadata=metadata,
        status=status,
        subscribe_mode=subscribe_mode,
        subscriptions=subscriptions,
        tracks=tracks,
        type_=type_,
    )

    peer.additional_properties = remaining
    return peer