ATOM Documentation

Auto-Dev Integration Guide

Complete guide for deploying, configuring, monitoring, and troubleshooting Auto-Dev in production environments.

**Version:** 1.0.0

**Last Updated:** 2026-04-10

---

Deployment Checklist

Prerequisites

Before deploying Auto-Dev, ensure:

  • [ ] **Database**: PostgreSQL 12+ or SQLite 3.35+
  • [ ] **Python**: Python 3.11+
  • [ ] **Docker** (optional but recommended): Docker 20.10+
  • [ ] **LLM Service**: Configured LLM provider (OpenAI, Anthropic, etc.)
  • [ ] **Workspace Settings**: Access to workspace configuration

Database Migrations

Run Auto-Dev database migrations:

# Navigate to backend directory
cd backend

# Check current migration status
alembic current

# Run Auto-Dev migrations
alembic upgrade head

# Verify migrations
alembic current
# Should show: XXXXX_create_auto_dev_tables (head)

**Expected Tables:**

  • tool_mutations
  • workflow_variants
  • skill_candidates

**Verification:**

-- PostgreSQL
\dt tool_mutations
\dt workflow_variants
\dt skill_candidates

-- SQLite
.tables tool_mutations
.tables workflow_variants
.tables skill_candidates

If using ContainerSandbox, verify that Docker is installed, running, and able to start a Python container:

# Check Docker is installed
docker --version
# Expected: Docker version 20.10.x or higher

# Check Docker is running
docker info
# Should show Docker system information

# Test Docker can run Python
docker run --rm python:3.11-slim python --version
# Expected: Python 3.11.x

**If Docker Unavailable:**

Auto-Dev falls back to subprocess isolation (weaker security). For production, Docker is strongly recommended.
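
A deployment script could make this fallback an explicit decision instead of discovering it at runtime. The sketch below is not Auto-Dev's own detection logic: `docker_available` and `choose_sandbox_type` are hypothetical helpers that shell out to the `docker` CLI and return a value one might assign to `SANDBOX_TYPE`.

```python
import shutil
import subprocess


def docker_available(timeout: float = 5.0) -> bool:
    """Return True if the docker CLI is on PATH and `docker info` succeeds."""
    if shutil.which("docker") is None:
        return False
    try:
        result = subprocess.run(
            ["docker", "info"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
        )
    except (subprocess.TimeoutExpired, OSError):
        return False
    return result.returncode == 0


def choose_sandbox_type(has_docker: bool, require_docker: bool = False) -> str:
    """Pick "docker" when available, else "subprocess" unless Docker is mandatory."""
    if has_docker:
        return "docker"
    if require_docker:
        raise RuntimeError("Docker is required in this environment but is unavailable")
    return "subprocess"
```

In production, calling `choose_sandbox_type(docker_available(), require_docker=True)` would fail the deployment rather than silently accept weaker isolation.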

LLM Service Configuration

Configure LLM service for code generation:

# In workspace settings or environment variables
LLM_PROVIDER="openai"  # or "anthropic", "deepseek", etc.
OPENAI_API_KEY="sk-..."
ANTHROPIC_API_KEY="sk-..."

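# Verify LLM service
import asyncio

from core.llm_service import get_llm_service

async def verify_llm():
    llm = get_llm_service()
    response = await llm.generate_completion(
        messages=[{"role": "user", "content": "Hello"}],
        model="auto",
    )
    print(response["content"])

# generate_completion is a coroutine, so run it inside an event loop
asyncio.run(verify_llm())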

Workspace Settings Initialization

Configure Auto-Dev in workspace settings:

# Via API
PUT /api/workspaces/{workspace_id}/configuration
{
  "auto_dev": {
    "enabled": true,
    "memento_skills": true,
    "alpha_evolver": true,
    "background_evolution": false,
    "max_mutations_per_day": 10,
    "max_skill_candidates_per_day": 5
  }
}

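# Or directly in database (PostgreSQL)
# Note: jsonb_set only creates the final key of the path, so configuration
# must already contain an "auto_dev" object for this update to take effect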
UPDATE workspaces
SET configuration = jsonb_set(
    configuration,
    '{auto_dev,enabled}',
    'true'
)
WHERE id = 'workspace-123';

Graduation Framework Integration

Ensure graduation framework is enabled:

# Check graduation service
from core.capability_graduation_service import CapabilityGraduationService

service = CapabilityGraduationService(db)
maturity = service.get_maturity("agent-123", "auto_dev.memento_skills")
print(f"Agent maturity: {maturity}")
# Expected: "student", "intern", "supervised", or "autonomous"

Verification Checklist

After deployment, verify:

  • [ ] Database tables created
  • [ ] Docker accessible (if using ContainerSandbox)
  • [ ] LLM service responding
  • [ ] Workspace settings configured
  • [ ] Graduation framework operational
  • [ ] Event handlers registered
  • [ ] No errors in logs

**Test Command:**

# Quick health check
from core.auto_dev import MementoEngine, AlphaEvolverEngine
from core.auto_dev.capability_gate import AutoDevCapabilityService

memento = MementoEngine(db)
evolver = AlphaEvolverEngine(db)
gate = AutoDevCapabilityService(db)

print("✅ Auto-Dev components initialized successfully")

---

Configuration

Environment Variables

Configure Auto-Dev via environment variables:

# Auto-Dev Toggle
AUTO_DEV_ENABLED=true

# Sandbox Configuration
SANDBOX_TYPE=docker  # or "subprocess"
DOCKER_IMAGE=python:3.11-slim
SANDBOX_TIMEOUT=60
SANDBOX_MEMORY_LIMIT=256m

# Capability Gates
AUTO_DEV_MEMENTO_SKILLS_ENABLED=true
AUTO_DEV_ALPHA_EVOLVER_ENABLED=true
AUTO_DEV_BACKGROUND_EVOLUTION_ENABLED=false

# Daily Limits
AUTO_DEV_MAX_MUTATIONS_PER_DAY=10
AUTO_DEV_MAX_SKILL_CANDIDATES_PER_DAY=5

# LLM Configuration
LLM_PROVIDER=openai
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-...
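
To catch typos in these variables early, a startup hook could parse them into a typed object. This is a minimal sketch, not Auto-Dev's actual loader: `AutoDevEnv` and `load_auto_dev_env` are hypothetical names, only a subset of the variables is read, and the fallback values simply mirror the examples above (the exception is `AUTO_DEV_ENABLED`, which this sketch treats as off when unset).

```python
import os
from dataclasses import dataclass
from typing import Mapping


def _as_bool(value: str) -> bool:
    """Interpret common truthy strings; anything else is False."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class AutoDevEnv:
    enabled: bool
    sandbox_type: str
    sandbox_timeout: int
    max_mutations_per_day: int


def load_auto_dev_env(env: Mapping[str, str] = os.environ) -> AutoDevEnv:
    """Parse a subset of the Auto-Dev variables, rejecting unknown sandbox types."""
    sandbox_type = env.get("SANDBOX_TYPE", "docker")
    if sandbox_type not in {"docker", "subprocess"}:
        raise ValueError(f"Unsupported SANDBOX_TYPE: {sandbox_type!r}")
    return AutoDevEnv(
        enabled=_as_bool(env.get("AUTO_DEV_ENABLED", "false")),
        sandbox_type=sandbox_type,
        sandbox_timeout=int(env.get("SANDBOX_TIMEOUT", "60")),
        max_mutations_per_day=int(env.get("AUTO_DEV_MAX_MUTATIONS_PER_DAY", "10")),
    )
```

Passing a plain dictionary instead of `os.environ` makes the parser easy to unit test.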

Workspace Configuration

Configure per-workspace settings:

{
  "auto_dev": {
    "enabled": true,
    "memento_skills": true,
    "alpha_evolver": true,
    "background_evolution": false,
    "max_mutations_per_day": 10,
    "max_skill_candidates_per_day": 5,
    "sandbox_config": {
      "timeout": 60,
      "memory_limit": "256m",
      "enable_network": false
    }
  }
}

Capability Gates

Configure maturity requirements:

from core.auto_dev.capability_gate import AutoDevCapabilityService

# Default maturity gates
AutoDevCapabilityService.CAPABILITY_GATES = {
    "auto_dev.memento_skills": "INTERN",
    "auto_dev.alpha_evolver": "SUPERVISED",
    "auto_dev.background_evolution": "AUTONOMOUS",
}

# Custom maturity gates (if needed)
AutoDevCapabilityService.CAPABILITY_GATES.update({
    "auto_dev.custom_feature": "SUPERVISED",
})
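
The gate values imply an ordering of maturity levels. The sketch below shows one way such a comparison might work. It is an illustration only: it assumes the order STUDENT < INTERN < SUPERVISED < AUTONOMOUS (the order in which the levels are listed earlier in this guide), and `meets_gate` is a hypothetical helper, not a method of `AutoDevCapabilityService`.

```python
# Assumed ordering, lowest to highest maturity
MATURITY_ORDER = ["STUDENT", "INTERN", "SUPERVISED", "AUTONOMOUS"]

# Copy of the default gates shown above
CAPABILITY_GATES = {
    "auto_dev.memento_skills": "INTERN",
    "auto_dev.alpha_evolver": "SUPERVISED",
    "auto_dev.background_evolution": "AUTONOMOUS",
}


def meets_gate(agent_maturity: str, capability: str) -> bool:
    """True if the agent's maturity is at or above the capability's gate."""
    required = CAPABILITY_GATES.get(capability)
    level = agent_maturity.upper()
    if required is None or level not in MATURITY_ORDER:
        return False  # unknown capabilities or levels are denied in this sketch
    return MATURITY_ORDER.index(level) >= MATURITY_ORDER.index(required)
```

Denying unknown capabilities by default keeps a misspelled capability name from accidentally bypassing the gate.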

Sandbox Resource Limits

Configure sandbox execution limits:

from core.auto_dev.container_sandbox import ContainerSandbox

# Custom sandbox configuration
sandbox = ContainerSandbox(
    docker_image="python:3.11-slim",
    timeout=120,  # seconds
    memory_limit="512m",  # megabytes
    enable_network=False,  # security: disable network
)

**Resource Limit Guidelines:**

| Resource | Minimum | Recommended | Maximum |
|----------|---------|-------------|---------|
| Timeout  | 30s     | 60s         | 300s    |
| Memory   | 128m    | 256m        | 1g      |
| CPUs     | 0.5     | 1           | 4       |
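
These bounds can be checked before a sandbox is constructed. The following is a sketch under stated assumptions: `parse_memory` and `check_limits` are hypothetical helpers, memory strings are assumed to use Docker-style `k`/`m`/`g` suffixes, and the ranges are copied from the guidelines above.

```python
import re

_UNIT_BYTES = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}


def parse_memory(value: str) -> int:
    """Convert a size such as "256m" or "1g" to bytes."""
    match = re.fullmatch(r"(\d+)([kmg])", value.strip().lower())
    if match is None:
        raise ValueError(f"Unrecognized memory size: {value!r}")
    return int(match.group(1)) * _UNIT_BYTES[match.group(2)]


def check_limits(timeout_s: int, memory_limit: str, cpus: float) -> list:
    """Return a list of problems; an empty list means all limits are in range."""
    problems = []
    if not 30 <= timeout_s <= 300:
        problems.append(f"timeout {timeout_s}s is outside 30s-300s")
    if not parse_memory("128m") <= parse_memory(memory_limit) <= parse_memory("1g"):
        problems.append(f"memory {memory_limit} is outside 128m-1g")
    if not 0.5 <= cpus <= 4:
        problems.append(f"cpus {cpus} is outside 0.5-4")
    return problems
```

For example, `check_limits(120, "512m", 1)` reports no problems, while a 10-second timeout would be flagged as too short.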

---

Monitoring

Key Metrics

Track these metrics for Auto-Dev health:

Mutation Metrics

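# Mutation rate (mutations/hour)
from datetime import datetime, timedelta, timezone

one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)
mutation_rate = (
    db.query(ToolMutation)
    .filter(ToolMutation.created_at > one_hour_ago)
    .count()
)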

# Mutation success rate
total_mutations = db.query(ToolMutation).count()
passed_mutations = (
    db.query(ToolMutation)
    .filter(ToolMutation.sandbox_status == "passed")
    .count()
)
success_rate = passed_mutations / total_mutations if total_mutations > 0 else 0

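# Average mutation latency
# NOTE: func.avg only returns a meaningful value for a numeric column.
# execution_error is the column named in this example; substitute the
# column that stores execution time in your schema if it differs.
from sqlalchemy import func

avg_latency = db.query(func.avg(ToolMutation.execution_error)).scalar()

Fitness Metrics

# Average fitness score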
avg_fitness = (
    db.query(func.avg(WorkflowVariant.fitness_score))
    .filter(WorkflowVariant.fitness_score.isnot(None))
    .scalar()
)

# Top fitness score
top_fitness = (
    db.query(func.max(WorkflowVariant.fitness_score))
    .scalar()
)

# Fitness score distribution
import pandas as pd

fitness_scores = [
    v.fitness_score
    for v in db.query(WorkflowVariant.fitness_score)
    .filter(WorkflowVariant.fitness_score.isnot(None))
    .all()
]

distribution = pd.Series(fitness_scores).describe()
# count, mean, std, min, 25%, 50%, 75%, max

Skill Candidate Metrics

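# Candidate generation rate
from datetime import datetime, timedelta, timezone

one_day_ago = datetime.now(timezone.utc) - timedelta(days=1)
candidates_per_day = (
    db.query(SkillCandidate)
    .filter(SkillCandidate.created_at > one_day_ago)
    .count()
)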

# Candidate validation rate
total_candidates = db.query(SkillCandidate).count()
validated_candidates = (
    db.query(SkillCandidate)
    .filter(SkillCandidate.validation_status == "validated")
    .count()
)
validation_rate = validated_candidates / total_candidates if total_candidates > 0 else 0

# Candidate promotion rate
promoted_candidates = (
    db.query(SkillCandidate)
    .filter(SkillCandidate.validation_status == "promoted")
    .count()
)
promotion_rate = promoted_candidates / total_candidates if total_candidates > 0 else 0

Logging

Configure structured logging:

import structlog

logger = structlog.get_logger()

# Log mutation creation
logger.info(
    "mutation_created",
    tenant_id=tenant_id,
    tool_name=tool_name,
    mutation_id=mutation.id,
    parent_tool_id=mutation.parent_tool_id,
)

# Log sandbox execution
logger.info(
    "sandbox_execution",
    mutation_id=mutation.id,
    status=result.get("status"),
    execution_seconds=result.get("execution_seconds"),
    environment=result.get("environment"),
)

# Log fitness evaluation
logger.info(
    "fitness_evaluated",
    variant_id=variant_id,
    fitness_score=fitness_score,
    proxy_signals=proxy_signals,
)

**Log Levels:**

  • INFO: Normal operations (mutations, validations, promotions)
  • WARNING: Degraded performance (high failure rates, low fitness)
  • ERROR: Failures (sandbox errors, LLM failures)
  • DEBUG: Detailed diagnostics (event payloads, handler execution)

Alerts

Configure alerts for critical issues:

# Alert: High mutation failure rate
if success_rate < 0.5:
    send_alert(
        severity="WARNING",
        message=f"High mutation failure rate: {success_rate:.1%}",
        metrics={"success_rate": success_rate},
    )

# Alert: Sandbox unavailable
if not sandbox.docker_available:
    send_alert(
        severity="ERROR",
        message="Sandbox Docker unavailable",
        metrics={"docker_available": False},
    )

# Alert: Daily limit exceeded
if not gate.check_daily_limits(agent_id, capability, workspace_settings):
    send_alert(
        severity="INFO",
        message=f"Daily limit exceeded for {capability}",
        metrics={"agent_id": agent_id, "capability": capability},
    )
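
Keeping the alert rules in a pure function makes the thresholds testable without a live sandbox or database. A minimal sketch, assuming the same three conditions as above: `Alert` and `evaluate_alerts` are hypothetical names, and delivery to a channel is left to the caller.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Alert:
    severity: str
    message: str


def evaluate_alerts(
    success_rate: float,
    docker_available: bool,
    within_daily_limit: bool,
    min_success_rate: float = 0.5,
) -> List[Alert]:
    """Return the alerts that should fire for the given health snapshot."""
    alerts = []
    if success_rate < min_success_rate:
        alerts.append(Alert("WARNING", f"Mutation success rate is {success_rate:.1%}"))
    if not docker_available:
        alerts.append(Alert("ERROR", "Sandbox Docker unavailable"))
    if not within_daily_limit:
        alerts.append(Alert("INFO", "Daily limit exceeded"))
    return alerts
```

A scheduler could call this once per interval and forward each returned `Alert` to whichever channel is configured.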

**Alert Channels:**

  • Email
  • Slack
  • PagerDuty
  • Custom webhooks

Dashboard

Create a monitoring dashboard with the following panels:

**Mutation Panel:**

  • Mutations per hour (line chart)
  • Mutation success rate (gauge)
  • Average execution time (line chart)
  • Top tools by mutation count (bar chart)

**Fitness Panel:**

  • Average fitness score (line chart)
  • Fitness score distribution (histogram)
  • Top variants by fitness (table)

**Candidate Panel:**

  • Candidates per day (line chart)
  • Validation rate (gauge)
  • Promotion rate (gauge)
  • Pending candidates (table)

**Example Dashboard (Grafana):**

{
  "dashboard": {
    "title": "Auto-Dev Monitoring",
    "panels": [
      {
        "title": "Mutation Rate",
        "targets": [
          {
            "expr": "rate(auto_dev_mutations_total[1h])"
          }
        ]
      },
      {
        "title": "Fitness Score",
        "targets": [
          {
            "expr": "avg(auto_dev_fitness_score)"
          }
        ]
      }
    ]
  }
}

Prometheus Metrics Integration

Expose metrics for Prometheus:

from prometheus_client import Counter, Histogram, Gauge

# Mutation metrics
mutation_counter = Counter(
    "auto_dev_mutations_total",
    "Total mutations generated",
    ["tenant_id", "tool_name", "status"]
)

mutation_duration = Histogram(
    "auto_dev_mutation_duration_seconds",
    "Mutation execution duration",
    ["tenant_id"]
)

# Fitness metrics
fitness_gauge = Gauge(
    "auto_dev_fitness_score",
    "Variant fitness score",
    ["tenant_id", "variant_id"]
)

# Candidate metrics
candidate_counter = Counter(
    "auto_dev_candidates_total",
    "Total skill candidates generated",
    ["tenant_id", "validation_status"]
)

# Use in code
mutation_counter.labels(
    tenant_id=tenant_id,
    tool_name=tool_name,
    status="passed"
).inc()

fitness_gauge.labels(
    tenant_id=tenant_id,
    variant_id=variant_id
).set(fitness_score)

---

Troubleshooting

Common Issues and Solutions

Issue: Sandbox Unavailable

**Symptoms:**

  • All mutations fail with "Sandbox unavailable"
  • ContainerSandbox.docker_available returns False
  • Error: "docker: command not found"

**Diagnosis:**

# Check Docker
docker info
# Expected: Docker system information
# Actual: Error connecting to Docker daemon

# Check Docker daemon
sudo systemctl status docker
# or macOS: open Docker Desktop

**Solutions:**

  1. **Install Docker**

# macOS
# Download Docker Desktop from docker.com

  2. **Start Docker Daemon**

# macOS
# Open Docker Desktop application

  3. **Verify Docker Access**
  4. **Configure Auto-Dev to Use Docker**

sandbox = ContainerSandbox()
assert sandbox.docker_available, "Docker not available"

**Fallback:**

If Docker cannot be installed, Auto-Dev falls back to subprocess isolation (weaker security).

---

Issue: LLM Service Unavailable

**Symptoms:**

  • Skill generation fails with "LLM unavailable"
  • Mutations have default/fallback code
  • Error: "LLM service unavailable"

**Diagnosis:**

from core.llm_service import get_llm_service

llm = get_llm_service()
print(llm)
# Expected: <LLMService object>
# Actual: None

**Solutions:**

  1. **Configure LLM Provider**
  2. **Verify API Key**
  3. **Test LLM Service**

**Fallback:**

Auto-Dev degrades gracefully: code generation is skipped and only validation runs.
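
The first two solutions can be partly automated by checking which settings are missing before the service starts. This is a sketch under assumptions: `missing_llm_settings` is a hypothetical helper, and the provider-to-variable mapping covers only the two key names that appear in this guide.

```python
from typing import List, Mapping

# Only the providers whose key variables are documented in this guide
PROVIDER_KEY_VARS = {
    "openai": "OPENAI_API_KEY",
    "anthropic": "ANTHROPIC_API_KEY",
}


def missing_llm_settings(env: Mapping[str, str]) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    provider = env.get("LLM_PROVIDER", "").strip().lower()
    if not provider:
        return ["LLM_PROVIDER"]
    key_var = PROVIDER_KEY_VARS.get(provider)
    if key_var is None:
        return []  # provider not covered by this sketch
    return [] if env.get(key_var) else [key_var]
```

Calling `missing_llm_settings(os.environ)` at startup and logging the result gives a clearer message than a later "LLM unavailable" error.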

---

Issue: Episode Models Not Available

**Symptoms:**

  • analyze_episode() returns "Episode models not available"
  • MementoEngine cannot analyze failures
  • Error: "No module named 'core.models'"

**Diagnosis:**

from core.models import Episode
print(Episode)
# Expected: <class 'core.models.Episode'>
# Actual: ImportError

**Solutions:**

  1. **Check Database Models**
  2. **Run Migrations**
  3. **Verify Import Path**

**Fallback:**

MementoEngine skips episode analysis and returns an error message.

---

Issue: Daily Limits Exceeded

**Symptoms:**

  • Error: "Daily limit exceeded"
  • No new mutations/candidates generated
  • Counter at max value

**Diagnosis:**

from core.auto_dev.capability_gate import AutoDevCapabilityService

gate = AutoDevCapabilityService(db)
can_proceed = gate.check_daily_limits(
    agent_id="agent-123",
    capability="auto_dev.alpha_evolver",
    workspace_settings=settings,
)
print(can_proceed)
# Expected: True
# Actual: False

**Solutions:**

  1. **Wait for Reset**
     • Daily limits reset at midnight UTC
     • Check current UTC time
  2. **Increase Limits**
  3. **Check Current Usage**

from datetime import datetime, timezone

# Count mutations created since the most recent midnight UTC
today_start = datetime.now(timezone.utc).replace(
    hour=0, minute=0, second=0, microsecond=0
)
count = (
    db.query(ToolMutation)
    .filter(
        ToolMutation.tenant_id == tenant_id,
        ToolMutation.created_at >= today_start,
    )
    .count()
)
print(f"Mutations today: {count}")
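
Because limits reset at midnight UTC, the remaining wait can be computed directly. A small sketch; `seconds_until_reset` is a hypothetical helper and expects a timezone-aware datetime when one is passed in.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def seconds_until_reset(now: Optional[datetime] = None) -> float:
    """Seconds remaining until the next midnight UTC."""
    current = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    next_midnight = (current + timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return (next_midnight - current).total_seconds()
```

An operator tool could print this value alongside the "Daily limit exceeded" error so users know how long to wait.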

---

Issue: Capability Gates Failing

**Symptoms:**

  • Agent cannot use Auto-Dev features
  • Error: "Capability not allowed"
  • can_use() returns False

**Diagnosis:**

from core.auto_dev.capability_gate import AutoDevCapabilityService

gate = AutoDevCapabilityService(db)
can_use = gate.can_use(
    agent_id="agent-123",
    capability="auto_dev.memento_skills",
    workspace_settings=settings,
)

# Debug checks
print(f"Auto-Dev enabled: {settings.get('auto_dev', {}).get('enabled')}")
print(f"Agent maturity: {gate._get_agent_maturity('agent-123', 'auto_dev.memento_skills')}")

**Solutions:**

  1. **Enable Auto-Dev in Workspace**
  2. **Check Agent Maturity**
  3. **Graduate Agent**
     • Run more episodes
     • Reduce interventions
     • Improve constitutional compliance

---

Issue: Fitness Scores Not Updating

**Symptoms:**

  • Variants stuck at "pending" status
  • Fitness scores remain None
  • External signals not received

**Diagnosis:**

from core.auto_dev.models import WorkflowVariant

variant = (
    db.query(WorkflowVariant)
    .filter(WorkflowVariant.id == "variant-123")
    .first()
)

print(f"Status: {variant.evaluation_status}")
print(f"Fitness: {variant.fitness_score}")
print(f"Signals: {variant.fitness_signals}")

**Solutions:**

  1. **Trigger Proxy Evaluation**

fitness_service = FitnessService(db)
score = fitness_service.evaluate_initial_proxy(
    variant_id="variant-123",
    tenant_id="tenant-456",
    proxy_signals={
        "execution_success": True,
        "syntax_error": False,
        "execution_latency_ms": 1500,
    },
)

  2. **Configure Webhooks**
  3. **Check FitnessService Logs**

---

Troubleshooting Table

| Symptom | Cause | Solution |
|---------|-------|----------|
| "Sandbox unavailable" | Docker not installed/running | Install/start Docker |
| "LLM unavailable" | LLM service not configured | Set LLM_PROVIDER and API keys |
| "Episode models not available" | Models not imported | Run migrations, verify imports |
| "Daily limit exceeded" | Too many mutations/candidates | Wait for reset or increase limits |
| "Capability not allowed" | Agent maturity too low | Graduate agent to higher maturity |
| Fitness scores None | Evaluation not triggered | Run proxy evaluation manually |
| High failure rate | Poor base code or prompts | Review mutation prompts and base code |
| Low fitness scores | Suboptimal mutations | Increase research iterations |
| Candidates not validating | Sandbox errors | Check Docker, increase timeout |

---

Performance Tuning

Sandbox Execution Timeouts

**Issue:** Sandbox executions timing out

**Diagnosis:**

# Check execution times
durations = [
    float(r.get("execution_seconds", 0))
    for r in mutation_results
]
# Guard against an empty result set to avoid ZeroDivisionError
avg_duration = sum(durations) / len(durations) if durations else 0.0
print(f"Average execution time: {avg_duration:.2f}s")

**Solutions:**

  1. **Increase Timeout**
  2. **Optimize Code**
     • Use more efficient algorithms
     • Reduce I/O operations
     • Cache results
  3. **Profile Code**

import cProfile

def profile_execution(code):
    # Profile a single execution and print functions sorted by cumulative time
    profiler = cProfile.Profile()
    profiler.enable()
    result = execute_code(code)
    profiler.disable()
    profiler.print_stats(sort='cumtime')
    return result
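
When increasing the timeout, the new value can be derived from observed durations rather than guessed. The sketch below takes a nearest-rank 95th percentile, adds headroom, and clamps the result to the 30s-300s range from the resource limit guidelines. `recommend_timeout` and its 1.5x headroom factor are illustrative assumptions, not Auto-Dev defaults.

```python
import math
from typing import List


def recommend_timeout(
    durations: List[float],
    headroom: float = 1.5,
    minimum: int = 30,
    maximum: int = 300,
) -> int:
    """Suggest a timeout in seconds from the p95 of observed durations."""
    if not durations:
        return 60  # fall back to the recommended value when there is no data
    ordered = sorted(durations)
    # Nearest-rank percentile: the smallest value covering 95% of samples
    index = max(0, math.ceil(0.95 * len(ordered)) - 1)
    candidate = math.ceil(ordered[index] * headroom)
    return min(maximum, max(minimum, candidate))
```

Using a high percentile instead of the average keeps occasional slow runs from timing out while the clamp prevents runaway values.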

Database Query Optimization

**Issue:** Slow database queries

**Diagnosis:**

# Enable query logging
import logging
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

# Check query times
# Look for slow queries in logs

**Solutions:**

  1. **Add Indexes**

from sqlalchemy import Index

Index("ix_mutations_tenant_created", ToolMutation.tenant_id, ToolMutation.created_at.desc())

  2. **Use Eager Loading**

from sqlalchemy.orm import joinedload

# joinedload fetches the related parent variant in the same query
variants = (
    db.query(WorkflowVariant)
    .options(joinedload(WorkflowVariant.parent_variant))
    .all()
)

  3. **Batch Operations**

# Use single query
mutations = (
    db.query(ToolMutation)
    .filter(ToolMutation.id.in_(mutation_ids))
    .all()
)

Event Handler Batching

**Issue:** Event handlers causing delays

**Diagnosis:**

# Check handler execution times
import time

@event_bus.on_task_fail
async def timed_handler(event: TaskEvent):
    start = time.monotonic()
    await process_event(event)
    duration = time.monotonic() - start
    if duration > 1.0:
        logger.warning(f"Slow handler: {duration:.2f}s")

**Solutions:**

  1. **Offload Heavy Processing**
  2. **Use Background Workers**

@event_bus.on_task_fail
async def queue_handler(event: TaskEvent):
    process_failure_background.delay(event.episode_id)

Fitness Evaluation Caching

**Issue:** Repeated fitness calculations

**Diagnosis:**

# Check for duplicate evaluations
duplicates = (
    db.query(FitnessEvaluation.variant_id, func.count(FitnessEvaluation.id))
    .group_by(FitnessEvaluation.variant_id)
    .having(func.count(FitnessEvaluation.id) > 1)
    .all()
)

**Solutions:**

  1. **Cache Fitness Scores**

from functools import lru_cache

# Note: lru_cache entries never expire; clear the cache when scores are re-evaluated
@lru_cache(maxsize=1000)
def get_cached_fitness(variant_id: str) -> float:
    variant = db.query(WorkflowVariant).filter(WorkflowVariant.id == variant_id).first()
    if variant is None:
        return 0.0
    return variant.fitness_score or 0.0

  2. **Use Redis Cache**

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_fitness_with_cache(variant_id: str) -> float:
    cache_key = f"fitness:{variant_id}"

    # Check cache
    cached = redis_client.get(cache_key)
    if cached is not None:
        return float(cached)

    # Calculate and cache
    fitness = calculate_fitness(variant_id)
    redis_client.setex(cache_key, 3600, str(fitness))  # 1 hour TTL
    return fitness
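
Where Redis is not available, an in-process cache with expiry offers a middle ground between the two options. This is a standard-library sketch rather than part of Auto-Dev: `TTLCache` is a hypothetical class, and the injectable `clock` exists only to make expiry testable.

```python
import time
from typing import Callable, Dict, Tuple


class TTLCache:
    """In-process cache whose entries expire after a fixed number of seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, float]] = {}  # key -> (expires_at, value)

    def get_or_compute(self, key: str, compute: Callable[[str], float]) -> float:
        """Return the cached value, recomputing it if missing or expired."""
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]
        value = compute(key)
        self._entries[key] = (now + self._ttl, value)
        return value

    def invalidate(self, key: str) -> None:
        """Drop a key, for example after its fitness is re-evaluated."""
        self._entries.pop(key, None)
```

Unlike a Redis cache, this one is private to each worker process, so workers may briefly disagree on a score until their entries expire.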

Concurrent Mutation Limits

**Issue:** Too many concurrent mutations

**Diagnosis:**

import asyncio

running_mutations = [
    task for task in asyncio.all_tasks()
    if "mutation" in str(task)
]
print(f"Running mutations: {len(running_mutations)}")

**Solutions:**

  1. **Limit Concurrency**

import asyncio

semaphore = asyncio.Semaphore(5)  # Max 5 concurrent

async def limited_mutation(code):
    async with semaphore:
        return await generate_mutation(code)

  2. **Use Queue**

mutation_queue = asyncio.Queue()

async def mutation_worker():
    while True:
        code = await mutation_queue.get()
        try:
            await generate_mutation(code)
        finally:
            # Mark the item done even if generation fails, so join() can return
            mutation_queue.task_done()
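
The two approaches can be combined into a fixed pool of workers draining a queue, which bounds concurrency and lets the caller wait for completion. A self-contained sketch: `run_mutations` is a hypothetical helper, and `generate` stands in for whatever coroutine performs the mutation.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List


async def run_mutations(
    codes: Iterable[str],
    generate: Callable[[str], Awaitable[str]],
    max_concurrent: int = 5,
) -> List[str]:
    """Process every item with at most `max_concurrent` calls in flight."""
    queue: asyncio.Queue = asyncio.Queue()
    for code in codes:
        queue.put_nowait(code)
    results: List[str] = []

    async def worker() -> None:
        # Each worker pulls items until the queue is empty, then exits
        while True:
            try:
                code = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results.append(await generate(code))

    await asyncio.gather(*(worker() for _ in range(max_concurrent)))
    return results
```

Results are appended in completion order, so callers that need input order should carry an index alongside each item.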

---

Security Considerations

Sandbox Isolation

**Network Isolation:**

# Disable network access
sandbox = ContainerSandbox(
    enable_network=False,  # Prevents external calls
)

**Filesystem Isolation:**

# Read-only filesystem
docker run --read-only ...

# Tmpfs only
docker run --tmpfs /tmp:rw,noexec,nosuid,size=64m ...

**Capability Dropping:**

# Drop Linux capabilities
docker run --cap-drop ALL ...
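
The flags above can be assembled into a single command. The sketch below is not how `ContainerSandbox` builds its invocation (that is not shown in this guide); `build_docker_command` and the `/sandbox/main.py` mount path are hypothetical, while the flags themselves are standard `docker run` options.

```python
from typing import List


def build_docker_command(
    image: str,
    script_path: str,
    memory_limit: str = "256m",
    cpus: float = 1.0,
    enable_network: bool = False,
) -> List[str]:
    """Build a locked-down `docker run` argument list for one script."""
    cmd = [
        "docker", "run", "--rm",
        "--read-only",                                # read-only root filesystem
        "--tmpfs", "/tmp:rw,noexec,nosuid,size=64m",  # small writable scratch space
        "--cap-drop", "ALL",                          # drop all Linux capabilities
        "--security-opt", "no-new-privileges",
        "--memory", memory_limit,
        "--cpus", str(cpus),
    ]
    if not enable_network:
        cmd += ["--network", "none"]
    # Mount the script read-only at a hypothetical path inside the container
    cmd += ["-v", f"{script_path}:/sandbox/main.py:ro", image, "python", "/sandbox/main.py"]
    return cmd
```

Returning a list rather than a string lets the caller pass it to `subprocess.run` without shell quoting concerns.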

Tenant Isolation

**Always Filter by tenant_id:**

# ✅ Good: tenant_id filter
mutations = (
    db.query(ToolMutation)
    .filter(ToolMutation.tenant_id == tenant_id)
    .all()
)

# ❌ Bad: no tenant filter
mutations = db.query(ToolMutation).all()  # SECURITY RISK

**Row-Level Security (RLS):**

-- Enable RLS on Auto-Dev tables
ALTER TABLE tool_mutations ENABLE ROW LEVEL SECURITY;
ALTER TABLE workflow_variants ENABLE ROW LEVEL SECURITY;
ALTER TABLE skill_candidates ENABLE ROW LEVEL SECURITY;

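-- Create RLS policies (repeat for each table with RLS enabled)
-- current_tenant() is not built into PostgreSQL; it must be defined by the application
CREATE POLICY tenant_isolation ON tool_mutations
    FOR ALL
    USING (tenant_id = current_tenant());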

Maturity Gates

**Enforce Maturity Requirements:**

from core.auto_dev.capability_gate import AutoDevCapabilityService

gate = AutoDevCapabilityService(db)

# Always check before operations
if not gate.can_use(agent_id, capability, workspace_settings):
    raise PermissionError("Agent not authorized for this capability")

Daily Limits

**Prevent Resource Exhaustion:**

# Check limits before operations
if not gate.check_daily_limits(agent_id, capability, workspace_settings):
    raise RateLimitError("Daily limit exceeded")

Code Validation

**Syntax Checking:**

import ast

def validate_syntax(code: str) -> bool:
    """Validate Python syntax."""
    try:
        ast.parse(code)
        return True
    except SyntaxError:
        return False

**Security Scanning:**

import re

def scan_for_secrets(code: str) -> list[str]:
    """Scan code for hardcoded secrets."""
    secrets = []

    # Check for API keys
    if re.search(r'(api_key|apikey)\s*=\s*["\'][^"\']+["\']', code):
        secrets.append("Hardcoded API key")

    # Check for passwords
    if re.search(r'password\s*=\s*["\'][^"\']+["\']', code):
        secrets.append("Hardcoded password")

    return secrets
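
Regular expressions catch literal secrets but not risky behavior. A complementary check can walk the syntax tree for imports and calls that generated code should not need. This is a sketch under assumptions: the blocklists are illustrative rather than an Auto-Dev policy, and `find_risky_constructs` is a hypothetical helper that expects syntactically valid input.

```python
import ast
from typing import List

# Illustrative blocklists; tune them to the tools being generated
BLOCKED_MODULES = {"os", "subprocess", "socket", "ctypes"}
BLOCKED_CALLS = {"eval", "exec", "compile", "__import__"}


def find_risky_constructs(code: str) -> List[str]:
    """List blocked imports and built-in calls found in the code."""
    findings = []
    for node in ast.walk(ast.parse(code)):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name.split(".")[0] in BLOCKED_MODULES:
                    findings.append(f"import of {alias.name}")
        elif isinstance(node, ast.ImportFrom):
            if (node.module or "").split(".")[0] in BLOCKED_MODULES:
                findings.append(f"import from {node.module}")
        elif isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in BLOCKED_CALLS:
                findings.append(f"call to {node.func.id}()")
    return findings
```

Static checks like this are easy to evade, so they complement sandbox isolation and do not replace it.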

---

Scaling Strategies

Horizontal Scaling

**Multiple Workers:**

# Run multiple worker processes
# gunicorn.conf.py
import multiprocessing

workers = multiprocessing.cpu_count() * 2 + 1

# Each worker has own event bus
# Events not shared across workers

**Message Queue for Events:**

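# Use Redis pub/sub for cross-worker events
import json

import redis.asyncio as redis  # async client, so listening does not block the event loop

redis_client = redis.Redis(host='localhost', port=6379)

async def publish_task_fail(event_json: str) -> None:
    # Publish event
    await redis_client.publish('auto_dev:task_fail', event_json)

async def listen_for_task_fail() -> None:
    # Subscribe to events
    pubsub = redis_client.pubsub()
    await pubsub.subscribe('auto_dev:task_fail')

    async for message in pubsub.listen():
        if message['type'] != 'message':
            continue  # skip subscribe confirmations
        event = json.loads(message['data'])
        await handle_event(event)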

Database Pooling

**Connection Pool Configuration:**

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=20,  # Connections kept open in the pool
    max_overflow=10,  # Extra connections allowed beyond pool_size under load
    pool_timeout=30,  # Seconds to wait for a free connection
    pool_recycle=3600,  # Recycle connections after 1 hour
)

**Connection Limits:**

# Calculate pool size
# Formula: (num_workers * concurrent_requests_per_worker) + safety_margin

pool_size = (4 * 5) + 10  # 30 connections
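
Written as functions, the formula can be checked against the database's connection ceiling. Note that each worker process creates its own SQLAlchemy pool, so the worst case multiplies by the process count. The helper names below are hypothetical.

```python
def required_pool_size(
    num_workers: int,
    concurrent_requests_per_worker: int,
    safety_margin: int = 10,
) -> int:
    """Apply the sizing formula above."""
    return num_workers * concurrent_requests_per_worker + safety_margin


def peak_connections(pool_size: int, max_overflow: int, num_processes: int) -> int:
    """Worst-case open connections when every process has its own pool."""
    return (pool_size + max_overflow) * num_processes
```

With `pool_size=20`, `max_overflow=10`, and 4 worker processes, `peak_connections` gives 120, which exceeds PostgreSQL's default `max_connections` of 100, so either the pool or the server limit would need adjusting.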

Sandbox Pooling

**Container Reuse:**

import asyncio

class SandboxPool:
    """Pool of reusable sandbox containers."""

    def __init__(self, size: int = 5):
        self.size = size
        self.containers = asyncio.Queue(maxsize=size)

    async def get_container(self):
        """Get container from pool."""
        if self.containers.empty():
            return await self._create_container()
        return await self.containers.get()

    async def return_container(self, container):
        """Return container to pool."""
        await self.containers.put(container)

    async def _create_container(self):
        """Create new container."""
        # Docker container creation logic
        pass

Event Queue

**Redis-Based Event Bus:**

import asyncio
import json

import redis.asyncio as redis  # async client; publish() and subscribe() are awaitable

class RedisEventBus:
    """Redis-backed event bus for cross-process communication."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)

    async def emit_task_fail(self, event: TaskEvent):
        """Emit event to Redis."""
        await self.redis.publish(
            'auto_dev:task_fail',
            json.dumps(event.__dict__)
        )

    def subscribe_task_fail(self, handler):
        """Subscribe to task failure events (call from within a running event loop)."""
        pubsub = self.redis.pubsub()

        async def listen():
            await pubsub.subscribe('auto_dev:task_fail')
            async for message in pubsub.listen():
                if message['type'] == 'message':
                    event_dict = json.loads(message['data'])
                    event = TaskEvent(**event_dict)
                    await handler(event)

        # Return the task so the caller can hold a reference and cancel it on shutdown
        return asyncio.create_task(listen())

Background Jobs

**Celery Integration:**

import asyncio

from celery import Celery

celery_app = Celery('auto_dev')

@celery_app.task
def generate_mutation_background(tenant_id, tool_name, base_code):
    """Generate mutation in background."""
    engine = AlphaEvolverEngine(db)
    # Celery tasks are synchronous, so drive the coroutine with asyncio.run()
    mutation = asyncio.run(
        engine.generate_tool_mutation(
            tenant_id=tenant_id,
            tool_name=tool_name,
            base_code=base_code,
            mutation_prompt="Optimize for speed",
        )
    )
    return mutation.id

# Usage
generate_mutation_background.delay("tenant-123", "process_invoice", code)

---

Testing Integration

Unit Tests

import pytest
from unittest.mock import Mock, AsyncMock

@pytest.mark.asyncio
async def test_memento_engine():
    """Test MementoEngine unit."""
    db = Mock()
    llm = AsyncMock()
    engine = MementoEngine(db=db, llm_service=llm)

    # Test analyze_episode
    analysis = await engine.analyze_episode("episode-123")
    assert "episode_id" in analysis

    # Test propose_code_change
    llm.generate_completion.return_value = {"content": "def test(): pass"}
    code = await engine.propose_code_change(analysis)
    assert "def test" in code

Integration Tests

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

@pytest.fixture
def db_session():
    """Create test database."""
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()

@pytest.mark.asyncio
async def test_full_pipeline(db_session):
    """Test full Memento pipeline."""
    # Create episode
    episode = Episode(id="episode-123", success=False, ...)
    db_session.add(episode)
    db_session.commit()

    # Run pipeline
    engine = MementoEngine(db=db_session)
    candidate = await engine.generate_skill_candidate(
        tenant_id="tenant-456",
        agent_id="agent-789",
        episode_id="episode-123",
    )

    # Assert
    assert candidate.validation_status == "pending"
    assert candidate.skill_name is not None

Property-Based Tests

from unittest.mock import Mock

from hypothesis import given, strategies as st
import pytest

@given(
    task_description=st.text(min_size=10, max_size=100),
    error_trace=st.text(min_size=0, max_size=500),
)
@pytest.mark.asyncio
async def test_analyze_episode_properties(task_description, error_trace):
    """Test analyze_episode with various inputs."""
    db = Mock()
    engine = MementoEngine(db=db)

    mock_episode = Mock()
    mock_episode.task_description = task_description
    mock_episode.error_trace = error_trace

    db.query().filter().first.return_value = mock_episode

    result = await engine.analyze_episode("episode-123")

    assert isinstance(result, dict)
    assert "episode_id" in result

E2E Tests

import asyncio

import pytest

@pytest.mark.asyncio
async def test_e2e_memento_workflow():
    """Test complete Memento workflow."""
    # Setup
    db = get_test_db()
    agent = create_test_agent(db, maturity="INTERN")

    # Trigger failure
    episode = await execute_failing_task(agent.id)

    # Wait for ReflectionEngine
    await asyncio.sleep(1)

    # Check candidate created
    candidates = (
        db.query(SkillCandidate)
        .filter(SkillCandidate.agent_id == agent.id)
        .all()
    )
    assert len(candidates) > 0

    # Validate candidate
    engine = MementoEngine(db)
    result = await engine.validate_candidate(
        candidate_id=candidates[0].id,
        tenant_id="test-tenant",
    )
    assert result["passed"] is True

    # Promote candidate
    promotion = await engine.promote_skill(
        candidate_id=candidates[0].id,
        tenant_id="test-tenant",
    )
    assert promotion["success"] is True

CI/CD Pipeline Integration

# .github/workflows/auto-dev-tests.yml
name: Auto-Dev Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:14
        env:
          POSTGRES_DB: test_db
          POSTGRES_PASSWORD: postgres
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

      redis:
        image: redis:7
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-asyncio pytest-cov

      - name: Run unit tests
        run: pytest tests/test_auto_dev_unit.py -v

      - name: Run integration tests
        run: pytest tests/test_auto_dev_integration.py -v
        env:
          DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db

      - name: Run property-based tests
        run: pytest tests/test_auto_dev_property.py -v

      - name: Upload coverage
        uses: codecov/codecov-action@v3
