关于webman多次消费rabbitmq fanout消息

calvin

问题描述

现有个需求是需要把物联网设备的状态同步到两个独立的系统中(都是用webman开发)。计划设备回来的消息扔到mq中,然后两个系统去消费处理各自的业务,但发现消息被消费了多次,单条消息消费次数也不等于进程数。

为此你搜索到了哪些方案及不适用的原因

我是在进程启动后做的监听操作,代码如下:
截图
app\bootstrap\SubscribeRabbitmq.php

    public static function start($worker)
    {
        $is_console = !$worker;
        if ($is_console) {
            // If you do not want to execute this in console, just return.
            return;
        }
        if ($worker->id === 0) {
            Log::channel('mq')->info('【初始化MQ】' . getmypid());
            //  连接 RabbitMQ 实例并创建频道
            (new Client([
                'vhost'    => envs('MQ_VHOST', '/'),
                'user'     => envs('MQ_USERNAME', 'guest'),
                'password' => envs('MQ_PASSWORD', 'guest')
            ]))->connect()
                ->then(function (Client $client) {
                    return $client->channel();
                })->then(function (Channel $channel) {
                    //  声明【设备状态变更】交换机和匿名队列
                    return $channel->exchangeDeclare('device.status.change', 'fanout')->then(function () use ($channel
                    ) {
                        return $channel->queueDeclare('', false, false, true, false);
                    })->then(function (MethodQueueDeclareOkFrame $frame) use ($channel) {
                        // 获取绑定到队列上的帧对象,并将该帧的队列名传递给 $channel->queueBind() 函数来将队列绑定到 【设备状态变更】 交换机。
                        return $channel->queueBind($frame->queue, 'device.status.change')->then(function () use ($frame
                        ) {
                            return $frame;
                        });
                    })->then(function (MethodQueueDeclareOkFrame $frame) use ($channel) {
                        //  设置消息消费函数,设置 $channel->consume() 函数,当消费者监听一个虚拟主机和队列时,该函数将一直运行,等待接收实例的消息
                        $channel->consume(
                            function (Message $message, Channel $channel, Client $client) {
                                Log::channel('mq')->info('【MQ-consume】收到一条待消费数据进程ID:' . getmypid(),
                                    ['message_content' => $message->content]);
                            },
                            $frame->queue,
                            '',
                            false,
                            true
                        );
                    });
                });
        }
    }

记录的日志:
截图

两个疑问:
1.不是很理解为啥有些消息消费了3次,有些2次,还有些消费的进程id和初始的不是同一个
2.我预期的是每个系统只要消费一次,这样的话不是应该用自定义进程?

687 1 0
1个回答

智佳思远

自定义进程。bootstrap是每个进程都执行,包括webman的http进程和monitor进程,虽然你设置了worker->id === 0,但是http 0号进程和 monitor 0号进程都参与消费了,所以是两个消费进程在消费。至于重复消费,是不是没有ack?或者生产者确实发布了多个重复的消息?

  • 8355 2023-04-17

    我猜是exchange和queue配置问题

  • calvin 2023-04-17

    生产者重发发布这个可以排除。改成自定义进程就没问题了 谢谢大佬们

年代过于久远,无法发表回答
🔝