消息队列异步导出案例

雨中笑 php redis 591热度

简介异步导出数据

最近做了一个要导出数据的需求,看了线上的数据一天都有几十万条,如果是常规的直接导出,是不行的,会超时,因此考虑了后采用队列异步导出后。

思路:

1、做一个全局导出的管理页面,把每次要导出的文件名、时间、类型、和人员、状态等记录到数据库里

2、再把生成的id、需要查询导出的参数、执行导出的类、方法名 rPush到队列里

3、启动一个定时每分钟跑的脚本,lPop出数据,执行里面的参数,导出文件

4、更改导出管理数据库的状态,设定导出后邮箱、企业微信通知用户等逻辑。


代码:

增加查询导出的参数


/**
* @param $params
* @return array
*/
public function export()
{
// 时间条件
$timeset = $this->getParam('timeset');
if (empty($timeset)) throw new \Exception('时间范围不能为空', 400);
$params = Request::getParams();
$downloadLogic = new DownloadLogic();
$downloadLogic->add('监控数据_'.date('Ymd'), 'csv', self::class , 'download', $params, 0);
return [
0,
[]
];
}

    /**
* 添加下载记录 && 将下载任务入队列排队
* @param string $fileName 文件名
* @param string $fileExtension 文件扩展
* @param string $className job类
* @param string $actionName job方法
* @param array $params job参数
* @param int $jobType 任务类型 0代表快速任务,1代表耗时任务
*/
public function add($fileName, $fileExtension, $className, $actionName, $params, $jobType=1)
{
$fileExtension = trim($fileExtension, '.');
$model = new AdminDownload();
$list = $model->select(['filename' => $fileName], ['id']);
$data = [
'filename' => $fileName,
'export_times' => count($list),
'file_size' => '0KB',
'user_id' => AccessLogic::userId(),
'create_time' => time(),
'status' => self::STATUS_QUEUE,
'file_ext' => $fileExtension,
];
$res = $model->add($data);
$newDownloadId = array_keys($res)[0];
$params['optTime'] = date('Y-m-d H:i:s'); // 下载时间
$params['optUserId'] = AccessLogic::userId(); // 下载人
$params['optUserName'] = AccessLogic::userInfo()['name']; // 下载人姓名
// 文件名
if ($data['export_times']) {
$params['filename'] = "{$data['filename']}({$data['export_times']}).{$fileExtension}";
} else {
$params['filename'] = "{$data['filename']}.{$fileExtension}";
}

$params['download_id'] = $newDownloadId;
$this->pushTask($newDownloadId, $className, $actionName, $params, $jobType);
}

/**
* @param $downloadId
* @param $className
* @param $actionName
* @param $params
* @param int $jobType 任务类型 0代表快速任务,1代表耗时任务
* @return bool
*/
private function pushTask($downloadId, $className, $actionName, $params, $jobType)
{
$data = [
'download_id' => $downloadId,
'class' => $className,
'action' => $actionName,
'params' => $params
];

$this->getRedis()->rPush($this->getRedisKeyByType($jobType)['queue'], json_encode($data));

//用于重入队列
$reQueue = [
'jobType' => $jobType,
'data' => $data,
];
$this->getRedis()->set($this->reDownloadKey.$downloadId, json_encode($reQueue), 3600);

return true;
}


/**
* redis的key值
* @param $type
* @return array
*/
private function getRedisKeyByType($type)
{
return [
'queue'=>"admin_center.download.file.queue{$type}",
'lock'=>"admin_center.download.file.queue{$type}.lock",
];
}


执行队列导出

/**
* @name 定时下载
* @plan 一分钟跑一次
* @command cd rootPath/webroot && php cli.php 'ct=Crond&ac=cliDownloadFile'
*/
public function cliDownloadFile()
{
Log::notice("---------------下载任务开始----------------", 'download_task');
$sLockKey = 'crond.cliDownloadFile.lock';
if ($this->getLock($sLockKey,60)) {
$downloadLogic = new DownloadLogic();
$downloadLogic->executeDownloadTask();
$this->releaseLock($sLockKey);
}
Log::notice("---------------下载任务结束----------------", 'download_task');
}

/**
* 状态:正在进行
*/
const STATUS_RUNNING = 1;
/**
* 状态:完成
*/
const STATUS_FINISH = 2;
/**
* 失败
*/
const STATUS_FAILED = 3;

const STATUS_QUEUE = 4;

public static $statusList = [
self::STATUS_RUNNING =>'正在进行',
self::STATUS_FINISH =>'完成',
self::STATUS_FAILED =>'失败',
self::STATUS_QUEUE =>'排队中',
];


/**
* @param int $jobType 任务类型 0代表快速任务,1代表耗时任务
* @return mixed
*/
public function popTask($jobType)
{
$data = $this->getRedis()->lPop($this->getRedisKeyByType($jobType)['queue']);
return json_decode($data, true);
}

    /**
    * @return bool
   */

public function executeDownloadTask()
{
ini_set("memory_limit", "4096M");

Log::notice("快速任务开始", 'download_task');
while ($data = $this->popTask(0)){
$this->execute($data);
}
Log::notice("快速任务结束", 'download_task');

if ($this->isRunning(1)) {
Log::notice("耗时任务进行正在跑,请稍后", 'download_task');
}else{
//$this->lock(1);
Log::notice("耗时任务开始", 'download_task');
if($data = $this->popTask(1)){
$this->execute($data);
//$this->unlock(1);
Log::notice("耗时任务结束", 'download_task');
return true;
}
Log::notice("耗时任务结束", 'download_task');
//$this->unlock(1);
}
}

private function execute($data)
{
if(!$data){
return false;
}
Log::notice("收到下载任务:" . json_encode($data), 'download_task');
if (empty($data['download_id']) || empty($data['params'] || empty($data['class']) || empty($data['action']))) {
Log::notice("缺少参数", 'download_task');
return false;
}
$className = $data['class'];
$actionName = $data['action'];
$params = $data['params'];
$downloadId = $data['download_id'];
if (!class_exists($className)) {
Log::notice("{$className}类不存在", 'download_task');
return false;
}
$obj = new $className();
if (!method_exists($obj, $actionName)) {
Log::notice("{$actionName}方法不存在", 'download_task');
return false;
}
$model = new AdminDownload();
try {
$updateData = [];
$updateData['status'] = self::STATUS_RUNNING;
$model->edit($downloadId, $updateData);//排队中更新到正在进行
$filePath = $obj->$actionName($params);
if ($filePath) {
$size = filesize($filePath);
$kb = bcdiv($size, 1024, 1);
$sizeText = $kb . 'KB';
if($kb>1024){
$mb = bcdiv($kb, 1024, 1);
$sizeText = $mb . 'MB';
}
$updateData['file_size'] = $sizeText;
} else {
return false;
}
$updateData['status'] = self::STATUS_FINISH; //排队更新到完成

            //多不同项目的导出路径 url为你本机的域名
    $updateData['url'] = Config::get('url').'?ct=Index&ac=downloadFile&id='.$downloadId;

} catch (\Exception $exception) {
$updateData['status'] = self::STATUS_FAILED;
$updateData['fail_reason'] = $exception->getMessage();
Log::notice("下载异常,信息:" . $exception->getTraceAsString(), 'download_task');
}
$model->edit($downloadId, $updateData);
try {
//企业QQ消息通知
$download = $model->get(['id'=>$downloadId]);
//距离申请时间已经过了3分钟才发送企业QQ消息通知
if (time()-$download['create_time']>180) {
//获取模板
$template = EimLogic::getMsgTemplate(1);
if (empty($template)) {
throw new \Exception('获取模板[1]失败', 1);
}
$params = [
'module' => 'downloadTask',
'scene_id' =>1,
'user_ids'=>$download['user_id'],
'tips_title' => $template['title'],
'tips_content' => sprintf(
$template['content'],
date('Y-m-d H:i', $download['create_time']),
$download['filename']
),
'tips_url' => Config::get('url') . "/?sys=sys&start_url=" . urlencode(BasicLogic::getUrl('My',
'myDownload'))
];
(new AdminMessageRecordLogic())->add($params);
EimLogic::sendEimMsgToUsers($params);
}
} catch (\Exception $e) {
//发送消息错误,更新错误信息
$model->edit($downloadId, ['fail_reason' => $e->getMessage()]);
}
try {
//企业微信消息通知
$download = $model->get(['id'=>$downloadId]);
//距离申请时间已经过了3分钟才发送企业微信消息通知
if (time()-$download['create_time']>180) {
//获取模板
$template = EimLogic::getMsgTemplate(1);
if (empty($template)) {
throw new \Exception('获取模板[1]失败', 1);
}
$params = [
'module' => 'downloadTask',
'scene_id' =>1,
'user_ids'=>$download['user_id'],
'tips_title' => $template['title'],
'tips_content' => sprintf(
$template['content'],
date('Y-m-d H:i', $download['create_time']),
$download['filename']
),
'tips_url' => Config::get('url') . "/?sys=sys&start_url=" . urlencode(BasicLogic::getUrl('My',
'myDownload'))
];
QyWeixinLogic::sendMessageByParams($params);
}
} catch (\Exception $e) {
//发送消息错误,更新错误信息
$model->edit($downloadId, ['fail_reason' => $e->getMessage()]);
}

Log::notice("下载任务结束", 'download_task');
}

/**
* @param int $jobType 任务类型 0代表快速任务,1代表耗时任务
* @return bool
*/
private function isRunning($jobType)
{
$value = $this->getRedis()->get($this->getRedisKeyByType($jobType)['lock']);
if ($value === false) {
return false;
}
return true;
}

/**
* @param int $jobType 任务类型 0代表快速任务,1代表耗时任务
*/
private function lock($jobType)
{
$this->getRedis()->set($this->getRedisKeyByType($jobType)['lock'], 1, 3600);
}

/**
* @param int $jobType 任务类型 0代表快速任务,1代表耗时任务
*/
private function unlock($jobType)
{
$this->getRedis()->delete($this->getRedisKeyByType($jobType)['lock']);
}


导出的方法

/**
* @param $params
* @return string
*/
public function download($params)
{
set_time_limit(0);

$type = 'forbidden_roles';
//设置行头
$header = $this->filedList[$type]['value'];
$fileName = $params['filename'];
$headerTitle = array_values($header);
$headerKey = array_keys($header);
$csvWriter = new CSVWriter($fileName);
$csvWriter->writeCSV($headerTitle);
$size = 10000;
$pageNo = 1;
$maxPageNo = 3;
$scrollId = 0;
while (true) {
$csvData = [];
$params['page_size'] = $size;
$params['page_no'] = $pageNo;

// 获取信息
list($count, $list) = $this->getList($params);

if ($list) {
foreach ($list as $item) {
$_ = [];
foreach ($headerKey as $index => $key) {
$_[$index] = isset($item[$key]) ? $item[$key] : '';
}
$csvData[] = $_;
}
$csvWriter->batWriteCSV($csvData);
}

if (count($list) < $size) {
break;
}

if ($pageNo>=$maxPageNo) {
break;
}
$pageNo++;
}

return $csvWriter->getPath();
}



很赞哦!(3)

本文阅读量 1828发布于 2020年8月4日

您的访问IP 3.144.175.178最早于 2024年5月15日 3时14分31秒 阅读过本文 为本文提供了 1 热度 1 阅读量

文章评论
回帖