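What Is a Stream

A stream is a data pipeline that processes data as it arrives, so you never have to load the whole file or response into memory.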

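Good fits:

  • Reading and writing large files (gigabytes)
  • Sending HTTP responses as a stream
  • Real-time log processing
  • Pipelines for decompression, encryption, transcoding, and similar work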

The Four Stream Types

Type        Meaning                                  Examples
Readable    Can be read from                         fs.createReadStream / process.stdin
Writable    Can be written to                        fs.createWriteStream / process.stdout
Duplex      Both readable and writable               TCP socket
Transform   Transforms data as it passes through     zlib / crypto
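
To make the four types concrete, here is a minimal sketch that checks which class a few built-in streams belong to. The `classify` helper and the `samples` object are hypothetical names introduced only for this illustration. The checks run from most specific to least specific because every Transform is also a Duplex.

```javascript
import { Readable, Writable, Duplex, Transform } from 'node:stream';
import { createGzip } from 'node:zlib';
import { Socket } from 'node:net';

// Hypothetical helper: report the most specific stream type.
// Order matters: Transform extends Duplex, and a Duplex also passes the
// Readable and Writable checks.
function classify(stream) {
    if (stream instanceof Transform) return 'Transform';
    if (stream instanceof Duplex) return 'Duplex';
    if (stream instanceof Readable) return 'Readable';
    if (stream instanceof Writable) return 'Writable';
    return 'not a stream';
}

const samples = {
    readable: Readable.from(['a', 'b']),
    writable: new Writable({ write(chunk, encoding, callback) { callback(); } }),
    socket: new Socket(),      // an unconnected TCP socket
    gzip: createGzip(),
};

const kinds = Object.fromEntries(
    Object.entries(samples).map(([name, stream]) => [name, classify(stream)]),
);
console.log(kinds);

// Release the underlying resources so the process can exit cleanly.
for (const stream of Object.values(samples)) stream.destroy();
```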

Basic Usage: pipe

import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
import { createGzip } from 'zlib';

// Old style (awkward on failure: .pipe() does not forward errors between stages)
createReadStream('input.txt')
    .pipe(createGzip())
    .pipe(createWriteStream('output.txt.gz'));

// Modern style (recommended): errors are handled automatically
await pipeline(
    createReadStream('input.txt'),
    createGzip(),
    createWriteStream('output.txt.gz'),
);

pipeline

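  • Catches an error raised by any stage
  • Automatically closes (destroys) every stream in the chain
  • Returns a Promise (when imported from stream/promises)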
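
The points above can be seen in a small sketch: because pipeline returns a Promise, a single try/catch covers every stage. The `gzipFile` helper and both temp-file paths are assumptions made for this demo. The input path deliberately does not exist, so the read stage fails and the rejection carries the file-system error code.

```javascript
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';
import { createGzip } from 'node:zlib';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Hypothetical helper: gzip src into dest and report the outcome
// instead of throwing.
async function gzipFile(src, dest) {
    try {
        await pipeline(
            createReadStream(src),
            createGzip(),
            createWriteStream(dest),
        );
        return { ok: true };
    } catch (err) {
        // An error from any stage lands here; pipeline has already
        // destroyed the other streams.
        return { ok: false, code: err.code };
    }
}

// Demo: the input file is assumed not to exist.
const result = await gzipFile(
    join(tmpdir(), 'stream-demo-missing-input.txt'),
    join(tmpdir(), 'stream-demo-out.gz'),
);
console.log(result);
```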

Reading a Stream: Manual Consumption

import { createReadStream } from 'fs';

const stream = createReadStream('big.txt', { encoding: 'utf8' });

// Event style
stream.on('data', chunk => console.log(chunk));
stream.on('end', () => console.log('done'));
stream.on('error', err => console.error(err));

// Modern style: for await
for await (const chunk of createReadStream('big.txt', 'utf8')) {
    console.log(chunk);
}
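
As a rough illustration of what the for await loop receives, the sketch below counts chunks and characters for a throwaway file. The `measure` helper, the temp path, and the 1024-byte `highWaterMark` are assumptions chosen for the demo. A read error would surface as an exception thrown from the loop, so an ordinary try/catch can stand in for the 'error' listener.

```javascript
import { createReadStream } from 'node:fs';
import { writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Hypothetical helper: count chunks and characters without ever
// holding the whole file in memory.
async function measure(path, chunkSize) {
    const stream = createReadStream(path, {
        encoding: 'utf8',
        highWaterMark: chunkSize,   // maximum bytes per read
    });
    let chunks = 0;
    let characters = 0;
    for await (const chunk of stream) {
        chunks += 1;
        characters += chunk.length;
    }
    return { chunks, characters };
}

// Demo input: a 5000-character file at an assumed temp path.
const demoPath = join(tmpdir(), 'stream-demo-read.txt');
await writeFile(demoPath, 'a'.repeat(5000));
const stats = await measure(demoPath, 1024);
console.log(stats);
```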

Writing to a Stream

import { createWriteStream } from 'fs';

const out = createWriteStream('output.txt');
out.write('first line\n');
out.write('second line\n');
out.end();                              // no more writes will follow

out.on('finish', () => console.log('Written'));

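Remember to call end(). Without it the stream never emits 'finish', the file descriptor stays open, and you cannot tell when the data has been fully written.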
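
If you prefer a Promise over the 'finish' event, one possible approach is `finished` from stream/promises. The `writeLines` helper and the temp path below are assumptions for this sketch. It ignores the return value of write(), which is acceptable only for small amounts of data.

```javascript
import { createWriteStream } from 'node:fs';
import { readFile } from 'node:fs/promises';
import { finished } from 'node:stream/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Hypothetical helper: write each line, then wait until the stream
// has completely finished.
async function writeLines(path, lines) {
    const out = createWriteStream(path);
    for (const line of lines) {
        out.write(`${line}\n`);
    }
    out.end();              // no more writes will follow
    await finished(out);    // rejects if the stream emits 'error'
}

// Demo: write two lines to an assumed temp path and read them back.
const outPath = join(tmpdir(), 'stream-demo-write.txt');
await writeLines(outPath, ['first line', 'second line']);
const written = await readFile(outPath, 'utf8');
console.log(JSON.stringify(written));
```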

Transform: Custom Transformations

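import { Transform } from 'stream';
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';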

class UppercaseTransform extends Transform {
    _transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
}

await pipeline(
    createReadStream('input.txt'),
    new UppercaseTransform(),
    createWriteStream('output.txt'),
);
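
For a one-off transformation, a subclass is not strictly necessary: the Transform constructor also accepts a `transform` function in its options. The sketch below uses that form and runs entirely in memory so it can be tried without any files. `upperCase`, `parts`, and `sink` are hypothetical names for this illustration.

```javascript
import { Readable, Writable, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical factory: same behavior as a subclass with _transform,
// written with the constructor-options form.
function upperCase() {
    return new Transform({
        transform(chunk, encoding, callback) {
            // Passing the result to callback is shorthand for push() + callback().
            callback(null, chunk.toString().toUpperCase());
        },
    });
}

// In-memory sink that collects everything it receives.
const parts = [];
const sink = new Writable({
    write(chunk, encoding, callback) {
        parts.push(chunk.toString());
        callback();
    },
});

await pipeline(Readable.from(['hello ', 'world']), upperCase(), sink);
const shouted = parts.join('');
console.log(shouted);
```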

In Practice: Processing a Large File Line by Line

import { createReadStream } from 'fs';
import { createInterface } from 'readline';

const rl = createInterface({
    input: createReadStream('big-log.txt'),
    crlfDelay: Infinity,
});

let errorCount = 0;
for await (const line of rl) {
    if (line.includes('ERROR')) errorCount++;
}
console.log(`Errors: ${errorCount}`);
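
The same loop extends naturally to a tally per log level. This is only a sketch: `countByLevel`, the level names, and the sample log contents are assumptions, and the matching is a plain substring test rather than real log parsing.

```javascript
import { createReadStream } from 'node:fs';
import { writeFile } from 'node:fs/promises';
import { createInterface } from 'node:readline';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Hypothetical helper: count lines per level, one line at a time.
async function countByLevel(path, levels = ['ERROR', 'WARN', 'INFO']) {
    const counts = Object.fromEntries(levels.map(level => [level, 0]));
    const rl = createInterface({
        input: createReadStream(path),
        crlfDelay: Infinity,    // treat \r\n as a single line break
    });
    for await (const line of rl) {
        // The first level found in the line wins.
        const level = levels.find(candidate => line.includes(candidate));
        if (level) counts[level] += 1;
    }
    return counts;
}

// Demo: a tiny made-up log at an assumed temp path.
const logPath = join(tmpdir(), 'stream-demo.log');
await writeFile(logPath, 'INFO start\nERROR disk full\nWARN slow\nERROR timeout\n');
const counts = await countByLevel(logPath);
console.log(counts);
```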

In Practice: Streaming an HTTP Response

import { createReadStream } from 'fs';
import http from 'http';

http.createServer((req, res) => {
    const stream = createReadStream('big-file.zip');
    res.setHeader('Content-Type', 'application/zip');
    stream.pipe(res);    // send while reading; the whole file is never held in memory
}).listen(8080);
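
A possible variation of the server above uses the callback form of pipeline, so that a missing file or a client that disconnects mid-download does not leave an unhandled 'error' event or an open file handle. `createFileServer` is a hypothetical helper, and the generic content type is an assumption. On failure this sketch only logs, and pipeline tears down the response.

```javascript
import http from 'node:http';
import { createReadStream } from 'node:fs';
import { pipeline } from 'node:stream';

// Hypothetical helper: build a server that streams one file per request.
function createFileServer(filePath) {
    return http.createServer((req, res) => {
        res.setHeader('Content-Type', 'application/octet-stream');
        // pipeline destroys both streams if either side fails,
        // for example when the file is missing or the client aborts.
        pipeline(createReadStream(filePath), res, (err) => {
            if (err) console.error('stream failed:', err.message);
        });
    });
}

// Usage (not started here so the snippet has no side effects):
// createFileServer('big-file.zip').listen(8080);
```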

In Practice: Download, Compress, and Save in One Pass

import { pipeline } from 'stream/promises';
import { createWriteStream } from 'fs';
import { createGzip } from 'zlib';

const response = await fetch('https://example.com/big-data.json');

await pipeline(
    response.body,                   // ReadableStream (Web standard)
    createGzip(),                     // Transform
    createWriteStream('data.json.gz'),
);
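
A slightly more defensive sketch of the same idea checks the HTTP status first and converts the Web stream explicitly with Readable.fromWeb, which may help on Node versions where pipeline does not accept a Web ReadableStream directly. `saveCompressed` is a hypothetical helper; it accepts any fetch `Response`.

```javascript
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

// Hypothetical helper: gzip a fetch Response body into destPath.
async function saveCompressed(response, destPath) {
    if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
    }
    if (!response.body) {
        throw new Error('response has no body');
    }
    await pipeline(
        Readable.fromWeb(response.body),    // Web ReadableStream -> Node Readable
        createGzip(),
        createWriteStream(destPath),
    );
}

// Usage with the URL from the example above:
// await saveCompressed(await fetch('https://example.com/big-data.json'), 'data.json.gz');
```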

Web Streams (the Modern Standard)

Node 18+ supports the Web Streams API (the same API browsers use):

import { ReadableStream } from 'stream/web';

const rs = new ReadableStream({
    start(controller) {
        controller.enqueue('hello ');
        controller.enqueue('world');
        controller.close();
    },
});

for await (const chunk of rs) {
    console.log(chunk);
}
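
The Web Streams counterpart of a Node Transform is TransformStream, connected with pipeThrough. The following is a minimal sketch; `source`, `upper`, and `pieces` are names chosen for this illustration.

```javascript
import { ReadableStream, TransformStream } from 'node:stream/web';

const source = new ReadableStream({
    start(controller) {
        controller.enqueue('hello ');
        controller.enqueue('world');
        controller.close();
    },
});

// Upper-case every chunk as it passes through.
const upper = new TransformStream({
    transform(chunk, controller) {
        controller.enqueue(chunk.toUpperCase());
    },
});

const pieces = [];
for await (const chunk of source.pipeThrough(upper)) {
    pieces.push(chunk);
}
const webResult = pieces.join('');
console.log(webResult);
```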

Converting between the two:

import { Readable } from 'stream';
const nodeStream = Readable.fromWeb(webReadable);
const webStream = Readable.toWeb(nodeStream);
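
A complete round trip might look like the sketch below, which starts from an in-memory Node stream, converts it to a Web stream and back, and then reads the result as text. The variable names are assumptions, and `text` comes from stream/consumers.

```javascript
import { Readable } from 'node:stream';
import { text } from 'node:stream/consumers';

// Start with a Node stream built from two in-memory chunks.
const nodeSource = Readable.from([Buffer.from('hello '), Buffer.from('world')]);

// Node Readable -> Web ReadableStream
const asWeb = Readable.toWeb(nodeSource);
const isWebStream = asWeb instanceof ReadableStream;

// Web ReadableStream -> Node Readable
const nodeAgain = Readable.fromWeb(asWeb);
const isNodeStream = nodeAgain instanceof Readable;

// Drain the converted stream into a single string.
const roundTripped = await text(nodeAgain);
console.log({ isWebStream, isNodeStream, roundTripped });
```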

Backpressure

When the writable side cannot keep up with the readable side, unwritten data accumulates in memory:

// ❌ Ignores the return value of write()
for (const chunk of bigData) {
    writable.write(chunk);              // the internal buffer can grow without bound
}

// ✓ Check the return value and wait for 'drain'
for (const chunk of bigData) {
    if (!writable.write(chunk)) {
        await new Promise(r => writable.once('drain', r));
    }
}

pipeline handles backpressure automatically, so the safest default is to always use pipeline.
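
For cases where a manual loop is unavoidable, the check-and-wait pattern can be wrapped in a small helper. This is a sketch under stated assumptions: `writeAll` and `slowSink` are hypothetical, and the 4-byte `highWaterMark` and the timer delay exist only to force backpressure in the demo.

```javascript
import { Writable } from 'node:stream';
import { once } from 'node:events';

// Hypothetical helper: write every chunk, pausing whenever the
// writable reports that its buffer is full. Returns the pause count.
async function writeAll(writable, chunks) {
    let pauses = 0;
    for (const chunk of chunks) {
        if (!writable.write(chunk)) {
            pauses += 1;
            await once(writable, 'drain');  // rejects if 'error' is emitted
        }
    }
    writable.end();
    await once(writable, 'finish');
    return pauses;
}

// Demo sink: a tiny buffer and a deliberately slow consumer.
const received = [];
const slowSink = new Writable({
    highWaterMark: 4,
    write(chunk, encoding, callback) {
        received.push(chunk.toString());
        setTimeout(callback, 5);
    },
});

const pauses = await writeAll(slowSink, ['aaaa', 'bbbb', 'cccc']);
console.log({ pauses, received });
```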

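  • Always listen for stream errors: an unhandled 'error' event crashes the process
  • pipeline is more robust than .pipe(): its error handling is complete
  • Do not use readFile for large files: it loads the entire file into memory
  • Node Streams and Web Streams are two separate APIs; convert between them with Readable.fromWeb / Readable.toWeb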
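
As one illustration of the readFile point, a file hash can be computed chunk by chunk, so memory use stays roughly constant regardless of file size. `sha256File`, the temp path, and the sample data are assumptions for this sketch.

```javascript
import { createHash } from 'node:crypto';
import { createReadStream } from 'node:fs';
import { writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Hypothetical helper: hash a file without loading it all at once.
async function sha256File(path) {
    const hash = createHash('sha256');
    for await (const chunk of createReadStream(path)) {
        hash.update(chunk);     // feed one chunk at a time
    }
    return hash.digest('hex');
}

// Demo: a sample file at an assumed temp path, large enough to
// arrive in several chunks.
const samplePath = join(tmpdir(), 'stream-demo-hash.bin');
const sampleData = 'x'.repeat(200_000);
await writeFile(samplePath, sampleData);
const digest = await sha256File(samplePath);
console.log(digest);
```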

Next: EventEmitter.