Gatewayworker聊天项目中利用消息队列Kafka解偶

jihan

问题描述

萌新,在聊天项目中,每次发送消息时都经过消息队列,想使用Kafka,但不知道如何和gatewayworker结合。逻辑是写在EVENT的onMessage里,每次接收到消息往队列里推,然后同时又接收,再调gateway的方法吗?各位大佬能给个demo吗?

1198 4 0
4个回答

taozywu
  1. 首先 逻辑是写在EVENT的onMessage里 这是对的
  2. 然后在逻辑中可以通过某个字段(Eg. type)来区分是什么数据后,将数据入队即可
  3. 在单独写一个消费脚本来处理Kafka的数据就行。

类似Code如下

<?php
require_once __DIR__ . './autoload.php';

use Workerman\Worker;

$consumer = new Worker();
$consumer->count = 8;

$consumer->onWorkerStart = function () {
   // @TODO Your code here...
   Worker::stopAll();
};

Worker::runAll();
  • jihan 2022-10-12

    感谢大佬,但是有个疑问就是,单独写一个消费脚本来处理Kafka的数据时需要用到gateway的方法,比如A发送消息给B,在消费时要调用gateay的方法发送消息给B. 这点如何在消费脚本里做到呢?

  • 晚安。 2022-10-12

    这样数量大的时候,收发消息会有点延迟把

  • jihan 2022-10-12

    解偶了肯定会有延迟的,但消费方可以一直扩展

不行可以用redis-queue队列也可以的

  • jihan 2022-10-12

    感谢大佬,但用那种应该都是一样,只是在消费时如何调用gateway得方法发送消息?场景:A发送消息给B, 消息在队列里消费时,如何发送给B?

  • muyu 2022-10-12

    单独起一个进程,拉取中间件数据推送给响应的客户端

  • jihan 2022-10-12

    emm, 具体的就是在这个进程里调用Gateway::sendToClient($client_id, $data) ?然后入队列的时候把client_id 传过来就行了吗?我的疑问点就是在消费脚本里直接调用Gateway得方法不知道可不可行?

  • muyu 2022-10-12

    本身其实可以直接发给客户端,但是你不是想削峰解耦吗,所以入队列把双方client_id和必要信息都入栈,出栈的时候在根据双方是否在线发送消息

  • jihan 2022-10-12

    懂了,感谢大佬~

muyu

Kafka不大适合聊天推送吧,消费后数据不清除

  • 暂无评论
taozywu

感谢大佬,但是有个疑问就是,单独写一个消费脚本来处理Kafka的数据时需要用到gateway的方法,比如A发送消息给B,在消费时要调用gateay的方法发送消息给B. 这点如何在消费脚本里做到呢?

TestConsumer.php

<?php
require_once __DIR__ . './autoload.php';

use Workerman\Worker;

$consumer = new Worker();
$consumer->count = 8;

$consumer->onWorkerStart = function () {
   // @TODO Your code here...
   // @TODO Your code here...

   //TestModule->dealData();

   Worker::stopAll();
};

Worker::runAll();

TestModule.php

GatewayWorker\Lib\Gateway 自行下载独立文件

<?php

use GatewayWorker\Lib\Gateway;

function dealData() {
    // Eg.拿到Kafka的一条数据 & json
    $data = json_decode($msg, true);

    if (!$data) {
        return false;
    }

    switch($data['type']) {
        case 'A_to_B':
            Gateway::sendToUid("B", json_encode($msg));
            break;
        case 'B_to_A':
            Gateway::sendToUid("A", json_encode($msg));
            break;
    }

}
年代过于久远,无法发表回答
🔝