RabbitMQ的消息队列常用操作,依赖\use x\RabbitMQ;
x\rabbitmq\AMQPExchangePool;
x\rabbitmq\Message\AMQPMessage;
组件。
生产者--使用示例:生产数据
<?php
namespace app\http;
use x\controller\Http;
use x\RabbitMQ;
use x\rabbitmq\AMQPExchangePool;
use x\rabbitmq\Message\AMQPMessage;
use x\Request;
/**
* @Controller(prefix="")
*/
class Index extends Base
{
/**
* @RequestMapping(route="/", method="get", title="主页")
*/
public function index()
{
$exchange = 'exchange';
$queue = 'queue_3';
$connection = new RabbitMQ();
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
$channel->exchange_declare($exchange, 'direct', false, true, false);
$channel->queue_bind($queue, $exchange);
for ($i =0;$i<100000;$i++)
{
$messageBody = date('Y-m-d H:i:s',time()).'----'.$i;
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$result = $channel->basic_publish($message, $exchange);
var_dump($result);
}
}
}
消费者---使用案例:我们以简单的脚本为例 这里直接使用RabbitMQ扩展
<?php
try {
$conn_args = array(
'host' => '192.168.32.139',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
//构造函数 AMQPConnection
$con = new \AMQPConnection($conn_args);
if(!$con->connect()) {
var_dump('连接失败1');
}
//先通道声明--传入连接的套接字--构造函数 通过通道连接创建消息通道
$channel = new \AMQPChannel($con);
//交换机声明--传入声明的通道-- 构造函数 通过通道连接交换机
$exchange = new \AMQPExchange($channel);
//设置交换机名
$exchange->setName("RMQ_EN");//设置通道名称
//设置连接方式--直连 [直连,主题,广播]
$exchange->setType("AMQP_EX_TYPE_DIRECT");
//消息持久化
$exchange->setFlags("AMQP_DURABLE");
//声明
$exchange->declareExchange();
//声明队列,绑定交换机和路由
$queue = new AMQPQueue($channel);
//设置队列名字
$queue->setName('queue_3');
//消息持久化
$queue->setFlags(AMQP_DURABLE);
//声明
$queue->declareQueue();
//绑定获取数据 参数一:交换机名 参数二:路由
$queue->bind('exchange',"RMQ_RKN");
//消费,没有数据时,阻塞监听获取数据
$queue->consume(function($event,$queue){
$body = $event->getBody();
var_dump($body);
$queue->ack($event->getDeliveryTag());
});
} catch (Exception $e) {
var_dump('连接失败2');
}
}