可能很多人使用webman的command模块的时候发现command没办法使用event-loop的能力,很多workerman的特性在command中没办法进行友好的使用,比如AsyncTcpConncetion或者其他的一定需要运行在event-loop的功能;
我会在本分享中提供一种思路,简单的一点代码即可实现上述的功能:
Worker类实现一个执行一次便结束进程的方法,我们命名为once<?php
declare(strict_types=1);
namespace YOUR_NAMESAPCE;
class Worker extends \Workerman\Worker
{
/**
* @param string|null $socketName
* @param array $socketContext
*/
public function __construct(?string $socketName = null, array $socketContext = [])
{
if (!extension_loaded('posix')) {
throw new \RuntimeException('Please install posix extension.');
}
if (!extension_loaded('pcntl')) {
throw new \RuntimeException('Please install pcntl extension.');
}
parent::__construct($socketName, $socketContext);
}
/**
* 运行一个一次性的进程
*
* @param string $name
* @param callable $run
* @param callable|null $stop
* @return void
*/
public function once(string $name, callable $run, ?callable $stop = null): void
{
$this->name = $name;
$this->count = 1;
$this->onWorkerStart = function () use ($run) {
try {
$run();
} finally {
static::stopAll();
// 向主进程发送信号,结束进程
\posix_kill(\posix_getppid(), \SIGINT);
}
};
$this->onWorkerStop = $stop;
if (!$this->name) {
throw new \RuntimeException('Please set the name of the worker process through the name property.');
}
try {
static::checkSapiEnv();
static::initStdOut();
static::init();
static::checkPortAvailable();
static::lock();
static::daemonize();
static::initWorkers();
static::installSignal();
static::saveMasterPid();
static::lock(LOCK_UN);
static::forkWorkers();
static::resetStd();
static::monitorWorkers();
} catch (\Throwable $e) {
static::log($e);
}
}
}
cli脚本的AbstractCommand类<?php
declare(strict_types=1);
namespace YOUR_NAMESAPCE;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Exception\ExceptionInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
abstract class AbstractCommand extends Command
{
/**
* 循环类,null为普通cli程序
*
* @return string|null
*/
abstract protected function eventClass(): ?string;
/**
* info
*
* @param OutputInterface $output
* @param string $message
* @return void
*/
protected function info(OutputInterface $output, string $message): void
{
$output->writeln("ℹ️ $message");
}
/**
* error
*
* @param OutputInterface $output
* @param string $message
* @return int
*/
protected function error(OutputInterface $output, string $message): int
{
$output->writeln("❌ $message");
return self::FAILURE;
}
/**
* success
*
* @param OutputInterface $output
* @param string $message
* @return int
*/
protected function success(OutputInterface $output, string $message): int
{
$output->writeln("✅ $message");
return self::SUCCESS;
}
/**
* @override
*
* @param InputInterface $input
* @param OutputInterface $output
* @return int
* @throws ExceptionInterface
*/
public function run(InputInterface $input, OutputInterface $output): int
{
if ($event = $this->eventClass()) {
$result = 0;
$name = strtolower(str_replace('\\','_', trim(get_called_class(), '\\')));
$worker = new Worker();
$worker::$pidFile = runtime_path() . "$name.pid";
$worker::$logFile = runtime_path() . "/logs/$name.log";
$worker->eventLoop = $event;
$worker->once($name, function () use ($input, $output, &$result) {
return $result = parent::run($input, $output);
});
return $result;
}
return parent::run($input, $output);
}
}
command,实现eventClass,指定其使用的event-loop<?php
declare(strict_types=1);
namespace app\command\Debug;
use app\packages\face\builders\GetFaceEmbeddingBuilder;
use app\packages\face\workflow\FacesEmbeddingLog;
use YOUR_NAMESAPCE\AbstractCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Workerman\Events\Fiber;
use function Workbunny\WebmanRabbitMQ\publish;
class Test extends AbstractCommand
{
protected static $defaultName = 'debug:test';
protected static $defaultDescription = 'DEBUG';
/** @inheritDoc */
protected function eventClass(): ?string
{
return Fiber::class;
}
/** @inheritDoc */
protected function configure()
{
}
/**
* @param InputInterface $input
* @param OutputInterface $output
* @return int
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$body = [
'image_url' => $url = 'http://www.xxx.com/img/PCtm_d9c8750bed0b3c7d089fa7d55720d6cf.png', // 图片地址
'user_id' => $userId = -1, // 用户id
'user_type' => $type = "test",
'prob_threshold' => 0.95, // 获取判定阈值
'workflow' => [
// 人脸特征数据缓存存储/读取
['class' => FacesEmbeddingLog::class, 'params' => [
'userId' => $userId, // 用户id
'userType' => $type,
'imageUrl' => $url, // 图片地址
'probThreshold' => 0.01, // 储存/查询阈值
]]
]
];
// workbunny/rabbitmq依赖AsyncTcpConnection,一定要运行在event-loop中
publish(new GetFaceEmbeddingBuilder(), GetFaceEmbeddingBuilder::encode($body));
return $this->success($output, 'success');
}
}
这时候运行php webman debug:test便能正常运行在event-loop中
root@ef2d58e20604:/var/www/face# php webman debug:test
✅ success
Workerman[webman] received signal SIGINT
Workerman[webman] stopping
Workerman[webman] has been stopped