PHP代码实践RabbitMQ
•
RabbitMQ
创建交换区、队列以及路由
<?php /** * 用PHP创建交换区:goods_msm,队列名称:goods_worker,以及路由key:code1 * 此代码不是生产者,也不是消费者 **/ $conn = new AMQPConnection([ 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'guest', 'password' => 'guest' ]); # 建立连接 if(!$conn->connect()){ die('connetc error'); } $channel = new AMQPChannel($conn);//创建channel(信道或者叫通道) $ExChangeName = 'goods_msm'; //交换区名称 $queueName = 'goods_worker'; //队列名称 $routeName1 = 'code1'; //路由key # 创建交换机对象 $exChange = new AMQPExchange($channel); $exChange->setName($ExChangeName); $exChange->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重启时交换机自动恢复 echo "Exchange Status:".$exChange->declare()."\n"; //查看如果交换机不存在则进行创建 # 创建队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); //队列持久化 echo "Message Total:".$queue->declare()."\n"; //查看,如果不存在则创建 # 绑定交换区与队列,指定路由键 # rabbitmq不是直接发送到队列,发送到交换区,由交换区决定发给某个队列 echo 'Queue Bind: '.$queue->bind($ExChangeName, $routeName1)."\n"; //绑定路由 $conn->disconnect(); //关闭连接
生产者
<?php /** * 生产者 * 生产者也就是发送方 **/ $conn = new AMQPConnection([ 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'guest', 'password' => 'guest' ]); # 建立连接 if(!$conn->connect()){ die('connetc error'); } $channel = new AMQPChannel($conn); //创建channel(信道或者叫通道) $ExChangeName = 'goods_msm'; //交换区名称 $routeName1 = 'code1'; //路由key # 创建交换机对象 $exChange = new AMQPExchange($channel); $exChange->setName($ExChangeName); # 发送消息 $exChange->publish('第一条测试消息', $routeName1); $conn->disconnect(); //关闭连接
消费者
<?php /** * 用PHP创建交换区:goods_msm,队列名称:goods_worker,以及路由key:code1 * 此代码不是生产者,也不是消费者 **/ $conn = new AMQPConnection([ 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'guest', 'password' => 'guest' ]); # 建立连接 if(!$conn->connect()){ die('connetc error'); } $channel = new AMQPChannel($conn); //创建channel(信道或者叫通道) $queueName = 'goods_worker'; //队列名称 # 创建队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); //队列持久化 # 接受消息 $queue->consume(function ($envelope, $queue) { $msg = $envelope->getBody(); # 处理消息 echo $msg."\n"; }, AMQP_AUTOACK); //自动应答 $conn->disconnect(); //关闭连接