数据库 - Model 前置说明
官方开源项目 抓取代理IP

说明

本章案例,统一使用 定义生产者 一章文档中的TestJob生产者对象举例。
案例运行在HTTP服务中。


生产者的对象投递,依赖于\x\Queue类,下面我们来认识如何把生产者投递到不同的队列中。

普通任务

消息会被投递到waiting队列中。

namespace app\http;
use x\controller\Http;
// 引入生产者对象
use box\queue\TestJob;

class Index extends Http
{
    /**
     * @RequestMapping(route="/", method="get", title="测试队列投递")
    */
    public function index() {
        // 创建一个生产者
        $Job = new TestJob();
        // 设置投递数据
        $Job->data([
            'user_id' => rand(1, 99),
        ]);
        // 设置消费超时时间,不设置使用queue配置文件
        $Job->outTime(3);

        // 实例化队列类
        $Queue = new \x\Queue();
        // 投递任务
        $res = $Queue->push($Job);
        if ($res) {
            echo 'Success';
        } else {
            echo 'Error';
        }
    }
}

延迟任务

消息会被投递到delayed队列中。

namespace app\http;
use x\controller\Http;
// 引入生产者对象
use box\queue\TestJob;

class Index extends Http
{
    /**
     * @RequestMapping(route="/", method="get", title="测试队列投递")
    */
    public function index() {
        // 创建一个生产者
        $Job = new TestJob();
        // 设置投递数据
        $Job->data([
            'user_id' => rand(1, 99),
        ]);
        // 设置消费超时时间,不设置使用queue配置文件
        $Job->outTime(3);
        // 设置延迟投递时间
        $Job->delayTime(5);

        // 实例化队列类
        $Queue = new \x\Queue();
        // 投递任务
        $res = $Queue->push($Job);
        if ($res) {
            echo 'Success';
        } else {
            echo 'Error';
        }
    }
}

安全任务

消息会被投递到confirm队列中。

namespace app\http;
use x\controller\Http;
// 引入生产者对象
use box\queue\TestJob;

class Index extends Http
{
    /**
     * @RequestMapping(route="/", method="get", title="测试队列投递")
    */
    public function index() {
        // 创建一个生产者
        $Job = new TestJob();
        // 设置投递数据
        $Job->data([
            'user_id' => rand(1, 99),
        ]);
        // 设置消费超时时间,不设置使用queue配置文件
        $Job->outTime(3);
        // 设置最迟确认时间,从设置时开始计算
        $Job->waitTime(15);

        // 实例化队列类
        $Queue = new \x\Queue();
        // 投递任务
        $Queue->push($Job);
        // 堵塞14秒测试
        sleep(14);
        // 确认任务投递
        $res = $Queue->confirm($Job);
        if ($res) {
            echo 'Success';
        } else {
            echo 'Error';
        }
    }
}

注意:如果delayTime()waitTime()同时设置,只有waitTime()会生效。

建议:可以把上述案例中的sleep()堵塞时间进行修改调试。

投递到指定队列节点

namespace app\http;
use x\controller\Http;
// 引入生产者对象
use box\queue\TestJob;

class Index extends Http
{
    /**
     * @RequestMapping(route="/", method="get", title="测试队列投递")
    */
    public function index() {
        // 创建一个生产者
        $Job = new TestJob();
        // 指定使用哪个queue配置节点,默认使用default节点
        $Job->store('test');
        // 设置投递数据
        $Job->data([
            'user_id' => rand(1, 99),
        ]);
        // 设置消费超时时间,不设置使用queue配置文件
        $Job->outTime(3);

        // 实例化队列类
        $Queue = new \x\Queue();
        // 投递任务
        // 投递到哪个queue,也可以 new \x\Queue('test');时指定
        $res = $Queue->store('test')->push($Job);
        if ($res) {
            echo 'Success';
        } else {
            echo 'Error';
        }
    }
}

注意:JobQueue所使用的store队列配置要一致。

Job的更多参数支持

本案例,只做Job()的方法演示。

namespace app\http;
use x\controller\Http;
// 引入生产者对象
use box\queue\TestJob;

class Index extends Http
{
    /**
     * @RequestMapping(route="/", method="get", title="测试队列投递")
    */
    public function index() {
        // 创建一个生产者
        $Job = new TestJob();
        // 设置投递数据
        $Job->data([
            'user_id' => rand(1, 99),
        ]);
        // 设置消费超时时间,不设置使用queue配置文件
        $Job->outTime(3);
        // 设置延迟投递时间
        $Job->delayTime(5);
        // 设置最迟确认时间,从设置时开始计算
        $Job->waitTime(15);
        // 设置消费失败重试间隔时间,不设置使用queue配置文件
        $Job->retrySeconds([5, 60, 1800]);
    }
}

SW-X

企业级 - 高性能 PHP 框架

最后更新:2年前 . 作者-小黄牛

本篇目录