为webman的command赋能使用event-loop

chaz6chez

可能很多人使用webmancommand模块的时候发现command没办法使用event-loop的能力,很多workerman的特性在command中没办法进行友好的使用,比如AsyncTcpConncetion或者其他的一定需要运行在event-loop的功能;

我会在本分享中提供一种思路,简单的一点代码即可实现上述的功能:

  1. Worker类实现一个执行一次便结束进程的方法,我们命名为once
<?php
declare(strict_types=1);

namespace YOUR_NAMESAPCE;

class Worker extends \Workerman\Worker
{

    /**
     * @param string|null $socketName
     * @param array $socketContext
     */
    public function __construct(?string $socketName = null, array $socketContext = [])
    {
        if (!extension_loaded('posix')) {
            throw new \RuntimeException('Please install posix extension.');
        }
        if (!extension_loaded('pcntl')) {
            throw new \RuntimeException('Please install pcntl extension.');
        }
        parent::__construct($socketName, $socketContext);
    }

    /**
     * 运行一个一次性的进程
     *
     * @param string $name
     * @param callable $run
     * @param callable|null $stop
     * @return void
     */
    public function once(string $name, callable $run, ?callable $stop = null): void
    {
        $this->name = $name;
        $this->count = 1;
        $this->onWorkerStart = function () use ($run) {
            try {
                $run();
            } finally {
                static::stopAll();
                // 向主进程发送信号,结束进程
                \posix_kill(\posix_getppid(), \SIGINT);
            }
        };
        $this->onWorkerStop = $stop;
        if (!$this->name) {
            throw new \RuntimeException('Please set the name of the worker process through the name property.');
        }
        try {
            static::checkSapiEnv();
            static::initStdOut();
            static::init();
            static::checkPortAvailable();
            static::lock();
            static::daemonize();
            static::initWorkers();
            static::installSignal();
            static::saveMasterPid();
            static::lock(LOCK_UN);
            static::forkWorkers();
            static::resetStd();
            static::monitorWorkers();
        } catch (\Throwable $e) {
            static::log($e);
        }
    }
}
  1. 实现一个兼容普通cli脚本的AbstractCommand
<?php
declare(strict_types=1);

namespace YOUR_NAMESAPCE;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Exception\ExceptionInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

abstract class AbstractCommand extends Command
{
    /**
     * 循环类,null为普通cli程序
     *
     * @return string|null
     */
    abstract protected function eventClass(): ?string;

    /**
     * info
     *
     * @param OutputInterface $output
     * @param string $message
     * @return void
     */
    protected function info(OutputInterface $output, string $message): void
    {
        $output->writeln("ℹ️ $message");
    }

    /**
     * error
     *
     * @param OutputInterface $output
     * @param string $message
     * @return int
     */
    protected function error(OutputInterface $output, string $message): int
    {
        $output->writeln("❌ $message");

        return self::FAILURE;
    }

    /**
     * success
     *
     * @param OutputInterface $output
     * @param string $message
     * @return int
     */
    protected function success(OutputInterface $output, string $message): int
    {
        $output->writeln("✅ $message");

        return self::SUCCESS;
    }

    /**
     * @override
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return int
     * @throws ExceptionInterface
     */
    public function run(InputInterface $input, OutputInterface $output): int
    {
        if ($event = $this->eventClass()) {
            $result = 0;

            $name = strtolower(str_replace('\\','_', trim(get_called_class(), '\\')));
            $worker = new Worker();
            $worker::$pidFile = runtime_path() . "$name.pid";
            $worker::$logFile = runtime_path() . "/logs/$name.log";
            $worker->eventLoop = $event;
            $worker->once($name, function () use ($input, $output, &$result) {
                    return $result = parent::run($input, $output);
                });

            return $result;
        }

        return parent::run($input, $output);
    }
}
  1. 新建一个command,实现eventClass,指定其使用的event-loop
<?php

declare(strict_types=1);

namespace app\command\Debug;

use app\packages\face\builders\GetFaceEmbeddingBuilder;
use app\packages\face\workflow\FacesEmbeddingLog;
use YOUR_NAMESAPCE\AbstractCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Workerman\Events\Fiber;
use function Workbunny\WebmanRabbitMQ\publish;

class Test extends AbstractCommand
{
    protected static $defaultName = 'debug:test';
    protected static $defaultDescription = 'DEBUG';

    /** @inheritDoc */
    protected function eventClass(): ?string
    {
        return Fiber::class;
    }

    /** @inheritDoc */
    protected function configure()
    {
    }

    /**
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return int
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $body = [
            'image_url' => $url = 'http://www.xxx.com/img/PCtm_d9c8750bed0b3c7d089fa7d55720d6cf.png', // 图片地址
            'user_id'   => $userId = -1, // 用户id
            'user_type' => $type = "test",
            'prob_threshold' => 0.95, // 获取判定阈值
            'workflow'  => [
                // 人脸特征数据缓存存储/读取
                ['class' => FacesEmbeddingLog::class, 'params' => [
                    'userId'    => $userId, // 用户id
                    'userType'  => $type,
                    'imageUrl'  => $url, // 图片地址
                    'probThreshold' => 0.01, // 储存/查询阈值
                ]]
            ]
        ];
        // workbunny/rabbitmq依赖AsyncTcpConnection,一定要运行在event-loop中
        publish(new GetFaceEmbeddingBuilder(), GetFaceEmbeddingBuilder::encode($body));
        return $this->success($output, 'success');
    }
}

这时候运行php webman debug:test便能正常运行在event-loop

root@ef2d58e20604:/var/www/face# php webman debug:test
✅ success
Workerman[webman] received signal SIGINT
Workerman[webman] stopping
Workerman[webman] has been stopped
103 0 1
0个评论

chaz6chez

5294
积分
0
获赞数
0
粉丝数
2018-11-16 加入
🔝