数据库 - Model 前置说明
官方开源项目 抓取代理IP

说明

所有生产者都必须继承\x\queue\Job基类。

自定义生产者类不强制存储目录,但建议统一存放在/box/queue/目录下便于管理。

由于框架会把 生产者对象 直接序列化存储在 entity队列中,所以为了保证序列化后的体积,尽量不要将一些大的数据体赋值在生产者队列的成员属性中。

自定义生产者类必须实现public function handle();方法,该方法则为消费逻辑。

若自定义生产者类定义了__construct()构造方法,则必须先执行parent::__construct(),否则无法正常使用该生产者对象。

创建一个生产者对象

下面,我们来创建一个TestJob生产者对象,存放地址为/box/queue/TestJob.php

namespace box\queue;
use x\queue\Job;

// 继承至Job基类
class TestJob extends Job
{
    // 大多情况下都不会用到
    public function __construct() {
        // 如果你有定义构造方法数,则必须先执行父类的构造方法
        parent::__construct();
    }

    /**
     * 生产者必须自定义消费逻辑
     * @todo 无
     * @author 小黄牛
     * @version v2.5.9 + 2021-11-04
     * @deprecated 暂不启用
     * @global 无
     * @return void
    */
    public function handle() {
        // 可以通过该方法,获得投递的数据集
        $param = $this->param();
        
        // 消费成功需要返回true
        // 若返回false,则会进入重试逻辑
        return true;
    }
}

MQTT的生产消费

namespace box\queue;
use x\queue\Job;

// 继承至Job基类
class TestJob extends Job
{
    // 大多情况下都不会用到
    public function __construct() {
        // 如果你有定义构造方法数,则必须先执行父类的构造方法
        parent::__construct();
    }

    /**
     * 生产者必须自定义消费逻辑
     * @todo 无
     * @author 小黄牛
     * @version v2.5.9 + 2021-11-04
     * @deprecated 暂不启用
     * @global 无
     * @return void
    */
    public function handle() {
        // 可以通过该方法,获得投递的数据集
        $param = $this->param();
        
        // 读取server
        $server = $this->getServer();
        $mqtt = new \x\controller\Mqtt();
        $mqtt->setServer($server);
        // 查询出设备信息
        $info = $mqtt->find($param['client_id']);
        
        // 需要推送的内容
        $data = [
                'type' => \x\mqtt\common\Types::PUBACK,
                'topic' => $info['list'][0]['topic'],
                'message' => $param['message'],
                'dup' => $info['list'][0]['dup'] ?? 0,
                'qos' => $info['list'][0]['qos'],
                'retain' => $info['list'][0]['retain'] ?? 0,
                'message_id' => $info['list'][0]['message_id'] ?? '',
        ];
        
        // 推送
        $mqtt->send($info['fd'], $data);
        
        // 消费成功需要返回true
        // 若返回false,则会进入重试逻辑
        return true;
    }
}

WebSocket的生产消费

namespace box\queue;
use x\queue\Job;

// 继承至Job基类
class TestJob extends Job
{
    // 大多情况下都不会用到
    public function __construct() {
        // 如果你有定义构造方法数,则必须先执行父类的构造方法
        parent::__construct();
    }

    /**
     * 生产者必须自定义消费逻辑
     * @todo 无
     * @author 小黄牛
     * @version v2.5.9 + 2021-11-04
     * @deprecated 暂不启用
     * @global 无
     * @return void
    */
    public function handle() {
        // 可以通过该方法,获得投递的数据集
        $param = $this->param();
        
        // 读取消费时传入的server实例
        $server = $this->getServer();
        // 推送
        $obj = new \x\controller\WebSocket();
        $obj->fetch('limit_error', 'error', $param['msg'], $param['fd'], $server);
        
        // 消费成功需要返回true
        // 若返回false,则会进入重试逻辑
        return true;
    }
}

SW-X

企业级 - 高性能 PHP 框架

最后更新:2年前 . 作者-小黄牛

本篇目录