不同操作系统使用不同的 I/O 多路复用技术:
| 操作系统 | 实现机制 | 特点 |
|---|---|---|
| Linux | Epoll | 高效、可扩展、适合大量连接 |
| BSD/Mac | Kqueue | 性能优异、事件通知机制完善 |
| Windows | Select | 跨平台、有连接数限制 |
┌─────────────────────────────────────┐
│ 1. 注册事件 │
│ └─> 将 socket 添加到事件监听器 │
└────────────┬────────────────────────┘
│
↓
┌─────────────────────────────────────┐
│ 2. 等待事件 │
│ └─> 内核监听 socket 状态变化 │
└────────────┬────────────────────────┘
│
↓
┌─────────────────────────────────────┐
│ 3. 事件触发 │
│ └─> 数据可读/可写时通知应用 │
└────────────┬────────────────────────┘
│
↓
┌─────────────────────────────────────┐
│ 4. 处理事件 │
│ └─> 调用对应回调函数 │
└────────────┬────────────────────────┘
│
↓
┌─────────────────────────────────────┐
│ 5. 继续等待 │
│ └─> 返回步骤 2,形成事件循环 │
└─────────────────────────────────────┘
<?php
/**
* Workerman 事件驱动机制示例
* 演示事件注册、触发和处理的完整流程
*/
use Workerman\Worker;
use Workerman\Timer;
require_once __DIR__ . '/vendor/autoload.php';
// ============================================
// 【事件驱动机制说明】
// ============================================
// 底层实现:
// - Linux: Epoll
// - BSD/Mac: Kqueue
// - Windows: Select
//
// 工作原理:
// 1. 注册事件 → 将 socket 添加到事件监听器
// 2. 等待事件 → 内核监听 socket 状态变化
// 3. 事件触发 → 数据可读/可写时通知应用
// 4. 处理事件 → 调用对应回调函数
// 5. 继续等待 → 返回事件循环
// ============================================
echo "\n";
echo "============================================\n";
echo " Workerman 事件驱动机制演示\n";
echo "============================================\n\n";
// 创建 TCP 服务器
$worker = new Worker('tcp://0.0.0.0:9090');
$worker->count = 2;
$worker->name = 'EventDrivenDemo';
// 统计信息
$statistics = [
'total_connections' => 0,
'total_messages' => 0,
'current_connections' => 0,
];
// ============================================
// 【事件 1: onWorkerStart】
// 说明: Worker 进程启动时触发
// 时机: 在开始监听端口之前执行一次
// 用途: 初始化资源、定时器、数据库连接等
// ============================================
$worker->onWorkerStart = function($worker) use (&$statistics) {
echo "[事件循环启动] 进程 " . posix_getpid() . " 开始运行\n";
echo " ├─ 底层驱动: " . getEventLoopType() . "\n";
echo " ├─ 监听地址: tcp://0.0.0.0:9090\n";
echo " └─ 进入事件循环,等待事件触发...\n\n";
// 【注册定时器事件】- 每 5 秒输出统计信息
Timer::add(5, function() use (&$statistics) {
echo "[定时事件触发] 统计信息:\n";
echo " ├─ 总连接数: {$statistics['total_connections']}\n";
echo " ├─ 当前连接: {$statistics['current_connections']}\n";
echo " └─ 总消息数: {$statistics['total_messages']}\n\n";
});
};
// ============================================
// 【事件 2: onConnect】
// 说明: 客户端连接建立时触发
// 触发: 新 socket 连接成功建立
// 流程:
// 1. 内核检测到新连接(Epoll/Select)
// 2. 将 socket 添加到事件监听器
// 3. 触发 onConnect 回调
// ============================================
$worker->onConnect = function($connection) use (&$statistics) {
$statistics['total_connections']++;
$statistics['current_connections']++;
echo "[连接事件触发] 新客户端连接\n";
echo " ├─ 客户端IP: {$connection->getRemoteIp()}\n";
echo " ├─ 客户端端口: {$connection->getRemotePort()}\n";
echo " ├─ 连接ID: {$connection->id}\n";
echo " └─ 当前连接数: {$statistics['current_connections']}\n";
echo " → 【注册读事件】将此 socket 添加到事件监听器\n\n";
// 发送欢迎消息
$connection->send("=================================\n");
$connection->send("欢迎来到事件驱动演示服务器\n");
$connection->send("=================================\n");
$connection->send("可用命令:\n");
$connection->send(" echo <消息> - 回显消息\n");
$connection->send(" time - 显示服务器时间\n");
$connection->send(" stats - 显示统计信息\n");
$connection->send(" event - 显示事件信息\n");
$connection->send(" quit - 断开连接\n");
$connection->send("---------------------------------\n");
};
// ============================================
// 【事件 3: onMessage】
// 说明: 接收到客户端数据时触发
// 触发: socket 可读(有数据到达)
// 流程:
// 1. 内核检测到 socket 可读
// 2. 事件循环通知应用程序
// 3. 调用 onMessage 回调
// 4. 处理完毕,返回事件循环
// ============================================
$worker->onMessage = function($connection, $data) use (&$statistics) {
$statistics['total_messages']++;
$data = trim($data);
echo "[消息事件触发] 收到数据\n";
echo " ├─ 来自: {$connection->getRemoteIp()}:{$connection->getRemotePort()}\n";
echo " ├─ 数据: {$data}\n";
echo " └─ 事件流程:\n";
echo " 1. 内核检测到 socket 可读\n";
echo " 2. Epoll/Select 通知应用程序\n";
echo " 3. 触发 onMessage 回调\n";
echo " 4. 处理数据...\n\n";
// 解析命令
$parts = explode(' ', $data, 2);
$command = strtolower($parts[0]);
$arg = $parts[1] ?? '';
switch ($command) {
case 'echo':
if ($arg) {
$connection->send("[回显] {$arg}\n");
} else {
$connection->send("[错误] 请提供要回显的消息\n");
}
break;
case 'time':
$connection->send("[时间] " . date('Y-m-d H:i:s') . "\n");
break;
case 'stats':
$connection->send("=================================\n");
$connection->send("服务器统计信息\n");
$connection->send("=================================\n");
$connection->send("总连接数: {$statistics['total_connections']}\n");
$connection->send("当前连接: {$statistics['current_connections']}\n");
$connection->send("总消息数: {$statistics['total_messages']}\n");
$connection->send("进程ID: " . posix_getpid() . "\n");
$connection->send("---------------------------------\n");
break;
case 'event':
$connection->send("=================================\n");
$connection->send("事件驱动机制信息\n");
$connection->send("=================================\n");
$connection->send("底层驱动: " . getEventLoopType() . "\n");
$connection->send("已注册事件:\n");
$connection->send(" ✓ 读事件 (可读时触发)\n");
$connection->send(" ✓ 写事件 (可写时触发)\n");
$connection->send(" ✓ 定时事件 (定时触发)\n");
$connection->send("\n工作流程:\n");
$connection->send(" 1. 注册事件到监听器\n");
$connection->send(" 2. 事件循环等待\n");
$connection->send(" 3. 内核通知事件就绪\n");
$connection->send(" 4. 执行回调函数\n");
$connection->send(" 5. 返回事件循环\n");
$connection->send("---------------------------------\n");
break;
case 'quit':
$connection->send("[再见] 正在断开连接...\n");
// 【触发关闭事件】
$connection->close();
break;
default:
$connection->send("[错误] 未知命令: {$command}\n");
$connection->send("[提示] 输入命令查看帮助\n");
}
echo " → 处理完毕,返回事件循环\n\n";
};
// ============================================
// 【事件 4: onClose】
// 说明: 连接关闭时触发
// 触发: socket 断开连接
// 流程:
// 1. 检测到连接关闭
// 2. 从事件监听器移除 socket
// 3. 触发 onClose 回调
// 4. 清理资源
// ============================================
$worker->onClose = function($connection) use (&$statistics) {
$statistics['current_connections']--;
echo "[关闭事件触发] 连接断开\n";
echo " ├─ 客户端: {$connection->getRemoteIp()}:{$connection->getRemotePort()}\n";
echo " ├─ 连接ID: {$connection->id}\n";
echo " └─ 当前连接数: {$statistics['current_connections']}\n";
echo " → 【移除事件】从事件监听器移除此 socket\n\n";
};
// ============================================
// 【事件 5: onError】
// 说明: 连接发生错误时触发
// 触发: socket 错误(如网络中断)
// 用途: 记录错误日志,清理资源
// ============================================
$worker->onError = function($connection, $code, $msg) {
echo "[错误事件触发] 连接错误\n";
echo " ├─ 错误码: {$code}\n";
echo " ├─ 错误信息: {$msg}\n";
echo " └─ 客户端: {$connection->getRemoteIp()}\n\n";
};
// ============================================
// 【事件 6: onBufferFull】
// 说明: 发送缓冲区满时触发
// 触发: 应用层发送速度 > 网络发送速度
// 用途: 暂停接收数据,避免内存溢出
// ============================================
$worker->onBufferFull = function($connection) {
echo "[缓冲区事件触发] 发送缓冲区已满\n";
echo " ├─ 客户端: {$connection->getRemoteIp()}\n";
echo " └─ 缓冲区大小: " . strlen($connection->getSendBuffer()) . " 字节\n\n";
};
// ============================================
// 【事件 7: onBufferDrain】
// 说明: 发送缓冲区清空时触发
// 触发: 缓冲区数据全部发送完毕
// 用途: 恢复接收数据
// ============================================
$worker->onBufferDrain = function($connection) {
echo "[缓冲区事件触发] 发送缓冲区已清空\n";
echo " └─ 客户端: {$connection->getRemoteIp()}\n\n";
};
// 辅助函数:获取事件循环类型
function getEventLoopType() {
if (extension_loaded('event')) {
return 'Event (Libevent)';
} elseif (extension_loaded('ev')) {
return 'Ev';
} elseif (DIRECTORY_SEPARATOR === '/') {
return 'Epoll (Linux) / Kqueue (BSD/Mac)';
} else {
return 'Select (Windows)';
}
}
// ============================================
// 【启动事件循环】
// Worker::runAll() 会:
// 1. Fork 子进程
// 2. 创建监听 socket
// 3. 初始化事件循环 (Epoll/Select)
// 4. 注册所有事件回调
// 5. 进入无限循环,等待事件
// ============================================
echo "准备启动事件循环...\n";
echo "底层将使用: " . getEventLoopType() . "\n";
echo "\n";
Worker::runAll();
<?php
/**
* 事件驱动演示 - 测试客户端
* 连接到 event_driven_demo.php 并测试各种命令
*/
echo "\n";
echo "===========================================\n";
echo " 事件驱动演示 - 测试客户端\n";
echo "===========================================\n\n";
// 连接到服务器
echo "[1] 正在连接到服务器...\n";
$socket = @stream_socket_client('tcp://127.0.0.1:9090', $errno, $errstr, 5);
if (!$socket) {
die("❌ 连接失败: {$errstr} ({$errno})\n\n请先启动服务器: php event_driven_demo.php start\n");
}
echo "✅ 连接成功!\n";
echo " → 服务器会触发 onConnect 事件\n\n";
// 读取欢迎消息
echo "[2] 接收欢迎消息:\n";
echo "-------------------------------------------\n";
stream_set_timeout($socket, 2);
while ($line = fgets($socket)) {
echo $line;
if (strpos($line, '---------------------------------') !== false) {
break;
}
}
echo "\n";
// 测试命令列表
$commands = [
['echo 你好,Workerman!', '测试消息回显'],
['time', '获取服务器时间'],
['stats', '查看服务器统计'],
['event', '查看事件信息'],
];
foreach ($commands as $index => $cmdInfo) {
list($cmd, $desc) = $cmdInfo;
$num = $index + 3;
echo "[{$num}] 发送命令: {$cmd}\n";
echo " 说明: {$desc}\n";
echo " → 服务器会触发 onMessage 事件\n";
echo "-------------------------------------------\n";
// 发送命令
fwrite($socket, $cmd . "\n");
// 接收响应
sleep(1);
stream_set_blocking($socket, 0);
$response = '';
while ($line = fgets($socket)) {
$response .= $line;
}
stream_set_blocking($socket, 1);
echo $response;
echo "\n";
}
// 断开连接
echo "[7] 发送退出命令: quit\n";
echo " → 服务器会触发 onClose 事件\n";
echo "-------------------------------------------\n";
fwrite($socket, "quit\n");
sleep(1);
echo fgets($socket);
fclose($socket);
echo "\n✅ 测试完成!\n";
echo "\n===========================================\n";
echo " 事件驱动流程总结\n";
echo "===========================================\n";
echo "1. 客户端连接 → 触发 onConnect 事件\n";
echo "2. 发送数据 → 触发 onMessage 事件\n";
echo "3. 服务器响应 → 触发写事件 (内部)\n";
echo "4. 客户端断开 → 触发 onClose 事件\n";
echo "5. 定时器执行 → 触发定时事件\n";
echo "\n所有事件都在事件循环中异步处理!\n";
echo "===========================================\n\n";
📖 完整代码解释
第 1-5 行:文件说明和开始标记
<?php
/**
连接到 event_driven_demo.php 并测试各种命令
*/
第 7-10 行:输出标题
echo "\n";
echo "===========================================\n";
echo " 事件驱动演示 - 测试客户端\n";
echo "===========================================\n\n";
含义:
第 12-13 行:连接提示
echo "[1] 正在连接到服务器...\n";
$socket = @stream_socket_client('tcp://127.0.0.1:9090', $errno, $errstr, 5);
详细解释:
stream_socket_client() 函数参数:
@ 符号:
返回值:
第 15-17 行:连接失败处理
if (!$socket) {
die("❌ 连接失败: {$errstr} ({$errno})\n\n请先启动服务器: php event_driven_demo.php start\n");
}
含义:
示例错误输出:
❌ 连接失败: Connection refused (111)
请先启动服务器: php event_driven_demo.php start
第 19-20 行:连接成功提示
echo "✅ 连接成功!\n";
echo " → 服务器会触发 onConnect 事件\n\n";
含义:
第 22-30 行:读取欢迎消息
echo "[2] 接收欢迎消息:\n";
echo "-------------------------------------------\n";
stream_set_timeout($socket, 2);
while ($line = fgets($socket)) {
echo $line;
if (strpos($line, '---------------------------------') !== false) {
break;
}
}
echo "\n";
● 详细解释:
stream_set_timeout($socket, 2)
防止程序无限等待
while ($line = fgets($socket))
while (...) - 当还有数据可读时,持续循环
工作流程:
strpos($line, '---------------------------------')
!== false - 严格不等于 false,说明找到了
break
欢迎消息读取完毕,不再继续读取
第 32-37 行:定义测试命令
$commands = [
['echo 你好,Workerman!', '测试消息回显'],
['time', '获取服务器时间'],
['stats', '查看服务器统计'],
['event', '查看事件信息'],
];
含义:
每个元素是一个数组,包含两个值:
数组结构:
$commands = [
[命令1, 说明1],
[命令2, 说明2],
[命令3, 说明3],
[命令4, 说明4],
]
第 39-62 行:循环测试所有命令
foreach ($commands as $index => $cmdInfo) {
list($cmd, $desc) = $cmdInfo;
$num = $index + 3;
echo "[{$num}] 发送命令: {$cmd}\n";
echo " 说明: {$desc}\n";
echo " → 服务器会触发 onMessage 事件\n";
echo "-------------------------------------------\n";
// 发送命令
fwrite($socket, $cmd . "\n");
// 接收响应
sleep(1);
stream_set_blocking($socket, 0);
$response = '';
while ($line = fgets($socket)) {
$response .= $line;
}
stream_set_blocking($socket, 1);
echo $response;
echo "\n";
}
详细解释:
循环声明
foreach ($commands as $index => $cmdInfo) {
$cmdInfo - 当前元素的值(子数组)
解构数组
list($cmd, $desc) = $cmdInfo;
$desc - 接收 $cmdInfo[1](说明)
示例:
// 第一次循环
$cmdInfo = ['echo 你好,Workerman!', '测试消息回显'];
list($cmd, $desc) = $cmdInfo;
// 结果: $cmd = 'echo 你好,Workerman!', $desc = '测试消息回显'
计算步骤编号
$num = $index + 3;
$index 为 0 时,$num 为 3
发送命令
fwrite($socket, $cmd . "\n");
换行符告诉服务器:这是完整的一条命令
等效于:
fwrite($socket, "echo 你好,Workerman!\n");
等待响应
sleep(1);
确保响应数据已经到达
设置非阻塞模式
stream_set_blocking($socket, 0);
阻塞:如果没有数据,fgets() 会一直等待
为什么用非阻塞?
用非阻塞可以读到没有数据为止,然后继续执行
读取所有响应
$response = '';
while ($line = fgets($socket)) {
$response .= $line;
}
当没有数据时,fgets() 返回 false,循环结束
工作流程:
第1次: $response = '' + "第一行\n" = "第一行\n"
第2次: $response = "第一行\n" + "第二行\n" = "第一行\n第二行\n"
第3次: $response = "第一行\n第二行\n" + "第三行\n" = ...
没数据了: fgets() 返回 false,循环结束
恢复阻塞模式
stream_set_blocking($socket, 1);
恢复正常模式,为下次命令做准备
输出响应
echo $response;
echo "\n";
额外输出一个空行,让界面更清晰
第 64-71 行:发送退出命令
echo "[7] 发送退出命令: quit\n";
echo " → 服务器会触发 onClose 事件\n";
echo "-------------------------------------------\n";
fwrite($socket, "quit\n");
sleep(1);
echo fgets($socket);
fclose($socket);
详细解释:
发送 quit 命令
fwrite($socket, "quit\n");
服务器收到后会关闭连接
等待响应
sleep(1);
echo fgets($socket);
直接输出,不保存到变量
关闭连接
fclose($socket);
如果不关闭,会导致资源泄漏
第 73-82 行:输出总结
echo "\n✅ 测试完成!\n";
echo "\n===========================================\n";
echo " 事件驱动流程总结\n";
echo "===========================================\n";
echo "1. 客户端连接 → 触发 onConnect 事件\n";
echo "2. 发送数据 → 触发 onMessage 事件\n";
echo "3. 服务器响应 → 触发写事件 (内部)\n";
echo "4. 客户端断开 → 触发 onClose 事件\n";
echo "5. 定时器执行 → 触发定时事件\n";
echo "\n所有事件都在事件循环中异步处理!\n";
echo "===========================================\n\n";
含义:
帮助用户理解事件的触发顺序
🔍 完整执行流程示例
假设服务器正在运行,执行 php test_event_client.php:
[1] 正在连接到服务器...
✅ 连接成功!
→ 服务器会触发 onConnect 事件
[回显] 你好,Workerman!
[时间] 2025-12-18 15:30:45
底层驱动: Epoll (Linux)
已注册事件:
✓ 读事件 (可读时触发)
✓ 写事件 (可写时触发)
✓ 定时事件 (定时触发)
工作流程:
[再见] 正在断开连接...
✅ 测试完成!
🎯 关键知识点总结
| 函数 | 作用 |
|---|---|
| stream_socket_client() | 创建 TCP/UDP 客户端连接 |
| fwrite() | 向 socket 写入数据 |
| fgets() | 从 socket 读取一行数据 |
| fclose() | 关闭 socket 连接 |
| stream_set_timeout() | 设置读取超时时间 |
| stream_set_blocking() | 设置阻塞/非阻塞模式 |
阻塞模式(默认):
stream_set_blocking($socket, 1);
$line = fgets($socket); // 如果没数据,会一直等待
非阻塞模式:
stream_set_blocking($socket, 0);
$line = fgets($socket); // 如果没数据,立即返回 false
// 定义二维数组
$arr = [
['value1', 'value2'],
['value3', 'value4'],
];
// 遍历
foreach ($arr as $index => $item) {
list($first, $second) = $item;
echo "{$first}, {$second}\n";
}
$str = "hello";
$str .= " world"; // 追加,结果: "hello world"
strpos($str, "world"); // 查找位置,返回 6
strpos($str, "xxx"); // 找不到,返回 false
这个测试客户端完整演示了如何与 TCP 服务器通信,是学习网络编程的好例子!