"""Agent runtime with tool execution loop."""
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Callable, Dict, List, Optional
from ..tools.base import Tool
from .llm import LLMProvider, StreamEvent
from .messages import (AgentEndEvent, AgentStartEvent, AssistantMessage, Event,
Message, MessageEndEvent, MessageStartEvent,
MessageUpdateEvent, TextContent, ToolCall,
ToolExecutionEndEvent, ToolExecutionStartEvent,
ToolExecutionUpdateEvent, ToolResultMessage,
TurnEndEvent, TurnStartEvent, UserMessage)
[docs]
@dataclass
class AgentConfig:
"""Agent configuration."""
model: str
api_key: str
base_url: Optional[str] = None
system_prompt: Optional[str] = None
max_turns: int = 10 # Maximum conversation turns per prompt
max_tool_calls_per_turn: int = 20 # Maximum tool calls per turn
skill_dirs: Optional[List[str]] = None # Directories to load skills from
[docs]
def __post_init__(self):
"""Create provider after initialization."""
# Auto-detect provider from base_url or use OpenAI-compatible by default
try:
from .llm import OpenAIProvider
self.provider = OpenAIProvider(
api_key=self.api_key, base_url=self.base_url)
except Exception as e:
raise ValueError(f"Failed to initialize LLM provider: {e}")
[docs]
class Agent:
"""Agent runtime with tool execution loop."""
[docs]
def __init__(self, config: AgentConfig, tools: Optional[List[Tool]] = None):
self.config = config
self.tools = tools or []
self.messages: List[Message] = []
self.subscribers: List[Callable[[Event], None]] = []
# Build tool lookup
self.tool_map = {tool.name: tool for tool in self.tools}
# Initialize skill manager
from .skills import SkillManager
self.skill_manager = SkillManager(skill_dirs=config.skill_dirs)
# Inject skills into system prompt
self._inject_skills_into_system_prompt()
def _inject_skills_into_system_prompt(self):
"""Inject available skills into system prompt.
Uses simple, clear format following pi-mono's approach.
Skills provide specialized instructions via progressive disclosure.
"""
skills = self.skill_manager.list_skills()
visible_skills = [s for s in skills if not s.disable_model_invocation]
if not visible_skills:
return
# Simple, clear instructions (pi-mono style)
instructions = []
instructions.append("\n\n")
instructions.append(
"The following skills provide specialized instructions for specific tasks.\n")
instructions.append(
"Use the read tool to load a skill's file when the task matches its description.\n")
instructions.append(
"When a skill file references a relative path, resolve it against the skill directory.\n")
instructions.append("\n")
instructions.append("<available_skills>\n")
# Add skills with name, description, and location (XML format)
for skill in visible_skills:
instructions.append(" <skill>\n")
instructions.append(
f" <name>{self._escape_xml(skill.name)}</name>\n")
instructions.append(
f" <description>{self._escape_xml(skill.description)}</description>\n")
instructions.append(
f" <location>{self._escape_xml(skill.file_path)}</location>\n")
instructions.append(" </skill>\n")
instructions.append("</available_skills>\n")
# Append to system prompt
if self.config.system_prompt:
self.config.system_prompt += "".join(instructions)
def _escape_xml(self, text: str) -> str:
"""Escape XML special characters."""
return (text
.replace("&", "&")
.replace("<", "<")
.replace(">", ">")
.replace('"', """)
.replace("'", "'"))
[docs]
def subscribe(self, callback: Callable[[Event], None]) -> Callable[[], None]:
"""Subscribe to agent events.
Returns:
Unsubscribe function
"""
self.subscribers.append(callback)
return lambda: self.subscribers.remove(callback)
def _emit(self, event: Event) -> None:
"""Emit event to all subscribers."""
for callback in self.subscribers:
try:
callback(event)
except Exception as e:
# Don't let subscriber errors break the agent
print(f"Error in event subscriber: {e}")
[docs]
async def prompt(self, user_message: str) -> AsyncIterator[Event]:
"""Process a user prompt through the agent loop.
Args:
user_message: User's message
Yields:
Agent events
"""
# Add user message
msg = UserMessage(content=user_message)
self.messages.append(msg)
# Run agent loop
async for event in self._run_loop():
yield event
async def _run_loop(self) -> AsyncIterator[Event]:
"""Main agent loop with tool calling."""
# Emit agent start
event = AgentStartEvent()
self._emit(event)
yield event
for turn in range(self.config.max_turns):
# Emit turn start
turn_event = TurnStartEvent()
self._emit(turn_event)
yield turn_event
# Get LLM response
assistant_message = None
async for msg_event in self._stream_llm_response():
yield msg_event
if isinstance(msg_event, MessageEndEvent):
assistant_message = msg_event.message
if not assistant_message:
break
# Add to messages
self.messages.append(assistant_message)
# Execute tools if any
tool_results = []
if assistant_message.tool_calls:
for tool_call in assistant_message.tool_calls[:self.config.max_tool_calls_per_turn]:
async for tool_event in self._execute_tool(tool_call):
yield tool_event
if isinstance(tool_event, ToolExecutionEndEvent):
# Create tool result message
result_msg = ToolResultMessage(
tool_call_id=tool_event.tool_call_id,
name=tool_event.tool_name,
content=str(tool_event.result),
is_error=tool_event.is_error
)
tool_results.append(result_msg)
self.messages.append(result_msg)
# Emit turn end
turn_end = TurnEndEvent(
message=assistant_message, tool_results=tool_results)
self._emit(turn_end)
yield turn_end
# Check if we should continue (if there were tool calls, continue)
if not assistant_message.tool_calls:
break
# Emit agent end
end_event = AgentEndEvent(messages=self.messages)
self._emit(end_event)
yield end_event
async def _stream_llm_response(self) -> AsyncIterator[Event]:
"""Stream LLM response."""
# Prepare context
tools_dict = [tool.to_dict()
for tool in self.tools] if self.tools else None
try:
# Start streaming
message = AssistantMessage(content=[], model=self.config.model)
start_event = MessageStartEvent(message=message)
self._emit(start_event)
yield start_event
# Collect streaming content
text_parts = []
tool_calls_list = []
# Stream from LLM
stream = self.config.provider.stream(
model=self.config.model,
messages=self.messages,
system_prompt=self.config.system_prompt,
tools=tools_dict,
max_tokens=4096,
)
async for event in stream:
if event.type == "text_delta":
text_parts.append(event.delta)
# Update message
message.content = [TextContent(text="".join(text_parts))]
# Emit update
update_event = MessageUpdateEvent(
message=message, delta=event.delta)
self._emit(update_event)
yield update_event
elif event.type == "tool_call" and event.tool_call:
# Complete tool call received
tool_calls_list.append(event.tool_call)
# Finalize message
if text_parts:
message.content = [TextContent(text="".join(text_parts))]
message.tool_calls = tool_calls_list
# Note: We don't fetch usage info to avoid extra API call and delay
# The streaming response should be sufficient for user interaction
message.usage = None
message.stop_reason = "stop" if not tool_calls_list else "tool_calls"
# Emit end
end_event = MessageEndEvent(message=message)
self._emit(end_event)
yield end_event
except Exception as e:
# Emit error
error_msg = AssistantMessage(
content=[TextContent(text=f"Error: {str(e)}")],
model=self.config.model,
stop_reason="error"
)
end_event = MessageEndEvent(message=error_msg)
self._emit(end_event)
yield end_event
async def _execute_tool(self, tool_call: ToolCall) -> AsyncIterator[Event]:
"""Execute a tool call."""
tool = self.tool_map.get(tool_call.name)
# Emit start
start_event = ToolExecutionStartEvent(
tool_call_id=tool_call.id,
tool_name=tool_call.name,
args=tool_call.arguments
)
self._emit(start_event)
yield start_event
if not tool:
# Tool not found
end_event = ToolExecutionEndEvent(
tool_call_id=tool_call.id,
tool_name=tool_call.name,
result=f"Error: Tool '{tool_call.name}' not found",
is_error=True
)
self._emit(end_event)
yield end_event
return
# Execute tool
try:
def on_update(partial_result: str):
"""Progress callback."""
update_event = ToolExecutionUpdateEvent(
tool_call_id=tool_call.id,
tool_name=tool_call.name,
partial_result=partial_result
)
self._emit(update_event)
# Note: Not yielding update events to simplify async flow
result = await tool.execute(
tool_call_id=tool_call.id,
arguments=tool_call.arguments,
on_update=on_update
)
# Emit end
end_event = ToolExecutionEndEvent(
tool_call_id=tool_call.id,
tool_name=tool_call.name,
result=result.content,
is_error=result.is_error
)
self._emit(end_event)
yield end_event
except Exception as e:
# Tool execution error
end_event = ToolExecutionEndEvent(
tool_call_id=tool_call.id,
tool_name=tool_call.name,
result=f"Error executing tool: {str(e)}",
is_error=True
)
self._emit(end_event)
yield end_event
[docs]
def get_messages(self) -> List[Message]:
"""Get conversation messages."""
return self.messages.copy()
[docs]
def clear_messages(self) -> None:
"""Clear conversation history."""
self.messages.clear()