From ed064a825a79ff272876fa517aa4285b55910a5f Mon Sep 17 00:00:00 2001 From: kuaifan Date: Sat, 1 Mar 2025 23:52:39 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E4=BC=98=E5=8C=96=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E6=90=9C=E7=B4=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../SyncDialogUserMsgToElasticsearch.php | 247 ++++++++++++++++++ app/Http/Controllers/IndexController.php | 3 + app/Module/ElasticSearch.php | 149 +++++++---- app/Tasks/ElasticSearchSyncTask.php | 30 +++ 4 files changed, 378 insertions(+), 51 deletions(-) create mode 100644 app/Console/Commands/SyncDialogUserMsgToElasticsearch.php create mode 100644 app/Tasks/ElasticSearchSyncTask.php diff --git a/app/Console/Commands/SyncDialogUserMsgToElasticsearch.php b/app/Console/Commands/SyncDialogUserMsgToElasticsearch.php new file mode 100644 index 000000000..1556fa136 --- /dev/null +++ b/app/Console/Commands/SyncDialogUserMsgToElasticsearch.php @@ -0,0 +1,247 @@ +elasticsearch = new ElasticSearch(ElasticSearch::DUM); + } + + /** + * @return int + * @throws \Exception + */ + public function handle() + { + $this->info('开始同步聊天数据...'); + + // 清除索引 + if ($this->option('c')) { + $this->info('清除索引...'); + if (!$this->elasticsearch->indexExists()) { + $this->info('索引不存在'); + return 0; + } + $result = $this->elasticsearch->deleteIndex(); + if (isset($result['error'])) { + $this->error('删除索引失败: ' . $result['error']); + return 1; + } + $this->saveLastId(true); + $this->info('索引删除成功'); + return 0; + } + + // 判断创建索引 + if (!$this->elasticsearch->indexExists()) { + $this->info('创建索引...'); + $result = $this->elasticsearch->createDialogUserMsgIndex(); + if (isset($result['error'])) { + $this->error('创建索引失败: ' . $result['error']); + return 1; + } + $this->info('索引创建成功'); + } + + // 同步用户-会话数据 + $this->syncDialogUsers($this->option('batch')); + + // 同步消息数据 + $this->syncDialogMsgs($this->option('batch')); + + // 完成 + $this->info("\n同步完成"); + return 0; + } + + /** + * 保存最后一个ID + * @param string|true $type + * @param integer $lastId + */ + private function saveLastId($type, $lastId = 0) + { + if ($type === true) { + $setting = []; + } else { + $setting = Base::setting('elasticSearch:sync'); + $setting[$type] = $lastId; + } + Base::setting('elasticSearch:sync', $setting); + } + + /** + * 获取最后一个ID + * @param $type + * @return int + */ + private function getLastId($type) + { + if ($this->option('i')) { + $setting = Base::setting('elasticSearch:sync'); + return intval($setting[$type] ?? 0); + } + return 0; + } + + /** + * 同步用户-会话数据(父文档) + * @param $batchSize + * @return void + */ + private function syncDialogUsers($batchSize) + { + $this->info("\n同步用户数据..."); + $lastId = $this->getLastId('dialog_user'); + + $num = 0; + $count = WebSocketDialogUser::where('id', '>', $lastId)->count(); + + do { + // 获取一批用户-会话关系 + $dialogUsers = WebSocketDialogUser::where('id', '>', $lastId) + ->orderBy('id') + ->limit($batchSize) + ->get(); + + if ($dialogUsers->isEmpty()) { + break; + } + + $num += count($dialogUsers); + $progress = round($num / $count * 100, 2); + $this->info("{$num}/{$count} ({$progress}%) 正在同步用户ID {$lastId} ~ {$dialogUsers->last()->id}"); + + // 批量索引数据 + $params = ['body' => []]; + foreach ($dialogUsers as $dialogUser) { + $params['body'][] = [ + 'index' => [ + '_index' => ElasticSearch::DUM, + '_id' => ElasticSearch::generateDialogUserDicId($dialogUser), + ] + ]; + $params['body'][] = ElasticSearch::generateDialogUserFormat($dialogUser); + } + + if ($params['body']) { + $result = $this->elasticsearch->bulk($params); + if (isset($result['errors']) && $result['errors']) { + $this->error('批量索引用户数据部分失败'); + Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items'])); + } + } + + $lastId = $dialogUsers->last()->id; + $this->saveLastId('dialog_user', $lastId); + } while (count($dialogUsers) == $batchSize); + + $this->info("同步用户数据结束 - 最后ID {$lastId}"); + } + + /** + * 同步消息数据(子文档) + */ + private function syncDialogMsgs($batchSize) + { + $this->info("\n同步消息数据..."); + $lastId = $this->getLastId('dialog_msg'); + + $num = 0; + $count = WebSocketDialogMsg::where('id', '>', $lastId)->count(); + + do { + // 获取一批消息 + $dialogMsgs = WebSocketDialogMsg::where('id', '>', $lastId) + ->orderBy('id') + ->limit($batchSize) + ->get(); + + if ($dialogMsgs->isEmpty()) { + break; + } + + $num += count($dialogMsgs); + $progress = round($num / $count * 100, 2); + $this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$lastId} ~ {$dialogMsgs->last()->id}"); + + // 获取这些消息所属的会话对应的所有用户 + $dialogIds = $dialogMsgs->pluck('dialog_id')->unique()->toArray(); + $userDialogMap = []; + + if (!empty($dialogIds)) { + $dialogUsers = WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get(); + + foreach ($dialogUsers as $dialogUser) { + $userDialogMap[$dialogUser->dialog_id][] = $dialogUser->userid; + } + } + + // 批量索引消息数据 + $params = ['body' => []]; + foreach ($dialogMsgs as $dialogMsg) { + // 如果该会话没有用户,跳过 + if (empty($userDialogMap[$dialogMsg->dialog_id])) { + continue; + } + + // 为每个用户-会话关系创建子文档 + foreach ($userDialogMap[$dialogMsg->dialog_id] as $userid) { + $params['body'][] = [ + 'index' => [ + '_index' => ElasticSearch::DUM, + '_id' => ElasticSearch::generateDialogMsgDicId($dialogMsg, $userid), + 'routing' => ElasticSearch::generateDialogMsgParentId($dialogMsg, $userid) // 路由到父文档 + ] + ]; + + $params['body'][] = ElasticSearch::generateDialogMsgFormat($dialogMsg, $userid); + } + } + + if (!empty($params['body'])) { + // 分批处理 + $chunks = array_chunk($params['body'], 1000); + foreach ($chunks as $chunk) { + $chunkParams = ['body' => $chunk]; + $result = $this->elasticsearch->bulk($chunkParams); + if (isset($result['errors']) && $result['errors']) { + $this->error('批量索引消息数据部分失败'); + Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items'])); + } + } + } + + $lastId = $dialogMsgs->last()->id; + $this->saveLastId('dialog_msg', $lastId); + } while (count($dialogMsgs) == $batchSize); + + $this->info("同步消息结束 - 最后ID {$lastId}"); + } +} diff --git a/app/Http/Controllers/IndexController.php b/app/Http/Controllers/IndexController.php index 69f08156b..5ec4e0e24 100755 --- a/app/Http/Controllers/IndexController.php +++ b/app/Http/Controllers/IndexController.php @@ -23,6 +23,7 @@ use App\Tasks\AutoArchivedTask; use App\Tasks\DeleteBotMsgTask; use App\Tasks\CheckinRemindTask; use App\Tasks\CloseMeetingRoomTask; +use App\Tasks\ElasticSearchSyncTask; use App\Tasks\UnclaimedTaskRemindTask; use Hhxsv5\LaravelS\Swoole\Task\Task; use Laravolt\Avatar\Avatar; @@ -258,6 +259,8 @@ class IndexController extends InvokeController Task::deliver(new UnclaimedTaskRemindTask()); // 关闭会议室 Task::deliver(new CloseMeetingRoomTask()); + // ElasticSearch 同步 + Task::deliver(new ElasticSearchSyncTask()); return "success"; } diff --git a/app/Module/ElasticSearch.php b/app/Module/ElasticSearch.php index 3069e8b0e..ab1885691 100644 --- a/app/Module/ElasticSearch.php +++ b/app/Module/ElasticSearch.php @@ -77,7 +77,7 @@ class ElasticSearch public function indexExists() { $params = ['index' => $this->index]; - return (bool)$this->client->indices()->exists($params); + return $this->client->indices()->exists($params)->asBool(); } /** @@ -404,36 +404,53 @@ class ElasticSearch const DUM = "dialog_user_msg"; + /** + * 会话用户 - 生成文档ID + * @param WebSocketDialogUser $dialogUser + * @return string + */ + public static function generateDialogUserDicId(WebSocketDialogUser $dialogUser) + { + return "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}"; + } + + + /** + * 会话用户 - 生成文档格式 + * @param WebSocketDialogUser $dialogUser + * @return array + */ + public static function generateDialogUserFormat(WebSocketDialogUser $dialogUser) + { + return [ + 'dialog_id' => $dialogUser->dialog_id, + 'created_at' => $dialogUser->created_at, + 'updated_at' => $dialogUser->updated_at, + + 'userid' => $dialogUser->userid, + 'top_at' => $dialogUser->top_at, + 'last_at' => $dialogUser->last_at, + 'mark_unread' => $dialogUser->mark_unread ? 1 : 0, + 'silence' => $dialogUser->silence ? 1 : 0, + 'hide' => $dialogUser->hide ? 1 : 0, + 'color' => $dialogUser->color, + + 'relationship' => [ + 'name' => 'dialog_user' + ] + ]; + } + /** * 会话用户 - 同步到Elasticsearch + * @param WebSocketDialogUser $dialogUser + * @return void */ public static function syncDialogUserToElasticSearch(WebSocketDialogUser $dialogUser) { try { $es = new self(self::DUM); - - $docId = "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}"; - - $document = [ - 'dialog_id' => $dialogUser->dialog_id, - 'created_at' => $dialogUser->created_at, - 'updated_at' => $dialogUser->updated_at, - - 'userid' => $dialogUser->userid, - 'top_at' => $dialogUser->top_at, - 'last_at' => $dialogUser->last_at, - 'mark_unread' => $dialogUser->mark_unread ? 1 : 0, - 'silence' => $dialogUser->silence ? 1 : 0, - 'hide' => $dialogUser->hide ? 1 : 0, - 'color' => $dialogUser->color, - - 'relationship' => [ - 'name' => 'dialog_user' - ] - ]; - - $es->indexDocument($document, $docId); - + $es->indexDocument(self::generateDialogUserFormat($dialogUser), self::generateDialogUserDicId($dialogUser)); } catch (\Exception $e) { Log::error('syncDialogUserToElasticSearch: ' . $e->getMessage()); } @@ -461,6 +478,58 @@ class ElasticSearch } } + /** ******************************************************************************************************** */ + /** ******************************************************************************************************** */ + /** ******************************************************************************************************** */ + + /** + * 会话消息 - 生成父文档ID + * @param WebSocketDialogMsg $dialogMsg + * @param $userid + * @return string + */ + public static function generateDialogMsgParentId(WebSocketDialogMsg $dialogMsg, $userid) + { + return "user_{$userid}_dialog_{$dialogMsg->dialog_id}"; + } + + /** + * 会话消息 - 生成文档ID + * @param WebSocketDialogMsg $dialogMsg + * @param $userid + * @return string + */ + public static function generateDialogMsgDicId(WebSocketDialogMsg $dialogMsg, $userid) + { + return "msg_{$dialogMsg->id}_user_{$userid}"; + } + + /** + * 会话消息 - 生成文档格式 + * @param WebSocketDialogMsg $dialogMsg + * @param $userid + * @return array + */ + public static function generateDialogMsgFormat(WebSocketDialogMsg $dialogMsg, $userid) + { + return [ + 'dialog_id' => $dialogMsg->dialog_id, + 'created_at' => $dialogMsg->created_at, + 'updated_at' => $dialogMsg->updated_at, + + 'msg_id' => $dialogMsg->id, + 'sender_userid' => $dialogMsg->userid, + 'msg_type' => $dialogMsg->type, + 'key' => $dialogMsg->key, + 'bot' => $dialogMsg->bot ? 1 : 0, + + 'relationship' => [ + 'name' => 'dialog_msg', + 'parent' => self::generateDialogMsgParentId($dialogMsg, $userid) + ] + ]; + } + /** * 会话消息 - 同步到Elasticsearch */ @@ -479,33 +548,14 @@ class ElasticSearch $params = ['body' => []]; foreach ($dialogUsers as $dialogUser) { - $parentId = "user_{$dialogUser->userid}_dialog_{$dialogMsg->dialog_id}"; - $docId = "msg_{$dialogMsg->id}_user_{$dialogUser->userid}"; - $params['body'][] = [ 'index' => [ '_index' => self::DUM, - '_id' => $docId, - 'routing' => $parentId - ] - ]; - - $params['body'][] = [ - 'dialog_id' => $dialogMsg->dialog_id, - 'created_at' => $dialogMsg->created_at, - 'updated_at' => $dialogMsg->updated_at, - - 'msg_id' => $dialogMsg->id, - 'sender_userid' => $dialogMsg->userid, - 'msg_type' => $dialogMsg->type, - 'key' => $dialogMsg->key, - 'bot' => $dialogMsg->bot ? 1 : 0, - - 'relationship' => [ - 'name' => 'dialog_msg', - 'parent' => $parentId + '_id' => self::generateDialogMsgDicId($dialogMsg, $dialogUser->userid), + 'routing' => self::generateDialogMsgParentId($dialogMsg, $dialogUser->userid) ] ]; + $params['body'][] = self::generateDialogMsgFormat($dialogMsg, $dialogUser->userid); } if (!empty($params['body'])) { @@ -534,14 +584,11 @@ class ElasticSearch $params = ['body' => []]; foreach ($dialogUsers as $dialogUser) { - $docId = "msg_{$dialogMsg->id}_user_{$dialogUser->userid}"; - $parentId = "user_{$dialogUser->userid}_dialog_{$dialogMsg->dialog_id}"; - $params['body'][] = [ 'delete' => [ '_index' => self::DUM, - '_id' => $docId, - 'routing' => $parentId + '_id' => self::generateDialogMsgDicId($dialogMsg, $dialogUser->userid), + 'routing' => self::generateDialogMsgParentId($dialogMsg, $dialogUser->userid) ] ]; } diff --git a/app/Tasks/ElasticSearchSyncTask.php b/app/Tasks/ElasticSearchSyncTask.php new file mode 100644 index 000000000..eef1d393e --- /dev/null +++ b/app/Tasks/ElasticSearchSyncTask.php @@ -0,0 +1,30 @@ +addMinutes(10)); + // 判断参数 + @shell_exec("php /var/www/artisan elasticsearch:sync-dialog-user-msg --i"); + } + + public function end() + { + } +}