Fishjam Python Server SDK
Python server SDK for the Fishjam.
Read the docs here
Installation
pip install fishjam-server-sdk
Usage
The SDK exports two main classes for interacting with Fishjam server:
FishjamClient
and FishjamNotifier
.
FishjamClient
wraps http REST api calls, while FishjamNotifier
is responsible for receiving real-time updates from the server.
FishjamClient
Create a FishjamClient
instance, providing the fishjam server address and api token
from fishjam import FishjamClient
fishjam_client = FishjamClient(fishjam_url="localhost:5002", management_token="development")
You can use it to interact with Fishjam to manage rooms and peers
# Create a room
options = RoomOptions(video_codec="h264", webhook_url="http://localhost:5000/webhook")
room = fishjam_client.create_room(options=options)
# Room(components=[], config=RoomConfig(max_peers=None, video_codec=<RoomConfigVideoCodec.H264: 'h264'>, webhook_url='http://localhost:5000/webhook'), id='1d905478-ccfc-44d6-a6e7-8ccb1b38d955', peers=[])
# Add peer to the room
peer, token = fishjam_client.create_peer(room.id)
# Peer(id='b1232c7e-c969-4450-acdf-ea24f3cdd7f6', status=<PeerStatus.DISCONNECTED: 'disconnected'>, type='webrtc'), 'M8TUGhj-L11KpyG-2zBPIo'
All methods in FishjamClient
may raise one of the exceptions deriving from fishjam.errors.HTTPError
. They are defined in fishjam.errors
.
FishjamNotifier
FishjamNotifier allows for receiving real-time updates from the Fishjam Server.
You can read more about notifications in the Fishjam Docs.
Create FishjamNotifier
instance
from fishjam import FishjamNotifier
fishjam_notifier = FishjamNotifier(fishjam_url='localhost:5002', management_token='development')
Then define a handler for incoming messages
@notifier.on_server_notification
def handle_notification(server_notification):
print(f'Received a notification: {server_notification}')
After that you can start the notifier
async def test_notifier():
notifier_task = asyncio.create_task(fishjam_notifier.connect())
# Wait for notifier to be ready to receive messages
await fishjam_notifier.wait_ready()
# Create a room to trigger a server notification
fishjam_client = FishjamClient()
fishjam_client.create_room()
await notifier_task
asyncio.run(test_notifier())
# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29')
Local development
The project is managed using poetry. Make sure to have it installed first.
Then install the dependencies
poetry install
Generating protobuf
To generate Python protobuf definitions run
poetry run ./compile_proto.sh
Testing
You can test the SDK by running
poetry run ci_test
In local development you can use
poetry run local_test
Format & Lint
You can format code by running
poetry run format
You can check linter by running
poetry run lint
Documentation
Documentation is generated via openapi-python-client.
To update documentation you need to:
- Go to https://github.com/fishjam-cloud/fishjam/blob/main/openapi.yaml and open the raw file.
- Copy the URL.
- Run
poetry run update_client <copied-url>
License
Licensed under the Apache License, Version 2.0
Fishjam is created by Software Mansion
Since 2012 Software Mansion is a software agency with experience in building web and mobile apps. We are Core React Native Contributors and experts in dealing with all kinds of React Native issues. We can help you build your next dream product – Hire us.
1""" 2 .. include:: ../README.md 3""" 4 5# pylint: disable=locally-disabled, no-name-in-module, import-error 6 7# Exceptions and Server Messages 8 9# API 10# pylint: disable=locally-disabled, no-name-in-module, import-error 11 12# Exceptions and Server Messages 13from fishjam import errors, events, peer, room 14 15# API 16from fishjam._webhook_notifier import receive_binary 17from fishjam._ws_notifier import FishjamNotifier 18from fishjam.api._fishjam_client import ( 19 FishjamClient, 20 Peer, 21 PeerOptions, 22 Room, 23 RoomOptions, 24) 25 26__all__ = [ 27 "FishjamClient", 28 "FishjamNotifier", 29 "receive_binary", 30 "PeerOptions", 31 "RoomOptions", 32 "Room", 33 "Peer", 34 "events", 35 "errors", 36 "room", 37 "peer", 38] 39 40__docformat__ = "restructuredtext"
83class FishjamClient(Client): 84 """Allows for managing rooms""" 85 86 def __init__( 87 self, 88 fishjam_id: str, 89 management_token: str, 90 *, 91 fishjam_url: str | None = None, 92 ): 93 """ 94 Create a FishjamClient instance, providing the fishjam id and management token. 95 """ 96 super().__init__( 97 fishjam_id=fishjam_id, 98 management_token=management_token, 99 fishjam_url=fishjam_url, 100 ) 101 102 def create_peer( 103 self, room_id: str, options: PeerOptions | None = None 104 ) -> Tuple[Peer, str]: 105 """ 106 Creates peer in the room 107 108 Returns a tuple (`Peer`, `PeerToken`) - the token is needed by Peer 109 to authenticate to Fishjam. 110 111 The possible options to pass for peer are `PeerOptions`. 112 """ 113 options = options or PeerOptions() 114 115 peer_type = "webrtc" 116 peer_metadata = self.__parse_peer_metadata(options.metadata) 117 peer_options = PeerOptionsWebRTC( 118 enable_simulcast=options.enable_simulcast, metadata=peer_metadata 119 ) 120 json_body = AddPeerJsonBody(type=peer_type, options=peer_options) 121 122 resp = cast( 123 PeerDetailsResponse, 124 self._request(room_add_peer, room_id=room_id, json_body=json_body), 125 ) 126 127 return (resp.data.peer, resp.data.token) 128 129 def create_room(self, options: RoomOptions | None = None) -> Room: 130 """ 131 Creates a new room 132 Returns the created `Room` 133 """ 134 options = options or RoomOptions() 135 136 codec = None 137 if options.video_codec: 138 codec = RoomConfigVideoCodec(options.video_codec) 139 140 config = RoomConfig( 141 max_peers=options.max_peers, 142 video_codec=codec, 143 webhook_url=options.webhook_url, 144 room_type=RoomConfigRoomType(options.room_type), 145 public=options.public, 146 ) 147 148 room = cast( 149 RoomCreateDetailsResponse, self._request(room_create_room, json_body=config) 150 ).data.room 151 152 return Room(config=room.config, id=room.id, peers=room.peers) 153 154 def get_all_rooms(self) -> List[Room]: 155 """Returns list of all rooms""" 156 157 rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data 158 159 return [ 160 Room(config=room.config, id=room.id, peers=room.peers) for room in rooms 161 ] 162 163 def get_room(self, room_id: str) -> Room: 164 """Returns room with the given id""" 165 166 room = cast( 167 RoomDetailsResponse, self._request(room_get_room, room_id=room_id) 168 ).data 169 170 return Room(config=room.config, id=room.id, peers=room.peers) 171 172 def delete_peer(self, room_id: str, peer_id: str) -> None: 173 """Deletes peer""" 174 175 return self._request(room_delete_peer, id=peer_id, room_id=room_id) 176 177 def delete_room(self, room_id: str) -> None: 178 """Deletes a room""" 179 180 return self._request(room_delete_room, room_id=room_id) 181 182 def refresh_peer_token(self, room_id: str, peer_id: str) -> str: 183 """Refreshes peer token""" 184 185 response = cast( 186 PeerRefreshTokenResponse, 187 self._request(room_refresh_token, id=peer_id, room_id=room_id), 188 ) 189 190 return response.data.token 191 192 def create_livestream_viewer_token(self, room_id: str) -> str: 193 """Generates viewer token for livestream rooms""" 194 195 response = cast( 196 ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id) 197 ) 198 199 return response.token 200 201 def create_livestream_streamer_token(self, room_id: str) -> str: 202 """Generates streamer token for livestream rooms""" 203 204 response = cast( 205 StreamerToken, 206 self._request(streamer_generate_streamer_token, room_id=room_id), 207 ) 208 209 return response.token 210 211 def __parse_peer_metadata(self, metadata: dict | None) -> PeerOptionsWebRTCMetadata: 212 peer_metadata = PeerOptionsWebRTCMetadata() 213 214 if not metadata: 215 return peer_metadata 216 217 for key, value in metadata.items(): 218 peer_metadata.additional_properties[key] = value 219 220 return peer_metadata
Allows for managing rooms
86 def __init__( 87 self, 88 fishjam_id: str, 89 management_token: str, 90 *, 91 fishjam_url: str | None = None, 92 ): 93 """ 94 Create a FishjamClient instance, providing the fishjam id and management token. 95 """ 96 super().__init__( 97 fishjam_id=fishjam_id, 98 management_token=management_token, 99 fishjam_url=fishjam_url, 100 )
Create a FishjamClient instance, providing the fishjam id and management token.
102 def create_peer( 103 self, room_id: str, options: PeerOptions | None = None 104 ) -> Tuple[Peer, str]: 105 """ 106 Creates peer in the room 107 108 Returns a tuple (`Peer`, `PeerToken`) - the token is needed by Peer 109 to authenticate to Fishjam. 110 111 The possible options to pass for peer are `PeerOptions`. 112 """ 113 options = options or PeerOptions() 114 115 peer_type = "webrtc" 116 peer_metadata = self.__parse_peer_metadata(options.metadata) 117 peer_options = PeerOptionsWebRTC( 118 enable_simulcast=options.enable_simulcast, metadata=peer_metadata 119 ) 120 json_body = AddPeerJsonBody(type=peer_type, options=peer_options) 121 122 resp = cast( 123 PeerDetailsResponse, 124 self._request(room_add_peer, room_id=room_id, json_body=json_body), 125 ) 126 127 return (resp.data.peer, resp.data.token)
Creates peer in the room
Returns a tuple (Peer
, PeerToken
) - the token is needed by Peer
to authenticate to Fishjam.
The possible options to pass for peer are PeerOptions
.
129 def create_room(self, options: RoomOptions | None = None) -> Room: 130 """ 131 Creates a new room 132 Returns the created `Room` 133 """ 134 options = options or RoomOptions() 135 136 codec = None 137 if options.video_codec: 138 codec = RoomConfigVideoCodec(options.video_codec) 139 140 config = RoomConfig( 141 max_peers=options.max_peers, 142 video_codec=codec, 143 webhook_url=options.webhook_url, 144 room_type=RoomConfigRoomType(options.room_type), 145 public=options.public, 146 ) 147 148 room = cast( 149 RoomCreateDetailsResponse, self._request(room_create_room, json_body=config) 150 ).data.room 151 152 return Room(config=room.config, id=room.id, peers=room.peers)
Creates a new room
Returns the created Room
154 def get_all_rooms(self) -> List[Room]: 155 """Returns list of all rooms""" 156 157 rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data 158 159 return [ 160 Room(config=room.config, id=room.id, peers=room.peers) for room in rooms 161 ]
Returns list of all rooms
163 def get_room(self, room_id: str) -> Room: 164 """Returns room with the given id""" 165 166 room = cast( 167 RoomDetailsResponse, self._request(room_get_room, room_id=room_id) 168 ).data 169 170 return Room(config=room.config, id=room.id, peers=room.peers)
Returns room with the given id
172 def delete_peer(self, room_id: str, peer_id: str) -> None: 173 """Deletes peer""" 174 175 return self._request(room_delete_peer, id=peer_id, room_id=room_id)
Deletes peer
177 def delete_room(self, room_id: str) -> None: 178 """Deletes a room""" 179 180 return self._request(room_delete_room, room_id=room_id)
Deletes a room
182 def refresh_peer_token(self, room_id: str, peer_id: str) -> str: 183 """Refreshes peer token""" 184 185 response = cast( 186 PeerRefreshTokenResponse, 187 self._request(room_refresh_token, id=peer_id, room_id=room_id), 188 ) 189 190 return response.data.token
Refreshes peer token
192 def create_livestream_viewer_token(self, room_id: str) -> str: 193 """Generates viewer token for livestream rooms""" 194 195 response = cast( 196 ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id) 197 ) 198 199 return response.token
Generates viewer token for livestream rooms
201 def create_livestream_streamer_token(self, room_id: str) -> str: 202 """Generates streamer token for livestream rooms""" 203 204 response = cast( 205 StreamerToken, 206 self._request(streamer_generate_streamer_token, room_id=room_id), 207 ) 208 209 return response.token
Generates streamer token for livestream rooms
Inherited Members
- fishjam.api._client.Client
- client
28class FishjamNotifier: 29 """ 30 Allows for receiving WebSocket messages from Fishjam. 31 """ 32 33 def __init__( 34 self, 35 fishjam_id: str, 36 management_token: str, 37 *, 38 fishjam_url: str | None = None, 39 ): 40 """ 41 Create FishjamNotifier instance, providing the fishjam id and management token. 42 """ 43 44 websocket_url = get_fishjam_url(fishjam_id, fishjam_url).replace("http", "ws") 45 self._fishjam_url = f"{websocket_url}/socket/server/websocket" 46 self._management_token: str = management_token 47 self._websocket: client.WebSocketClientProtocol | None = None 48 self._ready: bool = False 49 50 self._ready_event: asyncio.Event | None = None 51 52 self._notification_handler: Callable | None = None 53 54 def on_server_notification(self, handler: Callable[[AllowedNotification], None]): 55 """ 56 Decorator used for defining handler for Fishjam Notifications 57 """ 58 self._notification_handler = handler 59 return handler 60 61 async def connect(self): 62 """ 63 A coroutine which connects FishjamNotifier to Fishjam and listens for 64 all incoming messages from the Fishjam. 65 66 It runs until the connection isn't closed. 67 68 The incoming messages are handled by the functions defined using the 69 `on_server_notification` decorator. 70 71 The handler have to be defined before calling `connect`, 72 otherwise the messages won't be received. 73 """ 74 async with client.connect(self._fishjam_url) as websocket: 75 try: 76 self._websocket = websocket 77 await self._authenticate() 78 79 if self._notification_handler: 80 await self._subscribe_event( 81 event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION 82 ) 83 84 self._ready = True 85 if self._ready_event: 86 self._ready_event.set() 87 88 await self._receive_loop() 89 finally: 90 self._websocket = None 91 92 async def wait_ready(self) -> Literal[True]: 93 """ 94 Waits until the notifier is connected and authenticated to Fishjam. 95 96 If already connected, returns `True` immediately. 97 """ 98 if self._ready: 99 return True 100 101 if self._ready_event is None: 102 self._ready_event = asyncio.Event() 103 104 return await self._ready_event.wait() 105 106 async def _authenticate(self): 107 if not self._websocket: 108 raise RuntimeError("Websocket is not connected") 109 110 msg = ServerMessage( 111 auth_request=ServerMessageAuthRequest(token=self._management_token) 112 ) 113 await self._websocket.send(bytes(msg)) 114 115 try: 116 message = cast(bytes, await self._websocket.recv()) 117 except ConnectionClosed as exception: 118 if "invalid token" in str(exception): 119 raise RuntimeError("Invalid management token") from exception 120 raise 121 122 message = ServerMessage().parse(message) 123 124 _type, message = betterproto.which_one_of(message, "content") 125 assert isinstance(message, ServerMessageAuthenticated) 126 127 async def _receive_loop(self): 128 if not self._websocket: 129 raise RuntimeError("Websocket is not connected") 130 if not self._notification_handler: 131 raise RuntimeError("Notification handler is not defined") 132 133 while True: 134 message = cast(bytes, await self._websocket.recv()) 135 message = ServerMessage().parse(message) 136 _which, message = betterproto.which_one_of(message, "content") 137 138 if isinstance(message, ALLOWED_NOTIFICATIONS): 139 self._notification_handler(message) 140 141 async def _subscribe_event(self, event: ServerMessageEventType): 142 if not self._websocket: 143 raise RuntimeError("Websocket is not connected") 144 145 request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event)) 146 147 await self._websocket.send(bytes(request)) 148 message = cast(bytes, await self._websocket.recv()) 149 message = ServerMessage().parse(message) 150 _which, message = betterproto.which_one_of(message, "content") 151 assert isinstance(message, ServerMessageSubscribeResponse)
Allows for receiving WebSocket messages from Fishjam.
33 def __init__( 34 self, 35 fishjam_id: str, 36 management_token: str, 37 *, 38 fishjam_url: str | None = None, 39 ): 40 """ 41 Create FishjamNotifier instance, providing the fishjam id and management token. 42 """ 43 44 websocket_url = get_fishjam_url(fishjam_id, fishjam_url).replace("http", "ws") 45 self._fishjam_url = f"{websocket_url}/socket/server/websocket" 46 self._management_token: str = management_token 47 self._websocket: client.WebSocketClientProtocol | None = None 48 self._ready: bool = False 49 50 self._ready_event: asyncio.Event | None = None 51 52 self._notification_handler: Callable | None = None
Create FishjamNotifier instance, providing the fishjam id and management token.
54 def on_server_notification(self, handler: Callable[[AllowedNotification], None]): 55 """ 56 Decorator used for defining handler for Fishjam Notifications 57 """ 58 self._notification_handler = handler 59 return handler
Decorator used for defining handler for Fishjam Notifications
61 async def connect(self): 62 """ 63 A coroutine which connects FishjamNotifier to Fishjam and listens for 64 all incoming messages from the Fishjam. 65 66 It runs until the connection isn't closed. 67 68 The incoming messages are handled by the functions defined using the 69 `on_server_notification` decorator. 70 71 The handler have to be defined before calling `connect`, 72 otherwise the messages won't be received. 73 """ 74 async with client.connect(self._fishjam_url) as websocket: 75 try: 76 self._websocket = websocket 77 await self._authenticate() 78 79 if self._notification_handler: 80 await self._subscribe_event( 81 event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION 82 ) 83 84 self._ready = True 85 if self._ready_event: 86 self._ready_event.set() 87 88 await self._receive_loop() 89 finally: 90 self._websocket = None
A coroutine which connects FishjamNotifier to Fishjam and listens for all incoming messages from the Fishjam.
It runs until the connection isn't closed.
The incoming messages are handled by the functions defined using the
on_server_notification
decorator.
The handler have to be defined before calling connect
,
otherwise the messages won't be received.
92 async def wait_ready(self) -> Literal[True]: 93 """ 94 Waits until the notifier is connected and authenticated to Fishjam. 95 96 If already connected, returns `True` immediately. 97 """ 98 if self._ready: 99 return True 100 101 if self._ready_event is None: 102 self._ready_event = asyncio.Event() 103 104 return await self._ready_event.wait()
Waits until the notifier is connected and authenticated to Fishjam.
If already connected, returns True
immediately.
18def receive_binary(binary: bytes) -> Union[AllowedNotification, None]: 19 """ 20 Transform received protobuf notification to adequate notification instance. 21 The available notifications are listed in `fishjam.events` module. 22 """ 23 message = ServerMessage().parse(binary) 24 _which, message = betterproto.which_one_of(message, "content") 25 26 if isinstance(message, ALLOWED_NOTIFICATIONS): 27 return message 28 29 return None
Transform received protobuf notification to adequate notification instance.
The available notifications are listed in fishjam.events
module.
73@dataclass 74class PeerOptions: 75 """Options specific to the Peer""" 76 77 enable_simulcast: bool = True 78 """Enables the peer to use simulcast""" 79 metadata: dict[str, Any] | None = None 80 """Peer metadata"""
Options specific to the Peer
55@dataclass 56class RoomOptions: 57 """Description of a room options""" 58 59 max_peers: int | None = None 60 """Maximum amount of peers allowed into the room""" 61 video_codec: Literal["h264", "vp8"] | None = None 62 """Enforces video codec for each peer in the room""" 63 webhook_url: str | None = None 64 """URL where Fishjam notifications will be sent""" 65 room_type: Literal[ 66 "conference", "audio_only", "livestream", "full_feature", "broadcaster" 67 ] = "conference" 68 """The use-case of the room. If not provided, this defaults to conference.""" 69 public: bool = False 70 """True if livestream viewers can omit specifying a token."""
Description of a room options
43@dataclass 44class Room: 45 """Description of the room state""" 46 47 config: RoomConfig 48 """Room configuration""" 49 id: str 50 """Room ID""" 51 peers: List[Peer] 52 """List of all peers"""
Description of the room state
16@_attrs_define 17class Peer: 18 """Describes peer status""" 19 20 id: str 21 """Assigned peer id""" 22 metadata: Any 23 """Custom metadata set by the peer""" 24 status: PeerStatus 25 """Informs about the peer status""" 26 tracks: List["Track"] 27 """List of all peer's tracks""" 28 type: str 29 """Peer type""" 30 additional_properties: Dict[str, Any] = _attrs_field(init=False, factory=dict) 31 """@private""" 32 33 def to_dict(self) -> Dict[str, Any]: 34 """@private""" 35 id = self.id 36 metadata = self.metadata 37 status = self.status.value 38 39 tracks = [] 40 for tracks_item_data in self.tracks: 41 tracks_item = tracks_item_data.to_dict() 42 43 tracks.append(tracks_item) 44 45 type = self.type 46 47 field_dict: Dict[str, Any] = {} 48 field_dict.update(self.additional_properties) 49 field_dict.update( 50 { 51 "id": id, 52 "metadata": metadata, 53 "status": status, 54 "tracks": tracks, 55 "type": type, 56 } 57 ) 58 59 return field_dict 60 61 @classmethod 62 def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: 63 """@private""" 64 from ..models.track import Track 65 66 d = src_dict.copy() 67 id = d.pop("id") 68 69 metadata = d.pop("metadata") 70 71 status = PeerStatus(d.pop("status")) 72 73 tracks = [] 74 _tracks = d.pop("tracks") 75 for tracks_item_data in _tracks: 76 tracks_item = Track.from_dict(tracks_item_data) 77 78 tracks.append(tracks_item) 79 80 type = d.pop("type") 81 82 peer = cls( 83 id=id, 84 metadata=metadata, 85 status=status, 86 tracks=tracks, 87 type=type, 88 ) 89 90 peer.additional_properties = d 91 return peer 92 93 @property 94 def additional_keys(self) -> List[str]: 95 """@private""" 96 return list(self.additional_properties.keys()) 97 98 def __getitem__(self, key: str) -> Any: 99 return self.additional_properties[key] 100 101 def __setitem__(self, key: str, value: Any) -> None: 102 self.additional_properties[key] = value 103 104 def __delitem__(self, key: str) -> None: 105 del self.additional_properties[key] 106 107 def __contains__(self, key: str) -> bool: 108 return key in self.additional_properties
Describes peer status
2def __init__(self, id, metadata, status, tracks, type): 3 self.id = id 4 self.metadata = metadata 5 self.status = status 6 self.tracks = tracks 7 self.type = type 8 self.additional_properties = __attr_factory_additional_properties()
Method generated by attrs for class Peer.