workbunny rabbitmq 延迟重试消费使用案列

mincon

最近在开发一个新的应用,有需要用到rabbitmq,workbunny大佬的插件很好用

记录一下如何使用重试机制,这对于消息消费失败很有用

按照文档需要开启延迟队列插件 workbunny/webman-rabbitmq

首先我创建了2个消费者

SendForgetPassWordEmailBuilder.php ===> 发送忘记密码邮箱
SendForgetPassWordEmailBuilderDelayed.php ===> 发送忘记密码邮箱(延迟队列)

下面直接贴代码:

# SendForgetPassWordEmailBuilder 发送忘记密码邮箱消费者
<?php declare(strict_types=1);

namespace process\workbunny\rabbitmq;

use Bunny\Async\Client as BunnyClient;
use Bunny\Channel as BunnyChannel;
use Bunny\Message as BunnyMessage;
use Workbunny\WebmanRabbitMQ\Builders\QueueBuilder;
use Workbunny\WebmanRabbitMQ\Constants;
use function Workbunny\WebmanRabbitMQ\sync_publish;

class SendForgetPassWordEmailBuilder extends QueueBuilder
{
    /** @inheritdoc  */
    protected ?string $connection = 'rabbitmq';

    /**
     * @var array = [
     *   'name'           => 'example',
     *   'delayed'        => false,
     *   'prefetch_count' => 1,
     *   'prefetch_size'  => 0,
     *   'is_global'      => false,
     *   'routing_key'    => '',
     * ]
     */
    protected array $queueConfig = [
        // 队列名称 ,默认由类名自动生成
        'name'           => 'process.workbunny.rabbitmq.SendForgetPassWordEmailBuilder',
        // 是否延迟          
        'delayed'        => false,
        // QOS 数量
        'prefetch_count' => 0,
        // QOS size 
        'prefetch_size'  => 0,
        // QOS 全局
        'is_global'      => false,
        // 路由键
        'routing_key'    => '',
    ];

    /** @var string 交换机类型 */
    protected string $exchangeType = Constants::DIRECT;

    /** @var string|null 交换机名称,默认由类名自动生成 */
    protected ?string $exchangeName = 'process.workbunny.rabbitmq.SendForgetPassWordEmailBuilder';

    /** @inheritDoc */
    public function handler(BunnyMessage $message, BunnyChannel $channel, BunnyClient $client): string 
    {
        # 获取重试次数
        $retry = $message->getHeader('x-retry', 1);
        # 获取延迟时间
        $delay = $message->getHeader('x-delay', 10000);
        $delay = abs($delay);

        # 业务逻辑
        try {
            # 模拟真实业务耗时
            sleep(1);
        } catch (\Exception $e) {
            $headers = [ 'x-retry' => $retry, 'x-delay' => $delay * $retry];
            $sync_publish = sync_publish(SendForgetPassWordEmailBuilderDelayed::instance(), $message->content, headers: $headers);
            if ($sync_publish) {
                echo "发布到延迟队列成功\n";
            } else {
                echo "发布到延迟队列失败\n";
            }
        }
        return Constants::ACK;
    }
}

# SendForgetPassWordEmailBuilderDelayed.php 发送忘记密码邮箱(延迟队列)
<?php declare(strict_types=1);

namespace process\workbunny\rabbitmq;

use Bunny\Channel as BunnyChannel;
use Bunny\Async\Client as BunnyClient;
use Bunny\Message as BunnyMessage;
use Workbunny\WebmanRabbitMQ\Constants;
use Workbunny\WebmanRabbitMQ\Builders\QueueBuilder;
use function Workbunny\WebmanRabbitMQ\sync_publish;

class SendForgetPassWordEmailBuilderDelayed extends QueueBuilder
{
    /** @inheritdoc  */
    protected ?string $connection = 'rabbitmq';

    /**
     * @var array = [
     *   'name'           => 'example',
     *   'delayed'        => false,
     *   'prefetch_count' => 1,
     *   'prefetch_size'  => 0,
     *   'is_global'      => false,
     *   'routing_key'    => '',
     * ]
     */
    protected array $queueConfig = [
        // 队列名称 ,默认由类名自动生成
        'name'           => 'process.workbunny.rabbitmq.SendForgetPassWordEmailBuilderDelayed',
        // 是否延迟          
        'delayed'        => true,
        // QOS 数量
        'prefetch_count' => 0,
        // QOS size 
        'prefetch_size'  => 0,
        // QOS 全局
        'is_global'      => false,
        // 路由键
        'routing_key'    => '',
    ];

    /** @var string 交换机类型 */
    protected string $exchangeType = Constants::DIRECT;

    /** @var string|null 交换机名称,默认由类名自动生成 */
    protected ?string $exchangeName = 'process.workbunny.rabbitmq.SendForgetPassWordEmailBuilderDelayed';

    /** @inheritDoc */
    public function handler(BunnyMessage $message, BunnyChannel $channel, BunnyClient $client): string 
    {
        # 获取重试次数
        $retry = $message->getHeader('x-retry', 0);
        # 获取延迟时间(仅用于显式输入日志)
        $delay = $message->getHeader('x-delay', 0);
        $delay = abs($delay) / 1000;
        echo "重试次数: {$retry},延迟时间: {$delay} 秒\n";

        # 如重试次数大于等于3次,则不再重试
        if ($retry >= 3) {
            echo "消息重试次数已达上限,不再重试\n";
            return Constants::ACK;
        }

        # 设置新的重试次数头信息
        $headers = ['x-retry' => $retry + 1];
        # 重新发布消息到原队列
        $sync_publish = sync_publish(SendForgetPassWordEmailBuilder::instance(), $message->content, headers: $headers);
        if ($sync_publish) {
            echo "消息重新发布成功\n";
        } else {
            echo "消息重新发布失败\n";
        }
        return Constants::ACK;
    }
}

# 使用主队列重试消费 不使用延迟队列进行重试消费
<?php declare(strict_types=1);

namespace process\workbunny\rabbitmq;

use Bunny\Async\Client as BunnyClient;
use Bunny\Channel as BunnyChannel;
use Bunny\Message as BunnyMessage;
use Workbunny\WebmanRabbitMQ\Builders\QueueBuilder;
use Workbunny\WebmanRabbitMQ\Constants;
use function Workbunny\WebmanRabbitMQ\sync_publish;

class SendForgetPassWordEmailBuilder extends QueueBuilder
{
    /** @inheritdoc  */
    protected ?string $connection = 'rabbitmq';

    /**
     * @var array = [
     *   'name'           => 'example',
     *   'delayed'        => false,
     *   'prefetch_count' => 1,
     *   'prefetch_size'  => 0,
     *   'is_global'      => false,
     *   'routing_key'    => '',
     * ]
     */
    protected array $queueConfig = [
        // 队列名称 ,默认由类名自动生成
        'name'           => 'process.workbunny.rabbitmq.SendForgetPassWordEmailBuilder',
        // 是否延迟
        'delayed'        => false,
        // QOS 数量
        'prefetch_count' => 0,
        // QOS size
        'prefetch_size'  => 0,
        // QOS 全局
        'is_global'      => false,
        // 路由键
        'routing_key'    => '',
    ];

    /** @var string 交换机类型 */
    protected string $exchangeType = Constants::DIRECT;

    /** @var string|null 交换机名称,默认由类名自动生成 */
    protected ?string $exchangeName = 'process.workbunny.rabbitmq.SendForgetPassWordEmailBuilder';

    /** @inheritDoc */
    public function handler(BunnyMessage $message, BunnyChannel $channel, BunnyClient $client): string
    {
        # 获取重试次数
        $retry = $message->getHeader('x-retry', 0);
        # 重试次数不能超过3次
        if ($retry >= 3) {
            echo "消息重试次数已达上限,不再重试\n";
            return Constants::ACK;
        }

        # 业务逻辑
        try {
            # 模拟真实业务耗时
            sleep(1);
        } catch (\Exception $e) {
            $headers = [ 'x-retry' => $retry + 1];
            $channel->publish($message->content, $headers, $this->exchangeName)->then(
            function () {
                echo "重回当前队列成功\n";
            },
            function (Throwable $e) {
                echo "重回当前队列失败: " . $e->getMessage() . "\n";
            }
        );
        }
        return Constants::ACK;
    }
}

# 发布消息到队列
# 生成随机验证码
$validationCode = rand(100000, 999999);
$queueData = ['email' => $email, 'code' => $validationCode];
$queueData = json_encode($queueData);
$sync_publish = sync_publish(SendForgetPassWordEmailBuilder::instance(), $queueData);
if (!$sync_publish) {
    throw new ServiceException('发送忘记密码邮件失败,请稍后重试');
}

效果图:

以上是我的使用分享,如有大佬有更好的代码勿喷😊😊😊

85 0 0
0个评论

mincon

240
积分
0
获赞数
0
粉丝数
2022-08-18 加入
🔝