dootask/app/Tasks/PushTask.php
kuaifan 645cb02757 chore(upgrade): Laravel 8 直升 13(旧结构跑通)+ PHP 8.4 + 依赖升级与兼容修复
- composer: framework ^13.0、php ^8.3、laravel-s ~3.8.0、predis ^2.3、
  phpunit ^11.5、tinker ^3、excel ^3.1.69、captcha ^3.5、avatar ^6.5、
  ldaprecord-laravel ^4、pinyin ^5.3、notify 锁 ~1.28.0;
  移除 fideloper/proxy、fruitcake/laravel-cors、facade/ignition、
  laravel/sail、madnest/madzipper、手动钉的 symfony/mailer;
  symfony/console 锁 ^7.4(LaravelS Portal 与 console 8 的
  configure(): void 类型断言不兼容)
- $dates 移除:AbstractModel 改 getCasts() 合并默认 datetime 列,
  3 个子模型改 $casts
- Carbon 3:4 处 diffInSeconds 补 absolute 参数并取整
- LdapRecord v4:config use_ssl/use_tls→use_tls/use_starttls(env 变量名不变),
  LdapUser::$objectClasses 补类型声明
- Madzipper→原生 ZipArchive(Base::zipAddFiles,4 处调用)
- pinyin v5 静态 API(Base::getFirstCharter/cn2pinyin)
- laravolt/avatar 6.5:PatchedAvatar 修上游纵向对齐 bug
 (intervention 4.1.3 枚举无 middle),avatar 响应改 response()->file()
- TrustProxies 改框架内置基类,CORS 改 Illuminate\Http\Middleware\HandleCors
- Symfony Console 8 兼容:ManticoreSyncLock::handleSignal 新签名,
  pcntl 回调解耦
- 非 Swoole 运行时守卫:AbstractTask::task / PushTask::push /
  AbstractData(swoole table),artisan/测试上下文不再炸
  Target class [swoole] does not exist
- Laravel 11+ change() 丢修饰符:2023_12_07 与 2025_08_10 迁移重申
  nullable/default/comment(修复 fresh 安装)
- Setting/Ihttp 缺键访问加 ?? 守卫(PHP 8 警告在测试中转异常)
- phpunit.xml 迁移 11 schema;UserImportParseTest 改为自建部门数据

验证:8.4 容器内 migrate:fresh --seed 213 全过;php artisan test
145 passed/1 skipped;LaravelS(Swoole 6.2.1) /health 200、登录、
token 认证、WebSocket 握手、Task 投递、头像、图片裁剪冒烟全过

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 19:42:12 +00:00

221 lines
7.0 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace App\Tasks;
use App\Models\WebSocket;
use App\Models\WebSocketTmpMsg;
use App\Module\Base;
use App\Module\Doo;
use Cache;
use Carbon\Carbon;
use Hhxsv5\LaravelS\Swoole\Task\Task;
/**
* 发送消息任务
* Class PushTask
* @package App\Tasks
*/
class PushTask extends AbstractTask
{
protected $params;
protected $retryOffline = true;
protected $endPush = [];
/**
* PushTask constructor.
* @param array|string $params
*/
public function __construct($params = [], $retryOffline = true)
{
parent::__construct(...func_get_args());
$this->params = $params;
$this->retryOffline = $retryOffline;
}
public function start()
{
if (is_string($this->params)) {
// 推送缓存
if (Base::leftExists($this->params, "PUSH::")) {
$params = Cache::pull($this->params);
if (is_array($params) && $params['fd']) {
$this->params = [$params];
}
}
// 根据会员ID推送离线时收到的消息
elseif (Base::leftExists($this->params, "RETRY::")) {
$this->sendTmpMsgForUserid(intval(Base::leftDelete($this->params, "RETRY::")));
}
}
is_array($this->params) && self::push($this->params, $this->retryOffline);
}
public function end()
{
self::push($this->endPush);
}
/**
* 根据会员ID推送离线时收到的消息
* @param $userid
*/
private function sendTmpMsgForUserid($userid)
{
if (empty($userid)) {
return;
}
WebSocketTmpMsg::whereCreateId($userid)
->whereSend(0)
->where('created_at', '>', Carbon::now()->subMinute()) // 1分钟内添加的数据
->orderBy('id')
->chunk(100, function($list) use ($userid) {
foreach ($list as $item) {
$this->endPush[] = [
'tmpMsgId' => $item->id,
'userid' => $userid,
'msg' => Base::json2array($item->msg),
];
}
});
}
/**
* 记录离线消息,等上线后重新发送
* @param array $userFail
* @param array $msg
*/
private static function addTmpMsg(array $userFail, array $msg)
{
foreach ($userFail as $uid) {
$msgString = Base::array2json($msg);
$inArray = [
'md5' => md5($uid . '-' . $msgString),
'msg' => $msgString,
'send' => 0,
'create_id' => $uid,
'created_at' => Carbon::now(),
'updated_at' => Carbon::now(),
];
if (!WebSocketTmpMsg::whereMd5($inArray['md5'])->exists()) {
WebSocketTmpMsg::insertOrIgnore($inArray);
}
}
}
/**
* 推送消息
* @param array $lists 消息列表
* @param bool $retryOffline 如果会员不在线,等上线后继续发送
* @param string $key 延迟推送key依据留空立即推送延迟推送时发给同一人同一种消息类型只发送最新的一条
* @param int $delay 延迟推送时间默认1秒$key填写时有效
*/
public static function push(array $lists, $retryOffline = true, $key = null, $delay = 1)
{
if (empty($lists)) {
return;
}
if (!Base::isTwoArray($lists)) {
$lists = [$lists];
}
// 非 Swoole 运行时artisan/测试)无 swoole 绑定,无法推送,直接跳过(与 AbstractObserver 守卫一致)
if (!app()->bound('swoole')) {
return;
}
$swoole = app('swoole');
foreach ($lists AS $item) {
if (!is_array($item) || empty($item)) {
continue;
}
$userid = $item['userid'];
$fd = $item['fd'];
$ignoreFd = $item['ignoreFd'];
$msg = $item['msg'];
$tmpMsgId = intval($item['tmpMsgId']);
if (!is_array($msg)) {
continue;
}
$type = $msg['type'];
if (empty($type)) {
continue;
}
// 发送对象
$offlineUser = [];
$array = [];
if ($fd) {
if (is_array($fd)) {
$array = array_merge($array, $fd);
} else {
$array[] = $fd;
}
}
if ($userid) {
if (!is_array($userid)) {
$userid = [$userid];
}
foreach ($userid as $uid) {
$row = WebSocket::select(['fd'])->whereUserid($uid)->pluck('fd');
if ($row->isNotEmpty()) {
$array = array_merge($array, $row->toArray());
} else {
$offlineUser[] = $uid;
}
}
}
if ($ignoreFd) {
$ignoreFd = is_array($ignoreFd) ? $ignoreFd : [$ignoreFd];
}
// 开始发送
foreach ($array as $fid) {
if ($ignoreFd) {
if (in_array($fid, $ignoreFd)) continue;
}
if ($key) {
$key = "PUSH::" . $fid . ":" . $type . ":" . $key;
Cache::put($key, [
'fd' => $fid,
'ignoreFd' => $ignoreFd,
'msg' => $msg,
]);
$task = new PushTask($key, $retryOffline);
$task->delay($delay);
Task::deliver($task);
} else {
try {
$swoole->push($fid, self::pushMsgFormat($fid, $msg));
if ($tmpMsgId > 0) {
WebSocketTmpMsg::whereId($tmpMsgId)->update(['send' => 1]);
}
} catch (\Throwable) {
// 发送失败
}
}
}
// 记录不在线的
if ($retryOffline && $tmpMsgId == 0) {
$offlineUser = array_values(array_unique($offlineUser));
self::addTmpMsg($offlineUser, $msg);
}
}
}
/**
* 格式化推送消息
* @param $fid
* @param $msg
* @return string
*/
private static function pushMsgFormat($fid, $msg)
{
$encrypt = Base::json2array(Cache::get("User::encrypt:" . $fid));
if ($encrypt['type'] == 'pgp') {
if (is_array($msg) && $msg['type'] == 'dialog') {
// 仅加密对话消息
$msg = [
'type' => 'encrypt',
'encrypted' => Doo::pgpEncryptApi($msg, $encrypt['key']),
];
}
}
return Base::array2json($msg);
}
}