如何操作redis消息队列subscribe中针对queue设置并行的最大值

琦森

看subscribe($queue, callable $callback)代码只有两个参数传递

49 1 0
1个回答

琦森

看了下workerman/redis-queue包中的代码视乎没有这种操作,当前版本
"workerman/workerman": "v4.1.15",
"workerman/redis-queue": "v1.2.0",
"workerman/crontab": "v1.0.6",

  • nitron 1天前

    设置消费者数量

  • 琦森 1天前

    没看懂,Webman\RedisQueue\Process\Consumer::class这个是干嘛的

  • 琦森 1天前

    subscribe($queue, callable $callback) 中的$queue对应的是redis_consumer_fast吗

  • 琦森 1天前

    我的代码是这样的

    $worker = new Worker();
    $worker->name = 'queue_work';
    // $worker->count = 3;

        $worker->onWorkerStart = function () use ($output) {
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Queue Starting...');
            // 定时,每10秒一次
            Timer::add(30, function () use ($output) {
                (new SysSchedule())->select();
            });
            $redis_option = [
                'connect_timeout' => 10,
                'max_attempts'    => 3,
                'retry_seconds'   => 5,
                'prefix'          => env('redis.redis_key', 'geekeer_queue_'),//缓存key前缀
            ];
            if (!empty(env('redis.redis_password'))) {
                $redis_option['auth'] = env('redis.redis_password');
            }
            $redis_option['db'] = env('redis.select');
            $client             = new Client('redis://' . env('redis.redis_hostname') . ':' . env('redis.port'), $redis_option);
            sleep(1);
            $queue_list         = $this->getAllQueue();
    
            foreach ($queue_list as $queue_class_name) {
                $queue_class_name = str_replace('.php', '', $queue_class_name);
                // 订阅
                $client->subscribe($queue_class_name, function ($data) use ($queue_class_name, $output) {
                    $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . ' Processing:' . $queue_class_name);
                    try {
                        $class_name = '\\' . $queue_class_name;
                        $class      = new  $class_name();
                        $class->fire($data);
                    } catch (\Throwable $e) {
                        $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . ' error:' . $queue_class_name);
                        $catch_id    = catch_log($e, name: '队列失败日志');
                        $fail_reason = mb_substr($e->getMessage() ?? '', 0, 200, 'UTF-8');
                        JobsFailed::create([
                            'queue'       => $queue_class_name,
                            'data'        => $data,
                            'fail_time'   => time(),
                            'fail_reason' => $fail_reason . ',失败日志:${' . $catch_id . '}',
                        ]);
                    }
                });
            }
            // 消费失败触发的回调(可选)
            $client->onConsumeFailure(function (\Throwable $exception, $package) use ($output) {
                $output->writeln('[queue]队列 ' . $package['queue'] . ' 消费失败,' . $exception->getMessage());
            });
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Queue Started.');
        };
        Worker::runAll();
  • 琦森 1天前

    没有使用官方默认的启动方式,投递和消费是解耦的两个程序

  • 琦森 1天前

    该不会只要开启$worker->count = x就可以了吧

🔝