PHP代码实践RabbitMQ

创建交换区、队列以及路由

<?php
/**
 * 用PHP创建交换区:goods_msm,队列名称:goods_worker,以及路由key:code1
 * 此代码不是生产者,也不是消费者
**/
$conn = new AMQPConnection([
    'host'   => '127.0.0.1',
    'vhost' => '/',
    'port'   => 5672,
    'login'  => 'guest',
    'password' => 'guest'
]);

# 建立连接
if(!$conn->connect()){
    die('connetc error');
}
$channel = new AMQPChannel($conn);//创建channel(信道或者叫通道)
$ExChangeName = 'goods_msm'; //交换区名称
$queueName = 'goods_worker'; //队列名称
$routeName1 = 'code1'; //路由key

# 创建交换机对象
$exChange = new AMQPExchange($channel);
$exChange->setName($ExChangeName);
$exChange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重启时交换机自动恢复
echo "Exchange Status:".$exChange->declare()."\n"; //查看如果交换机不存在则进行创建

# 创建队列    
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE); //队列持久化
echo "Message Total:".$queue->declare()."\n"; //查看,如果不存在则创建

# 绑定交换区与队列,指定路由键 
# rabbitmq不是直接发送到队列,发送到交换区,由交换区决定发给某个队列
echo 'Queue Bind: '.$queue->bind($ExChangeName, $routeName1)."\n"; //绑定路由

$conn->disconnect(); //关闭连接

生产者

<?php
/**
 * 生产者
 * 生产者也就是发送方
**/
$conn = new AMQPConnection([
    'host'   => '127.0.0.1',
    'vhost' => '/',
    'port'   => 5672,
    'login'  => 'guest',
    'password' => 'guest'
]);
# 建立连接
if(!$conn->connect()){
  die('connetc error');
}
$channel = new AMQPChannel($conn); //创建channel(信道或者叫通道)
$ExChangeName = 'goods_msm'; //交换区名称
$routeName1 = 'code1'; //路由key

# 创建交换机对象   
$exChange = new AMQPExchange($channel);    
$exChange->setName($ExChangeName); 

# 发送消息
$exChange->publish('第一条测试消息', $routeName1);

$conn->disconnect(); //关闭连接

消费者

<?php
/**
 * 用PHP创建交换区:goods_msm,队列名称:goods_worker,以及路由key:code1
 * 此代码不是生产者,也不是消费者
**/
$conn = new AMQPConnection([
    'host'  => '127.0.0.1',
    'vhost' => '/',
    'port'   => 5672,
    'login'  => 'guest',
    'password' => 'guest'
]);
# 建立连接
if(!$conn->connect()){
  die('connetc error');
}
$channel = new AMQPChannel($conn); //创建channel(信道或者叫通道)
$queueName = 'goods_worker'; //队列名称

# 创建队列    
$queue = new AMQPQueue($channel); 
$queue->setName($queueName);   
$queue->setFlags(AMQP_DURABLE); //队列持久化 

# 接受消息
$queue->consume(function ($envelope, $queue) {
    $msg = $envelope->getBody();
    # 处理消息
    echo $msg."\n";
}, AMQP_AUTOACK); //自动应答

$conn->disconnect(); //关闭连接