这是一个简单的 Workerman 信号处理和转发示例,适用于 Windows 环境。
客户端 (send_signal.php)
↓ 发送命令
信号分发器 (signal_dispatcher.php) - 监听 6666 端口
↓ 转发信号
工作进程 (signal_worker.php) - 4个进程监听 5555 端口
↓ 处理信号并响应
signal_worker.php - 工作进程
signal_dispatcher.php - 信号分发器
send_signal.php - 信号发送客户端
<?php
/**
* Workerman 信号处理和转发示例 - 工作进程
* 这个进程接收信号并处理
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// 创建一个 Worker 监听内部信号端口
$signal_worker = new Worker('tcp://127.0.0.1:5555');
$signal_worker->name = 'SignalWorker';
$signal_worker->count = 4; // 启动4个进程
// 工作进程状态
$worker_status = [
'start_time' => time(),
'request_count' => 0,
'status' => 'running'
];
// 当进程启动时
$signal_worker->onWorkerStart = function($worker) use (&$worker_status) {
echo "工作进程 #{$worker->id} 启动,PID: " . getmypid() . "\n";
$worker_status['worker_id'] = $worker->id;
};
// 接收信号消息
$signal_worker->onMessage = function(TcpConnection $connection, $data) use (&$worker_status) {
$signal = json_decode($data, true);
if (!$signal || !isset($signal['action'])) {
$connection->send(json_encode(['status' => 'error', 'message' => '无效的信号']));
return;
}
echo "工作进程 #{$worker_status['worker_id']} 收到信号: {$signal['action']}\n";
switch ($signal['action']) {
case 'status':
// 返回状态信息
$response = [
'status' => 'success',
'data' => [
'worker_id' => $worker_status['worker_id'],
'pid' => getmypid(),
'status' => $worker_status['status'],
'uptime' => time() - $worker_status['start_time'],
'request_count' => $worker_status['request_count']
]
];
$connection->send(json_encode($response));
break;
case 'reload':
// 模拟重载配置
echo "工作进程 #{$worker_status['worker_id']} 正在重载配置...\n";
$worker_status['status'] = 'reloading';
// 模拟重载操作
sleep(1);
$worker_status['status'] = 'running';
echo "工作进程 #{$worker_status['worker_id']} 配置重载完成\n";
$connection->send(json_encode([
'status' => 'success',
'message' => "工作进程 #{$worker_status['worker_id']} 配置已重载"
]));
break;
case 'pause':
// 暂停处理
$worker_status['status'] = 'paused';
echo "工作进程 #{$worker_status['worker_id']} 已暂停\n";
$connection->send(json_encode([
'status' => 'success',
'message' => "工作进程 #{$worker_status['worker_id']} 已暂停"
]));
break;
case 'resume':
// 恢复处理
$worker_status['status'] = 'running';
echo "工作进程 #{$worker_status['worker_id']} 已恢复\n";
$connection->send(json_encode([
'status' => 'success',
'message' => "工作进程 #{$worker_status['worker_id']} 已恢复"
]));
break;
case 'increment':
// 增加计数器
$worker_status['request_count']++;
$connection->send(json_encode([
'status' => 'success',
'message' => "计数器已增加",
'count' => $worker_status['request_count']
]));
break;
default:
$connection->send(json_encode([
'status' => 'error',
'message' => "未知的信号: {$signal['action']}"
]));
}
};
// 连接时
$signal_worker->onConnect = function($connection) {
echo "收到新的信号连接\n";
};
// 连接关闭时
$signal_worker->onClose = function($connection) {
echo "信号连接已关闭\n";
};
// 运行所有 Worker
if (!defined('GLOBAL_START')) {
Worker::runAll();
}
php signal_worker.php increment (运行命令)
这段 PHP 代码是一个基于 Workerman 框架实现的工作进程(Worker Process),用于接收来自信号分发器(如之前讨论的 signal_dispatcher.php)发送的命令信号,并根据命令类型执行相应的操作。每个工作进程监听本地 5555 端口,等待分发器的指令。
以下是该脚本的详细解释:
🧠 功能概述
这是一个 TCP 服务器工作进程,运行多个实例(默认 4 个),监听在 127.0.0.1:5555,用于接收和处理来自信号分发器转发的命令,如 status、reload、pause 等。
📦 依赖
使用了 Workerman:高性能的 PHP socket 服务框架。
引入了 Composer 的自动加载文件:require_once __DIR__ . '/vendor/autoload.php';
🧩 核心结构解释
创建一个 TCP Worker,监听本地 5555 端口。
设置名称为 SignalWorker,启动 4 个进程实例。
每个工作进程维护自己的状态信息,包括启动时间、请求数量、当前状态等。
每个工作进程启动时输出提示信息,并记录进程 ID。
📥 接收信号并处理
php
Copy Code
$signal_worker->onMessage = function(TcpConnection $connection, $data) use (&$worker_status) { ... };
当信号分发器发送命令时,触发 onMessage 回调函数。
支持的命令类型
命令 功能说明
status 返回当前工作进程状态(ID、PID、运行时间、请求数等)
reload 模拟重载配置,将状态设为 reloading,1 秒后恢复 running
pause 暂停当前工作进程,状态设为 paused
resume 恢复当前工作进程,状态设为 running
increment 增加请求数量计数器
其他 返回错误信息
📤 响应信号
每个命令执行后,工作进程会通过 json_encode 构造响应信息,通过 $connection->send() 返回给分发器。
🔄 连接与关闭事件
php
Copy Code
$signal_worker->onConnect = function($connection) {
echo "收到新的信号连接\n";
};
$signal_worker->onClose = function($connection) {
echo "信号连接已关闭\n";
};
当有连接建立或关闭时,输出提示信息。
▶️ 启动服务
php
Copy Code
if (!defined('GLOBAL_START')) {
Worker::runAll();
}
启动 Workerman 事件循环,开始监听连接。
📌 总结
功能点 描述
工作进程数量 默认启动 4 个
监听地址 127.0.0.1:5555
支持命令 status、reload、pause、resume、increment
响应机制 每个命令处理后返回 JSON 格式响应
状态管理 每个进程维护自己的运行状态
通信协议 使用 JSON 格式通过 TCP 传输
🧪 示例使用流程
启动该脚本:php signal_worker.php start
启动信号分发器(监听 6666 端口)
通过客户端向分发器发送命令,如:
bash
Copy Code
php send_signal.php status
分发器将命令转发给工作进程 → 工作进程处理并返回结果 → 分发器汇总返回客户端
✅ 应用场景
该脚本常用于以下场景:
后台任务管理(如队列消费进程)
多进程服务控制(如 Swoole、Workerman 服务)
进程状态监控与远程控制
<?php
/**
* Workerman 信号分发器
* 接收外部命令并转发给所有工作进程
*/
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Connection\AsyncTcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// 创建一个外部命令接收器
$dispatcher = new Worker('tcp://0.0.0.0:6666');
$dispatcher->name = 'SignalDispatcher';
$dispatcher->count = 1; // 分发器只需要1个进程
echo "========================================\n";
echo "信号分发器启动\n";
echo "监听端口: 6666 (外部命令)\n";
echo "工作进程端口: 5555\n";
echo "========================================\n";
// 接收外部命令
$dispatcher->onMessage = function(TcpConnection $connection, $data) {
$command = json_decode($data, true);
if (!$command || !isset($command['action'])) {
$connection->send(json_encode(['status' => 'error', 'message' => '无效的命令格式']));
return;
}
echo "\n收到外部命令: {$command['action']}\n";
echo "准备转发给所有工作进程...\n";
// 获取目标工作进程数量(默认4个)
$worker_count = isset($command['worker_count']) ? $command['worker_count'] : 4;
$target_worker = isset($command['worker_id']) ? $command['worker_id'] : null;
$results = [];
$completed = 0;
// 如果指定了特定的 worker_id,只发送给该 worker
if ($target_worker !== null) {
$worker_count = 1;
echo "仅转发给工作进程 #{$target_worker}\n";
} else {
echo "广播给所有 {$worker_count} 个工作进程\n";
}
// 连接到每个工作进程并发送信号
for ($i = 0; $i < $worker_count; $i++) {
$worker_connection = new AsyncTcpConnection('tcp://127.0.0.1:5555');
// 连接成功
$worker_connection->onConnect = function($worker_conn) use ($command, $i, &$results) {
echo "已连接到工作进程 #{$i}\n";
$worker_conn->send(json_encode($command));
};
// 接收工作进程响应
$worker_connection->onMessage = function($worker_conn, $response) use ($i, &$results, &$completed, $worker_count, $connection) {
$result = json_decode($response, true);
$results[$i] = $result;
$completed++;
echo "工作进程 #{$i} 响应: " . json_encode($result) . "\n";
$worker_conn->close();
// 所有工作进程都响应后,返回汇总结果
if ($completed >= $worker_count) {
$summary = [
'status' => 'success',
'message' => "命令已发送给 {$worker_count} 个工作进程",
'results' => $results
];
$connection->send(json_encode($summary, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));
echo "\n所有工作进程已响应完成\n";
echo "========================================\n";
}
};
// 连接错误
$worker_connection->onError = function($worker_conn, $code, $msg) use ($i, &$completed, &$results, $worker_count, $connection) {
echo "连接工作进程 #{$i} 失败: $msg\n";
$results[$i] = ['status' => 'error', 'message' => $msg];
$completed++;
if ($completed >= $worker_count) {
$summary = [
'status' => 'partial',
'message' => "部分工作进程响应失败",
'results' => $results
];
$connection->send(json_encode($summary, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));
}
};
$worker_connection->connect();
}
};
$dispatcher->onConnect = function($connection) {
echo "\n新的客户端连接: {$connection->getRemoteIp()}\n";
};
$dispatcher->onClose = function($connection) {
echo "客户端断开连接\n";
};
// 运行
if (!defined('GLOBAL_START')) {
Worker::runAll();
}
php signal_dispatcher.php increment
这段 PHP 代码是一个基于 Workerman 框架实现的 信号分发器(Signal Dispatcher),它的作用是接收外部命令并通过 TCP 协议将命令转发给多个工作进程。下面是对代码的详细解释:
🧠 功能概述
这是一个 TCP 服务器,监听在 6666 端口,用于接收来自外部客户端(如命令行脚本)发送的命令,然后将这些命令转发给运行在 5555 端口的工作进程(Worker Processes)。
📦 依赖
使用了 Workerman:一个高性能的 PHP socket 服务框架。
引入了 Composer 的自动加载文件:require_once __DIR__ . '/vendor/autoload.php';
🧩 核心结构解释
启动一个 TCP 服务,监听所有网络接口上的 6666 端口。
设置服务名称为 SignalDispatcher。
设置进程数量为 1,因为信号分发只需要一个进程来协调。
启动时输出提示信息,告知用户当前监听的端口和工作进程的端口。
📥 接收外部命令
php
Copy Code
$dispatcher->onMessage = function(TcpConnection $connection, $data) { ... };
当有客户端发送命令时,触发 onMessage 回调函数。
命令格式
客户端发送的命令必须是 JSON 格式,至少包含 action 字段。
示例:
json
Copy Code
{
"action": "status",
"worker_id": 2
}
处理逻辑
解析命令内容。
判断是否指定 worker_id,决定是广播给所有工作进程,还是只发送给一个。
默认连接 4 个工作进程(端口 5555),可通过 worker_count 指定。
🔄 转发命令给工作进程
php
Copy Code
$worker_connection = new AsyncTcpConnection('tcp://127.0.0.1:5555');
使用异步连接方式连接到每个工作进程。
连接成功后将原始命令转发过去。
接收响应
每个工作进程处理完命令后返回响应。
所有工作进程都响应后,汇总结果并返回给客户端。
✅ 汇总响应并返回客户端
php
Copy Code
$summary = [
'status' => 'success',
'message' => "命令已发送给 {$worker_count} 个工作进程",
'results' => $results
];
$connection->send(json_encode($summary, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));
所有工作进程响应后,将结果封装成 JSON 并返回给客户端。
⚠️ 错误处理
php
Copy Code
$worker_connection->onError = function(...) { ... };
如果连接某个工作进程失败,也记录错误并加入最终结果中。
📞 连接管理
php
Copy Code
$dispatcher->onConnect = function($connection) { ... };
$dispatcher->onClose = function($connection) { ... };
当有客户端连接或断开时输出提示信息。
▶️ 启动服务
php
Copy Code
if (!defined('GLOBAL_START')) {
Worker::runAll();
}
启动 Workerman 事件循环,开始监听连接。
📌 总结
功能点 描述
服务监听 在 6666 端口接收外部命令
命令转发 将命令转发给本机 5555 端口的多个工作进程
响应汇总 收集所有工作进程的响应并返回给客户端
支持指定 worker 可只发送给某个特定工作进程
错误处理 对连接失败的工作进程做记录和返回
使用框架 基于 Workerman 实现高性能异步通信
🧪 示例使用流程
启动该脚本:php signal_dispatcher.php start
启动工作进程(需监听 5555 端口)
运行客户端脚本发送命令,如:
bash
Copy Code
php send_signal.php status
分发器接收命令 → 转发给工作进程 → 汇总结果 → 返回给客户端
✅ 应用场景
该脚本常用于以下场景:
后台任务管理(如队列消费进程)
多进程服务控制(如 Swoole、Workerman 服务)
进程状态监控与远程控制
<?php
/**
* 信号发送客户端
* 用于向分发器发送命令
*/
if ($argc < 2) {
echo "用法: php send_signal.php <action> [worker_id]\n";
echo "\n可用的 action:\n";
echo " status - 查询所有工作进程状态\n";
echo " reload - 重载所有工作进程配置\n";
echo " pause - 暂停所有工作进程\n";
echo " resume - 恢复所有工作进程\n";
echo " increment - 增加计数器\n";
echo "\n示例:\n";
echo " php send_signal.php status # 查询所有进程状态\n";
echo " php send_signal.php reload # 重载所有进程\n";
echo " php send_signal.php pause 2 # 暂停工作进程 #2\n";
exit(1);
}
$action = $argv[1];
$worker_id = isset($argv[2]) ? (int)$argv[2] : null;
// 创建到分发器的连接
$socket = @stream_socket_client('tcp://127.0.0.1:6666', $errno, $errstr, 3);
if (!$socket) {
die("无法连接到信号分发器: $errstr ($errno)\n请确保 signal_dispatcher.php 正在运行\n");
}
// 发送命令
$command = [
'action' => $action,
'timestamp' => time()
];
if ($worker_id !== null) {
$command['worker_id'] = $worker_id;
}
fwrite($socket, json_encode($command));
echo "已发送信号: $action\n";
echo "等待响应...\n\n";
// 接收响应
stream_set_timeout($socket, 5);
$response = '';
while (!feof($socket)) {
$chunk = fread($socket, 8192);
if ($chunk === false || $chunk === '') {
break;
}
$response .= $chunk;
}
fclose($socket);
if ($response) {
echo "服务器响应:\n";
echo "----------------------------------------\n";
$result = json_decode($response, true);
if ($result) {
echo json_encode($result, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";
} else {
echo $response . "\n";
}
echo "----------------------------------------\n";
} else {
echo "未收到响应\n";
}
最后运行这个向信号分发器发送一个计数器增加指令
php send_signal.php increment
根据代码逻辑,php send_signal.php increment 命令的作用是向信号分发器发送一个计数器增加指令,具体解释如下:
📌 命令功能
命令类型:increment
作用:触发所有工作进程(默认4个)的计数器增加操作
触发机制:
客户端发送命令:php send_signal.php increment
分发器(signal_dispatcher.php)接收命令并转发给所有工作进程
每个工作进程(signal_worker.php)接收到命令后,将内部 request_count 计数器加1
分发器汇总所有工作进程的响应结果并返回给客户端
📊 计数器作用
用于统计工作进程的请求数量
每次执行 increment 命令时,所有工作进程的计数器都会增加1
通过 status 命令可以查看当前计数器值:
bash
Copy Code
php send_signal.php status
⚙️ 实现原理
分发器处理:
php
Copy Code
// 接收命令后,设置 worker_count=4(默认4个工作进程)
$worker_count = isset($command['worker_count']) ? $command['worker_count'] : 4;
工作进程处理:
php
Copy Code
case 'increment':
$worker_status['request_count']++; // 计数器加1
$response = [
'status' => 'success',
'data' => [
'worker_id' => $worker_status['worker_id'],
'request_count' => $worker_status['request_count'] // 返回更新后的计数器值
]
];
$connection->send(json_encode($response));
break;
📌 使用场景
监控工作进程的请求数量
测试信号分发和响应机制
作为负载均衡的简单示例
注意:实际生产环境可能需要更复杂的计数逻辑(如分布式计数器),此示例仅用于演示基本功能。