diff --git a/examples/duckduckgo-agent.py b/examples/duckduckgo-agent.py new file mode 100644 index 0000000..456811b --- /dev/null +++ b/examples/duckduckgo-agent.py @@ -0,0 +1,100 @@ +# WARNING: This might not work in the future. Do NOT use this in production. + +import asyncio +import socketio +from smolagents import ToolCallingAgent, LiteLLMModel, DuckDuckGoSearchTool + + +from env import WEBUI_URL, TOKEN +from utils import send_message, send_typing + +search_tool = DuckDuckGoSearchTool() + +MODEL_ID = "gpt-4o" + +model = LiteLLMModel( + model_id=f"openai/{MODEL_ID}", api_base=f"{WEBUI_URL}/api/", api_key=TOKEN +) +agent = ToolCallingAgent(tools=[search_tool], model=model) + +# Create an asynchronous Socket.IO client instance +sio = socketio.AsyncClient(logger=False, engineio_logger=False) + + +# Event handlers +@sio.event +async def connect(): + print("Connected!") + + +@sio.event +async def disconnect(): + print("Disconnected from the server!") + + +# Define a function to handle channel events +def events(user_id): + @sio.on("channel-events") + async def channel_events(data): + if data["user"]["id"] == user_id: + # Ignore events from the bot itself + return + + if data["data"]["type"] == "message": + print(f'{data["user"]["name"]}: {data["data"]["data"]["content"]}') + + # Send typing events every second while processing the input + async def simulate_typing(channel_id): + try: + while not processing_event.is_set(): + await send_typing(sio, channel_id) + await asyncio.sleep(1) + except asyncio.CancelledError: + pass + + # Create an asyncio.Event to manage typing simulation + processing_event = asyncio.Event() + typing_task = asyncio.create_task(simulate_typing(data["channel_id"])) + + try: + # Run the blocking agent.run in a non-blocking way using asyncio + loop = asyncio.get_running_loop() + output = await loop.run_in_executor( + None, agent.run, data["data"]["data"]["content"] + ) + finally: + # Signal that typing simulation should stop + processing_event.set() + # Wait for the typing task to finish + await typing_task + + # Send the generated output as a message + await send_message(data["channel_id"], f"{output}") + + +# Define an async function for the main workflow +async def main(): + try: + print(f"Connecting to {WEBUI_URL}...") + await sio.connect( + WEBUI_URL, socketio_path="/ws/socket.io", transports=["websocket"] + ) + print("Connection established!") + except Exception as e: + print(f"Failed to connect: {e}") + return + + # Callback function for user-join + async def join_callback(data): + events(data["id"]) # Attach the event handlers dynamically + + # Authenticate with the server + await sio.emit("user-join", {"auth": {"token": TOKEN}}, callback=join_callback) + + # Wait indefinitely to keep the connection open + await sio.wait() + + +# Actually run the async `main` function using `asyncio` +if __name__ == "__main__": + asyncio.run(main())