Fishjam Python Server SDK
Python server SDK for the Fishjam.
Read the docs here
Installation
pip install fishjam-server-sdk
Usage
The SDK exports two main classes for interacting with Fishjam server:
FishjamClient
and FishjamNotifier
.
FishjamClient
wraps http REST api calls, while FishjamNotifier
is responsible for receiving real-time updates from the server.
FishjamClient
Create a FishjamClient
instance, providing the fishjam server address and api token
from fishjam import FishjamClient
fishjam_client = FishjamClient(fishjam_url="localhost:5002", management_token="development")
You can use it to interact with Fishjam to manage rooms and peers
# Create a room
options = RoomOptions(video_codec="h264", webhook_url="http://localhost:5000/webhook")
room = fishjam_client.create_room(options=options)
# Room(components=[], config=RoomConfig(max_peers=None, video_codec=<RoomConfigVideoCodec.H264: 'h264'>, webhook_url='http://localhost:5000/webhook'), id='1d905478-ccfc-44d6-a6e7-8ccb1b38d955', peers=[])
# Add peer to the room
peer, token = fishjam_client.create_peer(room.id)
# Peer(id='b1232c7e-c969-4450-acdf-ea24f3cdd7f6', status=<PeerStatus.DISCONNECTED: 'disconnected'>, type='webrtc'), 'M8TUGhj-L11KpyG-2zBPIo'
All methods in FishjamClient
may raise one of the exceptions deriving from fishjam.errors.HTTPError
. They are defined in fishjam.errors
.
FishjamNotifier
FishjamNotifier allows for receiving real-time updates from the Fishjam Server.
You can read more about notifications in the Fishjam Docs.
Create FishjamNotifier
instance
from fishjam import FishjamNotifier
fishjam_notifier = FishjamNotifier(fishjam_url='localhost:5002', management_token='development')
Then define a handler for incoming messages
@notifier.on_server_notification
def handle_notification(server_notification):
print(f'Received a notification: {server_notification}')
After that you can start the notifier
async def test_notifier():
notifier_task = asyncio.create_task(fishjam_notifier.connect())
# Wait for notifier to be ready to receive messages
await fishjam_notifier.wait_ready()
# Create a room to trigger a server notification
fishjam_client = FishjamClient()
fishjam_client.create_room()
await notifier_task
asyncio.run(test_notifier())
# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29')
Local development
The project is managed using poetry. Make sure to have it installed first.
Then install the dependencies
poetry install
Generating protobuf
To generate Python protobuf definitions run
poetry run ./compile_proto.sh
Testing
You can test the SDK by running
poetry run ci_test
In local development you can use
poetry run local_test
Format & Lint
You can format code by running
poetry run format
You can check linter by running
poetry run lint
Documentation
Documentation is generated via openapi-python-client.
To update documentation you need to:
- in
poetry_scripts.py
change branch from which openapi.yaml should be downloaded - run
poetry run update-client
License
Licensed under the Apache License, Version 2.0
Fishjam is created by Software Mansion
Since 2012 Software Mansion is a software agency with experience in building web and mobile apps. We are Core React Native Contributors and experts in dealing with all kinds of React Native issues. We can help you build your next dream product – Hire us.
1""" 2 .. include:: ../README.md 3""" 4 5# pylint: disable=locally-disabled, no-name-in-module, import-error 6 7# Exceptions and Server Messages 8 9# API 10# pylint: disable=locally-disabled, no-name-in-module, import-error 11 12# Exceptions and Server Messages 13from fishjam import errors, events, peer, room 14 15# API 16from fishjam._webhook_notifier import receive_binary 17from fishjam._ws_notifier import FishjamNotifier 18from fishjam.api._fishjam_client import ( 19 FishjamClient, 20 Peer, 21 PeerOptions, 22 Room, 23 RoomOptions, 24) 25 26__all__ = [ 27 "FishjamClient", 28 "FishjamNotifier", 29 "receive_binary", 30 "PeerOptions", 31 "RoomOptions", 32 "Room", 33 "Peer", 34 "events", 35 "errors", 36 "room", 37 "peer", 38] 39 40__docformat__ = "restructuredtext"
81class FishjamClient(Client): 82 """Allows for managing rooms""" 83 84 def __init__(self, fishjam_url: str, management_token: str): 85 """ 86 Create FishjamClient instance, providing the fishjam url and managment token. 87 """ 88 super().__init__(fishjam_url=fishjam_url, management_token=management_token) 89 90 def create_peer( 91 self, room_id: str, options: PeerOptions | None = None 92 ) -> Tuple[Peer, str]: 93 """ 94 Creates peer in the room 95 96 Returns a tuple (`Peer`, `PeerToken`) - the token is needed by Peer 97 to authenticate to Fishjam. 98 99 The possible options to pass for peer are `PeerOptions`. 100 """ 101 options = options or PeerOptions() 102 103 peer_type = "webrtc" 104 peer_metadata = self.__parse_peer_metadata(options.metadata) 105 peer_options = PeerOptionsWebRTC( 106 enable_simulcast=options.enable_simulcast, metadata=peer_metadata 107 ) 108 json_body = AddPeerJsonBody(type=peer_type, options=peer_options) 109 110 resp = cast( 111 PeerDetailsResponse, 112 self._request(room_add_peer, room_id=room_id, json_body=json_body), 113 ) 114 115 return (resp.data.peer, resp.data.token) 116 117 def create_room(self, options: RoomOptions | None = None) -> Room: 118 """ 119 Creates a new room 120 Returns the created `Room` 121 """ 122 options = options or RoomOptions() 123 124 codec = None 125 if options.video_codec: 126 codec = RoomConfigVideoCodec(options.video_codec) 127 128 config = RoomConfig( 129 max_peers=options.max_peers, 130 peer_disconnected_timeout=options.peer_disconnected_timeout, 131 peerless_purge_timeout=options.peerless_purge_timeout, 132 video_codec=codec, 133 webhook_url=options.webhook_url, 134 room_type=RoomConfigRoomType(options.room_type), 135 ) 136 137 room = cast( 138 RoomCreateDetailsResponse, self._request(room_create_room, json_body=config) 139 ).data.room 140 141 return Room(config=room.config, id=room.id, peers=room.peers) 142 143 def get_all_rooms(self) -> List[Room]: 144 """Returns list of all rooms""" 145 146 rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data 147 148 return [ 149 Room(config=room.config, id=room.id, peers=room.peers) for room in rooms 150 ] 151 152 def get_room(self, room_id: str) -> Room: 153 """Returns room with the given id""" 154 155 room = cast( 156 RoomDetailsResponse, self._request(room_get_room, room_id=room_id) 157 ).data 158 159 return Room(config=room.config, id=room.id, peers=room.peers) 160 161 def delete_peer(self, room_id: str, peer_id: str) -> None: 162 """Deletes peer""" 163 164 return self._request(room_delete_peer, id=peer_id, room_id=room_id) 165 166 def delete_room(self, room_id: str) -> None: 167 """Deletes a room""" 168 169 return self._request(room_delete_room, room_id=room_id) 170 171 def refresh_peer_token(self, room_id: str, peer_id: str) -> str: 172 """Refreshes peer token""" 173 174 response = cast( 175 PeerRefreshTokenResponse, 176 self._request(room_refresh_token, id=peer_id, room_id=room_id), 177 ) 178 179 return response.data.token 180 181 def __parse_peer_metadata(self, metadata: dict | None) -> PeerOptionsWebRTCMetadata: 182 peer_metadata = PeerOptionsWebRTCMetadata() 183 184 if not metadata: 185 return peer_metadata 186 187 for key, value in metadata.items(): 188 peer_metadata.additional_properties[key] = value 189 190 return peer_metadata
Allows for managing rooms
84 def __init__(self, fishjam_url: str, management_token: str): 85 """ 86 Create FishjamClient instance, providing the fishjam url and managment token. 87 """ 88 super().__init__(fishjam_url=fishjam_url, management_token=management_token)
Create FishjamClient instance, providing the fishjam url and managment token.
90 def create_peer( 91 self, room_id: str, options: PeerOptions | None = None 92 ) -> Tuple[Peer, str]: 93 """ 94 Creates peer in the room 95 96 Returns a tuple (`Peer`, `PeerToken`) - the token is needed by Peer 97 to authenticate to Fishjam. 98 99 The possible options to pass for peer are `PeerOptions`. 100 """ 101 options = options or PeerOptions() 102 103 peer_type = "webrtc" 104 peer_metadata = self.__parse_peer_metadata(options.metadata) 105 peer_options = PeerOptionsWebRTC( 106 enable_simulcast=options.enable_simulcast, metadata=peer_metadata 107 ) 108 json_body = AddPeerJsonBody(type=peer_type, options=peer_options) 109 110 resp = cast( 111 PeerDetailsResponse, 112 self._request(room_add_peer, room_id=room_id, json_body=json_body), 113 ) 114 115 return (resp.data.peer, resp.data.token)
Creates peer in the room
Returns a tuple (Peer
, PeerToken
) - the token is needed by Peer
to authenticate to Fishjam.
The possible options to pass for peer are PeerOptions
.
117 def create_room(self, options: RoomOptions | None = None) -> Room: 118 """ 119 Creates a new room 120 Returns the created `Room` 121 """ 122 options = options or RoomOptions() 123 124 codec = None 125 if options.video_codec: 126 codec = RoomConfigVideoCodec(options.video_codec) 127 128 config = RoomConfig( 129 max_peers=options.max_peers, 130 peer_disconnected_timeout=options.peer_disconnected_timeout, 131 peerless_purge_timeout=options.peerless_purge_timeout, 132 video_codec=codec, 133 webhook_url=options.webhook_url, 134 room_type=RoomConfigRoomType(options.room_type), 135 ) 136 137 room = cast( 138 RoomCreateDetailsResponse, self._request(room_create_room, json_body=config) 139 ).data.room 140 141 return Room(config=room.config, id=room.id, peers=room.peers)
Creates a new room
Returns the created Room
143 def get_all_rooms(self) -> List[Room]: 144 """Returns list of all rooms""" 145 146 rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data 147 148 return [ 149 Room(config=room.config, id=room.id, peers=room.peers) for room in rooms 150 ]
Returns list of all rooms
152 def get_room(self, room_id: str) -> Room: 153 """Returns room with the given id""" 154 155 room = cast( 156 RoomDetailsResponse, self._request(room_get_room, room_id=room_id) 157 ).data 158 159 return Room(config=room.config, id=room.id, peers=room.peers)
Returns room with the given id
161 def delete_peer(self, room_id: str, peer_id: str) -> None: 162 """Deletes peer""" 163 164 return self._request(room_delete_peer, id=peer_id, room_id=room_id)
Deletes peer
166 def delete_room(self, room_id: str) -> None: 167 """Deletes a room""" 168 169 return self._request(room_delete_room, room_id=room_id)
Deletes a room
171 def refresh_peer_token(self, room_id: str, peer_id: str) -> str: 172 """Refreshes peer token""" 173 174 response = cast( 175 PeerRefreshTokenResponse, 176 self._request(room_refresh_token, id=peer_id, room_id=room_id), 177 ) 178 179 return response.data.token
Refreshes peer token
Inherited Members
- fishjam.api._client.Client
- client
27class FishjamNotifier: 28 """ 29 Allows for receiving WebSocket messages from Fishjam. 30 """ 31 32 def __init__(self, fishjam_url: str, management_token: str): 33 """ 34 Create FishjamNotifier instance, providing the fishjam url and management token. 35 """ 36 37 self._fishjam_url = ( 38 f"{fishjam_url.replace('http', 'ws')}/socket/server/websocket" 39 ) 40 self._management_token: str = management_token 41 self._websocket: client.WebSocketClientProtocol | None = None 42 self._ready: bool = False 43 44 self._ready_event: asyncio.Event | None = None 45 46 self._notification_handler: Callable | None = None 47 48 def on_server_notification(self, handler: Callable[[AllowedNotification], None]): 49 """ 50 Decorator used for defining handler for Fishjam Notifications 51 """ 52 self._notification_handler = handler 53 return handler 54 55 async def connect(self): 56 """ 57 A coroutine which connects FishjamNotifier to Fishjam and listens for 58 all incoming messages from the Fishjam. 59 60 It runs until the connection isn't closed. 61 62 The incoming messages are handled by the functions defined using the 63 `on_server_notification` decorator. 64 65 The handler have to be defined before calling `connect`, 66 otherwise the messages won't be received. 67 """ 68 async with client.connect(self._fishjam_url) as websocket: 69 try: 70 self._websocket = websocket 71 await self._authenticate() 72 73 if self._notification_handler: 74 await self._subscribe_event( 75 event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION 76 ) 77 78 self._ready = True 79 if self._ready_event: 80 self._ready_event.set() 81 82 await self._receive_loop() 83 finally: 84 self._websocket = None 85 86 async def wait_ready(self) -> Literal[True]: 87 """ 88 Waits until the notifier is connected and authenticated to Fishjam. 89 90 If already connected, returns `True` immediately. 91 """ 92 if self._ready: 93 return True 94 95 if self._ready_event is None: 96 self._ready_event = asyncio.Event() 97 98 return await self._ready_event.wait() 99 100 async def _authenticate(self): 101 if not self._websocket: 102 raise RuntimeError("Websocket is not connected") 103 104 msg = ServerMessage( 105 auth_request=ServerMessageAuthRequest(token=self._management_token) 106 ) 107 await self._websocket.send(bytes(msg)) 108 109 try: 110 message = cast(bytes, await self._websocket.recv()) 111 except ConnectionClosed as exception: 112 if "invalid token" in str(exception): 113 raise RuntimeError("Invalid management token") from exception 114 raise 115 116 message = ServerMessage().parse(message) 117 118 _type, message = betterproto.which_one_of(message, "content") 119 assert isinstance(message, ServerMessageAuthenticated) 120 121 async def _receive_loop(self): 122 if not self._websocket: 123 raise RuntimeError("Websocket is not connected") 124 if not self._notification_handler: 125 raise RuntimeError("Notification handler is not defined") 126 127 while True: 128 message = cast(bytes, await self._websocket.recv()) 129 message = ServerMessage().parse(message) 130 _which, message = betterproto.which_one_of(message, "content") 131 132 if isinstance(message, ALLOWED_NOTIFICATIONS): 133 self._notification_handler(message) 134 135 async def _subscribe_event(self, event: ServerMessageEventType): 136 if not self._websocket: 137 raise RuntimeError("Websocket is not connected") 138 139 request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event)) 140 141 await self._websocket.send(bytes(request)) 142 message = cast(bytes, await self._websocket.recv()) 143 message = ServerMessage().parse(message) 144 _which, message = betterproto.which_one_of(message, "content") 145 assert isinstance(message, ServerMessageSubscribeResponse)
Allows for receiving WebSocket messages from Fishjam.
32 def __init__(self, fishjam_url: str, management_token: str): 33 """ 34 Create FishjamNotifier instance, providing the fishjam url and management token. 35 """ 36 37 self._fishjam_url = ( 38 f"{fishjam_url.replace('http', 'ws')}/socket/server/websocket" 39 ) 40 self._management_token: str = management_token 41 self._websocket: client.WebSocketClientProtocol | None = None 42 self._ready: bool = False 43 44 self._ready_event: asyncio.Event | None = None 45 46 self._notification_handler: Callable | None = None
Create FishjamNotifier instance, providing the fishjam url and management token.
48 def on_server_notification(self, handler: Callable[[AllowedNotification], None]): 49 """ 50 Decorator used for defining handler for Fishjam Notifications 51 """ 52 self._notification_handler = handler 53 return handler
Decorator used for defining handler for Fishjam Notifications
55 async def connect(self): 56 """ 57 A coroutine which connects FishjamNotifier to Fishjam and listens for 58 all incoming messages from the Fishjam. 59 60 It runs until the connection isn't closed. 61 62 The incoming messages are handled by the functions defined using the 63 `on_server_notification` decorator. 64 65 The handler have to be defined before calling `connect`, 66 otherwise the messages won't be received. 67 """ 68 async with client.connect(self._fishjam_url) as websocket: 69 try: 70 self._websocket = websocket 71 await self._authenticate() 72 73 if self._notification_handler: 74 await self._subscribe_event( 75 event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION 76 ) 77 78 self._ready = True 79 if self._ready_event: 80 self._ready_event.set() 81 82 await self._receive_loop() 83 finally: 84 self._websocket = None
A coroutine which connects FishjamNotifier to Fishjam and listens for all incoming messages from the Fishjam.
It runs until the connection isn't closed.
The incoming messages are handled by the functions defined using the
on_server_notification
decorator.
The handler have to be defined before calling connect
,
otherwise the messages won't be received.
86 async def wait_ready(self) -> Literal[True]: 87 """ 88 Waits until the notifier is connected and authenticated to Fishjam. 89 90 If already connected, returns `True` immediately. 91 """ 92 if self._ready: 93 return True 94 95 if self._ready_event is None: 96 self._ready_event = asyncio.Event() 97 98 return await self._ready_event.wait()
Waits until the notifier is connected and authenticated to Fishjam.
If already connected, returns True
immediately.
18def receive_binary(binary: bytes) -> Union[AllowedNotification, None]: 19 """ 20 Transform received protobuf notification to adequate notification instance. 21 The available notifications are listed in `fishjam.events` module. 22 """ 23 message = ServerMessage().parse(binary) 24 _which, message = betterproto.which_one_of(message, "content") 25 26 if isinstance(message, ALLOWED_NOTIFICATIONS): 27 return message 28 29 return None
Transform received protobuf notification to adequate notification instance.
The available notifications are listed in fishjam.events
module.
71@dataclass 72class PeerOptions: 73 """Options specific to the Peer""" 74 75 enable_simulcast: bool = True 76 """Enables the peer to use simulcast""" 77 metadata: dict[str, Any] | None = None 78 """Peer metadata"""
Options specific to the Peer
47@dataclass 48class RoomOptions: 49 """Description of a room options""" 50 51 max_peers: int | None = None 52 """Maximum amount of peers allowed into the room""" 53 peer_disconnected_timeout: int | None = None 54 """ 55 Duration (in seconds) after which the peer will be removed if it is disconnected. 56 If not provided, this feature is disabled. 57 """ 58 peerless_purge_timeout: int | None = None 59 """ 60 Duration (in seconds) after which the room will be removed 61 if no peers are connected. If not provided, this feature is disabled. 62 """ 63 video_codec: Literal["h264", "vp8"] | None = None 64 """Enforces video codec for each peer in the room""" 65 webhook_url: str | None = None 66 """URL where Fishjam notifications will be sent""" 67 room_type: Literal["full_feature", "audio_only", "broadcaster"] = "full_feature" 68 """The use-case of the room. If not provided, this defaults to full_feature."""
Description of a room options
Duration (in seconds) after which the peer will be removed if it is disconnected. If not provided, this feature is disabled.
35@dataclass 36class Room: 37 """Description of the room state""" 38 39 config: RoomConfig 40 """Room configuration""" 41 id: str 42 """Room ID""" 43 peers: List[Peer] 44 """List of all peers"""
Description of the room state
16@_attrs_define 17class Peer: 18 """Describes peer status""" 19 20 id: str 21 """Assigned peer id""" 22 metadata: Any 23 """Custom metadata set by the peer""" 24 status: PeerStatus 25 """Informs about the peer status""" 26 tracks: List["Track"] 27 """List of all peer's tracks""" 28 type: str 29 """Peer type""" 30 additional_properties: Dict[str, Any] = _attrs_field(init=False, factory=dict) 31 """@private""" 32 33 def to_dict(self) -> Dict[str, Any]: 34 """@private""" 35 id = self.id 36 metadata = self.metadata 37 status = self.status.value 38 39 tracks = [] 40 for tracks_item_data in self.tracks: 41 tracks_item = tracks_item_data.to_dict() 42 43 tracks.append(tracks_item) 44 45 type = self.type 46 47 field_dict: Dict[str, Any] = {} 48 field_dict.update(self.additional_properties) 49 field_dict.update( 50 { 51 "id": id, 52 "metadata": metadata, 53 "status": status, 54 "tracks": tracks, 55 "type": type, 56 } 57 ) 58 59 return field_dict 60 61 @classmethod 62 def from_dict(cls: Type[T], src_dict: Dict[str, Any]) -> T: 63 """@private""" 64 from ..models.track import Track 65 66 d = src_dict.copy() 67 id = d.pop("id") 68 69 metadata = d.pop("metadata") 70 71 status = PeerStatus(d.pop("status")) 72 73 tracks = [] 74 _tracks = d.pop("tracks") 75 for tracks_item_data in _tracks: 76 tracks_item = Track.from_dict(tracks_item_data) 77 78 tracks.append(tracks_item) 79 80 type = d.pop("type") 81 82 peer = cls( 83 id=id, 84 metadata=metadata, 85 status=status, 86 tracks=tracks, 87 type=type, 88 ) 89 90 peer.additional_properties = d 91 return peer 92 93 @property 94 def additional_keys(self) -> List[str]: 95 """@private""" 96 return list(self.additional_properties.keys()) 97 98 def __getitem__(self, key: str) -> Any: 99 return self.additional_properties[key] 100 101 def __setitem__(self, key: str, value: Any) -> None: 102 self.additional_properties[key] = value 103 104 def __delitem__(self, key: str) -> None: 105 del self.additional_properties[key] 106 107 def __contains__(self, key: str) -> bool: 108 return key in self.additional_properties
Describes peer status
2def __init__(self, id, metadata, status, tracks, type): 3 self.id = id 4 self.metadata = metadata 5 self.status = status 6 self.tracks = tracks 7 self.type = type 8 self.additional_properties = __attr_factory_additional_properties()
Method generated by attrs for class Peer.