Fishjam Python Server SDK
 1from .agent import (
 2    Agent,
 3    AgentSession,
 4    IncomingTrackData,
 5    OutgoingAudioTrackOptions,
 6    OutgoingTrack,
 7)
 8from .errors import AgentAuthError, AgentError
 9
10__all__ = [
11    "Agent",
12    "AgentError",
13    "AgentSession",
14    "AgentAuthError",
15    "IncomingTrackData",
16    "OutgoingTrack",
17    "OutgoingAudioTrackOptions",
18]
class Agent:
156class Agent:
157    """
158    Allows for connecting to a Fishjam room as an agent peer.
159    Provides callbacks for receiving audio.
160    """
161
162    def __init__(self, id: str, room_id: str, token: str, fishjam_url: str):
163        """
164        Create Agent instance, providing the fishjam id and management token.
165
166        This constructor should not be called directly.
167        Instead, you should call :func:`fishjam.FishjamClient.create_agent`.
168        """
169
170        self.id = id
171        self.room_id = room_id
172
173        self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws")
174        self._token = token
175
176    @asynccontextmanager
177    async def connect(self):
178        """
179        Connect the agent to Fishjam to start receiving messages.
180
181        Incoming messages from Fishjam will be routed to handlers
182        defined with :func:`on_track_data`.
183
184        :raises AgentAuthError: authentication failed
185        """
186        async with client.connect(self._socket_url) as websocket:
187            await self._authenticate(websocket)
188            yield AgentSession(self, websocket)
189
190    async def _authenticate(self, websocket: ClientConnection):
191        req = AgentRequest(auth_request=AgentRequestAuthRequest(token=self._token))
192        try:
193            await websocket.send(bytes(req))
194            # Fishjam will close the socket if auth fails and send a response on success
195            await websocket.recv(decode=False)
196        except ConnectionClosed:
197            raise AgentAuthError(websocket.close_reason or "")

Allows for connecting to a Fishjam room as an agent peer. Provides callbacks for receiving audio.

Agent(id: str, room_id: str, token: str, fishjam_url: str)
162    def __init__(self, id: str, room_id: str, token: str, fishjam_url: str):
163        """
164        Create Agent instance, providing the fishjam id and management token.
165
166        This constructor should not be called directly.
167        Instead, you should call :func:`fishjam.FishjamClient.create_agent`.
168        """
169
170        self.id = id
171        self.room_id = room_id
172
173        self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws")
174        self._token = token

Create Agent instance, providing the fishjam id and management token.

This constructor should not be called directly. Instead, you should call fishjam.FishjamClient.create_agent().

id
room_id
@asynccontextmanager
async def connect(self):
176    @asynccontextmanager
177    async def connect(self):
178        """
179        Connect the agent to Fishjam to start receiving messages.
180
181        Incoming messages from Fishjam will be routed to handlers
182        defined with :func:`on_track_data`.
183
184        :raises AgentAuthError: authentication failed
185        """
186        async with client.connect(self._socket_url) as websocket:
187            await self._authenticate(websocket)
188            yield AgentSession(self, websocket)

Connect the agent to Fishjam to start receiving messages.

Incoming messages from Fishjam will be routed to handlers defined with on_track_data().

Raises
  • AgentAuthError: authentication failed
class AgentError(builtins.Exception):
2class AgentError(Exception):
3    """Base exception class for all agent exceptions"""

Base exception class for all agent exceptions

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
add_note
args
class AgentSession:
107class AgentSession:
108    def __init__(self, agent: Agent, websocket: ClientConnection):
109        self.agent = agent
110
111        self._ws = websocket
112        self._closed = False
113
114    async def receive(self) -> AsyncIterator[IncomingAgentMessage]:
115        """
116        Returns an infinite async iterator over the incoming messages from Fishjam to
117        the agent.
118        """
119        while message := await self._ws.recv(decode=False):
120            parsed = AgentResponse().parse(message)
121            _, msg = betterproto.which_one_of(parsed, "content")
122            match msg:
123                case IncomingTrackData() as content:
124                    yield content
125
126    async def add_track(self, options: OutgoingAudioTrackOptions):
127        """
128        Adds a track to the connected agent, with the specified options and metadata.
129
130        Returns an instance of :class:`OutgoingTrack`, which can be used to send data
131        over the added track.
132        """
133        track_id = uuid.uuid4().hex
134        metadata_json = json.dumps(options.metadata)
135        message = AgentRequest(
136            add_track=AgentRequestAddTrack(
137                track=Track(
138                    id=track_id,
139                    type=TrackType.TRACK_TYPE_AUDIO,
140                    metadata=metadata_json,
141                ),
142                codec_params=AgentRequestAddTrackCodecParameters(
143                    encoding=options.encoding,
144                    sample_rate=options.sample_rate,
145                    channels=options.channels,
146                ),
147            )
148        )
149        await self._send(message)
150        return OutgoingTrack(id=track_id, session=self, options=options)
151
152    async def _send(self, message: AgentRequest):
153        await self._ws.send(bytes(message), text=False)
AgentSession( agent: Agent, websocket: websockets.asyncio.client.ClientConnection)
108    def __init__(self, agent: Agent, websocket: ClientConnection):
109        self.agent = agent
110
111        self._ws = websocket
112        self._closed = False
agent
async def receive( self) -> AsyncIterator[fishjam.events._protos.fishjam.AgentResponseTrackData]:
114    async def receive(self) -> AsyncIterator[IncomingAgentMessage]:
115        """
116        Returns an infinite async iterator over the incoming messages from Fishjam to
117        the agent.
118        """
119        while message := await self._ws.recv(decode=False):
120            parsed = AgentResponse().parse(message)
121            _, msg = betterproto.which_one_of(parsed, "content")
122            match msg:
123                case IncomingTrackData() as content:
124                    yield content

Returns an infinite async iterator over the incoming messages from Fishjam to the agent.

async def add_track(self, options: OutgoingAudioTrackOptions):
126    async def add_track(self, options: OutgoingAudioTrackOptions):
127        """
128        Adds a track to the connected agent, with the specified options and metadata.
129
130        Returns an instance of :class:`OutgoingTrack`, which can be used to send data
131        over the added track.
132        """
133        track_id = uuid.uuid4().hex
134        metadata_json = json.dumps(options.metadata)
135        message = AgentRequest(
136            add_track=AgentRequestAddTrack(
137                track=Track(
138                    id=track_id,
139                    type=TrackType.TRACK_TYPE_AUDIO,
140                    metadata=metadata_json,
141                ),
142                codec_params=AgentRequestAddTrackCodecParameters(
143                    encoding=options.encoding,
144                    sample_rate=options.sample_rate,
145                    channels=options.channels,
146                ),
147            )
148        )
149        await self._send(message)
150        return OutgoingTrack(id=track_id, session=self, options=options)

Adds a track to the connected agent, with the specified options and metadata.

Returns an instance of OutgoingTrack, which can be used to send data over the added track.

class AgentAuthError(fishjam.agent.AgentError):
 6class AgentAuthError(AgentError):
 7    """Agent failed to authenticate properly"""
 8
 9    def __init__(self, reason: str):
10        self.reason = reason
11
12    def __str__(self) -> str:
13        return f"agent failed to authenticate: {self.reason}"

Agent failed to authenticate properly

AgentAuthError(reason: str)
 9    def __init__(self, reason: str):
10        self.reason = reason
reason
Inherited Members
builtins.BaseException
with_traceback
add_note
args
IncomingTrackData = <class 'fishjam.events._protos.fishjam.AgentResponseTrackData'>
@dataclass(frozen=True)
class OutgoingTrack:
 60@dataclass(frozen=True)
 61class OutgoingTrack:
 62    """
 63    Represents an outgoing track of an agent connected to Fishjam,
 64    created by :func:`Agent.add_track`.
 65    """
 66
 67    id: str
 68    """The global identifier of the track."""
 69    session: AgentSession
 70    """The agent the track belongs to."""
 71    options: OutgoingAudioTrackOptions
 72    """The parameters used to create the track."""
 73
 74    async def send_chunk(self, data: bytes):
 75        """
 76        Send a chunk of audio to Fishjam on this track.
 77
 78        Peers connected to the room of the agent will receive this data.
 79        """
 80        message = AgentRequest(
 81            track_data=OutgoingTrackData(
 82                track_id=self.id,
 83                data=data,
 84            )
 85        )
 86
 87        await self.session._send(message)
 88
 89    async def interrupt(self):
 90        """
 91        Interrupt current track.
 92
 93        Any audio that has been sent, but not played
 94        will be cleared and be prevented from playing.
 95
 96        Audio sent after the interrupt will be played normally.
 97        """
 98        message = AgentRequest(
 99            interrupt_track=AgentRequestInterruptTrack(
100                track_id=self.id,
101            )
102        )
103
104        await self.session._send(message)

Represents an outgoing track of an agent connected to Fishjam, created by Agent.add_track().

OutgoingTrack( id: str, session: AgentSession, options: OutgoingAudioTrackOptions)
id: str

The global identifier of the track.

session: AgentSession

The agent the track belongs to.

The parameters used to create the track.

async def send_chunk(self, data: bytes):
74    async def send_chunk(self, data: bytes):
75        """
76        Send a chunk of audio to Fishjam on this track.
77
78        Peers connected to the room of the agent will receive this data.
79        """
80        message = AgentRequest(
81            track_data=OutgoingTrackData(
82                track_id=self.id,
83                data=data,
84            )
85        )
86
87        await self.session._send(message)

Send a chunk of audio to Fishjam on this track.

Peers connected to the room of the agent will receive this data.

async def interrupt(self):
 89    async def interrupt(self):
 90        """
 91        Interrupt current track.
 92
 93        Any audio that has been sent, but not played
 94        will be cleared and be prevented from playing.
 95
 96        Audio sent after the interrupt will be played normally.
 97        """
 98        message = AgentRequest(
 99            interrupt_track=AgentRequestInterruptTrack(
100                track_id=self.id,
101            )
102        )
103
104        await self.session._send(message)

Interrupt current track.

Any audio that has been sent, but not played will be cleared and be prevented from playing.

Audio sent after the interrupt will be played normally.

@dataclass
class OutgoingAudioTrackOptions:
30@dataclass
31class OutgoingAudioTrackOptions:
32    """Parameters of an outgoing audio track."""
33
34    encoding: TrackEncoding = TrackEncoding.TRACK_ENCODING_UNSPECIFIED
35    """
36    The encoding of the audio source.
37    Defaults to raw 16-bit PCM.
38    """
39
40    sample_rate: Literal[16000, 24000] = 16000
41    """
42    The sample rate of the audio source.
43    Defaults to 16000.
44    """
45
46    channels: Literal[1, 2] = 1
47    """
48    The number of channels in the audio source.
49    Supported values are 1 (mono) and 2 (stereo).
50    Defaults to 1 (mono)
51    """
52
53    metadata: dict[str, Any] | None = None
54    """
55    Custom metadata for the track.
56    Must be JSON-encodable.
57    """

Parameters of an outgoing audio track.

OutgoingAudioTrackOptions( encoding: fishjam.events.TrackEncoding = <TrackEncoding.TRACK_ENCODING_UNSPECIFIED: 0>, sample_rate: Literal[16000, 24000] = 16000, channels: Literal[1, 2] = 1, metadata: dict[str, typing.Any] | None = None)
encoding: fishjam.events.TrackEncoding = <TrackEncoding.TRACK_ENCODING_UNSPECIFIED: 0>

The encoding of the audio source. Defaults to raw 16-bit PCM.

sample_rate: Literal[16000, 24000] = 16000

The sample rate of the audio source. Defaults to 16000.

channels: Literal[1, 2] = 1

The number of channels in the audio source. Supported values are 1 (mono) and 2 (stereo). Defaults to 1 (mono)

metadata: dict[str, typing.Any] | None = None

Custom metadata for the track. Must be JSON-encodable.