webman队列 redis-queue_{redis-queue}-failed 出现空值 [已解决]

ZQZ

问题描述

webman队列 redis-queue_{redis-queue}-failed 出现空值 ,队列未出现消费报错及异常
在Workerman\RedisQueue\Client.php 里面增加了日志便于抓取 redis-queue_{redis-queue}-failed 出现空值,但是队列名:iots_group_task,采用的同步模式,此时段并未发送队列,如果发送队列会记录日志,无对应空值的日志。
现在疑惑的是此空值,如何产生的?
确实找不到原因?

另:消费iots_group_task队列 没有任何报错,包括日志记录也是一样,没有与之对应错误。每次消费,均有日志,但是没有与此failed时 空对应的记录

队列 iots_group_task 使用场景, 是在gateway-worker event onMessage,收到消息使用队列。

程序代码

/**
     * pull.
     */
    public function pull()
    {
        $this->tryToPullDelayQueue();
        if (!$this->_subscribeQueues || $this->_redisSubscribe->brPoping) {
            return;
        }
        $cb = function ($data) use (&$cb) {
            if ($data) {
                $this->_redisSubscribe->brPoping = 0;
                $redis_key = $data[0];
                $package_str = $data[1];
                $package = json_decode($package_str, true);
                if (!$package) {
                    $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, $package_str);

                    // 临时队列标记错误,解决空值 失败队列抓去
                    Base::zLog([
                        'number' => '02-pull()',
                        'package_str' => $package_str,
                        'package_str_type' => gettype($data),
                        'package_str_length' => strlen($package_str),
                        'package_str_hex' => bin2hex($package_str),  // 查看实际字节
                        'redis_key' => $redis_key ?? 'unknown',
                        'data_raw' => \json_encode($data, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
                    ], '_Workerman_RedisQueue_Client_error');

                } else {
                    if (!isset($this->_subscribeQueues[$redis_key])) {
                        // 取消订阅,放回队列
                        $this->_redisSend->rPush($redis_key, $package_str);
                    } else {
                        $callback = $this->_subscribeQueues[$redis_key];
                        try {
                            \call_user_func($callback, $package['data']);
                        } catch (UnretryableException $e) {
                            $this->log((string)$e);
                            $package['max_attempts'] = $this->_options['max_attempts'];
                            $package['error'] = $e->getMessage();
                            $this->fail($package);
                        } catch (\Throwable $e) {
                            $this->log((string)$e);
                            $package['max_attempts'] = $this->_options['max_attempts'];
                            $package['error'] = $e->getMessage();
                            $package_modified = null;
                            if ($this->_consumeFailure) {
                                try {
                                    $package_modified = \call_user_func($this->_consumeFailure, $e, $package);
                                } catch (\Throwable $ta) {
                                    $this->log((string)$ta);
                                }
                            }
                            if (is_array($package_modified)) {
                                $package['data'] = $package_modified['data'] ?? $package['data'];
                                $package['attempts'] = $package_modified['attempts'] ?? $package['attempts'];
                                $package['max_attempts'] = $package_modified['max_attempts'] ?? $package['max_attempts'];
                                $package['error'] = $package_modified['error'] ?? $package['error'];
                            }
                            if (++$package['attempts'] > $package['max_attempts']) {
                                $this->fail($package);
                            } else {
                                $this->retry($package);
                            }
                        }
                    }
                }
            }
            if ($this->_subscribeQueues) {
                $this->_redisSubscribe->brPoping = 1;
                Timer::add(0.000001, [$this->_redisSubscribe, 'brPop'], [\array_keys($this->_subscribeQueues), 1, $cb], false);
            }
        };
        $this->_redisSubscribe->brPoping = 1;
        $this->_redisSubscribe->brPop(\array_keys($this->_subscribeQueues), 1, $cb);
    }

/**
     * tryToPullDelayQueue.
     */
    protected function tryToPullDelayQueue()
    {
        if ($this->_retryTimerId) {
            return;
        }
        $this->_retryTimerId = Timer::add(1, function () {
            $now = time();
            $options = ['LIMIT', 0, 128];
            $this->_redisSend->zrevrangebyscore($this->_options['prefix'] . static::QUEUE_DELAYED, $now, '-inf', $options, function ($items) {
                if ($items === false) {
                    throw new RuntimeException($this->_redisSend->error());
                }
                foreach ($items as $package_str) {
                    $this->_redisSend->zRem($this->_options['prefix'] . static::QUEUE_DELAYED, $package_str, function ($result) use ($package_str) {
                        if ($result !== 1) {
                            return;
                        }
                        $package = \json_decode($package_str, true);
                        if (!$package) {
                            $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, $package_str);

                            // 临时队列标记错误
                            Base::zLog([
                                'number' => '01-tryToPullDelayQueue()',
                                'value' => $package_str,
                            ], '_Workerman_RedisQueue_Client_error');

                            return;
                        }
                        $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_WAITING . $package['queue'], $package_str);
                    });
                }
            });
        });
    }

    /**
     * @param $package
     */
    protected function fail($package)
    {
        $this->_redisSend->lPush($this->_options['prefix'] . static::QUEUE_FAILED, \json_encode($package, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT));

        // 临时队列标记错误
        Base::zLog([
            'number' => '03-fail()',
            'value' => \json_encode($package, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
        ], '_Workerman_RedisQueue_Client_error');
    }

报错信息

日志明细如下:
LOG[2026-04-26 18:01:33.158] {"number":"02-pull()","package_str":"","package_str_type":"array","package_str_length":0,"package_str_hex":"","redis_key":"redis-queue_{redis-queue}-waitingiots_group_task","data_raw":"[\n    \"redis-queue_{redis-queue}-waitingiots_group_task\",\n    \"\"\n]"}
LOG[2026-04-26 18:01:34.035] {"number":"01-tryToPullDelayQueue()","value":""}

LOG[2026-04-26 18:01:34.891] {"number":"02-pull()","package_str":"","package_str_type":"array","package_str_length":0,"package_str_hex":"","redis_key":"redis-queue_{redis-queue}-waitingiots_group_task","data_raw":"[\n    \"redis-queue_{redis-queue}-waitingiots_group_task\",\n    \"\"\n]"}
LOG[2026-04-26 18:01:35.034] {"number":"01-tryToPullDelayQueue()","value":""}

LOG[2026-04-26 18:01:39.731] {"number":"02-pull()","package_str":"","package_str_type":"array","package_str_length":0,"package_str_hex":"","redis_key":"redis-queue_{redis-queue}-waitingiots_group_task","data_raw":"[\n    \"redis-queue_{redis-queue}-waitingiots_group_task\",\n    \"\"\n]"}
LOG[2026-04-26 18:01:40.034] {"number":"01-tryToPullDelayQueue()","value":""}

LOG[2026-04-26 18:01:43.216] {"number":"02-pull()","package_str":"","package_str_type":"array","package_str_length":0,"package_str_hex":"","redis_key":"redis-queue_{redis-queue}-waitingiots_group_task","data_raw":"[\n    \"redis-queue_{redis-queue}-waitingiots_group_task\",\n    \"\"\n]"}
LOG[2026-04-26 18:01:44.034] {"number":"01-tryToPullDelayQueue()","value":""}

操作系统及workerman/webman等框架组件具体版本

webman/cache 2.1.3
webman/captcha 1.0.5 Captcha generator
webman/channel 2.1.0
webman/console 2.2.1 Webman console
webman/database 2.1.9 Webman database
webman/domain 2.1.0
webman/event 1.0.5 Webman event plugin
webman/gateway-worker 1.0.9
webman/log 2.1.4 Webman plugin webman/log
webman/push 1.1.1
webman/rate-limiter 1.1.8 Webman plugin webman/rate-limiter
webman/redis 2.1.3 Webman redis
webman/redis-queue 2.1.1 Redis message queue plugin for webman.
workerman/channel 1.2.2
workerman/coroutine 1.1.4 Workerman coroutine
workerman/crontab 1.0.7 A crontab written in PHP based on workerman
workerman/gateway-worker 4.0.1
workerman/gatewayclient 3.1.1
workerman/http-client 3.0.6
workerman/mqtt 2.2
workerman/psr7 2.0.2 PSR-7 message implementation that also provides common utility methods
workerman/redis 2.0.5
workerman/redis-queue 1.2.2 Message queue system written in PHP based on workerman and backed by Redis.
workerman/webman-framework 2.2.0 High performance HTTP Service Framework.
workerman/workerman 5.1.9 An asynchronous event driven PHP framework for easily building fast, scalable network applications.

还望哪位高手,有偿解惑下!

87 0 0
0个回答

🔝