Nacos在我司的应用及SOA初尝

chaz6chez

前言

我目前所在的部门主要是负责公司的数据相关的内容,可以理解为数据统计,做的工作其实也比较复杂,除了做一些数据统计分析业务之外,需要做一些基础服务的开发;我部门因为内部开发语言并不统一,在这种情况下,项目被动的分成了A\B\C\D等子项目,并没有将项目合并到一个项目中开发,在这种过程中,被动的接受了SOA这样的结构。

A项目是一个任务的调度分配服务,可以理解为一个大型的脚本/定时执行器,有点类似与现在比较流行的serverless函数服务,向A项目中添加一个任务函数或者执行脚本,他就会在合适的时候被触发;由于硬件服务器并不止有一台,数据库也并不只有一台,结合现在容器化思路,这样的配置需要很多,如果仅仅是写在配置文件中,并不能方便运维的统一快速方便的管理,所以我们计划做一个配置中心;因为我们的A\B\C\D等子项目也并不只有一个实例,他们各自是可以横向拓展的主体;在这样的前提下,我们决定引入Nacos/consul等包含了配置管理的服务注册/发现服务(Nacos/consul都是优秀的服务注册/发现服务,选用Nacos是一些额外因素,他们各自有优缺点)。

PHP

PHP在SOA中扮演了Web业务服务的一个角色,主要是进行一些业务接口的输出,但是我们的服务由于需要高承载量,原本计划是使用自研的Reactor模型的NIO框架,但是考虑到减少心智负担,所以选用了文档及社群更完善Webman作为开发框架。

接入Nacos

最初我是使用了 Tinywan/nacos 的插件进行的业务开发,但是我们在使用过程中发现,他的配置监听项是通过Timer创建一个nacos->config->get请求实现的,在Timer间隔期内可能变更了配置,也就是极限状况下存在{Timer interval}的同步延迟,这样并不符合我司的具体情况,我们要求的服务变更可能需要更迅速,因为在一些业务点我们不能有过多时长的错误及业务不通畅,但如果仅仅是将{timer interval}的值缩小至ms,那么又会存在对Nacos服务的过多请求;另外由于我们的业务已经写了有一段时间了,累积了大量的config()调用方式,这时候我们需要考虑怎么样非侵入的改变这一习惯或者着一些代码,于是,我基于 Tinywan/nacos 的思路封装了适合我们的Nacos客户端插件 Workbunny/webman-nacos;

Workbunny/webman-nacos

1. 配置监听

配置监听部分我们需要完成以下三个要求

  • 时效性
  • 触达深度
  • 0侵入

我们在配置中使用yaml文件作为了环境配置替代了原有的.env文件,并且将yaml文件保存在nacos对应的namespace;相当于业务使用config函数的时候,config函数会找到config目录下对应的php文件,PHP文件中又使用yaml函数去调用对应的yaml文件引入对应的值,调用链可以理解为如下:

config() -> /config/X.php -> yaml() -> /x.yaml

这个过程完全可以简化成config()直接找到config目录的对应php文件,将多个php文件保存至nacos对应的namespace下。

基于上述的过程,我最早使用了Timer + Guzzle异步请求 + nacos长轮询监听 保证 时效性,因为存在多个yaml文件,所以需要对多个yaml文件进行监听,如果单纯一个配置开一个进程有点太奢侈,所以我使用了一个进程 + Guzzle异步请求;Nacos监听的长轮询机制你可以理解为如果有消息,就马上返回对应的配置id,如果没消息,就一直阻塞到timeout并且返回一个空字符串;考虑到请求会阻塞,为了不影响该进程内Timer的下一个执行周期,我将Timer的间隔时长和长轮询阻塞时长画上了等号。

    public function onWorkerStart(Worker $worker)
    {
        $worker->count = 1;

        if($this->configListeners){
            // 拉取配置项文件
            foreach ($this->configListeners as $listener){
                list($dataId, $group, $tenant, $configPath) = $listener;
                if(!file_exists($configPath)){
                    $this->_get($dataId, $group, $tenant, $configPath);
                }
            }
            // 创建定时监听
            Timer::add($this->longPullingInterval, function (){
                $promises = [];
                foreach ($this->configListeners as $listener){
                    list($dataId, $group, $tenant, $configPath) = $listener;
                    # 初始化文件
                    if(file_exists($configPath)){
                        $promises[] = $this->client->config->listenerAsync(
                            $dataId,
                            $group,
                            md5(file_get_contents($configPath)),
                            $tenant,
                            $this->longPullingInterval * 1000
                        )->then(function (ResponseInterface $response) use($dataId, $group, $tenant, $configPath){
                            if($response->getStatusCode() === 200){
                                if($response->getBody()->getContents() !== ''){
                                    # 文件通过nacos get并覆盖写入本地文件
                                    $this->_get($dataId, $group, $tenant, $configPath);
                                }
                            }
                        },function (GuzzleException $exception){
                            Log::channel('error')->error($exception->getMessage(), $exception->getTrace());
                        });
                    }
                }
                if($promises){
                    Utils::settle($promises)->wait();
                }
            });
        }
    }

第一版完成后我发现了一些问题:

  1. 比如config()中已经获取的配置无法刷新,常驻内存的一些数据库连接等没有被触达
  2. Timer + Guzzle异步请求实际上在Timer的执行周期内是阻塞的,只是Guzzle在对多个请求可以并发的发起
  3. Timer的第一次执行并不能立即执行,导致初次启动时并不能及时获取最新的配置文件

为了解决第一个问题,我在_get方法内加入了对workers的reload

    protected function _get(string $dataId, string $group, string $tenant, string $path)
    {
        $res = $this->client->config->get($dataId, $group, $tenant);
        if(file_put_contents($path, $res, LOCK_EX)){
            reload($path);
        }
    }
    function reload(string $file)
    {
        Worker::log($file . ' update and reload. ');
        if(extension_loaded('posix') and extension_loaded('pcntl')){
            posix_kill(posix_getppid(), SIGUSR1);
        }else{
            Worker::reloadAllWorkers();
        }
    }

第二个问题我使用了Workerman/http-client的异步http客户端,在使用的过程中还有个 小插曲 ,由于http-client使用了workerman的event-loop,我的项目是在workerman的on回调生命周期内,所以可以利用event-loop达到无阻塞的请求;

    public function onWorkerStart(Worker $worker)
    {
        $worker->count = 1;

        if($this->configListeners){
            // 拉取配置项文件
            foreach ($this->configListeners as $listener){
                list($dataId, $group, $tenant, $configPath) = $listener;
                if(!file_exists($configPath)){
                    $this->_get($dataId, $group, $tenant, $configPath);
                }
                $this->timers[$dataId] = Timer::add($this->longPullingInterval,
                    function () use($dataId, $group, $tenant, $configPath){
                        $this->client->config->listenerAsyncUseEventLoop([
                                'dataId' => $dataId,
                                'group' => $group,
                                'contentMD5' => md5(file_get_contents($configPath)),
                                'tenant' => $tenant
                        ], function (Response $response) use($dataId, $group, $tenant, $configPath){
                            if($response->getStatusCode() === 200){
                                if((string)$response->getBody() !== ''){
                                    $this->_get($dataId, $group, $tenant, $configPath);
                                }
                            }
                        }, function (\Exception $exception){
                            Log::channel('error')->error($exception->getMessage(), $exception->getTrace());
                        });
                });
            }
        }
    }

第三个问题,我基于workerman/timer封装了一个简易的能达到我目的的timer:

<?php
declare(strict_types=1);

namespace Workbunny\WebmanNacos;

use Workerman\Timer as WorkermanTimer;

/**
 * 定时器
 *
 * @desc 对workerman/timer的封装
 * 1.延迟单此执行
 * 2.立即单次执行
 * 3.延迟循环执行
 *      - 延迟与循环时间不同
 *      - 延迟与循环间隔相同
 * 4.立即循环执行
 * @author chaz6chez
 */
final class Timer {

    /** @var array[] 子定时器 */
    protected static array $_timers = [];

    /**
     * 新增定时器
     * @param float $delay
     * @param float $repeat
     * @param callable $callback
     * @param ...$args
     * @return int|bool
     */
    public static function add(float $delay, float $repeat, callable $callback, ... $args)
    {
        switch (true){
            # 立即循环
            case ($delay === 0.0 and $repeat !== 0.0):
                $callback(...$args);
                return WorkermanTimer::add($repeat, $callback, $args);

            # 延迟执行一次
            case ($delay !== 0.0 and $repeat === 0.0):
                return WorkermanTimer::add($delay, $callback, $args, false);

            # 延迟循环执行,延迟与重复相同
            case ($delay !== 0.0 and $repeat !== 0.0 and $repeat === $delay):
                return WorkermanTimer::add($delay, $callback, $args);

            # 延迟循环执行,延迟与重复不同
            case ($delay !== 0.0 and $repeat !== 0.0 and $repeat !== $delay):
                return $id = WorkermanTimer::add($delay, function(...$args) use(&$id, $repeat, $callback){
                    $callback(...$args);
                    self::$_timers[$id] = WorkermanTimer::add($repeat, $callback, $args);
                }, $args, false);

            # 立即执行
            default:
                $callback(...$args);
                return 0;
        }
    }

    /**
     * 移除定时器
     * @param int $id
     * @return void
     */
    public static function del(int $id): void
    {
        if(
            $id !== 0 and
            isset(self::$_timers[$id]) and
            is_int($timerId = self::$_timers[$id])
        ){
            unset(self::$_timers[$id]);
            WorkermanTimer::del($timerId);
        }
    }

    /**
     * @return void
     */
    public static function delAll(): void
    {
        self::$_timers = [];
        WorkermanTimer::delAll();
    }
}

之后想到什么再补充吧

1365 4 2
4个评论

Tinywan

感谢分享

没茅台

为什么没人留言 因为太高深了

  • chaz6chez 9天前

    这个就是简单的配置中心和服务中心的使用过程和其客户端封装的过程

  • Tinywan 9天前

    用的人少!

Tinywan

为了更好的发展,我也迁过来,一起维护,不然搞两套插件不太好!

🔰🔰🔰 为了方便维护和使用,推荐大家使用最新版的Nacos插件 https://www.workerman.net/plugin/50 我也会以后积极参与这个仓库的贡献。

Maoxp

很好的分享

  • 暂无评论

chaz6chez

704
积分
0
获赞数
0
粉丝数
2018-11-16 加入
🔝