Fishjam Python Server SDK
 1from .agent import (
 2    Agent,
 3    AgentSession,
 4    IncomingTrackData,
 5    IncomingTrackImage,
 6    OutgoingAudioTrackOptions,
 7    OutgoingTrack,
 8)
 9from .errors import AgentAuthError, AgentError
10
11__all__ = [
12    "Agent",
13    "AgentError",
14    "AgentSession",
15    "AgentAuthError",
16    "IncomingTrackData",
17    "IncomingTrackImage",
18    "OutgoingTrack",
19    "OutgoingAudioTrackOptions",
20]
class Agent:
182class Agent:
183    """Allows for connecting to a Fishjam room as an agent peer.
184
185    Provides callbacks for receiving audio.
186
187    Attributes:
188        id: The unique identifier of the agent.
189        room_id: The ID of the room the agent is connecting to.
190    """
191
192    def __init__(self, id: str, room_id: str, token: str, fishjam_url: str):
193        """Creates an Agent instance.
194
195        This constructor should not be called directly.
196        Instead, you should call :func:`fishjam.FishjamClient.create_agent`.
197
198        Args:
199            id: The unique identifier for the agent.
200            room_id: The ID of the room the agent will join.
201            token: The authentication token for the agent.
202            fishjam_url: The URL of the Fishjam instance.
203        """
204        self.id = id
205        self.room_id = room_id
206
207        self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws")
208        self._token = token
209
210    @asynccontextmanager
211    async def connect(self):
212        """Connects the agent to Fishjam to start receiving messages.
213
214        Incoming messages from Fishjam will be routed to handlers
215        defined with :func:`on_track_data`.
216
217        Yields:
218            AgentSession: An active session for sending media and handling events.
219
220        Raises:
221            AgentAuthError: If authentication with the Fishjam server fails.
222        """
223        async with client.connect(self._socket_url) as websocket:
224            await self._authenticate(websocket)
225            yield AgentSession(self, websocket)
226
227    async def _authenticate(self, websocket: ClientConnection):
228        req = AgentRequest(auth_request=AgentRequestAuthRequest(token=self._token))
229        try:
230            await websocket.send(bytes(req))
231            # Fishjam will close the socket if auth fails and send a response on success
232            await websocket.recv(decode=False)
233        except ConnectionClosed:
234            raise AgentAuthError(websocket.close_reason or "")

Allows for connecting to a Fishjam room as an agent peer.

Provides callbacks for receiving audio.

Attributes:

  • id: The unique identifier of the agent.
  • room_id: The ID of the room the agent is connecting to.
Agent(id: 'str', room_id: 'str', token: 'str', fishjam_url: 'str')
192    def __init__(self, id: str, room_id: str, token: str, fishjam_url: str):
193        """Creates an Agent instance.
194
195        This constructor should not be called directly.
196        Instead, you should call :func:`fishjam.FishjamClient.create_agent`.
197
198        Args:
199            id: The unique identifier for the agent.
200            room_id: The ID of the room the agent will join.
201            token: The authentication token for the agent.
202            fishjam_url: The URL of the Fishjam instance.
203        """
204        self.id = id
205        self.room_id = room_id
206
207        self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws")
208        self._token = token

Creates an Agent instance.

This constructor should not be called directly. Instead, you should call fishjam.FishjamClient.create_agent().

Args:

  • id: The unique identifier for the agent.
  • room_id: The ID of the room the agent will join.
  • token: The authentication token for the agent.
  • fishjam_url: The URL of the Fishjam instance.
id
room_id
@asynccontextmanager
async def connect(self):
210    @asynccontextmanager
211    async def connect(self):
212        """Connects the agent to Fishjam to start receiving messages.
213
214        Incoming messages from Fishjam will be routed to handlers
215        defined with :func:`on_track_data`.
216
217        Yields:
218            AgentSession: An active session for sending media and handling events.
219
220        Raises:
221            AgentAuthError: If authentication with the Fishjam server fails.
222        """
223        async with client.connect(self._socket_url) as websocket:
224            await self._authenticate(websocket)
225            yield AgentSession(self, websocket)

Connects the agent to Fishjam to start receiving messages.

Incoming messages from Fishjam will be routed to handlers defined with on_track_data().

Yields:

  • AgentSession: An active session for sending media and handling events.

Raises:

  • AgentAuthError: If authentication with the Fishjam server fails.
class AgentError(builtins.Exception):
2class AgentError(Exception):
3    """Base exception class for all agent exceptions."""

Base exception class for all agent exceptions.

Inherited Members
builtins.BaseException
BaseException
with_traceback
add_note
args
class AgentSession:
 99class AgentSession:
100    """Represents an active connection session for an Agent."""
101
102    def __init__(self, agent: Agent, websocket: ClientConnection):
103        """Initializes the AgentSession.
104
105        Args:
106            agent: The Agent instance owning this session.
107            websocket: The active websocket connection.
108        """
109        self.agent = agent
110
111        self._ws = websocket
112        self._closed = False
113
114    async def receive(self) -> AsyncIterator[IncomingAgentMessage]:
115        """Returns an async iterator over incoming messages from Fishjam.
116
117        Yields:
118            IncomingAgentMessage: The next message received from the server.
119        """
120        while message := await self._ws.recv(decode=False):
121            parsed = AgentResponse().parse(message)
122            _, msg = betterproto.which_one_of(parsed, "content")
123            match msg:
124                case IncomingTrackData() as content:
125                    yield content
126                case IncomingTrackImage() as content:
127                    yield content
128
129    async def add_track(self, options: OutgoingAudioTrackOptions) -> OutgoingTrack:
130        """Adds a track to the connected agent with the specified options.
131
132        Args:
133            options: Configuration options and metadata for the new track.
134
135        Returns:
136            OutgoingTrack: An object used to send data over the added track.
137        """
138        track_id = uuid.uuid4().hex
139        metadata_json = json.dumps(options.metadata)
140        message = AgentRequest(
141            add_track=AgentRequestAddTrack(
142                track=Track(
143                    id=track_id,
144                    type=TrackType.TRACK_TYPE_AUDIO,
145                    metadata=metadata_json,
146                ),
147                codec_params=AgentRequestAddTrackCodecParameters(
148                    encoding=options.encoding,
149                    sample_rate=options.sample_rate,
150                    channels=options.channels,
151                ),
152            )
153        )
154        await self._send(message)
155        return OutgoingTrack(id=track_id, session=self, options=options)
156
157    async def capture_image(self, track_id: str):
158        """Requests a image capture from a remote track.
159
160        The captured image will be delivered as an
161        :class:`IncomingTrackImage` message through :func:`receive`.
162
163        Args:
164            track_id: The identifier of the track to capture an image from.
165        """
166        message = AgentRequest(
167            capture_image=AgentRequestCaptureImage(track_id=track_id)
168        )
169        await self._send(message)
170
171    async def _send(self, message: AgentRequest):
172        await self._ws.send(bytes(message), text=False)
173
174    async def disconnect(self):
175        """Ends the agent session by closing the websocket connection.
176
177        Useful when you don't use the context manager to obtain the session.
178        """
179        await self._ws.close()

Represents an active connection session for an Agent.

AgentSession(agent: 'Agent', websocket: 'ClientConnection')
102    def __init__(self, agent: Agent, websocket: ClientConnection):
103        """Initializes the AgentSession.
104
105        Args:
106            agent: The Agent instance owning this session.
107            websocket: The active websocket connection.
108        """
109        self.agent = agent
110
111        self._ws = websocket
112        self._closed = False

Initializes the AgentSession.

Args:

  • agent: The Agent instance owning this session.
  • websocket: The active websocket connection.
agent
async def receive(self) -> 'AsyncIterator[IncomingAgentMessage]':
114    async def receive(self) -> AsyncIterator[IncomingAgentMessage]:
115        """Returns an async iterator over incoming messages from Fishjam.
116
117        Yields:
118            IncomingAgentMessage: The next message received from the server.
119        """
120        while message := await self._ws.recv(decode=False):
121            parsed = AgentResponse().parse(message)
122            _, msg = betterproto.which_one_of(parsed, "content")
123            match msg:
124                case IncomingTrackData() as content:
125                    yield content
126                case IncomingTrackImage() as content:
127                    yield content

Returns an async iterator over incoming messages from Fishjam.

Yields:

  • IncomingAgentMessage: The next message received from the server.
async def add_track(self, options: 'OutgoingAudioTrackOptions') -> 'OutgoingTrack':
129    async def add_track(self, options: OutgoingAudioTrackOptions) -> OutgoingTrack:
130        """Adds a track to the connected agent with the specified options.
131
132        Args:
133            options: Configuration options and metadata for the new track.
134
135        Returns:
136            OutgoingTrack: An object used to send data over the added track.
137        """
138        track_id = uuid.uuid4().hex
139        metadata_json = json.dumps(options.metadata)
140        message = AgentRequest(
141            add_track=AgentRequestAddTrack(
142                track=Track(
143                    id=track_id,
144                    type=TrackType.TRACK_TYPE_AUDIO,
145                    metadata=metadata_json,
146                ),
147                codec_params=AgentRequestAddTrackCodecParameters(
148                    encoding=options.encoding,
149                    sample_rate=options.sample_rate,
150                    channels=options.channels,
151                ),
152            )
153        )
154        await self._send(message)
155        return OutgoingTrack(id=track_id, session=self, options=options)

Adds a track to the connected agent with the specified options.

Args:

  • options: Configuration options and metadata for the new track.

Returns:

  • OutgoingTrack: An object used to send data over the added track.
async def capture_image(self, track_id: 'str'):
157    async def capture_image(self, track_id: str):
158        """Requests a image capture from a remote track.
159
160        The captured image will be delivered as an
161        :class:`IncomingTrackImage` message through :func:`receive`.
162
163        Args:
164            track_id: The identifier of the track to capture an image from.
165        """
166        message = AgentRequest(
167            capture_image=AgentRequestCaptureImage(track_id=track_id)
168        )
169        await self._send(message)

Requests a image capture from a remote track.

The captured image will be delivered as an IncomingTrackImage message through receive().

Args:

  • track_id: The identifier of the track to capture an image from.
async def disconnect(self):
174    async def disconnect(self):
175        """Ends the agent session by closing the websocket connection.
176
177        Useful when you don't use the context manager to obtain the session.
178        """
179        await self._ws.close()

Ends the agent session by closing the websocket connection.

Useful when you don't use the context manager to obtain the session.

class AgentAuthError(fishjam.agent.AgentError):
 6class AgentAuthError(AgentError):
 7    """Agent failed to authenticate properly."""
 8
 9    def __init__(self, reason: str):
10        self.reason = reason
11
12    def __str__(self) -> str:
13        return f"agent failed to authenticate: {self.reason}"

Agent failed to authenticate properly.

AgentAuthError(reason: str)
 9    def __init__(self, reason: str):
10        self.reason = reason
reason
Inherited Members
builtins.BaseException
with_traceback
add_note
args
IncomingTrackData = <class 'fishjam.events._protos.fishjam.AgentResponseTrackData'>
IncomingTrackImage = <class 'fishjam.events._protos.fishjam.AgentResponseTrackImage'>
@dataclass(frozen=True)
class OutgoingTrack:
50@dataclass(frozen=True)
51class OutgoingTrack:
52    """Represents an outgoing track of an agent connected to Fishjam.
53
54    This is created by :func:`Agent.add_track`.
55
56    Attributes:
57        id: The global identifier of the track.
58        session: The agent the track belongs to.
59        options: The parameters used to create the track.
60    """
61
62    id: str
63    session: AgentSession
64    options: OutgoingAudioTrackOptions
65
66    async def send_chunk(self, data: bytes):
67        """Sends a chunk of audio to Fishjam on this track.
68
69        Peers connected to the room of the agent will receive this data.
70
71        Args:
72            data: The raw audio bytes to send.
73        """
74        message = AgentRequest(
75            track_data=OutgoingTrackData(
76                track_id=self.id,
77                data=data,
78            )
79        )
80
81        await self.session._send(message)
82
83    async def interrupt(self):
84        """Interrupts the current track.
85
86        Any audio that has been sent, but not played, will be cleared and
87        prevented from playing. Audio sent after the interrupt will be played
88        normally.
89        """
90        message = AgentRequest(
91            interrupt_track=AgentRequestInterruptTrack(
92                track_id=self.id,
93            )
94        )
95
96        await self.session._send(message)

Represents an outgoing track of an agent connected to Fishjam.

This is created by Agent.add_track().

Attributes:

  • id: The global identifier of the track.
  • session: The agent the track belongs to.
  • options: The parameters used to create the track.
OutgoingTrack( id: 'str', session: 'AgentSession', options: 'OutgoingAudioTrackOptions')
id: 'str'
session: 'AgentSession'
options: 'OutgoingAudioTrackOptions'
async def send_chunk(self, data: 'bytes'):
66    async def send_chunk(self, data: bytes):
67        """Sends a chunk of audio to Fishjam on this track.
68
69        Peers connected to the room of the agent will receive this data.
70
71        Args:
72            data: The raw audio bytes to send.
73        """
74        message = AgentRequest(
75            track_data=OutgoingTrackData(
76                track_id=self.id,
77                data=data,
78            )
79        )
80
81        await self.session._send(message)

Sends a chunk of audio to Fishjam on this track.

Peers connected to the room of the agent will receive this data.

Args:

  • data: The raw audio bytes to send.
async def interrupt(self):
83    async def interrupt(self):
84        """Interrupts the current track.
85
86        Any audio that has been sent, but not played, will be cleared and
87        prevented from playing. Audio sent after the interrupt will be played
88        normally.
89        """
90        message = AgentRequest(
91            interrupt_track=AgentRequestInterruptTrack(
92                track_id=self.id,
93            )
94        )
95
96        await self.session._send(message)

Interrupts the current track.

Any audio that has been sent, but not played, will be cleared and prevented from playing. Audio sent after the interrupt will be played normally.

@dataclass
class OutgoingAudioTrackOptions:
32@dataclass
33class OutgoingAudioTrackOptions:
34    """Parameters of an outgoing audio track.
35
36    Attributes:
37        encoding: The encoding of the audio source. Defaults to raw 16-bit PCM.
38        sample_rate: The sample rate of the audio source. Defaults to 16000.
39        channels: The number of channels in the audio source. Supported values are
40            1 (mono) and 2 (stereo). Defaults to 1 (mono).
41        metadata: Custom metadata for the track. Must be JSON-encodable.
42    """
43
44    encoding: TrackEncoding = TrackEncoding.TRACK_ENCODING_UNSPECIFIED
45    sample_rate: Literal[16000, 24000] = 16000
46    channels: Literal[1, 2] = 1
47    metadata: dict[str, Any] | None = None

Parameters of an outgoing audio track.

Attributes:

  • encoding: The encoding of the audio source. Defaults to raw 16-bit PCM.
  • sample_rate: The sample rate of the audio source. Defaults to 16000.
  • channels: The number of channels in the audio source. Supported values are 1 (mono) and 2 (stereo). Defaults to 1 (mono).
  • metadata: Custom metadata for the track. Must be JSON-encodable.
OutgoingAudioTrackOptions( encoding: 'TrackEncoding' = <TrackEncoding.TRACK_ENCODING_UNSPECIFIED: 0>, sample_rate: 'Literal[16000, 24000]' = 16000, channels: 'Literal[1, 2]' = 1, metadata: 'dict[str, Any] | None' = None)
encoding: 'TrackEncoding' = <TrackEncoding.TRACK_ENCODING_UNSPECIFIED: 0>
sample_rate: 'Literal[16000, 24000]' = 16000
channels: 'Literal[1, 2]' = 1
metadata: 'dict[str, Any] | None' = None