ATOM Documentation


GraphRAG Idempotency Issue Analysis & Fix

Problem Report

Symptom

"No new entities created with Outlook ingestion"

Root Cause

**Entities WERE being created, but duplicates were being created on every ingestion!**

The bug was in graphrag_engine.py, lines 1156-1165 (before the fix):

# Deduplicate logic simplified:  ← COMMENT WAS LYING!
node = GraphNode(
    workspace_id=workspace_id,
    name=name,
    type=e_data.get("type", "unknown"),
    ...
)
session.add(node)  # ← Just blindly adds, NO CHECK!

**Impact:**

  • 1st ingestion of email → Creates GraphNode("Test Subject", type="email")
  • 2nd ingestion of same email → Creates ANOTHER GraphNode("Test Subject", type="email")
  • Nth ingestion → N duplicates! ❌

Why This Matters

  1. **Database Bloat** - Graph explodes with duplicate nodes
  2. **Performance Degradation** - Queries get slower as node count grows
  3. **Incorrect Analytics** - Entity counts are meaningless
  4. **Relationship Chaos** - Duplicate nodes create messy relationship webs

Solution Implemented

Fix Applied (graphrag_engine.py lines 1155-1185)

# Check if node already exists (workspace_id, name, type)
existing = (
    session.query(GraphNode)
    .filter_by(
        workspace_id=workspace_id,
        name=name,
        type=e_data.get("type", "unknown")
    )
    .first()
)

if existing:
    # Update existing node
    existing.description = e_data.get("description", existing.description)
    existing.properties.update(properties)
    node_id = existing.id
    logger.debug(f"Updated existing node: {name} ({existing.type})")
else:
    # Create new node
    node = GraphNode(
        workspace_id=workspace_id,
        name=name,
        type=e_data.get("type", "unknown"),
        description=e_data.get("description", ""),
        properties=properties,
    )
    session.add(node)
    session.flush()
    node_id = node.id
    logger.debug(f"Created new node: {name} ({e_data.get('type', 'unknown')})")

node_map[name] = node_id
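One subtlety in the fix above: if GraphNode.properties is a plain JSON column (not wrapped in MutableDict), SQLAlchemy does not detect the in-place `.update()` mutation on the existing node, and the merged properties may silently never be written back. A minimal, self-contained sketch of the `flag_modified` workaround (assumed schema, in-memory SQLite; not the real model):

```python
"""Sketch: why in-place mutation of a plain JSON column needs flag_modified().
The GraphNode schema here is an assumption, not the real ATOM model."""
from sqlalchemy import Column, Integer, JSON, String, create_engine
from sqlalchemy.orm import Session, declarative_base
from sqlalchemy.orm.attributes import flag_modified

Base = declarative_base()

class GraphNode(Base):
    __tablename__ = "graph_nodes"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    properties = Column(JSON, default=dict)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(GraphNode(name="Test Subject", properties={"a": 1}))
    session.commit()

with Session(engine) as session:
    node = session.query(GraphNode).first()
    node.properties.update({"b": 2})   # in-place mutation: NOT tracked by the ORM
    flag_modified(node, "properties")  # mark the attribute dirty so UPDATE is emitted
    session.commit()

with Session(engine) as session:
    print(session.query(GraphNode).first().properties)  # {'a': 1, 'b': 2}
```

Without the `flag_modified` call (or a `MutableDict`-wrapped column), the second commit is a no-op and `"b"` is lost.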

What Changed

**Before:**

  • Always created new nodes (duplicates)
  • No check for existing entities
  • Properties never updated

**After:**

  • Checks if node exists (workspace_id, name, type)
  • If exists: UPDATE description and properties
  • If not: CREATE new node
  • Proper logging (debug level)

Critique of Original Idempotency Plan

✅ Good Ideas (Should Still Implement)

  1. **Content Hashing** - Track if entity actually changed before updating
  2. **source_ids JSONB** - Track which documents contributed to an entity
  3. **Unique Constraints** - Add database-level uniqueness on (workspace_id, name, type)
  4. **ON CONFLICT Upserts** - Use a single upsert statement for better performance:

# Better (ON CONFLICT in Postgres):
# single round-trip, atomic, faster
insert_stmt = text("""
    INSERT INTO graph_nodes (workspace_id, name, type, description, properties)
    VALUES (:workspace_id, :name, :type, :description, :properties)
    ON CONFLICT (workspace_id, name, type)
    DO UPDATE SET
        description = EXCLUDED.description,
        properties = graph_nodes.properties || EXCLUDED.properties
""")
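The ON CONFLICT upsert only works once a matching unique constraint exists. A possible migration, as plain SQL (the constraint name is an assumption; existing duplicate rows must be merged or deleted first):

```sql
-- Required before ON CONFLICT (workspace_id, name, type) can be used.
-- NOTE: this ALTER fails if duplicate rows already exist; dedupe first.
ALTER TABLE graph_nodes
    ADD CONSTRAINT uq_graph_nodes_workspace_name_type
    UNIQUE (workspace_id, name, type);
```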

❌ Issues with Original Plan

  1. **Missing Multi-Workspace Support**
  • The plan doesn't add tenant_id to GraphNode/GraphEdge
  • But we just implemented multi-workspace for EntityTypeDefinition!
  • This inconsistency will cause problems
  2. **No Document-Level Dedup**
  • The plan focuses on entity/edge dedup
  • Missing: tracking which documents have already been processed
  3. **Performance Concerns**
  • Check-then-merge in Python is slow (2+ queries per entity)
  • Bulk operations should use raw SQL with ON CONFLICT
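A hedged sketch of what document-level dedup might look like (the ledger, its key, and `should_process` are all illustrative names, not the original plan's): fingerprint each document's content and skip extraction entirely when an unchanged document is re-ingested.

```python
"""Hypothetical document-level dedup: skip re-processing unchanged documents.
In production the ledger would be a table keyed by (workspace_id, source_id);
a dict stands in for it here."""
import hashlib

def content_sha256(text: str) -> str:
    """Stable fingerprint of a document's content."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

ledger: dict = {}

def should_process(workspace_id: str, source_id: str, body: str) -> bool:
    """True only if this document is new or its content changed."""
    key = (workspace_id, source_id)
    digest = content_sha256(body)
    if ledger.get(key) == digest:
        return False  # already processed and unchanged: skip extraction
    ledger[key] = digest
    return True

print(should_process("ws1", "msg-1", "Test Subject"))  # True  (first ingestion)
print(should_process("ws1", "msg-1", "Test Subject"))  # False (duplicate ingestion)
print(should_process("ws1", "msg-1", "Edited body"))   # True  (content changed)
```

This short-circuits the whole entity-extraction pipeline for repeat syncs, which the per-entity upsert alone cannot do.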

Testing the Fix

Verify Deduplication Works

# 1. Ingest same email twice
curl -X POST https://atom-saas.fly.dev/api/integrations/outlook/sync \
  -H "Authorization: Bearer $TOKEN" \
  -H "X-Tenant-Id: $TENANT_ID" \
  -H "Content-Type: application/json" \
  -d '{
    "start_date": "2024-01-01T00:00:00Z",
    "end_date": "2024-01-02T00:00:00Z"
  }'

# 2. Check database for duplicates
cd backend-saas
python3 <<EOF
from sqlalchemy import func

from core.database import SessionLocal
from core.models import GraphNode

session = SessionLocal()
duplicates = session.query(
    GraphNode.name, GraphNode.type, func.count(GraphNode.id)
).group_by(
    GraphNode.name, GraphNode.type
).having(
    func.count(GraphNode.id) > 1
).all()

for name, node_type, count in duplicates:
    print(f"DUPLICATE: {name} ({node_type}) - {count} instances")
EOF

Expected Result After Fix

  • No duplicate entries
  • Second ingestion should UPDATE existing node
  • Should see "Updated existing node" in logs (debug level)
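The invariant behind this expected result can be stated as a tiny self-contained model (not the real engine, just the contract it must satisfy): ingesting the same entity N times leaves exactly one node per (workspace_id, name, type) key.

```python
"""Toy model of the upsert invariant: repeated ingestion of the same entity
must not grow the node set. Illustrative only, not the real engine."""

def upsert(nodes, workspace_id, name, type_, description="", properties=None):
    """Insert or update a node keyed by (workspace_id, name, type)."""
    key = (workspace_id, name, type_)
    if key in nodes:
        node = nodes[key]
        node["description"] = description or node["description"]
        node["properties"].update(properties or {})
    else:
        nodes[key] = {"description": description,
                      "properties": dict(properties or {})}

nodes = {}
for _ in range(3):  # "Nth ingestion" of the same email
    upsert(nodes, "ws1", "Test Subject", "email", "An email")
print(len(nodes))  # 1 — not 3
```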

Next Steps

  1. ✅ **Immediate Fix** - Applied (check-then-update logic)
  2. **High Priority** - Add unique constraint migration
  3. **Medium Priority** - Implement content hashing to skip unnecessary updates
  4. **Low Priority** - Migrate to ON CONFLICT for performance
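The content-hashing step (item 3) could be sketched as follows; the payload fields and helper name are assumptions, not the shipped design. Store the hash alongside each node and skip the UPDATE when an incoming extraction hashes to the same value:

```python
"""Hypothetical entity content hashing: skip no-op updates.
Field choices and function name are illustrative, not the shipped design."""
import hashlib
import json

def entity_hash(description, properties):
    """Deterministic hash of exactly the fields an update would write."""
    payload = json.dumps(
        {"description": description, "properties": properties},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

stored = entity_hash("An email", {"from": "a@example.com"})
incoming = entity_hash("An email", {"from": "a@example.com"})
print(stored == incoming)  # True -> skip the UPDATE entirely
```

Comparing one stored hash is cheaper than diffing description and properties field by field, and it avoids dirtying rows (and write-amplifying the WAL) when nothing changed.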

Files Modified

  • backend-saas/core/graphrag_engine.py (lines 1155-1185)
  • ⏳ Migration to add unique constraints (TODO)
  • ⏳ ON CONFLICT implementation (TODO)

Deployment

Commit message ready:

git add backend-saas/core/graphrag_engine.py
git commit -m "fix: implement GraphRAG entity deduplication to prevent duplicate nodes

CRITICAL BUG FIX: Prevents duplicate GraphNode creation on repeated ingestion.

Changes:
- Check for existing nodes by (workspace_id, name, type) before inserting
- Update existing nodes with new description/properties instead of creating duplicates
- Add debug logging for tracking creates vs updates

Root Cause: Previous implementation blindly created new nodes on every ingestion,
causing database bloat and performance degradation with duplicate entities.

Test: Ingest same email twice - should see 1 GraphNode, not 2.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>"

Run tests before deploying to verify.