我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Netty分布式flush方法刷新buffer队列源码分析

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Netty分布式flush方法刷新buffer队列源码分析

本文小编为大家详细介绍“Netty分布式flush方法刷新buffer队列源码分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“Netty分布式flush方法刷新buffer队列源码分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

flush方法

flush方法通过事件传递, 最终会传递到HeadContext的flush方法:

public void flush(ChannelHandlerContext ctx) throws Exception {    unsafe.flush();}

这里最终会调用AbstractUnsafe的flush方法

public final void flush() {    assertEventLoop();    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null) {        return;    }    outboundBuffer.addFlush();    flush0();}

这里首先也是拿到ChannelOutboundBuffer对象

然后我们看这一步:

outboundBuffer.addFlush();

这一步同样也是调整ChannelOutboundBuffer的指针

跟进addFlush方法

public void addFlush() {    Entry entry = unflushedEntry;    if (entry != null) {         if (flushedEntry == null) {            flushedEntry = entry;        }        do {            flushed ++;            if (!entry.promise.setUncancellable()) {                int pending = entry.cancel();                decrementPendingOutboundBytes(pending, false, true);            }            entry = entry.next;        } while (entry != null);        unflushedEntry = null;    }}

首先声明一个entry指向unflushedEntry, 也就是第一个未flush的entry

通常情况下unflushedEntry是不为空的, 所以进入if

再未刷新前flushedEntry通常为空, 所以会执行到flushedEntry = entry

也就是flushedEntry指向entry

经过上述操作, 缓冲区的指针情况如图所示:

Netty分布式flush方法刷新buffer队列源码分析

7-4-1

然后通过do-while将, 不断寻找unflushedEntry后面的节点, 直到没有节点为止

flushed自增代表需要刷新多少个节点

循环中我们关注这一步

decrementPendingOutboundBytes(pending, false, true);

这一步也是统计缓冲区中的字节数, 但是是和上一小节的incrementPendingOutboundBytes正好是相反, 因为这里是刷新, 所以这里要减掉刷新后的字节数,

我们跟到方法中:

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {    if (size == 0) {        return;    }    //从总的大小减去    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);    //直到减到小于某一个阈值32个字节    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {        //设置写状态        setWritable(invokeLater);    }}

同样TOTAL_PENDING_SIZE_UPDATER代表缓冲区的字节数, 这里的addAndGet中参数是-size, 也就是减掉size的长度

再看 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) 

getWriteBufferLowWaterMark()代表写buffer的第水位值, 也就是32k, 如果写buffer的长度小于这个数, 就通过setWritable方法设置写状态

也就是通道由原来的不可写改成可写

回到addFlush方法

遍历do-while循环结束之后, 将unflushedEntry指为空, 代表所有的entry都是可写的

经过上述操作, 缓冲区的指针情况如下图所示:

Netty分布式flush方法刷新buffer队列源码分析

7-4-2

回到AbstractUnsafe的flush方法

指针调整完之后, 我们跟到flush0()方法中:

protected void flush0() {    if (inFlush0) {        return;    }    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null || outboundBuffer.isEmpty()) {        return;    }    inFlush0 = true;    if (!isActive()) {        try {            if (isOpen()) {                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);            } else {                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);            }        } finally {            inFlush0 = false;        }        return;    }    try {        doWrite(outboundBuffer);    } catch (Throwable t) {        if (t instanceof IOException && config().isAutoClose()) {            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);        } else {            outboundBuffer.failFlushed(t, true);        }    } finally {        inFlush0 = false;    }}

 if (inFlush0) 表示判断当前flush是否在进行中, 如果在进行中, 则返回, 避免重复进入

我们重点关注doWrite方法

跟到AbstractNioByteChannel的doWrite方法中去:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {    int writeSpinCount = -1;    boolean setOpWrite = false;    for (;;) {        //每次拿到当前节点        Object msg = in.current();        if (msg == null) {            clearOpWrite();            return;        }        if (msg instanceof ByteBuf) {            //转化成ByteBuf            ByteBuf buf = (ByteBuf) msg;            //如果没有可写的值            int readableBytes = buf.readableBytes();            if (readableBytes == 0) {                //移除                in.remove();                continue;            }             boolean done = false;            long flushedAmount = 0;            if (writeSpinCount == -1) {                writeSpinCount = config().getWriteSpinCount();            }            for (int i = writeSpinCount - 1; i >= 0; i --) {                //将buf写入到socket里面                //localFlushedAmount代表向jdk底层写了多少字节                int localFlushedAmount = doWriteBytes(buf);                //如果一个字节没写, 直接break                if (localFlushedAmount == 0) {                    setOpWrite = true;                    break;                }                //统计总共写了多少字节                flushedAmount += localFlushedAmount;                //如果buffer全部写到jdk底层                if (!buf.isReadable()) {                    //标记全写道                    done = true;                    break;                }            }            in.progress(flushedAmount);            if (done) {                //移除当前对象                in.remove();            } else {                break;            }        } else if (msg instanceof FileRegion) {            //代码省略        } else {            throw new Error();        }    }    incompleteWrite(setOpWrite);}

首先是一个无限for循环

 Object msg = in.current() 这一步是拿到flushedEntry指向的entry中的msg

跟到current()方法中

public Object current() {     Entry entry = flushedEntry;    if (entry == null) {        return null;    }    return entry.msg;}

这里直接拿到flushedEntry指向的entry中关联的msg, 也就是一个ByteBuf

回到doWrite方法:

如果msg为null, 说明没有可以刷新的entry, 则调用clearOpWrite()方法清除写标识

如果msg不为null, 则会判断是否是ByteBuf类型, 如果是ByteBuf, 就进入if块中的逻辑

if块中首先将msg转化为ByteBuf, 然后判断ByteBuf是否可读, 如果不可读, 则通过in.remove()将当前的byteBuf所关联的entry移除, 然后跳过这次循环进入下次循环

remove方法稍后分析, 这里我们先继续往下看

 boolean done = false 这里设置一个标识, 标识刷新操作是否执行完成, 这里默认值为false代表走到这里没有执行完成

 writeSpinCount = config().getWriteSpinCount() 这里是获得一个写操作的循环次数, 默认是16

然后根据这个循环次数, 进行循环的写操作

在循环中, 关注这一步:

int localFlushedAmount = doWriteBytes(buf);

这一步就是将buf的内容写到channel中, 并返回写的字节数, 这里会调用NioSocketChannel的doWriteBytes

我们跟到doWriteBytes方法中:

protected int doWriteBytes(ByteBuf buf) throws Exception {     final int expectedWrittenBytes = buf.readableBytes();    return buf.readBytes(javaChannel(), expectedWrittenBytes);}

这里首先拿到buf的可读字节数, 然后通过readBytes将可读字节写入到jdk底层的channel中

回到doWrite方法:

将内容写的jdk底层的channel之后, 如果一个字节都没写, 说明现在channel可能不可写, 将setOpWrite设置为true, 用于标识写操作位, 并退出循环

如果已经写出字节, 则通过 flushedAmount += localFlushedAmount 累加写出的字节数

然后根据是buf是否没有可读字节数判断是否buf的数据已经写完, 如果写完, 将done设置为true, 说明写操作完成, 并退出循环

因为有时候不一定一次就能将byteBuf所有的字节写完, 所以这里会继续通过循环进行写出, 直到循环到16次

如果ByteBuf内容完全写完, 会通过in.remove()将当前entry移除掉

我们跟到remove方法中:

public boolean remove() {    //拿到当前第一个flush的entry    Entry e = flushedEntry;    if (e == null) {        clearNioBuffers();        return false;    }    Object msg = e.msg;    ChannelPromise promise = e.promise;    int size = e.pendingSize;    removeEntry(e);    if (!e.cancelled) {        ReferenceCountUtil.safeRelease(msg);        safeSuccess(promise);        decrementPendingOutboundBytes(size, false, true);    }    e.recycle();    return true;}

首先拿到当前的flushedEntry

我们重点关注removeEntry这步, 跟进去:

private void removeEntry(Entry e) {     if (-- flushed == 0) {        //位置为空        flushedEntry = null;        //如果是最后一个节点        if (e == tailEntry) {            //全部设置为空            tailEntry = null;            unflushedEntry = null;        }    } else {        //移动到下一个节点        flushedEntry = e.next;    }}

 if (-- flushed == 0) 表示当前节点是否为需要刷新的最后一个节点, 如果是, 则flushedEntry指针设置为空

如果当前节点是tailEntry节点, 说明当前节点是最后一个节点, 将tailEntry和unflushedEntry两个指针全部设置为空

如果当前节点不是需要刷新的最后的一个节点, 则通过 flushedEntry = e.nex t这步将flushedEntry指针移动到下一个节点

读到这里,这篇“Netty分布式flush方法刷新buffer队列源码分析”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注编程网行业资讯频道。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Netty分布式flush方法刷新buffer队列源码分析

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Netty分布式flush方法刷新buffer队列源码分析

本文小编为大家详细介绍“Netty分布式flush方法刷新buffer队列源码分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“Netty分布式flush方法刷新buffer队列源码分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入
2023-06-29

Netty分布式NioEventLoop任务队列执行的方法

这篇文章主要介绍“Netty分布式NioEventLoop任务队列执行的方法”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Netty分布式NioEventLoop任务队列执行的方法”文章能帮助大家解
2023-06-29

laravel源码分析队列Queue方法怎么用

本篇内容介绍了“laravel源码分析队列Queue方法怎么用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!队列任务的创建先通过命令创建一个
2023-06-29

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录