Fishjam Python Server SDK
Python server SDK for Fishjam.
Read the docs here
Installation
pip install fishjam-server-sdk
Usage
The SDK exports two main classes for interacting with the Fishjam server:
FishjamClient and FishjamNotifier.
FishjamClient wraps HTTP REST API calls, while FishjamNotifier is responsible for receiving real-time updates from the server.
FishjamClient
Create a FishjamClient instance, providing your Fishjam ID and management token
# RoomOptions is imported as well because the room-creation example uses it.
from fishjam import FishjamClient, RoomOptions

fishjam_client = FishjamClient(fishjam_id="<fishjam_id>", management_token="<management_token>")
You can use it to interact with Fishjam to manage rooms and peers
# Create a room
options = RoomOptions(video_codec="h264", webhook_url="http://localhost:5000/webhook")
room = fishjam_client.create_room(options=options)
# Room(components=[], config=RoomConfig(max_peers=None, video_codec=<RoomConfigVideoCodec.H264: 'h264'>, webhook_url='http://localhost:5000/webhook'), id='1d905478-ccfc-44d6-a6e7-8ccb1b38d955', peers=[])
# Add peer to the room
peer, token = fishjam_client.create_peer(room.id)
# Peer(id='b1232c7e-c969-4450-acdf-ea24f3cdd7f6', status=<PeerStatus.DISCONNECTED: 'disconnected'>, type='webrtc'), 'M8TUGhj-L11KpyG-2zBPIo'
All methods in FishjamClient may raise one of the exceptions deriving from fishjam.errors.HTTPError. They are defined in fishjam.errors.
FishjamNotifier
FishjamNotifier allows for receiving real-time updates from the Fishjam Server.
You can read more about notifications in the Fishjam Docs.
Create a FishjamNotifier instance
from fishjam import FishjamNotifier
fishjam_notifier = FishjamNotifier(fishjam_id='<fishjam_id>', management_token='<management_token>')
Then define a handler for incoming messages
# Register the handler on the `fishjam_notifier` instance created above.
@fishjam_notifier.on_server_notification
def handle_notification(server_notification):
    """Print every notification received from the Fishjam server."""
    print(f'Received a notification: {server_notification}')
After that you can start the notifier
import asyncio


async def test_notifier():
    """Run the notifier and create a room so that a notification is emitted."""
    notifier_task = asyncio.create_task(fishjam_notifier.connect())

    # Wait for notifier to be ready to receive messages
    await fishjam_notifier.wait_ready()

    # Create a room to trigger a server notification
    fishjam_client = FishjamClient(
        fishjam_id="<fishjam_id>", management_token="<management_token>"
    )
    fishjam_client.create_room()

    await notifier_task


asyncio.run(test_notifier())
# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29')
License
Licensed under the Apache License, Version 2.0
Fishjam is created by Software Mansion
Since 2012, Software Mansion has been a software agency with experience in building web and mobile apps. We are Core React Native Contributors and experts in dealing with all kinds of React Native issues. We can help you build your next dream product – Hire us.
1""".. include:: ../README.md""" 2 3# pylint: disable=locally-disabled, no-name-in-module, import-error 4 5# Exceptions and Server Messages 6 7# API 8# pylint: disable=locally-disabled, no-name-in-module, import-error 9 10# Exceptions and Server Messages 11from fishjam import agent, errors, events, integrations, peer, room, version 12from fishjam._openapi_client.models import PeerMetadata 13 14# API 15from fishjam._webhook_notifier import receive_binary 16from fishjam._ws_notifier import FishjamNotifier 17from fishjam.api._fishjam_client import ( 18 AgentOptions, 19 AgentOutputOptions, 20 FishjamClient, 21 Peer, 22 PeerOptions, 23 PeerOptionsVapi, 24 Room, 25 RoomOptions, 26) 27 28__version__ = version.__version__ 29 30__all__ = [ 31 "FishjamClient", 32 "FishjamNotifier", 33 "receive_binary", 34 "PeerMetadata", 35 "PeerOptions", 36 "PeerOptionsVapi", 37 "RoomOptions", 38 "AgentOptions", 39 "AgentOutputOptions", 40 "Room", 41 "Peer", 42 "events", 43 "errors", 44 "room", 45 "peer", 46 "agent", 47 "integrations", 48] 49 50 51__docformat__ = "restructuredtext"
class FishjamClient(Client):
    """Allows for managing rooms and the peers inside them."""

    def __init__(
        self,
        fishjam_id: str,
        management_token: str,
    ):
        """Create a FishjamClient instance.

        Args:
            fishjam_id: The unique identifier for the Fishjam instance.
            management_token: The token used for authenticating management
                operations.
        """
        super().__init__(fishjam_id=fishjam_id, management_token=management_token)

    def create_peer(
        self,
        room_id: str,
        options: PeerOptions | None = None,
    ) -> tuple[Peer, str]:
        """Creates a WebRTC peer in the room.

        Args:
            room_id: The ID of the room where the peer will be created.
            options: Configuration options for the peer. When omitted, a
                default `PeerOptions()` is used.

        Returns:
            A tuple containing:
                - Peer: The created peer object.
                - str: The peer token needed to authenticate to Fishjam.
        """
        options = options or PeerOptions()

        body = PeerConfig(
            type_=PeerType.WEBRTC,
            options=PeerOptionsWebRTC(
                metadata=self.__parse_peer_metadata(options.metadata),
                subscribe_mode=SubscribeMode(options.subscribe_mode),
            ),
        )
        response = cast(
            PeerDetailsResponse,
            self._request(room_add_peer, room_id=room_id, body=body),
        )

        created = response.data
        return (created.peer, created.token)

    def create_agent(
        self, room_id: str, options: AgentOptions | None = None
    ) -> Agent:
        """Creates an agent in the room.

        Args:
            room_id: The ID of the room where the agent will be created.
            options: Configuration options for the agent. When omitted, a
                default `AgentOptions()` is used.

        Returns:
            Agent: The created agent instance initialized with peer ID, room ID,
                token, and Fishjam URL.
        """
        options = options or AgentOptions()

        output = AgentOutput(
            audio_format=AudioFormat(options.output.audio_format),
            audio_sample_rate=AudioSampleRate(options.output.audio_sample_rate),
        )
        agent_options = PeerOptionsAgent(
            output=output,
            subscribe_mode=SubscribeMode(options.subscribe_mode),
        )
        body = PeerConfig(type_=PeerType.AGENT, options=agent_options)

        response = cast(
            PeerDetailsResponse,
            self._request(room_add_peer, room_id=room_id, body=body),
        )

        created = response.data
        return Agent(created.peer.id, room_id, created.token, self._fishjam_url)

    def create_vapi_agent(
        self,
        room_id: str,
        options: PeerOptionsVapi,
    ) -> Peer:
        """Creates a vapi agent in the room.

        Args:
            room_id: The ID of the room where the vapi agent will be created.
            options: Configuration options for the vapi peer.

        Returns:
            Peer: The created peer object.
        """
        response = cast(
            PeerDetailsResponse,
            self._request(
                room_add_peer,
                room_id=room_id,
                body=PeerConfig(type_=PeerType.VAPI, options=options),
            ),
        )

        return response.data.peer

    def create_room(self, options: RoomOptions | None = None) -> Room:
        """Creates a new room.

        Args:
            options: Configuration options for the room. When omitted, a
                default `RoomOptions()` is used.

        Returns:
            Room: The created Room object.
        """
        options = options or RoomOptions()

        # A missing codec is sent as UNSET rather than as an explicit value.
        requested_codec = options.video_codec
        codec = UNSET if requested_codec is None else VideoCodec(requested_codec)

        config = RoomConfig(
            max_peers=options.max_peers,
            video_codec=codec,
            webhook_url=options.webhook_url,
            room_type=RoomType(options.room_type),
            public=options.public,
        )

        response = cast(
            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
        )
        created = response.data.room

        return Room(config=created.config, id=created.id, peers=created.peers)

    def get_all_rooms(self) -> list[Room]:
        """Returns list of all rooms.

        Returns:
            list[Room]: A list of all available Room objects.
        """
        listing = cast(RoomsListingResponse, self._request(room_get_all_rooms))

        return [
            Room(config=entry.config, id=entry.id, peers=entry.peers)
            for entry in listing.data
        ]

    def get_room(self, room_id: str) -> Room:
        """Returns room with the given id.

        Args:
            room_id: The ID of the room to retrieve.

        Returns:
            Room: The Room object corresponding to the given ID.
        """
        response = cast(
            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
        )
        found = response.data

        return Room(config=found.config, id=found.id, peers=found.peers)

    def delete_peer(self, room_id: str, peer_id: str) -> None:
        """Deletes a peer from a room.

        Args:
            room_id: The ID of the room the peer belongs to.
            peer_id: The ID of the peer to delete.
        """
        # The method is declared `-> None`, so the request result is not returned.
        self._request(room_delete_peer, id=peer_id, room_id=room_id)

    def delete_room(self, room_id: str) -> None:
        """Deletes a room.

        Args:
            room_id: The ID of the room to delete.
        """
        # The method is declared `-> None`, so the request result is not returned.
        self._request(room_delete_room, room_id=room_id)

    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
        """Refreshes a peer token.

        Args:
            room_id: The ID of the room.
            peer_id: The ID of the peer whose token needs refreshing.

        Returns:
            str: The new peer token.
        """
        refreshed = cast(
            PeerRefreshTokenResponse,
            self._request(room_refresh_token, id=peer_id, room_id=room_id),
        )

        return refreshed.data.token

    def create_livestream_viewer_token(self, room_id: str) -> str:
        """Generates a viewer token for livestream rooms.

        Args:
            room_id: The ID of the livestream room.

        Returns:
            str: The generated viewer token.
        """
        viewer = cast(
            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
        )

        return viewer.token

    def create_livestream_streamer_token(self, room_id: str) -> str:
        """Generates a streamer token for livestream rooms.

        Args:
            room_id: The ID of the livestream room.

        Returns:
            str: The generated streamer token.
        """
        streamer = cast(
            StreamerToken,
            self._request(streamer_generate_streamer_token, room_id=room_id),
        )

        return streamer.token

    def subscribe_peer(
        self, room_id: str, peer_id: str, target_peer_id: str
    ) -> None:
        """Subscribes a peer to all tracks of another peer.

        Args:
            room_id: The ID of the room.
            peer_id: The ID of the subscribing peer.
            target_peer_id: The ID of the peer to subscribe to.
        """
        # In the request, `id` is the subscriber and `peer_id` is the target.
        self._request(
            room_subscribe_peer,
            room_id=room_id,
            id=peer_id,
            peer_id=target_peer_id,
        )

    def subscribe_tracks(
        self, room_id: str, peer_id: str, track_ids: list[str]
    ) -> None:
        """Subscribes a peer to specific tracks of another peer.

        Args:
            room_id: The ID of the room.
            peer_id: The ID of the subscribing peer.
            track_ids: A list of track IDs to subscribe to.
        """
        self._request(
            room_subscribe_tracks,
            room_id=room_id,
            id=peer_id,
            body=SubscribeTracksBody(track_ids=track_ids),
        )

    def __parse_peer_metadata(self, metadata: dict | None) -> WebRTCMetadata:
        """Copy the user-supplied metadata dict into a `WebRTCMetadata` object.

        `None` and an empty dict both yield an empty `WebRTCMetadata`.
        """
        peer_metadata = WebRTCMetadata()

        if metadata:
            peer_metadata.additional_properties.update(metadata)

        return peer_metadata
Allows for managing rooms.
def __init__(self, fishjam_id: str, management_token: str):
    """Create a FishjamClient instance.

    Both values are forwarded unchanged to the base client's initializer.

    Args:
        fishjam_id: The unique identifier for the Fishjam instance.
        management_token: The token used for authenticating management
            operations.
    """
    super().__init__(
        fishjam_id=fishjam_id,
        management_token=management_token,
    )
Create a FishjamClient instance.
Args:
- fishjam_id: The unique identifier for the Fishjam instance.
- management_token: The token used for authenticating management operations.
def create_peer(
    self,
    room_id: str,
    options: PeerOptions | None = None,
) -> tuple[Peer, str]:
    """Creates a WebRTC peer in the room.

    Args:
        room_id: The ID of the room where the peer will be created.
        options: Configuration options for the peer. When omitted, a
            default `PeerOptions()` is used.

    Returns:
        A tuple containing:
            - Peer: The created peer object.
            - str: The peer token needed to authenticate to Fishjam.
    """
    options = options or PeerOptions()

    body = PeerConfig(
        type_=PeerType.WEBRTC,
        options=PeerOptionsWebRTC(
            metadata=self.__parse_peer_metadata(options.metadata),
            subscribe_mode=SubscribeMode(options.subscribe_mode),
        ),
    )
    response = cast(
        PeerDetailsResponse,
        self._request(room_add_peer, room_id=room_id, body=body),
    )

    created = response.data
    return (created.peer, created.token)
Creates a peer in the room.
Args:
- room_id: The ID of the room where the peer will be created.
- options: Configuration options for the peer. Defaults to None.
Returns:
- A tuple containing:
- Peer: The created peer object.
- str: The peer token needed to authenticate to Fishjam.
def create_agent(
    self, room_id: str, options: AgentOptions | None = None
) -> Agent:
    """Creates an agent in the room.

    Args:
        room_id: The ID of the room where the agent will be created.
        options: Configuration options for the agent. When omitted, a
            default `AgentOptions()` is used.

    Returns:
        Agent: The created agent instance initialized with peer ID, room ID,
            token, and Fishjam URL.
    """
    options = options or AgentOptions()

    output = AgentOutput(
        audio_format=AudioFormat(options.output.audio_format),
        audio_sample_rate=AudioSampleRate(options.output.audio_sample_rate),
    )
    agent_options = PeerOptionsAgent(
        output=output,
        subscribe_mode=SubscribeMode(options.subscribe_mode),
    )
    body = PeerConfig(type_=PeerType.AGENT, options=agent_options)

    response = cast(
        PeerDetailsResponse,
        self._request(room_add_peer, room_id=room_id, body=body),
    )

    created = response.data
    return Agent(created.peer.id, room_id, created.token, self._fishjam_url)
Creates an agent in the room.
Args:
- room_id: The ID of the room where the agent will be created.
- options: Configuration options for the agent. Defaults to None.
Returns:
- Agent: The created agent instance initialized with peer ID, room ID, token, and Fishjam URL.
def create_vapi_agent(
    self,
    room_id: str,
    options: PeerOptionsVapi,
) -> Peer:
    """Creates a vapi agent in the room.

    Args:
        room_id: The ID of the room where the vapi agent will be created.
        options: Configuration options for the vapi peer.

    Returns:
        Peer: The created peer object.
    """
    response = cast(
        PeerDetailsResponse,
        self._request(
            room_add_peer,
            room_id=room_id,
            body=PeerConfig(type_=PeerType.VAPI, options=options),
        ),
    )

    return response.data.peer
Creates a vapi agent in the room.
Args:
- room_id: The ID of the room where the vapi agent will be created.
- options: Configuration options for the vapi peer.
Returns:
- Peer: The created peer object.
def create_room(self, options: RoomOptions | None = None) -> Room:
    """Creates a new room.

    Args:
        options: Configuration options for the room. When omitted, a
            default `RoomOptions()` is used.

    Returns:
        Room: The created Room object.
    """
    options = options or RoomOptions()

    # A missing codec is sent as UNSET rather than as an explicit value.
    requested_codec = options.video_codec
    codec = UNSET if requested_codec is None else VideoCodec(requested_codec)

    config = RoomConfig(
        max_peers=options.max_peers,
        video_codec=codec,
        webhook_url=options.webhook_url,
        room_type=RoomType(options.room_type),
        public=options.public,
    )

    response = cast(
        RoomCreateDetailsResponse, self._request(room_create_room, body=config)
    )
    created = response.data.room

    return Room(config=created.config, id=created.id, peers=created.peers)
Creates a new room.
Args:
- options: Configuration options for the room. Defaults to None.
Returns:
- Room: The created Room object.
def get_all_rooms(self) -> list[Room]:
    """Returns list of all rooms.

    Returns:
        list[Room]: A list of all available Room objects.
    """
    listing = cast(RoomsListingResponse, self._request(room_get_all_rooms))

    return [
        Room(config=entry.config, id=entry.id, peers=entry.peers)
        for entry in listing.data
    ]
Returns list of all rooms.
Returns:
- list[Room]: A list of all available Room objects.
def get_room(self, room_id: str) -> Room:
    """Returns room with the given id.

    Args:
        room_id: The ID of the room to retrieve.

    Returns:
        Room: The Room object corresponding to the given ID.
    """
    response = cast(
        RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
    )
    found = response.data

    return Room(config=found.config, id=found.id, peers=found.peers)
Returns room with the given id.
Args:
- room_id: The ID of the room to retrieve.
Returns:
- Room: The Room object corresponding to the given ID.
def delete_peer(self, room_id: str, peer_id: str) -> None:
    """Deletes a peer from a room.

    Args:
        room_id: The ID of the room the peer belongs to.
        peer_id: The ID of the peer to delete.
    """
    # The method is declared `-> None`, so the request result is not returned.
    self._request(room_delete_peer, id=peer_id, room_id=room_id)
Deletes a peer from a room.
Args:
- room_id: The ID of the room the peer belongs to.
- peer_id: The ID of the peer to delete.
def delete_room(self, room_id: str) -> None:
    """Deletes a room.

    Args:
        room_id: The ID of the room to delete.
    """
    # The method is declared `-> None`, so the request result is not returned.
    self._request(room_delete_room, room_id=room_id)
Deletes a room.
Args:
- room_id: The ID of the room to delete.
def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
    """Refreshes a peer token.

    Args:
        room_id: The ID of the room.
        peer_id: The ID of the peer whose token needs refreshing.

    Returns:
        str: The new peer token.
    """
    refreshed = cast(
        PeerRefreshTokenResponse,
        self._request(room_refresh_token, id=peer_id, room_id=room_id),
    )

    return refreshed.data.token
Refreshes a peer token.
Args:
- room_id: The ID of the room.
- peer_id: The ID of the peer whose token needs refreshing.
Returns:
- str: The new peer token.
def create_livestream_viewer_token(self, room_id: str) -> str:
    """Generates a viewer token for livestream rooms.

    Args:
        room_id: The ID of the livestream room.

    Returns:
        str: The generated viewer token.
    """
    viewer = cast(
        ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
    )

    return viewer.token
Generates a viewer token for livestream rooms.
Args:
- room_id: The ID of the livestream room.
Returns:
- str: The generated viewer token.
def create_livestream_streamer_token(self, room_id: str) -> str:
    """Generates a streamer token for livestream rooms.

    Args:
        room_id: The ID of the livestream room.

    Returns:
        str: The generated streamer token.
    """
    streamer = cast(
        StreamerToken,
        self._request(streamer_generate_streamer_token, room_id=room_id),
    )

    return streamer.token
Generates a streamer token for livestream rooms.
Args:
- room_id: The ID of the livestream room.
Returns:
- str: The generated streamer token.
def subscribe_peer(
    self, room_id: str, peer_id: str, target_peer_id: str
) -> None:
    """Subscribes a peer to all tracks of another peer.

    Args:
        room_id: The ID of the room.
        peer_id: The ID of the subscribing peer.
        target_peer_id: The ID of the peer to subscribe to.
    """
    # In the request, `id` is the subscriber and `peer_id` is the target.
    self._request(
        room_subscribe_peer,
        room_id=room_id,
        id=peer_id,
        peer_id=target_peer_id,
    )
Subscribes a peer to all tracks of another peer.
Args:
- room_id: The ID of the room.
- peer_id: The ID of the subscribing peer.
- target_peer_id: The ID of the peer to subscribe to.
def subscribe_tracks(
    self, room_id: str, peer_id: str, track_ids: list[str]
) -> None:
    """Subscribes a peer to specific tracks of another peer.

    Args:
        room_id: The ID of the room.
        peer_id: The ID of the subscribing peer.
        track_ids: A list of track IDs to subscribe to.
    """
    tracks_body = SubscribeTracksBody(track_ids=track_ids)
    self._request(
        room_subscribe_tracks,
        room_id=room_id,
        id=peer_id,
        body=tracks_body,
    )
Subscribes a peer to specific tracks of another peer.
Args:
- room_id: The ID of the room.
- peer_id: The ID of the subscribing peer.
- track_ids: A list of track IDs to subscribe to.
Inherited Members
- fishjam.api._client.Client
- client
- warnings_shown
class FishjamNotifier:
    """Allows for receiving WebSocket messages from Fishjam."""

    def __init__(
        self,
        fishjam_id: str,
        management_token: str,
    ):
        """Create a FishjamNotifier instance with an ID and management token.

        Args:
            fishjam_id: The unique identifier for the Fishjam instance.
            management_token: The token sent in the authentication request.
        """
        # Rewrite only the first "http" (the scheme, assuming the URL starts
        # with http/https) so any later "http" in the URL stays untouched.
        websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws", 1)
        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
        self._management_token: str = management_token
        self._websocket: client.ClientConnection | None = None
        self._ready: bool = False

        # Created lazily by `wait_ready`.
        self._ready_event: asyncio.Event | None = None

        self._notification_handler: NotificationHandler | None = None

    def on_server_notification(self, handler: NotificationHandler):
        """Decorator for defining a handler for Fishjam notifications.

        Registering a new handler replaces the previously registered one.

        Args:
            handler: The function to be registered as the notification handler.

        Returns:
            NotificationHandler: The original handler function (unmodified).
        """
        self._notification_handler = handler
        return handler

    async def connect(self):
        """Connects to Fishjam and listens for all incoming messages.

        It runs until the connection is closed.

        The incoming messages are handled by the function registered with the
        `on_server_notification` decorator.

        The handler has to be defined before calling `connect`; server
        notifications are subscribed to only when a handler is registered.
        """
        async with client.connect(self._fishjam_url) as websocket:
            try:
                self._websocket = websocket
                await self._authenticate()

                if self._notification_handler:
                    await self._subscribe_event(
                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
                    )

                self._ready = True
                if self._ready_event:
                    self._ready_event.set()

                await self._receive_loop()
            finally:
                self._websocket = None
                # Drop the ready state so `wait_ready` does not report a dead
                # connection as ready.
                self._ready = False
                if self._ready_event is not None:
                    self._ready_event.clear()

    async def wait_ready(self) -> None:
        """Waits until the notifier is connected and authenticated to Fishjam.

        If already connected, returns immediately.
        """
        if self._ready:
            return

        if self._ready_event is None:
            self._ready_event = asyncio.Event()

        await self._ready_event.wait()

    async def _authenticate(self):
        """Send the management token and wait for the server's confirmation.

        Raises:
            RuntimeError: If the websocket is not connected, the token is
                rejected, or the reply is not an authentication confirmation.
        """
        if not self._websocket:
            raise RuntimeError("Websocket is not connected")

        msg = ServerMessage(
            auth_request=ServerMessageAuthRequest(token=self._management_token)
        )
        await self._websocket.send(bytes(msg))

        try:
            message = await self._websocket.recv(decode=False)
        except ConnectionClosed as exception:
            if "invalid token" in str(exception):
                raise RuntimeError("Invalid management token") from exception
            raise

        message = ServerMessage().parse(message)

        _type, message = betterproto.which_one_of(message, "content")
        # Raised explicitly: an `assert` would be stripped under `python -O`.
        if not isinstance(message, ServerMessageAuthenticated):
            raise RuntimeError("Fishjam did not confirm the authentication")

    async def _receive_loop(self):
        """Receive messages forever and pass allowed ones to the handler.

        Raises:
            RuntimeError: If the websocket is not connected or no handler has
                been registered.
        """
        if not self._websocket:
            raise RuntimeError("Websocket is not connected")
        if not self._notification_handler:
            raise RuntimeError("Notification handler is not defined")

        while True:
            message = await self._websocket.recv(decode=False)
            message = ServerMessage().parse(message)
            _which, message = betterproto.which_one_of(message, "content")

            if isinstance(message, ALLOWED_NOTIFICATIONS):
                res = self._notification_handler(message)
                # Handlers may be plain functions or coroutines.
                if inspect.isawaitable(res):
                    await res

    async def _subscribe_event(self, event: ServerMessageEventType):
        """Subscribe to `event` and wait for the server's subscribe response.

        Raises:
            RuntimeError: If the websocket is not connected or the reply is not
                a subscribe response.
        """
        if not self._websocket:
            raise RuntimeError("Websocket is not connected")

        request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event))

        await self._websocket.send(bytes(request))
        # `decode=False` keeps the frame as bytes, as in the other receive calls.
        message = await self._websocket.recv(decode=False)
        message = ServerMessage().parse(message)
        _which, message = betterproto.which_one_of(message, "content")
        # Raised explicitly: an `assert` would be stripped under `python -O`.
        if not isinstance(message, ServerMessageSubscribeResponse):
            raise RuntimeError("Fishjam did not confirm the event subscription")
Allows for receiving WebSocket messages from Fishjam.
def __init__(
    self,
    fishjam_id: str,
    management_token: str,
):
    """Create a FishjamNotifier instance with an ID and management token.

    Args:
        fishjam_id: The unique identifier for the Fishjam instance.
        management_token: The token stored for later authentication.
    """
    # Rewrite only the first "http" (the scheme, assuming the URL starts with
    # http/https) so any later "http" in the URL stays untouched.
    websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws", 1)
    self._fishjam_url = f"{websocket_url}/socket/server/websocket"
    self._management_token: str = management_token
    self._websocket: client.ClientConnection | None = None
    self._ready: bool = False

    self._ready_event: asyncio.Event | None = None

    self._notification_handler: NotificationHandler | None = None
Create a FishjamNotifier instance with an ID and management token.
def on_server_notification(self, handler: NotificationHandler):
    """Decorator for defining a handler for Fishjam notifications.

    The handler is stored on the notifier, replacing any handler registered
    earlier, and is handed back so the decorated name stays usable.

    Args:
        handler: The function to be registered as the notification handler.

    Returns:
        NotificationHandler: The original handler function (unmodified).
    """
    registered = self._notification_handler = handler
    return registered
Decorator for defining a handler for Fishjam notifications.
Args:
- handler: The function to be registered as the notification handler.
Returns:
- NotificationHandler: The original handler function (unmodified).
async def connect(self):
    """Connects to Fishjam and listens for all incoming messages.

    It runs until the connection is closed.

    The incoming messages are handled by the function registered with the
    `on_server_notification` decorator.

    The handler has to be defined before calling `connect`; server
    notifications are subscribed to only when a handler is registered.
    """
    async with client.connect(self._fishjam_url) as websocket:
        try:
            self._websocket = websocket
            await self._authenticate()

            if self._notification_handler:
                await self._subscribe_event(
                    event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
                )

            self._ready = True
            if self._ready_event:
                self._ready_event.set()

            await self._receive_loop()
        finally:
            self._websocket = None
            # Drop the ready state so `wait_ready` does not report a dead
            # connection as ready.
            self._ready = False
            if self._ready_event is not None:
                self._ready_event.clear()
Connects to Fishjam and listens for all incoming messages.
It runs until the connection is closed.
The incoming messages are handled by the functions defined using the
on_server_notification decorator.
The handler has to be defined before calling connect,
otherwise the messages won't be received.
async def wait_ready(self) -> None:
    """Waits until the notifier is connected and authenticated to Fishjam.

    If the notifier is already marked ready, returns immediately. Otherwise
    the ready event is created on first use and awaited.
    """
    if not self._ready:
        ready_event = self._ready_event
        if ready_event is None:
            ready_event = self._ready_event = asyncio.Event()

        await ready_event.wait()
Waits until the notifier is connected and authenticated to Fishjam.
If already connected, returns immediately.
def receive_binary(binary: bytes) -> Union[AllowedNotification, None]:
    """Transforms a received protobuf notification into a notification instance.

    The available notifications are listed in `fishjam.events` module.

    Args:
        binary: The raw binary data received from the webhook.

    Returns:
        AllowedNotification | None: The parsed notification object, or None if
            the message type is not supported.
    """
    envelope = ServerMessage().parse(binary)
    _field_name, content = betterproto.which_one_of(envelope, "content")

    return content if isinstance(content, ALLOWED_NOTIFICATIONS) else None
Transforms a received protobuf notification into a notification instance.
The available notifications are listed in fishjam.events module.
Args:
- binary: The raw binary data received from the webhook.
Returns:
- AllowedNotification | None: The parsed notification object, or None if the message type is not supported.
@_attrs_define
class PeerMetadata:
    """Custom metadata set by the peer.

    All key/value pairs are kept in `additional_properties`; the class also
    offers dict-style item access on top of that mapping.

    Example:
        {'name': 'FishjamUser'}

    """

    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return a shallow copy of the stored properties as a plain dict."""
        return dict(self.additional_properties)

    @classmethod
    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
        """Build an instance whose properties are a copy of `src_dict`."""
        properties = dict(src_dict)
        instance = cls()
        instance.additional_properties = properties
        return instance

    @property
    def additional_keys(self) -> list[str]:
        """Names of all stored properties."""
        return list(self.additional_properties)

    def __getitem__(self, key: str) -> Any:
        return self.additional_properties[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self.additional_properties[key] = value

    def __delitem__(self, key: str) -> None:
        del self.additional_properties[key]

    def __contains__(self, key: str) -> bool:
        return key in self.additional_properties
Custom metadata set by the peer
Example:
- {'name': 'FishjamUser'}
101@dataclass 102class PeerOptions: 103 """Options specific to a WebRTC Peer. 104 105 Attributes: 106 metadata: Peer metadata. 107 subscribe_mode: Configuration of peer's subscribing policy. 108 """ 109 110 metadata: dict[str, Any] | None = None 111 """Peer metadata""" 112 subscribe_mode: Literal["auto", "manual"] = "auto" 113 """Configuration of peer's subscribing policy"""
Options specific to a WebRTC Peer.
Attributes:
- metadata: Peer metadata.
- subscribe_mode: Configuration of peer's subscribing policy.
@_attrs_define
class PeerOptionsVapi:
    """Options specific to the VAPI peer

    Attributes:
        api_key (str): VAPI API key
        call_id (str): VAPI call ID
        subscribe_mode (SubscribeMode | Unset): Configuration of peer's
            subscribing policy
    """

    api_key: str
    call_id: str
    subscribe_mode: SubscribeMode | Unset = UNSET

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a dict using the camelCase keys of the API."""
        field_dict: dict[str, Any] = {
            "apiKey": self.api_key,
            "callId": self.call_id,
        }

        # An unset subscribe mode is left out of the payload entirely.
        if not isinstance(self.subscribe_mode, Unset):
            mode_value = self.subscribe_mode.value
            if mode_value is not UNSET:
                field_dict["subscribeMode"] = mode_value

        return field_dict

    @classmethod
    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
        """Build an instance from a dict with camelCase keys.

        "apiKey" and "callId" are required; "subscribeMode" is optional.
        """
        payload = dict(src_dict)
        api_key = payload.pop("apiKey")
        call_id = payload.pop("callId")

        raw_mode = payload.pop("subscribeMode", UNSET)
        subscribe_mode: SubscribeMode | Unset = (
            UNSET if isinstance(raw_mode, Unset) else SubscribeMode(raw_mode)
        )

        return cls(
            api_key=api_key,
            call_id=call_id,
            subscribe_mode=subscribe_mode,
        )
Options specific to the VAPI peer
Attributes:
- api_key (str): VAPI API key
- call_id (str): VAPI call ID
- subscribe_mode (SubscribeMode | Unset): Configuration of peer's subscribing policy
# Initializer generated by attrs for PeerOptionsVapi: it stores the three
# constructor arguments on the instance unchanged. `subscribe_mode` falls
# back to the default recorded for that attribute in `attr_dict`.
def __init__(self, api_key, call_id, subscribe_mode=attr_dict['subscribe_mode'].default):
    self.api_key = api_key
    self.call_id = call_id
    self.subscribe_mode = subscribe_mode
Method generated by attrs for class PeerOptionsVapi.
def to_dict(self) -> dict[str, Any]:
    """Serialize to a dict using the camelCase keys of the API."""
    field_dict: dict[str, Any] = {
        "apiKey": self.api_key,
        "callId": self.call_id,
    }

    # An unset subscribe mode is left out of the payload entirely.
    if not isinstance(self.subscribe_mode, Unset):
        mode_value = self.subscribe_mode.value
        if mode_value is not UNSET:
            field_dict["subscribeMode"] = mode_value

    return field_dict
@classmethod
def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
    """Build an instance from a dict with camelCase keys.

    "apiKey" and "callId" are required; "subscribeMode" is optional.
    """
    payload = dict(src_dict)
    api_key = payload.pop("apiKey")
    call_id = payload.pop("callId")

    raw_mode = payload.pop("subscribeMode", UNSET)
    subscribe_mode: SubscribeMode | Unset = (
        UNSET if isinstance(raw_mode, Unset) else SubscribeMode(raw_mode)
    )

    return cls(
        api_key=api_key,
        call_id=call_id,
        subscribe_mode=subscribe_mode,
    )
69@dataclass 70class RoomOptions: 71 """Description of a room options. 72 73 Attributes: 74 max_peers: Maximum amount of peers allowed into the room. 75 video_codec: Enforces video codec for each peer in the room. 76 webhook_url: URL where Fishjam notifications will be sent. 77 room_type: The use-case of the room. If not provided, this defaults 78 to conference. 79 public: True if livestream viewers can omit specifying a token. 80 """ 81 82 max_peers: int | None = None 83 """Maximum amount of peers allowed into the room""" 84 video_codec: Literal["h264", "vp8"] | None = None 85 """Enforces video codec for each peer in the room""" 86 webhook_url: str | None = None 87 """URL where Fishjam notifications will be sent""" 88 room_type: Literal[ 89 "conference", 90 "audio_only", 91 "livestream", 92 "full_feature", 93 "broadcaster", 94 "audio_only_livestream", 95 ] = "conference" 96 """The use-case of the room. If not provided, this defaults to conference.""" 97 public: bool = False 98 """True if livestream viewers can omit specifying a token."""
Description of room options.
Attributes:
- max_peers: Maximum number of peers allowed into the room.
- video_codec: Enforces video codec for each peer in the room.
- webhook_url: URL where Fishjam notifications will be sent.
- room_type: The use-case of the room. If not provided, this defaults to conference.
- public: True if livestream viewers can omit specifying a token.
@dataclass
class AgentOptions:
    """Settings that apply only to an Agent peer.

    Attributes:
        output: Configuration of the agent's output; a new
            AgentOutputOptions is created per instance when omitted.
        subscribe_mode: Configuration of the peer's subscribing policy,
            either "auto" (the default) or "manual".
    """

    # default_factory gives every AgentOptions its own output object.
    output: AgentOutputOptions = field(default_factory=AgentOutputOptions)
    subscribe_mode: Literal["auto", "manual"] = "auto"
Options specific to an Agent Peer.
Attributes:
- output: Configuration for the agent's output options.
- subscribe_mode: Configuration of peer's subscribing policy.
@dataclass
class AgentOutputOptions:
    """Desired format of the audio tracks going from Fishjam to the agent.

    Attributes:
        audio_format: Format of the audio stream; "pcm16" is the only
            accepted literal and the default.
        audio_sample_rate: Sample rate of the audio stream, 16000 (default)
            or 24000.
    """

    audio_format: Literal["pcm16"] = "pcm16"
    audio_sample_rate: Literal[16000, 24000] = 16000
Options of the desired format of audio tracks going from Fishjam to the agent.
Attributes:
- audio_format: The format of the audio stream (e.g., 'pcm16').
- audio_sample_rate: The sample rate of the audio stream.
@dataclass
class Room:
    """State of a room.

    Attributes:
        config: Configuration of the room.
        id: Identifier of the room.
        peers: All peers in the room.
    """

    config: RoomConfig
    """Configuration of the room."""

    id: str
    """Identifier of the room."""

    peers: list[Peer]
    """All peers in the room."""
Description of the room state.
Attributes:
- config: Room configuration.
- id: Room ID.
- peers: List of all peers.
@_attrs_define
class Peer:
    """Describes the status of a peer.

    Attributes:
        id (str): Assigned peer id. Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
        metadata (None | PeerMetadata): Custom metadata set by the peer. Example: {'name': 'FishjamUser'}.
        status (PeerStatus): Informs about the peer status. Example: disconnected.
        subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
        subscriptions (Subscriptions): Describes peer's subscriptions in manual mode
        tracks (list[Track]): List of all peer's tracks
        type_ (PeerType): Peer type. Example: webrtc.
    """

    id: str
    metadata: None | PeerMetadata
    status: PeerStatus
    subscribe_mode: SubscribeMode
    subscriptions: Subscriptions
    tracks: list[Track]
    type_: PeerType
    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize the peer into a dict with camelCase keys.

        Entries from ``additional_properties`` are included as well; the
        declared fields take precedence when a key appears in both.
        """
        from ..models.peer_metadata import PeerMetadata

        raw_metadata = self.metadata
        metadata: dict[str, Any] | None = (
            raw_metadata.to_dict()
            if isinstance(raw_metadata, PeerMetadata)
            else raw_metadata
        )

        known: dict[str, Any] = {
            "id": self.id,
            "metadata": metadata,
            "status": self.status.value,
            "subscribeMode": self.subscribe_mode.value,
            "subscriptions": self.subscriptions.to_dict(),
            "tracks": [track.to_dict() for track in self.tracks],
            "type": self.type_.value,
        }

        # Extras are laid down first so the declared fields overwrite clashes.
        serialized: dict[str, Any] = dict(self.additional_properties)
        serialized.update(known)
        return serialized

    @classmethod
    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
        """Build a peer from a dict with camelCase keys.

        Every declared field is required (a missing key raises ``KeyError``).
        Keys that are not consumed end up in ``additional_properties``.
        """
        from ..models.peer_metadata import PeerMetadata
        from ..models.subscriptions import Subscriptions
        from ..models.track import Track

        # Work on a copy; whatever is left after the pops becomes the extras.
        remaining = dict(src_dict)
        peer_id = remaining.pop("id")

        def _parse_metadata(data: object) -> None | PeerMetadata:
            if data is None:
                return None
            if isinstance(data, dict):
                try:
                    return PeerMetadata.from_dict(data)
                except (TypeError, ValueError, AttributeError, KeyError):
                    # Best effort: unparsable metadata is passed through as-is.
                    pass
            return cast(None | PeerMetadata, data)

        metadata = _parse_metadata(remaining.pop("metadata"))
        status = PeerStatus(remaining.pop("status"))
        subscribe_mode = SubscribeMode(remaining.pop("subscribeMode"))
        subscriptions = Subscriptions.from_dict(remaining.pop("subscriptions"))
        tracks = [Track.from_dict(item) for item in remaining.pop("tracks")]
        type_ = PeerType(remaining.pop("type"))

        peer = cls(
            id=peer_id,
            metadata=metadata,
            status=status,
            subscribe_mode=subscribe_mode,
            subscriptions=subscriptions,
            tracks=tracks,
            type_=type_,
        )
        peer.additional_properties = remaining
        return peer

    @property
    def additional_keys(self) -> list[str]:
        """Keys currently stored in ``additional_properties``."""
        extras = self.additional_properties
        return list(extras.keys())

    def __getitem__(self, key: str) -> Any:
        """Return the additional property stored under ``key``."""
        extras = self.additional_properties
        return extras[key]

    def __setitem__(self, key: str, value: Any) -> None:
        """Store ``value`` as the additional property ``key``."""
        extras = self.additional_properties
        extras[key] = value

    def __delitem__(self, key: str) -> None:
        """Remove the additional property ``key``."""
        extras = self.additional_properties
        del extras[key]

    def __contains__(self, key: str) -> bool:
        """Tell whether ``key`` is among the additional properties."""
        extras = self.additional_properties
        return key in extras
Describes peer status
Attributes:
- id (str): Assigned peer id. Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
- metadata (None | PeerMetadata): Custom metadata set by the peer. Example: {'name': 'FishjamUser'}.
- status (PeerStatus): Informs about the peer status. Example: disconnected.
- subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
- subscriptions (Subscriptions): Describes peer's subscriptions in manual mode
- tracks (list[Track]): List of all peer's tracks
- type_ (PeerType): Peer type. Example: webrtc.
# Initializer listed in the docs as generated by attrs for Peer.
# All seven declared fields are required arguments; additional_properties
# is not a parameter here.
def __init__(self, id, metadata, status, subscribe_mode, subscriptions, tracks, type_):
    # Each argument is stored unchanged on the instance under the same name.
    self.id = id
    self.metadata = metadata
    self.status = status
    self.subscribe_mode = subscribe_mode
    self.subscriptions = subscriptions
    self.tracks = tracks
    self.type_ = type_
    # Filled from a factory helper rather than from an argument; presumably
    # this yields a new empty dict (the field is declared with factory=dict)
    # -- confirm against attrs.
    self.additional_properties = __attr_factory_additional_properties()
Method generated by attrs for class Peer.
def to_dict(self) -> dict[str, Any]:
    """Serialize the peer into a dict with camelCase keys.

    Entries from ``additional_properties`` are included as well; the
    declared fields take precedence when a key appears in both.
    """
    from ..models.peer_metadata import PeerMetadata

    raw_metadata = self.metadata
    metadata: dict[str, Any] | None = (
        raw_metadata.to_dict()
        if isinstance(raw_metadata, PeerMetadata)
        else raw_metadata
    )

    known: dict[str, Any] = {
        "id": self.id,
        "metadata": metadata,
        "status": self.status.value,
        "subscribeMode": self.subscribe_mode.value,
        "subscriptions": self.subscriptions.to_dict(),
        "tracks": [track.to_dict() for track in self.tracks],
        "type": self.type_.value,
    }

    # Extras are laid down first so the declared fields overwrite clashes.
    serialized: dict[str, Any] = dict(self.additional_properties)
    serialized.update(known)
    return serialized
@classmethod
def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
    """Build a peer from a dict with camelCase keys.

    Every declared field is required (a missing key raises ``KeyError``).
    Keys that are not consumed end up in ``additional_properties``.
    """
    from ..models.peer_metadata import PeerMetadata
    from ..models.subscriptions import Subscriptions
    from ..models.track import Track

    # Work on a copy; whatever is left after the pops becomes the extras.
    remaining = dict(src_dict)
    peer_id = remaining.pop("id")

    def _parse_metadata(data: object) -> None | PeerMetadata:
        if data is None:
            return None
        if isinstance(data, dict):
            try:
                return PeerMetadata.from_dict(data)
            except (TypeError, ValueError, AttributeError, KeyError):
                # Best effort: unparsable metadata is passed through as-is.
                pass
        return cast(None | PeerMetadata, data)

    metadata = _parse_metadata(remaining.pop("metadata"))
    status = PeerStatus(remaining.pop("status"))
    subscribe_mode = SubscribeMode(remaining.pop("subscribeMode"))
    subscriptions = Subscriptions.from_dict(remaining.pop("subscriptions"))
    tracks = [Track.from_dict(item) for item in remaining.pop("tracks")]
    type_ = PeerType(remaining.pop("type"))

    peer = cls(
        id=peer_id,
        metadata=metadata,
        status=status,
        subscribe_mode=subscribe_mode,
        subscriptions=subscriptions,
        tracks=tracks,
        type_=type_,
    )
    peer.additional_properties = remaining
    return peer