这个报错不是每次都出现:有时重启消息队列时会报,但重新运行后又恢复正常。
/**
 * Run the command: register a schedule worker and a queue worker, then
 * hand control to Workerman.
 *
 * @param Input  $input  Console input, forwarded to resetCli().
 * @param Output $output Console output used for progress messages inside the workers.
 * @return void
 */
protected function execute(Input $input, Output $output)
{
    $this->resetCli($input, $output);

    // ---- Schedule worker ----
    // NOTE(review): Worker::$pidFile is a static property and is assigned again
    // below for the queue worker before Worker::runAll() is called, so this
    // first value is overwritten — confirm which pid file is intended.
    Worker::$pidFile = runtime_path() . 'workerman_schedule.pid';
    $worker = new Worker();
    $worker->name = 'schedule_work';
    $worker->count = 1;
    // Set the timezone explicitly so scheduled times match expectations.
    date_default_timezone_set('PRC');
    $worker->onWorkerStart = function () use ($output) {
        $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Schedule Starting...');

        // Heartbeat: every 10 seconds write the current timestamp to a marker
        // file so it can be checked whether the scheduler is still running.
        new Crontab('*/10 * * * * *', function () {
            $file = root_path('runtime') . '.schedule';
            file_put_contents($file, time());
        });

        $core_schedule_service = new CoreScheduleService();
        // Load every enabled scheduled task.
        $task_list = $core_schedule_service->getList(['status' => ScheduleDict::ON]);
        foreach ($task_list as $item) {
            // Convert the task's time setting into a crontab expression and register it.
            new Crontab($this->getCrontab($item['time']), function () use ($core_schedule_service, $item, $output) {
                // Tasks without a handler class are skipped.
                if (!empty($item['class'])) {
                    $core_schedule_service->execute($item, $output);
                }
            });
        }

        $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Schedule Started.');
    };

    // ---- Queue worker ----
    Worker::$pidFile = runtime_path() . 'workerman_queue.pid';
    Worker::$logFile = runtime_path() . 'workerman.log';
    $worker = new Worker();
    $worker->name = 'queue_work';
    // $worker->count = 3;
    $worker->onWorkerStart = function () use ($output) {
        $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Queue Starting...');

        // Run a query every 30 seconds.
        // NOTE(review): presumably a keep-alive for the database connection — confirm.
        Timer::add(30, function () {
            (new SysSchedule())->select();
        });

        $redis_option = [
            'connect_timeout' => 10,
            'max_attempts' => 3,
            'retry_seconds' => 5,
            'prefix' => md5(root_path()), // key prefix derived from the project root path
        ];
        // Only send AUTH when a password is configured.
        if (!empty(env('redis.redis_password'))) {
            $redis_option['auth'] = env('redis.redis_password');
        }
        $redis_option['db'] = env('redis.queue_select');
        $client = new Client('redis://' . env('redis.redis_hostname') . ':' . env('redis.port'), $redis_option);

        // NOTE(review): a "RedisException: read error on connection" has been
        // reported with a stack trace that passes through this getAllQueue() call.
        // The likely cause is a DB/Redis connection opened before Worker::runAll()
        // forks and then inherited by this child process; avoid touching DB/Redis
        // before runAll(), or close and reopen such connections here — confirm.
        $queue_list = $this->getAllQueue();
        foreach ($queue_list as $queue_class_name) {
            $queue_class_name = str_replace('.php', '', $queue_class_name);
            // Subscribe to a queue named after the job class.
            $client->subscribe($queue_class_name, function ($data) use ($queue_class_name, $output) {
                $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . ' Processing:' . $queue_class_name);
                try {
                    $class_name = '\\' . $queue_class_name;
                    $class = new $class_name();
                    $class->fire($data);
                } catch (\Throwable $e) {
                    // Job errors are logged rather than rethrown, so a failing job
                    // does not propagate out of this callback.
                    Log::write(date('Y-m-d H:i:s') . ',队列有错误:' . $queue_class_name . '_' . $e->getMessage() . '_' . $e->getFile() . '_' . $e->getLine(), 'warning');
                }
                $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . ' Processed:' . $queue_class_name);
            });
        }

        // Optional callback invoked when consuming a message fails.
        $client->onConsumeFailure(function (\Throwable $exception, $package) use ($output) {
            $output->writeln('[queue]队列 ' . $package['queue'] . ' 消费失败,' . $exception->getMessage());
        });

        $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Queue Started.');
    };

    Worker::runAll();
}
RedisException: read error on connection to 127.0.0.1:6379 in r/vendor/topthink/framework/src/think/cache/driver/Redis.php:110
Stack trace:
#0 r/vendor/topthink/framework/src/think/cache/driver/Redis.php(110): Redis->get()
#1 r/vendor/topthink/think-orm/src/db/PDOConnection.php(372): think\cache\driver\Redis->get()
#2 r/vendor/topthink/think-orm/src/db/PDOConnection.php(424): think\db\PDOConnection->getSchemaInfo()
#3 r/vendor/topthink/think-orm/src/db/PDOConnection.php(513): think\db\PDOConnection->getTableInfo()
#4 r/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php(60): think\db\PDOConnection->getFieldsType()
#5 r/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php(72): think\db\Query->getFieldsType()
#6 [internal function]: think\db\Query->getFieldType()
#7 r/vendor/topthink/think-orm/src/Model.php(1091): call_user_func_array()
#8 r/vendor/topthink/think-orm/src/model/concern/TimeStamp.php(82): think\Model->__call()
#9 r/vendor/topthink/think-orm/src/model/concern/TimeStamp.php(62): think\Model->checkTimeFieldType()
#10 r/vendor/topthink/framework/src/think/service/ModelService.php(35): think\Model->isAutoWriteTimestamp()
#11 [internal function]: think\service\ModelService->think\service\{closure}()
#12 r/vendor/topthink/think-orm/src/Model.php(252): call_user_func()
#13 r/app/service/core/addon/CoreAddonService.php(22): think\Model->__construct()
#14 r/app/command/workerman/Workerman.php(180): app\service\core\addon\CoreAddonService->__construct()
#15 r/app/command/workerman/Workerman.php(96): app\command\workerman\Workerman->getAllQueue()
#16 r/vendor/workerman/workerman/Worker.php(2497): app\command\workerman\Workerman->app\command\workerman\{closure}()
#17 r/vendor/workerman/workerman/Worker.php(1636): Workerman\Worker->run()
#18 r/vendor/workerman/workerman/Worker.php(1429): Workerman\Worker::forkOneWorkerForLinux()
#19 r/vendor/workerman/workerman/Worker.php(1403): Workerman\Worker::forkWorkersForLinux()
#20 r/vendor/workerman/workerman/Worker.php(560): Workerman\Worker::forkWorkers()
#21 r/app/command/workerman/Workerman.php(119): Workerman\Worker::runAll()
#22 r/vendor/topthink/framework/src/think/console/Command.php(210): app\command\workerman\Workerman->execute()
#23 r/vendor/topthink/framework/src/think/Console.php(649): think\console\Command->run()
#24 r/vendor/topthink/framework/src/think/Console.php(308): think\Console->doRunCommand()
#25 r/vendor/topthink/framework/src/think/Console.php(245): think\Console->doRun()
#26 r/think(10): think\Console->run()
#27 {main}
workerman启动时会fork进程。如果fork前业务已经初始化了数据库或redis等连接资源,fork后子进程会继承这些资源;这样某个进程(例如A进程)关闭数据库或redis等连接时,会影响其它进程,导致类似错误。
不要在onWorkerStart方法运行前初始化数据库或redis等连接资源就好了;或者初始化后,在onWorkerStart里先关闭它们,再重新初始化。
没太明白你后面那段的意思。我是将workerman停止后再运行的,代码中的redis初始化是在onWorkerStart内的,也就是在onWorkerStart之后,好像符合你说的修改建议,但却还是报错。
你可以这样理解,启动workerman前运行了tp的代码,里面可能有数据库或者redis等操作导致的。不要在workerman启动前操作数据库或redis。
好的谢谢,弄明白了