使用gateway-worker绑定了client和uid,如何判断uid是否已离线?

命中水水水

问题描述

截图

我有一个场景,像是车场道闸那种
1.gateway-worker作为服务端,然后道闸系统是客户端,与服务端建立tcp长连接。(客户端会向服务端发送心跳,5s/次)
2.用户请求服务端,获取道闸上的金额,进行支付

假设道闸客户端连接到gateway-worker的client_id是001,车场的客户端编号是A,那在道闸连接上服务端的时候,会进行client_id 001和编号A的绑定Gateway::bindUid(001, A)。那后续有用户发起http请求获取金额的时候,请求中也会传递车场编号A,那服务端会向客户端A发送tcp请求获取金额。

现在有一种情况,就是道闸系统有时候网络不好,老是断网,断网情况下,道闸客户端就没办法向服务端及时发送心跳包,服务端也不知道这个客户端A离线了,这时候如果有http请求进来,服务端会继续向客户端A发送请求获取金额,这时候就会导致进程阻塞,导致http请求一直在请求中,直接影响到后续的http请求也进不来

想问下,这种情况应该如何处理呢?

为此你搜索到了哪些方案及不适用的原因

1、在向客户端A发送请求前,检测客户端A是否离线
使用Gateway::isOnline(string $client_id)先检测客户端A状态。但是这种方法建立在client_id触发了onClose回调,像断网这种情况,客户端是没办法触发onClose回调

2、服务端向客户端发送心跳包检测客户端状态
我看官方文档可以通过服务端向客户端发送心跳包检测客户端状态,假设我设置心跳包3s/次,但是这种也会存在心跳包刚检测客户端正常,过了1s客户端因为断网离线了,这时候有http请求进来,服务端还不知道客户端连接不上了,还是会出现上面我所说的情况

想问下大家有更合适的方案吗?

359 2 5
2个回答

walkor

http请求可以直接用workerman或者webman自定义进程来做,做成非阻塞http服务,包括服务端会继续向客户端A发送请求获取金额这个过程做成非阻塞的请求。
如果你不会做,也可以把服务端会继续向客户端A发送请求获取金额这个过程设置个超时时间,比如1秒。

walkor

准备工作

安装channel

composer require workerman/channel

代码

start.php

<?php
use Workerman\Worker;
define('GLOBAL_START', 1);
require_once __DIR__ . '/vendor/autoload.php';

// 加载所有Applications/*/start.php,以便启动所有服务
foreach(glob(__DIR__.'/Applications/*/start*.php') as $start_file)
{
    require_once $start_file;
}

// channel服务用来多进程或者跨服务器通讯
$channel_server = new Channel\Server('0.0.0.0', 2206);

// 保存设备到connection的映射
global $connection_maps;
$connection_maps = [];
$http_worker = new Worker('http://0.0.0.0:1234');
$http_worker->onWorkerStart = function () {
    // Channel客户端连接到Channel服务端
    Channel\Client::connect('127.0.0.1', 2206);
    Channel\Client::on('get_amount_result', function($data)  {
        global $connection_maps;
        $device_id = $data['device_id'];
        foreach ($connection_maps[$device_id]??[] as $connection) {
            $connection->close($data['amount']);
        }
    });
};
$http_worker->onMessage = function (\Workerman\Connection\TcpConnection $connection, \Workerman\Protocols\Http\Request $request) {
    global $connection_maps;
    $device_id = $request->get('device_id');
    if (!$device_id) {
        $connection->send('not found');
        return;
    }
    $connection->device_id = $device_id;
    $connection_maps[$device_id][$connection->id] = $connection;
    // 通过channel向gatewayWorker咨询客户端金额
    Channel\Client::publish('get_amount', [
        'device_id' => $device_id
    ]);
};
$http_worker->onClose = function (\Workerman\Connection\TcpConnection $connection) {
    if (empty($connection->device_id)) {
        return;
    }
    // 删除$connection_maps对应的连接,避免内存泄漏
    $device_id = $connection->device_id;
    global $connection_maps;
    unset($connection_maps[$device_id][$connection->id]);
    if (empty($connection_maps[$device_id])) {
        unset($connection_maps[$device_id]);
    }
};

// 运行所有服务
Worker::runAll();

Events.php

<?php
use \GatewayWorker\Lib\Gateway;
class Events
{
    public static function onWorkerStart($worker)
    {
        // Channel客户端连接到Channel服务端
        Channel\Client::connect('127.0.0.1', 2206);
        // 只需要在0号进程上开启get_amount监听
        if ($worker->id !== 0) {
            Channel\Client::on('get_amount', function($data)  {
                $device_id = $data['device_id'];
                Gateway::sendToUid($device_id, 'get_amount');
            });
        }
    }
   public static function onMessage($client_id, $message)
   {
       // 忽略客户端心跳
       if ($message === '{"type":"ping"}') {
           return;
       }
       // 假设客户端发的第一个消息当作device_id
       if (empty($_SESSION['device_id'])) {
           $device_id = trim($message);
           Gateway::bindUid($client_id, $device_id);
           $_SESSION['device_id'] = $device_id;
           return;
       }
       // 客户端后续发的消息当作金额
       $amount = $message;
       // 通知http进程得到device_id的金额
       Channel\Client::publish('get_amount_result', [
           'device_id' => $_SESSION['device_id'],
           'amount' => $amount
       ]);
   }
}

启动

php start.php start

测试

浏览器打开页面 http://127.0.0.1:1234/?device_id=d2

开启一个终端,输入

telnet 127.0.0.1 8282
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
d2
800

提示
d2代表设备id
800代表金额

浏览器显示 800
截图

原理

启动一个channel server作为多进程或者跨服务器通讯组件
businessWorker的Events里onWorkerStart连接channelserver并监听http进程发来的get_amount请求(请求中包含设备id)
启动一个httpworker作为http接口,onWorkerStart里连接channelserver并监听get_amount_result结果(结果中包含设备id)
浏览器向httpworker发起请求,httpworker获得要查询的设备id,并将连接保存到connection_maps中,然后通过channel发布一个get_amount事件给businessWorke的Events.php
设备返回金额后在Events里的onMessage里通过channel发送get_amount_result事件通知http进程对应的设备返回了金额
httpworker获得金额后查找本地连接里($connection_maps)是否有查询对应设备金额的连接,有的话返回金额

  • walkor 2023-11-19

    整个过程是非阻塞的,可以承受很大的设备在线数

  • 命中水水水 2023-11-19

    谢谢大佬,受教了~ 我研究一下,谢谢~

  • 小W 2023-11-20

    Events.php中“ // 只需要在0号进程上开启get_amount监听 if ($worker->id !== 0) { ” 应该是 $worker->id == 0吧。

  • 小W 2023-11-20

    如果设备不在线,http请求会不会一直等待响应?

  • walkor 2023-11-20

    对,应该是 $worker->id === 0 会。设备不在线会一直等,服务端可以设置定时器去返回数据,也可以客户端设置超时

  • 第六人 2023-11-21

    学习了

🔝