Fishjam Python Server SDK

Fishjam Python Server SDK

Python server SDK for the Fishjam.

Read the docs here

Installation

pip install fishjam-server-sdk

Usage

The SDK exports two main classes for interacting with Fishjam server: FishjamClient and FishjamNotifier.

FishjamClient wraps http REST api calls, while FishjamNotifier is responsible for receiving real-time updates from the server.

FishjamClient

Create a FishjamClient instance, providing the fishjam server address and api token

from fishjam import FishjamClient

fishjam_client = FishjamClient(fishjam_id="<fishjam_id>", management_token="<management_token>")

You can use it to interact with Fishjam to manage rooms and peers

# Create a room
options = RoomOptions(video_codec="h264", webhook_url="http://localhost:5000/webhook")
room = fishjam_client.create_room(options=options)

# Room(components=[], config=RoomConfig(max_peers=None, video_codec=<RoomConfigVideoCodec.H264: 'h264'>, webhook_url='http://localhost:5000/webhook'), id='1d905478-ccfc-44d6-a6e7-8ccb1b38d955', peers=[])

# Add peer to the room
peer, token = fishjam_client.create_peer(room.id)

# Peer(id='b1232c7e-c969-4450-acdf-ea24f3cdd7f6', status=<PeerStatus.DISCONNECTED: 'disconnected'>, type='webrtc'), 'M8TUGhj-L11KpyG-2zBPIo'

All methods in FishjamClient may raise one of the exceptions deriving from fishjam.errors.HTTPError. They are defined in fishjam.errors.

FishjamNotifier

FishjamNotifier allows for receiving real-time updates from the Fishjam Server.

You can read more about notifications in the Fishjam Docs.

Create FishjamNotifier instance

from fishjam import FishjamNotifier

fishjam_notifier = FishjamNotifier(fishjam_id='<fishjam_id>', management_token='<management_token>')

Then define a handler for incoming messages

@notifier.on_server_notification
def handle_notification(server_notification):
    print(f'Received a notification: {server_notification}')

After that you can start the notifier

async def test_notifier():
    notifier_task = asyncio.create_task(fishjam_notifier.connect())

    # Wait for notifier to be ready to receive messages
    await fishjam_notifier.wait_ready()

    # Create a room to trigger a server notification
    fishjam_client = FishjamClient()
    fishjam_client.create_room()

    await notifier_task

asyncio.run(test_notifier())

# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29')

License

Licensed under the Apache License, Version 2.0

Fishjam is created by Software Mansion

Since 2012 Software Mansion is a software agency with experience in building web and mobile apps. We are Core React Native Contributors and experts in dealing with all kinds of React Native issues. We can help you build your next dream product – Hire us.

Software Mansion

 1"""
 2.. include:: ../README.md
 3"""
 4
 5# pylint: disable=locally-disabled, no-name-in-module, import-error
 6
 7# Exceptions and Server Messages
 8
 9# API
10# pylint: disable=locally-disabled, no-name-in-module, import-error
11
12# Exceptions and Server Messages
13from fishjam import agent, errors, events, peer, room
14from fishjam._openapi_client.models import PeerMetadata
15
16# API
17from fishjam._webhook_notifier import receive_binary
18from fishjam._ws_notifier import FishjamNotifier
19from fishjam.api._fishjam_client import (
20    AgentOptions,
21    AgentOutputOptions,
22    FishjamClient,
23    Peer,
24    PeerOptions,
25    Room,
26    RoomOptions,
27)
28
29__all__ = [
30    "FishjamClient",
31    "FishjamNotifier",
32    "receive_binary",
33    "PeerMetadata",
34    "PeerOptions",
35    "RoomOptions",
36    "AgentOptions",
37    "AgentOutputOptions",
38    "Room",
39    "Peer",
40    "events",
41    "errors",
42    "room",
43    "peer",
44    "agent",
45]
46
47__docformat__ = "restructuredtext"
class FishjamClient(fishjam.api._client.Client):
108class FishjamClient(Client):
109    """Allows for managing rooms"""
110
111    def __init__(
112        self,
113        fishjam_id: str,
114        management_token: str,
115    ):
116        """
117        Create a FishjamClient instance, providing the fishjam id and management token.
118        """
119        super().__init__(fishjam_id=fishjam_id, management_token=management_token)
120
121    def create_peer(
122        self,
123        room_id: str,
124        options: PeerOptions | None = None,
125    ) -> tuple[Peer, str]:
126        """
127        Creates peer in the room
128
129        Returns a tuple (`Peer`, `PeerToken`) - the token is needed by Peer
130        to authenticate to Fishjam.
131
132        The possible options to pass for peer are `PeerOptions`.
133        """
134        options = options or PeerOptions()
135
136        peer_metadata = self.__parse_peer_metadata(options.metadata)
137        peer_options = PeerOptionsWebRTC(
138            enable_simulcast=options.enable_simulcast,
139            metadata=peer_metadata,
140        )
141        body = AddPeerBody(type_=PeerType.WEBRTC, options=peer_options)
142
143        resp = cast(
144            PeerDetailsResponse,
145            self._request(room_add_peer, room_id=room_id, body=body),
146        )
147
148        return (resp.data.peer, resp.data.token)
149
150    def create_agent(self, room_id: str, options: AgentOptions | None = None):
151        options = options or AgentOptions()
152        body = AddPeerBody(
153            type_=PeerType.AGENT,
154            options=PeerOptionsAgent(
155                output=PeerOptionsAgentOutput(
156                    audio_format=PeerOptionsAgentOutputAudioFormat(
157                        options.output.audio_format
158                    ),
159                    audio_sample_rate=PeerOptionsAgentOutputAudioSampleRate(
160                        options.output.audio_sample_rate
161                    ),
162                )
163            ),
164        )
165
166        resp = cast(
167            PeerDetailsResponse,
168            self._request(room_add_peer, room_id=room_id, body=body),
169        )
170
171        return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)
172
173    def create_room(self, options: RoomOptions | None = None) -> Room:
174        """
175        Creates a new room
176        Returns the created `Room`
177        """
178        options = options or RoomOptions()
179
180        if options.video_codec is None:
181            codec = UNSET
182        else:
183            codec = RoomConfigVideoCodec(options.video_codec)
184
185        config = RoomConfig(
186            max_peers=options.max_peers,
187            video_codec=codec,
188            webhook_url=options.webhook_url,
189            room_type=RoomConfigRoomType(options.room_type),
190            public=options.public,
191        )
192
193        room = cast(
194            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
195        ).data.room
196
197        return Room(config=room.config, id=room.id, peers=room.peers)
198
199    def get_all_rooms(self) -> list[Room]:
200        """Returns list of all rooms"""
201
202        rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data
203
204        return [
205            Room(config=room.config, id=room.id, peers=room.peers) for room in rooms
206        ]
207
208    def get_room(self, room_id: str) -> Room:
209        """Returns room with the given id"""
210
211        room = cast(
212            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
213        ).data
214
215        return Room(config=room.config, id=room.id, peers=room.peers)
216
217    def delete_peer(self, room_id: str, peer_id: str) -> None:
218        """Deletes peer"""
219
220        return self._request(room_delete_peer, id=peer_id, room_id=room_id)
221
222    def delete_room(self, room_id: str) -> None:
223        """Deletes a room"""
224
225        return self._request(room_delete_room, room_id=room_id)
226
227    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
228        """Refreshes peer token"""
229
230        response = cast(
231            PeerRefreshTokenResponse,
232            self._request(room_refresh_token, id=peer_id, room_id=room_id),
233        )
234
235        return response.data.token
236
237    def create_livestream_viewer_token(self, room_id: str) -> str:
238        """Generates viewer token for livestream rooms"""
239
240        response = cast(
241            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
242        )
243
244        return response.token
245
246    def create_livestream_streamer_token(self, room_id: str) -> str:
247        """Generates streamer token for livestream rooms"""
248
249        response = cast(
250            StreamerToken,
251            self._request(streamer_generate_streamer_token, room_id=room_id),
252        )
253
254        return response.token
255
256    def __parse_peer_metadata(self, metadata: dict | None) -> PeerOptionsWebRTCMetadata:
257        peer_metadata = PeerOptionsWebRTCMetadata()
258
259        if not metadata:
260            return peer_metadata
261
262        for key, value in metadata.items():
263            peer_metadata.additional_properties[key] = value
264
265        return peer_metadata

Allows for managing rooms

FishjamClient(fishjam_id: str, management_token: str)
111    def __init__(
112        self,
113        fishjam_id: str,
114        management_token: str,
115    ):
116        """
117        Create a FishjamClient instance, providing the fishjam id and management token.
118        """
119        super().__init__(fishjam_id=fishjam_id, management_token=management_token)

Create a FishjamClient instance, providing the fishjam id and management token.

def create_peer( self, room_id: str, options: PeerOptions | None = None) -> tuple[Peer, str]:
121    def create_peer(
122        self,
123        room_id: str,
124        options: PeerOptions | None = None,
125    ) -> tuple[Peer, str]:
126        """
127        Creates peer in the room
128
129        Returns a tuple (`Peer`, `PeerToken`) - the token is needed by Peer
130        to authenticate to Fishjam.
131
132        The possible options to pass for peer are `PeerOptions`.
133        """
134        options = options or PeerOptions()
135
136        peer_metadata = self.__parse_peer_metadata(options.metadata)
137        peer_options = PeerOptionsWebRTC(
138            enable_simulcast=options.enable_simulcast,
139            metadata=peer_metadata,
140        )
141        body = AddPeerBody(type_=PeerType.WEBRTC, options=peer_options)
142
143        resp = cast(
144            PeerDetailsResponse,
145            self._request(room_add_peer, room_id=room_id, body=body),
146        )
147
148        return (resp.data.peer, resp.data.token)

Creates peer in the room

Returns a tuple (Peer, PeerToken) - the token is needed by Peer to authenticate to Fishjam.

The possible options to pass for peer are PeerOptions.

def create_agent( self, room_id: str, options: AgentOptions | None = None):
150    def create_agent(self, room_id: str, options: AgentOptions | None = None):
151        options = options or AgentOptions()
152        body = AddPeerBody(
153            type_=PeerType.AGENT,
154            options=PeerOptionsAgent(
155                output=PeerOptionsAgentOutput(
156                    audio_format=PeerOptionsAgentOutputAudioFormat(
157                        options.output.audio_format
158                    ),
159                    audio_sample_rate=PeerOptionsAgentOutputAudioSampleRate(
160                        options.output.audio_sample_rate
161                    ),
162                )
163            ),
164        )
165
166        resp = cast(
167            PeerDetailsResponse,
168            self._request(room_add_peer, room_id=room_id, body=body),
169        )
170
171        return Agent(resp.data.peer.id, room_id, resp.data.token, self._fishjam_url)
def create_room( self, options: RoomOptions | None = None) -> Room:
173    def create_room(self, options: RoomOptions | None = None) -> Room:
174        """
175        Creates a new room
176        Returns the created `Room`
177        """
178        options = options or RoomOptions()
179
180        if options.video_codec is None:
181            codec = UNSET
182        else:
183            codec = RoomConfigVideoCodec(options.video_codec)
184
185        config = RoomConfig(
186            max_peers=options.max_peers,
187            video_codec=codec,
188            webhook_url=options.webhook_url,
189            room_type=RoomConfigRoomType(options.room_type),
190            public=options.public,
191        )
192
193        room = cast(
194            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
195        ).data.room
196
197        return Room(config=room.config, id=room.id, peers=room.peers)

Creates a new room Returns the created Room

def get_all_rooms(self) -> list[Room]:
199    def get_all_rooms(self) -> list[Room]:
200        """Returns list of all rooms"""
201
202        rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data
203
204        return [
205            Room(config=room.config, id=room.id, peers=room.peers) for room in rooms
206        ]

Returns list of all rooms

def get_room(self, room_id: str) -> Room:
208    def get_room(self, room_id: str) -> Room:
209        """Returns room with the given id"""
210
211        room = cast(
212            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
213        ).data
214
215        return Room(config=room.config, id=room.id, peers=room.peers)

Returns room with the given id

def delete_peer(self, room_id: str, peer_id: str) -> None:
217    def delete_peer(self, room_id: str, peer_id: str) -> None:
218        """Deletes peer"""
219
220        return self._request(room_delete_peer, id=peer_id, room_id=room_id)

Deletes peer

def delete_room(self, room_id: str) -> None:
222    def delete_room(self, room_id: str) -> None:
223        """Deletes a room"""
224
225        return self._request(room_delete_room, room_id=room_id)

Deletes a room

def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
227    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
228        """Refreshes peer token"""
229
230        response = cast(
231            PeerRefreshTokenResponse,
232            self._request(room_refresh_token, id=peer_id, room_id=room_id),
233        )
234
235        return response.data.token

Refreshes peer token

def create_livestream_viewer_token(self, room_id: str) -> str:
237    def create_livestream_viewer_token(self, room_id: str) -> str:
238        """Generates viewer token for livestream rooms"""
239
240        response = cast(
241            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
242        )
243
244        return response.token

Generates viewer token for livestream rooms

def create_livestream_streamer_token(self, room_id: str) -> str:
246    def create_livestream_streamer_token(self, room_id: str) -> str:
247        """Generates streamer token for livestream rooms"""
248
249        response = cast(
250            StreamerToken,
251            self._request(streamer_generate_streamer_token, room_id=room_id),
252        )
253
254        return response.token

Generates streamer token for livestream rooms

Inherited Members
fishjam.api._client.Client
client
class FishjamNotifier:
 35class FishjamNotifier:
 36    """
 37    Allows for receiving WebSocket messages from Fishjam.
 38    """
 39
 40    def __init__(
 41        self,
 42        fishjam_id: str,
 43        management_token: str,
 44    ):
 45        """
 46        Create FishjamNotifier instance, providing the fishjam id and management token.
 47        """
 48
 49        websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws")
 50        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
 51        self._management_token: str = management_token
 52        self._websocket: client.ClientConnection | None = None
 53        self._ready: bool = False
 54
 55        self._ready_event: asyncio.Event | None = None
 56
 57        self._notification_handler: NotificationHandler | None = None
 58
 59    def on_server_notification(self, handler: NotificationHandler):
 60        """
 61        Decorator used for defining handler for Fishjam Notifications
 62        """
 63        self._notification_handler = handler
 64        return handler
 65
 66    async def connect(self):
 67        """
 68        A coroutine which connects FishjamNotifier to Fishjam and listens for
 69        all incoming messages from the Fishjam.
 70
 71        It runs until the connection isn't closed.
 72
 73        The incoming messages are handled by the functions defined using the
 74        `on_server_notification` decorator.
 75
 76        The handler have to be defined before calling `connect`,
 77        otherwise the messages won't be received.
 78        """
 79        async with client.connect(self._fishjam_url) as websocket:
 80            try:
 81                self._websocket = websocket
 82                await self._authenticate()
 83
 84                if self._notification_handler:
 85                    await self._subscribe_event(
 86                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
 87                    )
 88
 89                self._ready = True
 90                if self._ready_event:
 91                    self._ready_event.set()
 92
 93                await self._receive_loop()
 94            finally:
 95                self._websocket = None
 96
 97    async def wait_ready(self) -> None:
 98        """
 99        Waits until the notifier is connected and authenticated to Fishjam.
100
101        If already connected, returns immediately.
102        """
103        if self._ready:
104            return
105
106        if self._ready_event is None:
107            self._ready_event = asyncio.Event()
108
109        await self._ready_event.wait()
110
111    async def _authenticate(self):
112        if not self._websocket:
113            raise RuntimeError("Websocket is not connected")
114
115        msg = ServerMessage(
116            auth_request=ServerMessageAuthRequest(token=self._management_token)
117        )
118        await self._websocket.send(bytes(msg))
119
120        try:
121            message = await self._websocket.recv(decode=False)
122        except ConnectionClosed as exception:
123            if "invalid token" in str(exception):
124                raise RuntimeError("Invalid management token") from exception
125            raise
126
127        message = ServerMessage().parse(message)
128
129        _type, message = betterproto.which_one_of(message, "content")
130        assert isinstance(message, ServerMessageAuthenticated)
131
132    async def _receive_loop(self):
133        if not self._websocket:
134            raise RuntimeError("Websocket is not connected")
135        if not self._notification_handler:
136            raise RuntimeError("Notification handler is not defined")
137
138        while True:
139            message = cast(bytes, await self._websocket.recv())
140            message = ServerMessage().parse(message)
141            _which, message = betterproto.which_one_of(message, "content")
142
143            if isinstance(message, ALLOWED_NOTIFICATIONS):
144                res = self._notification_handler(message)
145                if inspect.isawaitable(res):
146                    await res
147
148    async def _subscribe_event(self, event: ServerMessageEventType):
149        if not self._websocket:
150            raise RuntimeError("Websocket is not connected")
151
152        request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event))
153
154        await self._websocket.send(bytes(request))
155        message = cast(bytes, await self._websocket.recv())
156        message = ServerMessage().parse(message)
157        _which, message = betterproto.which_one_of(message, "content")
158        assert isinstance(message, ServerMessageSubscribeResponse)

Allows for receiving WebSocket messages from Fishjam.

FishjamNotifier(fishjam_id: str, management_token: str)
40    def __init__(
41        self,
42        fishjam_id: str,
43        management_token: str,
44    ):
45        """
46        Create FishjamNotifier instance, providing the fishjam id and management token.
47        """
48
49        websocket_url = get_fishjam_url(fishjam_id).replace("http", "ws")
50        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
51        self._management_token: str = management_token
52        self._websocket: client.ClientConnection | None = None
53        self._ready: bool = False
54
55        self._ready_event: asyncio.Event | None = None
56
57        self._notification_handler: NotificationHandler | None = None

Create FishjamNotifier instance, providing the fishjam id and management token.

def on_server_notification( self, handler: Union[Callable[[Union[fishjam.events.ServerMessageRoomCreated, fishjam.events.ServerMessageRoomDeleted, fishjam.events.ServerMessageRoomCrashed, fishjam.events.ServerMessagePeerAdded, fishjam.events.ServerMessagePeerDeleted, fishjam.events.ServerMessagePeerConnected, fishjam.events.ServerMessagePeerDisconnected, fishjam.events.ServerMessagePeerMetadataUpdated, fishjam.events.ServerMessagePeerCrashed, fishjam.events.ServerMessageStreamConnected, fishjam.events.ServerMessageStreamDisconnected, fishjam.events.ServerMessageViewerConnected, fishjam.events.ServerMessageViewerDisconnected, fishjam.events.ServerMessageTrackAdded, fishjam.events.ServerMessageTrackRemoved, fishjam.events.ServerMessageTrackMetadataUpdated]], NoneType], Callable[[Union[fishjam.events.ServerMessageRoomCreated, fishjam.events.ServerMessageRoomDeleted, fishjam.events.ServerMessageRoomCrashed, fishjam.events.ServerMessagePeerAdded, fishjam.events.ServerMessagePeerDeleted, fishjam.events.ServerMessagePeerConnected, fishjam.events.ServerMessagePeerDisconnected, fishjam.events.ServerMessagePeerMetadataUpdated, fishjam.events.ServerMessagePeerCrashed, fishjam.events.ServerMessageStreamConnected, fishjam.events.ServerMessageStreamDisconnected, fishjam.events.ServerMessageViewerConnected, fishjam.events.ServerMessageViewerDisconnected, fishjam.events.ServerMessageTrackAdded, fishjam.events.ServerMessageTrackRemoved, fishjam.events.ServerMessageTrackMetadataUpdated]], Coroutine[Any, Any, None]]]):
59    def on_server_notification(self, handler: NotificationHandler):
60        """
61        Decorator used for defining handler for Fishjam Notifications
62        """
63        self._notification_handler = handler
64        return handler

Decorator used for defining handler for Fishjam Notifications

async def connect(self):
66    async def connect(self):
67        """
68        A coroutine which connects FishjamNotifier to Fishjam and listens for
69        all incoming messages from the Fishjam.
70
71        It runs until the connection isn't closed.
72
73        The incoming messages are handled by the functions defined using the
74        `on_server_notification` decorator.
75
76        The handler have to be defined before calling `connect`,
77        otherwise the messages won't be received.
78        """
79        async with client.connect(self._fishjam_url) as websocket:
80            try:
81                self._websocket = websocket
82                await self._authenticate()
83
84                if self._notification_handler:
85                    await self._subscribe_event(
86                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
87                    )
88
89                self._ready = True
90                if self._ready_event:
91                    self._ready_event.set()
92
93                await self._receive_loop()
94            finally:
95                self._websocket = None

A coroutine which connects FishjamNotifier to Fishjam and listens for all incoming messages from the Fishjam.

It runs until the connection isn't closed.

The incoming messages are handled by the functions defined using the on_server_notification decorator.

The handler have to be defined before calling connect, otherwise the messages won't be received.

async def wait_ready(self) -> None:
 97    async def wait_ready(self) -> None:
 98        """
 99        Waits until the notifier is connected and authenticated to Fishjam.
100
101        If already connected, returns immediately.
102        """
103        if self._ready:
104            return
105
106        if self._ready_event is None:
107            self._ready_event = asyncio.Event()
108
109        await self._ready_event.wait()

Waits until the notifier is connected and authenticated to Fishjam.

If already connected, returns immediately.

18def receive_binary(binary: bytes) -> Union[AllowedNotification, None]:
19    """
20    Transform received protobuf notification to adequate notification instance.
21    The available notifications are listed in `fishjam.events` module.
22    """
23    message = ServerMessage().parse(binary)
24    _which, message = betterproto.which_one_of(message, "content")
25
26    if isinstance(message, ALLOWED_NOTIFICATIONS):
27        return message
28
29    return None

Transform received protobuf notification to adequate notification instance. The available notifications are listed in fishjam.events module.

class PeerMetadata:
11@_attrs_define
12class PeerMetadata:
13    """Custom metadata set by the peer
14
15    Example:
16        {'name': 'FishjamUser'}
17
18    """
19
20    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)
21
22    def to_dict(self) -> dict[str, Any]:
23        field_dict: dict[str, Any] = {}
24        field_dict.update(self.additional_properties)
25
26        return field_dict
27
28    @classmethod
29    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
30        d = dict(src_dict)
31        peer_metadata = cls()
32
33        peer_metadata.additional_properties = d
34        return peer_metadata
35
36    @property
37    def additional_keys(self) -> list[str]:
38        return list(self.additional_properties.keys())
39
40    def __getitem__(self, key: str) -> Any:
41        return self.additional_properties[key]
42
43    def __setitem__(self, key: str, value: Any) -> None:
44        self.additional_properties[key] = value
45
46    def __delitem__(self, key: str) -> None:
47        del self.additional_properties[key]
48
49    def __contains__(self, key: str) -> bool:
50        return key in self.additional_properties

Custom metadata set by the peer

Example: {'name': 'FishjamUser'}

PeerMetadata()
2def __init__(self, ):
3    self.additional_properties = __attr_factory_additional_properties()

Method generated by attrs for class PeerMetadata.

additional_properties: dict[str, typing.Any]
def to_dict(self) -> dict[str, typing.Any]:
22    def to_dict(self) -> dict[str, Any]:
23        field_dict: dict[str, Any] = {}
24        field_dict.update(self.additional_properties)
25
26        return field_dict
@classmethod
def from_dict(cls: type[~T], src_dict: Mapping[str, typing.Any]) -> ~T:
28    @classmethod
29    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
30        d = dict(src_dict)
31        peer_metadata = cls()
32
33        peer_metadata.additional_properties = d
34        return peer_metadata
additional_keys: list[str]
36    @property
37    def additional_keys(self) -> list[str]:
38        return list(self.additional_properties.keys())
@dataclass
class PeerOptions:
83@dataclass
84class PeerOptions:
85    """Options specific to a WebRTC Peer"""
86
87    enable_simulcast: bool = True
88    """Enables the peer to use simulcast"""
89    metadata: dict[str, Any] | None = None
90    """Peer metadata"""

Options specific to a WebRTC Peer

PeerOptions( enable_simulcast: bool = True, metadata: dict[str, typing.Any] | None = None)
enable_simulcast: bool = True

Enables the peer to use simulcast

metadata: dict[str, typing.Any] | None = None

Peer metadata

@dataclass
class RoomOptions:
60@dataclass
61class RoomOptions:
62    """Description of a room options"""
63
64    max_peers: int | None = None
65    """Maximum amount of peers allowed into the room"""
66    video_codec: Literal["h264", "vp8"] | None = None
67    """Enforces video codec for each peer in the room"""
68    webhook_url: str | None = None
69    """URL where Fishjam notifications will be sent"""
70    room_type: Literal[
71        "conference",
72        "audio_only",
73        "livestream",
74        "full_feature",
75        "broadcaster",
76        "audio_only_livestream",
77    ] = "conference"
78    """The use-case of the room. If not provided, this defaults to conference."""
79    public: bool = False
80    """True if livestream viewers can omit specifying a token."""

Description of a room options

RoomOptions( max_peers: int | None = None, video_codec: Optional[Literal['h264', 'vp8']] = None, webhook_url: str | None = None, room_type: Literal['conference', 'audio_only', 'livestream', 'full_feature', 'broadcaster', 'audio_only_livestream'] = 'conference', public: bool = False)
max_peers: int | None = None

Maximum amount of peers allowed into the room

video_codec: Optional[Literal['h264', 'vp8']] = None

Enforces video codec for each peer in the room

webhook_url: str | None = None

URL where Fishjam notifications will be sent

room_type: Literal['conference', 'audio_only', 'livestream', 'full_feature', 'broadcaster', 'audio_only_livestream'] = 'conference'

The use-case of the room. If not provided, this defaults to conference.

public: bool = False

True if livestream viewers can omit specifying a token.

@dataclass
class AgentOptions:
101@dataclass
102class AgentOptions:
103    """Options specific to a WebRTC Peer"""
104
105    output: AgentOutputOptions = field(default_factory=AgentOutputOptions)

Options specific to a WebRTC Peer

AgentOptions(output: AgentOutputOptions = <factory>)
@dataclass
class AgentOutputOptions:
93@dataclass
94class AgentOutputOptions:
95    """Options of the desired format of audio tracks going from Fishjam to the agent."""
96
97    audio_format: Literal["pcm16"] = "pcm16"
98    audio_sample_rate: Literal[16000, 24000] = 16000

Options of the desired format of audio tracks going from Fishjam to the agent.

AgentOutputOptions( audio_format: Literal['pcm16'] = 'pcm16', audio_sample_rate: Literal[16000, 24000] = 16000)
audio_format: Literal['pcm16'] = 'pcm16'
audio_sample_rate: Literal[16000, 24000] = 16000
@dataclass
class Room:
48@dataclass
49class Room:
50    """Description of the room state"""
51
52    config: RoomConfig
53    """Room configuration"""
54    id: str
55    """Room ID"""
56    peers: list[Peer]
57    """List of all peers"""

Description of the room state

Room( config: fishjam._openapi_client.models.room_config.RoomConfig, id: str, peers: list[Peer])
config: fishjam._openapi_client.models.room_config.RoomConfig

Room configuration

id: str

Room ID

peers: list[Peer]

List of all peers

class Peer:
 26@_attrs_define
 27class Peer:
 28    """Describes peer status
 29
 30    Attributes:
 31        id (str): Assigned peer id Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf.
 32        metadata (Union['PeerMetadata', None]): Custom metadata set by the peer Example: {'name': 'FishjamUser'}.
 33        status (PeerStatus): Informs about the peer status Example: disconnected.
 34        subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy
 35        subscriptions (list[str]): Describes peer's subscriptions in manual mode
 36        tracks (list['Track']): List of all peer's tracks
 37        type_ (PeerType): Peer type Example: webrtc.
 38    """
 39
 40    id: str
 41    metadata: Union["PeerMetadata", None]
 42    status: PeerStatus
 43    subscribe_mode: SubscribeMode
 44    subscriptions: list[str]
 45    tracks: list["Track"]
 46    type_: PeerType
 47    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)
 48
 49    def to_dict(self) -> dict[str, Any]:
 50        from ..models.peer_metadata import PeerMetadata
 51
 52        id = self.id
 53
 54        metadata: Union[None, dict[str, Any]]
 55        if isinstance(self.metadata, PeerMetadata):
 56            metadata = self.metadata.to_dict()
 57        else:
 58            metadata = self.metadata
 59
 60        status = self.status.value
 61
 62        subscribe_mode = self.subscribe_mode.value
 63
 64        subscriptions = self.subscriptions
 65
 66        tracks = []
 67        for tracks_item_data in self.tracks:
 68            tracks_item = tracks_item_data.to_dict()
 69            tracks.append(tracks_item)
 70
 71        type_ = self.type_.value
 72
 73        field_dict: dict[str, Any] = {}
 74        field_dict.update(self.additional_properties)
 75        field_dict.update(
 76            {
 77                "id": id,
 78                "metadata": metadata,
 79                "status": status,
 80                "subscribeMode": subscribe_mode,
 81                "subscriptions": subscriptions,
 82                "tracks": tracks,
 83                "type": type_,
 84            }
 85        )
 86
 87        return field_dict
 88
 89    @classmethod
 90    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
 91        from ..models.peer_metadata import PeerMetadata
 92        from ..models.track import Track
 93
 94        d = dict(src_dict)
 95        id = d.pop("id")
 96
 97        def _parse_metadata(data: object) -> Union["PeerMetadata", None]:
 98            if data is None:
 99                return data
100            try:
101                if not isinstance(data, dict):
102                    raise TypeError()
103                componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data)
104
105                return componentsschemas_peer_metadata_type_0
106            except:  # noqa: E722
107                pass
108            return cast(Union["PeerMetadata", None], data)
109
110        metadata = _parse_metadata(d.pop("metadata"))
111
112        status = PeerStatus(d.pop("status"))
113
114        subscribe_mode = SubscribeMode(d.pop("subscribeMode"))
115
116        subscriptions = cast(list[str], d.pop("subscriptions"))
117
118        tracks = []
119        _tracks = d.pop("tracks")
120        for tracks_item_data in _tracks:
121            tracks_item = Track.from_dict(tracks_item_data)
122
123            tracks.append(tracks_item)
124
125        type_ = PeerType(d.pop("type"))
126
127        peer = cls(
128            id=id,
129            metadata=metadata,
130            status=status,
131            subscribe_mode=subscribe_mode,
132            subscriptions=subscriptions,
133            tracks=tracks,
134            type_=type_,
135        )
136
137        peer.additional_properties = d
138        return peer
139
140    @property
141    def additional_keys(self) -> list[str]:
142        return list(self.additional_properties.keys())
143
144    def __getitem__(self, key: str) -> Any:
145        return self.additional_properties[key]
146
147    def __setitem__(self, key: str, value: Any) -> None:
148        self.additional_properties[key] = value
149
150    def __delitem__(self, key: str) -> None:
151        del self.additional_properties[key]
152
153    def __contains__(self, key: str) -> bool:
154        return key in self.additional_properties

Describes peer status

Attributes: id (str): Assigned peer id Example: 4a1c1164-5fb7-425d-89d7-24cdb8fff1cf. metadata (Union['PeerMetadata', None]): Custom metadata set by the peer Example: {'name': 'FishjamUser'}. status (PeerStatus): Informs about the peer status Example: disconnected. subscribe_mode (SubscribeMode): Configuration of peer's subscribing policy subscriptions (list[str]): Describes peer's subscriptions in manual mode tracks (list['Track']): List of all peer's tracks type_ (PeerType): Peer type Example: webrtc.

Peer( id: str, metadata: Optional[PeerMetadata], status: fishjam._openapi_client.models.peer_status.PeerStatus, subscribe_mode: fishjam._openapi_client.models.subscribe_mode.SubscribeMode, subscriptions: list[str], tracks: list[fishjam._openapi_client.models.track.Track], type_: fishjam._openapi_client.models.peer_type.PeerType)
 2def __init__(self, id, metadata, status, subscribe_mode, subscriptions, tracks, type_):
 3    self.id = id
 4    self.metadata = metadata
 5    self.status = status
 6    self.subscribe_mode = subscribe_mode
 7    self.subscriptions = subscriptions
 8    self.tracks = tracks
 9    self.type_ = type_
10    self.additional_properties = __attr_factory_additional_properties()

Method generated by attrs for class Peer.

id: str
metadata: Optional[PeerMetadata]
status: fishjam._openapi_client.models.peer_status.PeerStatus
subscribe_mode: fishjam._openapi_client.models.subscribe_mode.SubscribeMode
subscriptions: list[str]
tracks: list[fishjam._openapi_client.models.track.Track]
type_: fishjam._openapi_client.models.peer_type.PeerType
additional_properties: dict[str, typing.Any]
def to_dict(self) -> dict[str, typing.Any]:
49    def to_dict(self) -> dict[str, Any]:
50        from ..models.peer_metadata import PeerMetadata
51
52        id = self.id
53
54        metadata: Union[None, dict[str, Any]]
55        if isinstance(self.metadata, PeerMetadata):
56            metadata = self.metadata.to_dict()
57        else:
58            metadata = self.metadata
59
60        status = self.status.value
61
62        subscribe_mode = self.subscribe_mode.value
63
64        subscriptions = self.subscriptions
65
66        tracks = []
67        for tracks_item_data in self.tracks:
68            tracks_item = tracks_item_data.to_dict()
69            tracks.append(tracks_item)
70
71        type_ = self.type_.value
72
73        field_dict: dict[str, Any] = {}
74        field_dict.update(self.additional_properties)
75        field_dict.update(
76            {
77                "id": id,
78                "metadata": metadata,
79                "status": status,
80                "subscribeMode": subscribe_mode,
81                "subscriptions": subscriptions,
82                "tracks": tracks,
83                "type": type_,
84            }
85        )
86
87        return field_dict
@classmethod
def from_dict(cls: type[~T], src_dict: Mapping[str, typing.Any]) -> ~T:
 89    @classmethod
 90    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
 91        from ..models.peer_metadata import PeerMetadata
 92        from ..models.track import Track
 93
 94        d = dict(src_dict)
 95        id = d.pop("id")
 96
 97        def _parse_metadata(data: object) -> Union["PeerMetadata", None]:
 98            if data is None:
 99                return data
100            try:
101                if not isinstance(data, dict):
102                    raise TypeError()
103                componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data)
104
105                return componentsschemas_peer_metadata_type_0
106            except:  # noqa: E722
107                pass
108            return cast(Union["PeerMetadata", None], data)
109
110        metadata = _parse_metadata(d.pop("metadata"))
111
112        status = PeerStatus(d.pop("status"))
113
114        subscribe_mode = SubscribeMode(d.pop("subscribeMode"))
115
116        subscriptions = cast(list[str], d.pop("subscriptions"))
117
118        tracks = []
119        _tracks = d.pop("tracks")
120        for tracks_item_data in _tracks:
121            tracks_item = Track.from_dict(tracks_item_data)
122
123            tracks.append(tracks_item)
124
125        type_ = PeerType(d.pop("type"))
126
127        peer = cls(
128            id=id,
129            metadata=metadata,
130            status=status,
131            subscribe_mode=subscribe_mode,
132            subscriptions=subscriptions,
133            tracks=tracks,
134            type_=type_,
135        )
136
137        peer.additional_properties = d
138        return peer
additional_keys: list[str]
140    @property
141    def additional_keys(self) -> list[str]:
142        return list(self.additional_properties.keys())