diff --git a/app/Module/ZincSearch/ZincSearchDialogMsg.php b/app/Module/ZincSearch/ZincSearchDialogMsg.php index 4eb0cca9b..376cb1a0a 100644 --- a/app/Module/ZincSearch/ZincSearchDialogMsg.php +++ b/app/Module/ZincSearch/ZincSearchDialogMsg.php @@ -228,7 +228,7 @@ class ZincSearchDialogMsg } // ============================== - // 基本方法 + // 生成内容 // ============================== /** @@ -287,8 +287,12 @@ class ZincSearchDialogMsg ]; } + // ============================== + // 基本方法 + // ============================== + /** - * 同步消息 + * 同步消息(建议在异步进程中使用) * * @param WebSocketDialogMsg $dialogMsg * @return bool @@ -345,7 +349,7 @@ class ZincSearchDialogMsg } /** - * 批量同步消息 + * 批量同步消息(建议在异步进程中使用) * * @param WebSocketDialogMsg[] $dialogMsgs * @return int 成功同步的消息数 @@ -423,11 +427,12 @@ class ZincSearchDialogMsg } /** - * 同步用户 + * 同步用户(建议在异步进程中使用) * @param WebSocketDialogUser $dialogUser + * @param bool $full 跳过判断是否已经存在(全量更新) * @return bool */ - public static function userSync(WebSocketDialogUser $dialogUser): bool + public static function userSync(WebSocketDialogUser $dialogUser, bool $full = false): bool { if (!self::ensureIndex()) { return false; @@ -448,8 +453,13 @@ class ZincSearchDialogMsg try { // 查询用户是否存在 - $result = ZincSearchBase::elasticSearch(self::$indexNameUser, $searchParams); - $hits = $result['data']['hits']['hits'] ?? []; + if ($full) { + $hits = null; + } else { + $result = ZincSearchBase::elasticSearch(self::$indexNameUser, $searchParams); + $hits = $result['data']['hits']['hits'] ?? []; + } + // 同步用户(存在更新、不存在添加) $result = ZincSearchBase::addDoc(self::$indexNameUser, $data); @@ -457,7 +467,7 @@ class ZincSearchDialogMsg return false; } - // 用户不存在,同步消息 todo 应该使用异步进程 + // 用户不存在,同步消息 if (empty($hits)) { $lastId = 0; // 上次同步的最后ID $batchSize = 500; // 每批处理的消息数量 @@ -491,7 +501,7 @@ class ZincSearchDialogMsg } /** - * 删除 + * 删除(建议在异步进程中使用) * * @param WebSocketDialogMsg|WebSocketDialogUser|int $data * @return int diff --git a/app/Observers/WebSocketDialogMsgObserver.php b/app/Observers/WebSocketDialogMsgObserver.php index 2d813d028..b42cc0a7a 100644 --- a/app/Observers/WebSocketDialogMsgObserver.php +++ b/app/Observers/WebSocketDialogMsgObserver.php @@ -3,7 +3,8 @@ namespace App\Observers; use App\Models\WebSocketDialogMsg; -use App\Module\ZincSearch\ZincSearchDialogMsg; +use App\Tasks\ZincSearchSyncTask; +use Hhxsv5\LaravelS\Swoole\Task\Task; class WebSocketDialogMsgObserver { @@ -15,7 +16,7 @@ class WebSocketDialogMsgObserver */ public function created(WebSocketDialogMsg $webSocketDialogMsg) { - ZincSearchDialogMsg::sync($webSocketDialogMsg); + Task::deliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg)); } /** @@ -26,7 +27,7 @@ class WebSocketDialogMsgObserver */ public function updated(WebSocketDialogMsg $webSocketDialogMsg) { - ZincSearchDialogMsg::sync($webSocketDialogMsg); + Task::deliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg)); } /** @@ -37,7 +38,7 @@ class WebSocketDialogMsgObserver */ public function deleted(WebSocketDialogMsg $webSocketDialogMsg) { - ZincSearchDialogMsg::delete($webSocketDialogMsg); + Task::deliver(new ZincSearchSyncTask('delete', $webSocketDialogMsg)); } /** diff --git a/app/Observers/WebSocketDialogUserObserver.php b/app/Observers/WebSocketDialogUserObserver.php index 46d72a0ff..c60a59af2 100644 --- a/app/Observers/WebSocketDialogUserObserver.php +++ b/app/Observers/WebSocketDialogUserObserver.php @@ -4,8 +4,9 @@ namespace App\Observers; use App\Models\Deleted; use App\Models\WebSocketDialogUser; -use App\Module\ZincSearch\ZincSearchDialogMsg; +use App\Tasks\ZincSearchSyncTask; use Carbon\Carbon; +use Hhxsv5\LaravelS\Swoole\Task\Task; class WebSocketDialogUserObserver { @@ -30,7 +31,7 @@ class WebSocketDialogUserObserver } } Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); - ZincSearchDialogMsg::userSync($webSocketDialogUser); + Task::deliver(new ZincSearchSyncTask('userCreated', $webSocketDialogUser)); } /** @@ -41,7 +42,7 @@ class WebSocketDialogUserObserver */ public function updated(WebSocketDialogUser $webSocketDialogUser) { - ZincSearchDialogMsg::userSync($webSocketDialogUser); + Task::deliver(new ZincSearchSyncTask('userSync', $webSocketDialogUser)); } /** @@ -53,7 +54,7 @@ class WebSocketDialogUserObserver public function deleted(WebSocketDialogUser $webSocketDialogUser) { Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); - ZincSearchDialogMsg::delete($webSocketDialogUser); + Task::deliver(new ZincSearchSyncTask('delete', $webSocketDialogUser)); } /** diff --git a/app/Tasks/ZincSearchSyncTask.php b/app/Tasks/ZincSearchSyncTask.php index 66802abad..1e9d52a6d 100644 --- a/app/Tasks/ZincSearchSyncTask.php +++ b/app/Tasks/ZincSearchSyncTask.php @@ -2,6 +2,7 @@ namespace App\Tasks; +use App\Module\ZincSearch\ZincSearchDialogMsg; use Carbon\Carbon; use Illuminate\Support\Facades\Cache; @@ -10,12 +11,48 @@ use Illuminate\Support\Facades\Cache; */ class ZincSearchSyncTask extends AbstractTask { - public function __construct() + private $action; + + private $data; + + public function __construct($action = null, $data = null) { - parent::__construct(); + parent::__construct(...func_get_args()); + $this->action = $action; + $this->data = $data; } public function start() + { + switch ($this->action) { + case 'sync': + // 同步聊天数据 + ZincSearchDialogMsg::sync($this->data); + break; + + case 'userCreated': + case 'userSync': + // 同步用户数据 + ZincSearchDialogMsg::userSync($this->data, $this->action === 'userCreated'); + break; + + case 'delete': + // 删除用户数据 + ZincSearchDialogMsg::delete($this->data); + break; + + default: + // 增量更新 + $this->incrementalUpdate(); + break; + } + } + + /** + * 增量更新 + * @return void + */ + private function incrementalUpdate() { // 120分钟执行一次 $time = intval(Cache::get("ZincSearchSyncTask:Time"));