perf: 优化任务队列

This commit is contained in:
kuaifan 2022-11-03 10:49:00 +08:00
parent acd7193e55
commit 8ce7d3689f
10 changed files with 117 additions and 77 deletions

View File

@ -9,29 +9,6 @@ use Hhxsv5\LaravelS\Swoole\Task\Task;
*/
abstract class AbstractTask extends Task
{
protected $newTask = [];
/**
* 添加完成后执行的任务
* @param $task
*/
final protected function addTask($task)
{
$this->newTask[] = $task;
}
/**
* 包装执行过程
*/
final public function handle()
{
try {
$this->start();
} catch (\Throwable $e) {
$this->info($e);
$this->failed($e);
}
}
/**
* 开始执行任务
@ -41,31 +18,41 @@ abstract class AbstractTask extends Task
/**
* 任务完成事件
*/
public function finish()
abstract public function end();
/**
* 重写执行过程
*/
final public function handle()
{
foreach ($this->newTask AS $task) {
Task::deliver($task);
try {
$this->start();
} catch (\Throwable $e) {
$this->failed("start", $e);
}
}
/**
* 重写完成事件
*/
final public function finish()
{
try {
$this->end();
} catch (\Throwable $e) {
$this->failed("end", $e);
}
}
/**
* 任务失败事件
* @param $e
* @param string $type
* @param \Throwable $e
*/
public function failed($e)
public function failed(string $type, \Throwable $e)
{
//
}
/**
* 添加日志
* @param $var
*/
private function info($var)
{
if (!config('app.debug') || defined('DO_NOT_ADD_LOGS')) {
return;
}
info($var);
info($type);
info($e);
}
}

View File

@ -42,4 +42,9 @@ class AutoArchivedTask extends AbstractTask
}
}
}
public function end()
{
}
}

View File

@ -56,4 +56,9 @@ class DeleteTmpTask extends AbstractTask
break;
}
}
public function end()
{
}
}

View File

@ -108,6 +108,11 @@ class EmailNoticeTask extends AbstractTask
}
}
public function end()
{
}
/**
* 任务过期前、超期后提醒
* @param ProjectTask $task

View File

@ -18,6 +18,7 @@ class IhttpTask extends AbstractTask
protected $extra;
protected $apiWebsocket;
protected $apiUserid;
protected $endPush = [];
/**
* IhttpTask constructor.
@ -53,7 +54,7 @@ class IhttpTask extends AbstractTask
$res = Ihttp::ihttp_request($this->url, $this->post, $this->extra);
if ($this->apiWebsocket && $this->apiUserid) {
$data = Base::isSuccess($res) ? Base::json2array($res['data']) : $res;
PushTask::push([
$this->endPush[] = [
'userid' => $this->apiUserid,
'msg' => [
'type' => 'apiWebsocket',
@ -61,7 +62,13 @@ class IhttpTask extends AbstractTask
'apiSuccess' => Base::isSuccess($res),
'data' => $data,
]
]);
];
}
}
public function end()
{
PushTask::push($this->endPush);
}
}

View File

@ -16,6 +16,7 @@ class LineTask extends AbstractTask
{
protected $userid;
protected $online;
protected $endPush = [];
/**
* LineTask constructor.
@ -36,7 +37,7 @@ class LineTask extends AbstractTask
$fd[] = $ws->fd;
}
if ($fd) {
PushTask::push([
$this->endPush[] = [
'fd' => $fd,
'msg' => [
'type' => 'line',
@ -45,8 +46,13 @@ class LineTask extends AbstractTask
'online' => $this->online,
],
]
]);
];
}
});
}
public function end()
{
PushTask::push($this->endPush);
}
}

View File

@ -76,4 +76,9 @@ class LoopTask extends AbstractTask
}
});
}
public function end()
{
}
}

View File

@ -19,6 +19,7 @@ class PushTask extends AbstractTask
{
protected $params;
protected $retryOffline = true;
protected $endPush = [];
/**
* PushTask constructor.
@ -42,12 +43,41 @@ class PushTask extends AbstractTask
}
// 根据会员ID推送离线时收到的消息
elseif (Base::leftExists($this->params, "RETRY::")) {
self::sendTmpMsgForUserid(intval(Base::leftDelete($this->params, "RETRY::")));
$this->sendTmpMsgForUserid(intval(Base::leftDelete($this->params, "RETRY::")));
}
}
is_array($this->params) && self::push($this->params, $this->retryOffline);
}
public function end()
{
self::push($this->endPush);
}
/**
* 根据会员ID推送离线时收到的消息
* @param $userid
*/
private function sendTmpMsgForUserid($userid)
{
if (empty($userid)) {
return;
}
WebSocketTmpMsg::whereCreateId($userid)
->whereSend(0)
->where('created_at', '>', Carbon::now()->subMinute()) // 1分钟内添加的数据
->orderBy('id')
->chunk(100, function($list) use ($userid) {
foreach ($list as $item) {
$this->endPush[] = [
'tmpMsgId' => $item->id,
'userid' => $userid,
'msg' => Base::json2array($item->msg),
];
}
});
}
/**
* 记录离线消息,等上线后重新发送
* @param array $userFail
@ -71,30 +101,6 @@ class PushTask extends AbstractTask
}
}
/**
* 根据会员ID推送离线时收到的消息
* @param $userid
*/
private static function sendTmpMsgForUserid($userid)
{
if (empty($userid)) {
return;
}
WebSocketTmpMsg::whereCreateId($userid)
->whereSend(0)
->where('created_at', '>', Carbon::now()->subMinute()) // 1分钟内添加的数据
->orderBy('id')
->chunk(100, function($list) use ($userid) {
foreach ($list as $item) {
self::push([
'tmpMsgId' => $item->id,
'userid' => $userid,
'msg' => Base::json2array($item->msg),
]);
}
});
}
/**
* 推送消息
* @param array $lists 消息列表

View File

@ -35,4 +35,9 @@ class PushUmengMsg extends AbstractTask
}
UmengAlias::pushMsgToUserid($this->userid, $this->array);
}
public function end()
{
}
}

View File

@ -23,6 +23,8 @@ class WebSocketDialogMsgTask extends AbstractTask
protected $ignoreFd;
protected $msgNotExistRetry = false; // 推送失败后重试
protected $silence = false; // 静默推送1:前端不通知、2:App不推送
protected $endPush = [];
protected $endArray = [];
/**
* WebSocketDialogMsgTask constructor.
@ -72,7 +74,7 @@ class WebSocketDialogMsgTask extends AbstractTask
if ($this->msgNotExistRetry) {
$task = new WebSocketDialogMsgTask($this->id, $this->ignoreFd || '');
$task->delay(1);
$this->addTask($task);
$this->endArray[] = $task;
}
return;
}
@ -123,7 +125,7 @@ class WebSocketDialogMsgTask extends AbstractTask
$msg->save();
// 开始推送消息
foreach ($array as $userid => $mention) {
PushTask::push([
$this->endPush[] = [
'userid' => $userid,
'ignoreFd' => $this->ignoreFd,
'msg' => [
@ -134,7 +136,7 @@ class WebSocketDialogMsgTask extends AbstractTask
'mention' => $mention,
]),
]
]);
];
}
// umeng推送app
if (!$this->silence) {
@ -147,14 +149,13 @@ class WebSocketDialogMsgTask extends AbstractTask
if ($dialog->type == 'group') {
$umengTitle = "{$dialog->getGroupName()} ($umengTitle)";
}
$umengMsg = new PushUmengMsg($umengUserid, [
$this->endArray[] = new PushUmengMsg($umengUserid, [
'title' => $umengTitle,
'body' => $msg->previewMsg(),
'description' => "MID:{$msg->id}",
'seconds' => 3600,
'badge' => 1,
]);
Task::deliver($umengMsg);
}
// 推送目标②:正在打开这个任务会话的会员
@ -168,7 +169,7 @@ class WebSocketDialogMsgTask extends AbstractTask
}
}
if ($array) {
PushTask::push([
$this->endPush[] = [
'userid' => $array,
'ignoreFd' => $this->ignoreFd,
'msg' => [
@ -177,9 +178,17 @@ class WebSocketDialogMsgTask extends AbstractTask
'silence' => $this->silence ? 1 : 0,
'data' => $msg->toArray(),
]
]);
];
}
}
}
}
public function end()
{
foreach ($this->endArray as $task) {
Task::deliver($task);
}
PushTask::push($this->endPush);
}
}