Documentation Index
Fetch the complete documentation index at: https://agno-v2-shaloo-ai-support-link.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
"""
Running Teams with AgentOSClient
This example demonstrates how to execute team runs using
AgentOSClient, including both streaming and non-streaming responses.
Prerequisites:
1. Start an AgentOS server with a team configured
2. Run this script: python 06_run_teams.py
"""
import asyncio
from agno.client import AgentOSClient
# ---------------------------------------------------------------------------
# Create Example
# ---------------------------------------------------------------------------
async def run_team_non_streaming():
"""Execute a non-streaming team run."""
print("=" * 60)
print("Non-Streaming Team Run")
print("=" * 60)
client = AgentOSClient(base_url="http://localhost:7777")
# Get available teams
config = await client.aget_config()
if not config.teams:
print("No teams available")
return
team_id = config.teams[0].id
print(f"Running team: {team_id}")
# Execute the team
result = await client.run_team(
team_id=team_id,
message="What is the capital of France and what is 15 * 7?",
)
print(f"\nRun ID: {result.run_id}")
print(f"Content: {result.content}")
async def run_team_streaming():
"""Execute a streaming team run."""
print("\n" + "=" * 60)
print("Streaming Team Run")
print("=" * 60)
client = AgentOSClient(base_url="http://localhost:7777")
# Get available teams
config = await client.aget_config()
if not config.teams:
print("No teams available")
return
team_id = config.teams[0].id
print(f"Streaming from team: {team_id}")
print("\nResponse: ", end="", flush=True)
from agno.run.team import RunCompletedEvent, RunContentEvent
# Stream the response
async for event in client.run_team_stream(
team_id=team_id,
message="Tell me about Python programming in 2 sentences.",
):
# Handle different event types
if isinstance(event, RunContentEvent):
print(event.content, end="", flush=True)
elif isinstance(event, RunCompletedEvent):
# Run completed - could access event.run_id here if needed
pass
print("\n")
async def main():
await run_team_non_streaming()
await run_team_streaming()
# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------
if __name__ == "__main__":
asyncio.run(main())
Run the Example
# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/05_agent_os/client
# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate
python 06_run_teams.py