# Auto-Dev Integration Guide
Complete guide for deploying, configuring, monitoring, and troubleshooting Auto-Dev in production environments.
**Version:** 1.0.0
**Last Updated:** 2026-04-10
---
## Table of Contents
- Deployment Checklist
- Configuration
- Monitoring
- Troubleshooting
- Performance Tuning
- Security Considerations
- Scaling Strategies
- Testing Integration
---
## Deployment Checklist

### Prerequisites

Before deploying Auto-Dev, ensure:
- [ ] **Database**: PostgreSQL 12+ or SQLite 3.35+
- [ ] **Python**: Python 3.11+
- [ ] **Docker** (optional but recommended): Docker 20.10+
- [ ] **LLM Service**: Configured LLM provider (OpenAI, Anthropic, etc.)
- [ ] **Workspace Settings**: Access to workspace configuration
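
A quick preflight sketch covering these prerequisites (the commands are standard, but adjust versions and clients to your environment):

```bash
#!/usr/bin/env bash
# Preflight: verify prerequisite tooling before deploying Auto-Dev.
set -e

python3 --version                      # expect Python 3.11+
docker --version || echo "Docker missing: subprocess fallback (weaker security)"
psql --version 2>/dev/null || sqlite3 --version   # PostgreSQL 12+ or SQLite 3.35+
echo "Preflight checks passed"
```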
### Database Migrations
Run Auto-Dev database migrations:
```bash
# Navigate to backend directory
cd backend

# Check current migration status
alembic current

# Run Auto-Dev migrations
alembic upgrade head

# Verify migrations
alembic current
# Should show: XXXXX_create_auto_dev_tables (head)
```

**Expected Tables:**

- `tool_mutations`
- `workflow_variants`
- `skill_candidates`
**Verification:**

```
-- PostgreSQL (psql)
\dt tool_mutations
\dt workflow_variants
\dt skill_candidates

-- SQLite (sqlite3)
.tables tool_mutations
.tables workflow_variants
.tables skill_candidates
```

### Docker Availability (Optional but Recommended)
If using ContainerSandbox:
```bash
# Check Docker is installed
docker --version
# Expected: Docker version 20.10.x or higher

# Check Docker is running
docker info
# Should show Docker system information

# Test Docker can run Python
docker run --rm python:3.11-slim python --version
# Expected: Python 3.11.x
```

**If Docker Unavailable:**
Auto-Dev falls back to subprocess isolation (weaker security). For production, Docker is strongly recommended.
### LLM Service Configuration
Configure LLM service for code generation:
```bash
# In workspace settings or environment variables
LLM_PROVIDER="openai"  # or "anthropic", "deepseek", etc.
OPENAI_API_KEY="sk-..."
ANTHROPIC_API_KEY="sk-..."
```

```python
# Verify LLM service
from core.llm_service import get_llm_service

llm = get_llm_service()
response = await llm.generate_completion(
    messages=[{"role": "user", "content": "Hello"}],
    model="auto",
)
print(response["content"])
```

### Workspace Settings Initialization
Configure Auto-Dev in workspace settings:
```http
# Via API
PUT /api/workspaces/{workspace_id}/configuration
{
  "auto_dev": {
    "enabled": true,
    "memento_skills": true,
    "alpha_evolver": true,
    "background_evolution": false,
    "max_mutations_per_day": 10,
    "max_skill_candidates_per_day": 5
  }
}
```

```sql
-- Or directly in the database
UPDATE workspaces
SET configuration = jsonb_set(
    configuration,
    '{auto_dev,enabled}',
    'true'
)
WHERE id = 'workspace-123';
```

### Graduation Framework Integration
Ensure graduation framework is enabled:
```python
# Check graduation service
from core.capability_graduation_service import CapabilityGraduationService

service = CapabilityGraduationService(db)
maturity = service.get_maturity("agent-123", "auto_dev.memento_skills")
print(f"Agent maturity: {maturity}")
# Expected: "student", "intern", "supervised", or "autonomous"
```

### Verification Checklist
After deployment, verify:
- [ ] Database tables created
- [ ] Docker accessible (if using ContainerSandbox)
- [ ] LLM service responding
- [ ] Workspace settings configured
- [ ] Graduation framework operational
- [ ] Event handlers registered
- [ ] No errors in logs
**Test Command:**

```python
# Quick health check
from core.auto_dev import MementoEngine, AlphaEvolverEngine
from core.auto_dev.capability_gate import AutoDevCapabilityService

memento = MementoEngine(db)
evolver = AlphaEvolverEngine(db)
gate = AutoDevCapabilityService(db)
print("✅ Auto-Dev components initialized successfully")
```

---
## Configuration

### Environment Variables
Configure Auto-Dev via environment variables:
```bash
# Auto-Dev Toggle
AUTO_DEV_ENABLED=true

# Sandbox Configuration
SANDBOX_TYPE=docker  # or "subprocess"
DOCKER_IMAGE=python:3.11-slim
SANDBOX_TIMEOUT=60
SANDBOX_MEMORY_LIMIT=256m

# Capability Gates
AUTO_DEV_MEMENTO_SKILLS_ENABLED=true
AUTO_DEV_ALPHA_EVOLVER_ENABLED=true
AUTO_DEV_BACKGROUND_EVOLUTION_ENABLED=false

# Daily Limits
AUTO_DEV_MAX_MUTATIONS_PER_DAY=10
AUTO_DEV_MAX_SKILL_CANDIDATES_PER_DAY=5

# LLM Configuration
LLM_PROVIDER=openai
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-...
```

### Workspace Configuration
Configure per-workspace settings:
```json
{
  "auto_dev": {
    "enabled": true,
    "memento_skills": true,
    "alpha_evolver": true,
    "background_evolution": false,
    "max_mutations_per_day": 10,
    "max_skill_candidates_per_day": 5,
    "sandbox_config": {
      "timeout": 60,
      "memory_limit": "256m",
      "enable_network": false
    }
  }
}
```

### Capability Gates
Configure maturity requirements:
```python
from core.auto_dev.capability_gate import AutoDevCapabilityService

# Default maturity gates
AutoDevCapabilityService.CAPABILITY_GATES = {
    "auto_dev.memento_skills": "INTERN",
    "auto_dev.alpha_evolver": "SUPERVISED",
    "auto_dev.background_evolution": "AUTONOMOUS",
}

# Custom maturity gates (if needed)
AutoDevCapabilityService.CAPABILITY_GATES.update({
    "auto_dev.custom_feature": "SUPERVISED",
})
```

### Sandbox Resource Limits
Configure sandbox execution limits:
```python
from core.auto_dev.container_sandbox import ContainerSandbox

# Custom sandbox configuration
sandbox = ContainerSandbox(
    docker_image="python:3.11-slim",
    timeout=120,           # seconds
    memory_limit="512m",   # megabytes
    enable_network=False,  # security: disable network
)
```

**Resource Limit Guidelines:**
| Resource | Minimum | Recommended | Maximum |
|---|---|---|---|
| Timeout | 30s | 60s | 300s |
| Memory | 128m | 256m | 1g |
| CPUs | 0.5 | 1 | 4 |
---
## Monitoring

### Key Metrics

Track these metrics for Auto-Dev health:

#### Mutation Metrics
```python
from datetime import datetime, timedelta, timezone

from sqlalchemy import func


def hours_ago(n: int) -> datetime:
    return datetime.now(timezone.utc) - timedelta(hours=n)


# Mutation rate (mutations/hour)
mutation_rate = (
    db.query(ToolMutation)
    .filter(ToolMutation.created_at > hours_ago(1))
    .count()
)

# Mutation success rate
total_mutations = db.query(ToolMutation).count()
passed_mutations = (
    db.query(ToolMutation)
    .filter(ToolMutation.sandbox_status == "passed")
    .count()
)
success_rate = passed_mutations / total_mutations if total_mutations > 0 else 0

# Average mutation latency
avg_latency = (
    db.query(
        func.avg(
            ToolMutation.execution_error  # Contains latency in metadata
        )
    )
    .scalar()
)
```

#### Fitness Score Trends
```python
from sqlalchemy import func

# Average fitness score
avg_fitness = (
    db.query(func.avg(WorkflowVariant.fitness_score))
    .filter(WorkflowVariant.fitness_score.isnot(None))
    .scalar()
)

# Top fitness score
top_fitness = (
    db.query(func.max(WorkflowVariant.fitness_score))
    .scalar()
)

# Fitness score distribution
import pandas as pd

fitness_scores = [
    v.fitness_score
    for v in db.query(WorkflowVariant.fitness_score)
    .filter(WorkflowVariant.fitness_score.isnot(None))
    .all()
]
distribution = pd.Series(fitness_scores).describe()
# count, mean, std, min, 25%, 50%, 75%, max
```

#### Skill Candidate Metrics
```python
from datetime import datetime, timedelta, timezone


def days_ago(n: int) -> datetime:
    return datetime.now(timezone.utc) - timedelta(days=n)


# Candidate generation rate
candidates_per_day = (
    db.query(SkillCandidate)
    .filter(SkillCandidate.created_at > days_ago(1))
    .count()
)

# Candidate validation rate
total_candidates = db.query(SkillCandidate).count()
validated_candidates = (
    db.query(SkillCandidate)
    .filter(SkillCandidate.validation_status == "validated")
    .count()
)
validation_rate = validated_candidates / total_candidates if total_candidates > 0 else 0

# Candidate promotion rate
promoted_candidates = (
    db.query(SkillCandidate)
    .filter(SkillCandidate.validation_status == "promoted")
    .count()
)
promotion_rate = promoted_candidates / total_candidates if total_candidates > 0 else 0
```

### Logging
Configure structured logging:
```python
import structlog

logger = structlog.get_logger()

# Log mutation creation
logger.info(
    "mutation_created",
    tenant_id=tenant_id,
    tool_name=tool_name,
    mutation_id=mutation.id,
    parent_tool_id=mutation.parent_tool_id,
)

# Log sandbox execution
logger.info(
    "sandbox_execution",
    mutation_id=mutation.id,
    status=result.get("status"),
    execution_seconds=result.get("execution_seconds"),
    environment=result.get("environment"),
)

# Log fitness evaluation
logger.info(
    "fitness_evaluated",
    variant_id=variant_id,
    fitness_score=fitness_score,
    proxy_signals=proxy_signals,
)
```

**Log Levels:**

- `INFO`: Normal operations (mutations, validations, promotions)
- `WARNING`: Degraded performance (high failure rates, low fitness)
- `ERROR`: Failures (sandbox errors, LLM failures)
- `DEBUG`: Detailed diagnostics (event payloads, handler execution)
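
A minimal structlog configuration sketch to go with the snippets above; the processor choices here are illustrative, not a required setup:

```python
import logging

import structlog

# Illustrative setup: JSON output with ISO timestamps and level names,
# filtered at INFO and above.
structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
)
```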
### Alerts
Configure alerts for critical issues:
```python
# Alert: High mutation failure rate
if success_rate < 0.5:
    send_alert(
        severity="WARNING",
        message=f"High mutation failure rate: {success_rate:.1%}",
        metrics={"success_rate": success_rate},
    )

# Alert: Sandbox unavailable
if not sandbox.docker_available:
    send_alert(
        severity="ERROR",
        message="Sandbox Docker unavailable",
        metrics={"docker_available": False},
    )

# Alert: Daily limit exceeded
if not gate.check_daily_limits(agent_id, capability):
    send_alert(
        severity="INFO",
        message=f"Daily limit exceeded for {capability}",
        metrics={"agent_id": agent_id, "capability": capability},
    )
```

**Alert Channels:**
- Slack
- PagerDuty
- Custom webhooks
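
`send_alert` above is not provided by Auto-Dev; a minimal sketch that routes alerts to a Slack incoming webhook (the URL and payload shape are yours to supply, and PagerDuty or custom webhooks slot in the same way):

```python
import json
import urllib.request

# Hypothetical webhook URL -- substitute your own Slack incoming webhook.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/..."


def send_alert(severity: str, message: str, metrics: dict) -> None:
    """Post an alert to Slack; swap in PagerDuty or a custom webhook as needed."""
    payload = {"text": f"[{severity}] {message}\nmetrics: {json.dumps(metrics)}"}
    req = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req, timeout=5)
```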
### Dashboard
Create monitoring dashboard with:
**Mutation Panel:**
- Mutations per hour (line chart)
- Mutation success rate (gauge)
- Average execution time (line chart)
- Top tools by mutation count (bar chart)
**Fitness Panel:**
- Average fitness score (line chart)
- Fitness score distribution (histogram)
- Top variants by fitness (table)
**Candidate Panel:**
- Candidates per day (line chart)
- Validation rate (gauge)
- Promotion rate (gauge)
- Pending candidates (table)
**Example Dashboard (Grafana):**

```json
{
  "dashboard": {
    "title": "Auto-Dev Monitoring",
    "panels": [
      {
        "title": "Mutation Rate",
        "targets": [
          {
            "expr": "rate(auto_dev_mutations_total[1h])"
          }
        ]
      },
      {
        "title": "Fitness Score",
        "targets": [
          {
            "expr": "avg(auto_dev_fitness_score)"
          }
        ]
      }
    ]
  }
}
```

### Prometheus Metrics Integration
Expose metrics for Prometheus:
```python
from prometheus_client import Counter, Histogram, Gauge

# Mutation metrics
mutation_counter = Counter(
    "auto_dev_mutations_total",
    "Total mutations generated",
    ["tenant_id", "tool_name", "status"],
)
mutation_duration = Histogram(
    "auto_dev_mutation_duration_seconds",
    "Mutation execution duration",
    ["tenant_id"],
)

# Fitness metrics
fitness_gauge = Gauge(
    "auto_dev_fitness_score",
    "Variant fitness score",
    ["tenant_id", "variant_id"],
)

# Candidate metrics
candidate_counter = Counter(
    "auto_dev_candidates_total",
    "Total skill candidates generated",
    ["tenant_id", "validation_status"],
)

# Use in code
mutation_counter.labels(
    tenant_id=tenant_id,
    tool_name=tool_name,
    status="passed",
).inc()

fitness_gauge.labels(
    tenant_id=tenant_id,
    variant_id=variant_id,
).set(fitness_score)
```

---
## Troubleshooting

### Common Issues and Solutions

#### Issue: Sandbox Unavailable
**Symptoms:**
- All mutations fail with "Sandbox unavailable"
- `ContainerSandbox.docker_available` returns `False`
- Error: "docker: command not found"
**Diagnosis:**

```bash
# Check Docker
docker info
# Expected: Docker system information
# Actual: Error connecting to Docker daemon

# Check Docker daemon
sudo systemctl status docker
# or macOS: open Docker Desktop
```

**Solutions:**
- **Install Docker**

  ```bash
  # macOS
  # Download Docker Desktop from docker.com
  ```

- **Start Docker Daemon**

  ```bash
  # macOS
  # Open Docker Desktop application
  ```

- **Verify Docker Access** (rerun the `docker info` and `docker run` checks above)

- **Configure Auto-Dev to Use Docker**

  ```python
  sandbox = ContainerSandbox()
  assert sandbox.docker_available, "Docker not available"
  ```
**Fallback:**
If Docker cannot be installed, Auto-Dev falls back to subprocess isolation (weaker security).
---
#### Issue: LLM Service Unavailable
**Symptoms:**
- Skill generation fails with "LLM unavailable"
- Mutations have default/fallback code
- Error: "LLM service unavailable"
**Diagnosis:**

```python
from core.llm_service import get_llm_service

llm = get_llm_service()
print(llm)
# Expected: <LLMService object>
# Actual: None
```

**Solutions:**
- **Configure LLM Provider**
- **Verify API Key**
- **Test LLM Service**
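
A sketch of the three steps above, reusing the environment variable names from the Configuration section and the verification snippet from the deployment checklist:

```bash
# 1. Configure provider and key (names match the Configuration section)
export LLM_PROVIDER=openai
export OPENAI_API_KEY=sk-...

# 2. Verify the key is visible to the process
python -c "import os; assert os.environ.get('OPENAI_API_KEY'), 'API key not set'"
```

```python
# 3. Test the LLM service end to end
from core.llm_service import get_llm_service

llm = get_llm_service()
response = await llm.generate_completion(
    messages=[{"role": "user", "content": "ping"}],
    model="auto",
)
assert response["content"], "LLM returned empty response"
```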
**Fallback:**
Auto-Dev continues with graceful degradation: code generation is skipped and only validation runs.
---
#### Issue: Episode Models Not Available
**Symptoms:**
- `analyze_episode()` returns "Episode models not available"
- MementoEngine cannot analyze failures
- Error: "No module named 'core.models'"
**Diagnosis:**

```python
from core.models import Episode

print(Episode)
# Expected: <class 'core.models.Episode'>
# Actual: ImportError
```

**Solutions:**
- **Check Database Models**
- **Run Migrations**
- **Verify Import Path**
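
A sketch of those checks; the migration and import commands mirror the deployment checklist, though exact paths may differ in your install:

```bash
# Run migrations (see Deployment Checklist)
cd backend && alembic upgrade head

# Verify the import path resolves
python -c "from core.models import Episode; print(Episode)"
```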
**Fallback:**
MementoEngine skips episode analysis and returns an error message.
---
#### Issue: Daily Limits Exceeded
**Symptoms:**
- Error: "Daily limit exceeded"
- No new mutations/candidates generated
- Counter at max value
**Diagnosis:**

```python
from core.auto_dev.capability_gate import AutoDevCapabilityService

gate = AutoDevCapabilityService(db)
can_proceed = gate.check_daily_limits(
    agent_id="agent-123",
    capability="auto_dev.alpha_evolver",
    workspace_settings=settings,
)
print(can_proceed)
# Expected: True
# Actual: False
```

**Solutions:**
- **Wait for Reset**
  - Daily limits reset at midnight UTC
  - Check current UTC time
- **Increase Limits** (see the sketch after this list)
- **Check Current Usage**

  ```python
  from datetime import datetime, timezone

  today_start = datetime.now(timezone.utc).replace(
      hour=0, minute=0, second=0, microsecond=0
  )
  count = (
      db.query(ToolMutation)
      .filter(
          ToolMutation.tenant_id == tenant_id,
          ToolMutation.created_at >= today_start,
      )
      .count()
  )
  print(f"Mutations today: {count}")
  ```
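
To raise the limits, update the workspace configuration keys introduced in the Configuration section (the values here are illustrative):

```json
{
  "auto_dev": {
    "max_mutations_per_day": 20,
    "max_skill_candidates_per_day": 10
  }
}
```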
---
#### Issue: Capability Gates Failing
**Symptoms:**
- Agent cannot use Auto-Dev features
- Error: "Capability not allowed"
- `can_use()` returns `False`
**Diagnosis:**

```python
from core.auto_dev.capability_gate import AutoDevCapabilityService

gate = AutoDevCapabilityService(db)
can_use = gate.can_use(
    agent_id="agent-123",
    capability="auto_dev.memento_skills",
    workspace_settings=settings,
)

# Debug checks
print(f"Auto-Dev enabled: {settings.get('auto_dev', {}).get('enabled')}")
print(f"Agent maturity: {gate._get_agent_maturity('agent-123', 'auto_dev.memento_skills')}")
```

**Solutions:**
- **Enable Auto-Dev in Workspace**
- **Check Agent Maturity**
- **Graduate Agent**
- Run more episodes
- Reduce interventions
- Improve constitutional compliance
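
A sketch of the first two checks, using only APIs that appear elsewhere in this guide (the SQL mirrors Workspace Settings Initialization, the maturity lookup mirrors Graduation Framework Integration):

```sql
-- Enable Auto-Dev for the workspace
UPDATE workspaces
SET configuration = jsonb_set(configuration, '{auto_dev,enabled}', 'true')
WHERE id = 'workspace-123';
```

```python
# Compare the agent's maturity to the gate (gates listed under Capability Gates)
from core.capability_graduation_service import CapabilityGraduationService

service = CapabilityGraduationService(db)
maturity = service.get_maturity("agent-123", "auto_dev.memento_skills")
print(f"Maturity: {maturity}; required: INTERN")
```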
---
#### Issue: Fitness Scores Not Updating
**Symptoms:**
- Variants stuck at "pending" status
- Fitness scores remain `None`
- External signals not received
**Diagnosis:**

```python
from core.auto_dev.models import WorkflowVariant

variant = (
    db.query(WorkflowVariant)
    .filter(WorkflowVariant.id == "variant-123")
    .first()
)
print(f"Status: {variant.evaluation_status}")
print(f"Fitness: {variant.fitness_score}")
print(f"Signals: {variant.fitness_signals}")
```

**Solutions:**
- **Trigger Proxy Evaluation**

  ```python
  fitness_service = FitnessService(db)
  score = fitness_service.evaluate_initial_proxy(
      variant_id="variant-123",
      tenant_id="tenant-456",
      proxy_signals={
          "execution_success": True,
          "syntax_error": False,
          "execution_latency_ms": 1500,
      },
  )
  ```

- **Configure Webhooks** (see the sketch after this list)
- **Check FitnessService Logs**
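
A hypothetical webhook receiver for external fitness signals; Auto-Dev does not prescribe this endpoint, so the route, payload shape, and use of FastAPI are all assumptions for illustration:

```python
# Hypothetical receiver: forwards external signals to FitnessService using
# the evaluate_initial_proxy call shown in the solution above.
from fastapi import FastAPI

app = FastAPI()


@app.post("/webhooks/fitness/{variant_id}")
async def receive_fitness_signal(variant_id: str, payload: dict):
    fitness_service = FitnessService(db)  # session acquisition is deployment-specific
    score = fitness_service.evaluate_initial_proxy(
        variant_id=variant_id,
        tenant_id=payload["tenant_id"],
        proxy_signals=payload["signals"],
    )
    return {"variant_id": variant_id, "fitness_score": score}
```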
---
### Troubleshooting Table
| Symptom | Cause | Solution |
|---|---|---|
| "Sandbox unavailable" | Docker not installed/running | Install/start Docker |
| "LLM unavailable" | LLM service not configured | Set LLM_PROVIDER and API keys |
| "Episode models not available" | Models not imported | Run migrations, verify imports |
| "Daily limit exceeded" | Too many mutations/candidates | Wait for reset or increase limits |
| "Capability not allowed" | Agent maturity too low | Graduate agent to higher maturity |
| Fitness scores `None` | Evaluation not triggered | Run proxy evaluation manually |
| High failure rate | Poor base code or prompts | Review mutation prompts and base code |
| Low fitness scores | Suboptimal mutations | Increase research iterations |
| Candidates not validating | Sandbox errors | Check Docker, increase timeout |
---
## Performance Tuning

### Sandbox Execution Timeouts
**Issue:** Sandbox executions timing out
**Diagnosis:**

```python
# Check execution times
durations = [
    float(r.get("execution_seconds", 0))
    for r in mutation_results
]
avg_duration = sum(durations) / len(durations) if durations else 0.0
print(f"Average execution time: {avg_duration:.2f}s")
```

**Solutions:**
- **Increase Timeout** (raise `timeout` on `ContainerSandbox`, as shown under Sandbox Resource Limits)
- **Optimize Code**
  - Use more efficient algorithms
  - Reduce I/O operations
  - Cache results
- **Profile Code**

  ```python
  import cProfile


  def profile_execution(code):
      profiler = cProfile.Profile()
      profiler.enable()
      result = execute_code(code)
      profiler.disable()
      profiler.print_stats(sort='cumtime')
      return result
  ```
### Database Query Optimization
**Issue:** Slow database queries
**Diagnosis:**

```python
# Enable query logging
import logging

logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

# Check query times
# Look for slow queries in logs
```

**Solutions:**
- **Add Indexes**

  ```python
  from sqlalchemy import Index

  Index("ix_mutations_tenant_created", ToolMutation.tenant_id, ToolMutation.created_at.desc())
  ```

- **Use Eager Loading**

  ```python
  from sqlalchemy.orm import joinedload

  variants = (
      db.query(WorkflowVariant)
      .options(joinedload(WorkflowVariant.parent_variant))
      .all()
  )
  ```

- **Batch Operations**

  ```python
  # Use a single query instead of one query per id
  mutations = (
      db.query(ToolMutation)
      .filter(ToolMutation.id.in_(mutation_ids))
      .all()
  )
  ```
### Event Handler Batching
**Issue:** Event handlers causing delays
**Diagnosis:**

```python
# Check handler execution times
import time


@event_bus.on_task_fail
async def timed_handler(event: TaskEvent):
    start = time.monotonic()
    await process_event(event)
    duration = time.monotonic() - start
    if duration > 1.0:
        logger.warning(f"Slow handler: {duration:.2f}s")
```

**Solutions:**
- **Offload Heavy Processing**
- **Use Background Workers**

  ```python
  @event_bus.on_task_fail
  async def queue_handler(event: TaskEvent):
      # Hand the heavy work to a background task queue (see Background Jobs)
      process_failure_background.delay(event.episode_id)
  ```
### Fitness Evaluation Caching
**Issue:** Repeated fitness calculations
**Diagnosis:**

```python
from sqlalchemy import func

# Check for duplicate evaluations
duplicates = (
    db.query(FitnessEvaluation.variant_id, func.count(FitnessEvaluation.id))
    .group_by(FitnessEvaluation.variant_id)
    .having(func.count(FitnessEvaluation.id) > 1)
    .all()
)
```

**Solutions:**
- **Cache Fitness Scores**

  ```python
  from functools import lru_cache


  @lru_cache(maxsize=1000)
  def get_cached_fitness(variant_id: str) -> float:
      variant = db.query(WorkflowVariant).filter(WorkflowVariant.id == variant_id).first()
      return variant.fitness_score or 0.0
  ```

- **Use Redis Cache**

  ```python
  import redis

  redis_client = redis.Redis(host='localhost', port=6379, db=0)


  def get_fitness_with_cache(variant_id: str) -> float:
      cache_key = f"fitness:{variant_id}"

      # Check cache
      cached = redis_client.get(cache_key)
      if cached:
          return float(cached)

      # Calculate and cache
      fitness = calculate_fitness(variant_id)
      redis_client.setex(cache_key, 3600, str(fitness))  # 1 hour TTL
      return fitness
  ```
### Concurrent Mutation Limits
**Issue:** Too many concurrent mutations
**Diagnosis:**

```python
import asyncio

running_mutations = [
    task for task in asyncio.all_tasks()
    if "mutation" in str(task)
]
print(f"Running mutations: {len(running_mutations)}")
```

**Solutions:**
- **Limit Concurrency**

  ```python
  semaphore = asyncio.Semaphore(5)  # Max 5 concurrent


  async def limited_mutation(code):
      async with semaphore:
          return await generate_mutation(code)
  ```

- **Use Queue**

  ```python
  mutation_queue: asyncio.Queue = asyncio.Queue()


  async def mutation_worker():
      while True:
          code = await mutation_queue.get()
          await generate_mutation(code)
          mutation_queue.task_done()
  ```
---
## Security Considerations

### Sandbox Isolation
**Network Isolation:**

```python
# Disable network access
sandbox = ContainerSandbox(
    enable_network=False,  # Prevents external calls
)
```

**Filesystem Isolation:**

```bash
# Read-only filesystem
docker run --read-only ...

# Tmpfs only
docker run --tmpfs /tmp:rw,noexec,nosuid,size=64m ...
```

**Capability Dropping:**

```bash
# Drop Linux capabilities
docker run --cap-drop ALL ...
```

### Tenant Isolation
**Always Filter by tenant_id:**

```python
# ✅ Good: tenant_id filter
mutations = (
    db.query(ToolMutation)
    .filter(ToolMutation.tenant_id == tenant_id)
    .all()
)

# ❌ Bad: no tenant filter
mutations = db.query(ToolMutation).all()  # SECURITY RISK
```

**Row-Level Security (RLS):**

```sql
-- Enable RLS on Auto-Dev tables
ALTER TABLE tool_mutations ENABLE ROW LEVEL SECURITY;
ALTER TABLE workflow_variants ENABLE ROW LEVEL SECURITY;
ALTER TABLE skill_candidates ENABLE ROW LEVEL SECURITY;

-- Create RLS policies
CREATE POLICY tenant_isolation ON tool_mutations
    FOR ALL
    USING (tenant_id = current_tenant());
```

### Maturity Gates
**Enforce Maturity Requirements:**

```python
from core.auto_dev.capability_gate import AutoDevCapabilityService

gate = AutoDevCapabilityService(db)

# Always check before operations
if not gate.can_use(agent_id, capability, workspace_settings):
    raise PermissionError("Agent not authorized for this capability")
```

### Daily Limits
**Prevent Resource Exhaustion:**

```python
# Check limits before operations
if not gate.check_daily_limits(agent_id, capability, workspace_settings):
    raise RateLimitError("Daily limit exceeded")
```

### Code Validation
**Syntax Checking:**

```python
import ast


def validate_syntax(code: str) -> bool:
    """Validate Python syntax."""
    try:
        ast.parse(code)
        return True
    except SyntaxError:
        return False
```

**Security Scanning:**
```python
import re


def scan_for_secrets(code: str) -> list[str]:
    """Scan code for hardcoded secrets."""
    secrets = []

    # Check for API keys
    if re.search(r'(api_key|apikey)\s*=\s*["\'][^"\']+["\']', code):
        secrets.append("Hardcoded API key")

    # Check for passwords
    if re.search(r'password\s*=\s*["\'][^"\']+["\']', code):
        secrets.append("Hardcoded password")

    return secrets
```

---
## Scaling Strategies

### Horizontal Scaling
**Multiple Workers:**

```python
# Run multiple worker processes
# gunicorn.conf.py
import multiprocessing

workers = multiprocessing.cpu_count() * 2 + 1

# Each worker has its own event bus;
# events are not shared across workers.
```

**Message Queue for Events:**
```python
# Use Redis pub/sub for cross-worker events
import json

import redis

redis_client = redis.Redis(host='localhost', port=6379)

# Publish event
redis_client.publish('auto_dev:task_fail', event_json)

# Subscribe to events
pubsub = redis_client.pubsub()
pubsub.subscribe('auto_dev:task_fail')
for message in pubsub.listen():
    if message['type'] == 'message':
        event = json.loads(message['data'])
        handle_event(event)
```

### Database Pooling
**Connection Pool Configuration:**

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,       # Max connections
    max_overflow=10,    # Additional connections
    pool_timeout=30,    # Wait time for connection (seconds)
    pool_recycle=3600,  # Recycle connections after 1 hour
)
```

**Connection Limits:**
```python
# Calculate pool size
# Formula: (num_workers * concurrent_requests_per_worker) + safety_margin
pool_size = (4 * 5) + 10  # 30 connections
```

### Sandbox Pooling
**Container Reuse:**

```python
import asyncio


class SandboxPool:
    """Pool of reusable sandbox containers."""

    def __init__(self, size: int = 5):
        self.size = size
        self.containers = asyncio.Queue(maxsize=size)

    async def get_container(self):
        """Get a container from the pool, creating one if the pool is empty."""
        if self.containers.empty():
            return await self._create_container()
        return await self.containers.get()

    async def return_container(self, container):
        """Return a container to the pool."""
        await self.containers.put(container)

    async def _create_container(self):
        """Create a new container."""
        # Docker container creation logic
        pass
```

### Event Queue
**Redis-Based Event Bus:**

```python
import asyncio
import json

import redis.asyncio as redis


class RedisEventBus:
    """Redis-backed event bus for cross-process communication."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)

    async def emit_task_fail(self, event: TaskEvent):
        """Emit event to Redis."""
        await self.redis.publish(
            'auto_dev:task_fail',
            json.dumps(event.__dict__),
        )

    def subscribe_task_fail(self, handler):
        """Subscribe to task failure events."""
        pubsub = self.redis.pubsub()

        async def listen():
            await pubsub.subscribe('auto_dev:task_fail')
            async for message in pubsub.listen():
                if message['type'] == 'message':
                    event_dict = json.loads(message['data'])
                    event = TaskEvent(**event_dict)
                    await handler(event)

        asyncio.create_task(listen())
```

### Background Jobs
**Celery Integration:**

```python
import asyncio

from celery import Celery

celery_app = Celery('auto_dev')


@celery_app.task
def generate_mutation_background(tenant_id, tool_name, base_code):
    """Generate mutation in background (Celery tasks are sync, so run the coroutine)."""
    engine = AlphaEvolverEngine(db)
    mutation = asyncio.run(
        engine.generate_tool_mutation(
            tenant_id=tenant_id,
            tool_name=tool_name,
            base_code=base_code,
            mutation_prompt="Optimize for speed",
        )
    )
    return mutation.id


# Usage
generate_mutation_background.delay("tenant-123", "process_invoice", code)
```

---
## Testing Integration

### Unit Tests
```python
import pytest
from unittest.mock import Mock, AsyncMock


@pytest.mark.asyncio
async def test_memento_engine():
    """Unit test for MementoEngine."""
    db = Mock()
    llm = AsyncMock()
    engine = MementoEngine(db=db, llm_service=llm)

    # Test analyze_episode
    analysis = await engine.analyze_episode("episode-123")
    assert "episode_id" in analysis

    # Test propose_code_change
    llm.generate_completion.return_value = {"content": "def test(): pass"}
    code = await engine.propose_code_change(analysis)
    assert "def test" in code
```

### Integration Tests
```python
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


@pytest.fixture
def db_session():
    """Create test database."""
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()


@pytest.mark.asyncio
async def test_full_pipeline(db_session):
    """Test full Memento pipeline."""
    # Create episode
    episode = Episode(id="episode-123", success=False, ...)
    db_session.add(episode)
    db_session.commit()

    # Run pipeline
    engine = MementoEngine(db=db_session)
    candidate = await engine.generate_skill_candidate(
        tenant_id="tenant-456",
        agent_id="agent-789",
        episode_id="episode-123",
    )

    # Assert
    assert candidate.validation_status == "pending"
    assert candidate.skill_name is not None
```

### Property-Based Tests
```python
import pytest
from hypothesis import given, strategies as st
from unittest.mock import Mock


@given(
    task_description=st.text(min_size=10, max_size=100),
    error_trace=st.text(min_size=0, max_size=500),
)
@pytest.mark.asyncio
async def test_analyze_episode_properties(task_description, error_trace):
    """Test analyze_episode with various inputs."""
    db = Mock()
    engine = MementoEngine(db=db)

    mock_episode = Mock()
    mock_episode.task_description = task_description
    mock_episode.error_trace = error_trace
    db.query().filter().first.return_value = mock_episode

    result = await engine.analyze_episode("episode-123")
    assert isinstance(result, dict)
    assert "episode_id" in result
```

### E2E Tests
```python
import asyncio

import pytest


@pytest.mark.asyncio
async def test_e2e_memento_workflow():
    """Test complete Memento workflow."""
    # Setup
    db = get_test_db()
    agent = create_test_agent(db, maturity="INTERN")

    # Trigger failure
    episode = await execute_failing_task(agent.id)

    # Wait for ReflectionEngine
    await asyncio.sleep(1)

    # Check candidate created
    candidates = (
        db.query(SkillCandidate)
        .filter(SkillCandidate.agent_id == agent.id)
        .all()
    )
    assert len(candidates) > 0

    # Validate candidate
    engine = MementoEngine(db)
    result = await engine.validate_candidate(
        candidate_id=candidates[0].id,
        tenant_id="test-tenant",
    )
    assert result["passed"] is True

    # Promote candidate
    promotion = await engine.promote_skill(
        candidate_id=candidates[0].id,
        tenant_id="test-tenant",
    )
    assert promotion["success"] is True
```

### CI/CD Pipeline Integration
```yaml
# .github/workflows/auto-dev-tests.yml
name: Auto-Dev Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:14
        env:
          POSTGRES_DB: test_db
          POSTGRES_PASSWORD: postgres
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

      redis:
        image: redis:7
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-asyncio pytest-cov

      - name: Run unit tests
        run: pytest tests/test_auto_dev_unit.py -v

      - name: Run integration tests
        run: pytest tests/test_auto_dev_integration.py -v
        env:
          DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db

      - name: Run property-based tests
        run: pytest tests/test_auto_dev_property.py -v

      - name: Upload coverage
        uses: codecov/codecov-action@v3
```

---
## See Also
- AUTO_DEV_API_REFERENCE.md - Complete API documentation
- AUTO_DEV_USER_GUIDE.md - End-user guide
- AUTO_DEV_DEVELOPER_GUIDE.md - Developer guide
- AUTO_DEV_ARCHITECTURE.md - System architecture