<?php
/**
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
/**
/**
MQTT服务器主类
*/
class MqttServer
{
private $clients = []; // 客户端连接信息
private $subscriptions = []; // 主题订阅关系
private $retainedMessages = []; // 保留消息
private $messageIdCounter = 0; // 消息ID计数器
/**
启动MQTT服务器
*/
public function start()
{
// 创建Worker实例,监听1883端口
$worker = new Worker('tcp://0.0.0.0:1883');
// 设置进程数
$worker->count = 2;
// 连接建立时的回调
$worker->onConnect = [$this, 'onConnect'];
// 收到消息时的回调
$worker->onMessage = [$this, 'onMessage'];
// 连接关闭时的回调
$worker->onClose = [$this, 'onClose'];
echo "=== MQTT服务器启动成功 ===\n";
echo "监听地址: tcp://0.0.0.0:1883\n";
echo "进程数: {$worker->count}\n";
echo "协议: MQTT 3.1.1\n";
echo "========================\n\n";
// 运行worker
Worker::runAll();
}
/**
客户端连接建立
*/
public function onConnect(TcpConnection $connection)
{
$clientId = 'client_' . $connection->id;
$this->clients[$connection->id] = [
'connection' => $connection,
'client_id' => $clientId,
'connected_at' => time(),
'username' => null,
'clean_session' => true,
'keepalive' => 60
];
echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientId} 已连接\n";
}
/**
处理接收到的消息
*/
public function onMessage(TcpConnection $connection, $data)
{
try {
// 解析MQTT数据包
$packet = $this->parseMqttPacket($data);
if (!$packet) {
echo "[" . date('Y-m-d H:i:s') . "] 无法解析MQTT数据包\n";
return;
}
$packetType = $packet['type'];
switch ($packetType) {
case MqttTypes::CONNECT:
$this->handleConnect($connection, $packet);
break;
case MqttTypes::PUBLISH:
$this->handlePublish($connection, $packet);
break;
case MqttTypes::SUBSCRIBE:
$this->handleSubscribe($connection, $packet);
break;
case MqttTypes::UNSUBSCRIBE:
$this->handleUnsubscribe($connection, $packet);
break;
case MqttTypes::PINGREQ:
$this->handlePing($connection);
break;
case MqttTypes::DISCONNECT:
$this->handleDisconnect($connection);
break;
default:
echo "[" . date('Y-m-d H:i:s') . "] 未知的MQTT包类型: {$packetType}\n";
}
} catch (\Exception $e) {
echo "[" . date('Y-m-d H:i:s') . "] 处理消息时出错: " . $e->getMessage() . "\n";
}
}
/**
处理CONNECT包
*/
private function handleConnect(TcpConnection $connection, $packet)
{
$clientId = $packet['client_id'] ?? 'client_' . $connection->id;
$username = $packet['username'] ?? null;
$cleanSession = $packet['clean_session'] ?? true;
// 更新客户端信息
$this->clients[$connection->id]['client_id'] = $clientId;
$this->clients[$connection->id]['username'] = $username;
$this->clients[$connection->id]['clean_session'] = $cleanSession;
// 发送CONNACK响应 (连接接受)
$connack = $this->buildConnackPacket(0); // 0表示连接接受
$connection->send($connack);
echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientId} 连接成功" . ($username ? " (用户名: {$username})" : "") . "\n";
// 发送保留消息(如果有)
$this->sendRetainedMessages($connection);
}
/**
处理PUBLISH包
*/
private function handlePublish(TcpConnection $connection, $packet)
{
$topic = $packet['topic'] ?? '';
$message = $packet['message'] ?? '';
$qos = $packet['qos'] ?? 0;
$retain = $packet['retain'] ?? false;
$messageId = $packet['message_id'] ?? null;
$clientInfo = $this->clients[$connection->id] ?? null;
if (!$clientInfo) {
return;
}
$clientId = $clientInfo['client_id'];
echo "[" . date('Y-m-d H:i:s') . "] 发布消息 - 客户端: {$clientId}, 主题: {$topic}, 消息: {$message}, QoS: {$qos}\n";
// 处理保留消息
if ($retain) {
if (empty($message)) {
// 清空保留消息
unset($this->retainedMessages[$topic]);
} else {
// 设置保留消息
$this->retainedMessages[$topic] = [
'message' => $message,
'qos' => $qos,
'timestamp' => time()
];
}
}
// 转发消息给订阅者
$this->forwardMessage($topic, $message, $qos, $messageId);
// QoS 1 需要回复PUBACK
if ($qos === 1 && $messageId) {
$puback = $this->buildPubackPacket($messageId);
$connection->send($puback);
}
}
/**
处理SUBSCRIBE包
*/
private function handleSubscribe(TcpConnection $connection, $packet)
{
$topics = $packet['topics'] ?? [];
$messageId = $packet['message_id'] ?? 0;
$clientInfo = $this->clients[$connection->id] ?? null;
if (!$clientInfo) {
return;
}
$clientId = $clientInfo['client_id'];
$returnCodes = [];
foreach ($topics as $topic => $qos) {
// 添加订阅关系
if (!isset($this->subscriptions[$topic])) {
$this->subscriptions[$topic] = [];
}
$this->subscriptions[$topic][$connection->id] = $qos;
$returnCodes[] = $qos; // 返回最大支持的QoS等级
echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientId} 订阅主题: {$topic}, QoS: {$qos}\n";
// 立即发送该主题的保留消息
if (isset($this->retainedMessages[$topic])) {
$retainedMsg = $this->retainedMessages[$topic];
$this->sendMessageToClient($connection, $topic, $retainedMsg['message'], $retainedMsg['qos']);
}
}
// 发送SUBACK响应
$suback = $this->buildSubackPacket($messageId, $returnCodes);
$connection->send($suback);
}
/**
处理UNSUBSCRIBE包
*/
private function handleUnsubscribe(TcpConnection $connection, $packet)
{
$topics = $packet['topics'] ?? [];
$messageId = $packet['message_id'] ?? 0;
$clientInfo = $this->clients[$connection->id] ?? null;
if (!$clientInfo) {
return;
}
$clientId = $clientInfo['client_id'];
foreach ($topics as $topic) {
if (isset($this->subscriptions[$topic][$connection->id])) {
unset($this->subscriptions[$topic][$connection->id]);
// 如果主题没有订阅者了,清理主题
if (empty($this->subscriptions[$topic])) {
unset($this->subscriptions[$topic]);
}
echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientId} 取消订阅主题: {$topic}\n";
}
}
// 发送UNSUBACK响应
$unsuback = $this->buildUnsubackPacket($messageId);
$connection->send($unsuback);
}
/**
处理PING请求
*/
private function handlePing(TcpConnection $connection)
{
// 发送PINGRESP响应
$pingresp = $this->buildPingrespPacket();
$connection->send($pingresp);
$clientInfo = $this->clients[$connection->id] ?? null;
if ($clientInfo) {
echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientInfo['client_id']} 发送心跳包\n";
}
}
/**
/**
转发消息给订阅者
*/
private function forwardMessage($topic, $message, $qos, $messageId = null)
{
if (!isset($this->subscriptions[$topic])) {
return;
}
foreach ($this->subscriptions[$topic] as $clientId => $subQos) {
$clientInfo = $this->clients[$clientId] ?? null;
if (!$clientInfo) {
continue;
}
$actualQos = min($qos, $subQos);
$this->sendMessageToClient($clientInfo['connection'], $topic, $message, $actualQos, $messageId);
}
}
/**
/**
/**
解析MQTT数据包
*/
private function parseMqttPacket($data)
{
if (strlen($data) < 2) {
return null;
}
$offset = 0;
// 解析固定头部
$firstByte = ord($data[$offset]);
$packetType = ($firstByte >> 4) & 0x0F;
$offset++;
// 解析剩余长度
$remainingLength = 0;
$multiplier = 1;
do {
if ($offset >= strlen($data)) {
return null;
}
$digit = ord($data[$offset]);
$remainingLength += ($digit & 127) $multiplier;
$multiplier = 128;
$offset++;
} while (($digit & 128) != 0);
$packet = ['type' => $packetType];
// 根据包类型解析内容
switch ($packetType) {
case MqttTypes::CONNECT:
$packet = array_merge($packet, $this->parseConnectPacket($data, $offset));
break;
case MqttTypes::PUBLISH:
$packet = array_merge($packet, $this->parsePublishPacket($data, $offset, $firstByte));
break;
case MqttTypes::SUBSCRIBE:
$packet = array_merge($packet, $this->parseSubscribePacket($data, $offset));
break;
case MqttTypes::UNSUBSCRIBE:
$packet = array_merge($packet, $this->parseUnsubscribePacket($data, $offset));
break;
}
return $packet;
}
/**
解析CONNECT包
*/
private function parseConnectPacket($data, $offset)
{
$packet = [];
// 跳过协议名长度和协议名
$protocolNameLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
$offset += 2 + $protocolNameLength;
// 协议级别
$packet['protocol_level'] = ord($data[$offset]);
$offset++;
// 连接标志
$connectFlags = ord($data[$offset]);
$packet['clean_session'] = ($connectFlags & 0x02) != 0;
$packet['username_flag'] = ($connectFlags & 0x80) != 0;
$packet['password_flag'] = ($connectFlags & 0x40) != 0;
$offset++;
// 保持连接时间
$keepAlive = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
$offset += 2;
// 客户端ID
$clientIdLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
$offset += 2;
$packet['client_id'] = substr($data, $offset, $clientIdLength);
$offset += $clientIdLength;
// 用户名
if ($packet['username_flag']) {
$usernameLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
$offset += 2;
$packet['username'] = substr($data, $offset, $usernameLength);
$offset += $usernameLength;
}
// 密码
if ($packet['password_flag']) {
$passwordLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
$offset += 2;
$packet['password'] = substr($data, $offset, $passwordLength);
}
return $packet;
}
/**
解析PUBLISH包
*/
private function parsePublishPacket($data, $offset, $firstByte)
{
$packet = [];
// QoS等级
$packet['qos'] = ($firstByte >> 1) & 0x03;
// 保留标志
$packet['retain'] = ($firstByte & 0x01) == 0x01;
// 主题名
$topicLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
$offset += 2;
$packet['topic'] = substr($data, $offset, $topicLength);
$offset += $topicLength;
// 消息ID (QoS > 0)
if ($packet['qos'] > 0) {
$packet['message_id'] = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
$offset += 2;
}
// 消息内容
$packet['message'] = substr($data, $offset);
return $packet;
}
/**
解析SUBSCRIBE包
*/
private function parseSubscribePacket($data, $offset)
{
$packet = ['topics' => []];
// 消息ID
$packet['message_id'] = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
$offset += 2;
// 解析主题列表
while ($offset < strlen($data)) {
$topicLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
$offset += 2;
$topic = substr($data, $offset, $topicLength);
$offset += $topicLength;
$qos = ord($data[$offset]) & 0x03;
$offset++;
$packet['topics'][$topic] = $qos;
}
return $packet;
}
/**
解析UNSUBSCRIBE包
*/
private function parseUnsubscribePacket($data, $offset)
{
$packet = ['topics' => []];
// 消息ID
$packet['message_id'] = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
$offset += 2;
// 解析主题列表
while ($offset < strlen($data)) {
$topicLength = (ord($data[$offset]) << 8) | ord($data[$offset + 1]);
$offset += 2;
$topic = substr($data, $offset, $topicLength);
$offset += $topicLength;
$packet['topics'][] = $topic;
}
return $packet;
}
/**
构建CONNACK包
*/
private function buildConnackPacket($returnCode)
{
// 固定头部: CONNACK (0x20)
$packet = chr(0x20);
// 剩余长度: 2字节
$packet .= chr(2);
// 连接确认标志 + 返回码
$packet .= chr(0) . chr($returnCode);
return $packet;
}
/**
构建PUBACK包
*/
private function buildPubackPacket($messageId)
{
// 固定头部: PUBACK (0x40)
$packet = chr(0x40);
// 剩余长度: 2字节
$packet .= chr(2);
// 消息ID
$packet .= chr($messageId >> 8) . chr($messageId & 0xFF);
return $packet;
}
/**
构建SUBACK包
*/
private function buildSubackPacket($messageId, $returnCodes)
{
// 固定头部: SUBACK (0x90)
$packet = chr(0x90);
// 剩余长度: 2 + 返回码数量
$remainingLength = 2 + count($returnCodes);
$packet .= $this->encodeLength($remainingLength);
// 消息ID
$packet .= chr($messageId >> 8) . chr($messageId & 0xFF);
// 返回码
foreach ($returnCodes as $code) {
$packet .= chr($code);
}
return $packet;
}
/**
构建UNSUBACK包
*/
private function buildUnsubackPacket($messageId)
{
// 固定头部: UNSUBACK (0xB0)
$packet = chr(0xB0);
// 剩余长度: 2字节
$packet .= chr(2);
// 消息ID
$packet .= chr($messageId >> 8) . chr($messageId & 0xFF);
return $packet;
}
/**
/**
构建PUBLISH包
*/
private function buildPublishPacket($topic, $message, $qos, $messageId = null)
{
// 固定头部
$fixedHeader = 0x30; // PUBLISH
$fixedHeader |= ($qos << 1);
$packet = chr($fixedHeader);
// 可变头部: 主题长度 + 主题
$topicLength = strlen($topic);
$variableHeader = chr($topicLength >> 8) . chr($topicLength & 0xFF) . $topic;
// QoS > 0 需要消息ID
if ($qos > 0) {
if (!$messageId) {
$messageId = $this->generateMessageId();
}
$variableHeader .= chr($messageId >> 8) . chr($messageId & 0xFF);
}
// 有效载荷
$payload = $message;
// 计算剩余长度
$remainingLength = strlen($variableHeader) + strlen($payload);
$packet .= $this->encodeLength($remainingLength) . $variableHeader . $payload;
return $packet;
}
/**
编码剩余长度
*/
private function encodeLength($length)
{
$encoded = '';
do {
$digit = $length % 128;
$length = (int)($length / 128);
if ($length > 0) {
$digit |= 0x80;
}
$encoded .= chr($digit);
} while ($length > 0);
return $encoded;
}
/**
/**
清理客户端资源
*/
private function cleanupClient($clientId)
{
$clientInfo = $this->clients[$clientId] ?? null;
if (!$clientInfo) {
return;
}
echo "[" . date('Y-m-d H:i:s') . "] 客户端 {$clientInfo['client_id']} 断开连接\n";
// 清理订阅关系
foreach ($this->subscriptions as $topic => $subscribers) {
if (isset($subscribers[$clientId])) {
unset($this->subscriptions[$topic][$clientId]);
if (empty($this->subscriptions[$topic])) {
unset($this->subscriptions[$topic]);
}
}
}
// 清理客户端信息
unset($this->clients[$clientId]);
}
/**
// 启动服务器
if (php_sapi_name() == 'cli') {
$server = new MqttServer();
$server->start();
} else {
echo "请在命令行中运行此脚本: php mqtt_server_implement.php\n";
}
https://www.workerman.net/doc/workerman/components/workerman-mqtt.html 官网不是有包吗0.0,干嘛重复造轮子