请教一个socket长连接相关的问题

foolgry

walkor大神,请教一个socket相关的问题

现在情况是这样的,我要写一个将MySQL数据同步到ES的服务,方案是将MySQL binlog日志解析成结构化的数据,然后写入ES,解析binlog的是一个php cli 单进程,死循环获取binlog数据,因为怕数据太多,消费能力跟不上,想着用workerman多个worker进程处理,但是多个worker进程存在的问题是有序性问题,同一张表的事件只能同时由一个worker进程处理,我目前的想法是缓存了几个缓存了几个socket链接,然后根据表名去走对应的socket链接发送消息,不确定这样子稳不稳定。代码如下面,写了一点点,大概意思能表现出来。
方案和代码参考了这个问题 https://wenda.workerman.net/question/508

有没有什么更好的方案呢?

消费解析后的binlog worker进程

use app\dbBase;
use Workerman\Worker;
require_once __DIR__ . '/../vendor/autoload.php';

// 创建一个Worker监听2347端口,不使用任何应用层协议
$worker = new Worker("text://0.0.0.0:2347");

// 启动4个进程对外提供服务
$worker->count = 6;
$worker->name = 'write_es';
Worker::$logFile = __DIR__ . '/' . $worker->name . '.log';

$worker->onWorkerStart = function($worker)
{
    // 将db实例存储在全局变量中(也可以存储在某类的静态成员中)
    dbBase::getInstance()->init();
};

// 当客户端发来数据时
$worker->onMessage = function($connection, $data)
{
    echo $data.PHP_EOL;
    //将mysql数据写入ES
    // 向客户端发送hello $data
    $connection->send('hello ' . $data."\n");
};

$worker->onConnect = function ($connection) {
    $connection->send('hello\n');
};

// 运行worker
Worker::runAll();

解析binlog后推送到worker进程

class mysqlEventSubscribers extends EventSubscribers
{

    const client_count = 4;
    private static $clients;

    /**
     * mysql增删改查事件
     * @param EventDTO $event
     */
    public function allEvents(EventDTO $event): void {
        // all events got __toString() implementation
        echo $event;
        // all events got JsonSerializable implementation
        //echo json_encode($event, JSON_PRETTY_PRINT);
        //将事件推送到worker进程中进行处理
        $this->send($event);

        echo 'Memory usage ' . round(memory_get_usage() / 1048576, 2) . ' MB' . PHP_EOL;
    }

    function send($event) {
        if (!isset($this->clients)) {
            // 建立socket连接到内部推送端口
            for ($i = 0; $i<self::client_count;$i++) {
                static::$clients[$i] = stream_socket_client('tcp://127.0.0.1:2347', $errno, $errmsg, 1);
            }
        }

        //根据event事件中的数据库表名,找到对应的
        $client = $this->getClientByEvent($event);

        // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
        fwrite($client, json_encode($event) . "\n");
        // 读取推送结果
        echo fread($client, 8192);
    }

    function getTableFromEvent($event) {
        //逻辑没实现,大概就是不同的增删改查返回不同的表名
        return 'table';
    }

    function getClientByEvent($event) {
        $table = $this->getTableFromEvent($event);
        $clientIndex = syncTable::$tables[$table] % self::client_count;
        return static::$clients[$clientIndex];
    }
}
1090 3 0
3个回答

敖德萨

我怎么感觉你这有点像要写定时任务的样子 就是当有消息过多的时候我没办法同步到es的时候就用缓存来解压一下 直至消息同步为此是不是?

  • foolgry 2021-10-11

    主要是为了多进程消费,让同一个表到达同一个worker进程

foolgry

我测试了下,上面的方案是行不通的,因为同一个client发送的多个消息,并不是同一个worker进程处理的,所以上面我的方案不行,应该还是要在发送之前加个数组缓存event消息,同一个表的多条数据,等一条处理完了再发送下一条

  • 暂无评论
six

同一个连接的数据肯定是同一个worker处理的,所以感觉代码没啥问题。

  • foolgry 2021-10-12

    我测试的结果发现同一个连接的数据是分发给不同的worker处理的,这个代码还是不行

  • six 2021-10-12

    不可能,worker进程间是隔离的,连接分配到某个进程后就不会再次分配了,这个连接的所有数据都会给这个进程处理。

  • foolgry 2021-10-13

    @1393:你说的是对的,我之前测试的代码有问题,多谢了

🔝