workerman=管理子进程

贵哥的编程之路

worker_demo.php

<?php
/**
 * Workerman 子进程管理演示
 * 展示如何创建、管理和销毁子进程
 */

require_once __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\Timer;

// ============================================
// 示例1:创建一个简单的 Worker(4个子进程)
// ============================================
$worker = new Worker('tcp://0.0.0.0:8888');
$worker->name = 'SimpleWorker';
$worker->count = 4;  // 创建 4 个子进程

// 当 Worker 进程启动时
$worker->onWorkerStart = function($worker) {
    $pid = posix_getpid();
    $workerId = $worker->id;
    echo "[Worker 启动] 进程 ID: {$pid}, Worker ID: {$workerId}, 名称: {$worker->name}\n";

    // 每个子进程每 5 秒执行一次任务
    Timer::add(5, function() use ($pid, $workerId) {
        echo "[{$pid}] Worker {$workerId} 正在工作... " . date('H:i:s') . "\n";
    });
};

// 当有客户端连接时
$worker->onConnect = function($connection) {
    $pid = posix_getpid();
    echo "[{$pid}] 新客户端连接: {$connection->id}\n";
    $connection->send("欢迎连接!由进程 {$pid} 为您服务\n");
};

// 当收到消息时
$worker->onMessage = function($connection, $data) {
    $pid = posix_getpid();
    echo "[{$pid}] 收到消息: {$data}";
    $connection->send("进程 {$pid} 已收到: {$data}");
};

// 当连接断开时
$worker->onClose = function($connection) {
    $pid = posix_getpid();
    echo "[{$pid}] 客户端断开: {$connection->id}\n";
};

// 当子进程停止时
$worker->onWorkerStop = function($worker) {
    $pid = posix_getpid();
    echo "[Worker 停止] 进程 {$pid} 正在退出\n";
};

// 当子进程重新加载时(reload 命令)
$worker->onWorkerReload = function($worker) {
    $pid = posix_getpid();
    echo "[Worker 重载] 进程 {$pid} 正在重新加载\n";
};

// ============================================
// 示例2:创建一个任务处理 Worker(2个子进程)
// ============================================
$task_worker = new Worker();
$task_worker->name = 'TaskWorker';
$task_worker->count = 2;  // 创建 2 个子进程

$task_worker->onWorkerStart = function($worker) {
    $pid = posix_getpid();
    $workerId = $worker->id;
    echo "[TaskWorker 启动] 进程 ID: {$pid}, Worker ID: {$workerId}\n";

    // 模拟处理任务
    Timer::add(3, function() use ($pid, $workerId) {
        $taskId = rand(1000, 9999);
        echo "[{$pid}] TaskWorker {$workerId} 处理任务 #{$taskId}\n";
    });
};

// ============================================
// 示例3:创建一个可控制的 Worker(动态管理)
// ============================================
$control_worker = new Worker('text://0.0.0.0:9999');
$control_worker->name = 'ControlWorker';
$control_worker->count = 1;  // 只有 1 个管理进程

$control_worker->onWorkerStart = function($worker) {
    echo "[ControlWorker 启动] 进程管理服务已启动\n";
    echo "连接到 9999 端口可以管理进程\n";
    echo "可用命令: status, stop, reload\n\n";
};

$control_worker->onMessage = function($connection, $data) {
    $command = trim($data);
    $pid = posix_getpid();

    switch ($command) {
        case 'status':
            // 显示所有进程状态
            $info = getProcessInfo();
            $connection->send($info . "\n");
            break;

        case 'stop':
            $connection->send("正在停止所有 Worker...\n");
            Worker::stopAll();
            break;

        case 'reload':
            $connection->send("正在重新加载所有 Worker...\n");
            posix_kill(posix_getppid(), SIGUSR1);
            break;

        default:
            $connection->send("未知命令: {$command}\n可用命令: status, stop, reload\n");
    }
};

// ============================================
// 辅助函数:获取进程信息
// ============================================
function getProcessInfo() {
    $info = "=== Workerman 进程状态 ===\n\n";

    // 获取主进程 PID
    $masterPid = posix_getppid();
    $info .= "主进程 PID: {$masterPid}\n\n";

    // 获取所有 Worker
    $workers = Worker::getAllWorkers();
    foreach ($workers as $worker) {
        $info .= "Worker: {$worker->name}\n";
        $info .= "  - 进程数: {$worker->count}\n";
        $info .= "  - 监听地址: " . ($worker->getSocketName() ?: '无') . "\n";
        $info .= "  - 当前进程 ID: " . posix_getpid() . "\n";
        $info .= "\n";
    }

    // 获取子进程列表
    $cmd = "ps --ppid {$masterPid} -o pid,cmd --no-headers 2>/dev/null";
    $output = shell_exec($cmd);

    if ($output) {
        $info .= "子进程列表:\n";
        $info .= $output . "\n";
    }

    return $info;
}

// ============================================
// 主进程启动回调
// ============================================
Worker::$onMasterReload = function() {
    echo "\n[主进程] 收到 reload 信号,正在重新加载所有 Worker...\n\n";
};

Worker::$onMasterStop = function() {
    echo "\n[主进程] 正在停止所有 Worker...\n\n";
};

// 设置进程标题(可选)
Worker::$processTitle = 'WorkerDemo';

// 打印启动信息
echo "======================================\n";
echo "Workerman 子进程管理演示\n";
echo "======================================\n";
echo "SimpleWorker  : tcp://0.0.0.0:8888 (4个进程)\n";
echo "TaskWorker    : 后台任务 (2个进程)\n";
echo "ControlWorker : text://0.0.0.0:9999 (1个进程)\n";
echo "======================================\n\n";

// 运行所有 Worker
Worker::runAll();

解释:
这是一个 Workerman 多进程管理的完整演示,展示了如何创建、管理和控制子进程。让我逐行详细解释:


📦 第 1-10 行:文件头和依赖导入

<?php
/**

  • Workerman 子进程管理演示
  • 展示如何创建、管理和销毁子进程
    */

    require_once __DIR__ . '/vendor/autoload.php';

    use Workerman\Worker;
    use Workerman\Timer;

    解释:

    • require_once __DIR__ . '/vendor/autoload.php' - 加载 Composer 自动加载器
    • use Workerman\Worker - 导入 Worker 类(Workerman 核心类)
    • use Workerman\Timer - 导入 Timer 类(定时器功能)

    🟦 第 12-61 行:示例1 - SimpleWorker(TCP 服务器)

    创建 Worker 实例(第 15-17 行)

    $worker = new Worker('tcp://0.0.0.0:8888');
    $worker->name = 'SimpleWorker';
    $worker->count = 4; // 创建 4 个子进程

    详细解释:

    参数 含义
    tcp://0.0.0.0:8888 监听协议和地址- tcp:// = TCP 协议- 0.0.0.0 = 监听所有网卡- 8888 = 端口号
    $worker->name Worker 名称,用于识别和管理
    $worker->count = 4 核心参数!创建 4 个子进程处理请求

    进程结构:
    主进程
    ├── SimpleWorker 子进程 0
    ├── SimpleWorker 子进程 1
    ├── SimpleWorker 子进程 2
    └── SimpleWorker 子进程 3


    onWorkerStart(第 20-29 行)

    $worker->onWorkerStart = function($worker) {
    $pid = posix_getpid();
    $workerId = $worker->id;
    echo "[Worker 启动] 进程 ID: {$pid}, Worker ID: {$workerId}, 名称: {$worker->name}\n";

    // 每个子进程每 5 秒执行一次任务
    Timer::add(5, function() use ($pid, $workerId) {
    echo "[{$pid}] Worker {$workerId} 正在工作... " . date('H:i:s') . "\n";
    });
    };

    触发时机: 每个子进程启动完成后执行一次

    关键函数说明:

    函数/变量 作用
    posix_getpid() 获取当前进程的 PID(进程标识符)例如:454, 455, 456, 457
    $worker->id Workerman 内部 ID(从 0 开始)4 个进程分别是:0, 1, 2, 3
    Timer::add(5, ...) 添加定时器,每 5 秒执行一次每个子进程独立运行自己的定时器
    use ($pid, $workerId) 闭包捕获变量,让定时器内部可以使用

    输出示例:
    [Worker 启动] 进程 ID: 454, Worker ID: 0, 名称: SimpleWorker
    [Worker 启动] 进程 ID: 455, Worker ID: 1, 名称: SimpleWorker
    [Worker 启动] 进程 ID: 456, Worker ID: 2, 名称: SimpleWorker
    [Worker 启动] 进程 ID: 457, Worker ID: 3, 名称: SimpleWorker

    // 5 秒后...
    [454] Worker 0 正在工作... 19:05:23
    [455] Worker 1 正在工作... 19:05:23
    [456] Worker 2 正在工作... 19:05:23
    [457] Worker 3 正在工作... 19:05:23


    onConnect(第 32-36 行)

    $worker->onConnect = function($connection) {
    $pid = posix_getpid();
    echo "[{$pid}] 新客户端连接: {$connection->id}\n";
    $connection->send("欢迎连接!由进程 {$pid} 为您服务\n");
    };

    触发时机: 每次有新的客户端连接到 8888 端口时

    参数说明:

    • $connection - 连接对象,代表一个客户端连接
    • $connection->id - Workerman 为每个连接分配的唯一 ID

    负载均衡机制:

    • 操作系统自动将新连接分配给空闲的子进程
    • 一旦连接建立,该连接会绑定到该子进程
    • 这样可以保证连接的状态一致性

    输出示例:
    [454] 新客户端连接: 1
    [455] 新客户端连接: 2
    [454] 新客户端连接: 3 // 454 又处理了一个连接


    onMessage(第 39-43 行)

    $worker->onMessage = function($connection, $data) {
    $pid = posix_getpid();
    echo "[{$pid}] 收到消息: {$data}";
    $connection->send("进程 {$pid} 已收到: {$data}");
    };

    触发时机: 客户端发送数据时

    参数说明:

    • $connection - 发送消息的连接对象
    • $data - 客户端发送的原始数据(字符串)

    工作流程:

    1. 客户端发送:"Hello"
    2. 服务器接收到数据,触发 onMessage
    3. 打印日志:[454] 收到消息: Hello
    4. 回复客户端:"进程 454 已收到: Hello"

    onClose(第 46-49 行)

    $worker->onClose = function($connection) {
    $pid = posix_getpid();
    echo "[{$pid}] 客户端断开: {$connection->id}\n";
    };

    触发时机: 客户端主动断开或连接异常断开时

    作用:

    • 记录连接断开日志
    • 可以在这里清理与该连接相关的资源

    onWorkerStop(第 52-55 行)

    $worker->onWorkerStop = function($worker) {
    $pid = posix_getpid();
    echo "[Worker 停止] 进程 {$pid} 正在退出\n";
    };

    触发时机: 子进程即将退出时(执行 php worker_demo.php stop)

    作用:

    • 清理资源(关闭文件、数据库连接等)
    • 保存数据
    • 记录退出日志

    onWorkerReload(第 58-61 行)

    $worker->onWorkerReload = function($worker) {
    $pid = posix_getpid();
    echo "[Worker 重载] 进程 {$pid} 正在重新加载\n";
    };

    触发时机: 执行 php worker_demo.php reload 时

    reload 机制(平滑重启):

    1. 主进程收到 reload 信号
    2. 逐个重启子进程(不是一次性全部重启)
    3. 重启步骤:
      • 启动新的子进程
      • 等待新进程就绪
      • 向旧进程发送停止信号
      • 旧进程处理完当前请求后退出
    4. 现有连接不会断开,新连接使用新代码

    应用场景: 生产环境更新代码时使用 reload,不影响服务


    🟩 第 63-80 行:示例2 - TaskWorker(后台任务处理器)

    $task_worker = new Worker(); // 注意:没有监听地址!
    $task_worker->name = 'TaskWorker';
    $task_worker->count = 2; // 创建 2 个子进程

    $task_worker->onWorkerStart = function($worker) {
    $pid = posix_getpid();
    $workerId = $worker->id;
    echo "[TaskWorker 启动] 进程 ID: {$pid}, Worker ID: {$workerId}\n";

    // 模拟处理任务
    Timer::add(3, function() use ($pid, $workerId) {
    $taskId = rand(1000, 9999);
    echo "[{$pid}] TaskWorker {$workerId} 处理任务 #{$taskId}\n";
    });
    };

    关键区别:

    特性 SimpleWorker TaskWorker
    监听地址 tcp://0.0.0.0:8888 无(new Worker())
    功能 处理客户端连接 纯后台任务
    触发方式 客户端发送数据 定时器自动触发

    工作方式:

    • 没有监听端口,不接受外部连接
    • 启动后自动执行定时任务
    • 每 3 秒执行一次任务

    应用场景:

    • ✅ 定时数据清理
    • ✅ 消息队列消费
    • ✅ 数据统计分析
    • ✅ 发送邮件/通知
    • ✅ 文件处理

    输出示例:
    [TaskWorker 启动] 进程 ID: 458, Worker ID: 0
    [TaskWorker 启动] 进程 ID: 459, Worker ID: 1

    // 3 秒后...
    [458] TaskWorker 0 处理任务 #3562
    [459] TaskWorker 1 处理任务 #7891


    🟨 第 82-119 行:示例3 - ControlWorker(进程管理控制台)

    $control_worker = new Worker('text://0.0.0.0:9999');
    $control_worker->name = 'ControlWorker';
    $control_worker->count = 1; // 只有 1 个管理进程

    协议说明:

    • text:// - 文本协议,以 \n (换行符) 作为消息分隔符
    • 适合命令行交互

    启动回调(第 89-93 行)

    $control_worker->onWorkerStart = function($worker) {
    echo "[ControlWorker 启动] 进程管理服务已启动\n";
    echo "连接到 9999 端口可以管理进程\n";
    echo "可用命令: status, stop, reload\n\n";
    };

    作用: 打印使用说明


    命令处理(第 95-119 行)

    $control_worker->onMessage = function($connection, $data) {
    $command = trim($data);
    $pid = posix_getpid();

    switch ($command) {
    case 'status':
    // ...
    case 'stop':
    // ...
    case 'reload':
    // ...
    }
    };

    1. status 命令(第 100-104 行)

    case 'status':
    $info = getProcessInfo();
    $connection->send($info . "\n");
    break;

    作用: 返回所有进程的详细信息

    使用方法:
    echo "status" | nc 127.0.0.1 9999


    1. stop 命令(第 106-109 行)

    case 'stop':
    $connection->send("正在停止所有 Worker...\n");
    Worker::stopAll();
    break;

    作用: 停止所有 Worker 进程

    Worker::stopAll() 流程:

    1. 向所有子进程发送停止信号
    2. 子进程触发 onWorkerStop 回调
    3. 子进程处理完当前请求后退出
    4. 主进程等待所有子进程退出后关闭

    1. reload 命令(第 111-114 行)

    case 'reload':
    $connection->send("正在重新加载所有 Worker...\n");
    posix_kill(posix_getppid(), SIGUSR1);
    break;

    关键代码解析:

    代码 作用
    posix_getppid() 获取父进程(主进程)的 PID
    posix_kill() 发送信号给指定进程
    SIGUSR1 UNIX 用户自定义信号1Workerman 用它触发 reload

    信号流程:
    ControlWorker 子进程
    └─> 发送 SIGUSR1 信号
    └─> 主进程
    └─> 逐个重启所有子进程


    🔧 第 124-151 行:辅助函数 getProcessInfo()

    function getProcessInfo() {
    $info = "=== Workerman 进程状态 ===\n\n";

    // 获取主进程 PID
    $masterPid = posix_getppid();
    $info .= "主进程 PID: {$masterPid}\n\n";

    // 获取所有 Worker
    $workers = Worker::getAllWorkers();
    foreach ($workers as $worker) {
    $info .= "Worker: {$worker->name}\n";
    $info .= " - 进程数: {$worker->count}\n";
    $info .= " - 监听地址: " . ($worker->getSocketName() ?: '无') . "\n";
    $info .= " - 当前进程 ID: " . posix_getpid() . "\n";
    $info .= "\n";
    }

    // 获取子进程列表
    $cmd = "ps --ppid {$masterPid} -o pid,cmd --no-headers 2>/dev/null";
    $output = shell_exec($cmd);

    if ($output) {
    $info .= "子进程列表:\n";
    $info .= $output . "\n";
    }

    return $info;
    }

    功能分解:

    1. 获取主进程 PID(第 128 行)

    $masterPid = posix_getppid();

    • posix_getppid() = Parent Process ID
    • 获取当前进程的父进程 ID(即主进程)
    1. 遍历所有 Worker(第 132-139 行)

    $workers = Worker::getAllWorkers();

    • Worker::getAllWorkers() - 返回所有 Worker 实例的数组
    • 包含 SimpleWorker、TaskWorker、ControlWorker

    输出信息:

    • Worker 名称
    • 进程数 ($worker->count)
    • 监听地址 ($worker->getSocketName())
    1. 使用 ps 命令列出子进程(第 142-148 行)

    $cmd = "ps --ppid {$masterPid} -o pid,cmd --no-headers 2>/dev/null";
    $output = shell_exec($cmd);

    命令解析:

    • ps - Linux 进程查看命令
    • --ppid {$masterPid} - 筛选父进程为主进程的所有子进程
    • -o pid,cmd - 只输出 PID 和命令
    • --no-headers - 不显示表头
    • 2>/dev/null - 错误输出重定向到空设备(忽略错误)

    输出示例:
    === Workerman 进程状态 ===

    主进程 PID: 453

    Worker: SimpleWorker

    • 进程数: 4
    • 监听地址: tcp://0.0.0.0:8888
    • 当前进程 ID: 460

    Worker: TaskWorker

    • 进程数: 2
    • 监听地址: 无
    • 当前进程 ID: 460

    Worker: ControlWorker

    • 进程数: 1
    • 监听地址: text://0.0.0.0:9999
    • 当前进程 ID: 460

    子进程列表:
    454 php worker_demo.php: worker process SimpleWorker
    455 php worker_demo.php: worker process SimpleWorker
    456 php worker_demo.php: worker process SimpleWorker
    457 php worker_demo.php: worker process SimpleWorker
    458 php worker_demo.php: worker process TaskWorker
    459 php worker_demo.php: worker process TaskWorker
    460 php worker_demo.php: worker process ControlWorker


    🎯 第 156-162 行:主进程回调

    Worker::$onMasterReload = function() {
    echo "\n[主进程] 收到 reload 信号,正在重新加载所有 Worker...\n\n";
    };

    Worker::$onMasterStop = function() {
    echo "\n[主进程] 正在停止所有 Worker...\n\n";
    };

    注意: 这是静态属性,不是实例属性

    区别:

    回调类型 作用对象 触发次数
    $worker->onWorkerStart 每个子进程 每个子进程一次
    Worker::$onMasterReload 主进程 主进程一次
    Worker::$onMasterStop 主进程 主进程一次

    ⚙️ 第 165 行:设置进程标题

    Worker::$processTitle = 'WorkerDemo';

    作用: 设置进程在系统中的显示名称

    效果:
    ps aux | grep WorkerDemo
    输出:
    WorkerDemo: master process
    WorkerDemo: worker process SimpleWorker
    ...


    📋 第 168-174 行:打印启动信息

    echo "======================================\n";
    echo "Workerman 子进程管理演示\n";
    echo "======================================\n";
    echo "SimpleWorker : tcp://0.0.0.0:8888 (4个进程)\n";
    echo "TaskWorker : 后台任务 (2个进程)\n";
    echo "ControlWorker : text://0.0.0.0:9999 (1个进程)\n";
    echo "======================================\n\n";

    作用: 启动时显示服务配置信息


    🚀 第 177 行:启动所有 Worker

    Worker::runAll();

    这是最重要的一行代码!

    Worker::runAll() 执行流程:

    1. 创建主进程
    2. Fork 子进程
      • 根据每个 Worker 的 count 创建子进程
      • SimpleWorker: 4 个
      • TaskWorker: 2 个
      • ControlWorker: 1 个
    3. 子进程初始化
      • 触发 onWorkerStart 回调
      • 开始监听端口(如果有)
    4. 进入事件循环
      • 等待客户端连接
      • 处理定时器
      • 处理消息

    阻塞特性:

    • Worker::runAll() 会阻塞执行
    • 后面的代码不会执行,直到所有 Worker 停止

    📊 完整进程架构图

    主进程 (PID: 453)
    ├── SimpleWorker #0 (PID: 454) - 监听 8888 端口
    ├── SimpleWorker #1 (PID: 455) - 监听 8888 端口
    ├── SimpleWorker #2 (PID: 456) - 监听 8888 端口
    ├── SimpleWorker #3 (PID: 457) - 监听 8888 端口
    ├── TaskWorker #0 (PID: 458) - 后台任务
    ├── TaskWorker #1 (PID: 459) - 后台任务
    └── ControlWorker (PID: 460) - 监听 9999 端口

    总共 8 个进程: 1 个主进程 + 7 个子进程


    🔑 核心概念总结表

    概念 解释 示例
    主进程 管理子进程,不处理业务 PID 453
    子进程 实际处理业务逻辑 PID 454-460
    count 子进程数量 $worker->count = 4
    Worker ID Workerman 内部编号 0, 1, 2, 3
    PID 操作系统进程 ID 454, 455...
    onWorkerStart 子进程启动回调 每个子进程执行一次
    onConnect 新连接回调 每次连接触发
    onMessage 收到消息回调 每次收到数据触发
    onClose 连接断开回调 连接关闭时触发
    reload 平滑重启 不断开现有连接
    stop 停止服务 优雅退出

    💡 实际应用场景

    Worker 类型 应用场景
    SimpleWorker WebSocket 服务、TCP 长连接、游戏服务器
    TaskWorker 定时任务、消息队列消费、数据处理
    ControlWorker 服务监控、动态管理、命令控制台

    这就是完整的代码解析!每一行都有其明确的作用,理解这些概念就掌握了 Workerman 的核心机制。

    test_client.php

    
    <?php
    /**
    * 测试客户端 - 连接到 Worker 进行测试
    */

// 测试 SimpleWorker (端口 8888)
echo "连接到 SimpleWorker (8888)...\n";
$socket = stream_socket_client('tcp://127.0.0.1:8888', $errno, $errmsg, 3);

if (!$socket) {
die("连接失败: {$errmsg}\n");
}

// 接收欢迎消息
$response = fread($socket, 8192);
echo "服务器响应: {$response}\n";

// 发送测试消息
for ($i = 1; $i <= 5; $i++) {
$message = "测试消息 #{$i}\n";
fwrite($socket, $message);
echo "发送: {$message}";

$response = fread($socket, 8192);
echo "收到: {$response}\n";

sleep(1);

}

fclose($socket);
echo "测试完成!\n";



先运行php  worker_demo.php  start
在运行第二个命令php  test_client.php
 ✅ **完全正确!**测试成功!

  结果分析

  您的输出说明:

  1. ✅ 连接成功 - 成功连接到 SimpleWorker (8888端口)
  2. ✅ 进程响应 - 由进程 454 处理您的连接
  3. ✅ 消息收发正常 - 5条消息全部正确收发
  4. ✅ 进程绑定 - 同一连接始终由同一进程处理(这是正确的)

  为什么始终是进程 454?

  这是 Workerman 的正常行为:
  - 当客户端连接时,操作系统会随机选择一个空闲的子进程来处理
  - 一旦连接建立,该连接会绑定到这个进程,直到断开
  - 这样可以保证连接的状态一致性

  验证多进程负载均衡

  如果想看到不同进程处理,可以多次运行测试:

  # 多次运行,会看到不同的进程 PID
  php test_client.php
  php test_client.php
  php test_client.php

  或者并发测试:

  # 开 4 个终端同时运行
  for i in {1..4}; do php test_client.php & done

  您会看到 4 个不同的进程 PID(因为您设置了 count = 4)。

  查看所有子进程

  ps aux | grep SimpleWorker | grep -v grep

  应该能看到 4 个子进程在运行!
![](/upload/img/20251217/17694294f042a5.png)
17 0 0
0个评论

贵哥的编程之路

440
积分
0
获赞数
0
粉丝数
2025-07-11 加入
🔝