webman mysql连接池/redis连接池,手动取还,orm操作

speedy

不知道大家有没有遇到因为耗时任务,连接池耗尽,无法处理后续请求的问题。
我自己也一直被这个问题困扰。文档虽然提供了一些连接池的案例,但和我的业务不太匹配,也无法配合 Laravel ORM 使用。
所以自己写了一个连接池用例。
支持laravel orm操作
支持手动取还、快取快还:连接用完立即归还,避免被长时间占用,尽快释放出来给其他请求使用。

<?php

namespace app\tools;

use Illuminate\Container\Container;
use Illuminate\Database\Connection;
use Illuminate\Database\Connectors\ConnectionFactory;
use Swoole\Coroutine\Channel;
use Swoole\Timer;

/**
 * Coroutine connection pool for Illuminate database connections.
 *
 * One pool is kept per connection name. Connections are borrowed with
 * {@see MysqlPool::get()} and handed back with {@see MysqlPool::put()}, so a
 * caller can hold a connection only for as long as a query actually needs it.
 *
 * Pool settings are read from `database.connections.<name>.pool`:
 *  - max_connections    (default 5)  channel capacity / number of pooled connections
 *  - wait_timeout       (default 3)  seconds to wait for an idle connection
 *  - heartbeat_interval (default 30) seconds between health checks
 */
class MysqlPool
{
    /** @var array<string, self> One pool instance per connection name. */
    protected static $instance = [];

    /** @var Channel|null Capacity-1 channel used as a mutex around pool construction. */
    protected static $lock;

    /**
     * Return the pool for the given connection name, creating it on first use.
     *
     * @param string $connectionName Key under `database.connections`.
     * @return self
     * @throws \InvalidArgumentException When the connection is not configured.
     */
    public static function getInstance($connectionName = "mysql"): self
    {
        if (!isset(self::$instance[$connectionName])) {
            if (self::$lock === null) {
                self::$lock = new Channel(1);
            }
            self::$lock->push(true); // acquire: blocks while another coroutine holds the slot
            try {
                // Re-check: another coroutine may have built the pool while we waited.
                if (!isset(self::$instance[$connectionName])) {
                    self::$instance[$connectionName] = new self($connectionName);
                }
            } finally {
                // Release by emptying the slot. The channel is kept open so the
                // same lock keeps working for pools created later.
                self::$lock->pop();
            }
        }
        return self::$instance[$connectionName];
    }

    /**
     * Borrow a connection from the named pool.
     *
     * @param string $connectionName Key under `database.connections`.
     * @return Connection
     */
    public static function get($connectionName = "mysql")
    {
        return self::getInstance($connectionName)->getConnection();
    }

    /**
     * Return a borrowed connection to the pool it belongs to (resolved via
     * the connection's own name).
     *
     * @param Connection $connection
     * @return void
     */
    public static function put(Connection $connection)
    {
        self::getInstance($connection->getName())->releaseConnection($connection);
    }

    /** @var Channel Holds the idle connections. */
    protected $pool;
    /** @var array Connection configuration, including the optional `pool` section. */
    protected $config;
    /** @var string */
    protected $connectionName;
    /** @var int|float Seconds to wait for an idle connection before creating a new one. */
    protected $wait_timeout;
    /** @var int|float Seconds between heartbeat runs. */
    protected $heartbeat_interval;
    /** @var int Channel capacity. */
    protected $maxConnections;

    /**
     * @param string $connectionName Key under `database.connections`.
     * @throws \InvalidArgumentException When the connection is not configured.
     */
    private function __construct($connectionName)
    {
        $this->connectionName = $connectionName;
        $this->config = config("database.connections.{$connectionName}");
        if (!is_array($this->config)) {
            // Fail with a clear message instead of a TypeError from the factory.
            throw new \InvalidArgumentException(
                "Database connection [{$connectionName}] is not configured."
            );
        }
        $this->maxConnections = $this->config['pool']['max_connections'] ?? 5;
        $this->wait_timeout = $this->config['pool']['wait_timeout'] ?? 3;
        $this->heartbeat_interval = $this->config['pool']['heartbeat_interval'] ?? 30;
        $this->pool = new Channel($this->maxConnections);
        $this->initPool();
    }

    /**
     * Fill the pool and schedule the periodic heartbeat.
     */
    protected function initPool(): void
    {
        for ($i = 0; $i < $this->maxConnections; $i++) {
            $this->pool->push($this->createConnection());
        }
        Timer::tick($this->heartbeat_interval * 1000, function () {
            $this->heartbeat();
        });
    }

    /**
     * Health-check the idle connections and top the pool back up.
     *
     * Connections are checked one at a time and put straight back, so the
     * pool is never fully drained while the check runs.
     */
    private function heartbeat(): void
    {
        // Only look at what is idle right now; connections returned while the
        // check is running are picked up on the next tick.
        $idle = $this->pool->length();
        for ($i = 0; $i < $idle; $i++) {
            $conn = $this->pool->pop(0.1);
            if (!$conn instanceof Connection) {
                break; // borrowers emptied the pool in the meantime
            }
            try {
                $conn->statement('SELECT 1');
            } catch (\Throwable $e) {
                // Dead connection: replace it with a fresh one.
                $conn->disconnect();
                $conn = $this->createConnection();
            }
            $this->store($conn);
        }

        // Refill slots whose connection was never returned (e.g. the borrower
        // was interrupted). Connections that do come back later are closed by
        // store() if the pool is already full.
        while (!$this->pool->isFull()) {
            $this->store($this->createConnection());
        }
    }

    /**
     * Put a connection into the pool, or close it when the pool is already
     * full. Never blocks: the capacity check and the push happen back to back.
     */
    private function store(Connection $connection): void
    {
        if ($this->pool->isFull()) {
            $connection->disconnect(); // surplus connection
            return;
        }
        $this->pool->push($connection);
    }

    /**
     * Build a new, independent connection.
     *
     * A fresh container is used for every connection because the factory
     * consults the container when deciding whether to reuse a singleton.
     *
     * @return Connection
     */
    protected function createConnection(): Connection
    {
        $container = new Container();
        $factory = new ConnectionFactory($container);
        return $factory->make($this->config, $this->connectionName);
    }

    /**
     * Borrow a connection, waiting up to `wait_timeout` seconds for an idle
     * one. If none becomes available a new connection is created, so callers
     * are not stuck when other code failed to return its connection.
     *
     * @return Connection
     */
    public function getConnection(): Connection
    {
        return $this->pool->pop($this->wait_timeout) ?: $this->createConnection();
    }

    /**
     * Return a connection to the pool.
     *
     * Any open transaction is rolled back completely first, so the next
     * borrower starts clean. A connection that cannot be cleaned up is closed
     * instead of being pooled, as is a connection for which the pool has no
     * free slot.
     *
     * @param Connection $connection
     * @return void
     */
    public function releaseConnection(Connection $connection): void
    {
        try {
            if ($connection->getPdo()->inTransaction()) {
                $connection->rollBack(0); // unwind every nesting level, not just one
                // A transaction started directly on the PDO handle is not
                // tracked by the connection; roll that back as well.
                if ($connection->getPdo()->inTransaction()) {
                    $connection->getPdo()->rollBack();
                }
            }
        } catch (\Throwable $e) {
            // The connection is in an unknown state; do not hand it to anyone else.
            $connection->disconnect();
            return;
        }
        $this->store($connection);
    }
}

// Usage
$Db = MysqlPool::get(); // borrow a connection
try {
    $info = $Db->table("xxx")->where([
        ['id', '=', $item['xx']],
        ['name', '=', $item['xx']],
    ])->first(); // ORM query
} finally {
    // Always hand the connection back, even when the query throws.
    MysqlPool::put($Db);
}
<?php

namespace app\tools;

use Swoole\Coroutine\Channel;
use Swoole\Timer;

/**
 * Coroutine connection pool for phpredis connections.
 *
 * One pool is kept per connection name. Connections are borrowed with
 * {@see RedisPool::get()} and handed back with {@see RedisPool::put()}.
 *
 * Pool settings are read from `redis.<name>.pool`:
 *  - max_connections    (default 5)  channel capacity / number of pooled connections
 *  - wait_timeout       (default 3)  seconds to wait for an idle connection
 *  - heartbeat_interval (default 30) seconds between health checks
 */
class RedisPool
{
    /** @var array<string, self> One pool instance per connection name. */
    protected static $instance = [];

    /** @var Channel|null Capacity-1 channel used as a mutex around pool construction. */
    protected static $lock;

    /**
     * Return the pool for the given connection name, creating it on first use.
     *
     * @param string $connectionName Key under the `redis` config.
     * @return self
     * @throws \InvalidArgumentException When the connection is not configured.
     */
    private static function getInstance($connectionName = "default"): self
    {
        if (!isset(self::$instance[$connectionName])) {
            if (self::$lock === null) {
                self::$lock = new Channel(1);
            }
            self::$lock->push(true); // acquire: blocks while another coroutine holds the slot
            try {
                // Re-check: another coroutine may have built the pool while we waited.
                if (!isset(self::$instance[$connectionName])) {
                    self::$instance[$connectionName] = new self($connectionName);
                }
            } finally {
                // Release by emptying the slot. The channel is kept open so the
                // same lock keeps working for pools created later.
                self::$lock->pop();
            }
        }
        return self::$instance[$connectionName];
    }

    /**
     * Borrow a connection from the named pool.
     *
     * @param string $connectionName Key under the `redis` config.
     * @return \Redis
     */
    public static function get($connectionName = "default")
    {
        return self::getInstance($connectionName)->getConnection();
    }

    /**
     * Return a borrowed connection to the named pool.
     *
     * @param \Redis $connection
     * @param string $connectionName Name the connection was borrowed under.
     * @return void
     */
    public static function put(\Redis $connection, $connectionName = "default")
    {
        self::getInstance($connectionName)->releaseConnection($connection);
    }

    /** @var Channel Holds the idle connections. */
    protected $pool;
    /** @var array Connection configuration, including the optional `pool` section. */
    protected $config;
    /** @var string */
    protected $connectionName;
    /** @var int|float Seconds to wait for an idle connection before creating a new one. */
    protected $wait_timeout;
    /** @var int|float Seconds between heartbeat runs. */
    protected $heartbeat_interval;
    /** @var int Channel capacity. */
    protected $maxConnections;

    /**
     * @param string $connectionName Key under the `redis` config.
     * @throws \InvalidArgumentException When the connection is not configured.
     */
    private function __construct($connectionName)
    {
        $this->connectionName = $connectionName;
        $this->config = config("redis.{$connectionName}");
        if (!is_array($this->config)) {
            // Fail with a clear message instead of an error deep inside connect().
            throw new \InvalidArgumentException(
                "Redis connection [{$connectionName}] is not configured."
            );
        }
        $this->maxConnections = $this->config['pool']['max_connections'] ?? 5;
        $this->wait_timeout = $this->config['pool']['wait_timeout'] ?? 3;
        $this->heartbeat_interval = $this->config['pool']['heartbeat_interval'] ?? 30;
        $this->pool = new Channel($this->maxConnections);
        $this->initPool();
    }

    /**
     * Fill the pool and schedule the periodic heartbeat.
     */
    protected function initPool(): void
    {
        for ($i = 0; $i < $this->maxConnections; $i++) {
            $this->pool->push($this->createConnection());
        }
        Timer::tick($this->heartbeat_interval * 1000, function () {
            $this->heartbeat();
        });
    }

    /**
     * Health-check the idle connections and top the pool back up.
     *
     * Connections are checked one at a time and put straight back, so the
     * pool is never fully drained while the check runs.
     */
    private function heartbeat(): void
    {
        // Only look at what is idle right now; connections returned while the
        // check is running are picked up on the next tick.
        $idle = $this->pool->length();
        try {
            for ($i = 0; $i < $idle; $i++) {
                $conn = $this->pool->pop(0.1);
                if (!$conn instanceof \Redis) {
                    break; // borrowers emptied the pool in the meantime
                }
                if (!$this->isAlive($conn)) {
                    $this->closeQuietly($conn);
                    $conn = $this->createConnection();
                }
                $this->store($conn);
            }

            // Refill slots whose connection was never returned (e.g. the
            // borrower was interrupted). Connections that do come back later
            // are closed by store() if the pool is already full.
            while (!$this->pool->isFull()) {
                $this->store($this->createConnection());
            }
        } catch (\Throwable $e) {
            // A replacement connection could not be created (server
            // unreachable). Give up for this tick; the next tick tries again
            // and getConnection() still creates connections on demand.
        }
    }

    /**
     * Whether the connection still answers a PING.
     */
    private function isAlive(\Redis $redis): bool
    {
        try {
            return $redis->ping() !== false;
        } catch (\Throwable $e) {
            return false;
        }
    }

    /**
     * Close a connection, ignoring errors: it is being thrown away anyway.
     */
    private function closeQuietly(\Redis $redis): void
    {
        try {
            $redis->close();
        } catch (\Throwable $e) {
            // nothing useful to do with a failure to close a discarded connection
        }
    }

    /**
     * Put a connection into the pool, or close it when the pool is already
     * full. Never blocks: the capacity check and the push happen back to back.
     */
    private function store(\Redis $redis): void
    {
        if ($this->pool->isFull()) {
            $this->closeQuietly($redis); // surplus connection
            return;
        }
        $this->pool->push($redis);
    }

    /**
     * Open and configure a new connection (auth, database, key prefix).
     *
     * @return \Redis
     * @throws \RuntimeException When the server refuses the connection.
     */
    protected function createConnection(): \Redis
    {
        $redis = new \Redis();
        if ($redis->connect($this->config['host'], $this->config['port']) === false) {
            throw new \RuntimeException(
                "Unable to connect to Redis [{$this->connectionName}]."
            );
        }
        // The password key is optional; only authenticate when one is set.
        if (!empty($this->config['password'])) {
            $redis->auth($this->config['password']);
        }
        $redis->select($this->config['database'] ?? 0);
        $redis->setOption(\Redis::OPT_PREFIX, $this->config['prefix'] ?? "");
        return $redis;
    }

    /**
     * Borrow a connection, waiting up to `wait_timeout` seconds for an idle
     * one. If none becomes available a new connection is created, so callers
     * are not stuck when other code failed to return its connection.
     *
     * @return \Redis
     */
    public function getConnection(): \Redis
    {
        return $this->pool->pop($this->wait_timeout) ?: $this->createConnection();
    }

    /**
     * Return a connection to the pool.
     *
     * Any pending MULTI/pipeline state is discarded first so the next
     * borrower starts clean. A connection that cannot be reset is closed
     * instead of being pooled, as is a connection for which the pool has no
     * free slot.
     *
     * @param \Redis $redis
     * @return void
     */
    public function releaseConnection(\Redis $redis): void
    {
        try {
            $redis->discard();
        } catch (\Throwable $e) {
            // The connection is in an unknown state; do not hand it to anyone else.
            $this->closeQuietly($redis);
            return;
        }
        $this->store($redis);
    }
}
// Usage
$redis = RedisPool::get(); // borrow a connection
try {
    $add_limiter = $redis->get("xxxx");
} finally {
    // Always hand the connection back, even when the command throws.
    RedisPool::put($redis);
}
53 0 1
0个评论

speedy

140
积分
0
获赞数
0
粉丝数
2024-11-06 加入
🔝