# Historical Sync Architecture
Backfill integration data (emails, calendar events, messages, etc.) into dual memory for AI agent context: structured graph (Postgres GraphNode/GraphEdge) and semantic vector (LanceDB with hybrid keyword+embedding search). Supports 3+ months of historical data ingestion with automatic worker scaling.
## Architecture Overview
```
┌──────────────┐     Redis Queue      ┌──────────────┐
│ Web Machine  │ ── enqueue ────────→ │ Worker VM #1 │ ── fetch ──→ Microsoft Graph
│ (Next.js +   │                      │ (2GB, 1xCPU) │              Google APIs
│  FastAPI)    │ ←── poll results ──  │              │              Slack, etc.
└──────────────┘                      └──────────────┘
        │                                     │
        │ Fly Machines API                    │ Fly Machines API
        │ (start/stop/create)                 │ (self-report idle)
        ▼                                     ▼
┌──────────────────────────────────────────────────┐
│              Fly.io Infrastructure               │
│           auto_stop_machines: true               │
│           auto_start_machines: true              │
│           Worker VMs: 1–3, 2GB each              │
└──────────────────────────────────────────────────┘
```

## Dual-Memory Architecture
Each ingested record produces two parallel memory stores:
| Memory Type | Storage | What | Used For |
|---|---|---|---|
| Structured | Postgres GraphNode/GraphEdge | Entities + relationships via LLM + rule-based extraction | Knowledge graph queries |
| Semantic | LanceDB | Raw text + vector embeddings | Hybrid keyword + semantic similarity search |
## Per-Chunk Ingestion Flow (100 records)
```
For each record in chunk:
  |
  +---> _extract_structured_entities() --> Postgres DiscoveredEntity
  |       * Rule-based from metadata (from, to, subject)
  |       * Linked to EntityTypeDefinition during schema discovery
  |       * Promoted to GraphNode on entity type activation
  |
  +---> graphrag.ingest_document()     --> Postgres GraphNode/GraphEdge
  |       * LLM extraction via tenant BYOK key
  |       * Entities: people, orgs, topics
  |       * Idempotent (content hash dedup)
  |
  +---> Collect text in chunk_texts[]

After chunk (batched):
  |
  +---> asyncio.gather(                --> LanceDB (semantic memory)
          lancedb.add_document() x 100)
            * 100 concurrent embedding API calls
            * ~2s total vs 20s sequential
            * Raw text + vector embeddings
            * Direct store, no transfer pipeline
```

## Why Batched?
Per-record `lancedb.add_document()` calls the OpenAI embedding API synchronously: 100 records × 200ms = 20s of blocking calls per chunk, slowing jobs enough to risk tripping the 15-minute heartbeat reaper. `asyncio.gather()` runs all 100 calls concurrently, completing in ~2s.
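The batching above can be sketched with a stand-in for `lancedb.add_document()` (the stub below only simulates the embedding API latency; the `asyncio.gather` pattern is the point):

```python
import asyncio

# Stand-in for lancedb.add_document(); the real call hits the OpenAI
# embedding API and takes ~200ms per record.
async def add_document(text: str) -> str:
    await asyncio.sleep(0.01)  # simulated embedding API round trip
    return f"stored:{text}"

async def flush_chunk(chunk_texts: list[str]) -> list[str]:
    # One coroutine per record; gather runs them concurrently, so the chunk
    # completes in roughly one call's latency instead of 100x that.
    return await asyncio.gather(*(add_document(t) for t in chunk_texts))

results = asyncio.run(flush_chunk([f"record-{i}" for i in range(100)]))
```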
## Memory Lifecycle
- **Semantic memory**: LanceDB — raw text + embeddings stored directly during ingestion. Hybrid keyword + vector search. No transfer pipeline.
- **Structured memory**: Postgres — GraphNode/GraphEdge for knowledge graph queries, DiscoveredEntity for pre-promotion staging. Permanent.
## Components
### 1. Job Lifecycle (`core/historical_sync_service.py`)
**States:** pending → running → completed / failed / paused / cancelled
- `start_historical_sync()` — validates plan tier, ACU quota, enqueues to Redis
- `_process_sync_job()` — background task: fetches records, extracts entities, persists to GraphRAG
- `cancel_sync()` — marks job cancelled; processing loop checks this on each chunk
- `resume_sync()` — re-enqueues to Redis worker queue from checkpoint
**Chunk processing (per chunk):**
- Memory check (worker VMs only)
- `fetch_paginated_records()` from integration
- Entity/relationship extraction via `_extract_structured_entities()`
- Persist to GraphRAG via `ingest_structured_data()`
- Schema discovery for new entity types
- Checkpoint save, heartbeat update, progress broadcast
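The per-chunk loop can be sketched with stubs for the integration fetch and persistence calls. The stub page data and the `process_job` signature are illustrative; the checkpoint, heartbeat, and cancel-check placement mirror the description above:

```python
import time

# Stub integration fetch: two pages of records, then an exhausted page token.
def fetch_paginated_records(page_token):
    pages = {None: (["a", "b", "c"], "p2"), "p2": (["d", "e"], None)}
    return pages[page_token]

def process_job(job: dict) -> dict:
    job["last_heartbeat"] = time.time()  # initial heartbeat, before the loop
    token = job.get("checkpoint_data", {}).get("page_token")
    while job["status"] == "running":    # cancel_sync() flips this flag
        records, token = fetch_paginated_records(token)
        for record in records:
            # _extract_structured_entities() / ingest_structured_data() go here
            job["records_processed"] += 1
        job["checkpoint_data"] = {"page_token": token}  # checkpoint save
        job["last_heartbeat"] = time.time()             # heartbeat update
        if token is None:
            job["status"] = "completed"
    return job

job = process_job({"status": "running", "records_processed": 0})
```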
### 2. Worker Queue (`core/sync_job_queue.py`)
Redis-backed priority queue with:
- **Sorted set** for priority + FIFO ordering
- **Job locking** (SETNX with 5-minute TTL) — prevents duplicate processing
- **Dead-letter queue** for failed jobs with retry support
- **Queue metrics** (depth, idle time) for autoscaling decisions
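One way to get priority-plus-FIFO ordering from a single sorted set is to fold both keys into the score. A sketch, assuming lower scores pop first (e.g. via `ZPOPMIN`); the real scoring in `core/sync_job_queue.py` may differ:

```python
# Stride must exceed any realistic epoch-millisecond timestamp so that
# priority always dominates and enqueue time only breaks ties.
PRIORITY_STRIDE = 10**13

def queue_score(priority: int, enqueued_at_ms: int) -> int:
    # Lower priority value = more urgent; within a priority, earlier
    # enqueue time pops first (FIFO).
    return priority * PRIORITY_STRIDE + enqueued_at_ms
```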
### 3. Worker Process (`workers/sync_worker.py`)
- Polls Redis every 1 second via `dequeue()`
- Acquires job lock before processing
- Creates fresh DB session per job (`SessionLocal()`) — prevents connection leaks
- **Never self-shuts down** — polls forever, Fly.io manages VM lifecycle
- SIGTERM/SIGINT handlers for graceful shutdown
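The poll-forever loop with graceful shutdown might look like this; `dequeue` and `run_job` are injected stand-ins for the real queue and job runner, and the demo fakes a SIGTERM once the queue drains rather than sleeping forever:

```python
import signal

shutting_down = False

def _handle_signal(signum, frame):
    # SIGTERM/SIGINT: finish the current job, then exit the loop.
    global shutting_down
    shutting_down = True

signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)

def worker_loop(dequeue, run_job, poll_interval=1.0, sleep=lambda s: None):
    processed = 0
    while not shutting_down:
        job = dequeue()
        if job is None:
            # No work: just keep polling. The worker never self-stops;
            # Fly.io owns the VM lifecycle.
            sleep(poll_interval)
            continue
        run_job(job)  # real worker opens a fresh SessionLocal() per job
        processed += 1
    return processed

# Demo: process two jobs, then simulate a SIGTERM when the queue is empty.
def _fake_sigterm(_interval):
    global shutting_down
    shutting_down = True

queue = ["job-1", "job-2"]
done = []
count = worker_loop(lambda: queue.pop(0) if queue else None,
                    done.append, sleep=_fake_sigterm)
```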
### 4. Autoscaling (`core/sync_job_queue.py` + `core/startup_tasks.py`)

**Scale-up** (on `start_historical_sync`):
- `ensure_worker_running()` checks if workers exist, starts stopped ones, or creates new
- If `queue_depth > 5` and `< 3` workers running → `scale_up()` → Fly API creates new 2GB worker VM

**Scale-down** (every 5 minutes via maintenance loop):
- `autoscale_workers()` checks idle time
- If idle `>= 5` minutes and `> 1` workers → `scale_down()` → destroys newest worker
- **Always keeps at least 1 worker**
**Max capacity:** 3 concurrent workers, each processing 100 records/chunk
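The scale decisions reduce to two pure predicates; a sketch using the thresholds above (the real `scale_up()`/`scale_down()` also call the Fly Machines API, elided here):

```python
MIN_WORKERS = 1
MAX_WORKERS = 3
SCALE_UP_QUEUE_DEPTH = 5
SCALE_DOWN_IDLE_MINUTES = 5

def should_scale_up(queue_depth: int, running_workers: int) -> bool:
    # More than 5 jobs waiting and room under the 3-worker cap.
    return queue_depth > SCALE_UP_QUEUE_DEPTH and running_workers < MAX_WORKERS

def should_scale_down(idle_minutes: float, running_workers: int) -> bool:
    # Idle 5+ minutes, but always keep at least one worker warm.
    return idle_minutes >= SCALE_DOWN_IDLE_MINUTES and running_workers > MIN_WORKERS
```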
### 5. Reaper (`core/startup_tasks.py`)
Runs every 5 minutes on the web machine:
- Finds jobs with `status = "running"` and `last_heartbeat` older than 15 minutes
- Marks them `cancelled` with `"Abandoned (server restart or timeout)"`
- Jobs set `last_heartbeat` before entering the processing loop (initial) and after each chunk
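The reaper's check is a simple cutoff comparison; a sketch over plain dicts standing in for `historical_sync_jobs` rows:

```python
from datetime import datetime, timedelta

HEARTBEAT_TIMEOUT = timedelta(minutes=15)
ABANDON_MESSAGE = "Abandoned (server restart or timeout)"

def reap_abandoned(jobs: list[dict], now: datetime) -> list[str]:
    cancelled = []
    for job in jobs:
        stale = now - job["last_heartbeat"] > HEARTBEAT_TIMEOUT
        if job["status"] == "running" and stale:
            job["status"] = "cancelled"
            job["last_error"] = ABANDON_MESSAGE
            cancelled.append(job["id"])
    return cancelled

now = datetime(2025, 1, 1, 12, 0)
jobs = [
    {"id": "stale", "status": "running", "last_heartbeat": now - timedelta(minutes=20)},
    {"id": "fresh", "status": "running", "last_heartbeat": now - timedelta(minutes=2)},
]
reaped = reap_abandoned(jobs, now)
```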
### 6. Integration Fetch (`integrations/outlook_service.py`)
- **Token refresh** via Microsoft OAuth endpoint (30s timeout)
- **Page fetch** via Microsoft Graph API with:
  - 30s aiohttp timeout
  - Retry on 504 Gateway Timeout (1 retry, 2s backoff)
  - Retry on `asyncio.TimeoutError` (1 retry, 2s backoff)
- **Compound page tokens** (`"channel||nextLink"`) for email + calendar pagination
- **Lazy token refresh** — checks expiry before each page fetch
## Timeouts and Resilience
| Operation | Timeout | Retry | Fallback |
|---|---|---|---|
| MS Graph API page fetch | 30s | 1 (504/timeout) | Returns error to job |
| MS OAuth token refresh | 30s | None | Returns None → auth failure |
| Fly Machines API calls | 10s | None | Falls back to in-process |
| Reaper heartbeat check | 15 min | N/A | Cancels abandoned jobs |
| DB session pool timeout | 30s | N/A | Raises TimeoutError |
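The "1 retry, 2s backoff" policy can be sketched as a small async helper; `fetch` and the injectable `sleep` are stand-ins, and the real code also retries HTTP 504 responses, which are omitted here:

```python
import asyncio

async def fetch_with_retry(fetch, retries=1, backoff=2.0, sleep=asyncio.sleep):
    last_exc = None
    for attempt in range(retries + 1):
        try:
            return await fetch()
        except asyncio.TimeoutError as exc:
            last_exc = exc
            if attempt < retries:
                await sleep(backoff)  # 2s backoff before the single retry
    raise last_exc

# Demo: first call times out, the retry succeeds.
calls = {"n": 0}

async def flaky_fetch():
    calls["n"] += 1
    if calls["n"] == 1:
        raise asyncio.TimeoutError
    return "page-1"

result = asyncio.run(fetch_with_retry(flaky_fetch, sleep=lambda s: asyncio.sleep(0)))
```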
## Key Configuration
```toml
# fly.toml
[[vm]]
  memory = "2gb"
  cpus = 1
  memory_mb = 2048
  processes = ["worker"]

[http_service]
  auto_stop_machines = true
  auto_start_machines = true
```

```python
# sync_job_queue.py
SCALE_UP_QUEUE_DEPTH = 5     # Scale up if > 5 jobs waiting
SCALE_DOWN_IDLE_MINUTES = 5  # Scale down if idle 5+ min
LOCK_TIMEOUT = 300           # 5 min job lock TTL
```

```python
# sync_worker.py
POLL_INTERVAL = 1  # Redis poll every 1 second
```

## Database Schema
`historical_sync_jobs` table:

- `id` (UUID) — primary key
- `tenant_id` (VARCHAR) — tenant isolation
- `integration_id` (VARCHAR) — e.g., "outlook", "slack"
- `source_connection_id` (VARCHAR) — UserConnection ID
- `status` — pending / running / completed / failed / paused / cancelled
- `progress_percentage`, `records_processed`
- `entities_extracted`, `relationships_extracted`
- `last_heartbeat` — updated before loop and after each chunk
- `checkpoint_data` (JSONB) — for resumability
- `last_error`, `error_count`, `max_retries`
## Common Failure Modes and Fixes
| Symptom | Likely Cause | Fix |
|---|---|---|
| Job stuck pending | Worker VM stopped, not restarted | `ensure_worker_running()` wakes it |
| Job running with 0/0 for minutes | Token refresh or Graph API hanging | 30s timeout + 504 retry |
| Job cancelled "Abandoned" | Reaper killed it — no heartbeat | Initial heartbeat before loop |
| Job paused | Memory threshold on web machine | Memory check only on worker VMs |
| Job failed with auth error | Expired refresh token | Terminal failure, UI shows Reconnect |
| Worker self-stops mid-job | Old idle-shutdown bug | Removed self-shutdown entirely |