ATOM Documentation

← Back to App

Auto-Dev Developer Guide

Learn how to extend and customize the Auto-Dev system by creating custom learning engines, subscribing to events, integrating with databases, and implementing custom fitness functions.

**Version:** 1.0.0

**Last Updated:** 2026-04-10

---

Table of Contents

- Creating Custom Learning Engines
- Event Subscription Patterns
- Database Models and Persistence
- Fitness Evaluation
- Sandbox Integration
- Capability Gate Integration
- Testing Strategies

---

Creating Custom Learning Engines

BaseLearningEngine Interface

All learning engines extend BaseLearningEngine, which provides a unified lifecycle:

from core.auto_dev.base_engine import BaseLearningEngine, SandboxProtocol
from sqlalchemy.orm import Session
from typing import Any

class CustomLearningEngine(BaseLearningEngine):
    """
    Custom learning engine for domain-specific self-improvement.

    Args:
        db: Active SQLAlchemy session.
        llm_service: Optional LLM client (may be None).
        sandbox: Optional sandbox backend implementing SandboxProtocol.
    """

    def __init__(
        self,
        db: Session,
        llm_service: Any = None,
        sandbox: SandboxProtocol | None = None,
    ):
        # Shared wiring (db / llm / sandbox handles) lives in the base class.
        super().__init__(db=db, llm_service=llm_service, sandbox=sandbox)
        # Add custom initialization here

Implementing Lifecycle Methods

1. analyze_episode()

Extract relevant information from episodes for learning.

async def analyze_episode(self, episode_id: str, **kwargs) -> dict[str, Any]:
    """
    Analyze an episode to extract learning signals.

    Args:
        episode_id: Identifier of the episode to inspect.

    Returns:
        dict with analysis results.

    Raises:
        ValueError: If no episode with ``episode_id`` exists.
    """
    from core.models import Episode, EpisodeSegment

    episode = self.db.query(Episode).filter(Episode.id == episode_id).first()
    if not episode:
        raise ValueError(f"Episode {episode_id} not found")

    segments = self.db.query(EpisodeSegment).filter(
        EpisodeSegment.episode_id == episode_id
    ).all()

    # Custom analysis logic
    summary = {
        "episode_id": episode_id,
        "task_description": episode.task_description,
        "success": episode.success,
        "segments_count": len(segments),
        # Add custom fields
    }
    return summary

2. propose_code_change()

Generate code modifications using LLM or other methods.

async def propose_code_change(
    self, context: dict[str, Any], **kwargs
) -> str:
    """
    Generate a code modification proposal.

    Args:
        context: Analysis output from analyze_episode()

    Returns:
        str: Generated Python code (or a ``#``-prefixed placeholder when
        the LLM is unavailable or generation fails).
    """
    llm = self._get_llm_service()
    if not llm:
        return "# Code generation skipped: LLM unavailable"

    system_prompt = "You are an expert Python developer..."
    user_prompt = f"Context: {context}\n\nGenerate improved code..."

    chat_messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    try:
        response = await llm.generate_completion(
            messages=chat_messages,
            model="auto",
            task_type="code",
        )
        raw_code = response.get("content", "")
        # Remove any ``` fences the model wraps around its answer.
        return self._strip_markdown_fences(raw_code)
    except Exception as e:
        logger.error(f"Code generation failed: {e}")
        return f"# Code generation failed: {e}"

3. validate_change()

Test proposed code in sandbox and assess fitness.

async def validate_change(
    self,
    code: str,
    test_inputs: list[dict[str, Any]],
    tenant_id: str,
    **kwargs,
) -> dict[str, Any]:
    """
    Validate proposed code in sandbox.

    Args:
        code: Python source to execute.
        test_inputs: Per-test input dicts; a falsy value runs one test
            with empty inputs.
        tenant_id: Tenant on whose behalf the sandbox runs.

    Returns:
        dict with validation results.
    """
    sandbox = self._get_sandbox()
    if not sandbox:
        return {"passed": False, "error": "Sandbox unavailable"}

    test_results = []
    for index, params in enumerate(test_inputs or [{}]):
        outcome = await sandbox.execute_raw_python(
            tenant_id=tenant_id,
            code=code,
            input_params=params,
        )
        test_results.append({
            "test_index": index,
            "passed": outcome.get("status") == "success",
            "output": outcome.get("output", ""),
            "execution_seconds": outcome.get("execution_seconds", 0),
        })

    return {
        "passed": all(entry["passed"] for entry in test_results),
        "test_results": test_results,
        "proxy_signals": self._compute_proxy_signals(test_results),
    }

Complete Example: CustomErrorLearningEngine

"""
Custom learning engine that specializes in fixing common error patterns.
"""

import logging
from typing import Any
from sqlalchemy.orm import Session

from core.auto_dev.base_engine import BaseLearningEngine, SandboxProtocol

logger = logging.getLogger(__name__)


class CustomErrorLearningEngine(BaseLearningEngine):
    """
    Learning engine that specializes in fixing common error patterns.

    Focuses on:
    - KeyError handling
    - IndexError prevention
    - Type checking
    - Null value handling
    """

    # Ordered (substring, canonical name) pairs used by _classify_error().
    # First match wins, mirroring the original if/elif chain.
    _ERROR_MARKERS = (
        ("keyerror", "KeyError"),
        ("indexerror", "IndexError"),
        ("typeerror", "TypeError"),
        ("valueerror", "ValueError"),
        ("attributeerror", "AttributeError"),
    )

    async def analyze_episode(self, episode_id: str, **kwargs) -> dict[str, Any]:
        """Analyze episode to extract error patterns.

        Returns a dict with the episode id, task description, extracted
        error patterns, and the most common error type. Unknown episode
        ids return {"error": ...} instead of raising.
        """
        from core.models import Episode, EpisodeSegment

        episode = (
            self.db.query(Episode)
            .filter(Episode.id == episode_id)
            .first()
        )

        if not episode:
            return {"error": f"Episode {episode_id} not found"}

        segments = (
            self.db.query(EpisodeSegment)
            .filter(EpisodeSegment.episode_id == episode_id)
            .all()
        )

        # Extract error patterns; segments without an "error" entry in
        # their metadata are skipped.
        error_patterns = []
        for segment in segments:
            metadata = getattr(segment, "metadata", {}) or {}
            if metadata.get("error"):
                error_patterns.append({
                    "error_type": self._classify_error(metadata["error"]),
                    "error_message": metadata["error"],
                    "segment_id": str(segment.id),
                })

        return {
            "episode_id": episode_id,
            "task_description": episode.task_description or "",
            "error_patterns": error_patterns,
            "most_common_error": self._get_most_common_error(error_patterns),
        }

    async def propose_code_change(
        self, context: dict[str, Any], **kwargs
    ) -> str:
        """Generate error-handling code for the most common error type.

        Args:
            context: Output of analyze_episode().

        Returns:
            str: Generated Python code, or a ``#``-prefixed placeholder on
            failure or when no LLM is available.
        """
        llm = self._get_llm_service()
        if not llm:
            return "# Error fixing skipped: LLM unavailable"

        error_type = context.get("most_common_error", "UnknownError")
        task_desc = context.get("task_description", "")

        system_prompt = (
            "You are an expert Python developer specializing in error handling. "
            "Generate code that prevents the specified error type. "
            "Include proper exception handling, type checking, and defensive programming. "
            "Respond ONLY with the Python code."
        )

        user_prompt = (
            f"Task: {task_desc}\n\n"
            f"Common error: {error_type}\n\n"
            f"Generate Python code that prevents this error:\n"
        )

        try:
            response = await llm.generate_completion(
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                model="auto",
                task_type="code",
            )
            return self._strip_markdown_fences(response.get("content", ""))
        except Exception as e:
            logger.error(f"Error fixing code generation failed: {e}")
            return f"# Error fixing failed: {e}"

    async def validate_change(
        self,
        code: str,
        test_inputs: list[dict[str, Any]],
        tenant_id: str,
        **kwargs,
    ) -> dict[str, Any]:
        """Validate error-handling code.

        Note: ``test_inputs`` is accepted for interface compatibility but
        not used — validation always runs the fixed error-inducing cases
        below (null input, empty string, missing key).
        """
        sandbox = self._get_sandbox()
        if not sandbox:
            return {"passed": False, "error": "Sandbox unavailable"}

        # Test with error-inducing inputs
        error_test_cases = [
            {"input": None, "description": "null input"},
            {"input": "", "description": "empty string"},
            {"input": {"invalid": "key"}, "description": "missing key"},
        ]

        results = []
        all_passed = True

        for i, test_case in enumerate(error_test_cases):
            result = await sandbox.execute_raw_python(
                tenant_id=tenant_id,
                code=code,
                input_params=test_case,
            )

            passed = result.get("status") == "success"
            if not passed:
                all_passed = False

            results.append({
                "test_index": i,
                "test_description": test_case["description"],
                "passed": passed,
                "output": result.get("output", ""),
            })

        return {
            "passed": all_passed,
            "test_results": results,
            "error_handling_score": self._compute_error_handling_score(results),
        }

    # --- Helper methods ---

    @staticmethod
    def _classify_error(error_message: str) -> str:
        """Classify error type from error message (first matching marker wins)."""
        lowered = error_message.lower()
        for marker, error_type in CustomErrorLearningEngine._ERROR_MARKERS:
            if marker in lowered:
                return error_type
        return "UnknownError"

    @staticmethod
    def _get_most_common_error(error_patterns: list[dict]) -> str:
        """Find the most common error type (ties resolve to the first seen)."""
        from collections import Counter

        if not error_patterns:
            return "UnknownError"

        counts = Counter(p["error_type"] for p in error_patterns)
        return counts.most_common(1)[0][0]

    @staticmethod
    def _compute_error_handling_score(results: list[dict]) -> float:
        """Compute error handling effectiveness score as pass ratio in [0, 1]."""
        if not results:
            return 0.0

        passed = sum(1 for r in results if r["passed"])
        return passed / len(results)

Using Your Custom Engine

from core.auto_dev.custom_error_engine import CustomErrorLearningEngine

# Initialize (db is an active SQLAlchemy Session)
engine = CustomErrorLearningEngine(db)

# Use the lifecycle — NOTE: these awaits must run inside an async context
analysis = await engine.analyze_episode("episode-123")
code = await engine.propose_code_change(analysis)
validation = await engine.validate_change(
    code=code,
    test_inputs=[{"input": "test"}],
    tenant_id="tenant-456",
)

if validation["passed"]:
    print("Code passed validation!")
else:
    print("Code failed validation")

---

Event Subscription Patterns

Registering Event Handlers

Use the global event_bus singleton to subscribe to Auto-Dev events:

from core.auto_dev.event_hooks import event_bus, TaskEvent, SkillExecutionEvent

# The decorator registers this coroutine with the global bus; it runs
# whenever a task-failure event is published.
@event_bus.on_task_fail
async def handle_task_failure(event: TaskEvent):
    """Handle task failure events."""
    logger.info(f"Task failed: {event.task_description}")
    logger.info(f"Error: {event.error_trace}")

    # Trigger custom logic (analyze_failure_pattern is your own coroutine)
    await analyze_failure_pattern(event)

# Registered for success events on the same global bus.
@event_bus.on_task_success
async def handle_task_success(event: TaskEvent):
    """Handle task success events."""
    logger.info(f"Task succeeded: {event.task_description}")

    # Trigger custom logic (optimize_for_success is your own coroutine)
    await optimize_for_success(event)

# Fires after each skill execution; the event carries timing information.
@event_bus.on_skill_execution
async def handle_skill_execution(event: SkillExecutionEvent):
    """Handle skill execution events."""
    logger.info(f"Skill {event.skill_name} executed in {event.execution_seconds}s")

    # Trigger custom logic (track_skill_performance is your own coroutine)
    await track_skill_performance(event)

Advanced Pattern: Aggregated Event Processing

from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any

class EventAggregator:
    """
    Aggregate events over time windows for batch processing.

    Failure events are buffered per agent id; once a buffer holds at least
    five events and its oldest event falls outside the rolling window, the
    batch is analyzed and the buffer is cleared.
    """

    def __init__(self, window_seconds: int = 300):
        # Length of the rolling aggregation window, in seconds.
        self.window_seconds = window_seconds
        # Per-agent buffers of pending failure events. "TaskEvent" is a
        # string (forward) reference so this class definition does not
        # depend on the event-hooks import being in scope.
        self._events: dict[str, list["TaskEvent"]] = defaultdict(list)

    async def process_failure(self, event: "TaskEvent") -> None:
        """Aggregate failure events."""
        agent_id = event.agent_id
        self._events[agent_id].append(event)

        # Process if window full
        if self._should_process_window(agent_id):
            await self._analyze_failure_window(agent_id)

    def _should_process_window(self, agent_id: str) -> bool:
        """Check if time window is full (>= 5 events AND oldest outside window)."""
        events = self._events[agent_id]
        if len(events) < 5:
            return False

        # Check if oldest event is outside window. Events whose metadata
        # lacks a "timestamp" default to "now" and therefore never look
        # old — TODO confirm TaskEvent.metadata always carries a timestamp.
        oldest = min(e.metadata.get("timestamp", datetime.now(timezone.utc)) for e in events)
        window_start = datetime.now(timezone.utc) - timedelta(seconds=self.window_seconds)

        return oldest < window_start

    async def _analyze_failure_window(self, agent_id: str) -> None:
        """Analyze the aggregated failure window, log insights, clear the buffer."""
        from collections import Counter

        events = self._events[agent_id]

        # BUG FIX: the original built a defaultdict(int), which has no
        # most_common() method and raised AttributeError below; Counter
        # counts identically and provides most_common().
        error_types = Counter(
            self._classify_error(event.error_trace)
            for event in events
            if event.error_trace
        )

        # Report insights
        logger.info(f"Agent {agent_id} failure analysis:")
        for error_type, count in error_types.most_common():
            logger.info(f"  {error_type}: {count} occurrences")

        # Clear processed events
        self._events[agent_id].clear()

    @staticmethod
    def _classify_error(error_trace: str) -> str:
        """Classify error from trace (coarse three-way bucketing)."""
        error_trace = error_trace.lower()

        if "keyerror" in error_trace:
            return "KeyError"
        elif "timeout" in error_trace:
            return "Timeout"
        else:
            return "Other"

# Usage: register the aggregator's bound coroutine as a failure handler.
aggregator = EventAggregator(window_seconds=300)
event_bus.on_task_fail(aggregator.process_failure)

Error Isolation in Handlers

Always wrap handler logic in try-except to prevent cascade failures:

@event_bus.on_task_fail
async def safe_failure_handler(event: TaskEvent) -> None:
    """Safe handler with error isolation."""
    try:
        # Your logic here
        await process_failure(event)
    except Exception as e:
        # Broad catch is deliberate here: one misbehaving handler must not
        # stop the bus from invoking the remaining handlers.
        logger.error(f"Handler failed for task {event.episode_id}: {e}", exc_info=True)
        # Event bus continues processing other handlers

---

Database Models and Persistence

Using Auto-Dev Models

ToolMutation

from core.auto_dev.models import ToolMutation
from sqlalchemy.orm import Session

def create_mutation(
    db: Session,
    tenant_id: str,
    tool_name: str,
    mutated_code: str,
    parent_tool_id: str | None = None,
) -> ToolMutation:
    """Create a new tool mutation record.

    Args:
        db: Active SQLAlchemy session; the record is committed here.
        tenant_id: Owning tenant.
        tool_name: Name of the tool being mutated.
        mutated_code: Mutated source code to persist.
        parent_tool_id: Optional id of the mutation this one descends from.

    Returns:
        The persisted ToolMutation, refreshed from the database.
    """
    mutation = ToolMutation(
        tenant_id=tenant_id,
        parent_tool_id=parent_tool_id,
        tool_name=tool_name,
        mutated_code=mutated_code,
        sandbox_status="pending",  # not yet executed in the sandbox
    )
    db.add(mutation)
    db.commit()
    db.refresh(mutation)
    return mutation

def get_mutation_history(
    db: Session,
    tenant_id: str,
    tool_name: str,
    limit: int = 10,
) -> list[ToolMutation]:
    """Get mutation history for a tool, newest first."""
    query = db.query(ToolMutation).filter(
        ToolMutation.tenant_id == tenant_id,
        ToolMutation.tool_name == tool_name,
    )
    newest_first = query.order_by(ToolMutation.created_at.desc())
    return newest_first.limit(limit).all()

def get_mutation_lineage(
    db: Session,
    mutation_id: str,
) -> list[ToolMutation]:
    """Trace mutation lineage backwards to the root.

    Returns the chain starting at ``mutation_id`` and ending at the root
    ancestor (the mutation with no ``parent_tool_id``). The original loop
    condition (``current and current.parent_tool_id``) dropped the root
    mutation from the result; it is now included. Unknown ids yield [].
    """
    lineage: list[ToolMutation] = []
    current = db.query(ToolMutation).filter(ToolMutation.id == mutation_id).first()

    while current:
        lineage.append(current)
        if not current.parent_tool_id:
            # Reached the root of the lineage.
            break
        current = (
            db.query(ToolMutation)
            .filter(ToolMutation.id == current.parent_tool_id)
            .first()
        )

    return lineage

WorkflowVariant

from core.auto_dev.models import WorkflowVariant

def create_variant(
    db: Session,
    tenant_id: str,
    agent_id: str,
    workflow_definition: dict,
    parent_variant_id: str | None = None,
) -> WorkflowVariant:
    """Create a new workflow variant.

    Args:
        db: Active SQLAlchemy session; the record is committed here.
        tenant_id: Owning tenant.
        agent_id: Agent the variant belongs to.
        workflow_definition: Workflow definition payload to persist.
        parent_variant_id: Optional id of the variant this one descends from.

    Returns:
        The persisted WorkflowVariant, refreshed from the database.
    """
    variant = WorkflowVariant(
        tenant_id=tenant_id,
        parent_variant_id=parent_variant_id,
        agent_id=agent_id,
        workflow_definition=workflow_definition,
        evaluation_status="pending",  # not yet evaluated for fitness
    )
    db.add(variant)
    db.commit()
    db.refresh(variant)
    return variant

def update_fitness_score(
    db: Session,
    variant_id: str,
    fitness_score: float,
    fitness_signals: dict,
) -> None:
    """Update variant fitness score and mark it evaluated.

    Silently does nothing when the variant id is unknown.

    NOTE(review): this snippet uses ``datetime``/``timezone`` without
    importing them — ensure ``from datetime import datetime, timezone``
    is in scope when copying this example.
    """
    variant = (
        db.query(WorkflowVariant)
        .filter(WorkflowVariant.id == variant_id)
        .first()
    )

    if variant:
        variant.fitness_score = fitness_score
        variant.fitness_signals = fitness_signals
        variant.evaluation_status = "evaluated"
        variant.last_evaluated_at = datetime.now(timezone.utc)
        db.commit()

SkillCandidate

from core.auto_dev.models import SkillCandidate

def get_pending_candidates(
    db: Session,
    tenant_id: str,
) -> list[SkillCandidate]:
    """Get all pending skill candidates for a tenant, newest first."""
    return (
        db.query(SkillCandidate)
        .filter(
            SkillCandidate.tenant_id == tenant_id,
            SkillCandidate.validation_status == "pending",
        )
        .order_by(SkillCandidate.created_at.desc())  # newest first
        .all()
    )

def promote_candidate(
    db: Session,
    candidate_id: str,
) -> bool:
    """Promote a validated candidate.

    Only candidates currently in the "validated" state are promoted.

    Returns:
        True when the candidate was found in the validated state and
        promoted; False otherwise (unknown id or wrong state).

    NOTE(review): this snippet uses ``datetime``/``timezone`` without
    importing them — ensure ``from datetime import datetime, timezone``
    is in scope when copying this example.
    """
    candidate = (
        db.query(SkillCandidate)
        .filter(
            SkillCandidate.id == candidate_id,
            SkillCandidate.validation_status == "validated",
        )
        .first()
    )

    if candidate:
        candidate.validation_status = "promoted"
        candidate.promoted_at = datetime.now(timezone.utc)
        db.commit()
        return True

    return False

Index Optimization

Auto-Dev models include indexes for common queries:

# ToolMutation indexes
Index("ix_tool_mutations_tenant_id", ToolMutation.tenant_id)
Index("ix_tool_mutations_parent_tool_id", ToolMutation.parent_tool_id)

# WorkflowVariant indexes
Index("ix_workflow_variants_tenant_id", WorkflowVariant.tenant_id)
Index("ix_workflow_variants_agent_id", WorkflowVariant.agent_id)

# SkillCandidate indexes
Index("ix_skill_candidates_tenant_status", SkillCandidate.tenant_id, SkillCandidate.validation_status)

For custom queries, create additional indexes:

from sqlalchemy import Index

# Composite index for tenant + status queries
Index("ix_mutations_tenant_status", ToolMutation.tenant_id, ToolMutation.sandbox_status)

# Index for time-based queries
Index("ix_variants_created_at", WorkflowVariant.created_at.desc())

---

Fitness Evaluation

Custom Fitness Functions

Extend FitnessService for domain-specific fitness evaluation:

from core.auto_dev.fitness_service import FitnessService
from typing import Any

class DomainFitnessService(FitnessService):
    """
    Custom fitness service for domain-specific evaluation.

    Adds domain-specific signals:
    - Business value metrics
    - User satisfaction scores
    - Regulatory compliance checks

    NOTE(review): ``WorkflowVariant`` is referenced below but not imported
    in this snippet — ensure ``from core.auto_dev.models import
    WorkflowVariant`` is in scope when copying this example.
    """

    def evaluate_business_value(
        self,
        variant_id: str,
        tenant_id: str,
        business_metrics: dict[str, Any],
    ) -> float:
        """
        Evaluate business value of a variant.

        Args:
            variant_id: Variant identifier
            tenant_id: Tenant identifier
            business_metrics: {
                "revenue_impact": float,
                "cost_savings": float,
                "time_savings_hours": float,
            }

        Returns:
            float: Business value score (0.0 to 1.0)
        """
        variant = (
            self.db.query(WorkflowVariant)
            .filter(
                WorkflowVariant.id == variant_id,
                WorkflowVariant.tenant_id == tenant_id,
            )
            .first()
        )

        if not variant:
            # Unknown variant: report no business value, persist nothing.
            return 0.0

        # Calculate business value score as the sum of three capped
        # contributions (0.5 + 0.3 + 0.2 = 1.0 maximum).
        score = 0.0

        revenue_impact = business_metrics.get("revenue_impact", 0)
        if revenue_impact > 0:
            score += min(0.5, revenue_impact / 10000)  # Max 0.5 for $10K

        cost_savings = business_metrics.get("cost_savings", 0)
        if cost_savings > 0:
            score += min(0.3, cost_savings / 5000)  # Max 0.3 for $5K

        time_savings = business_metrics.get("time_savings_hours", 0)
        if time_savings > 0:
            score += min(0.2, time_savings / 100)  # Max 0.2 for 100 hours

        final_score = max(0.0, min(1.0, score))

        # Update variant with business value
        signals = variant.fitness_signals or {}
        signals["business_value"] = {
            "score": final_score,
            "metrics": business_metrics,
        }
        variant.fitness_signals = signals

        # Adjust overall fitness score: simple average of the existing
        # fitness score and the new business-value score, clamped to [0, 1].
        current_score = variant.fitness_score or 0.0
        combined_score = (current_score + final_score) / 2

        variant.fitness_score = max(0.0, min(1.0, combined_score))
        self.db.commit()

        return final_score

    def evaluate_user_satisfaction(
        self,
        variant_id: str,
        tenant_id: str,
        feedback_scores: list[float],
    ) -> float:
        """
        Evaluate user satisfaction for a variant.

        Args:
            variant_id: Variant identifier
            tenant_id: Tenant identifier
            feedback_scores: List of user feedback scores (-1.0 to 1.0)

        Returns:
            float: Satisfaction score (0.0 to 1.0)
        """
        if not feedback_scores:
            return 0.5  # Neutral default

        # Calculate average satisfaction
        avg_score = sum(feedback_scores) / len(feedback_scores)

        # Convert from [-1, 1] to [0, 1]
        normalized_score = (avg_score + 1) / 2

        variant = (
            self.db.query(WorkflowVariant)
            .filter(
                WorkflowVariant.id == variant_id,
                WorkflowVariant.tenant_id == tenant_id,
            )
            .first()
        )

        # Persist the signal only when the variant exists; the normalized
        # score is returned either way.
        if variant:
            signals = variant.fitness_signals or {}
            signals["user_satisfaction"] = {
                "score": normalized_score,
                "feedback_count": len(feedback_scores),
            }
            variant.fitness_signals = signals
            self.db.commit()

        return normalized_score

Multi-Objective Fitness

Combine multiple fitness objectives:

class MultiObjectiveFitness:
    """
    Combine multiple fitness objectives with weights.
    """

    def __init__(
        self,
        weights: dict[str, float] = None,
    ):
        """
        Args:
            weights: {
                "performance": 0.3,
                "accuracy": 0.4,
                "cost": 0.2,
                "satisfaction": 0.1,
            }
        """
        # NOTE: ``weights or {...}`` also replaces an explicitly-passed
        # empty dict with the defaults (falsy check).
        self.weights = weights or {
            "performance": 0.3,
            "accuracy": 0.4,
            "cost": 0.2,
            "satisfaction": 0.1,
        }

    def calculate_fitness(
        self,
        scores: dict[str, float],
    ) -> float:
        """
        Calculate weighted fitness score.

        Objectives missing from ``scores`` contribute 0.0.

        Args:
            scores: {
                "performance": 0.8,
                "accuracy": 0.9,
                "cost": 0.7,
                "satisfaction": 0.6,
            }

        Returns:
            float: Combined fitness score; 0.0 when the weights sum to
            zero (the original raised ZeroDivisionError in that case).
        """
        total_weight = sum(self.weights.values())
        if total_weight == 0:
            # Guard: an all-zero weight map would otherwise divide by zero.
            return 0.0

        weighted_score = sum(
            scores.get(objective, 0.0) * weight
            for objective, weight in self.weights.items()
        )
        return weighted_score / total_weight

# Usage
fitness = MultiObjectiveFitness(
    weights={
        "performance": 0.4,
        "accuracy": 0.4,
        "cost": 0.2,
    }
)

score = fitness.calculate_fitness({
    "performance": 0.8,
    "accuracy": 0.9,
    "cost": 0.7,
})
# Result: 0.82  (= (0.8*0.4 + 0.9*0.4 + 0.7*0.2) / (0.4 + 0.4 + 0.2))

---

Sandbox Integration

Implementing SandboxProtocol

Create custom sandbox backends by implementing the SandboxProtocol interface:

from core.auto_dev.base_engine import SandboxProtocol
from typing import Any

class CustomSandbox(SandboxProtocol):
    """
    Custom sandbox implementation.

    Examples of custom backends:
    - Kubernetes-based isolation
    - AWS Lambda execution
    - Remote server execution
    - Virtual machine isolation
    """

    async def execute_raw_python(
        self,
        tenant_id: str,
        code: str,
        input_params: dict[str, Any],
        timeout: int = 60,
        safety_level: str = "MEDIUM_RISK",
        **kwargs,
    ) -> dict[str, Any]:
        """
        Execute Python code in custom isolated environment.

        Returns:
            {
                "status": "success" | "failed",
                "output": str,
                "execution_seconds": float,
                "execution_id": str,
            }
        """
        import time
        import uuid

        # Monotonic clock: immune to wall-clock adjustments while timing.
        start_time = time.monotonic()
        execution_id = str(uuid.uuid4())

        try:
            # Custom execution logic
            result = await self._execute_in_custom_env(
                code=code,
                input_params=input_params,
                timeout=timeout,
                tenant_id=tenant_id,
            )

            elapsed = time.monotonic() - start_time

            return {
                "status": "success" if result["success"] else "failed",
                "output": result["output"],
                "execution_seconds": round(elapsed, 3),
                "execution_id": execution_id,
            }

        except Exception as e:
            # Backend failures are reported as a "failed" result dict
            # rather than raised to the caller.
            elapsed = time.monotonic() - start_time
            return {
                "status": "failed",
                "output": str(e),
                "execution_seconds": round(elapsed, 3),
                "execution_id": execution_id,
            }

    async def _execute_in_custom_env(
        self,
        code: str,
        input_params: dict[str, Any],
        timeout: int,
        tenant_id: str,
    ) -> dict[str, Any]:
        """
        Implement custom execution logic.

        Example: Kubernetes Job execution

        NOTE(review): as written this stub returns None, so
        execute_raw_python() above hits its exception branch (TypeError on
        ``result["success"]``) and reports status "failed" until you
        implement it.
        """
        # Your custom implementation here
        pass

Resource Limits

Configure sandbox resource limits:

from core.auto_dev.container_sandbox import ContainerSandbox

# Custom resource limits
sandbox = ContainerSandbox(
    docker_image="python:3.11-slim",
    timeout=120,  # 2 minutes
    memory_limit="512m",  # 512MB
    enable_network=False,  # No network access
)

# Execute with custom limits — NOTE: the await must run inside an async context
result = await sandbox.execute_raw_python(
    tenant_id="tenant-123",
    code=code,
    input_params={},
    timeout=120,  # Override default
)

Security Considerations

When implementing custom sandboxes:

  1. **Network Isolation**

  2. **Filesystem Isolation**

     # Tmpfs for /tmp only
     cmd.extend(["--tmpfs", "/tmp:rw,noexec,nosuid,size=64m"])

  3. **Resource Limits**

     # CPU limit
     cmd.extend(["--cpus=1"])

  4. **Capability Dropping**

---

Capability Gate Integration

Checking Capabilities

from core.auto_dev.capability_gate import AutoDevCapabilityService

gate = AutoDevCapabilityService(db)

# Check if agent can use Memento-Skills (truthy when the maturity gate
# for the named capability allows it)
can_use_memento = gate.can_use(
    agent_id="agent-123",
    capability="auto_dev.memento_skills",
    workspace_settings=workspace_config,
)

# Check if agent can use AlphaEvolver
can_use_evolver = gate.can_use(
    agent_id="agent-123",
    capability="auto_dev.alpha_evolver",
    workspace_settings=workspace_config,
)

Recording Usage

Track Auto-Dev usage for graduation tracking:

# Record successful usage (the success flag feeds graduation tracking)
gate.record_usage(
    agent_id="agent-123",
    capability="auto_dev.memento_skills",
    success=True,
)

# Record failed usage
gate.record_usage(
    agent_id="agent-123",
    capability="auto_dev.alpha_evolver",
    success=False,
)

Checking Daily Limits

# Check before generating mutation
if gate.check_daily_limits(
    agent_id="agent-123",
    capability="auto_dev.alpha_evolver",
    workspace_settings=workspace_config,
):
    # Proceed with mutation (truthy result = still under the daily cap)
    await generate_mutation(...)
else:
    # Daily limit exceeded
    logger.warning("Daily mutation limit exceeded")

Custom Capability Gates

Create custom capability gates:

from core.auto_dev.capability_gate import AutoDevCapabilityService, is_at_least

class CustomCapabilityService(AutoDevCapabilityService):
    """
    Custom capability gate with additional checks.
    """

    CAPABILITY_GATES = {
        **AutoDevCapabilityService.CAPABILITY_GATES,
        "auto_dev.custom_feature": "SUPERVISED",
    }

    def can_use_custom_feature(
        self,
        agent_id: str,
        custom_condition: dict,
    ) -> bool:
        """
        Check custom capability with additional conditions.

        Args:
            agent_id: Agent identifier
            custom_condition: {
                "min_episodes": 10,
                "success_rate": 0.8,
            }
        """
        # Maturity gate first: bail out early when the base gate denies.
        gate_open = self.can_use(
            agent_id=agent_id,
            capability="auto_dev.custom_feature",
            workspace_settings={},
        )
        if not gate_open:
            return False

        # Then the caller-supplied volume / quality thresholds.
        stats = self._get_agent_stats(agent_id)
        min_episodes = custom_condition.get("min_episodes", 0)
        min_success_rate = custom_condition.get("success_rate", 0.0)

        meets_volume = stats["episode_count"] >= min_episodes
        meets_quality = stats["success_rate"] >= min_success_rate
        return meets_volume and meets_quality

    def _get_agent_stats(self, agent_id: str) -> dict:
        """Get agent statistics (episode count and success rate)."""
        from core.models import Episode

        episodes = (
            self.db.query(Episode)
            .filter(Episode.agent_id == agent_id)
            .all()
        )

        total = len(episodes)
        wins = sum(1 for episode in episodes if episode.success)

        return {
            "episode_count": total,
            "success_rate": wins / total if total else 0.0,
        }

---

Testing Strategies

Unit Tests

Test individual components in isolation:

import pytest
from unittest.mock import Mock, AsyncMock
from core.auto_dev.memento_engine import MementoEngine

@pytest.mark.asyncio
async def test_analyze_episode():
    """Test episode analysis.

    Fixed: the original declared a plain ``def`` while using ``await`` in
    its body — a SyntaxError. Async tests must be ``async def`` and carry
    ``@pytest.mark.asyncio``, matching the other async examples here.
    """
    # Setup
    db = Mock()
    engine = MementoEngine(db=db)

    # Mock episode query
    mock_episode = Mock()
    mock_episode.id = "episode-123"
    mock_episode.task_description = "Test task"
    mock_episode.agent_id = "agent-456"

    db.query().filter().first.return_value = mock_episode

    # Execute
    result = await engine.analyze_episode("episode-123")

    # Assert
    assert result["episode_id"] == "episode-123"
    assert result["task_description"] == "Test task"
    assert "error_trace" in result

@pytest.mark.asyncio
async def test_propose_code_change():
    """Test code proposal."""
    # Setup: AsyncMock so ``await llm.generate_completion(...)`` works.
    db = Mock()
    llm = AsyncMock()
    llm.generate_completion.return_value = {
        "content": "def test_function():\n    return 'hello'"
    }

    engine = MementoEngine(db=db, llm_service=llm)

    # Execute
    code = await engine.propose_code_change({
        "task_description": "Test task",
        "error_trace": "Error message",
    })

    # Assert: the mocked LLM content comes back through the engine.
    assert "def test_function" in code
    assert "return 'hello'" in code

Integration Tests

Test component interactions:

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from core.auto_dev.memento_engine import MementoEngine
from core.models import Base

@pytest.fixture
def db_session():
    """Create test database session.

    Builds an in-memory SQLite schema from ``Base`` metadata and yields a
    session that is closed after the test finishes.
    """
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    yield session
    session.close()

@pytest.mark.asyncio
async def test_full_memento_pipeline(db_session):
    """Test complete Memento-Skills pipeline."""
    # Episode was referenced but never imported in the original example,
    # which raises NameError at runtime.
    from core.models import Episode

    # Create test episode
    episode = Episode(
        id="episode-123",
        agent_id="agent-456",
        user_id="tenant-789",
        task_description="Extract invoice ID",
        success=False,
    )
    db_session.add(episode)
    db_session.commit()

    # Run pipeline
    engine = MementoEngine(db=db_session)
    candidate = await engine.generate_skill_candidate(
        tenant_id="tenant-789",
        agent_id="agent-456",
        episode_id="episode-123",
    )

    # Assert
    assert candidate.skill_name is not None
    assert candidate.generated_code is not None
    assert candidate.validation_status == "pending"

Property-Based Tests

Use Hypothesis for property-based testing:

from hypothesis import given, strategies as st
import pytest

@given(
    task_description=st.text(min_size=10, max_size=100),
    error_trace=st.text(min_size=0, max_size=500),
)
@pytest.mark.asyncio
async def test_analyze_episode_properties(task_description, error_trace):
    """Test analyze_episode with various inputs."""
    # Mock was used but never imported in the original snippet.
    from unittest.mock import Mock

    db = Mock()
    engine = MementoEngine(db=db)

    # Mock episode
    mock_episode = Mock()
    mock_episode.task_description = task_description
    mock_episode.error_trace = error_trace

    db.query().filter().first.return_value = mock_episode

    # Execute
    result = await engine.analyze_episode("episode-123")

    # Assert properties
    assert isinstance(result, dict)
    assert "episode_id" in result
    assert isinstance(result.get("task_description"), str)

Mock Sandbox for Testing

Create mock sandbox for testing:

class MockSandbox:
    """Stand-in sandbox that fakes code execution for tests."""

    async def execute_raw_python(
        self,
        tenant_id: str,
        code: str,
        input_params: dict,
        timeout: int = 60,
        safety_level: str = "MEDIUM_RISK",
        **kwargs,
    ) -> dict:
        """Return a canned successful execution result without running anything."""
        canned_result = {
            "status": "success",
            "output": "Mock output",
            "execution_seconds": 0.1,
        }
        return canned_result

# Usage in tests
engine = MementoEngine(db=db, sandbox=MockSandbox())
result = await engine.validate_change(
    code="print('hello')",
    test_inputs=[{}],
    tenant_id="tenant-123",
)
assert result["passed"] == True

---

Common Patterns

LLM Service Integration

from core.llm_service import get_llm_service

# Get LLM service
llm = get_llm_service()

# Generate completion
response = await llm.generate_completion(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Generate Python code..."},
    ],
    model="auto",
    task_type="code",
)

code = response["content"]

Markdown Fence Stripping

def strip_markdown_fences(code: str) -> str:
    """Strip markdown code fences from LLM output."""
    text = code.strip()

    # Remove an opening fence; prefer the language-tagged form first.
    for opener in ("```python", "```"):
        if text.startswith(opener):
            text = text[len(opener):]
            break

    # Remove a trailing closing fence, if present.
    if text.endswith("```"):
        text = text[:-3]

    return text.strip()

Error Handling Patterns

import logging

logger = logging.getLogger(__name__)

async def safe_operation():
    """Run the risky operation, logging and absorbing validation errors.

    Returns the operation's result, or None on a ValueError; any other
    exception is logged with a traceback and re-raised.
    """
    try:
        return await risky_operation()
    except ValueError as e:
        logger.error(f"Validation error: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        raise

Logging Patterns

import structlog

logger = structlog.get_logger()

# Structured logging
logger.info(
    "mutation_created",
    tenant_id=tenant_id,
    tool_name=tool_name,
    mutation_id=mutation.id,
)

# Error logging with context
logger.error(
    "sandbox_execution_failed",
    tenant_id=tenant_id,
    code=code[:100],  # Truncate for logs
    error=str(e),
    exc_info=True,
)

Graceful Degradation

def get_llm_service():
    """Return the shared LLM service, or None when it is unavailable."""
    try:
        from core.llm_service import get_llm_service as _resolve
        service = _resolve()
    except Exception as e:
        # Degrade gracefully: callers are expected to handle a None service.
        logger.warning(f"LLM service unavailable: {e}")
        return None
    return service

# Usage
llm = get_llm_service()
if llm:
    result = await llm.generate_completion(...)
else:
    # Fallback behavior
    result = fallback_logic()

---

See Also