用swow给workerman插上协程的翅膀

he426100

原文:https://github.com/swow/swow/issues/115
快速体验swow:打开 https://github.com/dixyes/lwmbs/actions/workflows/linux.yml ,选择最新任务(进入页面后在右边选一个linux test,点进去),往下翻找到下载链接,下载cli_static_8.1_musl_x86_64_xxxx,解压即可使用

无阻塞的定时器

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\Events\Swow;
use Swow\Coroutine;

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/swow.php';

$task = new Worker();

// When the worker process starts, register a repeating timer. Every tick
// launches its own coroutine, which prints its id and the time, sleeps for
// ten seconds, and prints again.
$task->onWorkerStart = function ($task) {
    // Tick period, in seconds.
    $intervalSeconds = 2;

    Timer::add($intervalSeconds, function () {
        Coroutine::run(static function () {
            // The coroutine is the same before and after the sleep, so the id
            // only needs to be looked up once.
            $coroutineId = Coroutine::getCurrent()->getId();

            echo $coroutineId, ' ', date('H:i:s'), PHP_EOL;
            sleep(10);
            echo $coroutineId, ' ', date('H:i:s'), PHP_EOL;
        });
    });
};

// Select the Swow-backed event loop, then start all workers.
Worker::$eventLoopClass = Swow::class;
Worker::runAll();

效果

root@be1fb440630a:/opt/www# php timer.php start
Workerman[index.php] start in DEBUG mode
----------------------------------------- WORKERMAN ------------------------------------------
Workerman version:4.1.5          PHP version:8.1.1           Event-Loop:Workerman\Events\Swow
------------------------------------------ WORKERS -------------------------------------------
proto   user            worker          listen          processes    status           
tcp     root            none            none            1             [OK]            
----------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.
11 07:31:55
12 07:31:57
13 07:31:59
14 07:32:01
15 07:32:03
11 07:32:05

无阻塞的http server

<?php
use Workerman\Worker;
use Workerman\Events\Swow;
use Swow\Coroutine;

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/swow.php';

// #### http worker ####
// HTTP worker bound to port 2345 on all interfaces, running 4 processes.
$http_worker = new Worker('http://0.0.0.0:2345');
$http_worker->count = 4;

// Each incoming request is handled in a coroutine of its own.
$http_worker->onMessage = function ($connection, $request) {
    Coroutine::run(static function () use ($connection, $request) : void {
        // Request accessors that can be used here:
        //$request->post();
        //$request->header();
        //$request->cookie();
        //$request->session();
        //$request->uri();
        //$request->path();
        //$request->method();

        // Record when handling began, wait three seconds, then reply with the
        // worker id, the coroutine id and both timestamps.
        $start = date('Y-m-d H:i:s');
        sleep(3);

        $reply = sprintf(
            '%s - %s, start: %s, now: %s',
            $connection->worker->id,
            Coroutine::getCurrent()->getId(),
            $start,
            date('Y-m-d H:i:s')
        );
        $connection->send($reply);
    });
};

// Select the Swow-backed event loop, then start all workers.
Worker::$eventLoopClass = Swow::class;
Worker::runAll();

swow.php

<?php

namespace Workerman\Events;

use RuntimeException;
use Swow\Coroutine;
use Swow\Signal;
use Swow\SignalException;
use Workerman\Worker;
use function getmypid;
use function max;
use function msleep;
use function stream_poll_one;
use function Swow\Sync\waitAll;
use const STREAM_POLLHUP;
use const STREAM_POLLIN;
use const STREAM_POLLNONE;
use const STREAM_POLLOUT;

/**
 * Workerman event loop implemented on top of Swow coroutines.
 *
 * Every timer, stream watcher and signal listener runs in its own coroutine.
 * The registries below remember which coroutine serves which timer id,
 * stream id or signal number so that it can be cancelled later.
 *
 * Two sets of entry points are offered: delay()/repeat()/onReadable()/... and
 * the flag-based add()/del()/loop() methods, which only delegate to the former.
 */
class Swow implements EventInterface
{
    /**
     * Active timers, keyed by timer id. The timer id is the id of the
     * coroutine that runs the timer.
     * @var array<int, int>
     */
    protected $_eventTimer = [];

    /**
     * Read watchers, keyed by the integer value of the stream.
     * @var array<Coroutine>
     */
    protected $_readEvents = [];

    /**
     * Write watchers, keyed by the integer value of the stream.
     * @var array<Coroutine>
     */
    protected $_writeEvents = [];

    /**
     * Signal listeners, keyed by signal number.
     * @var array<Coroutine>
     */
    protected $_signalListener = [];

    /**
     * Not used by this class; kept so that subclasses relying on it still work.
     */
    protected $mapId;

    /**
     * Get timer count.
     *
     * @return int number of timers currently registered
     */
    public function getTimerCount()
    {
        return \count($this->_eventTimer);
    }

    /**
     * Run $func once after $delay seconds.
     *
     * @param float $delay delay in seconds; rounded down to whole milliseconds, minimum 1 ms
     * @param callable $func callback
     * @param mixed $args arguments for the callback (cast to array and spread)
     * @return int timer id, usable with deleteTimer()
     */
    public function delay(float $delay, $func, $args)
    {
        $t = max((int) ($delay * 1000), 1);
        $coroutine = Coroutine::run(function () use ($t, $func, $args): void {
            msleep($t);
            // A one-shot timer is finished as soon as it fires: drop it from
            // the registry before running the callback.
            unset($this->_eventTimer[Coroutine::getCurrent()->getId()]);
            $this->runTimerCallback($func, $args);
        });
        $timer_id = $coroutine->getId();
        $this->_eventTimer[$timer_id] = $timer_id;
        return $timer_id;
    }

    /**
     * Run $func every $interval seconds until the timer is deleted.
     *
     * @param float $interval period in seconds; rounded down to whole milliseconds, minimum 1 ms
     * @param callable $func callback
     * @param mixed $args arguments for the callback (cast to array and spread)
     * @return int timer id, usable with deleteTimer()
     */
    public function repeat(float $interval, $func, $args)
    {
        $t = max((int) ($interval * 1000), 1);
        $coroutine = Coroutine::run(function () use ($t, $func, $args): void {
            while (true) {
                msleep($t);
                $this->runTimerCallback($func, $args);
            }
        });
        $timer_id = $coroutine->getId();
        $this->_eventTimer[$timer_id] = $timer_id;
        return $timer_id;
    }

    /**
     * Cancel a timer created by delay() or repeat().
     *
     * @param int $timer_id id returned by delay()/repeat()
     * @return bool true if the timer was registered, false otherwise
     */
    public function deleteTimer($timer_id)
    {
        if (!isset($this->_eventTimer[$timer_id])) {
            return false;
        }
        // Forget the timer first so the registry is correct whatever kill() does.
        // NOTE(review): a timer callback may cancel its own timer, in which
        // case kill() targets the running coroutine and presumably does not
        // return normally - confirm against the Swow documentation.
        unset($this->_eventTimer[$timer_id]);
        // The coroutine may no longer be listed; only kill it when it is.
        (Coroutine::getAll()[$timer_id] ?? null)?->kill();
        return true;
    }

    /**
     * Cancel every registered timer.
     *
     * @return void
     */
    public function deleteAllTimer()
    {
        foreach ($this->_eventTimer as $timer_id) {
            $this->deleteTimer($timer_id);
        }
    }

    /**
     * Call $func($stream) whenever $stream is reported readable.
     *
     * Replaces any read watcher already registered for the same stream.
     *
     * @param resource $stream stream to watch
     * @param callable $func callback, receives the stream
     * @return bool always true
     */
    public function onReadable($stream, $func)
    {
        if (isset($this->_readEvents[(int) $stream])) {
            $this->offReadable($stream);
        }
        $this->_readEvents[(int) $stream] = Coroutine::run(function () use ($stream, $func): void {
            try {
                while (true) {
                    $rEvent = stream_poll_one($stream, STREAM_POLLIN | STREAM_POLLHUP);
                    if ($rEvent !== STREAM_POLLNONE) {
                        $func($stream);
                    }
                    // Anything other than a plain "readable" result ends the
                    // watcher; it unregisters itself without killing itself.
                    if ($rEvent !== STREAM_POLLIN) {
                        $this->offReadable($stream, bySelf: true);
                        break;
                    }
                }
            } catch (RuntimeException) {
                $this->offReadable($stream, bySelf: true);
            }
        });
        return true;
    }

    /**
     * Stop watching $stream for readability.
     *
     * @param resource $stream watched stream
     * @param bool $bySelf true when called by the watcher coroutine itself,
     *                     which must only be unregistered, not killed
     * @return void
     */
    public function offReadable($stream, bool $bySelf = false)
    {
        $this->cancelWatcher($this->_readEvents, (int) $stream, $bySelf);
    }

    /**
     * Call $func($stream) whenever $stream is reported writable.
     *
     * Replaces any write watcher already registered for the same stream.
     *
     * @param resource $stream stream to watch
     * @param callable $func callback, receives the stream
     * @return bool always true
     */
    public function onWritable($stream, $func)
    {
        if (isset($this->_writeEvents[(int) $stream])) {
            $this->offWritable($stream);
        }
        $this->_writeEvents[(int) $stream] = Coroutine::run(function () use ($stream, $func): void {
            try {
                while (true) {
                    $rEvent = stream_poll_one($stream, STREAM_POLLOUT | STREAM_POLLHUP);
                    if ($rEvent !== STREAM_POLLNONE) {
                        $func($stream);
                    }
                    // Anything other than a plain "writable" result ends the
                    // watcher; it unregisters itself without killing itself.
                    if ($rEvent !== STREAM_POLLOUT) {
                        $this->offWritable($stream, bySelf: true);
                        break;
                    }
                }
            } catch (RuntimeException) {
                $this->offWritable($stream, bySelf: true);
            }
        });
        return true;
    }

    /**
     * Stop watching $stream for writability.
     *
     * @param resource $stream watched stream
     * @param bool $bySelf true when called by the watcher coroutine itself,
     *                     which must only be unregistered, not killed
     * @return void
     */
    public function offWritable($stream, bool $bySelf = false)
    {
        $this->cancelWatcher($this->_writeEvents, (int) $stream, $bySelf);
    }

    /**
     * Call $func($signal) every time $signal is received.
     *
     * @param int $signal signal number
     * @param callable $func callback, receives the signal number
     * @return bool false if a listener for this signal already exists, true otherwise
     */
    public function onSignal($signal, $func)
    {
        if (isset($this->_signalListener[$signal])) {
            return false;
        }
        $coroutine = Coroutine::run(static function () use ($signal, $func): void {
            try {
                // Wait again after every delivery. Waiting only once would
                // leave the signal unhandled after its first occurrence while
                // the registry still reported a listener for it.
                while (true) {
                    Signal::wait($signal);
                    $func($signal);
                }
            } catch (SignalException) {
                // Waiting failed: the listener ends.
            }
        });
        $this->_signalListener[$signal] = $coroutine;
        return true;
    }

    /**
     * Remove the listener for $signal.
     *
     * @param int $signal signal number
     * @return bool false if no listener was registered, true otherwise
     */
    public function offSignal($signal)
    {
        if (!isset($this->_signalListener[$signal])) {
            return false;
        }
        $this->_signalListener[$signal]->kill();
        unset($this->_signalListener[$signal]);
        return true;
    }

    /**
     * Run the loop by calling Swow\Sync\waitAll().
     *
     * @return void
     */
    public function run()
    {
        waitAll();
    }

    /**
     * Destroy loop: kill the main coroutine, then send SIGINT to this process.
     *
     * @return void
     */
    public function stop()
    {
        Coroutine::getMain()->kill();
        Signal::kill(getmypid(), Signal::INT);
    }

    /**
     * Alias of stop().
     *
     * @return void
     */
    public function destroy()
    {
        $this->stop();
    }

    /**
     * Flag-based registration entry point.
     *
     * @param mixed $fd signal number, timer interval in seconds, or stream, depending on $flag
     * @param int $flag one of the EV_* constants
     * @param callable $func callback
     * @param mixed $args callback arguments (timers only)
     * @return mixed result of the delegated call, or null for an unknown flag
     */
    public function add($fd, $flag, $func, $args = [])
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                return $this->onSignal($fd, $func);
            case self::EV_TIMER:
                return $this->repeat($fd, $func, $args);
            case self::EV_TIMER_ONCE:
                return $this->delay($fd, $func, $args);
            case self::EV_READ:
                return $this->onReadable($fd, $func);
            case self::EV_WRITE:
                return $this->onWritable($fd, $func);
        }
        return null;
    }

    /**
     * Flag-based removal entry point.
     *
     * @param mixed $fd signal number, timer id, or stream, depending on $flag
     * @param int $flag one of the EV_* constants
     * @return bool|null result of offSignal()/deleteTimer(); null for stream watchers and unknown flags
     */
    public function del($fd, $flag)
    {
        switch ($flag) {
            case self::EV_SIGNAL:
                return $this->offSignal($fd);
            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                return $this->deleteTimer($fd);
            case self::EV_READ:
                $this->offReadable($fd);
                break;
            case self::EV_WRITE:
                $this->offWritable($fd);
                break;
        }
        return null;
    }

    /**
     * Alias of deleteAllTimer().
     *
     * @return void
     */
    public function clearAllTimer()
    {
        $this->deleteAllTimer();
    }

    /**
     * Alias of run().
     *
     * @return void
     */
    public function loop()
    {
        waitAll();
    }

    /**
     * Invoke a timer callback; any Throwable it raises stops all workers with exit code 250.
     *
     * @param callable $func callback
     * @param mixed $args arguments, cast to array and spread
     * @return void
     */
    private function runTimerCallback($func, $args): void
    {
        try {
            $func(...(array) $args);
        } catch (\Throwable $e) {
            Worker::stopAll(250, $e);
        }
    }

    /**
     * Remove the watcher stored under $fd and, unless the watcher is the
     * caller, kill its coroutine if it is still executing.
     *
     * The entry is always removed, so a watcher whose coroutine has already
     * finished does not linger in the registry.
     *
     * @param array<Coroutine> $registry watcher registry, modified in place
     * @param int $fd integer value of the watched stream
     * @param bool $bySelf true when the watcher coroutine is unregistering itself
     * @return void
     */
    private function cancelWatcher(array &$registry, int $fd, bool $bySelf): void
    {
        if (!isset($registry[$fd])) {
            return;
        }
        $coroutine = $registry[$fd];
        // Unregister before killing so the registry is already consistent
        // whatever kill() does.
        unset($registry[$fd]);
        if (!$bySelf && $coroutine->isExecuting()) {
            $coroutine->kill();
        }
    }
}

swow版本

root@be1fb440630a:/opt/www# php --ri swow

Swow

Status => enabled
Author => Swow Team
Link => https://github.com/swow/swow
Contact => Twosee <twosee@php.net>
Version => 0.3.2-dev-390a1231 ( NTS )
Built => Dec 17 2022 07:11:23
Context => boost-context
Scheduler => libuv-event
2714 5 8
5个评论

小小子

收藏了

  • 暂无评论
小小子

快速体验swow:打开https://github.com/dixyes/lwmbs/actions/workflows/linux.yml 选择最新任务,下载cli_static_8.1_musl_x86_64_xxxx,解压即可使用

没找到地方下载,笨了---

  • he426100 2022-12-27

    进入页面后在右边选一个linux test,点进去,往下翻,就有下载链接

lcmg

好像在协程里面,我使用的redis和mysql很快就断开连接了,不知道是不是扩展的问题
$timer_id = Timer::add($time_interval, function () {
Coroutine::run(static function () {
Db::table()->find();
});

});
深林孤鹰

请问这个webman业务中能用吗(非新开的workerman进程中)

liruizhe1

用了这个扩展之后,是不是workerman的count(也就是分身数量)可以设置为1了。那么也意味着windows下面也可以得到linux差不多的性能了?

  • 暂无评论

he426100

304
积分
0
获赞数
0
粉丝数
2019-10-21 加入
🔝