from .agent import (
    Agent,
    AgentSession,
    IncomingTrackData,
    OutgoingAudioTrackOptions,
    OutgoingTrack,
)
from .errors import AgentAuthError, AgentError

# Names exported by ``from <package> import *``; this list declares the
# public API of the package.
__all__ = [
    "Agent",
    "AgentError",
    "AgentSession",
    "AgentAuthError",
    "IncomingTrackData",
    "OutgoingTrack",
    "OutgoingAudioTrackOptions",
]
class Agent:
    """
    Allows for connecting to a Fishjam room as an agent peer.

    Use :func:`connect` to open the websocket, authenticate and obtain an
    :class:`AgentSession`.
    """

    def __init__(self, id: str, room_id: str, token: str, fishjam_url: str):
        """
        Create Agent instance.

        This constructor should not be called directly.
        Instead, you should call :func:`fishjam.FishjamClient.create_agent`.

        :param id: identifier of the agent, stored as ``self.id``
        :param room_id: identifier of the room, stored as ``self.room_id``
        :param token: token sent in the authentication request on connect
        :param fishjam_url: base URL of Fishjam; an ``http``/``https`` scheme
            is mapped to ``ws``/``wss`` to build the websocket URL
        """

        self.id = id
        self.room_id = room_id

        # Rewrite only the leading scheme. A blanket str.replace would also
        # mangle any "http" that appears in the host name or the path.
        if fishjam_url.startswith(("http://", "https://")):
            base_url = "ws" + fishjam_url.removeprefix("http")
        else:
            base_url = fishjam_url
        self._socket_url = f"{base_url}/socket/agent/websocket"
        self._token = token

    @asynccontextmanager
    async def connect(self):
        """
        Connect the agent to Fishjam to start receiving messages.

        Opens the websocket, authenticates and yields an
        :class:`AgentSession` bound to the open connection.

        :raises AgentAuthError: authentication failed
        """
        async with client.connect(self._socket_url) as websocket:
            await self._authenticate(websocket)
            yield AgentSession(self, websocket)

    async def _authenticate(self, websocket: ClientConnection):
        """
        Send the auth request and wait for Fishjam's reply.

        :raises AgentAuthError: the connection was closed during the exchange
        """
        req = AgentRequest(auth_request=AgentRequestAuthRequest(token=self._token))
        try:
            await websocket.send(bytes(req))
            # Fishjam will close the socket if auth fails and send a response on success
            await websocket.recv(decode=False)
        except ConnectionClosed as err:
            # Chain explicitly so the underlying close event stays attached
            # as the cause of the authentication error.
            raise AgentAuthError(websocket.close_reason or "") from err
Allows for connecting to a Fishjam room as an agent peer. Provides callbacks for receiving audio.
def __init__(self, id: str, room_id: str, token: str, fishjam_url: str):
    """
    Create Agent instance.

    This constructor should not be called directly.
    Instead, you should call :func:`fishjam.FishjamClient.create_agent`.

    :param id: identifier of the agent, stored as ``self.id``
    :param room_id: identifier of the room, stored as ``self.room_id``
    :param token: token sent in the authentication request on connect
    :param fishjam_url: base URL of Fishjam; an ``http``/``https`` scheme
        is mapped to ``ws``/``wss`` to build the websocket URL
    """

    self.id = id
    self.room_id = room_id

    # Rewrite only the leading scheme. A blanket str.replace would also
    # mangle any "http" that appears in the host name or the path.
    if fishjam_url.startswith(("http://", "https://")):
        base_url = "ws" + fishjam_url.removeprefix("http")
    else:
        base_url = fishjam_url
    self._socket_url = f"{base_url}/socket/agent/websocket"
    self._token = token
Create Agent instance, providing the fishjam id and management token.
This constructor should not be called directly.
Instead, you should call fishjam.FishjamClient.create_agent().
@asynccontextmanager
async def connect(self):
    """
    Open the websocket to Fishjam, authenticate and yield a session.

    The yielded :class:`AgentSession` wraps the open websocket. The
    connection is held by the surrounding ``async with`` for as long as the
    caller stays inside this context manager.

    :raises AgentAuthError: authentication failed
    """
    async with client.connect(self._socket_url) as ws:
        # Authentication must succeed before a session is handed out.
        await self._authenticate(ws)
        session = AgentSession(self, ws)
        yield session
Connect the agent to Fishjam to start receiving messages.
Incoming messages from Fishjam can be read from the yielded AgentSession
using AgentSession.receive().
Raises
- AgentAuthError: authentication failed
Base exception class for all agent exceptions
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args
107class AgentSession: 108 def __init__(self, agent: Agent, websocket: ClientConnection): 109 self.agent = agent 110 111 self._ws = websocket 112 self._closed = False 113 114 async def receive(self) -> AsyncIterator[IncomingAgentMessage]: 115 """ 116 Returns an infinite async iterator over the incoming messages from Fishjam to 117 the agent. 118 """ 119 while message := await self._ws.recv(decode=False): 120 parsed = AgentResponse().parse(message) 121 _, msg = betterproto.which_one_of(parsed, "content") 122 match msg: 123 case IncomingTrackData() as content: 124 yield content 125 126 async def add_track(self, options: OutgoingAudioTrackOptions): 127 """ 128 Adds a track to the connected agent, with the specified options and metadata. 129 130 Returns an instance of :class:`OutgoingTrack`, which can be used to send data 131 over the added track. 132 """ 133 track_id = uuid.uuid4().hex 134 metadata_json = json.dumps(options.metadata) 135 message = AgentRequest( 136 add_track=AgentRequestAddTrack( 137 track=Track( 138 id=track_id, 139 type=TrackType.TRACK_TYPE_AUDIO, 140 metadata=metadata_json, 141 ), 142 codec_params=AgentRequestAddTrackCodecParameters( 143 encoding=options.encoding, 144 sample_rate=options.sample_rate, 145 channels=options.channels, 146 ), 147 ) 148 ) 149 await self._send(message) 150 return OutgoingTrack(id=track_id, session=self, options=options) 151 152 async def _send(self, message: AgentRequest): 153 await self._ws.send(bytes(message), text=False) 154 155 async def disconnect(self): 156 """ 157 Ends the agent session by closing the websocket connection. 158 Useful when you don't use the context manager to obtain the session. 159 """ 160 await self._ws.close()
114 async def receive(self) -> AsyncIterator[IncomingAgentMessage]: 115 """ 116 Returns an infinite async iterator over the incoming messages from Fishjam to 117 the agent. 118 """ 119 while message := await self._ws.recv(decode=False): 120 parsed = AgentResponse().parse(message) 121 _, msg = betterproto.which_one_of(parsed, "content") 122 match msg: 123 case IncomingTrackData() as content: 124 yield content
Returns an infinite async iterator over the incoming messages from Fishjam to the agent.
async def add_track(self, options: OutgoingAudioTrackOptions):
    """
    Register a new outgoing audio track for this agent.

    A fresh track id is generated, the track metadata is JSON-encoded and
    an add-track request carrying the codec parameters from ``options`` is
    sent to Fishjam.

    :param options: encoding, sample rate, channels and metadata of the track
    :return: an :class:`OutgoingTrack` bound to this session, which can be
        used to send data over the added track
    """
    new_track_id = uuid.uuid4().hex

    track = Track(
        id=new_track_id,
        type=TrackType.TRACK_TYPE_AUDIO,
        metadata=json.dumps(options.metadata),
    )
    codec = AgentRequestAddTrackCodecParameters(
        encoding=options.encoding,
        sample_rate=options.sample_rate,
        channels=options.channels,
    )
    request = AgentRequest(
        add_track=AgentRequestAddTrack(track=track, codec_params=codec)
    )

    await self._send(request)
    return OutgoingTrack(id=new_track_id, session=self, options=options)
Adds a track to the connected agent, with the specified options and metadata.
Returns an instance of OutgoingTrack, which can be used to send data
over the added track.
async def disconnect(self):
    """
    Close the websocket connection, which ends the agent session.

    Intended for callers that did not obtain the session through the
    context manager.
    """
    websocket = self._ws
    await websocket.close()
Ends the agent session by closing the websocket connection. Useful when you don't use the context manager to obtain the session.
class AgentAuthError(AgentError):
    """Raised when the agent fails to authenticate."""

    def __init__(self, reason: str):
        """
        :param reason: explanation of the failure, kept on ``self.reason``
        """
        self.reason = reason

    def __str__(self) -> str:
        """Return the failure reason prefixed with a fixed message."""
        reason = self.reason
        return f"agent failed to authenticate: {reason}"
Agent failed to authenticate properly
Inherited Members
- builtins.BaseException
- with_traceback
- add_note
- args
@dataclass(frozen=True)
class OutgoingTrack:
    """
    An outgoing track of an agent connected to Fishjam.

    Instances are created by :func:`AgentSession.add_track`.
    """

    id: str
    """Identifier of the track, sent with every request for this track."""
    session: AgentSession
    """The agent session used to send this track's requests."""
    options: OutgoingAudioTrackOptions
    """The options the track was created with."""

    async def send_chunk(self, data: bytes):
        """
        Send a chunk of audio to Fishjam on this track.

        Peers connected to the room of the agent will receive this data.

        :param data: the bytes of the audio chunk
        """
        chunk = OutgoingTrackData(track_id=self.id, data=data)
        await self.session._send(AgentRequest(track_data=chunk))

    async def interrupt(self):
        """
        Interrupt this track.

        Audio that has been sent but not yet played is cleared and will not
        be played. Audio sent after the interrupt is played normally.
        """
        interruption = AgentRequestInterruptTrack(track_id=self.id)
        await self.session._send(AgentRequest(interrupt_track=interruption))
Represents an outgoing track of an agent connected to Fishjam,
created by AgentSession.add_track().
async def send_chunk(self, data: bytes):
    """
    Send a chunk of audio to Fishjam on this track.

    Peers connected to the room of the agent will receive this data.

    :param data: the bytes of the audio chunk
    """
    chunk = OutgoingTrackData(track_id=self.id, data=data)
    request = AgentRequest(track_data=chunk)
    await self.session._send(request)
Send a chunk of audio to Fishjam on this track.
Peers connected to the room of the agent will receive this data.
async def interrupt(self):
    """
    Interrupt this track.

    Audio that has been sent but not yet played is cleared and will not be
    played. Audio sent after the interrupt is played normally.
    """
    interruption = AgentRequestInterruptTrack(track_id=self.id)
    request = AgentRequest(interrupt_track=interruption)
    await self.session._send(request)
Interrupt current track.
Any audio that has been sent but not yet played will be cleared and prevented from playing.
Audio sent after the interrupt will be played normally.
@dataclass
class OutgoingAudioTrackOptions:
    """Options describing an outgoing audio track."""

    encoding: TrackEncoding = TrackEncoding.TRACK_ENCODING_UNSPECIFIED
    """
    Encoding of the audio source.
    The default is raw 16-bit PCM.
    """

    sample_rate: Literal[16000, 24000] = 16000
    """
    Sample rate of the audio source.
    The default is 16000.
    """

    channels: Literal[1, 2] = 1
    """
    Number of channels in the audio source:
    1 (mono) or 2 (stereo).
    The default is 1 (mono).
    """

    metadata: dict[str, Any] | None = None
    """
    Custom metadata attached to the track.
    It has to be JSON-encodable.
    """
Parameters of an outgoing audio track.