ATOM Documentation

Multi-Tenant Patterns

**Version:** v11.2

**Last Updated:** 2026-04-10

Overview

atom-saas is a multi-tenant SaaS platform where all data is isolated by tenant. This document describes patterns for implementing tenant isolation in API endpoints, database queries, tests, and integrations.

**Key Principle:** Every database query, API request, and background job MUST be scoped to a specific tenant.

---

Table of Contents

  1. Tenant Context
  2. Database Query Patterns
  3. API Endpoint Patterns
  4. Test Fixtures
  5. Common Mistakes
  6. SaaS-Specific Patterns
  7. Storage Patterns
  8. Background Jobs
  9. Verification
  10. Changelog

---

Tenant Context

Extracting Tenant from Request (TypeScript/Next.js)

import { NextResponse } from 'next/server';
import { getTenantFromRequest } from '@/lib/tenant';

export async function GET(request: Request) {
  const tenant = await getTenantFromRequest(request);
  if (!tenant) {
    return NextResponse.json(
      { error: "Tenant not found" },
      { status: 404 }
    );
  }

  // Use tenant.id for all queries (the call below mirrors the Python examples; adapt it to the TypeScript data layer)
  const agents = await db.query(Agent).filter(
    Agent.tenant_id == tenant.id
  ).all();

  return NextResponse.json({ agents });
}

Extracting Tenant from Request (Python/FastAPI)

from fastapi import Request, HTTPException
from core.tenant_service import get_tenant_from_request

@app.get("/api/agents")
async def list_agents(request: Request):
    tenant = await get_tenant_from_request(request)
    if not tenant:
        raise HTTPException(status_code=404, detail="Tenant not found")

    # Always filter by tenant_id
    agents = db.query(Agent).filter(
        Agent.tenant_id == tenant.id
    ).all()

    return {"agents": agents}

Tenant from Headers

from typing import Optional

def get_tenant_from_headers(headers: dict) -> Optional[Tenant]:
    """Extract tenant from X-Tenant-ID header or subdomain."""
    tenant_id = headers.get("X-Tenant-ID")
    if tenant_id:
        return db.query(Tenant).filter(Tenant.id == tenant_id).first()

    # Fallback to subdomain
    host = headers.get("host", "")
    subdomain = host.split(".")[0]
    return db.query(Tenant).filter(Tenant.subdomain == subdomain).first()
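
Taking the first label of the Host header also accepts `www`, bare domains, hosts with a port, and unrelated hosts. The following is a minimal sketch of a stricter parser. `BASE_DOMAIN`, `RESERVED_LABELS`, and `extract_subdomain` are hypothetical names chosen for illustration and are not part of atom-saas.

```python
from typing import Optional

# Hypothetical values for illustration; substitute the platform's real domain.
BASE_DOMAIN = "atom.example"
RESERVED_LABELS = {"www", "api", "app"}


def extract_subdomain(host: str, base_domain: str = BASE_DOMAIN) -> Optional[str]:
    """Return the tenant subdomain from a Host header value, or None."""
    # Drop an optional port and normalise case (host names are case-insensitive).
    hostname = host.split(":", 1)[0].strip().lower().rstrip(".")
    suffix = "." + base_domain
    if not hostname.endswith(suffix):
        return None  # bare domain, localhost, or a foreign host
    label = hostname[: -len(suffix)]
    # Accept exactly one non-reserved label in front of the base domain.
    if not label or "." in label or label in RESERVED_LABELS:
        return None
    return label
```

A `None` result can then be treated the same way as a missing tenant, so the request ends in the existing 404 path instead of a lookup against an arbitrary label.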

---

Database Query Patterns

Tenant-Aware Queries

**✅ CORRECT: Always filter by tenant_id**

# Correct: Explicit tenant filter
agents = db.query(Agent).filter(
    Agent.tenant_id == tenant_id
).all()

# Correct: Using tenant relationship
# (requires Tenant.agents to be a dynamic relationship, e.g. lazy="dynamic")
tenant = db.query(Tenant).get(tenant_id)
agents = tenant.agents.all()

# Correct: Join with tenant filter
results = db.query(Agent, Canvas).join(Canvas).filter(
    Agent.tenant_id == tenant_id,
    Canvas.tenant_id == tenant_id
).all()

**❌ INCORRECT: Never query without tenant filter**

# WRONG: Returns all agents across all tenants (CROSS-TENANT DATA LEAK)
agents = db.query(Agent).all()

# WRONG: Missing tenant filter in join
results = db.query(Agent, Canvas).join(Canvas).all()

# WRONG: Filtering by ID only (bypasses tenant isolation)
agent = db.query(Agent).filter(Agent.id == agent_id).first()

Tenant-Aware Creates

**✅ CORRECT: Always include tenant_id**

# Correct: Explicit tenant_id
agent = Agent(
    id="agent-123",
    name="Sales Agent",
    tenant_id=tenant_id,
    maturity="intern"
)
db.add(agent)
db.commit()

# Correct: Using factory
agent = AgentFactory.create(
    _session=db,
    tenant_id=tenant_id,
    name="Sales Agent",
    maturity="intern"
)

**❌ INCORRECT: Missing tenant_id**

# WRONG: Will fail or default to wrong tenant
agent = Agent(
    id="agent-123",
    name="Sales Agent",
    maturity="intern"
)

Tenant-Aware Updates

**✅ CORRECT: Filter by tenant + ID**

# Correct: Update with tenant filter
result = db.query(Agent).filter(
    Agent.tenant_id == tenant_id,
    Agent.id == agent_id
).update({"name": "Updated Name"})

if result == 0:
    raise HTTPException(status_code=404, detail="Agent not found")

**❌ INCORRECT: Update by ID only**

# WRONG: Could update agent in different tenant
db.query(Agent).filter(Agent.id == agent_id).update({"name": "Updated Name"})

Tenant-Aware Deletes

**✅ CORRECT: Filter by tenant + ID**

# Correct: Delete with tenant filter
result = db.query(Agent).filter(
    Agent.tenant_id == tenant_id,
    Agent.id == agent_id
).delete()

if result == 0:
    raise HTTPException(status_code=404, detail="Agent not found")
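
One way to make these rules hard to break is to route every statement through a helper that is bound to a single tenant. The sketch below uses the standard-library `sqlite3` module rather than the project's ORM so that it runs on its own. `TenantScopedAgents`, `make_demo_db`, and the table layout are hypothetical and only illustrate the idea.

```python
import sqlite3
from typing import Optional


class TenantScopedAgents:
    """Hypothetical helper: every statement carries the bound tenant_id."""

    def __init__(self, conn: sqlite3.Connection, tenant_id: str) -> None:
        if not tenant_id:
            raise ValueError("tenant_id is required")
        self._conn = conn
        self._tenant_id = tenant_id

    def create(self, agent_id: str, name: str) -> None:
        self._conn.execute(
            "INSERT INTO agents (id, tenant_id, name) VALUES (?, ?, ?)",
            (agent_id, self._tenant_id, name),
        )

    def get(self, agent_id: str) -> Optional[tuple]:
        return self._conn.execute(
            "SELECT id, name FROM agents WHERE tenant_id = ? AND id = ?",
            (self._tenant_id, agent_id),
        ).fetchone()

    def rename(self, agent_id: str, name: str) -> int:
        cur = self._conn.execute(
            "UPDATE agents SET name = ? WHERE tenant_id = ? AND id = ?",
            (name, self._tenant_id, agent_id),
        )
        return cur.rowcount  # 0 means "not found for this tenant"

    def delete(self, agent_id: str) -> int:
        cur = self._conn.execute(
            "DELETE FROM agents WHERE tenant_id = ? AND id = ?",
            (self._tenant_id, agent_id),
        )
        return cur.rowcount  # 0 means "not found for this tenant"


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with a minimal agents table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE agents ("
        "id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, name TEXT NOT NULL)"
    )
    return conn
```

Because the tenant ID is fixed in the constructor, call sites cannot forget the filter. A row count of 0 maps onto the 404 responses used throughout this document.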

---

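API Endpoint Patterns

The handlers in this section omit the `if not tenant` check shown under Tenant Context to keep the examples short. `get_tenant_from_request` can return no tenant, so production handlers need that check before using `tenant.id`.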

GET Requests (List Resources)

@app.get("/api/agents")
async def list_agents(request: Request):
    tenant = await get_tenant_from_request(request)

    # Always filter by tenant
    agents = db.query(Agent).filter(
        Agent.tenant_id == tenant.id
    ).all()

    return {
        "agents": [a.to_dict() for a in agents],
        "tenant_id": tenant.id
    }

GET Requests (Single Resource)

@app.get("/api/agents/{agent_id}")
async def get_agent(agent_id: str, request: Request):
    tenant = await get_tenant_from_request(request)

    # Filter by both tenant and ID
    agent = db.query(Agent).filter(
        Agent.tenant_id == tenant.id,
        Agent.id == agent_id
    ).first()

    if not agent:
        raise HTTPException(
            status_code=404,
            detail="Agent not found"
        )

    return agent.to_dict()

POST Requests (Create Resource)

import uuid

@app.post("/api/agents", status_code=201)
async def create_agent(data: AgentCreate, request: Request):
    tenant = await get_tenant_from_request(request)

    # Always include tenant_id
    agent = Agent(
        id=str(uuid.uuid4()),
        tenant_id=tenant.id,
        name=data.name,
        maturity=data.maturity
    )
    db.add(agent)
    db.commit()

    # FastAPI takes the status code from the decorator; returning a
    # (body, 201) tuple would be serialized as a JSON array instead
    return agent.to_dict()

PUT Requests (Update Resource)

@app.put("/api/agents/{agent_id}")
async def update_agent(
    agent_id: str,
    data: AgentUpdate,
    request: Request
):
    tenant = await get_tenant_from_request(request)

    # Filter by tenant + ID
    result = db.query(Agent).filter(
        Agent.tenant_id == tenant.id,
        Agent.id == agent_id
    ).update(data.dict(exclude_unset=True))

    if result == 0:
        raise HTTPException(status_code=404)

    db.commit()
    return {"updated": True}

DELETE Requests (Delete Resource)

@app.delete("/api/agents/{agent_id}")
async def delete_agent(agent_id: str, request: Request):
    tenant = await get_tenant_from_request(request)

    # Filter by tenant + ID
    result = db.query(Agent).filter(
        Agent.tenant_id == tenant.id,
        Agent.id == agent_id
    ).delete()

    if result == 0:
        raise HTTPException(status_code=404)

    db.commit()
    return {"deleted": True}
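
The PUT handler passes the client's fields straight into `update()`. If the update schema ever includes `tenant_id` or `id`, a request could move a row into another tenant even though the filter itself is correct. The following is a minimal sketch of an allowlist filter. The field sets and the function name `safe_update_values` are assumptions for illustration, and the real `AgentUpdate` schema may differ.

```python
from typing import Any, Dict, FrozenSet

# Hypothetical allowlist; the real AgentUpdate schema may differ.
UPDATABLE_AGENT_FIELDS: FrozenSet[str] = frozenset({"name", "maturity"})
# Fields that identify the row or its owner must never change via an update.
PROTECTED_FIELDS: FrozenSet[str] = frozenset({"id", "tenant_id"})


def safe_update_values(
    payload: Dict[str, Any],
    allowed: FrozenSet[str] = UPDATABLE_AGENT_FIELDS,
) -> Dict[str, Any]:
    """Keep only updatable fields; reject attempts to change id or tenant_id."""
    forbidden = PROTECTED_FIELDS.intersection(payload)
    if forbidden:
        raise ValueError(f"Fields cannot be updated: {sorted(forbidden)}")
    # Silently drop unknown fields, keep the allowed ones.
    return {key: value for key, value in payload.items() if key in allowed}
```

In a handler this would wrap the payload before it reaches the query, for example `.update(safe_update_values(data.dict(exclude_unset=True)))`, with the `ValueError` translated into a 400 response.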

---

Test Fixtures

Using Tenant Fixtures

From Plan 276-01, we have comprehensive tenant fixtures:

import pytest
from tests.fixtures.tenant_fixtures import test_tenant, pro_tenant
from tests.fixtures.agent_factory import AgentFactory

def test_agent_creation(test_tenant):
    """Test agent creation with tenant isolation."""
    agent = AgentFactory.create(
        _session=db,
        tenant_id=test_tenant.id,
        name="Test Agent",
        maturity="intern"
    )

    assert agent.tenant_id == test_tenant.id
    assert test_tenant.plan == "free"  # Free plan tenant

Tenant Isolation Tests

def test_tenant_isolation(test_tenant, pro_tenant):
    """Verify agents are isolated between tenants."""
    # Create agent in tenant 1
    agent1 = AgentFactory.create(
        _session=db,
        tenant_id=test_tenant.id,
        name="Agent 1"
    )

    # Create agent in tenant 2
    agent2 = AgentFactory.create(
        _session=db,
        tenant_id=pro_tenant.id,
        name="Agent 2"
    )

    # Query tenant 1
    tenant1_agents = db.query(Agent).filter(
        Agent.tenant_id == test_tenant.id
    ).all()

    # Query tenant 2
    tenant2_agents = db.query(Agent).filter(
        Agent.tenant_id == pro_tenant.id
    ).all()

    # Verify isolation
    assert len(tenant1_agents) == 1
    assert len(tenant2_agents) == 1
    assert agent1.id not in [a.id for a in tenant2_agents]
    assert agent2.id not in [a.id for a in tenant1_agents]

Canvas Integration Tests

@pytest.mark.integration
@pytest.mark.asyncio  # needs the pytest-asyncio plugin; the test awaits a coroutine
async def test_canvas_episode_recording(test_tenant):
    """Test canvas execution records episode with tenant context."""
    from tests.fixtures.canvas_factory import CanvasFactory
    from core.canvas_skill_integration import record_canvas_execution

    # Create canvas with tenant
    canvas = CanvasFactory.create(
        _session=db,
        tenant_id=test_tenant.id,
        name="Test Canvas"
    )

    # Record episode (from Plan 276-02)
    episode = await record_canvas_execution(
        tenant_id=test_tenant.id,
        canvas_id=canvas.id,
        execution_data={"action": "test"}
    )

    # Verify tenant isolation
    assert episode.canvas_id == canvas.id
    assert episode.tenant_id == test_tenant.id

    # Verify episode only visible to this tenant
    episodes = db.query(Episode).filter(
        Episode.tenant_id == test_tenant.id
    ).all()

    assert episode.id in [e.id for e in episodes]

Factory Patterns

All factories from Plan 276-01 require tenant_id:

# Agent factory
agent = AgentFactory.create(
    _session=db,
    tenant_id=test_tenant.id,
    name="Test Agent"
)

# Episode factory
episode = EpisodeFactory.create(
    _session=db,
    tenant_id=test_tenant.id,
    agent_id="agent-123",
    outcome="success"
)

# Canvas factory
canvas = CanvasFactory.create(
    _session=db,
    tenant_id=test_tenant.id,
    name="Test Canvas"
)

# Execution factory
execution = AgentExecutionFactory.create(
    _session=db,
    tenant_id=test_tenant.id,
    agent_id="agent-123",
    status="completed"
)

---

Common Mistakes

❌ Mistake 1: Query Without Tenant Filter

# WRONG: Returns all agents across all tenants
agents = db.query(Agent).all()

# CORRECT: Filter by tenant
agents = db.query(Agent).filter(
    Agent.tenant_id == tenant_id
).all()

❌ Mistake 2: Missing tenant_id in Create

# WRONG: Agent created without tenant context
agent = Agent(
    id="agent-123",
    name="Test Agent"
)

# CORRECT: Always include tenant_id
agent = Agent(
    id="agent-123",
    name="Test Agent",
    tenant_id=tenant_id
)

❌ Mistake 3: Filter by ID Only

# WRONG: Bypasses tenant isolation
agent = db.query(Agent).filter(Agent.id == agent_id).first()

# CORRECT: Filter by both tenant and ID
agent = db.query(Agent).filter(
    Agent.tenant_id == tenant_id,
    Agent.id == agent_id
).first()

❌ Mistake 4: Hardcoded Tenant ID

# WRONG: Hardcoded tenant reference
agents = db.query(Agent).filter(
    Agent.tenant_id == "default-tenant"
).all()

# CORRECT: Use tenant from request/context
tenant = await get_tenant_from_request(request)
agents = db.query(Agent).filter(
    Agent.tenant_id == tenant.id
).all()

❌ Mistake 5: Join Without Tenant Filter

# WRONG: Cross-tenant join
results = db.query(Agent, Canvas).join(Canvas).all()

# CORRECT: Join with tenant filter
results = db.query(Agent, Canvas).join(Canvas).filter(
    Agent.tenant_id == tenant_id,
    Canvas.tenant_id == tenant_id
).all()

---

SaaS-Specific Patterns

These patterns are specific to the SaaS multi-tenant platform and not present in upstream.

Governance Checks with Tenant

from core.agent_governance_service import AgentGovernanceService

@app.post("/api/agents/{agent_id}/execute")
async def execute_agent(agent_id: str, request: Request):
    tenant = await get_tenant_from_request(request)

    # SaaS-specific: Check governance
    governance = AgentGovernanceService(db)
    decision = await governance.can_perform_action(
        tenant_id=tenant.id,
        agent_id=agent_id,
        action="execute"
    )

    if not decision.allowed:
        raise HTTPException(
            status_code=403,
            detail=f"Agent not allowed: {decision.reason}"
        )

    # Proceed with execution
    result = await execute_agent_action(tenant.id, agent_id)
    return result

Quota Checks with Tenant

from core.quota_service import QuotaService

@app.post("/api/agents/{agent_id}/execute")
async def execute_agent(agent_id: str, request: Request):
    tenant = await get_tenant_from_request(request)

    # SaaS-specific: Check quota
    quota = QuotaService(db)
    within_limit = await quota.check_execution_limit(
        tenant_id=tenant.id
    )

    if not within_limit:
        raise HTTPException(
            status_code=429,
            detail="Daily execution limit exceeded"
        )

    # Proceed with execution
    result = await execute_agent_action(tenant.id, agent_id)
    return result
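
A quota check that only reads the counter leaves a gap between the check and the execution in which concurrent requests can all pass. The sketch below checks and records in one step, keyed by tenant and day. `InMemoryQuota` is a hypothetical, single-process stand-in and does not reflect how `QuotaService` is implemented. A real deployment would need a shared store.

```python
from collections import defaultdict
from datetime import date
from typing import DefaultDict, Optional, Tuple


class InMemoryQuota:
    """Hypothetical stand-in: counts executions per tenant per day."""

    def __init__(self, daily_limit: int) -> None:
        self._daily_limit = daily_limit
        # Key is (tenant_id, day), so tenants never share a counter.
        self._counts: DefaultDict[Tuple[str, date], int] = defaultdict(int)

    def try_consume(self, tenant_id: str, today: Optional[date] = None) -> bool:
        """Record one execution and return True, or return False at the limit."""
        key = (tenant_id, today or date.today())
        if self._counts[key] >= self._daily_limit:
            return False
        self._counts[key] += 1
        return True
```

A handler would call `try_consume(tenant.id)` and raise the 429 shown above when it returns `False`.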

Billing Integration

from core.billing_service import BillingService

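import uuid

@app.post("/api/agents", status_code=201)
async def create_agent(data: AgentCreate, request: Request):
    tenant = await get_tenant_from_request(request)

    # SaaS-specific: Check plan limits
    billing = BillingService(db)
    plan = await billing.get_tenant_plan(tenant.id)

    # Count only this tenant's agents
    agent_count = db.query(Agent).filter(
        Agent.tenant_id == tenant.id
    ).count()

    # Enforce agent limits by plan before creating another agent
    # (checking on the list endpoint would block tenants at their limit from reading)
    if agent_count >= plan.max_agents:
        raise HTTPException(
            status_code=403,
            detail=f"Agent limit reached for {plan.name} plan"
        )

    agent = Agent(
        id=str(uuid.uuid4()),
        tenant_id=tenant.id,
        name=data.name,
        maturity=data.maturity
    )
    db.add(agent)
    db.commit()

    return {"agent": agent.to_dict(), "plan": plan.name}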

---

Storage Patterns

S3 Storage with Tenant Isolation

import boto3
from fastapi import Request, UploadFile
from core.tenant_service import get_tenant_from_request

async def upload_file(file: UploadFile, request: Request):
    tenant = await get_tenant_from_request(request)

    # S3 path includes tenant_id
    s3_key = f"{tenant.id}/uploads/{file.filename}"

    s3_client = boto3.client('s3')
    s3_client.upload_fileobj(
        file.file,
        "atom-saas",
        s3_key
    )

    return {"s3_key": s3_key, "tenant_id": tenant.id}
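
`file.filename` comes from the client, so it can contain path separators or collide with an earlier upload. The following is a minimal sketch of a key builder that keeps every object under the tenant's prefix. The helper names and the `<uuid>-<name>` layout are assumptions for illustration, not the platform's actual key scheme.

```python
import posixpath
import re
import uuid

# Anything outside this set is replaced with "_".
_UNSAFE_CHARS = re.compile(r"[^A-Za-z0-9._-]")


def tenant_upload_key(tenant_id: str, filename: str) -> str:
    """Build an object key of the form <tenant_id>/uploads/<uuid>-<safe name>."""
    # Keep only the final path component, handling both / and \ separators.
    base = posixpath.basename(filename.replace("\\", "/"))
    # Replace unsafe characters and strip leading dots; fall back to "file".
    safe = _UNSAFE_CHARS.sub("_", base).lstrip(".") or "file"
    # A random prefix keeps two uploads with the same name from colliding.
    return f"{tenant_id}/uploads/{uuid.uuid4().hex}-{safe}"


def key_belongs_to_tenant(tenant_id: str, key: str) -> bool:
    """True only if the key sits under the tenant's own prefix."""
    return key.startswith(f"{tenant_id}/")
```

The second helper is meant for the read path: a download or delete handler can reject any key that does not start with the requesting tenant's prefix.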

Redis with Tenant Namespacing

import json
import redis
from core.tenant_service import get_tenant_from_request

redis_client = redis.Redis()

async def cache_agent_data(agent_id: str, data: dict, request: Request):
    tenant = await get_tenant_from_request(request)

    # Redis key includes tenant_id
    key = f"tenant:{tenant.id}:agent:{agent_id}"

    redis_client.setex(
        key,
        3600,  # 1 hour TTL
        json.dumps(data)
    )

    return {"cached": True, "key": key}
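
Building keys with f-strings at each call site makes it easy to drop the prefix or to let a value containing `:` blur the boundary between segments. The following is a minimal sketch of a single key builder. `tenant_cache_key` is a hypothetical helper name.

```python
def tenant_cache_key(tenant_id: str, *parts: str) -> str:
    """Build 'tenant:<tenant_id>:<part>:...' and reject separators inside segments."""
    segments = (tenant_id, *parts)
    for segment in segments:
        # An empty segment or an embedded ":" would make keys ambiguous.
        if not segment or ":" in segment:
            raise ValueError(f"Invalid key segment: {segment!r}")
    return ":".join(("tenant", *segments))
```

For example, `tenant_cache_key(tenant.id, "agent", agent_id)` yields the same `tenant:{tenant.id}:agent:{agent_id}` layout used above.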

---

Background Jobs

Tenant-Aware Background Jobs

from core.worker import background_job

@background_job
def process_agent_execution(tenant_id: str, agent_id: str, task_data: dict):
    """Background job always requires tenant_id parameter."""

    # Load tenant context
    tenant = db.query(Tenant).get(tenant_id)
    if not tenant:
        raise ValueError(f"Tenant {tenant_id} not found")

    # All queries filter by tenant
    agent = db.query(Agent).filter(
        Agent.tenant_id == tenant_id,
        Agent.id == agent_id
    ).first()

    if not agent:
        raise ValueError(f"Agent {agent_id} not found for tenant {tenant_id}")

    # Process with tenant context
    result = execute_task(agent, task_data)

    # Record execution with tenant_id
    execution = AgentExecution(
        tenant_id=tenant_id,
        agent_id=agent_id,
        status="completed",
        result=result
    )
    db.add(execution)
    db.commit()

    return result

Scheduled Jobs with Tenant Iteration

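from apscheduler.schedulers.background import BackgroundScheduler
from core.worker import background_job

def scheduled_job_for_all_tenants():
    """Queue one job per active tenant."""
    # Intentionally unscoped: this is the only query that spans tenants
    tenants = db.query(Tenant).filter(Tenant.active.is_(True)).all()

    for tenant in tenants:
        # Queue job with specific tenant context
        # (assumes @background_job exposes a Celery-style .delay())
        process_tenant_data.delay(tenant_id=tenant.id)

@background_job
def process_tenant_data(tenant_id: str):
    """Process data for single tenant."""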
    # All queries scoped to this tenant
    agents = db.query(Agent).filter(
        Agent.tenant_id == tenant_id
    ).all()

    for agent in agents:
        process_agent(tenant_id, agent.id)
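
When one loop serves every tenant, an unhandled error for one tenant can stop the work for all the tenants after it. The sketch below isolates failures per tenant and logs the tenant ID with each one. `run_for_each_tenant` is a hypothetical helper, not part of `core.worker`.

```python
import logging
from typing import Callable, Dict, Iterable

logger = logging.getLogger(__name__)


def run_for_each_tenant(
    tenant_ids: Iterable[str],
    job: Callable[[str], None],
) -> Dict[str, str]:
    """Run job(tenant_id) per tenant; a failure is logged and the loop continues."""
    outcomes: Dict[str, str] = {}
    for tenant_id in tenant_ids:
        try:
            job(tenant_id)
            outcomes[tenant_id] = "ok"
        except Exception:
            # Log with the tenant ID so the failure can be traced to one tenant.
            logger.exception("Scheduled job failed for tenant %s", tenant_id)
            outcomes[tenant_id] = "failed"
    return outcomes
```

The returned mapping gives the scheduler a per-tenant summary that can be reported or retried without re-running the tenants that succeeded.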

---

Verification

Pre-Commit Checklist

Before committing code, verify:

  • [ ] All database queries include tenant_id filter
  • [ ] All creates include tenant_id field
  • [ ] All updates filter by both tenant_id and resource ID
  • [ ] All deletes filter by both tenant_id and resource ID
  • [ ] No hardcoded tenant IDs
  • [ ] No queries using .all() without tenant filter
  • [ ] All background jobs accept tenant_id parameter
  • [ ] All S3 paths include tenant_id
  • [ ] All Redis keys include tenant:{tenant_id} prefix

Automated Verification Scripts

# Check for queries without tenant filter
# Single-line matches only; multi-line queries need manual review
grep -rnE "db\.query\([^)]*\)\.all\(\)" backend-saas/

# Check for hardcoded tenant IDs
grep -rn "tenant_id.*=.*['\"]" backend-saas/ | grep -v "tenant_id.*=.*?"

# List Agent(...) constructor calls with context; review each for a missing tenant_id
grep -rn -A 5 "Agent(" backend-saas/
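
Line-based grep checks cannot see a query that spans several lines, which is how most queries in this document are written. The sketch below uses the standard-library `ast` module to flag `db.query(...)` chains that never mention `tenant_id`. It is a heuristic under stated assumptions: the session variable is named `db`, and the listed terminal methods are the ones worth checking. Intentional cross-tenant queries, such as the scheduler's query on `Tenant`, will be reported and need manual review.

```python
import ast
from typing import List, Tuple

# Assumed set of methods that execute a query; extend as needed.
TERMINAL_METHODS = {"all", "first", "one", "count", "update", "delete"}


def _chain_starts_with_db_query(node: ast.AST) -> bool:
    """Walk down a method chain and report whether it begins with db.query(...)."""
    while isinstance(node, (ast.Call, ast.Attribute)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "query"
            and isinstance(node.func.value, ast.Name)
            and node.func.value.id == "db"
        ):
            return True
        # Step one link closer to the start of the chain.
        node = node.func if isinstance(node, ast.Call) else node.value
    return False


def find_unscoped_queries(source: str) -> List[Tuple[int, str]]:
    """Return (line, method) for db.query chains that never mention tenant_id."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr in TERMINAL_METHODS
            and _chain_starts_with_db_query(node)
        ):
            # The segment covers the whole chain, including multi-line filters.
            segment = ast.get_source_segment(source, node) or ""
            if "tenant_id" not in segment:
                findings.append((node.lineno, node.func.attr))
    return sorted(findings)
```

Running `find_unscoped_queries(path.read_text())` over each Python file under `backend-saas/` would give a list of line numbers to review before committing.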

---

Changelog

v11.2 (2026-04-10)

  • Initial multi-tenant patterns documentation
  • Patterns from Plan 276-01 (test fixtures)
  • Integration patterns from Plan 276-02 (canvas/episode integration)
  • SaaS-specific patterns (governance, quota, billing)
  • Common mistakes and verification checklists