AI 用 Workerman 实现的 MQTT 服务器,大家看看能不能用

za2883632
<?php
/**
 * 基于Workerman的MQTT服务器实现
 * 完全自主实现的MQTT 3.1.1协议服务器
 */

require_once __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\Connection\TcpConnection;

/**
 * MQTT 3.1.1 control packet type codes.
 *
 * Each value is the number carried in the upper four bits of the first
 * byte of a packet's fixed header. The list covers every type from
 * CONNECT (1) to DISCONNECT (14), including the three QoS 2 handshake
 * packets, so that callers can name any packet type they receive.
 */
class MqttTypes
{
    const CONNECT     = 1;  // client asks to open a session
    const CONNACK     = 2;  // server's reply to CONNECT
    const PUBLISH     = 3;  // application message
    const PUBACK      = 4;  // acknowledgement of a QoS 1 PUBLISH
    const PUBREC      = 5;  // QoS 2 handshake, step 1 (publish received)
    const PUBREL      = 6;  // QoS 2 handshake, step 2 (publish release)
    const PUBCOMP     = 7;  // QoS 2 handshake, step 3 (publish complete)
    const SUBSCRIBE   = 8;  // client subscribe request
    const SUBACK      = 9;  // server's reply to SUBSCRIBE
    const UNSUBSCRIBE = 10; // client unsubscribe request
    const UNSUBACK    = 11; // server's reply to UNSUBSCRIBE
    const PINGREQ     = 12; // keep-alive ping
    const PINGRESP    = 13; // keep-alive pong
    const DISCONNECT  = 14; // client is closing the session
}

/**
 * MQTT服务器主类
 */
class MqttServer
{
    private $clients = [];        // 客户端连接信息
    private $subscriptions = [];   // 主题订阅关系
    private $retainedMessages = []; // 保留消息
    private $messageIdCounter = 0; // 消息ID计数器

    /**
     * Creates the TCP worker on port 1883, wires the connection callbacks
     * and enters the Workerman event loop (this call does not return
     * while the server is running).
     */
    public function start()
    {
        // Listen for raw TCP connections on the standard MQTT port.
        $worker = new Worker('tcp://0.0.0.0:1883');

        // Clients, subscriptions and retained messages are kept in plain
        // PHP arrays on this object, i.e. in the memory of one process.
        // With more than one worker process a message published on one
        // process would never reach subscribers connected to another,
        // so the broker must run as a single process.
        $worker->count = 1;

        // Connection lifecycle callbacks.
        $worker->onConnect = [$this, 'onConnect'];
        $worker->onMessage = [$this, 'onMessage'];
        $worker->onClose = [$this, 'onClose'];

        echo "=== MQTT服务器启动成功 ===\n";
        echo "监听地址: tcp://0.0.0.0:1883\n";
        echo "进程数: {$worker->count}\n";
        echo "协议: MQTT 3.1.1\n";
        echo "========================\n\n";

        // Hand control to Workerman.
        Worker::runAll();
    }

    /**
     * Workerman onConnect callback.
     *
     * Registers a default client record for the new TCP connection, keyed
     * by the connection id. The placeholder client id "client_<id>" stays
     * in place until a CONNECT packet supplies the real one.
     *
     * @param TcpConnection $connection The connection that was just accepted.
     */
    public function onConnect(TcpConnection $connection)
    {
        $connId = $connection->id;
        $placeholderName = 'client_' . $connId;

        $record = [];
        $record['connection'] = $connection;
        $record['client_id'] = $placeholderName;
        $record['connected_at'] = time();
        $record['username'] = null;
        $record['clean_session'] = true;
        $record['keepalive'] = 60;

        $this->clients[$connId] = $record;

        echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$placeholderName} 已连接\n";
    }

    /**
     * Workerman onMessage callback: receives a chunk of raw TCP bytes.
     *
     * TCP is a byte stream, so one chunk may hold several MQTT packets or
     * only part of one. The bytes are appended to a per-connection buffer
     * (stored under 'recv_buffer' in the client record), every complete
     * packet is cut out and dispatched, and any incomplete tail is kept
     * for the next call.
     *
     * @param TcpConnection $connection Connection the bytes arrived on.
     * @param string        $data       Raw bytes read from the socket.
     */
    public function onMessage(TcpConnection $connection, $data)
    {
        $connId = $connection->id;
        $buffer = ($this->clients[$connId]['recv_buffer'] ?? '') . $data;

        while ($buffer !== '') {
            $frameLength = $this->mqttFrameLength($buffer);

            if ($frameLength < 0) {
                // The length field is malformed; the stream cannot be
                // re-synchronised, so drop the connection.
                echo "[" . date('Y-m-d H:i:s') . "] 无法解析MQTT数据包\n";
                $connection->close();
                return;
            }

            if ($frameLength === 0 || $frameLength > strlen($buffer)) {
                // Packet is not complete yet; wait for more bytes.
                break;
            }

            $frame = substr($buffer, 0, $frameLength);
            $buffer = (string)substr($buffer, $frameLength);

            $this->dispatchMqttFrame($connection, $frame);

            if (!isset($this->clients[$connId])) {
                // The handler removed the client (e.g. DISCONNECT);
                // ignore whatever else was in the buffer.
                return;
            }
        }

        if (isset($this->clients[$connId])) {
            $this->clients[$connId]['recv_buffer'] = $buffer;
        }
    }

    /**
     * Computes the total size of the first MQTT packet in a buffer.
     *
     * @param string $buffer Bytes starting at a packet boundary.
     * @return int Packet size in bytes (fixed header included), 0 when the
     *             length field is not fully received yet, or -1 when the
     *             length field uses more than four bytes (malformed).
     */
    private function mqttFrameLength($buffer)
    {
        $available = strlen($buffer);
        $remaining = 0;
        $multiplier = 1;

        // The Remaining Length field occupies bytes 1..4 of the packet.
        for ($index = 1; $index <= 4; $index++) {
            if ($index >= $available) {
                return 0;
            }

            $byte = ord($buffer[$index]);
            $remaining += ($byte & 0x7F) * $multiplier;

            if (($byte & 0x80) === 0) {
                return 1 + $index + $remaining;
            }

            $multiplier *= 128;
        }

        return -1;
    }

    /**
     * Parses exactly one MQTT packet and routes it to its handler.
     *
     * Any error raised while handling the packet is logged and swallowed
     * so that one bad packet does not terminate the worker process.
     *
     * @param TcpConnection $connection Connection the packet came from.
     * @param string        $frame      Bytes of one complete packet.
     */
    private function dispatchMqttFrame(TcpConnection $connection, $frame)
    {
        try {
            $packet = $this->parseMqttPacket($frame);

            if (!$packet) {
                echo "[" . date('Y-m-d H:i:s') . "] 无法解析MQTT数据包\n";
                return;
            }

            $packetType = $packet['type'];

            switch ($packetType) {
                case MqttTypes::CONNECT:
                    $this->handleConnect($connection, $packet);
                    break;

                case MqttTypes::PUBLISH:
                    $this->handlePublish($connection, $packet);
                    break;

                case MqttTypes::SUBSCRIBE:
                    $this->handleSubscribe($connection, $packet);
                    break;

                case MqttTypes::UNSUBSCRIBE:
                    $this->handleUnsubscribe($connection, $packet);
                    break;

                case MqttTypes::PINGREQ:
                    $this->handlePing($connection);
                    break;

                case MqttTypes::DISCONNECT:
                    $this->handleDisconnect($connection);
                    break;

                default:
                    echo "[" . date('Y-m-d H:i:s') . "] 未知的MQTT包类型: {$packetType}\n";
            }
        } catch (\Throwable $e) {
            echo "[" . date('Y-m-d H:i:s') . "] 处理消息时出错: " . $e->getMessage() . "\n";
        }
    }

    /**
     * Handles a CONNECT packet: records the client's identity, accepts the
     * connection with a CONNACK (return code 0) and then invokes
     * sendRetainedMessages() for the connection.
     *
     * No authentication is performed; every CONNECT is accepted.
     *
     * @param TcpConnection $connection Connection that sent the packet.
     * @param array         $packet     Parsed CONNECT fields; 'client_id',
     *                                  'username' and 'clean_session' are read.
     */
    private function handleConnect(TcpConnection $connection, $packet)
    {
        // The parser always sets 'client_id', possibly to an empty string
        // (MQTT allows a zero-length id), so test for emptiness rather
        // than only for a missing key before falling back to a
        // server-assigned id.
        $clientId = $packet['client_id'] ?? '';
        if ($clientId === '' || $clientId === false) {
            $clientId = 'client_' . $connection->id;
        }

        $username = $packet['username'] ?? null;
        $cleanSession = $packet['clean_session'] ?? true;

        // Update the record created in onConnect.
        $this->clients[$connection->id]['client_id'] = $clientId;
        $this->clients[$connection->id]['username'] = $username;
        $this->clients[$connection->id]['clean_session'] = $cleanSession;

        // CONNACK with return code 0 = connection accepted.
        $connack = $this->buildConnackPacket(0);
        $connection->send($connack);

        // Compare explicitly: a username such as "0" is falsy in PHP but
        // should still be shown.
        $hasUsername = $username !== null && $username !== '';
        echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientId} 连接成功" . ($hasUsername ? " (用户名: {$username})" : "") . "\n";

        $this->sendRetainedMessages($connection);
    }

    /**
     * Handles a PUBLISH packet: updates the retained-message store when
     * the RETAIN flag is set, forwards the message to subscribers, and
     * acknowledges QoS 1 publishes with a PUBACK.
     *
     * NOTE: only QoS 1 is acknowledged here; a QoS 2 publish is forwarded
     * but no PUBREC is sent back.
     *
     * @param TcpConnection $connection Connection that sent the packet.
     * @param array         $packet     Parsed PUBLISH fields ('topic',
     *                                  'message', 'qos', 'retain',
     *                                  'message_id').
     */
    private function handlePublish(TcpConnection $connection, $packet)
    {
        $topic = $packet['topic'] ?? '';
        $message = $packet['message'] ?? '';
        $qos = $packet['qos'] ?? 0;
        $retain = $packet['retain'] ?? false;
        $messageId = $packet['message_id'] ?? null;

        $clientInfo = $this->clients[$connection->id] ?? null;
        if (!$clientInfo) {
            return;
        }

        $clientId = $clientInfo['client_id'];

        echo "[" . date('Y-m-d H:i:s') . "] 发布消息 - 客户端: {$clientId}, 主题: {$topic}, 消息: {$message}, QoS: {$qos}\n";

        if ($retain) {
            // Only a zero-length payload clears the retained message.
            // empty() must not be used here: it is also true for the
            // payload "0", which is a perfectly valid message to retain.
            if ($message === '' || $message === false) {
                unset($this->retainedMessages[$topic]);
            } else {
                $this->retainedMessages[$topic] = [
                    'message' => $message,
                    'qos' => $qos,
                    'timestamp' => time()
                ];
            }
        }

        // Deliver to current subscribers.
        $this->forwardMessage($topic, $message, $qos, $messageId);

        // QoS 1 publishes are acknowledged with PUBACK.
        if ($qos === 1 && $messageId) {
            $puback = $this->buildPubackPacket($messageId);
            $connection->send($puback);
        }
    }

    /**
     * Handles a SUBSCRIBE packet: stores each requested subscription,
     * sends the retained message stored under the same topic string (if
     * any), and replies with a SUBACK listing the granted QoS per topic.
     *
     * The granted QoS is capped at 1 because this server has no handling
     * for the QoS 2 handshake packets; granting 2 would make it send
     * QoS 2 publishes whose PUBREC replies it cannot process.
     *
     * @param TcpConnection $connection Connection that sent the packet.
     * @param array         $packet     Parsed SUBSCRIBE fields: 'topics'
     *                                  (topic => requested QoS) and
     *                                  'message_id'.
     */
    private function handleSubscribe(TcpConnection $connection, $packet)
    {
        $topics = $packet['topics'] ?? [];
        $messageId = $packet['message_id'] ?? 0;

        $clientInfo = $this->clients[$connection->id] ?? null;
        if (!$clientInfo) {
            return;
        }

        $clientId = $clientInfo['client_id'];
        $returnCodes = [];

        foreach ($topics as $topic => $qos) {
            // Never grant more than the server can actually deliver.
            $grantedQos = min($qos, 1);

            if (!isset($this->subscriptions[$topic])) {
                $this->subscriptions[$topic] = [];
            }

            $this->subscriptions[$topic][$connection->id] = $grantedQos;
            $returnCodes[] = $grantedQos;

            echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientId} 订阅主题: {$topic}, QoS: {$grantedQos}\n";

            // Deliver the retained message for this topic right away,
            // downgraded to the QoS granted for this subscription.
            if (isset($this->retainedMessages[$topic])) {
                $retainedMsg = $this->retainedMessages[$topic];
                $deliveryQos = min($retainedMsg['qos'], $grantedQos);
                $this->sendMessageToClient($connection, $topic, $retainedMsg['message'], $deliveryQos);
            }
        }

        $suback = $this->buildSubackPacket($messageId, $returnCodes);
        $connection->send($suback);
    }

    /**
     * Handles an UNSUBSCRIBE packet: removes the listed subscriptions of
     * this connection and confirms with an UNSUBACK.
     *
     * Nothing is sent when the connection has no client record.
     *
     * @param TcpConnection $connection Connection that sent the packet.
     * @param array         $packet     Parsed UNSUBSCRIBE fields: 'topics'
     *                                  (list of topic strings) and
     *                                  'message_id'.
     */
    private function handleUnsubscribe(TcpConnection $connection, $packet)
    {
        $connId = $connection->id;

        $record = $this->clients[$connId] ?? null;
        if (!$record) {
            return;
        }

        $name = $record['client_id'];

        foreach (($packet['topics'] ?? []) as $filter) {
            if (!isset($this->subscriptions[$filter][$connId])) {
                continue;
            }

            unset($this->subscriptions[$filter][$connId]);

            // Drop the topic entry once its last subscriber is gone.
            if (empty($this->subscriptions[$filter])) {
                unset($this->subscriptions[$filter]);
            }

            echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$name} 取消订阅主题: {$filter}\n";
        }

        $connection->send($this->buildUnsubackPacket($packet['message_id'] ?? 0));
    }

    /**
     * Answers a PINGREQ with a PINGRESP and logs the heartbeat when the
     * connection has a client record.
     *
     * @param TcpConnection $connection Connection that sent the ping.
     */
    private function handlePing(TcpConnection $connection)
    {
        $connection->send($this->buildPingrespPacket());

        $record = $this->clients[$connection->id] ?? null;
        if (!$record) {
            return;
        }

        echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$record['client_id']} 发送心跳包\n";
    }

    /**
     * Handles a DISCONNECT packet: releases the client's state, then
     * closes the TCP connection.
     *
     * @param TcpConnection $connection Connection that sent the packet.
     */
    private function handleDisconnect(TcpConnection $connection)
    {
        $connId = $connection->id;

        $this->cleanupClient($connId);
        $connection->close();
    }

    /**
     * Delivers a published message to every connection that has a
     * subscription matching the topic.
     *
     * Subscriptions are topic filters and may contain the MQTT wildcards
     * '+' (exactly one level) and '#' (this level and everything below),
     * so every stored filter is tested against the topic rather than
     * doing a single exact-key lookup. A connection matched by several
     * filters receives the message once, at the highest QoS granted among
     * them; the delivery QoS is the lower of that and the publish QoS.
     *
     * @param string   $topic     Topic name of the published message.
     * @param string   $message   Payload.
     * @param int      $qos       QoS the message was published with.
     * @param int|null $messageId Packet id passed through to the
     *                            outgoing PUBLISH.
     */
    private function forwardMessage($topic, $message, $qos, $messageId = null)
    {
        // Connection id => highest subscription QoS among matching filters.
        $recipients = [];

        foreach ($this->subscriptions as $filter => $subscribers) {
            // Array keys that look like integers come back as ints; cast
            // so the matcher always compares strings.
            if (!$this->topicMatchesFilter((string)$filter, (string)$topic)) {
                continue;
            }

            foreach ($subscribers as $connId => $subQos) {
                if (!isset($recipients[$connId]) || $subQos > $recipients[$connId]) {
                    $recipients[$connId] = $subQos;
                }
            }
        }

        foreach ($recipients as $connId => $subQos) {
            $clientInfo = $this->clients[$connId] ?? null;
            if (!$clientInfo) {
                continue;
            }

            $actualQos = min($qos, $subQos);
            $this->sendMessageToClient($clientInfo['connection'], $topic, $message, $actualQos, $messageId);
        }
    }

    /**
     * Tests whether a topic name matches an MQTT topic filter.
     *
     * @param string $filter Topic filter, possibly containing '+' / '#'.
     * @param string $topic  Concrete topic name.
     * @return bool True when the filter selects the topic.
     */
    private function topicMatchesFilter($filter, $topic)
    {
        if ($filter === $topic) {
            return true;
        }

        // Topics starting with '$' are not matched by filters that start
        // with a wildcard.
        if ($topic !== '' && $topic[0] === '$'
            && $filter !== '' && ($filter[0] === '#' || $filter[0] === '+')) {
            return false;
        }

        $filterLevels = explode('/', $filter);
        $topicLevels = explode('/', $topic);
        $filterCount = count($filterLevels);
        $topicCount = count($topicLevels);

        foreach ($filterLevels as $index => $level) {
            if ($level === '#') {
                // '#' is only valid as the last level; it matches the
                // parent level itself and everything beneath it.
                return $index === $filterCount - 1;
            }

            if ($index >= $topicCount) {
                return false;
            }

            if ($level !== '+' && $level !== $topicLevels[$index]) {
                return false;
            }
        }

        return $filterCount === $topicCount;
    }

    /**
     * Encodes a PUBLISH packet and writes it to one connection.
     *
     * @param TcpConnection $connection Recipient connection.
     * @param string        $topic      Topic name.
     * @param string        $message    Payload.
     * @param int           $qos        QoS to send with.
     * @param int|null      $messageId  Packet id; passed on unchanged to
     *                                  buildPublishPacket().
     */
    private function sendMessageToClient(TcpConnection $connection, $topic, $message, $qos, $messageId = null)
    {
        $connection->send(
            $this->buildPublishPacket($topic, $message, $qos, $messageId)
        );
    }

    /**
     * Sends retained messages to a connection, limited to the topics that
     * connection is subscribed to.
     *
     * A retained message must only reach clients that subscribed to its
     * topic, so topics without a subscription entry for this connection
     * are skipped. Each message is sent at the lower of its stored QoS
     * and the QoS recorded for the subscription.
     *
     * @param TcpConnection $connection Recipient connection.
     */
    private function sendRetainedMessages(TcpConnection $connection)
    {
        $connId = $connection->id;

        foreach ($this->retainedMessages as $topic => $messageInfo) {
            if (!isset($this->subscriptions[$topic][$connId])) {
                continue;
            }

            $deliveryQos = min($messageInfo['qos'], $this->subscriptions[$topic][$connId]);
            $this->sendMessageToClient($connection, $topic, $messageInfo['message'], $deliveryQos);
        }
    }

    /**
     * Parses the first MQTT packet contained in a byte string.
     *
     * Decodes the fixed header (packet type and Remaining Length), then
     * hands exactly the bytes belonging to this packet to the
     * type-specific parser. Bytes after the declared end of the packet
     * are ignored, so a following packet in the same buffer cannot leak
     * into this packet's payload or topic list.
     *
     * @param string $data Raw bytes starting at a packet boundary.
     * @return array|null Parsed fields including 'type', or null when the
     *                    input is shorter than two bytes, the length field
     *                    is malformed (more than four bytes), or fewer
     *                    bytes are present than the header declares.
     */
    private function parseMqttPacket($data)
    {
        $available = strlen($data);
        if ($available < 2) {
            return null;
        }

        // Fixed header, byte 0: packet type in the upper nibble, flags
        // in the lower nibble.
        $firstByte = ord($data[0]);
        $packetType = ($firstByte >> 4) & 0x0F;

        // Remaining Length: 1-4 bytes, 7 data bits each, least significant
        // group first; bit 7 set means another byte follows.
        $offset = 1;
        $remainingLength = 0;
        $multiplier = 1;

        do {
            if ($offset > 4 || $offset >= $available) {
                return null;
            }
            $digit = ord($data[$offset]);
            $remainingLength += ($digit & 127) * $multiplier;
            $multiplier *= 128;
            $offset++;
        } while (($digit & 128) != 0);

        // Refuse truncated packets, and cut off anything beyond this one.
        $packetEnd = $offset + $remainingLength;
        if ($available < $packetEnd) {
            return null;
        }
        $data = substr($data, 0, $packetEnd);

        $packet = ['type' => $packetType];

        // Packet types without a case below carry no fields that this
        // server reads.
        switch ($packetType) {
            case MqttTypes::CONNECT:
                $packet = array_merge($packet, $this->parseConnectPacket($data, $offset));
                break;

            case MqttTypes::PUBLISH:
                $packet = array_merge($packet, $this->parsePublishPacket($data, $offset, $firstByte));
                break;

            case MqttTypes::SUBSCRIBE:
                $packet = array_merge($packet, $this->parseSubscribePacket($data, $offset));
                break;

            case MqttTypes::UNSUBSCRIBE:
                $packet = array_merge($packet, $this->parseUnsubscribePacket($data, $offset));
                break;
        }

        return $packet;
    }

    /**
     * Parses the variable header and payload of a CONNECT packet.
     *
     * Payload fields appear on the wire in a fixed order: client id, then
     * (only if the Will flag is set) will topic and will message, then
     * username, then password. The will fields must be consumed even if
     * unused, otherwise the username and password would be read from the
     * wrong position.
     *
     * @param string $data   Packet bytes.
     * @param int    $offset Index of the first byte after the fixed header.
     * @return array Keys: 'protocol_level', 'clean_session',
     *               'username_flag', 'password_flag', 'will_flag',
     *               'keepalive', 'client_id', and, when the matching flag
     *               is set, 'will_qos', 'will_retain', 'will_topic',
     *               'will_message', 'username', 'password'.
     */
    private function parseConnectPacket($data, $offset)
    {
        // Reads a big-endian 16-bit integer and advances the cursor.
        $readUint16 = function () use ($data, &$offset) {
            $value = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
            $offset += 2;
            return $value;
        };

        // Reads a string prefixed by its 16-bit length and advances the cursor.
        $readString = function () use ($data, &$offset, $readUint16) {
            $length = $readUint16();
            $value = (string)substr($data, $offset, $length);
            $offset += $length;
            return $value;
        };

        $packet = [];

        // Protocol name: skipped, its value is not checked.
        $readString();

        $packet['protocol_level'] = ord($data[$offset]);
        $offset++;

        // Connect flags byte.
        $connectFlags = ord($data[$offset]);
        $offset++;
        $packet['clean_session'] = ($connectFlags & 0x02) != 0;
        $packet['username_flag'] = ($connectFlags & 0x80) != 0;
        $packet['password_flag'] = ($connectFlags & 0x40) != 0;
        $packet['will_flag'] = ($connectFlags & 0x04) != 0;

        // Keep-alive interval in seconds, as sent by the client.
        $packet['keepalive'] = $readUint16();

        $packet['client_id'] = $readString();

        if ($packet['will_flag']) {
            $packet['will_qos'] = ($connectFlags >> 3) & 0x03;
            $packet['will_retain'] = ($connectFlags & 0x20) != 0;
            $packet['will_topic'] = $readString();
            $packet['will_message'] = $readString();
        }

        if ($packet['username_flag']) {
            $packet['username'] = $readString();
        }

        if ($packet['password_flag']) {
            $packet['password'] = $readString();
        }

        return $packet;
    }

    /**
     * Parses the variable header and payload of a PUBLISH packet.
     *
     * @param string $data      Packet bytes.
     * @param int    $offset    Index of the first byte after the fixed header.
     * @param int    $firstByte First byte of the fixed header; bits 1-2
     *                          carry the QoS and bit 0 the RETAIN flag.
     * @return array Keys 'qos', 'retain', 'topic', 'message', plus
     *               'message_id' when the QoS is above 0.
     */
    private function parsePublishPacket($data, $offset, $firstByte)
    {
        // Big-endian 16-bit read at an arbitrary position.
        $u16At = function ($pos) use ($data) {
            return (ord($data[$pos]) << 8) | ord($data[$pos + 1]);
        };

        $fields = [
            'qos' => ($firstByte >> 1) & 0x03,
            'retain' => ($firstByte & 0x01) == 0x01,
        ];

        // Topic name, prefixed by its length.
        $topicBytes = $u16At($offset);
        $cursor = $offset + 2;
        $fields['topic'] = substr($data, $cursor, $topicBytes);
        $cursor += $topicBytes;

        // A packet id is only present for QoS 1 and 2.
        if ($fields['qos'] > 0) {
            $fields['message_id'] = $u16At($cursor);
            $cursor += 2;
        }

        // Everything that is left is the payload.
        $fields['message'] = substr($data, $cursor);

        return $fields;
    }

    /**
     * Parses the variable header and payload of a SUBSCRIBE packet.
     *
     * @param string $data   Packet bytes.
     * @param int    $offset Index of the first byte after the fixed header.
     * @return array 'topics' maps each topic filter to its requested QoS
     *               (low two bits of the options byte); 'message_id' is
     *               the packet identifier.
     */
    private function parseSubscribePacket($data, $offset)
    {
        // Big-endian 16-bit read at an arbitrary position.
        $u16At = function ($pos) use ($data) {
            return (ord($data[$pos]) << 8) | ord($data[$pos + 1]);
        };

        $result = ['topics' => []];
        $result['message_id'] = $u16At($offset);

        $total = strlen($data);

        // Entries repeat until the end of the data:
        // 2-byte length, filter bytes, 1 options byte.
        for ($cursor = $offset + 2; $cursor < $total; $cursor++) {
            $filterBytes = $u16At($cursor);
            $cursor += 2;

            $filter = substr($data, $cursor, $filterBytes);
            $cursor += $filterBytes;

            // $cursor now points at the options byte; the loop increment
            // steps past it.
            $result['topics'][$filter] = ord($data[$cursor]) & 0x03;
        }

        return $result;
    }

    /**
     * Parses the variable header and payload of an UNSUBSCRIBE packet.
     *
     * @param string $data   Packet bytes.
     * @param int    $offset Index of the first byte after the fixed header.
     * @return array 'topics' is the list of topic filters in wire order;
     *               'message_id' is the packet identifier.
     */
    private function parseUnsubscribePacket($data, $offset)
    {
        // Big-endian 16-bit read at an arbitrary position.
        $u16At = function ($pos) use ($data) {
            return (ord($data[$pos]) << 8) | ord($data[$pos + 1]);
        };

        $result = ['topics' => []];
        $result['message_id'] = $u16At($offset);

        $cursor = $offset + 2;
        $total = strlen($data);

        // Entries repeat until the end of the data:
        // 2-byte length followed by the filter bytes.
        while ($cursor < $total) {
            $filterBytes = $u16At($cursor);
            $result['topics'][] = substr($data, $cursor + 2, $filterBytes);
            $cursor += 2 + $filterBytes;
        }

        return $result;
    }

    /**
     * Encodes a CONNACK packet.
     *
     * @param int $returnCode Connect return code (0 = accepted).
     * @return string Four bytes: type 0x20, remaining length 2, an
     *                acknowledge-flags byte of 0, then the return code.
     */
    private function buildConnackPacket($returnCode)
    {
        // Header, length and flags are constant; only the last byte varies.
        return "\x20\x02\x00" . chr($returnCode);
    }

    /**
     * Encodes a PUBACK packet.
     *
     * @param int $messageId Packet identifier being acknowledged.
     * @return string Four bytes: type 0x40, remaining length 2, then the
     *                identifier as a big-endian 16-bit value.
     */
    private function buildPubackPacket($messageId)
    {
        // pack('n') writes an unsigned 16-bit big-endian integer.
        return "\x40\x02" . pack('n', $messageId);
    }

    /**
     * Encodes a SUBACK packet.
     *
     * @param int   $messageId   Packet identifier of the SUBSCRIBE being
     *                           answered.
     * @param int[] $returnCodes One return code per requested topic, in
     *                           request order.
     * @return string Type byte 0x90, encoded remaining length, the
     *                identifier (big-endian 16-bit), then one byte per
     *                return code.
     */
    private function buildSubackPacket($messageId, $returnCodes)
    {
        // Body = 2-byte packet id followed by the return-code bytes.
        $codeBytes = implode('', array_map('chr', $returnCodes));
        $lengthField = $this->encodeLength(2 + count($returnCodes));

        return "\x90" . $lengthField . pack('n', $messageId) . $codeBytes;
    }

    /**
     * Encodes an UNSUBACK packet.
     *
     * @param int $messageId Packet identifier of the UNSUBSCRIBE being
     *                       answered.
     * @return string Four bytes: type 0xB0, remaining length 2, then the
     *                identifier as a big-endian 16-bit value.
     */
    private function buildUnsubackPacket($messageId)
    {
        // pack('n') writes an unsigned 16-bit big-endian integer.
        return "\xB0\x02" . pack('n', $messageId);
    }

    /**
     * Encodes a PINGRESP packet.
     *
     * @return string Two bytes: type 0xD0 and a remaining length of 0.
     */
    private function buildPingrespPacket()
    {
        return "\xD0\x00";
    }

    /**
     * Encodes a PUBLISH packet.
     *
     * @param string   $topic     Topic name.
     * @param string   $message   Payload.
     * @param int      $qos       QoS level, written into bits 1-2 of the
     *                            first header byte.
     * @param int|null $messageId Packet identifier. Only used when $qos is
     *                            above 0; a falsy value makes the method
     *                            obtain one from generateMessageId().
     * @return string The encoded packet.
     */
    private function buildPublishPacket($topic, $message, $qos, $messageId = null)
    {
        $carriesPacketId = $qos > 0;

        if ($carriesPacketId && !$messageId) {
            $messageId = $this->generateMessageId();
        }

        // Variable header: length-prefixed topic, then the optional packet id.
        $body = pack('n', strlen($topic)) . $topic;
        if ($carriesPacketId) {
            $body .= pack('n', $messageId);
        }

        // The payload follows the variable header directly.
        $body .= $message;

        // 0x30 is the PUBLISH type in the upper nibble.
        $typeAndFlags = chr(0x30 | ($qos << 1));

        return $typeAndFlags . $this->encodeLength(strlen($body)) . $body;
    }

    /**
     * Encodes a Remaining Length value in MQTT's variable-length format:
     * seven data bits per byte, least significant group first, with bit 7
     * set on every byte except the last.
     *
     * @param int $length Value to encode.
     * @return string One or more encoded bytes.
     */
    private function encodeLength($length)
    {
        $bytes = '';

        for (;;) {
            $group = $length % 128;
            $length = (int)($length / 128);

            // Flag the byte as "more follows" while value remains.
            $bytes .= chr($length > 0 ? ($group | 0x80) : $group);

            if ($length <= 0) {
                break;
            }
        }

        return $bytes;
    }

    /**
     * Returns the next packet identifier for an outgoing message.
     *
     * Identifiers cycle through 1..65535. Zero is skipped because a packet
     * identifier must be a non-zero 16-bit value, so the counter wraps
     * from 65535 back to 1 rather than to 0.
     *
     * @return int The new identifier, which is also stored as the counter.
     */
    private function generateMessageId()
    {
        $this->messageIdCounter = ($this->messageIdCounter % 65535) + 1;
        return $this->messageIdCounter;
    }

    /**
     * Removes a client's record and all of its subscriptions.
     *
     * Does nothing when no record exists for the id, which makes a second
     * call for the same connection harmless.
     *
     * @param int $clientId Connection id used as the key in the client
     *                      and subscription tables.
     */
    private function cleanupClient($clientId)
    {
        $record = $this->clients[$clientId] ?? null;
        if (!$record) {
            return;
        }

        echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$record['client_id']} 断开连接\n";

        // Walk a snapshot of the topic keys so entries can be removed
        // while looping.
        foreach (array_keys($this->subscriptions) as $topicKey) {
            if (!isset($this->subscriptions[$topicKey][$clientId])) {
                continue;
            }

            unset($this->subscriptions[$topicKey][$clientId]);

            // Remove topics left without any subscriber.
            if (empty($this->subscriptions[$topicKey])) {
                unset($this->subscriptions[$topicKey]);
            }
        }

        unset($this->clients[$clientId]);
    }

    /**
     * Workerman onClose callback: releases all state held for the closed
     * connection.
     *
     * @param TcpConnection $connection The connection that was closed.
     */
    public function onClose(TcpConnection $connection)
    {
        $closedId = $connection->id;
        $this->cleanupClient($closedId);
    }
}

// Entry point: the server is only started under the command-line SAPI;
// under any other SAPI a usage hint is printed instead.
if (PHP_SAPI !== 'cli') {
    echo "请在命令行中运行此脚本: php mqtt_server_implement.php\n";
} else {
    $server = new MqttServer();
    $server->start();
}
367 1 0
1个评论

TM

https://www.workerman.net/doc/workerman/components/workerman-mqtt.html 官网不是有包吗0.0,干嘛重复造轮子

  • za2883632 2026-03-25

    官方的是客户端,这个是服务器端 是ai写的 只是不知道能不能用

za2883632

264
积分
0
获赞数
0
粉丝数
2021-01-22 加入
🔝