Redis队列的消费者在每次启动webman时都会实例化加载,大佬们有办法避开吗?

Mr.Gong

问题描述

如题所示,为方便Redis队列使用,所以新增RedisQueue继承Consumer类,把推送和消费都放在了一起,具体消费类统一继承RedisQueue。但每次在进行webman启动时都会自动实例化消费类,导致一些消费类在需要使用参数时报错,RedisQueue类代码如下:

/**
 * @method $this delay(int $delay) 延迟执行时间
 * @method $this queue(string $queue) 队列名称
 * @method $this async(bool $async) 异步执行
 */
abstract class RedisQueue implements Consumer
{
    use Data;

    protected int $delay = 0;

    protected mixed $data;

    protected bool $async = false;

    public string $queue = 'default';

    public string $connection = 'default';

    public function __construct($data = [])
    {
        $this->data['consumer_params'] = $data;
    }

    /** ----- 生产者相关Start ----- */
    public static function dispatch($data = [])
    {
        return new static($data);
    }

    public function __destruct()
    {
        $this->data['request_id'] = variable()->get('request_id', generateSnowflakeId(8));
        $pushData                 = serialize($this->data);
        /** 异步投递 */
        if ($this->async) {
            Client::send($this->queue, $pushData, $this->delay);
            return;
        }

        /** 同步投递 */
        \Webman\RedisQueue\Redis::send($this->queue, $pushData, $this->delay);
    }
    /** ----- 生产者相关End ----- */

    /** ----- 消费者相关Start ----- */
    /**
     * 消费逻辑处置函数
     * @param $data
     */
    abstract protected function handle($data);

    /**
     * 事件处理
     * @param $data
     */
    abstract protected function event($data);

    public function consume($data)
    {
        var_dump($this->queue);
        $data      = unserialize($data);
        $requestId = $data['request_id'] ?? generateSnowflakeId(8);
        variable()->set('request_id', $requestId);
        unset($data['request_id']);
        $data = $data['consumer_params'] ?? [];
        try {
            $this->handle($data);
        } catch (\Throwable $e) {
            $message = sprintf('【Redis】队列 %s 消费失败 Error: %s', $this->queue, $e->getMessage());
            $this->log($message, array_merge(['data' => $data, 'trace' => $e->getTrace()]));
            return;
        }

        $this->event($data);
    }

    /**
     * 消费失败回调
     * @param \Throwable $e
     * @param $package
     * $package = [
     * 'id' => 1357277951, // 消息ID
     * 'time' => 1709170510, // 消息时间
     * 'delay' => 0, // 延迟时间
     * 'attempts' => 2, // 消费次数
     * 'queue' => 'send-mail', // 队列名
     * 'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // 消息内容
     * 'max_attempts' => 5, // 最大重试次数
     * 'error' => '错误信息' // 错误信息
     * ]
     */
    public abstract function onConsumeFailure(\Throwable $e, $package);

    /** ----- 消费者相关End ----- */

    protected function log($message, $extend = [])
    {
        $trace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 2);
        $class = $trace[0]['file'] ?? '';
        $line  = $trace[0]['line'] ?? '';

        $message = sprintf("%s Line:%s %s",
            $class,
            $line,
            $message);
        logWrite($message, $extend, Level::INFO, 'task');
    }
}
102 1 1
1个回答

Mr.Gong

处理了,不在析构函数里处理推送逻辑就行

  • 暂无评论
🔝