求助:用thinkphp8使用workerman有点问题

问题描述

我主要是要做一个任务。
首先要通过ws连接,相当于设备已连接。
然后再通过http去请求接口,比如/task/send
调用/task/send这个接口的时候,就服务端会通过ws向客户端发送一些数据比如url,params等,这时候这个/task/send还在等待
然后客户端通过ws拿到请求的接口参数去请求拿到数据比如{k1:v1} 这样 通过ws在发给服务端,服务端收到消息后/task/send这里就不用等待。直接把{k1:v1} 这个数据输出出去

这里我做下简化。就请求接口/task/send的时候会发送ws就行。

创建项目

# 创建ThinkPHP 8.x项目(项目名:tp-websocket,和你的项目名对应)
composer create-project topthink/think tp-websocket

# 进入项目根目录(后续所有命令都在该目录下执行)
cd tp-websocket

# 安装ThinkPHP集成Workerman扩展(支持php think worker:server命令)
composer require topthink/think-worker
# 我这里在tp的文档里直接拉的,他的文档扩展有这个命令

# 安装firebase/php-jwt扩展(项目中用于Token验证)
composer require firebase/php-jwt

启动ws服务
截图

启动http服务
截图

程序代码

用tp那边的命令安装完之后
/www/wwwroot/tp-websocket/config/worker_server.php这里会这个worker_server.php,然后我给定义到自定义的类'worker_class' => app\service\WebSocketService::class,

<?php
return [
    // 扩展自身需要的配置
    'protocol'       => 'websocket', // 协议 支持 tcp udp unix http websocket text
    'host'           => '0.0.0.0', // 监听地址
    'port'           => 2345, // 监听端口
    'socket'         => '', // 完整监听地址
    'context'        => [], // socket 上下文选项
    // 'worker_class'   => '', // 自定义Workerman服务类名 支持数组定义多个服务
    'worker_class'   => app\service\WebSocketService::class,

    // 支持workerman的所有配置参数
    'name'           => 'thinkphp',
    'count'          => 4,
    'daemonize'      => false,
    'pidFile'        => '',

对应的/www/wwwroot/tp-websocket/app/service/WebSocketService.php

<?php
namespace app\service;

use Firebase\JWT\JWT;
use Firebase\JWT\Key;
use think\facade\Config;
use think\facade\Db;
use think\worker\Server;
use Workerman\Connection\TcpConnection;
use Workerman\Worker;

class WebSocketService extends Server
{
    // 服务配置(单进程,确保worker实例唯一)
    protected $protocol = 'websocket';
    protected $host = '0.0.0.0';
    protected $port = 2345;
    protected $name = 'tp8_ws_crawler';
    protected $count = 1;
    protected $daemonize = false;
    protected $pidFile;

    // 存储worker实例(用于共享连接和结果)
    protected $worker;

    // JWT配置
    private $jwtConfig;

    /**
     * 构造方法:初始化配置
     */
    public function __construct()
    {
        parent::__construct();
        $this->jwtConfig = Config::get('app.jwt');
        $this->pidFile = runtime_path() . 'worker.pid';
    }

    /**
     * Worker启动事件:保存worker实例+初始化共享属性
     * @param Worker $worker
     */
    public function onWorkerStart($worker)
    {
        // 保存worker实例到类属性,供后续回调使用
        $this->worker = $worker;

        // 初始化worker的自定义属性,存储在线连接和爬虫结果
        $this->worker->onlineUsers = []; // 替代原来的静态变量self::$onlineUsers
        $this->worker->crawlerResults = []; // 替代原来的静态变量self::$crawlerResults

        echo "========================================\n";
        echo "WebSocket服务启动成功,监听端口:{$this->port}\n";
        echo "服务名称:{$this->name},进程数:{$this->count}\n";
        echo "PID文件路径:{$this->pidFile}\n";
        echo "========================================\n";

        // 清理数据库过期连接
        $this->clearExpireOnlineUser();

        // 绑定onConnect事件(通过worker实例访问,无需静态变量)
        $this->worker->onConnect = function (TcpConnection $connection) {
            $connection->onWebSocketConnect = function (TcpConnection $connection, $httpHeaders) {
                $token = $_GET['token'] ?? '';
                echo "\n【握手阶段】收到GET参数:" . json_encode($_GET) . "\n";
                echo "【握手阶段】解析到Token:{$token}\n";

                if (empty($token)) {
                    $connection->close(json_encode(['code' => 1004, 'msg' => 'Token不能为空']));
                    echo "【握手失败】原因:Token为空\n";
                    return;
                }

                try {
                    $payload = JWT::decode($token, new Key($this->jwtConfig['secret'], $this->jwtConfig['alg']));
                    $userInfo = (array)$payload;
                    $userId = (int)$userInfo['user_id'];
                    $userName = $userInfo['username'] ?? '';
                    $connectionId = $connection->id;
                    $nowTime = time();

                    // 1. 存储连接实例到worker的自定义属性(替代静态变量)
                    $this->worker->onlineUsers[$userId] = $connection;
                    $connection->user_id = $userId;
                    $connection->username = $userName;
                    $connection->lastMsgTime = $nowTime;

                    // 2. 数据库增/更操作(兼容低版本TP)
                    $onlineUser = Db::name('ws_online_user')->where('user_id', $userId)->find();
                    if ($onlineUser) {
                        Db::name('ws_online_user')->where('user_id', $userId)->update([
                            'connection_id' => $connectionId,
                            'last_msg_time' => $nowTime
                        ]);
                        echo "【数据库更新】用户ID: {$userId} 连接记录已更新\n";
                    } else {
                        Db::name('ws_online_user')->insert([
                            'user_id' => $userId,
                            'connection_id' => $connectionId,
                            'last_msg_time' => $nowTime,
                            'create_time' => $nowTime
                        ]);
                        echo "【数据库新增】用户ID: {$userId} 连接记录已插入\n";
                    }

                    // 打印在线日志(从worker属性获取连接列表)
                    echo "【握手成功】用户ID: {$userId},用户名: {$userName},连接ID: {$connectionId}\n";
                    echo "【当前在线】worker属性列表:" . json_encode(array_keys($this->worker->onlineUsers)) . "\n";
                    echo "【当前在线】数据库列表:" . json_encode(Db::name('ws_online_user')->column('user_id')) . "\n";

                    $connection->send(json_encode([
                        'code' => 200,
                        'msg' => 'WebSocket连接成功',
                        'data' => ['user_id' => $userId, 'username' => $userName]
                    ]));
                } catch (\Exception $e) {
                    $errorMsg = 'Token无效或已过期:' . $e->getMessage();
                    $connection->close(json_encode(['code' => 1005, 'msg' => $errorMsg]));
                    echo "【握手失败】原因:{$errorMsg}\n";
                    return;
                }
            };
        };

        // 绑定onClose事件(清理worker属性中的连接)
        $this->worker->onClose = function (TcpConnection $connection) {
            $userId = $connection->user_id ?? 0;
            $connectionId = $connection->id;

            if ($userId > 0) {
                // 1. 从worker属性中移除连接实例
                if (isset($this->worker->onlineUsers[$userId])) {
                    unset($this->worker->onlineUsers[$userId]);
                }

                // 2. 清理数据库记录
                $deleteCount = Db::name('ws_online_user')->where([
                    'user_id' => $userId,
                    'connection_id' => $connectionId
                ])->delete();

                echo "\n【连接断开】用户ID: {$userId},连接ID: {$connectionId}\n";
                echo "【数据库删除】成功删除 {$deleteCount} 条连接记录\n";
                echo "【当前在线】worker属性列表:" . json_encode(array_keys($this->worker->onlineUsers)) . "\n";
                echo "【当前在线】数据库列表:" . json_encode(Db::name('ws_online_user')->column('user_id')) . "\n";
            }
        };

        // 绑定onError事件
        $this->worker->onError = function (TcpConnection $connection, $code, $msg) {
            $userId = $connection->user_id ?? 0;
            $connectionId = $connection->id;
            echo "\n【连接错误】用户ID: {$userId},连接ID: {$connectionId},错误码: {$code},错误信息: {$msg}\n";
        };
    }

    /**
     * 接收客户端消息事件(从worker属性获取/操作数据)
     * @param TcpConnection $connection
     * @param string $data
     */
    public function onMessage(TcpConnection $connection, $data)
    {
        $userId = $connection->user_id ?? 0;
        $connectionId = $connection->id;
        $nowTime = time();
        $connection->lastMsgTime = $nowTime;

        try {
            // 更新数据库心跳时间
            Db::name('ws_online_user')->where([
                'user_id' => $userId,
                'connection_id' => $connectionId
            ])->update(['last_msg_time' => $nowTime]);

            $recvData = json_decode($data, true);
            if (json_last_error() !== JSON_ERROR_NONE) {
                $connection->send(json_encode(['code' => 1010, 'msg' => '数据格式错误,请发送JSON格式数据']));
                echo "【消息接收失败】用户ID: {$userId},原因:非JSON格式数据\n";
                return;
            }

            // 处理心跳包
            if (isset($recvData['type']) && $recvData['type'] === 'heartbeat') {
                echo "【心跳包接收】用户ID: {$userId},连接ID: {$connectionId}\n";
                $connection->send(json_encode(['code' => 200, 'msg' => '心跳响应成功']));
                return;
            }

            // 处理爬虫结果
            $taskId = $recvData['task_id'] ?? '';
            $crawlerResult = $recvData['crawler_result'] ?? '';
            if (empty($taskId) || empty($crawlerResult)) {
                $connection->send(json_encode(['code' => 1011, 'msg' => '缺少任务ID或爬虫结果']));
                echo "【消息接收失败】用户ID: {$userId},原因:缺少任务ID或爬虫结果\n";
                return;
            }

            // 存储爬虫结果到worker属性(替代静态变量)
            $resultKey = $userId . '_' . $taskId;
            $this->worker->crawlerResults[$resultKey] = [
                'task_id' => $taskId,
                'crawler_result' => $crawlerResult,
                'recv_time' => date('Y-m-d H:i:s')
            ];

            $connection->send(json_encode([
                'code' => 200,
                'msg' => '爬虫结果已接收',
                'data' => ['task_id' => $taskId]
            ]));

            echo "【消息接收成功】用户ID: {$userId},任务ID: {$taskId}\n";
            echo "【已存储结果】worker属性列表:" . json_encode(array_keys($this->worker->crawlerResults)) . "\n";
        } catch (\Exception $e) {
            $errorMsg = '结果接收失败:' . $e->getMessage();
            $connection->send(json_encode(['code' => 500, 'msg' => $errorMsg]));
            echo "【消息接收异常】用户ID: {$userId},原因:{$errorMsg}\n";
            return;
        }
    }

    /**
     * 主动发送消息给指定用户(通过worker属性获取连接)
     * @param int $userId
     * @param array $data
     * @return bool
     */
    public function sendMessageToUser($userId, $data)
    {
        $userId = (int)$userId;
        // 从worker属性中获取连接实例(替代静态变量)
        if (!isset($this->worker->onlineUsers[$userId])) {
            echo "【消息发送失败】用户ID: {$userId} worker属性中无连接实例\n";
            return false;
        }

        $connection = $this->worker->onlineUsers[$userId];
        $connection->lastMsgTime = time();
        Db::name('ws_online_user')->where('user_id', $userId)->update(['last_msg_time' => $connection->lastMsgTime]);
        $connection->send(json_encode($data));

        echo "【消息发送成功】用户ID: {$userId},任务数据:" . json_encode($data) . "\n";
        return true;
    }

    /**
     * 清理数据库过期连接
     */
    private function clearExpireOnlineUser()
    {
        $expireTime = time() - 300;
        $deleteCount = Db::name('ws_online_user')->where('last_msg_time', '<', $expireTime)->delete();
        echo "【过期连接清理】清理了 {$deleteCount} 条无效记录\n";
    }
}

这样子连接的时候也会跟着鉴权了,这块好像是没啥问题吧

  • 登录注册接口
    /www/wwwroot/tp-websocket/app/controller/Auth.php
<?php
namespace app\controller;

use app\model\User;
use Firebase\JWT\JWT;
use think\facade\Config;
use think\facade\Request;

class Auth
{
    /**
     * 注册接口
     * @return \think\response\Json
     */
    public function register()
    {
        try {
            $username = Request::param('username', '');
            $password = Request::param('password', '');

            // 验证参数
            if (empty($username) || empty($password)) {
                return json(['code' => 1001, 'msg' => '用户名和密码不能为空']);
            }

            // 检查用户名是否存在
            $existUser = User::where('username', $username)->find();
            if ($existUser) {
                return json(['code' => 1002, 'msg' => '用户名已被注册']);
            }

            // 创建用户
            $user = User::create([
                'username' => $username,
                'password' => md5($password), // 示例用md5,生产环境用password_hash
                'create_time' => time(),
                'update_time' => time(),
            ]);

            echo "【用户注册成功】用户名: {$username},用户ID: " . (int)$user->id . "\n";
            return json(['code' => 200, 'msg' => '注册成功']);
        } catch (\Exception $e) {
            $errorMsg = '注册失败:' . $e->getMessage();
            echo "【用户注册失败】原因:{$errorMsg}\n";
            return json(['code' => 500, 'msg' => $errorMsg]);
        }
    }

    /**
     * 登录接口(返回Token)
     * @return \think\response\Json
     */
    public function login()
    {
        try {
            $username = Request::param('username', '');
            $password = Request::param('password', '');

            // 验证参数
            if (empty($username) || empty($password)) {
                return json(['code' => 1001, 'msg' => '用户名和密码不能为空']);
            }

            // 查询用户
            $user = User::where('username', $username)
                ->where('password', md5($password))
                ->find();

            if (!$user) {
                return json(['code' => 1003, 'msg' => '用户名或密码错误']);
            }

            // 生成JWT Token
            $jwtConfig = Config::get('app.jwt');
            $payload = [
                'user_id' => (int)$user->id, // 强制转为整数
                'username' => $user->username,
                'iat' => time(), // 签发时间
                'exp' => time() + $jwtConfig['expire'], // 过期时间
            ];
            $token = JWT::encode($payload, $jwtConfig['secret'], $jwtConfig['alg']);

            $result = [
                'code' => 200,
                'msg' => '登录成功',
                'data' => [
                    'token' => $token,
                    'user_id' => (int)$user->id,
                    'username' => $user->username,
                ]
            ];

            echo "【用户登录成功】用户名: {$username},用户ID: " . (int)$user->id . "\n";
            return json($result);
        } catch (\Exception $e) {
            $errorMsg = '登录失败:' . $e->getMessage();
            echo "【用户登录失败】原因:{$errorMsg}\n";
            return json(['code' => 500, 'msg' => $errorMsg]);
        }
    }
}
  • Task接口
    /www/wwwroot/tp-websocket/app/controller/Task.php
<?php
namespace app\controller;

use app\service\WebSocketService;
use Firebase\JWT\JWT;
use Firebase\JWT\Key;
use think\facade\Config;
use think\facade\Db;
use think\facade\Request;

class Task
{
    // 其他方法不变,仅修改sendCrawlerTask中的任务发送逻辑
    public function sendCrawlerTask()
    {
        // 1. 验证Token(原有逻辑不变)
        $tokenRes = $this->verifyToken();
        if ($tokenRes['code'] != 200) {
            return json($tokenRes);
        }
        $userInfo = $tokenRes['data'];
        $userId = $userInfo['user_id'];
        $userName = $userInfo['username'] ?? '';

        // 2. 打印日志(原有逻辑不变)
        echo "\n========================================\n";
        echo "【接口请求】发送爬虫任务,用户ID: {$userId},用户名: {$userName}\n";

        // 3. 判断在线状态(原有逻辑不变)
        $onlineUser = Db::name('ws_online_user')->where('user_id', $userId)->find();
        $dbOnline = !empty($onlineUser);
        echo "【数据库在线】:" . ($dbOnline ? '是' : '否') . "\n";
        echo "========================================\n";

        if (!$dbOnline) {
            return json(['code' => 1006, 'msg' => '设备未连接WebSocket']);
        }

        // 4. 验证爬虫URL(原有逻辑不变)
        $crawlerUrl = Request::param('crawler_url', '');
        $crawlerParams = Request::param('crawler_params', []);
        if (empty($crawlerUrl)) {
            return json(['code' => 1007, 'msg' => '爬虫URL不能为空']);
        }

        // 5. 构造任务数据(原有逻辑不变)
        $taskData = [
            'type' => 'crawler_task',
            'task_id' => uniqid('task_'),
            'data' => [
                'crawler_url' => $crawlerUrl,
                'crawler_params' => $crawlerParams
            ],
            'send_time' => date('Y-m-d H:i:s')
        ];

        // 6. 适配非静态方法:通过WebSocketService实例调用(单进程下可通过全局worker实例)
        // 注意:单进程下,WebSocketService的$this->worker已全局共享,可通过以下方式获取
        $sendSuccess = false;
        global $worker; // Workerman会将worker实例注册为全局变量
        if (isset($worker) && isset($worker->onlineUsers[$userId])) {
            // 直接通过全局worker实例发送消息(更简洁)
            $connection = $worker->onlineUsers[$userId];
            $connection->lastMsgTime = time();
            Db::name('ws_online_user')->where('user_id', $userId)->update(['last_msg_time' => $connection->lastMsgTime]);
            $connection->send(json_encode($taskData));
            $sendSuccess = true;
            echo "【消息发送成功】用户ID: {$userId},任务ID: {$taskData['task_id']}\n";
        } else {
            echo "【消息发送失败】用户ID: {$userId} 无有效连接实例\n";
        }

        if (!$sendSuccess) {
            return json(['code' => 1012, 'msg' => '任务发送失败,请重新连接WebSocket']);
        }

        return json([
            'code' => 200,
            'msg' => '爬虫任务已发送',
            'data' => $taskData
        ]);
    }

    // 其他方法(verifyToken、getCrawlerResult)不变
    private function verifyToken()
    {
        $token = Request::header('Authorization', '');
        $token = str_replace('Bearer ', '', $token);

        if (empty($token)) {
            return ['code' => 1004, 'msg' => 'Token不能为空'];
        }

        $jwtConfig = Config::get('app.jwt');
        try {
            $payload = JWT::decode($token, new Key($jwtConfig['secret'], $jwtConfig['alg']));
            $userInfo = (array)$payload;
            $userInfo['user_id'] = (int)$userInfo['user_id'];
            return ['code' => 200, 'data' => $userInfo];
        } catch (\Exception $e) {
            return ['code' => 1005, 'msg' => 'Token无效或已过期:' . $e->getMessage()];
        }
    }

    public function getCrawlerResult()
    {
        $tokenRes = $this->verifyToken();
        if ($tokenRes['code'] != 200) {
            return json($tokenRes);
        }
        $userInfo = $tokenRes['data'];
        $userId = (int)$userInfo['user_id'];

        $taskId = Request::param('task_id', '');
        if (empty($taskId)) {
            return json(['code' => 1008, 'msg' => '任务ID不能为空']);
        }

        // 从全局worker实例中获取爬虫结果
        $resultKey = $userId . '_' . $taskId;
        global $worker;
        if (!isset($worker) || !isset($worker->crawlerResults[$resultKey])) {
            return json(['code' => 1009, 'msg' => '暂无该任务的爬虫结果']);
        }

        return json([
            'code' => 200,
            'msg' => '获取爬虫结果成功',
            'data' => $worker->crawlerResults[$resultKey]
        ]);
    }
}

报错信息

========================================
【接口请求】发送爬虫任务,用户ID: 1,用户名: tp8test
【数据库在线】:是
========================================
【消息发送失败】用户ID: 1 无有效连接实例
{
    "code": 1012,
    "msg": "任务发送失败,请重新连接WebSocket"
}

用到的两张数据库表:

CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户主键ID',
  `username` varchar(50) NOT NULL COMMENT '用户名(唯一)',
  `password` varchar(100) NOT NULL COMMENT '密码(建议加密存储,如bcrypt)',
  `nickname` varchar(50) DEFAULT '' COMMENT '用户昵称',
  `status` tinyint(1) NOT NULL DEFAULT 1 COMMENT '状态:1-正常,0-禁用',
  `create_time` int(11) NOT NULL DEFAULT 0 COMMENT '创建时间(时间戳)',
  `update_time` int(11) NOT NULL DEFAULT 0 COMMENT '更新时间(时间戳)',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_username` (`username`) COMMENT '用户名唯一索引,避免重复注册'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户基础信息表';
CREATE TABLE `ws_online_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `user_id` int(11) NOT NULL COMMENT '用户ID(与user表id关联)',
  `connection_id` varchar(64) NOT NULL COMMENT 'WebSocket连接唯一标识(Workerman的connection->id)',
  `last_msg_time` int(11) NOT NULL DEFAULT 0 COMMENT '最后通信时间(时间戳,用于清理过期连接)',
  `create_time` int(11) NOT NULL DEFAULT 0 COMMENT '连接创建时间(时间戳)',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_user_id` (`user_id`) COMMENT '用户ID唯一索引,一个用户只保留一个有效连接',
  KEY `idx_last_msg_time` (`last_msg_time`) COMMENT '最后通信时间索引,快速清理过期连接'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='WebSocket在线用户映射表';

在配置文件这里就加了个jwt的配置,其他都是tp8。
/www/wwwroot/tp-websocket/config/app.php

// JWT配置
    'jwt' => [
        'secret' => 'HscXnLN2PU1DajfIIyZK8kdvI3YnDB4b', // 自定义密钥(生产环境用复杂随机串)
        'expire' => 7200, // Token过期时间(2小时)
        'alg' => 'HS256', // 加密算法
    ],

截图报错信息里报错文件相关代码

这里明明已经连接了
截图
然后这里还不行
截图

操作系统及workerman/webman等框架组件具体版本

这里写具体的系统环境相关信息
linux下的宝塔,php8.1 tp8.1。有没有老哥来个例子。新人想进步

367 2 0
2个回答

故人重来

个人建议直接使用全套 webman来替代tp8。tp的workerman组件库没有更新,不是最新版本。

  • 暂无评论
ichynul

tp5 => think-worker2.0 => workerman3.5
tp6 => think-worker3.0 => workerman3.5
tp8 => think-worker4.0 => workerman3.5 (只发了一个版本)
tp8 => think-worker5.0 => workerman5.x

看你的用法,是tp8 + think-worker4.0 吗 ?
tp8应该是要用think-worker5.0比较好些

  • ichynul 2025-12-23

    你从http向websocket发消息,需要中转,在websocket里面建个text协议

  • 超级新人啥也学不会 2025-12-23

    需要中转嘛,那不就得用gateway了。我试试,以为直接能http里用ws。我以前就只会http,最近开始学ws

🔝