使用start_worker的onMessage无法执行内部函数方法

vzainet

@walkor大神,非常感谢大神这么晚还这么快速的回复,谢谢了!也特别感谢大神们开发出workerman这么优秀的框架,给我们广大开发者带来的极大便利,是码农们的福音!

我对问题重新做了下编辑,同时也把代码demo直接发上来,还请大神能给予指导,问题还是onmessage回调里去实现服务端给客户端发送消息时候,无法触达,代码如下:

use \Workerman\Worker;
use \Workerman\Timer;
use \Workerman\Connection\UdpConnection;

// 自动加载类
require_once __DIR__ . '/../../vendor/autoload.php';

$worker = new Worker('udp://0.0.0.0:6000');
// worker名称
$worker->name = 'XMC-UDP-Worker';
// Worker进程数量
/*
 * 注意这里进程数必须设置为1,否则会报端口占用错误
 * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
 */
$worker->count = 1;
$worker->uids = [];

//启动内部通讯
$worker->onWorkerStart = function ($worker) {
    // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    $inner_text_worker = new Worker('text://0.0.0.0:5678');
    $inner_text_worker->onMessage = function ($connection, $buffer) {
        // $data数组格式,里面有uid,表示向那个uid的页面推送数据
        $data = json_decode($buffer, true);
        $uid = intval($data['uid']);
        // 通过workerman,向uid的页面推送数据
        $ret = sendMessageByUid($uid, $data['msg']);
        // 返回推送结果
        $connection->send($ret ? 'ok' : 'fail');
    };
    // ## 执行监听 ##
    $inner_text_worker->listen();
};

//监听接收消息
$worker->onMessage = function ($connection, $message) {
    global $worker;
    if (!isset($connection->uid)) {
        $ip_arr = explode('.', $connection->getRemoteIp());
        ///$port = $connection->getRemotePort();
        $ip = intval(end($ip_arr));
        $connection->uid = $ip;
        $worker->uids[$connection->uid] = $connection;
    }
    //$client_id这里为了测试直接写死为一个客户端IP
    $client_id=109;
    $client = stream_socket_client('tcp://0.0.0.0:5678', $errno, $errmsg, 1);
    // 推送的数据,包含uid字段,表示是给这个uid推送
    $data = json_encode(['uid' => $client_id, 'msg' => 'test']);
    // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    fwrite($client, $data . "\n");
    //echo fread($client, 8192);
};

// 向所有验证的用户推送数据
function broadcast($message)
{
    global $worker;
    foreach ($worker->uids as $connection) {
        $connection->send($message);
    }
};

// 针对uid推送数据
function sendMessageByUid($uid, $message)
{
    global $worker;
    if (isset($worker->uids[$uid])) {
        $connection = $worker->uids[$uid];
        $connection->send($message);
    }
};

// 如果不是在根目录启动,则运行runAll方法
if (!defined('GLOBAL_START')) {
    Worker::runAll();
}
1442 4 0
4个回答

walkor

require_once 只会加载一次,下次再次执行require_once 时这个文件就不会加载了,所以里面的业务逻辑就不会再触发了。
require_once 改成 require 试下

  • vzainet 2022-01-05

    好的,我先试试看,谢谢walkor !

  • vzainet 2022-01-05

    walkor,好像还并不是这问题,我直接把push.php推送消息给客户端的代码直接写到onmessage回调里,客户端也还是收不到,这个是和内部通讯实现方式有关系么?还是需要其他手段来实现推送呢?

  • vzainet 2022-01-05

    但是我使用普通的php文件,或直接用push.php去推送,就是可以的,说明内部端口那应该没问题,问题就是onmessage回调去做推送就存在问题!

  • walkor 2022-01-05

    写个能重现问题的精简的demo吧。没代码没办法定位啊。我又不是神,能预知你哪里写了bug 😂

  • vzainet 2022-01-06

    walkor 大神,我编辑了下问题,把代码都贴上了,烦请您再帮忙指导下,十分感谢!

walkor

$worker->uids[$connection->uid] = $connection;
这里不对,应该改成
$worker->uids[$connection->id] = $connection;

还有你的uid写死了只给109发数据,那么

$connection->uid = $ip;
$worker->uids[$connection->uid] = $connection;

也要写死,$connection->uid = 109; 才对。

另外uid要做个定时器,将长时间不活跃的$connection 执行关闭,然后从$worker->uids里删除,否则这个connection一直占用内存,会导致内存泄漏。

最后,udp是无连接的并且无法保证数据一定能推送到客户端,客户端及客户端所在网关可能随时会关闭udp临时端口。一般外网udp超过1分钟,udp可能就无法发送到客户端了。

walkor

我写死 $client_id=1; 了,本地127.0.0.1测试没问题。
客户端代码

<?php

use \Workerman\Worker;
use \Workerman\Connection\AsyncUdpConnection;

require_once __DIR__ . '/../../vendor/autoload.php';
$worker = new Worker();

$worker->onWorkerStart = function ($worker) {
   $con = new AsyncUdpConnection('udp://127.0.0.1:6000');
   $con->onMessage = function($con, $data){
      var_dump($data);
   };
   $con->onConnect = function($con){$con->send(1);};
   $con->connect();
};

if (!defined('GLOBAL_START')) {
    Worker::runAll();
}

结果:
截图

  • vzainet 2022-01-06

    好呢,我再仔细研读下你的思路和写法,调测看下,再次表示感谢!

vzainet

@walkor,目前测试是每次重启服务后,第1次能发送消息到客户端,再次发送就无法送达呢(也就是后面的发送消息给客户端执行无效了),这个是哪需要做特殊处理不?代码是这样写的:

//监听接收消息
$worker->onMessage = function (UdpConnection $connection, $message) {
    global $worker;
    //设置连接
    $connection->uid = 109;
    if (isset($worker->uids[$connection->uid])) {
        $worker->uids[$connection->uid]->close();
    }
    $worker->uids[$connection->uid] = $connection;
    //发送消息给客户端
    $client = stream_socket_client('tcp://0.0.0.0:5678', $errno, $errmsg, 1);
    $data = json_encode(['uid' => 109, 'msg' => 'TASK[109 001]CRC[2c4c]']);
    fwrite($client, $data . "\n");
}
  • walkor 2022-01-06

    加了个定时器,测试没问题

    <?php
    
    use \Workerman\Worker;
    use \Workerman\Timer;
    use \Workerman\Connection\AsyncUdpConnection;
    
    require_once __DIR__ . '/../../vendor/autoload.php';
    $worker = new Worker();
    
    $worker->onWorkerStart = function ($worker) {
       $con = new AsyncUdpConnection('udp://127.0.0.1:6000');
       $con->onMessage = function($con, $data){
          var_dump($data);
       };
       $con->onConnect = function($con){
          Timer::add(5, function()use($con) {
             $con->send(2);
          });
          $con->send(1);
       };
       $con->connect();
    };
    
    if (!defined('GLOBAL_START')) {
        Worker::runAll();
    }

    截图

  • vzainet 2022-01-06

    大神,这个是客户端的代码么?我写的只有服务端程序哦,目前场景是手机APP里有个条形码,然后线下有个硬件扫码器,用来扫码手机APP的条形码,扫码成功就扫码器那会发送消息给UDP服务器,服务器onmessage进行回调后,就启动发送消息指令给某个客户端哈!这个我实在不知道要改哪块的代码了,因为这个动作是扫码器客户端发送消息给服务器后触发的呢!

  • walkor 2022-01-06

    对,客户端。

  • vzainet 2022-01-06

    服务端没有办法解决这个第2次请求不触发的问题不?谢谢啊

  • walkor 2022-01-06

    服务端应该没问题,你看我发的截图,已经触发多次了

  • vzainet 2022-01-06

    现在就是线下扫码器回传信息给服务端,第一次可以触发,往下就不触发发送消息给指定客户端了,搞了好几天,百思不得其解哈,实在是不知道问题出在哪?如果我不走这个服务端程序,用普通的php去调用就完全正常,卡就卡在服务端onmessage这个点上来,感谢walkor不厌其烦的回复和解答哈,再次感谢!

  • vzainet 2022-01-06

    会不会跟线下硬件扫码器有关系呢?但是扫码器的UDP数据是发送上来了,服务端onmessage里也收到了,但是就是不走触发发送消息给客户端的指令。

  • walkor 2022-01-06

    不客气。建议你用我上面workerman做的udp客户端代替真实客户端来测试,没问题再用真实客户端测试。
    另外可以配合抓包工具看下服务端是否有发送udp给客户端。

  • walkor 2022-01-06

    多打日志看下吧,看下代码走大哪个分支。客户端id都打印出来,看下id对不对,看下对应的客户端是否和服务端发器过udp请求,对应的客户端connection是否是存在的。感觉这种问题打日志抓包很好定位。

  • vzainet 2022-01-06

    呃呃,忘了这块调试日志了,我好好按照您说的仔细检查一下,务必揪出元凶!感谢大神哈,多次发问实在打搅了!

年代过于久远,无法发表回答
🔝