
---
name: monitoring-observability
description: Prometheus, Grafana, logging, alerting, and data pipeline observability
---
# Monitoring & Observability

Production monitoring with Prometheus, Grafana, structured logging, and data quality observability.

## Quick Start
```python
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import structlog
import time

# Configure structured logging
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)
logger = structlog.get_logger()

# Prometheus metrics
RECORDS_PROCESSED = Counter('records_processed_total', 'Total records processed', ['pipeline', 'status'])
PROCESSING_TIME = Histogram('processing_duration_seconds', 'Processing duration', ['pipeline'])
QUEUE_SIZE = Gauge('queue_size', 'Current queue size', ['queue_name'])

def process_batch(batch: list, pipeline_name: str):
    start_time = time.time()
    try:
        for record in batch:
            # Process record...
            RECORDS_PROCESSED.labels(pipeline=pipeline_name, status='success').inc()
        duration = time.time() - start_time
        PROCESSING_TIME.labels(pipeline=pipeline_name).observe(duration)
        logger.info(
            "batch_processed",
            pipeline=pipeline_name,
            count=len(batch),
            duration_seconds=duration,
        )
    except Exception as e:
        RECORDS_PROCESSED.labels(pipeline=pipeline_name, status='error').inc()
        logger.error("batch_failed", pipeline=pipeline_name, error=str(e))
        raise

# Start the metrics server (exposes /metrics on port 8000)
start_http_server(8000)
```
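
With the exporter listening on port 8000, Prometheus needs a scrape job pointed at it. A minimal `prometheus.yml` sketch; the job name and target host are assumptions to adjust for your deployment:

```yaml
scrape_configs:
  - job_name: 'etl-pipeline'          # assumed job name
    scrape_interval: 15s
    static_configs:
      - targets: ['localhost:8000']   # host:port from start_http_server(8000)
```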
## Core Concepts

### 1. Prometheus Metrics
```python
from prometheus_client import Counter, Histogram, Gauge, Summary

# Counter: monotonically increasing value
http_requests = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)
http_requests.labels(method='GET', endpoint='/api/data', status='200').inc()

# Histogram: distribution of values (latency, sizes)
request_latency = Histogram(
    'request_latency_seconds',
    'Request latency in seconds',
    ['endpoint'],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)
with request_latency.labels(endpoint='/api/data').time():
    # Process request
    pass

# Gauge: value that can go up and down
active_connections = Gauge('active_connections', 'Active connections')
active_connections.inc()  # Connection opened
active_connections.dec()  # Connection closed

# Summary: tracks count and sum of observations
# (the Python client does not compute quantiles)
response_size = Summary('response_size_bytes', 'Response size', ['endpoint'])
response_size.labels(endpoint='/api/data').observe(1024)
```
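
Histograms and gauges also work as decorators, which keeps timing and in-flight tracking out of the business logic. A short sketch using helpers built into `prometheus_client`:

```python
from prometheus_client import Gauge, Histogram

batch_latency = Histogram('batch_duration_seconds', 'Batch duration')
in_flight = Gauge('batches_in_progress', 'Batches currently processing')

@batch_latency.time()           # observes each call's duration automatically
@in_flight.track_inprogress()   # inc() on entry, dec() on exit
def run_batch():
    ...  # business logic only; no manual timing code
```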
### 2. Grafana Dashboard (JSON)
```json
{
  "title": "Data Pipeline Dashboard",
  "panels": [
    {
      "title": "Records Processed",
      "type": "stat",
      "targets": [{
        "expr": "sum(rate(records_processed_total[5m]))",
        "legendFormat": "Records/sec"
      }]
    },
    {
      "title": "Processing Latency P95",
      "type": "timeseries",
      "targets": [{
        "expr": "histogram_quantile(0.95, rate(processing_duration_seconds_bucket[5m]))",
        "legendFormat": "P95 Latency"
      }]
    },
    {
      "title": "Error Rate",
      "type": "gauge",
      "targets": [{
        "expr": "sum(rate(records_processed_total{status='error'}[5m])) / sum(rate(records_processed_total[5m])) * 100",
        "legendFormat": "Error %"
      }]
    }
  ]
}
```
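
Rather than importing dashboard JSON by hand, Grafana can load it from disk at startup via file-based provisioning. A sketch of a provisioning file; the paths and provider name are assumptions:

```yaml
# /etc/grafana/provisioning/dashboards/pipelines.yml (path assumed)
apiVersion: 1
providers:
  - name: 'pipeline-dashboards'          # assumed provider name
    type: file
    options:
      path: /var/lib/grafana/dashboards  # directory containing the JSON above
```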
### 3. Structured Logging
```python
import structlog

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),
)
logger = structlog.get_logger()

# Bind long-lived context once, then enrich per unit of work
log = logger.bind(service="etl-pipeline", environment="production")

def process_order(order_id: str, user_id: str):
    order_log = log.bind(order_id=order_id, user_id=user_id)
    order_log.info("processing_started")
    try:
        # Process...
        order_log.info("processing_completed", duration_ms=150)
    except Exception as e:
        order_log.error("processing_failed", error=str(e), exc_info=True)
        raise
```
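
For request- or task-scoped context that should survive across function calls without threading a bound logger through every signature, structlog's contextvars integration is a common pattern. A minimal sketch; `merge_contextvars` must run before the rendering processors:

```python
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # pulls in contextvar-bound fields
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)
logger = structlog.get_logger()

def handle_request(request_id: str):
    clear_contextvars()                      # avoid leaking context between requests
    bind_contextvars(request_id=request_id)
    logger.info("request_received")          # request_id is attached automatically
```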
### 4. Alerting Rules (Prometheus)
```yaml
# alerting_rules.yml
groups:
  - name: data-pipeline-alerts
    rules:
      - alert: HighErrorRate
        expr: |
          sum(rate(records_processed_total{status="error"}[5m]))
            / sum(rate(records_processed_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate in data pipeline"
          description: "Error rate is {{ $value | humanizePercentage }}"

      - alert: PipelineStalled
        expr: |
          sum(rate(records_processed_total[10m])) == 0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Data pipeline is not processing records"

      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, rate(processing_duration_seconds_bucket[5m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High processing latency detected"
```
## Tools & Technologies
| Tool | Purpose | Version (2025) |
|---|---|---|
| Prometheus | Metrics collection | 2.50+ |
| Grafana | Visualization | 10.3+ |
| Loki | Log aggregation | 2.9+ |
| Alertmanager | Alert routing | 0.27+ |
| OpenTelemetry | Tracing standard (see sketch below) | 1.24+ |
| Datadog | Full observability | Latest |
| Monte Carlo | Data observability | Latest |
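
The table lists OpenTelemetry as the tracing standard, so a tracing example is worth sketching. This uses the official Python SDK with a console exporter for illustration; a real deployment would swap in an OTLP exporter pointed at your collector:

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Wire up a tracer provider that batches spans to stdout
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("etl-pipeline")

with tracer.start_as_current_span("process_batch") as span:
    span.set_attribute("pipeline", "orders")  # attribute values are illustrative
    # ... batch work; child spans started here are correlated automatically
```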
## Troubleshooting Guide
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| Missing Metrics | Gaps in graphs | Scrape failure | Check targets, network |
| High Cardinality | Prometheus OOM | Too many label values | Reduce label values (see query below) |
| Alert Fatigue | Too many alerts | Oversensitive thresholds | Tune thresholds, add a `for:` duration |
| Log Volume | High storage cost | Verbose logging | Adjust log levels |
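
For the High Cardinality row above, a useful first diagnostic is asking Prometheus which metric names own the most time series. A query sketch (it scans every series, so expect it to be slow on large servers):

```promql
# Top 10 metric names by active series count
topk(10, count by (__name__)({__name__=~".+"}))
```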
Best Practices
# ✅ DO: Use appropriate metric types
# Counter for totals, Histogram for latency
# ✅ DO: Add meaningful labels (but limit cardinality)
REQUESTS.labels(method='GET', status='200', endpoint='/api').inc()
# ✅ DO: Include correlation IDs in logs
logger.info("request_completed", request_id=request_id)
# ✅ DO: Set up dashboards for key metrics
# ❌ DON'T: High cardinality labels (user_id, request_id as labels)
# ❌ DON'T: Log sensitive data
# ❌ DON'T: Alert on every error
## Skill Certification Checklist
- [ ] Can instrument applications with Prometheus metrics
- [ ] Can create Grafana dashboards
- [ ] Can implement structured logging
- [ ] Can set up alerting rules
- [ ] Can troubleshoot observability issues