
---
name: nosql-databases
description: MongoDB, Redis, Cassandra, DynamoDB, and distributed database patterns for scalable applications
---
# NoSQL Databases

Production-grade NoSQL database patterns with MongoDB, Redis, Cassandra, and DynamoDB.

## Quick Start
```python
# MongoDB with PyMongo
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
from datetime import datetime

client = MongoClient("mongodb://localhost:27017/")
db = client.analytics
events = db.events

# Create indexes for query performance
events.create_index([("user_id", 1), ("timestamp", -1)])
events.create_index([("event_type", 1)])

# Insert with retry pattern; the deterministic _id makes retries idempotent
def insert_event(event: dict, retries: int = 3) -> bool:
    event["_id"] = f"{event['user_id']}_{event['timestamp'].isoformat()}"
    event["created_at"] = datetime.utcnow()
    for attempt in range(retries):
        try:
            events.insert_one(event)
            return True
        except DuplicateKeyError:
            return False  # Already exists
        except Exception:
            if attempt == retries - 1:
                raise
    return False

# Aggregation pipeline: top 100 customers by purchase total since Jan 2024
pipeline = [
    {"$match": {"event_type": "purchase", "timestamp": {"$gte": datetime(2024, 1, 1)}}},
    {"$group": {"_id": "$user_id", "total_purchases": {"$sum": "$amount"}, "count": {"$sum": 1}}},
    {"$sort": {"total_purchases": -1}},
    {"$limit": 100},
]
top_customers = list(events.aggregate(pipeline))
```
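
Event collections grow without bound, so the write path pairs well with a retention policy. A minimal sketch using a MongoDB TTL index on the `created_at` field set above; the 90-day window is an assumption, not part of the original design:

```python
# Assumed retention policy: MongoDB's TTL monitor deletes documents
# in the background once they pass created_at + 90 days.
events.create_index("created_at", expireAfterSeconds=90 * 24 * 3600)
```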
## Core Concepts

### 1. Redis for Caching & Real-time
```python
import json
import time
from datetime import timedelta

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Cache-aside pattern with TTL
def get_user_profile(user_id: str) -> dict:
    cache_key = f"user:{user_id}:profile"
    # Try cache first
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    # Cache miss - fetch from the source of truth
    profile = fetch_from_database(user_id)  # placeholder for your DB call
    # Set with 1 hour TTL
    r.setex(cache_key, timedelta(hours=1), json.dumps(profile))
    return profile

# Fixed-window rate limiting
def check_rate_limit(user_id: str, limit: int = 100, window: int = 60) -> bool:
    key = f"rate:{user_id}:{int(time.time()) // window}"
    current = r.incr(key)
    if current == 1:
        r.expire(key, window)
    return current <= limit

# Real-time leaderboard with sorted sets
def update_leaderboard(user_id: str, score: float):
    r.zadd("leaderboard:daily", {user_id: score})

def get_top_users(n: int = 10) -> list:
    return r.zrevrange("leaderboard:daily", 0, n - 1, withscores=True)

# Pub/Sub for event streaming
def publish_event(channel: str, event: dict):
    r.publish(channel, json.dumps(event))

def subscribe_events(channel: str):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            yield json.loads(message['data'])
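```

A quick usage sketch for the subscriber: `pubsub.listen()` blocks, so consume the generator from a dedicated thread or worker process. The `orders` channel and event fields here are illustrative:

```python
# Blocks waiting for messages published to the assumed "orders" channel
for event in subscribe_events("orders"):
    print(event.get("event_type"), event.get("amount"))
```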
### 2. DynamoDB Patterns
```python
import boto3
from boto3.dynamodb.conditions import Key
from decimal import Decimal  # DynamoDB represents numbers as Decimal

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Events')

# Single-table design: entity type encoded in composite keys
def put_event(event: dict):
    item = {
        'PK': f"USER#{event['user_id']}",
        'SK': f"EVENT#{event['timestamp']}#{event['event_id']}",
        'GSI1PK': f"TYPE#{event['event_type']}",
        'GSI1SK': f"DATE#{event['timestamp'][:10]}",
        'data': event,
    }
    table.put_item(Item=item)

# Query by user, newest first
def get_user_events(user_id: str, limit: int = 100):
    response = table.query(
        KeyConditionExpression=Key('PK').eq(f"USER#{user_id}") & Key('SK').begins_with("EVENT#"),
        ScanIndexForward=False,
        Limit=limit,
    )
    return response['Items']

# Query by event type (using GSI)
def get_events_by_type(event_type: str, date: str):
    response = table.query(
        IndexName='GSI1',
        KeyConditionExpression=Key('GSI1PK').eq(f"TYPE#{event_type}") & Key('GSI1SK').eq(f"DATE#{date}"),
    )
    return response['Items']

# Batch write; batch_writer retries unprocessed items with backoff automatically
def batch_write_events(events: list):
    with table.batch_writer() as batch:
        for event in events:
            batch.put_item(Item=event)
```
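
A single `query` call returns at most 1 MB of data, so `get_user_events` only sees the first page. A sketch of pagination by following `LastEvaluatedKey`:

```python
def get_all_user_events(user_id: str) -> list:
    """Follow LastEvaluatedKey until the result set is exhausted."""
    kwargs = {
        'KeyConditionExpression': Key('PK').eq(f"USER#{user_id}") & Key('SK').begins_with("EVENT#"),
        'ScanIndexForward': False,
    }
    items = []
    while True:
        response = table.query(**kwargs)
        items.extend(response['Items'])
        last_key = response.get('LastEvaluatedKey')
        if last_key is None:
            return items
        kwargs['ExclusiveStartKey'] = last_key  # resume from where the last page ended
```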
### 3. Cassandra for Time Series
```python
import json
from datetime import date

from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

cluster = Cluster(['node1', 'node2', 'node3'])
session = cluster.connect('analytics')

# Create table partitioned by day; rows cluster newest-first within a partition
session.execute("""
    CREATE TABLE IF NOT EXISTS events_by_day (
        date date,
        user_id uuid,
        event_time timestamp,
        event_type text,
        data text,
        PRIMARY KEY ((date), event_time, user_id)
    ) WITH CLUSTERING ORDER BY (event_time DESC)
""")

# Insert with prepared statement
insert_stmt = session.prepare("""
    INSERT INTO events_by_day (date, user_id, event_time, event_type, data)
    VALUES (?, ?, ?, ?, ?)
""")

def insert_event(event: dict):
    session.execute(insert_stmt, [
        event['timestamp'].date(),
        event['user_id'],
        event['timestamp'],
        event['event_type'],
        json.dumps(event['data']),
    ])

# Query a single day's partition
def get_events_for_date(day: date):
    rows = session.execute(
        "SELECT * FROM events_by_day WHERE date = %s",
        [day]
    )
    return list(rows)
```
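
The `BatchStatement` import above goes unused in the snippet; for completeness, here is a hedged sketch of how it could be applied. Cassandra batches buy atomicity, not throughput, so this only makes sense when every row lands in the same partition (the same day here):

```python
def insert_events_batch(events: list):
    # Logged batch: atomic, but only efficient when all rows share one partition key
    batch = BatchStatement()
    for event in events:
        batch.add(insert_stmt, [
            event['timestamp'].date(),
            event['user_id'],
            event['timestamp'],
            event['event_type'],
            json.dumps(event['data']),
        ])
    session.execute(batch)
```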
## Tools & Technologies
| Tool | Purpose | Version (2025) |
|---|---|---|
| MongoDB | Document store | 7.0+ |
| Redis | Cache, pub/sub | 7.2+ |
| Cassandra | Time series, wide column | 5.0+ |
| DynamoDB | Managed key-value | Latest |
| Elasticsearch | Search, analytics | 8.12+ |
| ScyllaDB | High-perf Cassandra | 5.4+ |
## Troubleshooting Guide
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| Hot Partition | High latency on some keys | Uneven partition key | Redesign partition key |
| Memory Pressure | Redis evictions, slow queries | Data > memory | Eviction policy, clustering |
| Query Timeout | Slow reads in Cassandra | Missing index, large partition | Add index, limit partition size |
| Consistency Issues | Stale reads | Eventual consistency | Use appropriate consistency level |
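
For the stale-read row, the remedy is database-specific. In Cassandra, consistency is set per query; a sketch with the Python driver, reusing the `session` from the Cassandra section (QUORUM reads paired with QUORUM writes give read-your-writes at replication factor 3):

```python
from datetime import date

from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement

# QUORUM on both reads and writes guarantees overlapping replica sets at RF=3
stmt = SimpleStatement(
    "SELECT * FROM events_by_day WHERE date = %s",
    consistency_level=ConsistencyLevel.QUORUM,
)
rows = session.execute(stmt, [date(2024, 1, 1)])
```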
## Best Practices

```python
# ✅ DO: Design for access patterns (NoSQL)
# Primary key = partition key + sort key

# ✅ DO: Use connection pooling
pool = redis.ConnectionPool(max_connections=20)
r = redis.Redis(connection_pool=pool)

# ✅ DO: Set TTLs on cache data
r.setex(key, ttl_seconds, value)

# ✅ DO: Handle eventual consistency
# Read-your-writes with consistent reads where needed (see the sketch below)

# ❌ DON'T: Use NoSQL for complex joins
# ❌ DON'T: Store unbounded data in a single document
# ❌ DON'T: Ignore partition sizing
```
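
As a concrete read-your-writes example, DynamoDB lets individual reads opt into strong consistency (at roughly twice the read-unit cost). A sketch against the `Events` table from earlier; the key values are illustrative:

```python
# Strongly consistent read: reflects all writes acknowledged before the call
response = table.get_item(
    Key={'PK': 'USER#42', 'SK': 'EVENT#2024-01-01T00:00:00#e1'},  # illustrative keys
    ConsistentRead=True,
)
item = response.get('Item')
```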
## Resources

Skill Certification Checklist:
- [ ] Can design document schemas for MongoDB
- [ ] Can implement caching patterns with Redis
- [ ] Can model time series data in Cassandra
- [ ] Can use DynamoDB single-table design
- [ ] Can choose appropriate consistency levels