如题所示,为方便Redis队列使用,所以新增RedisQueue继承Consumer类,把推送和消费都放在了一起,具体消费类统一继承RedisQueue。但每次在进行webman启动时都会自动实例化消费类,导致一些消费类在需要使用参数时报错,RedisQueue类代码如下:
/**
* @method $this delay(int $delay) 延迟执行时间
* @method $this queue(string $queue) 队列名称
* @method $this async(bool $async) 异步执行
*/
abstract class RedisQueue implements Consumer
{
use Data;
protected int $delay = 0;
protected mixed $data;
protected bool $async = false;
public string $queue = 'default';
public string $connection = 'default';
public function __construct($data = [])
{
$this->data['consumer_params'] = $data;
}
/** ----- 生产者相关Start ----- */
public static function dispatch($data = [])
{
return new static($data);
}
public function __destruct()
{
$this->data['request_id'] = variable()->get('request_id', generateSnowflakeId(8));
$pushData = serialize($this->data);
/** 异步投递 */
if ($this->async) {
Client::send($this->queue, $pushData, $this->delay);
return;
}
/** 同步投递 */
\Webman\RedisQueue\Redis::send($this->queue, $pushData, $this->delay);
}
/** ----- 生产者相关End ----- */
/** ----- 消费者相关Start ----- */
/**
* 消费逻辑处置函数
* @param $data
*/
abstract protected function handle($data);
/**
* 事件处理
* @param $data
*/
abstract protected function event($data);
public function consume($data)
{
var_dump($this->queue);
$data = unserialize($data);
$requestId = $data['request_id'] ?? generateSnowflakeId(8);
variable()->set('request_id', $requestId);
unset($data['request_id']);
$data = $data['consumer_params'] ?? [];
try {
$this->handle($data);
} catch (\Throwable $e) {
$message = sprintf('【Redis】队列 %s 消费失败 Error: %s', $this->queue, $e->getMessage());
$this->log($message, array_merge(['data' => $data, 'trace' => $e->getTrace()]));
return;
}
$this->event($data);
}
/**
* 消费失败回调
* @param \Throwable $e
* @param $package
* $package = [
* 'id' => 1357277951, // 消息ID
* 'time' => 1709170510, // 消息时间
* 'delay' => 0, // 延迟时间
* 'attempts' => 2, // 消费次数
* 'queue' => 'send-mail', // 队列名
* 'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // 消息内容
* 'max_attempts' => 5, // 最大重试次数
* 'error' => '错误信息' // 错误信息
* ]
*/
public abstract function onConsumeFailure(\Throwable $e, $package);
/** ----- 消费者相关End ----- */
protected function log($message, $extend = [])
{
$trace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 2);
$class = $trace[0]['file'] ?? '';
$line = $trace[0]['line'] ?? '';
$message = sprintf("%s Line:%s %s",
$class,
$line,
$message);
logWrite($message, $extend, Level::INFO, 'task');
}
}
处理了,不在析构函数里处理推送逻辑就行