基于订阅发布的PHP websocket推送服务源码

six

发一个基于webman自定义进程+websocket的推送功能,自带心跳,自带断线重连。消息通过订阅发布机制,使用起来非常简单方便,客户端与客户端直接可以直接发消息。可用于单聊、群聊、app消息推送、网页即使推送等。

1、新建process\WebSocketPush.php

<?php

namespace process;

use Workerman\Connection\TcpConnection;
use Workerman\Timer;

class WebsocketPush
{
    /**
     * 所有websocket客户端
     * @var array
     */
    protected $connections = [];

    // Websocket连接上来时
    public function onWebSocketConnect(TcpConnection $connection)
    {
        $connection->timer = Timer::add(25, function () use ($connection) {
            $connection->send(json_encode(['type'=>'ping']));
        });
    }

    // WebSocket发来消息时
    public function onMessage(TcpConnection $connection, $data)
    {
        $data = json_decode($data, true);
        $type = $data['type'] ?? null;
        switch ($type) {
            // 心跳
            case 'ping':
                return;
            // 订阅
            case 'subscribe':
                $this->subscribe($connection, $data['subject']);
                return;
            // 发布
            case 'publish':
                $this->publish($data['subject'], $data['data'] ?? null);
                return;
        }
    }

    //  WebSocket连接关闭时
    public function onClose(TcpConnection $connection)
    {
        if (isset($connection->timer)) {
            Timer::del($connection->timer);
        }
        foreach ($connection->subject??[] as $subject) {
            unset($this->connections[$subject][$connection->id]);
        }
    }

    // 订阅
    protected function subscribe(TcpConnection $connection, string $subject)
    {
        $this->connections[$subject][$connection->id] = $connection;
        if (empty($connection->subject)) {
            $connection->subject = [];
        }
        $connection->subject[$subject] = $subject;
    }

    // 发布
    protected function publish(string $subject, $data)
    {
        $connections = $this->connections[$subject] ?? [];
        foreach ($connections as $connection) {
            $connection->send(json_encode(['type' => 'publish', 'subject' => $subject, 'data' => $data]));
        }
    }

}

2、confit/process.php 里加入配置

return [
    // ...其它配置...

    'websocket-push' => [
        'handler' => process\WebSocketPush::class,
        'listen' => 'websocket://0.0.0.0:5050',
        'count => 1, // count必须为1
    ]
];

3、重启webman

php start.php restart

4、前端代码

var ws;
// 连接
function connect()
{
    ws && ws.close();
    ws = new WebSocket('ws://127.0.0.1:5050');
    ws.onopen = function () {
        // 订阅
        ws.send(JSON.stringify({
            type: 'subscribe',
            subject: 'user-1'
        }));
        // 发布
        ws.send(JSON.stringify({
            type: 'publish',
            subject: 'user-1',
            data: 'hello'
        }));
    };
    // 断线重连
    ws.onclose = function () {
        setTimeout(function () {
            connect();
        }, 1000);
    };
    // 收到消息
    ws.onmessage = function (res) {
        var data = JSON.parse(res.data);
        switch (data.type) {
            // 心跳消息
            case 'ping':
                return;
            // 收到消息
            case 'publish':
                console.log(res.data);
        }
    };
}
// 定时心跳
setTimeout(function () {
    ws.readyState === 1 && ws.send(JSON.stringify({
        type: 'ping'
    }));
}, 25000);
// 发起连接
connect();

收到打印

{"type":"publish","subject":"user-1","data":"hello"}

说明

订阅

客户端对哪些消息感兴趣,使用

ws.send(JSON.stringify({
    type: 'subscribe',
    subject: 'user-1'
}));

订阅,例如当前用于对于自己的消息感兴趣,则订阅subject=user-1,对于群id为20的消息感兴趣,则可订阅例如subject=group-20。subject数据格式自己定,发布与订阅一致就行

发布

客户端要对哪个主题发布数据,例如向群20发布消息

ws.send(JSON.stringify({
    type: 'publish',
    subject: 'group-20',
    data: 'hello'
}));
855 1 1
1个评论

manman10

订阅类似群聊吗

six

17787
积分
0
获赞数
0
粉丝数
2016-11-14 加入
🔝