From 679c2070c1aad250bf4d3382410f79993ae79441 Mon Sep 17 00:00:00 2001
From: kuaifan
Date: Thu, 17 Apr 2025 11:06:31 +0800
Subject: [PATCH] =?UTF-8?q?perf:=20=E4=BC=98=E5=8C=96=E5=85=A8=E6=96=87?=
 =?UTF-8?q?=E6=90=9C=E7=B4=A2?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../SyncDialogUserMsgToZincSearch.php         | 261 ++++++++++++++++++
 app/Module/ZincSearch.php                     | 225 +++++++++++++++
 2 files changed, 486 insertions(+)
 create mode 100644 app/Console/Commands/SyncDialogUserMsgToZincSearch.php
 create mode 100644 app/Module/ZincSearch.php

diff --git a/app/Console/Commands/SyncDialogUserMsgToZincSearch.php b/app/Console/Commands/SyncDialogUserMsgToZincSearch.php
new file mode 100644
index 000000000..9f2f6248a
--- /dev/null
+++ b/app/Console/Commands/SyncDialogUserMsgToZincSearch.php
@@ -0,0 +1,261 @@
+es = new ElasticSearchUserMsg();
+        } catch (\Exception $e) {
+            $this->error('Elasticsearch连接失败: ' . $e->getMessage());
+            exit(1);
+        }
+    }
+
+    /**
+     * Run the sync: drop the index when the `c` option is set, otherwise create
+     * it if missing and index dialog-user rows followed by dialog messages.
+     *
+     * @return int 0 on success, 1 when deleting or creating the index fails
+     * @throws \Exception
+     */
+    public function handle()
+    {
+        $this->info('开始同步聊天数据...');
+
+        // Clear the index (and the stored sync progress), then stop
+        if ($this->option('c')) {
+            $this->info('清除索引...');
+            if (!$this->es->indexExists()) {
+                $this->saveLastId(true);
+                $this->info('索引不存在');
+                return 0;
+            }
+            $result = $this->es->deleteIndex();
+            if (isset($result['error'])) {
+                $this->error('删除索引失败: ' . $result['error']);
+                return 1;
+            }
+            $this->saveLastId(true);
+            $this->info('索引删除成功');
+            return 0;
+        }
+
+        // Create the index when it does not exist yet; progress is reset with it
+        if (!$this->es->indexExists()) {
+            $this->info('创建索引...');
+            $result = ElasticSearchUserMsg::generateIndex();
+            if (isset($result['error'])) {
+                $this->error('创建索引失败: ' . $result['error']);
+                return 1;
+            }
+            $this->saveLastId(true);
+            $this->info('索引创建成功');
+        }
+
+        // Sync user-dialog rows
+        $this->syncDialogUsers($this->option('batch'));
+
+        // Sync message rows
+        $this->syncDialogMsgs($this->option('batch'));
+
+        // Done
+        $this->info("\n同步完成");
+        return 0;
+    }
+
+    /**
+     * Persist the last synced ID for $type; passing true clears all stored progress
+     * @param string|true $type
+     * @param integer $lastId
+     */
+    private function saveLastId($type, $lastId = 0)
+    {
+        if ($type === true) {
+            $setting = [];
+        } else {
+            $setting = ElasticSearchKeyValue::getArray('elasticSearch:sync');
+            $setting[$type] = $lastId;
+        }
+        ElasticSearchKeyValue::save('elasticSearch:sync', $setting);
+    }
+
+    /**
+     * Read the stored last ID for $type; always 0 unless the `i` option is set
+     * @param $type
+     * @return int
+     */
+    private function getLastId($type)
+    {
+        if ($this->option('i')) {
+            $setting = ElasticSearchKeyValue::getArray('elasticSearch:sync');
+            return intval($setting[$type] ?? 0);
+        }
+        return 0;
+    }
+
+    /**
+     * Sync user-dialog rows (parent documents) in ID order, $batchSize rows per bulk call
+     * @param $batchSize
+     * @return void
+     */
+    private function syncDialogUsers($batchSize)
+    {
+        $this->info("\n同步用户数据...");
+        $lastId = $this->getLastId('dialog_user');
+
+        $num = 0;
+        $count = WebSocketDialogUser::where('id', '>', $lastId)->count();
+
+        do {
+            // Fetch the next batch of user-dialog relations
+            $dialogUsers = WebSocketDialogUser::where('id', '>', $lastId)
+                ->orderBy('id')
+                ->limit($batchSize)
+                ->get();
+
+            if ($dialogUsers->isEmpty()) {
+                break;
+            }
+
+            $num += count($dialogUsers);
+            // Rows may be inserted after the count() above, so $count can be 0 here
+            $progress = $count > 0 ? round($num / $count * 100, 2) : 100;
+            $this->info("{$num}/{$count} ({$progress}%) 正在同步用户ID {$lastId} ~ {$dialogUsers->last()->id}");
+
+            // Build the bulk payload: one action entry followed by one document per row
+            $params = ['body' => []];
+            foreach ($dialogUsers as $dialogUser) {
+                $params['body'][] = [
+                    'index' => [
+                        '_index' => ElasticSearchUserMsg::indexName(),
+                        '_id' => ElasticSearchUserMsg::generateUserDicId($dialogUser),
+                    ]
+                ];
+                $params['body'][] = ElasticSearchUserMsg::generateUserFormat($dialogUser);
+            }
+
+            if ($params['body']) {
+                $result = $this->es->bulk($params);
+                if (isset($result['errors']) && $result['errors']) {
+                    $this->error('批量索引用户数据部分失败');
+                    Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items']));
+                }
+            }
+
+            $lastId = $dialogUsers->last()->id;
+            $this->saveLastId('dialog_user', $lastId);
+        } while (count($dialogUsers) == $batchSize);
+
+        $this->info("同步用户数据结束 - 最后ID {$lastId}");
+    }
+
+    /**
+     * Sync message rows (child documents): one document per message per dialog member
+     * @param $batchSize
+     * @return void
+     */
+    private function syncDialogMsgs($batchSize)
+    {
+        $this->info("\n同步消息数据...");
+        $lastId = $this->getLastId('dialog_msg');
+
+        $num = 0;
+        $count = WebSocketDialogMsg::where('id', '>', $lastId)->count();
+
+        do {
+            // Fetch the next batch of messages
+            $dialogMsgs = WebSocketDialogMsg::where('id', '>', $lastId)
+                ->orderBy('id')
+                ->limit($batchSize)
+                ->get();
+
+            if ($dialogMsgs->isEmpty()) {
+                break;
+            }
+
+            $num += count($dialogMsgs);
+            // Rows may be inserted after the count() above, so $count can be 0 here
+            $progress = $count > 0 ? round($num / $count * 100, 2) : 100;
+            $this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$lastId} ~ {$dialogMsgs->last()->id}");
+
+            // Collect every user of the dialogs these messages belong to
+            $dialogIds = $dialogMsgs->pluck('dialog_id')->unique()->toArray();
+            $userDialogMap = [];
+
+            if (!empty($dialogIds)) {
+                $dialogUsers = WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get();
+
+                foreach ($dialogUsers as $dialogUser) {
+                    $userDialogMap[$dialogUser->dialog_id][] = $dialogUser->userid;
+                }
+            }
+
+            // Build the bulk payload for the messages
+            $params = ['body' => []];
+            foreach ($dialogMsgs as $dialogMsg) {
+                // Skip messages whose dialog has no users
+                if (empty($userDialogMap[$dialogMsg->dialog_id])) {
+                    continue;
+                }
+
+                // Create one child document per user-dialog relation
+                foreach ($userDialogMap[$dialogMsg->dialog_id] as $userid) {
+                    $params['body'][] = [
+                        'index' => [
+                            '_index' => ElasticSearchUserMsg::indexName(),
+                            '_id' => ElasticSearchUserMsg::generateMsgDicId($dialogMsg, $userid),
+                            'routing' => ElasticSearchUserMsg::generateMsgParentId($dialogMsg, $userid) // route to the parent document
+                        ]
+                    ];
+
+                    $params['body'][] = ElasticSearchUserMsg::generateMsgFormat($dialogMsg, $userid);
+                }
+            }
+
+            if (!empty($params['body'])) {
+                // Send in chunks; 1000 is even, so an action/document pair is never split
+                $chunks = array_chunk($params['body'], 1000);
+                foreach ($chunks as $chunk) {
+                    $chunkParams = ['body' => $chunk];
+                    $result = $this->es->bulk($chunkParams);
+                    if (isset($result['errors']) && $result['errors']) {
+                        $this->error('批量索引消息数据部分失败');
+                        Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items']));
+                    }
+                }
+            }
+
+            $lastId = $dialogMsgs->last()->id;
+            $this->saveLastId('dialog_msg', $lastId);
+        } while (count($dialogMsgs) == $batchSize);
+
+        $this->info("同步消息结束 - 最后ID {$lastId}");
+    }
+}
diff --git a/app/Module/ZincSearch.php b/app/Module/ZincSearch.php
new file mode 100644
index 000000000..4662d3c0a
--- /dev/null
+++ b/app/Module/ZincSearch.php
@@ -0,0 +1,225 @@
+host = env('ZINCSEARCH_HOST', 'search');
+        $this->port = env('ZINCSEARCH_PORT', '4080');
+        $this->user = env('DB_USERNAME', '');
+        $this->pass = env('DB_PASSWORD', '');
+    }
+
+    /**
+     * Return the connection settings (host, port, user, pass) as an array
+     */
+    private function config(): array
+    {
+        return [
+            'host' => $this->host,
+            'port' => $this->port,
+            'user' => $this->user,
+            'pass' => $this->pass
+        ];
+    }
+
+    /**
+     * Send an HTTP request with basic auth and decode the JSON response.
+     *
+     * @param string $path Request path, appended to http://host:port
+     * @param string|null $body Raw request body, or null to send none
+     * @param string $method HTTP verb, or the pseudo-method 'BULK' (a POST sent as text/plain)
+     * @return array ['success' => false, 'error' => string] on a cURL error,
+     *               otherwise ['success' => bool, 'status' => int, 'data' => mixed]
+     */
+    private function request($path, $body = null, $method = 'POST')
+    {
+        // Resolve the 'BULK' pseudo-method BEFORE handing the verb to cURL;
+        // otherwise the request goes out with the literal HTTP method "BULK".
+        $headers = ['Content-Type: application/json'];
+        if ($method === 'BULK') {
+            $headers = ['Content-Type: text/plain'];
+            $method = 'POST';
+        }
+
+        $ch = curl_init();
+        curl_setopt($ch, CURLOPT_URL, "http://{$this->host}:{$this->port}{$path}");
+        curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
+        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
+        curl_setopt($ch, CURLOPT_USERPWD, $this->user . ':' . $this->pass);
+        curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
+        curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
+        if ($body !== null) {
+            curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
+        }
+        $result = curl_exec($ch);
+        $error = curl_error($ch);
+        $status = curl_getinfo($ch, CURLINFO_HTTP_CODE);
+        curl_close($ch);
+        if ($error) {
+            return ['success' => false, 'error' => $error];
+        }
+        $data = json_decode($result, true);
+        return [
+            'success' => $status >= 200 && $status < 300,
+            'status' => $status,
+            'data' => $data
+        ];
+    }
+
+    // ==============================
+    // Index management
+    // ==============================
+
+    /**
+     * Create an index with the given name and mappings
+     */
+    public static function createIndex($index, $mappings = []): array
+    {
+        $body = json_encode([
+            'name' => $index,
+            'mappings' => $mappings
+        ]);
+        return (new self())->request("/api/index", $body);
+    }
+
+    /**
+     * Fetch information about a single index
+     */
+    public static function getIndex($index): array
+    {
+        return (new self())->request("/api/index/{$index}", null, 'GET');
+    }
+
+    /**
+     * List all indices
+     */
+    public static function listIndices(): array
+    {
+        return (new self())->request("/api/index", null, 'GET');
+    }
+
+    /**
+     * Delete an index
+     */
+    public static function deleteIndex($index): array
+    {
+        return (new self())->request("/api/index/{$index}", null, 'DELETE');
+    }
+
+    /**
+     * Run text through the named analyzer
+     */
+    public static function analyze($analyzer, $text): array
+    {
+        $body = json_encode([
+            'analyzer' => $analyzer,
+            'text' => $text
+        ]);
+        return (new self())->request("/api/_analyze", $body);
+    }
+
+    // ==============================
+    // Document management
+    // ==============================
+
+    /**
+     * Write a single document
+     */
+    public static function addDoc($index, $doc): array
+    {
+        $body = json_encode($doc);
+        return (new self())->request("/api/{$index}/_doc", $body);
+    }
+
+    /**
+     * Update a document by ID
+     */
+    public static function updateDoc($index, $id, $doc): array
+    {
+        $body = json_encode($doc);
+        return (new self())->request("/api/{$index}/_update/{$id}", $body);
+    }
+
+    /**
+     * Delete a document by ID
+     */
+    public static function deleteDoc($index, $id): array
+    {
+        return (new self())->request("/api/{$index}/_doc/{$id}", null, 'DELETE');
+    }
+
+    /**
+     * Write a batch of documents to one index via /api/_bulkv2
+     */
+    public static function addDocs($index, $docs): array
+    {
+        $body = json_encode([
+            'index' => $index,
+            'records' => $docs
+        ]);
+        return (new self())->request("/api/_bulkv2", $body);
+    }
+
+    /**
+     * Write documents in bulk through the raw BULK API
+     * $data is sent as-is as text/plain, in the Elasticsearch-compatible bulk format
+     */
+    public static function bulkDocs($data): array
+    {
+        return (new self())->request("/api/_bulk", $data, 'BULK');
+    }
+
+    // ==============================
+    // Search
+    // ==============================
+
+    /**
+     * Run a `match` search for $query on one index, with paging
+     */
+    public static function search($index, $query, $from = 0, $size = 10): array
+    {
+        $searchParams = [
+            'search_type' => 'match',
+            'query' => [
+                'term' => $query
+            ],
+            'from' => $from,
+            'max_results' => $size
+        ];
+
+        $body = json_encode($searchParams);
+        return (new self())->request("/api/{$index}/_search", $body);
+    }
+
+    /**
+     * Run a search with caller-supplied search parameters
+     */
+    public static function advancedSearch($index, $searchParams): array
+    {
+        $body = json_encode($searchParams);
+        return (new self())->request("/api/{$index}/_search", $body);
+    }
+
+    /**
+     * Run a multi-index search via /api/_msearch
+     */
+    public static function multiSearch($queries): array
+    {
+        $body = json_encode($queries);
+        return (new self())->request("/api/_msearch", $body);
+    }
+}