我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Node.js中如何解决“背压”问题

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Node.js中如何解决“背压”问题

Node.js中如何解决“背压”问题,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

Node.js的4种stream是什么?什么是背压问题?把一个东西从 A 搬到 B 该怎么搬呢?

抬起来,移动到目的地,放下不就行了么。

那如果这个东西有一吨重呢?

那就一部分一部分的搬。

其实 IO 也就是搬东西,包括网络的 IO、文件的 IO,如果数据量少,那么直接传送全部内容就行了,但如果内容特别多,一次性加载到内存会崩溃,而且速度也慢,这时候就可以一部分一部分的处理,这就是流的思想。

各种语言基本都实现了 stream 的 api,Node.js 也是,stream api 是比较常用的

流的直观感受

从一个地方流到另一个地方,显然有流出的一方和流入的一方,流出的一方就是可读流(readable),而流入的一方就是可写流(writable)。

Node.js中如何解决“背压”问题

当然,也有的流既可以流入又可以流出,这种叫做双工流(duplex)

Node.js中如何解决“背压”问题

既然可以流入又可以流出,那么是不是可以对流入的内容做下转换再流出呢,这种流叫做转换流(transform)

Node.js中如何解决“背压”问题

duplex 流的流入和流出内容不需要相关,而 transform 流的流入和流出是相关的,这是两者的区别。

流的 api

Node.js 提供的 stream 就是上面介绍的那 4 种:

const stream = require('stream');

// 可读流
const Readable = stream.Readable;
// 可写流
const Writable = stream.Writable;
// 双工流
const Duplex = stream.Duplex;
// 转换流
const Transform = stream.Transform;

它们都有要实现的方法:

  • Readable 需要实现 _read 方法来返回内容

  • Writable 需要实现 _write 方法来接受内容

  • Duplex 需要实现 _read 和 _write 方法来接受和返回内容

  • Transform 需要实现 _transform 方法来把接受的内容转换之后返回

我们分别来看一下:

Readable

Readable 要实现 _read 方法,通过 push 返回具体的数据。

const Stream = require('stream');
const readableStream = Stream.Readable();
readableStream._read = function() {
    this.push('阿门阿前一棵葡萄树,');
    this.push('阿东阿东绿的刚发芽,');
    this.push('阿东背着那重重的的壳呀,');
    this.push('一步一步地往上爬。')
    this.push(null);
}
readableStream.on('data', (data)=> {
    console.log(data.toString())
});
readableStream.on('end', () => {
    console.log('done~');
});

当 push 一个 null 时,就代表结束流。

执行效果如下:

Node.js中如何解决“背压”问题

创建 Readable 也可以通过继承的方式:

const Stream = require('stream');

class ReadableDong extends Stream.Readable {
    constructor() {
        super();
    }
    _read() {
        this.push('阿门阿前一棵葡萄树,');
        this.push('阿东阿东绿的刚发芽,');
        this.push('阿东背着那重重的的壳呀,');
        this.push('一步一步地往上爬。')
        this.push(null);
    }
}

const readableStream = new ReadableDong();

readableStream.on('data', (data)=> {
    console.log(data.toString())
});
readableStream.on('end', () => {
    console.log('done~');
});

可读流是生成内容的,那么很自然可以和生成器结合:

const Stream = require('stream');

class ReadableDong extends Stream.Readable {
    constructor(iterator) {
        super();
        this.iterator = iterator;
    }
    _read() {
        const next = this.iterator.next();
        if(next.done) {
            return this.push(null);
        } else {
            this.push(next.value)
        }
    }
}
function *songGenerator() {
    yield '阿门阿前一棵葡萄树,';
    yield '阿东阿东绿的刚发芽,';
    yield '阿东背着那重重的的壳呀,';
    yield '一步一步地往上爬。';
}
const songIterator = songGenerator();
const readableStream = new ReadableDong(songIterator);
readableStream.on('data', (data)=> {
    console.log(data.toString())
});
readableStream.on('end', () => {
    console.log('done~');
});

这就是可读流,通过实现 _read 方法来返回内容。

Writable

Writable 要实现 _write 方法,接收写入的内容。

const Stream = require('stream');
const writableStream = Stream.Writable();
writableStream._write = function (data, enc, next) {
   console.log(data.toString());
   // 每秒写一次
   setTimeout(() => {
       next();
   }, 1000);
}
writableStream.on('finish', () => console.log('done~'));
writableStream.write('阿门阿前一棵葡萄树,');
writableStream.write('阿东阿东绿的刚发芽,');
writableStream.write('阿东背着那重重的的壳呀,');
writableStream.write('一步一步地往上爬。');
writableStream.end();

接收写入的内容,打印出来,并且调用 next 来处理下一个写入的内容,这里调用 next 是异步的,可以控制频率。

跑了一下,确实可以正常的处理写入的内容:

Node.js中如何解决“背压”问题

这就是可写流,通过实现 _write 方法来处理写入的内容。

Duplex

Duplex 是可读可写,同时实现 _read 和 _write 就可以了

const Stream = require('stream');
var duplexStream = Stream.Duplex();
duplexStream._read = function () {
    this.push('阿门阿前一棵葡萄树,');
    this.push('阿东阿东绿的刚发芽,');
    this.push('阿东背着那重重的的壳呀,');
    this.push('一步一步地往上爬。')
    this.push(null);
}
duplexStream._write = function (data, enc, next) {
    console.log(data.toString());
    next();
}
duplexStream.on('data', data => console.log(data.toString()));
duplexStream.on('end', data => console.log('read done~'));
duplexStream.write('阿门阿前一棵葡萄树,');
duplexStream.write('阿东阿东绿的刚发芽,');
duplexStream.write('阿东背着那重重的的壳呀,');
duplexStream.write('一步一步地往上爬。');
duplexStream.end();
duplexStream.on('finish', data => console.log('write done~'));

整合了 Readable 流和 Writable 流的功能,这就是双工流 Duplex。

Node.js中如何解决“背压”问题

Transform

Duplex 流虽然可读可写,但是两者之间没啥关联,而有的时候需要对流入的内容做转换之后流出,这时候就需要转换流 Transform。

Transform 流要实现 _transform 的 api,我们实现下对内容做反转的转换流:

const Stream = require('stream');
class TransformReverse extends Stream.Transform {
  constructor() {
    super()
  }
  _transform(buf, enc, next) {
    const res = buf.toString().split('').reverse().join('');
    this.push(res)
    next()
  }
}
var transformStream = new TransformReverse();
transformStream.on('data', data => console.log(data.toString()))
transformStream.on('end', data => console.log('read done~'));
transformStream.write('阿门阿前一棵葡萄树');
transformStream.write('阿东阿东绿的刚发芽');
transformStream.write('阿东背着那重重的的壳呀');
transformStream.write('一步一步地往上爬');
transformStream.end()
transformStream.on('finish', data => console.log('write done~'));

跑了一下,效果如下:

Node.js中如何解决“背压”问题

流的暂停和流动

我们从 Readable 流中获取内容,然后流入 Writable 流,两边分别做 _read 和 _write 的实现,就实现了流动。

Node.js中如何解决“背压”问题

背压

但是 read 和 write 都是异步的,如果两者速率不一致呢?

如果 Readable 读入数据的速率大于 Writable 写入速度的速率,这样就会积累一些数据在缓冲区,如果缓冲的数据过多,就会爆掉,会丢失数据。

而如果 Readable 读入数据的速率小于 Writable 写入速度的速率呢?那没关系,最多就是中间有段空闲时期。

这种读入速率大于写入速率的现象叫做“背压”,或者“负压”。也很好理解,写入段压力比较大,写不进去了,会爆缓冲区,导致数据丢失。

这个缓冲区大小可以通过 readableHighWaterMark 和 writableHightWaterMark 来查看,是 16k。

Node.js中如何解决“背压”问题

解决背压

怎么解决这种读写速率不一致的问题呢?

当没写完的时候,暂停读就行了。这样就不会读入的数据越来越多,驻留在缓冲区。

readable stream 有个 readableFlowing 的属性,代表是否自动读入数据,默认为 true,也就是自动读入数据,然后监听 data 事件就可以拿到了。

当 readableFlowing 设置为 false 就不会自动读了,需要手动通过 read 来读入。

readableStream.readableFlowing = false;

let data;
while((data = readableStream.read()) != null) {
    console.log(data.toString());
}

但自己手动 read 比较麻烦,我们依然可以用自动流入的方式,调用 pause 和 resume 来暂停和恢复就行了。

当调用 writable stream 的 write 方法的时候会返回一个 boolean 值代表是写入了目标还是放在了缓冲区:

  • true: 数据已经写入目标

  • false:目标不可写入,暂时放在缓冲区

我们可以判断返回 false 的时候就 pause,然后等缓冲区清空了就 resume:

const rs = fs.createReadStream(class="lazy" data-src);
const ws = fs.createWriteStream(dst);
rs.on('data', function (chunk) {
    if (ws.write(chunk) === false) {
        rs.pause();
    }
});
rs.on('end', function () {
    ws.end();
});
ws.on('drain', function () {
    rs.resume();
});

这样就能达到根据写入速率暂停和恢复读入速率的功能,解决了背压问题。

pipe 有背压问题么?

平时我们经常会用 pipe 来直接把 Readable 流对接到 Writable 流,但是好像也没遇到过背压问题,其实是 pipe 内部已经做了读入速率的动态调节了。

const rs = fs.createReadStream(class="lazy" data-src);
const ws = fs.createWriteStream(dst);

rs.pipe(ws);

总结

流是传输数据时常见的思想,就是一部分一部分的传输内容,是文件读写、网络通信的基础概念。

Node.js 也提供了 stream 的 api,包括 Readable 可读流、Writable 可写流、Duplex 双工流、Transform 转换流。它们分别实现 _read、_write、_read + _write、_transform 方法,来做数据的返回和处理。

创建 Readable 对象既可以直接调用 Readable api 创建,然后重写 _read 方法,也可以继承 Readable 实现一个子类,之后实例化。其他流同理。(Readable 可以很容易的和 generator 结合)

当读入的速率大于写入速率的时候就会出现“背压”现象,会爆缓冲区导致数据丢失,解决的方式是根据 write 的速率来动态 pause 和 resume 可读流的速率。pipe 就没有这个问题,因为内部做了处理。

流是掌握 IO 绕不过去的一个概念,而背压问题也是流很常见的问题,遇到了数据丢失可以考虑是否发生了背压。希望这篇文章能够帮大家理清思路,真正掌握 stream!

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注编程网行业资讯频道,感谢您对编程网的支持。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Node.js中如何解决“背压”问题

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Flow如何解决背压问题的方法详解

这篇文章主要为大家介绍了Flow如何解决背压问题的方法详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2022-11-16

node.js中如何解决跨域问题

这篇文章主要讲解了“node.js中如何解决跨域问题”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“node.js中如何解决跨域问题”吧!如果用Express框架的话可以使用cors包来解决跨
2023-06-17

php压缩中文乱码问题如何解决

这篇“php压缩中文乱码问题如何解决”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“php压缩中文乱码问题如何解决”文章吧。p
2023-07-04

Node.js中下包速度慢问题如何解决

这篇文章主要介绍“Node.js中下包速度慢问题如何解决”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Node.js中下包速度慢问题如何解决”文章能帮助大家解决问题。切换npm下包镜像源查看当前下包
2023-07-06

如何解决Python复杂zip文件的解压问题

这篇文章主要介绍“如何解决Python复杂zip文件的解压问题”,在日常操作中,相信很多人在如何解决Python复杂zip文件的解压问题问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何解决Python复杂z
2023-06-25

云服务器中如何解决linux下zip文件解压乱码问题

这篇文章给大家介绍云服务器中如何解决linux下zip文件解压乱码问题,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。云服务器:解决linux下zip文件解压乱码问题由于zip格式并没有指定编码格式,Windows下生成
2023-06-05

解压版MYSQL中文乱码问题解决方案

安装的是解压版的MYSQL 1:解压之后copy 一个my.ini文件 然后添加字节编码配置:[client] default-character-set=gbk [mysqld] character-set-server=utf8指定数据
2022-05-29

如何在Python中处理文件压缩和解压缩的问题

如何在Python中处理文件压缩和解压缩的问题简介:在日常开发和工作中,我们经常会遇到需要处理文件压缩和解压缩的问题。Python提供了一些强大的库,使得我们可以轻松地处理文件的压缩和解压缩操作。本文将介绍Python中常用的文件压缩和解压
2023-10-22

如何解决Java中文问题

这篇文章主要为大家展示了“如何解决Java中文问题”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“如何解决Java中文问题”这篇文章吧。我来说一下tomcat如何实现JSP的你就明白了。预备知识:
2023-06-03

vue中如何解决qs问题

这篇文章主要介绍“vue中如何解决qs问题”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“vue中如何解决qs问题”文章能帮助大家解决问题。什么是查询字符串查询字符串是一个包含在URL中的参数列表,用
2023-07-05

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录