分享一个基于workerman的rabbitmq客户端生产者、消费者(基于Timer实现事件驱动)

zgh419566

通过综合分析、研究和探索workerman和PhpAmqpLibr相关手册,经过长期的实践,现分享一套基于workerman的rabbitmq客户端生产者和消费者代码,供大家测试,使用。

个人觉得这套代码比workerman官方的代码逻辑更清晰,更便于使用

希望walkor采用,这样的话我也算为开源社区做了一些贡献。

rabbitmq 生产者

rabbitmq_productor.php

<?php
require_once ('./vendor/autoload.php');
require_once ("./Lib_global.php");

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\TcpConnection;
use Workerman\Connection\AsyncUdpConnection;
use Workerman\Connection\AsyncTcpConnection;

$worker = new Worker();
//开启进程数量
$worker->count = 4;
$worker->name = "rabbitmq_productor";

$date = date("Y-m-d");
Worker::$pidFile = "var/mq_service_productor.pid";
Worker::$logFile = "var/mq_service_productor_logFile.log";
Worker::$stdoutFile = "var/mq_service_productor_stdout.log";

$worker->onWorkerStart = function () {
    global $rabbit_connection, $rabbit_channel ,  $rabbitmq_exchange_name , $rabbitmq_queueName;

    $rabbitmq_exchange_name = "exchange_name";
    $rabbitmq_queueName = "queuePrefix_QueueName";

    // 连接 rabbitmq 服务
    $rabbit_connection = new AMQPStreamConnection(RABBITMQ_SERVER_IP, RABBITMQ_SERVER_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);

    // 获取信道
    $rabbit_channel = $rabbit_connection->channel();

    //声明创建交换机
    $rabbit_channel->exchange_declare( $rabbitmq_exchange_name , 'topic', false, true, false);

    // 声明创建队列
    $rabbit_channel->queue_declare( $rabbitmq_queueName , false, true, false, false);

    // 绑定队列
    $rabbit_channel->queue_bind($rabbitmq_queueName , $rabbitmq_exchange_name, $rabbitmq_queueName);

    //可以修改时间间隔,如果为0.002秒,则每秒产生500*4=2000条
    Timer::add( 0.002 , function() {
        global $rabbit_connection, $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;

        //需要向rabbitmq队列投递消息的内容,通常为数组,经过json转换再发送
        $data_all = array(
            'name' => "张三",
            'time' => time(),
        );
        $data_all_out_json = json_encode($data_all , JSON_UNESCAPED_UNICODE );
        $data_all_out_msg = new AMQPMessage($data_all_out_json, ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

        //向队列里面写内容
        @$rabbit_channel->basic_publish($data_all_out_msg , $rabbitmq_exchange_name , $rabbitmq_queueName);
    });
};

Worker::runAll();

rabbitmq 消费者

rabbitmq_comsumer.php

<?php

require_once ('./vendor/autoload.php');
require_once ("./Lib_global.php");

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\Connection\TcpConnection;
use Workerman\Connection\AsyncUdpConnection;
use Workerman\Connection\AsyncTcpConnection;

$worker = new Worker();
//开启进程数量
$worker->count = 10;
$worker->name = "rabbitmq_comsumer";

$date = date("Y-m-d");
Worker::$pidFile = "var/rabbitmq_comsumer.pid";
Worker::$logFile = "var/rabbitmq_comsumer_logFile.log";
Worker::$stdoutFile = "var/rabbitmq_comsumer_stdout.log";

$worker->onWorkerStart = function () {
    global $rabbit_connection, $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;

    $rabbitmq_exchange_name = "exchange_name";
    $rabbitmq_queueName = "queuePrefix_QueueName";

    // 连接 rabbitmq 服务
    $rabbit_connection = new AMQPStreamConnection(RABBITMQ_SERVER_IP, RABBITMQ_SERVER_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);

    // 获取信道
    $rabbit_channel = $rabbit_connection->channel();

    // 声明队列
    $rabbit_channel->queue_declare( $rabbitmq_queueName , false, true, false, false);

    // 绑定队列
    $rabbit_channel->queue_bind($rabbitmq_queueName , $rabbitmq_exchange_name, $rabbitmq_queueName);

    // 消费者订阅队列
    $rabbit_channel->basic_consume($rabbitmq_queueName , '', false, false, false, false,
        function ($msg){
            global $rabbit_channel , $rabbitmq_exchange_name , $rabbitmq_queueName;
            $data_all_str = $msg->body;
            // 消息确认,表明已经收到这条信息
            @$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            //echo "{$data_all_str}\n";

            //这里是业务处理逻辑
            //如果这条消息处理失败,你可以在这里将其再次放回消息队列(最好给消息做个放回去的次数判断,避免无限失败和无限循环)
        });

    //这里是重点,网上很多教程起的使用while死循环容易导致程序异步代码无法执行,这种方法能避免
    //按照每个进程每秒处理1000条来设定定时器,每个进程每秒消费1000条,4个进程每秒消费4000条,经过实际验证,将时间改小也无法提升单个进程的处理速度
    //实际测试,4个进程每秒的消费能力有4000左右,可以满足很多中小型系统的应用,如果想提升系统处理能力,
    //可以增加消费者进程数量来解决,比如我将进程数量提升到10个,每秒处理能力约为1万
    //这个机制,希望能力更强的你来进行优化
    Timer::add( 0.0001 , function() {
        global $rabbit_channel;
        if( count($rabbit_channel->callbacks) > 0 ){
            $rabbit_channel->wait();
        }
    });
};

Worker::runAll();

附上我的代码运行情况

205 2 3
2个回答

walkor

感谢分享

  • 暂无评论
evilk

666,感谢分享,赞一个,有机会试用一下
另外,对于其他方面,说一下个人愚见(跟楼主这个没有太大关系)
很多时候,当遇到的确需要使用MQ的情况,中小型公司,真的不会花额外的成本去用专业的MQ,最多用redis
我知道,可能很多人会说,这样很操蛋,但事实上,有很多公司,就是这样,成本能低就低
但redis目前有一个很大的问题,目前的webman队列,是使用的普通的list结构,这个是没有ack机制的
这对于某些对消息可靠性有比较高的要求的情况,就很尴尬
我们目前使用webman的redis队列插件,都是用在对消息可靠性要求不是特别高的场景,比如通知等
我有个想法,基于redis stream 写一套类似的消息队列插件
这样就可以真正满足实际需求了
有时间一定写一个出来,相信这个可能会更加实用

  • 暂无评论
🔝