```php
require_once __DIR__ . '/../../vendor/autoload.php';
require_once __DIR__ . '/../common/common.php';
global $location_con;
$worker = new Worker();
$worker->count = 1;
$worker->onWorkerStart = function () {
dg_init('system', 'rabbitmq_data_sysnc', array());
global $location_con;
global $sting;
$string = '';
//建立连接
$location_con = new AsyncTcpConnection('ws://127.0.0.1:9395');
$location_con->onConnect = function ($connection) {
var_dump('onConnect ok');
//heartbeat($connection, 'rabbitmq_data', 0);
};
$location_con->connect();
$location_con->send('123123123123');
$conn_args = array(
'host' => '192.168.5.133',
'port' => '5672',
'login' => 'admin',
'password' => 'admin',
'vhost' => '/'
);
$e_name = 'S2C'; //交换机名
$q_name = 'S2C'; //队列名
$k_route = 'S2C'; //路由key
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_FANOUT); //direct类型
$ex->setFlags(AMQP_PASSIVE); //持久化
$ex->declareExchange();
//创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
$q->declareQueue();
$q->bind($e_name, $k_route);//绑定交换机与队列,并指定路由键
********下面这段代码不行
$q->consume(function($envelope, $queue) use($location_con){
echo "send data to 9037";
$location_con->send($envelope->getBody());
echo $string = $envelope->getBody();
});//自动ACK应答
*************上面代码不知道怎么写
$conn->disconnect();
//连接断开
$location_con->onClose = function ($connection) {
connect_close($connection, 'start_data_sync-walkthink');
};
};
Worker::runAll();
```