# Re-export the agent classes and error types from the sibling modules so
# they can be imported directly from this package.
from .agent import (
    Agent,
    AgentSession,
    IncomingTrackData,
    OutgoingAudioTrackOptions,
    OutgoingTrack,
)
from .errors import AgentAuthError, AgentError

# Names exported by a star-import of this package.
__all__ = [
    "Agent",
    "AgentError",
    "AgentSession",
    "AgentAuthError",
    "IncomingTrackData",
    "OutgoingTrack",
    "OutgoingAudioTrackOptions",
]
class Agent:
    """Allows for connecting to a Fishjam room as an agent peer.

    Use :func:`connect` to open an :class:`AgentSession`; the session is what
    receives incoming messages and publishes outgoing tracks.

    Attributes:
        id: The unique identifier of the agent.
        room_id: The ID of the room the agent is connecting to.
    """

    def __init__(self, id: str, room_id: str, token: str, fishjam_url: str):
        """Creates an Agent instance.

        This constructor should not be called directly.
        Instead, you should call :func:`fishjam.FishjamClient.create_agent`.

        Args:
            id: The unique identifier for the agent.
            room_id: The ID of the room the agent will join.
            token: The authentication token for the agent.
            fishjam_url: The URL of the Fishjam instance. A leading ``http``
                or ``https`` scheme is translated to ``ws`` / ``wss``; any
                other URL is used unchanged.
        """
        self.id = id
        self.room_id = room_id

        # Rewrite only the leading scheme. A blanket str.replace("http", "ws")
        # would also corrupt any "http" appearing in the host name or path.
        base_url = fishjam_url
        if base_url.startswith("http"):
            base_url = "ws" + base_url[len("http"):]

        self._socket_url = f"{base_url}/socket/agent/websocket"
        self._token = token

    @asynccontextmanager
    async def connect(self):
        """Connects the agent to Fishjam and authenticates it.

        The websocket is opened for the duration of the ``async with`` block
        and closed when the block exits.

        Yields:
            AgentSession: An active session for sending media and receiving
                incoming messages.

        Raises:
            AgentAuthError: If authentication with the Fishjam server fails.
        """
        async with client.connect(self._socket_url) as websocket:
            await self._authenticate(websocket)
            yield AgentSession(self, websocket)

    async def _authenticate(self, websocket: ClientConnection):
        """Sends the auth request and waits for the server's confirmation.

        Args:
            websocket: The freshly opened websocket connection.

        Raises:
            AgentAuthError: If the connection is closed during the handshake.
                The original ``ConnectionClosed`` is kept as ``__cause__``.
        """
        req = AgentRequest(auth_request=AgentRequestAuthRequest(token=self._token))
        try:
            await websocket.send(bytes(req))
            # Fishjam will close the socket if auth fails and send a response on success
            await websocket.recv(decode=False)
        except ConnectionClosed as err:
            # Chain the cause so the close code/reason stays visible in tracebacks.
            raise AgentAuthError(websocket.close_reason or "") from err
Allows for connecting to a Fishjam room as an agent peer.
Incoming audio is received through the session returned by connect().
Attributes:
- id: The unique identifier of the agent.
- room_id: The ID of the room the agent is connecting to.
def __init__(self, id: str, room_id: str, token: str, fishjam_url: str):
    """Creates an Agent instance.

    This constructor should not be called directly.
    Instead, you should call :func:`fishjam.FishjamClient.create_agent`.

    Args:
        id: The unique identifier for the agent.
        room_id: The ID of the room the agent will join.
        token: The authentication token for the agent.
        fishjam_url: The URL of the Fishjam instance. A leading ``http``
            or ``https`` scheme is translated to ``ws`` / ``wss``; any
            other URL is used unchanged.
    """
    self.id = id
    self.room_id = room_id

    # Rewrite only the leading scheme. A blanket str.replace("http", "ws")
    # would also corrupt any "http" appearing in the host name or path.
    base_url = fishjam_url
    if base_url.startswith("http"):
        base_url = "ws" + base_url[len("http"):]

    self._socket_url = f"{base_url}/socket/agent/websocket"
    self._token = token
Creates an Agent instance.
This constructor should not be called directly.
Instead, you should call fishjam.FishjamClient.create_agent().
Args:
- id: The unique identifier for the agent.
- room_id: The ID of the room the agent will join.
- token: The authentication token for the agent.
- fishjam_url: The URL of the Fishjam instance.
@asynccontextmanager
async def connect(self):
    """Opens the agent's websocket to Fishjam and authenticates over it.

    The connection stays open for the body of the ``async with`` block;
    incoming messages are read from the yielded session.

    Yields:
        AgentSession: An active session for sending media and handling events.

    Raises:
        AgentAuthError: If authentication with the Fishjam server fails.
    """
    async with client.connect(self._socket_url) as websocket:
        # Authenticate before handing the connection to the caller.
        await self._authenticate(websocket)
        session = AgentSession(self, websocket)
        yield session
Connects the agent to Fishjam to start receiving messages.
Incoming messages from Fishjam are consumed through
AgentSession.receive() on the yielded session.
Yields:
- AgentSession: An active session for sending media and handling events.
Raises:
- AgentAuthError: If authentication with the Fishjam server fails.
Base exception class for all agent exceptions.
Inherited Members
- builtins.BaseException
- BaseException
- with_traceback
- add_note
- args
class AgentSession:
    """An active, authenticated websocket session belonging to an Agent."""

    def __init__(self, agent: Agent, websocket: ClientConnection):
        """Binds the session to its agent and websocket.

        Args:
            agent: The Agent instance owning this session.
            websocket: The active websocket connection.
        """
        self._closed = False
        self._ws = websocket

        self.agent = agent

    async def receive(self) -> AsyncIterator[IncomingAgentMessage]:
        """Iterates over incoming messages from Fishjam.

        Iteration ends when an empty frame is received. Responses whose
        content is not track data are skipped.

        Yields:
            IncomingAgentMessage: The next message received from the server.
        """
        while True:
            raw = await self._ws.recv(decode=False)
            if not raw:
                break

            response = AgentResponse().parse(raw)
            _, payload = betterproto.which_one_of(response, "content")
            if isinstance(payload, IncomingTrackData):
                yield payload

    async def add_track(self, options: OutgoingAudioTrackOptions) -> OutgoingTrack:
        """Registers a new outgoing audio track for this agent.

        A random track id is generated locally and announced to Fishjam
        together with the codec parameters taken from ``options``.

        Args:
            options: Configuration options and metadata for the new track.

        Returns:
            OutgoingTrack: An object used to send data over the added track.
        """
        track_id = uuid.uuid4().hex
        # Metadata travels as a JSON string; None is encoded as "null".
        metadata_json = json.dumps(options.metadata)

        track = Track(
            id=track_id,
            type=TrackType.TRACK_TYPE_AUDIO,
            metadata=metadata_json,
        )
        codec_params = AgentRequestAddTrackCodecParameters(
            encoding=options.encoding,
            sample_rate=options.sample_rate,
            channels=options.channels,
        )
        request = AgentRequest(
            add_track=AgentRequestAddTrack(track=track, codec_params=codec_params)
        )

        await self._send(request)
        return OutgoingTrack(id=track_id, session=self, options=options)

    async def _send(self, message: AgentRequest):
        """Serializes ``message`` and sends it as a binary websocket frame."""
        payload = bytes(message)
        await self._ws.send(payload, text=False)

    async def disconnect(self):
        """Ends the agent session by closing the websocket connection.

        Useful when you don't use the context manager to obtain the session.
        """
        websocket = self._ws
        await websocket.close()
Represents an active connection session for an Agent.
def __init__(self, agent: Agent, websocket: ClientConnection):
    """Binds the session to its agent and websocket.

    Args:
        agent: The Agent instance owning this session.
        websocket: The active websocket connection.
    """
    self._closed = False
    self._ws = websocket

    self.agent = agent
Initializes the AgentSession.
Args:
- agent: The Agent instance owning this session.
- websocket: The active websocket connection.
async def receive(self) -> AsyncIterator[IncomingAgentMessage]:
    """Iterates over incoming messages from Fishjam.

    Iteration ends when an empty frame is received. Responses whose
    content is not track data are skipped.

    Yields:
        IncomingAgentMessage: The next message received from the server.
    """
    while True:
        raw = await self._ws.recv(decode=False)
        if not raw:
            break

        response = AgentResponse().parse(raw)
        _, payload = betterproto.which_one_of(response, "content")
        if isinstance(payload, IncomingTrackData):
            yield payload
Returns an async iterator over incoming messages from Fishjam.
Yields:
- IncomingAgentMessage: The next message received from the server.
async def add_track(self, options: OutgoingAudioTrackOptions) -> OutgoingTrack:
    """Registers a new outgoing audio track for this agent.

    A random track id is generated locally and announced to Fishjam
    together with the codec parameters taken from ``options``.

    Args:
        options: Configuration options and metadata for the new track.

    Returns:
        OutgoingTrack: An object used to send data over the added track.
    """
    track_id = uuid.uuid4().hex
    # Metadata travels as a JSON string; None is encoded as "null".
    metadata_json = json.dumps(options.metadata)

    track = Track(
        id=track_id,
        type=TrackType.TRACK_TYPE_AUDIO,
        metadata=metadata_json,
    )
    codec_params = AgentRequestAddTrackCodecParameters(
        encoding=options.encoding,
        sample_rate=options.sample_rate,
        channels=options.channels,
    )
    request = AgentRequest(
        add_track=AgentRequestAddTrack(track=track, codec_params=codec_params)
    )

    await self._send(request)
    return OutgoingTrack(id=track_id, session=self, options=options)
Adds a track to the connected agent with the specified options.
Args:
- options: Configuration options and metadata for the new track.
Returns:
- OutgoingTrack: An object used to send data over the added track.
async def disconnect(self):
    """Ends the agent session by closing the websocket connection.

    Useful when you don't use the context manager to obtain the session.
    """
    websocket = self._ws
    await websocket.close()
Ends the agent session by closing the websocket connection.
Useful when you don't use the context manager to obtain the session.
class AgentAuthError(AgentError):
    """Raised when the agent fails to authenticate.

    Attributes:
        reason: The reason passed by the code that raised the error.
    """

    def __init__(self, reason: str):
        """Stores the failure reason on the exception."""
        self.reason = reason

    def __str__(self) -> str:
        """Returns the human-readable message including the reason."""
        reason = self.reason
        return f"agent failed to authenticate: {reason}"
Agent failed to authenticate properly.
Inherited Members
- builtins.BaseException
- with_traceback
- add_note
- args
@dataclass(frozen=True)
class OutgoingTrack:
    """Represents an outgoing track of an agent connected to Fishjam.

    Instances are returned by :func:`AgentSession.add_track`.

    Attributes:
        id: The global identifier of the track.
        session: The agent session the track was added on.
        options: The parameters used to create the track.
    """

    id: str
    session: AgentSession
    options: OutgoingAudioTrackOptions

    async def send_chunk(self, data: bytes):
        """Sends a chunk of audio to Fishjam on this track.

        Peers connected to the room of the agent will receive this data.

        Args:
            data: The raw audio bytes to send.
        """
        payload = OutgoingTrackData(
            track_id=self.id,
            data=data,
        )
        request = AgentRequest(track_data=payload)

        await self.session._send(request)

    async def interrupt(self):
        """Interrupts the current track.

        Any audio that has been sent, but not played, will be cleared and
        prevented from playing. Audio sent after the interrupt will be played
        normally.
        """
        interruption = AgentRequestInterruptTrack(track_id=self.id)
        request = AgentRequest(interrupt_track=interruption)

        await self.session._send(request)
Represents an outgoing track of an agent connected to Fishjam.
This is created by AgentSession.add_track().
Attributes:
- id: The global identifier of the track.
- session: The agent session the track belongs to.
- options: The parameters used to create the track.
async def send_chunk(self, data: bytes):
    """Sends a chunk of audio to Fishjam on this track.

    Peers connected to the room of the agent will receive this data.

    Args:
        data: The raw audio bytes to send.
    """
    payload = OutgoingTrackData(
        track_id=self.id,
        data=data,
    )
    request = AgentRequest(track_data=payload)

    await self.session._send(request)
Sends a chunk of audio to Fishjam on this track.
Peers connected to the room of the agent will receive this data.
Args:
- data: The raw audio bytes to send.
async def interrupt(self):
    """Interrupts the current track.

    Any audio that has been sent, but not played, will be cleared and
    prevented from playing. Audio sent after the interrupt will be played
    normally.
    """
    interruption = AgentRequestInterruptTrack(track_id=self.id)
    request = AgentRequest(interrupt_track=interruption)

    await self.session._send(request)
Interrupts the current track.
Any audio that has been sent, but not played, will be cleared and prevented from playing. Audio sent after the interrupt will be played normally.
@dataclass
class OutgoingAudioTrackOptions:
    """Parameters of an outgoing audio track.

    Attributes:
        encoding: The encoding of the audio source. The declared default is
            ``TrackEncoding.TRACK_ENCODING_UNSPECIFIED``.
            NOTE(review): this was previously documented as defaulting to
            raw 16-bit PCM; presumably the server treats "unspecified" as
            PCM16, but confirm against the Fishjam protocol.
        sample_rate: The sample rate of the audio source. Defaults to 16000.
        channels: The number of channels in the audio source. Supported values are
            1 (mono) and 2 (stereo). Defaults to 1 (mono).
        metadata: Custom metadata for the track. Must be JSON-encodable.
            Defaults to None.
    """

    encoding: TrackEncoding = TrackEncoding.TRACK_ENCODING_UNSPECIFIED
    sample_rate: Literal[16000, 24000] = 16000
    channels: Literal[1, 2] = 1
    metadata: dict[str, Any] | None = None
Parameters of an outgoing audio track.
Attributes:
- encoding: The encoding of the audio source. Defaults to raw 16-bit PCM.
- sample_rate: The sample rate of the audio source. Defaults to 16000.
- channels: The number of channels in the audio source. Supported values are 1 (mono) and 2 (stereo). Defaults to 1 (mono).
- metadata: Custom metadata for the track. Must be JSON-encodable.