不知道大家有没有遇到过这样的问题:耗时任务长时间占用连接,导致连接池耗尽,后续请求无法得到处理。
我也一直为这个问题苦恼:文档虽然提供了一些连接池的案例,但和实际业务不太匹配,也无法配合 Laravel ORM 使用。
所以自己写了一个连接池用例。
支持laravel orm操作
支持手动取还、快取快还:连接用完立即归还,避免被长时间占用,尽快释放给其他请求使用。
<?php
namespace app\tools;
use Illuminate\Container\Container;
use Illuminate\Database\Connection;
use Illuminate\Database\Connectors\ConnectionFactory;
use Swoole\Coroutine\Channel;
use Swoole\Timer;
/**
 * Pool of Laravel database connections kept in a Swoole coroutine channel.
 *
 * One pool exists per connection name. Callers borrow a connection with
 * MysqlPool::get() and must hand it back with MysqlPool::put().
 */
class MysqlPool
{
    /** @var array<string, self> One pool instance per connection name. */
    protected static $instance = [];

    /** @var Channel|null Capacity-1 channel used as a mutex around pool construction. */
    protected static $lock;

    /**
     * Return the pool for the given connection name, building it on first use.
     *
     * @param string $connectionName Key under "database.connections".
     */
    public static function getInstance($connectionName = "mysql"): self
    {
        if (isset(self::$instance[$connectionName])) {
            return self::$instance[$connectionName];
        }

        if (self::$lock === null) {
            self::$lock = new Channel(1);
        }

        // Acquire: the channel holds at most one token, so a second coroutine waits here.
        self::$lock->push(true);
        try {
            // Re-check: another coroutine may have built the pool while we waited.
            if (!isset(self::$instance[$connectionName])) {
                self::$instance[$connectionName] = new self($connectionName);
            }
        } finally {
            // Release by taking the token back out. Closing the channel instead would
            // leave the lock permanently unusable for every other connection name.
            self::$lock->pop();
        }

        return self::$instance[$connectionName];
    }

    /**
     * Borrow a connection from the pool of the given connection name.
     *
     * @param string $connectionName
     * @return Connection
     */
    public static function get($connectionName = "mysql")
    {
        return self::getInstance($connectionName)->getConnection();
    }

    /**
     * Return a borrowed connection to the pool it belongs to (looked up by its name).
     */
    public static function put(Connection $connection)
    {
        self::getInstance($connection->getName())->releaseConnection($connection);
    }

    /** @var Channel Channel holding the idle connections. */
    protected $pool;

    /** @var array Configuration read from "database.connections.{name}". */
    protected $config;

    /** @var string */
    protected $connectionName;

    /** @var int|float Seconds getConnection() waits for an idle connection. */
    protected $wait_timeout;

    /** @var int|float Seconds between two heartbeat runs. */
    protected $heartbeat_interval;

    /** @var int Channel capacity, i.e. the maximum number of idle connections. */
    protected $maxConnections;

    /**
     * @param string $connectionName
     * @throws \InvalidArgumentException When the connection has no configuration array.
     */
    private function __construct($connectionName)
    {
        $config = config("database.connections.{$connectionName}");
        if (!is_array($config)) {
            throw new \InvalidArgumentException(
                "Database connection [{$connectionName}] is not configured."
            );
        }

        $this->connectionName = $connectionName;
        $this->config = $config;
        $this->maxConnections = max(1, (int) ($config['pool']['max_connections'] ?? 5));
        $this->wait_timeout = $config['pool']['wait_timeout'] ?? 3;
        $this->heartbeat_interval = $config['pool']['heartbeat_interval'] ?? 30;
        $this->pool = new Channel($this->maxConnections);
        $this->initPool();
    }

    /**
     * Fill the channel with connections and start the periodic heartbeat timer.
     */
    protected function initPool(): void
    {
        for ($i = 0; $i < $this->maxConnections; $i++) {
            $this->pool->push($this->createConnection());
        }

        Timer::tick($this->heartbeat_interval * 1000, function () {
            $this->heartbeat();
        });
    }

    /**
     * Probe the idle connections, replace the broken ones and top the pool up.
     *
     * Connections are checked one at a time and put straight back, so the rest of
     * the pool stays available to other coroutines while a probe is running.
     */
    protected function heartbeat(): void
    {
        $idle = $this->pool->length();
        for ($i = 0; $i < $idle; $i++) {
            $conn = $this->pool->pop(0.1);
            if (!$conn instanceof Connection) {
                // pop() returned false: other coroutines borrowed the remaining ones.
                break;
            }

            try {
                $conn->statement('SELECT 1');
            } catch (\Throwable $e) {
                $conn->disconnect();
                $conn = $this->createConnection();
            }

            $this->offer($conn);
        }

        // Refill slots whose connection was borrowed and never returned. Connections
        // that do come back later are disconnected by offer() once the pool is full.
        $missing = $this->maxConnections - $this->pool->length();
        for ($i = 0; $i < $missing; $i++) {
            $this->offer($this->createConnection());
        }
    }

    /**
     * Build a new connection from this pool's configuration.
     */
    protected function createConnection(): Connection
    {
        // A fresh container per connection, so the factory cannot hand back a shared instance.
        $factory = new ConnectionFactory(new Container());

        return $factory->make($this->config, $this->connectionName);
    }

    /**
     * Take an idle connection, waiting up to wait_timeout seconds.
     *
     * When none becomes available in time a new connection is created instead,
     * so a connection that was never returned cannot starve later callers.
     */
    public function getConnection(): Connection
    {
        $connection = $this->pool->pop($this->wait_timeout);

        return $connection instanceof Connection ? $connection : $this->createConnection();
    }

    /**
     * Put a connection back into the pool, rolling back any open transaction first.
     */
    public function releaseConnection(Connection $connection): void
    {
        try {
            // rollBack(0) unwinds every nesting level; a bare rollBack() drops only one.
            if ($connection->transactionLevel() > 0) {
                $connection->rollBack(0);
            }

            // A transaction started directly on the PDO handle is not counted by
            // transactionLevel(). getRawPdo() is used so an unused connection is not
            // forced to connect just to be inspected.
            $pdo = $connection->getRawPdo();
            if ($pdo instanceof \PDO && $pdo->inTransaction()) {
                $pdo->rollBack();
            }
        } catch (\Throwable $e) {
            // The connection is unusable: pool a fresh one in its place.
            $connection->disconnect();
            $connection = $this->createConnection();
        }

        $this->offer($connection);
    }

    /**
     * Push a connection into the channel without ever blocking the caller.
     *
     * A connection that does not fit (pool already full, or push refused) is
     * disconnected instead of being queued.
     */
    protected function offer(Connection $connection): void
    {
        if ($this->pool->isFull() || !$this->pool->push($connection)) {
            $connection->disconnect();
        }
    }
}
// Usage
$Db = MysqlPool::get(); // borrow a connection from the pool
try {
    $info = $Db->table("xxx")->where([
        ['id', '=', $item['xx']],
        ['name', '=', $item['xx']],
    ])->first(); // query through the Laravel query builder
} finally {
    // Return the connection even when the query throws; otherwise it is lost to the pool.
    MysqlPool::put($Db);
}
<?php
namespace app\tools;
use Swoole\Coroutine\Channel;
use Swoole\Timer;
/**
 * Pool of phpredis connections kept in a Swoole coroutine channel.
 *
 * One pool exists per configuration name. Callers borrow a connection with
 * RedisPool::get() and must hand it back with RedisPool::put().
 */
class RedisPool
{
    /** @var array<string, self> One pool instance per configuration name. */
    protected static $instance = [];

    /** @var Channel|null Capacity-1 channel used as a mutex around pool construction. */
    protected static $lock;

    /**
     * Return the pool for the given configuration name, building it on first use.
     *
     * @param string $connectionName Key under "redis".
     */
    private static function getInstance($connectionName = "default"): self
    {
        if (isset(self::$instance[$connectionName])) {
            return self::$instance[$connectionName];
        }

        if (self::$lock === null) {
            self::$lock = new Channel(1);
        }

        // Acquire: the channel holds at most one token, so a second coroutine waits here.
        self::$lock->push(true);
        try {
            // Re-check: another coroutine may have built the pool while we waited.
            if (!isset(self::$instance[$connectionName])) {
                self::$instance[$connectionName] = new self($connectionName);
            }
        } finally {
            // Release by taking the token back out. Closing the channel instead would
            // leave the lock permanently unusable for every other configuration name.
            self::$lock->pop();
        }

        return self::$instance[$connectionName];
    }

    /**
     * Borrow a connection from the pool of the given configuration name.
     *
     * @param string $connectionName
     * @return \Redis
     */
    public static function get($connectionName = "default")
    {
        return self::getInstance($connectionName)->getConnection();
    }

    /**
     * Return a borrowed connection to the pool of the given configuration name.
     *
     * @param string $connectionName Must be the name the connection was taken from.
     */
    public static function put(\Redis $connection, $connectionName = "default")
    {
        self::getInstance($connectionName)->releaseConnection($connection);
    }

    /** @var Channel Channel holding the idle connections. */
    protected $pool;

    /** @var array Configuration read from "redis.{name}". */
    protected $config;

    /** @var string */
    protected $connectionName;

    /** @var int|float Seconds getConnection() waits for an idle connection. */
    protected $wait_timeout;

    /** @var int|float Seconds between two heartbeat runs. */
    protected $heartbeat_interval;

    /** @var int Channel capacity, i.e. the maximum number of idle connections. */
    protected $maxConnections;

    /**
     * @param string $connectionName
     * @throws \InvalidArgumentException When the name has no configuration array.
     */
    private function __construct($connectionName)
    {
        $config = config("redis.{$connectionName}");
        if (!is_array($config)) {
            throw new \InvalidArgumentException(
                "Redis connection [{$connectionName}] is not configured."
            );
        }

        $this->connectionName = $connectionName;
        $this->config = $config;
        $this->maxConnections = max(1, (int) ($config['pool']['max_connections'] ?? 5));
        $this->wait_timeout = $config['pool']['wait_timeout'] ?? 3;
        $this->heartbeat_interval = $config['pool']['heartbeat_interval'] ?? 30;
        $this->pool = new Channel($this->maxConnections);
        $this->initPool();
    }

    /**
     * Fill the channel with connections and start the periodic heartbeat timer.
     */
    protected function initPool(): void
    {
        for ($i = 0; $i < $this->maxConnections; $i++) {
            $this->pool->push($this->createConnection());
        }

        Timer::tick($this->heartbeat_interval * 1000, function () {
            $this->heartbeat();
        });
    }

    /**
     * Ping the idle connections, replace the broken ones and top the pool up.
     *
     * Connections are checked one at a time and put straight back, so the rest of
     * the pool stays available to other coroutines while a ping is running.
     * Nothing is allowed to escape from here because it runs inside a timer callback.
     */
    protected function heartbeat(): void
    {
        $idle = $this->pool->length();
        for ($i = 0; $i < $idle; $i++) {
            $conn = $this->pool->pop(0.1);
            if (!$conn instanceof \Redis) {
                // pop() returned false: other coroutines borrowed the remaining ones.
                break;
            }

            try {
                $alive = $conn->ping() !== false;
            } catch (\Throwable $e) {
                $alive = false;
            }

            if ($alive) {
                $this->offer($conn);
            } else {
                // Drop it; the refill below creates the replacement.
                $this->closeQuietly($conn);
            }
        }

        // Refill slots whose connection was dropped above or was borrowed and never
        // returned. Connections that do come back later are closed by offer() once
        // the pool is full.
        $missing = $this->maxConnections - $this->pool->length();
        for ($i = 0; $i < $missing; $i++) {
            try {
                $fresh = $this->createConnection();
            } catch (\Throwable $e) {
                // Server unreachable right now: leave the slot empty, the next tick retries.
                error_log("RedisPool[{$this->connectionName}] heartbeat refill failed: " . $e->getMessage());
                break;
            }
            $this->offer($fresh);
        }
    }

    /**
     * Open, authenticate and configure a new Redis connection.
     *
     * @throws \RedisException When connecting or authenticating fails.
     */
    protected function createConnection(): \Redis
    {
        $host = $this->config['host'] ?? '127.0.0.1';
        $port = (int) ($this->config['port'] ?? 6379);

        $redis = new \Redis();
        if (!$redis->connect($host, $port)) {
            throw new \RedisException("Unable to connect to Redis at {$host}:{$port}.");
        }

        // Authenticate only when a non-empty password is configured.
        if (!empty($this->config['password']) && !$redis->auth($this->config['password'])) {
            $this->closeQuietly($redis);
            throw new \RedisException("Redis authentication failed for {$host}:{$port}.");
        }

        $redis->select($this->config['database'] ?? 0);
        $redis->setOption(\Redis::OPT_PREFIX, $this->config['prefix'] ?? "");

        return $redis;
    }

    /**
     * Take an idle connection, waiting up to wait_timeout seconds.
     *
     * When none becomes available in time a new connection is created instead,
     * so a connection that was never returned cannot starve later callers.
     */
    public function getConnection(): \Redis
    {
        $redis = $this->pool->pop($this->wait_timeout);

        return $redis instanceof \Redis ? $redis : $this->createConnection();
    }

    /**
     * Put a connection back into the pool after discarding any open MULTI/pipeline.
     */
    public function releaseConnection(\Redis $redis): void
    {
        try {
            $redis->discard();
        } catch (\Throwable $e) {
            // The connection is broken: drop it, the heartbeat refills the slot.
            $this->closeQuietly($redis);
            return;
        }

        $this->offer($redis);
    }

    /**
     * Push a connection into the channel without ever blocking the caller.
     *
     * A connection that does not fit (pool already full, or push refused) is
     * closed instead of being queued.
     */
    protected function offer(\Redis $redis): void
    {
        if ($this->pool->isFull() || !$this->pool->push($redis)) {
            $this->closeQuietly($redis);
        }
    }

    /**
     * Close a connection that is being thrown away, ignoring any error while closing.
     */
    protected function closeQuietly(\Redis $redis): void
    {
        try {
            $redis->close();
        } catch (\Throwable $e) {
            // Nothing left to do: the connection is being discarded anyway.
        }
    }
}
// Usage
$redis = RedisPool::get(); // borrow a connection from the pool
try {
    $add_limiter = $redis->get("xxxx");
} finally {
    // Return the connection even when the command throws; otherwise it is lost to the pool.
    RedisPool::put($redis);
}