ATOM Documentation

← Back to App

Auto-Dev Event Protocol

Complete specification of Auto-Dev event types, subscription patterns, event flow, and handler best practices.

**Version:** 1.0.0

**Last Updated:** 2026-04-10

---

Table of Contents

---

Event Types

Auto-Dev defines two primary event types for lifecycle communication:

TaskEvent

Emitted when an agent task completes (success or failure).

@dataclass
class TaskEvent:
    """Event payload for task lifecycle events."""

    episode_id: str
    agent_id: str
    tenant_id: str
    task_description: str = ""
    error_trace: str | None = None
    outcome: str = ""  # "success", "failure", "partial"
    metadata: dict[str, Any] = field(default_factory=dict)

**Field Specifications:**

FieldTypeRequiredDescription
episode_idstrYesUnique episode identifier (UUID)
agent_idstrYesAgent that executed the task
tenant_idstrYesTenant/workspace identifier
task_descriptionstrNoHuman-readable task description
error_trace`str \None`NoStack trace if task failed
outcomestrNoTask outcome: "success", "failure", or "partial"
metadatadictNoAdditional event metadata

**When Emitted:**

  • emit_task_fail(): Agent task fails (exception, timeout, validation error)
  • emit_task_success(): Agent task completes successfully

**Example Payload:**

{
  "episode_id": "550e8400-e29b-41d4-a716-446655440000",
  "agent_id": "agent-123",
  "tenant_id": "tenant-456",
  "task_description": "Extract invoice ID from customer email",
  "error_trace": "KeyError: 'invoice_id'\\n  File \\\"process.py\\\", line 42",
  "outcome": "failure",
  "metadata": {
    "tool_calls": ["extract_email", "parse_invoice"],
    "execution_seconds": 3.2,
    "token_usage": 1250
  }
}

SkillExecutionEvent

Emitted after a skill is executed in the sandbox.

@dataclass
class SkillExecutionEvent:
    """Event payload for skill execution events."""

    execution_id: str
    agent_id: str
    tenant_id: str
    skill_id: str
    skill_name: str = ""
    execution_seconds: float = 0.0
    token_usage: int = 0
    success: bool = False
    output: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)

**Field Specifications:**

FieldTypeRequiredDescription
execution_idstrYesUnique execution identifier (UUID)
agent_idstrYesAgent that executed the skill
tenant_idstrYesTenant/workspace identifier
skill_idstrYesSkill identifier
skill_namestrNoHuman-readable skill name
execution_secondsfloatNoExecution time in seconds
token_usageintNoLLM tokens consumed
successboolNoWhether execution succeeded
outputstrNoExecution output or error message
metadatadictNoAdditional execution metadata

**When Emitted:**

  • emit_skill_execution(): After skill execution completes (success or failure)

**Example Payload:**

{
  "execution_id": "650e8400-e29b-41d4-a716-446655440000",
  "agent_id": "agent-123",
  "tenant_id": "tenant-456",
  "skill_id": "skill-789",
  "skill_name": "extract_invoice_id",
  "execution_seconds": 1.8,
  "token_usage": 450,
  "success": true,
  "output": "INV-12345",
  "metadata": {
    "sandbox_environment": "docker",
    "memory_used_mb": 64,
    "cpu_time_ms": 150
  }
}

---

Event Flow

Task Failure Flow

┌─────────────────┐
│ Agent Executes  │
│     Task        │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Task Fails     │
│  (Exception)    │
└────────┬────────┘
         │
         ▼
┌─────────────────────────────┐
│  EpisodeService Records     │
│  Failed Episode             │
└────────┬────────────────────┘
         │
         ▼
┌─────────────────────────────┐
│  event_bus.emit_task_fail() │
│  (TaskEvent)                │
└────────┬────────────────────┘
         │
         ▼
┌─────────────────────────────────────────┐
│  EventBus Dispatches to Handlers        │
│  ├─ ReflectionEngine.process_failure() │
│  ├─ Custom handler 1                    │
│  └─ Custom handler 2                    │
└─────────────────────────────────────────┘

Task Success Flow

┌─────────────────┐
│ Agent Executes  │
│     Task        │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Task Succeeds  │
└────────┬────────┘
         │
         ▼
┌───────────────────────────────┐
│  EpisodeService Records       │
│  Successful Episode           │
└────────┬──────────────────────┘
         │
         ▼
┌──────────────────────────────────┐
│  event_bus.emit_task_success()   │
│  (TaskEvent)                     │
└────────┬─────────────────────────┘
         │
         ▼
┌─────────────────────────────────────────┐
│  EventBus Dispatches to Handlers        │
│  ├─ AlphaEvolver analysis (optional)   │
│  ├─ Analytics handler                  │
│  └─ Custom handler 1                   │
└─────────────────────────────────────────┘

Skill Execution Flow

┌─────────────────┐
│ SandboxExecutor │
│ Executes Skill  │
└────────┬────────┘
         │
         ▼
┌─────────────────────────────┐
│  Skill Completes/Fails      │
└────────┬────────────────────┘
         │
         ▼
┌──────────────────────────────────────┐
│  event_bus.emit_skill_execution()    │
│  (SkillExecutionEvent)               │
└────────┬─────────────────────────────┘
         │
         ▼
┌─────────────────────────────────────────┐
│  EventBus Dispatches to Handlers        │
│  ├─ EvolutionEngine.process_execution() │
│  ├─ Performance tracker                 │
│  └─ Custom handler 1                    │
└─────────────────────────────────────────┘

Event Dispatch with Error Isolation

event_bus.emit_task_fail(TaskEvent(...))
         │
         ▼
┌─────────────────────────┐
│  _dispatch()            │
│  ├─ Handler 1.execute() │
│  │  └─ ✅ Success       │
│  ├─ Handler 2.execute() │
│  │  └─ ❌ Exception     │
│  │  └─ Logged, continue │
│  └─ Handler 3.execute() │
│     └─ ✅ Success       │
└─────────────────────────┘

**Key Point:** Exceptions in handlers don't prevent other handlers from executing.

---

Subscription Patterns

Decorator Registration

Use decorators to register event handlers:

from core.auto_dev.event_hooks import event_bus, TaskEvent, SkillExecutionEvent

@event_bus.on_task_fail
async def handle_failure(event: TaskEvent):
    """Handle task failure events."""
    logger.info(f"Task failed: {event.task_description}")
    # Your logic here

@event_bus.on_task_success
async def handle_success(event: TaskEvent):
    """Handle task success events."""
    logger.info(f"Task succeeded: {event.task_description}")
    # Your logic here

@event_bus.on_skill_execution
async def handle_skill_execution(event: SkillExecutionEvent):
    """Handle skill execution events."""
    logger.info(f"Skill {event.skill_name} executed in {event.execution_seconds}s")
    # Your logic here

Handler Signature

Event handlers must be async functions with the correct signature:

# TaskEvent handlers
async def handler(event: TaskEvent) -> None:
    """Handle task events."""
    pass

# SkillExecutionEvent handlers
async def handler(event: SkillExecutionEvent) -> None:
    """Handle skill execution events."""
    pass

Multiple Subscribers

Multiple handlers can subscribe to the same event type:

@event_bus.on_task_fail
async def log_failure(event: TaskEvent):
    """Log all failures."""
    logger.error(f"Failure: {event.task_description}")

@event_bus.on_task_fail
async def track_failure(event: TaskEvent):
    """Track failure metrics."""
    metrics.increment("task_failures")

@event_bus.on_task_fail
async def trigger_memento(event: TaskEvent):
    """Trigger Memento-Skills."""
    if should_trigger(event):
        await generate_skill_candidate(event)

# All three handlers execute on each task_fail event

Conditional Handling

Implement conditional logic within handlers:

@event_bus.on_task_fail
async def selective_handler(event: TaskEvent):
    """Handle only specific failures."""
    # Filter by agent
    if event.agent_id != "target-agent":
        return

    # Filter by error type
    if "KeyError" not in (event.error_trace or ""):
        return

    # Process specific failure
    await handle_keyerror_failure(event)

---

Event Payloads

Required Fields

Both event types have required fields that must be provided:

**TaskEvent Required:**

  • episode_id
  • agent_id
  • tenant_id

**SkillExecutionEvent Required:**

  • execution_id
  • agent_id
  • tenant_id
  • skill_id

Optional Fields

Optional fields provide additional context:

# TaskEvent with optional fields
event = TaskEvent(
    episode_id="episode-123",
    agent_id="agent-456",
    tenant_id="tenant-789",
    task_description="Process invoice",  # Optional
    error_trace="KeyError: ...",  # Optional
    outcome="failure",  # Optional
    metadata={  # Optional
        "tool_calls": ["extract", "parse"],
        "execution_seconds": 3.2,
    },
)

# SkillExecutionEvent with optional fields
event = SkillExecutionEvent(
    execution_id="exec-123",
    agent_id="agent-456",
    tenant_id="tenant-789",
    skill_id="skill-abc",
    skill_name="extract_invoice",  # Optional
    execution_seconds=1.8,  # Optional
    token_usage=450,  # Optional
    success=True,  # Optional
    output="INV-12345",  # Optional
    metadata={  # Optional
        "sandbox_environment": "docker",
    },
)

Metadata Dictionary

The metadata field allows custom data:

event = TaskEvent(
    episode_id="episode-123",
    agent_id="agent-456",
    tenant_id="tenant-789",
    metadata={
        # Custom fields
        "custom_field_1": "value1",
        "custom_field_2": 42,

        # Nested data
        "nested": {
            "key": "value",
        },

        # Lists
        "items": ["item1", "item2"],
    },
)

Type Hints

Event fields use type hints for validation:

from typing import Any

event = TaskEvent(
    episode_id="episode-123",  # str
    agent_id="agent-456",  # str
    tenant_id="tenant-789",  # str
    error_trace=None,  # str | None
    metadata={},  # dict[str, Any]
)

Validation Requirements

**Event ID Format:**

  • Must be valid UUID strings
  • Format: xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx

**Tenant/Agent ID Format:**

  • Typically UUID strings
  • Can be application-specific identifiers

**Outcome Values (TaskEvent):**

  • "success": Task completed successfully
  • "failure": Task failed with exception
  • "partial": Task partially completed

---

Handler Best Practices

1. Keep Handlers Lightweight

Handlers should execute quickly to avoid blocking event dispatch:

# ✅ Good: Fast logging
@event_bus.on_task_fail
async def log_failure(event: TaskEvent):
    """Log failure quickly."""
    logger.info(f"Task failed: {event.task_description}")

# ❌ Bad: Blocking operation
@event_bus.on_task_fail
async def slow_handler(event: TaskEvent):
    """Blocking operation delays all handlers."""
    result = await slow_database_query(event.episode_id)  # Takes seconds

**Solution:** Offload heavy work to background tasks:

# ✅ Good: Offload to background
@event_bus.on_task_fail
async def queue_background_work(event: TaskEvent):
    """Queue work for background processing."""
    await background_queue.enqueue(event.episode_id)

2. Use Async/Await Properly

Handlers must be async and use await correctly:

# ✅ Good: Proper async
@event_bus.on_task_fail
async def proper_handler(event: TaskEvent):
    """Proper async handler."""
    result = await async_operation(event.episode_id)
    await another_async_operation(result)

# ❌ Bad: Missing await
@event_bus.on_task_fail
async def improper_handler(event: TaskEvent):
    """Missing await causes race conditions."""
    async_operation(event.episode_id)  # Not awaited!

3. Handle Exceptions Gracefully

Wrap handler logic in try-except:

# ✅ Good: Exception handling
@event_bus.on_task_fail
async def safe_handler(event: TaskEvent):
    """Safe handler with exception handling."""
    try:
        result = await risky_operation(event)
        await process_result(result)
    except Exception as e:
        logger.error(f"Handler failed: {e}", exc_info=True)
        # Event bus continues to other handlers

# ❌ Bad: Unhandled exceptions
@event_bus.on_task_fail
async def unsafe_handler(event: TaskEvent):
    """Unhandled exception logged by event bus."""
    result = await risky_operation(event)  # May raise
    process_result(result)  # May raise

**Note:** Event bus catches exceptions automatically, but explicit handling provides better logging.

4. Avoid Blocking Operations

Don't perform blocking I/O in handlers:

# ❌ Bad: Blocking HTTP call
@event_bus.on_task_fail
async def blocking_handler(event: TaskEvent):
    """Blocking HTTP call delays event dispatch."""
    import requests
    response = requests.post(url, json=event_data)  # Blocks!

# ✅ Good: Async HTTP call
@event_bus.on_task_fail
async def nonblocking_handler(event: TaskEvent):
    """Async HTTP call doesn't block."""
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=event_data)

5. Use Queues for Heavy Processing

For heavy processing, use message queues:

# ✅ Good: Queue for heavy processing
@event_bus.on_task_fail
async def queue_heavy_work(event: TaskEvent):
    """Queue heavy work for background processing."""
    await task_queue.enqueue(
        "process_failure",
        {"episode_id": event.episode_id},
    )

**Example queue implementations:**

  • Celery
  • RQ (Redis Queue)
  • AWS SQS
  • RabbitMQ

6. Log Handler Entry/Exit

Add logging for debugging:

# ✅ Good: Structured logging
@event_bus.on_task_fail
async def logged_handler(event: TaskEvent):
    """Handler with structured logging."""
    logger.info(
        "handler_started",
        handler="logged_handler",
        episode_id=event.episode_id,
    )

    try:
        result = await process_event(event)
        logger.info(
            "handler_completed",
            handler="logged_handler",
            episode_id=event.episode_id,
        )
        return result
    except Exception as e:
        logger.error(
            "handler_failed",
            handler="logged_handler",
            episode_id=event.episode_id,
            error=str(e),
            exc_info=True,
        )
        raise

---

Event Bus Internals

In-Memory Singleton

EventBus is a singleton for in-process communication:

# Global singleton
event_bus = EventBus()

# Access from anywhere
from core.auto_dev.event_hooks import event_bus

@event_bus.on_task_fail
async def handler(event: TaskEvent):
    pass

**Implications:**

  • Events only delivered within same process
  • No cross-process communication
  • No persistence across restarts

Handler List Management

EventBus maintains separate handler lists:

class EventBus:
    def __init__(self):
        self._fail_handlers: list[EventHandler] = []
        self._success_handlers: list[EventHandler] = []
        self._skill_handlers: list[EventHandler] = []

**Registration:**

@event_bus.on_task_fail
async def handler(event: TaskEvent):
    pass

# Equivalent to:
event_bus._fail_handlers.append(handler)

Dispatch Loop with Error Isolation

Event dispatch uses error isolation:

async def _dispatch(
    self, handlers: list[EventHandler], event: Any, event_name: str
) -> None:
    """Dispatch event to all handlers, catching exceptions."""
    if not handlers:
        return

    for handler in handlers:
        try:
            await handler(event)
        except Exception as e:
            logger.error(
                f"Auto-Dev event handler error in {event_name} "
                f"(handler={handler.__name__}): {e}",
                exc_info=True,
            )

**Behavior:**

  • All handlers execute even if some fail
  • Exceptions logged with handler name
  • Event dispatch continues after exceptions

Clear Method for Testing

Use clear() to remove all handlers (testing only):

def test_event_handler():
    """Test event handler."""
    event_bus.clear()  # Remove existing handlers

    @event_bus.on_task_fail
    async def test_handler(event: TaskEvent):
        pass

    # Run test
    await event_bus.emit_task_fail(TaskEvent(...))

    # Cleanup
    event_bus.clear()

---

Integration Points

EpisodeService Integration

EpisodeService emits task events:

from core.auto_dev.event_hooks import event_bus, TaskEvent

class EpisodeService:
    async def record_episode(self, episode_data: dict):
        """Record episode and emit event."""
        episode = Episode(**episode_data)
        self.db.add(episode)
        self.db.commit()

        # Emit event
        if episode.success:
            await event_bus.emit_task_success(TaskEvent(
                episode_id=episode.id,
                agent_id=episode.agent_id,
                tenant_id=episode.user_id,
                task_description=episode.task_description,
                outcome="success",
            ))
        else:
            await event_bus.emit_task_fail(TaskEvent(
                episode_id=episode.id,
                agent_id=episode.agent_id,
                tenant_id=episode.user_id,
                task_description=episode.task_description,
                error_trace=episode.error_trace,
                outcome="failure",
            ))

SandboxExecutor Integration

SandboxExecutor emits skill execution events:

from core.auto_dev.event_hooks import event_bus, SkillExecutionEvent

class SandboxExecutor:
    async def execute_skill(
        self,
        skill_id: str,
        agent_id: str,
        tenant_id: str,
        code: str,
    ):
        """Execute skill and emit event."""
        start_time = time.monotonic()

        try:
            result = await self._execute_code(code, tenant_id)

            execution_seconds = time.monotonic() - start_time

            # Emit event
            await event_bus.emit_skill_execution(SkillExecutionEvent(
                execution_id=str(uuid.uuid4()),
                agent_id=agent_id,
                tenant_id=tenant_id,
                skill_id=skill_id,
                skill_name=skill.name,
                execution_seconds=execution_seconds,
                token_usage=result.get("token_usage", 0),
                success=True,
                output=result.get("output", ""),
            ))

            return result

        except Exception as e:
            execution_seconds = time.monotonic() - start_time

            # Emit failure event
            await event_bus.emit_skill_execution(SkillExecutionEvent(
                execution_id=str(uuid.uuid4()),
                agent_id=agent_id,
                tenant_id=tenant_id,
                skill_id=skill_id,
                skill_name=skill.name,
                execution_seconds=execution_seconds,
                token_usage=0,
                success=False,
                output=str(e),
            ))

            raise

Custom Event Sources

Create custom event sources:

from core.auto_dev.event_hooks import event_bus, TaskEvent

class CustomEventSource:
    """Custom event source for external systems."""

    async def on_external_failure(
        self,
        external_system: str,
        error_data: dict,
    ):
        """Handle external system failure."""
        # Map external data to TaskEvent
        event = TaskEvent(
            episode_id=str(uuid.uuid4()),
            agent_id=error_data.get("agent_id", "unknown"),
            tenant_id=error_data.get("tenant_id", "unknown"),
            task_description=f"{external_system} failure",
            error_trace=error_data.get("error"),
            outcome="failure",
            metadata={
                "external_system": external_system,
                "external_error_code": error_data.get("code"),
            },
        )

        # Emit to Auto-Dev handlers
        await event_bus.emit_task_fail(event)

Event Aggregation Patterns

Aggregate events over time windows:

from collections import defaultdict
from datetime import datetime, timedelta, timezone

class EventAggregator:
    """Aggregate events for batch processing."""

    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        self._events: dict[str, list[TaskEvent]] = defaultdict(list)

    @event_bus.on_task_fail
    async def aggregate_failures(self, event: TaskEvent):
        """Aggregate failures over time window."""
        agent_id = event.agent_id
        self._events[agent_id].append(event)

        # Process if window full
        if self._should_process(agent_id):
            await self._process_window(agent_id)

    def _should_process(self, agent_id: str) -> bool:
        """Check if time window is full."""
        events = self._events[agent_id]

        if len(events) < 5:
            return False

        # Check time window
        oldest = min(
            e.metadata.get("timestamp", datetime.now(timezone.utc))
            for e in events
        )
        window_start = datetime.now(timezone.utc) - timedelta(
            seconds=self.window_seconds
        )

        return oldest < window_start

    async def _process_window(self, agent_id: str):
        """Process aggregated events."""
        events = self._events[agent_id]

        # Batch process
        await self._batch_analyze(agent_id, events)

        # Clear processed events
        self._events[agent_id].clear()

---

See Also