Fishjam Python Server SDK
 1from .agent import (
 2    Agent,
 3    AgentSession,
 4    IncomingTrackData,
 5    OutgoingAudioTrackOptions,
 6    OutgoingTrack,
 7)
 8from .errors import AgentAuthError, AgentError
 9
# Names re-exported as the public API of this package; every entry is one of
# the names imported above.
__all__ = [
    "Agent",
    "AgentError",
    "AgentSession",
    "AgentAuthError",
    "IncomingTrackData",
    "OutgoingTrack",
    "OutgoingAudioTrackOptions",
]
class Agent:
class Agent:
    """Allows for connecting to a Fishjam room as an agent peer.

    An ``Agent`` only stores the connection details. Call :func:`connect` to
    open a session; incoming track data is then read from the yielded
    :class:`AgentSession`.

    Attributes:
        id: The unique identifier of the agent.
        room_id: The ID of the room the agent is connecting to.
    """

    def __init__(self, id: str, room_id: str, token: str, fishjam_url: str):
        """Creates an Agent instance.

        This constructor should not be called directly.
        Instead, you should call :func:`fishjam.FishjamClient.create_agent`.

        Args:
            id: The unique identifier for the agent.
            room_id: The ID of the room the agent will join.
            token: The authentication token for the agent.
            fishjam_url: The URL of the Fishjam instance. An ``http``/``https``
                scheme is translated to ``ws``/``wss``.
        """
        self.id = id
        self.room_id = room_id

        # Rewrite only the leading scheme (http -> ws, https -> wss). A blanket
        # str.replace would also mangle "http" inside the host name or path.
        # The trailing slash is dropped so the joined path has no "//".
        base_url = fishjam_url.rstrip("/")
        if base_url.startswith("http"):
            base_url = "ws" + base_url.removeprefix("http")
        self._socket_url = f"{base_url}/socket/agent/websocket"
        self._token = token

    @asynccontextmanager
    async def connect(self):
        """Connects the agent to Fishjam and yields an active session.

        The websocket is opened, the agent authenticates on it, and the
        session is yielded. The websocket context is exited when the
        ``async with`` body finishes.

        Yields:
            AgentSession: An active session for sending media and handling events.

        Raises:
            AgentAuthError: If authentication with the Fishjam server fails.
        """
        async with client.connect(self._socket_url) as websocket:
            await self._authenticate(websocket)
            yield AgentSession(self, websocket)

    async def _authenticate(self, websocket: ClientConnection):
        """Sends the auth request and waits for Fishjam's reply.

        Args:
            websocket: The freshly opened connection to authenticate on.

        Raises:
            AgentAuthError: If the connection is closed while authenticating;
                the socket's close reason (or an empty string) is the reason.
        """
        req = AgentRequest(auth_request=AgentRequestAuthRequest(token=self._token))
        try:
            await websocket.send(bytes(req))
            # Fishjam will close the socket if auth fails and send a response on success
            await websocket.recv(decode=False)
        except ConnectionClosed as err:
            # Chain explicitly so the low-level close stays visible as the cause.
            raise AgentAuthError(websocket.close_reason or "") from err

Allows for connecting to a Fishjam room as an agent peer.

Provides callbacks for receiving audio.

Attributes:

  • id: The unique identifier of the agent.
  • room_id: The ID of the room the agent is connecting to.
Agent(id: 'str', room_id: 'str', token: 'str', fishjam_url: 'str')
174    def __init__(self, id: str, room_id: str, token: str, fishjam_url: str):
175        """Creates an Agent instance.
176
177        This constructor should not be called directly.
178        Instead, you should call :func:`fishjam.FishjamClient.create_agent`.
179
180        Args:
181            id: The unique identifier for the agent.
182            room_id: The ID of the room the agent will join.
183            token: The authentication token for the agent.
184            fishjam_url: The URL of the Fishjam instance.
185        """
186        self.id = id
187        self.room_id = room_id
188
189        self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws")
190        self._token = token

Creates an Agent instance.

This constructor should not be called directly. Instead, you should call fishjam.FishjamClient.create_agent().

Args:

  • id: The unique identifier for the agent.
  • room_id: The ID of the room the agent will join.
  • token: The authentication token for the agent.
  • fishjam_url: The URL of the Fishjam instance.
id
room_id
@asynccontextmanager
async def connect(self):
192    @asynccontextmanager
193    async def connect(self):
194        """Connects the agent to Fishjam to start receiving messages.
195
196        Incoming messages from Fishjam will be routed to handlers
197        defined with :func:`on_track_data`.
198
199        Yields:
200            AgentSession: An active session for sending media and handling events.
201
202        Raises:
203            AgentAuthError: If authentication with the Fishjam server fails.
204        """
205        async with client.connect(self._socket_url) as websocket:
206            await self._authenticate(websocket)
207            yield AgentSession(self, websocket)

Connects the agent to Fishjam to start receiving messages.

Incoming messages from Fishjam are read from the yielded session with AgentSession.receive().

Yields:

  • AgentSession: An active session for sending media and handling events.

Raises:

  • AgentAuthError: If authentication with the Fishjam server fails.
class AgentError(builtins.Exception):
class AgentError(Exception):
    """Base exception class for all agent exceptions.

    Catching this type also catches the more specific agent errors that
    derive from it, such as ``AgentAuthError``.
    """

Base exception class for all agent exceptions.

Inherited Members
builtins.BaseException
BaseException
with_traceback
add_note
args
class AgentSession:
 97class AgentSession:
 98    """Represents an active connection session for an Agent."""
 99
100    def __init__(self, agent: Agent, websocket: ClientConnection):
101        """Initializes the AgentSession.
102
103        Args:
104            agent: The Agent instance owning this session.
105            websocket: The active websocket connection.
106        """
107        self.agent = agent
108
109        self._ws = websocket
110        self._closed = False
111
112    async def receive(self) -> AsyncIterator[IncomingAgentMessage]:
113        """Returns an async iterator over incoming messages from Fishjam.
114
115        Yields:
116            IncomingAgentMessage: The next message received from the server.
117        """
118        while message := await self._ws.recv(decode=False):
119            parsed = AgentResponse().parse(message)
120            _, msg = betterproto.which_one_of(parsed, "content")
121            match msg:
122                case IncomingTrackData() as content:
123                    yield content
124
125    async def add_track(self, options: OutgoingAudioTrackOptions) -> OutgoingTrack:
126        """Adds a track to the connected agent with the specified options.
127
128        Args:
129            options: Configuration options and metadata for the new track.
130
131        Returns:
132            OutgoingTrack: An object used to send data over the added track.
133        """
134        track_id = uuid.uuid4().hex
135        metadata_json = json.dumps(options.metadata)
136        message = AgentRequest(
137            add_track=AgentRequestAddTrack(
138                track=Track(
139                    id=track_id,
140                    type=TrackType.TRACK_TYPE_AUDIO,
141                    metadata=metadata_json,
142                ),
143                codec_params=AgentRequestAddTrackCodecParameters(
144                    encoding=options.encoding,
145                    sample_rate=options.sample_rate,
146                    channels=options.channels,
147                ),
148            )
149        )
150        await self._send(message)
151        return OutgoingTrack(id=track_id, session=self, options=options)
152
153    async def _send(self, message: AgentRequest):
154        await self._ws.send(bytes(message), text=False)
155
156    async def disconnect(self):
157        """Ends the agent session by closing the websocket connection.
158
159        Useful when you don't use the context manager to obtain the session.
160        """
161        await self._ws.close()

Represents an active connection session for an Agent.

AgentSession(agent: 'Agent', websocket: 'ClientConnection')
    def __init__(self, agent: Agent, websocket: ClientConnection):
        """Initializes the AgentSession.

        Args:
            agent: The Agent instance owning this session.
            websocket: The active websocket connection.
        """
        # Public back-reference to the owning agent.
        self.agent = agent

        # Connection object kept for the lifetime of the session.
        self._ws = websocket
        # Starts out False. NOTE(review): confirm where this flag is updated.
        self._closed = False

Initializes the AgentSession.

Args:

  • agent: The Agent instance owning this session.
  • websocket: The active websocket connection.
agent
async def receive(self) -> 'AsyncIterator[IncomingAgentMessage]':
112    async def receive(self) -> AsyncIterator[IncomingAgentMessage]:
113        """Returns an async iterator over incoming messages from Fishjam.
114
115        Yields:
116            IncomingAgentMessage: The next message received from the server.
117        """
118        while message := await self._ws.recv(decode=False):
119            parsed = AgentResponse().parse(message)
120            _, msg = betterproto.which_one_of(parsed, "content")
121            match msg:
122                case IncomingTrackData() as content:
123                    yield content

Returns an async iterator over incoming messages from Fishjam.

Yields:

  • IncomingAgentMessage: The next message received from the server.
async def add_track(self, options: 'OutgoingAudioTrackOptions') -> 'OutgoingTrack':
125    async def add_track(self, options: OutgoingAudioTrackOptions) -> OutgoingTrack:
126        """Adds a track to the connected agent with the specified options.
127
128        Args:
129            options: Configuration options and metadata for the new track.
130
131        Returns:
132            OutgoingTrack: An object used to send data over the added track.
133        """
134        track_id = uuid.uuid4().hex
135        metadata_json = json.dumps(options.metadata)
136        message = AgentRequest(
137            add_track=AgentRequestAddTrack(
138                track=Track(
139                    id=track_id,
140                    type=TrackType.TRACK_TYPE_AUDIO,
141                    metadata=metadata_json,
142                ),
143                codec_params=AgentRequestAddTrackCodecParameters(
144                    encoding=options.encoding,
145                    sample_rate=options.sample_rate,
146                    channels=options.channels,
147                ),
148            )
149        )
150        await self._send(message)
151        return OutgoingTrack(id=track_id, session=self, options=options)

Adds a track to the connected agent with the specified options.

Args:

  • options: Configuration options and metadata for the new track.

Returns:

  • OutgoingTrack: An object used to send data over the added track.
async def disconnect(self):
    async def disconnect(self):
        """Ends the agent session by closing the websocket connection.

        Useful when you don't use the context manager to obtain the session.
        """
        # Closing the websocket is the only action taken here.
        await self._ws.close()

Ends the agent session by closing the websocket connection.

Useful when you don't use the context manager to obtain the session.

class AgentAuthError(fishjam.agent.AgentError):
 6class AgentAuthError(AgentError):
 7    """Agent failed to authenticate properly."""
 8
 9    def __init__(self, reason: str):
10        self.reason = reason
11
12    def __str__(self) -> str:
13        return f"agent failed to authenticate: {self.reason}"

Agent failed to authenticate properly.

AgentAuthError(reason: str)
    def __init__(self, reason: str):
        """Creates the error.

        Args:
            reason: Description of why authentication failed; stored on the
                instance as ``reason``.
        """
        self.reason = reason
reason
Inherited Members
builtins.BaseException
with_traceback
add_note
args
IncomingTrackData = <class 'fishjam.events._protos.fishjam.AgentResponseTrackData'>
@dataclass(frozen=True)
class OutgoingTrack:
@dataclass(frozen=True)
class OutgoingTrack:
    """Represents an outgoing track of an agent connected to Fishjam.

    This is created by :func:`AgentSession.add_track`.

    Attributes:
        id: The global identifier of the track.
        session: The agent session the track belongs to; every request for
            this track is sent through it.
        options: The parameters used to create the track.
    """

    id: str
    session: AgentSession
    options: OutgoingAudioTrackOptions

    async def send_chunk(self, data: bytes):
        """Sends a chunk of audio to Fishjam on this track.

        Peers connected to the room of the agent will receive this data.

        Args:
            data: The raw audio bytes to send.
        """
        message = AgentRequest(
            track_data=OutgoingTrackData(
                track_id=self.id,
                data=data,
            )
        )

        await self.session._send(message)

    async def interrupt(self):
        """Interrupts the current track.

        Any audio that has been sent, but not played, will be cleared and
        prevented from playing. Audio sent after the interrupt will be played
        normally.
        """
        message = AgentRequest(
            interrupt_track=AgentRequestInterruptTrack(
                track_id=self.id,
            )
        )

        await self.session._send(message)

Represents an outgoing track of an agent connected to Fishjam.

This is created by AgentSession.add_track().

Attributes:

  • id: The global identifier of the track.
  • session: The agent session the track belongs to.
  • options: The parameters used to create the track.
OutgoingTrack( id: 'str', session: 'AgentSession', options: 'OutgoingAudioTrackOptions')
id: 'str'
session: 'AgentSession'
options: 'OutgoingAudioTrackOptions'
async def send_chunk(self, data: 'bytes'):
64    async def send_chunk(self, data: bytes):
65        """Sends a chunk of audio to Fishjam on this track.
66
67        Peers connected to the room of the agent will receive this data.
68
69        Args:
70            data: The raw audio bytes to send.
71        """
72        message = AgentRequest(
73            track_data=OutgoingTrackData(
74                track_id=self.id,
75                data=data,
76            )
77        )
78
79        await self.session._send(message)

Sends a chunk of audio to Fishjam on this track.

Peers connected to the room of the agent will receive this data.

Args:

  • data: The raw audio bytes to send.
async def interrupt(self):
81    async def interrupt(self):
82        """Interrupts the current track.
83
84        Any audio that has been sent, but not played, will be cleared and
85        prevented from playing. Audio sent after the interrupt will be played
86        normally.
87        """
88        message = AgentRequest(
89            interrupt_track=AgentRequestInterruptTrack(
90                track_id=self.id,
91            )
92        )
93
94        await self.session._send(message)

Interrupts the current track.

Any audio that has been sent, but not played, will be cleared and prevented from playing. Audio sent after the interrupt will be played normally.

@dataclass
class OutgoingAudioTrackOptions:
@dataclass
class OutgoingAudioTrackOptions:
    """Parameters of an outgoing audio track.

    Attributes:
        encoding: The encoding of the audio source. Defaults to
            ``TrackEncoding.TRACK_ENCODING_UNSPECIFIED``.
            NOTE(review): this was documented as "defaults to raw 16-bit
            PCM" - presumably what Fishjam assumes for an unspecified
            encoding; confirm.
        sample_rate: The sample rate of the audio source. Supported values are
            16000 and 24000. Defaults to 16000.
        channels: The number of channels in the audio source. Supported values are
            1 (mono) and 2 (stereo). Defaults to 1 (mono).
        metadata: Custom metadata for the track. Must be JSON-encodable.
            Defaults to None.
    """

    encoding: TrackEncoding = TrackEncoding.TRACK_ENCODING_UNSPECIFIED
    sample_rate: Literal[16000, 24000] = 16000
    channels: Literal[1, 2] = 1
    metadata: dict[str, Any] | None = None

Parameters of an outgoing audio track.

Attributes:

  • encoding: The encoding of the audio source. Defaults to raw 16-bit PCM.
  • sample_rate: The sample rate of the audio source. Defaults to 16000.
  • channels: The number of channels in the audio source. Supported values are 1 (mono) and 2 (stereo). Defaults to 1 (mono).
  • metadata: Custom metadata for the track. Must be JSON-encodable.
OutgoingAudioTrackOptions( encoding: 'TrackEncoding' = <TrackEncoding.TRACK_ENCODING_UNSPECIFIED: 0>, sample_rate: 'Literal[16000, 24000]' = 16000, channels: 'Literal[1, 2]' = 1, metadata: 'dict[str, Any] | None' = None)
encoding: 'TrackEncoding' = <TrackEncoding.TRACK_ENCODING_UNSPECIFIED: 0>
sample_rate: 'Literal[16000, 24000]' = 16000
channels: 'Literal[1, 2]' = 1
metadata: 'dict[str, Any] | None' = None