在单一进程下,所有协程共享workerman的mqtt连接来发送数据是否可行?

bobshipwood

问题描述

假设在onWorkerStart里面创建了一个workerman的mqtt链接,底下所有协程用他来发送数据是否可行?

$worker->onWorkerStart = function ($worker) {
            try {
                $mqttClient = new \Workerman\Mqtt\Client(env('MQTT_URL'), ['client_id' => "sy-mqtt-client-".mt_rand(), 'username' => env('MQTT_USERNAME'), 'password' => env('MQTT_PASSWORD'), 'protocol_level' => 5]);
                $mqttClient->connect();
                for ($n=1; $n<=50; $n++) {
                    Coroutine::create(function () use ($mqttClient) {
                        $mqttClient->publish('test/whd/123/r', 'ok test';
                    });
                }

但毕竟mqtt是长链接,担心同一时间发送50个mqtt的时候,由于只有一个链接,所以担心他忙不过来

为此你搜索到了哪些方案及不适用的原因

没有找到。

266 2 0
2个回答

nitron

1.协程不是万能的
2.协程创建/上下文切换/销毁是有代价的
3.你这个例子不用协程理论上性能更好

  • bobshipwood 2025-10-20

    $mqttClient->connect();请问这个是异步的还是同步的?我的理解是异步的

  • nitron 2025-10-20

    异步
    https://github.com/walkor/mqtt
    Asynchronous MQTT client for PHP based on workerman.

  • bobshipwood 2025-10-20

    嗯,我这个只是示例代码,实际上,我要做的是在n个modbus tcp链接中,间隔2.5秒发送一个请求给物联设备,然后在把值通过mqtt返回给前端,。

    我目前做法是这样的(没用协程,用协程的原因是想删除掉usleep(100000); //需要间隔一段时间):
    foreach ($devicePlcs as $devicePlc) {
    $timer_id = Timer::add($this->execute, function() use ($devicePlc) {
    $this->executeDeviceTask2($devicePlc); // 执行实际业务代码;
    });
    }
    }

    protected function executeDeviceTask2($devicePlc) {
    // 取值
    $connection = BinaryStreamConnection::getBuilder()->setPort(502)->setHost($devicePlc->ip)->setConnectTimeoutSec(2.5)->setWriteTimeoutSec(0.5)->setReadTimeoutSec(1.5)->build();
    $newAlarm = $this->readHoldingRegisters($connection, $devicePlc, 25, 1, 1); //读报警
    if (!empty($newAlarm)) { // 为空则程序走到了catch块
    $new['stop_alarm'] = $newAlarm->isBitSet(0);
    $new['main_alarm'] = $newAlarm->isBitSet(1);
    $new['knife_alarm'] = $newAlarm->isBitSet(2);
    $new['detect_alarm'] = $newAlarm->isBitSet(3);
    $new['film_alarm'] = $newAlarm->isBitSet(4);
    }
    usleep(100000); //需要间隔一段时间
    $totalCount = $this->readHoldingRegisters($connection, $devicePlc, 400, 2, 0); // 累计产量

    $this->mqttClient->publish('test/whd/123/r', json_encode(‘‘发送的数组’’, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));

    }

超高级的稻姬

不开协程一条条发也没问题的,可能的加速场景是开一个协程一个连接,然后消息分发到各个协程中发送

  • 暂无评论
🔝