1from .agent import ( 2 Agent, 3 AgentSession, 4 IncomingTrackData, 5 OutgoingAudioTrackOptions, 6 OutgoingTrack, 7) 8from .errors import AgentAuthError, AgentError 9 10__all__ = [ 11 "Agent", 12 "AgentError", 13 "AgentSession", 14 "AgentAuthError", 15 "IncomingTrackData", 16 "OutgoingTrack", 17 "OutgoingAudioTrackOptions", 18]
156class Agent: 157 """ 158 Allows for connecting to a Fishjam room as an agent peer. 159 Provides callbacks for receiving audio. 160 """ 161 162 def __init__(self, id: str, room_id: str, token: str, fishjam_url: str): 163 """ 164 Create Agent instance, providing the fishjam id and management token. 165 166 This constructor should not be called directly. 167 Instead, you should call :func:`fishjam.FishjamClient.create_agent`. 168 """ 169 170 self.id = id 171 self.room_id = room_id 172 173 self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws") 174 self._token = token 175 176 @asynccontextmanager 177 async def connect(self): 178 """ 179 Connect the agent to Fishjam to start receiving messages. 180 181 Incoming messages from Fishjam will be routed to handlers 182 defined with :func:`on_track_data`. 183 184 :raises AgentAuthError: authentication failed 185 """ 186 async with client.connect(self._socket_url) as websocket: 187 await self._authenticate(websocket) 188 yield AgentSession(self, websocket) 189 190 async def _authenticate(self, websocket: ClientConnection): 191 req = AgentRequest(auth_request=AgentRequestAuthRequest(token=self._token)) 192 try: 193 await websocket.send(bytes(req)) 194 # Fishjam will close the socket if auth fails and send a response on success 195 await websocket.recv(decode=False) 196 except ConnectionClosed: 197 raise AgentAuthError(websocket.close_reason or "")
Allows for connecting to a Fishjam room as an agent peer. Provides callbacks for receiving audio.
162 def __init__(self, id: str, room_id: str, token: str, fishjam_url: str): 163 """ 164 Create Agent instance, providing the fishjam id and management token. 165 166 This constructor should not be called directly. 167 Instead, you should call :func:`fishjam.FishjamClient.create_agent`. 168 """ 169 170 self.id = id 171 self.room_id = room_id 172 173 self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws") 174 self._token = token
Create Agent instance, providing the fishjam id and management token.
This constructor should not be called directly.
Instead, you should call fishjam.FishjamClient.create_agent()
.
176 @asynccontextmanager 177 async def connect(self): 178 """ 179 Connect the agent to Fishjam to start receiving messages. 180 181 Incoming messages from Fishjam will be routed to handlers 182 defined with :func:`on_track_data`. 183 184 :raises AgentAuthError: authentication failed 185 """ 186 async with client.connect(self._socket_url) as websocket: 187 await self._authenticate(websocket) 188 yield AgentSession(self, websocket)
Connect the agent to Fishjam to start receiving messages.
Incoming messages from Fishjam will be routed to handlers
defined with on_track_data()
.
Raises
- AgentAuthError: authentication failed
Base exception class for all agent exceptions
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args
107class AgentSession: 108 def __init__(self, agent: Agent, websocket: ClientConnection): 109 self.agent = agent 110 111 self._ws = websocket 112 self._closed = False 113 114 async def receive(self) -> AsyncIterator[IncomingAgentMessage]: 115 """ 116 Returns an infinite async iterator over the incoming messages from Fishjam to 117 the agent. 118 """ 119 while message := await self._ws.recv(decode=False): 120 parsed = AgentResponse().parse(message) 121 _, msg = betterproto.which_one_of(parsed, "content") 122 match msg: 123 case IncomingTrackData() as content: 124 yield content 125 126 async def add_track(self, options: OutgoingAudioTrackOptions): 127 """ 128 Adds a track to the connected agent, with the specified options and metadata. 129 130 Returns an instance of :class:`OutgoingTrack`, which can be used to send data 131 over the added track. 132 """ 133 track_id = uuid.uuid4().hex 134 metadata_json = json.dumps(options.metadata) 135 message = AgentRequest( 136 add_track=AgentRequestAddTrack( 137 track=Track( 138 id=track_id, 139 type=TrackType.TRACK_TYPE_AUDIO, 140 metadata=metadata_json, 141 ), 142 codec_params=AgentRequestAddTrackCodecParameters( 143 encoding=options.encoding, 144 sample_rate=options.sample_rate, 145 channels=options.channels, 146 ), 147 ) 148 ) 149 await self._send(message) 150 return OutgoingTrack(id=track_id, session=self, options=options) 151 152 async def _send(self, message: AgentRequest): 153 await self._ws.send(bytes(message), text=False)
114 async def receive(self) -> AsyncIterator[IncomingAgentMessage]: 115 """ 116 Returns an infinite async iterator over the incoming messages from Fishjam to 117 the agent. 118 """ 119 while message := await self._ws.recv(decode=False): 120 parsed = AgentResponse().parse(message) 121 _, msg = betterproto.which_one_of(parsed, "content") 122 match msg: 123 case IncomingTrackData() as content: 124 yield content
Returns an infinite async iterator over the incoming messages from Fishjam to the agent.
126 async def add_track(self, options: OutgoingAudioTrackOptions): 127 """ 128 Adds a track to the connected agent, with the specified options and metadata. 129 130 Returns an instance of :class:`OutgoingTrack`, which can be used to send data 131 over the added track. 132 """ 133 track_id = uuid.uuid4().hex 134 metadata_json = json.dumps(options.metadata) 135 message = AgentRequest( 136 add_track=AgentRequestAddTrack( 137 track=Track( 138 id=track_id, 139 type=TrackType.TRACK_TYPE_AUDIO, 140 metadata=metadata_json, 141 ), 142 codec_params=AgentRequestAddTrackCodecParameters( 143 encoding=options.encoding, 144 sample_rate=options.sample_rate, 145 channels=options.channels, 146 ), 147 ) 148 ) 149 await self._send(message) 150 return OutgoingTrack(id=track_id, session=self, options=options)
Adds a track to the connected agent, with the specified options and metadata.
Returns an instance of OutgoingTrack
, which can be used to send data
over the added track.
6class AgentAuthError(AgentError): 7 """Agent failed to authenticate properly""" 8 9 def __init__(self, reason: str): 10 self.reason = reason 11 12 def __str__(self) -> str: 13 return f"agent failed to authenticate: {self.reason}"
Agent failed to authenticate properly
Inherited Members
- builtins.BaseException
- with_traceback
- add_note
- args
60@dataclass(frozen=True) 61class OutgoingTrack: 62 """ 63 Represents an outgoing track of an agent connected to Fishjam, 64 created by :func:`Agent.add_track`. 65 """ 66 67 id: str 68 """The global identifier of the track.""" 69 session: AgentSession 70 """The agent the track belongs to.""" 71 options: OutgoingAudioTrackOptions 72 """The parameters used to create the track.""" 73 74 async def send_chunk(self, data: bytes): 75 """ 76 Send a chunk of audio to Fishjam on this track. 77 78 Peers connected to the room of the agent will receive this data. 79 """ 80 message = AgentRequest( 81 track_data=OutgoingTrackData( 82 track_id=self.id, 83 data=data, 84 ) 85 ) 86 87 await self.session._send(message) 88 89 async def interrupt(self): 90 """ 91 Interrupt current track. 92 93 Any audio that has been sent, but not played 94 will be cleared and be prevented from playing. 95 96 Audio sent after the interrupt will be played normally. 97 """ 98 message = AgentRequest( 99 interrupt_track=AgentRequestInterruptTrack( 100 track_id=self.id, 101 ) 102 ) 103 104 await self.session._send(message)
Represents an outgoing track of an agent connected to Fishjam,
created by Agent.add_track()
.
74 async def send_chunk(self, data: bytes): 75 """ 76 Send a chunk of audio to Fishjam on this track. 77 78 Peers connected to the room of the agent will receive this data. 79 """ 80 message = AgentRequest( 81 track_data=OutgoingTrackData( 82 track_id=self.id, 83 data=data, 84 ) 85 ) 86 87 await self.session._send(message)
Send a chunk of audio to Fishjam on this track.
Peers connected to the room of the agent will receive this data.
89 async def interrupt(self): 90 """ 91 Interrupt current track. 92 93 Any audio that has been sent, but not played 94 will be cleared and be prevented from playing. 95 96 Audio sent after the interrupt will be played normally. 97 """ 98 message = AgentRequest( 99 interrupt_track=AgentRequestInterruptTrack( 100 track_id=self.id, 101 ) 102 ) 103 104 await self.session._send(message)
Interrupt current track.
Any audio that has been sent, but not played will be cleared and be prevented from playing.
Audio sent after the interrupt will be played normally.
30@dataclass 31class OutgoingAudioTrackOptions: 32 """Parameters of an outgoing audio track.""" 33 34 encoding: TrackEncoding = TrackEncoding.TRACK_ENCODING_UNSPECIFIED 35 """ 36 The encoding of the audio source. 37 Defaults to raw 16-bit PCM. 38 """ 39 40 sample_rate: Literal[16000, 24000] = 16000 41 """ 42 The sample rate of the audio source. 43 Defaults to 16000. 44 """ 45 46 channels: Literal[1, 2] = 1 47 """ 48 The number of channels in the audio source. 49 Supported values are 1 (mono) and 2 (stereo). 50 Defaults to 1 (mono) 51 """ 52 53 metadata: dict[str, Any] | None = None 54 """ 55 Custom metadata for the track. 56 Must be JSON-encodable. 57 """
Parameters of an outgoing audio track.