Skip to main content

Documentation Index

Fetch the complete documentation index at: https://agno-v2-shaloo-ai-support-link.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

"""
Running Teams with AgentOSClient

This example demonstrates how to execute team runs using
AgentOSClient, including both streaming and non-streaming responses.

Prerequisites:
1. Start an AgentOS server with a team configured
2. Run this script: python 06_run_teams.py
"""

import asyncio

from agno.client import AgentOSClient

# ---------------------------------------------------------------------------
# Create Example
# ---------------------------------------------------------------------------


async def run_team_non_streaming():
    """Run the first configured team once and print the finished result.

    Creates an ``AgentOSClient`` for ``http://localhost:7777``, fetches the
    server config, and sends a single message to the first team listed
    there. The run is awaited to completion before its run ID and content
    are printed. If the config lists no teams, a notice is printed and the
    function returns without running anything.
    """
    banner = "=" * 60
    print(banner, "Non-Streaming Team Run", banner, sep="\n")

    os_client = AgentOSClient(base_url="http://localhost:7777")

    # Discover which teams the server exposes; bail out early if none.
    teams = (await os_client.aget_config()).teams
    if not teams:
        print("No teams available")
        return

    # The example always targets the first team in the config.
    team_id = teams[0].id
    print(f"Running team: {team_id}")

    # Await the whole run; the result is only available once it finishes.
    prompt = "What is the capital of France and what is 15 * 7?"
    result = await os_client.run_team(team_id=team_id, message=prompt)

    print(f"\nRun ID: {result.run_id}")
    print(f"Content: {result.content}")


async def run_team_streaming():
    """Stream a run from the first configured team, printing content live.

    Creates an ``AgentOSClient`` for ``http://localhost:7777``, fetches the
    server config, and streams a single message to the first team listed
    there. Each content event's text is written to stdout as it arrives,
    without a trailing newline, so the response reads as one continuous
    answer. If the config lists no teams, a notice is printed and the
    function returns without running anything.
    """
    # Imported before any output so an import failure cannot leave a
    # dangling "Response: " prompt on the terminal.
    from agno.run.team import RunCompletedEvent, RunContentEvent

    print("\n" + "=" * 60)
    print("Streaming Team Run")
    print("=" * 60)

    client = AgentOSClient(base_url="http://localhost:7777")

    # Get available teams
    config = await client.aget_config()
    if not config.teams:
        print("No teams available")
        return

    team_id = config.teams[0].id
    print(f"Streaming from team: {team_id}")
    print("\nResponse: ", end="", flush=True)

    # Stream the response
    async for event in client.run_team_stream(
        team_id=team_id,
        message="Tell me about Python programming in 2 sentences.",
    ):
        # Handle different event types
        if isinstance(event, RunContentEvent):
            # NOTE(review): content appears to be optional on content
            # events - confirm against the agno event model. Skipping
            # None keeps the literal text "None" out of the response.
            if event.content is not None:
                print(event.content, end="", flush=True)
        elif isinstance(event, RunCompletedEvent):
            # Run completed - could access event.run_id here if needed
            pass

    print("\n")


async def main():
    """Run the non-streaming example, then the streaming example, in order."""
    # Awaited one at a time so the two examples' output never interleaves.
    for example in (run_team_non_streaming, run_team_streaming):
        await example()


# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # Start the event loop only when executed as a script, not on import.
    asyncio.run(main())

Run the Example

# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/05_agent_os/client

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

python 06_run_teams.py