我的编程空间,编程开发者的网络收藏夹
学习永远不晚

thinkphp6使用think-queue怎么实现普通队列和延迟队列

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

thinkphp6使用think-queue怎么实现普通队列和延迟队列

本文小编为大家详细介绍“thinkphp6使用think-queue怎么实现普通队列和延迟队列”,内容详细,步骤清晰,细节处理妥当,希望这篇“thinkphp6使用think-queue怎么实现普通队列和延迟队列”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

###TP6 队列

TP6 中使用 think-queue 可以实现普通队列和延迟队列。

think-queue 是thinkphp 官方提供的一个消息队列服务,它支持消息队列的一些基本特性:

  • 消息的发布,获取,执行,删除,重发,失败处理,延迟执行,超时控制等

  • 队列的多队列, 内存限制 ,启动,停止,守护等

  • 消息队列可降级为同步执行

消息队列实现过程

通过生产者推送消息到消息队列服务中

消息队列服务将收到的消息存入redis队列中(zset)

消费者进行监听队列,当监听到队列有新的消息时,获取队列第一条

处理获取下来的消息调用业务类进行处理相关业务

业务处理后,需要从队列中删除消息

composer 安装 think-queue

composer require topthink/think-queue

配置文件

安装完 think-queue 后会在 config 目录中生成 queue.php,这个文件是队列的配置文件。

tp6中提供了多种消息队列的实现方式,默认使用sync,我这里选择使用Redis。

return [    'default'     => 'redis',    'connections' => [        'sync'     => [            'type' => 'sync',        ],        'database' => [            'type'       => 'database',            'queue'      => 'default',            'table'      => 'jobs',            'connection' => null,        ],        'redis'    => [            'type'       => 'redis',            'queue'      => 'default',            'host'       => env('redis.host', '127.0.0.1'),            'port'       => env('redis.port', '6379'),            'password'   => env('redis.password','123456'),            'select'     => 0,            'timeout'    => 0,            'persistent' => false,        ],    ],    'failed'      => [        'type'  => 'none',        'table' => 'failed_jobs',    ],];

创建目录及队列消费类文件

在 app 目录下创建 queue 目录,然后在该目录下新建一个抽象类 Queue.php 文件,作为基础类

<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;abstract class Queue{        public function fire(Job $job, $data)    {        if (empty($data)) {            Log::error(sprintf('[%s][%s] 队列无消息', __CLASS__, __FUNCTION__));            return ;        }        $jobId = $job->getJobId(); // 队列的数据库id或者redis key        // $jobClassName = $job->getName(); // 队列对象类        // $queueName = $job->getQueue(); // 队列名称        // 如果已经执行中或者执行完成就不再执行了        if (!$this->checkJob($jobId, $data)) {            $job->delete();            Cache::store('redis')->delete($jobId);            return ;        }        // 执行业务处理        if ($this->execute($data)) {            Log::record(sprintf('[%s][%s] 队列执行成功', __CLASS__, __FUNCTION__));            $job->delete(); // 任务执行成功后删除            Cache::store('redis')->delete($jobId); // 删除redis中的缓存        } else {            // 检查任务重试次数            if ($job->attempts() > 3) {                Log::error(sprintf('[%s][%s] 队列执行重试次数超过3次,执行失败', __CLASS__, __FUNCTION__));                 // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行;也可以不指定秒数立即执行                //$job->release(10);                 // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数                //$job->failed();                   // 第3种处理方式:删除任务                $job->delete(); // 任务执行后删除                Cache::store('redis')->delete($jobId); // 删除redis中的缓存            }        }    }        protected function checkJob(string $jobId, $message): bool    {        // 查询redis        $data = Cache::store('redis')->get($jobId);        if (!empty($data)) {            return false;        }        Cache::store('redis')->set($jobId, $message);        return true;    }        abstract protected function execute($data): bool;}

所有真正的消费类继承基础抽象类

<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{    protected function execute($data): bool    {       // 具体消费业务逻辑    }}

生产者逻辑

use think\facade\Queue;// 普通队列生成调用方式Queue::push($job, $data, $queueName);// 例:Queue::push(Test::class, $data, $queueName);// 延时队列生成调用方式Queue::later($delay, $job, $data, $queueName);// 例如使用延时队列 10 秒后执行:Queue::later(10 , Test::class, $data, $queueName);

开启进程监听任务并执行

php think queue:listenphp think queue:work

命令模式介绍

命令模式
  • queue:work 命令

    work 命令: 该命令将启动一个 work 进程来处理消息队列。

    php think queue:work --queue TestQueue
  • queue:listen 命令

    listen 命令: 该命令将会创建一个 listen 父进程 ,然后由父进程通过 proc_open(‘php think queue:work’) 的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。

    php think queue:listen --queue TestQueue
命令行参数
  • Work 模式

    php think queue:work \--daemon            //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出--queue  helloJobQueue  //要处理的队列的名称--delay  0 \        //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0--force  \          //系统处于维护状态时是否仍然处理任务,并未找到相关说明--memory 128 \      //该进程允许使用的内存上限,以 M 为单位--sleep  3 \        //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)--tries  2          //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
  • Listen 模式

    php think queue:listen \--queue  helloJobQueue \   //监听的队列的名称--delay  0 \         //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0--memory 128 \       //该进程允许使用的内存上限,以 M 为单位--sleep  3 \         //如果队列中无任务,则多长时间后重新检查,daemon模式下有效--tries  0 \         //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0--timeout 60         //创建的work子进程的允许执行的最长时间,以秒为单位

    可以看到 listen 模式下,不包含 --deamon 参数,原因下面会说明

  • 消息队列的开始,停止与重启

    • 开始一个消息队列:

      php think queue:work
    • 停止所有的消息队列:

      php think queue:restart
    • 重启所有的消息队列:

      php think queue:restart php think queue:work

读到这里,这篇“thinkphp6使用think-queue怎么实现普通队列和延迟队列”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注编程网行业资讯频道。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

thinkphp6使用think-queue怎么实现普通队列和延迟队列

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

thinkphp6使用think-queue怎么实现普通队列和延迟队列

本文小编为大家详细介绍“thinkphp6使用think-queue怎么实现普通队列和延迟队列”,内容详细,步骤清晰,细节处理妥当,希望这篇“thinkphp6使用think-queue怎么实现普通队列和延迟队列”文章能帮助大家解决疑惑,下
2023-06-30

怎么在Redis中实现延迟队列和分布式延迟队列

这篇文章给大家介绍怎么在Redis中实现延迟队列和分布式延迟队列,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。1. 实现一个简单的延迟队列。  我们知道目前JAVA可以有DelayedQueue,我们首先开一个Dela
2023-06-15

ThinkPHP怎么使用think-queue实现redis消息队列

本篇内容主要讲解“ThinkPHP怎么使用think-queue实现redis消息队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“ThinkPHP怎么使用think-queue实现redis消
2023-07-02

使用Redis怎么实现延迟队列

本篇文章给大家分享的是有关使用Redis怎么实现延迟队列,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。方案一:采用通过定时任务采用数据库/非关系型数据库轮询方案。优点:1. 实
2023-06-15

Java线程池队列中的延迟队列DelayQueue怎么使用

今天小编给大家分享一下Java线程池队列中的延迟队列DelayQueue怎么使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧
2023-07-04

RabbitMQ消息队列怎么实现延迟任务

这篇文章主要介绍“RabbitMQ消息队列怎么实现延迟任务”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“RabbitMQ消息队列怎么实现延迟任务”文章能帮助大家解决问题。一、序言延迟任务应用广泛,延
2023-06-29

redis怎么实现队列阻塞、延时、发布和订阅

这篇“redis怎么实现队列阻塞、延时、发布和订阅”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“redis怎么实现队列阻塞、
2023-07-02

Redis怎么使用ZSET实现消息队列

这篇文章主要介绍了Redis怎么使用ZSET实现消息队列的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Redis怎么使用ZSET实现消息队列文章都会有所收获,下面我们一起来看看吧。1.redis 用zset做消
2023-07-05

Java怎么使用跳转结构实现队列和栈

本篇内容介绍了“Java怎么使用跳转结构实现队列和栈”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!队列跳转结构结点public static
2023-07-06

使用node怎么实现事件循环和消息队列

这篇文章将为大家详细讲解有关使用node怎么实现事件循环和消息队列,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。什么是异步?异步和同步应该是经常谈的一个话题了。同步的概念很简单,自上而下依次
2023-06-15

怎么使用PHP和数据库实现一个简单的队列系统

本篇内容介绍了“怎么使用PHP和数据库实现一个简单的队列系统”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!一、数据库队列的基本原理数据库队列
2023-07-06

怎么使用go带缓冲chan实现消息队列功能

本篇内容介绍了“怎么使用go带缓冲chan实现消息队列功能”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1、Channels 定义通道是一种
2023-07-05

C语言怎么利用栈和队列实现回文检测功能

本文小编为大家详细介绍“C语言怎么利用栈和队列实现回文检测功能”,内容详细,步骤清晰,细节处理妥当,希望这篇“C语言怎么利用栈和队列实现回文检测功能”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。具体代码如下:#i
2023-06-16

怎么使用css3实现一个类在线直播的队列动画

这篇文章给大家分享的是有关怎么使用css3实现一个类在线直播的队列动画的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。之前在群里有个朋友问了这样一个问题, 就是如何在 小程序 中实现类似 直播平台 的用户上线时的
2023-06-08

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录