最近在开发一个新的应用,有需要用到rabbitmq,workbunny大佬的插件很好用
记录一下如何使用重试机制,这对于消息消费失败很有用
按照文档需要开启延迟队列插件 workbunny/webman-rabbitmq
首先我创建了2个消费者
SendForgetPassWordEmailBuilder.php ===> 发送忘记密码邮箱
SendForgetPassWordEmailBuilderDelayed.php ===> 发送忘记密码邮箱(延迟队列)
下面直接贴代码:
# SendForgetPassWordEmailBuilder 发送忘记密码邮箱消费者
<?php declare(strict_types=1);
namespace process\workbunny\rabbitmq;
use Bunny\Async\Client as BunnyClient;
use Bunny\Channel as BunnyChannel;
use Bunny\Message as BunnyMessage;
use Workbunny\WebmanRabbitMQ\Builders\QueueBuilder;
use Workbunny\WebmanRabbitMQ\Constants;
use function Workbunny\WebmanRabbitMQ\sync_publish;
class SendForgetPassWordEmailBuilder extends QueueBuilder
{
/** @inheritdoc */
protected ?string $connection = 'rabbitmq';
/**
* @var array = [
* 'name' => 'example',
* 'delayed' => false,
* 'prefetch_count' => 1,
* 'prefetch_size' => 0,
* 'is_global' => false,
* 'routing_key' => '',
* ]
*/
protected array $queueConfig = [
// 队列名称 ,默认由类名自动生成
'name' => 'process.workbunny.rabbitmq.SendForgetPassWordEmailBuilder',
// 是否延迟
'delayed' => false,
// QOS 数量
'prefetch_count' => 0,
// QOS size
'prefetch_size' => 0,
// QOS 全局
'is_global' => false,
// 路由键
'routing_key' => '',
];
/** @var string 交换机类型 */
protected string $exchangeType = Constants::DIRECT;
/** @var string|null 交换机名称,默认由类名自动生成 */
protected ?string $exchangeName = 'process.workbunny.rabbitmq.SendForgetPassWordEmailBuilder';
/** @inheritDoc */
public function handler(BunnyMessage $message, BunnyChannel $channel, BunnyClient $client): string
{
# 获取重试次数
$retry = $message->getHeader('x-retry', 1);
# 获取延迟时间
$delay = $message->getHeader('x-delay', 10000);
$delay = abs($delay);
# 业务逻辑
try {
# 模拟真实业务耗时
sleep(1);
} catch (\Exception $e) {
$headers = [ 'x-retry' => $retry, 'x-delay' => $delay * $retry];
$sync_publish = sync_publish(SendForgetPassWordEmailBuilderDelayed::instance(), $message->content, headers: $headers);
if ($sync_publish) {
echo "发布到延迟队列成功\n";
} else {
echo "发布到延迟队列失败\n";
}
}
return Constants::ACK;
}
}
# SendForgetPassWordEmailBuilderDelayed.php 发送忘记密码邮箱(延迟队列)
<?php declare(strict_types=1);
namespace process\workbunny\rabbitmq;
use Bunny\Channel as BunnyChannel;
use Bunny\Async\Client as BunnyClient;
use Bunny\Message as BunnyMessage;
use Workbunny\WebmanRabbitMQ\Constants;
use Workbunny\WebmanRabbitMQ\Builders\QueueBuilder;
use function Workbunny\WebmanRabbitMQ\sync_publish;
class SendForgetPassWordEmailBuilderDelayed extends QueueBuilder
{
/** @inheritdoc */
protected ?string $connection = 'rabbitmq';
/**
* @var array = [
* 'name' => 'example',
* 'delayed' => false,
* 'prefetch_count' => 1,
* 'prefetch_size' => 0,
* 'is_global' => false,
* 'routing_key' => '',
* ]
*/
protected array $queueConfig = [
// 队列名称 ,默认由类名自动生成
'name' => 'process.workbunny.rabbitmq.SendForgetPassWordEmailBuilderDelayed',
// 是否延迟
'delayed' => true,
// QOS 数量
'prefetch_count' => 0,
// QOS size
'prefetch_size' => 0,
// QOS 全局
'is_global' => false,
// 路由键
'routing_key' => '',
];
/** @var string 交换机类型 */
protected string $exchangeType = Constants::DIRECT;
/** @var string|null 交换机名称,默认由类名自动生成 */
protected ?string $exchangeName = 'process.workbunny.rabbitmq.SendForgetPassWordEmailBuilderDelayed';
/** @inheritDoc */
public function handler(BunnyMessage $message, BunnyChannel $channel, BunnyClient $client): string
{
# 获取重试次数
$retry = $message->getHeader('x-retry', 0);
# 获取延迟时间(仅用于显式输入日志)
$delay = $message->getHeader('x-delay', 0);
$delay = abs($delay) / 1000;
echo "重试次数: {$retry},延迟时间: {$delay} 秒\n";
# 如重试次数大于等于3次,则不再重试
if ($retry >= 3) {
echo "消息重试次数已达上限,不再重试\n";
return Constants::ACK;
}
# 设置新的重试次数头信息
$headers = ['x-retry' => $retry + 1];
# 重新发布消息到原队列
$sync_publish = sync_publish(SendForgetPassWordEmailBuilder::instance(), $message->content, headers: $headers);
if ($sync_publish) {
echo "消息重新发布成功\n";
} else {
echo "消息重新发布失败\n";
}
return Constants::ACK;
}
}
# 使用主队列重试消费 不使用延迟队列进行重试消费
<?php declare(strict_types=1);
namespace process\workbunny\rabbitmq;
use Bunny\Async\Client as BunnyClient;
use Bunny\Channel as BunnyChannel;
use Bunny\Message as BunnyMessage;
use Workbunny\WebmanRabbitMQ\Builders\QueueBuilder;
use Workbunny\WebmanRabbitMQ\Constants;
use function Workbunny\WebmanRabbitMQ\sync_publish;
class SendForgetPassWordEmailBuilder extends QueueBuilder
{
/** @inheritdoc */
protected ?string $connection = 'rabbitmq';
/**
* @var array = [
* 'name' => 'example',
* 'delayed' => false,
* 'prefetch_count' => 1,
* 'prefetch_size' => 0,
* 'is_global' => false,
* 'routing_key' => '',
* ]
*/
protected array $queueConfig = [
// 队列名称 ,默认由类名自动生成
'name' => 'process.workbunny.rabbitmq.SendForgetPassWordEmailBuilder',
// 是否延迟
'delayed' => false,
// QOS 数量
'prefetch_count' => 0,
// QOS size
'prefetch_size' => 0,
// QOS 全局
'is_global' => false,
// 路由键
'routing_key' => '',
];
/** @var string 交换机类型 */
protected string $exchangeType = Constants::DIRECT;
/** @var string|null 交换机名称,默认由类名自动生成 */
protected ?string $exchangeName = 'process.workbunny.rabbitmq.SendForgetPassWordEmailBuilder';
/** @inheritDoc */
public function handler(BunnyMessage $message, BunnyChannel $channel, BunnyClient $client): string
{
# 获取重试次数
$retry = $message->getHeader('x-retry', 0);
# 重试次数不能超过3次
if ($retry >= 3) {
echo "消息重试次数已达上限,不再重试\n";
return Constants::ACK;
}
# 业务逻辑
try {
# 模拟真实业务耗时
sleep(1);
} catch (\Exception $e) {
$headers = [ 'x-retry' => $retry + 1];
$channel->publish($message->content, $headers, $this->exchangeName)->then(
function () {
echo "重回当前队列成功\n";
},
function (Throwable $e) {
echo "重回当前队列失败: " . $e->getMessage() . "\n";
}
);
}
return Constants::ACK;
}
}
# 发布消息到队列
# 生成随机验证码
$validationCode = rand(100000, 999999);
$queueData = ['email' => $email, 'code' => $validationCode];
$queueData = json_encode($queueData);
$sync_publish = sync_publish(SendForgetPassWordEmailBuilder::instance(), $queueData);
if (!$sync_publish) {
throw new ServiceException('发送忘记密码邮件失败,请稍后重试');
}
效果图:

以上是我的使用分享,如有大佬有更好的代码勿喷😊😊😊