Source code for agenix.core.agent

"""Agent runtime with tool execution loop."""

import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Callable, Dict, List, Optional

from ..tools.base import Tool
from .llm import LLMProvider, StreamEvent
from .messages import (AgentEndEvent, AgentStartEvent, AssistantMessage, Event,
                       Message, MessageEndEvent, MessageStartEvent,
                       MessageUpdateEvent, TextContent, ToolCall,
                       ToolExecutionEndEvent, ToolExecutionStartEvent,
                       ToolExecutionUpdateEvent, ToolResultMessage,
                       TurnEndEvent, TurnStartEvent, UserMessage)


[docs] @dataclass class AgentConfig: """Agent configuration.""" model: str api_key: str base_url: Optional[str] = None system_prompt: Optional[str] = None max_turns: int = 10 # Maximum conversation turns per prompt max_tool_calls_per_turn: int = 20 # Maximum tool calls per turn skill_dirs: Optional[List[str]] = None # Directories to load skills from
[docs] def __post_init__(self): """Create provider after initialization.""" # Auto-detect provider from base_url or use OpenAI-compatible by default try: from .llm import OpenAIProvider self.provider = OpenAIProvider( api_key=self.api_key, base_url=self.base_url) except Exception as e: raise ValueError(f"Failed to initialize LLM provider: {e}")
[docs] class Agent: """Agent runtime with tool execution loop."""
[docs] def __init__(self, config: AgentConfig, tools: Optional[List[Tool]] = None): self.config = config self.tools = tools or [] self.messages: List[Message] = [] self.subscribers: List[Callable[[Event], None]] = [] # Build tool lookup self.tool_map = {tool.name: tool for tool in self.tools} # Initialize skill manager from .skills import SkillManager self.skill_manager = SkillManager(skill_dirs=config.skill_dirs) # Inject skills into system prompt self._inject_skills_into_system_prompt()
def _inject_skills_into_system_prompt(self): """Inject available skills into system prompt. Uses simple, clear format following pi-mono's approach. Skills provide specialized instructions via progressive disclosure. """ skills = self.skill_manager.list_skills() visible_skills = [s for s in skills if not s.disable_model_invocation] if not visible_skills: return # Simple, clear instructions (pi-mono style) instructions = [] instructions.append("\n\n") instructions.append( "The following skills provide specialized instructions for specific tasks.\n") instructions.append( "Use the read tool to load a skill's file when the task matches its description.\n") instructions.append( "When a skill file references a relative path, resolve it against the skill directory.\n") instructions.append("\n") instructions.append("<available_skills>\n") # Add skills with name, description, and location (XML format) for skill in visible_skills: instructions.append(" <skill>\n") instructions.append( f" <name>{self._escape_xml(skill.name)}</name>\n") instructions.append( f" <description>{self._escape_xml(skill.description)}</description>\n") instructions.append( f" <location>{self._escape_xml(skill.file_path)}</location>\n") instructions.append(" </skill>\n") instructions.append("</available_skills>\n") # Append to system prompt if self.config.system_prompt: self.config.system_prompt += "".join(instructions) def _escape_xml(self, text: str) -> str: """Escape XML special characters.""" return (text .replace("&", "&amp;") .replace("<", "&lt;") .replace(">", "&gt;") .replace('"', "&quot;") .replace("'", "&apos;"))
[docs] def subscribe(self, callback: Callable[[Event], None]) -> Callable[[], None]: """Subscribe to agent events. Returns: Unsubscribe function """ self.subscribers.append(callback) return lambda: self.subscribers.remove(callback)
def _emit(self, event: Event) -> None: """Emit event to all subscribers.""" for callback in self.subscribers: try: callback(event) except Exception as e: # Don't let subscriber errors break the agent print(f"Error in event subscriber: {e}")
[docs] async def prompt(self, user_message: str) -> AsyncIterator[Event]: """Process a user prompt through the agent loop. Args: user_message: User's message Yields: Agent events """ # Add user message msg = UserMessage(content=user_message) self.messages.append(msg) # Run agent loop async for event in self._run_loop(): yield event
async def _run_loop(self) -> AsyncIterator[Event]: """Main agent loop with tool calling.""" # Emit agent start event = AgentStartEvent() self._emit(event) yield event for turn in range(self.config.max_turns): # Emit turn start turn_event = TurnStartEvent() self._emit(turn_event) yield turn_event # Get LLM response assistant_message = None async for msg_event in self._stream_llm_response(): yield msg_event if isinstance(msg_event, MessageEndEvent): assistant_message = msg_event.message if not assistant_message: break # Add to messages self.messages.append(assistant_message) # Execute tools if any tool_results = [] if assistant_message.tool_calls: for tool_call in assistant_message.tool_calls[:self.config.max_tool_calls_per_turn]: async for tool_event in self._execute_tool(tool_call): yield tool_event if isinstance(tool_event, ToolExecutionEndEvent): # Create tool result message result_msg = ToolResultMessage( tool_call_id=tool_event.tool_call_id, name=tool_event.tool_name, content=str(tool_event.result), is_error=tool_event.is_error ) tool_results.append(result_msg) self.messages.append(result_msg) # Emit turn end turn_end = TurnEndEvent( message=assistant_message, tool_results=tool_results) self._emit(turn_end) yield turn_end # Check if we should continue (if there were tool calls, continue) if not assistant_message.tool_calls: break # Emit agent end end_event = AgentEndEvent(messages=self.messages) self._emit(end_event) yield end_event async def _stream_llm_response(self) -> AsyncIterator[Event]: """Stream LLM response.""" # Prepare context tools_dict = [tool.to_dict() for tool in self.tools] if self.tools else None try: # Start streaming message = AssistantMessage(content=[], model=self.config.model) start_event = MessageStartEvent(message=message) self._emit(start_event) yield start_event # Collect streaming content text_parts = [] tool_calls_list = [] # Stream from LLM stream = self.config.provider.stream( model=self.config.model, messages=self.messages, system_prompt=self.config.system_prompt, tools=tools_dict, max_tokens=4096, ) async for event in stream: if event.type == "text_delta": text_parts.append(event.delta) # Update message message.content = [TextContent(text="".join(text_parts))] # Emit update update_event = MessageUpdateEvent( message=message, delta=event.delta) self._emit(update_event) yield update_event elif event.type == "tool_call" and event.tool_call: # Complete tool call received tool_calls_list.append(event.tool_call) # Finalize message if text_parts: message.content = [TextContent(text="".join(text_parts))] message.tool_calls = tool_calls_list # Note: We don't fetch usage info to avoid extra API call and delay # The streaming response should be sufficient for user interaction message.usage = None message.stop_reason = "stop" if not tool_calls_list else "tool_calls" # Emit end end_event = MessageEndEvent(message=message) self._emit(end_event) yield end_event except Exception as e: # Emit error error_msg = AssistantMessage( content=[TextContent(text=f"Error: {str(e)}")], model=self.config.model, stop_reason="error" ) end_event = MessageEndEvent(message=error_msg) self._emit(end_event) yield end_event async def _execute_tool(self, tool_call: ToolCall) -> AsyncIterator[Event]: """Execute a tool call.""" tool = self.tool_map.get(tool_call.name) # Emit start start_event = ToolExecutionStartEvent( tool_call_id=tool_call.id, tool_name=tool_call.name, args=tool_call.arguments ) self._emit(start_event) yield start_event if not tool: # Tool not found end_event = ToolExecutionEndEvent( tool_call_id=tool_call.id, tool_name=tool_call.name, result=f"Error: Tool '{tool_call.name}' not found", is_error=True ) self._emit(end_event) yield end_event return # Execute tool try: def on_update(partial_result: str): """Progress callback.""" update_event = ToolExecutionUpdateEvent( tool_call_id=tool_call.id, tool_name=tool_call.name, partial_result=partial_result ) self._emit(update_event) # Note: Not yielding update events to simplify async flow result = await tool.execute( tool_call_id=tool_call.id, arguments=tool_call.arguments, on_update=on_update ) # Emit end end_event = ToolExecutionEndEvent( tool_call_id=tool_call.id, tool_name=tool_call.name, result=result.content, is_error=result.is_error ) self._emit(end_event) yield end_event except Exception as e: # Tool execution error end_event = ToolExecutionEndEvent( tool_call_id=tool_call.id, tool_name=tool_call.name, result=f"Error executing tool: {str(e)}", is_error=True ) self._emit(end_event) yield end_event
[docs] def get_messages(self) -> List[Message]: """Get conversation messages.""" return self.messages.copy()
[docs] def clear_messages(self) -> None: """Clear conversation history.""" self.messages.clear()