看subscribe($queue, callable $callback)代码只有两个参数传递
看了下workerman/redis-queue包中的代码视乎没有这种操作,当前版本 "workerman/workerman": "v4.1.15", "workerman/redis-queue": "v1.2.0", "workerman/crontab": "v1.0.6",
设置消费者数量
https://www.workerman.net/doc/webman/queue/redis.html#%E4%B8%BA%E4%B8%8D%E5%90%8C%E7%9A%84%E9%98%9F%E5%88%97%E8%AE%BE%E7%BD%AE%E4%B8%8D%E5%90%8C%E7%9A%84%E6%B6%88%E8%B4%B9%E8%BF%9B%E7%A8%8B
没看懂,Webman\RedisQueue\Process\Consumer::class这个是干嘛的
subscribe($queue, callable $callback) 中的$queue对应的是redis_consumer_fast吗
我的代码是这样的
$worker = new Worker(); $worker->name = 'queue_work'; // $worker->count = 3;
$worker->onWorkerStart = function () use ($output) { $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Queue Starting...'); // 定时,每10秒一次 Timer::add(30, function () use ($output) { (new SysSchedule())->select(); }); $redis_option = [ 'connect_timeout' => 10, 'max_attempts' => 3, 'retry_seconds' => 5, 'prefix' => env('redis.redis_key', 'geekeer_queue_'),//缓存key前缀 ]; if (!empty(env('redis.redis_password'))) { $redis_option['auth'] = env('redis.redis_password'); } $redis_option['db'] = env('redis.select'); $client = new Client('redis://' . env('redis.redis_hostname') . ':' . env('redis.port'), $redis_option); sleep(1); $queue_list = $this->getAllQueue(); foreach ($queue_list as $queue_class_name) { $queue_class_name = str_replace('.php', '', $queue_class_name); // 订阅 $client->subscribe($queue_class_name, function ($data) use ($queue_class_name, $output) { $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . ' Processing:' . $queue_class_name); try { $class_name = '\\' . $queue_class_name; $class = new $class_name(); $class->fire($data); } catch (\Throwable $e) { $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . ' error:' . $queue_class_name); $catch_id = catch_log($e, name: '队列失败日志'); $fail_reason = mb_substr($e->getMessage() ?? '', 0, 200, 'UTF-8'); JobsFailed::create([ 'queue' => $queue_class_name, 'data' => $data, 'fail_time' => time(), 'fail_reason' => $fail_reason . ',失败日志:${' . $catch_id . '}', ]); } }); } // 消费失败触发的回调(可选) $client->onConsumeFailure(function (\Throwable $exception, $package) use ($output) { $output->writeln('[queue]队列 ' . $package['queue'] . ' 消费失败,' . $exception->getMessage()); }); $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Queue Started.'); }; Worker::runAll();
没有使用官方默认的启动方式,投递和消费是解耦的两个程序
该不会只要开启$worker->count = x就可以了吧
看了下workerman/redis-queue包中的代码视乎没有这种操作,当前版本
"workerman/workerman": "v4.1.15",
"workerman/redis-queue": "v1.2.0",
"workerman/crontab": "v1.0.6",
设置消费者数量
https://www.workerman.net/doc/webman/queue/redis.html#%E4%B8%BA%E4%B8%8D%E5%90%8C%E7%9A%84%E9%98%9F%E5%88%97%E8%AE%BE%E7%BD%AE%E4%B8%8D%E5%90%8C%E7%9A%84%E6%B6%88%E8%B4%B9%E8%BF%9B%E7%A8%8B
没看懂,Webman\RedisQueue\Process\Consumer::class这个是干嘛的
subscribe($queue, callable $callback) 中的$queue对应的是redis_consumer_fast吗
我的代码是这样的
$worker = new Worker();
$worker->name = 'queue_work';
// $worker->count = 3;
没有使用官方默认的启动方式,投递和消费是解耦的两个程序
该不会只要开启$worker->count = x就可以了吧