在gatewaywork的bussiness中使用redis-queue[已解决]

pengzhen

在gatewaywork的bussiness中使用redis-queue,onconnect时间中,给队列发送数据

/**
     * 当客户端连接时触发
     * 如果业务不需此回调可以删除onConnect
     * 
     * @param int $client_id 连接id
     */
    public static function onConnect($client_id)
    {
        // 向当前client_id发送数据 
        // Gateway::sendToClient($client_id, "当前在线".Gateway::getAllClientIdCount()." \r\n");
        // 向所有人发送
        try {
            $client = new Client('redis://127.0.0.1:6379');
            $client->send('user-1', ['demo', 'data']);
            echo "aaa",PHP_EOL;
        } catch (\Throwable $th) {
            print_r($th);
        }

另外在start_queue.php中消费

<?php 
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link http://www.workerman.net/
 * @license http://www.opensource.org/licenses/mit-license.php MIT License
 */
use \Workerman\Worker;
use \Workerman\Timer;
use \Workerman\WebServer;
use \GatewayWorker\Gateway;
use \GatewayWorker\BusinessWorker;
use Workerman\RedisQueue\Client;
use \Workerman\Autoloader;

// 自动加载类
require_once __DIR__ . '/../../vendor/autoload.php';

// bussinessWorker 进程
$worker = new Worker();
// worker名称
$worker->name = 'queue';
// bussinessWorker进程数量
$worker->count = 1;
// 服务注册地址
// $worker->registerAddress = '127.0.0.1:1238';

$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
   // 订阅
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        // var_export($data);

        file_put_contents('a.txt','bbb');
    });
    // $client->send('user-1', ['some', 'data']);
};

if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}

经过测试,100次连接,只消费不到一半,有的时候 只有10次不到。单独在start_queue中用定时器,定时推送消费,就没什么问题。而且,似乎还有粘包的问题,因为发现有几行输出是多个aaa连着的,没有换行

517 2 0
2个回答

pengzhen

不好意思,是我搞错了,我在application下直接执行了一个消费队列进程,导致在目录下面stop的时候,没结束这个消费进程

  • 暂无评论
damao

我还看出个问题,每个客户端连接时都会初始化一个Client ,这会导致Client实例越来越多,最终会超出系统限制报错。
你应该将初始化Client放在onWorkerStart里,像你消费那样,只初始化一次。

  • pengzhen 2023-02-25

    嗯,这是写个demo,redisqueue已经处理了断开连接的问题,但是超时这种不知道有没有处理

年代过于久远,无法发表回答
🔝