webman: a wrapper for streamed (chunked) responses

Codepoch
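    // Assumed imports at the top of the controller file:
    // use Workerman\Timer;
    // use Workerman\Connection\TcpConnection;
    // use Workerman\Protocols\Http\Chunk;

    /**
     * Stream a response to the client as chunked "data: ..." frames.
     *
     * @param \Workerman\Connection\TcpConnection $connection connection of the current request
     * @param array $options request options; must contain 'model'
     * @param \Closure $dataHandler sends the data chunks; called as
     *                              $dataHandler($connection, $messageId, $startTime, $options)
     * @return \support\Response
     */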
    private function handleStreamResponse($connection, $options, \Closure $dataHandler) {
        $messageId = uniqid('chatcmpl-');
        $startTime = time();

        // Run after the controller has returned, so the headers below reach the client first.
        $id = Timer::add(0.1, function () use ($connection, &$id, $messageId, $startTime, $options, $dataHandler) {
            // Timers repeat by default; this one must fire only once.
            Timer::del($id);
            if ($connection->getStatus() !== TcpConnection::STATUS_ESTABLISHED) {
                return;
            }

            try {
                $dataHandler($connection, $messageId, $startTime, $options);

                $endData = json_encode([
                    'id' => $messageId,
                    'object' => 'chat.completion.chunk',
                    'created' => time(),
                    'model' => $options['model'],
                    'choices' => [[
                        'delta' => new \stdClass(), // encodes as {} instead of []
                        'index' => 0,
                        'finish_reason' => 'stop'
                    ]]
                ], JSON_UNESCAPED_UNICODE|JSON_UNESCAPED_SLASHES);

                $connection->send(new Chunk("data: {$endData}\n\n"));
            } catch (\Throwable $e) {
                // Caught here because exceptions thrown inside the callback
                // never reach a try/catch wrapped around Timer::add().
                $errorData = json_encode([
                    'error' => [
                        'message' => $e->getMessage(),
                        'type' => 'error',
                        'code' => $e->getCode()
                    ]
                ]);
                $connection->send(new Chunk("data: {$errorData}\n\n"));
            }

            // An empty chunk terminates the chunked response.
            $connection->send(new Chunk(''));
        });

        // The body consists of "data: ..." frames, so declare it as an event stream.
        return response()->withHeaders([
            "Content-Type" => "text/event-stream",
            "Transfer-Encoding" => "chunked",
        ]);
    }
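
To check what a client actually receives from the function above, a small parser helps. This is only a sketch: `parseSseFrames()` is a hypothetical helper (not part of webman or Workerman), and it assumes every frame is a single `data: <json>` line followed by a blank line, which is the framing used in this post.

```php
<?php
/**
 * Hypothetical helper: split a raw SSE body into decoded JSON payloads.
 * Assumes one "data: <json>" line per frame, frames separated by "\n\n".
 */
function parseSseFrames(string $raw): array
{
    $payloads = [];
    foreach (explode("\n\n", $raw) as $frame) {
        $frame = trim($frame);
        // Skip empty trailing pieces and anything that is not a data line.
        if (strncmp($frame, 'data: ', 6) !== 0) {
            continue;
        }
        $decoded = json_decode(substr($frame, 6), true);
        if (is_array($decoded)) {
            $payloads[] = $decoded;
        }
    }
    return $payloads;
}

// Two frames as the endpoint might emit them: one content chunk, one final chunk.
$raw = "data: {\"choices\":[{\"delta\":{\"content\":\"Hi\"},\"finish_reason\":null}]}\n\n"
     . "data: {\"choices\":[{\"delta\":{},\"finish_reason\":\"stop\"}]}\n\n";

$frames = parseSseFrames($raw);
echo count($frames), PHP_EOL;                                  // 2
echo $frames[1]['choices'][0]['finish_reason'], PHP_EOL;       // stop
```

If the last decoded frame does not carry `finish_reason` "stop", the stream was probably cut off before the timer callback finished.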

Usage (the closure contains my own business logic; write yours to fit your case):

        if ($stream) {
            return $this->handleStreamResponse($connection, $options, function($connection, $messageId, $startTime, $options) use($service, $messages) {
                foreach ($service->chatStream($messages, $options) as $chunk) {
                    if (!empty($chunk)) {
                        $data = json_encode([
                            'id' => $messageId,
                            'object' => 'chat.completion.chunk',
                            'created' => $startTime,
                            'model' => $options['model'],
                            'choices' => [[
                                'delta' => [
                                    'role' => 'assistant',
                                    'content' => $chunk
                                ],
                                'index' => 0,
                                'finish_reason' => null
                            ]]
                        ], JSON_UNESCAPED_UNICODE|JSON_UNESCAPED_SLASHES);

                        // Debug output: goes to the worker's stdout, not to the client.
                        echo "data: {$data}\n\n";
                        $connection->send(new Chunk("data: {$data}\n\n"));
                    }
                }
            });
        }
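
The chunk payload is built in two places in this post (once per content chunk, once for the final frame), so it could be pulled into one helper. A minimal sketch, assuming a hypothetical `buildChunkFrame()` that is not part of webman; the id, timestamp and model name in the usage lines are made-up demo values. It encodes the empty final `delta` as `{}`, since a plain empty PHP array would encode as `[]`.

```php
<?php
/**
 * Hypothetical helper: build one "chat.completion.chunk" SSE frame.
 * Pass $content = null to build the final frame with finish_reason "stop".
 */
function buildChunkFrame(string $messageId, int $created, string $model, ?string $content): string
{
    $payload = [
        'id' => $messageId,
        'object' => 'chat.completion.chunk',
        'created' => $created,
        'model' => $model,
        'choices' => [[
            // stdClass makes the empty delta encode as {} rather than [].
            'delta' => $content === null
                ? new stdClass()
                : ['role' => 'assistant', 'content' => $content],
            'index' => 0,
            'finish_reason' => $content === null ? 'stop' : null,
        ]],
    ];
    $json = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);

    // An SSE frame is a "data:" line followed by a blank line.
    return "data: {$json}\n\n";
}

// Demo values only.
echo buildChunkFrame('chatcmpl-demo', 1700000000, 'demo-model', 'Hello');
echo buildChunkFrame('chatcmpl-demo', 1700000000, 'demo-model', null);
```

Inside the closure, the loop body would then reduce to one `$connection->send(new Chunk(buildChunkFrame(...)))` call per non-empty chunk.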

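This is for streaming output from inside a controller.
I tried many approaches, but without a Timer the output was never pushed to the client in real time from a controller.
I'm sharing my approach here; if anyone has a better one, I'd be glad to learn from it.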

2 comments

shiroi

Why not do it like this?

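    /**
     * Simulate an SSE request.
     * Assumes: use Workerman\Protocols\Http\Response;
     *          use Workerman\Protocols\Http\ServerSentEvents;
     */
    public function test_sse()
    {
        dump(request()->header());
        dump(request()->all());
        // Current connection
        $connect = request()->connection;
        // Send the response headers first
        $connect->send(new Response(200, ['Content-Type' => 'text/event-stream'], "\r\n"));
        // Build 20 dummy messages
        $data = array_fill(0, 20, []);
        foreach ($data as $k => $d) {
            $d['id'] = $k;
            $d['time'] = strtotime(date('Y-m-d '.rand(0, 23).':' . rand(1, 59) . ':' . rand(1, 59)));
            $d['message'] = uniqid();
            // Push the message to the client (to_json() is presumably a project helper around json_encode())
            $connect->send(new ServerSentEvents(['event' => 'message', 'data' => to_json($d), 'id'=>$k]));
            sleep(1); // Simulate a delay between messages; note that sleep() blocks the whole worker process
        }
        $connect->close();
    }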
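
For reference, each `ServerSentEvents` object in the snippet above reaches the client as a plain-text frame in the Server-Sent Events format. The sketch below is a hypothetical stand-in that shows that format; `formatSseEvent()` is not Workerman's implementation, and the set of supported keys and their order are assumptions made for illustration.

```php
<?php
/**
 * Hypothetical helper: serialize event fields into one SSE frame.
 * Supported keys here: event, id, data.
 * Multi-line data is split into several "data:" lines.
 */
function formatSseEvent(array $fields): string
{
    $out = '';
    foreach (['event', 'id'] as $key) {
        if (isset($fields[$key])) {
            $out .= "{$key}: {$fields[$key]}\n";
        }
    }
    if (isset($fields['data'])) {
        foreach (explode("\n", (string) $fields['data']) as $line) {
            $out .= "data: {$line}\n";
        }
    }

    // A blank line ends the event.
    return $out . "\n";
}

echo formatSseEvent(['event' => 'message', 'id' => 3, 'data' => '{"id":3}']);
```

On the browser side, an `EventSource` listener for the `message` event would receive the `data` value of each such frame.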
  • No replies yet
xpwsgg

As far as I remember, this already exists.
[screenshot]

  • No replies yet
