Auto-Dev API Reference
Complete API documentation for all Auto-Dev components including EventBus, learning engines, database models, and supporting services.
**Version:** 1.0.0
**Last Updated:** 2026-04-10
---
Table of Contents
- EventBus
- BaseLearningEngine
- MementoEngine
- AlphaEvolverEngine
- FitnessService
- AutoDevCapabilityService
- Database Models
- ReflectionEngine
- EvolutionEngine
- AdvisorService
- ContainerSandbox
---
EventBus
**Module:** core.auto_dev.event_hooks
Lightweight in-process event bus for Auto-Dev lifecycle events. Decouples EpisodeService and SandboxExecutor from learning engines.
Event Types
TaskEvent
```python
@dataclass
class TaskEvent:
    """Event payload for task lifecycle events."""
    episode_id: str
    agent_id: str
    tenant_id: str
    task_description: str = ""
    error_trace: str | None = None
    outcome: str = ""  # "success", "failure", "partial"
    metadata: dict[str, Any] = field(default_factory=dict)
```

**Fields:**

- `episode_id` (str): Unique identifier for the episode
- `agent_id` (str): Agent that executed the task
- `tenant_id` (str): Tenant/workspace identifier
- `task_description` (str): Human-readable task description
- `error_trace` (str | None): Stack trace if the task failed
- `outcome` (str): Task outcome - "success", "failure", or "partial"
- `metadata` (dict): Additional event metadata
SkillExecutionEvent
```python
@dataclass
class SkillExecutionEvent:
    """Event payload for skill execution events."""
    execution_id: str
    agent_id: str
    tenant_id: str
    skill_id: str
    skill_name: str = ""
    execution_seconds: float = 0.0
    token_usage: int = 0
    success: bool = False
    output: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)
```

**Fields:**

- `execution_id` (str): Unique execution identifier
- `agent_id` (str): Agent that executed the skill
- `tenant_id` (str): Tenant/workspace identifier
- `skill_id` (str): Skill identifier
- `skill_name` (str): Human-readable skill name
- `execution_seconds` (float): Execution time in seconds
- `token_usage` (int): LLM tokens consumed
- `success` (bool): Whether execution succeeded
- `output` (str): Execution output/error message
- `metadata` (dict): Additional execution metadata
EventBus Class
```python
class EventBus:
    """Simple in-process event bus for Auto-Dev lifecycle events."""
```

Methods
`on_task_fail(handler: EventHandler) -> EventHandler`
Register a handler for task failure events.
**Parameters:**
- `handler` (EventHandler): Async function that takes a `TaskEvent`
**Returns:**
EventHandler: The same handler (decorator pattern)
**Example:**
```python
from core.auto_dev.event_hooks import event_bus, TaskEvent

@event_bus.on_task_fail
async def handle_failure(event: TaskEvent):
    print(f"Task failed: {event.task_description}")
    print(f"Error: {event.error_trace}")
```

`on_task_success(handler: EventHandler) -> EventHandler`
Register a handler for task success events.
**Parameters:**
- `handler` (EventHandler): Async function that takes a `TaskEvent`
**Returns:**
EventHandler: The same handler (decorator pattern)
**Example:**
```python
@event_bus.on_task_success
async def handle_success(event: TaskEvent):
    print(f"Task succeeded: {event.task_description}")
```

`on_skill_execution(handler: EventHandler) -> EventHandler`
Register a handler for skill execution events.
**Parameters:**
- `handler` (EventHandler): Async function that takes a `SkillExecutionEvent`
**Returns:**
EventHandler: The same handler (decorator pattern)
**Example:**
```python
@event_bus.on_skill_execution
async def handle_skill_execution(event: SkillExecutionEvent):
    print(f"Skill {event.skill_name} executed in {event.execution_seconds}s")
```

`async emit_task_fail(event: TaskEvent) -> None`
Emit a task failure event to all registered handlers.
**Parameters:**
event(TaskEvent): Event payload
**Behavior:**
- Dispatches to all handlers registered with `@on_task_fail`
- Catches exceptions in handlers to prevent cascade failures
- Logs errors for failed handlers
`async emit_task_success(event: TaskEvent) -> None`
Emit a task success event to all registered handlers.
**Parameters:**
event(TaskEvent): Event payload
**Behavior:**
- Dispatches to all handlers registered with `@on_task_success`
- Catches exceptions in handlers to prevent cascade failures
`async emit_skill_execution(event: SkillExecutionEvent) -> None`
Emit a skill execution event to all registered handlers.
**Parameters:**
event(SkillExecutionEvent): Event payload
**Behavior:**
- Dispatches to all handlers registered with `@on_skill_execution`
- Catches exceptions in handlers to prevent cascade failures
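The dispatch semantics above can be sketched with a minimal bus (a simplified stand-in, not the actual implementation): handlers are awaited in registration order, and a handler that raises does not prevent the remaining handlers from running.

```python
import asyncio

class MiniEventBus:
    """Simplified sketch of the documented dispatch behavior."""

    def __init__(self):
        self._task_fail_handlers = []

    def on_task_fail(self, handler):
        # Decorator pattern: register and return the same handler.
        self._task_fail_handlers.append(handler)
        return handler

    async def emit_task_fail(self, event):
        for handler in self._task_fail_handlers:
            try:
                await handler(event)
            except Exception as exc:
                # Isolate handler failures so one bad handler
                # cannot break the others.
                print(f"handler error: {exc}")

bus = MiniEventBus()
calls = []

@bus.on_task_fail
async def broken(event):
    raise RuntimeError("boom")

@bus.on_task_fail
async def record(event):
    calls.append(event)

asyncio.run(bus.emit_task_fail({"episode_id": "e-1"}))
print(calls)  # the second handler still ran
```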
`clear() -> None`
Remove all registered handlers. Useful for testing.
**Example:**
```python
def test_event_handler():
    event_bus.clear()
    # Register test handlers...
    # Run test...
    event_bus.clear()  # Cleanup
```

Global Singleton
```python
# Global singleton — imported by EpisodeService and SandboxExecutor
event_bus = EventBus()
```

**Usage:**
```python
from core.auto_dev.event_hooks import event_bus

# Register handlers
@event_bus.on_task_fail
async def my_handler(event: TaskEvent):
    ...

# Emit events
await event_bus.emit_task_fail(TaskEvent(...))
```

---
BaseLearningEngine
**Module:** core.auto_dev.base_engine
Abstract interface for self-improving agent modules. Both MementoEngine (skill generation) and AlphaEvolverEngine (skill optimization) implement this interface.
SandboxProtocol
```python
@runtime_checkable
class SandboxProtocol(Protocol):
    """Abstract sandbox interface for executing untrusted code."""

    async def execute_raw_python(
        self,
        tenant_id: str,
        code: str,
        input_params: dict[str, Any],
        timeout: int = 60,
        safety_level: str = "MEDIUM_RISK",
        **kwargs,
    ) -> dict[str, Any]:
        """
        Execute raw Python code in an isolated sandbox.

        Returns:
            {
                "status": "success" | "failed",
                "output": str,
                "execution_seconds": float,
                "execution_id": str,
            }
        """
        ...
```

**Implementations:**

- `ContainerSandbox` (upstream): Docker-based isolation
- `SandboxExecutionService` (SaaS): Fly.io-based isolation
BaseLearningEngine Class
```python
class BaseLearningEngine(ABC):
    """
    Unified interface for self-improving agent modules.

    Subclasses must implement three core lifecycle methods:
    1. analyze_episode — read and interpret execution data
    2. propose_code_change — generate a code modification
    3. validate_change — execute in sandbox and assess fitness
    """
```

Constructor
```python
def __init__(
    self,
    db: Session,
    llm_service: Any | None = None,
    sandbox: SandboxProtocol | None = None,
):
```

**Parameters:**

- `db` (Session): SQLAlchemy database session
- `llm_service` (Any | None): LLM service for code generation
- `sandbox` (SandboxProtocol | None): Sandbox for code execution
Abstract Methods
`async analyze_episode(episode_id: str, **kwargs) -> dict[str, Any]`
Read and interpret an episode's execution data.
**Parameters:**
- `episode_id` (str): Episode identifier
- `**kwargs`: Additional engine-specific parameters
**Returns:**
```python
{
    "episode_id": str,
    "task_description": str,
    "error_trace": str,            # for failures
    "tool_calls_attempted": list,  # for failures
    "latency": float,              # for successes
    "token_usage": int,            # for successes
    "edge_case_signals": dict,     # for successes
    ...
}
```

**Raises:**

- `ValueError`: If episode not found
`async propose_code_change(context: dict[str, Any], **kwargs) -> str`
Generate a code modification proposal via LLM.
**Parameters:**
- `context` (dict): Analysis output from `analyze_episode()`
- `**kwargs`: Additional engine-specific parameters
**Returns:**
str: Generated Python code string
**Behavior:**
- Uses LLM to generate code
- Strips markdown fences from output
- Returns fallback code if LLM unavailable
`async validate_change(code: str, test_inputs: list[dict[str, Any]], tenant_id: str, **kwargs) -> dict[str, Any]`
Execute proposed code in sandbox and assess fitness.
**Parameters:**
- `code` (str): Python code to validate
- `test_inputs` (list[dict]): Test cases for validation
- `tenant_id` (str): Tenant for sandbox isolation
- `**kwargs`: Additional engine-specific parameters
**Returns:**
{
"passed": bool,
"proxy_signals": dict, # Immediate fitness signals
"execution_result": dict,
}Protected Methods
`_get_llm_service() -> Any | None`
Get LLM service with graceful fallback.
**Returns:**
- LLM service instance, or `None` if unavailable

**Behavior:**

- Returns injected `self.llm` if available
- Attempts to import `get_llm_service()` from `core.llm_service`
- Logs warning if unavailable
`_get_sandbox() -> SandboxProtocol | None`
Get sandbox with graceful fallback to ContainerSandbox.
**Returns:**
- Sandbox instance, or `None` if unavailable

**Behavior:**

- Returns injected `self.sandbox` if available
- Attempts to import `ContainerSandbox` from `core.auto_dev.container_sandbox`
- Logs warning if unavailable
`_strip_markdown_fences(code: str) -> str`
Strip markdown code fences from LLM output.
**Parameters:**
code(str): LLM-generated code
**Returns:**
str: Cleaned Python code
**Behavior:**
- Removes ```` ```python ```` and ```` ``` ```` fences
- Strips leading/trailing whitespace
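A plausible fence-stripping helper matching the behavior above (an illustrative sketch, not the actual source):

```python
def strip_markdown_fences(code: str) -> str:
    """Remove a leading ```python / trailing ``` fence pair and trim whitespace."""
    text = code.strip()
    if text.startswith("```"):
        # Drop the opening fence line (e.g. ```python or bare ```).
        first_newline = text.find("\n")
        text = text[first_newline + 1:] if first_newline != -1 else ""
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()

raw = "```python\nprint('hi')\n```"
print(strip_markdown_fences(raw))  # print('hi')
```

Code without fences passes through unchanged, which keeps the helper safe to apply to all LLM output.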
---
MementoEngine
**Module:** core.auto_dev.memento_engine
Generates new skills from failed episodes. When an agent hits the same failure pattern repeatedly, MementoEngine analyzes the failure, generates a skill proposal, validates it in sandbox, and promotes it to the skill registry.
Class Definition
```python
class MementoEngine(BaseLearningEngine):
    """
    Generates new skills from failed episodes.

    Lifecycle:
    1. analyze_episode() — extract failure pattern
    2. propose_code_change() — generate skill code via LLM
    3. validate_change() — test in sandbox
    4. promote_skill() — register via SkillBuilderService
    """
```

Constructor
```python
def __init__(
    self,
    db: Session,
    llm_service: Any | None = None,
    sandbox: SandboxProtocol | None = None,
):
```

**Parameters:**

- `db` (Session): SQLAlchemy database session
- `llm_service` (Any | None): LLM service for skill generation
- `sandbox` (SandboxProtocol | None): Sandbox for validation
Methods
`async analyze_episode(episode_id: str, **kwargs) -> dict[str, Any]`
Analyze a failed episode to extract the failure pattern.
**Parameters:**
episode_id(str): Failed episode identifier
**Returns:**
```python
{
    "episode_id": str,
    "agent_id": str | None,
    "tenant_id": str | None,
    "task_description": str,
    "error_trace": str,
    "tool_calls_attempted": [
        {"tool_name": str, "status": str},
        ...
    ],
    "error_segments_count": int,
    "failure_summary": str,
    "suggested_skill_name": str,
}
```

**Raises:**

- `ValueError`: If episode not found
**Example:**
```python
engine = MementoEngine(db)
analysis = await engine.analyze_episode("episode-123")
print(analysis["failure_summary"])
# Output: "Failed: Process invoice. Errors: KeyError 'invoice_id'..."
```

`async propose_code_change(context: dict[str, Any], **kwargs) -> str`
Generate a new skill script via LLM to address a failure pattern.
**Parameters:**
- `context` (dict): Analysis output from `analyze_episode()`
**Returns:**
str: Generated Python skill code
**LLM Prompts:**
- **System:** "You are the Memento Skill Generator. Your goal is to create a new Python utility function that addresses a gap in the agent's capabilities."
- **User:** Includes task description, error trace, and attempted tools
**Example:**
```python
code = await engine.propose_code_change(analysis)
print(code)
# Output:
# def process_invoice(invoice_id: str) -> dict:
#     """Process an invoice by ID."""
#     ...
```

`async validate_change(code: str, test_inputs: list[dict[str, Any]], tenant_id: str, **kwargs) -> dict[str, Any]`
Execute generated skill in sandbox and verify it works.
**Parameters:**
- `code` (str): Generated skill code
- `test_inputs` (list[dict]): Test cases (defaults to `[{}]` if empty)
- `tenant_id` (str): Tenant for sandbox isolation
**Returns:**
```python
{
    "passed": bool,
    "test_results": [
        {
            "test_index": int,
            "passed": bool,
            "output": str,
            "execution_seconds": float,
        },
        ...
    ],
}
```

**Behavior:**

- Executes code against each test input
- Returns `passed=True` only if all tests pass
- Captures output and execution time for each test
`async generate_skill_candidate(tenant_id: str, agent_id: str | None, episode_id: str, failure_analysis: dict[str, Any] | None = None) -> SkillCandidate`
Full pipeline: analyze episode → generate skill → store candidate.
**Parameters:**
- `tenant_id` (str): Tenant identifier
- `agent_id` (str | None): Agent identifier
- `episode_id` (str): Failed episode identifier
- `failure_analysis` (dict | None): Pre-computed analysis (optional)
**Returns:**
- `SkillCandidate`: Database record with `validation_status='pending'`
**Raises:**
ValueError: If episode analysis fails
**Example:**
```python
candidate = await engine.generate_skill_candidate(
    tenant_id="tenant-123",
    agent_id="agent-456",
    episode_id="episode-789",
)
print(f"Generated candidate: {candidate.skill_name}")
```

`async validate_candidate(candidate_id: str, tenant_id: str, test_inputs: list[dict[str, Any]] | None = None) -> dict[str, Any]`
Validate a pending skill candidate in the sandbox.
**Parameters:**
- `candidate_id` (str): Candidate identifier
- `tenant_id` (str): Tenant identifier
- `test_inputs` (list[dict] | None): Test cases (defaults to `[{}]`)
**Returns:**
```python
{
    "candidate_id": str,
    "passed": bool,
    "validation_result": {...},
}
```

**Behavior:**

- Updates the candidate's `validation_status` to `'validated'` or `'failed'`
- Stores `validation_result` with test outputs
- Sets `fitness_score=1.0` if passed
`async promote_skill(candidate_id: str, tenant_id: str) -> dict[str, Any]`
Promote a validated candidate to the active skill registry.
**Parameters:**
- `candidate_id` (str): Validated candidate identifier
- `tenant_id` (str): Tenant identifier
**Returns:**
```python
{
    "success": bool,
    "skill_id": str,
    ...
}
```

**Raises:**

- `ValueError`: If candidate not found or not validated

**Behavior:**

- Uses `SkillBuilderService` to create the skill package
- Updates candidate status to `'promoted'`
- Sets the `promoted_at` timestamp
`static _suggest_skill_name(task_description: str, error_trace: str) -> str`
Generate a suggested skill name from the task description.
**Parameters:**
- `task_description` (str): Task that failed
- `error_trace` (str): Error information
**Returns:**
str: Python-identifier-safe skill name
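One plausible way to derive such a name (a hypothetical heuristic, not the actual source, though it reproduces the documented example output):

```python
import re

# Hypothetical stopword set for this sketch.
STOPWORDS = {"a", "an", "the", "from", "to", "of", "and", "for"}

def suggest_skill_name(task_description: str, error_trace: str) -> str:
    """Build a Python-identifier-safe name from the task description."""
    words = re.findall(r"[a-z0-9]+", task_description.lower())
    keywords = [w for w in words if w not in STOPWORDS][:4]
    # The "auto_" prefix guarantees the name cannot start with a digit.
    return "auto_" + "_".join(keywords) if keywords else "auto_generated_skill"

print(suggest_skill_name("Process invoice from email", "KeyError: invoice_id"))
# auto_process_invoice_email
```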
**Example:**
```python
name = MementoEngine._suggest_skill_name(
    "Process invoice from email",
    "KeyError: invoice_id"
)
print(name)
# Output: "auto_process_invoice_email"
```

---
AlphaEvolverEngine
**Module:** core.auto_dev.alpha_evolver_engine
Core mutation and optimization logic for the evolutionary learning loop. Produces code mutations via LLM, executes them in sandbox, and tracks fitness signals.
Class Definition
```python
class AlphaEvolverEngine(BaseLearningEngine):
    """
    Skill optimization via iterative code mutation.

    Lifecycle:
    1. analyze_episode() — extract performance signals from successful episodes
    2. generate_tool_mutation() — LLM generates a code mutation
    3. sandbox_execute_mutation() — run in sandbox, collect fitness signals
    4. spawn_workflow_variant() — track variant for population comparison
    5. run_research_experiment() — iterative mutate→sandbox→compare loop
    """
```

Constructor

```python
def __init__(
    self,
    db: Session,
    llm_service: Any | None = None,
    sandbox: SandboxProtocol | None = None,
):
```

Methods
`async analyze_episode(episode_id: str, **kwargs) -> dict[str, Any]`
Analyze a successful episode to identify optimization opportunities.
**Parameters:**
episode_id(str): Successful episode identifier
**Returns:**
```python
{
    "episode_id": str,
    "task_description": str,
    "success": bool,
    "total_segments": int,
    "metadata": dict,
    "optimization_targets": [
        {
            "segment_id": str,
            "reason": "high_latency" | "retries",
            "value": float | int,
        },
        ...
    ],
}
```

**Behavior:**

- Identifies segments with latency >5s
- Identifies segments with retry attempts
`async propose_code_change(context: dict[str, Any], **kwargs) -> str`
Generate a code mutation via LLM.
**Parameters:**
- `context` (dict): Contains `base_code` and `mutation_prompt`
**Returns:**
str: Mutated Python code
**LLM Prompts:**
- **System:** "You are the AlphaEvolve Code Mutator. Your goal is to refine and evolve Python tool code to better achieve a specific objective."
- **User:** Includes objective and original code
`async validate_change(code: str, test_inputs: list[dict[str, Any]], tenant_id: str, **kwargs) -> dict[str, Any]`
Execute mutated code in sandbox and assess fitness.
**Parameters:**
- `code` (str): Mutated code
- `test_inputs` (list[dict]): Test cases
- `tenant_id` (str): Tenant for sandbox isolation
**Returns:**
```python
{
    "passed": bool,
    "test_results": [...],
    "proxy_signals": {
        "execution_success": bool,
        "pass_rate": float,
        "avg_execution_seconds": float,
        "syntax_error": bool,
    },
}
```

`async generate_tool_mutation(tenant_id: str, tool_name: str, parent_tool_id: str | None, base_code: str, mutation_prompt: str) -> ToolMutation`
Produce a new variation of a Python tool via LLM mutation.
**Parameters:**
- `tenant_id` (str): Tenant identifier
- `tool_name` (str): Tool name
- `parent_tool_id` (str | None): Parent tool for lineage tracing
- `base_code` (str): Original code to mutate
- `mutation_prompt` (str): Optimization objective
**Returns:**
- `ToolMutation`: Database record with `sandbox_status='pending'`
**Example:**
```python
mutation = await engine.generate_tool_mutation(
    tenant_id="tenant-123",
    tool_name="process_invoice",
    parent_tool_id="tool-456",
    base_code=original_code,
    mutation_prompt="Reduce execution time by 50%",
)
```

`async sandbox_execute_mutation(mutation_id: str, tenant_id: str, inputs: dict[str, Any]) -> dict[str, Any]`
Execute a mutation in the sandbox and record results.
**Parameters:**
- `mutation_id` (str): Mutation identifier
- `tenant_id` (str): Tenant identifier
- `inputs` (dict): Input parameters for execution
**Returns:**
```python
{
    "success": bool,
    "output": str,
    "proxy_signals": {
        "syntax_error": bool,
        "execution_success": bool,
        "execution_latency_ms": float,
        "environment": "docker" | "subprocess",
    },
}
```

**Behavior:**

- Updates the mutation's `sandbox_status` to `'passed'` or `'failed'`
- Stores `execution_error` if failed
`spawn_workflow_variant(tenant_id: str, agent_id: str, workflow_def: dict[str, Any], parent_variant_id: str | None = None) -> WorkflowVariant`
Create a new workflow variant for population-based comparison.
**Parameters:**
- `tenant_id` (str): Tenant identifier
- `agent_id` (str): Agent identifier
- `workflow_def` (dict): Workflow definition
- `parent_variant_id` (str | None): Parent variant for lineage
**Returns:**
- `WorkflowVariant`: Database record with `evaluation_status='pending'`
`check_auto_synthesis_readiness(tenant_id: str, tool_name: str, threshold: int = 5) -> bool`
Check if enough mutations passed to trigger automatic synthesis.
**Parameters:**
- `tenant_id` (str): Tenant identifier
- `tool_name` (str): Tool name
- `threshold` (int): Minimum passed mutations (default: 5)
**Returns:**
- `bool`: `True` if ready for synthesis
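The readiness check reduces to counting passed mutations for a tool against the threshold. A pure-Python sketch over in-memory rows (the `MutationRow` stand-in is hypothetical; the real method queries `ToolMutation` via SQLAlchemy):

```python
from dataclasses import dataclass

@dataclass
class MutationRow:
    """Hypothetical stand-in for a ToolMutation ORM row."""
    tenant_id: str
    tool_name: str
    sandbox_status: str

def check_auto_synthesis_readiness(rows, tenant_id, tool_name, threshold=5):
    """True once enough mutations for this tool have passed the sandbox."""
    passed = sum(
        1 for r in rows
        if r.tenant_id == tenant_id
        and r.tool_name == tool_name
        and r.sandbox_status == "passed"
    )
    return passed >= threshold

rows = [MutationRow("t1", "process_invoice", "passed") for _ in range(5)]
print(check_auto_synthesis_readiness(rows, "t1", "process_invoice"))  # True
```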
`async run_research_experiment(tenant_id: str, base_code: str, research_goal: str, iterations: int = 3, inputs: dict[str, Any] | None = None) -> list[dict[str, Any]]`
Iterative research experiment: mutate → sandbox → compare → keep winner.
**Parameters:**
- `tenant_id` (str): Tenant identifier
- `base_code` (str): Starting code
- `research_goal` (str): Optimization objective
- `iterations` (int): Number of iterations (default: 3)
- `inputs` (dict | None): Test inputs
**Returns:**
```python
[
    {
        "iteration": int,
        "mutation_id": str,
        "success": bool,
        "output": str,
        "code_preview": str,
    },
    ...
]
```

**Behavior:**

- Progressive evolution: uses the winner as the next iteration's base
- Each iteration generates a new mutation and validates it
**Example:**
```python
results = await engine.run_research_experiment(
    tenant_id="tenant-123",
    base_code=original_code,
    research_goal="Optimize for speed",
    iterations=3,
)
for result in results:
    print(f"Iteration {result['iteration']}: {result['success']}")
```

---
FitnessService
**Module:** core.auto_dev.fitness_service
Multi-stage fitness evaluation for workflow variants. Immediate proxy signals from sandbox execution and delayed async signals from downstream integrations.
Class Definition
```python
class FitnessService:
    """
    Multi-stage fitness evaluation for workflow variants.

    Stage 1 - evaluate_initial_proxy(): Immediate feedback from sandbox execution.
    Stage 2 - evaluate_delayed_webhook(): Async signals from downstream integrations.
    """
```

Constructor

```python
def __init__(self, db: Session):
```

**Parameters:**

- `db` (Session): SQLAlchemy database session
Methods
`evaluate_initial_proxy(variant_id: str, tenant_id: str, proxy_signals: dict[str, Any]) -> float`
Record immediate proxy signals and calculate baseline fitness.
**Parameters:**
- `variant_id` (str): Variant identifier
- `tenant_id` (str): Tenant identifier
- `proxy_signals` (dict): Immediate fitness signals
**Proxy Signals:**
```python
{
    "execution_success": bool,       # Ran without crash
    "syntax_error": bool,            # Code had syntax errors
    "execution_latency_ms": float,   # Execution time
    "user_approved_proposal": bool,  # HITL approval
}
```

**Returns:**

- `float`: Fitness score from 0.0 to 1.0
**Scoring:**
- Syntax error: -1.0
- Survived syntax check: +0.2
- Execution success: +0.3
- User approved: +0.5
- User rejected: -0.5
**Behavior:**
- Updates the variant's `fitness_score` and `fitness_signals`
- Sets `evaluation_status` to `'pending'` or `'evaluated'`
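Combining the scoring rules above, a plausible scoring function looks like the following (a hypothetical composition; the real service may weight or clamp differently):

```python
def score_proxy_signals(signals: dict) -> float:
    """Apply the documented scoring rules and clamp the result to [0, 1]."""
    score = 0.0
    if signals.get("syntax_error"):
        score -= 1.0
    else:
        score += 0.2  # survived the syntax check
    if signals.get("execution_success"):
        score += 0.3
    if "user_approved_proposal" in signals:
        # HITL approval adds 0.5; rejection subtracts 0.5.
        score += 0.5 if signals["user_approved_proposal"] else -0.5
    return max(0.0, min(1.0, score))

print(score_proxy_signals({
    "execution_success": True,
    "syntax_error": False,
    "user_approved_proposal": True,
}))  # 1.0
```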
`evaluate_delayed_webhook(variant_id: str, tenant_id: str, external_signals: dict[str, Any]) -> float`
Process downstream webhook signals and adjust fitness score.
**Parameters:**
- `variant_id` (str): Variant identifier
- `tenant_id` (str): Tenant identifier
- `external_signals` (dict): Downstream integration signals
**External Signals:**
```python
{
    "invoice_created": bool,     # Positive
    "crm_conversion": bool,      # Positive
    "conversion_success": bool,  # Positive
    "email_bounce": bool,        # Negative
    "error_signal": bool,        # Negative
    "conversion_value": float,   # Scaled positive
}
```

**Returns:**

- `float`: Adjusted fitness score from 0.0 to 1.0
**Adjustments:**
- `invoice_created`: +0.4
- `crm_conversion`: +0.5
- `conversion_success`: +0.6
- `email_bounce`: -0.3
- `error_signal`: -0.5
- `conversion_value`: +0.5 per $1000 (max +0.5)
**Behavior:**
- Adjusts the current score by the signal adjustments
- Updates the variant's `fitness_signals` with external data
- Sets `evaluation_status` to `'evaluated'`
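The adjustment table maps directly to a lookup-and-sum; a hypothetical sketch of how the documented deltas might be applied:

```python
# Documented boolean-signal adjustments.
ADJUSTMENTS = {
    "invoice_created": 0.4,
    "crm_conversion": 0.5,
    "conversion_success": 0.6,
    "email_bounce": -0.3,
    "error_signal": -0.5,
}

def apply_external_signals(current_score: float, signals: dict) -> float:
    """Hypothetical application of the documented webhook adjustments."""
    score = current_score
    for name, delta in ADJUSTMENTS.items():
        if signals.get(name):
            score += delta
    # conversion_value: +0.5 per $1000, capped at +0.5 total.
    value = signals.get("conversion_value", 0.0)
    score += min(0.5, 0.5 * (value / 1000.0))
    return max(0.0, min(1.0, score))

print(apply_external_signals(0.5, {"invoice_created": True, "email_bounce": True}))
```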
`get_top_variants(tenant_id: str, limit: int = 5) -> list[WorkflowVariant]`
Retrieve highest-fitness variants for crossover/mutation operations.
**Parameters:**
- `tenant_id` (str): Tenant identifier
- `limit` (int): Maximum variants to return (default: 5)
**Returns:**
list[WorkflowVariant]: Top variants ordered by fitness score
**Example:**
```python
service = FitnessService(db)
top = service.get_top_variants("tenant-123", limit=3)
for variant in top:
    print(f"{variant.id}: {variant.fitness_score}")
```

---
AutoDevCapabilityService
**Module:** core.auto_dev.capability_gate
Gates Auto-Dev features based on agent maturity level and workspace settings.
Constants
```python
STUDENT = "student"
INTERN = "intern"
SUPERVISED = "supervised"
AUTONOMOUS = "autonomous"

MATURITY_ORDER = [STUDENT, INTERN, SUPERVISED, AUTONOMOUS]
```

Capability Gates

```python
AutoDevCapabilityService.CAPABILITY_GATES = {
    "auto_dev.memento_skills": INTERN,
    "auto_dev.alpha_evolver": SUPERVISED,
    "auto_dev.background_evolution": AUTONOMOUS,
}
```

Helper Function
`is_at_least(current: str, required: str) -> bool`
Check if current maturity level meets or exceeds the required level.
**Parameters:**
- `current` (str): Current maturity level
- `required` (str): Required maturity level
**Returns:**
- `bool`: `True` if current >= required
**Example:**
```python
is_at_least("supervised", "intern")  # True
is_at_least("intern", "supervised")  # False
```

AutoDevCapabilityService Class

```python
class AutoDevCapabilityService:
    """Gates Auto-Dev features based on agent maturity and workspace settings."""
```

Constructor

```python
def __init__(self, db: Session):
```

**Parameters:**
db(Session): SQLAlchemy database session
Methods
`can_use(agent_id: str, capability: str, workspace_settings: dict[str, Any] | None = None) -> bool`
Check if an agent can use a specific Auto-Dev capability.
**Parameters:**
- `agent_id` (str): Agent to check
- `capability` (str): e.g., `"auto_dev.memento_skills"`
- `workspace_settings` (dict | None): Workspace configuration
**Returns:**
- `bool`: `True` if allowed
**Requires BOTH:**
- Workspace settings allow it (`auto_dev.enabled` + per-capability toggle)
- Agent has graduated to the required maturity level

**Example:**

```python
gate = AutoDevCapabilityService(db)
if gate.can_use("agent-123", "auto_dev.memento_skills", workspace_settings):
    ...  # Enable Memento-Skills
```

`record_usage(agent_id: str, capability: str, success: bool) -> None`
Record Auto-Dev usage to progress agent maturity via graduation framework.
**Parameters:**
- `agent_id` (str): Agent identifier
- `capability` (str): Capability used
- `success` (bool): Whether usage was successful
**Behavior:**
- Calls `CapabilityGraduationService.record_usage()`
- Contributes to the agent's graduation score
`check_daily_limits(agent_id: str, capability: str, workspace_settings: dict[str, Any] | None = None) -> bool`
Check if the agent has exceeded daily Auto-Dev limits.
**Parameters:**
- `agent_id` (str): Agent identifier
- `capability` (str): Capability to check
- `workspace_settings` (dict | None): Workspace configuration
**Returns:**
- `bool`: `True` if within limits
**Default Limits:**
- `max_mutations_per_day`: 10
- `max_skill_candidates_per_day`: 5
`notify_capability_unlocked(agent_id: str, capability: str) -> dict[str, Any]`
Generate notification payload when an agent graduates into a new capability.
**Parameters:**
- `agent_id` (str): Agent identifier
- `capability` (str): Unlocked capability
**Returns:**
```python
{
    "type": "auto_dev_capability_unlocked",
    "agent_id": str,
    "capability": str,
    "message": str,
    "action_required": bool,
}
```

---
Database Models
**Module:** core.auto_dev.models
SQLAlchemy models for the self-evolving agent system.
ToolMutation
```python
class ToolMutation(Base):
    """
    AlphaEvolve: Tracks tool code mutations, lineage, and sandbox test results.

    Each mutation has a parent_tool_id for lineage tracing, allowing the system
    to track evolutionary chains of code improvements.
    """
    __tablename__ = "tool_mutations"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    tenant_id = Column(String(36), nullable=False, index=True)
    parent_tool_id = Column(String(36), nullable=True, index=True)
    tool_name = Column(String(255), nullable=False)
    mutated_code = Column(Text, nullable=False)
    sandbox_status = Column(String(50), default="pending")
    execution_error = Column(Text, nullable=True)
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
```

**Fields:**

- `id` (str): Primary key
- `tenant_id` (str): Tenant identifier (indexed)
- `parent_tool_id` (str | None): Parent tool for lineage (indexed)
- `tool_name` (str): Tool name
- `mutated_code` (str): Mutated Python code
- `sandbox_status` (str): `"pending"`, `"passed"`, or `"failed"`
- `execution_error` (str | None): Error message if failed
- `created_at` (DateTime): Creation timestamp
WorkflowVariant
```python
class WorkflowVariant(Base):
    """
    AlphaEvolve: Tracks variations of workflows/prompts alongside their
    automated fitness scores.

    Fitness is evaluated in two stages:
    1. Immediate proxy signals (compilation, execution success)
    2. Deferred async signals (webhook events, conversion data)
    """
    __tablename__ = "workflow_variants"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    tenant_id = Column(String(36), nullable=False, index=True)
    parent_variant_id = Column(String(36), nullable=True, index=True)
    agent_id = Column(String(36), nullable=True, index=True)
    workflow_definition = Column(JSON, nullable=False)
    fitness_score = Column(Float, nullable=True)
    fitness_signals = Column(JSON, nullable=True)
    evaluation_status = Column(String(50), default="pending")
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    last_evaluated_at = Column(DateTime(timezone=True), nullable=True)
```

**Fields:**

- `id` (str): Primary key
- `tenant_id` (str): Tenant identifier (indexed)
- `parent_variant_id` (str | None): Parent variant for lineage (indexed)
- `agent_id` (str | None): Agent identifier (indexed)
- `workflow_definition` (JSON): Workflow/prompt definition
- `fitness_score` (float | None): 0.0 to 1.0
- `fitness_signals` (JSON | None): Raw proxy/external signals
- `evaluation_status` (str): `"pending"`, `"evaluated"`, or `"pruned"`
- `created_at` (DateTime): Creation timestamp
- `last_evaluated_at` (DateTime | None): Last evaluation timestamp
SkillCandidate
```python
class SkillCandidate(Base):
    """
    Memento-Skills: Skill proposals generated from failed episodes.

    When an agent fails a task repeatedly, the MementoEngine analyzes
    the failure pattern and generates a new skill candidate. The candidate
    must pass sandbox validation before it can be promoted to the active
    skill registry.

    Lifecycle: pending → validated/failed → promoted
    """
    __tablename__ = "skill_candidates"

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    tenant_id = Column(String(36), nullable=False, index=True)
    agent_id = Column(String(36), nullable=True, index=True)
    source_episode_id = Column(String(36), nullable=True, index=True)
    skill_name = Column(String(255), nullable=False)
    skill_description = Column(Text, nullable=True)
    generated_code = Column(Text, nullable=False)
    failure_pattern = Column(JSON, nullable=True)
    validation_status = Column(String(50), default="pending")
    validation_result = Column(JSON, nullable=True)
    fitness_score = Column(Float, nullable=True)
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    validated_at = Column(DateTime(timezone=True), nullable=True)
    promoted_at = Column(DateTime(timezone=True), nullable=True)

    __table_args__ = (
        Index("ix_skill_candidates_tenant_status", "tenant_id", "validation_status"),
    )
```

**Fields:**

- `id` (str): Primary key
- `tenant_id` (str): Tenant identifier (indexed)
- `agent_id` (str | None): Agent identifier (indexed)
- `source_episode_id` (str | None): Failed episode that triggered generation (indexed)
- `skill_name` (str): Skill name
- `skill_description` (str | None): Human-readable description
- `generated_code` (str): Generated Python code
- `failure_pattern` (JSON | None): Extracted failure analysis
- `validation_status` (str): `"pending"`, `"validated"`, `"failed"`, or `"promoted"`
- `validation_result` (JSON | None): Sandbox execution results
- `fitness_score` (float | None): 0.0 to 1.0
- `created_at` (DateTime): Creation timestamp
- `validated_at` (DateTime | None): Validation timestamp
- `promoted_at` (DateTime | None): Promotion timestamp

**Indexes:**

- `ix_skill_candidates_tenant_status`: Composite index on `tenant_id` + `validation_status`
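The composite index accelerates the engine's common lookup ("pending candidates for this tenant"). An equivalent-shape sketch using stdlib `sqlite3` (illustrative only; the real schema is managed by SQLAlchemy):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE skill_candidates (
        id TEXT PRIMARY KEY,
        tenant_id TEXT NOT NULL,
        validation_status TEXT DEFAULT 'pending'
    )
""")
conn.execute(
    "CREATE INDEX ix_skill_candidates_tenant_status "
    "ON skill_candidates (tenant_id, validation_status)"
)
conn.executemany(
    "INSERT INTO skill_candidates VALUES (?, ?, ?)",
    [("c1", "t1", "pending"), ("c2", "t1", "promoted"), ("c3", "t2", "pending")],
)

# The tenant + status filter is exactly what the composite index covers.
rows = conn.execute(
    "SELECT id FROM skill_candidates "
    "WHERE tenant_id = ? AND validation_status = ?",
    ("t1", "pending"),
).fetchall()
print(rows)  # [('c1',)]
```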
---
ReflectionEngine
**Module:** core.auto_dev.reflection_engine
Monitors task failures and triggers Memento-Skills when patterns emerge.
Class Definition
```python
class ReflectionEngine:
    """
    Monitors task failures and triggers Memento-Skills when patterns emerge.

    Usage:
        engine = ReflectionEngine(db)
        engine.register()  # Registers on event bus
    """
```

Constructor
```python
def __init__(
    self,
    db: Session,
    failure_threshold: int = DEFAULT_FAILURE_THRESHOLD,
):
```

**Parameters:**

- `db` (Session): SQLAlchemy database session
- `failure_threshold` (int): Minimum similar failures to trigger (default: 2)
Methods
`register() -> None`
Register this engine on the global event bus.
**Example:**
```python
engine = ReflectionEngine(db)
engine.register()  # Now listening for task_fail events
```

`async process_failure(event: TaskEvent) -> None`
Process a task failure event.
**Parameters:**
event(TaskEvent): Failure event
**Behavior:**
- Checks if agent maturity allows Auto-Dev
- Adds failure to pattern buffer
- Checks for recurring patterns
- Triggers MementoEngine if threshold exceeded
- Clears buffer to avoid re-triggering
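The buffer-and-threshold behavior can be sketched in isolation (a hypothetical `FailureBuffer`, not the actual implementation; the real engine also consults maturity gates and invokes MementoEngine):

```python
from collections import defaultdict

class FailureBuffer:
    """Hypothetical sketch of the failure pattern buffer described above."""

    def __init__(self, threshold: int = 2):
        self.threshold = threshold
        self._buffer = defaultdict(list)

    def add(self, agent_id: str, error_signature: str) -> bool:
        """Record a failure; return True when the pattern should trigger."""
        key = (agent_id, error_signature)
        self._buffer[key].append(error_signature)
        if len(self._buffer[key]) >= self.threshold:
            self._buffer[key].clear()  # avoid re-triggering on the same pattern
            return True
        return False

buf = FailureBuffer(threshold=2)
first = buf.add("agent-1", "KeyError: invoice_id")   # False: only one occurrence
second = buf.add("agent-1", "KeyError: invoice_id")  # True: threshold reached
print(first, second)
```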
---
EvolutionEngine
**Module:** core.auto_dev.evolution_engine
Background optimizer that triggers AlphaEvolver on underperforming skills.
Constants
```python
LATENCY_THRESHOLD_SECONDS = 5.0
TOKEN_THRESHOLD = 5000
```

Class Definition

```python
class EvolutionEngine:
    """
    Background optimizer that triggers AlphaEvolver on underperforming skills.

    Usage:
        engine = EvolutionEngine(db)
        engine.register()  # Registers on event bus
    """
```

Constructor

```python
def __init__(self, db: Session):
```

**Parameters:**
db(Session): SQLAlchemy database session
Methods
`register() -> None`
Register this engine on the global event bus.
`async process_execution(event: SkillExecutionEvent) -> None`
Evaluate a skill execution and trigger optimization if warranted.
**Parameters:**
event(SkillExecutionEvent): Skill execution event
**Behavior:**
- Only processes AUTONOMOUS agents with workspace opt-in
- Checks optimization triggers (latency, tokens, failures)
- Triggers AlphaEvolver if warranted
**Optimization Triggers:**
- Execution latency >5s
- Token usage >5000
- Execution failure
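The trigger logic reduces to a simple predicate over the event fields. A sketch using the documented thresholds (the function name is illustrative, not the actual API):

```python
LATENCY_THRESHOLD_SECONDS = 5.0
TOKEN_THRESHOLD = 5000

def needs_optimization(execution_seconds: float, token_usage: int, success: bool) -> bool:
    """Mirror of the documented triggers: slow, token-hungry, or failing skills."""
    return (
        execution_seconds > LATENCY_THRESHOLD_SECONDS
        or token_usage > TOKEN_THRESHOLD
        or not success
    )

print(needs_optimization(7.2, 1200, True))  # True: latency exceeds 5s
print(needs_optimization(1.0, 800, True))   # False: fast, cheap, successful
```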
---
AdvisorService
**Module:** core.auto_dev.advisor_service
AI-powered guidance for evolutionary progress.
Class Definition
```python
class AdvisorService:
    """
    AI Advisor for the Auto-Dev evolutionary framework.

    Analyzes mutation/fitness data and generates human-readable guidance.
    """
```

Constructor

```python
def __init__(self, db: Session, llm_service: Any | None = None):
```

**Parameters:**

- `db` (Session): SQLAlchemy database session
- `llm_service` (Any | None): LLM service for AI guidance
Methods
`async generate_guidance(tenant_id: str, agent_id: str | None = None, llm_service: Any | None = None) -> dict[str, Any]`
Analyze current mutations and fitness data to provide advice.
**Parameters:**
- `tenant_id` (str): Tenant to analyze
- `agent_id` (str | None): Optional specific agent
- `llm_service` (Any | None): Override LLM service
**Returns:**
```python
{
    "status": "success",
    "message": str,
    "data_summary": {
        "num_mutations": int,
        "passed_mutations": int,
        "failed_mutations": int,
        "top_fitness_score": float,
        "avg_fitness_score": float,
    },
    "readiness_score": int,  # 0-100
}
```

**Behavior:**
- Fetches recent mutations and variants
- Generates AI guidance if LLM available
- Falls back to heuristic guidance if not
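The heuristic fallback might blend pass rate and fitness into the 0-100 readiness score. This is a hypothetical sketch of one plausible formula, not the service's actual heuristic:

```python
def heuristic_readiness(num_mutations: int, passed_mutations: int,
                        top_fitness_score: float) -> int:
    """Blend pass rate (60% weight) and top fitness (40% weight) into 0-100."""
    if num_mutations == 0:
        return 0  # no data yet
    pass_rate = passed_mutations / num_mutations
    fitness = min(max(top_fitness_score, 0.0), 1.0)  # clamp to [0, 1]
    return round(60 * pass_rate + 40 * fitness)
```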
---
ContainerSandbox
**Module:** core.auto_dev.container_sandbox
Docker-based sandbox for executing untrusted code.
Constants
```python
DOCKER_IMAGE = "python:3.11-slim"
DEFAULT_TIMEOUT = 60
DEFAULT_MEMORY_LIMIT = "256m"
```

Class Definition

```python
class ContainerSandbox:
    """
    Docker-based sandbox implementing SandboxProtocol.
    Falls back to subprocess isolation if Docker is unavailable.
    """
```

Constructor

```python
def __init__(
    self,
    docker_image: str = DOCKER_IMAGE,
    timeout: int = DEFAULT_TIMEOUT,
    memory_limit: str = DEFAULT_MEMORY_LIMIT,
    enable_network: bool = False,
):
```

**Parameters:**

- `docker_image` (str): Docker image to use
- `timeout` (int): Execution timeout in seconds
- `memory_limit` (str): Docker memory limit
- `enable_network` (bool): Whether to allow network access
Properties
`docker_available -> bool`
Check if Docker is available on the system.
**Returns:**
`bool`: `True` if Docker is installed and running
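A typical availability probe checks for the CLI and pings the daemon. A minimal sketch of how such a check could work; this is an assumption about the implementation, not the property's actual code:

```python
import shutil
import subprocess


def docker_available() -> bool:
    """Return True if the docker CLI exists and the daemon responds."""
    if shutil.which("docker") is None:
        return False  # CLI not installed
    try:
        # `docker info` exits non-zero unless the daemon is reachable
        result = subprocess.run(["docker", "info"], capture_output=True, timeout=5)
        return result.returncode == 0
    except (subprocess.TimeoutExpired, OSError):
        return False
```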
Methods
`async execute_raw_python(tenant_id: str, code: str, input_params: dict[str, Any] | None = None, timeout: int | None = None, safety_level: str = "MEDIUM_RISK", **kwargs) -> dict[str, Any]`
Execute Python code in an isolated environment.
**Parameters:**
- `tenant_id` (str): Tenant ID for tracking
- `code` (str): Python code to execute
- `input_params` (dict | None): Input parameters passed as JSON
- `timeout` (int | None): Override default timeout
- `safety_level` (str): Ignored in upstream (used by SaaS)
**Returns:**
```python
{
    "status": "success" | "failed",
    "output": str,
    "execution_seconds": float,
    "environment": "docker" | "subprocess",
}
```

**Behavior:**
- Uses Docker if available
- Falls back to subprocess isolation
- Enforces timeout
- Returns execution time and output
**Security:**
- Ephemeral containers (destroyed after execution)
- No network access (unless `enable_network=True`)
- Memory limit (default 256MB)
- Read-only filesystem with tmpfs for `/tmp`
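These guarantees map directly onto standard `docker run` flags. A sketch of how the argument list might be assembled — the helper name is hypothetical, but the flags themselves are real Docker options:

```python
def build_docker_args(image: str, memory_limit: str = "256m",
                      enable_network: bool = False) -> list[str]:
    """Assemble docker-run flags mirroring the security posture above."""
    args = [
        "docker", "run", "--rm",       # ephemeral: container removed on exit
        "--memory", memory_limit,      # enforce memory cap
        "--read-only",                 # read-only root filesystem
        "--tmpfs", "/tmp",             # writable scratch space only
    ]
    if not enable_network:
        args += ["--network", "none"]  # disable networking by default
    return args + [image, "python", "-"]
```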
---
Type Aliases
```python
EventHandler = Callable[..., Coroutine[Any, Any, None]]
```

Async event handler function type.
---
Usage Examples
Basic Memento-Skills Workflow
```python
from core.auto_dev.memento_engine import MementoEngine

engine = MementoEngine(db)

# Analyze failed episode
analysis = await engine.analyze_episode("episode-123")

# Generate skill candidate
candidate = await engine.generate_skill_candidate(
    tenant_id="tenant-456",
    agent_id="agent-789",
    episode_id="episode-123",
)

# Validate in sandbox
result = await engine.validate_candidate(
    candidate_id=candidate.id,
    tenant_id="tenant-456",
)

# Promote if validated
if result["passed"]:
    await engine.promote_skill(candidate.id, "tenant-456")
```

Basic AlphaEvolver Workflow
```python
from core.auto_dev.alpha_evolver_engine import AlphaEvolverEngine

engine = AlphaEvolverEngine(db)

# Run research experiment
results = await engine.run_research_experiment(
    tenant_id="tenant-456",
    base_code=original_code,
    research_goal="Optimize for speed",
    iterations=3,
)

# Check results
for result in results:
    print(f"Iteration {result['iteration']}: {result['success']}")
```

Event Subscription
```python
from core.auto_dev.event_hooks import event_bus, TaskEvent

@event_bus.on_task_fail
async def handle_failure(event: TaskEvent):
    print(f"Task failed: {event.task_description}")
    # Trigger learning loop...
```

---
See Also
- AUTO_DEV_USER_GUIDE.md - End-user guide
- AUTO_DEV_DEVELOPER_GUIDE.md - Developer guide
- AUTO_DEV_EVENT_PROTOCOL.md - Event protocol
- AUTO_DEV_ARCHITECTURE.md - Architecture diagrams