From 58cb49b125a3460478d1dcb9e061cf4d5b8712f1 Mon Sep 17 00:00:00 2001 From: kuaifan Date: Sat, 8 Mar 2025 10:00:27 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E4=BC=98=E5=8C=96ES=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../SyncDialogUserMsgToElasticsearch.php | 33 +- app/Http/Controllers/Api/DialogController.php | 8 +- app/Module/ElasticSearch.php | 619 ------------------ .../ElasticSearch/ElasticSearchBase.php | 282 ++++++++ .../ElasticSearch/ElasticSearchKeyValue.php | 211 ++++++ .../ElasticSearch/ElasticSearchUserMsg.php | 382 +++++++++++ app/Observers/WebSocketDialogMsgObserver.php | 8 +- app/Observers/WebSocketDialogUserObserver.php | 8 +- app/Tasks/ElasticSearchSyncTask.php | 17 +- 9 files changed, 918 insertions(+), 650 deletions(-) delete mode 100644 app/Module/ElasticSearch.php create mode 100644 app/Module/ElasticSearch/ElasticSearchBase.php create mode 100644 app/Module/ElasticSearch/ElasticSearchKeyValue.php create mode 100644 app/Module/ElasticSearch/ElasticSearchUserMsg.php diff --git a/app/Console/Commands/SyncDialogUserMsgToElasticsearch.php b/app/Console/Commands/SyncDialogUserMsgToElasticsearch.php index ec2be19a1..c91b44ca0 100644 --- a/app/Console/Commands/SyncDialogUserMsgToElasticsearch.php +++ b/app/Console/Commands/SyncDialogUserMsgToElasticsearch.php @@ -2,10 +2,10 @@ namespace App\Console\Commands; -use App\Module\Base; -use App\Module\ElasticSearch; use App\Models\WebSocketDialogMsg; use App\Models\WebSocketDialogUser; +use App\Module\ElasticSearch\ElasticSearchKeyValue; +use App\Module\ElasticSearch\ElasticSearchUserMsg; use Illuminate\Console\Command; use Illuminate\Support\Facades\Log; @@ -30,7 +30,12 @@ class SyncDialogUserMsgToElasticsearch extends Command public function __construct() { parent::__construct(); - $this->es = new ElasticSearch(ElasticSearch::DUMIndex()); + try { + $this->es = new ElasticSearchUserMsg(); + } catch (\Exception $e) { + $this->error('Elasticsearch连接失败: ' . $e->getMessage()); + exit(1); + } } /** @@ -62,7 +67,7 @@ class SyncDialogUserMsgToElasticsearch extends Command // 判断创建索引 if (!$this->es->indexExists()) { $this->info('创建索引...'); - $result = $this->es->createDialogUserMsgIndex(); + $result = ElasticSearchUserMsg::generateIndex(); if (isset($result['error'])) { $this->error('创建索引失败: ' . $result['error']); return 1; @@ -92,10 +97,10 @@ class SyncDialogUserMsgToElasticsearch extends Command if ($type === true) { $setting = []; } else { - $setting = Base::setting('elasticSearch:sync'); + $setting = ElasticSearchKeyValue::getArray('elasticSearch:sync'); $setting[$type] = $lastId; } - Base::setting('elasticSearch:sync', $setting); + ElasticSearchKeyValue::save('elasticSearch:sync', $setting); } /** @@ -106,7 +111,7 @@ class SyncDialogUserMsgToElasticsearch extends Command private function getLastId($type) { if ($this->option('i')) { - $setting = Base::setting('elasticSearch:sync'); + $setting = ElasticSearchKeyValue::getArray('elasticSearch:sync'); return intval($setting[$type] ?? 0); } return 0; @@ -145,11 +150,11 @@ class SyncDialogUserMsgToElasticsearch extends Command foreach ($dialogUsers as $dialogUser) { $params['body'][] = [ 'index' => [ - '_index' => ElasticSearch::DUMIndex(), - '_id' => ElasticSearch::generateDialogUserDicId($dialogUser), + '_index' => ElasticSearchUserMsg::indexName(), + '_id' => ElasticSearchUserMsg::generateUserDicId($dialogUser), ] ]; - $params['body'][] = ElasticSearch::generateDialogUserFormat($dialogUser); + $params['body'][] = ElasticSearchUserMsg::generateUserFormat($dialogUser); } if ($params['body']) { @@ -217,13 +222,13 @@ class SyncDialogUserMsgToElasticsearch extends Command foreach ($userDialogMap[$dialogMsg->dialog_id] as $userid) { $params['body'][] = [ 'index' => [ - '_index' => ElasticSearch::DUMIndex(), - '_id' => ElasticSearch::generateDialogMsgDicId($dialogMsg, $userid), - 'routing' => ElasticSearch::generateDialogMsgParentId($dialogMsg, $userid) // 路由到父文档 + '_index' => ElasticSearchUserMsg::indexName(), + '_id' => ElasticSearchUserMsg::generateMsgDicId($dialogMsg, $userid), + 'routing' => ElasticSearchUserMsg::generateMsgParentId($dialogMsg, $userid) // 路由到父文档 ] ]; - $params['body'][] = ElasticSearch::generateDialogMsgFormat($dialogMsg, $userid); + $params['body'][] = ElasticSearchUserMsg::generateMsgFormat($dialogMsg, $userid); } } diff --git a/app/Http/Controllers/Api/DialogController.php b/app/Http/Controllers/Api/DialogController.php index 7ff41291a..dfa2d4371 100755 --- a/app/Http/Controllers/Api/DialogController.php +++ b/app/Http/Controllers/Api/DialogController.php @@ -13,7 +13,7 @@ use App\Models\User; use App\Module\Base; use App\Module\Timer; use App\Module\Extranet; -use App\Module\ElasticSearch; +use App\Module\ElasticSearch\ElasticSearchUserMsg; use App\Module\TimeRange; use App\Module\MsgTool; use App\Module\Table\OnlineData; @@ -172,8 +172,7 @@ class DialogController extends AbstractController } // 搜索消息会话 if (count($list) < 20) { - $es = new ElasticSearch(ElasticSearch::DUMIndex()); - $searchResults = $es->searchDialogsByUserAndKeyword($user->userid, $key, 20 - count($list)); + $searchResults = ElasticSearchUserMsg::searchByKeyword($user->userid, $key, 20 - count($list)); if ($searchResults) { foreach ($searchResults as $item) { if ($dialog = WebSocketDialog::find($item['id'])) { @@ -734,8 +733,7 @@ class DialogController extends AbstractController $key = trim(Request::input('key')); $list = []; // - $es = new ElasticSearch(ElasticSearch::DUMIndex()); - $searchResults = $es->searchDialogsByUserAndKeyword($user->userid, $key, Base::getPaginate(50, 20)); + $searchResults = ElasticSearchUserMsg::searchByKeyword($user->userid, $key, Base::getPaginate(50, 20)); if ($searchResults) { foreach ($searchResults as $item) { if ($dialog = WebSocketDialog::find($item['id'])) { diff --git a/app/Module/ElasticSearch.php b/app/Module/ElasticSearch.php deleted file mode 100644 index 7cff30e64..000000000 --- a/app/Module/ElasticSearch.php +++ /dev/null @@ -1,619 +0,0 @@ - ["{$scheme}://{$host}:{$port}"] - ]; - - // 如果设置了用户名和密码 - if (!empty($user)) { - $config['basicAuthentication'] = [$user, $pass]; - } - - $config['SSLVerification'] = $verifi; - if ($verifi) { - $config['SSLCert'] = $cert; - $config['CABundle'] = $ca; - $config['SSLKey'] = $key; - } - // 8.x版本使用ClientBuilder::fromConfig创建客户端 - $this->client = ClientBuilder::fromConfig($config); - - if ($index) { - $this->index = $index; - } - } - - /** - * 设置索引名称 - * - * @param string $index - * @return $this - */ - public function setIndex($index) - { - $this->index = $index; - return $this; - } - - /** - * 检查索引是否存在 - * - * @return bool - * @throws \Exception - */ - public function indexExists() - { - $params = ['index' => $this->index]; - return $this->client->indices()->exists($params)->asBool(); - } - - /** - * 创建索引 - * - * @param array $settings 索引设置 - * @param array $mappings 字段映射 - * @return array - */ - public function createIndex($settings = [], $mappings = []) - { - $params = [ - 'index' => $this->index - ]; - - $body = []; - if (!empty($settings)) { - $body['settings'] = $settings; - } - - if (!empty($mappings)) { - $body['mappings'] = $mappings; - } - - if (!empty($body)) { - $params['body'] = $body; - } - - try { - // 在8.x中,索引操作位于indices()命名空间 - return $this->client->indices()->create($params)->asArray(); - } catch (\Exception $e) { - Log::error('创建Elasticsearch索引失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 删除索引 - * @return array - */ - public function deleteIndex() - { - try { - $params = ['index' => $this->index]; - return $this->client->indices()->delete($params)->asArray(); - } catch (\Exception $e) { - Log::error('删除Elasticsearch索引失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 批量操作(批量添加/更新/删除文档) - * - * @param array $operations 批量操作的数据 - * @return array - */ - public function bulk($operations) - { - try { - // 在8.x中,批量操作API签名相同,但内部实现有所变化 - return $this->client->bulk($operations)->asArray(); - } catch (\Exception $e) { - Log::error('批量操作失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 索引单个文档 - * - * @param array $document 文档数据 - * @param string $id 文档ID - * @param string|null $routing 路由值,用于父子文档 - * @return array - */ - public function indexDocument($document, $id, $routing = null) - { - $params = [ - 'index' => $this->index, - 'id' => $id, - 'body' => $document - ]; - - if ($routing) { - $params['routing'] = $routing; - } - - try { - return $this->client->index($params)->asArray(); - } catch (\Exception $e) { - Log::error('索引文档失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 删除文档 - * - * @param string $id 文档ID - * @param string|null $routing 路由值,用于父子文档 - * @return array - */ - public function deleteDocument($id, $routing = null) - { - $params = [ - 'index' => $this->index, - 'id' => $id - ]; - - if ($routing) { - $params['routing'] = $routing; - } - - try { - return $this->client->delete($params)->asArray(); - } catch (MissingParameterException $e) { - // 文档不存在时返回成功 - return ['result' => 'not_found', 'error' => $e->getMessage()]; - } catch (\Exception $e) { - Log::error('删除文档失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 通用搜索方法 - * - * @param array $query 搜索查询 - * @param int $from 起始位置 - * @param int $size 返回结果数量 - * @param array $sort 排序规则 - * @return array - */ - public function search($query, $from = 0, $size = 10, $sort = []) - { - $params = [ - 'index' => $this->index, - 'body' => [ - 'query' => $query, - 'from' => $from, - 'size' => $size - ] - ]; - - if (!empty($sort)) { - $params['body']['sort'] = $sort; - } - - try { - return $this->client->search($params)->asArray(); - } catch (\Exception $e) { - Log::error('搜索失败: ' . $e->getMessage()); - return ['error' => $e->getMessage(), 'hits' => ['total' => ['value' => 0], 'hits' => []]]; - } - } - - /** - * 刷新索引 - * @return array - */ - public function refreshIndex() - { - $params = [ - 'index' => $this->index - ]; - - try { - return $this->client->indices()->refresh($params)->asArray(); - } catch (\Exception $e) { - Log::error('刷新索引失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 检查索引映射 - * @return array - */ - public function checkIndexMapping() - { - try { - return $this->client->indices()->getMapping(['index' => $this->index])->asArray(); - } catch (\Exception $e) { - return ['error' => $e->getMessage()]; - } - } - - /** - * 创建聊天系统索引 - 使用父子关系 - * @return array - */ - public function createDialogUserMsgIndex() - { - // 定义映射 - $mappings = [ - 'properties' => [ - // 共用字段 - 'dialog_id' => ['type' => 'keyword'], - 'created_at' => ['type' => 'date'], - 'updated_at' => ['type' => 'date'], - - // dialog_users 字段 - 'userid' => ['type' => 'keyword'], - 'top_at' => ['type' => 'date'], - 'last_at' => ['type' => 'date'], - 'mark_unread' => ['type' => 'integer'], - 'silence' => ['type' => 'integer'], - 'hide' => ['type' => 'integer'], - 'color' => ['type' => 'keyword'], - - // dialog_msgs 字段 - 'msg_id' => ['type' => 'keyword'], - 'sender_userid' => ['type' => 'keyword'], - 'msg_type' => ['type' => 'keyword'], - 'key' => ['type' => 'text'], - 'bot' => ['type' => 'integer'], - - // Join字段定义父子关系 - 'relationship' => [ - 'type' => 'join', - 'relations' => [ - 'dialog_user' => 'dialog_msg' // dialog_user是父文档,dialog_msg是子文档 - ] - ], - ] - ]; - - // 索引设置 - $settings = [ - 'number_of_shards' => 5, - 'number_of_replicas' => 1, - 'refresh_interval' => '5s' - ]; - - return $this->createIndex($settings, $mappings); - } - - /** - * 构建对话系统特定的搜索 - 根据用户ID和消息关键词搜索会话 - * - * @param string $userid 用户ID - * @param string $keyword 消息关键词 - * @param int $size 返回结果数量 - * @return array - */ - public function searchDialogsByUserAndKeyword($userid, $keyword, $size = 20) - { - // 注意这里的类型名称要与创建索引时的一致 - $query = [ - 'bool' => [ - 'must' => [ - [ - 'term' => [ - 'userid' => $userid - ] - ], - [ - 'has_child' => [ - 'type' => 'dialog_msg', - 'query' => [ - 'bool' => [ - 'must' => [ - [ - 'match_phrase' => [ - 'key' => $keyword - ] - ], - [ - 'term' => [ - 'bot' => 0 - ] - ] - ] - ] - ], - 'inner_hits' => [ - 'size' => 1, - 'sort' => [ - 'msg_id' => 'desc' - ] - ] - ] - ] - ] - ] - ]; - - // 开始搜索 - $results = $this->search($query, 0, $size, ['last_at' => 'desc']); - - // 处理搜索结果 - $searchMap = []; - $hits = $results['hits']['hits'] ?? []; - - foreach ($hits as $hit) { - if (isset($hit['inner_hits']['dialog_msg']['hits']['hits'][0])) { - $msgHit = $hit['inner_hits']['dialog_msg']['hits']['hits'][0]; - $source = $hit['_source']; - $msgSource = $msgHit['_source']; - - $searchMap[] = [ - 'id' => $source['dialog_id'], - 'top_at' => $source['top_at'], - 'last_at' => $source['last_at'], - 'mark_unread' => $source['mark_unread'], - 'silence' => $source['silence'], - 'hide' => $source['hide'], - 'color' => $source['color'], - 'user_at' => $source['updated_at'], - 'search_msg_id' => $msgSource['msg_id'], - ]; - } - } - - // 返回搜索结果 - return $searchMap; - } - - /** ******************************************************************************************************** */ - /** ******************************************************************************************************** */ - /** ******************************************************************************************************** */ - - /** - * DialogUserMsg 索引名称 - * @return string - */ - public static function DUMIndex() - { - return "dialog_user_msg" . env("ES_INDEX_SUFFIX", ""); - } - - /** - * 会话用户 - 生成文档ID - * @param WebSocketDialogUser $dialogUser - * @return string - */ - public static function generateDialogUserDicId(WebSocketDialogUser $dialogUser) - { - return "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}"; - } - - - /** - * 会话用户 - 生成文档格式 - * @param WebSocketDialogUser $dialogUser - * @return array - */ - public static function generateDialogUserFormat(WebSocketDialogUser $dialogUser) - { - return [ - 'dialog_id' => $dialogUser->dialog_id, - 'created_at' => $dialogUser->created_at, - 'updated_at' => $dialogUser->updated_at, - - 'userid' => $dialogUser->userid, - 'top_at' => $dialogUser->top_at, - 'last_at' => $dialogUser->last_at, - 'mark_unread' => $dialogUser->mark_unread ? 1 : 0, - 'silence' => $dialogUser->silence ? 1 : 0, - 'hide' => $dialogUser->hide ? 1 : 0, - 'color' => $dialogUser->color, - - 'relationship' => [ - 'name' => 'dialog_user' - ] - ]; - } - - /** - * 会话用户 - 同步到Elasticsearch - * @param WebSocketDialogUser $dialogUser - * @return void - */ - public static function syncDialogUserToElasticSearch(WebSocketDialogUser $dialogUser) - { - try { - $es = new self(self::DUMIndex()); - $es->indexDocument(self::generateDialogUserFormat($dialogUser), self::generateDialogUserDicId($dialogUser)); - } catch (\Exception $e) { - Log::error('syncDialogUserToElasticSearch: ' . $e->getMessage()); - } - } - - /** - * 会话用户 - 从Elasticsearch删除 - */ - public static function deleteDialogUserFromElasticSearch(WebSocketDialogUser $dialogUser) - { - try { - $es = new self(self::DUMIndex()); - - $docId = "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}"; - - // 删除用户-会话文档 - $es->deleteDocument($docId); - - // 注意:这里可能还需要删除所有关联的消息文档 - // 但由于父子关系,可以通过查询找到所有子文档并删除 - // 这里为简化,可以选择在后台任务中处理,或者直接依赖ES的级联删除功能 - - } catch (\Exception $e) { - Log::error('deleteDialogUserFromElasticSearch: ' . $e->getMessage()); - } - } - - /** ******************************************************************************************************** */ - /** ******************************************************************************************************** */ - /** ******************************************************************************************************** */ - - /** - * 会话消息 - 生成父文档ID - * @param WebSocketDialogMsg $dialogMsg - * @param $userid - * @return string - */ - public static function generateDialogMsgParentId(WebSocketDialogMsg $dialogMsg, $userid) - { - return "user_{$userid}_dialog_{$dialogMsg->dialog_id}"; - } - - /** - * 会话消息 - 生成文档ID - * @param WebSocketDialogMsg $dialogMsg - * @param $userid - * @return string - */ - public static function generateDialogMsgDicId(WebSocketDialogMsg $dialogMsg, $userid) - { - return "msg_{$dialogMsg->id}_user_{$userid}"; - } - - /** - * 会话消息 - 生成文档格式 - * @param WebSocketDialogMsg $dialogMsg - * @param $userid - * @return array - */ - public static function generateDialogMsgFormat(WebSocketDialogMsg $dialogMsg, $userid) - { - return [ - 'dialog_id' => $dialogMsg->dialog_id, - 'created_at' => $dialogMsg->created_at, - 'updated_at' => $dialogMsg->updated_at, - - 'msg_id' => $dialogMsg->id, - 'sender_userid' => $dialogMsg->userid, - 'msg_type' => $dialogMsg->type, - 'key' => $dialogMsg->key, - 'bot' => $dialogMsg->bot ? 1 : 0, - - 'relationship' => [ - 'name' => 'dialog_msg', - 'parent' => self::generateDialogMsgParentId($dialogMsg, $userid) - ] - ]; - } - - /** - * 会话消息 - 同步到Elasticsearch - */ - public static function syncDialogToElasticSearch(WebSocketDialogMsg $dialogMsg) - { - try { - $es = new self(self::DUMIndex()); - - // 获取此会话的所有用户 - $dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get(); - - if ($dialogUsers->isEmpty()) { - return; - } - - $params = ['body' => []]; - - foreach ($dialogUsers as $dialogUser) { - $params['body'][] = [ - 'index' => [ - '_index' => self::DUMIndex(), - '_id' => self::generateDialogMsgDicId($dialogMsg, $dialogUser->userid), - 'routing' => self::generateDialogMsgParentId($dialogMsg, $dialogUser->userid) - ] - ]; - $params['body'][] = self::generateDialogMsgFormat($dialogMsg, $dialogUser->userid); - } - - if (!empty($params['body'])) { - $es->bulk($params); - } - } catch (\Exception $e) { - Log::error('syncDialogToElasticSearch: ' . $e->getMessage()); - } - } - - /** - * 会话消息 - 从Elasticsearch删除 - */ - public static function deleteDialogFromElasticSearch(WebSocketDialogMsg $dialogMsg) - { - try { - $es = new self(self::DUMIndex()); - - // 获取此会话的所有用户 - $dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get(); - - if ($dialogUsers->isEmpty()) { - return; - } - - $params = ['body' => []]; - - foreach ($dialogUsers as $dialogUser) { - $params['body'][] = [ - 'delete' => [ - '_index' => self::DUMIndex(), - '_id' => self::generateDialogMsgDicId($dialogMsg, $dialogUser->userid), - 'routing' => self::generateDialogMsgParentId($dialogMsg, $dialogUser->userid) - ] - ]; - } - - if (!empty($params['body'])) { - $es->bulk($params); - } - } catch (\Exception $e) { - Log::error('deleteDialogFromElasticSearch: ' . $e->getMessage()); - } - } -} diff --git a/app/Module/ElasticSearch/ElasticSearchBase.php b/app/Module/ElasticSearch/ElasticSearchBase.php new file mode 100644 index 000000000..d807be58b --- /dev/null +++ b/app/Module/ElasticSearch/ElasticSearchBase.php @@ -0,0 +1,282 @@ + ["{$scheme}://{$host}:{$port}"] + ]; + + // 如果设置了用户名和密码 + if (!empty($user)) { + $config['basicAuthentication'] = [$user, $pass]; + } + + $config['SSLVerification'] = $verifi; + if ($verifi) { + $config['SSLCert'] = $cert; + $config['CABundle'] = $ca; + $config['SSLKey'] = $key; + } + // 8.x版本使用ClientBuilder::fromConfig创建客户端 + $this->client = ClientBuilder::fromConfig($config); + + if ($index) { + $this->index = $index; + } + } + + /** + * 设置索引名称 + * + * @param string $index + * @return $this + */ + public function setIndex($index) + { + $this->index = $index; + return $this; + } + + /** + * 检查索引是否存在 + * + * @return bool + * @throws \Exception + */ + public function indexExists() + { + $params = ['index' => $this->index]; + return $this->client->indices()->exists($params)->asBool(); + } + + /** + * 创建索引 + * + * @param array $settings 索引设置 + * @param array $mappings 字段映射 + * @return array + */ + public function createIndex($settings = [], $mappings = []) + { + $params = [ + 'index' => $this->index + ]; + + $body = []; + if (!empty($settings)) { + $body['settings'] = $settings; + } + + if (!empty($mappings)) { + $body['mappings'] = $mappings; + } + + if (!empty($body)) { + $params['body'] = $body; + } + + try { + // 在8.x中,索引操作位于indices()命名空间 + return $this->client->indices()->create($params)->asArray(); + } catch (\Exception $e) { + Log::error('创建Elasticsearch索引失败: ' . $e->getMessage()); + return ['error' => $e->getMessage()]; + } + } + + /** + * 删除索引 + * @return array + */ + public function deleteIndex() + { + try { + $params = ['index' => $this->index]; + return $this->client->indices()->delete($params)->asArray(); + } catch (\Exception $e) { + Log::error('删除Elasticsearch索引失败: ' . $e->getMessage()); + return ['error' => $e->getMessage()]; + } + } + + /** + * 批量操作(批量添加/更新/删除文档) + * + * @param array $operations 批量操作的数据 + * @return array + */ + public function bulk($operations) + { + try { + // 在8.x中,批量操作API签名相同,但内部实现有所变化 + return $this->client->bulk($operations)->asArray(); + } catch (\Exception $e) { + Log::error('批量操作失败: ' . $e->getMessage()); + return ['error' => $e->getMessage()]; + } + } + + /** + * 索引单个文档 + * + * @param array $document 文档数据 + * @param string $id 文档ID + * @param string|null $routing 路由值,用于父子文档 + * @return array + */ + public function indexDocument($document, $id, $routing = null) + { + $params = [ + 'index' => $this->index, + 'id' => $id, + 'body' => $document + ]; + + if ($routing) { + $params['routing'] = $routing; + } + + try { + return $this->client->index($params)->asArray(); + } catch (\Exception $e) { + Log::error('索引文档失败: ' . $e->getMessage()); + return ['error' => $e->getMessage()]; + } + } + + /** + * 删除文档 + * + * @param string $id 文档ID + * @param string|null $routing 路由值,用于父子文档 + * @return array + */ + public function deleteDocument($id, $routing = null) + { + $params = [ + 'index' => $this->index, + 'id' => $id + ]; + + if ($routing) { + $params['routing'] = $routing; + } + + try { + return $this->client->delete($params)->asArray(); + } catch (MissingParameterException $e) { + // 文档不存在时返回成功 + return ['result' => 'not_found', 'error' => $e->getMessage()]; + } catch (\Exception $e) { + Log::error('删除文档失败: ' . $e->getMessage()); + return ['error' => $e->getMessage()]; + } + } + + /** + * 刷新索引 + * @return array + */ + public function refreshIndex() + { + $params = [ + 'index' => $this->index + ]; + + try { + return $this->client->indices()->refresh($params)->asArray(); + } catch (\Exception $e) { + Log::error('刷新索引失败: ' . $e->getMessage()); + return ['error' => $e->getMessage()]; + } + } + + /** + * 检查索引映射 + * @return array + */ + public function checkIndexMapping() + { + try { + return $this->client->indices()->getMapping(['index' => $this->index])->asArray(); + } catch (\Exception $e) { + return ['error' => $e->getMessage()]; + } + } + + /** + * 通用搜索方法 + * + * @param array $query 搜索查询 + * @param int $from 起始位置 + * @param int $size 返回结果数量 + * @param array $sort 排序规则 + * @return array + */ + public function search($query, $from = 0, $size = 10, $sort = []) + { + $params = [ + 'index' => $this->index, + 'body' => [ + 'query' => $query, + 'from' => $from, + 'size' => $size + ] + ]; + + if (!empty($sort)) { + $params['body']['sort'] = $sort; + } + + try { + return $this->client->search($params)->asArray(); + } catch (\Exception $e) { + Log::error('搜索失败: ' . $e->getMessage()); + return ['error' => $e->getMessage(), 'hits' => ['total' => ['value' => 0], 'hits' => []]]; + } + } +} diff --git a/app/Module/ElasticSearch/ElasticSearchKeyValue.php b/app/Module/ElasticSearch/ElasticSearchKeyValue.php new file mode 100644 index 000000000..b87e515a9 --- /dev/null +++ b/app/Module/ElasticSearch/ElasticSearchKeyValue.php @@ -0,0 +1,211 @@ +indexExists()) { + return ['acknowledged' => true, 'message' => '索引已存在']; + } + + // 定义映射 + $mappings = [ + 'properties' => [ + 'key' => ['type' => 'keyword'], + 'value' => ['type' => 'text', 'fields' => ['keyword' => ['type' => 'keyword']]], + 'created_at' => ['type' => 'date'], + 'updated_at' => ['type' => 'date'] + ] + ]; + + // 索引设置 + $settings = [ + 'number_of_shards' => 1, + 'number_of_replicas' => 1, + 'refresh_interval' => '1s' + ]; + + return $es->createIndex($settings, $mappings); + } catch (\Exception $e) { + Log::error('创建键值存储索引失败: ' . $e->getMessage()); + return ['error' => $e->getMessage()]; + } + } + + /** + * 保存键值对 + * @param string $key 键名 + * @param mixed $value 键值 + * @param string $namespace 命名空间,用于区分不同的键值存储场景 + * @return array + */ + public static function save($key, $value, $namespace = 'default') + { + try { + // 确保索引存在 + self::generateIndex(); + + $es = new self(); + + // 生成文档ID + $docId = "{$namespace}:{$key}"; + + // 准备文档数据 + $document = [ + 'key' => $key, + 'value' => is_array($value) ? json_encode($value, JSON_UNESCAPED_UNICODE) : $value, + 'namespace' => $namespace, + 'created_at' => date('Y-m-d H:i:s'), + 'updated_at' => date('Y-m-d H:i:s') + ]; + + // 索引文档 + $result = $es->indexDocument($document, $docId); + + // 刷新索引以确保立即可见 + $es->refreshIndex(); + + return $result; + } catch (\Exception $e) { + Log::error('保存键值对失败: ' . $e->getMessage()); + return ['error' => $e->getMessage()]; + } + } + + /** + * 获取键值 + * @param string $key 键名 + * @param mixed $default 默认值,当键不存在时返回 + * @param string $namespace 命名空间,用于区分不同的键值存储场景 + * @return mixed + */ + public static function get($key, $default = null, $namespace = 'default') + { + try { + $es = new self(); + + // 如果索引不存在,直接返回默认值 + if (!$es->indexExists()) { + return $default; + } + + // 生成文档ID + $docId = "{$namespace}:{$key}"; + + // 查询参数 + $params = [ + 'index' => self::indexName(), + 'id' => $docId + ]; + + try { + // 获取文档 + $response = $es->client->get($params)->asArray(); + + // 获取值 + $value = $response['_source']['value'] ?? $default; + + // 如果值是JSON字符串,尝试解码 + if (is_string($value) && $decoded = json_decode($value, true)) { + if (json_last_error() === JSON_ERROR_NONE) { + return $decoded; + } + } + + return $value; + } catch (\Exception $e) { + // 文档不存在或其他错误,返回默认值 + return $default; + } + } catch (\Exception $e) { + Log::error('获取键值对失败: ' . $e->getMessage()); + return $default; + } + } + + /** + * 获取键值,返回数组 + * @param string $key 键名 + * @param array $default 默认值,当键不存在时返回 + * @param string $namespace 命名空间,用于区分不同的键值存储场景 + * @return array + */ + public static function getArray($key, $default = [], $namespace = 'default') + { + return Base::string2array(self::get($key, $default, $namespace)); + } + + /** + * 删除键值对 + * @param string $key 键名 + * @param string $namespace 命名空间 + * @return array + */ + public static function delete($key, $namespace = 'default') + { + try { + $es = new self(); + + // 如果索引不存在,直接返回成功 + if (!$es->indexExists()) { + return ['result' => 'not_found']; + } + + // 生成文档ID + $docId = "{$namespace}:{$key}"; + + // 删除文档 + $result = $es->deleteDocument($docId); + + // 刷新索引以确保立即生效 + $es->refreshIndex(); + + return $result; + } catch (\Exception $e) { + Log::error('删除键值对失败: ' . $e->getMessage()); + return ['error' => $e->getMessage()]; + } + } +} diff --git a/app/Module/ElasticSearch/ElasticSearchUserMsg.php b/app/Module/ElasticSearch/ElasticSearchUserMsg.php new file mode 100644 index 000000000..027612978 --- /dev/null +++ b/app/Module/ElasticSearch/ElasticSearchUserMsg.php @@ -0,0 +1,382 @@ + [ + // 共用字段 + 'dialog_id' => ['type' => 'keyword'], + 'created_at' => ['type' => 'date'], + 'updated_at' => ['type' => 'date'], + + // dialog_users 字段 + 'userid' => ['type' => 'keyword'], + 'top_at' => ['type' => 'date'], + 'last_at' => ['type' => 'date'], + 'mark_unread' => ['type' => 'integer'], + 'silence' => ['type' => 'integer'], + 'hide' => ['type' => 'integer'], + 'color' => ['type' => 'keyword'], + + // dialog_msgs 字段 + 'msg_id' => ['type' => 'keyword'], + 'sender_userid' => ['type' => 'keyword'], + 'msg_type' => ['type' => 'keyword'], + 'key' => ['type' => 'text'], + 'bot' => ['type' => 'integer'], + + // Join字段定义父子关系 + 'relationship' => [ + 'type' => 'join', + 'relations' => [ + 'dialog_user' => 'dialog_msg' // dialog_user是父文档,dialog_msg是子文档 + ] + ], + ] + ]; + + // 索引设置 + $settings = [ + 'number_of_shards' => 5, + 'number_of_replicas' => 1, + 'refresh_interval' => '5s' + ]; + + try { + $es = new self(); + return $es->createIndex($settings, $mappings); + } catch (\Exception $e) { + Log::error('创建聊天系统索引失败: ' . $e->getMessage()); + return ['error' => $e->getMessage()]; + } + } + + /** + * 构建对话系统特定的搜索 - 根据用户ID和消息关键词搜索会话 + * @param string $userid 用户ID + * @param string $keyword 消息关键词 + * @param int $size 返回结果数量 + * @return array + */ + public static function searchByKeyword($userid, $keyword, $size = 20) + { + // 注意这里的类型名称要与创建索引时的一致 + $query = [ + 'bool' => [ + 'must' => [ + [ + 'term' => [ + 'userid' => $userid + ] + ], + [ + 'has_child' => [ + 'type' => 'dialog_msg', + 'query' => [ + 'bool' => [ + 'must' => [ + [ + 'match_phrase' => [ + 'key' => $keyword + ] + ], + [ + 'term' => [ + 'bot' => 0 + ] + ] + ] + ] + ], + 'inner_hits' => [ + 'size' => 1, + 'sort' => [ + 'msg_id' => 'desc' + ] + ] + ] + ] + ] + ] + ]; + + // 结果集合 + $searchMap = []; + + try { + // 开始搜索 + $es = new self(); + $results = $es->search($query, 0, $size, ['last_at' => 'desc']); + + // 处理搜索结果 + $hits = $results['hits']['hits'] ?? []; + + foreach ($hits as $hit) { + if (isset($hit['inner_hits']['dialog_msg']['hits']['hits'][0])) { + $msgHit = $hit['inner_hits']['dialog_msg']['hits']['hits'][0]; + $source = $hit['_source']; + $msgSource = $msgHit['_source']; + + $searchMap[] = [ + 'id' => $source['dialog_id'], + 'top_at' => $source['top_at'], + 'last_at' => $source['last_at'], + 'mark_unread' => $source['mark_unread'], + 'silence' => $source['silence'], + 'hide' => $source['hide'], + 'color' => $source['color'], + 'user_at' => $source['updated_at'], + 'search_msg_id' => $msgSource['msg_id'], + ]; + } + } + } catch (\Exception $e) { + Log::error('searchByKeyword: ' . $e->getMessage()); + } + + // 返回搜索结果 + return $searchMap; + } + + /** ******************************************************************************************************** */ + /** *********************************************** 用户 ************************************************** */ + /** ******************************************************************************************************** */ + + /** + * 会话用户 - 生成文档ID + * @param WebSocketDialogUser $dialogUser + * @return string + */ + public static function generateUserDicId(WebSocketDialogUser $dialogUser) + { + return "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}"; + } + + /** + * 会话用户 - 生成文档格式 + * @param WebSocketDialogUser $dialogUser + * @return array + */ + public static function generateUserFormat(WebSocketDialogUser $dialogUser) + { + return [ + 'dialog_id' => $dialogUser->dialog_id, + 'created_at' => $dialogUser->created_at, + 'updated_at' => $dialogUser->updated_at, + + 'userid' => $dialogUser->userid, + 'top_at' => $dialogUser->top_at, + 'last_at' => $dialogUser->last_at, + 'mark_unread' => $dialogUser->mark_unread ? 1 : 0, + 'silence' => $dialogUser->silence ? 1 : 0, + 'hide' => $dialogUser->hide ? 1 : 0, + 'color' => $dialogUser->color, + + 'relationship' => [ + 'name' => 'dialog_user' + ] + ]; + } + + /** + * 会话用户 - 同步到Elasticsearch + * @param WebSocketDialogUser $dialogUser + * @return void + */ + public static function syncUser(WebSocketDialogUser $dialogUser) + { + try { + $es = new self(); + $es->indexDocument(self::generateUserFormat($dialogUser), self::generateUserDicId($dialogUser)); + } catch (\Exception $e) { + Log::error('syncUser: ' . $e->getMessage()); + } + } + + /** + * 会话用户 - 从Elasticsearch删除 + */ + public static function deleteUser(WebSocketDialogUser $dialogUser) + { + try { + $es = new self(); + + $docId = "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}"; + + // 删除用户-会话文档 + $es->deleteDocument($docId); + + // 注意:这里可能还需要删除所有关联的消息文档 + // 但由于父子关系,可以通过查询找到所有子文档并删除 + // 这里为简化,可以选择在后台任务中处理,或者直接依赖ES的级联删除功能 + + } catch (\Exception $e) { + Log::error('deleteUser: ' . $e->getMessage()); + } + } + + /** ******************************************************************************************************** */ + /** *********************************************** 消息 ************************************************** */ + /** ******************************************************************************************************** */ + + /** + * 会话消息 - 生成父文档ID + * @param WebSocketDialogMsg $dialogMsg + * @param $userid + * @return string + */ + public static function generateMsgParentId(WebSocketDialogMsg $dialogMsg, $userid) + { + return "user_{$userid}_dialog_{$dialogMsg->dialog_id}"; + } + + /** + * 会话消息 - 生成文档ID + * @param WebSocketDialogMsg $dialogMsg + * @param $userid + * @return string + */ + public static function generateMsgDicId(WebSocketDialogMsg $dialogMsg, $userid) + { + return "msg_{$dialogMsg->id}_user_{$userid}"; + } + + /** + * 会话消息 - 生成文档格式 + * @param WebSocketDialogMsg $dialogMsg + * @param $userid + * @return array + */ + public static function generateMsgFormat(WebSocketDialogMsg $dialogMsg, $userid) + { + return [ + 'dialog_id' => $dialogMsg->dialog_id, + 'created_at' => $dialogMsg->created_at, + 'updated_at' => $dialogMsg->updated_at, + + 'msg_id' => $dialogMsg->id, + 'sender_userid' => $dialogMsg->userid, + 'msg_type' => $dialogMsg->type, + 'key' => $dialogMsg->key, + 'bot' => $dialogMsg->bot ? 1 : 0, + + 'relationship' => [ + 'name' => 'dialog_msg', + 'parent' => self::generateMsgParentId($dialogMsg, $userid) + ] + ]; + } + + /** + * 会话消息 - 同步到Elasticsearch + */ + public static function syncMsg(WebSocketDialogMsg $dialogMsg) + { + try { + $es = new self(); + + // 获取此会话的所有用户 + $dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get(); + + if ($dialogUsers->isEmpty()) { + return; + } + + $params = ['body' => []]; + + foreach ($dialogUsers as $dialogUser) { + $params['body'][] = [ + 'index' => [ + '_index' => self::indexName(), + '_id' => self::generateMsgDicId($dialogMsg, $dialogUser->userid), + 'routing' => self::generateMsgParentId($dialogMsg, $dialogUser->userid) + ] + ]; + $params['body'][] = self::generateMsgFormat($dialogMsg, $dialogUser->userid); + } + + if (!empty($params['body'])) { + $es->bulk($params); + } + } catch (\Exception $e) { + Log::error('syncMsg: ' . $e->getMessage()); + } + } + + /** + * 会话消息 - 从Elasticsearch删除 + */ + public static function deleteMsg(WebSocketDialogMsg $dialogMsg) + { + try { + $es = new self(); + + // 获取此会话的所有用户 + $dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get(); + + if ($dialogUsers->isEmpty()) { + return; + } + + $params = ['body' => []]; + + foreach ($dialogUsers as $dialogUser) { + $params['body'][] = [ + 'delete' => [ + '_index' => self::indexName(), + '_id' => self::generateMsgDicId($dialogMsg, $dialogUser->userid), + 'routing' => self::generateMsgParentId($dialogMsg, $dialogUser->userid) + ] + ]; + } + + if (!empty($params['body'])) { + $es->bulk($params); + } + } catch (\Exception $e) { + Log::error('deleteMsg: ' . $e->getMessage()); + } + } +} diff --git a/app/Observers/WebSocketDialogMsgObserver.php b/app/Observers/WebSocketDialogMsgObserver.php index 0347ee30a..3e30aa116 100644 --- a/app/Observers/WebSocketDialogMsgObserver.php +++ b/app/Observers/WebSocketDialogMsgObserver.php @@ -3,7 +3,7 @@ namespace App\Observers; use App\Models\WebSocketDialogMsg; -use App\Module\ElasticSearch; +use App\Module\ElasticSearch\ElasticSearchUserMsg; class WebSocketDialogMsgObserver { @@ -15,7 +15,7 @@ class WebSocketDialogMsgObserver */ public function created(WebSocketDialogMsg $webSocketDialogMsg) { - ElasticSearch::syncDialogToElasticSearch($webSocketDialogMsg); + ElasticSearchUserMsg::syncMsg($webSocketDialogMsg); } /** @@ -26,7 +26,7 @@ class WebSocketDialogMsgObserver */ public function updated(WebSocketDialogMsg $webSocketDialogMsg) { - ElasticSearch::syncDialogToElasticSearch($webSocketDialogMsg); + ElasticSearchUserMsg::syncMsg($webSocketDialogMsg); } /** @@ -37,7 +37,7 @@ class WebSocketDialogMsgObserver */ public function deleted(WebSocketDialogMsg $webSocketDialogMsg) { - ElasticSearch::deleteDialogFromElasticSearch($webSocketDialogMsg); + ElasticSearchUserMsg::deleteMsg($webSocketDialogMsg); } /** diff --git a/app/Observers/WebSocketDialogUserObserver.php b/app/Observers/WebSocketDialogUserObserver.php index f1dcbb299..785c965c0 100644 --- a/app/Observers/WebSocketDialogUserObserver.php +++ b/app/Observers/WebSocketDialogUserObserver.php @@ -4,7 +4,7 @@ namespace App\Observers; use App\Models\Deleted; use App\Models\WebSocketDialogUser; -use App\Module\ElasticSearch; +use App\Module\ElasticSearch\ElasticSearchUserMsg; use Carbon\Carbon; class WebSocketDialogUserObserver @@ -30,7 +30,7 @@ class WebSocketDialogUserObserver } } Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); - ElasticSearch::syncDialogUserToElasticSearch($webSocketDialogUser); + ElasticSearchUserMsg::syncUser($webSocketDialogUser); } /** @@ -41,7 +41,7 @@ class WebSocketDialogUserObserver */ public function updated(WebSocketDialogUser $webSocketDialogUser) { - ElasticSearch::syncDialogUserToElasticSearch($webSocketDialogUser); + ElasticSearchUserMsg::syncUser($webSocketDialogUser); } /** @@ -53,7 +53,7 @@ class WebSocketDialogUserObserver public function deleted(WebSocketDialogUser $webSocketDialogUser) { Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); - ElasticSearch::deleteDialogUserFromElasticSearch($webSocketDialogUser); + ElasticSearchUserMsg::deleteUser($webSocketDialogUser); } /** diff --git a/app/Tasks/ElasticSearchSyncTask.php b/app/Tasks/ElasticSearchSyncTask.php index eef1d393e..1e5726e28 100644 --- a/app/Tasks/ElasticSearchSyncTask.php +++ b/app/Tasks/ElasticSearchSyncTask.php @@ -5,6 +5,9 @@ namespace App\Tasks; use Carbon\Carbon; use Illuminate\Support\Facades\Cache; +/** + * 同步聊天数据到Elasticsearch + */ class ElasticSearchSyncTask extends AbstractTask { public function __construct() @@ -14,14 +17,20 @@ class ElasticSearchSyncTask extends AbstractTask public function start() { - // 30分钟执行一次 + // 120分钟执行一次 $time = intval(Cache::get("ElasticSearchSyncTask:Time")); - if (time() - $time < 1800) { + if (time() - $time < 120 * 60) { return; } - Cache::put("ElasticSearchSyncTask:Time", time(), Carbon::now()->addMinutes(10)); - // 判断参数 + + // 执行开始,120分钟后缓存标记失效 + Cache::put("ElasticSearchSyncTask:Time", time(), Carbon::now()->addMinutes(120)); + + // 开始执行同步 @shell_exec("php /var/www/artisan elasticsearch:sync-dialog-user-msg --i"); + + // 执行完成,5分钟后缓存标记失效(5分钟任务可重复执行) + Cache::put("ElasticSearchSyncTask:Time", time(), Carbon::now()->addMinutes(5)); } public function end()