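What is a Stream
A "data pipeline" that processes data as it arrives, so the whole file or response never has to be loaded into memory.
Good fit for:
- Reading and writing large files (GB scale)
- Sending HTTP responses as a stream
- Real-time log processing
- Pipelines for decompression, encryption, transcoding, and the like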
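The 4 Stream Types
| Type | Meaning | Examples |
|---|---|---|
| Readable | A source you read from | fs.createReadStream / process.stdin |
| Writable | A sink you write to | fs.createWriteStream / process.stdout |
| Duplex | Both readable and writable | TCP socket |
| Transform | A Duplex that transforms data as it passes through | zlib / crypto |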
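The table can be made concrete with in-memory streams. The following is a minimal sketch, not taken from the original article: the names `source`, `passthrough`, `sink`, and `received` are illustrative, and `PassThrough` stands in for a real Transform such as zlib.

```javascript
import { Readable, Writable, Duplex, Transform, PassThrough } from 'stream';
import { pipeline } from 'stream/promises';

// Readable: a source. Readable.from() wraps any iterable.
const source = Readable.from(['a', 'b', 'c']);

// Transform: PassThrough is the built-in identity Transform.
const passthrough = new PassThrough();

// Writable: a sink that collects whatever reaches it.
const received = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

// Every Transform is also a Duplex.
const isTransform = passthrough instanceof Transform;
const isDuplex = passthrough instanceof Duplex;

await pipeline(source, passthrough, sink);
console.log(isTransform, isDuplex, received.join('')); // true true abc
```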
Basic usage: pipe
import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
import { createGzip } from 'zlib';
// Old style: errors are not forwarded, so every stream needs its own 'error' handler
createReadStream('input.txt')
  .pipe(createGzip())
  .pipe(createWriteStream('output.txt.gz'));
// New style (recommended): errors are handled automatically
await pipeline(
  createReadStream('input.txt'),
  createGzip(),
  createWriteStream('output.txt.gz'),
);
pipeline:
- An error in any stage is caught
- All streams are closed automatically
- Returns a Promise (the stream/promises version)
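To see the error handling in action, here is a small sketch that feeds pipeline a file assumed not to exist. The paths under the temp directory are hypothetical and chosen only for this demo; the rejected Promise carries the error from the failing stage.

```javascript
import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
import { createGzip } from 'zlib';
import { tmpdir } from 'os';
import { join } from 'path';

// Hypothetical paths: the input file is assumed NOT to exist.
const missingInput = join(tmpdir(), 'stream-demo-does-not-exist.txt');
const output = join(tmpdir(), 'stream-demo-output.gz');

let caught = null;
try {
  await pipeline(
    createReadStream(missingInput),
    createGzip(),
    createWriteStream(output),
  );
} catch (err) {
  // The read stage failed; pipeline tore down the other stages and rejected.
  caught = err;
  console.error('Pipeline failed:', err.code);
}
```

A single try/catch covers all three stages, which is the main practical difference from chaining .pipe() calls.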
Readable streams: consuming manually
import { createReadStream } from 'fs';
const stream = createReadStream('big.txt', { encoding: 'utf8' });
// Event style
stream.on('data', chunk => console.log(chunk));
stream.on('end', () => console.log('done'));
stream.on('error', err => console.error(err));
// Modern style: for await (stream errors are thrown into the loop)
for await (const chunk of createReadStream('big.txt', 'utf8')) {
  console.log(chunk);
}
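The sketch below illustrates why memory stays bounded: each chunk is capped by `highWaterMark`, no matter how large the file is. The demo file path and the 16 KiB chunk size are assumptions made for this example only.

```javascript
import { createReadStream, writeFileSync } from 'fs';
import { tmpdir } from 'os';
import { join } from 'path';

// Hypothetical demo file: 200 KiB of the letter "x".
const demoPath = join(tmpdir(), 'stream-demo-read.txt');
writeFileSync(demoPath, 'x'.repeat(200 * 1024));

let chunkCount = 0;
let totalBytes = 0;
// highWaterMark caps how many bytes a single chunk may hold (here 16 KiB).
for await (const chunk of createReadStream(demoPath, { highWaterMark: 16 * 1024 })) {
  chunkCount += 1;
  totalBytes += chunk.length; // chunk is a Buffer because no encoding was set
}
console.log(`${chunkCount} chunks, ${totalBytes} bytes`);
```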
Writable streams
import { createWriteStream } from 'fs';
const out = createWriteStream('output.txt');
out.write('first line\n');
out.write('second line\n');
out.end(); // signal that no more data will be written
out.on('finish', () => console.log('Written'));
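Remember to call end(). Without it, 'finish' never fires and the file stays open, so you cannot tell when all data has been flushed to disk.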
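If the code that follows depends on the file being complete, one option is to await `finished` from stream/promises instead of registering a 'finish' listener. This is a sketch under the assumption that a temp-directory path (`outPath`, hypothetical) is writable.

```javascript
import { createWriteStream, readFileSync } from 'fs';
import { finished } from 'stream/promises';
import { tmpdir } from 'os';
import { join } from 'path';

const outPath = join(tmpdir(), 'stream-demo-write.txt'); // hypothetical path
const out = createWriteStream(outPath);
out.write('first line\n');
out.write('second line\n');
out.end();

// Resolves once the stream has finished, or rejects if it errors.
await finished(out);
const written = readFileSync(outPath, 'utf8');
console.log(written.split('\n').length - 1, 'lines written');
```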
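Transform: custom transformations
import { createReadStream, createWriteStream } from 'fs';
import { Transform } from 'stream';
import { pipeline } from 'stream/promises';
class UppercaseTransform extends Transform {
  // Called once per chunk: push() emits output, callback() marks the chunk as done
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
}
await pipeline(
  createReadStream('input.txt'),
  new UppercaseTransform(),
  createWriteStream('output.txt'),
);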
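The same transformation can be tried without touching the disk. The sketch below uses the options shorthand instead of a subclass and collects the output in memory; `upper`, `parts`, and the input strings are illustrative names and data, not part of the original example.

```javascript
import { Readable, Transform } from 'stream';
import { pipeline } from 'stream/promises';

// Options shorthand: pass transform() directly instead of subclassing.
const upper = new Transform({
  transform(chunk, encoding, callback) {
    // Passing the result as the second argument pushes it downstream.
    callback(null, chunk.toString().toUpperCase());
  },
});

const parts = [];
await pipeline(
  Readable.from(['hello ', 'stream']),
  upper,
  // An async function may serve as the final stage; it receives the transformed chunks.
  async (transformed) => {
    for await (const chunk of transformed) parts.push(chunk.toString());
  },
);
const result = parts.join('');
console.log(result); // HELLO STREAM
```

One caveat worth keeping in mind: calling toString() per chunk can split a multi-byte character that straddles two chunks, so real text processing may need a decoder in front.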
In practice: processing a large file line by line
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
const rl = createInterface({
  input: createReadStream('big-log.txt'),
  crlfDelay: Infinity, // always treat \r\n as a single line break
});
let errorCount = 0;
for await (const line of rl) {
  if (line.includes('ERROR')) errorCount++;
}
console.log(`Errors: ${errorCount}`);
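In practice: streaming an HTTP response
import { createReadStream } from 'fs';
import http from 'http';
import { pipeline } from 'stream';
http.createServer((req, res) => {
  res.setHeader('Content-Type', 'application/zip');
  // Read and send chunk by chunk, so memory use stays bounded.
  // pipeline cleans up both streams if the read fails or the client disconnects.
  pipeline(createReadStream('big-file.zip'), res, (err) => {
    if (err) console.error('Stream failed:', err.message);
  });
}).listen(8080);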
In practice: download, compress, and save in one pass
import { pipeline } from 'stream/promises';
import { createWriteStream } from 'fs';
import { createGzip } from 'zlib';
const response = await fetch('https://example.com/big-data.json');
if (!response.ok) throw new Error(`Download failed: HTTP ${response.status}`);
await pipeline(
  response.body, // ReadableStream (Web standard)
  createGzip(), // Transform
  createWriteStream('data.json.gz'),
);
Web Streams (the modern standard)
Node 18+ supports the Web Streams API, the same API browsers provide:
import { ReadableStream } from 'stream/web';
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello ');
    controller.enqueue('world');
    controller.close();
  },
});
for await (const chunk of rs) {
  console.log(chunk);
}
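Web Streams also have their own counterpart to a Node Transform. As a rough sketch, the example below uppercases the same two chunks with a `TransformStream` and `pipeThrough`; the names `words`, `shout`, and `text` are made up for illustration.

```javascript
import { ReadableStream, TransformStream } from 'stream/web';

const words = new ReadableStream({
  start(controller) {
    controller.enqueue('hello ');
    controller.enqueue('world');
    controller.close();
  },
});

// TransformStream plays the role that Transform plays for Node streams.
const shout = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

let text = '';
for await (const chunk of words.pipeThrough(shout)) {
  text += chunk;
}
console.log(text); // HELLO WORLD
```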
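Converting between the two:
import { Readable } from 'stream';
// webReadable: an existing Web ReadableStream, e.g. response.body from fetch
const nodeStream = Readable.fromWeb(webReadable);
const webStream = Readable.toWeb(nodeStream);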
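For a version that runs on its own, the sketch below does a full round trip starting from an in-memory Node stream. The variable names (`nodeSource`, `asWeb`, `backToNode`, `seen`) are illustrative assumptions.

```javascript
import { Readable } from 'stream';

// Start from a Node Readable, convert to a Web ReadableStream, then back again.
const nodeSource = Readable.from(['a', 'b', 'c']);
const asWeb = Readable.toWeb(nodeSource); // Web ReadableStream
const backToNode = Readable.fromWeb(asWeb); // Node Readable again

const seen = [];
for await (const chunk of backToNode) {
  seen.push(chunk.toString());
}
console.log(seen.join('')); // abc
```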
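Backpressure
When the writable side cannot keep up with the readable side, data piles up in memory:
// ❌ Ignores the return value of write()
for (const chunk of bigData) {
  writable.write(chunk); // the internal buffer can grow without bound
}
// ✓ Check the return value and wait for 'drain'
for (const chunk of bigData) {
  if (!writable.write(chunk)) {
    await new Promise(r => writable.once('drain', r));
  }
}
pipeline handles backpressure automatically, so prefer it whenever you connect streams.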
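The write()/'drain' handshake can be observed with a deliberately slow sink. This is only a sketch: the 4-byte `highWaterMark` and the 5 ms delay are artificial numbers picked to trigger backpressure quickly, and `slowSink` is a hypothetical stand-in for a slow disk or socket.

```javascript
import { Writable } from 'stream';
import { once } from 'events';
import { finished } from 'stream/promises';

// A deliberately slow sink with a tiny buffer (artificial numbers for the demo).
let bytesWritten = 0;
const slowSink = new Writable({
  highWaterMark: 4, // write() returns false once 4 bytes are buffered
  write(chunk, encoding, callback) {
    bytesWritten += chunk.length;
    setTimeout(callback, 5); // simulate a slow disk or network
  },
});

let drainWaits = 0;
for (let i = 0; i < 10; i++) {
  if (!slowSink.write('abcd')) {
    drainWaits += 1;
    await once(slowSink, 'drain'); // pause until the buffer has emptied
  }
}
slowSink.end();
await finished(slowSink);
console.log(`waited for drain ${drainWaits} times, wrote ${bytesWritten} bytes`);
```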
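Pitfalls
- Always listen for stream errors: an unhandled 'error' event crashes the process
- pipeline beats .pipe(): its error handling is complete
- Do not use readFile for large files: the whole file lands in memory
- Node Streams and Web Streams are two separate APIs: convert between them with Readable.fromWeb / Readable.toWeb
Next: EventEmitter.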