
data-engineer
Expert in data pipelines, ETL processes, and data infrastructure
Data Engineer Skill
I help you build robust data pipelines, ETL processes, and data infrastructure.
What I Do
Data Pipelines:
- Extract, Transform, Load (ETL) processes
- Data ingestion from multiple sources
- Batch and real-time processing
- Data quality validation
Data Infrastructure:
- Database schema design
- Data warehousing
- Caching strategies
- Data replication
Analytics:
- Data aggregation
- Metrics calculation
- Report generation
- Data export
ETL Patterns
Pattern 1: Simple ETL Pipeline
Use case: Daily sync from an external API into your database
// lib/etl/daily-sync.ts
interface RawCustomer {
id: string
full_name: string
email_address: string
signup_date: string
}
interface Customer {
id: string
name: string
email: string
signupDate: Date
}
export async function syncCustomers() {
console.log('Starting customer sync...')
// EXTRACT: Fetch data from external API
const response = await fetch('https://api.example.com/customers', {
headers: {
Authorization: `Bearer ${process.env.API_KEY}`
}
})
if (!response.ok) {
throw new Error(`Extract failed: ${response.status} ${response.statusText}`)
}
const rawCustomers: RawCustomer[] = await response.json()
console.log(`Extracted ${rawCustomers.length} customers`)
// TRANSFORM: Clean and normalize data
const transformedCustomers: Customer[] = rawCustomers.map(raw => ({
id: raw.id,
name: raw.full_name.trim(),
email: raw.email_address.toLowerCase(),
signupDate: new Date(raw.signup_date)
}))
// LOAD: Insert into database
let inserted = 0
let updated = 0
for (const customer of transformedCustomers) {
const existing = await db.customers.findUnique({
where: { id: customer.id }
})
if (existing) {
await db.customers.update({
where: { id: customer.id },
data: customer
})
updated++
} else {
await db.customers.create({
data: customer
})
inserted++
}
}
console.log(`Sync complete: ${inserted} inserted, ${updated} updated`)
return { inserted, updated, total: transformedCustomers.length }
}
Schedule with Vercel Cron:
// vercel.json
{
"crons": [
{
"path": "/api/cron/sync-customers",
"schedule": "0 2 * * *"
}
]
}
// app/api/cron/sync-customers/route.ts
import { syncCustomers } from '@/lib/etl/daily-sync'
export async function GET(req: Request) {
const authHeader = req.headers.get('authorization')
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
return Response.json({ error: 'Unauthorized' }, { status: 401 })
}
try {
const result = await syncCustomers()
return Response.json(result)
} catch (error) {
console.error('Sync failed:', error)
return Response.json({ error: 'Sync failed' }, { status: 500 })
}
}
Pattern 2: Incremental ETL (Delta Sync)
Use case: Process only new or changed records
// lib/etl/incremental-sync.ts
export async function incrementalSync() {
// Get last sync timestamp
const lastSync = await db.syncLog.findFirst({
where: { source: 'customers' },
orderBy: { syncedAt: 'desc' }
})
const since = lastSync?.syncedAt || new Date('2020-01-01')
// EXTRACT: Only fetch records modified since last sync
const response = await fetch(
`https://api.example.com/customers?modified_since=${since.toISOString()}`,
{
headers: { Authorization: `Bearer ${process.env.API_KEY}` }
}
)
const newOrModified = await response.json()
console.log(`Found ${newOrModified.length} new/modified records`)
// TRANSFORM & LOAD
for (const record of newOrModified) {
await db.customers.upsert({
where: { id: record.id },
create: transformCustomer(record), // transformCustomer is sketched after this pattern
update: transformCustomer(record)
})
}
// Log sync
await db.syncLog.create({
data: {
source: 'customers',
recordsProcessed: newOrModified.length,
syncedAt: new Date()
}
})
return { processed: newOrModified.length }
}
Benefits:
- Faster (only process changes)
- Lower API costs
- Reduced database load
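Pattern 2 relies on a transformCustomer helper that is not shown above. A minimal sketch, assuming the RawCustomer and Customer shapes from Pattern 1:

// lib/etl/transform.ts
// Assumes the RawCustomer / Customer interfaces from Pattern 1 are in scope
export function transformCustomer(raw: RawCustomer): Customer {
  return {
    id: raw.id,
    name: raw.full_name.trim(),
    email: raw.email_address.toLowerCase(),
    signupDate: new Date(raw.signup_date)
  }
}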
Pattern 3: Real-Time Data Pipeline
Use case: Process events as they happen
// lib/pipelines/events-processor.ts
import { Kafka } from 'kafkajs'
import Redis from 'ioredis'
const kafka = new Kafka({
clientId: 'myapp',
brokers: [process.env.KAFKA_BROKER!]
})
const consumer = kafka.consumer({ groupId: 'analytics-group' })
const redis = new Redis(process.env.REDIS_URL!) // used by updateRealtimeMetrics below
export async function startEventProcessor() {
await consumer.connect()
await consumer.subscribe({ topic: 'user-events', fromBeginning: false })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value!.toString())
// TRANSFORM: Enrich event data
const enrichedEvent = {
...event,
processedAt: new Date(),
userId: event.user_id,
eventType: event.type.toLowerCase()
}
// LOAD: Write to analytics database (analyticsDb: assumed separate Prisma client)
await analyticsDb.events.create({
data: enrichedEvent
})
// Also update real-time metrics
await updateRealtimeMetrics(enrichedEvent)
}
})
}
async function updateRealtimeMetrics(event: any) {
if (event.eventType === 'purchase') {
await redis.hincrby('metrics:today', 'purchases', 1)
await redis.hincrbyfloat('metrics:today', 'revenue', event.amount)
}
}
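The counters written above need a matching read path for dashboards. A minimal sketch, assuming the same ioredis client and the 'metrics:today' hash key:

// lib/pipelines/metrics-reader.ts
export async function getTodayMetrics() {
  // hgetall returns every field of the hash as strings
  const raw = await redis.hgetall('metrics:today')
  return {
    purchases: Number(raw.purchases ?? 0),
    revenue: Number(raw.revenue ?? 0)
  }
}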
Data Transformation Patterns
Transformation 1: Data Cleaning
// lib/transformers/cleaners.ts
export function cleanEmail(email: string): string {
return email.trim().toLowerCase()
}
export function cleanPhone(phone: string): string {
// Remove all non-numeric characters
return phone.replace(/\D/g, '')
}
export function cleanName(name: string): string {
return name
.trim()
.replace(/\s+/g, ' ') // Multiple spaces → single space
.split(' ')
.map(word => word.charAt(0).toUpperCase() + word.slice(1).toLowerCase())
.join(' ')
}
export function parseDate(dateStr: string): Date | null {
try {
const date = new Date(dateStr)
return isNaN(date.getTime()) ? null : date
} catch {
return null
}
}
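These cleaners are meant to be composed inside a transform step. For example (the raw field names are assumptions for illustration):

// Example: cleaning a raw record during TRANSFORM
const cleaned = {
  name: cleanName(raw.full_name),
  email: cleanEmail(raw.email_address),
  phone: cleanPhone(raw.phone_number), // hypothetical field
  signupDate: parseDate(raw.signup_date) // null when unparseable, so handle that downstream
}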
Transformation 2: Data Enrichment
// lib/transformers/enrichers.ts
import { differenceInDays } from 'date-fns'
export async function enrichCustomer(customer: RawCustomer) {
// Add geolocation data (assumes an address field and a geocode helper)
const geo = await geocode(customer.address)
// Add lifecycle stage (RawCustomer stores signup_date as a string)
const daysSinceSignup = differenceInDays(new Date(), new Date(customer.signup_date))
const lifecycleStage =
daysSinceSignup < 7
? 'new'
: daysSinceSignup < 30
? 'active'
: daysSinceSignup < 90
? 'engaged'
: 'dormant'
// Add lifetime value
const orders = await db.orders.findMany({
where: { customerId: customer.id }
})
const lifetimeValue = orders.reduce((sum, order) => sum + order.total, 0)
return {
...customer,
latitude: geo.lat,
longitude: geo.lng,
lifecycleStage,
lifetimeValue,
totalOrders: orders.length
}
}
Transformation 3: Data Aggregation
// lib/transformers/aggregators.ts
export async function aggregateDailySales() {
const sales = await db.$queryRaw`
SELECT
DATE(created_at) as date,
COUNT(*) as order_count,
SUM(total) as total_revenue,
AVG(total) as average_order_value,
COUNT(DISTINCT user_id) as unique_customers
FROM orders
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY DATE(created_at)
ORDER BY date DESC
`
return sales
}
export async function aggregateByRegion() {
const regions = await db.$queryRaw`
SELECT
country,
COUNT(*) as customer_count,
SUM(lifetime_value) as total_revenue
FROM customers
GROUP BY country
ORDER BY total_revenue DESC
`
return regions
}
Data Validation
Schema Validation with Zod
// lib/validators/customer.ts
import { z } from 'zod'
export const customerSchema = z.object({
id: z.string().uuid(),
email: z.string().email(),
name: z.string().min(1).max(100),
age: z.number().int().min(0).max(150).optional(),
signupDate: z.coerce.date(),
tags: z.array(z.string()).default([])
})
export type ValidatedCustomer = z.infer<typeof customerSchema>
export function validateCustomer(data: unknown): ValidatedCustomer {
return customerSchema.parse(data)
}
// In ETL pipeline
const rawData = await fetchFromAPI()
const validatedData = rawData
.map(record => {
try {
return validateCustomer(record)
} catch (error) {
console.error(`Validation failed for record ${record.id}:`, error)
return null
}
})
.filter((r): r is ValidatedCustomer => r !== null) // type predicate: filter(Boolean) would not narrow the type
Data Quality Checks
// lib/quality/checks.ts
export async function dataQualityChecks() {
const checks: { name: string; passed: boolean; issues: unknown }[] = []
// Check 1: No duplicate emails
const duplicates: { email: string; count: bigint }[] = await db.$queryRaw`
SELECT email, COUNT(*) as count
FROM customers
GROUP BY email
HAVING COUNT(*) > 1
`
checks.push({
name: 'No duplicate emails',
passed: duplicates.length === 0,
issues: duplicates
})
// Check 2: All customers have valid emails
const invalidEmails = await db.customers.count({
where: {
email: {
not: {
contains: '@'
}
}
}
})
checks.push({
name: 'Valid email format',
passed: invalidEmails === 0,
issues: invalidEmails
})
// Check 3: No orphaned orders
const orphanedOrders: { count: bigint }[] = await db.$queryRaw`
SELECT COUNT(*) as count
FROM orders
WHERE user_id NOT IN (SELECT id FROM customers)
`
checks.push({
name: 'No orphaned orders',
passed: Number(orphanedOrders[0].count) === 0, // raw COUNT(*) comes back as a BigInt
issues: Number(orphanedOrders[0].count)
})
return checks
}
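These checks fit naturally into the same cron pattern as Pattern 1: run them nightly and alert when any check fails. A minimal sketch, assuming the same CRON_SECRET guard and a sendAlert helper like the one in the monitoring section below:

// app/api/cron/quality-checks/route.ts
import { dataQualityChecks } from '@/lib/quality/checks'

export async function GET(req: Request) {
  if (req.headers.get('authorization') !== `Bearer ${process.env.CRON_SECRET}`) {
    return Response.json({ error: 'Unauthorized' }, { status: 401 })
  }
  const checks = await dataQualityChecks()
  const failed = checks.filter(c => !c.passed)
  if (failed.length > 0) {
    // sendAlert is an assumed helper (see Pipeline Monitoring below)
    await sendAlert({
      channel: '#data-engineering',
      message: `Data quality checks failed: ${failed.map(c => c.name).join(', ')}`
    })
  }
  return Response.json({ checks, failures: failed.length })
}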
Data Warehousing
Star Schema Design
// prisma/schema.prisma
// Fact table (metrics/events)
model FactSales {
id String @id @default(cuid())
// Foreign keys to dimensions
dateId String
customerId String
productId String
locationId String
// Metrics
quantity Int
unitPrice Decimal
totalAmount Decimal
discountAmount Decimal
netAmount Decimal
// Relations
date DimDate @relation(fields: [dateId], references: [id])
customer DimCustomer @relation(fields: [customerId], references: [id])
product DimProduct @relation(fields: [productId], references: [id])
location DimLocation @relation(fields: [locationId], references: [id])
}
// Dimension tables
model DimDate {
id String @id
date DateTime
year Int
quarter Int
month Int
dayOfWeek Int
isWeekend Boolean
isHoliday Boolean
sales FactSales[]
}
model DimCustomer {
id String @id
name String
email String
segment String // 'enterprise', 'smb', 'consumer'
lifecycleStage String
sales FactSales[]
}
model DimProduct {
id String @id
name String
category String
brand String
sku String
sales FactSales[]
}
model DimLocation {
id String @id
country String
state String
city String
zipCode String
sales FactSales[]
}
Populate Data Warehouse
// lib/warehouse/populate.ts
export async function populateWarehouse() {
// Extract from operational database
const orders = await db.orders.findMany({
include: {
customer: true,
items: {
include: { product: true }
}
},
where: {
createdAt: {
gte: new Date(Date.now() - 24 * 60 * 60 * 1000) // Last 24 hours
}
}
})
for (const order of orders) {
// Transform into fact table format
for (const item of order.items) {
await warehouseDb.factSales.create({
data: {
dateId: formatDateId(order.createdAt),
customerId: order.customer.id,
productId: item.product.id,
locationId: order.customer.locationId,
quantity: item.quantity,
unitPrice: item.price,
totalAmount: item.quantity * item.price,
discountAmount: item.discount || 0,
netAmount: item.quantity * item.price - (item.discount || 0)
}
})
}
}
}
function formatDateId(date: Date): string {
return date.toISOString().split('T')[0] // "2025-10-22"
}
Performance Optimization
Batch Processing
// lib/etl/batch-processor.ts
export async function processBatch<T>(
items: T[],
processor: (item: T) => Promise<void>,
batchSize = 100
) {
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize)
// Process batch in parallel
await Promise.all(batch.map(processor))
console.log(`Processed ${Math.min(i + batchSize, items.length)}/${items.length}`)
// Small delay to avoid overwhelming the database
await new Promise(resolve => setTimeout(resolve, 100))
}
}
// Usage
await processBatch(
customers,
async customer => {
await db.customers.upsert({
where: { id: customer.id },
create: customer,
update: customer
})
},
100 // Process 100 at a time
)
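When the load step is insert-only, a bulk write is usually faster than per-row upserts. A sketch using Prisma's createMany with skipDuplicates (supported on PostgreSQL and MySQL), which issues one INSERT per batch:

// Bulk insert variant: one INSERT statement per batch instead of N upserts
for (let i = 0; i < customers.length; i += 1000) {
  await db.customers.createMany({
    data: customers.slice(i, i + 1000),
    skipDuplicates: true // silently skip rows whose unique keys already exist
  })
}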
Database Optimization
// lib/db/optimizations.ts
// Use raw SQL for complex aggregations
export async function efficientAggregation() {
// Instead of multiple queries
const result = await db.$queryRaw`
SELECT
c.segment,
COUNT(DISTINCT c.id) as customer_count,
COUNT(o.id) as order_count,
SUM(o.total) as total_revenue,
AVG(o.total) as avg_order_value
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id
GROUP BY c.segment
`
return result
}
// Use indexes for faster queries
// In a migration file (Prisma's $executeRaw accepts a single statement per call):
await db.$executeRaw`CREATE INDEX idx_orders_created_at ON orders(created_at)`
await db.$executeRaw`CREATE INDEX idx_orders_customer_id ON orders(customer_id)`
await db.$executeRaw`CREATE INDEX idx_customers_email ON customers(email)`
Caching Strategies
Redis for Aggregated Data
// lib/cache/metrics.ts
import Redis from 'ioredis'
const redis = new Redis(process.env.REDIS_URL!)
export async function getCachedMetrics(key: string) {
const cached = await redis.get(key)
if (cached) {
return JSON.parse(cached)
}
// Calculate metrics
const metrics = await calculateMetrics()
// Cache for 1 hour
await redis.setex(key, 3600, JSON.stringify(metrics))
return metrics
}
async function calculateMetrics() {
const [revenue, orders, customers] = await Promise.all([
db.orders.aggregate({ _sum: { total: true } }),
db.orders.count(),
db.customers.count()
])
return {
totalRevenue: revenue._sum.total || 0,
totalOrders: orders,
totalCustomers: customers,
avgOrderValue: orders > 0 ? (revenue._sum.total || 0) / orders : 0 // avoid divide-by-zero
}
}
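Cached metrics go stale as soon as new orders arrive, so it helps to invalidate the key on writes and treat the one-hour TTL as a fallback. A minimal sketch (the key name is illustrative):

export async function invalidateMetrics(key: string) {
  // Next read misses the cache, recomputes, and re-caches
  await redis.del(key)
}

// Usage after a write that changes the metrics:
// await db.orders.create({ data: order })
// await invalidateMetrics('metrics:summary') // hypothetical key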
Monitoring and Logging
Pipeline Monitoring
// lib/monitoring/pipeline.ts
interface PipelineRun {
pipelineName: string
startTime: Date
endTime?: Date
status: 'running' | 'success' | 'failed'
recordsProcessed: number
errorMessage?: string
}
export async function trackPipeline<T>(name: string, pipeline: () => Promise<T>): Promise<T> {
const run: PipelineRun = {
pipelineName: name,
startTime: new Date(),
status: 'running',
recordsProcessed: 0
}
// Log start and keep the generated run id
const created = await db.pipelineRuns.create({ data: run })
try {
const result = await pipeline()
// Log success
run.endTime = new Date()
run.status = 'success'
await db.pipelineRuns.update({
where: { id: created.id },
data: run
})
return result
} catch (error) {
// Log failure
run.endTime = new Date()
run.status = 'failed'
run.errorMessage = error instanceof Error ? error.message : String(error)
await db.pipelineRuns.update({
where: { id: created.id },
data: run
})
// Alert team
await sendAlert({
channel: '#data-engineering',
message: `Pipeline ${name} failed: ${run.errorMessage}`
})
throw error
}
}
// Usage
await trackPipeline('daily-customer-sync', async () => {
return await syncCustomers()
})
Common Patterns
Pattern: Upsert with Conflict Resolution
export async function upsertWithConflict(record: any) {
const existing = await db.customers.findUnique({
where: { id: record.id }
})
if (existing) {
// Conflict resolution: Use most recent data
if (record.updatedAt > existing.updatedAt) {
await db.customers.update({
where: { id: record.id },
data: record
})
return { action: 'updated' }
} else {
return { action: 'skipped', reason: 'stale data' }
}
} else {
await db.customers.create({ data: record })
return { action: 'created' }
}
}
Pattern: Dead Letter Queue
// lib/queue/dead-letter.ts
export async function processWithRetry<T>(
item: T,
processor: (item: T) => Promise<void>,
maxRetries = 3
) {
let attempt = 0
while (attempt < maxRetries) {
try {
await processor(item)
return { success: true }
} catch (error) {
attempt++
console.error(`Attempt ${attempt} failed:`, error)
if (attempt >= maxRetries) {
// Move to dead letter queue
await db.deadLetterQueue.create({
data: {
item: JSON.stringify(item),
error: error instanceof Error ? error.message : String(error),
attempts: attempt,
queuedAt: new Date()
}
})
return { success: false, deadLettered: true }
}
// Exponential backoff
await new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(2, attempt)))
}
}
}
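A dead letter queue only pays off if it gets drained. A minimal reprocessing sketch, assuming the deadLetterQueue model above plus a resolvedAt column for marking handled entries:

// lib/queue/reprocess.ts
export async function reprocessDeadLetters<T>(
  processor: (item: T) => Promise<void>
) {
  const entries = await db.deadLetterQueue.findMany({
    where: { resolvedAt: null } // assumption: resolvedAt marks entries already handled
  })
  for (const entry of entries) {
    try {
      await processor(JSON.parse(entry.item) as T)
      await db.deadLetterQueue.update({
        where: { id: entry.id },
        data: { resolvedAt: new Date() }
      })
    } catch (error) {
      // Leave the entry in place for the next run
      console.error(`Reprocessing failed for dead letter ${entry.id}:`, error)
    }
  }
}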
When to Use Me
Perfect for:
- Building ETL pipelines
- Data migration projects
- Analytics infrastructure
- Data quality improvement
- Real-time data processing
I'll help you:
- Design data pipelines
- Transform and clean data
- Build data warehouses
- Optimize database queries
- Monitor data quality
What I'll Create
- ETL Pipelines
- Data Transformations
- Data Warehouses
- Data Quality Checks
- Real-Time Processors
- Analytics Infrastructure
Let's build robust, scalable data infrastructure!