Workerman 执行队列任务,不执行

wo664881680

启 worker,在onWorkerStart 开启定时器 1秒获取 Redis 队列数据。

从log中看,消费了队列数据,偶现会没有执行。

没有用workerman-queue

$asyncJobWorker->onWorkerStart = function () use ($client) {
            // 获取配置
            foreach (Yii::$app->service->workermanQueue->jobConfig() as $config) {
                // 订阅队列名,回调方案使用配置的 handler execute 来处理数据
                // 这里的 回调,如果返回 true, 运行后会从 queue 中删除,否则 不删除
                $client->subscribe($config['name'], function ($data, $jobId, $ttr, $attempt) use ($config) {
                    try {
                        // 可以理解为注入
                        $handler = Yii::createObject([
                            'class' => $config['handler'],
                            'task' => $data
                        ]);
                        /** @var $handler JobInterface */
                        return $handler->execute(null);
                    } catch (\Exception $e) {
                        Yii::error('workerman queue exception:' . $e->getTraceAsString(), 'error');
                        return false;
                    }
                });
            }
        };
    /**
     * 任务配置
     * @return \string[][]
     * @desc 每次修改 需要 重启 workerman-queue
     */
    public function jobConfig()
    {
        return [
            ['name' => self::ASYNC_JOB_QUEUE_NAME, 'handler' => AsyncJob::class],
        ];
    }

AsyncJob excute

public function execute($queue)
    {
        try {
            $task = $this->task;
            if (isset($task['type'])) {
                // 根据 任务里面的 type 获取执行的方法
                $functionName = lcfirst(Inflector::id2camel($task['type'], '_') . 'Execute');
                $log = new AsyncJobLog();
                $log->execute_function = $functionName;
                $log->task = json_encode($task, JSON_UNESCAPED_UNICODE);
                $log->save(false);
                // 偶现 下面的不执行了
                // 难道是 $this->$functionName($task); 要改成 $returnVal = call_user_func([$this, $functionName], $task);?
                $returnVal = $this->$functionName($task);
                if ($returnVal === true) {
                    $log->is_finish = 1;
                } else {
                    $log->is_finish = 0;
                    $log->task_print = $returnVal;
                }
                $log->save(false);
            } else {
                Util::errorHandle(new \Exception('Async Job Execute:(' . json_encode($task, JSON_UNESCAPED_UNICODE) . ') type Not set!'));
            }
        } catch (\Exception $e) {
            Util::errorHandle($e);
        }
        return true;
    }
156 1 0
1个回答

wo664881680

用的yii2 跟 workerman 来结合,处理 队列数据的。

  • 暂无评论
🔝