Source code for agenix.tools.read
"""Read file tool."""
import base64
import os
from pathlib import Path
from typing import Any, Callable, Dict, Optional
from ..core.messages import ImageContent, TextContent
from .base import Tool, ToolResult
[docs]
class ReadTool(Tool):
"""Read file content with image support."""
[docs]
def __init__(self, working_dir: str = "."):
self.working_dir = working_dir
super().__init__(
name="read",
description=(
"Read the contents of a file. Supports text files and images (jpg, png, gif, webp). "
"Images are sent as attachments. For large text files, output may be truncated - "
"use offset/limit parameters to read in chunks. When you need the full file, "
"continue with offset until complete."
),
parameters={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "Path to the file to read (absolute or relative to working directory)"
},
"offset": {
"type": "integer",
"description": "Starting line number (1-indexed). Optional, for reading large files in chunks.",
},
"limit": {
"type": "integer",
"description": "Maximum number of lines to read. Optional, for reading large files in chunks.",
}
},
"required": ["file_path"]
}
)
[docs]
async def execute(
self,
tool_call_id: str,
arguments: Dict[str, Any],
on_update: Optional[Callable[[str], None]] = None,
) -> ToolResult:
"""Execute read operation."""
file_path = arguments["file_path"]
offset = arguments.get("offset", 1)
limit = arguments.get("limit")
# Resolve path
if not os.path.isabs(file_path):
file_path = os.path.join(self.working_dir, file_path)
try:
# Check if file exists
if not os.path.exists(file_path):
return ToolResult(
content=f"Error: File not found: {file_path}",
is_error=True
)
# Check if it's an image
ext = Path(file_path).suffix.lower()
if ext in ['.png', '.jpg', '.jpeg', '.gif', '.webp']:
return await self._read_image(file_path)
# Read text file
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
total_lines = len(lines)
# Apply offset and limit
start_idx = max(0, offset - 1)
end_idx = start_idx + limit if limit else len(lines)
selected_lines = lines[start_idx:end_idx]
# Format with line numbers
formatted_lines = []
for i, line in enumerate(selected_lines, start=start_idx + 1):
formatted_lines.append(f"{i:6d}\t{line.rstrip()}")
content = "\n".join(formatted_lines)
# Add truncation info
details = {}
if offset > 1 or (limit and end_idx < total_lines):
truncation_msg = f"\n\n[Showing lines {start_idx + 1}-{end_idx} of {total_lines} total lines]"
content += truncation_msg
details["truncated"] = True
details["total_lines"] = total_lines
details["shown_lines"] = len(selected_lines)
return ToolResult(content=content, details=details)
except PermissionError:
return ToolResult(
content=f"Error: Permission denied: {file_path}",
is_error=True
)
except UnicodeDecodeError:
return ToolResult(
content=f"Error: File is not a text file or uses unsupported encoding: {file_path}",
is_error=True
)
except Exception as e:
return ToolResult(
content=f"Error reading file: {str(e)}",
is_error=True
)
async def _read_image(self, file_path: str) -> ToolResult:
"""Read image file and return as base64."""
try:
with open(file_path, 'rb') as f:
image_data = f.read()
# Get MIME type
ext = Path(file_path).suffix.lower()
mime_map = {
'.png': 'image/png',
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.gif': 'image/gif',
'.webp': 'image/webp',
}
media_type = mime_map.get(ext, 'image/png')
# Encode as base64
encoded = base64.b64encode(image_data).decode('utf-8')
# Return as image content
image_content = ImageContent(
source={
"type": "base64",
"media_type": media_type,
"data": encoded
}
)
return ToolResult(
content=[
TextContent(
text=f"[Image: {os.path.basename(file_path)}]"),
image_content
]
)
except Exception as e:
return ToolResult(
content=f"Error reading image: {str(e)}",
is_error=True
)