1、创建自定义进程文件\box\process\TestJob.php
:
异步模式:
namespace box\process;
use design\AbstractProcess;
use Swoole\Process;
class TestJob extends AbstractProcess
{
/**
* 是否需要while(true) 永久堵塞
*/
public $onWhile = true;
/**
* 等待间隔时间(毫秒) 0不堵塞
*/
public $sleepS = 0;
// 编写队列消费代码
public function run() {
// 队列
$Queue = new \x\Queue();
// 注意,如果在投递时,Job使用了store()方法切换节点投递,那取出时也需要切换对应的节点进行取出
// 默认使用default节点
$Queue->store('default');
// 进程实例
$pros = [];
// 进程数量
$process_max = 10;
for ($i = 0; $i<$process_max; $i++) {
//创建子进程
$process = new Process(function($worker) use($Queue) {
// 可以拿到父进程发过来的数据
$i = $worker->read();
// 取出一条消息
$Job = $Queue->pop();
if ($Job) {
// 执行消费逻辑
$res = $Job->run();
// 抛出结果
$worker->write('1');
} else {
$worker->write('0');
}
}, true);
$process->start();
//通过管道发送数据到子进程
$process->write($i);//管道是单向的发出的 数据必须由另一端读取。
//先不获取子进程返回值等循环结束后统一返回
$pros[] = $process;
}
// 统一获取子进程数据
foreach($pros as $item){
$res = $item->read();
}
// 子进程结束必须执行wait进行回收,否则子进程会变成僵尸进程。
while($ret = Process::wait()){
// 进程ID
$pid = $ret["pid"];
}
}
}
协程模式:
<?php
namespace box\process;
use design\AbstractProcess;
use Swoole\Process;
class TestJob extends AbstractProcess
{
/**
* 是否需要while(true) 永久堵塞
*/
public $onWhile = true;
/**
* 等待间隔时间(毫秒) 0不堵塞
*/
public $sleepS = 0;
// 编写队列消费代码
public function run() {
// 队列
$Queue = new \x\Queue();
// 注意,如果在投递时,Job使用了store()方法切换节点投递,那取出时也需要切换对应的节点进行取出
// 默认使用default节点
$Queue->store('default');
// 进程实例
$channel = new \Swoole\Coroutine\Channel;
// 进程数量
$process_max = 10;
for ($i = 0; $i<$process_max; $i++) {
//创建子进程
go(function () use ($channel, $Queue){
// 取出一条消息
$Job = $Queue->pop();
if ($Job) {
// 执行消费逻辑
$res = $Job->run();
// 抛出结果
$channel->push('1');
} else {
$channel->push('0');
}
});
}
// 获取结果
for($i = 0; $i < $process_max; ++$i) {
var_dum($channel->pop());
}
// 关闭管道
$channel->close();
}
}
2、在/config/process.php
配置文件中注册该自定义进程即可。
return [
\box\process\TestJob::class,
];