Auto-Dev Developer Guide
Learn how to extend and customize the Auto-Dev system by creating custom learning engines, subscribing to events, integrating with databases, and implementing custom fitness functions.
**Version:** 1.0.0
**Last Updated:** 2026-04-10
---
Table of Contents
- Creating Custom Learning Engines
- Event Subscription Patterns
- Database Models and Persistence
- Fitness Evaluation
- Sandbox Integration
- Capability Gate Integration
- Testing Strategies
- Common Patterns
---
Creating Custom Learning Engines
BaseLearningEngine Interface
All learning engines extend BaseLearningEngine, which provides a unified lifecycle:
from core.auto_dev.base_engine import BaseLearningEngine, SandboxProtocol
from sqlalchemy.orm import Session
from typing import Any
class CustomLearningEngine(BaseLearningEngine):
"""
Custom learning engine for domain-specific self-improvement.
"""
def __init__(
self,
db: Session,
llm_service: Any = None,
sandbox: SandboxProtocol = None,
):
super().__init__(db=db, llm_service=llm_service, sandbox=sandbox)
# Add custom initialization here
Implementing Lifecycle Methods
1. analyze_episode()
Extract relevant information from episodes for learning.
async def analyze_episode(self, episode_id: str, **kwargs) -> dict[str, Any]:
"""
Analyze an episode to extract learning signals.
Returns:
dict with analysis results
"""
from core.models import Episode, EpisodeSegment
episode = (
self.db.query(Episode)
.filter(Episode.id == episode_id)
.first()
)
if not episode:
raise ValueError(f"Episode {episode_id} not found")
segments = (
self.db.query(EpisodeSegment)
.filter(EpisodeSegment.episode_id == episode_id)
.all()
)
# Custom analysis logic
return {
"episode_id": episode_id,
"task_description": episode.task_description,
"success": episode.success,
"segments_count": len(segments),
# Add custom fields
}
2. propose_code_change()
Generate code modifications using LLM or other methods.
async def propose_code_change(
self, context: dict[str, Any], **kwargs
) -> str:
"""
Generate a code modification proposal.
Args:
context: Analysis output from analyze_episode()
Returns:
str: Generated Python code
"""
llm = self._get_llm_service()
if not llm:
return "# Code generation skipped: LLM unavailable"
system_prompt = "You are an expert Python developer..."
user_prompt = f"Context: {context}\n\nGenerate improved code..."
try:
response = await llm.generate_completion(
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
model="auto",
task_type="code",
)
return self._strip_markdown_fences(response.get("content", ""))
except Exception as e:
logger.error(f"Code generation failed: {e}")
return f"# Code generation failed: {e}"
3. validate_change()
Test proposed code in sandbox and assess fitness.
async def validate_change(
self,
code: str,
test_inputs: list[dict[str, Any]],
tenant_id: str,
**kwargs,
) -> dict[str, Any]:
"""
Validate proposed code in sandbox.
Returns:
dict with validation results
"""
sandbox = self._get_sandbox()
if not sandbox:
return {"passed": False, "error": "Sandbox unavailable"}
results = []
all_passed = True
for i, inputs in enumerate(test_inputs or [{}]):
result = await sandbox.execute_raw_python(
tenant_id=tenant_id,
code=code,
input_params=inputs,
)
passed = result.get("status") == "success"
if not passed:
all_passed = False
results.append({
"test_index": i,
"passed": passed,
"output": result.get("output", ""),
"execution_seconds": result.get("execution_seconds", 0),
})
return {
"passed": all_passed,
"test_results": results,
"proxy_signals": self._compute_proxy_signals(results),
}
Complete Example: CustomErrorLearningEngine
"""
Custom learning engine that specializes in fixing common error patterns.
"""
import logging
from typing import Any
from sqlalchemy.orm import Session
from core.auto_dev.base_engine import BaseLearningEngine, SandboxProtocol
logger = logging.getLogger(__name__)
class CustomErrorLearningEngine(BaseLearningEngine):
"""
Learning engine that specializes in fixing common error patterns.
Focuses on:
- KeyError handling
- IndexError prevention
- Type checking
- Null value handling
"""
async def analyze_episode(self, episode_id: str, **kwargs) -> dict[str, Any]:
"""Analyze episode to extract error patterns."""
from core.models import Episode, EpisodeSegment
episode = (
self.db.query(Episode)
.filter(Episode.id == episode_id)
.first()
)
if not episode:
return {"error": f"Episode {episode_id} not found"}
segments = (
self.db.query(EpisodeSegment)
.filter(EpisodeSegment.episode_id == episode_id)
.all()
)
# Extract error patterns
error_patterns = []
for segment in segments:
metadata = getattr(segment, "metadata", {}) or {}
if metadata.get("error"):
error_patterns.append({
"error_type": self._classify_error(metadata["error"]),
"error_message": metadata["error"],
"segment_id": str(segment.id),
})
return {
"episode_id": episode_id,
"task_description": episode.task_description or "",
"error_patterns": error_patterns,
"most_common_error": self._get_most_common_error(error_patterns),
}
async def propose_code_change(
self, context: dict[str, Any], **kwargs
) -> str:
"""Generate error-handling code."""
llm = self._get_llm_service()
if not llm:
return "# Error fixing skipped: LLM unavailable"
error_type = context.get("most_common_error", "UnknownError")
task_desc = context.get("task_description", "")
system_prompt = (
"You are an expert Python developer specializing in error handling. "
"Generate code that prevents the specified error type. "
"Include proper exception handling, type checking, and defensive programming. "
"Respond ONLY with the Python code."
)
user_prompt = (
f"Task: {task_desc}\n\n"
f"Common error: {error_type}\n\n"
f"Generate Python code that prevents this error:\n"
)
try:
response = await llm.generate_completion(
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
model="auto",
task_type="code",
)
return self._strip_markdown_fences(response.get("content", ""))
except Exception as e:
logger.error(f"Error fixing code generation failed: {e}")
return f"# Error fixing failed: {e}"
async def validate_change(
self,
code: str,
test_inputs: list[dict[str, Any]],
tenant_id: str,
**kwargs,
) -> dict[str, Any]:
"""Validate error-handling code."""
sandbox = self._get_sandbox()
if not sandbox:
return {"passed": False, "error": "Sandbox unavailable"}
# Test with error-inducing inputs
error_test_cases = [
{"input": None, "description": "null input"},
{"input": "", "description": "empty string"},
{"input": {"invalid": "key"}, "description": "missing key"},
]
results = []
all_passed = True
for i, test_case in enumerate(error_test_cases):
result = await sandbox.execute_raw_python(
tenant_id=tenant_id,
code=code,
input_params=test_case,
)
passed = result.get("status") == "success"
if not passed:
all_passed = False
results.append({
"test_index": i,
"test_description": test_case["description"],
"passed": passed,
"output": result.get("output", ""),
})
return {
"passed": all_passed,
"test_results": results,
"error_handling_score": self._compute_error_handling_score(results),
}
# --- Helper methods ---
@staticmethod
def _classify_error(error_message: str) -> str:
"""Classify error type from error message."""
error_message = error_message.lower()
if "keyerror" in error_message:
return "KeyError"
elif "indexerror" in error_message:
return "IndexError"
elif "typeerror" in error_message:
return "TypeError"
elif "valueerror" in error_message:
return "ValueError"
elif "attributeerror" in error_message:
return "AttributeError"
else:
return "UnknownError"
@staticmethod
def _get_most_common_error(error_patterns: list[dict]) -> str:
"""Find the most common error type."""
if not error_patterns:
return "UnknownError"
error_counts = {}
for pattern in error_patterns:
error_type = pattern["error_type"]
error_counts[error_type] = error_counts.get(error_type, 0) + 1
return max(error_counts, key=error_counts.get)
@staticmethod
def _compute_error_handling_score(results: list[dict]) -> float:
"""Compute error handling effectiveness score."""
if not results:
return 0.0
passed = sum(1 for r in results if r["passed"])
return passed / len(results)
Using Your Custom Engine
from core.auto_dev.custom_error_engine import CustomErrorLearningEngine
# Initialize
engine = CustomErrorLearningEngine(db)
# Use the lifecycle
analysis = await engine.analyze_episode("episode-123")
code = await engine.propose_code_change(analysis)
validation = await engine.validate_change(
code=code,
test_inputs=[{"input": "test"}],
tenant_id="tenant-456",
)
if validation["passed"]:
print("Code passed validation!")
else:
print("Code failed validation")
---
Event Subscription Patterns
Registering Event Handlers
Use the global event_bus singleton to subscribe to Auto-Dev events:
from core.auto_dev.event_hooks import event_bus, TaskEvent, SkillExecutionEvent
@event_bus.on_task_fail
async def handle_task_failure(event: TaskEvent):
"""Handle task failure events."""
logger.info(f"Task failed: {event.task_description}")
logger.info(f"Error: {event.error_trace}")
# Trigger custom logic
await analyze_failure_pattern(event)
@event_bus.on_task_success
async def handle_task_success(event: TaskEvent):
"""Handle task success events."""
logger.info(f"Task succeeded: {event.task_description}")
# Trigger custom logic
await optimize_for_success(event)
@event_bus.on_skill_execution
async def handle_skill_execution(event: SkillExecutionEvent):
"""Handle skill execution events."""
logger.info(f"Skill {event.skill_name} executed in {event.execution_seconds}s")
# Trigger custom logic
await track_skill_performance(event)
Advanced Pattern: Aggregated Event Processing
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any
class EventAggregator:
"""
Aggregate events over time windows for batch processing.
"""
def __init__(self, window_seconds: int = 300):
self.window_seconds = window_seconds
self._events: dict[str, list[TaskEvent]] = defaultdict(list)
async def process_failure(self, event: TaskEvent) -> None:
"""Aggregate failure events."""
agent_id = event.agent_id
self._events[agent_id].append(event)
# Process if window full
if self._should_process_window(agent_id):
await self._analyze_failure_window(agent_id)
def _should_process_window(self, agent_id: str) -> bool:
"""Check if time window is full."""
events = self._events[agent_id]
if len(events) < 5:
return False
# Check if oldest event is outside window
oldest = min(e.metadata.get("timestamp", datetime.now(timezone.utc)) for e in events)
window_start = datetime.now(timezone.utc) - timedelta(seconds=self.window_seconds)
return oldest < window_start
async def _analyze_failure_window(self, agent_id: str) -> None:
"""Analyze aggregated failure window."""
events = self._events[agent_id]
# Extract patterns
error_types = defaultdict(int)
for event in events:
if event.error_trace:
error_type = self._classify_error(event.error_trace)
error_types[error_type] += 1
# Report insights
logger.info(f"Agent {agent_id} failure analysis:")
for error_type, count in sorted(error_types.items(), key=lambda kv: kv[1], reverse=True):
logger.info(f" {error_type}: {count} occurrences")
# Clear processed events
self._events[agent_id].clear()
@staticmethod
def _classify_error(error_trace: str) -> str:
"""Classify error from trace."""
error_trace = error_trace.lower()
if "keyerror" in error_trace:
return "KeyError"
elif "timeout" in error_trace:
return "Timeout"
else:
return "Other"
# Usage
aggregator = EventAggregator(window_seconds=300)
event_bus.on_task_fail(aggregator.process_failure)
Error Isolation in Handlers
Always wrap handler logic in try-except to prevent cascade failures:
@event_bus.on_task_fail
async def safe_failure_handler(event: TaskEvent) -> None:
"""Safe handler with error isolation."""
try:
# Your logic here
await process_failure(event)
except Exception as e:
logger.error(f"Handler failed for task {event.episode_id}: {e}", exc_info=True)
# Event bus continues processing other handlers
---
Database Models and Persistence
Using Auto-Dev Models
ToolMutation
from core.auto_dev.models import ToolMutation
from sqlalchemy.orm import Session
def create_mutation(
db: Session,
tenant_id: str,
tool_name: str,
mutated_code: str,
parent_tool_id: str = None,
) -> ToolMutation:
"""Create a new tool mutation record."""
mutation = ToolMutation(
tenant_id=tenant_id,
parent_tool_id=parent_tool_id,
tool_name=tool_name,
mutated_code=mutated_code,
sandbox_status="pending",
)
db.add(mutation)
db.commit()
db.refresh(mutation)
return mutation
def get_mutation_history(
db: Session,
tenant_id: str,
tool_name: str,
limit: int = 10,
) -> list[ToolMutation]:
"""Get mutation history for a tool."""
return (
db.query(ToolMutation)
.filter(
ToolMutation.tenant_id == tenant_id,
ToolMutation.tool_name == tool_name,
)
.order_by(ToolMutation.created_at.desc())
.limit(limit)
.all()
)
def get_mutation_lineage(
db: Session,
mutation_id: str,
) -> list[ToolMutation]:
"""Trace mutation lineage backwards."""
lineage = []
current = db.query(ToolMutation).filter(ToolMutation.id == mutation_id).first()
while current and current.parent_tool_id:
lineage.append(current)
current = (
db.query(ToolMutation)
.filter(ToolMutation.id == current.parent_tool_id)
.first()
)
return lineage
WorkflowVariant
from core.auto_dev.models import WorkflowVariant
def create_variant(
db: Session,
tenant_id: str,
agent_id: str,
workflow_definition: dict,
parent_variant_id: str = None,
) -> WorkflowVariant:
"""Create a new workflow variant."""
variant = WorkflowVariant(
tenant_id=tenant_id,
parent_variant_id=parent_variant_id,
agent_id=agent_id,
workflow_definition=workflow_definition,
evaluation_status="pending",
)
db.add(variant)
db.commit()
db.refresh(variant)
return variant
def update_fitness_score(
db: Session,
variant_id: str,
fitness_score: float,
fitness_signals: dict,
) -> None:
"""Update variant fitness score."""
variant = (
db.query(WorkflowVariant)
.filter(WorkflowVariant.id == variant_id)
.first()
)
if variant:
variant.fitness_score = fitness_score
variant.fitness_signals = fitness_signals
variant.evaluation_status = "evaluated"
variant.last_evaluated_at = datetime.now(timezone.utc)
db.commit()
SkillCandidate
from core.auto_dev.models import SkillCandidate
def get_pending_candidates(
db: Session,
tenant_id: str,
) -> list[SkillCandidate]:
"""Get all pending skill candidates."""
return (
db.query(SkillCandidate)
.filter(
SkillCandidate.tenant_id == tenant_id,
SkillCandidate.validation_status == "pending",
)
.order_by(SkillCandidate.created_at.desc())
.all()
)
def promote_candidate(
db: Session,
candidate_id: str,
) -> bool:
"""Promote a validated candidate."""
candidate = (
db.query(SkillCandidate)
.filter(
SkillCandidate.id == candidate_id,
SkillCandidate.validation_status == "validated",
)
.first()
)
if candidate:
candidate.validation_status = "promoted"
candidate.promoted_at = datetime.now(timezone.utc)
db.commit()
return True
return False
Index Optimization
Auto-Dev models include indexes for common queries:
# ToolMutation indexes
Index("ix_tool_mutations_tenant_id", ToolMutation.tenant_id)
Index("ix_tool_mutations_parent_tool_id", ToolMutation.parent_tool_id)
# WorkflowVariant indexes
Index("ix_workflow_variants_tenant_id", WorkflowVariant.tenant_id)
Index("ix_workflow_variants_agent_id", WorkflowVariant.agent_id)
# SkillCandidate indexes
Index("ix_skill_candidates_tenant_status", SkillCandidate.tenant_id, SkillCandidate.validation_status)
For custom queries, create additional indexes:
from sqlalchemy import Index
# Composite index for tenant + status queries
Index("ix_mutations_tenant_status", ToolMutation.tenant_id, ToolMutation.sandbox_status)
# Index for time-based queries
Index("ix_variants_created_at", WorkflowVariant.created_at.desc())
---
Fitness Evaluation
Custom Fitness Functions
Extend FitnessService for domain-specific fitness evaluation:
from core.auto_dev.fitness_service import FitnessService
from typing import Any
class DomainFitnessService(FitnessService):
"""
Custom fitness service for domain-specific evaluation.
Adds domain-specific signals:
- Business value metrics
- User satisfaction scores
- Regulatory compliance checks
"""
def evaluate_business_value(
self,
variant_id: str,
tenant_id: str,
business_metrics: dict[str, Any],
) -> float:
"""
Evaluate business value of a variant.
Args:
variant_id: Variant identifier
tenant_id: Tenant identifier
business_metrics: {
"revenue_impact": float,
"cost_savings": float,
"time_savings_hours": float,
}
Returns:
float: Business value score (0.0 to 1.0)
"""
variant = (
self.db.query(WorkflowVariant)
.filter(
WorkflowVariant.id == variant_id,
WorkflowVariant.tenant_id == tenant_id,
)
.first()
)
if not variant:
return 0.0
# Calculate business value score
score = 0.0
revenue_impact = business_metrics.get("revenue_impact", 0)
if revenue_impact > 0:
score += min(0.5, revenue_impact / 10000) # Max 0.5 for $10K
cost_savings = business_metrics.get("cost_savings", 0)
if cost_savings > 0:
score += min(0.3, cost_savings / 5000) # Max 0.3 for $5K
time_savings = business_metrics.get("time_savings_hours", 0)
if time_savings > 0:
score += min(0.2, time_savings / 100) # Max 0.2 for 100 hours
final_score = max(0.0, min(1.0, score))
# Update variant with business value
signals = variant.fitness_signals or {}
signals["business_value"] = {
"score": final_score,
"metrics": business_metrics,
}
variant.fitness_signals = signals
# Adjust overall fitness score
current_score = variant.fitness_score or 0.0
combined_score = (current_score + final_score) / 2
variant.fitness_score = max(0.0, min(1.0, combined_score))
self.db.commit()
return final_score
def evaluate_user_satisfaction(
self,
variant_id: str,
tenant_id: str,
feedback_scores: list[float],
) -> float:
"""
Evaluate user satisfaction for a variant.
Args:
variant_id: Variant identifier
tenant_id: Tenant identifier
feedback_scores: List of user feedback scores (-1.0 to 1.0)
Returns:
float: Satisfaction score (0.0 to 1.0)
"""
if not feedback_scores:
return 0.5 # Neutral default
# Calculate average satisfaction
avg_score = sum(feedback_scores) / len(feedback_scores)
# Convert from [-1, 1] to [0, 1]
normalized_score = (avg_score + 1) / 2
variant = (
self.db.query(WorkflowVariant)
.filter(
WorkflowVariant.id == variant_id,
WorkflowVariant.tenant_id == tenant_id,
)
.first()
)
if variant:
signals = variant.fitness_signals or {}
signals["user_satisfaction"] = {
"score": normalized_score,
"feedback_count": len(feedback_scores),
}
variant.fitness_signals = signals
self.db.commit()
return normalized_score
Multi-Objective Fitness
Combine multiple fitness objectives:
class MultiObjectiveFitness:
"""
Combine multiple fitness objectives with weights.
"""
def __init__(
self,
weights: dict[str, float] = None,
):
"""
Args:
weights: {
"performance": 0.3,
"accuracy": 0.4,
"cost": 0.2,
"satisfaction": 0.1,
}
"""
self.weights = weights or {
"performance": 0.3,
"accuracy": 0.4,
"cost": 0.2,
"satisfaction": 0.1,
}
def calculate_fitness(
self,
scores: dict[str, float],
) -> float:
"""
Calculate weighted fitness score.
Args:
scores: {
"performance": 0.8,
"accuracy": 0.9,
"cost": 0.7,
"satisfaction": 0.6,
}
Returns:
float: Combined fitness score
"""
total_weight = sum(self.weights.values())
weighted_score = 0.0
for objective, weight in self.weights.items():
score = scores.get(objective, 0.0)
weighted_score += score * weight
return weighted_score / total_weight
# Usage
fitness = MultiObjectiveFitness(
weights={
"performance": 0.4,
"accuracy": 0.4,
"cost": 0.2,
}
)
score = fitness.calculate_fitness({
"performance": 0.8,
"accuracy": 0.9,
"cost": 0.7,
})
# Result: 0.82
---
Sandbox Integration
Implementing SandboxProtocol
Create custom sandbox backends by implementing the SandboxProtocol interface:
from core.auto_dev.base_engine import SandboxProtocol
from typing import Any
class CustomSandbox(SandboxProtocol):
"""
Custom sandbox implementation.
Examples of custom backends:
- Kubernetes-based isolation
- AWS Lambda execution
- Remote server execution
- Virtual machine isolation
"""
async def execute_raw_python(
self,
tenant_id: str,
code: str,
input_params: dict[str, Any],
timeout: int = 60,
safety_level: str = "MEDIUM_RISK",
**kwargs,
) -> dict[str, Any]:
"""
Execute Python code in custom isolated environment.
Returns:
{
"status": "success" | "failed",
"output": str,
"execution_seconds": float,
"execution_id": str,
}
"""
import time
import uuid
start_time = time.monotonic()
execution_id = str(uuid.uuid4())
try:
# Custom execution logic
result = await self._execute_in_custom_env(
code=code,
input_params=input_params,
timeout=timeout,
tenant_id=tenant_id,
)
elapsed = time.monotonic() - start_time
return {
"status": "success" if result["success"] else "failed",
"output": result["output"],
"execution_seconds": round(elapsed, 3),
"execution_id": execution_id,
}
except Exception as e:
elapsed = time.monotonic() - start_time
return {
"status": "failed",
"output": str(e),
"execution_seconds": round(elapsed, 3),
"execution_id": execution_id,
}
async def _execute_in_custom_env(
self,
code: str,
input_params: dict[str, Any],
timeout: int,
tenant_id: str,
) -> dict[str, Any]:
"""
Implement custom execution logic.
Example: Kubernetes Job execution
"""
# Your custom implementation here
pass
Resource Limits
Configure sandbox resource limits:
from core.auto_dev.container_sandbox import ContainerSandbox
# Custom resource limits
sandbox = ContainerSandbox(
docker_image="python:3.11-slim",
timeout=120, # 2 minutes
memory_limit="512m", # 512MB
enable_network=False, # No network access
)
# Execute with custom limits
result = await sandbox.execute_raw_python(
tenant_id="tenant-123",
code=code,
input_params={},
timeout=120, # Override default
)
Security Considerations
When implementing custom sandboxes:
- **Network Isolation**
- **Filesystem Isolation**
# Tmpfs for /tmp only
cmd.extend(["--tmpfs", "/tmp:rw,noexec,nosuid,size=64m"])
```
- **Resource Limits**
# CPU limit
cmd.extend(["--cpus=1"])
```
- **Capability Dropping**
---
Capability Gate Integration
Checking Capabilities
from core.auto_dev.capability_gate import AutoDevCapabilityService
gate = AutoDevCapabilityService(db)
# Check if agent can use Memento-Skills
can_use_memento = gate.can_use(
agent_id="agent-123",
capability="auto_dev.memento_skills",
workspace_settings=workspace_config,
)
# Check if agent can use AlphaEvolver
can_use_evolver = gate.can_use(
agent_id="agent-123",
capability="auto_dev.alpha_evolver",
workspace_settings=workspace_config,
)
Recording Usage
Track Auto-Dev usage for graduation tracking:
# Record successful usage
gate.record_usage(
agent_id="agent-123",
capability="auto_dev.memento_skills",
success=True,
)
# Record failed usage
gate.record_usage(
agent_id="agent-123",
capability="auto_dev.alpha_evolver",
success=False,
)
Checking Daily Limits
# Check before generating mutation
if gate.check_daily_limits(
agent_id="agent-123",
capability="auto_dev.alpha_evolver",
workspace_settings=workspace_config,
):
# Proceed with mutation
await generate_mutation(...)
else:
# Daily limit exceeded
logger.warning("Daily mutation limit exceeded")
Custom Capability Gates
Create custom capability gates:
from core.auto_dev.capability_gate import AutoDevCapabilityService, is_at_least
class CustomCapabilityService(AutoDevCapabilityService):
"""
Custom capability gate with additional checks.
"""
CAPABILITY_GATES = {
**AutoDevCapabilityService.CAPABILITY_GATES,
"auto_dev.custom_feature": "SUPERVISED",
}
def can_use_custom_feature(
self,
agent_id: str,
custom_condition: dict,
) -> bool:
"""
Check custom capability with additional conditions.
Args:
agent_id: Agent identifier
custom_condition: {
"min_episodes": 10,
"success_rate": 0.8,
}
"""
# Check maturity gate
if not self.can_use(
agent_id=agent_id,
capability="auto_dev.custom_feature",
workspace_settings={},
):
return False
# Check custom conditions
agent_stats = self._get_agent_stats(agent_id)
if agent_stats["episode_count"] < custom_condition.get("min_episodes", 0):
return False
if agent_stats["success_rate"] < custom_condition.get("success_rate", 0.0):
return False
return True
def _get_agent_stats(self, agent_id: str) -> dict:
"""Get agent statistics."""
from core.models import Episode
episodes = (
self.db.query(Episode)
.filter(Episode.agent_id == agent_id)
.all()
)
success_count = sum(1 for e in episodes if e.success)
return {
"episode_count": len(episodes),
"success_rate": success_count / len(episodes) if episodes else 0.0,
}
---
Testing Strategies
Unit Tests
Test individual components in isolation:
import pytest
from unittest.mock import Mock, AsyncMock
from core.auto_dev.memento_engine import MementoEngine
@pytest.mark.asyncio
async def test_analyze_episode():
"""Test episode analysis."""
# Setup
db = Mock()
engine = MementoEngine(db=db)
# Mock episode query
mock_episode = Mock()
mock_episode.id = "episode-123"
mock_episode.task_description = "Test task"
mock_episode.agent_id = "agent-456"
db.query().filter().first.return_value = mock_episode
# Execute
result = await engine.analyze_episode("episode-123")
# Assert
assert result["episode_id"] == "episode-123"
assert result["task_description"] == "Test task"
assert "error_trace" in result
@pytest.mark.asyncio
async def test_propose_code_change():
"""Test code proposal."""
# Setup
db = Mock()
llm = AsyncMock()
llm.generate_completion.return_value = {
"content": "def test_function():\n return 'hello'"
}
engine = MementoEngine(db=db, llm_service=llm)
# Execute
code = await engine.propose_code_change({
"task_description": "Test task",
"error_trace": "Error message",
})
# Assert
assert "def test_function" in code
assert "return 'hello'" in code
Integration Tests
Test component interactions:
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from core.auto_dev.memento_engine import MementoEngine
from core.models import Base, Episode
@pytest.fixture
def db_session():
"""Create test database session."""
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
@pytest.mark.asyncio
async def test_full_memento_pipeline(db_session):
"""Test complete Memento-Skills pipeline."""
# Create test episode
episode = Episode(
id="episode-123",
agent_id="agent-456",
user_id="tenant-789",
task_description="Extract invoice ID",
success=False,
)
db_session.add(episode)
db_session.commit()
# Run pipeline
engine = MementoEngine(db=db_session)
candidate = await engine.generate_skill_candidate(
tenant_id="tenant-789",
agent_id="agent-456",
episode_id="episode-123",
)
# Assert
assert candidate.skill_name is not None
assert candidate.generated_code is not None
assert candidate.validation_status == "pending"
Property-Based Tests
Use Hypothesis for property-based testing:
from hypothesis import given, strategies as st
import pytest
@given(
task_description=st.text(min_size=10, max_size=100),
error_trace=st.text(min_size=0, max_size=500),
)
@pytest.mark.asyncio
async def test_analyze_episode_properties(task_description, error_trace):
"""Test analyze_episode with various inputs."""
db = Mock()
engine = MementoEngine(db=db)
# Mock episode
mock_episode = Mock()
mock_episode.task_description = task_description
mock_episode.error_trace = error_trace
db.query().filter().first.return_value = mock_episode
# Execute
result = await engine.analyze_episode("episode-123")
# Assert properties
assert isinstance(result, dict)
assert "episode_id" in result
assert isinstance(result.get("task_description"), str)
Mock Sandbox for Testing
Create mock sandbox for testing:
class MockSandbox:
"""Mock sandbox for testing."""
async def execute_raw_python(
self,
tenant_id: str,
code: str,
input_params: dict,
timeout: int = 60,
safety_level: str = "MEDIUM_RISK",
**kwargs,
) -> dict:
"""Mock execution."""
return {
"status": "success",
"output": "Mock output",
"execution_seconds": 0.1,
}
# Usage in tests
engine = MementoEngine(db=db, sandbox=MockSandbox())
result = await engine.validate_change(
code="print('hello')",
test_inputs=[{}],
tenant_id="tenant-123",
)
assert result["passed"] is True
---
Common Patterns
LLM Service Integration
from core.llm_service import get_llm_service
# Get LLM service
llm = get_llm_service()
# Generate completion
response = await llm.generate_completion(
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Generate Python code..."},
],
model="auto",
task_type="code",
)
code = response["content"]
Markdown Fence Stripping
def strip_markdown_fences(code: str) -> str:
"""Strip markdown code fences from LLM output."""
code = code.strip()
if code.startswith("```python"):
code = code[len("```python"):]
elif code.startswith("```"):
code = code[3:]
if code.endswith("```"):
code = code[:-3]
return code.strip()
Error Handling Patterns
import logging
logger = logging.getLogger(__name__)
async def safe_operation():
"""Safe operation with error handling."""
try:
# Attempt operation
result = await risky_operation()
return result
except ValueError as e:
logger.error(f"Validation error: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
raise
Logging Patterns
import structlog
logger = structlog.get_logger()
# Structured logging
logger.info(
"mutation_created",
tenant_id=tenant_id,
tool_name=tool_name,
mutation_id=mutation.id,
)
# Error logging with context
logger.error(
"sandbox_execution_failed",
tenant_id=tenant_id,
code=code[:100], # Truncate for logs
error=str(e),
exc_info=True,
)
Graceful Degradation
def get_llm_service():
"""Get LLM service with graceful fallback."""
try:
from core.llm_service import get_llm_service as get_llm
return get_llm()
except Exception as e:
logger.warning(f"LLM service unavailable: {e}")
return None
# Usage
llm = get_llm_service()
if llm:
result = await llm.generate_completion(...)
else:
# Fallback behavior
result = fallback_logic()
---
See Also
- AUTO_DEV_API_REFERENCE.md - Complete API documentation
- AUTO_DEV_USER_GUIDE.md - End-user guide
- AUTO_DEV_EVENT_PROTOCOL.md - Event protocol
- AUTO_DEV_INTEGRATION_GUIDE.md - Deployment and monitoring
- examples/auto_dev_examples.py - Code examples