Documentation Index
Fetch the complete documentation index at: https://agno-v2-shaloo-ai-support-link.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
"""
Running Workflows with AgentOSClient
This example demonstrates how to execute workflow runs using
AgentOSClient, including both streaming and non-streaming responses.
Prerequisites:
1. Start an AgentOS server with a workflow configured
2. Run this script: python 07_run_workflows.py
"""
import asyncio
from agno.client import AgentOSClient
# ---------------------------------------------------------------------------
# Create Example
# ---------------------------------------------------------------------------
async def run_workflow_non_streaming():
    """Execute a non-streaming workflow run and print its result.

    Creates an ``AgentOSClient`` for ``http://localhost:7777``, fetches the
    server config, and runs the first workflow listed there with a fixed
    message. The run ID and content of the result are printed.

    If the config lists no workflows, a notice is printed and the function
    returns early. Any exception raised while running the workflow is caught
    and reported on stdout instead of being propagated.
    """
    print("=" * 60)
    print("Non-Streaming Workflow Run")
    print("=" * 60)

    client = AgentOSClient(base_url="http://localhost:7777")

    # Get available workflows
    config = await client.aget_config()
    if not config.workflows:
        print("No workflows available")
        return

    workflow_id = config.workflows[0].id
    print(f"Running workflow: {workflow_id}")

    try:
        # Execute the workflow
        result = await client.run_workflow(
            workflow_id=workflow_id,
            message="What are the benefits of using Python for data science?",
        )
        print(f"\nRun ID: {result.run_id}")
        print(f"Content: {result.content}")
    except Exception as e:
        print(f"Error: {e}")
        # An exception may carry ``response`` as None, or a response object
        # without ``.text``. Look both up defensively so the handler itself
        # cannot raise and mask the original error.
        response = getattr(e, "response", None)
        body = getattr(response, "text", None)
        if body is not None:
            print(f"Response: {body}")
async def run_workflow_streaming():
    """Execute a streaming workflow run and print content as it arrives.

    Creates an ``AgentOSClient`` for ``http://localhost:7777``, fetches the
    server config, and streams the first workflow listed there with a fixed
    message. Content carried by ``RunContent`` and ``WorkflowAgentCompleted``
    events is written to stdout without newlines so the output reads as one
    continuous response.

    If the config lists no workflows, a notice is printed and the function
    returns early. Any exception raised while streaming is caught and
    reported on stdout (with its type name) instead of being propagated.
    """
    print("\n" + "=" * 60)
    print("Streaming Workflow Run")
    print("=" * 60)

    client = AgentOSClient(base_url="http://localhost:7777")

    # Get available workflows
    config = await client.aget_config()
    if not config.workflows:
        print("No workflows available")
        return

    workflow_id = config.workflows[0].id
    print(f"Streaming from workflow: {workflow_id}")
    print("\nResponse: ", end="", flush=True)

    try:
        # Stream the response - returns typed WorkflowRunOutputEvent objects
        # Workflows can emit both workflow events and nested agent events
        async for event in client.run_workflow_stream(
            workflow_id=workflow_id,
            message="Explain machine learning in simple terms.",
        ):
            event_name = event.event
            # Not every event type carries a ``content`` attribute.
            content = getattr(event, "content", None)

            # Handle content from agent events (RunContent) or workflow completion
            if event_name == "RunContent" and content is not None:
                # Skip None payloads so the literal text "None" is never
                # interleaved with the streamed response.
                print(content, end="", flush=True)
            elif event_name == "WorkflowAgentCompleted" and content:
                print(content, end="", flush=True)
        print("\n")
    except Exception as e:
        print(f"\nError: {type(e).__name__}: {e}")
async def main():
    """Run the non-streaming demo, then the streaming demo, one after the other."""
    demos = (run_workflow_non_streaming, run_workflow_streaming)
    for demo in demos:
        # Awaited sequentially so the two sections of output never interleave.
        await demo()
# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Script entry point: build the top-level coroutine and hand it to asyncio.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)
Run the Example
# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/05_agent_os/client
# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate
python 07_run_workflows.py