我主要是要做一个任务。
首先要通过ws连接,相当于设备已连接。
然后再通过http去请求接口,比如/task/send
调用/task/send这个接口的时候,就服务端会通过ws向客户端发送一些数据比如url,params等,这时候这个/task/send还在等待
然后客户端通过ws拿到请求的接口参数去请求拿到数据比如{k1:v1} 这样 通过ws在发给服务端,服务端收到消息后/task/send这里就不用等待。直接把{k1:v1} 这个数据输出出去
这里我做下简化。就请求接口/task/send的时候会发送ws就行。
# 创建ThinkPHP 8.x项目(项目名:tp-websocket,和你的项目名对应)
composer create-project topthink/think tp-websocket
# 进入项目根目录(后续所有命令都在该目录下执行)
cd tp-websocket
# 安装ThinkPHP集成Workerman扩展(支持php think worker:server命令)
composer require topthink/think-worker
# 我这里在tp的文档里直接拉的,他的文档扩展有这个命令
# 安装firebase/php-jwt扩展(项目中用于Token验证)
composer require firebase/php-jwt
启动ws服务

启动http服务

用tp那边的命令安装完之后
/www/wwwroot/tp-websocket/config/worker_server.php这里会这个worker_server.php,然后我给定义到自定义的类'worker_class' => app\service\WebSocketService::class,
<?php
return [
// 扩展自身需要的配置
'protocol' => 'websocket', // 协议 支持 tcp udp unix http websocket text
'host' => '0.0.0.0', // 监听地址
'port' => 2345, // 监听端口
'socket' => '', // 完整监听地址
'context' => [], // socket 上下文选项
// 'worker_class' => '', // 自定义Workerman服务类名 支持数组定义多个服务
'worker_class' => app\service\WebSocketService::class,
// 支持workerman的所有配置参数
'name' => 'thinkphp',
'count' => 4,
'daemonize' => false,
'pidFile' => '',
对应的/www/wwwroot/tp-websocket/app/service/WebSocketService.php
<?php
namespace app\service;
use Firebase\JWT\JWT;
use Firebase\JWT\Key;
use think\facade\Config;
use think\facade\Db;
use think\worker\Server;
use Workerman\Connection\TcpConnection;
use Workerman\Worker;
class WebSocketService extends Server
{
// 服务配置(单进程,确保worker实例唯一)
protected $protocol = 'websocket';
protected $host = '0.0.0.0';
protected $port = 2345;
protected $name = 'tp8_ws_crawler';
protected $count = 1;
protected $daemonize = false;
protected $pidFile;
// 存储worker实例(用于共享连接和结果)
protected $worker;
// JWT配置
private $jwtConfig;
/**
* 构造方法:初始化配置
*/
public function __construct()
{
parent::__construct();
$this->jwtConfig = Config::get('app.jwt');
$this->pidFile = runtime_path() . 'worker.pid';
}
/**
* Worker启动事件:保存worker实例+初始化共享属性
* @param Worker $worker
*/
public function onWorkerStart($worker)
{
// 保存worker实例到类属性,供后续回调使用
$this->worker = $worker;
// 初始化worker的自定义属性,存储在线连接和爬虫结果
$this->worker->onlineUsers = []; // 替代原来的静态变量self::$onlineUsers
$this->worker->crawlerResults = []; // 替代原来的静态变量self::$crawlerResults
echo "========================================\n";
echo "WebSocket服务启动成功,监听端口:{$this->port}\n";
echo "服务名称:{$this->name},进程数:{$this->count}\n";
echo "PID文件路径:{$this->pidFile}\n";
echo "========================================\n";
// 清理数据库过期连接
$this->clearExpireOnlineUser();
// 绑定onConnect事件(通过worker实例访问,无需静态变量)
$this->worker->onConnect = function (TcpConnection $connection) {
$connection->onWebSocketConnect = function (TcpConnection $connection, $httpHeaders) {
$token = $_GET['token'] ?? '';
echo "\n【握手阶段】收到GET参数:" . json_encode($_GET) . "\n";
echo "【握手阶段】解析到Token:{$token}\n";
if (empty($token)) {
$connection->close(json_encode(['code' => 1004, 'msg' => 'Token不能为空']));
echo "【握手失败】原因:Token为空\n";
return;
}
try {
$payload = JWT::decode($token, new Key($this->jwtConfig['secret'], $this->jwtConfig['alg']));
$userInfo = (array)$payload;
$userId = (int)$userInfo['user_id'];
$userName = $userInfo['username'] ?? '';
$connectionId = $connection->id;
$nowTime = time();
// 1. 存储连接实例到worker的自定义属性(替代静态变量)
$this->worker->onlineUsers[$userId] = $connection;
$connection->user_id = $userId;
$connection->username = $userName;
$connection->lastMsgTime = $nowTime;
// 2. 数据库增/更操作(兼容低版本TP)
$onlineUser = Db::name('ws_online_user')->where('user_id', $userId)->find();
if ($onlineUser) {
Db::name('ws_online_user')->where('user_id', $userId)->update([
'connection_id' => $connectionId,
'last_msg_time' => $nowTime
]);
echo "【数据库更新】用户ID: {$userId} 连接记录已更新\n";
} else {
Db::name('ws_online_user')->insert([
'user_id' => $userId,
'connection_id' => $connectionId,
'last_msg_time' => $nowTime,
'create_time' => $nowTime
]);
echo "【数据库新增】用户ID: {$userId} 连接记录已插入\n";
}
// 打印在线日志(从worker属性获取连接列表)
echo "【握手成功】用户ID: {$userId},用户名: {$userName},连接ID: {$connectionId}\n";
echo "【当前在线】worker属性列表:" . json_encode(array_keys($this->worker->onlineUsers)) . "\n";
echo "【当前在线】数据库列表:" . json_encode(Db::name('ws_online_user')->column('user_id')) . "\n";
$connection->send(json_encode([
'code' => 200,
'msg' => 'WebSocket连接成功',
'data' => ['user_id' => $userId, 'username' => $userName]
]));
} catch (\Exception $e) {
$errorMsg = 'Token无效或已过期:' . $e->getMessage();
$connection->close(json_encode(['code' => 1005, 'msg' => $errorMsg]));
echo "【握手失败】原因:{$errorMsg}\n";
return;
}
};
};
// 绑定onClose事件(清理worker属性中的连接)
$this->worker->onClose = function (TcpConnection $connection) {
$userId = $connection->user_id ?? 0;
$connectionId = $connection->id;
if ($userId > 0) {
// 1. 从worker属性中移除连接实例
if (isset($this->worker->onlineUsers[$userId])) {
unset($this->worker->onlineUsers[$userId]);
}
// 2. 清理数据库记录
$deleteCount = Db::name('ws_online_user')->where([
'user_id' => $userId,
'connection_id' => $connectionId
])->delete();
echo "\n【连接断开】用户ID: {$userId},连接ID: {$connectionId}\n";
echo "【数据库删除】成功删除 {$deleteCount} 条连接记录\n";
echo "【当前在线】worker属性列表:" . json_encode(array_keys($this->worker->onlineUsers)) . "\n";
echo "【当前在线】数据库列表:" . json_encode(Db::name('ws_online_user')->column('user_id')) . "\n";
}
};
// 绑定onError事件
$this->worker->onError = function (TcpConnection $connection, $code, $msg) {
$userId = $connection->user_id ?? 0;
$connectionId = $connection->id;
echo "\n【连接错误】用户ID: {$userId},连接ID: {$connectionId},错误码: {$code},错误信息: {$msg}\n";
};
}
/**
* 接收客户端消息事件(从worker属性获取/操作数据)
* @param TcpConnection $connection
* @param string $data
*/
public function onMessage(TcpConnection $connection, $data)
{
$userId = $connection->user_id ?? 0;
$connectionId = $connection->id;
$nowTime = time();
$connection->lastMsgTime = $nowTime;
try {
// 更新数据库心跳时间
Db::name('ws_online_user')->where([
'user_id' => $userId,
'connection_id' => $connectionId
])->update(['last_msg_time' => $nowTime]);
$recvData = json_decode($data, true);
if (json_last_error() !== JSON_ERROR_NONE) {
$connection->send(json_encode(['code' => 1010, 'msg' => '数据格式错误,请发送JSON格式数据']));
echo "【消息接收失败】用户ID: {$userId},原因:非JSON格式数据\n";
return;
}
// 处理心跳包
if (isset($recvData['type']) && $recvData['type'] === 'heartbeat') {
echo "【心跳包接收】用户ID: {$userId},连接ID: {$connectionId}\n";
$connection->send(json_encode(['code' => 200, 'msg' => '心跳响应成功']));
return;
}
// 处理爬虫结果
$taskId = $recvData['task_id'] ?? '';
$crawlerResult = $recvData['crawler_result'] ?? '';
if (empty($taskId) || empty($crawlerResult)) {
$connection->send(json_encode(['code' => 1011, 'msg' => '缺少任务ID或爬虫结果']));
echo "【消息接收失败】用户ID: {$userId},原因:缺少任务ID或爬虫结果\n";
return;
}
// 存储爬虫结果到worker属性(替代静态变量)
$resultKey = $userId . '_' . $taskId;
$this->worker->crawlerResults[$resultKey] = [
'task_id' => $taskId,
'crawler_result' => $crawlerResult,
'recv_time' => date('Y-m-d H:i:s')
];
$connection->send(json_encode([
'code' => 200,
'msg' => '爬虫结果已接收',
'data' => ['task_id' => $taskId]
]));
echo "【消息接收成功】用户ID: {$userId},任务ID: {$taskId}\n";
echo "【已存储结果】worker属性列表:" . json_encode(array_keys($this->worker->crawlerResults)) . "\n";
} catch (\Exception $e) {
$errorMsg = '结果接收失败:' . $e->getMessage();
$connection->send(json_encode(['code' => 500, 'msg' => $errorMsg]));
echo "【消息接收异常】用户ID: {$userId},原因:{$errorMsg}\n";
return;
}
}
/**
* 主动发送消息给指定用户(通过worker属性获取连接)
* @param int $userId
* @param array $data
* @return bool
*/
public function sendMessageToUser($userId, $data)
{
$userId = (int)$userId;
// 从worker属性中获取连接实例(替代静态变量)
if (!isset($this->worker->onlineUsers[$userId])) {
echo "【消息发送失败】用户ID: {$userId} worker属性中无连接实例\n";
return false;
}
$connection = $this->worker->onlineUsers[$userId];
$connection->lastMsgTime = time();
Db::name('ws_online_user')->where('user_id', $userId)->update(['last_msg_time' => $connection->lastMsgTime]);
$connection->send(json_encode($data));
echo "【消息发送成功】用户ID: {$userId},任务数据:" . json_encode($data) . "\n";
return true;
}
/**
* 清理数据库过期连接
*/
private function clearExpireOnlineUser()
{
$expireTime = time() - 300;
$deleteCount = Db::name('ws_online_user')->where('last_msg_time', '<', $expireTime)->delete();
echo "【过期连接清理】清理了 {$deleteCount} 条无效记录\n";
}
}
这样子连接的时候也会跟着鉴权了,这块好像是没啥问题吧
/www/wwwroot/tp-websocket/app/controller/Auth.php<?php
namespace app\controller;
use app\model\User;
use Firebase\JWT\JWT;
use think\facade\Config;
use think\facade\Request;
class Auth
{
/**
* 注册接口
* @return \think\response\Json
*/
public function register()
{
try {
$username = Request::param('username', '');
$password = Request::param('password', '');
// 验证参数
if (empty($username) || empty($password)) {
return json(['code' => 1001, 'msg' => '用户名和密码不能为空']);
}
// 检查用户名是否存在
$existUser = User::where('username', $username)->find();
if ($existUser) {
return json(['code' => 1002, 'msg' => '用户名已被注册']);
}
// 创建用户
$user = User::create([
'username' => $username,
'password' => md5($password), // 示例用md5,生产环境用password_hash
'create_time' => time(),
'update_time' => time(),
]);
echo "【用户注册成功】用户名: {$username},用户ID: " . (int)$user->id . "\n";
return json(['code' => 200, 'msg' => '注册成功']);
} catch (\Exception $e) {
$errorMsg = '注册失败:' . $e->getMessage();
echo "【用户注册失败】原因:{$errorMsg}\n";
return json(['code' => 500, 'msg' => $errorMsg]);
}
}
/**
* 登录接口(返回Token)
* @return \think\response\Json
*/
public function login()
{
try {
$username = Request::param('username', '');
$password = Request::param('password', '');
// 验证参数
if (empty($username) || empty($password)) {
return json(['code' => 1001, 'msg' => '用户名和密码不能为空']);
}
// 查询用户
$user = User::where('username', $username)
->where('password', md5($password))
->find();
if (!$user) {
return json(['code' => 1003, 'msg' => '用户名或密码错误']);
}
// 生成JWT Token
$jwtConfig = Config::get('app.jwt');
$payload = [
'user_id' => (int)$user->id, // 强制转为整数
'username' => $user->username,
'iat' => time(), // 签发时间
'exp' => time() + $jwtConfig['expire'], // 过期时间
];
$token = JWT::encode($payload, $jwtConfig['secret'], $jwtConfig['alg']);
$result = [
'code' => 200,
'msg' => '登录成功',
'data' => [
'token' => $token,
'user_id' => (int)$user->id,
'username' => $user->username,
]
];
echo "【用户登录成功】用户名: {$username},用户ID: " . (int)$user->id . "\n";
return json($result);
} catch (\Exception $e) {
$errorMsg = '登录失败:' . $e->getMessage();
echo "【用户登录失败】原因:{$errorMsg}\n";
return json(['code' => 500, 'msg' => $errorMsg]);
}
}
}
/www/wwwroot/tp-websocket/app/controller/Task.php<?php
namespace app\controller;
use app\service\WebSocketService;
use Firebase\JWT\JWT;
use Firebase\JWT\Key;
use think\facade\Config;
use think\facade\Db;
use think\facade\Request;
class Task
{
// 其他方法不变,仅修改sendCrawlerTask中的任务发送逻辑
public function sendCrawlerTask()
{
// 1. 验证Token(原有逻辑不变)
$tokenRes = $this->verifyToken();
if ($tokenRes['code'] != 200) {
return json($tokenRes);
}
$userInfo = $tokenRes['data'];
$userId = $userInfo['user_id'];
$userName = $userInfo['username'] ?? '';
// 2. 打印日志(原有逻辑不变)
echo "\n========================================\n";
echo "【接口请求】发送爬虫任务,用户ID: {$userId},用户名: {$userName}\n";
// 3. 判断在线状态(原有逻辑不变)
$onlineUser = Db::name('ws_online_user')->where('user_id', $userId)->find();
$dbOnline = !empty($onlineUser);
echo "【数据库在线】:" . ($dbOnline ? '是' : '否') . "\n";
echo "========================================\n";
if (!$dbOnline) {
return json(['code' => 1006, 'msg' => '设备未连接WebSocket']);
}
// 4. 验证爬虫URL(原有逻辑不变)
$crawlerUrl = Request::param('crawler_url', '');
$crawlerParams = Request::param('crawler_params', []);
if (empty($crawlerUrl)) {
return json(['code' => 1007, 'msg' => '爬虫URL不能为空']);
}
// 5. 构造任务数据(原有逻辑不变)
$taskData = [
'type' => 'crawler_task',
'task_id' => uniqid('task_'),
'data' => [
'crawler_url' => $crawlerUrl,
'crawler_params' => $crawlerParams
],
'send_time' => date('Y-m-d H:i:s')
];
// 6. 适配非静态方法:通过WebSocketService实例调用(单进程下可通过全局worker实例)
// 注意:单进程下,WebSocketService的$this->worker已全局共享,可通过以下方式获取
$sendSuccess = false;
global $worker; // Workerman会将worker实例注册为全局变量
if (isset($worker) && isset($worker->onlineUsers[$userId])) {
// 直接通过全局worker实例发送消息(更简洁)
$connection = $worker->onlineUsers[$userId];
$connection->lastMsgTime = time();
Db::name('ws_online_user')->where('user_id', $userId)->update(['last_msg_time' => $connection->lastMsgTime]);
$connection->send(json_encode($taskData));
$sendSuccess = true;
echo "【消息发送成功】用户ID: {$userId},任务ID: {$taskData['task_id']}\n";
} else {
echo "【消息发送失败】用户ID: {$userId} 无有效连接实例\n";
}
if (!$sendSuccess) {
return json(['code' => 1012, 'msg' => '任务发送失败,请重新连接WebSocket']);
}
return json([
'code' => 200,
'msg' => '爬虫任务已发送',
'data' => $taskData
]);
}
// 其他方法(verifyToken、getCrawlerResult)不变
private function verifyToken()
{
$token = Request::header('Authorization', '');
$token = str_replace('Bearer ', '', $token);
if (empty($token)) {
return ['code' => 1004, 'msg' => 'Token不能为空'];
}
$jwtConfig = Config::get('app.jwt');
try {
$payload = JWT::decode($token, new Key($jwtConfig['secret'], $jwtConfig['alg']));
$userInfo = (array)$payload;
$userInfo['user_id'] = (int)$userInfo['user_id'];
return ['code' => 200, 'data' => $userInfo];
} catch (\Exception $e) {
return ['code' => 1005, 'msg' => 'Token无效或已过期:' . $e->getMessage()];
}
}
public function getCrawlerResult()
{
$tokenRes = $this->verifyToken();
if ($tokenRes['code'] != 200) {
return json($tokenRes);
}
$userInfo = $tokenRes['data'];
$userId = (int)$userInfo['user_id'];
$taskId = Request::param('task_id', '');
if (empty($taskId)) {
return json(['code' => 1008, 'msg' => '任务ID不能为空']);
}
// 从全局worker实例中获取爬虫结果
$resultKey = $userId . '_' . $taskId;
global $worker;
if (!isset($worker) || !isset($worker->crawlerResults[$resultKey])) {
return json(['code' => 1009, 'msg' => '暂无该任务的爬虫结果']);
}
return json([
'code' => 200,
'msg' => '获取爬虫结果成功',
'data' => $worker->crawlerResults[$resultKey]
]);
}
}
========================================
【接口请求】发送爬虫任务,用户ID: 1,用户名: tp8test
【数据库在线】:是
========================================
【消息发送失败】用户ID: 1 无有效连接实例
{
"code": 1012,
"msg": "任务发送失败,请重新连接WebSocket"
}
用到的两张数据库表:
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户主键ID',
`username` varchar(50) NOT NULL COMMENT '用户名(唯一)',
`password` varchar(100) NOT NULL COMMENT '密码(建议加密存储,如bcrypt)',
`nickname` varchar(50) DEFAULT '' COMMENT '用户昵称',
`status` tinyint(1) NOT NULL DEFAULT 1 COMMENT '状态:1-正常,0-禁用',
`create_time` int(11) NOT NULL DEFAULT 0 COMMENT '创建时间(时间戳)',
`update_time` int(11) NOT NULL DEFAULT 0 COMMENT '更新时间(时间戳)',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_username` (`username`) COMMENT '用户名唯一索引,避免重复注册'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户基础信息表';
CREATE TABLE `ws_online_user` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`user_id` int(11) NOT NULL COMMENT '用户ID(与user表id关联)',
`connection_id` varchar(64) NOT NULL COMMENT 'WebSocket连接唯一标识(Workerman的connection->id)',
`last_msg_time` int(11) NOT NULL DEFAULT 0 COMMENT '最后通信时间(时间戳,用于清理过期连接)',
`create_time` int(11) NOT NULL DEFAULT 0 COMMENT '连接创建时间(时间戳)',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_user_id` (`user_id`) COMMENT '用户ID唯一索引,一个用户只保留一个有效连接',
KEY `idx_last_msg_time` (`last_msg_time`) COMMENT '最后通信时间索引,快速清理过期连接'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='WebSocket在线用户映射表';
在配置文件这里就加了个jwt的配置,其他都是tp8。
/www/wwwroot/tp-websocket/config/app.php
// JWT配置
'jwt' => [
'secret' => 'HscXnLN2PU1DajfIIyZK8kdvI3YnDB4b', // 自定义密钥(生产环境用复杂随机串)
'expire' => 7200, // Token过期时间(2小时)
'alg' => 'HS256', // 加密算法
],
这里明明已经连接了

然后这里还不行

这里写具体的系统环境相关信息
linux下的宝塔,php8.1 tp8.1。有没有老哥来个例子。新人想进步
个人建议直接使用全套 webman来替代tp8。tp的workerman组件库没有更新,不是最新版本。
tp5 => think-worker2.0 => workerman3.5
tp6 => think-worker3.0 => workerman3.5
tp8 => think-worker4.0 => workerman3.5 (只发了一个版本)
tp8 => think-worker5.0 => workerman5.x
看你的用法,是tp8 + think-worker4.0 吗 ?
tp8应该是要用think-worker5.0比较好些
你从http向websocket发消息,需要中转,在websocket里面建个text协议
需要中转嘛,那不就得用gateway了。我试试,以为直接能http里用ws。我以前就只会http,最近开始学ws