class
Agent:
33class Agent: 34 """ 35 Allows for connecting to a Fishjam room as an agent peer. 36 Provides callbacks for receiving audio. 37 """ 38 39 def __init__(self, id: str, token: str, fishjam_url: str): 40 """ 41 Create FishjamAgent instance, providing the fishjam id and management token. 42 """ 43 44 self.id = id 45 self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws") 46 self._token = token 47 self._msg_loop: asyncio.Task[None] | None = None 48 self._end_event = asyncio.Event() 49 50 @functools.singledispatch 51 def _message_handler(content: Any) -> None: 52 raise TypeError(f"Unexpected message of type #{type(content)}") 53 54 @_message_handler.register 55 def _(_content: AgentResponseTrackData): 56 return 57 58 self._dispatch_handler = _message_handler 59 60 def on_track_data(self, handler: TrackDataHandlerT) -> TrackDataHandlerT: 61 """ 62 Decorator used for defining a handler for track data messages from Fishjam. 63 """ 64 self._dispatch_handler.register(AgentResponseTrackData, handler) 65 return handler 66 67 async def connect(self): 68 """ 69 Connect the agent to Fishjam to start receiving messages. 70 71 Incoming messages from Fishjam will be routed to handlers 72 defined with :func:`on_track_data`. 73 74 :raises AgentAuthError: authentication failed 75 """ 76 await self.disconnect() 77 78 websocket = await client.connect(self._socket_url) 79 await self._authenticate(websocket) 80 81 task = asyncio.create_task(self._recv_loop(websocket)) 82 83 self._msg_loop = task 84 85 async def disconnect(self, code: CloseCode = CloseCode.NORMAL_CLOSURE): 86 """ 87 Disconnect the agent from Fishjam. 88 89 Does nothing if already disconnected. 90 """ 91 if (task := self._msg_loop) is None: 92 return 93 94 event = self._end_event 95 96 self._end_event = asyncio.Event() 97 self._msg_loop = None 98 99 task.add_done_callback(lambda _t: event.set()) 100 if task.cancel(code): 101 await event.wait() 102 103 async def __aenter__(self): 104 await self.connect() 105 return self 106 107 async def __aexit__( 108 self, 109 exc_type: type[BaseException] | None, 110 exc_value: BaseException | None, 111 traceback: TracebackType | None, 112 ): 113 if exc_type is not None: 114 await self.disconnect(CloseCode.INTERNAL_ERROR) 115 else: 116 await self.disconnect() 117 118 async def _authenticate(self, websocket: ClientConnection): 119 req = AgentRequest(auth_request=AgentRequestAuthRequest(token=self._token)) 120 try: 121 await websocket.send(bytes(req)) 122 # Fishjam will close the socket if auth fails and send a response on success 123 await websocket.recv(decode=False) 124 except ConnectionClosed as e: 125 raise AgentAuthError(e.reason) 126 127 async def _recv_loop(self, websocket: ClientConnection): 128 close_code = CloseCode.NORMAL_CLOSURE 129 try: 130 while True: 131 message = await websocket.recv(decode=False) 132 message = AgentResponse().parse(message) 133 134 _which, content = betterproto.which_one_of(message, "content") 135 self._dispatch_handler(content) 136 except ConnectionClosed as e: 137 if not _close_ok(e): 138 close_code = CloseCode.INTERNAL_ERROR 139 raise 140 except asyncio.CancelledError as e: 141 # NOTE: e.args[0] is the close code supplied by disconnect() 142 # However cancellation can have other causes, which we treat as normal 143 with suppress(IndexError): 144 close_code = e.args[0] 145 raise 146 except Exception: 147 close_code = CloseCode.INTERNAL_ERROR 148 raise 149 finally: 150 await websocket.close(close_code)
Allows for connecting to a Fishjam room as an agent peer. Provides callbacks for receiving audio.
Agent(id: str, token: str, fishjam_url: str)
39 def __init__(self, id: str, token: str, fishjam_url: str): 40 """ 41 Create FishjamAgent instance, providing the fishjam id and management token. 42 """ 43 44 self.id = id 45 self._socket_url = f"{fishjam_url}/socket/agent/websocket".replace("http", "ws") 46 self._token = token 47 self._msg_loop: asyncio.Task[None] | None = None 48 self._end_event = asyncio.Event() 49 50 @functools.singledispatch 51 def _message_handler(content: Any) -> None: 52 raise TypeError(f"Unexpected message of type #{type(content)}") 53 54 @_message_handler.register 55 def _(_content: AgentResponseTrackData): 56 return 57 58 self._dispatch_handler = _message_handler
Create FishjamAgent instance, providing the fishjam id and management token.
def
on_track_data(self, handler: ~TrackDataHandlerT) -> ~TrackDataHandlerT:
60 def on_track_data(self, handler: TrackDataHandlerT) -> TrackDataHandlerT: 61 """ 62 Decorator used for defining a handler for track data messages from Fishjam. 63 """ 64 self._dispatch_handler.register(AgentResponseTrackData, handler) 65 return handler
Decorator used for defining a handler for track data messages from Fishjam.
async def
connect(self):
67 async def connect(self): 68 """ 69 Connect the agent to Fishjam to start receiving messages. 70 71 Incoming messages from Fishjam will be routed to handlers 72 defined with :func:`on_track_data`. 73 74 :raises AgentAuthError: authentication failed 75 """ 76 await self.disconnect() 77 78 websocket = await client.connect(self._socket_url) 79 await self._authenticate(websocket) 80 81 task = asyncio.create_task(self._recv_loop(websocket)) 82 83 self._msg_loop = task
Connect the agent to Fishjam to start receiving messages.
Incoming messages from Fishjam will be routed to handlers
defined with on_track_data().
Raises
- AgentAuthError: authentication failed
async def
disconnect( self, code: websockets.frames.CloseCode = <CloseCode.NORMAL_CLOSURE: 1000>):
85 async def disconnect(self, code: CloseCode = CloseCode.NORMAL_CLOSURE): 86 """ 87 Disconnect the agent from Fishjam. 88 89 Does nothing if already disconnected. 90 """ 91 if (task := self._msg_loop) is None: 92 return 93 94 event = self._end_event 95 96 self._end_event = asyncio.Event() 97 self._msg_loop = None 98 99 task.add_done_callback(lambda _t: event.set()) 100 if task.cancel(code): 101 await event.wait()
Disconnect the agent from Fishjam.
Does nothing if already disconnected.
class
AgentError(builtins.Exception):
Base exception class for all agent exceptions
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args
6class AgentAuthError(AgentError): 7 """Agent failed to authenticate properly""" 8 9 def __init__(self, reason: str): 10 self.reason = reason 11 12 def __str__(self) -> str: 13 return f"agent failed to authenticate: {self.reason}"
Agent failed to authenticate properly
Inherited Members
- builtins.BaseException
- with_traceback
- add_note
- args
TrackDataHandler =
typing.Callable[[AgentResponseTrackData], NoneType]
@dataclass(eq=False, repr=False)
class
AgentResponseTrackData103@dataclass(eq=False, repr=False) 104class AgentResponseTrackData(betterproto.Message): 105 """Notification containing a chunk of a track's data stream""" 106 107 peer_id: str = betterproto.string_field(1) 108 track: "notifications.Track" = betterproto.message_field(2) 109 data: bytes = betterproto.bytes_field(3)
Notification containing a chunk of a track's data stream
AgentResponseTrackData( peer_id: str = <object object>, track: fishjam.events.Track = <object object>, data: bytes = <object object>)
Inherited Members
- betterproto.Message
- SerializeToString
- parse
- FromString
- to_dict
- from_dict
- to_json
- from_json
- to_pydict
- from_pydict
- is_set