数据库 - Model 前置说明
官方开源项目 抓取代理IP

多进程消费队列

1、创建自定义进程文件\box\process\TestJob.php

异步模式:

namespace box\process;
use design\AbstractProcess;
use Swoole\Process;

class TestJob extends AbstractProcess
{
    /**
     * 是否需要while(true) 永久堵塞
    */
    public $onWhile = true;

    /**
     * 等待间隔时间(毫秒)  0不堵塞
    */
    public $sleepS = 0;

    // 编写队列消费代码
    public function run() {
        // 队列
        $Queue = new \x\Queue();
        // 注意,如果在投递时,Job使用了store()方法切换节点投递,那取出时也需要切换对应的节点进行取出
        // 默认使用default节点
        $Queue->store('default');

        // 进程实例
        $pros = [];
        // 进程数量
        $process_max = 10;
        for ($i = 0; $i<$process_max; $i++) {
            //创建子进程
            $process = new Process(function($worker) use($Queue) {
                // 可以拿到父进程发过来的数据
                $i = $worker->read();
                // 取出一条消息
                $Job = $Queue->pop();
                if ($Job) {
                    // 执行消费逻辑
                    $res = $Job->run();
                    // 抛出结果
                    $worker->write('1');
                } else {
                    $worker->write('0');
                }
            }, true);
            $process->start();
            //通过管道发送数据到子进程
            $process->write($i);//管道是单向的发出的 数据必须由另一端读取。
            //先不获取子进程返回值等循环结束后统一返回
            $pros[] = $process;
        }
        // 统一获取子进程数据
        foreach($pros as $item){
            $res = $item->read();
        }
        // 子进程结束必须执行wait进行回收,否则子进程会变成僵尸进程。
        while($ret = Process::wait()){
            // 进程ID
            $pid = $ret["pid"];
        }
    }
}

协程模式:

<?php
namespace box\process;
use design\AbstractProcess;
use Swoole\Process;

class TestJob extends AbstractProcess
{
    /**
     * 是否需要while(true) 永久堵塞
    */
    public $onWhile = true;

    /**
     * 等待间隔时间(毫秒)  0不堵塞
    */
    public $sleepS = 0;

    // 编写队列消费代码
    public function run() {
        // 队列
        $Queue = new \x\Queue();
        // 注意,如果在投递时,Job使用了store()方法切换节点投递,那取出时也需要切换对应的节点进行取出
        // 默认使用default节点
        $Queue->store('default');

        // 进程实例
        $channel = new \Swoole\Coroutine\Channel;

        // 进程数量
        $process_max = 10;
        for ($i = 0; $i<$process_max; $i++) {
            //创建子进程
            go(function () use ($channel, $Queue){
                // 取出一条消息
                $Job = $Queue->pop();
                if ($Job) {
                    // 执行消费逻辑
                    $res = $Job->run();
                    // 抛出结果
                    $channel->push('1');
                } else {
                    $channel->push('0');
                }
            });
        }
        // 获取结果
        for($i = 0; $i < $process_max; ++$i) {
            var_dum($channel->pop());
        }
        // 关闭管道
        $channel->close();
    }
}

2、在/config/process.php配置文件中注册该自定义进程即可。

return [
    \box\process\TestJob::class,
];

SW-X

企业级 - 高性能 PHP 框架

最后更新:3年前 . 作者-小黄牛

本篇目录