workman使用消息队列时候,redis的db选择非0报错

琦森

问题描述

这个报错不是一直报错,有的时候重启消息队列会报,但是重新运行后又正常了

程序代码


    /**
     * 执行任务
     * @return void
     */
    protected function execute(Input $input, Output $output)
    {
        $this->resetCli($input, $output);
        //计划任务
        Worker::$pidFile = runtime_path() . 'workerman_schedule.pid';
        $worker          = new Worker();
        $worker->name    = 'schedule_work';
        $worker->count   = 1;

        // 设置时区,避免运行结果与预期不一致
        date_default_timezone_set('PRC');
        $worker->onWorkerStart = function () use ($output) {
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Schedule Starting...');
//            // 每分钟的第1秒执行.用于计划任务是否仍在执行
            new Crontab('*/10 * * * * *', function () {
                $file = root_path('runtime') . '.schedule';
                file_put_contents($file, time());
            });
            $core_schedule_service = new CoreScheduleService();
            //查询所有的计划任务
            $task_list = $core_schedule_service->getList(['status' => ScheduleDict::ON]);

            foreach ($task_list as $item) {
                //获取定时任务时间字符串
                new Crontab($this->getCrontab($item['time']), function () use ($core_schedule_service, $item, $output) {
                    if (!empty($item['class'])) {
                        $core_schedule_service->execute($item, $output);
                    }
                });
            }
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Schedule Started.');
        };

        //消息队列
        Worker::$pidFile = runtime_path() . 'workerman_queue.pid';
        Worker::$logFile = runtime_path() . 'workerman.log';
        $worker          = new Worker();
        $worker->name    = 'queue_work';
//        $worker->count = 3;

        $worker->onWorkerStart = function () use ($output) {
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Queue Starting...');
            // 定时,每10秒一次
            Timer::add(30, function () use ($output) {
                (new SysSchedule())->select();
            });
            $redis_option = [
                'connect_timeout' => 10,
                'max_attempts'    => 3,
                'retry_seconds'   => 5,
                'prefix'          => md5(root_path()),//缓存key前缀
            ];
            if (!empty(env('redis.redis_password'))) {
                $redis_option['auth'] = env('redis.redis_password');
            }
            $redis_option['db'] = env('redis.queue_select');

            $client     = new Client('redis://' . env('redis.redis_hostname') . ':' . env('redis.port'), $redis_option);
            $queue_list = $this->getAllQueue();

            foreach ($queue_list as $queue_class_name) {
                $queue_class_name = str_replace('.php', '', $queue_class_name);
                // 订阅
                $client->subscribe($queue_class_name, function ($data) use ($queue_class_name, $output) {
                    $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . ' Processing:' . $queue_class_name);
                    try {
                        $class_name = '\\' . $queue_class_name;
                        $class      = new  $class_name();
                        $class->fire($data);
                    } catch (\Throwable $e) {
                        Log::write(date('Y-m-d H:i:s') . ',队列有错误:' . $queue_class_name . '_' . $e->getMessage() . '_' . $e->getFile() . '_' . $e->getLine(), 'warning');
                    }
                    $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . ' Processed:' . $queue_class_name);
                });
            }
            // 消费失败触发的回调(可选)
            $client->onConsumeFailure(function (\Throwable $exception, $package) use ($output) {
                $output->writeln('[queue]队列 ' . $package['queue'] . ' 消费失败,' . $exception->getMessage());
            });
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Queue Started.');
        };
        Worker::runAll();
    }

报错信息

RedisException: read error on connection to 127.0.0.1:6379 in r/vendor/topthink/framework/src/think/cache/driver/Redis.php:110
Stack trace:
#0 r/vendor/topthink/framework/src/think/cache/driver/Redis.php(110): Redis->get()
#1 r/vendor/topthink/think-orm/src/db/PDOConnection.php(372): think\cache\driver\Redis->get()
#2 r/vendor/topthink/think-orm/src/db/PDOConnection.php(424): think\db\PDOConnection->getSchemaInfo()
#3 r/vendor/topthink/think-orm/src/db/PDOConnection.php(513): think\db\PDOConnection->getTableInfo()
#4 r/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php(60): think\db\PDOConnection->getFieldsType()
#5 r/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php(72): think\db\Query->getFieldsType()
#6 [internal function]: think\db\Query->getFieldType()
#7 r/vendor/topthink/think-orm/src/Model.php(1091): call_user_func_array()
#8 r/vendor/topthink/think-orm/src/model/concern/TimeStamp.php(82): think\Model->__call()
#9 r/vendor/topthink/think-orm/src/model/concern/TimeStamp.php(62): think\Model->checkTimeFieldType()
#10 r/vendor/topthink/framework/src/think/service/ModelService.php(35): think\Model->isAutoWriteTimestamp()
#11 [internal function]: think\service\ModelService->think\service\{closure}()
#12 r/vendor/topthink/think-orm/src/Model.php(252): call_user_func()
#13 r/app/service/core/addon/CoreAddonService.php(22): think\Model->__construct()
#14 r/app/command/workerman/Workerman.php(180): app\service\core\addon\CoreAddonService->__construct()
#15 r/app/command/workerman/Workerman.php(96): app\command\workerman\Workerman->getAllQueue()
#16 r/vendor/workerman/workerman/Worker.php(2497): app\command\workerman\Workerman->app\command\workerman\{closure}()
#17 r/vendor/workerman/workerman/Worker.php(1636): Workerman\Worker->run()
#18 r/vendor/workerman/workerman/Worker.php(1429): Workerman\Worker::forkOneWorkerForLinux()
#19 r/vendor/workerman/workerman/Worker.php(1403): Workerman\Worker::forkWorkersForLinux()
#20 r/vendor/workerman/workerman/Worker.php(560): Workerman\Worker::forkWorkers()
#21 r/app/command/workerman/Workerman.php(119): Workerman\Worker::runAll()
#22 r/vendor/topthink/framework/src/think/console/Command.php(210): app\command\workerman\Workerman->execute()
#23 r/vendor/topthink/framework/src/think/Console.php(649): think\console\Command->run()
#24 r/vendor/topthink/framework/src/think/Console.php(308): think\Console->doRunCommand()
#25 r/vendor/topthink/framework/src/think/Console.php(245): think\Console->doRun()
#26 r/think(10): think\Console->run()
#27 {main}
175 1 0
1个回答

workerman启动时会fork进程,如果fork前业务有初始化数据库或redis等连接资源,fork后子进程会继承这些资源,会导致A进程关闭数据或redis等连接,影响其它进程,导致类似错误。

不要在onWorkerStart方法运行前初始化化数据库或redis等连接资源就好了,或者初始化后在onWorkerStart关闭他们,重新初始化。

  • 琦森 3天前

    没太明白你后面段落的意思,我是将weorkman停止后运行的,代码中的redis初始化是在onWorkerStart内的,也就是onWorkerStart后,好像符合你说的修改建议,但是缺是报错的

  • walkor 3天前

    你可以这样理解,启动workerman前运行了tp的代码,里面可能有数据库或者redis等操作导致的。不要在workerman启动前操作数据库或redis。

  • 琦森 3天前

    好的谢谢,弄明白了

🔝