【已解决】workerman/http-client的返回值是空

chaz6chez

总结

  1. 目前避免使用$response->getBody()->getContents(),建议使用(string)$response->getBody()
  2. 等会儿提交个PR

原因

  1. 因为workerman/http-client包引入的workerman/psr7中的Stream::getContents中并没有将偏移量重置,在write后,偏移量一般在最新,所以这个时候用默认参数stream_get_content肯定是空字符串
    public function getContents()
    {
        if (!isset($this->stream)) {
            throw new \RuntimeException('Stream is detached');
        }

        $contents = stream_get_contents($this->stream);

        if ($contents === false) {
            throw new \RuntimeException('Unable to read stream contents');
        }

        return $contents;
    }
  1. 这里魔术方法toString却做了偏移量的重置
    public function __toString()
    {
        try {
            $this->seek(0);
            return (string) stream_get_contents($this->stream);
        } catch (\Exception $e) {
            return '';
        }
    }

描述

组件版本:

workerman/workerman              v4.0.36
workerman/http-client            v1.0.9
workerman/webman-framework       v1.3.13

在webman中创建自定义进程,该进程只启动一个进程,进程内注册一个Timer,Timer的回调函数中使用workerman/http-client异步请求,请求成功回调中$response->getBody()->getContents()始终为空字符串,代码大致如下

public function onWorkerStart(Worker $worker)
    {
        $worker->count = 1;

        if($this->configListeners){
            // 拉取配置项文件
            foreach ($this->configListeners as $listener){
                list($dataId, $group, $tenant, $configPath) = $listener;
                if(!file_exists($configPath)){
                    $this->_get($dataId, $group, $tenant, $configPath);
                }
                $this->timers[$dataId] = Timer::add(
                    $this->longPullingInterval,
                    function () use($dataId, $group, $tenant, $configPath){
                        $this->client->config->listenerAsyncUseEventLoop([
                                'dataId' => $dataId,
                                'group' => $group,
                                'contentMD5' => md5(file_get_contents($configPath)),
                                'tenant' => $tenant
                        ], function (Response $response) use($dataId, $group, $tenant, $configPath){
                            if($response->getStatusCode() === 200){
                                if($response->getBody()->getContents() !== ''){
                                    $this->_get($dataId, $group, $tenant, $configPath);
                                }
                            }
                        }, function (\Exception $exception){
                            Log::channel('error')->error($exception->getMessage(), $exception->getTrace());
                        }, $this->longPullingInterval * 1000);
                });
            }
        }
    }
public function requestAsyncUseEventLoop(string $method, string $uri, array $options = [])
    {
        try {
            # 同步阻塞获取token
            if($token = $this->issueToken()){
                $options[RequestOptions::QUERY]['accessToken'] = $token;
            }
            $queryString = http_build_query($options[RequestOptions::QUERY] ?? []);
            $headers = array_merge($options[RequestOptions::HEADERS] ?? [], [
                'Connection' => 'keep-alive'
            ]);
            $this->httpClientAsync()->request(
                sprintf('http://%s:%d%s?%s', $this->host, $this->port, $uri, $queryString),
                [
                    'method'    => $method,
                    'version'   => '1.1',
                    'headers'   => $headers,
                    'data'      => $options['data'] ?? [],
                    'success'   => $options['success'] ?? function (Response $response) {},
                    'error'     => $options['error'] ?? function (\Exception $exception) {}
                ]
            );
        } catch (RequestException $exception) {
            if ($exception->hasResponse()) {
                if (200 != $exception->getResponse()->getStatusCode()) {
                    return $this->setError(false, $exception->getResponse()->getBody()->getContents());
                }
            }
            return $this->setError(false, 'server notice:' . $exception->getMessage());
        }
    }

实际上我打印了Request中onMessage的$response_data中是由body值,且不为空字符串的,如下图:

^ array:3 [
  "start-line" => "HTTP/1.1 200 "
  "headers" => array:7 [
    "Pragma" => array:1 [
      0 => "no-cache"
    ]
    "Expires" => array:1 [
      0 => "Thu, 01 Jan 1970 00:00:00 GMT"
    ]
    "Cache-Control" => array:1 [
      0 => "no-cache,no-store"
    ]
    "Content-Type" => array:1 [
      0 => "application/json;charset=UTF-8"
    ]
    "Content-Length" => array:1 [
      0 => "44"
    ]
    "Date" => array:1 [
      0 => "Thu, 12 May 2022 07:50:38 GMT"
    ]
    "Connection" => array:1 [
      0 => "close"
    ]
  ]
  "body" => "config.yaml%02DEFAULT_GROUP%02fj_oh-test%01\n"
]
1693 2 2
2个回答

chaz6chez

稍微调试了一下,感觉是Request中367行的handleData方法中的write没有把数据写入stream,因为我打印了传入的data是有值的,但是write后立马对其getContents,依然是空字符串:

    public function handleData($connection, $data)
    {
        try {
            $body = $this->_response->getBody();
            $count = $body->write($data);
            # 输出打印
            dump($body->getContents());
            if ($this->_expectedLength) {
                $recv_length = $body->getSize();
                if ($this->_expectedLength <= $recv_length) {
                    $this->emitSuccess();
                }
            }
        } catch (\Exception $e) {
            $this->emitError($e);
        }
    }
  • 暂无评论
chaz6chez

我在Stream的write方法中直接对流进行操作:

    public function write($string)
    {
        if (!isset($this->stream)) {
            throw new \RuntimeException('Stream is detached');
        }
        if (!$this->writable) {
            throw new \RuntimeException('Cannot write to a non-writable stream');
        }

        // We can't know the size after writing anything
        $this->size = null;
        $result = fwrite($this->stream, $string);
        # 打印相关内容
        dump(__METHOD__, $string, $result, stream_get_contents($this->stream));

        if ($result === false) {
            throw new \RuntimeException('Unable to write to stream');
        }

        return $result;
    }

很奇怪,fwrite返回了int,表示数据写入了流,但我立马stream_get_content该流,却依旧是空字符串

  • chaz6chez 2022-05-12

    这里直接使用stream_get_contents($this->stream)无法获取到数据,但是stream_get_contents($this->stream,$result,0)就可以

  • JackDx 2022-05-27

    这个问题我也出现过,但是我忘记这么解决的了。。。。

  • chaz6chez 2022-05-27

    这个地方是因为打开的流没有将偏移归零,始终是在流末尾,所以按照默认参数获取的数据是空,这个时候用fseek进行归零就好了。
    在workerman/http-client下用(string)$this->_response->getBody()代替$this->_response->getBody()->getContent()是解决办法,这个是一个bug,在__toString中,使用了fseek,但在getContent()中并没有,所以获取不到数据。

年代过于久远,无法发表回答
🔝