stream 流

stream(流)是NodeJS中处理流式数据的核心模块,它提供了一种高效处理大量数据的方式。流的核心思想是将数据分成小块逐步处理,而不是一次性将所有数据加载到内存中,这使得流非常适合处理大文件、网络通信、实时数据等场景。

流的基本概念

NodeJS中的流分为四种基本类型。

类型 说明 常见场景
Readable 可读流,数据的来源 文件读取、HTTP请求体
Writable 可写流,数据的目标 文件写入、HTTP响应体
Duplex 双工流,可读可写 TCP Socket
Transform 转换流,在读写过程中可修改数据 压缩、加密

流的核心优势在于:

  1. 内存效率:不需要一次性将所有数据加载到内存
  2. 时间效率:可以在接收到数据时立即开始处理,无需等待全部数据到达
  3. 组合性:可以通过管道(pipe)将多个流连接起来

可读流 Readable

可读流是数据的来源,最常见的可读流包括fs.createReadStream()创建的文件读取流、process.stdin标准输入流、HTTP请求体等。

读取模式

可读流有两种读取模式:

  1. 流动模式(flowing mode):数据自动从底层系统读取,并通过事件尽快提供给应用程序
  2. 暂停模式(paused mode):必须显式调用read()方法来读取数据块

事件驱动方式读取

下面例子演示了使用事件驱动方式读取文件内容。

import { createReadStream } from "node:fs";

const readable = createReadStream("./test.txt", { encoding: "utf-8" });

readable.on("data", (chunk) => {
  console.log("收到数据块:", chunk.length, "字节");
  console.log(chunk);
});

readable.on("end", () => {
  console.log("数据读取完毕");
});

readable.on("error", (err) => {
  console.error("读取出错:", err.message);
});

监听data事件会使流自动切换到流动模式,如果需要控制读取速度,可以使用pause()resume()方法。

import { createReadStream } from "node:fs";

const readable = createReadStream("./test.txt", { encoding: "utf-8" });

readable.on("data", (chunk) => {
  console.log("收到数据块:", chunk.length, "字节");
  readable.pause();
  console.log("暂停读取...");
  setTimeout(() => {
    console.log("恢复读取");
    readable.resume();
  }, 1000);
});

readable.on("end", () => {
  console.log("数据读取完毕");
});

暂停模式手动读取

在暂停模式下,需要监听readable事件并手动调用read()方法。

import { createReadStream } from "node:fs";

const readable = createReadStream("./test.txt", { encoding: "utf-8" });

readable.on("readable", () => {
  let chunk;
  while ((chunk = readable.read()) !== null) {
    console.log("手动读取数据块:", chunk.length, "字节");
  }
});

readable.on("end", () => {
  console.log("数据读取完毕");
});

异步迭代器方式

NodeJS中的可读流实现了异步迭代器协议,可以使用for await...of语法进行读取。

import { createReadStream } from "node:fs";

const readable = createReadStream("./test.txt", { encoding: "utf-8" });

try {
  for await (const chunk of readable) {
    console.log("收到数据块:", chunk.length, "字节");
  }
  console.log("数据读取完毕");
} catch (err) {
  console.error("读取出错:", err.message);
}

可写流 Writable

可写流是数据的目标,常见的可写流包括fs.createWriteStream()创建的文件写入流、process.stdout标准输出流、HTTP响应体等。

基本写入操作

import { createWriteStream } from "node:fs";

const writable = createWriteStream("./output.txt");

writable.write("第一行内容\n");
writable.write("第二行内容\n");
writable.write("第三行内容\n");

writable.end("最后一行内容");

writable.on("finish", () => {
  console.log("写入完成");
});

writable.on("error", (err) => {
  console.error("写入出错:", err.message);
});

write()方法返回一个布尔值,表示数据是否已经被完全处理。如果返回false,说明内部缓冲区已满,此时应该暂停写入并等待drain事件。

import { createWriteStream } from "node:fs";

const writable = createWriteStream("./output.txt");

const writeData = (data) => {
  const canContinue = writable.write(data);
  if (!canContinue) {
    console.log("缓冲区已满,等待drain事件...");
    writable.once("drain", () => {
      console.log("缓冲区已清空,可以继续写入");
    });
  }
  return canContinue;
};

for (let i = 0; i < 100000; i++) {
  writeData(`这是第${i}行数据\n`);
}

writable.end();

管道 pipe

管道是连接可读流和可写流的便捷方式,它会自动处理数据流动和背压问题。

管道基本使用

下面例子使用管道实现了文件复制功能。

import { createReadStream, createWriteStream } from "node:fs";

const readable = createReadStream("./source.txt");
const writable = createWriteStream("./dest.txt");

readable.pipe(writable);

writable.on("finish", () => {
  console.log("文件复制完成");
});

管线

多个流可以通过stream.pipeline()连接起来。

import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";
import { pipeline } from "node:stream/promises";

const readable = createReadStream("./source.txt");
const gzip = createGzip();
const writable = createWriteStream("./source.txt.gz");

try {
  await pipeline(readable, gzip, writable);
  console.log("压缩完成");
} catch (err) {
  console.error("管道处理出错:", err.message);
}

双工流 Duplex

双工流同时实现了可读和可写接口,最典型的例子是TCP Socket,下面例子演示了使用双工流的场景。

import { Duplex } from "node:stream";

const duplex = new Duplex({
  read(size) {
    const data = this.dataQueue.shift();
    if (data) {
      this.push(data);
    } else {
      this.push(null);
    }
  },
  write(chunk, encoding, callback) {
    console.log("写入:", chunk.toString());
    this.dataQueue.push(chunk.toString().toUpperCase());
    callback();
  },
});

duplex.dataQueue = [];

duplex.write("hello");
duplex.write("world");
duplex.end();

duplex.on("data", (chunk) => {
  console.log("读取:", chunk.toString());
});

转换流 Transform

转换流是一种特殊的双工流,它在读写过程中可以修改或转换数据。zlib模块中的压缩流、crypto模块中的加密流都是转换流的典型应用。

自定义转换流

下面例子实现了一个最简单的转换流,将字符串中的字母转为大写。

import { Transform } from "node:stream";

const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    const upperCased = chunk.toString().toUpperCase();
    callback(null, upperCased);
  },
});

process.stdin.pipe(upperCaseTransform).pipe(process.stdout);

对象模式

默认情况下,流处理的是Buffer或字符串数据。实际上,通过设置objectMode: true,流可以处理任意JavaScript对象。

import { Readable, Writable } from "node:stream";

const readable = new Readable({
  objectMode: true,
  read() {},
});

const writable = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    console.log("收到对象:", chunk);
    callback();
  },
});

readable.pipe(writable);

readable.push({ id: 1, name: "Alice" });
readable.push({ id: 2, name: "Bob" });
readable.push(null);

流的实用工具

node:stream模块提供了一些实用的工具函数。

Readable.from

从可迭代对象创建可读流。

import { Readable } from "node:stream";

const iterable = ["hello", "world", "nodejs"];
const readable = Readable.from(iterable);

readable.on("data", (chunk) => {
  console.log(chunk);
});

stream.finished

stream.finished()是等待流结束的Promise版本。

import { createReadStream } from "node:fs";
import { finished } from "node:stream/promises";

const readable = createReadStream("./test.txt");

readable.on("data", (chunk) => {
  console.log("收到数据:", chunk.length, "字节");
});

try {
  await finished(readable);
  console.log("流处理完成");
} catch (err) {
  console.error("流处理出错:", err.message);
}
作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。