我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Netty源码分析NioEventLoop线程的启动

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Netty源码分析NioEventLoop线程的启动

之前的小节我们学习了NioEventLoop的创建以及线程分配器的初始化, 那么NioEventLoop是如何开启的呢, 我们这一小节继续学习

NioEventLoop开启方法

NioEventLoop的开启方法在其父类SingleThreadEventExecutor中的execute(Runnable task)方法中, 我们跟到这个方法:

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    //判断当前线程是不是eventLoop线程
    boolean inEventLoop = inEventLoop();
    //如果是eventLoop线程
    if (inEventLoop) {
        addTask(task);
    } else {
        //不是eventLoop线程启动线程
        startThread();
        //添加task
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

这个方法传入一个Runnble对象, 也就是一个任务

首先boolean inEventLoop = inEventLoop()方法会判断是不是NioEventLoop线程

跟进 inEventLoop()方法

@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

这里inEventLoop(Thread.currentThread())方法传入了当前线程对象, 这个方法会调用当前类的inEventLoop(Thread thread)方法

跟进inEventLoop(Thread thread)方法:

@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

我们看到判断的依据是当前线程对象是不是NioEventLoop绑定的线程对象, 这里我们会想到开启线程肯定会为NioEventLoop绑定一个线程对象, 如果判断当前线程对象不是当前NioEventLoop绑定的线程对象, 说明执行此方法的线程不是当前NioEventLoop线程, 那么这个线程如何初始化的, 后面我们会讲到, 我们继续看execute(Runnable task)方法:

如果是NioEventLoop线程, 则会通过addTask(task)添加任务, 通过NioEventLoop异步执行, 那么这个task是什么时候执行的, 同样后面会讲到

跟一下addTask(task)

protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    //如果添加不成功
    if (!offerTask(task)) {
        reject(task);
    }
}

这里offerTask(task)代表添加一个task, 跟进去:

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    //往taskQ中添加一个task
    return taskQueue.offer(task);
}

我们看到taskQueue.offer(task)将一个task添加到任务队列, 而这个任务队列taskQueue就是我们NioEventLoop初始化的时候与NioEventLoop唯一绑定的任务队列

回顾一下初始构造方法

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, 
                                    boolean addTaskWakesUp, int maxPendingTasks, 
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

在这里通过 taskQueue = newTaskQueue(this.maxPendingTasks) 创建了taskQueue

回到execute(Runnable task)方法中, 我们继续往下看:

如果不是NioEventLoop线程我们通过startThread()开启一个NioEventLoop线程

跟到startThread()之前, 我们先继续往下走:

开启NioEventLoop线程之后, 又通过addTask(task)往taskQueue添加任务

最后我们注意有这么一段代码:

if (!addTaskWakesUp && wakesUpForTask(task)) {
    wakeup(inEventLoop);
}

addTaskWakesUp代表添加task之后, NioEventLoop的select()操作是不是要唤醒, 这个属性是在初始化NioEventLoop的时候传入的, 大家可以回顾下, 默认是false, 这里!addTaskWakesUp就是需要唤醒, wakesUpForTask(task)与addTaskWakesUp意义相同, 默认是true, 可以看代码:

protected boolean wakesUpForTask(Runnable task) {
    return true;
}

这里恒为true, 所以这段代码就是添加task时需要通过wakeup(inEventLoop)唤醒, 这样NioEventLoop在做select()操作时如果正在阻塞则立刻唤醒, 然后执行任务队列的task

回到execute(Runnable task)方法中我们跟进开启线程的startThread()方法中:

private void startThread() {
    //判断线程是否启动, 未启动则启动
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            //当前线程未启动, 则启动
            doStartThread();
        }
    }
}

前面的判断是判断当前NioEventLoop线程是否启动, 如果未启动, 则通过doStartThread()方法启动, 我们第一次执行execute(Runnable task)线程是未启动的, 所以会执行doStartThread(), 后续该线程则不会再执行doStartThread()方法

我们跟进doStartThread()方法中

private void doStartThread() { 
    assert thread == null;
    //线程执行器执行线程(所有的eventLoop共用一个线程执行器)
    executor.execute(new Runnable() {
        @Override
        public void run() {
            //将当前线程复制给属性
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }
            boolean success = false;
            updateLastExecutionTime();
            try {
                //开始轮询
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
               //代码省略
            }
        }
    });
}

我们重点关注executor.execute()这个方法, 其中executor就是我们创建NioEventLoop的线程器, execute()就是开启一个线程

回顾下execute()方法

public void execute(Runnable command) {
    //起一个线程
    threadFactory.newThread(command).start();
}

我们看到通过线程工厂开启一个线程, 由于前面的小节已经剖析, 这里不再赘述

开启线程则执行Runnble类中的run()方法, 我们看到在run()方法里通过 thread = Thread.currentThread() 将新开启的线程对象赋值NioEventLoop的thread的属性, 这样就可以通过线程对象的判断, 来确定是不是NioEventLoop线程了

后面我们看到 SingleThreadEventExecutor.this.run() , 这里this, 就是当前NioEventLoop对象, 而这里的run()方法, 就是NioEventLoop中的run()方法, 在这个run()方法中, 真正开始了selector的轮询工作, 对于run()方法的详细剖析, 我们会在之后的小节中进行

刚才我们剖析了NioEventLoop的启动方法, 那么根据我们的分析, 就是第一次调用NioEventLoop的execute(Runnable task)方法的时候, 则会开启NioEventLoop线程, 之后的调用只是往taskQueue中添加任务, 那么第一次是什么时候开启的呢?这里我们要回顾上一章讲过的内容

上一章中我们讲过在AbstractServerBootstrap中有个initAndRegister()方法, 这个方法主要用于channel的初始化和注册, 其中注册的代码为:

ChannelFuture regFuture = config().group().register(channel);

其中group()我们剖析过是Boss线程的group, 我们剖析过其中的register(channel)方法:

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

首先跟到next()方法:

public EventLoop next() {
    return (EventLoop) super.next();
}

首先调用了其父类MultithreadEventExecutorGroup的next方法, 跟进去:

public EventExecutor next() {
    return chooser.next();
}

这里chooser, 就是初始化NioEventLoopGroup的线程选择器, 为此分配了不同的策略, 这里不再赘述, 通过这个方法, 返回一个NioEventLoop线程

回到MultithreadEventLoopGroup类的register()方法中, next().register(channel)代表分配后的NioEventLoop的register()方法, 这里会调用NioEventLoop的父类SingleThreadEventLoop类中的register()方法

跟到SingleThreadEventLoop类中的register()方法:

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

DefaultChannelPromise是一个监听器, 它会跟随channel的读写进行监听, 绑定传入的channel和NioEventLoop, 有关Promise后面的章节会讲到

这里我们继续跟进register(new DefaultChannelPromise(channel, this))

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

unsafe()方法返回创建channel初始化的unsafe()对象, 如果是NioSeverSocketChannel, 则绑定NioMessageUnsafe对象, 上一小节进行剖析过这里不再赘述

最终这个unsafe对象会调用到AbstractChannel的内部类AbstractUnsafe中的register()方法, 这里register(), 无论是客户端channel和服务器channel都会通过这个一个register注册, 在以后的客户端接入章节中我们会看到

这里我们继续看register方法

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    //代码省略
    //所有的复制操作, 都交给eventLoop处理(1)
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    //做实际主注册(2)
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            //代码省略
        }
    }
}

这里我们上一小节分析过, 不再陌生, 这里只分析有关NioEventLoop相关的内容

我们首先看到 AbstractChannel.this.eventLoop = eventLoop , 获取当前channel的NioEventLoop, 通过上一章的学习, 我们知道每个channel创建的时候会绑定一个NioEventLoop

这里通过eventLoop.inEventLoop()判断当前线程是否是NioEventLoop线程, inEventLoop()方法在前面的小节剖析过, 这里不再赘述

如果是NioEventLoop线程则通过register0(promise)方法做实际的注册, 但是我们第一次执行注册方法的时候, 如果是服务器channel是则是由server的用户线程执行的, 如果是客户端channel, 则是由Boss线程执行的, 所以走到这里均不是当前channel的NioEventLoop的线程, 于是会走到下面的eventLoop.execute()方法中

eventLoop.execute()上一小节剖析过, 就是将task添加到taskQueue中并且开启器NioEventLoop线程, 所以, 在这里就开启了NioEventLoop线程, 有关开启步骤, 可以通过上一小节内容进行回顾

这里注意一点, 有的资料会讲第一次开启NioEventLoop线程是在AbstractBootstrap的doBind0(regFuture, channel, localAddress, promise)方法中开启的, 个人经过debug和分析, 实际上并不是那样的, 希望大家不要被误导

简单看下doBind0(regFuture, channel, localAddress, promise)方法:

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { 
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                //绑定端口
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

这里虽然调用了eventLoop的execute()方法, 但是eventLoop线程在注册期间已经启动, 所以这里不会重复启动, 只会将任务添加到taskQueue中

其实这里我们也能够看出, 其实绑定端口的相关操作, 同样是也是eventLoop线程中执行的

以上就是Netty源码分析NioEventLoop线程的启动的详细内容,更多关于Netty NioEventLoop线程启动的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Netty源码分析NioEventLoop线程的启动

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Netty源码分析NioEventLoop怎么执行select

本篇内容主要讲解“Netty源码分析NioEventLoop怎么执行select”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Netty源码分析NioEventLoop怎么执行select”吧!
2023-06-29

Netty NioEventLoop启动过程是怎样的

本篇内容介绍了“Netty NioEventLoop启动过程是怎样的”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!启动分析NioEventL
2023-06-04

ContentProvider启动流程源码分析

本文小编为大家详细介绍“ContentProvider启动流程源码分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“ContentProvider启动流程源码分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。C
2023-07-05

SpringBoot启动流程SpringApplication源码分析

这篇“SpringBoot启动流程SpringApplication源码分析”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“S
2023-07-05

Tomcat源码分析 | 启动过程深度解析

在启动过程中,LifecycleBase 首先发出 STARTING_PREP 事件,StandardServer 额外还会发出 CONFIGURE_START_EVENT 和 STARTING 事件,通知 LifecycleListene

[图解]Android源码分析——Activity的启动过程

Activity的启动过程一.Launcher进程请求AMSLauncher.java的startActivitySafely方法的执行过程:Activity.java中startActivity方法的执行过程:startActivityF
2022-06-06

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录