数据库 - Model 前置说明
官方开源项目 抓取代理IP

介绍

RabbitMQ的消息队列常用操作,依赖\use x\RabbitMQ; x\rabbitmq\AMQPExchangePool; x\rabbitmq\Message\AMQPMessage;组件。

生产者--使用示例:生产数据

<?php
namespace app\http;
use x\controller\Http;
use x\RabbitMQ;
use x\rabbitmq\AMQPExchangePool;
use x\rabbitmq\Message\AMQPMessage;
use x\Request;

/**
 * @Controller(prefix="")
*/
class Index extends Base
{
    /**
     * @RequestMapping(route="/", method="get", title="主页")
    */
    public function index()
    {
        $exchange = 'exchange';
        $queue = 'queue_3';


        $connection = new RabbitMQ();
        $channel = $connection->channel();

        $channel->queue_declare($queue, false, true, false, false);
        $channel->exchange_declare($exchange, 'direct', false, true, false);

        $channel->queue_bind($queue, $exchange);

        for ($i =0;$i<100000;$i++)
        {
            $messageBody = date('Y-m-d H:i:s',time()).'----'.$i;
            $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
            $result = $channel->basic_publish($message, $exchange);
            var_dump($result);
        }
    }
}

消费者---使用案例:我们以简单的脚本为例 这里直接使用RabbitMQ扩展

<?php
try {
    $conn_args = array(
        'host' => '192.168.32.139',
        'port' => '5672',
        'login' => 'guest',
        'password' => 'guest',
        'vhost'=>'/'
    );
    //构造函数   AMQPConnection
    $con = new \AMQPConnection($conn_args);
    if(!$con->connect()) {
        var_dump('连接失败1');
    }

    //先通道声明--传入连接的套接字--构造函数 通过通道连接创建消息通道
    $channel = new \AMQPChannel($con);

//交换机声明--传入声明的通道-- 构造函数 通过通道连接交换机
    $exchange = new \AMQPExchange($channel);

//设置交换机名
    $exchange->setName("RMQ_EN");//设置通道名称

//设置连接方式--直连 [直连,主题,广播]
    $exchange->setType("AMQP_EX_TYPE_DIRECT");

//消息持久化
    $exchange->setFlags("AMQP_DURABLE");

//声明
    $exchange->declareExchange();

//声明队列,绑定交换机和路由
    $queue = new AMQPQueue($channel);

//设置队列名字
    $queue->setName('queue_3');

//消息持久化
    $queue->setFlags(AMQP_DURABLE);

//声明
    $queue->declareQueue();

//绑定获取数据 参数一:交换机名  参数二:路由
    $queue->bind('exchange',"RMQ_RKN");

//消费,没有数据时,阻塞监听获取数据
    $queue->consume(function($event,$queue){
        $body = $event->getBody();
        var_dump($body);

        $queue->ack($event->getDeliveryTag());
    });

} catch (Exception $e) {
    var_dump('连接失败2');
}

}

SW-X

企业级 - 高性能 PHP 框架

最后更新:3年前 . 作者-小黄牛

本篇目录