ATOM Documentation

Plan: BPC-Style Smart Model Routing + Multi-Entity Extraction Fixes

Summary

Fix four issues in the Phase 323 multi-entity extraction pipeline and add BPC-style data-driven model selection that combines email complexity scoring, task type classification, and fresh pricing data from DynamicPricingFetcher.

Problems to Fix

| # | Problem | Severity | Root Cause |
| --- | --- | --- | --- |
| 1 | Concurrent DB session access | **HIGH** | _process_multi_entity_extraction calls self.db.add_all()/flush() on shared session from 20 concurrent coroutines |
| 2 | Integration list inconsistency | **MEDIUM** | Three code paths use different inline hardcoded lists. whatsapp, teams, hubspot bypass multi-entity in webhook/scheduled sync |
| 3 | No aggregated metrics | **LOW** | asyncio.gather results discarded; no success/failure/per-model counts logged |
| 4 | Dumb model selection | **MEDIUM** | Static threshold (complexity < 40 → mini, else → 4o). No BPC scoring, no fresh pricing, no quality thresholds |

Architecture Decision

**Two-phase extraction pattern** (matching the proven-safe _extract_chunk_to_r2_and_ingest pattern):

  1. Concurrent workers do LLM calls only, return List[DiscoveredEntity] data — no DB access
  2. Single-threaded post-gather loop does all self.db.add_all()/commit()
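
The two-phase pattern can be illustrated with a small runnable sketch. Everything here is a stand-in: FakeEntity and FakeSession are hypothetical substitutes for DiscoveredEntity and the SQLAlchemy session, and extract_only simulates the LLM call. The point is only that workers return data while a single loop owns every session call.

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass
class FakeEntity:
    """Hypothetical stand-in for DiscoveredEntity."""
    name: str


class FakeSession:
    """Hypothetical stand-in for a DB session; records what was added."""

    def __init__(self) -> None:
        self.added: List[FakeEntity] = []
        self.commits = 0

    def add_all(self, items: List[FakeEntity]) -> None:
        self.added.extend(items)

    def commit(self) -> None:
        self.commits += 1


async def extract_only(text: str) -> List[FakeEntity]:
    """Phase 1 worker: simulates an LLM call and never touches the session."""
    await asyncio.sleep(0)  # yield to the event loop, as a real network call would
    if not text:
        raise ValueError("empty input")
    return [FakeEntity(name=word) for word in text.split()]


async def run_two_phase(texts: List[str], db: FakeSession, limit: int = 20) -> int:
    sem = asyncio.Semaphore(limit)

    async def worker(text: str) -> List[FakeEntity]:
        async with sem:
            return await extract_only(text)

    # Phase 1: concurrent extraction; exceptions are returned, not raised.
    results = await asyncio.gather(*(worker(t) for t in texts), return_exceptions=True)

    # Phase 2: one loop performs every session call.
    failed = 0
    entities: List[FakeEntity] = []
    for result in results:
        if isinstance(result, Exception):
            failed += 1
        else:
            entities.extend(result)
    if entities:
        db.add_all(entities)
        db.commit()
    return failed


db = FakeSession()
failed = asyncio.run(run_two_phase(["alice bob", "", "carol"], db))
print(failed, [e.name for e in db.added], db.commits)
```

Because asyncio.gather preserves input order, the persisted entities follow the order of the submitted tasks even though the workers ran concurrently.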

Implementation Phases

Phase 1: Shared Integration Constants

**NEW FILE:** backend-saas/core/integration_constants.py

# Communication channels supporting multi-entity extraction
COMMUNICATION_INTEGRATIONS = ["outlook", "gmail", "slack", "whatsapp", "teams", "hubspot"]

# Document channels supporting multi-entity extraction
DOCUMENT_INTEGRATIONS = ["document"]

# Combined: all integrations with multi-entity support
MULTI_ENTITY_INTEGRATIONS = COMMUNICATION_INTEGRATIONS + DOCUMENT_INTEGRATIONS

**Update call sites to use shared constants:**

| File | Line | Old | New |
| --- | --- | --- | --- |
| ingestion_pipeline.py | 220 | ["outlook", "gmail", "slack", "document"] | MULTI_ENTITY_INTEGRATIONS |
| ingestion_pipeline.py | 1132 | ["outlook", "gmail", "slack", "document"] | MULTI_ENTITY_INTEGRATIONS |
| historical_sync_service.py | 1173 | ["outlook", "gmail", "slack", "whatsapp", "teams", "hubspot"] | COMMUNICATION_INTEGRATIONS |
| historical_sync_service.py | 1175 | ["outlook", "gmail", "slack", "document"] | MULTI_ENTITY_INTEGRATIONS |

**Behavior change:** whatsapp, teams, hubspot will now also get multi-entity extraction in sync_and_ingest and webhook paths (feature parity).
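
As a quick check of what this changes, the sketch below compares the old inline list with the shared constant. The helper supports_multi_entity is hypothetical and only illustrates how a call site might gate on the constant.

```python
# Constants as defined in integration_constants.py in this plan
COMMUNICATION_INTEGRATIONS = ["outlook", "gmail", "slack", "whatsapp", "teams", "hubspot"]
DOCUMENT_INTEGRATIONS = ["document"]
MULTI_ENTITY_INTEGRATIONS = COMMUNICATION_INTEGRATIONS + DOCUMENT_INTEGRATIONS

# The inline list previously used in ingestion_pipeline.py
OLD_INLINE_LIST = ["outlook", "gmail", "slack", "document"]


def supports_multi_entity(integration_id: str) -> bool:
    """Hypothetical helper: one place to decide whether multi-entity extraction applies."""
    return integration_id in MULTI_ENTITY_INTEGRATIONS


# Integrations that gain multi-entity extraction once call sites switch over
newly_covered = [i for i in MULTI_ENTITY_INTEGRATIONS if i not in OLD_INLINE_LIST]
print(newly_covered)  # ['whatsapp', 'teams', 'hubspot']
```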

Phase 2: BPC Extraction Model Router (no behavioral change until wired)

**NEW FILE:** backend-saas/core/llm/extraction_model_router.py

Create a standalone module implementing BPC-style model selection for extraction tasks:

select_best_model(complexity_score, integration_id, pricing_fetcher=None) → (provider, model, tier)

**Routing algorithm:**

  1. **Complexity → Tier mapping** (more aggressive than general-purpose routing since extraction is structured):
     • 0-25 → STANDARD (gpt-4o-mini quality)
     • 25-75 → VERSATILE (gpt-4o quality)
     • 75-100 → HEAVY (model quality capped at 90)
  2. **Integration adjustment:** Slack/WhatsApp messages are shorter and less structured (-10 points); documents are longer (+5 points)
  3. **BPC value scoring:** For each candidate model from DynamicPricingFetcher.pricing_cache:
     • Filter: context window ≥ tier minimum
     • Filter: quality score in [min_quality_by_tier, 90] (cap at 90 for extraction)
     • Filter: exclude o-series models (o1, o3, o4 — no reliable message.content)
     • Filter: exclude deprecated/free models
     • Score: value = quality² / (cost * 1e6)
     • Sort by value descending
  4. **Fallback:** If DynamicPricingFetcher is unavailable or no candidates remain → static defaults (gpt-4o for VERSATILE/HEAVY, gpt-4o-mini for STANDARD)

**Key BPC reference:** Follows byok_handler.py:get_ranked_providers() (lines 1144-1515) — same quality²/cost formula, same o-series exclusion, same tier-to-context mapping.
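
The routing steps above can be sketched as follows. This is a minimal illustration, not the byok_handler.py implementation. Several details are assumptions because the plan does not specify them: the per-tier quality and context minimums, the ModelInfo shape, cost being a per-token price, boundary scores (25, 75) going to the higher tier, and passing a plain pricing_cache dict instead of a fetcher object. The model names other than gpt-4o, gpt-4o-mini and o3 are invented.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple

# Assumed thresholds for illustration only
MIN_QUALITY_BY_TIER = {"STANDARD": 60, "VERSATILE": 75, "HEAVY": 85}
MIN_CONTEXT_BY_TIER = {"STANDARD": 8_000, "VERSATILE": 32_000, "HEAVY": 100_000}
QUALITY_CAP = 90  # from the plan: cap at 90 for extraction
INTEGRATION_ADJUSTMENT = {"slack": -10, "whatsapp": -10, "document": 5}
STATIC_DEFAULTS = {
    "STANDARD": ("openai", "gpt-4o-mini"),
    "VERSATILE": ("openai", "gpt-4o"),
    "HEAVY": ("openai", "gpt-4o"),
}


@dataclass
class ModelInfo:
    """Hypothetical shape of one pricing-cache entry."""
    provider: str
    quality: float
    cost_per_token: float
    context_window: int
    deprecated: bool = False


def tier_for(score: float) -> str:
    # Assumption: boundary scores fall into the higher tier
    if score < 25:
        return "STANDARD"
    if score < 75:
        return "VERSATILE"
    return "HEAVY"


def select_best_model(
    complexity_score: float,
    integration_id: str,
    pricing_cache: Optional[Dict[str, ModelInfo]] = None,
) -> Tuple[str, str, str]:
    adjusted = complexity_score + INTEGRATION_ADJUSTMENT.get(integration_id, 0)
    tier = tier_for(min(100, max(0, adjusted)))

    best = None  # (value, provider, model_name)
    for name, info in (pricing_cache or {}).items():
        if info.deprecated or info.cost_per_token <= 0:
            continue  # deprecated or free
        if name.startswith(("o1", "o3", "o4")):
            continue  # o-series excluded
        if info.context_window < MIN_CONTEXT_BY_TIER[tier]:
            continue
        if not MIN_QUALITY_BY_TIER[tier] <= info.quality <= QUALITY_CAP:
            continue
        value = info.quality ** 2 / (info.cost_per_token * 1e6)
        if best is None or value > best[0]:
            best = (value, info.provider, name)

    if best is None:  # no pricing data or no surviving candidate
        provider, model = STATIC_DEFAULTS[tier]
        return provider, model, tier
    return best[1], best[2], tier


cache = {
    "gpt-4o": ModelInfo("openai", 88, 2.5e-6, 128_000),
    "cheap-model": ModelInfo("example", 80, 0.5e-6, 128_000),  # invented entry
    "o3": ModelInfo("openai", 89, 2.0e-6, 200_000),            # filtered out
    "top-model": ModelInfo("example", 95, 1.0e-6, 200_000),    # above quality cap
}
print(select_best_model(50, "gmail", cache))
print(select_best_model(30, "slack"))
```

With these invented numbers, cheap-model scores 80² / 0.5 = 12800 against 88² / 2.5 ≈ 3098 for gpt-4o, so the cheaper model wins the VERSATILE tier. The second call has no pricing data, and the Slack adjustment lowers 30 to 20, so it falls back to the STANDARD default.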

Phase 3: Decouple Extraction from Persistence

**File:** backend-saas/core/ingestion_pipeline.py

Split _process_multi_entity_extraction into two methods:

async def _extract_multi_entity_only(
    self, record, integration_id, text, job_id, llm_service=None
) -> List[DiscoveredEntity]:
    """Phase 1: Extract entities via LLM. NO DB ACCESS. Returns entities for caller to persist."""
    # LLM call + entity construction, no self.db usage
    ...

async def _process_multi_entity_extraction(
    self, record, integration_id, text, job_id, llm_service=None
) -> int:
    """Convenience wrapper for single-threaded callers: extract + persist."""
    entities = await self._extract_multi_entity_only(record, integration_id, text, job_id, llm_service)
    if entities:
        self.db.add_all(entities)
        self.db.flush()
        return len(entities)
    return 0

**File:** backend-saas/core/multi_entity_llm_extractor.py

  • Add current_model tracking attribute for accurate metadata
  • Add llm_service optional parameter to extract_from_email() and _call_llm()
  • Backward compatible: both parameters are optional, existing callers unchanged

Phase 4: Fix Concurrent DB Access + Add Metrics

**File:** backend-saas/core/historical_sync_service.py (lines 1360-1396)

**Before (unsafe):**

sem = asyncio.Semaphore(20)
async def _extract_multi(rec, txt):
    async with sem:
        return await self.ingestion_pipeline._process_multi_entity_extraction(...)  # writes to self.db
await asyncio.gather(...)  # results discarded
self.db.commit()

**After (safe, two-phase, with metrics):**

# Create shared LLMService once per chunk (avoids O(N) DB queries for tenant settings)
shared_db = SessionLocal()
chunk_llm_service = LLMService(db=shared_db, workspace_id=ws, tenant_id=tenant_id)

# Phase 1: Concurrent extraction (NO DB ACCESS)
sem = asyncio.Semaphore(20)
async def _extract_multi(rec, txt):
    async with sem:
        return await self.ingestion_pipeline._extract_multi_entity_only(
            record=rec, integration_id=job.integration_id, text=txt,
            job_id=job_id, llm_service=chunk_llm_service,
        )

raw_results = await asyncio.gather(
    *[_extract_multi(rec, txt) for rec, txt in multi_entity_tasks],
    return_exceptions=True
)

# Phase 2: Single-threaded persistence
all_entities = []
total_tasks = len(multi_entity_tasks)
failed = 0
for result in raw_results:
    if isinstance(result, Exception):
        failed += 1
        logger.warning(f"Multi-entity extraction error: {result}")
    elif result:
        all_entities.extend(result)

if all_entities:
    self.db.add_all(all_entities)
    self.db.flush()

# Aggregated metrics
model_stats = self.ingestion_pipeline.multi_entity_extractor.model_stats
logger.info(
    f"Chunk {chunk_count+1}: Multi-entity extraction done - "
    f"tasks={total_tasks}, succeeded={total_tasks-failed}, failed={failed}, "
    f"total_entities={len(all_entities)}, "
    f"models={model_stats}, "
    f"chunk_llm_cost_est={getattr(chunk_llm_service, 'total_cost', 'N/A')}",
    tenant_id=self.tenant_id, job_id=job_id,
)

# Release the per-chunk session created for chunk_llm_service
shared_db.close()

Also update model_stats in MultiEntityLLMExtractor:

  • Change from hardcoded keys {"gpt-4o": 0, "gpt-4o-mini": 0, "total": 0} to defaultdict(int)
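  • Guard the increments in _select_model with self.model_stats_lock. An asyncio.Lock is acquired with async with, which is only valid inside async def, so _select_model (and its callers) would have to become async. If _select_model stays synchronous, its increments contain no await and cannot interleave with other coroutines on the same event loop, so the lock is not needed there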
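
A minimal sketch of the stats change, assuming the counters live on a small class of their own. StatsTracker and record are hypothetical names; only model_stats and model_stats_lock come from the plan. It shows that defaultdict(int) accepts model names that were never pre-registered and that the lock is taken from an async method.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict


class StatsTracker:
    """Hypothetical stand-in for the stats portion of MultiEntityLLMExtractor."""

    def __init__(self) -> None:
        # defaultdict(int) starts any unseen model name at 0
        self.model_stats: DefaultDict[str, int] = defaultdict(int)
        self.model_stats_lock = asyncio.Lock()

    async def record(self, model_name: str) -> None:
        # async with is only valid because record() is an async method
        async with self.model_stats_lock:
            self.model_stats[model_name] += 1
            self.model_stats["total"] += 1


async def main() -> dict:
    tracker = StatsTracker()
    names = ["gpt-4o", "gpt-4o-mini", "some-new-model", "gpt-4o"]
    await asyncio.gather(*(tracker.record(n) for n in names))
    return dict(tracker.model_stats)


stats = asyncio.run(main())
print(stats)
```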

Phase 5: Wire BPC Router Into Extractor

**File:** backend-saas/core/multi_entity_llm_extractor.py

Replace _select_model() — currently:

if complexity["complexity_score"] < 40:
    selected_model = "gpt-4o-mini"
else:
    selected_model = "gpt-4o"

With BPC routing:

def _select_model(self, email_data):
    if not self.enable_model_selection:
        return self.model
    complexity = self._analyze_email_complexity(email_data)
    provider, model_name, tier = select_best_model(
        complexity_score=complexity["complexity_score"],
        integration_id=getattr(self, 'integration_id', 'unknown'),
    )
    self.model_stats[model_name] += 1
    self.model_stats["total"] += 1
    return model_name

Files Changed

| File | Action | Phase |
| --- | --- | --- |
| backend-saas/core/integration_constants.py | **NEW** | 1 |
| backend-saas/core/llm/extraction_model_router.py | **NEW** | 2 |
| backend-saas/core/ingestion_pipeline.py | Modify (lines 220, 1132, 2555-2617) | 1, 3 |
| backend-saas/core/historical_sync_service.py | Modify (lines 1173, 1175, 1274, 1360-1396) | 1, 4 |
| backend-saas/core/multi_entity_llm_extractor.py | Modify (add llm_service param, model selection, model_stats) | 3, 5 |

Verification

Unit Tests

  • extraction_model_router.py: Test complexity-to-tier mapping, BPC scoring with mock pricing data, fallback when fetcher unavailable
  • multi_entity_llm_extractor.py: Test model selection with BPC router, test llm_service parameter passthrough
  • _extract_multi_entity_only: Test that extraction performs no DB operations (no calls on self.db) and returns the extracted entities to the caller
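
One way to express the no-DB-access check is to hand the pipeline a mock session and assert that nothing was called on it. The sketch below uses a hypothetical PipelineStub in place of the real ingestion pipeline, so it demonstrates the assertion technique rather than the actual extraction code.

```python
import asyncio
from unittest.mock import MagicMock


class PipelineStub:
    """Hypothetical stand-in for the ingestion pipeline; only the method shape matters."""

    def __init__(self, db) -> None:
        self.db = db

    async def _extract_multi_entity_only(
        self, record, integration_id, text, job_id, llm_service=None
    ):
        # Placeholder for the LLM call and entity construction; no self.db usage
        return [{"type": "person", "name": text}]


def test_extract_only_never_touches_db() -> None:
    db = MagicMock()
    pipeline = PipelineStub(db)

    entities = asyncio.run(
        pipeline._extract_multi_entity_only(
            record={}, integration_id="outlook", text="Alice", job_id="job-1"
        )
    )

    assert entities == [{"type": "person", "name": "Alice"}]
    # mock_calls records every call made on the mock or its attributes,
    # so an empty list means no session method (add_all, flush, commit, ...) ran
    assert db.mock_calls == []


test_extract_only_never_touches_db()
```

In the real test suite the same assertion would be applied to the actual pipeline with its LLM call mocked out.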

Integration Tests

  • Run a historical sync job for the outlook integration and verify:
     • No SQLAlchemy session errors in logs
     • Aggregated metrics appear in chunk logs
     • Model stats show dynamic model names (not just gpt-4o/gpt-4o-mini)
     • Entities appear in DiscoveredEntity table with correct types
  • Trigger webhook for whatsapp / teams / hubspot — verify multi-entity extraction runs

Production Smoke Test

# Deploy and trigger a small backfill
fly deploy -a atom-saas --strategy immediate
# Check logs for model selection and metrics
fly logs -a atom-saas | grep -i "multi-entity extraction"

Rollback Path

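  • BPC router has a static fallback when DynamicPricingFetcher is unavailable; it selects the same two models used today (gpt-4o-mini for STANDARD, gpt-4o for VERSATILE/HEAVY), although the tier cutoff differs from the current complexity threshold of 40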
  • Old _process_multi_entity_extraction wrapper preserves backward compatibility
  • If BPC selects bad models, set enable_model_selection=False on the extractor to revert to default gpt-4o