Skip to main content

Documentation Index

Fetch the complete documentation index at: https://agno-v2-shaloo-ai-support-link.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

"""
Running Agents with AgentOSClient

This example demonstrates how to execute agent runs using
AgentOSClient, including both streaming and non-streaming responses.

Prerequisites:
1. Start an AgentOS server with at least one agent, reachable at http://localhost:7777
2. Run this script: python 02_run_agents.py
"""

import asyncio

from agno.client import AgentOSClient

# ---------------------------------------------------------------------------
# Create Example
# ---------------------------------------------------------------------------


async def run_agent_non_streaming():
    """Run the first available agent once and print the finished response.

    Connects to the AgentOS server at http://localhost:7777, picks the first
    agent listed in the server config, sends it a single message, and prints
    the run ID, the content, and the total token count ('N/A' when the result
    has no metrics). Returns early when the server lists no agents.
    """
    banner = "=" * 60
    print(banner)
    print("Non-Streaming Agent Run")
    print(banner)

    os_client = AgentOSClient(base_url="http://localhost:7777")

    # Ask the server which agents it exposes.
    os_config = await os_client.aget_config()
    if not os_config.agents:
        print("No agents available")
        return

    first_agent_id = os_config.agents[0].id
    print(f"Running agent: {first_agent_id}")

    # Await the whole result in one call instead of consuming a stream.
    run_output = await os_client.run_agent(
        agent_id=first_agent_id,
        message="What is 2 + 2? Explain your answer briefly.",
    )

    print(f"\nRun ID: {run_output.run_id}")
    print(f"Content: {run_output.content}")

    # Metrics may be absent on the result; fall back to a placeholder.
    if run_output.metrics:
        token_count = run_output.metrics.total_tokens
    else:
        token_count = "N/A"
    print(f"Tokens: {token_count}")


async def run_agent_streaming():
    """Execute a streaming agent run and print content chunks as they arrive.

    Connects to the AgentOS server at http://localhost:7777, picks the first
    agent listed in the server config, and iterates over the events yielded
    by ``run_agent_stream``. The content of each ``RunContentEvent`` is
    written to stdout without a trailing newline so the response reads as one
    continuous line. Returns early when the server lists no agents.
    """
    print("\n" + "=" * 60)
    print("Streaming Agent Run")
    print("=" * 60)

    client = AgentOSClient(base_url="http://localhost:7777")

    # Get available agents
    config = await client.aget_config()
    if not config.agents:
        print("No agents available")
        return

    agent_id = config.agents[0].id
    print(f"Streaming from agent: {agent_id}")
    print("\nResponse: ", end="", flush=True)

    from agno.run.agent import RunCompletedEvent, RunContentEvent

    async for event in client.run_agent_stream(
        agent_id=agent_id,
        message="Tell me a short joke.",
    ):
        # Handle different event types
        if isinstance(event, RunContentEvent):
            # NOTE(review): content appears to be optional on content events;
            # skip empty ones so "None" is never printed - confirm against
            # agno's RunContentEvent definition.
            if event.content is not None:
                # flush=True so each chunk shows up immediately.
                print(event.content, end="", flush=True)
        elif isinstance(event, RunCompletedEvent):
            # Run completed - could access event.run_id here if needed
            pass

    print("\n")


async def main():
    """Run the non-streaming example, then the streaming example, in order."""
    for example in (run_agent_non_streaming, run_agent_streaming):
        await example()


# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # Run the examples only when executed as a script, not when imported.
    asyncio.run(main())

Run the Example

# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/05_agent_os/client

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

python 02_run_agents.py