ATOM Documentation

Redis Usage Prevention Guide

Overview

This guide provides best practices and patterns to prevent excessive Redis usage and avoid database suspensions.

---

Core Principles

1. Cache Smart, Not Everything

**Problem:** Caching everything wastes memory and can hurt performance

**Solution:** Cache strategically based on access patterns

**What to Cache:**

  • ✅ Tenant lookups (accessed on every API request)
  • ✅ User sessions (accessed frequently, change rarely)
  • ✅ Configuration data (accessed frequently, static)
  • ✅ Expensive query results (DB joins, aggregations)

**What NOT to Cache:**

  • ❌ Rapidly changing data (stock prices, real-time stats)
  • ❌ Large objects (>1MB)
  • ❌ Data accessed once (no benefit to cache)
  • ❌ Streaming data (logs, events)
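The two lists above can be condensed into a small decision helper. This is a sketch; the size and access-pattern thresholds are illustrative assumptions, not fixed rules:

```python
ONE_MB = 1024 * 1024

def should_cache(size_bytes: int, reads_per_write: float, read_count: int) -> bool:
    """Rough heuristic: cache small, read-heavy, repeatedly accessed data."""
    if size_bytes > ONE_MB:       # large objects waste memory
        return False
    if read_count <= 1:           # data accessed once gains nothing from caching
        return False
    if reads_per_write < 2:       # rapidly changing data churns the cache
        return False
    return True
```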

2. Use Appropriate TTLs

**Problem:** Wrong TTL causes stale data or excessive cache misses

**Solution:** Match TTL to data change frequency

# Static data (rarely changes)
cache.set("config:features", features, ttl=3600)  # 1 hour

# User data (changes occasionally)
cache.set("user:profile", profile, ttl=1800)      # 30 minutes

# Session data (changes frequently)
cache.set("session:active", active, ttl=300)       # 5 minutes

# Rate limits (reset every minute)
cache.set("ratelimit:user123", count, ttl=60)      # 1 minute

3. Monitor Continuously

**Problem:** You don't learn about issues until it's too late

**Solution:** Automated monitoring and alerts

See REDIS_MONITORING_GUIDE.md for setup instructions.
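Even before full monitoring is set up, the cache hit rate can be computed directly from Redis's `INFO stats` section (`keyspace_hits` and `keyspace_misses` are real Redis counters; the `alert` call in the usage comment is a hypothetical stand-in):

```python
def cache_hit_rate(stats: dict) -> float:
    """Hit rate from the Redis INFO 'stats' section: hits / (hits + misses)."""
    hits = stats.get("keyspace_hits", 0)
    misses = stats.get("keyspace_misses", 0)
    total = hits + misses
    return hits / total if total else 0.0

# Usage with redis-py (assumes a reachable Redis instance):
# import redis
# r = redis.Redis()
# rate = cache_hit_rate(r.info("stats"))
# if rate < 0.8:
#     alert(f"Cache hit rate low: {rate:.1%}")  # alert() is hypothetical
```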

---

Code Patterns

Pattern 1: Cache-Aside (Lazy Loading)

**Best for:** Read-heavy workloads

async def get_tenant(tenant_id: str) -> dict | None:
    # Try cache first
    cache_key = f"tenant:{tenant_id}"
    cached = await cache.get_async(cache_key)

    if cached:
        logger.debug(f"Cache hit: {cache_key}")
        return json.loads(cached)

    # Cache miss - fetch from DB
    logger.debug(f"Cache miss: {cache_key}")
    tenant = await db.query(
        "SELECT * FROM tenants WHERE id = $1",
        tenant_id
    )

    if tenant:
        # Store in cache for next time
        await cache.set_async(
            cache_key,
            json.dumps(tenant),
            ttl=3600  # 1 hour
        )

    return tenant

Pattern 2: Write-Through

**Best for:** Read-after-write consistency

async def update_tenant(tenant_id: str, data: dict) -> dict:
    # Update database
    tenant = await db.query(
        "UPDATE tenants SET name = $2 WHERE id = $1 RETURNING *",
        tenant_id, data["name"]
    )

    # Immediately update cache
    cache_key = f"tenant:{tenant_id}"
    await cache.set_async(
        cache_key,
        json.dumps(tenant),
        ttl=3600
    )

    return tenant

Pattern 3: Write-Behind (Async)

**Best for:** Write-heavy workloads

async def update_tenant_async(tenant_id: str, data: dict) -> dict:
    # Update cache immediately
    cache_key = f"tenant:{tenant_id}"
    await cache.set_async(cache_key, json.dumps(data), ttl=3600)

    # Queue DB update for later
    await queue.enqueue("update_tenant", tenant_id, data)

    return data  # Return immediately
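The other half of write-behind is a worker that drains the queue and applies the deferred writes. A minimal sketch, assuming a hypothetical broker whose `dequeue` returns `(tenant_id, data)` tuples and `None` as a shutdown sentinel:

```python
async def tenant_write_worker(queue, db):
    """Drain queued 'update_tenant' jobs and apply them to the database."""
    while True:
        job = await queue.dequeue()           # hypothetical broker API
        if job is None:                       # sentinel: queue closed
            break
        tenant_id, data = job
        try:
            await db.query(
                "UPDATE tenants SET name = $2 WHERE id = $1",
                tenant_id, data["name"],
            )
        except Exception:
            # Re-queue on failure so the write isn't silently lost
            await queue.enqueue("update_tenant", tenant_id, data)
```

Note the trade-off: the cache is authoritative until the worker catches up, so a crash between the cache write and the DB write can lose data.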

Pattern 4: Cache Invalidation

**Problem:** Stale cache after updates

**Solution:** Invalidate related cache keys on updates

async def invalidate_tenant_cache(tenant_id: str):
    """Invalidate all cache entries for a tenant"""
    # Invalidate by ID
    await cache.delete_async(f"tenant:{tenant_id}")

    # Invalidate by subdomain (need to fetch first)
    tenant = await db.query("SELECT subdomain, custom_domain FROM tenants WHERE id = $1", tenant_id)
    if tenant:
        await cache.delete_async(f"tenant:subdomain:{tenant['subdomain']}")
        if tenant.get("custom_domain"):
            await cache.delete_async(f"tenant:domain:{tenant['custom_domain']}")

    logger.info(f"Invalidated cache for tenant {tenant_id}")

---

Anti-Patterns to Avoid

❌ Anti-Pattern 1: Cache Churning

**Problem:** Constantly writing to cache with very short TTL

# BAD: 1 second TTL = constant cache churn
async def get_user(user_id: str):
    cached = await cache.get(f"user:{user_id}")
    if cached:
        return cached
    user = await db.get_user(user_id)
    await cache.set(f"user:{user_id}", user, ttl=1)  # ❌ Too short!
    return user

**Fix:** Use longer TTL based on data change frequency

# GOOD: 30 minute TTL
await cache.set(f"user:{user_id}", user, ttl=1800)  # ✅ Reasonable

❌ Anti-Pattern 2: N+1 Cache Queries

**Problem:** Querying cache in a loop

# BAD: N cache queries
async def get_users(user_ids: list[str]):
    users = []
    for user_id in user_ids:
        user = await cache.get(f"user:{user_id}")  # ❌ N queries
        users.append(user)
    return users

**Fix:** Batch cache queries or use multi-get

# GOOD: 1 cache query (if supported) or batch DB query
async def get_users(user_ids: list[str]):
    # Try batch cache get
    cached = await cache.m_get([f"user:{uid}" for uid in user_ids])
    results = dict(zip(user_ids, cached))

    # Fetch missing from DB in one query
    missing_ids = [uid for uid, val in results.items() if val is None]
    if missing_ids:
        users = await db.query("SELECT * FROM users WHERE id = ANY($1)", missing_ids)

        # Cache the results and merge them in
        for user in users:
            await cache.set(f"user:{user['id']}", user, ttl=1800)
            results[user["id"]] = user

    return [results[uid] for uid in user_ids]

❌ Anti-Pattern 3: Caching Large Objects

**Problem:** Caching objects >1MB wastes memory

# BAD: Caching entire result set
async def get_all_transactions():
    transactions = await db.query("SELECT * FROM transactions")  # 10,000 rows
    await cache.set("transactions:all", transactions, ttl=3600)  # ❌ Too large!

**Fix:** Cache paginated results or individual items

# GOOD: Cache individual items or pages
async def get_transaction(transaction_id: str):
    cached = await cache.get(f"transaction:{transaction_id}")
    if cached:
        return cached

    transaction = await db.get_transaction(transaction_id)
    await cache.set(f"transaction:{transaction_id}", transaction, ttl=1800)
    return transaction

async def get_transactions_page(page: int, per_page: int = 100):
    cache_key = f"transactions:page:{page}"
    cached = await cache.get(cache_key)
    if cached:
        return cached

    transactions = await db.query(
        "SELECT * FROM transactions ORDER BY created_at DESC LIMIT $1 OFFSET $2",
        per_page, (page - 1) * per_page
    )
    await cache.set(cache_key, transactions, ttl=300)  # 5 minutes
    return transactions

❌ Anti-Pattern 4: Tight Polling Loops

**Problem:** Sub-second polling creates excessive operations

# BAD: Polls 10 times per second
while True:
    status = await check_status()
    if status == "complete":
        break
    await asyncio.sleep(0.1)  # ❌ Too aggressive!

**Fix:** Use longer intervals or webhooks

# GOOD: Polls once per minute
while True:
    status = await check_status()
    if status == "complete":
        break
    await asyncio.sleep(60)  # ✅ Reasonable

# BETTER: Use webhooks
await webhook_service.subscribe(event="status.complete", callback=handle_complete)
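When webhooks aren't available, exponential backoff is a middle ground: early checks stay responsive while long waits stay cheap. A sketch; the base, cap, and growth factor are illustrative:

```python
import asyncio

def backoff_delays(base: float = 1.0, cap: float = 60.0, factor: float = 2.0):
    """Yield 1s, 2s, 4s, ... doubling up to the cap."""
    delay = base
    while True:
        yield delay
        delay = min(delay * factor, cap)

async def poll_until_complete(check_status):
    """Poll with exponential backoff instead of a fixed tight loop."""
    for delay in backoff_delays():
        if await check_status() == "complete":
            return True
        await asyncio.sleep(delay)
```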

---

Performance Optimization

1. Use Hash Data Structure

**Problem:** Multiple keys for related data

# BAD: Multiple keys
await cache.set("user:123:name", "Alice")
await cache.set("user:123:email", "alice@example.com")
await cache.set("user:123:age", "30")

**Fix:** Use Redis hashes

# GOOD: Single hash
await cache.hset("user:123", mapping={
    "name": "Alice",
    "email": "alice@example.com",
    "age": "30"
})

2. Pipeline Operations

**Problem:** Multiple round-trips to Redis

# BAD: N round-trips
for key in keys:
    await cache.set(key, value)

**Fix:** Use pipeline

# GOOD: 1 round-trip
pipe = cache.client.pipeline()
for key in keys:
    pipe.set(key, value)
await pipe.execute()  # single round-trip (await, since the client is async)

3. Use Redis Sets for Membership

**Problem:** Checking membership with strings

# BAD: String key
await cache.set("user:123:is_admin", "true")
is_admin = await cache.get("user:123:is_admin")

**Fix:** Use sets

# GOOD: Set membership
await cache.sadd("admins", "123")
is_admin = await cache.sismember("admins", "123")

---

Rate Limiting

**Problem:** Rate limiting itself consumes Redis operations

Every API request checks rate limits:

daily_count = await cache.get(f"ratelimit:{tenant_id}:daily")
minute_count = await cache.get(f"ratelimit:{tenant_id}:minute")

**Solution:** Cache rate limit checks

async def check_rate_limit(tenant_id: str) -> bool:
    # Check in-memory cache first (fast); entries expire after a few seconds,
    # otherwise a tenant would stay blocked (or allowed) forever
    local_cache_key = f"ratelimit:{tenant_id}"
    entry = rate_limit_cache.get(local_cache_key)
    if entry and entry[1] > time.monotonic():
        return entry[0]

    # Check Redis (slower, but authoritative)
    minute_key = f"ratelimit:{tenant_id}:{datetime.now().strftime('%Y%m%d:%H%M')}"
    count = int(await cache.get(minute_key) or 0)

    allowed = count < LIMIT
    rate_limit_cache[local_cache_key] = (allowed, time.monotonic() + 5)
    return allowed
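The check above only reads the counter; something must also increment it on each request. A fixed-window sketch using Redis's real INCR and EXPIRE commands (the async `cache` client here is a stand-in for whatever wrapper you use):

```python
from datetime import datetime, timezone

async def record_request(cache, tenant_id: str, limit: int = 100) -> bool:
    """Fixed-window counter: INCR the per-minute key, set its TTL on first hit."""
    window = datetime.now(timezone.utc).strftime("%Y%m%d:%H%M")
    key = f"ratelimit:{tenant_id}:{window}"
    count = await cache.incr(key)       # atomic increment
    if count == 1:
        await cache.expire(key, 60)     # window cleans itself up after a minute
    return count <= limit               # False once over the limit
```

Setting the TTL only on the first increment keeps it to one extra operation per window instead of one per request.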

---

Monitoring Checklist

Daily Checks

  • [ ] Review Redis command count
  • [ ] Check cache hit rate (>80% is good)
  • [ ] Look for unusual spikes

Weekly Checks

  • [ ] Review cache key patterns
  • [ ] Identify cache churn (high miss rate)
  • [ ] Check memory usage

Monthly Checks

  • [ ] Audit cache TTLs
  • [ ] Review cache key naming
  • [ ] Clean up unused keys
  • [ ] Update monitoring thresholds

---

Quick Reference

Cache TTL Guidelines

| Data Type | TTL | Rationale |
| --- | --- | --- |
| Static config | 3600s (1 hr) | Rarely changes |
| User profile | 1800s (30 min) | Changes occasionally |
| Session data | 300s (5 min) | Changes frequently |
| Rate limits | 60s (1 min) | Per-minute limits |
| API responses | 60-300s | Based on freshness needs |
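These guidelines can live in code as a single source of truth so TTLs aren't scattered as magic numbers. A sketch; the category names are illustrative:

```python
TTL = {
    "static_config": 3600,   # 1 hour: rarely changes
    "user_profile": 1800,    # 30 minutes: changes occasionally
    "session": 300,          # 5 minutes: changes frequently
    "rate_limit": 60,        # 1 minute: per-minute windows
    "api_response": 300,     # upper bound; tune per endpoint freshness
}

def ttl_for(kind: str) -> int:
    """Look up the TTL for a data category, defaulting to the shortest."""
    return TTL.get(kind, 60)
```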

Cache Key Naming

# Good: Hierarchical, namespaced
tenant:123:profile
tenant:123:settings
user:456:session
ratelimit:789:20250408:1430

# Bad: Flat, no namespace
tenant123
user_session_456
ratelimit_789
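A tiny helper keeps keys hierarchical and namespaced by construction rather than by convention (a sketch):

```python
def cache_key(*parts: object) -> str:
    """Join namespace parts with ':' — e.g. cache_key('tenant', 123, 'profile')."""
    return ":".join(str(p) for p in parts)
```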

When NOT to Cache

  1. **Real-time data** (stock prices, live stats)
  2. **Large objects** (>1MB)
  3. **One-time operations** (password reset tokens)
  4. **Streaming data** (logs, events)
  5. **Security-sensitive** (encryption keys)

---

Troubleshooting

High Redis Usage? Check This:

  1. **Current ops/sec**
  • If >100: You have a problem
  • If >1000: Critical bug (infinite loop?)
  2. **Cache hit rate**
  • If <50%: Caching ineffective
  • If <20%: No caching or wrong keys
  3. **Key count**
  • If >100K: Key explosion (memory leak?)
  • Check for: cache.set(f"temp:{uuid}", ...) without TTL
  4. **Memory usage**
  • If >1GB: Check for large objects
  • If >10MB per key: Too large!
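To spot a key explosion, group a key sample by namespace. The helper below is pure, so it can be fed the output of SCAN from any client (the usage comment assumes redis-py, whose `scan_iter` streams keys without blocking the server):

```python
from collections import Counter

def count_by_namespace(keys: list[str]) -> Counter:
    """Group keys by their first ':'-separated segment to spot key explosions."""
    return Counter(key.split(":", 1)[0] for key in keys)

# Usage (assumes redis-py):
# import redis
# r = redis.Redis(decode_responses=True)
# print(count_by_namespace(list(r.scan_iter(count=1000))).most_common(10))
```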

---

Tools and Scripts

Monitor Usage

cd backend-saas
python3 scripts/monitor_redis_usage.py

Clean Up Keys

# Delete all keys matching pattern
redis-cli --scan --pattern "temp:*" | xargs redis-cli del

# Or using Python
python3 scripts/clean_all_upstash_and_qstash.py

Analyze Key Patterns

# Get sample keys
redis-cli --scan --pattern "*" | head -100

# Count keys by pattern
redis-cli --scan --pattern "tenant:*" | wc -l
redis-cli --scan --pattern "user:*" | wc -l
redis-cli --scan --pattern "ratelimit:*" | wc -l

---

Further Reading

  • **Redis Best Practices:** https://redis.io/topics/lru-cache
  • **Upstash Documentation:** https://upstash.com/docs
  • **Caching Strategies:** https://docs.aws.amazon.com/whitepapers/latest/database-caching-strategies-using-redis/

---

**Last Updated:** 2026-04-09

**Version:** 1.0

**Related:**

  • REDIS_MONITORING_GUIDE.md
  • REDIS_SPIKE_ANALYSIS.md
  • REDIS_READS_DIAGNOSIS.md