高速共享缓存插件分享

chaz6chez

前言

今年接触了一个策略类手游相关的项目,后端本身计划是使用skynet进行开发的,后来结合项目的时间紧急程度和客户端开发组讨论后决定使用PHP进行快速开发,后期再使用其他语言框架进行拆分业务;综合考虑最后选用了webman作为主要开发框架。

整体项目分为配置服务、http-api服务、websocket服务三大部分,其中配置管理主要是兼容客户端生成的配置数据进行导入导出转换加载,底层使用MySQL进行储存,多服务间使用Redis进行一级缓存,服务进程间使用了基于APCu的共享缓存,后期我将该共享缓存组件化也贡献给了社区。
https://www.workerman.net/plugin/133

Redis

在游戏开发界实际上使用Redis的情况还是比较多的,我们使用Redis主要还是为了将一些数据缓存共享给各个服务器实例:


     ┌─────┐                                       ┌─────┐
     |  A  | ────────────>  service  <──────────── |  B  |
     └─────┘                                       └─────┘
    /   |   \                                     /   |   \
┌───┐ ┌───┐ ┌───┐                             ┌───┐ ┌───┐ ┌───┐
| a | | b | | c | ───────>  instance <─────── | a | | b | | c |
└───┘ └───┘ └───┘                             └───┘ └───┘ └───┘
  |     |     |                                 |     |     |
 1|2   1|2   1|2 ────────>  process  <──────── 1|2   1|2   1|2
 3|4   3|4   3|4                               3|4   3|4   3|4

如图所示,我们A/B为区服,每个区服下可能存在abc不同的服务器实例,他们需要共享相同的区服配置;每个区服各自管理自己的数据库数据区域/数据库实例;每个区服下的服务器实例对于数据库数据的要求是强需求,且为变动较为频繁的数据内容,与web的微服务有区别,所以我们没有使用类似Nacos或者其他配置中心进行处理,从而用更适配当前场景的Redis作为缓存服务。

同时Redis也可以作为用户登录鉴权相关中的一环,也可以为运营相关功能提供一些辅助,比如使用Redis-Stream作为消息队列,处理一些事件通知等。

共享内存

在游戏开发中,许多业务都是在内存中进行的计算处理,而我们上述的模式是多进程模式,进程间通讯是一个比较频繁出现的点;一开始解决这个问题是粗暴的将一些固定业务固定在对应的进程上执行,尽可能避免进程间的通讯问题,后来随着业务逐步的扩大,单纯限制业务是没办法完全实现的,这时候有考虑过使用webman的channel;但实际上channel基于socket涉及系统内核态用户态的拷贝等问题,同时受网络影响受限,在一些业务的计算处理上会带来比较高的延迟,包括Redis也同样是这样的问题,我们需要实现数据的零拷贝。

后续我们的目标锁定在了共享内存上,因为共享内存可以轻易的在进程间进行通讯交换,而且不存在深拷贝和网络等问题,效率、性能非常的高,整体微秒级别的响应满足我们的需求;于是我基于PHP的拓展APCu封装了适合我们业务场景的插件包进行使用。

webman-shared-cache

我们的基础应用实现了定时器来从MySQL数据库读取配置信息,定时器的处理器也在读取数据刷入Redis的同时触发共享内存的更新事件,上层业务通过更新事件的回调出发会将Redis的数据刷入共享内存中,以便当前区服实例的各个进程能够使用。

我们使用缓存的场景很多都是MAP数据,所以我在实现插件的时候特别实现了类似Redis-Hash相关的功能:

  • HSet/HGet/HDel/HKeys/HExists

由于我们需要一些自增自减的运算,所以也实现了以下功能点:

  • HIncr/HDecr,支持浮点运算

由于APCu的特性所以储存的数据也是支持储存对象数据的;

webman-shared-cache为何使用锁?

之前我有和社区的同学们聊过,他们不是很理解为什么我在实现插件的时候自己使用了锁,这是因为APCu本身的自行实现了对它自身函数的原子性操作,但我们使用它的时候是在多进程的环境下,每一个进程内存在多次APCu的操作,为了业务的原子性,我们希望这多次的操作要在一个原子性内完成,所以需要一个锁来进行隔离,以免在多进程的环境下被其他进程的操作污染,整体是类似MySQl的事务的:

protected static function _HIncr(string $key, string|int $hashKey, int|float $hashValue = 1): bool|int|float
{
    $func = __FUNCTION__;
    $result = false;
    $params = func_get_args();
    self::_Atomic($key, function () use (
        $key, $hashKey, $hashValue, $func, $params, &$result
    ) {
        $hash = self::_Get($key, []);
        if (is_numeric($v = ($hash[$hashKey] ?? 0))) {
            $hash[$hashKey] = $result = $v + $hashValue;
            self::_Set($key, $hash);
        }
        return [
            'timestamp' => microtime(true),
            'method'    => $func,
            'params'    => $params,
            'result'    => null
        ];
    }, true);
    return $result;
}

比如上述代码,就是一个Hash key的自增操作,我们需要在读取Hash后在写入,读取和写入应为一体的;

原子性执行函数Atomic的实现如下:

    /**
     * 原子操作
     *  - 无法对锁本身进行原子性操作
     *  - 只保证handler是否被原子性触发,对其逻辑是否抛出异常不负责
     *  - handler尽可能避免超长阻塞
     *  - lockKey会被自动设置特殊前缀#lock#,可以通过Cache::LockInfo进行查询
     *
     * @param string $lockKey
     * @param Closure $handler
     * @param bool $blocking
     * @return bool
     */
    protected static function _Atomic(string $lockKey, Closure $handler, bool $blocking = false): bool
    {
        $func = __FUNCTION__;
        $result = false;
        if ($blocking) {
            $startTime = time();
            while ($blocking) {
                // 阻塞保险
                if (time() >= $startTime + self::$fuse) {return false;}
                // 创建锁
                apcu_entry($lock = self::GetLockKey($lockKey), function () use (
                    $lockKey, $handler, $func, &$result, &$blocking
                ) {
                    $res = call_user_func($handler);
                    $result = true;
                    $blocking = false;
                    return [
                        'timestamp' => microtime(true),
                        'method'    => $func,
                        'params'    => [$lockKey, '\Closure'],
                        'result'    => $res
                    ];
                });
            }
        } else {
            // 创建锁
            apcu_entry($lock = self::GetLockKey($lockKey), function () use (
                $lockKey, $handler, $func, &$result
            ) {
                $res = call_user_func($handler);
                $result = true;
                return [
                    'timestamp' => microtime(true),
                    'method'    => $func,
                    'params'    => [$lockKey, '\Closure'],
                    'result'    => $res
                ];
            });
        }
        if ($result) {
            apcu_delete($lock);
        }
        return $result;
    }

当使用阻塞模式的时候,我们会在当前进程内使用一个while循环来进行阻塞抢占,为了不将当前进程阻塞死,我们还加入了一个保险,由self::$fuse提供;

注意

这里在实践过程中需要注意的是,Atomic在传入回调函数时切勿再使用匿名函数作为参数值或者是通过use传入一个匿名函数,如:

$fuc = function() {
    // do something
}
Cache::Atomic('test', function () use ($fuc) {
    // do anything
})

APCu底层会对函数参数值或引用参数进行序列化储存,但匿名函数不可以被序列化,所以会抛出一个异常;但你可以通过当前对象的属性值或者静态属性来保存一个匿名函数,然后在Atomic的回调内调用使用。

0.4.x版本

由于目前我使用Webman基于SQLite和共享内存在自行实现一个具备RAFT的轻调度服务插件和服务注册与发现插件,所以特此为其完善增加了Channel特性;

Channel可以辅助实现类似Redis-List、Redis-stream、Redis-Pub/Sub的功能。

Channel

Channel是个特殊的数据格式,他的格式是固定如下的:

[
    '--default--' => [
        'futureId' => null,
        'value'    => []
    ],
    workerId_1 => [
        'futureId' => 1,
        'value'    => []
    ],
    workerId_2 => [
        'futureId' => 1,
        'value'    => []
    ],
    ......
]

它在共享内存中的键默认以 #Channel# 开头。

  • --default--是默认储存空间,workerId_1/workerId_2等是子通道储存空间,命名是由用户代码传入的,这里建议使用workerman自带的workerId即可。

  • 默认储存空间和子通道储存空间是互斥的,也就是说当存在子通道储存空间时,是不存在--default--的,反之亦然;子通道储存空间是当当前通道存在监听器时生成的,而在监听器产生前,消息会暂存在--default--空间,当监听器创建时,--default--的数据value会被同步到子通道储存空间内,加入value的队头

  • 每一个子通道储存空间的value都是拷贝的,存在相同的数据,各自监听器监听各自的子通道储存空间;消息的发布支持向所有子通道发布,也可以指定子通道进行发布。

  • 监听器的底层使用了workerman的定时器,区别与workerman的timer,在event驱动下定时器的间隔是0,也就是一个future,而其他的事件驱动是0.001s为间隔。

实现一个List

由于监听器创建消费是基于workerId的,我们可以通过不同进程创建相同的workerId的监听器来对同一个子通道进行监听:

  1. A进程使用list作为workerId:

    Cache::ChCreateListener('test', 'list', function(string $channelKey, string|int $workerId, mixed $message) {
    // TODO 你的业务逻辑
    });
  2. B进程也同样创建list的workerId监听器:

    Cache::ChCreateListener('test', 'list', function(string $channelKey, string|int $workerId, mixed $message) {
    // TODO 你的业务逻辑
    });
  3. 此时Channel test的数据如下:

    [
    'list' => [
        'futureId' => 1,
        'value'    => []
    ],
    ......
    ]

    注意:共享内存中储存的futureId为最后一个监听器创建的futureId;当当前进程需要对监听器进行移除时,请勿使用该数据,对应进程内可以通过Cache::ChCreateListener()的返回值获取到当前进程创建的futureId用于移除监听器,不使用共享内存中储存的futureId即可

  4. 这时任意进程通过Cache::ChPublish('test', '这是一个测试消息', true);发送消息,或者指定workerIdCache::ChPublish('test', '这是一个测试消息', true, 'list');

实现一个Pub/Sub

  1. A进程使用workerman的workerId作为workerId:

    Cache::ChCreateListener('test', $worker->id, function(string $channelKey, string|int $workerId, mixed $message) {
    // TODO 你的业务逻辑
    });
  2. B进程使用workerman的workerId作为workerId:

    Cache::ChCreateListener('test', $worker->id, function(string $channelKey, string|int $workerId, mixed $message) {
    // TODO 你的业务逻辑
    });
  3. 此时Channel test的数据可能如下:

    [
    1 => [
        'futureId' => 1,
        'value'    => []
    ],
    2 => [
        'futureId' => 1,
        'value'    => []
    ]
    ]
  4. 这时,任意进程通过 Cache::ChPublish('test', '这是一个测试消息', false); 发送消息即可。

    注:发送消息第三个参数使用false时,如发送时还未创建监听器,消息则不会储存至Channel,即监听后才可存在消息

实现类似Redis-stream

与Pub/Sub相同,只不过发布消息使用 Cache::ChPublish('test', '这是一个测试消息', true);, 当发布消息指定workerId时,可以实现类似Redis-Stream Group的功能。

注:这里更复杂的功能可能需要对workerId进行变通,不能简单使用workerman自带的workerId,只需要自行规划好即可

更多内容可以查看源码和文档

如有需要新特性的支持,欢迎留言和提交issue

1513 8 9
8个评论

walkor

全是干货的绝世好文

Tinywan

哇塞!全是干货。赶紧用起来🚀🚀🚀🚀🚀🚀。

Forsend

谢谢大佬,你的分享和文章都很好

Mr_Deng

怎么才能成长为大神

  • Tinywan 2023-11-21

    向作者和群主看齐

  • chaz6chez 2023-11-21

    其实就是把所学知识尽可能运用在工作生活中,并且在工作中尽可能发现一些可能性,就可以慢慢成长为大神,温故而知新即可

  • Tinywan 2023-11-21

    干自己感想干的,真的很好...

  • chaz6chez 2023-11-21

    总有机会嘛,万总,有机会就去尝试,没机会就巩固呗,哈哈

shanjian

又可以学新知识了

army

之前和楼主聊过锁的问题,原因是楼主对单key存map,需要对该map读改写,这样就导致了并发安全问题,其实可以把map的key给个前缀当做apcu的单key来写入就解决并发安全问题了,要获取这个“map”就遍历这个前缀的key,这样可以做到并发安全且性能翻倍。
如果实在要对单key存map,可以考虑yac,yac底层不带锁性能比apcu还快,配合你的业务锁比apcu更优。

  • army 2023-11-25

    apcu底层原理是根据key分割多个切片,每个切片里放置了一个map,每个map一个写锁来达到并发安全和超高性能读写, 如果在map里再存入map我认为这种用法是不合理的,失去了高性能读写的含义。

  • chaz6chez 2023-11-25

    多key的map存在hit miss,带来不完整性,所以要处理的工作也会很多

  • chaz6chez 2023-11-25

    map本来就可以嵌套,形成阵列,这个在计算的时候也会遇到,类似python的numpy,我们在php中使用了实验性质的numphp组件处理阵列数据计算

JackDx

关注

  • 暂无评论
windss

使用通道时一直报错

监听
Cache::ChCreateListener('channel', $worker->id, function (string $key, int|string $workerId, mixed $message) {
var_dump($key);
var_dump($workerId);
var_dump($message);
});

发布
Cache::ChPublish('channel',$msg,false,0);

错误
TypeError: call_user_func(): Argument #1 ($callback) must be a valid callback, no array or string given in /Users/winds/Project/php/new/api/vendor/workbunny/webman-shared-cache/src/Traits/ChannelMethods.php:163

  • chaz6chez 6天前

    更新到最新版0.4.5试试

  • windss 6天前

    已经更新了, Cache::ChPublish($channel,$msg,false,2);指定workerId时,相同$channel 都能收到消息,是不是我写代码有问题

  • chaz6chez 6天前

    你是需要指定通道进行publish对吧,可以提一个issue,附上测试样例,我会在今天内修复这个问题

  • chaz6chez 6天前

    可以尝试使用0.4.6,我已修复指定workerId进行publish没有按预期执行的bug

  • windss 5天前

    ok,试一下 我在反馈

chaz6chez

4334
积分
0
获赞数
0
粉丝数
2018-11-16 加入
🔝