Plan: BPC-Style Smart Model Routing + Multi-Entity Extraction Fixes
Summary
Fix four issues in the Phase 323 multi-entity extraction pipeline and add BPC-style data-driven model selection that combines email complexity scoring, task type classification, and fresh pricing data from DynamicPricingFetcher.
Problems to Fix
| # | Problem | Severity | Root Cause |
|---|---|---|---|
| 1 | Concurrent DB session access | **HIGH** | _process_multi_entity_extraction calls self.db.add_all()/flush() on shared session from 20 concurrent coroutines |
| 2 | Integration list inconsistency | **MEDIUM** | Three code paths use different inline hardcoded lists. whatsapp, teams, hubspot bypass multi-entity in webhook/scheduled sync |
| 3 | No aggregated metrics | **LOW** | asyncio.gather results discarded; no success/failure/per-model counts logged |
| 4 | Dumb model selection | **MEDIUM** | Static threshold (complexity < 40 → mini, else → 4o). No BPC scoring, no fresh pricing, no quality thresholds |
Architecture Decision
**Two-phase extraction pattern** (matching the proven-safe _extract_chunk_to_r2_and_ingest pattern):
- Concurrent workers do LLM calls only and return `List[DiscoveredEntity]` data; no DB access
- A single-threaded post-gather loop does all `self.db.add_all()`/`commit()`
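As a rough illustration of the two-phase pattern, the sketch below runs concurrent workers that only compute results, then collects them in a single loop after `asyncio.gather`. `fake_extract` and `two_phase` are hypothetical stand-ins, not functions from the codebase, and plain strings take the place of `DiscoveredEntity` objects.

```python
import asyncio
from typing import List, Tuple


async def fake_extract(text: str) -> List[str]:
    """Hypothetical stand-in for the LLM call; it touches no database."""
    await asyncio.sleep(0)
    if not text:
        raise ValueError("empty input")
    return [f"entity:{word}" for word in text.split()]


async def two_phase(texts: List[str], limit: int = 20) -> Tuple[List[str], int]:
    sem = asyncio.Semaphore(limit)

    async def worker(text: str) -> List[str]:
        async with sem:
            return await fake_extract(text)

    # Phase 1: concurrent extraction; exceptions are returned, not raised.
    results = await asyncio.gather(
        *(worker(t) for t in texts), return_exceptions=True
    )

    # Phase 2: single-threaded collection; the only place a DB session
    # would be used in the real pipeline.
    entities: List[str] = []
    failed = 0
    for result in results:
        if isinstance(result, Exception):
            failed += 1
        else:
            entities.extend(result)
    return entities, failed


entities, failed = asyncio.run(two_phase(["alice bob", "", "carol"]))
```

Because `asyncio.gather` preserves input order, the collected entities line up with the submitted tasks, and one failed task does not cancel the others.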
Implementation Phases
Phase 1: Shared Integration Constants (no behavioral change)
**NEW FILE:** backend-saas/core/integration_constants.py

    # Communication channels supporting multi-entity extraction
    COMMUNICATION_INTEGRATIONS = ["outlook", "gmail", "slack", "whatsapp", "teams", "hubspot"]
    # Document channels supporting multi-entity extraction
    DOCUMENT_INTEGRATIONS = ["document"]
    # Combined: all integrations with multi-entity support
    MULTI_ENTITY_INTEGRATIONS = COMMUNICATION_INTEGRATIONS + DOCUMENT_INTEGRATIONS

**Update call sites to use shared constants:**
| File | Line | Old | New |
|---|---|---|---|
| `ingestion_pipeline.py` | 220 | `["outlook", "gmail", "slack", "document"]` | `MULTI_ENTITY_INTEGRATIONS` |
| `ingestion_pipeline.py` | 1132 | `["outlook", "gmail", "slack", "document"]` | `MULTI_ENTITY_INTEGRATIONS` |
| `historical_sync_service.py` | 1173 | `["outlook", "gmail", "slack", "whatsapp", "teams", "hubspot"]` | `COMMUNICATION_INTEGRATIONS` |
| `historical_sync_service.py` | 1175 | `["outlook", "gmail", "slack", "document"]` | `MULTI_ENTITY_INTEGRATIONS` |
**Behavior change:** whatsapp, teams, hubspot will now also get multi-entity extraction in sync_and_ingest and webhook paths (feature parity).
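One way the call sites could consume the shared constants is through a single membership check, sketched below. `supports_multi_entity` is a hypothetical helper introduced only for illustration; the plan itself only requires replacing the inline lists.

```python
from typing import List

# Copied from the constants module above so the sketch is self-contained.
COMMUNICATION_INTEGRATIONS: List[str] = [
    "outlook", "gmail", "slack", "whatsapp", "teams", "hubspot",
]
DOCUMENT_INTEGRATIONS: List[str] = ["document"]
MULTI_ENTITY_INTEGRATIONS: List[str] = COMMUNICATION_INTEGRATIONS + DOCUMENT_INTEGRATIONS


def supports_multi_entity(integration_id: str) -> bool:
    """Hypothetical helper: one check shared by every code path."""
    return integration_id in MULTI_ENTITY_INTEGRATIONS
```

With a single source of truth, `whatsapp`, `teams`, and `hubspot` pass the same check in the webhook, scheduled sync, and historical sync paths.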
Phase 2: BPC Extraction Model Router (no behavioral change until wired)
**NEW FILE:** backend-saas/core/llm/extraction_model_router.py
Create a standalone module implementing BPC-style model selection for extraction tasks:
`select_best_model(complexity_score, integration_id, pricing_fetcher=None) → (provider, model, tier)`

**Routing algorithm:**
- **Complexity → Tier mapping** (more aggressive than general-purpose routing since extraction is structured):
  - 0-25 → `STANDARD` (gpt-4o-mini quality)
  - 25-75 → `VERSATILE` (gpt-4o quality)
  - 75-100 → `HEAVY` (capped at 90)
- **Integration adjustment:** Slack/WhatsApp messages are shorter/less structured (-10pts), documents are longer (+5pts)
- **BPC value scoring:** For each candidate model from `DynamicPricingFetcher.pricing_cache`:
  - Filter: context window ≥ tier minimum
  - Filter: quality score in `[min_quality_by_tier, 90]` (cap at 90 for extraction)
  - Filter: exclude o-series models (o1, o3, o4: no reliable `message.content`)
  - Filter: exclude deprecated/free models
  - Score: `value = quality² / (cost * 1e6)`
  - Sort by value descending
- **Fallback:** If `DynamicPricingFetcher` unavailable or no candidates → static defaults (`gpt-4o` for VERSATILE/HEAVY, `gpt-4o-mini` for STANDARD)
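The tier mapping, integration adjustment, and static fallback above could be sketched as follows. Several details are assumptions rather than part of the plan: boundaries are treated as lower-inclusive (25 maps to `VERSATILE`, 75 to `HEAVY`), the adjusted score is clamped to 0-100, and the provider name `"openai"` and the function names are illustrative.

```python
from typing import Dict, Tuple

# Point adjustments from the plan: short chat messages down, documents up.
INTEGRATION_ADJUSTMENT: Dict[str, int] = {"slack": -10, "whatsapp": -10, "document": 5}

# Static defaults used when no pricing data or candidates are available.
# The provider string is an assumption for illustration.
STATIC_DEFAULTS: Dict[str, Tuple[str, str]] = {
    "STANDARD": ("openai", "gpt-4o-mini"),
    "VERSATILE": ("openai", "gpt-4o"),
    "HEAVY": ("openai", "gpt-4o"),
}


def complexity_to_tier(complexity_score: float, integration_id: str) -> str:
    """Map an adjusted complexity score to a tier name."""
    adjusted = complexity_score + INTEGRATION_ADJUSTMENT.get(integration_id, 0)
    adjusted = max(0.0, min(100.0, adjusted))  # clamping is an assumption
    if adjusted < 25:
        return "STANDARD"
    if adjusted < 75:
        return "VERSATILE"
    return "HEAVY"


def static_fallback(tier: str) -> Tuple[str, str, str]:
    """Return (provider, model, tier) from the static defaults."""
    provider, model = STATIC_DEFAULTS[tier]
    return provider, model, tier
```

For instance, a Slack message scored at 30 is adjusted to 20 and lands in `STANDARD`, while a document scored at 72 is adjusted to 77 and lands in `HEAVY`.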
**Key BPC reference:** Follows byok_handler.py:get_ranked_providers() (lines 1144-1515) — same quality²/cost formula, same o-series exclusion, same tier-to-context mapping.
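A minimal sketch of the quality²/cost ranking is shown below. It does not reproduce `get_ranked_providers()`; the `Candidate` dataclass, its field names, and the prefix-based o-series check are assumptions about how pricing-cache entries might be represented.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class Candidate:
    """Hypothetical shape of one pricing-cache entry."""
    model: str
    quality: float          # quality score on a 0-100 scale
    cost_per_token: float   # cost in USD per token
    context_window: int
    deprecated: bool = False


# Assumption: o-series models can be recognized by name prefix.
O_SERIES_PREFIXES = ("o1", "o3", "o4")


def bpc_value(candidate: Candidate) -> float:
    """value = quality^2 / (cost * 1e6), as stated in the plan."""
    return candidate.quality ** 2 / (candidate.cost_per_token * 1e6)


def rank_candidates(
    candidates: Iterable[Candidate],
    min_quality: float,
    min_context: int,
    max_quality: float = 90.0,
) -> List[Candidate]:
    """Apply the plan's filters, then sort by value, best first."""
    eligible = [
        c for c in candidates
        if c.context_window >= min_context
        and min_quality <= c.quality <= max_quality
        and not c.model.startswith(O_SERIES_PREFIXES)
        and not c.deprecated
        and c.cost_per_token > 0  # drops free models and avoids division by zero
    ]
    return sorted(eligible, key=bpc_value, reverse=True)
```

Squaring quality weights it more heavily than cost, so a slightly better model can outrank a cheaper one only when the price gap is small. An empty result from `rank_candidates` is the signal to use the static fallback.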
Phase 3: Decouple Extraction from Persistence
**File:** backend-saas/core/ingestion_pipeline.py
Split _process_multi_entity_extraction into two methods:

    async def _extract_multi_entity_only(
        self, record, integration_id, text, job_id, llm_service=None
    ) -> List[DiscoveredEntity]:
        """Phase 1: Extract entities via LLM. NO DB ACCESS. Returns entities for caller to persist."""
        # LLM call + entity construction, no self.db usage
        ...

    async def _process_multi_entity_extraction(
        self, record, integration_id, text, job_id, llm_service=None
    ) -> int:
        """Convenience wrapper for single-threaded callers: extract + persist."""
        entities = await self._extract_multi_entity_only(record, integration_id, text, job_id, llm_service)
        if entities:
            self.db.add_all(entities)
            self.db.flush()
            return len(entities)
        return 0

**File:** backend-saas/core/multi_entity_llm_extractor.py
- Add `current_model` tracking attribute for accurate metadata
- Add `llm_service` optional parameter to `extract_from_email()` and `_call_llm()`
- Backward compatible: both parameters are optional, existing callers unchanged
Phase 4: Fix Concurrent DB Access + Add Metrics
**File:** backend-saas/core/historical_sync_service.py (lines 1360-1396)
**Before (unsafe):**

    sem = asyncio.Semaphore(20)

    async def _extract_multi(rec, txt):
        async with sem:
            return await self.ingestion_pipeline._process_multi_entity_extraction(...)  # writes to self.db

    await asyncio.gather(...)  # results discarded
    self.db.commit()

**After (safe, two-phase, with metrics):**

    # Create shared LLMService once per chunk (avoids O(N) DB queries for tenant settings)
    shared_db = SessionLocal()
    chunk_llm_service = LLMService(db=shared_db, workspace_id=ws, tenant_id=tenant_id)

    # Phase 1: Concurrent extraction (NO DB ACCESS)
    sem = asyncio.Semaphore(20)

    async def _extract_multi(rec, txt):
        async with sem:
            return await self.ingestion_pipeline._extract_multi_entity_only(
                record=rec, integration_id=job.integration_id, text=txt,
                job_id=job_id, llm_service=chunk_llm_service,
            )

    raw_results = await asyncio.gather(
        *[_extract_multi(rec, txt) for rec, txt in multi_entity_tasks],
        return_exceptions=True
    )

    # Phase 2: Single-threaded persistence
    all_entities = []
    total_tasks = len(multi_entity_tasks)
    failed = 0
    for result in raw_results:
        if isinstance(result, Exception):
            failed += 1
            logger.warning(f"Multi-entity extraction error: {result}")
        elif result:
            all_entities.extend(result)

    if all_entities:
        self.db.add_all(all_entities)
        self.db.flush()

    # Aggregated metrics
    model_stats = self.ingestion_pipeline.multi_entity_extractor.model_stats
    logger.info(
        f"Chunk {chunk_count+1}: Multi-entity extraction done - "
        f"tasks={total_tasks}, succeeded={total_tasks-failed}, failed={failed}, "
        f"total_entities={len(all_entities)}, "
        f"models={model_stats}, "
        f"chunk_llm_cost_est={getattr(chunk_llm_service, 'total_cost', 'N/A')}",
        tenant_id=self.tenant_id, job_id=job_id,
    )

Also update `model_stats` in `MultiEntityLLMExtractor`:
- Change from hardcoded keys `{"gpt-4o": 0, "gpt-4o-mini": 0, "total": 0}` to `defaultdict(int)`
- Add `async with self.model_stats_lock:` around increments in `_select_model`
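The two `model_stats` changes might look like the sketch below. `ModelStats` is a hypothetical wrapper used only to keep the example self-contained; in the plan the counter and lock would live on `MultiEntityLLMExtractor`.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, Dict, List


class ModelStats:
    """Hypothetical holder for per-model counters guarded by a lock."""

    def __init__(self) -> None:
        # defaultdict(int) lets dynamically selected model names be
        # counted without pre-registering their keys.
        self.counts: DefaultDict[str, int] = defaultdict(int)
        self.lock = asyncio.Lock()

    async def record(self, model_name: str) -> None:
        async with self.lock:
            self.counts[model_name] += 1
            self.counts["total"] += 1


async def demo(models: List[str]) -> Dict[str, int]:
    stats = ModelStats()
    await asyncio.gather(*(stats.record(m) for m in models))
    return dict(stats.counts)


result = asyncio.run(demo(["gpt-4o", "gpt-4o-mini", "gpt-4o"]))
```

Note that `async with` is only valid inside a coroutine, so taking the lock this way assumes the increments happen in an `async` method.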
Phase 5: Wire BPC Router Into Extractor
**File:** backend-saas/core/multi_entity_llm_extractor.py
Replace `_select_model()`. Currently:

    if complexity["complexity_score"] < 40:
        selected_model = "gpt-4o-mini"
    else:
        selected_model = "gpt-4o"

With BPC routing:

    def _select_model(self, email_data):
        if not self.enable_model_selection:
            return self.model
        complexity = self._analyze_email_complexity(email_data)
        provider, model_name, tier = select_best_model(
            complexity_score=complexity["complexity_score"],
            integration_id=getattr(self, 'integration_id', 'unknown'),
        )
        self.model_stats[model_name] += 1
        self.model_stats["total"] += 1
        return model_name

Files Changed
| File | Action | Phase |
|---|---|---|
| `backend-saas/core/integration_constants.py` | **NEW** | 1 |
| `backend-saas/core/llm/extraction_model_router.py` | **NEW** | 2 |
| `backend-saas/core/ingestion_pipeline.py` | Modify (lines 220, 1132, 2555-2617) | 1, 3 |
| `backend-saas/core/historical_sync_service.py` | Modify (lines 1173, 1175, 1274, 1360-1396) | 1, 4 |
| `backend-saas/core/multi_entity_llm_extractor.py` | Modify (add llm_service param, model selection, model_stats) | 3, 5 |
Verification
Unit Tests
- `extraction_model_router.py`: Test complexity-to-tier mapping, BPC scoring with mock pricing data, fallback when fetcher unavailable
- `multi_entity_llm_extractor.py`: Test model selection with BPC router, test `llm_service` parameter passthrough
- `_extract_multi_entity_only`: Test that no DB operations occur during extraction
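The "no DB operations" check could be sketched with `unittest.mock`, as below. `FakePipeline` is a hypothetical stand-in that only mirrors the method signature from Phase 3; a real test would instantiate the actual pipeline with a mocked session and a stubbed LLM service.

```python
import asyncio
from unittest.mock import MagicMock


class FakePipeline:
    """Hypothetical stand-in for the ingestion pipeline, for illustration only."""

    def __init__(self, db) -> None:
        self.db = db

    async def _extract_multi_entity_only(
        self, record, integration_id, text, job_id, llm_service=None
    ):
        # Builds entities without touching self.db.
        return [{"type": "person", "name": text}]


def test_extract_only_touches_no_db() -> None:
    db = MagicMock()
    pipeline = FakePipeline(db)
    entities = asyncio.run(
        pipeline._extract_multi_entity_only({}, "outlook", "Alice", "job-1")
    )
    assert entities == [{"type": "person", "name": "Alice"}]
    # MagicMock records every call made on it; an empty list means the
    # session was never used during extraction.
    assert db.mock_calls == []


test_extract_only_touches_no_db()
```

Asserting on `db.mock_calls` covers `add`, `add_all`, `flush`, `commit`, and any other session method in one check.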
Integration Tests
- Run historical sync job for outlook integration, verify:
  - No SQLAlchemy session errors in logs
  - Aggregated metrics appear in chunk logs
  - Model stats show dynamic model names (not just gpt-4o/gpt-4o-mini)
  - Entities appear in `DiscoveredEntity` table with correct types
- Trigger webhook for `whatsapp`/`teams`/`hubspot` and verify multi-entity extraction runs
Production Smoke Test

    # Deploy and trigger a small backfill
    fly deploy -a atom-saas --strategy immediate
    # Check logs for model selection and metrics
    # (-i because the log line is capitalized: "Multi-entity extraction")
    fly logs -a atom-saas | grep -i "multi-entity extraction"

Rollback Path
- BPC router has static fallback when `DynamicPricingFetcher` unavailable → matches current behavior
- Old `_process_multi_entity_extraction` wrapper preserves backward compatibility
- If BPC selects bad models, set `enable_model_selection=False` on the extractor to revert to default `gpt-4o`