stream 流
stream(流)是NodeJS中处理流式数据的核心模块,它提供了一种高效处理大量数据的方式。流的核心思想是将数据分成小块逐步处理,而不是一次性将所有数据加载到内存中,这使得流非常适合处理大文件、网络通信、实时数据等场景。
流的基本概念
NodeJS中的流分为四种基本类型。
| 类型 | 说明 | 常见场景 |
|---|---|---|
| Readable | 可读流,数据的来源 | 文件读取、HTTP请求体 |
| Writable | 可写流,数据的目标 | 文件写入、HTTP响应体 |
| Duplex | 双工流,可读可写 | TCP Socket |
| Transform | 转换流,在读写过程中可修改数据 | 压缩、加密 |
流的核心优势在于:
- 内存效率:不需要一次性将所有数据加载到内存
- 时间效率:可以在接收到数据时立即开始处理,无需等待全部数据到达
- 组合性:可以通过管道(pipe)将多个流连接起来
可读流 Readable
可读流是数据的来源,最常见的可读流包括fs.createReadStream()创建的文件读取流、process.stdin标准输入流、HTTP请求体等。
读取模式
可读流有两种读取模式:
- 流动模式(flowing mode):数据自动从底层系统读取,并通过事件尽快提供给应用程序
- 暂停模式(paused mode):必须显式调用
read()方法来读取数据块
事件驱动方式读取
下面例子演示了使用事件驱动方式读取文件内容。
import { createReadStream } from "node:fs";
const readable = createReadStream("./test.txt", { encoding: "utf-8" });
readable.on("data", (chunk) => {
console.log("收到数据块:", chunk.length, "字节");
console.log(chunk);
});
readable.on("end", () => {
console.log("数据读取完毕");
});
readable.on("error", (err) => {
console.error("读取出错:", err.message);
});
监听data事件会使流自动切换到流动模式,如果需要控制读取速度,可以使用pause()和resume()方法。
import { createReadStream } from "node:fs";
const readable = createReadStream("./test.txt", { encoding: "utf-8" });
readable.on("data", (chunk) => {
console.log("收到数据块:", chunk.length, "字节");
readable.pause();
console.log("暂停读取...");
setTimeout(() => {
console.log("恢复读取");
readable.resume();
}, 1000);
});
readable.on("end", () => {
console.log("数据读取完毕");
});
暂停模式手动读取
在暂停模式下,需要监听readable事件并手动调用read()方法。
import { createReadStream } from "node:fs";
const readable = createReadStream("./test.txt", { encoding: "utf-8" });
readable.on("readable", () => {
let chunk;
while ((chunk = readable.read()) !== null) {
console.log("手动读取数据块:", chunk.length, "字节");
}
});
readable.on("end", () => {
console.log("数据读取完毕");
});
异步迭代器方式
NodeJS中的可读流实现了异步迭代器协议,可以使用for await...of语法进行读取。
import { createReadStream } from "node:fs";
const readable = createReadStream("./test.txt", { encoding: "utf-8" });
try {
for await (const chunk of readable) {
console.log("收到数据块:", chunk.length, "字节");
}
console.log("数据读取完毕");
} catch (err) {
console.error("读取出错:", err.message);
}
可写流 Writable
可写流是数据的目标,常见的可写流包括fs.createWriteStream()创建的文件写入流、process.stdout标准输出流、HTTP响应体等。
基本写入操作
import { createWriteStream } from "node:fs";
const writable = createWriteStream("./output.txt");
writable.write("第一行内容\n");
writable.write("第二行内容\n");
writable.write("第三行内容\n");
writable.end("最后一行内容");
writable.on("finish", () => {
console.log("写入完成");
});
writable.on("error", (err) => {
console.error("写入出错:", err.message);
});
write()方法返回一个布尔值,表示数据是否已经被完全处理。如果返回false,说明内部缓冲区已满,此时应该暂停写入并等待drain事件。
import { createWriteStream } from "node:fs";
const writable = createWriteStream("./output.txt");
const writeData = (data) => {
const canContinue = writable.write(data);
if (!canContinue) {
console.log("缓冲区已满,等待drain事件...");
writable.once("drain", () => {
console.log("缓冲区已清空,可以继续写入");
});
}
return canContinue;
};
for (let i = 0; i < 100000; i++) {
writeData(`这是第${i}行数据\n`);
}
writable.end();
管道 pipe
管道是连接可读流和可写流的便捷方式,它会自动处理数据流动和背压问题。
管道基本使用
下面例子使用管道实现了文件复制功能。
import { createReadStream, createWriteStream } from "node:fs";
const readable = createReadStream("./source.txt");
const writable = createWriteStream("./dest.txt");
readable.pipe(writable);
writable.on("finish", () => {
console.log("文件复制完成");
});
管线
多个流可以通过stream.pipeline()连接起来。
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";
import { pipeline } from "node:stream/promises";
const readable = createReadStream("./source.txt");
const gzip = createGzip();
const writable = createWriteStream("./source.txt.gz");
try {
await pipeline(readable, gzip, writable);
console.log("压缩完成");
} catch (err) {
console.error("管道处理出错:", err.message);
}
双工流 Duplex
双工流同时实现了可读和可写接口,最典型的例子是TCP Socket,下面例子演示了使用双工流的场景。
import { Duplex } from "node:stream";
const duplex = new Duplex({
read(size) {
const data = this.dataQueue.shift();
if (data) {
this.push(data);
} else {
this.push(null);
}
},
write(chunk, encoding, callback) {
console.log("写入:", chunk.toString());
this.dataQueue.push(chunk.toString().toUpperCase());
callback();
},
});
duplex.dataQueue = [];
duplex.write("hello");
duplex.write("world");
duplex.end();
duplex.on("data", (chunk) => {
console.log("读取:", chunk.toString());
});
转换流 Transform
转换流是一种特殊的双工流,它在读写过程中可以修改或转换数据。zlib模块中的压缩流、crypto模块中的加密流都是转换流的典型应用。
自定义转换流
下面例子实现了一个最简单的转换流,将字符串中的字母转为大写。
import { Transform } from "node:stream";
const upperCaseTransform = new Transform({
transform(chunk, encoding, callback) {
const upperCased = chunk.toString().toUpperCase();
callback(null, upperCased);
},
});
process.stdin.pipe(upperCaseTransform).pipe(process.stdout);
对象模式
默认情况下,流处理的是Buffer或字符串数据。实际上,通过设置objectMode: true,流可以处理任意JavaScript对象。
import { Readable, Writable } from "node:stream";
const readable = new Readable({
objectMode: true,
read() {},
});
const writable = new Writable({
objectMode: true,
write(chunk, encoding, callback) {
console.log("收到对象:", chunk);
callback();
},
});
readable.pipe(writable);
readable.push({ id: 1, name: "Alice" });
readable.push({ id: 2, name: "Bob" });
readable.push(null);
流的实用工具
node:stream模块提供了一些实用的工具函数。
Readable.from
从可迭代对象创建可读流。
import { Readable } from "node:stream";
const iterable = ["hello", "world", "nodejs"];
const readable = Readable.from(iterable);
readable.on("data", (chunk) => {
console.log(chunk);
});
stream.finished
stream.finished()是等待流结束的Promise版本。
import { createReadStream } from "node:fs";
import { finished } from "node:stream/promises";
const readable = createReadStream("./test.txt");
readable.on("data", (chunk) => {
console.log("收到数据:", chunk.length, "字节");
});
try {
await finished(readable);
console.log("流处理完成");
} catch (err) {
console.error("流处理出错:", err.message);
}