workerman作为ws客户端,如何保持多个连接?如何动态增删连接?

vasileon

场景: 我们系统每个用户需要通过ws实时同步他们在某第三方平台上的数据。会有新用户产生,产生新用户时动态给他建立一个连接。

我有如下方案,都无法满足需求
方案1:
写一个workerman脚本BTrade.php ,接收cli参数 用户id,以id作为worker的name与id, 如:“php BTrade.php start 1272”
问题: 在命令行执行一次脚本后,再次执行“php BTrade.php start 1273” 会报错 “Workerman[ BTrade.php] already running”
疑问:是不是一个脚本只能存在一个进程?

方案2
在一个脚本里new多个worker出来,每个worker维护一个用户的连接。如果有新用户来了,则重启脚本读取新的用户列表,再建立连接
代码:

$accounts = $pR->get($aKey); //取所有用户

foreach ($accounts as $account){
            $aM = new UserAccount();
            $account = $aM->getAccount($accountId);
            var_dump($account['id']);
            $worker = new Worker();
            $worker->name = 'ws_trade_sync_'.md5($account['id']).'_usdt';
            $worker->id = $account['id'];

            // 进程启动时
            $worker->onWorkerStart = function ($worker) use (
                $key,
                $account,
                $queueTmp
                //下面省略....
        }

问题:restart脚本虽然能拿到最新用户,但是会导致已有的连接断开 reload能保证不断开,但是不会执行 $accounts = $pR->get($aKey); 获取所有用户?

希望有大佬能指点下是不是我的使用方式是错误的?或者谁能提供下解决思路。十分感谢!

阅读 395
3个回答

vasileon

方案3:一个用户一个脚本,每个人一个进程? 这样感觉好low,而且需要不停的动态创建新脚本文件。

希望大佬给与解答,再次感谢!

  • 暂无评论
walkor

代码类似这样,原理是定时从redis获取所有账户数据,然后建立连接,建立连接时判断这个用户的连接是否已经存在,不存在才建立。
代码未测试,大致逻辑写出来了,你调试下应该就能用了

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\Connection\AsyncTcpConnection;

require_once __DIR__ . '/vendor/autoload.php';
global $worker;
$worker = new Worker();
$worker->count = 1;
$worker->onWorkerStart = function($worker)
{
    // 在这里建立redis连接
    $pR = new YourRedis(....);
    // 每秒执行一次
    Timer::add(1, function() use ($pR) {
        // 用来存储哪些用户已经建立连接
        static $connections = [];
        $aKey = '....';
        $accounts = $pR->get($aKey);
        foreach ($accounts as $account){
            // 假设数据里有个唯一的id标记用户
            $account_id = $account['id'];
            // 判断这个用户是否建立连接
            if (isset($connections[$account_id])) {
                return;
            }
            // 通过account的到ws连接地址
            $ws_url = get_url_by_acccount($account);
            $ws = new AsyncTcpConnection($ws_url);
            // 记录这个account已经建立起连接
            $connections[$account_id] = $ws;
            // 连接建立时
            $ws->onConnect = function($ws) {

            };
            // 连接上有数据发来时
            $ws->onMessage = function($ws, $data) {
                var_dump($data);
            };
            // 如果连接断开1秒后自动重连
            $ws->onClose = function($ws){
                $ws->reconnect(1);
            };
            $ws->connect();
        }
    });
};
Worker::runAll();
  • 暂无评论
碎玉

给你提供另一个思路,用GatewayWorker来搭一个服务端,每个用户作为客户端进行链接,用id来标识各个客户,针对性的给他们推送数据。
Events.php里OnMessage代码如下.

<?php
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link http://www.workerman.net/
 * @license http://www.opensource.org/licenses/mit-license.php MIT License
 */

/**
 * 用于检测业务代码死循环或者长时间阻塞等问题
 * 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
 * 然后观察一段时间workerman.log看是否有process_timeout异常
 */
//declare(ticks=1);

use \GatewayWorker\Lib\Gateway;
use Workerman\Lib\Timer;
/**
 * 主逻辑
 * 主要是处理 onConnect onMessage onClose 三个方法
 * onConnect 和 onClose 如果不需要可以不用实现并删除
 */
class Events
{
    static $redis;
    public static function onWorkerStart($worker){
        //初始化redis
        $redis = new Redis();
        $redis->pconnect('127.0.0.1', 6379);
        self::$redis = $redis;
    }
    /**
     * 当客户端连接时触发
     * 如果业务不需此回调可以删除onConnect
     * 
     * @param int $client_id 连接id
     */
    public static function onConnect($client_id)
    {

    }

   /**
    * 当客户端发来消息时触发
    * @param int $client_id 连接id
    * @param mixed $message 具体消息
    */
    public static function onMessage($client_id, $message)
    {
        //首先判断该websocket是否第一次链接
        $uid = $_SESSION['uid'];
        if(!$uid) //第一次连接
        {   
            //客户端连接后发送的message中带上客户id标识
            //绑定uid,上线
            Gateway::bindUid($client_id, $message);
            //存个session
            $_SESSION['uid'] = $message;

            $_SESSION['timerId'] = Timer::add(1, function() use($client_id){
                //你的业务逻辑
                //开启定时器给ws客户端发送数据
            }
        }
    }
   /**
    * 当用户断开连接时触发
    * @param int $client_id 连接id
    */
    public static function onClose($client_id)
    {
            //断开的时候删掉定时器
        if(isset($_SESSION['timerId']))
        {
            Timer::del($_SESSION['timerId']);
        }
    }
    public static function onWebSocketConnect($client_id, $data)
    {
    }
}
  • 暂无评论