第一个问题
子进程是一个使用 MQTT 的进程,运行一段时间后就会出现内存溢出,导致设备控制不正常。使用的是官方 MQTT 插件,代码如下。有好心人路过的话请帮忙看看是什么情况,项目已经上线,急需处理。
进程内没有发生报错,所以 Worker::stopAll() 没有被执行,
进程也就没有重启。
第二个问题
在主进程内的哪里可以监测子进程的状态?怎样实现检测到子进程死掉后自动重启子进程?
可加 V 有偿处理:13577428582
<?php
namespace process;
use plugins\Sms\model\Template;
use support\Log;
use think\facade\Db;
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use app\shared\model\Device as DeviceModel;
use app\shared\model\Gateway as GatewayModel;
use Workerman\Mqtt\Client;
/**
 * Process class that bridges an MQTT broker and local TCP clients.
 *
 * - Broker side: on connect it subscribes to one topic per enabled gateway and
 *   handles incoming messages on "device/..." and "espDevice/..." topics.
 * - TCP side: onMessage() accepts JSON frames of the form
 *   {"type": "subscribe"|"publish"|"unsubscribe", "topic": ..., "content": ...}
 *   and forwards them to the MQTT client.
 *
 * Any exception raised inside a broker callback is logged and followed by
 * Worker::stopAll(), i.e. the process deliberately stops on failure.
 */
class Mqtt
{
    /** @var Client MQTT client created (and connected) in the constructor. */
    protected $mqtt;

    /** @var bool True after the broker's onConnect fired, false after onClose/onError. */
    protected $is_connect = false;

    /** @var Mqtt|null Lazily created shared instance, see instance(). */
    protected static $_instance = null;

    /**
     * Builds the MQTT client from the "process.mqtt.constructor.config" settings
     * (address, port, username, password) and starts connecting.
     */
    public function __construct()
    {
        $config = config('process.mqtt.constructor.config');
        $this->mqtt = new Client("mqtt://{$config['address']}:{$config['port']}", [
            'username' => $config['username'],
            'password' => $config['password'],
        ]);
        $this->mqtt->connect();
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * NOTE(review): creating the instance runs the constructor, which opens its
     * own broker connection; the broker callbacks are only attached by
     * onWorkerStart(), so an instance obtained here has none until that is called.
     *
     * @return Mqtt
     */
    public static function instance()
    {
        if (!self::$_instance instanceof Mqtt) {
            self::$_instance = new self();
        }

        return self::$_instance;
    }

    /**
     * Attaches the broker callbacks (connect, message, close, error).
     *
     * @param Worker $worker The worker this process runs in (unused here).
     */
    public function onWorkerStart(Worker $worker)
    {
        $this->mqtt->onConnect = function ($mqtt) {
            try {
                $this->is_connect = true;
                $topics = $this->gatewayTopics();
                if ($topics) {
                    $this->subscribe($topics);
                }
            } catch (\Throwable $e) {
                Log::error("Mptt ERROR:" . $e->getMessage());
                Worker::stopAll();
            }
        };

        $this->mqtt->onMessage = function ($topic, $content, $mqtt) {
            try {
                return $this->dispatchBrokerMessage($topic, $content);
            } catch (\Throwable $e) {
                Log::error("Mptt ERROR:" . $e->getMessage());
                Worker::stopAll();
            }
        };

        $this->mqtt->onClose = function () {
            $this->is_connect = false;
        };

        // The error is logged before stopping so that a restart is traceable;
        // the argument is optional in case the client invokes onError without one.
        $this->mqtt->onError = function ($exception = null) {
            if ($exception instanceof \Throwable) {
                Log::error("Mptt ERROR:" . $exception->getMessage());
            }
            $this->mqtt->disconnect();
            $this->is_connect = false;
            Worker::stopAll();
        };
    }

    /**
     * TCP connection opened; nothing to do.
     *
     * @param TcpConnection $connection
     */
    public function onConnect(TcpConnection $connection)
    {
    }

    /**
     * Handles one JSON frame from a local TCP client and forwards it to the broker.
     *
     * @param TcpConnection $connection Sending connection (unused).
     * @param string        $data       JSON text with "type" and "topic" (plus "content" for publish).
     *
     * @return false|null false when the broker is not connected or the frame is
     *                    malformed; null otherwise.
     */
    public function onMessage(TcpConnection $connection, $data)
    {
        if (!$this->is_connect) {
            return false;
        }

        $frame = json_decode($data, true);
        // Reject anything that is not a JSON object with a string "type" instead
        // of indexing into null / scalars.
        if (!is_array($frame) || !isset($frame['type']) || !is_string($frame['type'])) {
            return false;
        }

        switch ($frame['type']) {
            case 'subscribe':
                if (!isset($frame['topic'])) {
                    return false;
                }
                $this->subscribe($frame['topic']);
                break;
            case 'publish':
                $this->publish($frame);
                break;
            case 'unsubscribe':
                if (!isset($frame['topic'])) {
                    return false;
                }
                $this->unsubscribe($frame['topic']);
                break;
        }

        return null;
    }

    /**
     * Publishes a message with QoS 0.
     *
     * Topics containing "espSmartHouse/" get the content sent as-is; every other
     * topic gets the content JSON-encoded.
     *
     * @param array $data Must contain a string "topic" and a "content" entry.
     *
     * @return false|null false for the "espSmartHouse/" path and for malformed
     *                    input (nothing is published in that case); null otherwise.
     */
    public function publish($data)
    {
        if (
            !is_array($data)
            || !isset($data['topic'])
            || !is_string($data['topic'])
            || !array_key_exists('content', $data)
        ) {
            return false;
        }

        if (false !== strpos($data['topic'], 'espSmartHouse/')) {
            $this->mqtt->publish($data['topic'], $data['content'], ['qos' => 0], self::failureLogger('publish'));

            return false;
        }

        $this->mqtt->publish($data['topic'], json_encode($data['content']), ['qos' => 0], self::failureLogger('publish'));
    }

    /**
     * Unsubscribes from a topic.
     *
     * @param string|array $topic Passed through to the MQTT client unchanged.
     */
    public function unsubscribe($topic)
    {
        $this->mqtt->unsubscribe($topic, self::failureLogger('unsubscribe'));
    }

    /**
     * Subscribes to a topic with QoS 0.
     *
     * @param string|array $topic A topic, or a topic => qos map as built on connect.
     */
    public function subscribe($topic)
    {
        $this->mqtt->subscribe($topic, ['qos' => 0], self::failureLogger('subscribe'));
    }

    /**
     * Builds the topic => qos map for all gateways with status 1.
     *
     * @return array<string, int>
     */
    private function gatewayTopics()
    {
        $rows = GatewayModel::where(['status' => 1])->column('title,device_brand', 'id');
        $topics = [];
        foreach ($rows as $row) {
            // Loose comparison on purpose: the stored brand may be an int or a numeric string.
            if ($row['device_brand'] == 1) {
                $topics['device/' . $row['title']] = 0;
            }
            if ($row['device_brand'] == 2) {
                $topics['espDevice/' . $row['title']] = 0;
            }
        }

        return $topics;
    }

    /**
     * Routes one broker message to the handler(s) matching its topic.
     *
     * @param string $topic
     * @param string $content
     *
     * @return false|null false when a handler dropped the message, null otherwise.
     */
    private function dispatchBrokerMessage($topic, $content)
    {
        $now = time();

        if (false !== strpos($topic, 'device/') && !$this->handleDeviceMessage($content, $now)) {
            return false;
        }
        if (false !== strpos($topic, 'espDevice/') && !$this->handleEspDeviceMessage($topic, $content, $now)) {
            return false;
        }

        return null;
    }

    /**
     * Handles a JSON payload received on a "device/" topic.
     *
     * cmd 100 is answered with a ping and marks the gateway online; every other
     * payload is handed to the device object's gateway() method.
     *
     * @param string $content Raw payload, expected to be a JSON object with "clientId".
     * @param int    $now     Timestamp stored as heartbeat.
     *
     * @return bool false when the payload is malformed or no enabled gateway matches.
     */
    private function handleDeviceMessage($content, $now)
    {
        $data = json_decode($content, true);
        // A payload without clientId can never match a gateway; drop it here
        // rather than reading a missing key.
        if (!is_array($data) || !isset($data['clientId'])) {
            return false;
        }

        $gateway = GatewayModel::where(['status' => 1, 'title' => $data['clientId']])->find();
        if (!$gateway) {
            return false;
        }

        $device = \framework\device\Device::instance($gateway['title'], $gateway['device_brand']);
        if (($data['cmd'] ?? null) == 100) {
            $device->ping([
                // Null-coalescing keeps a payload without data.id from raising.
                'id' => (string)($data['data']['id'] ?? ''),
                'ispush' => 1,
                'pollTime' => 20,
            ]);
            GatewayModel::where(['id' => $gateway['id']])
                ->update(['is_online' => 1, 'is_msg_admin' => 0, 'heartbeat' => $now]);
        } else {
            $device->gateway($data);
        }

        return true;
    }

    /**
     * Handles a payload received on an "espDevice/" topic.
     *
     * The gateway title is the topic with every "espDevice/" removed. Payloads
     * containing "s-" mark the gateway and its devices online; anything else is
     * handed to the device object's gateway() method.
     *
     * @param string $topic
     * @param string $content Raw payload (not JSON-decoded).
     * @param int    $now     Timestamp stored as heartbeat.
     *
     * @return bool false when no enabled gateway matches the topic.
     */
    private function handleEspDeviceMessage($topic, $content, $now)
    {
        $clientId = str_replace('espDevice/', '', $topic);
        $gateway = GatewayModel::where(['status' => 1, 'title' => $clientId])->find();
        if (!$gateway) {
            return false;
        }

        $device = \framework\device\Device::instance($gateway['title'], $gateway['device_brand']);
        if (false !== strpos($content, 's-')) {
            GatewayModel::where(['id' => $gateway['id']])
                ->update(['is_online' => 1, 'is_msg_admin' => 0, 'heartbeat' => $now]);
            DeviceModel::where(['gateway_id' => $gateway['id']])
                ->update(['is_online' => 1, 'heartbeat' => $now]);
        } else {
            $device->gateway($content);
        }

        return true;
    }

    /**
     * Returns a completion callback that logs a failed MQTT operation.
     *
     * The closure is static and captures only the short action name, so a
     * callback held by the client does not keep the payload or this object alive.
     *
     * @param string $action Label used in the log line.
     *
     * @return \Closure
     */
    private static function failureLogger($action)
    {
        return static function (?\Exception $exception = null) use ($action) {
            if ($exception !== null) {
                Log::error("Mqtt {$action} failed: " . $exception->getMessage());
            }
        };
    }
}
这里粘贴截图
php 8.1
workerman/mqtt 1.5
workerman/webman-framework 1.5.18
这个不一定是内存泄露,phar打包后文件一般会很大,默认128M可能不够用,php.ini里把memory_limit设置成256M。
好的,我试试