Nacos在我司的应用及SOA初尝

chaz6chez

前言

我目前所在的部门主要是负责公司的数据相关的内容,可以理解为数据统计,做的工作其实也比较复杂,除了做一些数据统计分析业务之外,需要做一些基础服务的开发;我部门因为内部开发语言并不统一,在这种情况下,项目被动的分成了A\B\C\D等子项目,并没有将项目合并到一个项目中开发,在这种过程中,被动的接受了SOA这样的结构。

A项目是一个任务的调度分配服务,可以理解为一个大型的脚本/定时执行器,有点类似与现在比较流行的serverless函数服务,向A项目中添加一个任务函数或者执行脚本,他就会在合适的时候被触发;由于硬件服务器并不止有一台,数据库也并不只有一台,结合现在容器化思路,这样的配置需要很多,如果仅仅是写在配置文件中,并不能方便运维的统一快速方便的管理,所以我们计划做一个配置中心;因为我们的A\B\C\D等子项目也并不只有一个实例,他们各自是可以横向拓展的主体;在这样的前提下,我们决定引入Nacos/consul等包含了配置管理的服务注册/发现服务(Nacos/consul都是优秀的服务注册/发现服务,选用Nacos是一些额外因素,他们各自有优缺点)。

PHP

PHP在SOA中扮演了Web业务服务的一个角色,主要是进行一些业务接口的输出,但是我们的服务由于需要高承载量,原本计划是使用自研的Reactor模型的NIO框架,但是考虑到减少心智负担,所以选用了文档及社群更完善Webman作为开发框架。

接入Nacos

最初我是使用了 Tinywan/nacos 的插件进行的业务开发,但是我们在使用过程中发现,他的配置监听项是通过Timer创建一个nacos->config->get请求实现的,在Timer间隔期内可能变更了配置,也就是极限状况下存在{Timer interval}的同步延迟,这样并不符合我司的具体情况,我们要求的服务变更可能需要更迅速,因为在一些业务点我们不能有过多时长的错误及业务不通畅,但如果仅仅是将{timer interval}的值缩小至ms,那么又会存在对Nacos服务的过多请求;另外由于我们的业务已经写了有一段时间了,累积了大量的config()调用方式,这时候我们需要考虑怎么样非侵入的改变这一习惯或者着一些代码,于是,我基于 Tinywan/nacos 的思路封装了适合我们的Nacos客户端插件 Workbunny/webman-nacos;

Workbunny/webman-nacos

1. 配置监听

配置监听部分我们需要完成以下三个要求

  • 时效性
  • 触达深度
  • 0侵入

我们在配置中使用yaml文件作为了环境配置替代了原有的.env文件,并且将yaml文件保存在nacos对应的namespace;相当于业务使用config函数的时候,config函数会找到config目录下对应的php文件,PHP文件中又使用yaml函数去调用对应的yaml文件引入对应的值,调用链可以理解为如下:

config() -> /config/X.php -> yaml() -> /x.yaml

这个过程完全可以简化成config()直接找到config目录的对应php文件,将多个php文件保存至nacos对应的namespace下。

基于上述的过程,我最早使用了Timer + Guzzle异步请求 + nacos长轮询监听 保证 时效性,因为存在多个yaml文件,所以需要对多个yaml文件进行监听,如果单纯一个配置开一个进程有点太奢侈,所以我使用了一个进程 + Guzzle异步请求;Nacos监听的长轮询机制你可以理解为如果有消息,就马上返回对应的配置id,如果没消息,就一直阻塞到timeout并且返回一个空字符串;考虑到请求会阻塞,为了不影响该进程内Timer的下一个执行周期,我将Timer的间隔时长和长轮询阻塞时长画上了等号。

    public function onWorkerStart(Worker $worker)
    {
        $worker->count = 1;

        if($this->configListeners){
            // 拉取配置项文件
            foreach ($this->configListeners as $listener){
                list($dataId, $group, $tenant, $configPath) = $listener;
                if(!file_exists($configPath)){
                    $this->_get($dataId, $group, $tenant, $configPath);
                }
            }
            // 创建定时监听
            Timer::add($this->longPullingInterval, function (){
                $promises = [];
                foreach ($this->configListeners as $listener){
                    list($dataId, $group, $tenant, $configPath) = $listener;
                    # 初始化文件
                    if(file_exists($configPath)){
                        $promises[] = $this->client->config->listenerAsync(
                            $dataId,
                            $group,
                            md5(file_get_contents($configPath)),
                            $tenant,
                            $this->longPullingInterval * 1000
                        )->then(function (ResponseInterface $response) use($dataId, $group, $tenant, $configPath){
                            if($response->getStatusCode() === 200){
                                if($response->getBody()->getContents() !== ''){
                                    # 文件通过nacos get并覆盖写入本地文件
                                    $this->_get($dataId, $group, $tenant, $configPath);
                                }
                            }
                        },function (GuzzleException $exception){
                            Log::channel('error')->error($exception->getMessage(), $exception->getTrace());
                        });
                    }
                }
                if($promises){
                    Utils::settle($promises)->wait();
                }
            });
        }
    }

第一版完成后我发现了一些问题:

  1. 比如config()中已经获取的配置无法刷新,常驻内存的一些数据库连接等没有被触达
  2. Timer + Guzzle异步请求实际上在Timer的执行周期内是阻塞的,只是Guzzle在对多个请求可以并发的发起
  3. Timer的第一次执行并不能立即执行,导致初次启动时并不能及时获取最新的配置文件

为了解决第一个问题,我在_get方法内加入了对workers的reload

    protected function _get(string $dataId, string $group, string $tenant, string $path)
    {
        $res = $this->client->config->get($dataId, $group, $tenant);
        if(file_put_contents($path, $res, LOCK_EX)){
            reload($path);
        }
    }
    function reload(string $file)
    {
        Worker::log($file . ' update and reload. ');
        if(extension_loaded('posix') and extension_loaded('pcntl')){
            posix_kill(posix_getppid(), SIGUSR1);
        }else{
            Worker::reloadAllWorkers();
        }
    }

第二个问题我使用了Workerman/http-client的异步http客户端,在使用的过程中还有个 小插曲 ,由于http-client使用了workerman的event-loop,我的项目是在workerman的on回调生命周期内,所以可以利用event-loop达到无阻塞的请求;

    public function onWorkerStart(Worker $worker)
    {
        $worker->count = 1;

        if($this->configListeners){
            // 拉取配置项文件
            foreach ($this->configListeners as $listener){
                list($dataId, $group, $tenant, $configPath) = $listener;
                if(!file_exists($configPath)){
                    $this->_get($dataId, $group, $tenant, $configPath);
                }
                $this->timers[$dataId] = Timer::add($this->longPullingInterval,
                    function () use($dataId, $group, $tenant, $configPath){
                        $this->client->config->listenerAsyncUseEventLoop([
                                'dataId' => $dataId,
                                'group' => $group,
                                'contentMD5' => md5(file_get_contents($configPath)),
                                'tenant' => $tenant
                        ], function (Response $response) use($dataId, $group, $tenant, $configPath){
                            if($response->getStatusCode() === 200){
                                if((string)$response->getBody() !== ''){
                                    $this->_get($dataId, $group, $tenant, $configPath);
                                }
                            }
                        }, function (\Exception $exception){
                            Log::channel('error')->error($exception->getMessage(), $exception->getTrace());
                        });
                });
            }
        }
    }

第三个问题,我基于workerman/timer封装了一个简易的能达到我目的的timer:

<?php
declare(strict_types=1);

namespace Workbunny\WebmanNacos;

use Workerman\Timer as WorkermanTimer;

/**
 * 定时器
 *
 * @desc 对workerman/timer的封装
 * 1.延迟单此执行
 * 2.立即单次执行
 * 3.延迟循环执行
 *      - 延迟与循环时间不同
 *      - 延迟与循环间隔相同
 * 4.立即循环执行
 * @author chaz6chez
 */
final class Timer {

    /** @var array[] 子定时器 */
    protected static array $_timers = [];

    /**
     * 新增定时器
     * @param float $delay
     * @param float $repeat
     * @param callable $callback
     * @param ...$args
     * @return int|bool
     */
    public static function add(float $delay, float $repeat, callable $callback, ... $args)
    {
        switch (true){
            # 立即循环
            case ($delay === 0.0 and $repeat !== 0.0):
                $callback(...$args);
                return WorkermanTimer::add($repeat, $callback, $args);

            # 延迟执行一次
            case ($delay !== 0.0 and $repeat === 0.0):
                return WorkermanTimer::add($delay, $callback, $args, false);

            # 延迟循环执行,延迟与重复相同
            case ($delay !== 0.0 and $repeat !== 0.0 and $repeat === $delay):
                return WorkermanTimer::add($delay, $callback, $args);

            # 延迟循环执行,延迟与重复不同
            case ($delay !== 0.0 and $repeat !== 0.0 and $repeat !== $delay):
                return $id = WorkermanTimer::add($delay, function(...$args) use(&$id, $repeat, $callback){
                    $callback(...$args);
                    self::$_timers[$id] = WorkermanTimer::add($repeat, $callback, $args);
                }, $args, false);

            # 立即执行
            default:
                $callback(...$args);
                return 0;
        }
    }

    /**
     * 移除定时器
     * @param int $id
     * @return void
     */
    public static function del(int $id): void
    {
        if(
            $id !== 0 and
            isset(self::$_timers[$id]) and
            is_int($timerId = self::$_timers[$id])
        ){
            unset(self::$_timers[$id]);
            WorkermanTimer::del($timerId);
        }
    }

    /**
     * @return void
     */
    public static function delAll(): void
    {
        self::$_timers = [];
        WorkermanTimer::delAll();
    }
}

更新于 2022-05-13


服务负载

在后续的过程中,我接到很多人的反馈,说nacos的客户端没有提供服务负载均衡相关的内容,这块地方我是这样觉得:

  1. 每个服务架构的设计不同,所需要的负载策略不同;
  2. 负载可以在提供方做,也可以是调用方做;
  3. 一个简单的客户端如果加入太多功能,会显得过分臃肿;

参考方案

假设有如下两个服务服务:

     ┌─────┐                                       ┌─────┐
     |  A  | ────────────>  service  <──────────── |  B  |
     └─────┘                                       └─────┘
    /   |   \                                     /   |   \
┌───┐ ┌───┐ ┌───┐                             ┌───┐ ┌───┐ ┌───┐
| a | | b | | c | ───────>  instance <─────── | a | | b | | c |
└───┘ └───┘ └───┘                             └───┘ └───┘ └───┘
  |     |     |                                 |     |     |
 1|2   1|2   1|2 ────────>  process  <──────── 1|2   1|2   1|2
 3|4   3|4   3|4                               3|4   3|4   3|4

假设我们是A服务的a实例(简称为Aa),需要调用B服务;从调用者的角度,我们该如何做负载均衡?

服务调用方:

  1. 调用者Aa分别在1、2、3、4号进程中各创建一个nacos-client实例,请保持单例且长连接;

    初始化的时候可以基于健康、权重或者基于metadata的约定等方式对服务B的实例进行选择连接;
    这样的好处是,不论Aa内对于B服务的请求可以复用连接,无需重复创建http连接;

  2. 为每一个nacos-client实例创建一个timer,timer负责对当前实例进行健康状态检查;

    如果PHP支持线程是最好的,因为基于event-loop的timer如果阻塞了,是会影响当前event-loop的;如果是线程去旁置执行,则不会影响event-loop,但无伤大雅,只要该使用的连接尽可能的使用长连接,并且做足异常的判断和处理,实际上相较也没有太大的差异;

  3. 假设健康状态不佳,则将当前nacos-client实例中的连接停止,从通过nacos实例列表中挑选一个健康状态良好的实例进行连接(除了健康状态,还可以根据metadata等参数自定义处理);

    因为event-loop实际上在loop中也是顺序执行,所以不用担心在处理连接的时候会有消息正在处理;

服务提供方:

  1. 需要通过自身实现的流量监控定时向nacos上报健康状况或其他属性,如metadata内定义相关如 请求数量、连接数量等参数;

    可以在metadata提供一些元数据,交给调用者自行判断;也可以直接在提供方进行处理后以health的方式交给调用方直接使用;建议二者选其一。

以上这样做相较于传统的轮询、随机、加权轮询等负载方案更适合长连接,效率更高,但缺点就是实现方式上较复杂一些,需要服务提供方和服务调用方实现各自对应的处理逻辑;但我个人认为,本身在微服务体系下,整个体系应该是一致的,这样做是一种一劳永逸的做法,并不会出现在一个体系下有多种执行方案的情况。

更新于 2022-08-19


6079 10 9
10个评论

Tinywan

感谢分享

没茅台

为什么没人留言 因为太高深了

  • chaz6chez 2022-05-14

    这个就是简单的配置中心和服务中心的使用过程和其客户端封装的过程

  • Tinywan 2022-05-14

    用的人少!

Tinywan

为了更好的发展,我也迁过来,一起维护,不然搞两套插件不太好!

🔰🔰🔰 为了方便维护和使用,推荐大家使用最新版的Nacos插件 https://www.workerman.net/plugin/50 我也会以后积极参与这个仓库的贡献。

  • chaz6chez 2022-05-16

    非常感谢!

  • wadeYang 2022-06-14

    两位大佬合力是好事,这样才集思广益,搞出更优质的插件,有助于社区发展。

Maoxp

很好的分享

  • 暂无评论
JackDx

这分享不错~

  • 暂无评论
nitron

配置文件变更通知是否可以考虑使用inotify做实现?

  • chaz6chez 2022-05-27

    因为配置数据是通过nacos get下来的,那么就代表没有get操作的时候文件是不会变更的,所以这个地方其实还用不着inotify,直接在get的业务逻辑后跟一个reload就好了

cuixuefeng

不错的文章 谢谢分享 学到了

  • 暂无评论
as5739

想请教一下大佬,如何通过nacos实现服务之间的相互调用呢?可以通过 $client->instance->list 获取实例列表hosts然后根据ip端口访问,但这样每次请求都要重新去nacos上获取,还需要单独实现负载均衡。如果缓存的话,更新也不及时,感觉不是很好。有什么好的方法实现吗?有API网关的话,服务内部之间的调用也需要走网关吗?

  • chaz6chez 2022-08-19

    一般情况下,服务和服务之间大部分情况都是在一个内网下,nacos客户端每个实例都是长连接,这时候每次请求都去nacos上获取也不会浪费连接数,其实还好;
    如果担心请求会浪费的话,其实可以和获取配置信息一样,起一个定时更新的缓存,一般情况下秒级别的更新实际上够用了,只要在真正获取实例并执行的地方做好切换、重试机制就好,一般这种情况也只影响一个时间单位内的少部分用户;
    至于负载,每个公司的架构实现方式是不同的,所以负载也是自己实现,根据list的权重或者自定义的metadata,nacos这部分的灵活性比较高,全部交给用户自己基于这些内容实现,其实不难;除了使用方通过list来做负载外,服务提供方可以通过注册和注销根据自身限流策略来做熔断、降级等负载,其实说白了就是让不想要的服务不要出现在list或者从list剔除不想要的服务;
    大部分工作其实是要在服务注册的时候做好的,比如定义的metadata,再比如是否规范使用了ephemeral、weight等属性,这些属性和自己的负载策略可以挂勾;另外就是要做好自身的服务限流和一些实例的更新策略,毕竟一般情况下自身服务达到限流的时候,可能需要降级、熔断等。

    webman-nacos-client也在计划围绕负载在下一个版本增加一个比较通用的负载策略,并且会增加测试用例。

  • chaz6chez 2022-08-19

    有些人使用了网关之后,会使用网关提供的负载策略,所以有服务内部调用走网关的做法,不过我不是很推荐这样的做法,我认为内外部需要独立,内部可能会有内部的策略,外部会有外部的策略,不应耦合在一起。

  • as5739 2022-08-19

    感谢分享,我也觉得内外独立比较好,网关也不需要单独为我的服务访问配置白名单了。我这边目前使用的是定时缓存服务实例,然后再单独负载查找(用的插件是tinywan/load-balancing),没有走外部的网关,期待webman-nacos-client之后的版本

  • chaz6chez 2022-08-19

    我推荐的做法也是定时缓存的策略,不过和你的负载的处理方式可能有些不同;
    假设A服务需要调用B服务,我们以A服务中1号实例举例;
    A-1有4个进程,那么就会分别创建4个连接B服务的client连接对象实例,同时会创建4个负责负载监听的timer分别为各自的client连接进行处理以下事务:

    1. 创建nacos-client单例,如果nacos-client单例创建成功则直接返回
    2. 通过nacos-client单例获取当前对象连接的B服务实例地址,查看其是否健康,是否需要切换
    3. 上一个timer周期的平均的请求时间,是否需要切换(可选)
      以上就实现了一个简单的连接池,而且有旁置的timer来做一切旁置的事情;
      尽可能的用长连接,而不是短链接,这样就类似于连接池的效果,毕竟创建连接和销毁链接消耗的时间远比接发送消息大;

    注:这个timer如果是通过线程实现的话,效果会更好;目前event-loop的timer中的业务逻辑如果阻塞,是会影响当前进程的其他业务的,具体这部分可以了解reactor模型。

chaz6chez

更新了一下文章,对负载部分做了一些解释

  • as5739 2022-08-19

    看到了更新,有点疑问,A在一开始准备长连接的时候,如何判断需要绑定B的哪个实例呢,除了健康状态,是否还是需要根据权重或者metadata之类的信息做负载轮询呢?

  • chaz6chez 2022-08-19

    初次连接和后续timer的逻辑其实是差不多的,一般情况就是要么根据健康,要么就metadata基于约定

  • as5739 2022-08-19

    那其实是把负载改为在连接的一开始进行,后续请求保持这个连接是吧。这里的长连接和Socket这种是不同的吧

  • chaz6chez 2022-08-19

    其实http连接只要keep-alive,并且自己不释放掉客户端实例,就已经是长连接了;后续请求保持当前这个连接,但是需要有一个timer定时的对当前连接及连接的服务进行检查,毕竟当前连接调用的服务实例也有别的服务正在调用,很可能因为别人的调用导致健康状况不良好,这时候timer的作用就是及时将这个连接中的实例切换。

  • as5739 2022-08-19

    了解,学习了

  • as5739 2022-08-19

    如果A服务的实例比B服务少很多,例如1个实例1个进程,那不就全部请求都打在了B的单个实例上了吗,通过timer检查健康状态,如果还需要判断这些的话,确实就很复杂了

  • chaz6chez 2022-08-19

    最大限度的利用连接,不论是timer还是触发式的检查,都是属于重连/负载方案;类似的东西,如webman的DB连接也是通过一个timer来做select 1;通俗的来讲,这个就是只有一个连接的连接池,毕竟PHP没有线程,没办法通过线程来实现连接池,但作用是一样的,就是复用连接;
    原因有2:

    1. 连接的消耗很大;
    2. 服务都有连接上限,不服用连接迟早会遇到并发瓶颈,并且云服务如果要对TPS进行升级,成本非常高;
  • chaz6chez 2022-08-19

    另外B服务对应提供的服务不止有A服务一个服务进行使用,如果A\B\C\D等服务的调用方客户端都是按照上述描述的实现方案,那他们就是一个体系的,那么你只需要实现一个客户端负载方案,其他服务都可以复用该方案,毕竟不止AB两个服务,还可能有CDEFG。

forever

Nacos 讨论帖没办法发布评论啦?我想问一下,插件可以实现发布接口上去吗?像图片这样,方便rpc 调用

chaz6chez

4314
积分
0
获赞数
0
粉丝数
2018-11-16 加入
🔝