Fishjam Python Server SDK
 1from .agent import Agent, AgentResponseTrackData, TrackDataHandler
 2from .errors import AgentAuthError, AgentError
 3
 4__all__ = [
 5    "Agent",
 6    "AgentError",
 7    "AgentAuthError",
 8    "TrackDataHandler",
 9    "AgentResponseTrackData",
10]
class Agent:
 33class Agent:
 34    """
 35    Allows for connecting to a Fishjam room as an agent peer.
 36    Provides callbacks for receiving audio.
 37    """
 38
 39    def __init__(self, id: str, token: str, fishjam_url: str):
 40        """
 41        Create FishjamAgent instance, providing the fishjam id and management token.
 42        """
 43
 44        self.id = id
 45        self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws")
 46        self._token = token
 47        self._msg_loop: asyncio.Task[None] | None = None
 48        self._end_event = asyncio.Event()
 49
 50        @functools.singledispatch
 51        def _message_handler(content: Any) -> None:
 52            raise TypeError(f"Unexpected message of type #{type(content)}")
 53
 54        @_message_handler.register
 55        def _(_content: AgentResponseTrackData):
 56            return
 57
 58        self._dispatch_handler = _message_handler
 59
 60    def on_track_data(self, handler: TrackDataHandlerT) -> TrackDataHandlerT:
 61        """
 62        Decorator used for defining a handler for track data messages from Fishjam.
 63        """
 64        self._dispatch_handler.register(AgentResponseTrackData, handler)
 65        return handler
 66
 67    async def connect(self):
 68        """
 69        Connect the agent to Fishjam to start receiving messages.
 70
 71        Incoming messages from Fishjam will be routed to handlers
 72        defined with :func:`on_track_data`.
 73
 74        :raises AgentAuthError: authentication failed
 75        """
 76        await self.disconnect()
 77
 78        websocket = await client.connect(self._socket_url)
 79        await self._authenticate(websocket)
 80
 81        task = asyncio.create_task(self._recv_loop(websocket))
 82
 83        self._msg_loop = task
 84
 85    async def disconnect(self, code: CloseCode = CloseCode.NORMAL_CLOSURE):
 86        """
 87        Disconnect the agent from Fishjam.
 88
 89        Does nothing if already disconnected.
 90        """
 91        if (task := self._msg_loop) is None:
 92            return
 93
 94        event = self._end_event
 95
 96        self._end_event = asyncio.Event()
 97        self._msg_loop = None
 98
 99        task.add_done_callback(lambda _t: event.set())
100        if task.cancel(code):
101            await event.wait()
102
103    async def __aenter__(self):
104        await self.connect()
105        return self
106
107    async def __aexit__(
108        self,
109        exc_type: type[BaseException] | None,
110        exc_value: BaseException | None,
111        traceback: TracebackType | None,
112    ):
113        if exc_type is not None:
114            await self.disconnect(CloseCode.INTERNAL_ERROR)
115        else:
116            await self.disconnect()
117
118    async def _authenticate(self, websocket: ClientConnection):
119        req = AgentRequest(auth_request=AgentRequestAuthRequest(token=self._token))
120        try:
121            await websocket.send(bytes(req))
122            # Fishjam will close the socket if auth fails and send a response on success
123            await websocket.recv(decode=False)
124        except ConnectionClosed as e:
125            raise AgentAuthError(e.reason)
126
127    async def _recv_loop(self, websocket: ClientConnection):
128        close_code = CloseCode.NORMAL_CLOSURE
129        try:
130            while True:
131                message = await websocket.recv(decode=False)
132                message = AgentResponse().parse(message)
133
134                _which, content = betterproto.which_one_of(message, "content")
135                self._dispatch_handler(content)
136        except ConnectionClosed as e:
137            if not _close_ok(e):
138                close_code = CloseCode.INTERNAL_ERROR
139                raise
140        except asyncio.CancelledError as e:
141            # NOTE: e.args[0] is the close code supplied by disconnect()
142            # However cancellation can have other causes, which we treat as normal
143            with suppress(IndexError):
144                close_code = e.args[0]
145            raise
146        except Exception:
147            close_code = CloseCode.INTERNAL_ERROR
148            raise
149        finally:
150            await websocket.close(close_code)

Allows for connecting to a Fishjam room as an agent peer. Provides callbacks for receiving audio.

Agent(id: str, token: str, fishjam_url: str)
39    def __init__(self, id: str, token: str, fishjam_url: str):
40        """
41        Create FishjamAgent instance, providing the fishjam id and management token.
42        """
43
44        self.id = id
45        self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws")
46        self._token = token
47        self._msg_loop: asyncio.Task[None] | None = None
48        self._end_event = asyncio.Event()
49
50        @functools.singledispatch
51        def _message_handler(content: Any) -> None:
52            raise TypeError(f"Unexpected message of type #{type(content)}")
53
54        @_message_handler.register
55        def _(_content: AgentResponseTrackData):
56            return
57
58        self._dispatch_handler = _message_handler

Create FishjamAgent instance, providing the fishjam id and management token.

id
def on_track_data(self, handler: ~TrackDataHandlerT) -> ~TrackDataHandlerT:
60    def on_track_data(self, handler: TrackDataHandlerT) -> TrackDataHandlerT:
61        """
62        Decorator used for defining a handler for track data messages from Fishjam.
63        """
64        self._dispatch_handler.register(AgentResponseTrackData, handler)
65        return handler

Decorator used for defining a handler for track data messages from Fishjam.

async def connect(self):
67    async def connect(self):
68        """
69        Connect the agent to Fishjam to start receiving messages.
70
71        Incoming messages from Fishjam will be routed to handlers
72        defined with :func:`on_track_data`.
73
74        :raises AgentAuthError: authentication failed
75        """
76        await self.disconnect()
77
78        websocket = await client.connect(self._socket_url)
79        await self._authenticate(websocket)
80
81        task = asyncio.create_task(self._recv_loop(websocket))
82
83        self._msg_loop = task

Connect the agent to Fishjam to start receiving messages.

Incoming messages from Fishjam will be routed to handlers defined with on_track_data().

Raises
  • AgentAuthError: authentication failed
async def disconnect( self, code: websockets.frames.CloseCode = <CloseCode.NORMAL_CLOSURE: 1000>):
 85    async def disconnect(self, code: CloseCode = CloseCode.NORMAL_CLOSURE):
 86        """
 87        Disconnect the agent from Fishjam.
 88
 89        Does nothing if already disconnected.
 90        """
 91        if (task := self._msg_loop) is None:
 92            return
 93
 94        event = self._end_event
 95
 96        self._end_event = asyncio.Event()
 97        self._msg_loop = None
 98
 99        task.add_done_callback(lambda _t: event.set())
100        if task.cancel(code):
101            await event.wait()

Disconnect the agent from Fishjam.

Does nothing if already disconnected.

class AgentError(builtins.Exception):
2class AgentError(Exception):
3    """Base exception class for all agent exceptions"""

Base exception class for all agent exceptions

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
add_note
args
class AgentAuthError(fishjam.agent.AgentError):
 6class AgentAuthError(AgentError):
 7    """Agent failed to authenticate properly"""
 8
 9    def __init__(self, reason: str):
10        self.reason = reason
11
12    def __str__(self) -> str:
13        return f"agent failed to authenticate: {self.reason}"

Agent failed to authenticate properly

AgentAuthError(reason: str)
 9    def __init__(self, reason: str):
10        self.reason = reason
reason
Inherited Members
builtins.BaseException
with_traceback
add_note
args
TrackDataHandler = typing.Callable[[AgentResponseTrackData], NoneType]
@dataclass(eq=False, repr=False)
class AgentResponseTrackData(betterproto.Message):
103@dataclass(eq=False, repr=False)
104class AgentResponseTrackData(betterproto.Message):
105    """Notification containing a chunk of a track's data stream"""
106
107    peer_id: str = betterproto.string_field(1)
108    track: "notifications.Track" = betterproto.message_field(2)
109    data: bytes = betterproto.bytes_field(3)

Notification containing a chunk of a track's data stream

AgentResponseTrackData( peer_id: str = <object object>, track: fishjam.events.Track = <object object>, data: bytes = <object object>)
peer_id: str = <object object>
track: fishjam.events.Track = <object object>
data: bytes = <object object>
Inherited Members
betterproto.Message
SerializeToString
parse
FromString
to_dict
from_dict
to_json
from_json
to_pydict
from_pydict
is_set