Fishjam Python Server SDK
Python server SDK for Fishjam.
Read the docs here
Installation
pip install fishjam-server-sdk
Usage
The SDK exports two main classes for interacting with Fishjam server:
FishjamClient
and FishjamNotifier
.
FishjamClient
wraps HTTP REST API calls, while FishjamNotifier
is responsible for receiving real-time updates from the server.
FishjamClient
Create a FishjamClient
instance, providing the Fishjam ID and management token
from fishjam import FishjamClient, RoomOptions
fishjam_client = FishjamClient(fishjam_id="<fishjam_id>", management_token="<management_token>")
You can use it to interact with Fishjam to manage rooms and peers
# Create a room
options = RoomOptions(video_codec="h264", webhook_url="http://localhost:5000/webhook")
room = fishjam_client.create_room(options=options)
# Room(components=[], config=RoomConfig(max_peers=None, video_codec=<RoomConfigVideoCodec.H264: 'h264'>, webhook_url='http://localhost:5000/webhook'), id='1d905478-ccfc-44d6-a6e7-8ccb1b38d955', peers=[])
# Add peer to the room
peer, token = fishjam_client.create_peer(room.id)
# Peer(id='b1232c7e-c969-4450-acdf-ea24f3cdd7f6', status=<PeerStatus.DISCONNECTED: 'disconnected'>, type='webrtc'), 'M8TUGhj-L11KpyG-2zBPIo'
All methods in FishjamClient
may raise one of the exceptions deriving from fishjam.errors.HTTPError
. They are defined in fishjam.errors
.
FishjamNotifier
FishjamNotifier allows for receiving real-time updates from the Fishjam Server.
You can read more about notifications in the Fishjam Docs.
Create FishjamNotifier
instance
from fishjam import FishjamNotifier
fishjam_notifier = FishjamNotifier(fishjam_id='<fishjam_id>', management_token='<management_token>')
Then define a handler for incoming messages
@fishjam_notifier.on_server_notification
def handle_notification(server_notification):
print(f'Received a notification: {server_notification}')
After that you can start the notifier
async def test_notifier():
notifier_task = asyncio.create_task(fishjam_notifier.connect())
# Wait for notifier to be ready to receive messages
await fishjam_notifier.wait_ready()
# Create a room to trigger a server notification
fishjam_client = FishjamClient(fishjam_id="<fishjam_id>", management_token="<management_token>")
fishjam_client.create_room()
await notifier_task
asyncio.run(test_notifier())
# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29')
License
Licensed under the Apache License, Version 2.0
Fishjam is created by Software Mansion
Since 2012, Software Mansion has been a software agency with experience in building web and mobile apps. We are Core React Native Contributors and experts in dealing with all kinds of React Native issues. We can help you build your next dream product – Hire us.
1""" 2.. include:: ../README.md 3""" 4 5# pylint: disable=locally-disabled, no-name-in-module, import-error 6 7# Exceptions and Server Messages 8 9# API 10# pylint: disable=locally-disabled, no-name-in-module, import-error 11 12# Exceptions and Server Messages 13from fishjam import agent, errors, events, peer, room 14from fishjam._openapi_client.models import PeerMetadata 15 16# API 17from fishjam._webhook_notifier import receive_binary 18from fishjam._ws_notifier import FishjamNotifier 19from fishjam.api._fishjam_client import ( 20 AgentOptions, 21 AgentOutputOptions, 22 FishjamClient, 23 Peer, 24 PeerOptions, 25 Room, 26 RoomOptions, 27) 28 29__all__ = [ 30 "FishjamClient", 31 "FishjamNotifier", 32 "receive_binary", 33 "PeerMetadata", 34 "PeerOptions", 35 "RoomOptions", 36 "AgentOptions", 37 "AgentOutputOptions", 38 "Room", 39 "Peer", 40 "events", 41 "errors", 42 "room", 43 "peer", 44 "agent", 45] 46 47__docformat__ = "restructuredtext"
class FishjamClient(Client):
    """Allows for managing rooms, peers, agents and livestream tokens"""

    def __init__(
        self,
        fishjam_id: str,
        management_token: str,
    ):
        """
        Create a FishjamClient instance, providing the fishjam id and management token.
        """
        super().__init__(fishjam_id=fishjam_id, management_token=management_token)

    def create_peer(
        self,
        room_id: str,
        options: PeerOptions | None = None,
    ) -> tuple[Peer, str]:
        """
        Creates peer in the room

        Returns a tuple (`Peer`, `PeerToken`) - the token is needed by Peer
        to authenticate to Fishjam.

        The possible options to pass for peer are `PeerOptions`.
        """
        options = options or PeerOptions()

        peer_metadata = self.__parse_peer_metadata(options.metadata)
        peer_options = PeerOptionsWebRTC(
            enable_simulcast=options.enable_simulcast,
            metadata=peer_metadata,
        )
        body = AddPeerBody(type_=PeerType.WEBRTC, options=peer_options)

        resp = cast(
            PeerDetailsResponse,
            self._request(room_add_peer, room_id=room_id, body=body),
        )

        return (resp.data.peer, resp.data.token)

    def create_agent(
        self, room_id: str, options: AgentOptions | None = None
    ) -> Agent:
        """
        Creates an agent peer in the room

        Returns an `Agent` built from the id of the created peer, the room id,
        the peer token and the Fishjam URL of this client.

        The possible options to pass for agent are `AgentOptions`.
        """
        options = options or AgentOptions()
        body = AddPeerBody(
            type_=PeerType.AGENT,
            options=PeerOptionsAgent(
                output=PeerOptionsAgentOutput(
                    audio_format=PeerOptionsAgentOutputAudioFormat(
                        options.output.audio_format
                    ),
                    audio_sample_rate=PeerOptionsAgentOutputAudioSampleRate(
                        options.output.audio_sample_rate
                    ),
                )
            ),
        )

        resp = cast(
            PeerDetailsResponse,
            self._request(room_add_peer, room_id=room_id, body=body),
        )

        return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)

    def create_room(self, options: RoomOptions | None = None) -> Room:
        """
        Creates a new room
        Returns the created `Room`
        """
        options = options or RoomOptions()

        # A missing codec is sent as UNSET rather than as an explicit value.
        if options.video_codec is None:
            codec = UNSET
        else:
            codec = RoomConfigVideoCodec(options.video_codec)

        config = RoomConfig(
            max_peers=options.max_peers,
            video_codec=codec,
            webhook_url=options.webhook_url,
            room_type=RoomConfigRoomType(options.room_type),
            public=options.public,
        )

        room = cast(
            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
        ).data.room

        return Room(config=room.config, id=room.id, peers=room.peers)

    def get_all_rooms(self) -> list[Room]:
        """Returns list of all rooms"""

        rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data

        return [
            Room(config=room.config, id=room.id, peers=room.peers) for room in rooms
        ]

    def get_room(self, room_id: str) -> Room:
        """Returns room with the given id"""

        room = cast(
            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
        ).data

        return Room(config=room.config, id=room.id, peers=room.peers)

    def delete_peer(self, room_id: str, peer_id: str) -> None:
        """Deletes peer"""

        return self._request(room_delete_peer, id=peer_id, room_id=room_id)

    def delete_room(self, room_id: str) -> None:
        """Deletes a room"""

        return self._request(room_delete_room, room_id=room_id)

    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
        """Refreshes peer token"""

        response = cast(
            PeerRefreshTokenResponse,
            self._request(room_refresh_token, id=peer_id, room_id=room_id),
        )

        return response.data.token

    def create_livestream_viewer_token(self, room_id: str) -> str:
        """Generates viewer token for livestream rooms"""

        response = cast(
            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
        )

        return response.token

    def create_livestream_streamer_token(self, room_id: str) -> str:
        """Generates streamer token for livestream rooms"""

        response = cast(
            StreamerToken,
            self._request(streamer_generate_streamer_token, room_id=room_id),
        )

        return response.token

    def __parse_peer_metadata(self, metadata: dict | None) -> PeerOptionsWebRTCMetadata:
        """Copies the user-supplied metadata into a `PeerOptionsWebRTCMetadata`.

        Empty or missing metadata yields an empty metadata object.
        """
        peer_metadata = PeerOptionsWebRTCMetadata()

        if not metadata:
            return peer_metadata

        for key, value in metadata.items():
            peer_metadata.additional_properties[key] = value

        return peer_metadata
Allows for managing rooms
def __init__(self, fishjam_id: str, management_token: str):
    """
    Set up the client from the fishjam id and the management token.

    Both values are passed on unchanged to the parent class initializer.
    """
    super().__init__(
        fishjam_id=fishjam_id,
        management_token=management_token,
    )
Create a FishjamClient instance, providing the fishjam id and management token.
def create_peer(
    self,
    room_id: str,
    options: PeerOptions | None = None,
) -> tuple[Peer, str]:
    """
    Adds a WebRTC peer to the room `room_id`.

    The result is the pair (`Peer`, `PeerToken`); the peer uses the token
    to authenticate to Fishjam.

    `options` is a `PeerOptions`; when omitted, a default `PeerOptions()`
    is used.
    """
    if not options:
        options = PeerOptions()

    body = AddPeerBody(
        type_=PeerType.WEBRTC,
        options=PeerOptionsWebRTC(
            enable_simulcast=options.enable_simulcast,
            metadata=self.__parse_peer_metadata(options.metadata),
        ),
    )

    raw_response = self._request(room_add_peer, room_id=room_id, body=body)
    details = cast(PeerDetailsResponse, raw_response).data

    return details.peer, details.token
Creates peer in the room
Returns a tuple (Peer
, PeerToken
) - the token is needed by Peer
to authenticate to Fishjam.
The possible options to pass for peer are PeerOptions
.
def create_agent(
    self, room_id: str, options: AgentOptions | None = None
) -> Agent:
    """
    Creates an agent peer in the room

    Returns an `Agent` built from the id of the created peer, the room id,
    the peer token and the Fishjam URL of this client.

    The possible options to pass for agent are `AgentOptions`.
    """
    options = options or AgentOptions()
    body = AddPeerBody(
        type_=PeerType.AGENT,
        options=PeerOptionsAgent(
            output=PeerOptionsAgentOutput(
                audio_format=PeerOptionsAgentOutputAudioFormat(
                    options.output.audio_format
                ),
                audio_sample_rate=PeerOptionsAgentOutputAudioSampleRate(
                    options.output.audio_sample_rate
                ),
            )
        ),
    )

    resp = cast(
        PeerDetailsResponse,
        self._request(room_add_peer, room_id=room_id, body=body),
    )

    return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)
def create_room(self, options: RoomOptions | None = None) -> Room:
    """
    Creates a room configured from `options` and returns it as a `Room`.

    When `options` is omitted, a default `RoomOptions()` is used.
    """
    if not options:
        options = RoomOptions()

    # A missing codec is sent as UNSET rather than as an explicit value.
    requested_codec = options.video_codec
    codec = UNSET if requested_codec is None else RoomConfigVideoCodec(requested_codec)

    config = RoomConfig(
        max_peers=options.max_peers,
        video_codec=codec,
        webhook_url=options.webhook_url,
        room_type=RoomConfigRoomType(options.room_type),
        public=options.public,
    )

    raw_response = self._request(room_create_room, body=config)
    created = cast(RoomCreateDetailsResponse, raw_response).data.room

    return Room(config=created.config, id=created.id, peers=created.peers)
Creates a new room
Returns the created Room
def get_all_rooms(self) -> list[Room]:
    """Fetches every room and returns them as a list of `Room`"""

    raw_response = self._request(room_get_all_rooms)
    listed = cast(RoomsListingResponse, raw_response).data

    return [
        Room(config=entry.config, id=entry.id, peers=entry.peers)
        for entry in listed
    ]
Returns list of all rooms
def get_room(self, room_id: str) -> Room:
    """Fetches the room identified by `room_id` and returns it as a `Room`"""

    raw_response = self._request(room_get_room, room_id=room_id)
    details = cast(RoomDetailsResponse, raw_response).data

    return Room(config=details.config, id=details.id, peers=details.peers)
Returns room with the given id
def delete_peer(self, room_id: str, peer_id: str) -> None:
    """Issues the delete-peer request for `peer_id` in the room `room_id`"""

    outcome = self._request(room_delete_peer, id=peer_id, room_id=room_id)
    return outcome
Deletes peer
def delete_room(self, room_id: str) -> None:
    """Issues the delete-room request for the room `room_id`"""

    outcome = self._request(room_delete_room, room_id=room_id)
    return outcome
Deletes a room
def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
    """Requests a refreshed token for the peer and returns it"""

    raw_response = self._request(room_refresh_token, id=peer_id, room_id=room_id)
    refreshed = cast(PeerRefreshTokenResponse, raw_response)

    return refreshed.data.token
Refreshes peer token
def create_livestream_viewer_token(self, room_id: str) -> str:
    """Requests a livestream viewer token for the room and returns it"""

    raw_response = self._request(viewer_generate_viewer_token, room_id=room_id)
    viewer = cast(ViewerToken, raw_response)

    return viewer.token
Generates viewer token for livestream rooms
def create_livestream_streamer_token(self, room_id: str) -> str:
    """Requests a livestream streamer token for the room and returns it"""

    raw_response = self._request(streamer_generate_streamer_token, room_id=room_id)
    streamer = cast(StreamerToken, raw_response)

    return streamer.token
Generates streamer token for livestream rooms
Inherited Members
- fishjam.api._client.Client
- client
class FishjamNotifier:
    """
    Allows for receiving WebSocket messages from Fishjam.
    """

    def __init__(
        self,
        fishjam_id: str,
        management_token: str,
    ):
        """
        Create FishjamNotifier instance, providing the fishjam id and management token.
        """

        # Replace only the first "http" (the scheme, assuming the URL starts
        # with http/https); an unbounded replace would also rewrite any later
        # "http" substring in the URL.
        websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws", 1)
        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
        self._management_token: str = management_token
        self._websocket: client.ClientConnection | None = None
        self._ready: bool = False

        # Created lazily by `wait_ready`, set by `connect` once ready.
        self._ready_event: asyncio.Event | None = None

        self._notification_handler: NotificationHandler | None = None

    def on_server_notification(self, handler: NotificationHandler):
        """
        Decorator used for defining a handler for Fishjam notifications.
        """
        self._notification_handler = handler
        return handler

    async def connect(self):
        """
        A coroutine which connects FishjamNotifier to Fishjam and listens for
        all incoming messages from Fishjam.

        It runs until the connection is closed.

        The incoming messages are handled by the function defined using the
        `on_server_notification` decorator.

        The handler has to be defined before calling `connect`,
        otherwise the messages won't be received.
        """
        async with client.connect(self._fishjam_url) as websocket:
            try:
                self._websocket = websocket
                await self._authenticate()

                if self._notification_handler:
                    await self._subscribe_event(
                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
                    )

                self._ready = True
                if self._ready_event:
                    self._ready_event.set()

                await self._receive_loop()
            finally:
                self._websocket = None
                # Drop the ready state so that `wait_ready` does not report a
                # connection that no longer exists (e.g. before a reconnect).
                self._ready = False
                if self._ready_event is not None:
                    self._ready_event.clear()

    async def wait_ready(self) -> None:
        """
        Waits until the notifier is connected and authenticated to Fishjam.

        If already connected, returns immediately.
        """
        if self._ready:
            return

        if self._ready_event is None:
            self._ready_event = asyncio.Event()

        await self._ready_event.wait()

    async def _authenticate(self):
        """Sends the management token and waits for the server's confirmation.

        Raises RuntimeError when the websocket is not connected, when the
        connection is closed with an "invalid token" reason, or when the
        reply is not a `ServerMessageAuthenticated`.
        """
        if not self._websocket:
            raise RuntimeError("Websocket is not connected")

        msg = ServerMessage(
            auth_request=ServerMessageAuthRequest(token=self._management_token)
        )
        await self._websocket.send(bytes(msg))

        try:
            message = await self._websocket.recv(decode=False)
        except ConnectionClosed as exception:
            if "invalid token" in str(exception):
                raise RuntimeError("Invalid management token") from exception
            raise

        message = ServerMessage().parse(message)

        _type, message = betterproto.which_one_of(message, "content")
        # Explicit check instead of `assert`, which is stripped under -O.
        if not isinstance(message, ServerMessageAuthenticated):
            raise RuntimeError("Unexpected response to the authentication request")

    async def _receive_loop(self):
        """Receives messages forever, passing allowed notifications to the handler.

        Raises RuntimeError when the websocket is not connected or no
        notification handler has been registered.
        """
        if not self._websocket:
            raise RuntimeError("Websocket is not connected")
        if not self._notification_handler:
            raise RuntimeError("Notification handler is not defined")

        while True:
            message = cast(bytes, await self._websocket.recv())
            message = ServerMessage().parse(message)
            _which, message = betterproto.which_one_of(message, "content")

            if isinstance(message, ALLOWED_NOTIFICATIONS):
                res = self._notification_handler(message)
                # The handler may be a plain function or a coroutine function.
                if inspect.isawaitable(res):
                    await res

    async def _subscribe_event(self, event: ServerMessageEventType):
        """Sends a subscribe request for `event` and waits for the server's reply.

        Raises RuntimeError when the websocket is not connected or when the
        reply is not a `ServerMessageSubscribeResponse`.
        """
        if not self._websocket:
            raise RuntimeError("Websocket is not connected")

        request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event))

        await self._websocket.send(bytes(request))
        message = cast(bytes, await self._websocket.recv())
        message = ServerMessage().parse(message)
        _which, message = betterproto.which_one_of(message, "content")
        # Explicit check instead of `assert`, which is stripped under -O.
        if not isinstance(message, ServerMessageSubscribeResponse):
            raise RuntimeError("Unexpected response to the subscribe request")
Allows for receiving WebSocket messages from Fishjam.
def __init__(
    self,
    fishjam_id: str,
    management_token: str,
):
    """
    Create FishjamNotifier instance, providing the fishjam id and management token.
    """

    # Replace only the first "http" (the scheme, assuming the URL starts
    # with http/https); an unbounded replace would also rewrite any later
    # "http" substring in the URL.
    websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws", 1)
    self._fishjam_url = f"{websocket_url}/socket/server/websocket"
    self._management_token: str = management_token
    self._websocket: client.ClientConnection | None = None
    self._ready: bool = False

    # Created lazily by `wait_ready`, set by `connect` once ready.
    self._ready_event: asyncio.Event | None = None

    self._notification_handler: NotificationHandler | None = None
Create FishjamNotifier instance, providing the fishjam id and management token.
def on_server_notification(self, handler: NotificationHandler):
    """
    Decorator that registers `handler` as the notification handler.

    The handler is stored on the notifier and returned unchanged.
    """
    self._notification_handler = handler
    # Hand the function back so the decorated name stays bound to it.
    return handler
Decorator used for defining handler for Fishjam Notifications
async def connect(self):
    """
    A coroutine which connects FishjamNotifier to Fishjam and listens for
    all incoming messages from Fishjam.

    It runs until the connection is closed.

    The incoming messages are handled by the function defined using the
    `on_server_notification` decorator.

    The handler has to be defined before calling `connect`,
    otherwise the messages won't be received.
    """
    async with client.connect(self._fishjam_url) as websocket:
        try:
            self._websocket = websocket
            await self._authenticate()

            if self._notification_handler:
                await self._subscribe_event(
                    event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
                )

            self._ready = True
            if self._ready_event:
                self._ready_event.set()

            await self._receive_loop()
        finally:
            self._websocket = None
            # Drop the ready state so that `wait_ready` does not report a
            # connection that no longer exists (e.g. before a reconnect).
            self._ready = False
            if self._ready_event is not None:
                self._ready_event.clear()
A coroutine which connects FishjamNotifier to Fishjam and listens for all incoming messages from Fishjam.
It runs until the connection is closed.
The incoming messages are handled by the functions defined using the
on_server_notification
decorator.
The handler has to be defined before calling connect
,
otherwise the messages won't be received.
97 async def wait_ready(self) -> None: 98 """ 99 Waits until the notifier is connected and authenticated to Fishjam. 100 101 If already connected, returns immediately. 102 """ 103 if self._ready: 104 return 105 106 if self._ready_event is None: 107 self._ready_event = asyncio.Event() 108 109 await self._ready_event.wait()
Waits until the notifier is connected and authenticated to Fishjam.
If already connected, returns immediately.
def receive_binary(binary: bytes) -> Union[AllowedNotification, None]:
    """
    Parse a received protobuf notification into its notification instance.

    Returns the parsed content when it is one of the allowed notifications,
    otherwise `None`.
    The available notifications are listed in `fishjam.events` module.
    """
    parsed = ServerMessage().parse(binary)
    _field_name, payload = betterproto.which_one_of(parsed, "content")

    return payload if isinstance(payload, ALLOWED_NOTIFICATIONS) else None
Transform received protobuf notification to adequate notification instance.
The available notifications are listed in fishjam.events
module.
@_attrs_define
class PeerMetadata:
    """Free-form metadata set by the peer, stored as arbitrary key/value pairs

    Example:
        {'name': 'FishjamUser'}

    """

    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return a new dict holding a copy of the stored properties."""
        serialized: dict[str, Any] = {}
        serialized.update(self.additional_properties)
        return serialized

    @classmethod
    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
        """Build an instance whose properties are a copy of `src_dict`."""
        properties = dict(src_dict)

        instance = cls()
        instance.additional_properties = properties
        return instance

    @property
    def additional_keys(self) -> list[str]:
        """Keys of the stored properties."""
        return list(self.additional_properties.keys())

    def __getitem__(self, key: str) -> Any:
        return self.additional_properties[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self.additional_properties[key] = value

    def __delitem__(self, key: str) -> None:
        del self.additional_properties[key]

    def __contains__(self, key: str) -> bool:
        return key in self.additional_properties
Custom metadata set by the peer
Example: {'name': 'FishjamUser'}
83@dataclass 84class PeerOptions: 85 """Options specific to a WebRTC Peer""" 86 87 enable_simulcast: bool = True 88 """Enables the peer to use simulcast""" 89 metadata: dict[str, Any] | None = None 90 """Peer metadata"""
Options specific to a WebRTC Peer
60@dataclass 61class RoomOptions: 62 """Description of a room options""" 63 64 max_peers: int | None = None 65 """Maximum amount of peers allowed into the room""" 66 video_codec: Literal["h264", "vp8"] | None = None 67 """Enforces video codec for each peer in the room""" 68 webhook_url: str | None = None 69 """URL where Fishjam notifications will be sent""" 70 room_type: Literal[ 71 "conference", 72 "audio_only", 73 "livestream", 74 "full_feature", 75 "broadcaster", 76 "audio_only_livestream", 77 ] = "conference" 78 """The use-case of the room. If not provided, this defaults to conference.""" 79 public: bool = False 80 """True if livestream viewers can omit specifying a token."""
Description of room options
@dataclass
class AgentOptions:
    """Options specific to an Agent peer"""

    output: AgentOutputOptions = field(default_factory=AgentOutputOptions)
    """Desired format of audio tracks going from Fishjam to the agent"""
Options specific to an Agent peer
@dataclass
class AgentOutputOptions:
    """Desired format of the audio tracks going from Fishjam to the agent."""

    # Only "pcm16" is accepted by the type; it is also the default.
    audio_format: Literal["pcm16"] = "pcm16"
    # Either 16000 or 24000; defaults to 16000.
    audio_sample_rate: Literal[16000, 24000] = 16000
Options of the desired format of audio tracks going from Fishjam to the agent.
@dataclass
class Room:
    """Snapshot of a room: its configuration, id and peers"""

    config: RoomConfig
    """Configuration of the room"""
    id: str
    """Identifier of the room"""
    peers: list[Peer]
    """All peers of the room"""
Description of the room state
@_attrs_define
class Peer:
    """Describes peer status

    Attributes:
        id (str): Assigned peer id Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
        metadata (Union['PeerMetadata', None]): Custom metadata set by the peer Example: {'name': 'FishjamUser'}.
        status (PeerStatus): Informs about the peer status Example: disconnected.
        subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
        subscriptions (list[str]): Describes peer's subscriptions in manual mode
        tracks (list['Track']): List of all peer's tracks
        type_ (PeerType): Peer type Example: webrtc.
    """

    id: str
    metadata: Union["PeerMetadata", None]
    status: PeerStatus
    subscribe_mode: SubscribeMode
    subscriptions: list[str]
    tracks: list["Track"]
    type_: PeerType
    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize the peer into a plain dict using the wire key names.

        Unknown properties kept in `additional_properties` are included, and
        the known fields take precedence over them on key clashes.
        """
        from ..models.peer_metadata import PeerMetadata

        id = self.id

        metadata: Union[None, dict[str, Any]]
        if isinstance(self.metadata, PeerMetadata):
            metadata = self.metadata.to_dict()
        else:
            metadata = self.metadata

        status = self.status.value

        subscribe_mode = self.subscribe_mode.value

        subscriptions = self.subscriptions

        tracks = [tracks_item_data.to_dict() for tracks_item_data in self.tracks]

        type_ = self.type_.value

        field_dict: dict[str, Any] = {}
        field_dict.update(self.additional_properties)
        field_dict.update(
            {
                "id": id,
                "metadata": metadata,
                "status": status,
                "subscribeMode": subscribe_mode,
                "subscriptions": subscriptions,
                "tracks": tracks,
                "type": type_,
            }
        )

        return field_dict

    @classmethod
    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
        """Build a peer from a dict that uses the wire key names.

        The keys "id", "metadata", "status", "subscribeMode", "subscriptions",
        "tracks" and "type" are required (a missing one raises KeyError);
        whatever else is present ends up in `additional_properties`.
        """
        from ..models.peer_metadata import PeerMetadata
        from ..models.track import Track

        d = dict(src_dict)
        id = d.pop("id")

        def _parse_metadata(data: object) -> Union["PeerMetadata", None]:
            if data is None:
                return data
            try:
                if not isinstance(data, dict):
                    raise TypeError()
                componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data)

                return componentsschemas_peer_metadata_type_0
            # Best-effort parse: fall back to the raw value on failure, but
            # let KeyboardInterrupt/SystemExit propagate (a bare `except`
            # would swallow them).
            except Exception:
                pass
            return cast(Union["PeerMetadata", None], data)

        metadata = _parse_metadata(d.pop("metadata"))

        status = PeerStatus(d.pop("status"))

        subscribe_mode = SubscribeMode(d.pop("subscribeMode"))

        subscriptions = cast(list[str], d.pop("subscriptions"))

        tracks = []
        _tracks = d.pop("tracks")
        for tracks_item_data in _tracks:
            tracks_item = Track.from_dict(tracks_item_data)

            tracks.append(tracks_item)

        type_ = PeerType(d.pop("type"))

        peer = cls(
            id=id,
            metadata=metadata,
            status=status,
            subscribe_mode=subscribe_mode,
            subscriptions=subscriptions,
            tracks=tracks,
            type_=type_,
        )

        # Everything that was not popped above is an unknown property.
        peer.additional_properties = d
        return peer

    @property
    def additional_keys(self) -> list[str]:
        """Keys of the unknown properties kept alongside the known fields."""
        return list(self.additional_properties.keys())

    def __getitem__(self, key: str) -> Any:
        return self.additional_properties[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self.additional_properties[key] = value

    def __delitem__(self, key: str) -> None:
        del self.additional_properties[key]

    def __contains__(self, key: str) -> bool:
        return key in self.additional_properties
Describes peer status
Attributes: id (str): Assigned peer id Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf. metadata (Union['PeerMetadata', None]): Custom metadata set by the peer Example: {'name': 'FishjamUser'}. status (PeerStatus): Informs about the peer status Example: disconnected. subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy subscriptions (list[str]): Describes peer's subscriptions in manual mode tracks (list['Track']): List of all peer's tracks type_ (PeerType): Peer type Example: webrtc.
def __init__(self, id, metadata, status, subscribe_mode, subscriptions, tracks, type_):
    """Store every argument on the instance and create `additional_properties`."""
    self.id, self.metadata, self.status = id, metadata, status
    self.subscribe_mode, self.subscriptions = subscribe_mode, subscriptions
    self.tracks, self.type_ = tracks, type_
    # Not an init argument: the value comes from the attribute's factory.
    self.additional_properties = __attr_factory_additional_properties()
Method generated by attrs for class Peer.
def to_dict(self) -> dict[str, Any]:
    """Serialize the peer into a plain dict using the wire key names.

    Unknown properties kept in `additional_properties` are included, and
    the known fields take precedence over them on key clashes.
    """
    from ..models.peer_metadata import PeerMetadata

    peer_id = self.id

    # Metadata objects are serialized; any other value is passed through.
    raw_metadata = self.metadata
    metadata: Union[None, dict[str, Any]] = (
        raw_metadata.to_dict()
        if isinstance(raw_metadata, PeerMetadata)
        else raw_metadata
    )

    status = self.status.value
    subscribe_mode = self.subscribe_mode.value
    subscriptions = self.subscriptions
    tracks = [track.to_dict() for track in self.tracks]
    peer_type = self.type_.value

    known_fields = {
        "id": peer_id,
        "metadata": metadata,
        "status": status,
        "subscribeMode": subscribe_mode,
        "subscriptions": subscriptions,
        "tracks": tracks,
        "type": peer_type,
    }

    field_dict: dict[str, Any] = {}
    field_dict.update(self.additional_properties)
    field_dict.update(known_fields)
    return field_dict
@classmethod
def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
    """Build a peer from a dict that uses the wire key names.

    The keys "id", "metadata", "status", "subscribeMode", "subscriptions",
    "tracks" and "type" are required (a missing one raises KeyError);
    whatever else is present ends up in `additional_properties`.
    """
    from ..models.peer_metadata import PeerMetadata
    from ..models.track import Track

    d = dict(src_dict)
    id = d.pop("id")

    def _parse_metadata(data: object) -> Union["PeerMetadata", None]:
        if data is None:
            return data
        try:
            if not isinstance(data, dict):
                raise TypeError()
            componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data)

            return componentsschemas_peer_metadata_type_0
        # Best-effort parse: fall back to the raw value on failure, but
        # let KeyboardInterrupt/SystemExit propagate (a bare `except`
        # would swallow them).
        except Exception:
            pass
        return cast(Union["PeerMetadata", None], data)

    metadata = _parse_metadata(d.pop("metadata"))

    status = PeerStatus(d.pop("status"))

    subscribe_mode = SubscribeMode(d.pop("subscribeMode"))

    subscriptions = cast(list[str], d.pop("subscriptions"))

    tracks = []
    _tracks = d.pop("tracks")
    for tracks_item_data in _tracks:
        tracks_item = Track.from_dict(tracks_item_data)

        tracks.append(tracks_item)

    type_ = PeerType(d.pop("type"))

    peer = cls(
        id=id,
        metadata=metadata,
        status=status,
        subscribe_mode=subscribe_mode,
        subscriptions=subscriptions,
        tracks=tracks,
        type_=type_,
    )

    # Everything that was not popped above is an unknown property.
    peer.additional_properties = d
    return peer