Documentation Index
Fetch the complete documentation index at: https://agno-v2-shaloo-ai-support-link.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
The AgentOSClient provides a convenient interface for interacting with a running AgentOS instance. It supports all AgentOS operations including running agents, teams, and workflows, managing sessions and memories, and searching knowledge bases.
Basic Usage
from agno.client import AgentOSClient
# Connect to AgentOS
client = AgentOSClient(base_url="http://localhost:7777")
# Get configuration
config = await client.aget_config()
print(f"Connected to: {config.name or config.os_id}")
print(f"Available agents: {[a.id for a in config.agents]}")
Parameters
| Parameter | Type | Default | Description |
|---|
base_url | str | Required | Base URL of the AgentOS instance (e.g., "http://localhost:7777") |
timeout | float | 60.0 | Request timeout in seconds |
Methods
Discovery & Configuration
aget_config
Get AgentOS configuration and metadata asynchronously.
config = await client.aget_config()
Returns: ConfigResponse containing:
os_id: Unique identifier for the OS instance
name: Name of the OS instance
agents: List of registered agents
teams: List of registered teams
workflows: List of registered workflows
interfaces: List of available interfaces
get_config
Synchronous version of aget_config.
config = client.get_config()
list_agents
List all agents configured in the AgentOS instance.
agents = await client.list_agents()
for agent in agents:
print(f"{agent.id}: {agent.name}")
Returns: List[AgentSummaryResponse]
aget_agent
Get detailed configuration for a specific agent.
agent = await client.aget_agent(agent_id="my-agent")
print(f"Name: {agent.name}")
print(f"Model: {agent.model}")
print(f"Tools: {agent.tools}")
Parameters:
agent_id (str): ID of the agent to retrieve
Returns: AgentResponse
list_teams
List all teams configured in the AgentOS instance.
teams = await client.list_teams()
Returns: List[TeamSummaryResponse]
aget_team
Get detailed configuration for a specific team.
team = await client.aget_team(team_id="my-team")
Parameters:
team_id (str): ID of the team to retrieve
Returns: TeamResponse
list_workflows
List all workflows configured in the AgentOS instance.
workflows = await client.list_workflows()
Returns: List[WorkflowSummaryResponse]
aget_workflow
Get detailed configuration for a specific workflow.
workflow = await client.aget_workflow(workflow_id="my-workflow")
Parameters:
workflow_id (str): ID of the workflow to retrieve
Returns: WorkflowResponse
Running Agents
run_agent
Execute an agent run (non-streaming).
result = await client.run_agent(
agent_id="my-agent",
message="What is 2 + 2?",
session_id="session-123",
user_id="user-456",
)
print(f"Response: {result.content}")
print(f"Tokens: {result.metrics.total_tokens}")
Parameters:
| Parameter | Type | Default | Description |
|---|
agent_id | str | Required | ID of the agent to run |
message | str | Required | The message/prompt for the agent |
session_id | Optional[str] | None | Session ID for context persistence |
user_id | Optional[str] | None | User ID for the run |
images | Optional[Sequence[Image]] | None | Images to include |
audio | Optional[Sequence[Audio]] | None | Audio to include |
videos | Optional[Sequence[Video]] | None | Videos to include |
files | Optional[Sequence[File]] | None | Files to include |
session_state | Optional[Dict] | None | Session state dictionary |
dependencies | Optional[Dict] | None | Dependencies dictionary |
metadata | Optional[Dict] | None | Metadata dictionary |
knowledge_filters | Optional[Dict] | None | Filters for knowledge search |
Returns: RunOutput
run_agent_stream
Stream an agent run response.
from agno.run.agent import RunContentEvent, RunCompletedEvent
async for event in client.run_agent_stream(
agent_id="my-agent",
message="Tell me a story",
):
if isinstance(event, RunContentEvent):
print(event.content, end="", flush=True)
elif isinstance(event, RunCompletedEvent):
print(f"\nRun ID: {event.run_id}")
Parameters: Same as run_agent
Yields: RunOutputEvent (one of RunStartedEvent, RunContentEvent, RunToolCallEvent, RunCompletedEvent, etc.)
continue_agent_run
Continue a paused agent run with tool results.
from agno.models.response import ToolExecution
result = await client.continue_agent_run(
agent_id="my-agent",
run_id="run-123",
tools=[
ToolExecution(
tool_call_id="call-1",
result="Tool result here",
)
],
)
Parameters:
| Parameter | Type | Default | Description |
|---|
agent_id | str | Required | ID of the agent |
run_id | str | Required | ID of the run to continue |
tools | List[ToolExecution] | Required | Tool execution results |
session_id | Optional[str] | None | Session ID |
user_id | Optional[str] | None | User ID |
Returns: RunOutput
cancel_agent_run
Cancel an agent run.
await client.cancel_agent_run(agent_id="my-agent", run_id="run-123")
Running Teams
run_team
Execute a team run (non-streaming).
result = await client.run_team(
team_id="research-team",
message="Research the latest AI trends",
user_id="user-123",
)
print(f"Response: {result.content}")
Parameters:
| Parameter | Type | Default | Description |
|---|
team_id | str | Required | ID of the team to run |
message | str | Required | The message/prompt for the team |
session_id | Optional[str] | None | Session ID for context persistence |
user_id | Optional[str] | None | User ID for the run |
images | Optional[Sequence[Image]] | None | Images to include |
audio | Optional[Sequence[Audio]] | None | Audio to include |
videos | Optional[Sequence[Video]] | None | Videos to include |
files | Optional[Sequence[File]] | None | Files to include |
Returns: TeamRunOutput
run_team_stream
Stream a team run response.
from agno.run.team import RunContentEvent
async for event in client.run_team_stream(
team_id="research-team",
message="Analyze this topic",
):
if isinstance(event, RunContentEvent):
print(event.content, end="", flush=True)
Yields: TeamRunOutputEvent
cancel_team_run
Cancel a team run.
await client.cancel_team_run(team_id="my-team", run_id="run-123")
Running Workflows
run_workflow
Execute a workflow run (non-streaming).
result = await client.run_workflow(
workflow_id="qa-workflow",
message="What are the benefits of Python?",
user_id="user-123",
)
print(f"Response: {result.content}")
print(f"Status: {result.status}")
Parameters:
| Parameter | Type | Default | Description |
|---|
workflow_id | str | Required | ID of the workflow to run |
message | str | Required | The message/prompt for the workflow |
session_id | Optional[str] | None | Session ID for context persistence |
user_id | Optional[str] | None | User ID for the run |
Returns: WorkflowRunOutput
run_workflow_stream
Stream a workflow run response.
async for event in client.run_workflow_stream(
workflow_id="qa-workflow",
message="Explain machine learning",
):
if event.event == "RunContent" and hasattr(event, "content"):
print(event.content, end="", flush=True)
Yields: WorkflowRunOutputEvent
cancel_workflow_run
Cancel a workflow run.
await client.cancel_workflow_run(workflow_id="my-workflow", run_id="run-123")
Memory Operations
create_memory
Create a new user memory.
memory = await client.create_memory(
memory="User prefers dark mode",
user_id="user-123",
topics=["preferences", "ui"],
)
print(f"Created: {memory.memory_id}")
Parameters:
| Parameter | Type | Default | Description |
|---|
memory | str | Required | The memory content to store |
user_id | str | Required | User ID to associate with the memory |
topics | Optional[List[str]] | None | Topics to categorize the memory |
db_id | Optional[str] | None | Database ID to use |
Returns: UserMemorySchema
list_memories
List user memories with filtering and pagination.
memories = await client.list_memories(
user_id="user-123",
topics=["preferences"],
limit=10,
)
for mem in memories.data:
print(f"{mem.memory_id}: {mem.memory}")
Parameters:
| Parameter | Type | Default | Description |
|---|
user_id | Optional[str] | None | Filter by user ID |
topics | Optional[List[str]] | None | Filter by topics |
search_content | Optional[str] | None | Search within memory content |
limit | int | 20 | Number of memories per page |
page | int | 1 | Page number |
Returns: PaginatedResponse[UserMemorySchema]
get_memory
Get a specific memory by ID.
memory = await client.get_memory(memory_id="mem-123", user_id="user-123")
Returns: UserMemorySchema
update_memory
Update an existing memory.
updated = await client.update_memory(
memory_id="mem-123",
memory="User strongly prefers dark mode",
user_id="user-123",
topics=["preferences", "ui", "accessibility"],
)
Returns: UserMemorySchema
delete_memory
Delete a specific memory.
await client.delete_memory(memory_id="mem-123", user_id="user-123")
Session Operations
create_session
Create a new session.
from agno.db.base import SessionType
session = await client.create_session(
session_type=SessionType.AGENT,
agent_id="my-agent",
user_id="user-123",
session_name="My Chat Session",
)
print(f"Session ID: {session.session_id}")
Parameters:
| Parameter | Type | Default | Description |
|---|
session_type | SessionType | SessionType.AGENT | Type of session (AGENT, TEAM, WORKFLOW) |
session_id | Optional[str] | None | Optional session ID (auto-generated if not provided) |
user_id | Optional[str] | None | User ID to associate with the session |
session_name | Optional[str] | None | Human-readable session name |
agent_id | Optional[str] | None | Agent ID (for agent sessions) |
team_id | Optional[str] | None | Team ID (for team sessions) |
workflow_id | Optional[str] | None | Workflow ID (for workflow sessions) |
Returns: AgentSessionDetailSchema, TeamSessionDetailSchema, or WorkflowSessionDetailSchema
get_sessions
List sessions with filtering and pagination.
sessions = await client.get_sessions(
user_id="user-123",
session_type=SessionType.AGENT,
limit=20,
)
for session in sessions.data:
print(f"{session.session_id}: {session.session_name}")
Returns: PaginatedResponse[SessionSchema]
get_session
Get a specific session by ID.
session = await client.get_session(
session_id="session-123",
session_type=SessionType.AGENT,
)
Returns: AgentSessionDetailSchema, TeamSessionDetailSchema, or WorkflowSessionDetailSchema
get_session_runs
Get all runs for a specific session.
runs = await client.get_session_runs(session_id="session-123")
for run in runs:
print(f"{run.run_id}: {run.content[:50]}...")
Returns: List[RunSchema | TeamRunSchema | WorkflowRunSchema]
rename_session
Rename a session.
session = await client.rename_session(
session_id="session-123",
session_name="My Updated Session Name",
)
Returns: Session detail schema
delete_session
Delete a specific session.
await client.delete_session(session_id="session-123")
Knowledge Operations
upload_knowledge_content
Upload content to the knowledge base.
from agno.media import File
content = await client.upload_knowledge_content(
name="My Document",
description="Important documentation",
file=File(content=b"...", filename="doc.pdf", mime_type="application/pdf"),
)
print(f"Content ID: {content.id}")
Parameters:
| Parameter | Type | Default | Description |
|---|
name | Optional[str] | None | Content name |
description | Optional[str] | None | Content description |
url | Optional[str] | None | URL to fetch content from |
file | Optional[File] | None | File object to upload |
text_content | Optional[str] | None | Raw text content |
reader_id | Optional[str] | None | Reader to use for processing |
chunker | Optional[str] | None | Chunking strategy |
Returns: ContentResponseSchema
search_knowledge
Search the knowledge base.
results = await client.search_knowledge(
query="What is Agno?",
limit=5,
)
for result in results.data:
print(f"Score: {result.score}")
print(f"Content: {result.content[:100]}...")
Parameters:
| Parameter | Type | Default | Description |
|---|
query | str | Required | Search query string |
max_results | Optional[int] | None | Maximum results to return |
filters | Optional[Dict] | None | Filters to apply |
search_type | Optional[str] | None | Search type (vector, keyword, hybrid) |
Returns: PaginatedResponse[VectorSearchResult]
list_knowledge_content
List all content in the knowledge base.
content = await client.list_knowledge_content(limit=20)
for item in content.data:
print(f"{item.id}: {item.name}")
Returns: PaginatedResponse[ContentResponseSchema]
get_knowledge_config
Get knowledge base configuration.
config = await client.get_knowledge_config()
print(f"Readers: {config.readers}")
print(f"Chunkers: {config.chunkers}")
Returns: KnowledgeConfigResponse
Trace Operations
get_traces
List execution traces with filtering and pagination.
traces = await client.get_traces(
agent_id="my-agent",
limit=20,
)
for trace in traces.data:
print(f"{trace.trace_id}: {trace.status}")
Returns: PaginatedResponse[TraceSummary]
get_trace
Get detailed trace information.
trace = await client.get_trace(trace_id="trace-123")
print(f"Duration: {trace.duration_ms}ms")
print(f"Spans: {len(trace.spans)}")
Returns: TraceDetail or TraceNode (if span_id provided)
Metrics Operations
get_metrics
Retrieve AgentOS metrics and analytics data.
from datetime import date
metrics = await client.get_metrics(
starting_date=date(2024, 1, 1),
ending_date=date(2024, 1, 31),
)
Returns: MetricsResponse
refresh_metrics
Manually trigger recalculation of system metrics.
metrics = await client.refresh_metrics()
Returns: List[DayAggregatedMetrics]
Error Handling
The client raises RemoteServerUnavailableError when the remote server is unavailable:
from agno.exceptions import RemoteServerUnavailableError
try:
config = await client.aget_config()
except RemoteServerUnavailableError as e:
print(f"Server unavailable: {e.message}")
print(f"Base URL: {e.base_url}")
For HTTP errors (4xx, 5xx), the client raises httpx.HTTPStatusError.
Authentication
To include authentication headers in requests, pass the headers parameter to any method:
headers = {"Authorization": "Bearer your-token"}
config = await client.aget_config(headers=headers)