这几天我在想如何在Webman框架中使用LaravelORM并支持协程。将两者结合起来,理论上可以兼顾高并发与开发效率。
在Webman中集成LaravelORM协程版,并验证其性能和兼容性。
环境配置
类
Illuminate\Database\Connection
/**
 * Run a select statement and return every row of its result set.
 *
 * @param  string  $query       SQL text with placeholders
 * @param  array   $bindings    values bound to the placeholders
 * @param  bool    $useReadPdo  forwarded to getPdoForSelect() to pick the PDO handle
 * @return array   empty while the connection is pretending, otherwise fetchAll()'s rows
 */
public function select($query, $bindings = [], $useReadPdo = true)
{
    $runner = function ($sql, $params) use ($useReadPdo) {
        // In "pretend" mode nothing is sent to the database.
        if ($this->pretending()) {
            return [];
        }

        // Prepare on the PDO handle chosen for selects, bind, execute,
        // then hand back the complete result set.
        $pdo = $this->getPdoForSelect($useReadPdo);
        $prepared = $this->prepared($pdo->prepare($sql));

        $this->bindValues($prepared, $this->prepareBindings($params));
        $prepared->execute();

        return $prepared->fetchAll();
    };

    return $this->run($query, $bindings, $runner);
}
以select方法为例可以看到上述类中,Laravel将所有对\PDO的操作都封装在了Connection中
并提供了ConnectionInterface
的抽象接口,这意味着如果实现了这个接口,就可以无缝地替换掉PDO逻辑
我选用了AMPHP的MySQL客户端库amphp/mysql
来实现这个接口
<?php declare(strict_types=1);
use Amp\Mysql\MysqlConfig;
use Amp\Mysql\MysqlConnectionPool;
use Amp\Mysql\MysqlTransaction;
use Closure;
use Exception;
use Fiber;
use Generator;
use Illuminate\Database\MySqlConnection;
use Throwable;
use function boolval;
use function in_array;
use function spl_object_hash;
use function trim;
/**
 * Laravel MySQL connection whose statements are executed through an
 * amphp/mysql connection pool instead of PDO.
 *
 * Open transactions are tracked per Fiber (or under the key 'main' when no
 * Fiber is running), so concurrent fibers sharing this connection object each
 * see their own transaction.
 */
class PConnection extends MySqlConnection
{
    /**
     * Config keys that are forwarded into the amphp DSN string.
     * 'username' and 'database' are renamed to 'user' / 'db' in the constructor.
     */
    private const ALLOW_OPTIONS = [
        'host',
        'port',
        'user',
        'password',
        'db',
        'charset',
        'collate',
        'compression',
        'local-infile',
        'username',
        'database',
    ];

    /** Pool every non-transactional statement is executed on. */
    private MysqlConnectionPool $pool;

    /**
     * Open transactions keyed by spl_object_hash() of the owning Fiber, or 'main'.
     *
     * @var MysqlTransaction[]
     */
    private array $fiber2transaction = [];

    /**
     * Build the parent connection, then create the amphp pool from $config.
     *
     * @param mixed  $pdo          passed through to the parent constructor unchanged
     * @param string $database
     * @param string $tablePrefix
     * @param array  $config       connection config; only ALLOW_OPTIONS keys with truthy values are used
     */
    public function __construct($pdo, string $database = '', string $tablePrefix = '', array $config = [])
    {
        parent::__construct($pdo, $database, $tablePrefix, $config);

        $dsn = '';
        foreach ($config as $key => $value) {
            // self:: rather than static:: — the constant is private, so late
            // static binding would fail when invoked on a subclass.
            if (!in_array($key, self::ALLOW_OPTIONS, true) || !$value) {
                continue;
            }

            $key = match ($key) {
                'username' => 'user',
                'database' => 'db',
                default => $key,
            };

            // NOTE(review): values containing spaces would presumably break this
            // space-separated "key=value" string — confirm against MysqlConfig::fromString.
            $dsn .= "{$key}={$value} ";
        }

        $this->pool = new MysqlConnectionPool(MysqlConfig::fromString(trim($dsn)));
    }

    /**
     * Start a transaction on the pool and register it for the current fiber.
     *
     * NOTE(review): calling this twice in the same fiber replaces the registered
     * transaction; nesting is not supported here.
     *
     * @return void
     */
    public function beginTransaction(): void
    {
        $this->fiber2transaction[$this->fiberKey()] = $this->pool->beginTransaction();
    }

    /**
     * Commit the current fiber's transaction and forget it.
     *
     * @return void
     * @throws Exception when the current fiber has no registered transaction
     */
    public function commit(): void
    {
        $key = $this->fiberKey();
        $transaction = $this->fiber2transaction[$key] ?? null;
        if ($transaction === null) {
            throw new Exception('Transaction not found');
        }

        // The entry is kept when commit() throws so the caller can still roll back.
        $transaction->commit();
        unset($this->fiber2transaction[$key]);
    }

    /**
     * Roll back the current fiber's transaction and forget it.
     *
     * @param mixed $toLevel accepted for signature compatibility; not used
     * @return void
     * @throws Exception when the current fiber has no registered transaction
     */
    public function rollBack($toLevel = null): void
    {
        $key = $this->fiberKey();
        $transaction = $this->fiber2transaction[$key] ?? null;
        if ($transaction === null) {
            throw new Exception('Transaction not found');
        }

        try {
            $transaction->rollback();
        } finally {
            // Always drop the entry: a failed rollback must not leave a stale
            // transaction registered under this key.
            unset($this->fiber2transaction[$key]);
        }
    }

    /**
     * Run $callback inside a transaction: commit on success, roll back on any throwable.
     *
     * The callback receives the current fiber's MysqlTransaction.
     *
     * @param Closure $callback
     * @param int     $attempts accepted for signature compatibility; no retry is performed
     * @return mixed  whatever $callback returns
     * @throws Throwable rethrows whatever the callback or commit threw
     */
    public function transaction(Closure $callback, $attempts = 1): mixed
    {
        $this->beginTransaction();

        try {
            $result = $callback($this->getTransaction());
            $this->commit();

            return $result;
        } catch (Throwable $e) {
            // Only roll back if the transaction is still registered; the callback
            // may already have committed or rolled back itself, and a
            // "Transaction not found" error here would mask the real failure.
            if ($this->getTransaction() !== null) {
                $this->rollBack();
            }

            throw $e;
        }
    }

    /**
     * Key under which the current execution context stores its transaction.
     *
     * @return string spl_object_hash() of the running Fiber, or 'main' outside any Fiber
     */
    private function fiberKey(): string
    {
        $fiber = Fiber::getCurrent();

        return $fiber === null ? 'main' : spl_object_hash($fiber);
    }

    /**
     * @return MysqlTransaction|null the current fiber's open transaction, if any
     */
    private function getTransaction(): MysqlTransaction|null
    {
        return $this->fiber2transaction[$this->fiberKey()] ?? null;
    }

    /**
     * Prepare and execute $query on the current fiber's transaction when one is
     * open, otherwise on the pool, so reads and writes issued inside a
     * transaction actually take part in it.
     *
     * @param string $query
     * @param array  $bindings raw bindings; passed through prepareBindings()
     * @return \Amp\Mysql\MysqlResult
     */
    private function executePrepared(string $query, array $bindings): \Amp\Mysql\MysqlResult
    {
        $link = $this->getTransaction() ?? $this->pool;

        return $link->prepare($query)->execute($this->prepareBindings($bindings));
    }

    /**
     * Run a select statement and return all rows of the first result set.
     *
     * NOTE(review): rows are returned exactly as the amphp result yields them
     * (presumably associative arrays rather than stdClass) — confirm with callers.
     *
     * @param string $query
     * @param array  $bindings
     * @param bool   $useReadPdo accepted for signature compatibility; not used
     * @return array list of rows, or [] while pretending
     */
    public function select($query, $bindings = [], $useReadPdo = true): mixed
    {
        return $this->run($query, $bindings, function ($query, $bindings) {
            if ($this->pretending()) {
                return [];
            }

            // Materialise into a plain list: the docblock promises an array.
            return iterator_to_array($this->executePrepared($query, $bindings), false);
        });
    }

    /**
     * Execute a statement and report success.
     *
     * @param string $query
     * @param array  $bindings
     * @return bool always true; failures surface as exceptions
     */
    public function statement($query, $bindings = []): bool
    {
        return $this->run($query, $bindings, function ($query, $bindings) {
            if ($this->pretending()) {
                // Must be a bool: returning [] would violate the return type
                // under strict_types.
                return true;
            }

            $this->executePrepared($query, $bindings);

            return true;
        });
    }

    /**
     * Run a select statement and return every result set it produces.
     *
     * @param string $query
     * @param array  $bindings
     * @param bool   $useReadPdo accepted for signature compatibility; not used
     * @return array one list of rows per result set, first set included
     */
    public function selectResultSets($query, $bindings = [], $useReadPdo = true): array
    {
        return $this->run($query, $bindings, function ($query, $bindings) {
            if ($this->pretending()) {
                return [];
            }

            $sets = [];
            $result = $this->executePrepared($query, $bindings);

            // do/while so the first result set is collected before advancing.
            do {
                $sets[] = iterator_to_array($result, false);
            } while ($result = $result->getNextResult());

            return $sets;
        });
    }

    /**
     * Run a select statement once and yield its rows one at a time.
     *
     * @param string $query
     * @param array  $bindings
     * @param bool   $useReadPdo accepted for signature compatibility; not used
     * @return Generator
     */
    public function cursor($query, $bindings = [], $useReadPdo = true): Generator
    {
        $rows = $this->run($query, $bindings, function ($query, $bindings) {
            if ($this->pretending()) {
                return [];
            }

            return $this->executePrepared($query, $bindings);
        });

        // Iterate the single result; re-running the query per iteration would never end.
        foreach ($rows as $record) {
            yield $record;
        }
    }

    /**
     * Run an SQL statement and return the number of affected rows.
     *
     * @param string $query
     * @param array  $bindings
     * @return int affected row count (0 while pretending or when the driver reports none)
     */
    public function affectingStatement($query, $bindings = []): int
    {
        return $this->run($query, $bindings, function ($query, $bindings) {
            if ($this->pretending()) {
                return 0;
            }

            $result = $this->executePrepared($query, $bindings);

            // Cast guards the int return type in case the driver reports no count.
            $count = (int) $result->getRowCount();
            $this->recordsHaveBeenModified($count > 0);

            return $count;
        });
    }

    /**
     * Intentionally a no-op: this class executes through the amphp pool, not PDO.
     *
     * @return void
     */
    public function reconnect()
    {
    }

    /**
     * Intentionally a no-op: this class executes through the amphp pool, not PDO.
     *
     * @return void
     */
    public function reconnectIfMissingConnection()
    {
    }
}
实现了Connection之后我们还要Hook住DatabaseManager的工厂方法
return new class ($app) extends ConnectionFactory {
    /**
     * Create a new connection instance for the given driver.
     *
     * Identical to a plain driver-to-class mapping except that 'mysql' is
     * served by PConnection.
     *
     * @param string      $driver
     * @param PDO|Closure $connection
     * @param string      $database
     * @param string      $prefix
     * @param array       $config
     * @return SQLiteConnection|MariaDbConnection|MySqlConnection|PostgresConnection|SqlServerConnection|Connection
     *
     * @throws InvalidArgumentException when $driver is not one of the listed drivers
     */
    protected function createConnection($driver, $connection, $database, $prefix = '', array $config = []): SQLiteConnection|MariaDbConnection|MySqlConnection|PostgresConnection|SqlServerConnection|Connection
    {
        return match ($driver) {
            'mysql' => new PConnection($connection, $database, $prefix, $config),
            'mariadb' => new MariaDbConnection($connection, $database, $prefix, $config),
            'pgsql' => new PostgresConnection($connection, $database, $prefix, $config),
            'sqlite' => new SQLiteConnection($connection, $database, $prefix, $config),
            'sqlsrv' => new SqlServerConnection($connection, $database, $prefix, $config),
            default => throw new InvalidArgumentException("Unsupported driver [{$driver}]."),
        };
    }
};
为了验证上述无缝耦合的最终效果,我准备将它安装到Webman
我封装了一个
Database
类,用于Hook住Laravel的DatabaseManager
use Illuminate\Container\Container;
use Illuminate\Database\Capsule\Manager;
use Illuminate\Database\DatabaseManager;
use Illuminate\Events\Dispatcher;
use Illuminate\Pagination\Cursor;
use Illuminate\Pagination\CursorPaginator;
use Illuminate\Pagination\Paginator;
use Psc\Drive\Laravel\Coroutine\Database\Factory;
use function class_exists;
use function config;
use function get_class;
use function method_exists;
use function request;
/**
 * Capsule manager that replaces the default connection factory with the
 * project's coroutine-aware Factory and installs itself as the global
 * instance under Webman.
 */
class Database extends Manager
{
    /**
     * Install this manager as the global Capsule instance.
     *
     * Does nothing when Webman's container class is absent, when a Database
     * instance is already installed, or when no connections are configured.
     *
     * @return void
     */
    public static function install(): void
    {
        // Only applicable when running under Webman.
        if (!class_exists(\support\Container::class)) {
            return;
        }

        // Already hooked: keep the existing instance.
        if (isset(parent::$instance) && get_class(parent::$instance) === Database::class) {
            return;
        }

        $config = config('database', []);
        $connections = $config['connections'] ?? [];
        if (!$connections) {
            return;
        }

        $app = Container::getInstance();

        // Constructing the capsule calls setupManager(), which swaps in the Factory.
        $capsule = new Database($app);

        // Register the default connection first (without an explicit name) ...
        $default = $config['default'] ?? false;
        if ($default) {
            $defaultConfig = $connections[$default] ?? false;
            if ($defaultConfig) {
                $capsule->addConnection($defaultConfig);
            }
        }

        // ... then every connection under its own name. A dedicated loop
        // variable avoids clobbering $config.
        foreach ($connections as $name => $connectionConfig) {
            $capsule->addConnection($connectionConfig, $name);
        }

        if (class_exists(Dispatcher::class) && !$capsule->getEventDispatcher()) {
            $capsule->setEventDispatcher(\support\Container::make(Dispatcher::class, [$app]));
        }

        // Set as global
        $capsule->setAsGlobal();
        $capsule->bootEloquent();

        // Paginator: every resolver tolerates request() returning null.
        if (class_exists(Paginator::class)) {
            if (method_exists(Paginator::class, 'queryStringResolver')) {
                Paginator::queryStringResolver(function () {
                    return request()?->queryString();
                });
            }

            Paginator::currentPathResolver(function () {
                $request = request();

                return $request ? $request->path() : '/';
            });

            Paginator::currentPageResolver(function ($pageName = 'page') {
                $request = request();
                if (!$request) {
                    return 1;
                }

                $page = (int)($request->input($pageName, 1));

                return $page > 0 ? $page : 1;
            });

            if (class_exists(CursorPaginator::class)) {
                CursorPaginator::currentCursorResolver(function ($cursorName = 'cursor') {
                    // Null-safe like the resolvers above, so a missing request
                    // does not cause a call on null.
                    return Cursor::fromEncoded(request()?->input($cursorName));
                });
            }
        }

        parent::$instance = $capsule;
    }

    /**
     * Build the DatabaseManager with the project's Factory instead of the default one.
     *
     * @return void
     */
    protected function setupManager(): void
    {
        $factory = new Factory($this->container);
        $this->manager = new DatabaseManager($this->container, $factory);
    }
}
为了更直观的展现协程的效果,我将webman-worker数量改为了1,并且在每次请求中都会进行数据库查询
/**
 * Demo endpoint: runs a one-second query and reports when it started and ended.
 *
 * @param Request $request
 * @return string "<start> - <end>" as microtime(true) floats
 */
public function index(Request $request): string
{
    // Hook the DatabaseManager manually.
    Database::install();

    $startedAt = microtime(true);

    // Simulate a query that takes one second.
    Db::statement('SELECT SLEEP(1);');

    $finishedAt = microtime(true);

    return $startedAt . ' - ' . $finishedAt;
}
<?php declare(strict_types=1);
namespace Tests;
use GuzzleHttp\Client;
use PHPUnit\Framework\TestCase;
use Psc\Plugins\Guzzle\PHandler;
use function P\async;
use function P\tick;
/**
 * Fires 100 concurrent GET requests at the local server and prints each
 * response body.
 */
class CoroutineTest extends TestCase
{
    /**
     * Schedule every request as its own async task, then run them via tick().
     */
    public function test_main(): void
    {
        $handler = new PHandler(['pool'=>0]);
        $client = new Client(['handler' => $handler]);

        foreach (range(0, 99) as $i) {
            async(function () use ($client, $i) {
                $body = $client->get('http://127.0.0.1:8787/')->getBody()->getContents();
                echo "Request $i: $body\n";
            });
        }

        tick();

        // Placeholder assertion so the test is not reported as risky.
        $this->assertEquals(1, 1);
    }
}
Request 0: 1723015194.3121 - 1723015195.4183
Request 1: 1723015194.3389 - 1723015195.4193
Request 2: 1723015194.339 - 1723015195.4196
Request 3: 1723015194.3391 - 1723015195.4187
Request 4: 1723015194.3391 - 1723015195.4198
Request 5: 1723015194.3392 - 1723015195.42
Request 6: 1723015194.3393 - 1723015195.4202
Request 7: 1723015194.3394 - 1723015195.4204
Request 8: 1723015194.3394 - 1723015195.4588
Request 9: 1723015194.3395 - 1723015195.4595
Request 10: 1723015194.3395 - 1723015195.4626
Request 11: 1723015194.3396 - 1723015195.4633
Request 12: 1723015194.3397 - 1723015195.4653
Request 13: 1723015194.3398 - 1723015195.4658
Request 14: 1723015194.3398 - 1723015195.4688
Request 15: 1723015194.3399 - 1723015195.4726
Request 16: 1723015194.34 - 1723015195.4735
Request 17: 1723015194.34 - 1723015195.4774
Request 18: 1723015194.3401 - 1723015195.48
Request 19: 1723015194.3402 - 1723015195.4805
Request 20: 1723015194.3402 - 1723015195.4816
Request 21: 1723015194.3403 - 1723015195.4818
Request 22: 1723015194.3404 - 1723015195.4862
Request 23: 1723015194.3404 - 1723015195.4911
Request 24: 1723015194.3405 - 1723015195.4915
Request 25: 1723015194.3406 - 1723015195.4917
Request 26: 1723015194.3406 - 1723015195.4919
Request 27: 1723015194.3408 - 1723015195.4921
Request 28: 1723015194.3408 - 1723015195.4923
Request 29: 1723015194.3409 - 1723015195.4925
Request 30: 1723015194.3409 - 1723015195.4933
Request 31: 1723015194.341 - 1723015195.4935
Request 32: 1723015194.341 - 1723015195.4936
Request 33: 1723015194.3411 - 1723015195.4938
Request 34: 1723015194.3412 - 1723015195.494
Request 35: 1723015194.3412 - 1723015195.4941
Request 36: 1723015194.3413 - 1723015195.4943
Request 37: 1723015194.3414 - 1723015195.4944
Request 38: 1723015194.3414 - 1723015195.4946
Request 39: 1723015194.3416 - 1723015195.4947
Request 40: 1723015194.3417 - 1723015195.4949
Request 41: 1723015194.3418 - 1723015195.495
Request 42: 1723015194.342 - 1723015195.5174
Request 43: 1723015194.3421 - 1723015195.518
Request 44: 1723015194.3423 - 1723015195.5184
Request 45: 1723015194.3425 - 1723015195.5191
Request 46: 1723015194.3426 - 1723015195.5194
Request 48: 1723015194.3429 - 1723015195.5215
Request 50: 1723015194.3433 - 1723015195.5219
Request 51: 1723015194.3435 - 1723015195.5221
Request 47: 1723015194.3428 - 1723015195.5225
Request 49: 1723015194.3431 - 1723015195.523
Request 52: 1723015194.3436 - 1723015195.5265
Request 53: 1723015194.3437 - 1723015195.5268
Request 54: 1723015194.3439 - 1723015195.527
Request 55: 1723015194.344 - 1723015195.5275
Request 56: 1723015194.3443 - 1723015195.5282
Request 57: 1723015194.3444 - 1723015195.5314
Request 58: 1723015194.3445 - 1723015195.5316
Request 59: 1723015194.3445 - 1723015195.5318
Request 60: 1723015194.3446 - 1723015195.5323
Request 61: 1723015194.3448 - 1723015195.5324
Request 62: 1723015194.3449 - 1723015195.5326
Request 63: 1723015194.345 - 1723015195.5328
Request 64: 1723015194.3451 - 1723015195.533
Request 65: 1723015194.3453 - 1723015195.5331
Request 66: 1723015194.3455 - 1723015195.5437
Request 67: 1723015194.3456 - 1723015195.5441
Request 69: 1723015194.3458 - 1723015195.5443
Request 70: 1723015194.3459 - 1723015195.5445
Request 71: 1723015194.346 - 1723015195.5448
Request 72: 1723015194.3464 - 1723015195.5451
Request 68: 1723015194.3457 - 1723015195.5456
Request 73: 1723015194.3471 - 1723015195.5508
Request 74: 1723015194.3475 - 1723015195.551
Request 75: 1723015194.3478 - 1723015195.5512
Request 76: 1723015194.3482 - 1723015195.5516
Request 77: 1723015194.3486 - 1723015195.5518
Request 78: 1723015194.3489 - 1723015195.5542
Request 79: 1723015194.3491 - 1723015195.5545
Request 80: 1723015194.3492 - 1723015195.5549
Request 81: 1723015194.3493 - 1723015195.5605
Request 82: 1723015194.3493 - 1723015195.561
Request 83: 1723015194.3494 - 1723015195.5633
Request 84: 1723015194.3494 - 1723015195.5638
Request 85: 1723015194.3495 - 1723015195.5641
Request 86: 1723015194.3496 - 1723015195.5661
Request 87: 1723015194.3496 - 1723015195.5678
Request 88: 1723015194.3497 - 1723015195.5681
Request 89: 1723015194.3499 - 1723015195.5684
Request 90: 1723015194.35 - 1723015195.5685
Request 91: 1723015194.3501 - 1723015195.5756
Request 92: 1723015194.3502 - 1723015195.5758
Request 93: 1723015194.3504 - 1723015195.576
Request 94: 1723015194.3505 - 1723015195.5768
Request 95: 1723015194.3506 - 1723015195.577
Request 96: 1723015194.3508 - 1723015195.5772
Request 97: 1723015194.3509 - 1723015195.5774
Request 98: 1723015194.3509 - 1723015195.5777
Request 99: 1723015194.351 - 1723015195.5781
通过上述实践,我们可以看到在LaravelORM与Webman中使用协程是完全可行的,
并且在高并发&慢查询场景下,协程的优势也在此得到了充分的体现
牛皮
有没有ThinkPHP的?
暂无
那真是太可惜了···
你是真的牛批啊,代码是不是没贴全,有点没看懂,能否封装成composer扩展呢
在实际项目中,如何使用,比如大量数据的导出,希望在查询阶段消耗较少的时间;有个定时任务,做数据处理和同步,这个能否有实际使用价值
考虑到测试不够全面还没封装
好嘞,等你正式版发布
好!期待composer发布
后面部分,laravel 提供的这个方法就可以增加新的数据库驱动了
\Illuminate\Database\Connection::resolverFor
感谢感谢
太牛了,必须顶一下,感觉webman+pripple已经无敌了
大佬,好好整整 这个composer如果出了 你准能上名人榜 太给力了 很牛
666