workerman/http-client 增加任务处理进度及全部处理结束后回调方法

剑客落落
<?php

use \Workerman\Worker;

class waitGroup {

    public bool $isWorkering = false;
    public int $workerCounts = 0;
    public int $workerOkCounts = 0;
    public $func = null;
    private int $startTime = 0;

    public function __construct() {
        $this->isWorkering = true;
    }

    /**
     * 增加任务计数
     * @param int $counts 任务数量
     */
    public function Add(int $counts = 1) {
        if (!$this->startTime) {
            $this->startTime = microtime(true);
        }
        $this->workerCounts += $counts;
    }

    /**
     * 子任务完成
     * @param object $func 回调函数
     * @field int $workerOkCounts 已完成的任务数
     * @field int workerCounts 任务总数
     */

    public function Done($func = null) {
        $this->workerOkCounts += 1;
        if ($func) {
            call_user_func($func, $this->workerOkCounts, $this->workerCounts);
        }
        if ($this->isWorkering && $this->func && $this->workerOkCounts >= $this->workerCounts) {
            $this->complete();
        }
    }

    /**
     * 任务执行完成后回调
     * @param object $func 回调函数
     */
    public function Wait($func = null) {
        $this->func = $func;
    }

    /**
     * complete 完成操作后回调
     * @field workerCounts int 任务总数
     * @field workerTimer int 任务执行总时间,单位:秒
     */
    private function complete() {
        call_user_func($this->func, $this->workerCounts, (microtime(true) - $this->startTime));
        return true;
    }
}

$worker = new Worker();
$worker->count = 1;
$waitGroup = new stdClass;

$worker->onWorkerStart = function () {
    $options = [
        'max_conn_per_addr' => 30, // 每个地址最多维持多少并发连接
        'keepalive_timeout' => 15, // 连接多长时间不通讯就关闭
        'connect_timeout' => 30, // 连接超时时间
        'timeout' => 30, // 等待响应的超时时间
    ];
    $http = new Workerman\Http\Client($options);
    $urlArr = [];
    for ($i = 1; $i <= 100; $i += 1) {
        $urlArr[] = 'http://xxxxx.com/test?i=' . $i;
    }
    $waitGroup = new waitGroup();
    $waitGroup->Add(count($urlArr));
    $waitGroup->Wait(function ($workerCounts, $workerTimer) {
        echo "任务已完成:" ,$workerCounts , ",耗时:", sprintf("%0.3f", $workerTimer) , "秒\n";
    });
    foreach ($urlArr as $url) {
        $http->get($url, function ($response) use ($waitGroup) {
            var_dump(['code' => $response->getStatusCode(), 'body' => $response->getBody()->__toString()]);
            $waitGroup->Done(function ($workerOkCounts, $workerCounts) {
                echo "任务进度:[ok] " , $workerOkCounts , "/" , $workerCounts,"\n";
            });
        }, function ($exception) use ($waitGroup) {
            $waitGroup->Done(function ($workerOkCounts, $workerCounts) {
                echo "任务进度:[error] " , $workerOkCounts , "/" , $workerCounts,"\n";
            });
            echo $exception;
        });
    }
};
118 0 0
0个评论

剑客落落

200
积分
0
获赞数
0
粉丝数
2022-04-01 加入
🔝