Redis Usage Prevention Guide
Overview
This guide provides best practices and patterns to prevent excessive Redis usage and avoid database suspensions.
---
Core Principles
1. Cache Smart, Not Everything
**Problem:** Caching everything wastes memory and can hurt performance
**Solution:** Cache strategically based on access patterns
**What to Cache:**
- ✅ Tenant lookups (accessed on every API request)
- ✅ User sessions (accessed frequently, change rarely)
- ✅ Configuration data (accessed frequently, static)
- ✅ Expensive query results (DB joins, aggregations)
**What NOT to Cache:**
- ❌ Rapidly changing data (stock prices, real-time stats)
- ❌ Large objects (>1MB)
- ❌ Data accessed once (no benefit to cache)
- ❌ Streaming data (logs, events)
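The cache/don't-cache rules above can be sketched as a small gate function. This is an illustrative helper, not part of this codebase; the name, signature, and the reads-per-write threshold are assumptions:

```python
MAX_CACHEABLE_BYTES = 1_000_000  # skip objects over ~1MB, per the rule above

def should_cache(size_bytes: int, reads_per_write: float,
                 is_streaming: bool = False) -> bool:
    """Return True when data fits the 'what to cache' profile."""
    if is_streaming:
        return False  # logs/events: never cache
    if size_bytes > MAX_CACHEABLE_BYTES:
        return False  # large objects waste memory
    if reads_per_write < 2:
        return False  # read-once or write-heavy data gains nothing
    return True
```

A gate like this is most useful as a code-review checklist rather than a runtime check.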
2. Use Appropriate TTLs
**Problem:** Wrong TTL causes stale data or excessive cache misses
**Solution:** Match TTL to data change frequency
```python
# Static data (rarely changes)
cache.set("config:features", features, ttl=3600)   # 1 hour

# User data (changes occasionally)
cache.set("user:profile", profile, ttl=1800)       # 30 minutes

# Session data (changes frequently)
cache.set("session:active", active, ttl=300)       # 5 minutes

# Rate limits (reset every minute)
cache.set("ratelimit:user123", count, ttl=60)      # 1 minute
```

3. Monitor Continuously
**Problem:** Issues go unnoticed until it is too late
**Solution:** Automated monitoring and alerts
See REDIS_MONITORING_GUIDE.md for setup instructions.
---
Code Patterns
Pattern 1: Cache-Aside (Lazy Loading)
**Best for:** Read-heavy workloads
```python
async def get_tenant(tenant_id: str) -> dict | None:
    # Try cache first
    cache_key = f"tenant:{tenant_id}"
    cached = await cache.get_async(cache_key)
    if cached:
        logger.debug(f"Cache hit: {cache_key}")
        return json.loads(cached)

    # Cache miss - fetch from DB
    logger.debug(f"Cache miss: {cache_key}")
    tenant = await db.query(
        "SELECT * FROM tenants WHERE id = $1",
        tenant_id
    )
    if tenant:
        # Store in cache for next time
        await cache.set_async(
            cache_key,
            json.dumps(tenant),
            ttl=3600  # 1 hour
        )
    return tenant
```

Pattern 2: Write-Through
**Best for:** Read-after-write consistency
```python
async def update_tenant(tenant_id: str, data: dict) -> dict:
    # Update database
    tenant = await db.query(
        "UPDATE tenants SET name = $2 WHERE id = $1 RETURNING *",
        tenant_id, data["name"]
    )

    # Immediately update cache
    cache_key = f"tenant:{tenant_id}"
    await cache.set_async(
        cache_key,
        json.dumps(tenant),
        ttl=3600
    )
    return tenant
```

Pattern 3: Write-Behind (Async)
**Best for:** Write-heavy workloads
```python
async def update_tenant_async(tenant_id: str, data: dict) -> dict:
    # Update cache immediately
    cache_key = f"tenant:{tenant_id}"
    await cache.set_async(cache_key, json.dumps(data), ttl=3600)

    # Queue DB update for later
    await queue.enqueue("update_tenant", tenant_id, data)
    return data  # Return immediately
```

Pattern 4: Cache Invalidation
**Problem:** Stale cache after updates
**Solution:** Invalidate related cache keys on updates
```python
async def invalidate_tenant_cache(tenant_id: str):
    """Invalidate all cache entries for a tenant."""
    # Invalidate by ID
    await cache.delete_async(f"tenant:{tenant_id}")

    # Invalidate by subdomain and custom domain (need to fetch both first)
    tenant = await db.query(
        "SELECT subdomain, custom_domain FROM tenants WHERE id = $1",
        tenant_id
    )
    if tenant:
        await cache.delete_async(f"tenant:subdomain:{tenant['subdomain']}")
        await cache.delete_async(f"tenant:domain:{tenant.get('custom_domain')}")
    logger.info(f"Invalidated cache for tenant {tenant_id}")
```

---
Anti-Patterns to Avoid
❌ Anti-Pattern 1: Cache Churning
**Problem:** Constantly writing to cache with very short TTL
```python
# BAD: 1 second TTL = constant cache churn
async def get_user(user_id: str):
    cached = await cache.get(f"user:{user_id}")
    if not cached:
        user = await db.get_user(user_id)
        await cache.set(f"user:{user_id}", user, ttl=1)  # ❌ Too short!
        return user
    return cached
```

**Fix:** Use longer TTL based on data change frequency

```python
# GOOD: 30 minute TTL
await cache.set(f"user:{user_id}", user, ttl=1800)  # ✅ Reasonable
```

❌ Anti-Pattern 2: N+1 Cache Queries
**Problem:** Querying cache in a loop
```python
# BAD: N cache queries
async def get_users(user_ids: list[str]):
    users = []
    for user_id in user_ids:
        user = await cache.get(f"user:{user_id}")  # ❌ N queries
        users.append(user)
    return users
```

**Fix:** Batch cache queries or use multi-get

```python
# GOOD: 1 batched cache query, then 1 batched DB query for the misses
async def get_users(user_ids: list[str]):
    cached = await cache.m_get([f"user:{uid}" for uid in user_ids])
    users_by_id = {uid: val for uid, val in zip(user_ids, cached) if val is not None}

    # Fetch missing from DB in one query
    missing_ids = [uid for uid in user_ids if uid not in users_by_id]
    if missing_ids:
        rows = await db.query("SELECT * FROM users WHERE id = ANY($1)", missing_ids)
        # Cache the results and merge them in
        for user in rows:
            users_by_id[user["id"]] = user
            await cache.set(f"user:{user['id']}", user, ttl=1800)
    return [users_by_id.get(uid) for uid in user_ids]
```

❌ Anti-Pattern 3: Caching Large Objects
**Problem:** Caching objects >1MB wastes memory
```python
# BAD: Caching entire result set
async def get_all_transactions():
    transactions = await db.query("SELECT * FROM transactions")  # 10,000 rows
    await cache.set("transactions:all", transactions, ttl=3600)  # ❌ Too large!
```

**Fix:** Cache paginated results or individual items
```python
# GOOD: Cache individual items or pages
async def get_transaction(transaction_id: str):
    cached = await cache.get(f"transaction:{transaction_id}")
    if cached:
        return cached
    transaction = await db.get_transaction(transaction_id)
    await cache.set(f"transaction:{transaction_id}", transaction, ttl=1800)
    return transaction

async def get_transactions_page(page: int, per_page: int = 100):
    cache_key = f"transactions:page:{page}"
    cached = await cache.get(cache_key)
    if cached:
        return cached
    transactions = await db.query(
        "SELECT * FROM transactions ORDER BY created_at DESC LIMIT $1 OFFSET $2",
        per_page, (page - 1) * per_page
    )
    await cache.set(cache_key, transactions, ttl=300)  # 5 minutes
    return transactions
```

❌ Anti-Pattern 4: Tight Polling Loops
**Problem:** Sub-second polling creates excessive operations
```python
# BAD: Polls 10 times per second
while True:
    status = await check_status()
    if status == "complete":
        break
    await asyncio.sleep(0.1)  # ❌ Too aggressive!
```

**Fix:** Use longer intervals or webhooks

```python
# GOOD: Polls once per minute
while True:
    status = await check_status()
    if status == "complete":
        break
    await asyncio.sleep(60)  # ✅ Reasonable

# BETTER: Use webhooks
await webhook_service.subscribe(event="status.complete", callback=handle_complete)
```

---
Performance Optimization
1. Use Hash Data Structure
**Problem:** Multiple keys for related data
```python
# BAD: Multiple keys
await cache.set("user:123:name", "Alice")
await cache.set("user:123:email", "alice@example.com")
await cache.set("user:123:age", "30")
```

**Fix:** Use Redis hashes

```python
# GOOD: Single hash
await cache.hset("user:123", mapping={
    "name": "Alice",
    "email": "alice@example.com",
    "age": "30"
})
```

2. Pipeline Operations
**Problem:** Multiple round-trips to Redis
```python
# BAD: N round-trips
for key in keys:
    await cache.set(key, value)
```

**Fix:** Use pipeline

```python
# GOOD: 1 round-trip
pipe = cache.client.pipeline()
for key in keys:
    pipe.set(key, value)
pipe.execute()
```

3. Use Redis Sets for Membership
**Problem:** Checking membership with strings
```python
# BAD: String key
await cache.set("user:123:is_admin", "true")
is_admin = await cache.get("user:123:is_admin")
```

**Fix:** Use sets

```python
# GOOD: Set membership
await cache.sadd("admins", "123")
is_admin = await cache.sismember("admins", "123")
```

---
Rate Limiting
Problem: Rate limiting also uses Redis
Every API request checks rate limits:
```python
daily_count = await cache.get(f"ratelimit:{tenant_id}:daily")
minute_count = await cache.get(f"ratelimit:{tenant_id}:minute")
```

**Solution:** Cache rate limit checks
```python
# Process-local cache of recent rate-limit decisions
rate_limit_cache: dict[str, bool] = {}

async def check_rate_limit(tenant_id: str) -> bool:
    # Check in-memory cache first (fast)
    local_cache_key = f"ratelimit:{tenant_id}"
    if local_cache_key in rate_limit_cache:
        return rate_limit_cache[local_cache_key]

    # Check Redis (slower, but authoritative)
    minute_key = f"ratelimit:{tenant_id}:{datetime.now().strftime('%Y%m%d:%H%M')}"
    count = int(await cache.get(minute_key) or 0)
    if count >= LIMIT:  # LIMIT: allowed requests per minute, defined elsewhere
        rate_limit_cache[local_cache_key] = False
        return False
    rate_limit_cache[local_cache_key] = True
    return True
```

---
Monitoring Checklist
Daily Checks
- [ ] Review Redis command count
- [ ] Check cache hit rate (>80% is good)
- [ ] Look for unusual spikes
Weekly Checks
- [ ] Review cache key patterns
- [ ] Identify cache churn (high miss rate)
- [ ] Check memory usage
Monthly Checks
- [ ] Audit cache TTLs
- [ ] Review cache key naming
- [ ] Clean up unused keys
- [ ] Update monitoring thresholds
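The daily hit-rate check above can be automated from Redis `INFO` counters. `keyspace_hits` and `keyspace_misses` are standard fields in the `stats` section; the client wiring shown in the comment is an assumption about your setup:

```python
def hit_rate(stats: dict) -> float:
    """Compute cache hit rate from Redis INFO 'stats' fields."""
    hits = stats.get("keyspace_hits", 0)
    misses = stats.get("keyspace_misses", 0)
    total = hits + misses
    return hits / total if total else 0.0

# With redis-py this would be fed from: hit_rate(redis_client.info("stats"))
```

Note these counters are cumulative since server start, so for a daily check compare deltas between two samples rather than the raw totals.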
---
Quick Reference
Cache TTL Guidelines
| Data Type | TTL | Rationale |
|---|---|---|
| Static config | 3600s (1hr) | Rarely changes |
| User profile | 1800s (30min) | Changes occasionally |
| Session data | 300s (5min) | Changes frequently |
| Rate limits | 60s (1min) | Per-minute limits |
| API responses | 60-300s | Based on freshness needs |
Cache Key Naming
```
# Good: Hierarchical, namespaced
tenant:123:profile
tenant:123:settings
user:456:session
ratelimit:789:20250408:1430

# Bad: Flat, no namespace
tenant123
user_session_456
ratelimit_789
```

When NOT to Cache
- **Real-time data** (stock prices, live stats)
- **Large objects** (>1MB)
- **One-time operations** (password reset tokens)
- **Streaming data** (logs, events)
- **Security-sensitive** (encryption keys)
---
Troubleshooting
High Redis Usage? Check This:
- **Current ops/sec**
- If >100: You have a problem
- If >1000: Critical bug (infinite loop?)
- **Cache hit rate**
- If <50%: Caching ineffective
- If <20%: No caching or wrong keys
- **Key count**
- If >100K: Key explosion (memory leak?)
- Check for `cache.set(f"temp:{uuid}", ...)` without a TTL
- **Memory usage**
- If >1GB: Check for large objects
- If >10MB per key: Too large!
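The thresholds in this checklist can be wired into a small triage helper; the function name is illustrative and the thresholds are copied from the checklist above, so adjust them to your workload:

```python
def triage(ops_per_sec: float, hit_rate: float, key_count: int) -> list[str]:
    """Flag the conditions listed in the checklist above."""
    flags = []
    if ops_per_sec > 1000:
        flags.append("CRITICAL: possible infinite loop")
    elif ops_per_sec > 100:
        flags.append("WARN: high ops/sec")
    if hit_rate < 0.20:
        flags.append("CRITICAL: no caching or wrong keys")
    elif hit_rate < 0.50:
        flags.append("WARN: caching ineffective")
    if key_count > 100_000:
        flags.append("WARN: key explosion (memory leak?)")
    return flags
```

Running this from a cron job against sampled metrics turns the manual checklist into an alert.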
---
Tools and Scripts
Monitor Usage
```bash
cd backend-saas
python3 scripts/monitor_redis_usage.py
```

Clean Up Keys

```bash
# Delete all keys matching pattern
redis-cli --scan --pattern "temp:*" | xargs redis-cli del

# Or using Python
python3 scripts/clean_all_upstash_and_qstash.py
```

Analyze Key Patterns

```bash
# Get sample keys
redis-cli --scan --pattern "*" | head -100

# Count keys by pattern
redis-cli --scan --pattern "tenant:*" | wc -l
redis-cli --scan --pattern "user:*" | wc -l
redis-cli --scan --pattern "ratelimit:*" | wc -l
```

---
Further Reading
- **Redis Best Practices:** https://redis.io/topics/lru-cache
- **Upstash Documentation:** https://upstash.com/docs
- **Caching Strategies:** https://docs.aws.amazon.com/whitepapers/latest/database-caching-strategies-using-redis/
---
**Last Updated:** 2026-04-09
**Version:** 1.0
**Related:**
- REDIS_MONITORING_GUIDE.md
- REDIS_SPIKE_ANALYSIS.md
- REDIS_READS_DIAGNOSIS.md