# Auto-Dev Event Protocol

Complete specification of Auto-Dev event types, subscription patterns, event flow, and handler best practices.

**Version:** 1.0.0
**Last Updated:** 2026-04-10
---
## Table of Contents
- Event Types
- Event Flow
- Subscription Patterns
- Event Payloads
- Handler Best Practices
- Event Bus Internals
- Integration Points
---
## Event Types
Auto-Dev defines two primary event types for lifecycle communication:
### TaskEvent
Emitted when an agent task completes (success or failure).
```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TaskEvent:
    """Event payload for task lifecycle events."""

    episode_id: str
    agent_id: str
    tenant_id: str
    task_description: str = ""
    error_trace: str | None = None
    outcome: str = ""  # "success", "failure", "partial"
    metadata: dict[str, Any] = field(default_factory=dict)
```

**Field Specifications:**
| Field | Type | Required | Description |
|---|---|---|---|
| episode_id | str | Yes | Unique episode identifier (UUID) |
| agent_id | str | Yes | Agent that executed the task |
| tenant_id | str | Yes | Tenant/workspace identifier |
| task_description | str | No | Human-readable task description |
| error_trace | `str \| None` | No | Stack trace if task failed |
| outcome | str | No | Task outcome: "success", "failure", or "partial" |
| metadata | dict | No | Additional event metadata |
**When Emitted:**
- `emit_task_fail()`: Agent task fails (exception, timeout, validation error)
- `emit_task_success()`: Agent task completes successfully
**Example Payload:**
```json
{
  "episode_id": "550e8400-e29b-41d4-a716-446655440000",
  "agent_id": "agent-123",
  "tenant_id": "tenant-456",
  "task_description": "Extract invoice ID from customer email",
  "error_trace": "KeyError: 'invoice_id'\n  File \"process.py\", line 42",
  "outcome": "failure",
  "metadata": {
    "tool_calls": ["extract_email", "parse_invoice"],
    "execution_seconds": 3.2,
    "token_usage": 1250
  }
}
```

### SkillExecutionEvent
Emitted after a skill is executed in the sandbox.
```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SkillExecutionEvent:
    """Event payload for skill execution events."""

    execution_id: str
    agent_id: str
    tenant_id: str
    skill_id: str
    skill_name: str = ""
    execution_seconds: float = 0.0
    token_usage: int = 0
    success: bool = False
    output: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)
```

**Field Specifications:**
| Field | Type | Required | Description |
|---|---|---|---|
| execution_id | str | Yes | Unique execution identifier (UUID) |
| agent_id | str | Yes | Agent that executed the skill |
| tenant_id | str | Yes | Tenant/workspace identifier |
| skill_id | str | Yes | Skill identifier |
| skill_name | str | No | Human-readable skill name |
| execution_seconds | float | No | Execution time in seconds |
| token_usage | int | No | LLM tokens consumed |
| success | bool | No | Whether execution succeeded |
| output | str | No | Execution output or error message |
| metadata | dict | No | Additional execution metadata |
**When Emitted:**
- `emit_skill_execution()`: After skill execution completes (success or failure)
**Example Payload:**
```json
{
  "execution_id": "650e8400-e29b-41d4-a716-446655440000",
  "agent_id": "agent-123",
  "tenant_id": "tenant-456",
  "skill_id": "skill-789",
  "skill_name": "extract_invoice_id",
  "execution_seconds": 1.8,
  "token_usage": 450,
  "success": true,
  "output": "INV-12345",
  "metadata": {
    "sandbox_environment": "docker",
    "memory_used_mb": 64,
    "cpu_time_ms": 150
  }
}
```

---
## Event Flow

### Task Failure Flow
```
┌─────────────────┐
│ Agent Executes  │
│ Task            │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ Task Fails      │
│ (Exception)     │
└────────┬────────┘
         │
         ▼
┌─────────────────────────────┐
│ EpisodeService Records      │
│ Failed Episode              │
└────────┬────────────────────┘
         │
         ▼
┌─────────────────────────────┐
│ event_bus.emit_task_fail()  │
│ (TaskEvent)                 │
└────────┬────────────────────┘
         │
         ▼
┌─────────────────────────────────────────┐
│ EventBus Dispatches to Handlers         │
│  ├─ ReflectionEngine.process_failure()  │
│  ├─ Custom handler 1                    │
│  └─ Custom handler 2                    │
└─────────────────────────────────────────┘
```

### Task Success Flow
```
┌─────────────────┐
│ Agent Executes  │
│ Task            │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ Task Succeeds   │
└────────┬────────┘
         │
         ▼
┌───────────────────────────────┐
│ EpisodeService Records        │
│ Successful Episode            │
└────────┬──────────────────────┘
         │
         ▼
┌──────────────────────────────────┐
│ event_bus.emit_task_success()    │
│ (TaskEvent)                      │
└────────┬─────────────────────────┘
         │
         ▼
┌─────────────────────────────────────────┐
│ EventBus Dispatches to Handlers         │
│  ├─ AlphaEvolver analysis (optional)    │
│  ├─ Analytics handler                   │
│  └─ Custom handler 1                    │
└─────────────────────────────────────────┘
```

### Skill Execution Flow
```
┌─────────────────┐
│ SandboxExecutor │
│ Executes Skill  │
└────────┬────────┘
         │
         ▼
┌─────────────────────────────┐
│ Skill Completes/Fails       │
└────────┬────────────────────┘
         │
         ▼
┌──────────────────────────────────────┐
│ event_bus.emit_skill_execution()     │
│ (SkillExecutionEvent)                │
└────────┬─────────────────────────────┘
         │
         ▼
┌─────────────────────────────────────────┐
│ EventBus Dispatches to Handlers         │
│  ├─ EvolutionEngine.process_execution() │
│  ├─ Performance tracker                 │
│  └─ Custom handler 1                    │
└─────────────────────────────────────────┘
```

### Event Dispatch with Error Isolation
```
event_bus.emit_task_fail(TaskEvent(...))
              │
              ▼
┌──────────────────────────────┐
│ _dispatch()                  │
│  ├─ Handler 1.execute()      │
│  │   └─ ✅ Success           │
│  ├─ Handler 2.execute()      │
│  │   └─ ❌ Exception         │
│  │       └─ Logged, continue │
│  └─ Handler 3.execute()      │
│      └─ ✅ Success           │
└──────────────────────────────┘
```

**Key Point:** Exceptions in handlers don't prevent other handlers from executing.
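This isolation can be demonstrated with a toy dispatcher that mirrors the `_dispatch()` loop described in Event Bus Internals. `MiniBus` below is illustrative only, not the real `EventBus` implementation:

```python
import asyncio

# Hypothetical minimal bus mirroring the documented dispatch loop
class MiniBus:
    def __init__(self):
        self.handlers = []

    async def dispatch(self, event):
        for handler in self.handlers:
            try:
                await handler(event)
            except Exception as e:
                # The real bus logs the error; here we just note it and continue
                print(f"handler error: {e}")

calls = []

async def ok_one(event):
    calls.append("ok_one")

async def boom(event):
    raise RuntimeError("simulated handler failure")

async def ok_two(event):
    calls.append("ok_two")

bus = MiniBus()
bus.handlers = [ok_one, boom, ok_two]
asyncio.run(bus.dispatch({"episode_id": "demo"}))

# Both surviving handlers ran despite the failure between them
assert calls == ["ok_one", "ok_two"]
```

The middle handler raises, yet the third still executes: exactly the error-isolation guarantee stated above.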
---
## Subscription Patterns

### Decorator Registration
Use decorators to register event handlers:
```python
import logging

from core.auto_dev.event_hooks import event_bus, TaskEvent, SkillExecutionEvent

logger = logging.getLogger(__name__)


@event_bus.on_task_fail
async def handle_failure(event: TaskEvent):
    """Handle task failure events."""
    logger.info(f"Task failed: {event.task_description}")
    # Your logic here


@event_bus.on_task_success
async def handle_success(event: TaskEvent):
    """Handle task success events."""
    logger.info(f"Task succeeded: {event.task_description}")
    # Your logic here


@event_bus.on_skill_execution
async def handle_skill_execution(event: SkillExecutionEvent):
    """Handle skill execution events."""
    logger.info(f"Skill {event.skill_name} executed in {event.execution_seconds}s")
    # Your logic here
```

### Handler Signature
Event handlers must be async functions with the correct signature:
```python
# TaskEvent handlers
async def handler(event: TaskEvent) -> None:
    """Handle task events."""
    pass


# SkillExecutionEvent handlers
async def handler(event: SkillExecutionEvent) -> None:
    """Handle skill execution events."""
    pass
```

### Multiple Subscribers
Multiple handlers can subscribe to the same event type:
```python
@event_bus.on_task_fail
async def log_failure(event: TaskEvent):
    """Log all failures."""
    logger.error(f"Failure: {event.task_description}")


@event_bus.on_task_fail
async def track_failure(event: TaskEvent):
    """Track failure metrics."""
    metrics.increment("task_failures")


@event_bus.on_task_fail
async def trigger_memento(event: TaskEvent):
    """Trigger Memento-Skills."""
    if should_trigger(event):
        await generate_skill_candidate(event)


# All three handlers execute on each task_fail event
```

### Conditional Handling
Implement conditional logic within handlers:
```python
@event_bus.on_task_fail
async def selective_handler(event: TaskEvent):
    """Handle only specific failures."""
    # Filter by agent
    if event.agent_id != "target-agent":
        return

    # Filter by error type
    if "KeyError" not in (event.error_trace or ""):
        return

    # Process specific failure
    await handle_keyerror_failure(event)
```

---
## Event Payloads

### Required Fields
Both event types have required fields that must be provided:
**TaskEvent Required:**

- `episode_id`
- `agent_id`
- `tenant_id`

**SkillExecutionEvent Required:**

- `execution_id`
- `agent_id`
- `tenant_id`
- `skill_id`
### Optional Fields
Optional fields provide additional context:
```python
# TaskEvent with optional fields
event = TaskEvent(
    episode_id="episode-123",
    agent_id="agent-456",
    tenant_id="tenant-789",
    task_description="Process invoice",  # Optional
    error_trace="KeyError: ...",         # Optional
    outcome="failure",                   # Optional
    metadata={                           # Optional
        "tool_calls": ["extract", "parse"],
        "execution_seconds": 3.2,
    },
)

# SkillExecutionEvent with optional fields
event = SkillExecutionEvent(
    execution_id="exec-123",
    agent_id="agent-456",
    tenant_id="tenant-789",
    skill_id="skill-abc",
    skill_name="extract_invoice",  # Optional
    execution_seconds=1.8,         # Optional
    token_usage=450,               # Optional
    success=True,                  # Optional
    output="INV-12345",            # Optional
    metadata={                     # Optional
        "sandbox_environment": "docker",
    },
)
```

### Metadata Dictionary
The metadata field allows custom data:
```python
event = TaskEvent(
    episode_id="episode-123",
    agent_id="agent-456",
    tenant_id="tenant-789",
    metadata={
        # Custom fields
        "custom_field_1": "value1",
        "custom_field_2": 42,
        # Nested data
        "nested": {
            "key": "value",
        },
        # Lists
        "items": ["item1", "item2"],
    },
)
```

### Type Hints
Event fields use type hints for validation:
```python
from typing import Any

event = TaskEvent(
    episode_id="episode-123",  # str
    agent_id="agent-456",      # str
    tenant_id="tenant-789",    # str
    error_trace=None,          # str | None
    metadata={},               # dict[str, Any]
)
```

### Validation Requirements
**Event ID Format:**
- Must be valid UUID strings
- Format: `xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx`
**Tenant/Agent ID Format:**
- Typically UUID strings
- Can be application-specific identifiers
**Outcome Values (TaskEvent):**
"success": Task completed successfully"failure": Task failed with exception"partial": Task partially completed
---
## Handler Best Practices

### 1. Keep Handlers Lightweight
Handlers should execute quickly to avoid blocking event dispatch:
```python
# ✅ Good: Fast logging
@event_bus.on_task_fail
async def log_failure(event: TaskEvent):
    """Log failure quickly."""
    logger.info(f"Task failed: {event.task_description}")


# ❌ Bad: Blocking operation
@event_bus.on_task_fail
async def slow_handler(event: TaskEvent):
    """Blocking operation delays all handlers."""
    result = await slow_database_query(event.episode_id)  # Takes seconds
```

**Solution:** Offload heavy work to background tasks:

```python
# ✅ Good: Offload to background
@event_bus.on_task_fail
async def queue_background_work(event: TaskEvent):
    """Queue work for background processing."""
    await background_queue.enqueue(event.episode_id)
```

### 2. Use Async/Await Properly
Handlers must be async and use await correctly:
```python
# ✅ Good: Proper async
@event_bus.on_task_fail
async def proper_handler(event: TaskEvent):
    """Proper async handler."""
    result = await async_operation(event.episode_id)
    await another_async_operation(result)


# ❌ Bad: Missing await
@event_bus.on_task_fail
async def improper_handler(event: TaskEvent):
    """Missing await causes race conditions."""
    async_operation(event.episode_id)  # Not awaited!
```

### 3. Handle Exceptions Gracefully
Wrap handler logic in try-except:
```python
# ✅ Good: Exception handling
@event_bus.on_task_fail
async def safe_handler(event: TaskEvent):
    """Safe handler with exception handling."""
    try:
        result = await risky_operation(event)
        await process_result(result)
    except Exception as e:
        logger.error(f"Handler failed: {e}", exc_info=True)
        # Event bus continues to other handlers


# ❌ Bad: Unhandled exceptions
@event_bus.on_task_fail
async def unsafe_handler(event: TaskEvent):
    """Unhandled exception logged by event bus."""
    result = await risky_operation(event)  # May raise
    process_result(result)  # May raise
```

**Note:** The event bus catches exceptions automatically, but explicit handling provides better logging.
### 4. Avoid Blocking Operations
Don't perform blocking I/O in handlers:
```python
# ❌ Bad: Blocking HTTP call
@event_bus.on_task_fail
async def blocking_handler(event: TaskEvent):
    """Blocking HTTP call delays event dispatch."""
    import requests

    response = requests.post(url, json=event_data)  # Blocks!


# ✅ Good: Async HTTP call
@event_bus.on_task_fail
async def nonblocking_handler(event: TaskEvent):
    """Async HTTP call doesn't block."""
    import httpx

    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=event_data)
```

### 5. Use Queues for Heavy Processing
For heavy processing, use message queues:
```python
# ✅ Good: Queue for heavy processing
@event_bus.on_task_fail
async def queue_heavy_work(event: TaskEvent):
    """Queue heavy work for background processing."""
    await task_queue.enqueue(
        "process_failure",
        {"episode_id": event.episode_id},
    )
```

**Example queue implementations:**
- Celery
- RQ (Redis Queue)
- AWS SQS
- RabbitMQ
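Any of these brokers works; as a minimal, broker-free illustration of the same pattern, the sketch below uses only `asyncio.Queue` (the names `queue_heavy_work` and `worker` are illustrative, not Auto-Dev APIs):

```python
import asyncio

processed = []

async def queue_heavy_work(queue: asyncio.Queue, episode_id: str) -> None:
    """Handler side: enqueue and return immediately."""
    await queue.put(episode_id)

async def worker(queue: asyncio.Queue) -> None:
    """Background task: drain the queue and do the heavy lifting."""
    while True:
        episode_id = await queue.get()
        processed.append(episode_id)  # stand-in for slow analysis
        queue.task_done()

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(worker(queue))
    await queue_heavy_work(queue, "episode-123")
    await queue_heavy_work(queue, "episode-456")
    await queue.join()  # wait until the worker has drained the queue
    task.cancel()

asyncio.run(main())
assert processed == ["episode-123", "episode-456"]
```

The handler only pays the cost of a `put()`; the worker absorbs the slow processing off the dispatch path, which is the same division of labor an external broker provides across processes.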
### 6. Log Handler Entry/Exit
Add logging for debugging:
```python
# ✅ Good: Structured logging
@event_bus.on_task_fail
async def logged_handler(event: TaskEvent):
    """Handler with structured logging."""
    logger.info(
        "handler_started",
        handler="logged_handler",
        episode_id=event.episode_id,
    )
    try:
        result = await process_event(event)
        logger.info(
            "handler_completed",
            handler="logged_handler",
            episode_id=event.episode_id,
        )
        return result
    except Exception as e:
        logger.error(
            "handler_failed",
            handler="logged_handler",
            episode_id=event.episode_id,
            error=str(e),
            exc_info=True,
        )
        raise
```

---
## Event Bus Internals

### In-Memory Singleton
EventBus is a singleton for in-process communication:
```python
# Global singleton
event_bus = EventBus()

# Access from anywhere
from core.auto_dev.event_hooks import event_bus


@event_bus.on_task_fail
async def handler(event: TaskEvent):
    pass
```

**Implications:**
- Events only delivered within same process
- No cross-process communication
- No persistence across restarts
### Handler List Management
EventBus maintains separate handler lists:
```python
class EventBus:
    def __init__(self):
        self._fail_handlers: list[EventHandler] = []
        self._success_handlers: list[EventHandler] = []
        self._skill_handlers: list[EventHandler] = []
```

**Registration:**

```python
@event_bus.on_task_fail
async def handler(event: TaskEvent):
    pass


# Equivalent to:
event_bus._fail_handlers.append(handler)
```

### Dispatch Loop with Error Isolation
Event dispatch uses error isolation:
```python
async def _dispatch(
    self, handlers: list[EventHandler], event: Any, event_name: str
) -> None:
    """Dispatch event to all handlers, catching exceptions."""
    if not handlers:
        return
    for handler in handlers:
        try:
            await handler(event)
        except Exception as e:
            logger.error(
                f"Auto-Dev event handler error in {event_name} "
                f"(handler={handler.__name__}): {e}",
                exc_info=True,
            )
```

**Behavior:**
- All handlers execute even if some fail
- Exceptions logged with handler name
- Event dispatch continues after exceptions
### Clear Method for Testing
Use clear() to remove all handlers (testing only):
```python
async def test_event_handler():
    """Test event handler."""
    event_bus.clear()  # Remove existing handlers

    @event_bus.on_task_fail
    async def test_handler(event: TaskEvent):
        pass

    # Run test
    await event_bus.emit_task_fail(TaskEvent(...))

    # Cleanup
    event_bus.clear()
```

---
## Integration Points

### EpisodeService Integration
EpisodeService emits task events:
```python
from core.auto_dev.event_hooks import event_bus, TaskEvent


class EpisodeService:
    async def record_episode(self, episode_data: dict):
        """Record episode and emit event."""
        episode = Episode(**episode_data)
        self.db.add(episode)
        self.db.commit()

        # Emit event
        if episode.success:
            await event_bus.emit_task_success(TaskEvent(
                episode_id=episode.id,
                agent_id=episode.agent_id,
                tenant_id=episode.user_id,
                task_description=episode.task_description,
                outcome="success",
            ))
        else:
            await event_bus.emit_task_fail(TaskEvent(
                episode_id=episode.id,
                agent_id=episode.agent_id,
                tenant_id=episode.user_id,
                task_description=episode.task_description,
                error_trace=episode.error_trace,
                outcome="failure",
            ))
```

### SandboxExecutor Integration
SandboxExecutor emits skill execution events:
```python
import time
import uuid

from core.auto_dev.event_hooks import event_bus, SkillExecutionEvent


class SandboxExecutor:
    async def execute_skill(
        self,
        skill_id: str,
        agent_id: str,
        tenant_id: str,
        code: str,
        skill_name: str = "",
    ):
        """Execute skill and emit event."""
        start_time = time.monotonic()
        try:
            result = await self._execute_code(code, tenant_id)
            execution_seconds = time.monotonic() - start_time

            # Emit event
            await event_bus.emit_skill_execution(SkillExecutionEvent(
                execution_id=str(uuid.uuid4()),
                agent_id=agent_id,
                tenant_id=tenant_id,
                skill_id=skill_id,
                skill_name=skill_name,
                execution_seconds=execution_seconds,
                token_usage=result.get("token_usage", 0),
                success=True,
                output=result.get("output", ""),
            ))
            return result
        except Exception as e:
            execution_seconds = time.monotonic() - start_time

            # Emit failure event
            await event_bus.emit_skill_execution(SkillExecutionEvent(
                execution_id=str(uuid.uuid4()),
                agent_id=agent_id,
                tenant_id=tenant_id,
                skill_id=skill_id,
                skill_name=skill_name,
                execution_seconds=execution_seconds,
                token_usage=0,
                success=False,
                output=str(e),
            ))
            raise
```

### Custom Event Sources
Create custom event sources:
```python
import uuid

from core.auto_dev.event_hooks import event_bus, TaskEvent


class CustomEventSource:
    """Custom event source for external systems."""

    async def on_external_failure(
        self,
        external_system: str,
        error_data: dict,
    ):
        """Handle external system failure."""
        # Map external data to TaskEvent
        event = TaskEvent(
            episode_id=str(uuid.uuid4()),
            agent_id=error_data.get("agent_id", "unknown"),
            tenant_id=error_data.get("tenant_id", "unknown"),
            task_description=f"{external_system} failure",
            error_trace=error_data.get("error"),
            outcome="failure",
            metadata={
                "external_system": external_system,
                "external_error_code": error_data.get("code"),
            },
        )

        # Emit to Auto-Dev handlers
        await event_bus.emit_task_fail(event)
```

### Event Aggregation Patterns
Aggregate events over time windows:
```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


class EventAggregator:
    """Aggregate events for batch processing."""

    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        self._events: dict[str, list[TaskEvent]] = defaultdict(list)
        # Register the bound method as a handler; decorating an instance
        # method inside the class body would register the unbound function
        event_bus.on_task_fail(self.aggregate_failures)

    async def aggregate_failures(self, event: TaskEvent):
        """Aggregate failures over time window."""
        agent_id = event.agent_id
        self._events[agent_id].append(event)

        # Process if window full
        if self._should_process(agent_id):
            await self._process_window(agent_id)

    def _should_process(self, agent_id: str) -> bool:
        """Check if the time window is full."""
        events = self._events[agent_id]
        if len(events) < 5:
            return False

        # Check time window
        oldest = min(
            e.metadata.get("timestamp", datetime.now(timezone.utc))
            for e in events
        )
        window_start = datetime.now(timezone.utc) - timedelta(
            seconds=self.window_seconds
        )
        return oldest < window_start

    async def _process_window(self, agent_id: str):
        """Process aggregated events."""
        events = self._events[agent_id]

        # Batch process
        await self._batch_analyze(agent_id, events)

        # Clear processed events
        self._events[agent_id].clear()
```

---
## See Also
- AUTO_DEV_API_REFERENCE.md - Complete API documentation
- AUTO_DEV_DEVELOPER_GUIDE.md - Developer guide
- AUTO_DEV_USER_GUIDE.md - End-user guide
- AUTO_DEV_ARCHITECTURE.md - System architecture