ATOM Documentation

Auto-Dev API Reference

Complete API documentation for all Auto-Dev components including EventBus, learning engines, database models, and supporting services.

**Version:** 1.0.0

**Last Updated:** 2026-04-10

---

Table of Contents

  • EventBus
  • BaseLearningEngine
  • MementoEngine
  • AlphaEvolverEngine
  • FitnessService
  • AutoDevCapabilityService
  • Database Models
  • ReflectionEngine
  • EvolutionEngine

---

EventBus

**Module:** core.auto_dev.event_hooks

Lightweight in-process event bus for Auto-Dev lifecycle events. Decouples EpisodeService and SandboxExecutor from learning engines.

Event Types

TaskEvent

from dataclasses import dataclass, field
from typing import Any

@dataclass
class TaskEvent:
    """Event payload for task lifecycle events."""

    episode_id: str
    agent_id: str
    tenant_id: str
    task_description: str = ""
    error_trace: str | None = None
    outcome: str = ""  # "success", "failure", "partial"
    metadata: dict[str, Any] = field(default_factory=dict)

**Fields:**

  • episode_id (str): Unique identifier for the episode
  • agent_id (str): Agent that executed the task
  • tenant_id (str): Tenant/workspace identifier
  • task_description (str): Human-readable task description
  • error_trace (str | None): Stack trace if task failed
  • outcome (str): Task outcome - "success", "failure", or "partial"
  • metadata (dict): Additional event metadata

SkillExecutionEvent

@dataclass
class SkillExecutionEvent:
    """Event payload for skill execution events."""

    execution_id: str
    agent_id: str
    tenant_id: str
    skill_id: str
    skill_name: str = ""
    execution_seconds: float = 0.0
    token_usage: int = 0
    success: bool = False
    output: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)

**Fields:**

  • execution_id (str): Unique execution identifier
  • agent_id (str): Agent that executed the skill
  • tenant_id (str): Tenant/workspace identifier
  • skill_id (str): Skill identifier
  • skill_name (str): Human-readable skill name
  • execution_seconds (float): Execution time in seconds
  • token_usage (int): LLM tokens consumed
  • success (bool): Whether execution succeeded
  • output (str): Execution output/error message
  • metadata (dict): Additional execution metadata

EventBus Class

class EventBus:
    """Simple in-process event bus for Auto-Dev lifecycle events."""

Methods

`on_task_fail(handler: EventHandler) -> EventHandler`

Register a handler for task failure events.

**Parameters:**

  • handler (EventHandler): Async function that takes TaskEvent

**Returns:**

  • EventHandler: The same handler (decorator pattern)

**Example:**

from core.auto_dev.event_hooks import event_bus, TaskEvent

@event_bus.on_task_fail
async def handle_failure(event: TaskEvent):
    print(f"Task failed: {event.task_description}")
    print(f"Error: {event.error_trace}")

`on_task_success(handler: EventHandler) -> EventHandler`

Register a handler for task success events.

**Parameters:**

  • handler (EventHandler): Async function that takes TaskEvent

**Returns:**

  • EventHandler: The same handler (decorator pattern)

**Example:**

@event_bus.on_task_success
async def handle_success(event: TaskEvent):
    print(f"Task succeeded: {event.task_description}")

`on_skill_execution(handler: EventHandler) -> EventHandler`

Register a handler for skill execution events.

**Parameters:**

  • handler (EventHandler): Async function that takes SkillExecutionEvent

**Returns:**

  • EventHandler: The same handler (decorator pattern)

**Example:**

@event_bus.on_skill_execution
async def handle_skill_execution(event: SkillExecutionEvent):
    print(f"Skill {event.skill_name} executed in {event.execution_seconds}s")

`async emit_task_fail(event: TaskEvent) -> None`

Emit a task failure event to all registered handlers.

**Parameters:**

  • event (TaskEvent): Event payload

**Behavior:**

  • Dispatches to all handlers registered with @on_task_fail
  • Catches exceptions in handlers to prevent cascade failures
  • Logs errors for failed handlers

`async emit_task_success(event: TaskEvent) -> None`

Emit a task success event to all registered handlers.

**Parameters:**

  • event (TaskEvent): Event payload

**Behavior:**

  • Dispatches to all handlers registered with @on_task_success
  • Catches exceptions in handlers to prevent cascade failures

`async emit_skill_execution(event: SkillExecutionEvent) -> None`

Emit a skill execution event to all registered handlers.

**Parameters:**

  • event (SkillExecutionEvent): Event payload

**Behavior:**

  • Dispatches to all handlers registered with @on_skill_execution
  • Catches exceptions in handlers to prevent cascade failures

`clear() -> None`

Remove all registered handlers. Useful for testing.

**Example:**

def test_event_handler():
    event_bus.clear()
    # Register test handlers...
    # Run test...
    event_bus.clear()  # Cleanup

Global Singleton

# Global singleton — imported by EpisodeService and SandboxExecutor
event_bus = EventBus()

**Usage:**

from core.auto_dev.event_hooks import event_bus

# Register handlers
@event_bus.on_task_fail
async def my_handler(event: TaskEvent):
    ...

# Emit events
await event_bus.emit_task_fail(TaskEvent(...))
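The registration-and-emit mechanics above can be reduced to a short, self-contained sketch. `MiniEventBus` is an illustrative stand-in for the actual module, and the handler-storage and logging details are assumptions; it shows the decorator pattern and the per-handler exception isolation described under **Behavior**:

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)
EventHandler = Callable[[Any], Awaitable[None]]


class MiniEventBus:
    """Minimal sketch: decorator registration plus emit-with-isolation."""

    def __init__(self) -> None:
        self._task_fail_handlers: list[EventHandler] = []

    def on_task_fail(self, handler: EventHandler) -> EventHandler:
        self._task_fail_handlers.append(handler)
        return handler  # decorator pattern: hand the function back unchanged

    async def emit_task_fail(self, event: Any) -> None:
        for handler in self._task_fail_handlers:
            try:
                await handler(event)
            except Exception:  # one bad handler must not break the rest
                logger.exception("task_fail handler raised")


bus = MiniEventBus()
calls: list[str] = []


@bus.on_task_fail
async def bad_handler(event):
    raise RuntimeError("boom")  # caught and logged, not propagated


@bus.on_task_fail
async def good_handler(event):
    calls.append(event)  # still runs despite the earlier failure


asyncio.run(bus.emit_task_fail("episode-1"))
```

Note that because exceptions are swallowed per handler, `good_handler` still receives the event even though `bad_handler` raised first.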

---

BaseLearningEngine

**Module:** core.auto_dev.base_engine

Abstract interface for self-improving agent modules. Both MementoEngine (skill generation) and AlphaEvolverEngine (skill optimization) implement this interface.

SandboxProtocol

@runtime_checkable
class SandboxProtocol(Protocol):
    """Abstract sandbox interface for executing untrusted code."""

    async def execute_raw_python(
        self,
        tenant_id: str,
        code: str,
        input_params: dict[str, Any],
        timeout: int = 60,
        safety_level: str = "MEDIUM_RISK",
        **kwargs,
    ) -> dict[str, Any]:
        """
        Execute raw Python code in an isolated sandbox.

        Returns:
            {
                "status": "success" | "failed",
                "output": str,
                "execution_seconds": float,
                "execution_id": str,
            }
        """
        ...

**Implementations:**

  • ContainerSandbox (upstream): Docker-based isolation
  • SandboxExecutionService (SaaS): Fly.io-based isolation

BaseLearningEngine Class

class BaseLearningEngine(ABC):
    """
    Unified interface for self-improving agent modules.

    Subclasses must implement three core lifecycle methods:
    1. analyze_episode — read and interpret execution data
    2. propose_code_change — generate a code modification
    3. validate_change — execute in sandbox and assess fitness
    """

Constructor

def __init__(
    self,
    db: Session,
    llm_service: Any | None = None,
    sandbox: SandboxProtocol | None = None,
):

**Parameters:**

  • db (Session): SQLAlchemy database session
  • llm_service (Any | None): LLM service for code generation
  • sandbox (SandboxProtocol | None): Sandbox for code execution

Abstract Methods

`async analyze_episode(episode_id: str, **kwargs) -> dict[str, Any]`

Read and interpret an episode's execution data.

**Parameters:**

  • episode_id (str): Episode identifier
  • **kwargs: Additional engine-specific parameters

**Returns:**

{
    "episode_id": str,
    "task_description": str,
    "error_trace": str,  # for failures
    "tool_calls_attempted": list,  # for failures
    "latency": float,  # for successes
    "token_usage": int,  # for successes
    "edge_case_signals": dict,  # for successes
    ...
}

**Raises:**

  • ValueError: If episode not found

`async propose_code_change(context: dict[str, Any], **kwargs) -> str`

Generate a code modification proposal via LLM.

**Parameters:**

  • context (dict): Analysis output from analyze_episode()
  • **kwargs: Additional engine-specific parameters

**Returns:**

  • str: Generated Python code string

**Behavior:**

  • Uses LLM to generate code
  • Strips markdown fences from output
  • Returns fallback code if LLM unavailable

`async validate_change(code: str, test_inputs: list[dict[str, Any]], tenant_id: str, **kwargs) -> dict[str, Any]`

Execute proposed code in sandbox and assess fitness.

**Parameters:**

  • code (str): Python code to validate
  • test_inputs (list[dict]): Test cases for validation
  • tenant_id (str): Tenant for sandbox isolation
  • **kwargs: Additional engine-specific parameters

**Returns:**

{
    "passed": bool,
    "proxy_signals": dict,  # Immediate fitness signals
    "execution_result": dict,
}

Protected Methods

`_get_llm_service() -> Any | None`

Get LLM service with graceful fallback.

**Returns:**

  • LLM service instance or None if unavailable

**Behavior:**

  • Returns injected self.llm if available
  • Attempts to import get_llm_service() from core.llm_service
  • Logs warning if unavailable

`_get_sandbox() -> SandboxProtocol | None`

Get sandbox with graceful fallback to ContainerSandbox.

**Returns:**

  • Sandbox instance or None if unavailable

**Behavior:**

  • Returns injected self.sandbox if available
  • Attempts to import ContainerSandbox from core.auto_dev.container_sandbox
  • Logs warning if unavailable

`_strip_markdown_fences(code: str) -> str`

Strip markdown code fences from LLM output.

**Parameters:**

  • code (str): LLM-generated code

**Returns:**

  • str: Cleaned Python code

**Behavior:**

  • Removes ```python and ``` fences
  • Strips leading/trailing whitespace
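One plausible implementation of the stripping behavior described above; the regex patterns and edge-case handling here are assumptions, not the module's actual code:

```python
import re


def strip_markdown_fences(code: str) -> str:
    """Remove a leading ```python (or bare ```) fence and a trailing ``` fence."""
    text = code.strip()
    # drop an opening fence such as ```python or ```
    text = re.sub(r"^```[a-zA-Z0-9_+-]*\s*\n?", "", text)
    # drop a closing fence
    text = re.sub(r"\n?```\s*$", "", text)
    return text.strip()


strip_markdown_fences("```python\nx = 1\n```")  # → "x = 1"
```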

---

MementoEngine

**Module:** core.auto_dev.memento_engine

Generates new skills from failed episodes. When an agent hits the same failure pattern repeatedly, MementoEngine analyzes the failure, generates a skill proposal, validates it in sandbox, and promotes it to the skill registry.

Class Definition

class MementoEngine(BaseLearningEngine):
    """
    Generates new skills from failed episodes.

    Lifecycle:
    1. analyze_episode() — extract failure pattern
    2. propose_code_change() — generate skill code via LLM
    3. validate_change() — test in sandbox
    4. promote_skill() — register via SkillBuilderService
    """

Constructor

def __init__(
    self,
    db: Session,
    llm_service: Any | None = None,
    sandbox: SandboxProtocol | None = None,
):

**Parameters:**

  • db (Session): SQLAlchemy database session
  • llm_service (Any | None): LLM service for skill generation
  • sandbox (SandboxProtocol | None): Sandbox for validation

Methods

`async analyze_episode(episode_id: str, **kwargs) -> dict[str, Any]`

Analyze a failed episode to extract the failure pattern.

**Parameters:**

  • episode_id (str): Failed episode identifier

**Returns:**

{
    "episode_id": str,
    "agent_id": str | None,
    "tenant_id": str | None,
    "task_description": str,
    "error_trace": str,
    "tool_calls_attempted": [
        {"tool_name": str, "status": str},
        ...
    ],
    "error_segments_count": int,
    "failure_summary": str,
    "suggested_skill_name": str,
}

**Raises:**

  • ValueError: If episode not found

**Example:**

engine = MementoEngine(db)
analysis = await engine.analyze_episode("episode-123")
print(analysis["failure_summary"])
# Output: "Failed: Process invoice. Errors: KeyError 'invoice_id'..."

`async propose_code_change(context: dict[str, Any], **kwargs) -> str`

Generate a new skill script via LLM to address a failure pattern.

**Parameters:**

  • context (dict): Analysis output from analyze_episode()

**Returns:**

  • str: Generated Python skill code

**LLM Prompts:**

  • **System:** "You are the Memento Skill Generator. Your goal is to create a new Python utility function that addresses a gap in the agent's capabilities."
  • **User:** Includes task description, error trace, and attempted tools

**Example:**

code = await engine.propose_code_change(analysis)
print(code)
# Output:
# def process_invoice(invoice_id: str) -> dict:
#     """Process an invoice by ID."""
#     ...

`async validate_change(code: str, test_inputs: list[dict[str, Any]], tenant_id: str, **kwargs) -> dict[str, Any]`

Execute generated skill in sandbox and verify it works.

**Parameters:**

  • code (str): Generated skill code
  • test_inputs (list[dict]): Test cases (defaults to [{}] if empty)
  • tenant_id (str): Tenant for sandbox isolation

**Returns:**

{
    "passed": bool,
    "test_results": [
        {
            "test_index": int,
            "passed": bool,
            "output": str,
            "execution_seconds": float,
        },
        ...
    ],
}

**Behavior:**

  • Executes code against each test input
  • Returns passed=True only if all tests pass
  • Captures output and execution time for each test

`async generate_skill_candidate(tenant_id: str, agent_id: str | None, episode_id: str, failure_analysis: dict[str, Any] | None = None) -> SkillCandidate`

Full pipeline: analyze episode → generate skill → store candidate.

**Parameters:**

  • tenant_id (str): Tenant identifier
  • agent_id (str | None): Agent identifier
  • episode_id (str): Failed episode identifier
  • failure_analysis (dict | None): Pre-computed analysis (optional)

**Returns:**

  • SkillCandidate: Database record with validation_status='pending'

**Raises:**

  • ValueError: If episode analysis fails

**Example:**

candidate = await engine.generate_skill_candidate(
    tenant_id="tenant-123",
    agent_id="agent-456",
    episode_id="episode-789",
)
print(f"Generated candidate: {candidate.skill_name}")

`async validate_candidate(candidate_id: str, tenant_id: str, test_inputs: list[dict[str, Any]] | None = None) -> dict[str, Any]`

Validate a pending skill candidate in the sandbox.

**Parameters:**

  • candidate_id (str): Candidate identifier
  • tenant_id (str): Tenant identifier
  • test_inputs (list[dict] | None): Test cases (defaults to [{}])

**Returns:**

{
    "candidate_id": str,
    "passed": bool,
    "validation_result": {...},
}

**Behavior:**

  • Updates candidate's validation_status to 'validated' or 'failed'
  • Stores validation_result with test outputs
  • Sets fitness_score=1.0 if passed

`async promote_skill(candidate_id: str, tenant_id: str) -> dict[str, Any]`

Promote a validated candidate to the active skill registry.

**Parameters:**

  • candidate_id (str): Validated candidate identifier
  • tenant_id (str): Tenant identifier

**Returns:**

{
    "success": bool,
    "skill_id": str,
    ...
}

**Raises:**

  • ValueError: If candidate not found or not validated

**Behavior:**

  • Uses SkillBuilderService to create skill package
  • Updates candidate status to 'promoted'
  • Sets promoted_at timestamp

`static _suggest_skill_name(task_description: str, error_trace: str) -> str`

Generate a suggested skill name from the task description.

**Parameters:**

  • task_description (str): Task that failed
  • error_trace (str): Error information

**Returns:**

  • str: Python-identifier-safe skill name

**Example:**

name = MementoEngine._suggest_skill_name(
    "Process invoice from email",
    "KeyError: invoice_id"
)
print(name)
# Output: "auto_process_invoice_email"

---

AlphaEvolverEngine

**Module:** core.auto_dev.alpha_evolver_engine

Core mutation and optimization logic for the evolutionary learning loop. Produces code mutations via LLM, executes them in sandbox, and tracks fitness signals.

Class Definition

class AlphaEvolverEngine(BaseLearningEngine):
    """
    Skill optimization via iterative code mutation.

    Lifecycle:
    1. analyze_episode() — extract performance signals from successful episodes
    2. generate_tool_mutation() — LLM generates a code mutation
    3. sandbox_execute_mutation() — run in sandbox, collect fitness signals
    4. spawn_workflow_variant() — track variant for population comparison
    5. run_research_experiment() — iterative mutate→sandbox→compare loop
    """

Constructor

def __init__(
    self,
    db: Session,
    llm_service: Any | None = None,
    sandbox: SandboxProtocol | None = None,
):

Methods

`async analyze_episode(episode_id: str, **kwargs) -> dict[str, Any]`

Analyze a successful episode to identify optimization opportunities.

**Parameters:**

  • episode_id (str): Successful episode identifier

**Returns:**

{
    "episode_id": str,
    "task_description": str,
    "success": bool,
    "total_segments": int,
    "metadata": dict,
    "optimization_targets": [
        {
            "segment_id": str,
            "reason": "high_latency" | "retries",
            "value": float | int,
        },
        ...
    ],
}

**Behavior:**

  • Identifies segments with latency >5s
  • Identifies segments with retry attempts
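The two selection rules above can be sketched as a filter over episode segments; the segment dict shape (`id`, `latency`, `retries`) is an assumption for illustration:

```python
LATENCY_THRESHOLD_SECONDS = 5.0


def find_optimization_targets(segments: list[dict]) -> list[dict]:
    """Flag segments that are slow or needed retries (segment shape assumed)."""
    targets = []
    for seg in segments:
        if seg.get("latency", 0.0) > LATENCY_THRESHOLD_SECONDS:
            targets.append({"segment_id": seg["id"],
                            "reason": "high_latency",
                            "value": seg["latency"]})
        elif seg.get("retries", 0) > 0:
            targets.append({"segment_id": seg["id"],
                            "reason": "retries",
                            "value": seg["retries"]})
    return targets
```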

`async propose_code_change(context: dict[str, Any], **kwargs) -> str`

Generate a code mutation via LLM.

**Parameters:**

  • context (dict): Contains base_code and mutation_prompt

**Returns:**

  • str: Mutated Python code

**LLM Prompts:**

  • **System:** "You are the AlphaEvolve Code Mutator. Your goal is to refine and evolve Python tool code to better achieve a specific objective."
  • **User:** Includes objective and original code

`async validate_change(code: str, test_inputs: list[dict[str, Any]], tenant_id: str, **kwargs) -> dict[str, Any]`

Execute mutated code in sandbox and assess fitness.

**Parameters:**

  • code (str): Mutated code
  • test_inputs (list[dict]): Test cases
  • tenant_id (str): Tenant for sandbox isolation

**Returns:**

{
    "passed": bool,
    "test_results": [...],
    "proxy_signals": {
        "execution_success": bool,
        "pass_rate": float,
        "avg_execution_seconds": float,
        "syntax_error": bool,
    },
}

`async generate_tool_mutation(tenant_id: str, tool_name: str, parent_tool_id: str | None, base_code: str, mutation_prompt: str) -> ToolMutation`

Produce a new variation of a Python tool via LLM mutation.

**Parameters:**

  • tenant_id (str): Tenant identifier
  • tool_name (str): Tool name
  • parent_tool_id (str | None): Parent tool for lineage tracing
  • base_code (str): Original code to mutate
  • mutation_prompt (str): Optimization objective

**Returns:**

  • ToolMutation: Database record with sandbox_status='pending'

**Example:**

mutation = await engine.generate_tool_mutation(
    tenant_id="tenant-123",
    tool_name="process_invoice",
    parent_tool_id="tool-456",
    base_code=original_code,
    mutation_prompt="Reduce execution time by 50%",
)

`async sandbox_execute_mutation(mutation_id: str, tenant_id: str, inputs: dict[str, Any]) -> dict[str, Any]`

Execute a mutation in the sandbox and record results.

**Parameters:**

  • mutation_id (str): Mutation identifier
  • tenant_id (str): Tenant identifier
  • inputs (dict): Input parameters for execution

**Returns:**

{
    "success": bool,
    "output": str,
    "proxy_signals": {
        "syntax_error": bool,
        "execution_success": bool,
        "execution_latency_ms": float,
        "environment": "docker" | "subprocess",
    },
}

**Behavior:**

  • Updates mutation's sandbox_status to 'passed' or 'failed'
  • Stores execution_error if failed

`spawn_workflow_variant(tenant_id: str, agent_id: str, workflow_def: dict[str, Any], parent_variant_id: str | None = None) -> WorkflowVariant`

Create a new workflow variant for population-based comparison.

**Parameters:**

  • tenant_id (str): Tenant identifier
  • agent_id (str): Agent identifier
  • workflow_def (dict): Workflow definition
  • parent_variant_id (str | None): Parent variant for lineage

**Returns:**

  • WorkflowVariant: Database record with evaluation_status='pending'

`check_auto_synthesis_readiness(tenant_id: str, tool_name: str, threshold: int = 5) -> bool`

Check if enough mutations passed to trigger automatic synthesis.

**Parameters:**

  • tenant_id (str): Tenant identifier
  • tool_name (str): Tool name
  • threshold (int): Minimum passed mutations (default: 5)

**Returns:**

  • bool: True if ready for synthesis
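The readiness check amounts to a threshold comparison over passed mutations. In this sketch a plain dict of counts stands in for the actual query over `ToolMutation` rows with `sandbox_status == 'passed'` (an assumption):

```python
def check_auto_synthesis_readiness(passed_counts: dict[str, int],
                                   tool_name: str,
                                   threshold: int = 5) -> bool:
    """Ready once at least `threshold` mutations of this tool passed the sandbox."""
    return passed_counts.get(tool_name, 0) >= threshold
```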

`async run_research_experiment(tenant_id: str, base_code: str, research_goal: str, iterations: int = 3, inputs: dict[str, Any] | None = None) -> list[dict[str, Any]]`

Iterative research experiment: mutate → sandbox → compare → keep winner.

**Parameters:**

  • tenant_id (str): Tenant identifier
  • base_code (str): Starting code
  • research_goal (str): Optimization objective
  • iterations (int): Number of iterations (default: 3)
  • inputs (dict | None): Test inputs

**Returns:**

[
    {
        "iteration": int,
        "mutation_id": str,
        "success": bool,
        "output": str,
        "code_preview": str,
    },
    ...
]

**Behavior:**

  • Progressive evolution: uses winner as next iteration's base
  • Each iteration generates new mutation and validates

**Example:**

results = await engine.run_research_experiment(
    tenant_id="tenant-123",
    base_code=original_code,
    research_goal="Optimize for speed",
    iterations=3,
)
for result in results:
    print(f"Iteration {result['iteration']}: {result['success']}")

---

FitnessService

**Module:** core.auto_dev.fitness_service

Multi-stage fitness evaluation for workflow variants: immediate proxy signals from sandbox execution, followed by delayed async signals from downstream integrations.

Class Definition

class FitnessService:
    """
    Multi-stage fitness evaluation for workflow variants.

    Stage 1 - evaluate_initial_proxy(): Immediate feedback from sandbox execution.
    Stage 2 - evaluate_delayed_webhook(): Async signals from downstream integrations.
    """

Constructor

def __init__(self, db: Session):

**Parameters:**

  • db (Session): SQLAlchemy database session

Methods

`evaluate_initial_proxy(variant_id: str, tenant_id: str, proxy_signals: dict[str, Any]) -> float`

Record immediate proxy signals and calculate baseline fitness.

**Parameters:**

  • variant_id (str): Variant identifier
  • tenant_id (str): Tenant identifier
  • proxy_signals (dict): Immediate fitness signals

**Proxy Signals:**

{
    "execution_success": bool,  # Ran without crash
    "syntax_error": bool,  # Code had syntax errors
    "execution_latency_ms": float,  # Execution time
    "user_approved_proposal": bool,  # HITL approval
}

**Returns:**

  • float: Fitness score from 0.0 to 1.0

**Scoring:**

  • Syntax error: -1.0
  • Survived syntax check: +0.2
  • Execution success: +0.3
  • User approved: +0.5
  • User rejected: -0.5

**Behavior:**

  • Updates variant's fitness_score and fitness_signals
  • Sets evaluation_status to 'pending' or 'evaluated'
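The scoring table above can be expressed as straightforward arithmetic. This is a sketch; clamping the result to the documented 0.0–1.0 range is an assumption:

```python
def score_proxy_signals(signals: dict) -> float:
    """Apply the proxy-signal scoring table; clamp to [0, 1] (assumed)."""
    score = 0.0
    if signals.get("syntax_error"):
        score -= 1.0
    else:
        score += 0.2  # survived the syntax check
    if signals.get("execution_success"):
        score += 0.3
    if "user_approved_proposal" in signals:
        score += 0.5 if signals["user_approved_proposal"] else -0.5
    return max(0.0, min(1.0, score))
```

For example, a variant that parses, executes, and is approved by a user reaches the maximum score of 1.0, while any syntax error drives the score to the floor.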

`evaluate_delayed_webhook(variant_id: str, tenant_id: str, external_signals: dict[str, Any]) -> float`

Process downstream webhook signals and adjust fitness score.

**Parameters:**

  • variant_id (str): Variant identifier
  • tenant_id (str): Tenant identifier
  • external_signals (dict): Downstream integration signals

**External Signals:**

{
    "invoice_created": bool,  # Positive
    "crm_conversion": bool,  # Positive
    "conversion_success": bool,  # Positive
    "email_bounce": bool,  # Negative
    "error_signal": bool,  # Negative
    "conversion_value": float,  # Scaled positive
}

**Returns:**

  • float: Adjusted fitness score from 0.0 to 1.0

**Adjustments:**

  • invoice_created: +0.4
  • crm_conversion: +0.5
  • conversion_success: +0.6
  • email_bounce: -0.3
  • error_signal: -0.5
  • conversion_value: +0.5 per $1000 (max +0.5)

**Behavior:**

  • Adjusts current score by signal adjustments
  • Updates variant's fitness_signals with external data
  • Sets evaluation_status to 'evaluated'
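The adjustment table above, including the capped `conversion_value` scaling, can be sketched as follows; clamping to [0, 1] is an assumption:

```python
# boolean signal adjustments, from the table above
ADJUSTMENTS = {
    "invoice_created": 0.4,
    "crm_conversion": 0.5,
    "conversion_success": 0.6,
    "email_bounce": -0.3,
    "error_signal": -0.5,
}


def apply_external_signals(current_score: float, signals: dict) -> float:
    """Adjust an existing fitness score with downstream signals; clamp (assumed)."""
    score = current_score
    for key, delta in ADJUSTMENTS.items():
        if signals.get(key):
            score += delta
    # +0.5 per $1000 of conversion value, capped at +0.5
    value = signals.get("conversion_value", 0.0)
    if value:
        score += min(0.5, 0.5 * (value / 1000.0))
    return max(0.0, min(1.0, score))
```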

`get_top_variants(tenant_id: str, limit: int = 5) -> list[WorkflowVariant]`

Retrieve highest-fitness variants for crossover/mutation operations.

**Parameters:**

  • tenant_id (str): Tenant identifier
  • limit (int): Maximum variants to return (default: 5)

**Returns:**

  • list[WorkflowVariant]: Top variants ordered by fitness score

**Example:**

service = FitnessService(db)
top = service.get_top_variants("tenant-123", limit=3)
for variant in top:
    print(f"{variant.id}: {variant.fitness_score}")

---

AutoDevCapabilityService

**Module:** core.auto_dev.capability_gate

Gates Auto-Dev features based on agent maturity level and workspace settings.

Constants

STUDENT = "student"
INTERN = "intern"
SUPERVISED = "supervised"
AUTONOMOUS = "autonomous"

MATURITY_ORDER = [STUDENT, INTERN, SUPERVISED, AUTONOMOUS]

Capability Gates

AutoDevCapabilityService.CAPABILITY_GATES = {
    "auto_dev.memento_skills": INTERN,
    "auto_dev.alpha_evolver": SUPERVISED,
    "auto_dev.background_evolution": AUTONOMOUS,
}

Helper Function

`is_at_least(current: str, required: str) -> bool`

Check if current maturity level meets or exceeds the required level.

**Parameters:**

  • current (str): Current maturity level
  • required (str): Required maturity level

**Returns:**

  • bool: True if current >= required

**Example:**

is_at_least("supervised", "intern")  # True
is_at_least("intern", "supervised")  # False

AutoDevCapabilityService Class

class AutoDevCapabilityService:
    """Gates Auto-Dev features based on agent maturity and workspace settings."""

Constructor

def __init__(self, db: Session):

**Parameters:**

  • db (Session): SQLAlchemy database session

Methods

`can_use(agent_id: str, capability: str, workspace_settings: dict[str, Any] | None = None) -> bool`

Check if an agent can use a specific Auto-Dev capability.

**Parameters:**

  • agent_id (str): Agent to check
  • capability (str): e.g., "auto_dev.memento_skills"
  • workspace_settings (dict | None): Workspace configuration

**Returns:**

  • bool: True if allowed

**Requires BOTH:**

  1. Workspace settings allow it (auto_dev.enabled + per-capability toggle)
  2. Agent has graduated to required maturity level

**Example:**

gate = AutoDevCapabilityService(db)
if gate.can_use("agent-123", "auto_dev.memento_skills", workspace_settings):
    ...  # Enable Memento-Skills

`record_usage(agent_id: str, capability: str, success: bool) -> None`

Record Auto-Dev usage to progress agent maturity via graduation framework.

**Parameters:**

  • agent_id (str): Agent identifier
  • capability (str): Capability used
  • success (bool): Whether usage was successful

**Behavior:**

  • Calls CapabilityGraduationService.record_usage()
  • Contributes to agent's graduation score

`check_daily_limits(agent_id: str, capability: str, workspace_settings: dict[str, Any] | None = None) -> bool`

Check if the agent has exceeded daily Auto-Dev limits.

**Parameters:**

  • agent_id (str): Agent identifier
  • capability (str): Capability to check
  • workspace_settings (dict | None): Workspace configuration

**Returns:**

  • bool: True if within limits

**Default Limits:**

  • max_mutations_per_day: 10
  • max_skill_candidates_per_day: 5

`notify_capability_unlocked(agent_id: str, capability: str) -> dict[str, Any]`

Generate notification payload when an agent graduates into a new capability.

**Parameters:**

  • agent_id (str): Agent identifier
  • capability (str): Unlocked capability

**Returns:**

{
    "type": "auto_dev_capability_unlocked",
    "agent_id": str,
    "capability": str,
    "message": str,
    "action_required": bool,
}

---

Database Models

**Module:** core.auto_dev.models

SQLAlchemy models for the self-evolving agent system.

ToolMutation

class ToolMutation(Base):
    """
    AlphaEvolve: Tracks tool code mutations, lineage, and sandbox test results.

    Each mutation has a parent_tool_id for lineage tracing, allowing the system
    to track evolutionary chains of code improvements.
    """

    __tablename__ = "tool_mutations"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    tenant_id = Column(String(36), nullable=False, index=True)
    parent_tool_id = Column(String(36), nullable=True, index=True)
    tool_name = Column(String(255), nullable=False)
    mutated_code = Column(Text, nullable=False)
    sandbox_status = Column(String(50), default="pending")
    execution_error = Column(Text, nullable=True)
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))

**Fields:**

  • id (str): Primary key
  • tenant_id (str): Tenant identifier (indexed)
  • parent_tool_id (str | None): Parent tool for lineage (indexed)
  • tool_name (str): Tool name
  • mutated_code (str): Mutated Python code
  • sandbox_status (str): "pending", "passed", or "failed"
  • execution_error (str | None): Error message if failed
  • created_at (DateTime): Creation timestamp

WorkflowVariant

class WorkflowVariant(Base):
    """
    AlphaEvolve: Tracks variations of workflows/prompts alongside their
    automated fitness scores.

    Fitness is evaluated in two stages:
    1. Immediate proxy signals (compilation, execution success)
    2. Deferred async signals (webhook events, conversion data)
    """

    __tablename__ = "workflow_variants"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    tenant_id = Column(String(36), nullable=False, index=True)
    parent_variant_id = Column(String(36), nullable=True, index=True)
    agent_id = Column(String(36), nullable=True, index=True)
    workflow_definition = Column(JSON, nullable=False)
    fitness_score = Column(Float, nullable=True)
    fitness_signals = Column(JSON, nullable=True)
    evaluation_status = Column(String(50), default="pending")
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    last_evaluated_at = Column(DateTime(timezone=True), nullable=True)

**Fields:**

  • id (str): Primary key
  • tenant_id (str): Tenant identifier (indexed)
  • parent_variant_id (str | None): Parent variant for lineage (indexed)
  • agent_id (str | None): Agent identifier (indexed)
  • workflow_definition (JSON): Workflow/prompt definition
  • fitness_score (float | None): 0.0 to 1.0
  • fitness_signals (JSON | None): Raw proxy/external signals
  • evaluation_status (str): "pending", "evaluated", or "pruned"
  • created_at (DateTime): Creation timestamp
  • last_evaluated_at (DateTime | None): Last evaluation timestamp

SkillCandidate

class SkillCandidate(Base):
    """
    Memento-Skills: Skill proposals generated from failed episodes.

    When an agent fails a task repeatedly, the MementoEngine analyzes
    the failure pattern and generates a new skill candidate. The candidate
    must pass sandbox validation before it can be promoted to the active
    skill registry.

    Lifecycle: pending → validated/failed → promoted
    """

    __tablename__ = "skill_candidates"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    tenant_id = Column(String(36), nullable=False, index=True)
    agent_id = Column(String(36), nullable=True, index=True)
    source_episode_id = Column(String(36), nullable=True, index=True)
    skill_name = Column(String(255), nullable=False)
    skill_description = Column(Text, nullable=True)
    generated_code = Column(Text, nullable=False)
    failure_pattern = Column(JSON, nullable=True)
    validation_status = Column(String(50), default="pending")
    validation_result = Column(JSON, nullable=True)
    fitness_score = Column(Float, nullable=True)
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    validated_at = Column(DateTime(timezone=True), nullable=True)
    promoted_at = Column(DateTime(timezone=True), nullable=True)

    __table_args__ = (
        Index("ix_skill_candidates_tenant_status", "tenant_id", "validation_status"),
    )

**Fields:**

  • id (str): Primary key
  • tenant_id (str): Tenant identifier (indexed)
  • agent_id (str | None): Agent identifier (indexed)
  • source_episode_id (str | None): Failed episode that triggered generation (indexed)
  • skill_name (str): Skill name
  • skill_description (str | None): Human-readable description
  • generated_code (str): Generated Python code
  • failure_pattern (JSON | None): Extracted failure analysis
  • validation_status (str): "pending", "validated", "failed", or "promoted"
  • validation_result (JSON | None): Sandbox execution results
  • fitness_score (float | None): 0.0 to 1.0
  • created_at (DateTime): Creation timestamp
  • validated_at (DateTime | None): Validation timestamp
  • promoted_at (DateTime | None): Promotion timestamp

**Indexes:**

  • ix_skill_candidates_tenant_status: Composite index on tenant_id + validation_status
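The pending → validated/failed → promoted lifecycle can be sketched as a minimal transition check. This is illustrative only; the ORM model itself does not enforce these transitions:

```python
# Allowed validation_status transitions for a SkillCandidate.
# Illustrative sketch; the model does not enforce these itself.
ALLOWED_TRANSITIONS = {
    "pending": {"validated", "failed"},
    "validated": {"promoted"},
    "failed": set(),
    "promoted": set(),
}

def can_transition(current: str, target: str) -> bool:
    """Return True if moving from `current` to `target` is a valid lifecycle step."""
    return target in ALLOWED_TRANSITIONS.get(current, set())
```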

---

ReflectionEngine

**Module:** core.auto_dev.reflection_engine

Monitors task failures and triggers Memento-Skills when patterns emerge.

Class Definition

class ReflectionEngine:
    """
    Monitors task failures and triggers Memento-Skills when patterns emerge.

    Usage:
        engine = ReflectionEngine(db)
        engine.register()  # Registers on event bus
    """

Constructor

def __init__(
    self,
    db: Session,
    failure_threshold: int = DEFAULT_FAILURE_THRESHOLD,
):

**Parameters:**

  • db (Session): SQLAlchemy database session
  • failure_threshold (int): Minimum number of similar failures required to trigger skill generation (default: 2)

Methods

`register() -> None`

Register this engine on the global event bus.

**Example:**

engine = ReflectionEngine(db)
engine.register()  # Now listening for task_fail events

`async process_failure(event: TaskEvent) -> None`

Process a task failure event.

**Parameters:**

  • event (TaskEvent): Failure event

**Behavior:**

  • Checks if agent maturity allows Auto-Dev
  • Adds failure to pattern buffer
  • Checks for recurring patterns
  • Triggers MementoEngine if threshold exceeded
  • Clears buffer to avoid re-triggering
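The buffering behavior above can be illustrated with a minimal sketch. Grouping failures by an exact pattern key is an assumed simplification; the real engine detects similar failures:

```python
from collections import defaultdict

DEFAULT_FAILURE_THRESHOLD = 2  # matches the constructor default above

class FailureBuffer:
    """Minimal sketch of the pattern buffer; exact-key grouping is a simplification."""

    def __init__(self, threshold: int = DEFAULT_FAILURE_THRESHOLD):
        self.threshold = threshold
        self.buffer: dict[str, list[str]] = defaultdict(list)

    def add(self, pattern_key: str, episode_id: str) -> bool:
        """Record a failure; return True when the threshold is reached."""
        self.buffer[pattern_key].append(episode_id)
        if len(self.buffer[pattern_key]) >= self.threshold:
            # Clear the pattern to avoid re-triggering on the same failures.
            del self.buffer[pattern_key]
            return True
        return False
```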

---

EvolutionEngine

**Module:** core.auto_dev.evolution_engine

Background optimizer that triggers AlphaEvolver on underperforming skills.

Constants

LATENCY_THRESHOLD_SECONDS = 5.0
TOKEN_THRESHOLD = 5000

Class Definition

class EvolutionEngine:
    """
    Background optimizer that triggers AlphaEvolver on underperforming skills.

    Usage:
        engine = EvolutionEngine(db)
        engine.register()  # Registers on event bus
    """

Constructor

def __init__(self, db: Session):

**Parameters:**

  • db (Session): SQLAlchemy database session

Methods

`register() -> None`

Register this engine on the global event bus.

`async process_execution(event: SkillExecutionEvent) -> None`

Evaluate a skill execution and trigger optimization if warranted.

**Parameters:**

  • event (SkillExecutionEvent): Skill execution event

**Behavior:**

  • Only processes AUTONOMOUS agents with workspace opt-in
  • Checks optimization triggers (latency, tokens, failures)
  • Triggers AlphaEvolver if warranted

**Optimization Triggers:**

  • Execution latency >5s
  • Token usage >5000
  • Execution failure
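The triggers above can be expressed as a simple predicate. The event here is a trimmed-down stand-in for the SkillExecutionEvent dataclass defined earlier, kept to just the fields the triggers inspect:

```python
from dataclasses import dataclass

LATENCY_THRESHOLD_SECONDS = 5.0
TOKEN_THRESHOLD = 5000

@dataclass
class SkillExecutionEvent:
    # Trimmed for illustration; the full dataclass has more fields.
    execution_seconds: float = 0.0
    token_usage: int = 0
    success: bool = False

def needs_optimization(event: SkillExecutionEvent) -> bool:
    """True if any optimization trigger fires: latency, token usage, or failure."""
    return (
        event.execution_seconds > LATENCY_THRESHOLD_SECONDS
        or event.token_usage > TOKEN_THRESHOLD
        or not event.success
    )
```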

---

AdvisorService

**Module:** core.auto_dev.advisor_service

Provides AI-powered, human-readable guidance on evolutionary progress.

Class Definition

class AdvisorService:
    """
    AI Advisor for the Auto-Dev evolutionary framework.

    Analyzes mutation/fitness data and generates human-readable guidance.
    """

Constructor

def __init__(self, db: Session, llm_service: Any | None = None):

**Parameters:**

  • db (Session): SQLAlchemy database session
  • llm_service (Any | None): LLM service for AI guidance

Methods

`async generate_guidance(tenant_id: str, agent_id: str | None = None, llm_service: Any | None = None) -> dict[str, Any]`

Analyze current mutations and fitness data to provide advice.

**Parameters:**

  • tenant_id (str): Tenant to analyze
  • agent_id (str | None): Optional specific agent
  • llm_service (Any | None): Override LLM service

**Returns:**

{
    "status": "success",
    "message": str,
    "data_summary": {
        "num_mutations": int,
        "passed_mutations": int,
        "failed_mutations": int,
        "top_fitness_score": float,
        "avg_fitness_score": float,
    },
    "readiness_score": int,  # 0-100
}

**Behavior:**

  • Fetches recent mutations and variants
  • Generates AI guidance if LLM available
  • Falls back to heuristic guidance if not
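The data_summary fields in the return value can be derived from a batch of mutation records as in this sketch. Both the record shape (dicts with "passed" and "fitness" keys) and the aggregation are assumptions for illustration, not the service's actual logic:

```python
def summarize_mutations(mutations: list[dict]) -> dict:
    """Build the data_summary shape from mutation records (assumed record shape)."""
    passed = [m for m in mutations if m["passed"]]
    scores = [m["fitness"] for m in mutations if m["fitness"] is not None]
    return {
        "num_mutations": len(mutations),
        "passed_mutations": len(passed),
        "failed_mutations": len(mutations) - len(passed),
        "top_fitness_score": max(scores) if scores else 0.0,
        "avg_fitness_score": sum(scores) / len(scores) if scores else 0.0,
    }
```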

---

ContainerSandbox

**Module:** core.auto_dev.container_sandbox

Docker-based sandbox for executing untrusted code.

Constants

DOCKER_IMAGE = "python:3.11-slim"
DEFAULT_TIMEOUT = 60
DEFAULT_MEMORY_LIMIT = "256m"

Class Definition

class ContainerSandbox:
    """
    Docker-based sandbox implementing SandboxProtocol.

    Falls back to subprocess isolation if Docker is unavailable.
    """

Constructor

def __init__(
    self,
    docker_image: str = DOCKER_IMAGE,
    timeout: int = DEFAULT_TIMEOUT,
    memory_limit: str = DEFAULT_MEMORY_LIMIT,
    enable_network: bool = False,
):

**Parameters:**

  • docker_image (str): Docker image to use
  • timeout (int): Execution timeout in seconds
  • memory_limit (str): Docker memory limit
  • enable_network (bool): Whether to allow network access

Properties

`docker_available -> bool`

Check if Docker is available on the system.

**Returns:**

  • bool: True if Docker is installed and running
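A typical availability check looks for the docker CLI and probes the daemon with `docker info`, treating any failure as unavailable. This is a sketch of the idea, not the property's actual implementation:

```python
import shutil
import subprocess

def docker_available_sketch() -> bool:
    """Return True only if the docker CLI exists and the daemon responds."""
    if shutil.which("docker") is None:
        return False
    try:
        result = subprocess.run(
            ["docker", "info"], capture_output=True, timeout=5
        )
    except (subprocess.TimeoutExpired, OSError):
        return False
    return result.returncode == 0
```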

Methods

`async execute_raw_python(tenant_id: str, code: str, input_params: dict[str, Any] | None = None, timeout: int | None = None, safety_level: str = "MEDIUM_RISK", **kwargs) -> dict[str, Any]`

Execute Python code in an isolated environment.

**Parameters:**

  • tenant_id (str): Tenant ID for tracking
  • code (str): Python code to execute
  • input_params (dict | None): Input parameters passed as JSON
  • timeout (int | None): Override default timeout
  • safety_level (str): Ignored in the upstream implementation (used by the SaaS deployment)

**Returns:**

{
    "status": "success" | "failed",
    "output": str,
    "execution_seconds": float,
    "environment": "docker" | "subprocess",
}

**Behavior:**

  • Uses Docker if available
  • Falls back to subprocess isolation
  • Enforces timeout
  • Returns execution time and output

**Security:**

  • Ephemeral containers (destroyed after execution)
  • No network access (unless enable_network=True)
  • Memory limit (default 256MB)
  • Read-only filesystem with tmpfs for /tmp
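Those guarantees map onto standard `docker run` flags. This sketch shows a plausible command assembly; the flag choices follow from the documented defaults but are assumptions, not the verified implementation:

```python
def build_docker_command(
    image: str = "python:3.11-slim",
    memory_limit: str = "256m",
    enable_network: bool = False,
) -> list[str]:
    """Assemble docker run flags matching the documented security properties."""
    cmd = ["docker", "run", "--rm"]              # ephemeral: remove container on exit
    if not enable_network:
        cmd += ["--network", "none"]             # no network access
    cmd += ["--memory", memory_limit]            # memory limit (default 256MB)
    cmd += ["--read-only", "--tmpfs", "/tmp"]    # read-only fs, writable /tmp
    cmd += [image, "python", "-c", "print('ok')"]
    return cmd
```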

---

Type Aliases

EventHandler = Callable[..., Coroutine[Any, Any, None]]

Async event handler function type.
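Any async function returning None satisfies this alias. A minimal conforming handler:

```python
import inspect
from collections.abc import Callable, Coroutine
from typing import Any

EventHandler = Callable[..., Coroutine[Any, Any, None]]

async def log_event(event: Any) -> None:
    """A trivial handler conforming to EventHandler."""
    print(f"received: {event}")

handler: EventHandler = log_event
```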

---

Usage Examples

Basic Memento-Skills Workflow

from core.auto_dev.memento_engine import MementoEngine

engine = MementoEngine(db)

# Analyze failed episode
analysis = await engine.analyze_episode("episode-123")

# Generate skill candidate
candidate = await engine.generate_skill_candidate(
    tenant_id="tenant-456",
    agent_id="agent-789",
    episode_id="episode-123",
)

# Validate in sandbox
result = await engine.validate_candidate(
    candidate_id=candidate.id,
    tenant_id="tenant-456",
)

# Promote if validated
if result["passed"]:
    await engine.promote_skill(candidate.id, "tenant-456")

Basic AlphaEvolver Workflow

from core.auto_dev.alpha_evolver_engine import AlphaEvolverEngine

engine = AlphaEvolverEngine(db)

# Run research experiment
results = await engine.run_research_experiment(
    tenant_id="tenant-456",
    base_code=original_code,
    research_goal="Optimize for speed",
    iterations=3,
)

# Check results
for result in results:
    print(f"Iteration {result['iteration']}: {result['success']}")

Event Subscription

from core.auto_dev.event_hooks import event_bus, TaskEvent

@event_bus.on_task_fail
async def handle_failure(event: TaskEvent):
    print(f"Task failed: {event.task_description}")
    # Trigger learning loop...

---

See Also