
Fishjam Python Server SDK
Python server SDK for the Fishjam.
Read the docs here
Installation
pip install fishjam-server-sdk
Usage
The SDK exports two main classes for interacting with Fishjam server:
FishjamClient and FishjamNotifier.
FishjamClient wraps http REST api calls, while FishjamNotifier is responsible for receiving real-time updates from the server.
FishjamClient
Create a FishjamClient instance, providing the fishjam server address and api token
from fishjam import FishjamClient
fishjam_client = FishjamClient(fishjam_url="localhost:5002", management_token="development")
You can use it to interact with Fishjam to manage rooms and peers
# Create a room
options = RoomOptions(video_codec="h264", webhook_url="http://localhost:5000/webhook")
room = fishjam_client.create_room(options=options)
# Room(components=[], config=RoomConfig(max_peers=None, video_codec=<RoomConfigVideoCodec.H264: 'h264'>, webhook_url='http://localhost:5000/webhook'), id='1d905478-ccfc-44d6-a6e7-8ccb1b38d955', peers=[])
# Add peer to the room
peer, token = fishjam_client.create_peer(room.id)
# Peer(id='b1232c7e-c969-4450-acdf-ea24f3cdd7f6', status=<PeerStatus.DISCONNECTED: 'disconnected'>, type='webrtc'), 'M8TUGhj-L11KpyG-2zBPIo'
All methods in FishjamClient may raise one of the exceptions deriving from fishjam.errors.HTTPError. They are defined in fishjam.errors.
FishjamNotifier
FishjamNotifier allows for receiving real-time updates from the Fishjam Server.
You can read more about notifications in the Fishjam Docs.
Create FishjamNotifier instance
from fishjam import FishjamNotifier
fishjam_notifier = FishjamNotifier(fishjam_url='localhost:5002', management_token='development')
Then define a handler for incoming messages
@notifier.on_server_notification
def handle_notification(server_notification):
print(f'Received a notification: {server_notification}')
After that you can start the notifier
async def test_notifier():
notifier_task = asyncio.create_task(fishjam_notifier.connect())
# Wait for notifier to be ready to receive messages
await fishjam_notifier.wait_ready()
# Create a room to trigger a server notification
fishjam_client = FishjamClient()
fishjam_client.create_room()
await notifier_task
asyncio.run(test_notifier())
# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29')
License
Licensed under the Apache License, Version 2.0
Fishjam is created by Software Mansion
Since 2012 Software Mansion is a software agency with experience in building web and mobile apps. We are Core React Native Contributors and experts in dealing with all kinds of React Native issues. We can help you build your next dream product – Hire us.
1""" 2.. include:: ../README.md 3""" 4 5# pylint: disable=locally-disabled, no-name-in-module, import-error 6 7# Exceptions and Server Messages 8 9# API 10# pylint: disable=locally-disabled, no-name-in-module, import-error 11 12# Exceptions and Server Messages 13from fishjam import agent, errors, events, peer, room 14from fishjam._openapi_client.models import PeerMetadata 15 16# API 17from fishjam._webhook_notifier import receive_binary 18from fishjam._ws_notifier import FishjamNotifier 19from fishjam.api._fishjam_client import ( 20 FishjamClient, 21 Peer, 22 PeerOptions, 23 Room, 24 RoomOptions, 25) 26 27__all__ = [ 28 "FishjamClient", 29 "FishjamNotifier", 30 "receive_binary", 31 "PeerMetadata", 32 "PeerOptions", 33 "RoomOptions", 34 "Room", 35 "Peer", 36 "events", 37 "errors", 38 "room", 39 "peer", 40 "agent", 41] 42 43__docformat__ = "restructuredtext"
88class FishjamClient(Client): 89 """Allows for managing rooms""" 90 91 def __init__( 92 self, 93 fishjam_id: str, 94 management_token: str, 95 *, 96 fishjam_url: str | None = None, 97 ): 98 """ 99 Create a FishjamClient instance, providing the fishjam id and management token. 100 """ 101 super().__init__( 102 fishjam_id=fishjam_id, 103 management_token=management_token, 104 fishjam_url=fishjam_url, 105 ) 106 107 def create_peer( 108 self, 109 room_id: str, 110 options: PeerOptions | None = None, 111 ) -> tuple[Peer, str]: 112 """ 113 Creates peer in the room 114 115 Returns a tuple (`Peer`, `PeerToken`) - the token is needed by Peer 116 to authenticate to Fishjam. 117 118 The possible options to pass for peer are `PeerOptions`. 119 """ 120 options = options or PeerOptions() 121 122 peer_metadata = self.__parse_peer_metadata(options.metadata) 123 peer_options = PeerOptionsWebRTC( 124 enable_simulcast=options.enable_simulcast, 125 metadata=peer_metadata, 126 subscribe=self.__parse_subscribe_options(options.subscribe), 127 ) 128 body = AddPeerBody(type_=PeerType.WEBRTC, options=peer_options) 129 130 resp = cast( 131 PeerDetailsResponse, 132 self._request(room_add_peer, room_id=room_id, body=body), 133 ) 134 135 return (resp.data.peer, resp.data.token) 136 137 def create_agent(self, room_id: str): 138 body = AddPeerBody(type_=PeerType.AGENT, options=PeerOptionsAgent()) 139 140 resp = cast( 141 PeerDetailsResponse, 142 self._request(room_add_peer, room_id=room_id, body=body), 143 ) 144 145 return Agent(resp.data.peer.id, resp.data.token, self._fishjam_url) 146 147 def create_room(self, options: RoomOptions | None = None) -> Room: 148 """ 149 Creates a new room 150 Returns the created `Room` 151 """ 152 options = options or RoomOptions() 153 154 if options.video_codec is None: 155 codec = UNSET 156 else: 157 codec = RoomConfigVideoCodec(options.video_codec) 158 159 config = RoomConfig( 160 max_peers=options.max_peers, 161 video_codec=codec, 162 webhook_url=options.webhook_url, 163 room_type=RoomConfigRoomType(options.room_type), 164 public=options.public, 165 ) 166 167 room = cast( 168 RoomCreateDetailsResponse, self._request(room_create_room, body=config) 169 ).data.room 170 171 return Room(config=room.config, id=room.id, peers=room.peers) 172 173 def get_all_rooms(self) -> list[Room]: 174 """Returns list of all rooms""" 175 176 rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data 177 178 return [ 179 Room(config=room.config, id=room.id, peers=room.peers) for room in rooms 180 ] 181 182 def get_room(self, room_id: str) -> Room: 183 """Returns room with the given id""" 184 185 room = cast( 186 RoomDetailsResponse, self._request(room_get_room, room_id=room_id) 187 ).data 188 189 return Room(config=room.config, id=room.id, peers=room.peers) 190 191 def delete_peer(self, room_id: str, peer_id: str) -> None: 192 """Deletes peer""" 193 194 return self._request(room_delete_peer, id=peer_id, room_id=room_id) 195 196 def delete_room(self, room_id: str) -> None: 197 """Deletes a room""" 198 199 return self._request(room_delete_room, room_id=room_id) 200 201 def refresh_peer_token(self, room_id: str, peer_id: str) -> str: 202 """Refreshes peer token""" 203 204 response = cast( 205 PeerRefreshTokenResponse, 206 self._request(room_refresh_token, id=peer_id, room_id=room_id), 207 ) 208 209 return response.data.token 210 211 def create_livestream_viewer_token(self, room_id: str) -> str: 212 """Generates viewer token for livestream rooms""" 213 214 response = cast( 215 ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id) 216 ) 217 218 return response.token 219 220 def create_livestream_streamer_token(self, room_id: str) -> str: 221 """Generates streamer token for livestream rooms""" 222 223 response = cast( 224 StreamerToken, 225 self._request(streamer_generate_streamer_token, room_id=room_id), 226 ) 227 228 return response.token 229 230 def __parse_peer_metadata(self, metadata: dict | None) -> PeerOptionsWebRTCMetadata: 231 peer_metadata = PeerOptionsWebRTCMetadata() 232 233 if not metadata: 234 return peer_metadata 235 236 for key, value in metadata.items(): 237 peer_metadata.additional_properties[key] = value 238 239 return peer_metadata 240 241 def __parse_subscribe_options( 242 self, 243 options: SubscribeOptions | None, 244 ) -> PeerOptionsWebRTCSubscribeOptions | None: 245 if options is None: 246 return None 247 return PeerOptionsWebRTCSubscribeOptions.from_dict(options.to_dict())
Allows for managing rooms
91 def __init__( 92 self, 93 fishjam_id: str, 94 management_token: str, 95 *, 96 fishjam_url: str | None = None, 97 ): 98 """ 99 Create a FishjamClient instance, providing the fishjam id and management token. 100 """ 101 super().__init__( 102 fishjam_id=fishjam_id, 103 management_token=management_token, 104 fishjam_url=fishjam_url, 105 )
Create a FishjamClient instance, providing the fishjam id and management token.
107 def create_peer( 108 self, 109 room_id: str, 110 options: PeerOptions | None = None, 111 ) -> tuple[Peer, str]: 112 """ 113 Creates peer in the room 114 115 Returns a tuple (`Peer`, `PeerToken`) - the token is needed by Peer 116 to authenticate to Fishjam. 117 118 The possible options to pass for peer are `PeerOptions`. 119 """ 120 options = options or PeerOptions() 121 122 peer_metadata = self.__parse_peer_metadata(options.metadata) 123 peer_options = PeerOptionsWebRTC( 124 enable_simulcast=options.enable_simulcast, 125 metadata=peer_metadata, 126 subscribe=self.__parse_subscribe_options(options.subscribe), 127 ) 128 body = AddPeerBody(type_=PeerType.WEBRTC, options=peer_options) 129 130 resp = cast( 131 PeerDetailsResponse, 132 self._request(room_add_peer, room_id=room_id, body=body), 133 ) 134 135 return (resp.data.peer, resp.data.token)
Creates peer in the room
Returns a tuple (Peer, PeerToken) - the token is needed by Peer
to authenticate to Fishjam.
The possible options to pass for peer are PeerOptions.
137 def create_agent(self, room_id: str): 138 body = AddPeerBody(type_=PeerType.AGENT, options=PeerOptionsAgent()) 139 140 resp = cast( 141 PeerDetailsResponse, 142 self._request(room_add_peer, room_id=room_id, body=body), 143 ) 144 145 return Agent(resp.data.peer.id, resp.data.token, self._fishjam_url)
147 def create_room(self, options: RoomOptions | None = None) -> Room: 148 """ 149 Creates a new room 150 Returns the created `Room` 151 """ 152 options = options or RoomOptions() 153 154 if options.video_codec is None: 155 codec = UNSET 156 else: 157 codec = RoomConfigVideoCodec(options.video_codec) 158 159 config = RoomConfig( 160 max_peers=options.max_peers, 161 video_codec=codec, 162 webhook_url=options.webhook_url, 163 room_type=RoomConfigRoomType(options.room_type), 164 public=options.public, 165 ) 166 167 room = cast( 168 RoomCreateDetailsResponse, self._request(room_create_room, body=config) 169 ).data.room 170 171 return Room(config=room.config, id=room.id, peers=room.peers)
Creates a new room
Returns the created Room
173 def get_all_rooms(self) -> list[Room]: 174 """Returns list of all rooms""" 175 176 rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data 177 178 return [ 179 Room(config=room.config, id=room.id, peers=room.peers) for room in rooms 180 ]
Returns list of all rooms
182 def get_room(self, room_id: str) -> Room: 183 """Returns room with the given id""" 184 185 room = cast( 186 RoomDetailsResponse, self._request(room_get_room, room_id=room_id) 187 ).data 188 189 return Room(config=room.config, id=room.id, peers=room.peers)
Returns room with the given id
191 def delete_peer(self, room_id: str, peer_id: str) -> None: 192 """Deletes peer""" 193 194 return self._request(room_delete_peer, id=peer_id, room_id=room_id)
Deletes peer
196 def delete_room(self, room_id: str) -> None: 197 """Deletes a room""" 198 199 return self._request(room_delete_room, room_id=room_id)
Deletes a room
201 def refresh_peer_token(self, room_id: str, peer_id: str) -> str: 202 """Refreshes peer token""" 203 204 response = cast( 205 PeerRefreshTokenResponse, 206 self._request(room_refresh_token, id=peer_id, room_id=room_id), 207 ) 208 209 return response.data.token
Refreshes peer token
211 def create_livestream_viewer_token(self, room_id: str) -> str: 212 """Generates viewer token for livestream rooms""" 213 214 response = cast( 215 ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id) 216 ) 217 218 return response.token
Generates viewer token for livestream rooms
220 def create_livestream_streamer_token(self, room_id: str) -> str: 221 """Generates streamer token for livestream rooms""" 222 223 response = cast( 224 StreamerToken, 225 self._request(streamer_generate_streamer_token, room_id=room_id), 226 ) 227 228 return response.token
Generates streamer token for livestream rooms
Inherited Members
- fishjam.api._client.Client
- client
34class FishjamNotifier: 35 """ 36 Allows for receiving WebSocket messages from Fishjam. 37 """ 38 39 def __init__( 40 self, 41 fishjam_id: str, 42 management_token: str, 43 *, 44 fishjam_url: str | None = None, 45 ): 46 """ 47 Create FishjamNotifier instance, providing the fishjam id and management token. 48 """ 49 50 websocket_url = get_fishjam_url(fishjam_id, fishjam_url).replace("http", "ws") 51 self._fishjam_url = f"{websocket_url}/socket/server/websocket" 52 self._management_token: str = management_token 53 self._websocket: client.ClientConnection | None = None 54 self._ready: bool = False 55 56 self._ready_event: asyncio.Event | None = None 57 58 self._notification_handler: NotificationHandler | None = None 59 60 def on_server_notification(self, handler: NotificationHandler): 61 """ 62 Decorator used for defining handler for Fishjam Notifications 63 """ 64 self._notification_handler = handler 65 return handler 66 67 async def connect(self): 68 """ 69 A coroutine which connects FishjamNotifier to Fishjam and listens for 70 all incoming messages from the Fishjam. 71 72 It runs until the connection isn't closed. 73 74 The incoming messages are handled by the functions defined using the 75 `on_server_notification` decorator. 76 77 The handler have to be defined before calling `connect`, 78 otherwise the messages won't be received. 79 """ 80 async with client.connect(self._fishjam_url) as websocket: 81 try: 82 self._websocket = websocket 83 await self._authenticate() 84 85 if self._notification_handler: 86 await self._subscribe_event( 87 event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION 88 ) 89 90 self._ready = True 91 if self._ready_event: 92 self._ready_event.set() 93 94 await self._receive_loop() 95 finally: 96 self._websocket = None 97 98 async def wait_ready(self) -> None: 99 """ 100 Waits until the notifier is connected and authenticated to Fishjam. 101 102 If already connected, returns immediately. 103 """ 104 if self._ready: 105 return 106 107 if self._ready_event is None: 108 self._ready_event = asyncio.Event() 109 110 await self._ready_event.wait() 111 112 async def _authenticate(self): 113 if not self._websocket: 114 raise RuntimeError("Websocket is not connected") 115 116 msg = ServerMessage( 117 auth_request=ServerMessageAuthRequest(token=self._management_token) 118 ) 119 await self._websocket.send(bytes(msg)) 120 121 try: 122 message = await self._websocket.recv(decode=False) 123 except ConnectionClosed as exception: 124 if "invalid token" in str(exception): 125 raise RuntimeError("Invalid management token") from exception 126 raise 127 128 message = ServerMessage().parse(message) 129 130 _type, message = betterproto.which_one_of(message, "content") 131 assert isinstance(message, ServerMessageAuthenticated) 132 133 async def _receive_loop(self): 134 if not self._websocket: 135 raise RuntimeError("Websocket is not connected") 136 if not self._notification_handler: 137 raise RuntimeError("Notification handler is not defined") 138 139 while True: 140 message = cast(bytes, await self._websocket.recv()) 141 message = ServerMessage().parse(message) 142 _which, message = betterproto.which_one_of(message, "content") 143 144 if isinstance(message, ALLOWED_NOTIFICATIONS): 145 res = self._notification_handler(message) 146 if asyncio.iscoroutine(res): 147 await res 148 149 async def _subscribe_event(self, event: ServerMessageEventType): 150 if not self._websocket: 151 raise RuntimeError("Websocket is not connected") 152 153 request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event)) 154 155 await self._websocket.send(bytes(request)) 156 message = cast(bytes, await self._websocket.recv()) 157 message = ServerMessage().parse(message) 158 _which, message = betterproto.which_one_of(message, "content") 159 assert isinstance(message, ServerMessageSubscribeResponse)
Allows for receiving WebSocket messages from Fishjam.
39 def __init__( 40 self, 41 fishjam_id: str, 42 management_token: str, 43 *, 44 fishjam_url: str | None = None, 45 ): 46 """ 47 Create FishjamNotifier instance, providing the fishjam id and management token. 48 """ 49 50 websocket_url = get_fishjam_url(fishjam_id, fishjam_url).replace("http", "ws") 51 self._fishjam_url = f"{websocket_url}/socket/server/websocket" 52 self._management_token: str = management_token 53 self._websocket: client.ClientConnection | None = None 54 self._ready: bool = False 55 56 self._ready_event: asyncio.Event | None = None 57 58 self._notification_handler: NotificationHandler | None = None
Create FishjamNotifier instance, providing the fishjam id and management token.
60 def on_server_notification(self, handler: NotificationHandler): 61 """ 62 Decorator used for defining handler for Fishjam Notifications 63 """ 64 self._notification_handler = handler 65 return handler
Decorator used for defining handler for Fishjam Notifications
67 async def connect(self): 68 """ 69 A coroutine which connects FishjamNotifier to Fishjam and listens for 70 all incoming messages from the Fishjam. 71 72 It runs until the connection isn't closed. 73 74 The incoming messages are handled by the functions defined using the 75 `on_server_notification` decorator. 76 77 The handler have to be defined before calling `connect`, 78 otherwise the messages won't be received. 79 """ 80 async with client.connect(self._fishjam_url) as websocket: 81 try: 82 self._websocket = websocket 83 await self._authenticate() 84 85 if self._notification_handler: 86 await self._subscribe_event( 87 event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION 88 ) 89 90 self._ready = True 91 if self._ready_event: 92 self._ready_event.set() 93 94 await self._receive_loop() 95 finally: 96 self._websocket = None
A coroutine which connects FishjamNotifier to Fishjam and listens for all incoming messages from the Fishjam.
It runs until the connection isn't closed.
The incoming messages are handled by the functions defined using the
on_server_notification decorator.
The handler have to be defined before calling connect,
otherwise the messages won't be received.
98 async def wait_ready(self) -> None: 99 """ 100 Waits until the notifier is connected and authenticated to Fishjam. 101 102 If already connected, returns immediately. 103 """ 104 if self._ready: 105 return 106 107 if self._ready_event is None: 108 self._ready_event = asyncio.Event() 109 110 await self._ready_event.wait()
Waits until the notifier is connected and authenticated to Fishjam.
If already connected, returns immediately.
18def receive_binary(binary: bytes) -> Union[AllowedNotification, None]: 19 """ 20 Transform received protobuf notification to adequate notification instance. 21 The available notifications are listed in `fishjam.events` module. 22 """ 23 message = ServerMessage().parse(binary) 24 _which, message = betterproto.which_one_of(message, "content") 25 26 if isinstance(message, ALLOWED_NOTIFICATIONS): 27 return message 28 29 return None
Transform received protobuf notification to adequate notification instance.
The available notifications are listed in fishjam.events module.
11@_attrs_define 12class PeerMetadata: 13 """Custom metadata set by the peer 14 15 Example: 16 {'name': 'FishjamUser'} 17 18 """ 19 20 additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict) 21 22 def to_dict(self) -> dict[str, Any]: 23 field_dict: dict[str, Any] = {} 24 field_dict.update(self.additional_properties) 25 26 return field_dict 27 28 @classmethod 29 def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T: 30 d = dict(src_dict) 31 peer_metadata = cls() 32 33 peer_metadata.additional_properties = d 34 return peer_metadata 35 36 @property 37 def additional_keys(self) -> list[str]: 38 return list(self.additional_properties.keys()) 39 40 def __getitem__(self, key: str) -> Any: 41 return self.additional_properties[key] 42 43 def __setitem__(self, key: str, value: Any) -> None: 44 self.additional_properties[key] = value 45 46 def __delitem__(self, key: str) -> None: 47 del self.additional_properties[key] 48 49 def __contains__(self, key: str) -> bool: 50 return key in self.additional_properties
Custom metadata set by the peer
Example: {'name': 'FishjamUser'}
77@dataclass 78class PeerOptions: 79 """Options specific to the Peer""" 80 81 enable_simulcast: bool = True 82 """Enables the peer to use simulcast""" 83 metadata: dict[str, Any] | None = None 84 """Peer metadata""" 85 subscribe: SubscribeOptions | None = None
Options specific to the Peer
59@dataclass 60class RoomOptions: 61 """Description of a room options""" 62 63 max_peers: int | None = None 64 """Maximum amount of peers allowed into the room""" 65 video_codec: Literal["h264", "vp8"] | None = None 66 """Enforces video codec for each peer in the room""" 67 webhook_url: str | None = None 68 """URL where Fishjam notifications will be sent""" 69 room_type: Literal[ 70 "conference", "audio_only", "livestream", "full_feature", "broadcaster" 71 ] = "conference" 72 """The use-case of the room. If not provided, this defaults to conference.""" 73 public: bool = False 74 """True if livestream viewers can omit specifying a token."""
Description of a room options
47@dataclass 48class Room: 49 """Description of the room state""" 50 51 config: RoomConfig 52 """Room configuration""" 53 id: str 54 """Room ID""" 55 peers: list[Peer] 56 """List of all peers"""
Description of the room state
26@_attrs_define 27class Peer: 28 """Describes peer status 29 30 Attributes: 31 id (str): Assigned peer id Example: peer-1. 32 metadata (Union['PeerMetadata', None]): Custom metadata set by the peer Example: {'name': 'FishjamUser'}. 33 status (PeerStatus): Informs about the peer status Example: disconnected. 34 subscribe (Union['SubscribeOptions', None]): Configuration of server-side subscriptions to the peer's tracks 35 Example: {'audioFormat': 'pcm16'}. 36 tracks (list['Track']): List of all peer's tracks 37 type_ (PeerType): Peer type Example: webrtc. 38 """ 39 40 id: str 41 metadata: Union["PeerMetadata", None] 42 status: PeerStatus 43 subscribe: Union["SubscribeOptions", None] 44 tracks: list["Track"] 45 type_: PeerType 46 additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict) 47 48 def to_dict(self) -> dict[str, Any]: 49 from ..models.peer_metadata import PeerMetadata 50 from ..models.subscribe_options import SubscribeOptions 51 52 id = self.id 53 54 metadata: Union[None, dict[str, Any]] 55 if isinstance(self.metadata, PeerMetadata): 56 metadata = self.metadata.to_dict() 57 else: 58 metadata = self.metadata 59 60 status = self.status.value 61 62 subscribe: Union[None, dict[str, Any]] 63 if isinstance(self.subscribe, SubscribeOptions): 64 subscribe = self.subscribe.to_dict() 65 else: 66 subscribe = self.subscribe 67 68 tracks = [] 69 for tracks_item_data in self.tracks: 70 tracks_item = tracks_item_data.to_dict() 71 tracks.append(tracks_item) 72 73 type_ = self.type_.value 74 75 field_dict: dict[str, Any] = {} 76 field_dict.update(self.additional_properties) 77 field_dict.update( 78 { 79 "id": id, 80 "metadata": metadata, 81 "status": status, 82 "subscribe": subscribe, 83 "tracks": tracks, 84 "type": type_, 85 } 86 ) 87 88 return field_dict 89 90 @classmethod 91 def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T: 92 from ..models.peer_metadata import PeerMetadata 93 from ..models.subscribe_options import SubscribeOptions 94 from ..models.track import Track 95 96 d = dict(src_dict) 97 id = d.pop("id") 98 99 def _parse_metadata(data: object) -> Union["PeerMetadata", None]: 100 if data is None: 101 return data 102 try: 103 if not isinstance(data, dict): 104 raise TypeError() 105 componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data) 106 107 return componentsschemas_peer_metadata_type_0 108 except: # noqa: E722 109 pass 110 return cast(Union["PeerMetadata", None], data) 111 112 metadata = _parse_metadata(d.pop("metadata")) 113 114 status = PeerStatus(d.pop("status")) 115 116 def _parse_subscribe(data: object) -> Union["SubscribeOptions", None]: 117 if data is None: 118 return data 119 try: 120 if not isinstance(data, dict): 121 raise TypeError() 122 componentsschemas_subscribe_options_type_0 = SubscribeOptions.from_dict( 123 data 124 ) 125 126 return componentsschemas_subscribe_options_type_0 127 except: # noqa: E722 128 pass 129 return cast(Union["SubscribeOptions", None], data) 130 131 subscribe = _parse_subscribe(d.pop("subscribe")) 132 133 tracks = [] 134 _tracks = d.pop("tracks") 135 for tracks_item_data in _tracks: 136 tracks_item = Track.from_dict(tracks_item_data) 137 138 tracks.append(tracks_item) 139 140 type_ = PeerType(d.pop("type")) 141 142 peer = cls( 143 id=id, 144 metadata=metadata, 145 status=status, 146 subscribe=subscribe, 147 tracks=tracks, 148 type_=type_, 149 ) 150 151 peer.additional_properties = d 152 return peer 153 154 @property 155 def additional_keys(self) -> list[str]: 156 return list(self.additional_properties.keys()) 157 158 def __getitem__(self, key: str) -> Any: 159 return self.additional_properties[key] 160 161 def __setitem__(self, key: str, value: Any) -> None: 162 self.additional_properties[key] = value 163 164 def __delitem__(self, key: str) -> None: 165 del self.additional_properties[key] 166 167 def __contains__(self, key: str) -> bool: 168 return key in self.additional_properties
Describes peer status
Attributes: id (str): Assigned peer id Example: peer-1. metadata (Union['PeerMetadata', None]): Custom metadata set by the peer Example: {'name': 'FishjamUser'}. status (PeerStatus): Informs about the peer status Example: disconnected. subscribe (Union['SubscribeOptions', None]): Configuration of server-side subscriptions to the peer's tracks Example: {'audioFormat': 'pcm16'}. tracks (list['Track']): List of all peer's tracks type_ (PeerType): Peer type Example: webrtc.
2def __init__(self, id, metadata, status, subscribe, tracks, type_): 3 self.id = id 4 self.metadata = metadata 5 self.status = status 6 self.subscribe = subscribe 7 self.tracks = tracks 8 self.type_ = type_ 9 self.additional_properties = __attr_factory_additional_properties()
Method generated by attrs for class Peer.
48 def to_dict(self) -> dict[str, Any]: 49 from ..models.peer_metadata import PeerMetadata 50 from ..models.subscribe_options import SubscribeOptions 51 52 id = self.id 53 54 metadata: Union[None, dict[str, Any]] 55 if isinstance(self.metadata, PeerMetadata): 56 metadata = self.metadata.to_dict() 57 else: 58 metadata = self.metadata 59 60 status = self.status.value 61 62 subscribe: Union[None, dict[str, Any]] 63 if isinstance(self.subscribe, SubscribeOptions): 64 subscribe = self.subscribe.to_dict() 65 else: 66 subscribe = self.subscribe 67 68 tracks = [] 69 for tracks_item_data in self.tracks: 70 tracks_item = tracks_item_data.to_dict() 71 tracks.append(tracks_item) 72 73 type_ = self.type_.value 74 75 field_dict: dict[str, Any] = {} 76 field_dict.update(self.additional_properties) 77 field_dict.update( 78 { 79 "id": id, 80 "metadata": metadata, 81 "status": status, 82 "subscribe": subscribe, 83 "tracks": tracks, 84 "type": type_, 85 } 86 ) 87 88 return field_dict
90 @classmethod 91 def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T: 92 from ..models.peer_metadata import PeerMetadata 93 from ..models.subscribe_options import SubscribeOptions 94 from ..models.track import Track 95 96 d = dict(src_dict) 97 id = d.pop("id") 98 99 def _parse_metadata(data: object) -> Union["PeerMetadata", None]: 100 if data is None: 101 return data 102 try: 103 if not isinstance(data, dict): 104 raise TypeError() 105 componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data) 106 107 return componentsschemas_peer_metadata_type_0 108 except: # noqa: E722 109 pass 110 return cast(Union["PeerMetadata", None], data) 111 112 metadata = _parse_metadata(d.pop("metadata")) 113 114 status = PeerStatus(d.pop("status")) 115 116 def _parse_subscribe(data: object) -> Union["SubscribeOptions", None]: 117 if data is None: 118 return data 119 try: 120 if not isinstance(data, dict): 121 raise TypeError() 122 componentsschemas_subscribe_options_type_0 = SubscribeOptions.from_dict( 123 data 124 ) 125 126 return componentsschemas_subscribe_options_type_0 127 except: # noqa: E722 128 pass 129 return cast(Union["SubscribeOptions", None], data) 130 131 subscribe = _parse_subscribe(d.pop("subscribe")) 132 133 tracks = [] 134 _tracks = d.pop("tracks") 135 for tracks_item_data in _tracks: 136 tracks_item = Track.from_dict(tracks_item_data) 137 138 tracks.append(tracks_item) 139 140 type_ = PeerType(d.pop("type")) 141 142 peer = cls( 143 id=id, 144 metadata=metadata, 145 status=status, 146 subscribe=subscribe, 147 tracks=tracks, 148 type_=type_, 149 ) 150 151 peer.additional_properties = d 152 return peer