streams

Master Node.js streams for memory-efficient processing of large datasets, real-time data handling, and building data pipelines

Updated 1/7/2026
SKILL.md
name: streams
description: Master Node.js streams for memory-efficient processing of large datasets, real-time data handling, and building data pipelines
version: "2.1.0"

Node.js Streams Skill

Master streams for memory-efficient processing of large files, real-time data, and building composable data pipelines.

Quick Start

Streams come in four types:

  1. Readable - Source of data (file, HTTP request)
  2. Writable - Destination (file, HTTP response)
  3. Transform - Modify data in transit
  4. Duplex - Both readable and writable
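
As a quick illustration of the four types, the sketch below builds one in-memory instance of each from Node's built-in `stream` and `zlib` modules and checks its class. The variable names (`source`, `sink`, `gzip`, `both`, `checks`) are illustrative only and do not appear elsewhere in this skill.

```javascript
const { Readable, Writable, Duplex, Transform, PassThrough } = require('stream');
const zlib = require('zlib');

// Readable: a source built from an in-memory array
const source = Readable.from(['a', 'b', 'c']);

// Writable: a destination that simply discards what it receives
const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  }
});

// Transform: gzip modifies data as it passes through
const gzip = zlib.createGzip();

// Duplex: PassThrough is a built-in stream that is both readable and writable
const both = new PassThrough();

const checks = {
  sourceIsReadable: source instanceof Readable,
  sinkIsWritable: sink instanceof Writable,
  gzipIsTransform: gzip instanceof Transform,
  // Every Transform is also a Duplex
  gzipIsDuplex: gzip instanceof Duplex,
  passThroughIsDuplex: both instanceof Duplex
};

console.log(checks);
```

Note that Transform is a specialised Duplex whose output is computed from its input, which is why `gzip` passes both checks.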

Core Concepts

Readable Stream

const fs = require('fs');

// Create readable stream
const readStream = fs.createReadStream('large-file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});

// Event-based consumption
readStream.on('data', (chunk) => {
  // With encoding set, chunks are strings, so length counts characters, not bytes
  console.log(`Received ${chunk.length} characters`);
});

readStream.on('end', () => {
  console.log('Finished reading');
});

readStream.on('error', (err) => {
  console.error('Read error:', err);
});

Writable Stream

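const fs = require('fs');

const writeStream = fs.createWriteStream('output.txt');

// Write data
writeStream.write('Hello, ');
writeStream.write('World!\n');

// Handle backpressure: write() returns false once the internal buffer is full
const ok = writeStream.write('More data\n');
if (!ok) {
  // Wait for the drain event before writing more
  writeStream.once('drain', () => {
    writeStream.end('Done\n'); // Final write, then signal end
  });
} else {
  writeStream.end('Done\n'); // Never call write() after end()
}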
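
When there are many writes, the drain check has to run on every one of them. A minimal sketch of such a loop, assuming an in-memory sink so that it runs without touching the filesystem; `writeAll`, `slowSink`, and `received` are hypothetical names, and the 4-byte `highWaterMark` is chosen only to make backpressure visible:

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// Write every item, pausing whenever write() reports a full buffer.
async function writeAll(writable, items) {
  let drains = 0;
  for (const item of items) {
    if (!writable.write(item)) {
      // Buffer is at or above highWaterMark: wait for 'drain'
      await once(writable, 'drain');
      drains++;
    }
  }
  // Register the 'finish' listener before calling end() so it cannot be missed
  const finished = once(writable, 'finish');
  writable.end();
  await finished;
  return drains;
}

// Hypothetical slow sink with a tiny buffer
const received = [];
const slowSink = new Writable({
  highWaterMark: 4, // bytes
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback); // simulate asynchronous I/O
  }
});

const done = writeAll(slowSink, ['aaaa', 'bbbb', 'cccc']);
done.then((drains) => {
  console.log(`wrote ${received.length} chunks, waited for drain ${drains} time(s)`);
});
```

The same function works unchanged with `fs.createWriteStream(...)` as the destination.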

Transform Stream

const { Transform } = require('stream');

// Custom transform: uppercase text
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// Usage
fs.createReadStream('input.txt')
  .pipe(upperCase)
  .pipe(fs.createWriteStream('output.txt'));
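
Chunk boundaries rarely line up with record boundaries, so a transform that works on lines usually has to carry a partial line from one call to the next and emit it when the input ends. A minimal sketch, assuming newline-delimited UTF-8 text; `LineSplitter` and `collectLines` are hypothetical names, not Node.js built-ins:

```javascript
const { Transform, Readable } = require('stream');
const { StringDecoder } = require('string_decoder');

class LineSplitter extends Transform {
  constructor(options) {
    // Bytes in, one string per line out
    super({ ...options, readableObjectMode: true });
    this.decoder = new StringDecoder('utf8'); // handles multi-byte characters split across chunks
    this.remainder = '';
  }

  _transform(chunk, encoding, callback) {
    const lines = (this.remainder + this.decoder.write(chunk)).split('\n');
    this.remainder = lines.pop(); // last piece may be an incomplete line
    for (const line of lines) this.push(line);
    callback();
  }

  _flush(callback) {
    // Emit whatever is left once the input has ended
    const last = this.remainder + this.decoder.end();
    if (last) this.push(last);
    callback();
  }
}

// Usage: feed chunks that split lines at arbitrary points
async function collectLines(chunks) {
  const lines = [];
  for await (const line of Readable.from(chunks).pipe(new LineSplitter())) {
    lines.push(line);
  }
  return lines;
}

const result = collectLines(['he', 'llo\nwor', 'ld\nlast']);
result.then((lines) => console.log(lines)); // [ 'hello', 'world', 'last' ]
```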

Learning Path

Beginner (1-2 weeks)

  • ✅ Understand stream types
  • ✅ Read/write file streams
  • ✅ Basic pipe operations
  • ✅ Handle stream events

Intermediate (3-4 weeks)

  • ✅ Transform streams
  • ✅ Backpressure handling
  • ✅ Object mode streams
  • ✅ Pipeline utility

Advanced (5-6 weeks)

  • ✅ Custom stream implementation
  • ✅ Async iterators
  • ✅ Web Streams API
  • ✅ Performance optimization

Pipeline (Recommended)

const { pipeline } = require('stream/promises');
const zlib = require('zlib');

// Compose streams with error handling
async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
  console.log('Compression complete');
}

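// With transforms. csvParser, transformRow, and jsonStringify are
// placeholders for your own Transform stream factories.
async function convertCsvToJson() {
  await pipeline(
    fs.createReadStream('data.csv'),
    csvParser(),
    transformRow(),
    jsonStringify(),
    fs.createWriteStream('data.json')
  );
}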

Pipeline with Error Handling

const { pipeline } = require('stream');

pipeline(
  source,
  transform1,
  transform2,
  destination,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);
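
To see what the callback form does on failure, the sketch below runs a pipeline whose middle stage rejects one chunk and then records what the callback observes. All streams are in-memory; `failing`, `sink`, and `outcome` are hypothetical names used only for this illustration.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

const source = Readable.from(['ok', 'boom', 'never reached']);

// Hypothetical transform that fails on one specific chunk
const failing = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    if (chunk === 'boom') {
      callback(new Error('bad chunk'));
    } else {
      callback(null, chunk);
    }
  }
});

// Sink that discards everything it receives
const sink = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    callback();
  }
});

const outcome = new Promise((resolve) => {
  pipeline(source, failing, sink, (err) => {
    resolve({
      message: err ? err.message : null,
      // pipeline destroys the other streams so nothing is left open
      sourceDestroyed: source.destroyed
    });
  });
});

outcome.then((result) => console.log(result));
```

With a plain `.pipe()` chain the error would stay on `failing` alone, and the source would remain open unless it was destroyed by hand.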

HTTP Streaming

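const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream');

// Stream file as HTTP response
http.createServer((req, res) => {
  const filePath = './video.mp4';

  // Non-blocking stat, so one request does not stall the event loop
  fs.stat(filePath, (err, stat) => {
    if (err) {
      res.writeHead(404);
      return res.end('Not found');
    }

    res.writeHead(200, {
      'Content-Type': 'video/mp4',
      'Content-Length': stat.size
    });

    // Stream instead of loading entire file; pipeline destroys both
    // streams if the client disconnects or the read fails
    pipeline(fs.createReadStream(filePath), res, (err) => {
      if (err) console.error('Response stream failed:', err);
    });
  });
}).listen(3000);

// Stream HTTP request body
http.createServer((req, res) => {
  pipeline(req, fs.createWriteStream('./upload.bin'), (err) => {
    if (err) {
      res.statusCode = 500;
      return res.end('Upload failed');
    }
    // Respond only after the file has been fully written
    res.end('Upload complete');
  });
}).listen(3001);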

Object Mode Streams

const { Transform } = require('stream');

const jsonParser = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    try {
      // Each chunk must contain one complete JSON document
      const obj = JSON.parse(chunk.toString());
      this.push(obj);
      callback();
    } catch (err) {
      callback(err);
    }
  }
});

// Process objects instead of buffers
const processRecords = new Transform({
  objectMode: true,
  transform(record, encoding, callback) {
    record.processed = true;
    record.timestamp = Date.now();
    this.push(record);
    callback();
  }
});
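
A sketch of how two object-mode transforms like these fit into one pipeline, assuming the input already arrives as one JSON document per chunk. The factory names `createParser` and `createMarker`, and the `run` helper, are hypothetical; they exist so that each run gets fresh stream instances.

```javascript
const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

// Parses one complete JSON document per chunk into an object
function createParser() {
  return new Transform({
    objectMode: true,
    transform(line, encoding, callback) {
      let obj;
      try {
        obj = JSON.parse(line);
      } catch (err) {
        return callback(err);
      }
      callback(null, obj);
    }
  });
}

// Flags each record as processed (stand-in for real processing)
function createMarker() {
  return new Transform({
    objectMode: true,
    transform(record, encoding, callback) {
      callback(null, { ...record, processed: true });
    }
  });
}

async function run(lines) {
  const records = [];
  await pipeline(
    Readable.from(lines),
    createParser(),
    createMarker(),
    // Final stage: collect the objects
    async function* (source) {
      for await (const record of source) records.push(record);
    }
  );
  return records;
}

const records = run(['{"id":1}', '{"id":2}']);
records.then((r) => console.log(r));
```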

Async Iterators

const { Readable } = require('stream');

// Create from async iterator
async function* generateData() {
  for (let i = 0; i < 100; i++) {
    yield { id: i, data: `item-${i}` };
  }
}

const stream = Readable.from(generateData(), { objectMode: true });

// Consume with for-await
async function processStream(readable) {
  for await (const chunk of readable) {
    console.log('Processing:', chunk);
  }
}
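
Async generators can also act as pipeline stages, which avoids writing a Transform class for simple per-item logic. A minimal sketch using the promise-based `pipeline`; `double` and `run` are hypothetical names:

```javascript
const { Readable } = require('stream');
const { pipeline } = require('stream/promises');

// Async generator used as a transform stage: doubles each number
async function* double(source) {
  for await (const n of source) {
    yield n * 2;
  }
}

async function run() {
  const out = [];
  await pipeline(
    Readable.from([1, 2, 3]),
    double,
    // Final stage: collect the results
    async function* (source) {
      for await (const n of source) out.push(n);
    }
  );
  return out;
}

const doubled = run();
doubled.then((values) => console.log(values)); // [ 2, 4, 6 ]
```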

Backpressure Handling

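const fs = require('fs');
const { pipeline } = require('stream');

const readable = fs.createReadStream('huge-file.txt');
const writable = fs.createWriteStream('output.txt');

// Option 1: manual backpressure
readable.on('data', (chunk) => {
  // Check if writable can accept more data
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    // Pause reading until writable is ready
    readable.pause();
    writable.once('drain', () => {
      readable.resume();
    });
  }
});

// Close the destination once the source is exhausted
readable.on('end', () => writable.end());

// Option 2: use pipeline instead of the handlers above (not alongside them);
// it handles backpressure, end, and errors automatically
pipeline(readable, writable, (err) => {
  if (err) console.error('Error:', err);
});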

Custom Readable Stream

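const { Readable } = require('stream');

// Assumes `db` is a connected MongoDB driver Db instance defined elsewhere
class DatabaseStream extends Readable {
  constructor(query, options) {
    super({ ...options, objectMode: true });
    this.query = query;
    this.cursor = null;
  }

  async _read() {
    try {
      if (!this.cursor) {
        // With the MongoDB driver, find() already returns a cursor
        this.cursor = db.collection('items').find(this.query);
      }

      const doc = await this.cursor.next();
      if (doc) {
        this.push(doc);
      } else {
        this.push(null); // Signal end
      }
    } catch (err) {
      // Surface query failures as stream errors instead of unhandled rejections
      this.destroy(err);
    }
  }
}

// Usage (in CommonJS, for await must run inside an async function)
async function main() {
  const dbStream = new DatabaseStream({ status: 'active' });
  for await (const item of dbStream) {
    console.log(item);
  }
}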

Unit Test Template

const { Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

describe('Stream Processing', () => {
  it('should transform data correctly', async () => {
    const input = Readable.from(['hello', 'world']);
    const chunks = [];

    const upperCase = new Transform({
      transform(chunk, enc, cb) {
        this.push(chunk.toString().toUpperCase());
        cb();
      }
    });

    await pipeline(
      input,
      upperCase,
      async function* (source) {
        for await (const chunk of source) {
          chunks.push(chunk.toString());
        }
      }
    );

    expect(chunks).toEqual(['HELLO', 'WORLD']);
  });
});

Troubleshooting

| Problem | Cause | Solution |
|---|---|---|
| Memory grows infinitely | No backpressure | Use pipeline or handle drain |
| Data loss | Errors not caught | Use pipeline with error callback |
| Slow processing | Small chunk size | Increase highWaterMark |
| Stream hangs | Missing end() call | Call writable.end() |
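
To make the `highWaterMark` row concrete, the sketch below reads the same file with two buffer sizes and counts the chunks each one produces. It assumes the process may write to the OS temp directory; the file name and the helpers `countChunks` and `compare` are hypothetical.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Count how many chunks it takes to read a file at a given highWaterMark
async function countChunks(file, highWaterMark) {
  let chunks = 0;
  for await (const _chunk of fs.createReadStream(file, { highWaterMark })) {
    chunks++;
  }
  return chunks;
}

async function compare() {
  // Temporary 1 MiB file of zero bytes; the name is arbitrary
  const file = path.join(os.tmpdir(), `hwm-demo-${process.pid}.bin`);
  await fs.promises.writeFile(file, Buffer.alloc(1024 * 1024));
  try {
    const small = await countChunks(file, 1024);      // 1 KB reads
    const large = await countChunks(file, 64 * 1024); // 64 KB reads
    return { small, large };
  } finally {
    await fs.promises.unlink(file); // clean up the temp file
  }
}

const comparison = compare();
comparison.then(({ small, large }) => {
  console.log(`1 KB buffer: ${small} chunks, 64 KB buffer: ${large} chunks`);
});
```

Fewer, larger chunks mean fewer trips through the event loop per megabyte, at the cost of more memory held per stream.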

When to Use

Use streams when:

  • Processing large files (GB+)
  • Real-time data processing
  • Memory-constrained environments
  • Building data pipelines
  • HTTP request/response handling

Related Skills

  • Async Programming (async patterns)
  • Performance Optimization (memory efficiency)
  • Express REST API (streaming responses)
