如何和指定的子进程通讯?

dignfei

问题描述

开启10个进程,每个进程执行不同的任务1、2、3、……10,然后主线程和子进程通讯分别交互不同的信息,怎么实现?

为此你搜索到了哪些方案及不适用的原因

默认的方案是,有消息时,会随机选一个进程接收信息,而我需要和指定的进程通信。

子进程A:--> 运行10秒计算任务 --> 和主进程通信 --> 运行10秒计算任务 --> 和主进程通信 -->……
子进程B:--> 运行10秒计算任务 --> 和主进程通信 --> 运行10秒计算任务 --> 和主进程通信 -->……
子进程C:--> 运行10秒计算任务 --> 和主进程通信 --> 运行10秒计算任务 --> 和主进程通信 -->……

主进程和需要先和A通信,然后和B,然后和C,……有先后顺序

592 9 0
9个回答

efnic

你这种工作逻辑类似于流水线;
可以开N个消费者,通过队列来实现你要的功能;
工作流:1队列 --->>> 2队列 --->>> 3队列 --->>> 4队列 --->>> 5队列 --->>> 6队列

  • dignfei 2023-09-03

    没这么简单。
    每个消费者都需要运行10秒钟cpu繁重的任务,然后通信,然后继续运行繁重任务,然后继续通讯,循环
    然后不同的消费者通信又有先后顺序,怎么实现?

efnic

以TaskProcess 类为例,这个类只做2个事情

  1. 构造方法传入任务类型或者处理的队列名
  2. onWorkerStart运行后,添加每秒执行的定时器,去处理当前任务队列;

按你的流水线,在进程配置内新增进程配置(一定要设置构造函数 参数)
完毕。

  • dignfei 2023-09-03

    需要进程间通信啊,每个任务运行的结果,需要传入给后面的任务

  • efnic 2023-09-03

    借助redis通信,明白不?

efnic

进程配置

'pipeline_1' => [
    'count' => 1,
    'handler' => \process\TaskProcess::class,
    'constructor' => [
        'type' => '任务类型1',
        'queue' => '队列1'
    ],
],
'pipeline_2' => [
    'count' => 5,
    // 根据任务繁重情况设置进程数
    'handler' => \process\TaskProcess::class,
    'constructor' => [
        'type' => '任务类型2',
        'queue' => '队列2'
    ],
],
'pipeline_3' => [
    'count' => 3,
    // 根据任务繁重情况设置进程数
    'handler' => \process\TaskProcess::class,
    'constructor' => [
        'type' => '任务类型3',
        'queue' => '队列3'
    ],
],
  • 暂无评论
efnic
<?php

namespace process;

use support\Redis;
use Workerman\Timer;

class TaskProcess
{
    /**
     * @var string
     */
    protected string $type;
    /**
     * @var string
     */
    protected string $queue;

    /**
     * 构造函数
     * @param string $type
     * @param string $queue
     */
    public function __construct(string $type, string $queue)
    {
        $this->type = $type;
        $this->queue = $queue;
    }

    /**
     * 根据类型,得到下一个工作流队列名称
     * @return string
     */
    public function getNextQueue(): string
    {
        //todo...
        switch ($this->type) {
            case '':
                return '';
            default:
                return '';
        }
    }

    /**
     * 进程启动时执行
     * @return void
     */
    public function onWorkerStart(): void
    {
        Timer::add(1, function () {
            //当前工作流队列内有任务
            if (Redis::lLen($this->queue)) {
                $data = Redis::lPop($this->queue);
                //todo... 完成当前流程,得到结果
                $result = call_user_func([$this, $this->type], $data);
                //todo... 投递到下个工作流队列
                Redis::rPush($this->getNextQueue(), $result);
            }
        });
    }

    /**
     * step1
     * @param $data
     * @return void
     */
    protected function step1($data): void
    {
        //todo... 业务逻辑
    }

    /**
     * step2
     * @param $data
     * @return void
     */
    protected function step2($data): void
    {
        //todo... 业务逻辑
    }

    /**
     * step3
     * @param $data
     * @return void
     */
    protected function step3($data): void
    {
        //todo... 业务逻辑
    }

    /**
     * step4
     * @param $data
     * @return void
     */
    protected function step4($data): void
    {
        //todo... 业务逻辑
    }
}
  • dignfei 2023-09-03

    $data = Redis::lPop($this->queue);并不会阻塞啊,要等待数据过来,到了就立马执行

  • dignfei 2023-09-03

    而且用redis获取数据时多个进程间会相互争夺数据,而需求是进程1 的数据传给进程2,进程2 的传给进程3……,有顺序的

  • dignfei 2023-09-03

    最好是进程通信,指定发给某个进程,就没问题了

  • efnic 2023-09-03

    回复1:Redis::lPop($this->queue);是定时器拉起执行的,执行处理任务的过程中就是阻塞的。
    回复2:redis获取时多进程会相互争夺数据,纳尼?实例化进程对象的时候,队列名都不一样,争夺屁的数据??
    回复3:核心逻辑是各个任务进程并行处理任务;用redis的列表(队列)实现了n个任务进程通信,并行处理任务,不理解吗?(实例化的时候,每个进程监听的队列名字不是同一个,理解不了吗)

  • dignfei 2023-09-04

    理解,意思就是进程间通信很垃圾,得靠redis

  • dignfei 2023-09-04

    理解。
    回复1:靠定时器拉起,中间会有浪费的时间,64个进程一轮下来就浪费64秒,有没有实时响应的办法?

  • efnic 2023-09-04

    while(1)

  • efnic 2023-09-04

    进程间通信,不垃圾;如果你理解进程间通信,就不会在这问;
    我只是抛砖引玉,提供一种解决你问题的思路。

  • efnic 2023-09-04

    webman官方,有成熟的进程间通信组件可以直接用;
    https://www.workerman.net/plugin/18

  • dignfei 2023-09-04

    我已经试了,经常丢包报错,还是得redis

  • dignfei 2023-09-04

    while(1)浪费cpu不行

  • efnic 2023-09-04

    \usleep(10000);

  • efnic 2023-09-04

    丢包报错的原因是你执行过程中是堵塞的,进程通信组件发过去,也收不到;
    所以,while(1){
    usleep(10000);
    //todo... 从当前队列内拿上个结果,处理后丢进下个进程队列。。。
    }

dignfei

需要阻塞,数据来了立马执行立马传给下一个进程,分散到多个进程的目的就是为了最大化减少运行时间。

子进程A:--> 运行10秒计算任务 --> 等待数据1,收到后马上处理数据然后返回数据2 --> 运行10秒计算任务 --> 和主进程通信 -->……
子进程B:--> 运行10秒计算任务 --> 等待数据2,处理成数据3返回 --> 运行10秒计算任务 --> 和主进程通信 -->……
子进程C:--> 运行10秒计算任务 --> 等待数据3,处理成数据4返回 --> 运行10秒计算任务 --> 和主进程通信 -->……

最终的目的是要分散到64个进程同时计算

  • efnic 2023-09-03

    A进程(pipeline_1进程)等待队列1的任务,定时器每秒检查【队列1】?有则执行,结果传递到【队列2】
    B进程(pipeline_2进程)等待队列2的任务,定时器每秒检查【队列2】?有则执行,结果传递到【队列3】
    C进程(pipeline_3进程)等待队列3的任务,定时器每秒检查【队列3】?有则执行,结果传递到【队列4】
    ……
    X进程(pipeline_X进程)收到结果,入库 或者 进行下一轮的调度;
    进程间通信用Redis的列表,这么难理解吗?

  • efnic 2023-09-03

    不要通过共享内存来通信,而应该通过通信来共享内存

nitron

为什么非要强行用拖拉机干挂车的事?

admin

主进程,子进程。
你应该是没看完手册,
webman,workerman 你知道主进程是干嘛用的吗,
你开的每一个进程都是平级的,workerman,webman的主进程用来

for ($process_sub>= 你设置的进程数){
创建一个子进程
}

workerman,webman的主进程主要是用来supervise用的,你还想主进程做啥呢。

你要是希望一个一个顺序处理,应该用到队列

  • dignfei 2023-09-04

    选一个workid作为主进程就行了

chaz6chez

你这样的想法,实际上是线程模型的思想,线程模型的思想主要是让主线程分配,子线程执行,主线程回收数据;
进程模型一般不会有这样的思路,通常来说不会通过主进程来进行任务的分配,而是通过一些共同特征,让子进程自己通过特征来获取自己应该获取的任务,而主进程只负责一件事情,就是子进程的正常运行和整体服务的正常运行;
如果你想要强行实现主进程分配任务让固定子进程进行处理,我建议每一个子进程都与主进程分别建立一条专属的channel,然后通过发布订阅的方式进行业务处理

  • chaz6chez 2023-09-04

    进程很难在同一时间并行处理,线程也如此,因为一台服务器上运行的进程有很多,每个进程都会有自己主动和被动出让cpu执行时间的间歇,同时并行的数量和cpu数量是相同的;
    进程间通讯如果使用socket,那么就存在数据需要在用户态和内核态进行拷贝操作,其实效率没有那么高;
    如果在共享内存中,操作会被加锁,可能会存在互斥;

    把各个业务拆分成A\B\C\D来分别执行,在理论上可能效率很高,但在实际的操作过程中存在上述甚至更多的影响效率的点,所以还不如将业务放在一个进程中执行,比如假设一个延迟并不高的业务,通过拆分到不同进程执行,最后的结果只可能比单进程慢,而不会快;

    假设存在慢业务,完全可以做成生产消费模式,将存在大量ip的或者阻塞时间较长的交给消费队列来执行

小W

我觉得你这个需要用go channel实现很简单

🔝