所有生产者都必须继承\x\queue\Job
基类。
自定义生产者类不强制存储目录,但建议统一存放在/box/queue/
目录下便于管理。
由于框架会把 生产者对象 直接序列化存储在 entity
队列中,所以为了保证序列化后的体积,尽量不要将一些大的数据体赋值在生产者队列的成员属性中。
自定义生产者类必须实现public function handle();
方法,该方法则为消费逻辑。
若自定义生产者类定义了__construct()
构造方法,则必须先执行parent::__construct()
,否则无法正常使用该生产者对象。
下面,我们来创建一个TestJob
生产者对象,存放地址为/box/queue/TestJob.php
:
namespace box\queue;
use x\queue\Job;
// 继承至Job基类
class TestJob extends Job
{
// 大多情况下都不会用到
public function __construct() {
// 如果你有定义构造方法数,则必须先执行父类的构造方法
parent::__construct();
}
/**
* 生产者必须自定义消费逻辑
* @todo 无
* @author 小黄牛
* @version v2.5.9 + 2021-11-04
* @deprecated 暂不启用
* @global 无
* @return void
*/
public function handle() {
// 可以通过该方法,获得投递的数据集
$param = $this->param();
// 消费成功需要返回true
// 若返回false,则会进入重试逻辑
return true;
}
}
namespace box\queue;
use x\queue\Job;
// 继承至Job基类
class TestJob extends Job
{
// 大多情况下都不会用到
public function __construct() {
// 如果你有定义构造方法数,则必须先执行父类的构造方法
parent::__construct();
}
/**
* 生产者必须自定义消费逻辑
* @todo 无
* @author 小黄牛
* @version v2.5.9 + 2021-11-04
* @deprecated 暂不启用
* @global 无
* @return void
*/
public function handle() {
// 可以通过该方法,获得投递的数据集
$param = $this->param();
// 读取server
$server = $this->getServer();
$mqtt = new \x\controller\Mqtt();
$mqtt->setServer($server);
// 查询出设备信息
$info = $mqtt->find($param['client_id']);
// 需要推送的内容
$data = [
'type' => \x\mqtt\common\Types::PUBACK,
'topic' => $info['list'][0]['topic'],
'message' => $param['message'],
'dup' => $info['list'][0]['dup'] ?? 0,
'qos' => $info['list'][0]['qos'],
'retain' => $info['list'][0]['retain'] ?? 0,
'message_id' => $info['list'][0]['message_id'] ?? '',
];
// 推送
$mqtt->send($info['fd'], $data);
// 消费成功需要返回true
// 若返回false,则会进入重试逻辑
return true;
}
}
namespace box\queue;
use x\queue\Job;
// 继承至Job基类
class TestJob extends Job
{
// 大多情况下都不会用到
public function __construct() {
// 如果你有定义构造方法数,则必须先执行父类的构造方法
parent::__construct();
}
/**
* 生产者必须自定义消费逻辑
* @todo 无
* @author 小黄牛
* @version v2.5.9 + 2021-11-04
* @deprecated 暂不启用
* @global 无
* @return void
*/
public function handle() {
// 可以通过该方法,获得投递的数据集
$param = $this->param();
// 读取消费时传入的server实例
$server = $this->getServer();
// 推送
$obj = new \x\controller\WebSocket();
$obj->fetch('limit_error', 'error', $param['msg'], $param['fd'], $server);
// 消费成功需要返回true
// 若返回false,则会进入重试逻辑
return true;
}
}