AI 用wokerman 实现的 mqtt 服务器 大家看看能不能用

za2883632

<?php
/**

  • 基于Workerman的MQTT服务器实现
  • 完全自主实现的MQTT 3.1.1协议服务器
    */

require_once __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\Connection\TcpConnection;

/**

  • MQTT协议常量定义
    */
    class MqttTypes
    {
    const CONNECT = 1; // 客户端请求连接服务器
    const CONNACK = 2; // 连接确认
    const PUBLISH = 3; // 发布消息
    const PUBACK = 4; // QoS 1消息确认
    const SUBSCRIBE = 8; // 客户端订阅请求
    const SUBACK = 9; // 订阅确认
    const UNSUBSCRIBE = 10; // 客户端取消订阅
    const UNSUBACK = 11; // 取消订阅确认
    const PINGREQ = 12; // 心跳请求
    const PINGRESP = 13; // 心跳响应
    const DISCONNECT = 14; // 客户端断开连接
    }

/**

  • MQTT服务器主类
    */
    class MqttServer
    {
    private $clients = []; // 客户端连接信息
    private $subscriptions = []; // 主题订阅关系
    private $retainedMessages = []; // 保留消息
    private $messageIdCounter = 0; // 消息ID计数器

    /**

    • 启动MQTT服务器
      */
      public function start()
      {
      // 创建Worker实例,监听1883端口
      $worker = new Worker('tcp://0.0.0.0:1883');

      // 设置进程数
      $worker->count = 2;

      // 连接建立时的回调
      $worker->onConnect = [$this, 'onConnect'];

      // 收到消息时的回调
      $worker->onMessage = [$this, 'onMessage'];

      // 连接关闭时的回调
      $worker->onClose = [$this, 'onClose'];

      echo "=== MQTT服务器启动成功 ===\n";
      echo "监听地址: tcp://0.0.0.0:1883\n";
      echo "进程数: {$worker->count}\n";
      echo "协议: MQTT 3.1.1\n";
      echo "========================\n\n";

      // 运行worker
      Worker::runAll();
      }

    /**

    • 客户端连接建立
      */
      public function onConnect(TcpConnection $connection)
      {
      $clientId = 'client_' . $connection->id;
      $this->clients[$connection->id] = [
      'connection' => $connection,
      'client_id' => $clientId,
      'connected_at' => time(),
      'username' => null,
      'clean_session' => true,
      'keepalive' => 60
      ];

      echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientId} 已连接\n";
      }

    /**

    • 处理接收到的消息
      */
      public function onMessage(TcpConnection $connection, $data)
      {
      try {
      // 解析MQTT数据包
      $packet = $this->parseMqttPacket($data);

      if (!$packet) {
          echo "[" . date('Y-m-d H:i:s') . "] 无法解析MQTT数据包\n";
          return;
      }
      
      $packetType = $packet['type'];
      
      switch ($packetType) {
          case MqttTypes::CONNECT:
              $this->handleConnect($connection, $packet);
              break;
      
          case MqttTypes::PUBLISH:
              $this->handlePublish($connection, $packet);
              break;
      
          case MqttTypes::SUBSCRIBE:
              $this->handleSubscribe($connection, $packet);
              break;
      
          case MqttTypes::UNSUBSCRIBE:
              $this->handleUnsubscribe($connection, $packet);
              break;
      
          case MqttTypes::PINGREQ:
              $this->handlePing($connection);
              break;
      
          case MqttTypes::DISCONNECT:
              $this->handleDisconnect($connection);
              break;
      
          default:
              echo "[" . date('Y-m-d H:i:s') . "] 未知的MQTT包类型: {$packetType}\n";
      }

      } catch (\Exception $e) {
      echo "[" . date('Y-m-d H:i:s') . "] 处理消息时出错: " . $e->getMessage() . "\n";
      }
      }

    /**

    • 处理CONNECT包
      */
      private function handleConnect(TcpConnection $connection, $packet)
      {
      $clientId = $packet['client_id'] ?? 'client_' . $connection->id;
      $username = $packet['username'] ?? null;
      $cleanSession = $packet['clean_session'] ?? true;

      // 更新客户端信息
      $this->clients[$connection->id]['client_id'] = $clientId;
      $this->clients[$connection->id]['username'] = $username;
      $this->clients[$connection->id]['clean_session'] = $cleanSession;

      // 发送CONNACK响应 (连接接受)
      $connack = $this->buildConnackPacket(0); // 0表示连接接受
      $connection->send($connack);

      echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientId} 连接成功" . ($username ? " (用户名: {$username})" : "") . "\n";

      // 发送保留消息(如果有)
      $this->sendRetainedMessages($connection);
      }

    /**

    • 处理PUBLISH包
      */
      private function handlePublish(TcpConnection $connection, $packet)
      {
      $topic = $packet['topic'] ?? '';
      $message = $packet['message'] ?? '';
      $qos = $packet['qos'] ?? 0;
      $retain = $packet['retain'] ?? false;
      $messageId = $packet['message_id'] ?? null;

      $clientInfo = $this->clients[$connection->id] ?? null;
      if (!$clientInfo) {
      return;
      }

      $clientId = $clientInfo['client_id'];

      echo "[" . date('Y-m-d H:i:s') . "] 发布消息 - 客户端: {$clientId}, 主题: {$topic}, 消息: {$message}, QoS: {$qos}\n";

      // 处理保留消息
      if ($retain) {
      if (empty($message)) {
      // 清空保留消息
      unset($this->retainedMessages[$topic]);
      } else {
      // 设置保留消息
      $this->retainedMessages[$topic] = [
      'message' => $message,
      'qos' => $qos,
      'timestamp' => time()
      ];
      }
      }

      // 转发消息给订阅者
      $this->forwardMessage($topic, $message, $qos, $messageId);

      // QoS 1 需要回复PUBACK
      if ($qos === 1 && $messageId) {
      $puback = $this->buildPubackPacket($messageId);
      $connection->send($puback);
      }
      }

    /**

    • 处理SUBSCRIBE包
      */
      private function handleSubscribe(TcpConnection $connection, $packet)
      {
      $topics = $packet['topics'] ?? [];
      $messageId = $packet['message_id'] ?? 0;

      $clientInfo = $this->clients[$connection->id] ?? null;
      if (!$clientInfo) {
      return;
      }

      $clientId = $clientInfo['client_id'];
      $returnCodes = [];

      foreach ($topics as $topic => $qos) {
      // 添加订阅关系
      if (!isset($this->subscriptions[$topic])) {
      $this->subscriptions[$topic] = [];
      }

      $this->subscriptions[$topic][$connection->id] = $qos;
      $returnCodes[] = $qos; // 返回最大支持的QoS等级
      
      echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientId} 订阅主题: {$topic}, QoS: {$qos}\n";
      
      // 立即发送该主题的保留消息
      if (isset($this->retainedMessages[$topic])) {
          $retainedMsg = $this->retainedMessages[$topic];
          $this->sendMessageToClient($connection, $topic, $retainedMsg['message'], $retainedMsg['qos']);
      }

      }

      // 发送SUBACK响应
      $suback = $this->buildSubackPacket($messageId, $returnCodes);
      $connection->send($suback);
      }

    /**

    • 处理UNSUBSCRIBE包
      */
      private function handleUnsubscribe(TcpConnection $connection, $packet)
      {
      $topics = $packet['topics'] ?? [];
      $messageId = $packet['message_id'] ?? 0;

      $clientInfo = $this->clients[$connection->id] ?? null;
      if (!$clientInfo) {
      return;
      }

      $clientId = $clientInfo['client_id'];

      foreach ($topics as $topic) {
      if (isset($this->subscriptions[$topic][$connection->id])) {
      unset($this->subscriptions[$topic][$connection->id]);

          // 如果主题没有订阅者了,清理主题
          if (empty($this->subscriptions[$topic])) {
              unset($this->subscriptions[$topic]);
          }
      
          echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientId} 取消订阅主题: {$topic}\n";
      }

      }

      // 发送UNSUBACK响应
      $unsuback = $this->buildUnsubackPacket($messageId);
      $connection->send($unsuback);
      }

    /**

    • 处理PING请求
      */
      private function handlePing(TcpConnection $connection)
      {
      // 发送PINGRESP响应
      $pingresp = $this->buildPingrespPacket();
      $connection->send($pingresp);

      $clientInfo = $this->clients[$connection->id] ?? null;
      if ($clientInfo) {
      echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientInfo['client_id']} 发送心跳包\n";
      }
      }

    /**

    • 处理断开连接
      */
      private function handleDisconnect(TcpConnection $connection)
      {
      $this->cleanupClient($connection->id);
      $connection->close();
      }

    /**

    • 转发消息给订阅者
      */
      private function forwardMessage($topic, $message, $qos, $messageId = null)
      {
      if (!isset($this->subscriptions[$topic])) {
      return;
      }

      foreach ($this->subscriptions[$topic] as $clientId => $subQos) {
      $clientInfo = $this->clients[$clientId] ?? null;
      if (!$clientInfo) {
      continue;
      }

      $actualQos = min($qos, $subQos);
      $this->sendMessageToClient($clientInfo['connection'], $topic, $message, $actualQos, $messageId);

      }
      }

    /**

    • 发送消息给客户端
      */
      private function sendMessageToClient(TcpConnection $connection, $topic, $message, $qos, $messageId = null)
      {
      $publishPacket = $this->buildPublishPacket($topic, $message, $qos, $messageId);
      $connection->send($publishPacket);
      }

    /**

    • 发送保留消息给新连接的客户端
      */
      private function sendRetainedMessages(TcpConnection $connection)
      {
      foreach ($this->retainedMessages as $topic => $messageInfo) {
      $this->sendMessageToClient($connection, $topic, $messageInfo['message'], $messageInfo['qos']);
      }
      }

    /**

    • 解析MQTT数据包
      */
      private function parseMqttPacket($data)
      {
      if (strlen($data) < 2) {
      return null;
      }

      $offset = 0;

      // 解析固定头部
      $firstByte = ord($data[$offset]);
      $packetType = ($firstByte >> 4) & 0x0F;
      $offset++;

      // 解析剩余长度
      $remainingLength = 0;
      $multiplier = 1;

      do {
      if ($offset >= strlen($data)) {
      return null;
      }
      $digit = ord($data[$offset]);
      $remainingLength += ($digit & 127) $multiplier;
      $multiplier
      = 128;
      $offset++;
      } while (($digit & 128) != 0);

      $packet = ['type' => $packetType];

      // 根据包类型解析内容
      switch ($packetType) {
      case MqttTypes::CONNECT:
      $packet = array_merge($packet, $this->parseConnectPacket($data, $offset));
      break;

      case MqttTypes::PUBLISH:
          $packet = array_merge($packet, $this->parsePublishPacket($data, $offset, $firstByte));
          break;
      
      case MqttTypes::SUBSCRIBE:
          $packet = array_merge($packet, $this->parseSubscribePacket($data, $offset));
          break;
      
      case MqttTypes::UNSUBSCRIBE:
          $packet = array_merge($packet, $this->parseUnsubscribePacket($data, $offset));
          break;

      }

      return $packet;
      }

    /**

    • 解析CONNECT包
      */
      private function parseConnectPacket($data, $offset)
      {
      $packet = [];

      // 跳过协议名长度和协议名
      $protocolNameLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
      $offset += 2 + $protocolNameLength;

      // 协议级别
      $packet['protocol_level'] = ord($data[$offset]);
      $offset++;

      // 连接标志
      $connectFlags = ord($data[$offset]);
      $packet['clean_session'] = ($connectFlags & 0x02) != 0;
      $packet['username_flag'] = ($connectFlags & 0x80) != 0;
      $packet['password_flag'] = ($connectFlags & 0x40) != 0;
      $offset++;

      // 保持连接时间
      $keepAlive = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
      $offset += 2;

      // 客户端ID
      $clientIdLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
      $offset += 2;
      $packet['client_id'] = substr($data, $offset, $clientIdLength);
      $offset += $clientIdLength;

      // 用户名
      if ($packet['username_flag']) {
      $usernameLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
      $offset += 2;
      $packet['username'] = substr($data, $offset, $usernameLength);
      $offset += $usernameLength;
      }

      // 密码
      if ($packet['password_flag']) {
      $passwordLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
      $offset += 2;
      $packet['password'] = substr($data, $offset, $passwordLength);
      }

      return $packet;
      }

    /**

    • 解析PUBLISH包
      */
      private function parsePublishPacket($data, $offset, $firstByte)
      {
      $packet = [];

      // QoS等级
      $packet['qos'] = ($firstByte >> 1) & 0x03;

      // 保留标志
      $packet['retain'] = ($firstByte & 0x01) == 0x01;

      // 主题名
      $topicLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
      $offset += 2;
      $packet['topic'] = substr($data, $offset, $topicLength);
      $offset += $topicLength;

      // 消息ID (QoS > 0)
      if ($packet['qos'] > 0) {
      $packet['message_id'] = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
      $offset += 2;
      }

      // 消息内容
      $packet['message'] = substr($data, $offset);

      return $packet;
      }

    /**

    • 解析SUBSCRIBE包
      */
      private function parseSubscribePacket($data, $offset)
      {
      $packet = ['topics' => []];

      // 消息ID
      $packet['message_id'] = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
      $offset += 2;

      // 解析主题列表
      while ($offset < strlen($data)) {
      $topicLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
      $offset += 2;
      $topic = substr($data, $offset, $topicLength);
      $offset += $topicLength;

      $qos = ord($data[$offset]) & 0x03;
      $offset++;
      
      $packet['topics'][$topic] = $qos;

      }

      return $packet;
      }

    /**

    • 解析UNSUBSCRIBE包
      */
      private function parseUnsubscribePacket($data, $offset)
      {
      $packet = ['topics' => []];

      // 消息ID
      $packet['message_id'] = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
      $offset += 2;

      // 解析主题列表
      while ($offset < strlen($data)) {
      $topicLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
      $offset += 2;
      $topic = substr($data, $offset, $topicLength);
      $offset += $topicLength;

      $packet['topics'][] = $topic;

      }

      return $packet;
      }

    /**

    • 构建CONNACK包
      */
      private function buildConnackPacket($returnCode)
      {
      // 固定头部: CONNACK (0x20)
      $packet = chr(0x20);

      // 剩余长度: 2字节
      $packet .= chr(2);

      // 连接确认标志 + 返回码
      $packet .= chr(0) . chr($returnCode);

      return $packet;
      }

    /**

    • 构建PUBACK包
      */
      private function buildPubackPacket($messageId)
      {
      // 固定头部: PUBACK (0x40)
      $packet = chr(0x40);

      // 剩余长度: 2字节
      $packet .= chr(2);

      // 消息ID
      $packet .= chr($messageId >> 8) . chr($messageId & 0xFF);

      return $packet;
      }

    /**

    • 构建SUBACK包
      */
      private function buildSubackPacket($messageId, $returnCodes)
      {
      // 固定头部: SUBACK (0x90)
      $packet = chr(0x90);

      // 剩余长度: 2 + 返回码数量
      $remainingLength = 2 + count($returnCodes);
      $packet .= $this->encodeLength($remainingLength);

      // 消息ID
      $packet .= chr($messageId >> 8) . chr($messageId & 0xFF);

      // 返回码
      foreach ($returnCodes as $code) {
      $packet .= chr($code);
      }

      return $packet;
      }

    /**

    • 构建UNSUBACK包
      */
      private function buildUnsubackPacket($messageId)
      {
      // 固定头部: UNSUBACK (0xB0)
      $packet = chr(0xB0);

      // 剩余长度: 2字节
      $packet .= chr(2);

      // 消息ID
      $packet .= chr($messageId >> 8) . chr($messageId & 0xFF);

      return $packet;
      }

    /**

    • 构建PINGRESP包
      */
      private function buildPingrespPacket()
      {
      // 固定头部: PINGRESP (0xD0) + 剩余长度0
      return chr(0xD0) . chr(0);
      }

    /**

    • 构建PUBLISH包
      */
      private function buildPublishPacket($topic, $message, $qos, $messageId = null)
      {
      // 固定头部
      $fixedHeader = 0x30; // PUBLISH
      $fixedHeader |= ($qos << 1);

      $packet = chr($fixedHeader);

      // 可变头部: 主题长度 + 主题
      $topicLength = strlen($topic);
      $variableHeader = chr($topicLength >> 8) . chr($topicLength & 0xFF) . $topic;

      // QoS > 0 需要消息ID
      if ($qos > 0) {
      if (!$messageId) {
      $messageId = $this->generateMessageId();
      }
      $variableHeader .= chr($messageId >> 8) . chr($messageId & 0xFF);
      }

      // 有效载荷
      $payload = $message;

      // 计算剩余长度
      $remainingLength = strlen($variableHeader) + strlen($payload);
      $packet .= $this->encodeLength($remainingLength) . $variableHeader . $payload;

      return $packet;
      }

    /**

    • 编码剩余长度
      */
      private function encodeLength($length)
      {
      $encoded = '';
      do {
      $digit = $length % 128;
      $length = (int)($length / 128);
      if ($length > 0) {
      $digit |= 0x80;
      }
      $encoded .= chr($digit);
      } while ($length > 0);

      return $encoded;
      }

    /**

    • 生成消息ID
      */
      private function generateMessageId()
      {
      $this->messageIdCounter = ($this->messageIdCounter + 1) % 65536;
      return $this->messageIdCounter;
      }

    /**

    • 清理客户端资源
      */
      private function cleanupClient($clientId)
      {
      $clientInfo = $this->clients[$clientId] ?? null;
      if (!$clientInfo) {
      return;
      }

      echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientInfo['client_id']} 断开连接\n";

      // 清理订阅关系
      foreach ($this->subscriptions as $topic => $subscribers) {
      if (isset($subscribers[$clientId])) {
      unset($this->subscriptions[$topic][$clientId]);

          if (empty($this->subscriptions[$topic])) {
              unset($this->subscriptions[$topic]);
          }
      }

      }

      // 清理客户端信息
      unset($this->clients[$clientId]);
      }

    /**

    • 连接关闭时的回调
      */
      public function onClose(TcpConnection $connection)
      {
      $this->cleanupClient($connection->id);
      }
      }

// 启动服务器
if (php_sapi_name() == 'cli') {
$server = new MqttServer();
$server->start();
} else {
echo "请在命令行中运行此脚本: php mqtt_server_implement.php\n";
}

228 1 0
1个评论

TM

https://www.workerman.net/doc/workerman/components/workerman-mqtt.html 官网不是有包吗0.0,干嘛重复造轮子

  • 暂无评论

za2883632

264
积分
0
获赞数
0
粉丝数
2021-01-22 加入
🔝