Channel组件并没有消息队列类型的代码

nxfte

因为我这边要做对战游戏帧同步的服务器,使用了定时器做事件帧同步。

之前使用的方案是房主发起开启帧同步的事件,然后 BusinessWorker 开启毫秒定时器并将itmer_id记录erdis。

后来突然想到,如果房主掉线再恢复,导致redis记录的timer_id可能会失效(比如重连的时候 gateway 转发消息不再转发到上次在线时绑定的进程)最终导致无法停止帧同步时间。

然后我就想到使用channel,组件来隔离用户主动发起帧同步的事件,转由另一个worker来启动,这样可以保证不管是BusinessWorker重启也好,客户端断线重连也好,都不会影响到正在运行的定时器。

我在github上看到channel组件(https://github.com/walkor/Channel 支持2种消息发送模式发布订阅的事件机制和消息队列机制。

我打算使用消息队列机制开启一组独立的进程专门监听帧同步的业务(开启,停止毫秒定时器并转发帧数据)。因为我只需要一组进程有一个接收到事件并开始执行逻辑即可,不需要订阅模式(订阅模式每个进程都会收到消息并处理,这并不是我想要的)。

消息队列示例

use Workerman\Worker;
use Workerman\Timer;

$worker = new Worker();
$worker->name = 'Producer';
$worker->onWorkerStart = function()
{
    Client::connect();

    $count = 0;
    Timer::add(1, function() {
        Client::enqueue('queue', 'Hello World '.time());
    });
};

$mq = new Worker();
$mq->name = 'Consumer';
$mq->count = 4;
$mq->onWorkerStart = function($worker) {
    Client::connect();

    //订阅消息 queue
    Client::watch('queue', function($data) use ($worker) {
        echo "Worker {$worker->id} get queue: $data\n";
    });

    //10 秒后取消订阅该消息
    Timer::add(10, function() {
        Client::unwatch('queue');
    }, [], false);
};

Worker::runAll();

这段代码是我在github 页面上看到的,但是我在require 组建后,并没有看到客户端有 watch 相关的代码,请问是什么原因?

或者有没有什么其他推荐方案我可以采用

谢谢作者

阅读 634
1个回答

walkor

消息队列机制是今天刚合并的代码,你测试的时候还没正式发布,现在已经发布了,更新试下。

  • nxfte 2020-10-19

    好的, 更新完已经看到有相关方法了