redis-queue怎么批量投递数据呢

小白

比如一个表里有个3,5万个数据遍历投递很慢

foreach ($users as $data) {
        $queue = 'sms';
        Redis::send($queue, $data);
    }

这样很慢有啥解决方案吗

647 3 5
3个回答

北月

首先 workerman/redis-queue 队列组件是利用 RedisListSorted Set 实现的,因此在客户端可以直接使用原生 Redis 的管道功能,文档地址:在非workerman环境向队列发送消息

自己单独封装一个函数去用即可:

function multi_redis_queue_send($redis, $queue, $multiData, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; //1.0.5版本之前为redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed';//1.0.5版本之前为redis-queue-delayed
    // 创建管道
    $pipe = $redis->multi(Redis::PIPELINE);
    foreach ($multiData as $data) {
        $now = time();
        $package_str = json_encode([
            'id'       => rand(),
            'time'     => $now,
            'delay'    => 0,
            'attempts' => 0,
            'queue'    => $queue,
            'data'     => $data
        ]);
        if ($delay) {
            $pipe->zAdd($queue_delay, $now + $delay, $package_str);
        } else {
            $pipe->lPush($queue_waiting.$queue, $package_str);
        }
    }
    // 执行管道中的命令
    $results = $pipe->exec();
    // 需要注意的是,管道操作并不是原子操作,虽然管道中的所有命令一起执行,但是每个命令的执行结果仍然会返回给客户端。因此,在使用管道操作时,需要注意处理每个命令的执行结果。
    // 处理每个命令的执行结果
    foreach ($results as $result) {
        // ...
    }
}

在处理每个命令的执行结果时,需要注意以下几点:

  1. 如果命令执行成功,对应的执行结果是一个非负整数,表示执行成功的命令数量。

  2. 如果命令执行失败,对应的执行结果是一个RedisException异常对象,需要对异常进行处理。

  3. 在使用EXEC命令执行管道操作之前,不能执行其他Redis命令,否则会中断管道操作,导致管道中的命令没有被执行。

tanhongbin

没错就是管道

小白

最后的解决方案分享一下还在测试呢,不知道还又什么问题,大佬们看下

function multi_redis_queue_send($queue, $multiData, $delay = 0) {

    Redis::pipeline(function ($pipe) use ($queue, $multiData, $delay) {
        $queue_waiting = '{redis-queue}-waiting'; //1.0.5版本之前为redis-queue-waiting
        $queue_delay = '{redis-queue}-delayed';//1.0.5版本之前为redis-queue-delayed
        foreach ($multiData as $data) {
            $now = time();
            $package_str = json_encode([
                'id'       => rand(),
                'time'     => $now,
                'delay'    => 0,
                'attempts' => 0,
                'queue'    => $queue,
                'data'     => $data
            ]);
            if ($delay) {
                $pipe->zAdd($queue_delay, $now + $delay, $package_str);
            } else {
                $pipe->lPush($queue_waiting.$queue, $package_str);
            }
        }
    });

}
  • 暂无评论
🔝