🚀 ripple协程 | workerman版驱动

cclilshy

项目简介

随着微服务的流行, 服务之间的调用变得越来越频繁, 传统的同步阻塞模式已经无法满足需求,
协程编程逐渐成为PHP开发者的热门话题, 在引入Fiber之后, PHP早已支持原生的协程编程

workerman是一款高性能的Worker网络服务框架, 其异步模型几乎是callback的方式,
在面对更多复杂场景时, 我们可能需要同步非堵塞的方式来编写代码, 以便更好地管理服务之间的调用

ripple 是一个基于Fiber实现的协程引擎,同时提供了workerman版驱动,
使得workerman可以使用ripple协程引擎的各种特性

兼容版本

Workerman版本 支持状态
4.1.x 长期支持
5.0.x 长期支持

安装

截至目前2024-11-04, 该引擎处于公测阶段, 因此你可能需要配置以确保能够安装beta版本

配置最低兼容版本

编辑composer.json根节点末尾添加

"minimum-stability": "beta",
"prefer-stable": true

一键安装

composer require cloudtay/workerman-ripple

Workerman使用方法

use Workerman\Worker;

Worker::$eventLoopClass = Workerman\Ripple\Driver::class;
Worker::runAll();

Webman使用方法

编辑server.php

return [
    'event_loop' => Workerman\Ripple\Driver::class,
    // other configurations
];

场景示例

workerman-ripple内置了几个常用的组件,下面将以这些组件举例

SSE客户端

此前我们已经分享了如何在workerman中使用GuzzleHttp实现异步请求
《「分享创造」在Workerman中使用GuzzleHttp协程版实现无感异步请求》

下面我将以异步调用通义千问API为例,举例如何使用ripple解决SSE场景所遇到的挑战

use Ripple\Http\Client\Capture\ServerSentEvents;
use Ripple\Http\Guzzle;

/**
 * @var \GuzzleHttp\Client $client
 */

# 参照文档构建请求头和请求体
$token  = '输入你的token';
$header = [
    'Content-Type'    => 'application/json',
    'Authorization'   => "Bearer {$token}",
    'Accept'          => 'text/event-stream',
    'X-DashScope-SSE' => 'enable'
];

$body = [
    'model' => 'qwen-max',
    'input' => [
        'model'    => 'qwen-max',
        'messages' => [
            [
                'role'    => 'system',
                'content' => '你是ripple协程引擎辅导员',
            ],
            [
                'role'    => 'user',
                'content' => '你好',
            ]
        ]
    ],
];

# 创建捕获器
$capture = new ServerSentEvents();

# 设置回调处理器(传统模式)
$capture->onEvent = static function (array|null $event) {
    // TODO: 处理事件
};
$capture->onComplete = static function () {
    // TODO: 请求结束
};

# 创建协程发送请求
\Co\async(static function()use($header,$body,$capture){
    $client = Guzzle::newClient();
    $guzzle->post('https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation', [
        'headers' => $header,
        'json'    => $body,
        'capture' => $capture
    ]);
});

# 同步非堵塞模式
foreach ($capture->getIterator() as $event) {
    echo json_encode($event) , PHP_EOL;
}

\Co\wait(); // 使用workerman或webman时无需调用该方法

运行结果

array(4) {
  ["id"]=>
  string(1) "1"
  ["event"]=>
  string(6) "result"
  [""]=>
  string(15) "HTTP_STATUS/200"
  ["data"]=>
  string(168) "{"output":{"finish_reason":"null","text":"我是R"},"usage":{"total_tokens":20,"input_tokens":18,"output_tokens":2},"request_id":"87f917d8-f427-92fc-9f44-ed4a559576c3"}"
}
array(4) {
  ["id"]=>
  string(1) "2"
  ["event"]=>
  string(6) "result"
  [""]=>
  string(15) "HTTP_STATUS/200"
  ["data"]=>
  string(173) "{"output":{"finish_reason":"null","text":"我是Ripple"},"usage":{"total_tokens":21,"input_tokens":18,"output_tokens":3},"request_id":"87f917d8-f427-92fc-9f44-ed4a559576c3"}"
}
...

SSE服务端

workerman-ripple 提供了最简单的SSE解决方案,在webman控制器中只需这么做

use Ripple\Http\Server\Chunk;
use support\Request;
use Workerman\Ripple\Protocols\Http\IteratorResponse;

/**
 * @param Request $request
 * @return IteratorResponse
 */
public function stream(Request $request): IteratorResponse
{
    $iterator = new IteratorResponse(static function () {
        foreach (range(1, 10) as $i) {
            yield Chunk::event('message', 'hello', $i);
            \Co\sleep(0.1);
        }
    }, $request->connection,true);

    return $iterator->withHeaders([
        'Content-Type'  => 'text/event-stream',
        'Cache-Control' => 'no-cache',
        'Connection'    => 'keep-alive',
    ]);
}

AsyncTcpConnection使用代理

AsyncTcpConnectionworkerman中最常用的组件之一,但据用户反馈在4.x版本中暂不支持通过代理服务器进行连接,我们提供了一种易用的解决方案如下

use Workerman\Connection\TcpConnection;
use Workerman\Ripple\Utils;

/**
 * @var \Workerman\Connection\AsyncTcpConnection $tcpConnection 
 */
$tcpConnection = Utils::asyncTcpConnection('ssl://ipconfig.io:443');
$tcpConnection->onConnect = static function (TcpConnection $connection) {
    echo 'Connected to ipconfig.io' , \PHP_EOL;
    $connection->send("GET /ip HTTP/1.1\r\nHost: ipconfig.io\r\nConnection: close\r\n\r\n");
};

$tcpConnection->onMessage = static function (TcpConnection $connection, string $data) {
    echo 'Received data from ipconfig.io: ' . \substr($data, 0, 10),'...' . \PHP_EOL;
};

/**
 * 通过代理服务器连接
 * 目前支持http,https,socks5代理,格式参考
 * socks5://host:port
 * socks5://username:password@host:port
 */
$tcpConnection->connectViaProxy('socks5://127.0.0.1:1080');

更多用法

更多组件的使用方法请参考官方文档

623 6 5
6个评论

Tinywan

  • 暂无评论
深林孤鹰

牛批,支持!

  • 暂无评论
不败少龙

威武

  • 暂无评论
liwenshu

windows 可以用吗

初心by

太强了

  • 暂无评论
happy321

牛🐂,建议把reactphp amphp里面能用的那些库也汇总下使用方法

  • 暂无评论

cclilshy

520
积分
0
获赞数
0
粉丝数
2024-04-09 加入
×
🔝