场景
需要对N多个队列进行数据处理,每个队列一次处理一个数据耗时较长。要尽可能短的时间里处理一个队列数据需要开M多个进程消耗一个队列数据。N个队列需要对应 N * M 个进程处理,但是还需要有一个限流的功能,要求单个队列每分钟最多处理100数据。
思路
使用 Swoole 的多进程 + 进程池的方案进行进程调度处理。
队列1 <----> 进程A(建立进程池) |-- 子进程A1 |-- 子进程A2 |-- 子进程 ...
队列2 <----> 进程B(建立进程池) |-- 子进程B1 |-- 子进程B2
涉及到的知识点:
- 限流,使用模拟令牌桶算法
- 利用 Swoole 进程池自动管理工作进程,柔性重启
- 利用 Redis 的 bRPop 阻塞队列
- 进程的优雅重启、退出,通过监听系统信号
实操
1.限流实现
使用 redis 模拟实现了每分钟限流:
<?php
// [queueName => workCofing] 可作为整体配置,统一引入
$redisQueue = [
'queue1'=>['workerNum'=>2, 'rateMin'=>100,],
'retry_queue1'=>['workerNum'=>1, 'rateMin'=>1,],
];
$curMin = date('YmdHi');
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
foreach ($redisQueue as $k=>$v){
$_key = 'rateMin_' . $k . '_' . $curMin;
$fillList = range(1, $v['rateMin']);
$redis->lpush($_key, ...$fillList);
$redis->expire($_key, 60);
}
2.多进程+进程池
利用Swoole进程池自动管理工作进程
创建运行主文件,如:multiSwoolePool.php
<?php
// [queueName => workCofing] 可作为整体配置,统一引入
// retry 前缀为重试队列
// workNum 每个队列对应的工作进程数量
// rateMin 每个队列的消耗速率 单位 分
$config = [
'queue1'=>['workerNum'=>2, 'rateMin'=>100,],
'retry_queue1'=>['workerNum'=>1, 'rateMin'=>1,],
];
$queueLen = count($config);
if(empty($config)) exit('empty queue');
use Swoole\Process;
// 用于保存子进程pid,方便后续进程重启和停止
$processPidFile = 'process_pid.txt';
file_put_contents($processPidFile, '');
foreach ($config as $configQueueName=>$configParams){
// 启动多个子进程
$process = new Process(function () use ($configQueueName, $configParams, $processPidFile) {
echo date('Y-m-d H:i:s') . ' Child #' . getmypid() . " start {$configQueueName}" . PHP_EOL;
file_put_contents($processPidFile, getmypid() . PHP_EOL, FILE_APPEND);
// 子进程启动进程池
$workerNum = $configParams['workerNum'];
$pool = new Swoole\Process\Pool($workerNum);
// $pool->set(['enable_coroutine' => true]); // 是否开启协程
$pool->on("WorkerStart", function ($pool, $workerId) use ($configQueueName, $configParams) {
echo date('Y-m-d H:i:s') . " Worker #{$workerId}:{$configQueueName} is started\n";
try {
// 使用 include 便于业务重载
include "logic.php";
} catch (\Exception $e){
// 记录错误日志
echo $e->getMessage() . PHP_EOL;
exit; // 让出错进程主动退出,进程池会自动拉起新进程(由业务侧保证数据)
}
});
$pool->on("WorkerStop", function ($pool, $workerId) {
echo date('Y-m-d H:i:s') . " Worker #{$workerId} is stopped\n";
});
$pool->start();
echo date('Y-m-d H:i:s') . ' Child #' . getmypid() . ' exit' . PHP_EOL;
});
$process->start();
}
// 监听多进程
for ($i=0; $i<$queueLen; $i++) {
$status = Process::wait(true);
echo date('Y-m-d H:i:s') . " Recycled #{$status['pid']}, code={$status['code']}, signal={$status['signal']}" . PHP_EOL;
}
echo date('Y-m-d H:i:s') . ' Parent #' . getmypid() . ' exit' . PHP_EOL;
// Swoole\Process::daemon();
3.业务处理
进程的优雅重启、退出,通过监听系统信号,文件 logic.php
<?php
/**
* 进程内传入参数
* $pool, $workerId
* $configQueueName, $configParams
*
*/
$redis = new Redis();
$redis->pconnect('127.0.0.1', 6379);
$logic = new MyBusiness();
// 安装退出进程信号,实现优雅退出
$running = true;
pcntl_signal(SIGTERM, function () use (&$running, $pool, $workerId, $redis) {
$running = false;
echo date('Y-m-d H:i:s') . " Worker #{$pool->getProcess($workerId)->pid}-{$workerId} TERM TERM TERM !!!\n";
// 关闭 redis、mysql 连接等
$redis->close();
exit;
});
while ($running) {
// 响应退出信号防止逻辑丢失
pcntl_signal_dispatch();
$result = $redis->bRPop($configQueueName, 10); // 10s 超时
if ( $result == null) continue;
[$_queueName, $_params] = $result;
// step 0 队列调度,防止业务逻辑耗时进程异常导致数据丢失
if (!preg_match('/^retry/', $configQueueName)) $redis->lPush($configQueueName, $_params);
// step 1 检查限流/分钟(模拟令牌桶算法)
$rateMinKey = 'rateMin_' . $configQueueName . '_' . date('YmdHi');
$rateValue = $redis->rPop($rateMinKey);
if(empty($rateValue)) {
// 限流时等待下一次任务
$redis->rPush($configQueueName, $_params);
$redis->lPop($configQueueName);
// 限流时: redis 操作结束也可响应退出信号
pcntl_signal_dispatch();
$rand = mt_rand(2, 10);
echo date('Y-m-d H:i:s') . " Worker #{$workerId}:{$configQueueName}:RateLimit will sleep {$rand}s\n";
sleep($rand);
continue;
}
// step 2 处理业务
echo date('Y-m-d H:i:s') . " Worker #{$workerId}:{$configQueueName} Start deal logic, current TokenBucketValue:{$rateValue}\n";
try {
$logic->manage($configQueueName, $_params);
} catch (\Exception $e) {
// 业务异常时的处理
echo $e->getMessage() . PHP_EOL;
}
// step 3 清理变量
}
// 业务处理
class MyBusiness {
public function manage($queueName, $queueParams){
$params = json_encode($queueParams);
// 模拟业务处理耗时
echo date('Y-m-d H:i:s') . " Deal Logic Start:{$queueName}-{$params}==========>>\n";
sleep(2);
// 模拟异常
// if ($params%9==0) throw new Exception("Exception #queueName:{$queueName}, Value is {$params}");
// 业务处理失败,丢弃或推送到重试队列
$usedMemory = memory_get_usage();
$usedMemory = round($usedMemory/1024/1024, 3);
echo date('Y-m-d H:i:s') . " Deal Logic Success #queueName:{$queueName}-params:{$params}-usedMemory:{$usedMemory}Mb==========>>\n";
}
}
4. 给程序发送重启、停止信号
进程启动时记录下自己的pid,使用系统命令发送信号
# reloadMultiSwoolePool.sh
# -10 SIGUSR1 柔性重启
cat process_pid.txt | xargs kill -SIGUSR1
# stopMultiSwoolePool.sh
# -15 SIGTERM 柔性停止
cat process_pid.txt | xargs kill -SIGTERM