Fishjam Python Server SDK
 1from .agent import (
 2    Agent,
 3    AgentSession,
 4    IncomingTrackData,
 5    OutgoingAudioTrackOptions,
 6    OutgoingTrack,
 7)
 8from .errors import AgentAuthError, AgentError
 9
10__all__ = [
11    "Agent",
12    "AgentError",
13    "AgentSession",
14    "AgentAuthError",
15    "IncomingTrackData",
16    "OutgoingTrack",
17    "OutgoingAudioTrackOptions",
18]
class Agent:
163class Agent:
164    """
165    Allows for connecting to a Fishjam room as an agent peer.
166    Provides callbacks for receiving audio.
167    """
168
169    def __init__(self, id: str, room_id: str, token: str, fishjam_url: str):
170        """
171        Create Agent instance, providing the fishjam id and management token.
172
173        This constructor should not be called directly.
174        Instead, you should call :func:`fishjam.FishjamClient.create_agent`.
175        """
176
177        self.id = id
178        self.room_id = room_id
179
180        self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws")
181        self._token = token
182
183    @asynccontextmanager
184    async def connect(self):
185        """
186        Connect the agent to Fishjam to start receiving messages.
187
188        Incoming messages from Fishjam will be routed to handlers
189        defined with :func:`on_track_data`.
190
191        :raises AgentAuthError: authentication failed
192        """
193        async with client.connect(self._socket_url) as websocket:
194            await self._authenticate(websocket)
195            yield AgentSession(self, websocket)
196
197    async def _authenticate(self, websocket: ClientConnection):
198        req = AgentRequest(auth_request=AgentRequestAuthRequest(token=self._token))
199        try:
200            await websocket.send(bytes(req))
201            # Fishjam will close the socket if auth fails and send a response on success
202            await websocket.recv(decode=False)
203        except ConnectionClosed:
204            raise AgentAuthError(websocket.close_reason or "")

Allows for connecting to a Fishjam room as an agent peer. Provides callbacks for receiving audio.

Agent(id: str, room_id: str, token: str, fishjam_url: str)
169    def __init__(self, id: str, room_id: str, token: str, fishjam_url: str):
170        """
171        Create Agent instance, providing the fishjam id and management token.
172
173        This constructor should not be called directly.
174        Instead, you should call :func:`fishjam.FishjamClient.create_agent`.
175        """
176
177        self.id = id
178        self.room_id = room_id
179
180        self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws")
181        self._token = token

Create Agent instance, providing the fishjam id and management token.

This constructor should not be called directly. Instead, you should call fishjam.FishjamClient.create_agent().

id
room_id
@asynccontextmanager
async def connect(self):
183    @asynccontextmanager
184    async def connect(self):
185        """
186        Connect the agent to Fishjam to start receiving messages.
187
188        Incoming messages from Fishjam will be routed to handlers
189        defined with :func:`on_track_data`.
190
191        :raises AgentAuthError: authentication failed
192        """
193        async with client.connect(self._socket_url) as websocket:
194            await self._authenticate(websocket)
195            yield AgentSession(self, websocket)

Connect the agent to Fishjam to start receiving messages.

Incoming messages from Fishjam will be routed to handlers defined with on_track_data().

Raises
  • AgentAuthError: authentication failed
class AgentError(builtins.Exception):
2class AgentError(Exception):
3    """Base exception class for all agent exceptions"""

Base exception class for all agent exceptions

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
add_note
args
class AgentSession:
107class AgentSession:
108    def __init__(self, agent: Agent, websocket: ClientConnection):
109        self.agent = agent
110
111        self._ws = websocket
112        self._closed = False
113
114    async def receive(self) -> AsyncIterator[IncomingAgentMessage]:
115        """
116        Returns an infinite async iterator over the incoming messages from Fishjam to
117        the agent.
118        """
119        while message := await self._ws.recv(decode=False):
120            parsed = AgentResponse().parse(message)
121            _, msg = betterproto.which_one_of(parsed, "content")
122            match msg:
123                case IncomingTrackData() as content:
124                    yield content
125
126    async def add_track(self, options: OutgoingAudioTrackOptions):
127        """
128        Adds a track to the connected agent, with the specified options and metadata.
129
130        Returns an instance of :class:`OutgoingTrack`, which can be used to send data
131        over the added track.
132        """
133        track_id = uuid.uuid4().hex
134        metadata_json = json.dumps(options.metadata)
135        message = AgentRequest(
136            add_track=AgentRequestAddTrack(
137                track=Track(
138                    id=track_id,
139                    type=TrackType.TRACK_TYPE_AUDIO,
140                    metadata=metadata_json,
141                ),
142                codec_params=AgentRequestAddTrackCodecParameters(
143                    encoding=options.encoding,
144                    sample_rate=options.sample_rate,
145                    channels=options.channels,
146                ),
147            )
148        )
149        await self._send(message)
150        return OutgoingTrack(id=track_id, session=self, options=options)
151
152    async def _send(self, message: AgentRequest):
153        await self._ws.send(bytes(message), text=False)
154
155    async def disconnect(self):
156        """
157        Ends the agent session by closing the websocket connection.
158        Useful when you don't use the context manager to obtain the session.
159        """
160        await self._ws.close()
AgentSession( agent: Agent, websocket: websockets.asyncio.client.ClientConnection)
108    def __init__(self, agent: Agent, websocket: ClientConnection):
109        self.agent = agent
110
111        self._ws = websocket
112        self._closed = False
agent
async def receive( self) -> AsyncIterator[fishjam.events._protos.fishjam.AgentResponseTrackData]:
114    async def receive(self) -> AsyncIterator[IncomingAgentMessage]:
115        """
116        Returns an infinite async iterator over the incoming messages from Fishjam to
117        the agent.
118        """
119        while message := await self._ws.recv(decode=False):
120            parsed = AgentResponse().parse(message)
121            _, msg = betterproto.which_one_of(parsed, "content")
122            match msg:
123                case IncomingTrackData() as content:
124                    yield content

Returns an infinite async iterator over the incoming messages from Fishjam to the agent.

async def add_track(self, options: OutgoingAudioTrackOptions):
126    async def add_track(self, options: OutgoingAudioTrackOptions):
127        """
128        Adds a track to the connected agent, with the specified options and metadata.
129
130        Returns an instance of :class:`OutgoingTrack`, which can be used to send data
131        over the added track.
132        """
133        track_id = uuid.uuid4().hex
134        metadata_json = json.dumps(options.metadata)
135        message = AgentRequest(
136            add_track=AgentRequestAddTrack(
137                track=Track(
138                    id=track_id,
139                    type=TrackType.TRACK_TYPE_AUDIO,
140                    metadata=metadata_json,
141                ),
142                codec_params=AgentRequestAddTrackCodecParameters(
143                    encoding=options.encoding,
144                    sample_rate=options.sample_rate,
145                    channels=options.channels,
146                ),
147            )
148        )
149        await self._send(message)
150        return OutgoingTrack(id=track_id, session=self, options=options)

Adds a track to the connected agent, with the specified options and metadata.

Returns an instance of OutgoingTrack, which can be used to send data over the added track.

async def disconnect(self):
155    async def disconnect(self):
156        """
157        Ends the agent session by closing the websocket connection.
158        Useful when you don't use the context manager to obtain the session.
159        """
160        await self._ws.close()

Ends the agent session by closing the websocket connection. Useful when you don't use the context manager to obtain the session.

class AgentAuthError(fishjam.agent.AgentError):
 6class AgentAuthError(AgentError):
 7    """Agent failed to authenticate properly"""
 8
 9    def __init__(self, reason: str):
10        self.reason = reason
11
12    def __str__(self) -> str:
13        return f"agent failed to authenticate: {self.reason}"

Agent failed to authenticate properly

AgentAuthError(reason: str)
 9    def __init__(self, reason: str):
10        self.reason = reason
reason
Inherited Members
builtins.BaseException
with_traceback
add_note
args
IncomingTrackData = <class 'fishjam.events._protos.fishjam.AgentResponseTrackData'>
@dataclass(frozen=True)
class OutgoingTrack:
 60@dataclass(frozen=True)
 61class OutgoingTrack:
 62    """
 63    Represents an outgoing track of an agent connected to Fishjam,
 64    created by :func:`Agent.add_track`.
 65    """
 66
 67    id: str
 68    """The global identifier of the track."""
 69    session: AgentSession
 70    """The agent the track belongs to."""
 71    options: OutgoingAudioTrackOptions
 72    """The parameters used to create the track."""
 73
 74    async def send_chunk(self, data: bytes):
 75        """
 76        Send a chunk of audio to Fishjam on this track.
 77
 78        Peers connected to the room of the agent will receive this data.
 79        """
 80        message = AgentRequest(
 81            track_data=OutgoingTrackData(
 82                track_id=self.id,
 83                data=data,
 84            )
 85        )
 86
 87        await self.session._send(message)
 88
 89    async def interrupt(self):
 90        """
 91        Interrupt current track.
 92
 93        Any audio that has been sent, but not played
 94        will be cleared and be prevented from playing.
 95
 96        Audio sent after the interrupt will be played normally.
 97        """
 98        message = AgentRequest(
 99            interrupt_track=AgentRequestInterruptTrack(
100                track_id=self.id,
101            )
102        )
103
104        await self.session._send(message)

Represents an outgoing track of an agent connected to Fishjam, created by Agent.add_track().

OutgoingTrack( id: str, session: AgentSession, options: OutgoingAudioTrackOptions)
id: str

The global identifier of the track.

session: AgentSession

The agent the track belongs to.

The parameters used to create the track.

async def send_chunk(self, data: bytes):
74    async def send_chunk(self, data: bytes):
75        """
76        Send a chunk of audio to Fishjam on this track.
77
78        Peers connected to the room of the agent will receive this data.
79        """
80        message = AgentRequest(
81            track_data=OutgoingTrackData(
82                track_id=self.id,
83                data=data,
84            )
85        )
86
87        await self.session._send(message)

Send a chunk of audio to Fishjam on this track.

Peers connected to the room of the agent will receive this data.

async def interrupt(self):
 89    async def interrupt(self):
 90        """
 91        Interrupt current track.
 92
 93        Any audio that has been sent, but not played
 94        will be cleared and be prevented from playing.
 95
 96        Audio sent after the interrupt will be played normally.
 97        """
 98        message = AgentRequest(
 99            interrupt_track=AgentRequestInterruptTrack(
100                track_id=self.id,
101            )
102        )
103
104        await self.session._send(message)

Interrupt current track.

Any audio that has been sent, but not played will be cleared and be prevented from playing.

Audio sent after the interrupt will be played normally.

@dataclass
class OutgoingAudioTrackOptions:
30@dataclass
31class OutgoingAudioTrackOptions:
32    """Parameters of an outgoing audio track."""
33
34    encoding: TrackEncoding = TrackEncoding.TRACK_ENCODING_UNSPECIFIED
35    """
36    The encoding of the audio source.
37    Defaults to raw 16-bit PCM.
38    """
39
40    sample_rate: Literal[16000, 24000] = 16000
41    """
42    The sample rate of the audio source.
43    Defaults to 16000.
44    """
45
46    channels: Literal[1, 2] = 1
47    """
48    The number of channels in the audio source.
49    Supported values are 1 (mono) and 2 (stereo).
50    Defaults to 1 (mono)
51    """
52
53    metadata: dict[str, Any] | None = None
54    """
55    Custom metadata for the track.
56    Must be JSON-encodable.
57    """

Parameters of an outgoing audio track.

OutgoingAudioTrackOptions( encoding: fishjam.events.TrackEncoding = <TrackEncoding.TRACK_ENCODING_UNSPECIFIED: 0>, sample_rate: Literal[16000, 24000] = 16000, channels: Literal[1, 2] = 1, metadata: dict[str, typing.Any] | None = None)
encoding: fishjam.events.TrackEncoding = <TrackEncoding.TRACK_ENCODING_UNSPECIFIED: 0>

The encoding of the audio source. Defaults to raw 16-bit PCM.

sample_rate: Literal[16000, 24000] = 16000

The sample rate of the audio source. Defaults to 16000.

channels: Literal[1, 2] = 1

The number of channels in the audio source. Supported values are 1 (mono) and 2 (stereo). Defaults to 1 (mono)

metadata: dict[str, typing.Any] | None = None

Custom metadata for the track. Must be JSON-encodable.