"""Programmatic SDK for using agenix as a library.
Example usage:
```python
import asyncio
from agenix import create_session
async def main():
# Create a session
session = await create_session(
api_key="your-api-key",
model="gpt-4o",
working_dir="/path/to/project"
)
# Send a prompt
response = await session.prompt("What files are in the current directory?")
print(response)
# Continue conversation
response = await session.prompt("Can you read the README?")
print(response)
# Get conversation history
messages = session.get_messages()
print(f"Conversation has {len(messages)} messages")
if __name__ == "__main__":
asyncio.run(main())
```
"""
from .extensions import (CommandDefinition, EventType, ExtensionAPI,
ExtensionSetup, ToolDefinition)
import asyncio
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from .core.agent import Agent, AgentConfig
from .core.messages import Message
from .extensions import (ExtensionContext, ExtensionRunner, SessionEndEvent,
SessionStartEvent, discover_and_load_extensions)
from .tools.bash import BashTool
from .tools.edit import EditTool
from .tools.grep import GrepTool
from .tools.read import ReadTool
from .tools.write import WriteTool
[docs]
class AgentSession:
"""High-level agent session for programmatic usage.
Wraps the low-level Agent class with a simpler API.
"""
[docs]
def __init__(
self,
agent: Agent,
tools: List[Any],
extension_runner: Optional[ExtensionRunner] = None,
working_dir: str = "."
):
self.agent = agent
self.tools = tools
self.extension_runner = extension_runner
self.working_dir = working_dir
self._started = False
async def _ensure_started(self) -> None:
"""Ensure session has been started."""
if not self._started:
if self.extension_runner:
await self.extension_runner.emit(SessionStartEvent())
self._started = True
[docs]
async def prompt(self, message: str) -> str:
"""Send a prompt to the agent and return the response.
Args:
message: User message/prompt
Returns:
Agent's text response
"""
await self._ensure_started()
# Collect the response
response_parts = []
async for event in self.agent.prompt(message):
# Collect text from message update events
from .core.messages import MessageUpdateEvent
if isinstance(event, MessageUpdateEvent):
response_parts.append(event.delta)
return "".join(response_parts)
[docs]
def get_messages(self) -> List[Message]:
"""Get conversation history.
Returns:
List of messages in the conversation
"""
return self.agent.messages
[docs]
def clear_messages(self) -> None:
"""Clear conversation history."""
self.agent.clear_messages()
[docs]
async def close(self) -> None:
"""Close the session and cleanup."""
if self.extension_runner:
await self.extension_runner.emit(SessionEndEvent())
[docs]
async def create_session(
api_key: Optional[str] = None,
base_url: Optional[str] = None,
model: str = "gpt-4o",
system_prompt: Optional[str] = None,
working_dir: str = ".",
max_turns: int = 10,
enable_extensions: bool = True
) -> AgentSession:
"""Create an agent session for programmatic usage.
Args:
api_key: API key for the LLM provider (or set OPENAI_API_KEY env var)
base_url: Optional API base URL
model: Model to use (default: gpt-4o)
system_prompt: Optional custom system prompt
working_dir: Working directory for file operations
max_turns: Maximum conversation turns per prompt
enable_extensions: Whether to load extensions
Returns:
AgentSession instance
Example:
>>> import asyncio
>>> from agenix import create_session
>>>
>>> async def main():
... session = await create_session(
... api_key="your-key",
... model="gpt-4o"
... )
... response = await session.prompt("Hello!")
... print(response)
...
>>> asyncio.run(main())
"""
# Get API key
if not api_key:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError(
"API key required. Set OPENAI_API_KEY environment variable "
"or pass api_key parameter."
)
# Resolve working directory
working_dir = os.path.abspath(working_dir)
# Setup tools
tools = [
ReadTool(working_dir=working_dir),
WriteTool(working_dir=working_dir),
EditTool(working_dir=working_dir),
BashTool(working_dir=working_dir),
GrepTool(working_dir=working_dir),
]
# Load extensions if enabled
extension_runner = None
if enable_extensions:
extensions = await discover_and_load_extensions(cwd=working_dir)
if extensions:
# Create extension context
# Note: We need an agent instance, so we'll create it first
# and then set up extensions
pass
# Get default system prompt if not provided
if not system_prompt:
from .cli import get_default_system_prompt
system_prompt = get_default_system_prompt(tools)
# Create agent config
config = AgentConfig(
model=model,
api_key=api_key,
base_url=base_url,
system_prompt=system_prompt,
max_turns=max_turns
)
# Create agent
agent = Agent(config=config, tools=tools)
# Setup extension runner if extensions were loaded
if enable_extensions:
extensions = await discover_and_load_extensions(cwd=working_dir)
if extensions:
ext_context = ExtensionContext(
agent=agent,
cwd=working_dir,
tools=tools
)
extension_runner = ExtensionRunner(extensions, ext_context)
# Create session
session = AgentSession(
agent=agent,
tools=tools,
extension_runner=extension_runner,
working_dir=working_dir
)
return session
[docs]
class AOSAgentSession:
"""High-level AOS agent session for programmatic usage.
Uses the Agent OS architecture with MessageBus, DirectChannel, and AOSAgentRuntime.
Provides the same API as AgentSession but with AOS benefits:
- Better modularity and separation of concerns
- Support for multiple channels and agents
- Network-ready architecture
- Enhanced observability
"""
[docs]
def __init__(
self,
bus: Any, # MessageBus
channel: Any, # DirectChannel
runtime: Any, # AOSAgentRuntime
agent_address: Any, # AgentAddress
tools: List[Any],
extension_runner: Optional[ExtensionRunner] = None,
working_dir: str = "."
):
self.bus = bus
self.channel = channel
self.runtime = runtime
self.agent_address = agent_address
self.agent = runtime.agent # Access underlying agent
self.tools = tools
self.extension_runner = extension_runner
self.working_dir = working_dir
self._started = False
async def _ensure_started(self) -> None:
"""Ensure session has been started."""
if not self._started:
if self.extension_runner:
await self.extension_runner.emit(SessionStartEvent())
self._started = True
[docs]
async def prompt(self, message: str) -> str:
"""Send a prompt to the agent and return the response.
Args:
message: User message/prompt
Returns:
Agent's text response
"""
await self._ensure_started()
# Send message via AOS channel and wait for response
response = await self.channel.send_and_wait(message, sender_id="user")
# Extract content from response
if response and "content" in response:
return response["content"]
return ""
[docs]
def get_messages(self) -> List[Message]:
"""Get conversation history.
Returns:
List of messages in the conversation
"""
return self.agent.messages
[docs]
def clear_messages(self) -> None:
"""Clear conversation history."""
self.agent.clear_messages()
[docs]
async def close(self) -> None:
"""Close the session and cleanup AOS components."""
if self.extension_runner:
await self.extension_runner.emit(SessionEndEvent())
# Stop AOS components
await self.channel.stop()
await self.runtime.stop()
await self.bus.stop()
[docs]
async def create_aos_session(
api_key: Optional[str] = None,
base_url: Optional[str] = None,
model: str = "gpt-4o",
system_prompt: Optional[str] = None,
working_dir: str = ".",
max_turns: int = 10,
enable_extensions: bool = True,
node_id: str = "local",
agent_id: str = "main"
) -> AOSAgentSession:
"""Create an AOS agent session for programmatic usage.
Uses the Agent OS architecture with MessageBus, DirectChannel, and AOSAgentRuntime.
Args:
api_key: API key for the LLM provider (or set OPENAI_API_KEY env var)
base_url: Optional API base URL
model: Model to use (default: gpt-4o)
system_prompt: Optional custom system prompt
working_dir: Working directory for file operations
max_turns: Maximum conversation turns per prompt
enable_extensions: Whether to load extensions
node_id: Node ID for AOS addressing (default: "local")
agent_id: Agent ID for AOS addressing (default: "main")
Returns:
AOSAgentSession instance
Example:
>>> import asyncio
>>> from agenix import create_aos_session
>>>
>>> async def main():
... session = await create_aos_session(
... api_key="your-key",
... model="gpt-4o"
... )
... response = await session.prompt("Hello!")
... print(response)
... await session.close()
...
>>> asyncio.run(main())
"""
# Import AOS components
from agenix.aos.kernel.bus import MessageBus
from agenix.aos.kernel.channel import DirectChannel
from agenix.aos.kernel.runtime import AOSAgentRuntime
from agenix.aos.types import AgentAddress
# Get API key
if not api_key:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError(
"API key required. Set OPENAI_API_KEY environment variable "
"or pass api_key parameter."
)
# Resolve working directory
working_dir = os.path.abspath(working_dir)
# Setup tools
tools = [
ReadTool(working_dir=working_dir),
WriteTool(working_dir=working_dir),
EditTool(working_dir=working_dir),
BashTool(working_dir=working_dir),
GrepTool(working_dir=working_dir),
]
# Get default system prompt if not provided
if not system_prompt:
from .cli import get_default_system_prompt
system_prompt = get_default_system_prompt(tools)
# Create agent config
config = AgentConfig(
model=model,
api_key=api_key,
base_url=base_url,
system_prompt=system_prompt,
max_turns=max_turns
)
# Create agent
agent = Agent(config=config, tools=tools)
# Setup extension runner if extensions are enabled
extension_runner = None
if enable_extensions:
extensions = await discover_and_load_extensions(cwd=working_dir)
if extensions:
ext_context = ExtensionContext(
agent=agent,
cwd=working_dir,
tools=tools
)
extension_runner = ExtensionRunner(extensions, ext_context)
# Create AOS components
bus = MessageBus()
await bus.start()
# Create agent address
agent_address = AgentAddress(node_id=node_id, agent_id=agent_id)
# Wrap agent in AOS runtime
runtime = AOSAgentRuntime(
agent=agent,
address=agent_address,
bus=bus
)
await runtime.start()
# Create DirectChannel for programmatic interaction
channel = DirectChannel(
channel_id="sdk",
bus=bus
)
channel.connect_to_agent(agent_address)
await channel.start()
# Create AOS session
session = AOSAgentSession(
bus=bus,
channel=channel,
runtime=runtime,
agent_address=agent_address,
tools=tools,
extension_runner=extension_runner,
working_dir=working_dir
)
return session
# Re-export key types for convenience
__all__ = [
# Main API
"create_session",
"AgentSession",
# AOS API
"create_aos_session",
"AOSAgentSession",
# Extension types (for custom extensions)
"ToolDefinition",
"CommandDefinition",
"ExtensionAPI",
"ExtensionSetup",
"EventType",
]