【workbunny/webman-rabbitmq】消费者超时导致重复消费

Caesar-Tang

问题描述

首先感谢 workbunny 提供这么一款很好用的插件。使用【workbunny】RabbitMQ客户端,插件地址:https://www.workerman.net/plugin/67, 在消费者中如果有阻塞并超过一定时间,就会导致重复消费。

框架及插件版本

"workerman/webman-framework": "^1.5.0"
"workbunny/webman-rabbitmq": "^1.0"

复现代码

生产者:IndexController.php

截图

workbunny rabbitmq配置:app.php
截图

消费者:TestBuilder.php
截图

关键日志打印

截图

关键日志说明

通过日志可以看到,在出现错误前,是有消息消费成功并返回了 ACK。在出现错误后,相关进程重启,导致消息开始重新消费,以前返回的 ACK 似乎无效?RabbitMQ的控制台,始终显示有对应数量的信息处于 Unacked 状态。

截图

966 3 1
3个回答

tanhongbin

消费也要做幂等性

  • Caesar-Tang 2023-05-18

    使用幂等的确能弥补重复消费,但需要引入新的服务比如数据库或缓存;测试发现一条消息会存在不止重复消费一次的情况,即使使用幂等但由此带来的阻塞和性能消耗也是未知的。

chaz6chez

workbunny/webman-rabbitmq是基于bunny的一个异步rabbitmq-client,底层依赖event-loop,相当于把所有事件都投送到loop中执行,你在消费函数里使用sleep会出让当前进程,让当前进程的event-loop暂时停转等待cpu调度;
您把sleep换成

$time = time();
while(time() <= $time + 1) {}

来模拟阻塞试试是否还会出现重复消费的情况;

另外在实际使用上,消费者逻辑建议增加幂等性的判断。

最后感谢您的建议和意见,我后续会根据目前的猜测论证一下,然后优化这个问题;
最近也是比较忙,2.0.0-beta也搁置了一段时间了

  • chaz6chez 2023-05-18

    所有的行为都是异步的,包括ack,在打印了ack后,程序将ack行为投放到event-loop,然后进程杀死,event-loop中的ack消息可能还没有真正的执行,但event-loop已经随当前进程杀死了

  • chaz6chez 2023-05-18

    这个也是目前2.X开发的一个方向

  • Caesar-Tang 2023-05-18

    添加幂等性判断后,会存在这样一种情况,仅供参考,测试用例描述以下:

    1. 生产者逻辑:发送 N 条消息
    2. 消费者逻辑:首先从缓存中读取该条消息的消费记录,若有,直接返回 ACK,没有,进入消费处理(耗时操作)后返回 ACK
    3. 现象:消费第一条消息,处理完耗时操作后返回 ACK,异常,进程重启,然后重复处理第 1 条,因为缓存中已经有记录过消费,直接返回 ACK;开始处理第 2 条... ;到最后 1 条抛出异常后,进程重启,会发现前面几条已经直接返回 ACK 的又会被重复消费了。

    以前在项目中也使用过该插件,十分好用。最近在业务上做批量发送消息(发送消息是阻塞的)发现了该问题。也了解到作者技术强并且乐于分享。在此表示感谢。

  • chaz6chez 2023-05-18

    这个也是目前2.x版本开发要解决的问题之一,因为所有行为都是异步的以后会带来很多行为会滞后执行,那么就没办法把控,导致一些意外情况的产生;比如异步publish后进程被杀死,可能导致数据实际上没有投放成功;消费者成功消费了以后返回ack,但进程因意外被杀死,可能导致消息重复消费。
    因为目前1.x属于能将就用就将就用状态,现目前是全力开发2.x以解决之前已知的这些问题,所以你可以描述一下的的业务需求,我看看尝试给你出个方案,可能不一定使用rabbitmq。

  • Caesar-Tang 2023-05-19

    当前的业务场景,由管理端给微信用户推送订阅信息:

    1. 一次推送会有万以上的数量
    2. 微信官方不支持批量推送订阅消息,只能由应用端一个个给用户下发,即请求微信接口
    3. 记录每次的推送结果(可以分批)

    尝试的方案(在选择方案前还未发现超时重复消费问题):

    1. 使用消息中间件 rabbitmq
    2. 由管理端发布推送任务,根据本次推送任务获取推送用户,将用户分批后,按批发布待消费消息并在数据库记录本次消息发布结果(生产者)
    3. 消费者消费,循环给该批用户发送微信订阅消息,结束后更新数据库消费结果(消费者)
    4. 为了避免超时,推送微信订阅消息时设置接口请求超时时间,合理控制每批的用户数量。但由此每批用户数量只能比较少。
  • chaz6chez 2023-05-19

    根据你们的业务来看,貌似没有需要频繁关闭/重启消费进程的地方,如果是进程持续运行的话,实际上不存在重复消费的问题,重复消费的问题只存在于关闭进程的那一瞬间;

    方案一:尽可能减少重复消费的方案:

    1. 将QOS设置为1,即每个消费进程消费1条数据;
    2. 消费进程开启cpu count * 2;
    3. 尽可能避免关闭消费者进程或者重启消费者进程的操作

    方案二:换其他队列组件试试
    可以先暂时测试一下https://www.workerman.net/plugin/69是否满足你们的需求,这个队列同样具备ack机制,同时也经历过小范围生产测试,内部的执行方式是同步执行

W

截图
我用您的插件发布为什么不会持久化呢?
这个插件:
https://www.workerman.net/plugin/67

用的是最新版本:
截图

  • chaz6chez 2023-08-14

    如果使用的是1.x,可以更新到1.0.11试试,如果是用的2.x,可以更新到2.0.0-beta.2试试

  • W 2023-08-14

    好的,谢谢!

🔝