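Workerman version: 4.1.15
PHP version: 7.3
Environment: Windows Server
After connectToRemote is called, the service runs for a while (a few hours to 2 days) and then can no longer send or receive messages. This error appears: sendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready.
Without connectToRemote there is no problem. What is causing this, and how can it be fixed?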
namespace App\Http\Socket;

use App\Http\Controllers\PtTestController;
use Exception;
use GatewayWorker\Lib\Gateway;
use Illuminate\Http\Request;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Lib\Timer;

class Events
{
    private static $connection = null;
    private static $retryTimer = null;
    private static $remoteAddress = null;
    private static $externalID = [];

    public static function onWebSocketConnect($client_id, $data)
    {
        if (isset($data['get']['client'])) {
            $ids = Gateway::getClientIdByUid($data['get']['client']);
            if (!empty($ids)) {
                foreach ($ids as $id) {
                    if (Gateway::isOnline($id)) {
                        Gateway::closeClient($id);
                    }
                }
            }
            Gateway::bindUid($client_id, $data['get']['client']);
        }
    }

    public static function onMessage(string $client_id, $message)
    {
        try {
            $msg = json_decode($message);
            $op = $msg->op ?? false;
            $data = $msg->data ?? null;
            $deviceID = $data->deviceID ?? null;
            $type = $data->type ?? null;
            $idx = $data->idx ?? null;
            switch ($op) {
                case 'Login':
                    if ($type === 'app_server') {
                        Gateway::bindUid($client_id, 'app_server');
                        // Connect to the other WebSocket server
                        self::$remoteAddress = config('app.extranet_websocket');
                        if (self::$remoteAddress) {
                            self::connectToRemote();
                        }
                    }
                    break;
                case 'ScanQR':
                    if ($idx) {
                        self::sendMessage($idx, $message);
                    }
                    break;
                case 'AppMsg':
                    if ($deviceID) {
                        self::sendMessage($deviceID, $message);
                    }
                    break;
                default:
                    if (Gateway::getUidByClientId($client_id) === 'app_server') {
                        if ($deviceID) {
                            self::sendMessage($deviceID, $message);
                        }
                    } else {
                        Gateway::sendToUid('app_server', $message);
                    }
                    break;
            }
        } catch (Exception $e) {
            echo "Error in onMessage: " . $e->getMessage() . "\n";
        }
    }

    public static function connectToRemote()
    {
        try {
            self::$connection = null;
            self::$connection = new AsyncTcpConnection(self::$remoteAddress);
        } catch (Exception $e) {
            echo 'Async tcp connection error: ' . $e->getMessage() . "\n";
        }
        self::$connection->onConnect = function($connection) {
            try {
                self::$connection->send(json_encode(['op' => 'Login', 'data' => ['type' => 'intranet_socket']]));
                if (self::$retryTimer) {
                    Timer::del(self::$retryTimer);
                    self::$retryTimer = null;
                }
            } catch (Exception $e) {
                echo 'Error during connection send: ' . $e->getMessage() . "\n";
            }
        };
        self::$connection->onMessage = function($connection, $data) {
            try {
                $msg = json_decode($data, true);
                $op = $msg['op'] ?? false;
                $type = $msg['type'] ?? null;
                if ($op === 'ExternalLogin') {
                    if (isset($msg['clientID']) && !in_array($msg['clientID'], self::$externalID)) {
                        self::$externalID[] = $msg['clientID'];
                    }
                } else if ($op === 'ExternalLogout') {
                    if (isset($msg['clientID'])) {
                        $key = array_search($msg['clientID'], self::$externalID);
                        if ($key !== false) {
                            unset(self::$externalID[$key]);
                            self::$externalID = array_values(self::$externalID);
                        }
                    }
                } else if ($type === 'to_controller') {
                    try {
                        $request = new Request($msg['data']);
                        $controller = new PtTestController();
                        if (method_exists($controller, $op)) {
                            $res = $controller->$op($request);
                            $message = json_encode([
                                'op' => $msg['op'],
                                'type' => 'to_controller',
                                'data' => $msg['data'],
                                'msg' => $res['msg'] ?? $res,
                                'code' => $res['code'] ?? 0,
                            ]);
                            self::isConnectedSend($message);
                        }
                    } catch (Exception $e) {
                        $message = json_encode([
                            'op' => $msg['op'],
                            'type' => 'to_controller',
                            'data' => $msg['data'],
                            'msg' => $e->getMessage()
                        ]);
                        self::isConnectedSend($message);
                    }
                } else {
                    Gateway::sendToUid('app_server', $data);
                }
            } catch (Exception $e) {
                echo 'Error in onMessage (remote connection): ' . $e->getMessage() . "\n";
            }
        };
        self::$connection->onClose = function() {
            if (self::$externalID) {
                self::$externalID = [];
            }
            self::reconnect();
        };
        self::$connection->onError = function($connection, $code, $msg) {
            if (self::$externalID) {
                self::$externalID = [];
            }
            self::reconnect();
        };
        self::$connection->connect();
    }

    public static function reconnect()
    {
        if (self::$retryTimer) {
            return;
        }
        self::$retryTimer = Timer::add(30, function() {
            self::connectToRemote();
        });
    }

    public static function sendMessage($recipientID, $message)
    {
        try {
            $isExternal = in_array($recipientID, self::$externalID);
            if ($isExternal) {
                self::isConnectedSend($message);
            } else {
                Gateway::sendToUid($recipientID, $message);
            }
        } catch (Exception $e) {
            echo "Error in sendMessage: " . $e->getMessage() . "\n";
        }
    }

    public static function isConnectedSend($message)
    {
        try {
            if (self::$connection) {
                self::$connection->send($message);
            } else {
                self::reconnect();
            }
        } catch (Exception $e) {
            echo "Error in isConnectedSend: " . $e->getMessage() . "\n";
        }
    }
}
sendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready.
connectToRemote keeps creating new connections without ever closing the old ones, so the process eventually hits its resource limit.
public static function connectToRemote()
{
    // If a connection already exists, close it before creating a new one
    if (self::$connection !== null) {
        self::$connection->close();
        self::$connection = null;
    }
    // Create the asynchronous TCP connection to the remote server
    self::$connection = new AsyncTcpConnection(self::$remoteAddress);
    ...
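Is this the right way to fix it?
Is there anywhere else that needs to be closed?
Yes. Any other AsyncTcpConnection connections that are no longer needed should be closed as well.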
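A minimal sketch of the "close before you reconnect" idea, written so it runs without Workerman. The class name `RemoteLink` and the `$factory` callable are hypothetical; in the code from the question the factory would presumably return `new AsyncTcpConnection(self::$remoteAddress)` with its callbacks attached. The one detail worth noting is that the old connection's callbacks are cleared before `close()` is called, so closing it cannot trigger the reconnect logic in `onClose` a second time.

```php
<?php
// Hypothetical helper, not part of Workerman: keeps at most one live connection.
class RemoteLink
{
    /** @var callable Returns a new, not yet connected, connection object. */
    private $factory;
    /** @var object|null The single connection this helper currently owns. */
    private $connection = null;

    public function __construct(callable $factory)
    {
        $this->factory = $factory;
    }

    // Close whatever is currently open, then create and connect a fresh object.
    public function connect()
    {
        $this->closeCurrent();
        $this->connection = ($this->factory)();
        $this->connection->connect();
        return $this->connection;
    }

    public function closeCurrent()
    {
        if ($this->connection === null) {
            return;
        }
        $old = $this->connection;
        $this->connection = null;
        // Detach callbacks first so close() cannot re-enter the reconnect logic.
        $old->onConnect = $old->onMessage = $old->onClose = $old->onError = null;
        $old->close();
    }
}
```

With this shape, every call to `connect()` leaves exactly one open connection, no matter how many times a retry timer fires.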
self::$connection->onError = function($connection, $code, $msg) {
    echo "Error: $msg\n";
};
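One more question: if this error callback does nothing beyond logging, will that cause problems? Could it lead to this error: sendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready.
I ask because $connection->onError used to reconnect on error, so whenever the network was poor and "Error: send buffer full and drop package" appeared, it reconnected.
$connection->onError does not need any handling.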
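A small sketch of how the two callbacks could be split, under the assumption that `$connection` is a Workerman `AsyncTcpConnection`: `onError` only logs, and reconnection happens only in `onClose`. The function name `wireRemoteCallbacks` and the 30 second default are illustrative. It uses `AsyncTcpConnection::reconnect($after)`, which reuses the same connection object rather than creating a new one; this is a different approach from the timer in the original code, not a description of it.

```php
<?php
// Hypothetical helper: $connection is expected to be a Workerman AsyncTcpConnection.
function wireRemoteCallbacks($connection, $retryDelaySeconds = 30)
{
    // Log only. A failed send (for example a full send buffer) is reported here
    // as well, and that alone does not mean the connection is gone.
    $connection->onError = function ($connection, $code, $msg) {
        echo "Remote connection error $code: $msg\n";
    };

    // Reconnect only once the connection has actually closed.
    $connection->onClose = function ($connection) use ($retryDelaySeconds) {
        $connection->reconnect($retryDelaySeconds);
    };
}
```

Because a "send buffer full" error no longer schedules a reconnect, a slow network produces log lines rather than a growing pile of connections.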
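Thanks a lot for the help. I now have another problem: sending through self::$connection->send($message) keeps failing with "send buffer full and drop package". How can this be solved? I would appreciate it if you could take a look; the answers I got from AI did not solve it either.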
public static function sendToRemote($message)
{
    try {
        self::$messageQueue[] = $message;
        if (!self::$isSending) {
            self::processQueue();
        }
    } catch (Exception $e) {
        echo "Error in sendMessage: " . $e->getMessage() . "\n";
    }
}

private static function processQueue()
{
    if (empty(self::$messageQueue) || self::$isSending) {
        return;
    }
}
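"send buffer full and drop package" means you are sending faster than the other side can receive. Send more slowly.
Is there anything that can be done at the code level? The network link is sometimes unstable.
Do not send data too fast, and do not send too much.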
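One way to "send more slowly" in code is to pause when the send buffer fills and resume when it drains. The sketch below assumes the connection object exposes Workerman's `onBufferFull` and `onBufferDrain` callbacks; the class name `ThrottledSender` and the `$maxQueued` limit are hypothetical. Messages wait in a bounded in-memory queue while the buffer is full, and the oldest ones are dropped once the queue itself overflows, so memory stays bounded on a bad link. Raising `$connection->maxSendBufferSize` may also reduce how often the buffer fills, but it does not help if the remote side is permanently slower than the sender.

```php
<?php
// Hypothetical helper, not part of Workerman: queues messages while the
// connection's send buffer is full and flushes them once it has drained.
class ThrottledSender
{
    private $connection;
    private $queue = [];
    private $paused = false;
    private $maxQueued;

    public function __construct($connection, $maxQueued = 1000)
    {
        $this->connection = $connection;
        $this->maxQueued = $maxQueued;
        // Stop pushing data as soon as the send buffer reports it is full.
        $connection->onBufferFull = function () {
            $this->paused = true;
        };
        // Resume and flush the backlog once the buffer is empty again.
        $connection->onBufferDrain = function () {
            $this->paused = false;
            $this->flush();
        };
    }

    public function send($message)
    {
        if (count($this->queue) >= $this->maxQueued) {
            array_shift($this->queue); // queue overflow: drop the oldest message
        }
        $this->queue[] = $message;
        $this->flush();
    }

    // Number of messages still waiting in the local queue.
    public function pending()
    {
        return count($this->queue);
    }

    private function flush()
    {
        while (!$this->paused && $this->queue) {
            $this->connection->send(array_shift($this->queue));
        }
    }
}
```

In the code from this thread, `isConnectedSend()` could hand messages to such a sender instead of calling `self::$connection->send($message)` directly. Whether dropping the oldest message is acceptable depends on the application; for messages that must not be lost, a persistent queue would be needed instead.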