no message

This commit is contained in:
kuaifan 2025-04-24 07:00:09 +08:00
parent f8b65a5546
commit f5a343f358
4 changed files with 68 additions and 19 deletions

View File

@ -228,7 +228,7 @@ class ZincSearchDialogMsg
} }
// ============================== // ==============================
// 基本方法 // 生成内容
// ============================== // ==============================
/** /**
@ -287,8 +287,12 @@ class ZincSearchDialogMsg
]; ];
} }
// ==============================
// 基本方法
// ==============================
/** /**
* 同步消息 * 同步消息(建议在异步进程中使用)
* *
* @param WebSocketDialogMsg $dialogMsg * @param WebSocketDialogMsg $dialogMsg
* @return bool * @return bool
@ -345,7 +349,7 @@ class ZincSearchDialogMsg
} }
/** /**
* 批量同步消息 * 批量同步消息(建议在异步进程中使用)
* *
* @param WebSocketDialogMsg[] $dialogMsgs * @param WebSocketDialogMsg[] $dialogMsgs
* @return int 成功同步的消息数 * @return int 成功同步的消息数
@ -423,11 +427,12 @@ class ZincSearchDialogMsg
} }
/** /**
* 同步用户 * 同步用户(建议在异步进程中使用)
* @param WebSocketDialogUser $dialogUser * @param WebSocketDialogUser $dialogUser
* @param bool $full 跳过判断是否已经存在(全量更新)
* @return bool * @return bool
*/ */
public static function userSync(WebSocketDialogUser $dialogUser): bool public static function userSync(WebSocketDialogUser $dialogUser, bool $full = false): bool
{ {
if (!self::ensureIndex()) { if (!self::ensureIndex()) {
return false; return false;
@ -448,8 +453,13 @@ class ZincSearchDialogMsg
try { try {
// 查询用户是否存在 // 查询用户是否存在
$result = ZincSearchBase::elasticSearch(self::$indexNameUser, $searchParams); if ($full) {
$hits = $result['data']['hits']['hits'] ?? []; $hits = null;
} else {
$result = ZincSearchBase::elasticSearch(self::$indexNameUser, $searchParams);
$hits = $result['data']['hits']['hits'] ?? [];
}
// 同步用户(存在更新、不存在添加) // 同步用户(存在更新、不存在添加)
$result = ZincSearchBase::addDoc(self::$indexNameUser, $data); $result = ZincSearchBase::addDoc(self::$indexNameUser, $data);
@ -457,7 +467,7 @@ class ZincSearchDialogMsg
return false; return false;
} }
// 用户不存在,同步消息 todo 应该使用异步进程 // 用户不存在,同步消息
if (empty($hits)) { if (empty($hits)) {
$lastId = 0; // 上次同步的最后ID $lastId = 0; // 上次同步的最后ID
$batchSize = 500; // 每批处理的消息数量 $batchSize = 500; // 每批处理的消息数量
@ -491,7 +501,7 @@ class ZincSearchDialogMsg
} }
/** /**
* 删除 * 删除(建议在异步进程中使用)
* *
* @param WebSocketDialogMsg|WebSocketDialogUser|int $data * @param WebSocketDialogMsg|WebSocketDialogUser|int $data
* @return int * @return int

View File

@ -3,7 +3,8 @@
namespace App\Observers; namespace App\Observers;
use App\Models\WebSocketDialogMsg; use App\Models\WebSocketDialogMsg;
use App\Module\ZincSearch\ZincSearchDialogMsg; use App\Tasks\ZincSearchSyncTask;
use Hhxsv5\LaravelS\Swoole\Task\Task;
class WebSocketDialogMsgObserver class WebSocketDialogMsgObserver
{ {
@ -15,7 +16,7 @@ class WebSocketDialogMsgObserver
*/ */
public function created(WebSocketDialogMsg $webSocketDialogMsg) public function created(WebSocketDialogMsg $webSocketDialogMsg)
{ {
ZincSearchDialogMsg::sync($webSocketDialogMsg); Task::deliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg));
} }
/** /**
@ -26,7 +27,7 @@ class WebSocketDialogMsgObserver
*/ */
public function updated(WebSocketDialogMsg $webSocketDialogMsg) public function updated(WebSocketDialogMsg $webSocketDialogMsg)
{ {
ZincSearchDialogMsg::sync($webSocketDialogMsg); Task::deliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg));
} }
/** /**
@ -37,7 +38,7 @@ class WebSocketDialogMsgObserver
*/ */
public function deleted(WebSocketDialogMsg $webSocketDialogMsg) public function deleted(WebSocketDialogMsg $webSocketDialogMsg)
{ {
ZincSearchDialogMsg::delete($webSocketDialogMsg); Task::deliver(new ZincSearchSyncTask('delete', $webSocketDialogMsg));
} }
/** /**

View File

@ -4,8 +4,9 @@ namespace App\Observers;
use App\Models\Deleted; use App\Models\Deleted;
use App\Models\WebSocketDialogUser; use App\Models\WebSocketDialogUser;
use App\Module\ZincSearch\ZincSearchDialogMsg; use App\Tasks\ZincSearchSyncTask;
use Carbon\Carbon; use Carbon\Carbon;
use Hhxsv5\LaravelS\Swoole\Task\Task;
class WebSocketDialogUserObserver class WebSocketDialogUserObserver
{ {
@ -30,7 +31,7 @@ class WebSocketDialogUserObserver
} }
} }
Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
ZincSearchDialogMsg::userSync($webSocketDialogUser); Task::deliver(new ZincSearchSyncTask('userCreated', $webSocketDialogUser));
} }
/** /**
@ -41,7 +42,7 @@ class WebSocketDialogUserObserver
*/ */
public function updated(WebSocketDialogUser $webSocketDialogUser) public function updated(WebSocketDialogUser $webSocketDialogUser)
{ {
ZincSearchDialogMsg::userSync($webSocketDialogUser); Task::deliver(new ZincSearchSyncTask('userSync', $webSocketDialogUser));
} }
/** /**
@ -53,7 +54,7 @@ class WebSocketDialogUserObserver
public function deleted(WebSocketDialogUser $webSocketDialogUser) public function deleted(WebSocketDialogUser $webSocketDialogUser)
{ {
Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid);
ZincSearchDialogMsg::delete($webSocketDialogUser); Task::deliver(new ZincSearchSyncTask('delete', $webSocketDialogUser));
} }
/** /**

View File

@ -2,6 +2,7 @@
namespace App\Tasks; namespace App\Tasks;
use App\Module\ZincSearch\ZincSearchDialogMsg;
use Carbon\Carbon; use Carbon\Carbon;
use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Cache;
@ -10,12 +11,48 @@ use Illuminate\Support\Facades\Cache;
*/ */
class ZincSearchSyncTask extends AbstractTask class ZincSearchSyncTask extends AbstractTask
{ {
public function __construct() private $action;
private $data;
public function __construct($action = null, $data = null)
{ {
parent::__construct(); parent::__construct(...func_get_args());
$this->action = $action;
$this->data = $data;
} }
public function start() public function start()
{
switch ($this->action) {
case 'sync':
// 同步聊天数据
ZincSearchDialogMsg::sync($this->data);
break;
case 'userCreated':
case 'userSync':
// 同步用户数据
ZincSearchDialogMsg::userSync($this->data, $this->action === 'userCreated');
break;
case 'delete':
// 删除用户数据
ZincSearchDialogMsg::delete($this->data);
break;
default:
// 增量更新
$this->incrementalUpdate();
break;
}
}
/**
* 增量更新
* @return void
*/
private function incrementalUpdate()
{ {
// 120分钟执行一次 // 120分钟执行一次
$time = intval(Cache::get("ZincSearchSyncTask:Time")); $time = intval(Cache::get("ZincSearchSyncTask:Time"));