数据库 - Model 前置说明
官方开源项目 抓取代理IP

说明

队列的消费方式有很多种,主要取决不同的使用场景,一般情况下有:

  • 使用自定义进程,利用While堵塞读取队列进行循环消费,还可以配合sleeps进行毫秒级间隔堵塞。
  • 使用秒级定时任务组件,间隔读取队列进行循环消费。

以上两种方法,都可以再配合Swoole多进程提升消费速度。

自定义进程消费队列

1、创建自定义进程文件\box\process\TestJob.php

namespace box\process;
use design\AbstractProcess;

class TestJob extends AbstractProcess
{
    /**
     * 是否需要while(true) 永久堵塞
    */
    public $onWhile = true;

    /**
     * 等待间隔时间(毫秒)  0不堵塞
    */
    public $sleepS = 0;

    // 编写队列消费代码
    public function run() {
        $Queue = new \x\Queue();
        // 注意,如果在投递时,Job使用了store()方法切换节点投递,那取出时也需要切换对应的节点进行取出
        // 默认使用default节点
        $Queue->store('default');

        // 取出一条消息
        $Job = $Queue->pop();
        if ($Job) {
            // 执行消费逻辑
            $res = $Job->run();
        }
    }
}

2、在/config/process.php配置文件中注册该自定义进程即可。

return [
    \box\process\TestJob::class,
];

计划任务消费队列

1、创建定时任务文件\box\crontab\TestJob.php

namespace box\crontab;
use x\Crontab;

class TestJob extends Crontab{

    /**
     * 编写队列消费代码
     * @todo 无
     * @author 小黄牛
     * @version v2.5.0 + 2021.07.20
     * @deprecated 暂不启用
     * @global 无
     * @return void
    */
    public function run() {
        $Queue = new \x\Queue();
        // 注意,如果在投递时,Job使用了store()方法切换节点投递,那取出时也需要切换对应的节点进行取出
        // 默认使用default节点
        $Queue->store('default');

        // 取出一条消息
        $Job = $Queue->pop();
        if ($Job) {
            // 执行消费逻辑
            $res = $Job->run();
        }
    }
}

2、在/config/crontab.php配置文件中注册定时计划即可。

return [
    [
        'rule' => 1000, // 1秒执行一次
        'use' => '\box\crontab\TestJob',
        'status' => true,
    ]
];

MQTT / WebSocket消费案例

namespace box\process;
use design\AbstractProcess;

class TestJob extends AbstractProcess
{
    /**
     * 是否需要while(true) 永久堵塞
    */
    public $onWhile = true;

    /**
     * 等待间隔时间(毫秒)  0不堵塞
    */
    public $sleepS = 0;

    // 编写队列消费代码
    public function run() {
        $Queue = new \x\Queue();
        // 注意,如果在投递时,Job使用了store()方法切换节点投递,那取出时也需要切换对应的节点进行取出
        // 默认使用default节点
        $Queue->store('default');

        // 取出一条消息
        $Job = $Queue->pop();
        if ($Job) {
            // 需要传入server
            $server = \x\Config::get('swoolex_context.server');
            // 执行消费逻辑
            $Job->setServer($server);
            $res = $Job->run();
        }
    }
}

SW-X

企业级 - 高性能 PHP 框架

最后更新:2年前 . 作者-小黄牛

本篇目录