关于redis-queue同时消费多个任务,不能及时消费。

雪饮

问题描述

关于redis-queue同时消费多个任务,目前发现向队列里面推送ffmpeg转码任务,不能及时消费。
我有两个丁订阅者,user1和user2,我分别推两个ffmpeg转码任务(转码比较慢,大概1小时能完成),结果我推给user1收到了,但是user2没有反应。
我怀疑是只能等待user1处理结束吧。能不能配置成多个同时处理,一个堵塞了就另外一个也能处理。

程序代码或配置

是workerman

重现问题的步骤

用exec在subscribe里面执行命令如:
ffmpeg -re -rtsp_transport tcp -i rtsp://sunba.tpddns.cn:554/openUrl/u5lcVOM?beginTime=20250215T140049&endTime=20250215T150145&playBackMode=1 -vcodec libx264 -vprofile baseline -acodec aac -ar 44100 -strict -2 -ac 1 -f mp4 -s 1280x720 -q 10 -progress /www/wwwroot/badminton/public/orderVideoTmp/test_202502181907181152028118_20250218070719_progress.log /www/wwwroot/badminton/public/orderVideoTmp/test_202502181907181152028118_20250218070719.mp4

操作系统环境及workerman/webman等具体版本

versions : * v4.1.17

911 4 0
4个回答

oliver.

多个进程是可以同时消费的。排查下服务器 CPU 是否满载,或是否存在锁冲突。

  • 暂无评论
nitron

ffmpeg不走显卡就是纯CPU编码,是CPU密集不是IO密集,核心是会跑满载,另一个没跑很正常,同时转两个你分两个机器跑吧,

另外视频转码任务,走CPU的话,串行处理的效率高于并行处理,因为会省去任务上下文切换的时间,要加快速度就是纯堆硬件了

  • 暂无评论
qqxxr

你贴下你2个消费者的代码,先排除是不是你说的情况。正常的是可以同时消费,还有消费者进程数的问题

  • 暂无评论
琦森
$worker          = new Worker();
        $worker->name    = 'queue_work:' . QueueDict::NORMAL;
        $worker->count   = 1;

        $worker->onWorkerStart = function () use ($output) {
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . ' Queue Starting...');
            // 定时,每10秒一次
            Timer::add(30, function () use ($output) {
                (new SysSchedule())->select();
            });
            $redis_option = [
                'connect_timeout' => 20,
                'max_attempts'    => 0,// 尝试次数
                'retry_seconds'   => 5,// 重连间隔
                'prefix'          => QueueDict::NORMAL,
            ];
            if (!empty(env('redis.redis_password'))) {
                $redis_option['auth'] = env('redis.redis_password');
            }
            $redis_option['db'] = env('redis.select');
            $client             = new Client('redis://' . env('redis.redis_hostname') . ':' . env('redis.port'), $redis_option);
            sleep(1);
            $queue_list = $this->getAllQueue();// 获取到需要订阅的job消费队列
            foreach ($queue_list as $queue_class_name) {
                $queue_class_name = str_replace('.php', '', $queue_class_name);
                // 订阅
                $client->subscribe($queue_class_name, function ($data) use ($queue_class_name, $output) {
                    $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . ' Processing:' . $queue_class_name);
                    try {
                        $class_name = '\\' . $queue_class_name;
                        $class      = new  $class_name();
                        $class->fire($data);
                    } catch (\Throwable $e) {
                        $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . ' error:' . $queue_class_name);
                    }
                });
            }
            // 消费失败触发的回调(可选)
            $client->onConsumeFailure(function (\Throwable $exception, $package) use ($output) {
                $output->writeln('[queue]队列 ' . $package['queue'] . ' 消费失败,' . $exception->getMessage());
            });
        };

我也是这个情况,是这个worker的count设置为1的问题吗
看文档count是进程数量,按理说多任务并发消费应该使用的是多线程,现在没弄明白应该怎么设置能多队列并行消费

🔝