Fishjam Python Server SDK

Fishjam Python Server SDK

Python server SDK for Fishjam.

Read the docs here

Installation

pip install fishjam-server-sdk

Usage

The SDK exports two main classes for interacting with the Fishjam server: FishjamClient and FishjamNotifier.

FishjamClient wraps HTTP REST API calls, while FishjamNotifier is responsible for receiving real-time updates from the server.

FishjamClient

Create a FishjamClient instance, providing the Fishjam ID and management token:

from fishjam import FishjamClient, RoomOptions

fishjam_client = FishjamClient(fishjam_id="<fishjam_id>", management_token="<management_token>")

You can use it to interact with Fishjam to manage rooms and peers

# Create a room
options = RoomOptions(video_codec="h264", webhook_url="http://localhost:5000/webhook")
room = fishjam_client.create_room(options=options)

# Room(components=[], config=RoomConfig(max_peers=None, video_codec=<RoomConfigVideoCodec.H264: 'h264'>, webhook_url='http://localhost:5000/webhook'), id='1d905478-ccfc-44d6-a6e7-8ccb1b38d955', peers=[])

# Add peer to the room
peer, token = fishjam_client.create_peer(room.id)

# Peer(id='b1232c7e-c969-4450-acdf-ea24f3cdd7f6', status=<PeerStatus.DISCONNECTED: 'disconnected'>, type='webrtc'), 'M8TUGhj-L11KpyG-2zBPIo'

All methods in FishjamClient may raise one of the exceptions deriving from fishjam.errors.HTTPError. They are defined in fishjam.errors.

FishjamNotifier

FishjamNotifier allows for receiving real-time updates from the Fishjam Server.

You can read more about notifications in the Fishjam Docs.

Create a FishjamNotifier instance, providing the Fishjam ID and management token:

from fishjam import FishjamNotifier

fishjam_notifier = FishjamNotifier(fishjam_id='<fishjam_id>', management_token='<management_token>')

Then define a handler for incoming messages

@fishjam_notifier.on_server_notification
def handle_notification(server_notification):
    print(f'Received a notification: {server_notification}')

After that you can start the notifier

async def test_notifier():
    notifier_task = asyncio.create_task(fishjam_notifier.connect())

    # Wait for notifier to be ready to receive messages
    await fishjam_notifier.wait_ready()

    # Create a room to trigger a server notification
    fishjam_client = FishjamClient(fishjam_id="<fishjam_id>", management_token="<management_token>")
    fishjam_client.create_room()

    await notifier_task

asyncio.run(test_notifier())

# Received a notification: ServerMessageRoomCreated(room_id='69a3fd1a-6a4d-47bc-ae54-0c72b0d05e29')

License

Licensed under the Apache License, Version 2.0

Fishjam is created by Software Mansion

Since 2012, Software Mansion has been a software agency with experience in building web and mobile apps. We are Core React Native Contributors and experts in dealing with all kinds of React Native issues. We can help you build your next dream product – Hire us.

Software Mansion

 1"""
 2.. include:: ../README.md
 3"""
 4
 5# pylint: disable=locally-disabled, no-name-in-module, import-error
 6
 7# Exceptions and Server Messages
 8
 9# API
10# pylint: disable=locally-disabled, no-name-in-module, import-error
11
12# Exceptions and Server Messages
13from fishjam import agent, errors, events, peer, room
14from fishjam._openapi_client.models import PeerMetadata
15
16# API
17from fishjam._webhook_notifier import receive_binary
18from fishjam._ws_notifier import FishjamNotifier
19from fishjam.api._fishjam_client import (
20    FishjamClient,
21    Peer,
22    PeerOptions,
23    Room,
24    RoomOptions,
25)
26
27__all__ = [
28    "FishjamClient",
29    "FishjamNotifier",
30    "receive_binary",
31    "PeerMetadata",
32    "PeerOptions",
33    "RoomOptions",
34    "Room",
35    "Peer",
36    "events",
37    "errors",
38    "room",
39    "peer",
40    "agent",
41]
42
43__docformat__ = "restructuredtext"
class FishjamClient(fishjam.api._client.Client):
 88class FishjamClient(Client):
 89    """Allows for managing rooms"""
 90
 91    def __init__(
 92        self,
 93        fishjam_id: str,
 94        management_token: str,
 95        *,
 96        fishjam_url: str | None = None,
 97    ):
 98        """
 99        Create a FishjamClient instance, providing the fishjam id and management token.
100        """
101        super().__init__(
102            fishjam_id=fishjam_id,
103            management_token=management_token,
104            fishjam_url=fishjam_url,
105        )
106
107    def create_peer(
108        self,
109        room_id: str,
110        options: PeerOptions | None = None,
111    ) -> tuple[Peer, str]:
112        """
113        Creates peer in the room
114
115        Returns a tuple (`Peer`, `PeerToken`) - the token is needed by Peer
116        to authenticate to Fishjam.
117
118        The possible options to pass for peer are `PeerOptions`.
119        """
120        options = options or PeerOptions()
121
122        peer_metadata = self.__parse_peer_metadata(options.metadata)
123        peer_options = PeerOptionsWebRTC(
124            enable_simulcast=options.enable_simulcast,
125            metadata=peer_metadata,
126            subscribe=self.__parse_subscribe_options(options.subscribe),
127        )
128        body = AddPeerBody(type_=PeerType.WEBRTC, options=peer_options)
129
130        resp = cast(
131            PeerDetailsResponse,
132            self._request(room_add_peer, room_id=room_id, body=body),
133        )
134
135        return (resp.data.peer, resp.data.token)
136
137    def create_agent(self, room_id: str):
138        body = AddPeerBody(type_=PeerType.AGENT, options=PeerOptionsAgent())
139
140        resp = cast(
141            PeerDetailsResponse,
142            self._request(room_add_peer, room_id=room_id, body=body),
143        )
144
145        return Agent(resp.data.peer.id, resp.data.token, self._fishjam_url)
146
147    def create_room(self, options: RoomOptions | None = None) -> Room:
148        """
149        Creates a new room
150        Returns the created `Room`
151        """
152        options = options or RoomOptions()
153
154        if options.video_codec is None:
155            codec = UNSET
156        else:
157            codec = RoomConfigVideoCodec(options.video_codec)
158
159        config = RoomConfig(
160            max_peers=options.max_peers,
161            video_codec=codec,
162            webhook_url=options.webhook_url,
163            room_type=RoomConfigRoomType(options.room_type),
164            public=options.public,
165        )
166
167        room = cast(
168            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
169        ).data.room
170
171        return Room(config=room.config, id=room.id, peers=room.peers)
172
173    def get_all_rooms(self) -> list[Room]:
174        """Returns list of all rooms"""
175
176        rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data
177
178        return [
179            Room(config=room.config, id=room.id, peers=room.peers) for room in rooms
180        ]
181
182    def get_room(self, room_id: str) -> Room:
183        """Returns room with the given id"""
184
185        room = cast(
186            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
187        ).data
188
189        return Room(config=room.config, id=room.id, peers=room.peers)
190
191    def delete_peer(self, room_id: str, peer_id: str) -> None:
192        """Deletes peer"""
193
194        return self._request(room_delete_peer, id=peer_id, room_id=room_id)
195
196    def delete_room(self, room_id: str) -> None:
197        """Deletes a room"""
198
199        return self._request(room_delete_room, room_id=room_id)
200
201    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
202        """Refreshes peer token"""
203
204        response = cast(
205            PeerRefreshTokenResponse,
206            self._request(room_refresh_token, id=peer_id, room_id=room_id),
207        )
208
209        return response.data.token
210
211    def create_livestream_viewer_token(self, room_id: str) -> str:
212        """Generates viewer token for livestream rooms"""
213
214        response = cast(
215            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
216        )
217
218        return response.token
219
220    def create_livestream_streamer_token(self, room_id: str) -> str:
221        """Generates streamer token for livestream rooms"""
222
223        response = cast(
224            StreamerToken,
225            self._request(streamer_generate_streamer_token, room_id=room_id),
226        )
227
228        return response.token
229
230    def __parse_peer_metadata(self, metadata: dict | None) -> PeerOptionsWebRTCMetadata:
231        peer_metadata = PeerOptionsWebRTCMetadata()
232
233        if not metadata:
234            return peer_metadata
235
236        for key, value in metadata.items():
237            peer_metadata.additional_properties[key] = value
238
239        return peer_metadata
240
241    def __parse_subscribe_options(
242        self,
243        options: SubscribeOptions | None,
244    ) -> PeerOptionsWebRTCSubscribeOptions | None:
245        if options is None:
246            return None
247        return PeerOptionsWebRTCSubscribeOptions.from_dict(options.to_dict())

Allows for managing rooms

FishjamClient( fishjam_id: str, management_token: str, *, fishjam_url: str | None = None)
 91    def __init__(
 92        self,
 93        fishjam_id: str,
 94        management_token: str,
 95        *,
 96        fishjam_url: str | None = None,
 97    ):
 98        """
 99        Create a FishjamClient instance, providing the fishjam id and management token.
100        """
101        super().__init__(
102            fishjam_id=fishjam_id,
103            management_token=management_token,
104            fishjam_url=fishjam_url,
105        )

Create a FishjamClient instance, providing the fishjam id and management token.

def create_peer( self, room_id: str, options: PeerOptions | None = None) -> tuple[Peer, str]:
107    def create_peer(
108        self,
109        room_id: str,
110        options: PeerOptions | None = None,
111    ) -> tuple[Peer, str]:
112        """
113        Creates peer in the room
114
115        Returns a tuple (`Peer`, `PeerToken`) - the token is needed by Peer
116        to authenticate to Fishjam.
117
118        The possible options to pass for peer are `PeerOptions`.
119        """
120        options = options or PeerOptions()
121
122        peer_metadata = self.__parse_peer_metadata(options.metadata)
123        peer_options = PeerOptionsWebRTC(
124            enable_simulcast=options.enable_simulcast,
125            metadata=peer_metadata,
126            subscribe=self.__parse_subscribe_options(options.subscribe),
127        )
128        body = AddPeerBody(type_=PeerType.WEBRTC, options=peer_options)
129
130        resp = cast(
131            PeerDetailsResponse,
132            self._request(room_add_peer, room_id=room_id, body=body),
133        )
134
135        return (resp.data.peer, resp.data.token)

Creates peer in the room

Returns a tuple (Peer, PeerToken) - the token is needed by Peer to authenticate to Fishjam.

The possible options to pass for peer are PeerOptions.

def create_agent(self, room_id: str):
137    def create_agent(self, room_id: str):
138        body = AddPeerBody(type_=PeerType.AGENT, options=PeerOptionsAgent())
139
140        resp = cast(
141            PeerDetailsResponse,
142            self._request(room_add_peer, room_id=room_id, body=body),
143        )
144
145        return Agent(resp.data.peer.id, resp.data.token, self._fishjam_url)
def create_room( self, options: RoomOptions | None = None) -> Room:
147    def create_room(self, options: RoomOptions | None = None) -> Room:
148        """
149        Creates a new room
150        Returns the created `Room`
151        """
152        options = options or RoomOptions()
153
154        if options.video_codec is None:
155            codec = UNSET
156        else:
157            codec = RoomConfigVideoCodec(options.video_codec)
158
159        config = RoomConfig(
160            max_peers=options.max_peers,
161            video_codec=codec,
162            webhook_url=options.webhook_url,
163            room_type=RoomConfigRoomType(options.room_type),
164            public=options.public,
165        )
166
167        room = cast(
168            RoomCreateDetailsResponse, self._request(room_create_room, body=config)
169        ).data.room
170
171        return Room(config=room.config, id=room.id, peers=room.peers)

Creates a new room. Returns the created Room.

def get_all_rooms(self) -> list[Room]:
173    def get_all_rooms(self) -> list[Room]:
174        """Returns list of all rooms"""
175
176        rooms = cast(RoomsListingResponse, self._request(room_get_all_rooms)).data
177
178        return [
179            Room(config=room.config, id=room.id, peers=room.peers) for room in rooms
180        ]

Returns list of all rooms

def get_room(self, room_id: str) -> Room:
182    def get_room(self, room_id: str) -> Room:
183        """Returns room with the given id"""
184
185        room = cast(
186            RoomDetailsResponse, self._request(room_get_room, room_id=room_id)
187        ).data
188
189        return Room(config=room.config, id=room.id, peers=room.peers)

Returns room with the given id

def delete_peer(self, room_id: str, peer_id: str) -> None:
191    def delete_peer(self, room_id: str, peer_id: str) -> None:
192        """Deletes peer"""
193
194        return self._request(room_delete_peer, id=peer_id, room_id=room_id)

Deletes peer

def delete_room(self, room_id: str) -> None:
196    def delete_room(self, room_id: str) -> None:
197        """Deletes a room"""
198
199        return self._request(room_delete_room, room_id=room_id)

Deletes a room

def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
201    def refresh_peer_token(self, room_id: str, peer_id: str) -> str:
202        """Refreshes peer token"""
203
204        response = cast(
205            PeerRefreshTokenResponse,
206            self._request(room_refresh_token, id=peer_id, room_id=room_id),
207        )
208
209        return response.data.token

Refreshes peer token

def create_livestream_viewer_token(self, room_id: str) -> str:
211    def create_livestream_viewer_token(self, room_id: str) -> str:
212        """Generates viewer token for livestream rooms"""
213
214        response = cast(
215            ViewerToken, self._request(viewer_generate_viewer_token, room_id=room_id)
216        )
217
218        return response.token

Generates viewer token for livestream rooms

def create_livestream_streamer_token(self, room_id: str) -> str:
220    def create_livestream_streamer_token(self, room_id: str) -> str:
221        """Generates streamer token for livestream rooms"""
222
223        response = cast(
224            StreamerToken,
225            self._request(streamer_generate_streamer_token, room_id=room_id),
226        )
227
228        return response.token

Generates streamer token for livestream rooms

Inherited Members
fishjam.api._client.Client
client
class FishjamNotifier:
 34class FishjamNotifier:
 35    """
 36    Allows for receiving WebSocket messages from Fishjam.
 37    """
 38
 39    def __init__(
 40        self,
 41        fishjam_id: str,
 42        management_token: str,
 43        *,
 44        fishjam_url: str | None = None,
 45    ):
 46        """
 47        Create FishjamNotifier instance, providing the fishjam id and management token.
 48        """
 49
 50        websocket_url = get_fishjam_url(fishjam_id, fishjam_url).replace("http", "ws")
 51        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
 52        self._management_token: str = management_token
 53        self._websocket: client.ClientConnection | None = None
 54        self._ready: bool = False
 55
 56        self._ready_event: asyncio.Event | None = None
 57
 58        self._notification_handler: NotificationHandler | None = None
 59
 60    def on_server_notification(self, handler: NotificationHandler):
 61        """
 62        Decorator used for defining handler for Fishjam Notifications
 63        """
 64        self._notification_handler = handler
 65        return handler
 66
 67    async def connect(self):
 68        """
 69        A coroutine which connects FishjamNotifier to Fishjam and listens for
 70        all incoming messages from the Fishjam.
 71
 72        It runs until the connection isn't closed.
 73
 74        The incoming messages are handled by the functions defined using the
 75        `on_server_notification` decorator.
 76
 77        The handler have to be defined before calling `connect`,
 78        otherwise the messages won't be received.
 79        """
 80        async with client.connect(self._fishjam_url) as websocket:
 81            try:
 82                self._websocket = websocket
 83                await self._authenticate()
 84
 85                if self._notification_handler:
 86                    await self._subscribe_event(
 87                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
 88                    )
 89
 90                self._ready = True
 91                if self._ready_event:
 92                    self._ready_event.set()
 93
 94                await self._receive_loop()
 95            finally:
 96                self._websocket = None
 97
 98    async def wait_ready(self) -> None:
 99        """
100        Waits until the notifier is connected and authenticated to Fishjam.
101
102        If already connected, returns immediately.
103        """
104        if self._ready:
105            return
106
107        if self._ready_event is None:
108            self._ready_event = asyncio.Event()
109
110        await self._ready_event.wait()
111
112    async def _authenticate(self):
113        if not self._websocket:
114            raise RuntimeError("Websocket is not connected")
115
116        msg = ServerMessage(
117            auth_request=ServerMessageAuthRequest(token=self._management_token)
118        )
119        await self._websocket.send(bytes(msg))
120
121        try:
122            message = await self._websocket.recv(decode=False)
123        except ConnectionClosed as exception:
124            if "invalid token" in str(exception):
125                raise RuntimeError("Invalid management token") from exception
126            raise
127
128        message = ServerMessage().parse(message)
129
130        _type, message = betterproto.which_one_of(message, "content")
131        assert isinstance(message, ServerMessageAuthenticated)
132
133    async def _receive_loop(self):
134        if not self._websocket:
135            raise RuntimeError("Websocket is not connected")
136        if not self._notification_handler:
137            raise RuntimeError("Notification handler is not defined")
138
139        while True:
140            message = cast(bytes, await self._websocket.recv())
141            message = ServerMessage().parse(message)
142            _which, message = betterproto.which_one_of(message, "content")
143
144            if isinstance(message, ALLOWED_NOTIFICATIONS):
145                res = self._notification_handler(message)
146                if asyncio.iscoroutine(res):
147                    await res
148
149    async def _subscribe_event(self, event: ServerMessageEventType):
150        if not self._websocket:
151            raise RuntimeError("Websocket is not connected")
152
153        request = ServerMessage(subscribe_request=ServerMessageSubscribeRequest(event))
154
155        await self._websocket.send(bytes(request))
156        message = cast(bytes, await self._websocket.recv())
157        message = ServerMessage().parse(message)
158        _which, message = betterproto.which_one_of(message, "content")
159        assert isinstance(message, ServerMessageSubscribeResponse)

Allows for receiving WebSocket messages from Fishjam.

FishjamNotifier( fishjam_id: str, management_token: str, *, fishjam_url: str | None = None)
39    def __init__(
40        self,
41        fishjam_id: str,
42        management_token: str,
43        *,
44        fishjam_url: str | None = None,
45    ):
46        """
47        Create FishjamNotifier instance, providing the fishjam id and management token.
48        """
49
50        websocket_url = get_fishjam_url(fishjam_id, fishjam_url).replace("http", "ws")
51        self._fishjam_url = f"{websocket_url}/socket/server/websocket"
52        self._management_token: str = management_token
53        self._websocket: client.ClientConnection | None = None
54        self._ready: bool = False
55
56        self._ready_event: asyncio.Event | None = None
57
58        self._notification_handler: NotificationHandler | None = None

Create a FishjamNotifier instance, providing the fishjam id and management token.

def on_server_notification( self, handler: Union[Callable[[Union[fishjam.events.ServerMessageRoomCreated, fishjam.events.ServerMessageRoomDeleted, fishjam.events.ServerMessageRoomCrashed, fishjam.events.ServerMessagePeerAdded, fishjam.events.ServerMessagePeerDeleted, fishjam.events.ServerMessagePeerConnected, fishjam.events.ServerMessagePeerDisconnected, fishjam.events.ServerMessagePeerMetadataUpdated, fishjam.events.ServerMessagePeerCrashed, fishjam.events.ServerMessageStreamConnected, fishjam.events.ServerMessageStreamDisconnected, fishjam.events.ServerMessageViewerConnected, fishjam.events.ServerMessageViewerDisconnected, fishjam.events.ServerMessageTrackAdded, fishjam.events.ServerMessageTrackRemoved, fishjam.events.ServerMessageTrackMetadataUpdated]], NoneType], Callable[[Union[fishjam.events.ServerMessageRoomCreated, fishjam.events.ServerMessageRoomDeleted, fishjam.events.ServerMessageRoomCrashed, fishjam.events.ServerMessagePeerAdded, fishjam.events.ServerMessagePeerDeleted, fishjam.events.ServerMessagePeerConnected, fishjam.events.ServerMessagePeerDisconnected, fishjam.events.ServerMessagePeerMetadataUpdated, fishjam.events.ServerMessagePeerCrashed, fishjam.events.ServerMessageStreamConnected, fishjam.events.ServerMessageStreamDisconnected, fishjam.events.ServerMessageViewerConnected, fishjam.events.ServerMessageViewerDisconnected, fishjam.events.ServerMessageTrackAdded, fishjam.events.ServerMessageTrackRemoved, fishjam.events.ServerMessageTrackMetadataUpdated]], Coroutine[Any, Any, None]]]):
60    def on_server_notification(self, handler: NotificationHandler):
61        """
62        Decorator used for defining handler for Fishjam Notifications
63        """
64        self._notification_handler = handler
65        return handler

Decorator used for defining a handler for Fishjam notifications

async def connect(self):
67    async def connect(self):
68        """
69        A coroutine which connects FishjamNotifier to Fishjam and listens for
70        all incoming messages from the Fishjam.
71
72        It runs until the connection isn't closed.
73
74        The incoming messages are handled by the functions defined using the
75        `on_server_notification` decorator.
76
77        The handler have to be defined before calling `connect`,
78        otherwise the messages won't be received.
79        """
80        async with client.connect(self._fishjam_url) as websocket:
81            try:
82                self._websocket = websocket
83                await self._authenticate()
84
85                if self._notification_handler:
86                    await self._subscribe_event(
87                        event=ServerMessageEventType.EVENT_TYPE_SERVER_NOTIFICATION
88                    )
89
90                self._ready = True
91                if self._ready_event:
92                    self._ready_event.set()
93
94                await self._receive_loop()
95            finally:
96                self._websocket = None

A coroutine which connects FishjamNotifier to Fishjam and listens for all incoming messages from the Fishjam.

It runs until the connection is closed.

The incoming messages are handled by the functions defined using the on_server_notification decorator.

The handler has to be defined before calling connect; otherwise, the messages won't be received.

async def wait_ready(self) -> None:
 98    async def wait_ready(self) -> None:
 99        """
100        Waits until the notifier is connected and authenticated to Fishjam.
101
102        If already connected, returns immediately.
103        """
104        if self._ready:
105            return
106
107        if self._ready_event is None:
108            self._ready_event = asyncio.Event()
109
110        await self._ready_event.wait()

Waits until the notifier is connected and authenticated to Fishjam.

If already connected, returns immediately.

18def receive_binary(binary: bytes) -> Union[AllowedNotification, None]:
19    """
20    Transform received protobuf notification to adequate notification instance.
21    The available notifications are listed in `fishjam.events` module.
22    """
23    message = ServerMessage().parse(binary)
24    _which, message = betterproto.which_one_of(message, "content")
25
26    if isinstance(message, ALLOWED_NOTIFICATIONS):
27        return message
28
29    return None

Transforms a received protobuf notification into the corresponding notification instance, or returns None if the message is not one of the allowed notifications. The available notifications are listed in the fishjam.events module.

class PeerMetadata:
11@_attrs_define
12class PeerMetadata:
13    """Custom metadata set by the peer
14
15    Example:
16        {'name': 'FishjamUser'}
17
18    """
19
20    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)
21
22    def to_dict(self) -> dict[str, Any]:
23        field_dict: dict[str, Any] = {}
24        field_dict.update(self.additional_properties)
25
26        return field_dict
27
28    @classmethod
29    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
30        d = dict(src_dict)
31        peer_metadata = cls()
32
33        peer_metadata.additional_properties = d
34        return peer_metadata
35
36    @property
37    def additional_keys(self) -> list[str]:
38        return list(self.additional_properties.keys())
39
40    def __getitem__(self, key: str) -> Any:
41        return self.additional_properties[key]
42
43    def __setitem__(self, key: str, value: Any) -> None:
44        self.additional_properties[key] = value
45
46    def __delitem__(self, key: str) -> None:
47        del self.additional_properties[key]
48
49    def __contains__(self, key: str) -> bool:
50        return key in self.additional_properties

Custom metadata set by the peer

Example: {'name': 'FishjamUser'}

PeerMetadata()
2def __init__(self, ):
3    self.additional_properties = __attr_factory_additional_properties()

Method generated by attrs for class PeerMetadata.

additional_properties: dict[str, typing.Any]
def to_dict(self) -> dict[str, typing.Any]:
22    def to_dict(self) -> dict[str, Any]:
23        field_dict: dict[str, Any] = {}
24        field_dict.update(self.additional_properties)
25
26        return field_dict
@classmethod
def from_dict(cls: type[~T], src_dict: Mapping[str, typing.Any]) -> ~T:
28    @classmethod
29    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
30        d = dict(src_dict)
31        peer_metadata = cls()
32
33        peer_metadata.additional_properties = d
34        return peer_metadata
additional_keys: list[str]
36    @property
37    def additional_keys(self) -> list[str]:
38        return list(self.additional_properties.keys())
@dataclass
class PeerOptions:
77@dataclass
78class PeerOptions:
79    """Options specific to the Peer"""
80
81    enable_simulcast: bool = True
82    """Enables the peer to use simulcast"""
83    metadata: dict[str, Any] | None = None
84    """Peer metadata"""
85    subscribe: SubscribeOptions | None = None

Options specific to the Peer

PeerOptions( enable_simulcast: bool = True, metadata: dict[str, typing.Any] | None = None, subscribe: fishjam._openapi_client.models.subscribe_options.SubscribeOptions | None = None)
enable_simulcast: bool = True

Enables the peer to use simulcast

metadata: dict[str, typing.Any] | None = None

Peer metadata

subscribe: fishjam._openapi_client.models.subscribe_options.SubscribeOptions | None = None
@dataclass
class RoomOptions:
59@dataclass
60class RoomOptions:
61    """Description of a room options"""
62
63    max_peers: int | None = None
64    """Maximum amount of peers allowed into the room"""
65    video_codec: Literal["h264", "vp8"] | None = None
66    """Enforces video codec for each peer in the room"""
67    webhook_url: str | None = None
68    """URL where Fishjam notifications will be sent"""
69    room_type: Literal[
70        "conference", "audio_only", "livestream", "full_feature", "broadcaster"
71    ] = "conference"
72    """The use-case of the room. If not provided, this defaults to conference."""
73    public: bool = False
74    """True if livestream viewers can omit specifying a token."""

Description of room options

RoomOptions( max_peers: int | None = None, video_codec: Optional[Literal['h264', 'vp8']] = None, webhook_url: str | None = None, room_type: Literal['conference', 'audio_only', 'livestream', 'full_feature', 'broadcaster'] = 'conference', public: bool = False)
max_peers: int | None = None

Maximum number of peers allowed into the room

video_codec: Optional[Literal['h264', 'vp8']] = None

Enforces video codec for each peer in the room

webhook_url: str | None = None

URL where Fishjam notifications will be sent

room_type: Literal['conference', 'audio_only', 'livestream', 'full_feature', 'broadcaster'] = 'conference'

The use-case of the room. If not provided, this defaults to conference.

public: bool = False

True if livestream viewers can omit specifying a token.

@dataclass
class Room:
47@dataclass
48class Room:
49    """Description of the room state"""
50
51    config: RoomConfig
52    """Room configuration"""
53    id: str
54    """Room ID"""
55    peers: list[Peer]
56    """List of all peers"""

Description of the room state

Room( config: fishjam._openapi_client.models.room_config.RoomConfig, id: str, peers: list[Peer])
config: fishjam._openapi_client.models.room_config.RoomConfig

Room configuration

id: str

Room ID

peers: list[Peer]

List of all peers

class Peer:
 26@_attrs_define
 27class Peer:
 28    """Describes peer status
 29
 30    Attributes:
 31        id (str): Assigned peer id Example: peer-1.
 32        metadata (Union['PeerMetadata', None]): Custom metadata set by the peer Example: {'name': 'FishjamUser'}.
 33        status (PeerStatus): Informs about the peer status Example: disconnected.
 34        subscribe (Union['SubscribeOptions', None]): Configuration of server-side subscriptions to the peer's tracks
 35            Example: {'audioFormat': 'pcm16'}.
 36        tracks (list['Track']): List of all peer's tracks
 37        type_ (PeerType): Peer type Example: webrtc.
 38    """
 39
 40    id: str
 41    metadata: Union["PeerMetadata", None]
 42    status: PeerStatus
 43    subscribe: Union["SubscribeOptions", None]
 44    tracks: list["Track"]
 45    type_: PeerType
 46    additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict)
 47
 48    def to_dict(self) -> dict[str, Any]:
 49        from ..models.peer_metadata import PeerMetadata
 50        from ..models.subscribe_options import SubscribeOptions
 51
 52        id = self.id
 53
 54        metadata: Union[None, dict[str, Any]]
 55        if isinstance(self.metadata, PeerMetadata):
 56            metadata = self.metadata.to_dict()
 57        else:
 58            metadata = self.metadata
 59
 60        status = self.status.value
 61
 62        subscribe: Union[None, dict[str, Any]]
 63        if isinstance(self.subscribe, SubscribeOptions):
 64            subscribe = self.subscribe.to_dict()
 65        else:
 66            subscribe = self.subscribe
 67
 68        tracks = []
 69        for tracks_item_data in self.tracks:
 70            tracks_item = tracks_item_data.to_dict()
 71            tracks.append(tracks_item)
 72
 73        type_ = self.type_.value
 74
 75        field_dict: dict[str, Any] = {}
 76        field_dict.update(self.additional_properties)
 77        field_dict.update(
 78            {
 79                "id": id,
 80                "metadata": metadata,
 81                "status": status,
 82                "subscribe": subscribe,
 83                "tracks": tracks,
 84                "type": type_,
 85            }
 86        )
 87
 88        return field_dict
 89
 90    @classmethod
 91    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
 92        from ..models.peer_metadata import PeerMetadata
 93        from ..models.subscribe_options import SubscribeOptions
 94        from ..models.track import Track
 95
 96        d = dict(src_dict)
 97        id = d.pop("id")
 98
 99        def _parse_metadata(data: object) -> Union["PeerMetadata", None]:
100            if data is None:
101                return data
102            try:
103                if not isinstance(data, dict):
104                    raise TypeError()
105                componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data)
106
107                return componentsschemas_peer_metadata_type_0
108            except:  # noqa: E722
109                pass
110            return cast(Union["PeerMetadata", None], data)
111
112        metadata = _parse_metadata(d.pop("metadata"))
113
114        status = PeerStatus(d.pop("status"))
115
116        def _parse_subscribe(data: object) -> Union["SubscribeOptions", None]:
117            if data is None:
118                return data
119            try:
120                if not isinstance(data, dict):
121                    raise TypeError()
122                componentsschemas_subscribe_options_type_0 = SubscribeOptions.from_dict(
123                    data
124                )
125
126                return componentsschemas_subscribe_options_type_0
127            except:  # noqa: E722
128                pass
129            return cast(Union["SubscribeOptions", None], data)
130
131        subscribe = _parse_subscribe(d.pop("subscribe"))
132
133        tracks = []
134        _tracks = d.pop("tracks")
135        for tracks_item_data in _tracks:
136            tracks_item = Track.from_dict(tracks_item_data)
137
138            tracks.append(tracks_item)
139
140        type_ = PeerType(d.pop("type"))
141
142        peer = cls(
143            id=id,
144            metadata=metadata,
145            status=status,
146            subscribe=subscribe,
147            tracks=tracks,
148            type_=type_,
149        )
150
151        peer.additional_properties = d
152        return peer
153
154    @property
155    def additional_keys(self) -> list[str]:
156        return list(self.additional_properties.keys())
157
158    def __getitem__(self, key: str) -> Any:
159        return self.additional_properties[key]
160
161    def __setitem__(self, key: str, value: Any) -> None:
162        self.additional_properties[key] = value
163
164    def __delitem__(self, key: str) -> None:
165        del self.additional_properties[key]
166
167    def __contains__(self, key: str) -> bool:
168        return key in self.additional_properties

Describes peer status

Attributes: id (str): Assigned peer id Example: peer-1. metadata (Union['PeerMetadata', None]): Custom metadata set by the peer Example: {'name': 'FishjamUser'}. status (PeerStatus): Informs about the peer status Example: disconnected. subscribe (Union['SubscribeOptions', None]): Configuration of server-side subscriptions to the peer's tracks Example: {'audioFormat': 'pcm16'}. tracks (list['Track']): List of all peer's tracks type_ (PeerType): Peer type Example: webrtc.

Peer( id: str, metadata: Optional[PeerMetadata], status: fishjam._openapi_client.models.peer_status.PeerStatus, subscribe: Optional[fishjam._openapi_client.models.subscribe_options.SubscribeOptions], tracks: list[fishjam._openapi_client.models.track.Track], type_: fishjam._openapi_client.models.peer_type.PeerType)
def __init__(self, id, metadata, status, subscribe, tracks, type_):
    """Method generated by attrs for class Peer."""
    # Plain field assignments, in declaration order.
    (
        self.id,
        self.metadata,
        self.status,
        self.subscribe,
        self.tracks,
        self.type_,
    ) = (id, metadata, status, subscribe, tracks, type_)
    # Not an __init__ argument: the value comes from the field's factory.
    self.additional_properties = __attr_factory_additional_properties()

Method generated by attrs for class Peer.

id: str
metadata: Optional[PeerMetadata]
status: fishjam._openapi_client.models.peer_status.PeerStatus
subscribe: Optional[fishjam._openapi_client.models.subscribe_options.SubscribeOptions]
tracks: list[fishjam._openapi_client.models.track.Track]
type_: fishjam._openapi_client.models.peer_type.PeerType
additional_properties: dict[str, typing.Any]
def to_dict(self) -> dict[str, typing.Any]:
48    def to_dict(self) -> dict[str, Any]:
49        from ..models.peer_metadata import PeerMetadata
50        from ..models.subscribe_options import SubscribeOptions
51
52        id = self.id
53
54        metadata: Union[None, dict[str, Any]]
55        if isinstance(self.metadata, PeerMetadata):
56            metadata = self.metadata.to_dict()
57        else:
58            metadata = self.metadata
59
60        status = self.status.value
61
62        subscribe: Union[None, dict[str, Any]]
63        if isinstance(self.subscribe, SubscribeOptions):
64            subscribe = self.subscribe.to_dict()
65        else:
66            subscribe = self.subscribe
67
68        tracks = []
69        for tracks_item_data in self.tracks:
70            tracks_item = tracks_item_data.to_dict()
71            tracks.append(tracks_item)
72
73        type_ = self.type_.value
74
75        field_dict: dict[str, Any] = {}
76        field_dict.update(self.additional_properties)
77        field_dict.update(
78            {
79                "id": id,
80                "metadata": metadata,
81                "status": status,
82                "subscribe": subscribe,
83                "tracks": tracks,
84                "type": type_,
85            }
86        )
87
88        return field_dict
@classmethod
def from_dict(cls: type[~T], src_dict: Mapping[str, typing.Any]) -> ~T:
 90    @classmethod
 91    def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T:
 92        from ..models.peer_metadata import PeerMetadata
 93        from ..models.subscribe_options import SubscribeOptions
 94        from ..models.track import Track
 95
 96        d = dict(src_dict)
 97        id = d.pop("id")
 98
 99        def _parse_metadata(data: object) -> Union["PeerMetadata", None]:
100            if data is None:
101                return data
102            try:
103                if not isinstance(data, dict):
104                    raise TypeError()
105                componentsschemas_peer_metadata_type_0 = PeerMetadata.from_dict(data)
106
107                return componentsschemas_peer_metadata_type_0
108            except:  # noqa: E722
109                pass
110            return cast(Union["PeerMetadata", None], data)
111
112        metadata = _parse_metadata(d.pop("metadata"))
113
114        status = PeerStatus(d.pop("status"))
115
116        def _parse_subscribe(data: object) -> Union["SubscribeOptions", None]:
117            if data is None:
118                return data
119            try:
120                if not isinstance(data, dict):
121                    raise TypeError()
122                componentsschemas_subscribe_options_type_0 = SubscribeOptions.from_dict(
123                    data
124                )
125
126                return componentsschemas_subscribe_options_type_0
127            except:  # noqa: E722
128                pass
129            return cast(Union["SubscribeOptions", None], data)
130
131        subscribe = _parse_subscribe(d.pop("subscribe"))
132
133        tracks = []
134        _tracks = d.pop("tracks")
135        for tracks_item_data in _tracks:
136            tracks_item = Track.from_dict(tracks_item_data)
137
138            tracks.append(tracks_item)
139
140        type_ = PeerType(d.pop("type"))
141
142        peer = cls(
143            id=id,
144            metadata=metadata,
145            status=status,
146            subscribe=subscribe,
147            tracks=tracks,
148            type_=type_,
149        )
150
151        peer.additional_properties = d
152        return peer
additional_keys: list[str]
154    @property
155    def additional_keys(self) -> list[str]:
156        return list(self.additional_properties.keys())