import asyncio

from agno.client import AgentOSClient


async def main() -> None:
    """Connect to a local AgentOS, list its agents, and run the first one.

    Prints the OS name (falling back to its ID when the name is falsy), the
    IDs of the configured agents, and, when at least one agent exists, the
    content of that agent's reply to a greeting message.
    """
    # Connect to AgentOS
    client = AgentOSClient(base_url="http://localhost:7777")

    # Get configuration and available agents
    config = await client.aget_config()
    print(f"Connected to: {config.name or config.os_id}")
    print(f"Available agents: {[a.id for a in config.agents]}")

    # Run an agent (skipped when the OS reports no agents)
    if config.agents:
        result = await client.run_agent(
            agent_id=config.agents[0].id,
            message="Hello, how can you help me?",
        )
        print(f"Response: {result.content}")


if __name__ == "__main__":
    asyncio.run(main())
Stream responses in real time for a better user experience:
import asyncio

from agno.client import AgentOSClient
from agno.run.agent import RunCompletedEvent, RunContentEvent

client = AgentOSClient(base_url="http://localhost:7777")


async def main() -> None:
    """Stream a run of the "my-agent" agent and print its output as it arrives.

    Content events are echoed without a trailing newline; the completion
    event prints the run ID on its own line.
    """
    # `async for` is only valid inside a coroutine, so the loop lives in
    # main() and is driven by asyncio.run() below.
    async for event in client.run_agent_stream(
        agent_id="my-agent",
        message="Tell me a story",
    ):
        if isinstance(event, RunContentEvent):
            # Flush on every chunk so the text shows up immediately.
            print(event.content, end="", flush=True)
        elif isinstance(event, RunCompletedEvent):
            print(f"\nCompleted! Run ID: {event.run_id}")


if __name__ == "__main__":
    asyncio.run(main())