我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Netty分布式NioEventLoop任务队列执行的方法

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Netty分布式NioEventLoop任务队列执行的方法

这篇文章主要介绍“Netty分布式NioEventLoop任务队列执行的方法”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Netty分布式NioEventLoop任务队列执行的方法”文章能帮助大家解决问题。

执行任务队列

继续回到NioEventLoop的run()方法:

protected void run() {    for (;;) {        try {            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                case SelectStrategy.CONTINUE:                    continue;                case SelectStrategy.SELECT:                    //轮询io事件(1)                    select(wakenUp.getAndSet(false));                    if (wakenUp.get()) {                        selector.wakeup();                    }                default:            }            cancelledKeys = 0;            needsToSelectAgain = false;            //默认是50            final int ioRatio = this.ioRatio;             if (ioRatio == 100) {                try {                    processSelectedKeys();                } finally {                    runAllTasks();                }            } else {                //记录下开始时间                final long ioStartTime = System.nanoTime();                try {                    //处理轮询到的key(2)                    processSelectedKeys();                } finally {                    //计算耗时                    final long ioTime = System.nanoTime() - ioStartTime;                    //执行task(3)                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                }            }        } catch (Throwable t) {            handleLoopException(t);        }        //代码省略    }}

我们看到处理完轮询到的key之后, 首先记录下耗时, 然后通过runAllTasks(ioTime * (100 - ioRatio) / ioRatio)执行taskQueue中的任务

我们知道ioRatio默认是50, 所以执行完ioTime * (100 - ioRatio) / ioRatio后, 方法传入的值为ioTime, 也就是processSelectedKeys()的执行时间:

跟进runAllTasks方法:

protected boolean runAllTasks(long timeoutNanos) {    //定时任务队列中聚合任务    fetchFromScheduledTaskQueue();    //从普通taskQ里面拿一个任务    Runnable task = pollTask();    //task为空, 则直接返回    if (task == null) {        //跑完所有的任务执行收尾的操作        afterRunningAllTasks();        return false;    }    //如果队列不为空    //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;    long runTasks = 0;    long lastExecutionTime;    //执行每一个任务    for (;;) {        safeExecute(task);        //标记当前跑完的任务        runTasks ++;        //当跑完64个任务的时候, 会计算一下当前时间        if ((runTasks & 0x3F) == 0) {            //定时任务初始化到当前的时间            lastExecutionTime = ScheduledFutureTask.nanoTime();            //如果超过截止时间则不执行(nanoTime()是耗时的)            if (lastExecutionTime >= deadline) {                break;            }        }        //如果没有超过这个时间, 则继续从普通任务队列拿任务        task = pollTask();        //直到没有任务执行        if (task == null) {            //记录下最后执行时间            lastExecutionTime = ScheduledFutureTask.nanoTime();            break;        }    }    //收尾工作    afterRunningAllTasks();    this.lastExecutionTime = lastExecutionTime;    return true;}

首先会执行fetchFromScheduledTaskQueue()这个方法, 这个方法的意思是从定时任务队列中聚合任务, 也就是将定时任务中找到可以执行的任务添加到taskQueue中

我们跟进fetchFromScheduledTaskQueue()方法

private boolean fetchFromScheduledTaskQueue() {    long nanoTime = AbstractScheduledEventExecutor.nanoTime();    //从定时任务队列中抓取第一个定时任务    //寻找截止时间为nanoTime的任务    Runnable scheduledTask  = pollScheduledTask(nanoTime);    //如果该定时任务队列不为空, 则塞到普通任务队列里面    while (scheduledTask != null) {        //如果添加到普通任务队列过程中失败        if (!taskQueue.offer(scheduledTask)) {            //则重新添加到定时任务队列中            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);            return false;        }        //继续从定时任务队列中拉取任务        //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中        scheduledTask = pollScheduledTask(nanoTime);    }    return true;}

 long nanoTime = AbstractScheduledEventExecutor.nanoTime() 代表从定时任务初始化到现在过去了多长时间

 Runnable scheduledTask= pollScheduledTask(nanoTime) 代表从定时任务队列中拿到小于nanoTime时间的任务, 因为小于初始化到现在的时间, 说明该任务需要执行了

跟到其父类AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:

protected final Runnable pollScheduledTask(long nanoTime) {    assert inEventLoop();    //拿到定时任务队列    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;    //peek()方法拿到第一个任务    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();    if (scheduledTask == null) {        return null;    }    if (scheduledTask.deadlineNanos() <= nanoTime) {        //从队列中删除        scheduledTaskQueue.remove();        //返回该任务        return scheduledTask;    }    return null;}

我们看到首先获得当前类绑定的定时任务队列的成员变量

如果不为空, 则通过scheduledTaskQueue.peek()弹出第一个任务

如果当前任务小于传来的时间, 说明该任务需要执行, 则从定时任务队列中删除

我们继续回到fetchFromScheduledTaskQueue()方法中:

private boolean fetchFromScheduledTaskQueue() {    long nanoTime = AbstractScheduledEventExecutor.nanoTime();    //从定时任务队列中抓取第一个定时任务    //寻找截止时间为nanoTime的任务    Runnable scheduledTask  = pollScheduledTask(nanoTime);    //如果该定时任务队列不为空, 则塞到普通任务队列里面    while (scheduledTask != null) {        //如果添加到普通任务队列过程中失败        if (!taskQueue.offer(scheduledTask)) {            //则重新添加到定时任务队列中            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);            return false;        }        //继续从定时任务队列中拉取任务        //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中        scheduledTask = pollScheduledTask(nanoTime);    }    return true;}

弹出需要执行的定时任务之后, 我们通过taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失败, 则通过scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新添加到定时任务队列中

如果添加成功, 则通过pollScheduledTask(nanoTime)方法继续添加, 直到没有需要执行的任务

这样就将定时任务队列需要执行的任务添加到了taskQueue中

回到runAllTasks(long timeoutNanos)方法中

protected boolean runAllTasks(long timeoutNanos) {    //定时任务队列中聚合任务    fetchFromScheduledTaskQueue();    //从普通taskQ里面拿一个任务    Runnable task = pollTask();    //task为空, 则直接返回    if (task == null) {        //跑完所有的任务执行收尾的操作        afterRunningAllTasks();        return false;    }    //如果队列不为空    //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;    long runTasks = 0;    long lastExecutionTime;    //执行每一个任务    for (;;) {        safeExecute(task);        //标记当前跑完的任务        runTasks ++;        //当跑完64个任务的时候, 会计算一下当前时间        if ((runTasks & 0x3F) == 0) {            //定时任务初始化到当前的时间            lastExecutionTime = ScheduledFutureTask.nanoTime();            //如果超过截止时间则不执行(nanoTime()是耗时的)            if (lastExecutionTime >= deadline) {                break;            }        }        //如果没有超过这个时间, 则继续从普通任务队列拿任务        task = pollTask();        //直到没有任务执行        if (task == null) {            //记录下最后执行时间            lastExecutionTime = ScheduledFutureTask.nanoTime();            break;        }    }    //收尾工作    afterRunningAllTasks();    this.lastExecutionTime = lastExecutionTime;    return true;}

首先通过 Runnable task = pollTask() 从taskQueue中拿一个任务

任务不为空, 则通过 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 计算一个截止时间, 任务的执行时间不能超过这个时间

然后在for循环中通过safeExecute(task)执行task

我们跟到safeExecute(task)中:

protected static void safeExecute(Runnable task) {    try {        //直接调用run()方法执行        task.run();    } catch (Throwable t) {        //发生异常不终止        logger.warn("A task raised an exception. Task: {}", task, t);    }}

这里直接调用task的run()方法进行执行, 其中发生异常, 只打印一条日志, 代表发生异常不终止, 继续往下执行

回到runAllTasks(long timeoutNanos)方法:

protected boolean runAllTasks(long timeoutNanos) {    //定时任务队列中聚合任务    fetchFromScheduledTaskQueue();    //从普通taskQ里面拿一个任务    Runnable task = pollTask();    //task为空, 则直接返回    if (task == null) {        //跑完所有的任务执行收尾的操作        afterRunningAllTasks();        return false;    }    //如果队列不为空    //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;    long runTasks = 0;    long lastExecutionTime;    //执行每一个任务    for (;;) {        safeExecute(task);        //标记当前跑完的任务        runTasks ++;        //当跑完64个任务的时候, 会计算一下当前时间        if ((runTasks & 0x3F) == 0) {            //定时任务初始化到当前的时间            lastExecutionTime = ScheduledFutureTask.nanoTime();            //如果超过截止时间则不执行(nanoTime()是耗时的)            if (lastExecutionTime >= deadline) {                break;            }        }        //如果没有超过这个时间, 则继续从普通任务队列拿任务        task = pollTask();        //直到没有任务执行        if (task == null) {            //记录下最后执行时间            lastExecutionTime = ScheduledFutureTask.nanoTime();            break;        }    }    //收尾工作    afterRunningAllTasks();    this.lastExecutionTime = lastExecutionTime;    return true;}

每次执行完task, runTasks自增

这里 if ((runTasks & 0x3F) == 0) 代表是否执行了64个任务, 如果执行了64个任务, 则会通过 lastExecutionTime = ScheduledFutureTask.nanoTime() 记录定时任务初始化到现在的时间, 如果这个时间超过了截止时间, 则退出循环

如果没有超过截止时间, 则通过 task = pollTask() 继续弹出任务执行

这里执行64个任务统计一次时间, 而不是每次执行任务都统计, 主要原因是因为获取系统时间是个比较耗时的操作, 这里是netty的一种优化方式

如果没有task需要执行, 则通过afterRunningAllTasks()做收尾工作, 最后记录下最后的执行时间

关于“Netty分布式NioEventLoop任务队列执行的方法”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注编程网行业资讯频道,小编每天都会为大家更新不同的知识点。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Netty分布式NioEventLoop任务队列执行的方法

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Netty分布式NioEventLoop任务队列执行的方法

这篇文章主要介绍“Netty分布式NioEventLoop任务队列执行的方法”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Netty分布式NioEventLoop任务队列执行的方法”文章能帮助大家解
2023-06-29

Netty分布式flush方法刷新buffer队列源码分析

本文小编为大家详细介绍“Netty分布式flush方法刷新buffer队列源码分析”,内容详细,步骤清晰,细节处理妥当,希望这篇“Netty分布式flush方法刷新buffer队列源码分析”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入
2023-06-29

Golang中使用RabbitMQ实现分布式任务队列的性能优化

在Golang中使用RabbitMQ实现分布式任务队列的性能优化可以从以下几个方面进行优化:1. 消息持久化:RabbitMQ默认情况下消息是内存存储的,如果重启或崩溃,消息将丢失。为了保证消息的持久化,可以将消息标记为持久化,以确保在重启
2023-10-20

如何解决Go语言中的并发任务的分布式任务队列和任务调度策略问题?

如何解决Go语言中的并发任务的分布式任务队列和任务调度策略问题?引言:在分布式系统中,任务的分发和调度是一个关键问题。在Go语言中,通过使用并发技术可以有效地管理和执行任务。本文将介绍如何使用分布式任务队列和任务调度策略来解决Go语言中的并
2023-10-22

Golang中使用RabbitMQ实现分布式任务队列的性能调优技巧

在Golang中使用RabbitMQ实现分布式任务队列时,可以采取以下性能调优技巧:1. 使用持久化队列和消息:通过将队列和消息标记为持久化,可以确保即使在RabbitMQ重启后也不会丢失任务。2. 批量发送消息:将多个消息打包成一个批次进
2023-10-10

利用MongoDB实现分布式任务调度与执行的经验分享

MongoDB是一个开源的NoSQL数据库,具有高性能、伸缩性和灵活性的特点。在分布式系统中,任务调度与执行是一个关键的问题,通过利用MongoDB的特性,可以实现分布式任务调度与执行的方案。一、分布式任务调度的需求分析在分布式系统中,任务
利用MongoDB实现分布式任务调度与执行的经验分享
2023-11-02

Golang与RabbitMQ实现分布式任务调度和执行的高效解决方案的最佳实践

实现分布式任务调度和执行的高效解决方案可以使用Golang和RabbitMQ的组合。下面是一个基于Golang和RabbitMQ的最佳实践示例:1. 定义任务模型:首先定义任务的结构体,包括任务ID、任务参数等字段,并将任务的数据结构序列化
2023-10-10

crontab定时任务不执行的原因分析与解决方法

前言 实现linux定时任务有:cron、anacron、at等,cron是服务名称,crond是后台进程,crontab则是定制好的计划任务表 然而今天真是长知识 用了 crontab 这么久 才知道原来也需要 启动添加了定时任务 但是并
2022-06-04

Go语言开发实现分布式消息队列系统的方法与实践

在当今互联网高并发和大规模数据处理的背景下,分布式消息队列系统作为一种重要的中间件技术日益受到关注。它可以有效地缓解系统压力,提高系统的可扩展性和可靠性。Go语言由于其并发模型、高性能和简洁的特点,在开发分布式消息队列系统中具有独特的优势。
Go语言开发实现分布式消息队列系统的方法与实践
2023-11-20

Node.js node-schedule定时任务隔多少分钟执行一次的方法

在 Node.js 中,我使用 node-schedule 来执行定时任务。Cron-style 的时间格式对于初学者不太直观,所以一般使用这种方式:比如官方示例中的每个小时的 42 分执行任务var schedule = require(
2022-06-04

Go语言开发实现分布式任务调度系统的方法与实践

Go语言开发实现分布式任务调度系统的方法与实践随着互联网的高速发展,大规模系统的任务调度成为了现代计算领域中的重要问题。传统的单机调度已经无法满足大规模任务的需求,而分布式任务调度系统的出现有效地解决了这一问题。本文将介绍如何使用Go语言开
Go语言开发实现分布式任务调度系统的方法与实践
2023-11-20

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录