Historical Sync Architecture

Backfills integration data (emails, calendar events, messages, etc.) into dual memory for AI agent context: a structured graph (Postgres GraphNode/GraphEdge) and a semantic vector store (LanceDB with hybrid keyword + embedding search). Supports 3+ months of historical data ingestion with automatic worker scaling.

Architecture Overview

┌───────────────┐     Redis Queue     ┌───────────────┐
│  Web Machine  │ ── enqueue ───────→ │  Worker VM #1 │ ── fetch ──→ Microsoft Graph
│  (Next.js +   │                     │  (2GB, 1xCPU) │              Google APIs
│   FastAPI)    │ ←── poll results ── │               │              Slack, etc.
└───────────────┘                     └───────────────┘
        │                                     │
        │ Fly Machines API                    │ Fly Machines API
        │ (start/stop/create)                 │ (self-report idle)
        ▼                                     ▼
┌───────────────────────────────────────────────────┐
│               Fly.io Infrastructure               │
│  auto_stop_machines: true                         │
│  auto_start_machines: true                        │
│  Worker VMs: 1–3, 2GB each                        │
└───────────────────────────────────────────────────┘

Dual-Memory Architecture

Each ingested record produces two parallel memory stores:

| Memory Type | Storage | What | Used For |
| --- | --- | --- | --- |
| Structured | Postgres GraphNode/GraphEdge | Entities + relationships via LLM + rule-based extraction | Knowledge graph queries |
| Semantic | LanceDB | Raw text + vector embeddings | Hybrid keyword + semantic similarity search |

Per-Chunk Ingestion Flow (100 records)

For each record in chunk:
    |
    +---> _extract_structured_entities()  --> Postgres DiscoveredEntity
    |       * Rule-based from metadata (from, to, subject)
    |       * Linked to EntityTypeDefinition during schema discovery
    |       * Promoted to GraphNode on entity type activation
    |
    +---> graphrag.ingest_document()      --> Postgres GraphNode/GraphEdge
    |       * LLM extraction via tenant BYOK key
    |       * Entities: people, orgs, topics
    |       * Idempotent (content hash dedup)
    |
    +---> Collect text in chunk_texts[]

After chunk (batched):
    |
    +---> asyncio.gather(                 --> LanceDB (semantic memory)
    |       lancedb.add_document() x 100)  * 100 concurrent embedding API calls
    |                                      * ~2s total vs 20s sequential
    |                                      * Raw text + vector embeddings
    |                                      * Direct store, no transfer pipeline

Why Batched?

Per-record lancedb.add_document() calls the OpenAI embedding API synchronously.

100 records × 200ms = 20s of blocking calls per chunk, stalling heartbeats and risking the reaper's 15-minute cutoff.

asyncio.gather() runs all 100 calls concurrently, completing in ~2s.
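
A minimal sketch of the batched flush, assuming an async `add_document(text)` method on the LanceDB wrapper (the real call almost certainly takes IDs and metadata as well):

```python
import asyncio

async def flush_chunk(lancedb, chunk_texts: list[str]) -> None:
    # Each add_document() awaits one embedding API round trip (~200ms).
    # gather() overlaps all of the waits, so the chunk completes in
    # roughly the latency of the slowest single call, not the sum.
    await asyncio.gather(*(lancedb.add_document(text) for text in chunk_texts))
```

Passing `return_exceptions=True` to gather() is worth considering so one failed embedding call does not abort the other 99.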

Memory Lifecycle

  1. **Semantic memory**: LanceDB — raw text + embeddings stored directly during ingestion. Hybrid keyword + vector search. No transfer pipeline.
  2. **Structured memory**: Postgres — GraphNode/GraphEdge for knowledge graph queries, DiscoveredEntity for pre-promotion staging. Permanent.

Components

1. Job Lifecycle (`core/historical_sync_service.py`)

**States:** pending → running → completed / failed / paused / cancelled

  • start_historical_sync() — validates plan tier, ACU quota, enqueues to Redis
  • _process_sync_job() — background task: fetches records, extracts entities, persists to GraphRAG
  • cancel_sync() — marks job cancelled; processing loop checks this on each chunk
  • resume_sync() — re-enqueues to Redis worker queue from checkpoint
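
A rough sketch of the enqueue path; the helper names here are illustrative stand-ins, not the real signatures in `core/historical_sync_service.py`:

```python
async def start_historical_sync(db, tenant_id: str, integration_id: str) -> str:
    validate_plan_tier(db, tenant_id)   # stand-in: raises if the tier disallows backfill
    validate_acu_quota(db, tenant_id)   # stand-in: raises if ACU quota is exhausted

    # Persist the job first so it survives a crash before enqueue
    job = HistoricalSyncJob(
        tenant_id=tenant_id,
        integration_id=integration_id,
        status="pending",
    )
    db.add(job)
    db.commit()

    await sync_job_queue.enqueue(str(job.id))  # push onto the Redis queue
    await ensure_worker_running()              # wake a stopped worker or create one
    return str(job.id)
```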

**Chunk processing (per chunk):**

  1. Memory check (worker VMs only)
  2. fetch_paginated_records() from integration
  3. Entity/relationship extraction via _extract_structured_entities()
  4. Persist to GraphRAG via ingest_structured_data()
  5. Schema discovery for new entity types
  6. Checkpoint save, heartbeat update, progress broadcast
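
Stitched together, the loop body looks roughly like this (a simplified shape with stand-in helpers, not the actual code):

```python
while True:
    db.refresh(job)                           # pick up cancel_sync()'s status flip
    if job.status == "cancelled":
        break
    if is_worker_vm():
        check_memory_threshold()              # step 1: worker VMs only

    records, next_token = await fetch_paginated_records(conn, page_token)  # step 2
    for record in records:
        _extract_structured_entities(record)  # step 3: -> DiscoveredEntity rows
        await ingest_structured_data(record)  # step 4: -> GraphNode/GraphEdge

    discover_schema(records)                  # step 5: new entity types
    save_checkpoint(job, next_token)          # step 6: checkpoint_data JSONB
    job.last_heartbeat = utc_now()            # keeps the reaper at bay
    db.commit()
    broadcast_progress(job)

    if next_token is None:                    # no more pages: job complete
        break
    page_token = next_token
```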

2. Worker Queue (`core/sync_job_queue.py`)

Redis-backed priority queue with:

  • **Sorted set** for priority + FIFO ordering
  • **Job locking** (SETNX with 5-minute TTL) — prevents duplicate processing
  • **Dead-letter queue** for failed jobs with retry support
  • **Queue metrics** (depth, idle time) for autoscaling decisions
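
The sorted-set trick, sketched with redis-py's asyncio client (key names are assumptions):

```python
import time
import redis.asyncio as redis

r = redis.Redis(decode_responses=True)

async def enqueue(job_id: str, priority: int = 0) -> None:
    # Lower score pops first: priority forms the high-order part of the
    # score, and the enqueue timestamp breaks ties FIFO within a band.
    await r.zadd("sync:jobs", {job_id: priority * 1e12 + time.time()})

async def dequeue() -> str | None:
    # Pop the lowest-scored member; an empty queue yields None.
    popped = await r.zpopmin("sync:jobs", count=1)
    return popped[0][0] if popped else None

async def try_lock(job_id: str) -> bool:
    # SETNX-style lock with LOCK_TIMEOUT = 300, so a crashed worker's
    # lock expires instead of wedging the job forever.
    return bool(await r.set(f"sync:lock:{job_id}", "1", nx=True, ex=300))
```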

3. Worker Process (`workers/sync_worker.py`)

  • Polls Redis every 1 second via dequeue()
  • Acquires job lock before processing
  • Creates fresh DB session per job (SessionLocal()) — prevents connection leaks
  • **Never self-shuts down** — polls forever, Fly.io manages VM lifecycle
  • SIGTERM/SIGINT handlers for graceful shutdown
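
Condensed, the loop looks like this (reusing the queue sketch above; `SessionLocal` is the session factory named earlier, `process_job` is a stand-in):

```python
import asyncio
import signal

POLL_INTERVAL = 1
shutdown = asyncio.Event()

async def worker_loop() -> None:
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown.set)  # graceful shutdown

    # Poll forever: Fly.io decides when the VM stops, never the worker.
    while not shutdown.is_set():
        job_id = await dequeue()
        if job_id is None:
            await asyncio.sleep(POLL_INTERVAL)
            continue
        if not await try_lock(job_id):
            continue                    # another worker won the race
        db = SessionLocal()             # fresh session per job
        try:
            await process_job(db, job_id)
        finally:
            db.close()                  # no leaked connections
```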

4. Autoscaling (`core/sync_job_queue.py` + `core/startup_tasks.py`)

**Scale-up** (on start_historical_sync):

  • ensure_worker_running() checks whether workers exist, starts stopped ones, or creates a new one
  • If queue_depth > 5 and fewer than 3 workers running → scale_up() → Fly API creates a new 2GB worker VM

**Scale-down** (every 5 minutes via maintenance loop):

  • autoscale_workers() checks idle time
  • If idle >= 5 minutes and more than 1 worker running → scale_down() → destroys the newest worker
  • **Always keeps at least 1 worker**

**Max capacity:** 3 concurrent workers, each processing 100 records/chunk
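
The decision logic, sketched with the Fly Machines API reduced to placeholder calls:

```python
SCALE_UP_QUEUE_DEPTH = 5
SCALE_DOWN_IDLE_MINUTES = 5
MAX_WORKERS = 3

async def autoscale_workers() -> None:
    depth = await queue_depth()               # e.g., ZCARD on the sorted set
    workers = await list_worker_machines()    # placeholder: Fly Machines API
    running = [w for w in workers if w["state"] == "started"]

    if depth > SCALE_UP_QUEUE_DEPTH and len(running) < MAX_WORKERS:
        await create_worker_machine(memory_mb=2048)  # scale up: new 2GB VM
    elif await idle_minutes() >= SCALE_DOWN_IDLE_MINUTES and len(running) > 1:
        await destroy_machine(newest(running))       # scale down, keep >= 1
```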

5. Reaper (`core/startup_tasks.py`)

Runs every 5 minutes on the web machine:

  • Finds jobs with status = "running" and last_heartbeat older than 15 minutes
  • Marks them cancelled with "Abandoned (server restart or timeout)"
  • Jobs set last_heartbeat before entering the processing loop (initial) and after each chunk
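
The core check, sketched as a SQLAlchemy query against the `historical_sync_jobs` columns listed under Database Schema below:

```python
from datetime import datetime, timedelta, timezone

REAP_AFTER = timedelta(minutes=15)

def reap_abandoned_jobs(db) -> None:
    cutoff = datetime.now(timezone.utc) - REAP_AFTER
    stale = (
        db.query(HistoricalSyncJob)
        .filter(
            HistoricalSyncJob.status == "running",
            HistoricalSyncJob.last_heartbeat < cutoff,
        )
        .all()
    )
    for job in stale:
        job.status = "cancelled"
        job.last_error = "Abandoned (server restart or timeout)"
    db.commit()
```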

6. Integration Fetch (`integrations/outlook_service.py`)

  • **Token refresh** via Microsoft OAuth endpoint (30s timeout)
  • **Page fetch** via Microsoft Graph API with:
      • 30s aiohttp timeout
      • Retry on 504 Gateway Timeout (1 retry, 2s backoff)
      • Retry on asyncio.TimeoutError (1 retry, 2s backoff)
  • **Compound page tokens** ("channel||nextLink") for email + calendar pagination
  • **Lazy token refresh** — checks expiry before each page fetch
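
The retry shape for a page fetch, sketched with aiohttp (URL and header plumbing elided; the real logic lives in `integrations/outlook_service.py`):

```python
import asyncio
import aiohttp

async def fetch_page(url: str, token: str) -> dict:
    timeout = aiohttp.ClientTimeout(total=30)
    for attempt in range(2):  # at most 1 retry
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(
                    url, headers={"Authorization": f"Bearer {token}"}
                ) as resp:
                    if resp.status == 504 and attempt == 0:
                        await asyncio.sleep(2)   # 2s backoff, then retry
                        continue
                    resp.raise_for_status()
                    return await resp.json()
        except asyncio.TimeoutError:
            if attempt == 0:
                await asyncio.sleep(2)           # same backoff on timeout
                continue
            raise
```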

Timeouts and Resilience

| Operation | Timeout | Retry | Fallback |
| --- | --- | --- | --- |
| MS Graph API page fetch | 30s | 1 (504/timeout) | Returns error to job |
| MS OAuth token refresh | 30s | None | Returns None → auth failure |
| Fly Machines API calls | 10s | None | Falls back to in-process |
| Reaper heartbeat check | 15 min | N/A | Cancels abandoned jobs |
| DB session pool timeout | 30s | N/A | Raises TimeoutError |

Key Configuration

# fly.toml
[[vm]]
  memory = "2gb"
  cpus = 1
  memory_mb = 2048
  processes = ["worker"]

[http_service]
  auto_stop_machines = true
  auto_start_machines = true

# sync_job_queue.py
SCALE_UP_QUEUE_DEPTH = 5       # Scale up if > 5 jobs waiting
SCALE_DOWN_IDLE_MINUTES = 5    # Scale down if idle 5+ min
LOCK_TIMEOUT = 300             # 5 min job lock TTL

# sync_worker.py
POLL_INTERVAL = 1              # Redis poll every 1 second

Database Schema

historical_sync_jobs table:

  • id (UUID) — primary key
  • tenant_id (VARCHAR) — tenant isolation
  • integration_id (VARCHAR) — e.g., "outlook", "slack"
  • source_connection_id (VARCHAR) — UserConnection ID
  • status — pending / running / completed / failed / paused / cancelled
  • progress_percentage, records_processed
  • entities_extracted, relationships_extracted
  • last_heartbeat — updated before loop and after each chunk
  • checkpoint_data (JSONB) — for resumability
  • last_error, error_count, max_retries
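
For orientation, a hedged SQLAlchemy rendering of that column list (types, defaults, and the retry cap are inferred, not copied from the real model):

```python
import uuid
from sqlalchemy import Column, DateTime, Float, Integer, String
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class HistoricalSyncJob(Base):
    __tablename__ = "historical_sync_jobs"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tenant_id = Column(String, nullable=False, index=True)    # tenant isolation
    integration_id = Column(String, nullable=False)           # "outlook", "slack"
    source_connection_id = Column(String, nullable=False)     # UserConnection ID
    status = Column(String, nullable=False, default="pending")
    progress_percentage = Column(Float, default=0.0)
    records_processed = Column(Integer, default=0)
    entities_extracted = Column(Integer, default=0)
    relationships_extracted = Column(Integer, default=0)
    last_heartbeat = Column(DateTime(timezone=True))          # watched by the reaper
    checkpoint_data = Column(JSONB)                           # resumability
    last_error = Column(String)
    error_count = Column(Integer, default=0)
    max_retries = Column(Integer, default=3)                  # assumed default
```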

Common Failure Modes and Fixes

| Symptom | Likely Cause | Fix |
| --- | --- | --- |
| Job stuck pending | Worker VM stopped, not restarted | ensure_worker_running() wakes it |
| Job running with 0/0 for minutes | Token refresh or Graph API hanging | 30s timeout + 504 retry |
| Job cancelled "Abandoned" | Reaper killed it — no heartbeat | Initial heartbeat before loop |
| Job paused | Memory threshold on web machine | Memory check only on worker VMs |
| Job failed with auth error | Expired refresh token | Terminal failure, UI shows Reconnect |
| Worker self-stops mid-job | Old idle-shutdown bug | Removed self-shutdown entirely |