1from .agent import ( 2 Agent, 3 AgentSession, 4 IncomingTrackData, 5 IncomingTrackImage, 6 OutgoingAudioTrackOptions, 7 OutgoingTrack, 8) 9from .errors import AgentAuthError, AgentError 10 11__all__ = [ 12 "Agent", 13 "AgentError", 14 "AgentSession", 15 "AgentAuthError", 16 "IncomingTrackData", 17 "IncomingTrackImage", 18 "OutgoingTrack", 19 "OutgoingAudioTrackOptions", 20]
182class Agent: 183 """Allows for connecting to a Fishjam room as an agent peer. 184 185 Provides callbacks for receiving audio. 186 187 Attributes: 188 id: The unique identifier of the agent. 189 room_id: The ID of the room the agent is connecting to. 190 """ 191 192 def __init__(self, id: str, room_id: str, token: str, fishjam_url: str): 193 """Creates an Agent instance. 194 195 This constructor should not be called directly. 196 Instead, you should call :func:`fishjam.FishjamClient.create_agent`. 197 198 Args: 199 id: The unique identifier for the agent. 200 room_id: The ID of the room the agent will join. 201 token: The authentication token for the agent. 202 fishjam_url: The URL of the Fishjam instance. 203 """ 204 self.id = id 205 self.room_id = room_id 206 207 self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws") 208 self._token = token 209 210 @asynccontextmanager 211 async def connect(self): 212 """Connects the agent to Fishjam to start receiving messages. 213 214 Incoming messages from Fishjam will be routed to handlers 215 defined with :func:`on_track_data`. 216 217 Yields: 218 AgentSession: An active session for sending media and handling events. 219 220 Raises: 221 AgentAuthError: If authentication with the Fishjam server fails. 222 """ 223 async with client.connect(self._socket_url) as websocket: 224 await self._authenticate(websocket) 225 yield AgentSession(self, websocket) 226 227 async def _authenticate(self, websocket: ClientConnection): 228 req = AgentRequest(auth_request=AgentRequestAuthRequest(token=self._token)) 229 try: 230 await websocket.send(bytes(req)) 231 # Fishjam will close the socket if auth fails and send a response on success 232 await websocket.recv(decode=False) 233 except ConnectionClosed: 234 raise AgentAuthError(websocket.close_reason or "")
Allows for connecting to a Fishjam room as an agent peer.
Provides callbacks for receiving audio.
Attributes:
- id: The unique identifier of the agent.
- room_id: The ID of the room the agent is connecting to.
192 def __init__(self, id: str, room_id: str, token: str, fishjam_url: str): 193 """Creates an Agent instance. 194 195 This constructor should not be called directly. 196 Instead, you should call :func:`fishjam.FishjamClient.create_agent`. 197 198 Args: 199 id: The unique identifier for the agent. 200 room_id: The ID of the room the agent will join. 201 token: The authentication token for the agent. 202 fishjam_url: The URL of the Fishjam instance. 203 """ 204 self.id = id 205 self.room_id = room_id 206 207 self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws") 208 self._token = token
Creates an Agent instance.
This constructor should not be called directly.
Instead, you should call fishjam.FishjamClient.create_agent().
Args:
- id: The unique identifier for the agent.
- room_id: The ID of the room the agent will join.
- token: The authentication token for the agent.
- fishjam_url: The URL of the Fishjam instance.
210 @asynccontextmanager 211 async def connect(self): 212 """Connects the agent to Fishjam to start receiving messages. 213 214 Incoming messages from Fishjam will be routed to handlers 215 defined with :func:`on_track_data`. 216 217 Yields: 218 AgentSession: An active session for sending media and handling events. 219 220 Raises: 221 AgentAuthError: If authentication with the Fishjam server fails. 222 """ 223 async with client.connect(self._socket_url) as websocket: 224 await self._authenticate(websocket) 225 yield AgentSession(self, websocket)
Connects the agent to Fishjam to start receiving messages.
Incoming messages from Fishjam will be routed to handlers
defined with on_track_data().
Yields:
- AgentSession: An active session for sending media and handling events.
Raises:
- AgentAuthError: If authentication with the Fishjam server fails.
Base exception class for all agent exceptions.
Inherited Members
- builtins.BaseException
- BaseException
- with_traceback
- add_note
- args
99class AgentSession: 100 """Represents an active connection session for an Agent.""" 101 102 def __init__(self, agent: Agent, websocket: ClientConnection): 103 """Initializes the AgentSession. 104 105 Args: 106 agent: The Agent instance owning this session. 107 websocket: The active websocket connection. 108 """ 109 self.agent = agent 110 111 self._ws = websocket 112 self._closed = False 113 114 async def receive(self) -> AsyncIterator[IncomingAgentMessage]: 115 """Returns an async iterator over incoming messages from Fishjam. 116 117 Yields: 118 IncomingAgentMessage: The next message received from the server. 119 """ 120 while message := await self._ws.recv(decode=False): 121 parsed = AgentResponse().parse(message) 122 _, msg = betterproto.which_one_of(parsed, "content") 123 match msg: 124 case IncomingTrackData() as content: 125 yield content 126 case IncomingTrackImage() as content: 127 yield content 128 129 async def add_track(self, options: OutgoingAudioTrackOptions) -> OutgoingTrack: 130 """Adds a track to the connected agent with the specified options. 131 132 Args: 133 options: Configuration options and metadata for the new track. 134 135 Returns: 136 OutgoingTrack: An object used to send data over the added track. 137 """ 138 track_id = uuid.uuid4().hex 139 metadata_json = json.dumps(options.metadata) 140 message = AgentRequest( 141 add_track=AgentRequestAddTrack( 142 track=Track( 143 id=track_id, 144 type=TrackType.TRACK_TYPE_AUDIO, 145 metadata=metadata_json, 146 ), 147 codec_params=AgentRequestAddTrackCodecParameters( 148 encoding=options.encoding, 149 sample_rate=options.sample_rate, 150 channels=options.channels, 151 ), 152 ) 153 ) 154 await self._send(message) 155 return OutgoingTrack(id=track_id, session=self, options=options) 156 157 async def capture_image(self, track_id: str): 158 """Requests a image capture from a remote track. 159 160 The captured image will be delivered as an 161 :class:`IncomingTrackImage` message through :func:`receive`. 162 163 Args: 164 track_id: The identifier of the track to capture an image from. 165 """ 166 message = AgentRequest( 167 capture_image=AgentRequestCaptureImage(track_id=track_id) 168 ) 169 await self._send(message) 170 171 async def _send(self, message: AgentRequest): 172 await self._ws.send(bytes(message), text=False) 173 174 async def disconnect(self): 175 """Ends the agent session by closing the websocket connection. 176 177 Useful when you don't use the context manager to obtain the session. 178 """ 179 await self._ws.close()
Represents an active connection session for an Agent.
102 def __init__(self, agent: Agent, websocket: ClientConnection): 103 """Initializes the AgentSession. 104 105 Args: 106 agent: The Agent instance owning this session. 107 websocket: The active websocket connection. 108 """ 109 self.agent = agent 110 111 self._ws = websocket 112 self._closed = False
Initializes the AgentSession.
Args:
- agent: The Agent instance owning this session.
- websocket: The active websocket connection.
114 async def receive(self) -> AsyncIterator[IncomingAgentMessage]: 115 """Returns an async iterator over incoming messages from Fishjam. 116 117 Yields: 118 IncomingAgentMessage: The next message received from the server. 119 """ 120 while message := await self._ws.recv(decode=False): 121 parsed = AgentResponse().parse(message) 122 _, msg = betterproto.which_one_of(parsed, "content") 123 match msg: 124 case IncomingTrackData() as content: 125 yield content 126 case IncomingTrackImage() as content: 127 yield content
Returns an async iterator over incoming messages from Fishjam.
Yields:
- IncomingAgentMessage: The next message received from the server.
129 async def add_track(self, options: OutgoingAudioTrackOptions) -> OutgoingTrack: 130 """Adds a track to the connected agent with the specified options. 131 132 Args: 133 options: Configuration options and metadata for the new track. 134 135 Returns: 136 OutgoingTrack: An object used to send data over the added track. 137 """ 138 track_id = uuid.uuid4().hex 139 metadata_json = json.dumps(options.metadata) 140 message = AgentRequest( 141 add_track=AgentRequestAddTrack( 142 track=Track( 143 id=track_id, 144 type=TrackType.TRACK_TYPE_AUDIO, 145 metadata=metadata_json, 146 ), 147 codec_params=AgentRequestAddTrackCodecParameters( 148 encoding=options.encoding, 149 sample_rate=options.sample_rate, 150 channels=options.channels, 151 ), 152 ) 153 ) 154 await self._send(message) 155 return OutgoingTrack(id=track_id, session=self, options=options)
Adds a track to the connected agent with the specified options.
Args:
- options: Configuration options and metadata for the new track.
Returns:
- OutgoingTrack: An object used to send data over the added track.
157 async def capture_image(self, track_id: str): 158 """Requests a image capture from a remote track. 159 160 The captured image will be delivered as an 161 :class:`IncomingTrackImage` message through :func:`receive`. 162 163 Args: 164 track_id: The identifier of the track to capture an image from. 165 """ 166 message = AgentRequest( 167 capture_image=AgentRequestCaptureImage(track_id=track_id) 168 ) 169 await self._send(message)
Requests a image capture from a remote track.
The captured image will be delivered as an
IncomingTrackImage message through receive().
Args:
- track_id: The identifier of the track to capture an image from.
174 async def disconnect(self): 175 """Ends the agent session by closing the websocket connection. 176 177 Useful when you don't use the context manager to obtain the session. 178 """ 179 await self._ws.close()
Ends the agent session by closing the websocket connection.
Useful when you don't use the context manager to obtain the session.
6class AgentAuthError(AgentError): 7 """Agent failed to authenticate properly.""" 8 9 def __init__(self, reason: str): 10 self.reason = reason 11 12 def __str__(self) -> str: 13 return f"agent failed to authenticate: {self.reason}"
Agent failed to authenticate properly.
Inherited Members
- builtins.BaseException
- with_traceback
- add_note
- args
50@dataclass(frozen=True) 51class OutgoingTrack: 52 """Represents an outgoing track of an agent connected to Fishjam. 53 54 This is created by :func:`Agent.add_track`. 55 56 Attributes: 57 id: The global identifier of the track. 58 session: The agent the track belongs to. 59 options: The parameters used to create the track. 60 """ 61 62 id: str 63 session: AgentSession 64 options: OutgoingAudioTrackOptions 65 66 async def send_chunk(self, data: bytes): 67 """Sends a chunk of audio to Fishjam on this track. 68 69 Peers connected to the room of the agent will receive this data. 70 71 Args: 72 data: The raw audio bytes to send. 73 """ 74 message = AgentRequest( 75 track_data=OutgoingTrackData( 76 track_id=self.id, 77 data=data, 78 ) 79 ) 80 81 await self.session._send(message) 82 83 async def interrupt(self): 84 """Interrupts the current track. 85 86 Any audio that has been sent, but not played, will be cleared and 87 prevented from playing. Audio sent after the interrupt will be played 88 normally. 89 """ 90 message = AgentRequest( 91 interrupt_track=AgentRequestInterruptTrack( 92 track_id=self.id, 93 ) 94 ) 95 96 await self.session._send(message)
Represents an outgoing track of an agent connected to Fishjam.
This is created by Agent.add_track().
Attributes:
- id: The global identifier of the track.
- session: The agent the track belongs to.
- options: The parameters used to create the track.
66 async def send_chunk(self, data: bytes): 67 """Sends a chunk of audio to Fishjam on this track. 68 69 Peers connected to the room of the agent will receive this data. 70 71 Args: 72 data: The raw audio bytes to send. 73 """ 74 message = AgentRequest( 75 track_data=OutgoingTrackData( 76 track_id=self.id, 77 data=data, 78 ) 79 ) 80 81 await self.session._send(message)
Sends a chunk of audio to Fishjam on this track.
Peers connected to the room of the agent will receive this data.
Args:
- data: The raw audio bytes to send.
83 async def interrupt(self): 84 """Interrupts the current track. 85 86 Any audio that has been sent, but not played, will be cleared and 87 prevented from playing. Audio sent after the interrupt will be played 88 normally. 89 """ 90 message = AgentRequest( 91 interrupt_track=AgentRequestInterruptTrack( 92 track_id=self.id, 93 ) 94 ) 95 96 await self.session._send(message)
Interrupts the current track.
Any audio that has been sent, but not played, will be cleared and prevented from playing. Audio sent after the interrupt will be played normally.
32@dataclass 33class OutgoingAudioTrackOptions: 34 """Parameters of an outgoing audio track. 35 36 Attributes: 37 encoding: The encoding of the audio source. Defaults to raw 16-bit PCM. 38 sample_rate: The sample rate of the audio source. Defaults to 16000. 39 channels: The number of channels in the audio source. Supported values are 40 1 (mono) and 2 (stereo). Defaults to 1 (mono). 41 metadata: Custom metadata for the track. Must be JSON-encodable. 42 """ 43 44 encoding: TrackEncoding = TrackEncoding.TRACK_ENCODING_UNSPECIFIED 45 sample_rate: Literal[16000, 24000] = 16000 46 channels: Literal[1, 2] = 1 47 metadata: dict[str, Any] | None = None
Parameters of an outgoing audio track.
Attributes:
- encoding: The encoding of the audio source. Defaults to raw 16-bit PCM.
- sample_rate: The sample rate of the audio source. Defaults to 16000.
- channels: The number of channels in the audio source. Supported values are 1 (mono) and 2 (stereo). Defaults to 1 (mono).
- metadata: Custom metadata for the track. Must be JSON-encodable.