RabbitMQ 断开后,执行Worker::stopAll(),是出于什么考虑呢 ?

mo

问题描述

RabbitMQ 断开后,主动 Worker::stopAll 是出于什么考虑呢?

我想实现服务自动恢复,如果RabbitMQ断开之后,会定时重连。我的服务在onWorkerStart时比较耗时。

 return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
     if (!empty($this->channels)) {
        throw new \LogicException("All channels have to be closed by now.");
     }
     if($replyCode !== 0){
        return null;
     }
     return $this->connectionClose($replyCode, $replyText, 0, 0);
     })->then(function () use ($replyCode, $replyText){
         $this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
         $this->closeStream();
         $this->init();
         if($replyCode !== 0){
            Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
         }
     return $this;
 });

输出为:
RabbitMQ client disconnected: [506] Connection closed by server unexpectedly

350 1 0
1个回答

chaz6chez

子进程stopAll就会退出当前子进程,主进程会重新拉起子进程,在重新拉起子进程的过程中,子进程会对rabbitmq进行重连,这是最简单的重连机制的实现方案;当然还有的做法是可以不用重启进程,在进程内部用代码实现重连,代码复杂度会更高,相对这种方案,维护起来也不方便,两种方案比较起来也没有明显的性能差异,所以选择一种代码维护方便的即可。

  • mo 2023-11-22

    @chaz6chez 谢谢回复, 那我只需要重写一下现在的客户端即可。

  • chaz6chez 2023-11-22

    你想要的不是立即重连,而是定时重连是吗?

    另外你使用的哪一个插件或者组件包?版本是多少?

  • mo 2023-11-22

    是的,定时重连。 使用的是"workerman/rabbitmq": "^1.0"

  • chaz6chez 2023-11-22

    之前重启方案是我提交的PR,你也可以按照以下代码进行修改

    return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
         if (!empty($this->channels)) {
            throw new \LogicException("All channels have to be closed by now.");
         }
         if($replyCode !== 0){
            return null;
         }
         return $this->connectionClose($replyCode, $replyText, 0, 0);
         })->then(function () use ($replyCode, $replyText){
             $this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
             $this->closeStream();
             $this->init();
             if($replyCode !== 0) {
                //定时器执行stopAll
                //在配置中增加一个重启定时器字段,比如restart_time
                if (($restartTime = $this->options['restart_time'] ?? 0) > 0) {
                    $this->eventLoop->add(restartTime, EventInterface::EV_TIMER, function() use ($replyCode, $replyText) {
                         Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
                    });
                   return null;
                } else {
                    Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
                }
             }
         return $this;
     });

    如果你有空,你可以提交这个pr,如果暂时没空,等会儿我去提交个pr

  • chaz6chez 2023-11-22

    上面定时器写错了,用EV_TIMER_ONCE不要用EV_TIMER

  • mo 2023-11-22

    我自己实现的重连方案没并有使用 worker::stopAll, 我是希望不重启子进程。 我现在想重写Client ,去掉Woker::stopAll(), 之后出现这样的错误:
    Fatal error: Uncaught Bunny\Exception\ClientException: Broken pipe or closed connection. in /home/vagrant/code/wanlay-scheduler/vendor/bunny/bunny/src/Bunny/AbstractClient.php:311
    Stack trace:

    0 /home/vagrant/code/wanlay-scheduler/vendor/bunny/bunny/src/Bunny/Async/Client.php(300): Bunny\AbstractClient->read()

    1 /home/vagrant/code/wanlay-scheduler/vendor/workerman/workerman/Events/Select.php(311): Bunny\Async\Client->onDataAvailable()

    2 /home/vagrant/code/wanlay-scheduler/vendor/workerman/workerman/Worker.php(1635): Workerman\Events\Select->loop()

    3 /home/vagrant/code/wanlay-scheduler/vendor/workerman/workerman/Worker.php(1426): Workerman\Worker::forkOneWorkerForLinux()

    4 /home/vagrant/code/wanlay-scheduler/vendor/workerman/workerman/Worker.php(1400): Workerman\Worker::forkWorkersForLinux()

    5 /home/vagrant/code/wanlay-scheduler/vendor/workerman/workerman/Worker.php(560): Workerman\Worker::forkWorkers()

    6 /home/vagrant/code/wanlay-scheduler/vendor/workerman/webman-framework/src/support/App.php(131): Workerman\Worker::runAll()

    7 /home/vagrant/code/wanlay-scheduler/start.php(4): support\App::run()

  • mo 2023-11-22
    return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
                if (!empty($this->channels)) {
                    throw new \LogicException("All channels have to be closed by now.");
                }
                if($replyCode !== 0){
                    return null;
                }
                return $this->connectionClose($replyCode, $replyText, 0, 0);
            })->then(function () use ($replyCode, $replyText){
                $this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
                $this->closeStream();
                $this->init();
                if($replyCode !== 0){
                    Log::error("RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
                }
                return $this;
            });
  • chaz6chez 2023-11-22

    不建议自行实现重启功能,本身客户端高度依赖eventloop,很多准备工作是在进程启动的时候做的,比如加入定时器、加入事件监听,如果自行实现重启,需要对这些工作都要重新再做一遍

  • chaz6chez 2023-11-22

    以上支持定时重启的特性,我已经提交PR了,等待walkor老大合并

  • chaz6chez 2023-11-22

    你上述方式这样写,仅仅只是关闭了流的监听关闭了心跳定时器移除了缓冲区,但是你重连的时候,你还需要把流的监听事件加上,并且为rabbitmq-client的连接进行重新建立,重新获取client-id等等

  • mo 2023-11-22

    谢谢大佬 !太给力了, 我最开始的思路是想,如果断开了连接,通过定时器去尝试连接。 连接成功后覆盖之前的连接对象。

    目前发送MQ仅仅是服务的一部分,直接重启我觉得影响了我其他的工作。

  • chaz6chez 2023-11-22

    谢谢大佬 !太给力了, 我最开始的思路是想,如果断开了连接,通过定时器去尝试连接。 连接成功后覆盖之前的> 连接对象。

    目前发送MQ仅仅是服务的一部分,直接重启我觉得影响了我其他的工作。

    那你可以参考这个client是如何进行连接的,然后在你的定时器里把连接工作做完,然后重连这里就按你自己写的方式写;当然我个人觉得重启当前进程是最快的做法,丝毫不会影响其他的工作。

    这个我再做一个特性吧,就在重连这个地方增加一个注册的回调事件,如果加入了注册回调事件,就执行回调事件,如果没有回调事件,就自动重启

  • walkor 2023-11-22

    感谢PR 已经合并发了版本

  • chaz6chez 2023-11-22

    @walkor 老大,根据需求,新增了用户自定义注册回调;我又调整了一个pr,已提交
    https://github.com/walkor/rabbitmq/pull/15

  • chaz6chez 2023-11-22

    @mo 你可以参考https://github.com/walkor/rabbitmq/pull/15的方式,在回调里面得到传入的client实例,然后对client实例进行处理销毁,重新new一个client代替你之前使用的client实例即可

  • mo 2023-11-22

    非常感谢!!!

🔝