FlowerMQ 基于workerman和redis实现的消息队列

dazhaozhao

FlowerMQ

FlowerMQ 一个基于Workerman和Redis实现的消息队列,一个小小工具,用来给主项目解耦的,也支持延迟队列,失败尝试这些。

运行依赖

  • php7.2
  • Redis5.0.4上,因为用到Redis Stream
  • pecl依赖,redis扩展
  • composer依赖,workerman/workerman 4.0以上

安装

composer安装

composer create-project mrtwenty/flower

下载安装

  1. 下载或者 git clone
  2. 项目根目录执行命令,composer install

原理说明

  1. workerman实现消费端,开多个进程,在 onWorkerStart 函数里面,阻塞读取,阻塞间隔5秒后,就重新阻塞,因为是阻塞了,所以用了一个字符串key来处理停止的问题,每次阻塞5秒,就断开,以便判断是否需要终止程序。
  2. 可以随时停止消费端,因为客户端发送的消息都会存放redis stream 队列里面。
  3. 一个pending进程,每隔1秒检查是否有未ack的消息,并尝试消费掉
  4. 一个delay进程,负责处理延迟消息,利用redis的zset有序集合存储,起一个定时器,定时获取可以执行的消息,写入消费端
  5. 遵循约定大于配置的方式,直接用默认的即可。
  6. 默认配置是app目录下的config目录,如果需要更改配置项,可以在项目根目录下,提供一个.env的配置文件,替换掉
  7. 回收裁剪机制: 有三种模式,默认 no ,不做裁剪
    1. no,不做裁剪,所有消息保留。
    2. maxlen, 最大长度回收,概率性触发 xtrim maxlen mq ~ 长度
    3. minid, 最小已读消息回收,概率行触发,xtrim minid mq ~ 消息id ,需要 redis server 6.2.0 以上。

可用命令

win

windows下仅限于开发,不适合做生产环境使用,启动需要开三个命令行窗口,执行 start、pending、delay命令

  1. php index.php start 启动消费队列
  2. php index.php pending 启动重试队列
  3. php index.php delay 启动延迟队列
  4. php index.php test 测试,执行此命令会发送两个消息给服务,一个是即时消息,一个是延迟消息。
  5. php monitor.php start 运行信息查看,会启动一个http进程

linux

  1. php index.php start linux启动相当于执行了 start、pending、delay命令
  2. php index.php start -d 守护进程启动
  3. php index.php stop 强制停止,可能会导致消息未ack,不建议使用
  4. php index.php stop -g 优雅停止 (不加参数-g会强制干掉子进程,加参数-g的话,会等子进程处理完后再关闭)
  5. php index.php config 查看配置信息
  6. php index.php test 测试,执行此命令会发送两个消息给服务,一个是即时消息,一个是延迟消息。
  7. php monitor.php start 运行信息查看,会启动一个http进程

服务端说明

  1. 下载项目后,配置 .env
  2. 编写业务逻辑,app\consumer\Run.php 只需要编写这里,如果代码有curl请求,记得要做好超时
  3. 启动,php index.php start 即可。

客户端说明

flower配备了一个客户端,方便在别的项目中使用:

composer install mrtwenty/flower-client

使用方式:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
//$mq需要与服务端三个配置信息相同
$mq    = [
    'name' => 'mq',
    'delay_name' => 'mq_delay',
    'fail_list' => 'mq_fail_list'
];
$client = new Client($redis, $mq);

//立即执行
$res = $client->add(['test' => 'data']);
var_dump($res);
//延迟消息
$res = $client->add(['test' => 'data'], 3);
var_dump($res);

问题

1.引入MySQL

可以安装此依赖包,当然也可以根据自己需要用别的包

composer require workerman/mysql

配置信息可以在.env里面写入:

[mysql]
host = 127.0.0.1
username = root
password = 123456
database = test
port     = 3306

代码实现:

<?php

declare(strict_types=1);

namespace app;

use app\library\BaseInterface;

/**
 * 消费类
 */
class Run implements BaseInterface
{
    protected $db = null;

    public function getDb()
    {
        if (is_null($this->db)) {
            $config = config('mysql');
            $host     = $config['host'];
            $port     = $config['port'];
            $user     = $config['username'];
            $password = $config['password'];
            $database  = $config['database'];
            $this->db = new \Workerman\MySQL\Connection($host, $port, $user, $password, $database);
        }
        return $this->db;
    }

    /**
     * 消费方法,如何消费,取决用户自己
     *
     * @param mixed $data
     * @param mixed $id
     * @return bool 返回true就会执行ack确认消息已消费
     */
    public function consumer($data, $id): bool
    {
        $db   = $this->getDb();
        $info = $db->row("SELECT * FROM `short_url` WHERE id=3");
        print_r($info);
        return true;
    }

    /**
     * 超过尝试的次数,就会写入失败队列里面,并调用此方法,可以用此方法通知运维
     *
     * @return void
     */
    public function fail($data, $id)
    {
        print_r($data);
        print_r($id);
    }
}

2. 避免内存泄露

由于是守护进程,为了避免php业务代码bug隐藏的内存泄露,可以在消费者执行完一定数量的时候重启进程。具体实现请查看workerman手册。

链接1链接2

相关资料

  1. redis stream 手册

  2. pecl redis 文档

  3. workerman 手册

引用

  1. monitor登录页模板
  2. env、config类这些学自thinkphp
  3. monitor 后端的一些代码,学自webman
  4. monitor主页面,用的layui
  5. 延迟队列的思路抄的workerman的redis queue
  6. 感谢workerman、thinkphp、layui、redis
阅读 333
7个评论

Tinywan

你好!你的 重试消费失败的消息,超出失败次数确认消息,写入队列 这个方法永远不会被触发。你确定你是否测试过?我看你这个是跑不通的

//如果该消息重试次数大于3,保存到失败队列
if ($fail_num > $try_fail_num) {
    //消息内容
    $data = json_decode($msg_content['data'], true);
    $this->redis->lPush($fail_list, json_encode(['time' => time(), 'data' => $data], JSON_UNESCAPED_UNICODE));
    $this->client->ack($group_name, $msg_id);
    $run->fail($data, $msg_id);
    continue;
}

你这里的 $fail_num,消息被读取次数永远是1

  • dazhaozhao 14天前

    app\consumer\Run的consumer 返回false, 消息变成已读取未ack,就会被xpending读取到,并重试,不行就继续xClaim 转移消息,次数就会加1

  • Tinywan 13天前

    xClaim 转移消息 。$fail_num 还是 1

dazhaozhao

是运行环境问题,还是redis或者php版本问题?
我这里试了可以。

  • 暂无评论
dazhaozhao


我直接 composer create-project mrtwenty/flower 安装
app\consumer\Run的consumer返回false,你说的那个地方,我加了调试信息

[$msg_id, $consumer, $over_time, $fail_num] = $msg;
echo "fail_num", $fail_num, "\n";

  • Tinywan 13天前

    $fail_num 这个变量没有打印,打印的话一直是1

  • dazhaozhao 13天前

    这个变量的值是xpending得到的,请看我的评论。

  • dazhaozhao 13天前

    echo "fail_num", $fail_num, "\n"; 我变量有打印的,你看那个截图,输出都是
    fail_num1、fail_num2、fail_num3,后面的数字就是。

  • Tinywan 13天前

    fail_num 这个你消息被读取的次数,你这里并没有读取(XreadGroup),就直接取(Xpending),就不会变的

dazhaozhao

不写代码,用redis 命令执行下流程。

    # 创建消息test_mq,返回消息ID 1641570877368-0
    XADD test_mq * key value
    # 创建消费组 test_group
    XGROUP CREATE test_mq test_group 0
    # hehe去读取数据,不进行xack确认消息,xpending就会有消息
    XREADGROUP GROUP test_group hehe COUNT 1 STREAMS test_mq >
    # 查看未xack的消息,此时得到 $fail_num=1
    XPENDING test_mq test_group - + 1

    # 尝试消费该消息,使用xRange读取该消息内容,结果还是没成ack掉,那就转移

    # 转移消息
    XCLAIM test_mq test_group hehe 3000 消息ID
    XCLAIM test_mq test_group hehe 3000 1641570877368-0

    # 再次查看未xack的消息,此时得到 $fail_num=2
    XPENDING test_mq test_group - + 1

    # 再次尝试消费该消息,结果还是没成,如此反复

  • Tinywan 13天前

    又排查了一下,你这需要6.0才支持的,而不是5.0就支持

Tinywan

Redis 版本

127.0.0.1:6379> info
# Server
redis_version:5.0.3
...
os:Linux 4.19.128-microsoft-standard x86_64

以下是不进行xclaim

截图

以下是进行xclaim

截图

我用php脚本跑结果和上面是一样的。结果为啥和你跑的不一样,哈哈

是不是由于你使用的是Windows版本,而我是Linux版本 os:Linux 4.19.128-microsoft-standard x86_64

再次验证,Redis Server 版本需要 5.0.4 及最新版本

截图

127.0.0.1:6379[14]> info
# Server
redis_version:6.2.1
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:68b3db7bf8188804
redis_mode:standalone
os:Linux 4.15.0-137-generic x86_64
  • Tinywan 13天前

    很可惜,各大云厂商现在都不支持6.0,最高是5.0,不支持这个消息转移后自动增加1

Tinywan
  • dazhaozhao 13天前

    那这样子就需要redis5.0.4以上了,谢谢你的测试,回头我写在说明里

  • Tinywan 13天前

    嗯嗯!搞了两天终于搞明白为啥了,哈哈!

张先生

有一个很严重的问题,在你修剪消息队列的时候,如果消息还没消费,直接会修剪掉到没消费到的消息,现象就是某个区间的消息全部丢失,我只是看了一下代码,没验证我的想法。

  • Tinywan 2天前

    他这只会修剪XPENDING 消息

  • 张先生 2天前

    不是哦,你仔细看看代码,测试一下,把GC评率调成100%,队列最大长度改短,消息数量超过队列长度,一定会丢失消息

  • Tinywan 1天前

    你是说他消费失败的消息再次消费是会这种情况是吗?

  • 张先生 1天前

    不是, 最简单的浮现方式,你把最大队列长度改为10,GC返回true,推送1000条消息进去, 你看消费的消息有多少条,最多几十条,不确定具体多少条是因为xtrim加了true参数

  • Tinywan 1天前

    你是说 stream 队列长度?

  • 张先生 1天前

    嗯, 修剪只能修剪stream ,哪有修剪XPENDING 一说?

  • dazhaozhao 1天前

    嗯,你说的对,我应该在项目里面提醒一下使用这个项目的人,这是redis stream xtrim 本身的设计,如果想要长久的保存,可以将gc设置成永远不会执行,这样就不会执行xtrim命令了,或者设置队列长度足够长,xtrim 执行了也不会裁剪到,这取决于开发者的应用场景,我自己用的时候,都是为了能够及时触发消息,基本都是即发即消费。

  • 张先生 1天前

    其实裁剪也是有必要的,不然久了内存会爆炸,或者被redis清空,在裁剪的时候应该裁剪到所有分组消费者 最小已消费的消息位置,不应该是固定裁剪。

  • dazhaozhao 1天前

    嗯,你这就更细腻的操作了,但是redis 好像并没有提供相关的命令,xtrim本身是先裁剪掉旧的消息。

  • dazhaozhao 1天前

    额,我刚刚去看了下redis6.2的英文手册,好像支持新的方式,这是个好消息。
    Redis version >= 6.2.0: Added the MINID trimming strategy and the LIMIT option.

  • dazhaozhao 1小时前

    我新增了一种gc机制,大佬帮忙看看 ,minid

dazhaozhao

2120
积分
0
获赞数
0
粉丝数
2016-08-17 加入