Multi-Tenant Patterns
**Version:** v11.2
**Last Updated:** 2026-04-10
Overview
atom-saas is a multi-tenant SaaS platform where all data is isolated by tenant. This document describes patterns for implementing tenant isolation in API endpoints, database queries, tests, and integrations.
**Key Principle:** Every database query, API request, and background job MUST be scoped to a specific tenant.
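One way to make this principle harder to violate is to bind the tenant once per request or job and have lower layers fail closed when nothing is bound. The following is a minimal sketch using only the standard library; `tenant_scope`, `require_tenant_id`, and `MissingTenantError` are hypothetical names for illustration, not part of atom-saas.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional

# Hypothetical holder for the tenant of the current request or job.
_current_tenant_id: ContextVar[Optional[str]] = ContextVar(
    "current_tenant_id", default=None
)


class MissingTenantError(RuntimeError):
    """Raised when code needs a tenant but none has been bound."""


@contextmanager
def tenant_scope(tenant_id: str) -> Iterator[None]:
    """Bind tenant_id for the duration of a request or background job."""
    if not tenant_id:
        raise MissingTenantError("tenant_id must be a non-empty string")
    token = _current_tenant_id.set(tenant_id)
    try:
        yield
    finally:
        # Restore the previous value so scopes can nest safely.
        _current_tenant_id.reset(token)


def require_tenant_id() -> str:
    """Return the bound tenant ID, or fail closed if there is none."""
    tenant_id = _current_tenant_id.get()
    if tenant_id is None:
        raise MissingTenantError("no tenant bound to the current context")
    return tenant_id
```

A query helper could call `require_tenant_id()` instead of accepting an optional argument, so a forgotten tenant surfaces as an error rather than as an unfiltered query.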
---
Table of Contents
- Tenant Context
- Database Query Patterns
- API Endpoint Patterns
- Test Fixtures
- Common Mistakes
- SaaS-Specific Patterns
- Storage Patterns
- Background Jobs
- Verification
---
Tenant Context
Extracting Tenant from Request (TypeScript/Next.js)
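import { NextResponse } from 'next/server';
import { getTenantFromRequest } from '@/lib/tenant';
// Assumption: `db` is a Prisma-style client exported from a project module such as '@/lib/db'.
// Adapt the import and the query below to the data layer actually in use.
import { db } from '@/lib/db';
export async function GET(request: Request) {
  const tenant = await getTenantFromRequest(request);
  if (!tenant) {
    return NextResponse.json(
      { error: "Tenant not found" },
      { status: 404 }
    );
  }
  // Use tenant.id for all queries
  const agents = await db.agent.findMany({
    where: { tenantId: tenant.id },
  });
  return NextResponse.json({ agents });
}
Extracting Tenant from Request (Python/FastAPI)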
from fastapi import Request, HTTPException
from core.tenant_service import get_tenant_from_request
@app.get("/api/agents")
async def list_agents(request: Request):
    tenant = await get_tenant_from_request(request)
    if not tenant:
        raise HTTPException(status_code=404, detail="Tenant not found")
    # Always filter by tenant_id
    agents = db.query(Agent).filter(
        Agent.tenant_id == tenant.id
    ).all()
    return {"agents": agents}
Tenant from Headers
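The header-based lookup in this section reads `X-Tenant-ID` and falls back to the first label of the host. Host values can carry a port or mixed case, and header names are case-insensitive, so a more defensive version of the parsing step might look like the sketch below. The helper names and the `base_domain` parameter are assumptions for illustration; the database lookup itself is left out.

```python
from typing import Mapping, Optional


def header_value(headers: Mapping[str, str], name: str) -> Optional[str]:
    """Look up a header by name, ignoring case."""
    wanted = name.lower()
    for key, value in headers.items():
        if key.lower() == wanted:
            return value
    return None


def subdomain_from_host(host: str, base_domain: str) -> Optional[str]:
    """Return the tenant subdomain of host, or None if host is not under base_domain."""
    # Drop any port, surrounding whitespace, case differences, and a trailing dot.
    hostname = host.split(":", 1)[0].strip().lower().rstrip(".")
    suffix = "." + base_domain.lower()
    if not hostname.endswith(suffix):
        return None
    label = hostname[: -len(suffix)]
    # Only a single non-empty label counts, e.g. "acme" in "acme.example.com".
    if not label or "." in label:
        return None
    return label
```

Returning `None` for the bare domain, for `localhost`, and for nested labels keeps the fallback from matching a tenant by accident.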
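from typing import Optional
def get_tenant_from_headers(headers: dict) -> Optional[Tenant]:
    """Extract tenant from X-Tenant-ID header or subdomain."""
    # NOTE: trust X-Tenant-ID only when an authenticated gateway sets it;
    # a client-supplied value must be checked against the caller's identity
    tenant_id = headers.get("X-Tenant-ID")
    if tenant_id:
        return db.query(Tenant).filter(Tenant.id == tenant_id).first()
    # Fallback to subdomain (first label of the host, ignoring any port)
    host = headers.get("host", "")
    subdomain = host.split(":")[0].split(".")[0]
    return db.query(Tenant).filter(Tenant.subdomain == subdomain).first()
---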
Database Query Patterns
Tenant-Aware Queries
**✅ CORRECT: Always filter by tenant_id**
# Correct: Explicit tenant filter
agents = db.query(Agent).filter(
    Agent.tenant_id == tenant_id
).all()
# Correct: Using tenant relationship
# (.all() is available only if Tenant.agents is a dynamic relationship, i.e. lazy="dynamic")
tenant = db.query(Tenant).get(tenant_id)
agents = tenant.agents.all()
# Correct: Join with tenant filter
results = db.query(Agent, Canvas).join(Canvas).filter(
    Agent.tenant_id == tenant_id,
    Canvas.tenant_id == tenant_id
).all()
**❌ INCORRECT: Never query without tenant filter**
# WRONG: Returns all agents across all tenants (CROSS-TENANT DATA LEAK)
agents = db.query(Agent).all()
# WRONG: Missing tenant filter in join
results = db.query(Agent, Canvas).join(Canvas).all()
# WRONG: Filtering by ID only (bypasses tenant isolation)
agent = db.query(Agent).filter(Agent.id == agent_id).first()
Tenant-Aware Creates
**✅ CORRECT: Always include tenant_id**
# Correct: Explicit tenant_id
agent = Agent(
    id="agent-123",
    name="Sales Agent",
    tenant_id=tenant_id,
    maturity="intern"
)
db.add(agent)
db.commit()
# Correct: Using factory
agent = AgentFactory.create(
    _session=db,
    tenant_id=tenant_id,
    name="Sales Agent",
    maturity="intern"
)
**❌ INCORRECT: Missing tenant_id**
# WRONG: Will fail or default to wrong tenant
agent = Agent(
    id="agent-123",
    name="Sales Agent",
    maturity="intern"
)
Tenant-Aware Updates
**✅ CORRECT: Filter by tenant + ID**
# Correct: Update with tenant filter
result = db.query(Agent).filter(
    Agent.tenant_id == tenant_id,
    Agent.id == agent_id
).update({"name": "Updated Name"})
if result == 0:
    raise HTTPException(status_code=404, detail="Agent not found")
**❌ INCORRECT: Update by ID only**
# WRONG: Could update agent in different tenant
db.query(Agent).filter(Agent.id == agent_id).update({"name": "Updated Name"})
Tenant-Aware Deletes
**✅ CORRECT: Filter by tenant + ID**
# Correct: Delete with tenant filter
result = db.query(Agent).filter(
    Agent.tenant_id == tenant_id,
    Agent.id == agent_id
).delete()
if result == 0:
    raise HTTPException(status_code=404, detail="Agent not found")
---
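The update and delete patterns above rely on the affected-row count being zero when the resource belongs to another tenant. The self-contained sketch below shows that behavior with the standard-library `sqlite3` module instead of the project's ORM; the table layout and the `rename_agent` helper are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agents (id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL, name TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO agents (id, tenant_id, name) VALUES (?, ?, ?)",
    [("agent-1", "tenant-a", "Sales Agent"), ("agent-2", "tenant-b", "Support Agent")],
)


def rename_agent(conn, tenant_id, agent_id, new_name):
    """Rename an agent only if it belongs to tenant_id; return the number of rows changed."""
    cursor = conn.execute(
        "UPDATE agents SET name = ? WHERE tenant_id = ? AND id = ?",
        (new_name, tenant_id, agent_id),
    )
    return cursor.rowcount


# tenant-a targets tenant-b's agent: no row matches, so the caller can answer 404.
blocked = rename_agent(conn, "tenant-a", "agent-2", "Hijacked")
# tenant-b renames its own agent: exactly one row changes.
allowed = rename_agent(conn, "tenant-b", "agent-2", "Renamed")
```

Answering 404 rather than 403 in the zero-row case also avoids confirming to one tenant that an ID exists in another.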
API Endpoint Patterns
GET Requests (List Resources)
@app.get("/api/agents")
async def list_agents(request: Request):
    tenant = await get_tenant_from_request(request)
    # Always filter by tenant
    agents = db.query(Agent).filter(
        Agent.tenant_id == tenant.id
    ).all()
    return {
        "agents": [a.to_dict() for a in agents],
        "tenant_id": tenant.id
    }
GET Requests (Single Resource)
@app.get("/api/agents/{agent_id}")
async def get_agent(agent_id: str, request: Request):
    tenant = await get_tenant_from_request(request)
    # Filter by both tenant and ID
    agent = db.query(Agent).filter(
        Agent.tenant_id == tenant.id,
        Agent.id == agent_id
    ).first()
    if not agent:
        raise HTTPException(
            status_code=404,
            detail="Agent not found"
        )
    return agent.to_dict()
POST Requests (Create Resource)
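import uuid
@app.post("/api/agents", status_code=201)
async def create_agent(data: AgentCreate, request: Request):
    tenant = await get_tenant_from_request(request)
    # Always take tenant_id from the resolved tenant, never from the request body
    agent = Agent(
        id=str(uuid.uuid4()),
        tenant_id=tenant.id,
        name=data.name,
        maturity=data.maturity
    )
    db.add(agent)
    db.commit()
    # FastAPI takes the status from the decorator; a (body, 201) tuple would be serialized as a JSON array
    return agent.to_dict()
PUT Requests (Update Resource)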
@app.put("/api/agents/{agent_id}")
async def update_agent(
    agent_id: str,
    data: AgentUpdate,
    request: Request
):
    tenant = await get_tenant_from_request(request)
    # Filter by tenant + ID
    # (data.dict() is the Pydantic v1 spelling; Pydantic v2 prefers data.model_dump())
    result = db.query(Agent).filter(
        Agent.tenant_id == tenant.id,
        Agent.id == agent_id
    ).update(data.dict(exclude_unset=True))
    if result == 0:
        raise HTTPException(status_code=404)
    db.commit()
    return {"updated": True}
DELETE Requests (Delete Resource)
@app.delete("/api/agents/{agent_id}")
async def delete_agent(agent_id: str, request: Request):
    tenant = await get_tenant_from_request(request)
    # Filter by tenant + ID
    result = db.query(Agent).filter(
        Agent.tenant_id == tenant.id,
        Agent.id == agent_id
    ).delete()
    if result == 0:
        raise HTTPException(status_code=404)
    db.commit()
    return {"deleted": True}
---
Test Fixtures
Using Tenant Fixtures
From Plan 276-01, we have comprehensive tenant fixtures:
import pytest
from tests.fixtures.tenant_fixtures import test_tenant, pro_tenant
from tests.fixtures.agent_factory import AgentFactory
def test_agent_creation(test_tenant):
    """Test agent creation with tenant isolation."""
    agent = AgentFactory.create(
        _session=db,
        tenant_id=test_tenant.id,
        name="Test Agent",
        maturity="intern"
    )
    assert agent.tenant_id == test_tenant.id
    assert test_tenant.plan == "free"  # Free plan tenant
Tenant Isolation Tests
def test_tenant_isolation(test_tenant, pro_tenant):
    """Verify agents are isolated between tenants."""
    # Create agent in tenant 1
    agent1 = AgentFactory.create(
        _session=db,
        tenant_id=test_tenant.id,
        name="Agent 1"
    )
    # Create agent in tenant 2
    agent2 = AgentFactory.create(
        _session=db,
        tenant_id=pro_tenant.id,
        name="Agent 2"
    )
    # Query tenant 1
    tenant1_agents = db.query(Agent).filter(
        Agent.tenant_id == test_tenant.id
    ).all()
    # Query tenant 2
    tenant2_agents = db.query(Agent).filter(
        Agent.tenant_id == pro_tenant.id
    ).all()
    # Verify isolation
    assert len(tenant1_agents) == 1
    assert len(tenant2_agents) == 1
    assert agent1.id not in [a.id for a in tenant2_agents]
    assert agent2.id not in [a.id for a in tenant1_agents]
Canvas Integration Tests
@pytest.mark.integration
@pytest.mark.asyncio  # assumes the pytest-asyncio plugin; the test must be async to use await
async def test_canvas_episode_recording(test_tenant):
    """Test canvas execution records episode with tenant context."""
    from tests.fixtures.canvas_factory import CanvasFactory
    from core.canvas_skill_integration import record_canvas_execution
    # Create canvas with tenant
    canvas = CanvasFactory.create(
        _session=db,
        tenant_id=test_tenant.id,
        name="Test Canvas"
    )
    # Record episode (from Plan 276-02)
    episode = await record_canvas_execution(
        tenant_id=test_tenant.id,
        canvas_id=canvas.id,
        execution_data={"action": "test"}
    )
    # Verify tenant isolation
    assert episode.canvas_id == canvas.id
    assert episode.tenant_id == test_tenant.id
    # Verify episode only visible to this tenant
    episodes = db.query(Episode).filter(
        Episode.tenant_id == test_tenant.id
    ).all()
    assert episode.id in [e.id for e in episodes]
Factory Patterns
All factories from Plan 276-01 require tenant_id:
# Agent factory
agent = AgentFactory.create(
    _session=db,
    tenant_id=test_tenant.id,
    name="Test Agent"
)
# Episode factory
episode = EpisodeFactory.create(
    _session=db,
    tenant_id=test_tenant.id,
    agent_id="agent-123",
    outcome="success"
)
# Canvas factory
canvas = CanvasFactory.create(
    _session=db,
    tenant_id=test_tenant.id,
    name="Test Canvas"
)
# Execution factory
execution = AgentExecutionFactory.create(
    _session=db,
    tenant_id=test_tenant.id,
    agent_id="agent-123",
    status="completed"
)
---
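The factories from Plan 276-01 are not reproduced in this document. As a rough illustration of how a factory can make `tenant_id` impossible to forget, the sketch below uses a keyword-only argument with no default; `FakeAgent` and `make_agent` are hypothetical stand-ins, not the real `AgentFactory`.

```python
import itertools
from dataclasses import dataclass

_ids = itertools.count(1)


@dataclass(frozen=True)
class FakeAgent:
    id: str
    tenant_id: str
    name: str
    maturity: str


def make_agent(*, tenant_id: str, name: str = "Test Agent", maturity: str = "intern") -> FakeAgent:
    """Build a test agent; tenant_id is keyword-only and has no default."""
    if not tenant_id:
        raise ValueError("tenant_id is required")
    return FakeAgent(
        id=f"agent-{next(_ids)}",
        tenant_id=tenant_id,
        name=name,
        maturity=maturity,
    )
```

Calling `make_agent()` without `tenant_id` raises `TypeError` at the call site, which is usually easier to diagnose than a row that was silently written without a tenant.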
Common Mistakes
❌ Mistake 1: Query Without Tenant Filter
# WRONG: Returns all agents across all tenants
agents = db.query(Agent).all()
# CORRECT: Filter by tenant
agents = db.query(Agent).filter(
    Agent.tenant_id == tenant_id
).all()
❌ Mistake 2: Missing tenant_id in Create
# WRONG: Agent created without tenant context
agent = Agent(
    id="agent-123",
    name="Test Agent"
)
# CORRECT: Always include tenant_id
agent = Agent(
    id="agent-123",
    name="Test Agent",
    tenant_id=tenant_id
)
❌ Mistake 3: Filter by ID Only
# WRONG: Bypasses tenant isolation
agent = db.query(Agent).filter(Agent.id == agent_id).first()
# CORRECT: Filter by both tenant and ID
agent = db.query(Agent).filter(
    Agent.tenant_id == tenant_id,
    Agent.id == agent_id
).first()
❌ Mistake 4: Hardcoded Tenant ID
# WRONG: Hardcoded tenant reference
agents = db.query(Agent).filter(
    Agent.tenant_id == "default-tenant"
).all()
# CORRECT: Use tenant from request/context
tenant = await get_tenant_from_request(request)
agents = db.query(Agent).filter(
    Agent.tenant_id == tenant.id
).all()
❌ Mistake 5: Join Without Tenant Filter
# WRONG: Cross-tenant join
results = db.query(Agent, Canvas).join(Canvas).all()
# CORRECT: Join with tenant filter
results = db.query(Agent, Canvas).join(Canvas).filter(
    Agent.tenant_id == tenant_id,
    Canvas.tenant_id == tenant_id
).all()
---
SaaS-Specific Patterns
These patterns are specific to the SaaS multi-tenant platform and not present in upstream.
Governance Checks with Tenant
from core.agent_governance_service import AgentGovernanceService
@app.post("/api/agents/{agent_id}/execute")
async def execute_agent(agent_id: str, request: Request):
    tenant = await get_tenant_from_request(request)
    # SaaS-specific: Check governance
    governance = AgentGovernanceService(db)
    decision = await governance.can_perform_action(
        tenant_id=tenant.id,
        agent_id=agent_id,
        action="execute"
    )
    if not decision.allowed:
        raise HTTPException(
            status_code=403,
            detail=f"Agent not allowed: {decision.reason}"
        )
    # Proceed with execution
    result = await execute_agent_action(tenant.id, agent_id)
    return result
Quota Checks with Tenant
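The endpoint in this section delegates to `QuotaService.check_execution_limit`, whose implementation is not shown here. As a rough illustration of what a per-tenant daily limit involves, the in-memory sketch below keys its counters by tenant and calendar day. `InMemoryQuota` and `try_consume` are hypothetical names, and a real service would presumably use shared storage instead of a process-local dictionary.

```python
from collections import defaultdict
from datetime import date
from typing import DefaultDict, Optional, Tuple


class InMemoryQuota:
    """Hypothetical per-tenant daily execution counter (not the real QuotaService)."""

    def __init__(self, daily_limit: int) -> None:
        self.daily_limit = daily_limit
        # Keyed by (tenant_id, day) so tenants never share a counter.
        self._counts: DefaultDict[Tuple[str, date], int] = defaultdict(int)

    def try_consume(self, tenant_id: str, today: Optional[date] = None) -> bool:
        """Record one execution if the tenant is under its limit; return whether it was allowed."""
        key = (tenant_id, today or date.today())
        if self._counts[key] >= self.daily_limit:
            return False
        self._counts[key] += 1
        return True
```

Because the key includes the tenant ID, one tenant exhausting its limit has no effect on any other tenant's counter.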
from core.quota_service import QuotaService
@app.post("/api/agents/{agent_id}/execute")
async def execute_agent(agent_id: str, request: Request):
    tenant = await get_tenant_from_request(request)
    # SaaS-specific: Check quota
    quota = QuotaService(db)
    within_limit = await quota.check_execution_limit(
        tenant_id=tenant.id
    )
    if not within_limit:
        raise HTTPException(
            status_code=429,
            detail="Daily execution limit exceeded"
        )
    # Proceed with execution
    result = await execute_agent_action(tenant.id, agent_id)
    return result
Billing Integration
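import uuid
from core.billing_service import BillingService
@app.post("/api/agents", status_code=201)
async def create_agent(data: AgentCreate, request: Request):
    tenant = await get_tenant_from_request(request)
    # SaaS-specific: Check plan limits
    billing = BillingService(db)
    plan = await billing.get_tenant_plan(tenant.id)
    agent_count = db.query(Agent).filter(
        Agent.tenant_id == tenant.id
    ).count()
    # Enforce agent limits by plan when creating an agent;
    # listing existing agents should not be blocked once the limit is reached
    if agent_count >= plan.max_agents:
        raise HTTPException(
            status_code=403,
            detail=f"Agent limit reached for {plan.name} plan"
        )
    agent = Agent(
        id=str(uuid.uuid4()),
        tenant_id=tenant.id,
        name=data.name,
        maturity=data.maturity
    )
    db.add(agent)
    db.commit()
    return {"agent": agent.to_dict(), "plan": plan.name}
---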
Storage Patterns
S3 Storage with Tenant Isolation
import os
import boto3
from fastapi import Request, UploadFile
from core.tenant_service import get_tenant_from_request
async def upload_file(file: UploadFile, request: Request):
    tenant = await get_tenant_from_request(request)
    # S3 path includes tenant_id; keep only the base name of the client-supplied filename
    safe_name = os.path.basename(file.filename)
    s3_key = f"{tenant.id}/uploads/{safe_name}"
    s3_client = boto3.client('s3')
    s3_client.upload_fileobj(
        file.file,
        "atom-saas",
        s3_key
    )
    return {"s3_key": s3_key, "tenant_id": tenant.id}
Redis with Tenant Namespacing
import json
import redis
from core.tenant_service import get_tenant_from_request
redis_client = redis.Redis()
async def cache_agent_data(agent_id: str, data: dict, request: Request):
    tenant = await get_tenant_from_request(request)
    # Redis key includes tenant_id
    key = f"tenant:{tenant.id}:agent:{agent_id}"
    redis_client.setex(
        key,
        3600,  # 1 hour TTL
        json.dumps(data)
    )
    return {"cached": True, "key": key}
---
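Both storage patterns above come down to building a key that always starts with the tenant's own prefix. A small pair of helpers can centralize that and reject inputs that would break the prefix. This is a sketch under assumptions: the helper names are hypothetical, and the allowed tenant ID characters (letters, digits, `_`, `-`) are a guess that may not match the real ID format.

```python
import posixpath
import re

# Assumed tenant ID alphabet; adjust to the real ID format.
_TENANT_ID_RE = re.compile(r"[A-Za-z0-9_-]+")


def _check_tenant_id(tenant_id: str) -> str:
    """Reject tenant IDs containing separators such as '/' or ':'."""
    if not _TENANT_ID_RE.fullmatch(tenant_id):
        raise ValueError(f"unsafe tenant_id: {tenant_id!r}")
    return tenant_id


def tenant_s3_key(tenant_id: str, filename: str) -> str:
    """Build '<tenant_id>/uploads/<name>' from the final path component of filename."""
    name = posixpath.basename(filename.replace("\\", "/"))
    if name in ("", ".", ".."):
        raise ValueError(f"unsafe filename: {filename!r}")
    return f"{_check_tenant_id(tenant_id)}/uploads/{name}"


def tenant_redis_key(tenant_id: str, kind: str, resource_id: str) -> str:
    """Build 'tenant:<tenant_id>:<kind>:<resource_id>'."""
    return f"tenant:{_check_tenant_id(tenant_id)}:{kind}:{resource_id}"
```

For example, `tenant_s3_key("tenant-a", "../../tenant-b/uploads/x.txt")` yields `tenant-a/uploads/x.txt`, because only the final path component of the client-supplied name is kept.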
Background Jobs
Tenant-Aware Background Jobs
from core.worker import background_job
@background_job
def process_agent_execution(tenant_id: str, agent_id: str, task_data: dict):
    """Background job always requires tenant_id parameter."""
    # Load tenant context
    tenant = db.query(Tenant).get(tenant_id)
    if not tenant:
        raise ValueError(f"Tenant {tenant_id} not found")
    # All queries filter by tenant
    agent = db.query(Agent).filter(
        Agent.tenant_id == tenant_id,
        Agent.id == agent_id
    ).first()
    if not agent:
        raise ValueError(f"Agent {agent_id} not found for tenant {tenant_id}")
    # Process with tenant context
    result = execute_task(agent, task_data)
    # Record execution with tenant_id
    execution = AgentExecution(
        tenant_id=tenant_id,
        agent_id=agent_id,
        status="completed",
        result=result
    )
    db.add(execution)
    db.commit()
    return result
Scheduled Jobs with Tenant Iteration
from apscheduler.schedulers.background import BackgroundScheduler
from core.worker import background_job
def scheduled_job_for_all_tenants():
    """Run job for all tenants."""
    tenants = db.query(Tenant).filter(Tenant.active == True).all()
    for tenant in tenants:
        # Queue job with specific tenant context
        process_tenant_data.delay(tenant_id=tenant.id)
# Registered as a background job so that .delay() is available
# (assumes @background_job provides .delay(), as the call above implies)
@background_job
def process_tenant_data(tenant_id: str):
    """Process data for single tenant."""
    # All queries scoped to this tenant
    agents = db.query(Agent).filter(
        Agent.tenant_id == tenant_id
    ).all()
    for agent in agents:
        process_agent(tenant_id, agent.id)
---
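The rule that every background job takes a `tenant_id` can also be checked mechanically. The sketch below is a hypothetical decorator, separate from the project's `@background_job`. It rejects a job function at definition time if it has no `tenant_id` parameter, and rejects a call at run time if the value is empty.

```python
import functools
import inspect


def tenant_job(func):
    """Reject jobs that do not take tenant_id, and calls that pass an empty one."""
    signature = inspect.signature(func)
    if "tenant_id" not in signature.parameters:
        raise TypeError(f"{func.__name__} must accept a tenant_id parameter")

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # bind() maps positional and keyword arguments to parameter names.
        bound = signature.bind(*args, **kwargs)
        if not bound.arguments.get("tenant_id"):
            raise ValueError(f"{func.__name__} called without a tenant_id")
        return func(*args, **kwargs)

    return wrapper


@tenant_job
def count_agents(tenant_id: str, agent_ids: list) -> str:
    """Toy job body used only to exercise the decorator."""
    return f"{tenant_id}:{len(agent_ids)}"
```

Failing at import time for a missing parameter means a non-compliant job is caught by any test that imports the module, before it is ever queued.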
Verification
Pre-Commit Checklist
Before committing code, verify:
- [ ] All database queries include `tenant_id` filter
- [ ] All creates include `tenant_id` field
- [ ] All updates filter by both `tenant_id` and resource ID
- [ ] All deletes filter by both `tenant_id` and resource ID
- [ ] No hardcoded tenant IDs
- [ ] No queries using `.all()` without tenant filter
- [ ] All background jobs accept `tenant_id` parameter
- [ ] All S3 paths include `tenant_id`
- [ ] All Redis keys include `tenant:{tenant_id}` prefix
Automated Verification Scripts
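# Check for single-line queries that call .all() (review each hit for a tenant filter)
grep -rn "db.query.*\.all()" backend-saas/
# Check for hardcoded tenant IDs
grep -rn "tenant_id.*=.*['\"]" backend-saas/ | grep -v "tenant_id.*=.*?"
# Check for missing tenant_id in creates (-r is required to search a directory;
# the output needs manual review, since grep -v only hides the lines that mention tenant_id)
grep -rn -A 5 "Agent(" backend-saas/ | grep -v "tenant_id"
---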
Further Reading
- **API Documentation** - Complete API reference
- **Integration Patterns** - Integration examples with tenant isolation
- **Test Fixtures** - Factory and fixture documentation
- **Repository Sync** - SaaS vs upstream patterns
---
Changelog
v11.2 (2026-04-10)
- Initial multi-tenant patterns documentation
- Patterns from Plan 276-01 (test fixtures)
- Integration patterns from Plan 276-02 (canvas/episode integration)
- SaaS-specific patterns (governance, quota, billing)
- Common mistakes and verification checklists