workerman=信号处理和转发

贵哥的编程之路

Workerman 信号处理和转发系统

这是一个简单的 Workerman 信号处理和转发示例,适用于 Windows 环境。

系统架构

客户端 (send_signal.php)
    ↓ 发送命令
信号分发器 (signal_dispatcher.php) - 监听 6666 端口
    ↓ 转发信号
工作进程 (signal_worker.php) - 4个进程监听 5555 端口
    ↓ 处理信号并响应

文件说明

  1. signal_worker.php - 工作进程

    • 启动 4 个工作进程
    • 监听 5555 端口接收内部信号
    • 处理各种信号并返回结果
  2. signal_dispatcher.php - 信号分发器

    • 监听 6666 端口接收外部命令
    • 将命令转发给所有或指定的工作进程
    • 收集所有工作进程的响应并返回
  3. send_signal.php - 信号发送客户端

    • 命令行工具,用于发送信号

signal_worker.php(先运行这个)

<?php
/**
 * Workerman 信号处理和转发示例 - 工作进程
 * 这个进程接收信号并处理
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;

require_once __DIR__ . '/vendor/autoload.php';

// 创建一个 Worker 监听内部信号端口
$signal_worker = new Worker('tcp://127.0.0.1:5555');
$signal_worker->name = 'SignalWorker';
$signal_worker->count = 4; // 启动4个进程

// 工作进程状态
$worker_status = [
    'start_time' => time(),
    'request_count' => 0,
    'status' => 'running'
];

// 当进程启动时
$signal_worker->onWorkerStart = function($worker) use (&$worker_status) {
    echo "工作进程 #{$worker->id} 启动,PID: " . getmypid() . "\n";
    $worker_status['worker_id'] = $worker->id;
};

// 接收信号消息
$signal_worker->onMessage = function(TcpConnection $connection, $data) use (&$worker_status) {
    $signal = json_decode($data, true);

    if (!$signal || !isset($signal['action'])) {
        $connection->send(json_encode(['status' => 'error', 'message' => '无效的信号']));
        return;
    }

    echo "工作进程 #{$worker_status['worker_id']} 收到信号: {$signal['action']}\n";

    switch ($signal['action']) {
        case 'status':
            // 返回状态信息
            $response = [
                'status' => 'success',
                'data' => [
                    'worker_id' => $worker_status['worker_id'],
                    'pid' => getmypid(),
                    'status' => $worker_status['status'],
                    'uptime' => time() - $worker_status['start_time'],
                    'request_count' => $worker_status['request_count']
                ]
            ];
            $connection->send(json_encode($response));
            break;

        case 'reload':
            // 模拟重载配置
            echo "工作进程 #{$worker_status['worker_id']} 正在重载配置...\n";
            $worker_status['status'] = 'reloading';

            // 模拟重载操作
            sleep(1);

            $worker_status['status'] = 'running';
            echo "工作进程 #{$worker_status['worker_id']} 配置重载完成\n";

            $connection->send(json_encode([
                'status' => 'success',
                'message' => "工作进程 #{$worker_status['worker_id']} 配置已重载"
            ]));
            break;

        case 'pause':
            // 暂停处理
            $worker_status['status'] = 'paused';
            echo "工作进程 #{$worker_status['worker_id']} 已暂停\n";
            $connection->send(json_encode([
                'status' => 'success',
                'message' => "工作进程 #{$worker_status['worker_id']} 已暂停"
            ]));
            break;

        case 'resume':
            // 恢复处理
            $worker_status['status'] = 'running';
            echo "工作进程 #{$worker_status['worker_id']} 已恢复\n";
            $connection->send(json_encode([
                'status' => 'success',
                'message' => "工作进程 #{$worker_status['worker_id']} 已恢复"
            ]));
            break;

        case 'increment':
            // 增加计数器
            $worker_status['request_count']++;
            $connection->send(json_encode([
                'status' => 'success',
                'message' => "计数器已增加",
                'count' => $worker_status['request_count']
            ]));
            break;

        default:
            $connection->send(json_encode([
                'status' => 'error',
                'message' => "未知的信号: {$signal['action']}"
            ]));
    }
};

// 连接时
$signal_worker->onConnect = function($connection) {
    echo "收到新的信号连接\n";
};

// 连接关闭时
$signal_worker->onClose = function($connection) {
    echo "信号连接已关闭\n";
};

// 运行所有 Worker
if (!defined('GLOBAL_START')) {
    Worker::runAll();
}

php signal_worker.php increment (运行命令)

这段 PHP 代码是一个基于 ‌Workerman‌ 框架实现的工作进程(Worker Process),用于接收来自信号分发器(如之前讨论的 signal_dispatcher.php)发送的命令信号,并根据命令类型执行相应的操作。每个工作进程监听本地 5555 端口,等待分发器的指令。

以下是该脚本的详细解释:

🧠 ‌功能概述‌

这是一个 TCP 服务器工作进程,运行多个实例(默认 4 个),监听在 127.0.0.1:5555,用于接收和处理来自信号分发器转发的命令,如 status、reload、pause 等。

📦 ‌依赖‌
使用了 Workerman:高性能的 PHP socket 服务框架。
引入了 Composer 的自动加载文件:require_once __DIR__ . '/vendor/autoload.php';
🧩 ‌核心结构解释‌

  1. ‌创建 TCP 服务监听‌
    php
    Copy Code
    $signal_worker = new Worker('tcp://127.0.0.1:5555');
    $signal_worker->name = 'SignalWorker';
    $signal_worker->count = 4;

创建一个 TCP Worker,监听本地 5555 端口。
设置名称为 SignalWorker,启动 4 个进程实例。

  1. ‌工作进程状态维护‌
    php
    Copy Code
    $worker_status = [
    'start_time' => time(),
    'request_count' => 0,
    'status' => 'running'
    ];

每个工作进程维护自己的状态信息,包括启动时间、请求数量、当前状态等。

  1. ‌进程启动回调‌
    php
    Copy Code
    $signal_worker->onWorkerStart = function($worker) use (&$worker_status) {
    echo "工作进程 #{$worker->id} 启动,PID: " . getmypid() . "\n";
    $worker_status['worker_id'] = $worker->id;
    };

每个工作进程启动时输出提示信息,并记录进程 ID。
📥 ‌接收信号并处理‌
php
Copy Code
$signal_worker->onMessage = function(TcpConnection $connection, $data) use (&$worker_status) { ... };

当信号分发器发送命令时,触发 onMessage 回调函数。
支持的命令类型
命令 功能说明
status 返回当前工作进程状态(ID、PID、运行时间、请求数等)
reload 模拟重载配置,将状态设为 reloading,1 秒后恢复 running
pause 暂停当前工作进程,状态设为 paused
resume 恢复当前工作进程,状态设为 running
increment 增加请求数量计数器
其他 返回错误信息
📤 ‌响应信号‌
每个命令执行后,工作进程会通过 json_encode 构造响应信息,通过 $connection->send() 返回给分发器。
🔄 ‌连接与关闭事件‌
php
Copy Code
$signal_worker->onConnect = function($connection) {
echo "收到新的信号连接\n";
};

$signal_worker->onClose = function($connection) {
echo "信号连接已关闭\n";
};

当有连接建立或关闭时,输出提示信息。
▶️ ‌启动服务‌
php
Copy Code
if (!defined('GLOBAL_START')) {
Worker::runAll();
}

启动 Workerman 事件循环,开始监听连接。
📌 ‌总结‌
功能点 描述
工作进程数量 默认启动 4 个
监听地址 127.0.0.1:5555
支持命令 status、reload、pause、resume、increment
响应机制 每个命令处理后返回 JSON 格式响应
状态管理 每个进程维护自己的运行状态
通信协议 使用 JSON 格式通过 TCP 传输
🧪 示例使用流程
启动该脚本:php signal_worker.php start
启动信号分发器(监听 6666 端口)
通过客户端向分发器发送命令,如:
bash
Copy Code
php send_signal.php status

分发器将命令转发给工作进程 → 工作进程处理并返回结果 → 分发器汇总返回客户端
✅ ‌应用场景‌

该脚本常用于以下场景:

后台任务管理(如队列消费进程)
多进程服务控制(如 Swoole、Workerman 服务)
进程状态监控与远程控制

signal_dispatcher.php()在运行这个

<?php
/**
 * Workerman 信号分发器
 * 接收外部命令并转发给所有工作进程
 */

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Connection\AsyncTcpConnection;

require_once __DIR__ . '/vendor/autoload.php';

// 创建一个外部命令接收器
$dispatcher = new Worker('tcp://0.0.0.0:6666');
$dispatcher->name = 'SignalDispatcher';
$dispatcher->count = 1; // 分发器只需要1个进程

echo "========================================\n";
echo "信号分发器启动\n";
echo "监听端口: 6666 (外部命令)\n";
echo "工作进程端口: 5555\n";
echo "========================================\n";

// 接收外部命令
$dispatcher->onMessage = function(TcpConnection $connection, $data) {
    $command = json_decode($data, true);

    if (!$command || !isset($command['action'])) {
        $connection->send(json_encode(['status' => 'error', 'message' => '无效的命令格式']));
        return;
    }

    echo "\n收到外部命令: {$command['action']}\n";
    echo "准备转发给所有工作进程...\n";

    // 获取目标工作进程数量(默认4个)
    $worker_count = isset($command['worker_count']) ? $command['worker_count'] : 4;
    $target_worker = isset($command['worker_id']) ? $command['worker_id'] : null;

    $results = [];
    $completed = 0;

    // 如果指定了特定的 worker_id,只发送给该 worker
    if ($target_worker !== null) {
        $worker_count = 1;
        echo "仅转发给工作进程 #{$target_worker}\n";
    } else {
        echo "广播给所有 {$worker_count} 个工作进程\n";
    }

    // 连接到每个工作进程并发送信号
    for ($i = 0; $i < $worker_count; $i++) {
        $worker_connection = new AsyncTcpConnection('tcp://127.0.0.1:5555');

        // 连接成功
        $worker_connection->onConnect = function($worker_conn) use ($command, $i, &$results) {
            echo "已连接到工作进程 #{$i}\n";
            $worker_conn->send(json_encode($command));
        };

        // 接收工作进程响应
        $worker_connection->onMessage = function($worker_conn, $response) use ($i, &$results, &$completed, $worker_count, $connection) {
            $result = json_decode($response, true);
            $results[$i] = $result;
            $completed++;

            echo "工作进程 #{$i} 响应: " . json_encode($result) . "\n";

            $worker_conn->close();

            // 所有工作进程都响应后,返回汇总结果
            if ($completed >= $worker_count) {
                $summary = [
                    'status' => 'success',
                    'message' => "命令已发送给 {$worker_count} 个工作进程",
                    'results' => $results
                ];
                $connection->send(json_encode($summary, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));
                echo "\n所有工作进程已响应完成\n";
                echo "========================================\n";
            }
        };

        // 连接错误
        $worker_connection->onError = function($worker_conn, $code, $msg) use ($i, &$completed, &$results, $worker_count, $connection) {
            echo "连接工作进程 #{$i} 失败: $msg\n";
            $results[$i] = ['status' => 'error', 'message' => $msg];
            $completed++;

            if ($completed >= $worker_count) {
                $summary = [
                    'status' => 'partial',
                    'message' => "部分工作进程响应失败",
                    'results' => $results
                ];
                $connection->send(json_encode($summary, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));
            }
        };

        $worker_connection->connect();
    }
};

$dispatcher->onConnect = function($connection) {
    echo "\n新的客户端连接: {$connection->getRemoteIp()}\n";
};

$dispatcher->onClose = function($connection) {
    echo "客户端断开连接\n";
};

// 运行
if (!defined('GLOBAL_START')) {
    Worker::runAll();
}

php signal_dispatcher.php increment
这段 PHP 代码是一个基于 ‌Workerman‌ 框架实现的 ‌信号分发器(Signal Dispatcher)‌,它的作用是接收外部命令并通过 TCP 协议将命令转发给多个工作进程。下面是对代码的详细解释:

🧠 ‌功能概述‌

这是一个 TCP 服务器,监听在 6666 端口,用于接收来自外部客户端(如命令行脚本)发送的命令,然后将这些命令转发给运行在 5555 端口的工作进程(Worker Processes)。

📦 ‌依赖‌
使用了 Workerman:一个高性能的 PHP socket 服务框架。
引入了 Composer 的自动加载文件:require_once __DIR__ . '/vendor/autoload.php';
🧩 ‌核心结构解释‌

  1. ‌创建 TCP 服务监听‌
    php
    Copy Code
    $dispatcher = new Worker('tcp://0.0.0.0:6666');
    $dispatcher->name = 'SignalDispatcher';
    $dispatcher->count = 1;

启动一个 TCP 服务,监听所有网络接口上的 6666 端口。
设置服务名称为 SignalDispatcher。
设置进程数量为 1,因为信号分发只需要一个进程来协调。

  1. ‌启动信息输出‌
    php
    Copy Code
    echo "========================================\n";
    echo "信号分发器启动\n";
    echo "监听端口: 6666 (外部命令)\n";
    echo "工作进程端口: 5555\n";
    echo "========================================\n";

启动时输出提示信息,告知用户当前监听的端口和工作进程的端口。
📥 ‌接收外部命令‌
php
Copy Code
$dispatcher->onMessage = function(TcpConnection $connection, $data) { ... };

当有客户端发送命令时,触发 onMessage 回调函数。
命令格式
客户端发送的命令必须是 JSON 格式,至少包含 action 字段。
示例:
json
Copy Code
{
"action": "status",
"worker_id": 2
}

处理逻辑
解析命令内容。
判断是否指定 worker_id,决定是广播给所有工作进程,还是只发送给一个。
默认连接 4 个工作进程(端口 5555),可通过 worker_count 指定。
🔄 ‌转发命令给工作进程‌
php
Copy Code
$worker_connection = new AsyncTcpConnection('tcp://127.0.0.1:5555');

使用异步连接方式连接到每个工作进程。
连接成功后将原始命令转发过去。
接收响应
每个工作进程处理完命令后返回响应。
所有工作进程都响应后,汇总结果并返回给客户端。
✅ ‌汇总响应并返回客户端‌
php
Copy Code
$summary = [
'status' => 'success',
'message' => "命令已发送给 {$worker_count} 个工作进程",
'results' => $results
];
$connection->send(json_encode($summary, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));

所有工作进程响应后,将结果封装成 JSON 并返回给客户端。
⚠️ ‌错误处理‌
php
Copy Code
$worker_connection->onError = function(...) { ... };

如果连接某个工作进程失败,也记录错误并加入最终结果中。
📞 ‌连接管理‌
php
Copy Code
$dispatcher->onConnect = function($connection) { ... };
$dispatcher->onClose = function($connection) { ... };

当有客户端连接或断开时输出提示信息。
▶️ ‌启动服务‌
php
Copy Code
if (!defined('GLOBAL_START')) {
Worker::runAll();
}

启动 Workerman 事件循环,开始监听连接。
📌 ‌总结‌
功能点 描述
服务监听 在 6666 端口接收外部命令
命令转发 将命令转发给本机 5555 端口的多个工作进程
响应汇总 收集所有工作进程的响应并返回给客户端
支持指定 worker 可只发送给某个特定工作进程
错误处理 对连接失败的工作进程做记录和返回
使用框架 基于 Workerman 实现高性能异步通信
🧪 示例使用流程
启动该脚本:php signal_dispatcher.php start
启动工作进程(需监听 5555 端口)
运行客户端脚本发送命令,如:
bash
Copy Code
php send_signal.php status

分发器接收命令 → 转发给工作进程 → 汇总结果 → 返回给客户端
✅ ‌应用场景‌

该脚本常用于以下场景:

后台任务管理(如队列消费进程)
多进程服务控制(如 Swoole、Workerman 服务)
进程状态监控与远程控制

send_signal.php

<?php
/**
 * 信号发送客户端
 * 用于向分发器发送命令
 */

if ($argc < 2) {
    echo "用法: php send_signal.php <action> [worker_id]\n";
    echo "\n可用的 action:\n";
    echo "  status     - 查询所有工作进程状态\n";
    echo "  reload     - 重载所有工作进程配置\n";
    echo "  pause      - 暂停所有工作进程\n";
    echo "  resume     - 恢复所有工作进程\n";
    echo "  increment  - 增加计数器\n";
    echo "\n示例:\n";
    echo "  php send_signal.php status          # 查询所有进程状态\n";
    echo "  php send_signal.php reload          # 重载所有进程\n";
    echo "  php send_signal.php pause 2         # 暂停工作进程 #2\n";
    exit(1);
}

$action = $argv[1];
$worker_id = isset($argv[2]) ? (int)$argv[2] : null;

// 创建到分发器的连接
$socket = @stream_socket_client('tcp://127.0.0.1:6666', $errno, $errstr, 3);

if (!$socket) {
    die("无法连接到信号分发器: $errstr ($errno)\n请确保 signal_dispatcher.php 正在运行\n");
}

// 发送命令
$command = [
    'action' => $action,
    'timestamp' => time()
];

if ($worker_id !== null) {
    $command['worker_id'] = $worker_id;
}

fwrite($socket, json_encode($command));

echo "已发送信号: $action\n";
echo "等待响应...\n\n";

// 接收响应
stream_set_timeout($socket, 5);
$response = '';
while (!feof($socket)) {
    $chunk = fread($socket, 8192);
    if ($chunk === false || $chunk === '') {
        break;
    }
    $response .= $chunk;
}

fclose($socket);

if ($response) {
    echo "服务器响应:\n";
    echo "----------------------------------------\n";

    $result = json_decode($response, true);
    if ($result) {
        echo json_encode($result, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";
    } else {
        echo $response . "\n";
    }
    echo "----------------------------------------\n";
} else {
    echo "未收到响应\n";
}

最后运行这个向信号分发器发送一个计数器增加指令
php send_signal.php increment

根据代码逻辑,php send_signal.php increment 命令的作用是向信号分发器发送一个计数器增加指令,具体解释如下:

📌 ‌命令功能‌
命令类型‌:increment
作用‌:触发所有工作进程(默认4个)的计数器增加操作
触发机制‌:
客户端发送命令:php send_signal.php increment
分发器(signal_dispatcher.php)接收命令并转发给所有工作进程
每个工作进程(signal_worker.php)接收到命令后,将内部 request_count 计数器加1
分发器汇总所有工作进程的响应结果并返回给客户端
📊 ‌计数器作用‌
用于统计工作进程的请求数量
每次执行 increment 命令时,所有工作进程的计数器都会增加1
通过 status 命令可以查看当前计数器值:
bash
Copy Code
php send_signal.php status

⚙️ ‌实现原理‌
分发器处理‌:
php
Copy Code
// 接收命令后,设置 worker_count=4(默认4个工作进程)
$worker_count = isset($command['worker_count']) ? $command['worker_count'] : 4;

工作进程处理‌:
php
Copy Code
case 'increment':
$worker_status['request_count']++; // 计数器加1
$response = [
'status' => 'success',
'data' => [
'worker_id' => $worker_status['worker_id'],
'request_count' => $worker_status['request_count'] // 返回更新后的计数器值
]
];
$connection->send(json_encode($response));
break;

📌 ‌使用场景‌
监控工作进程的请求数量
测试信号分发和响应机制
作为负载均衡的简单示例

注意‌:实际生产环境可能需要更复杂的计数逻辑(如分布式计数器),此示例仅用于演示基本功能。

174 0 0
0个评论

贵哥的编程之路

440
积分
0
获赞数
0
粉丝数
2025-07-11 加入
🔝