redis队列中循环添加异步队列数据丢失

破建站的

问题描述

用户提交了一万条左右的数据更新操作,用了一个A异步队列将数据放进去,在A队列中,将一万条数据循环放入B异步队列,奇怪的事情发生了,B异步可能只有几十条任务,死活这一万条数据加不进B队列。

尝试去掉A队列,直接循环将一万条数据丢入B队列,结果成功了。

程序代码或配置

//redis-queue/redis
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => null,       // 密码,字符串类型,可选参数
            'db' => 2,            // 数据库
            'prefix' => '',       // key 前缀
            'max_attempts' => 3, // 消费失败后,重试次数
            'retry_seconds' => 5, // 重试间隔,单位秒
            'wait_timeout'=>8640000,
            'connect_timeout'=>86400,
        ]
    ],
];

//redis-queue/process
return [
    'consumer' => [
        'handler' => Webman\RedisQueue\Process\Consumer::class,
        'count' => 1, // 可以设置多进程同时消费
        'constructor' => [
            // 消费者类目录
            'consumer_dir' => app_path() . '/queue/redis'
        ],
        'user' => 'www',
        'group' => 'www',
    ],

];

    // A队列
    public $queue = 'keyword-queue';

    // 连接名,对应 plugin/webman/redis-queue/redis.php 里的连接`
    public $connection = 'default';

    // 消费
    public function consume($data)
    {

        foreach ($data['keywords'] as $key=>$keyword) {
            // 队列名
            $queue = 'zhishu-queue';
            // 数据,可以直接传数组,无需序列化

            dump($keyword.' '.$key);
            // 投递消息
            Client::send($queue, ['keyword' => $keyword, 'type' => $data['type'], 'id' => $data['id'],'status'=>$data['status']]);
        }

    }

    //B队列
      // 要消费的队列名
    public $queue = 'zhishu-queue';

    // 连接名,对应 plugin/webman/redis-queue/redis.php 里的连接`
    public $connection = 'default';

    // 消费
    public function consume($data)
    {
        $lock = Cache::lock(KeywordsService::LOCK_BY_ID . $data['id'], 60);
        $process_task_key = snowflake_id();
        try {
            $lock->block(60);
            // 等待最多 5 秒后获得的锁...
            $keyword = KeywordsService::getById($data['id']);
            if ($keyword->status == 1 && $data['status'] == $keyword->status) {

                $keyword->current += 1;

                if ($keyword->current >= $keyword->total) {
                    $keyword->status = 2;
                }

                $keyword->save();
                KeywordsService::saved($keyword, $process_task_key);
            }
            $lock?->release();
            delete_process_key($process_task_key);
        } catch (\Throwable $exception) {
            $lock?->release();
            delete_process_keys($process_task_key);
            throw  $exception;
        }

操作系统环境及workerman/webman等具体版本

"workerman/webman-framework": "^1.5.0"

345 1 0
1个回答

yongdao35

文档有说异步投送原理,异步投递是先把消息放在本地内存里,等进程空闲时发送给redis。
如果数据还没投送完进程退出就会导致数据丢失。
还有如果你的进程有lock相关的操作,我想也会影响投递,因为进程被你lock的时间段里,内存中的消息无法投递到redis。

感觉你应该用同步投递,文档说重要数据应该用同步投递。

  • 破建站的 2023-10-22

    嗯 你说的有道理 我后面想了一下 上万条数据放内存里可能也受PHP内存参数控制,目前已经改成同步方式放入

🔝