diff --git a/app/Console/Commands/SyncDialogUserMsgToElasticsearch.php b/app/Console/Commands/SyncDialogUserMsgToElasticsearch.php deleted file mode 100644 index c91b44ca0..000000000 --- a/app/Console/Commands/SyncDialogUserMsgToElasticsearch.php +++ /dev/null @@ -1,254 +0,0 @@ -es = new ElasticSearchUserMsg(); - } catch (\Exception $e) { - $this->error('Elasticsearch连接失败: ' . $e->getMessage()); - exit(1); - } - } - - /** - * @return int - * @throws \Exception - */ - public function handle() - { - $this->info('开始同步聊天数据...'); - - // 清除索引 - if ($this->option('c')) { - $this->info('清除索引...'); - if (!$this->es->indexExists()) { - $this->saveLastId(true); - $this->info('索引不存在'); - return 0; - } - $result = $this->es->deleteIndex(); - if (isset($result['error'])) { - $this->error('删除索引失败: ' . $result['error']); - return 1; - } - $this->saveLastId(true); - $this->info('索引删除成功'); - return 0; - } - - // 判断创建索引 - if (!$this->es->indexExists()) { - $this->info('创建索引...'); - $result = ElasticSearchUserMsg::generateIndex(); - if (isset($result['error'])) { - $this->error('创建索引失败: ' . $result['error']); - return 1; - } - $this->saveLastId(true); - $this->info('索引创建成功'); - } - - // 同步用户-会话数据 - $this->syncDialogUsers($this->option('batch')); - - // 同步消息数据 - $this->syncDialogMsgs($this->option('batch')); - - // 完成 - $this->info("\n同步完成"); - return 0; - } - - /** - * 保存最后一个ID - * @param string|true $type - * @param integer $lastId - */ - private function saveLastId($type, $lastId = 0) - { - if ($type === true) { - $setting = []; - } else { - $setting = ElasticSearchKeyValue::getArray('elasticSearch:sync'); - $setting[$type] = $lastId; - } - ElasticSearchKeyValue::save('elasticSearch:sync', $setting); - } - - /** - * 获取最后一个ID - * @param $type - * @return int - */ - private function getLastId($type) - { - if ($this->option('i')) { - $setting = ElasticSearchKeyValue::getArray('elasticSearch:sync'); - return intval($setting[$type] ?? 0); - } - return 0; - } - - /** - * 同步用户-会话数据(父文档) - * @param $batchSize - * @return void - */ - private function syncDialogUsers($batchSize) - { - $this->info("\n同步用户数据..."); - $lastId = $this->getLastId('dialog_user'); - - $num = 0; - $count = WebSocketDialogUser::where('id', '>', $lastId)->count(); - - do { - // 获取一批用户-会话关系 - $dialogUsers = WebSocketDialogUser::where('id', '>', $lastId) - ->orderBy('id') - ->limit($batchSize) - ->get(); - - if ($dialogUsers->isEmpty()) { - break; - } - - $num += count($dialogUsers); - $progress = round($num / $count * 100, 2); - $this->info("{$num}/{$count} ({$progress}%) 正在同步用户ID {$lastId} ~ {$dialogUsers->last()->id}"); - - // 批量索引数据 - $params = ['body' => []]; - foreach ($dialogUsers as $dialogUser) { - $params['body'][] = [ - 'index' => [ - '_index' => ElasticSearchUserMsg::indexName(), - '_id' => ElasticSearchUserMsg::generateUserDicId($dialogUser), - ] - ]; - $params['body'][] = ElasticSearchUserMsg::generateUserFormat($dialogUser); - } - - if ($params['body']) { - $result = $this->es->bulk($params); - if (isset($result['errors']) && $result['errors']) { - $this->error('批量索引用户数据部分失败'); - Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items'])); - } - } - - $lastId = $dialogUsers->last()->id; - $this->saveLastId('dialog_user', $lastId); - } while (count($dialogUsers) == $batchSize); - - $this->info("同步用户数据结束 - 最后ID {$lastId}"); - } - - /** - * 同步消息数据(子文档) - */ - private function syncDialogMsgs($batchSize) - { - $this->info("\n同步消息数据..."); - $lastId = $this->getLastId('dialog_msg'); - - $num = 0; - $count = WebSocketDialogMsg::where('id', '>', $lastId)->count(); - - do { - // 获取一批消息 - $dialogMsgs = WebSocketDialogMsg::where('id', '>', $lastId) - ->orderBy('id') - ->limit($batchSize) - ->get(); - - if ($dialogMsgs->isEmpty()) { - break; - } - - $num += count($dialogMsgs); - $progress = round($num / $count * 100, 2); - $this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$lastId} ~ {$dialogMsgs->last()->id}"); - - // 获取这些消息所属的会话对应的所有用户 - $dialogIds = $dialogMsgs->pluck('dialog_id')->unique()->toArray(); - $userDialogMap = []; - - if (!empty($dialogIds)) { - $dialogUsers = WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get(); - - foreach ($dialogUsers as $dialogUser) { - $userDialogMap[$dialogUser->dialog_id][] = $dialogUser->userid; - } - } - - // 批量索引消息数据 - $params = ['body' => []]; - foreach ($dialogMsgs as $dialogMsg) { - // 如果该会话没有用户,跳过 - if (empty($userDialogMap[$dialogMsg->dialog_id])) { - continue; - } - - // 为每个用户-会话关系创建子文档 - foreach ($userDialogMap[$dialogMsg->dialog_id] as $userid) { - $params['body'][] = [ - 'index' => [ - '_index' => ElasticSearchUserMsg::indexName(), - '_id' => ElasticSearchUserMsg::generateMsgDicId($dialogMsg, $userid), - 'routing' => ElasticSearchUserMsg::generateMsgParentId($dialogMsg, $userid) // 路由到父文档 - ] - ]; - - $params['body'][] = ElasticSearchUserMsg::generateMsgFormat($dialogMsg, $userid); - } - } - - if (!empty($params['body'])) { - // 分批处理 - $chunks = array_chunk($params['body'], 1000); - foreach ($chunks as $chunk) { - $chunkParams = ['body' => $chunk]; - $result = $this->es->bulk($chunkParams); - if (isset($result['errors']) && $result['errors']) { - $this->error('批量索引消息数据部分失败'); - Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items'])); - } - } - } - - $lastId = $dialogMsgs->last()->id; - $this->saveLastId('dialog_msg', $lastId); - } while (count($dialogMsgs) == $batchSize); - - $this->info("同步消息结束 - 最后ID {$lastId}"); - } -} diff --git a/app/Http/Controllers/Api/DialogController.php b/app/Http/Controllers/Api/DialogController.php index e9aaca25c..c2d9cd1cf 100755 --- a/app/Http/Controllers/Api/DialogController.php +++ b/app/Http/Controllers/Api/DialogController.php @@ -14,10 +14,8 @@ use App\Module\Base; use App\Module\Timer; use App\Models\Setting; use App\Module\Extranet; -use App\Module\ElasticSearch\ElasticSearchUserMsg; use App\Module\TimeRange; use App\Module\MsgTool; -use App\Module\Table\OnlineData; use App\Models\FileContent; use App\Models\ProjectTask; use App\Models\AbstractModel; @@ -29,6 +27,8 @@ use App\Models\WebSocketDialogMsgRead; use App\Models\WebSocketDialogMsgTodo; use App\Models\WebSocketDialogMsgTranslate; use App\Models\WebSocketDialogSession; +use App\Module\Table\OnlineData; +use App\Module\ZincSearch\ZincSearchUserMsg; use Hhxsv5\LaravelS\Swoole\Task\Task; /** @@ -174,7 +174,7 @@ class DialogController extends AbstractController } // 搜索消息会话 if (count($list) < 20) { - $searchResults = ElasticSearchUserMsg::searchByKeyword($user->userid, $key, 20 - count($list)); + $searchResults = ZincSearchUserMsg::searchByKeyword($user->userid, $key, 0, 20 - count($list)); if ($searchResults) { foreach ($searchResults as $item) { if ($dialog = WebSocketDialog::find($item['id'])) { @@ -728,7 +728,7 @@ class DialogController extends AbstractController $key = trim(Request::input('key')); $list = []; // - $searchResults = ElasticSearchUserMsg::searchByKeyword($user->userid, $key, Base::getPaginate(50, 20)); + $searchResults = ZincSearchUserMsg::searchByKeyword($user->userid, $key, 0, Base::getPaginate(50, 20)); if ($searchResults) { foreach ($searchResults as $item) { if ($dialog = WebSocketDialog::find($item['id'])) { diff --git a/app/Module/ElasticSearch/ElasticSearchBase.php b/app/Module/ElasticSearch/ElasticSearchBase.php deleted file mode 100644 index 21e67cdb7..000000000 --- a/app/Module/ElasticSearch/ElasticSearchBase.php +++ /dev/null @@ -1,308 +0,0 @@ - ["{$scheme}://{$host}:{$port}"] - ]; - - // 如果设置了用户名和密码 - if (!empty($user)) { - $config['basicAuthentication'] = [$user, $pass]; - } - - $config['SSLVerification'] = $verifi; - if ($verifi) { - $config['SSLCert'] = $cert; - $config['CABundle'] = $ca; - $config['SSLKey'] = $key; - } - // 8.x版本使用ClientBuilder::fromConfig创建客户端 - $this->client = ClientBuilder::fromConfig($config); - - if ($index) { - $this->index = $index; - } - } - - /** - * 设置索引名称 - * - * @param string $index - * @return $this - */ - public function setIndex($index) - { - $this->index = $index; - return $this; - } - - /** - * 检查索引是否存在 - * - * @return bool - * @throws \Exception - */ - public function indexExists() - { - $params = ['index' => $this->index]; - return $this->client->indices()->exists($params)->asBool(); - } - - /** - * 创建索引 - * - * @param array $settings 索引设置 - * @param array $mappings 字段映射 - * @return array - */ - public function createIndex($settings = [], $mappings = []) - { - $params = [ - 'index' => $this->index - ]; - - $body = []; - if (!empty($settings)) { - $body['settings'] = $settings; - } - - if (!empty($mappings)) { - $body['mappings'] = $mappings; - } - - if (!empty($body)) { - $params['body'] = $body; - } - - try { - // 在8.x中,索引操作位于indices()命名空间 - return $this->client->indices()->create($params)->asArray(); - } catch (\Exception $e) { - Log::error('创建Elasticsearch索引失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 删除索引 - * @return array - */ - public function deleteIndex() - { - try { - $params = ['index' => $this->index]; - return $this->client->indices()->delete($params)->asArray(); - } catch (\Exception $e) { - Log::error('删除Elasticsearch索引失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 批量操作(批量添加/更新/删除文档) - * - * @param array $operations 批量操作的数据 - * @return array - */ - public function bulk($operations) - { - try { - // 在8.x中,批量操作API签名相同,但内部实现有所变化 - return $this->client->bulk($operations)->asArray(); - } catch (\Exception $e) { - Log::error('批量操作失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 索引单个文档 - * - * @param array $document 文档数据 - * @param string $id 文档ID - * @param string|null $routing 路由值,用于父子文档 - * @return array - */ - public function indexDocument($document, $id, $routing = null) - { - $params = [ - 'index' => $this->index, - 'id' => $id, - 'body' => $document - ]; - - if ($routing) { - $params['routing'] = $routing; - } - - try { - return $this->client->index($params)->asArray(); - } catch (\Exception $e) { - Log::error('索引文档失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 删除文档 - * - * @param string $id 文档ID - * @param string|null $routing 路由值,用于父子文档 - * @return array - */ - public function deleteDocument($id, $routing = null) - { - $params = [ - 'index' => $this->index, - 'id' => $id - ]; - - if ($routing) { - $params['routing'] = $routing; - } - - try { - return $this->client->delete($params)->asArray(); - } catch (MissingParameterException $e) { - // 文档不存在时返回成功 - return ['result' => 'not_found', 'error' => $e->getMessage()]; - } catch (\Exception $e) { - Log::error('删除文档失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 刷新索引 - * @return array - */ - public function refreshIndex() - { - $params = [ - 'index' => $this->index - ]; - - try { - return $this->client->indices()->refresh($params)->asArray(); - } catch (\Exception $e) { - Log::error('刷新索引失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 检查索引映射 - * @return array - */ - public function checkIndexMapping() - { - try { - return $this->client->indices()->getMapping(['index' => $this->index])->asArray(); - } catch (\Exception $e) { - return ['error' => $e->getMessage()]; - } - } - - /** - * 通用搜索方法 - * - * @param array $query 搜索查询 - * @param int $from 起始位置 - * @param int $size 返回结果数量 - * @param array $sort 排序规则 - * @return array - */ - public function search($query, $from = 0, $size = 10, $sort = []) - { - $params = [ - 'index' => $this->index, - 'body' => [ - 'query' => $query, - 'from' => $from, - 'size' => $size - ] - ]; - - if (!empty($sort)) { - $params['body']['sort'] = $sort; - } - - try { - return $this->client->search($params)->asArray(); - } catch (\Exception $e) { - Log::error('搜索失败: ' . $e->getMessage()); - return ['error' => $e->getMessage(), 'hits' => ['total' => ['value' => 0], 'hits' => []]]; - } - } - - /** - * 索引名称 - */ - const indexName = 'default'; - - /** - * 获取索引名称 - * @param string $index 索引名称 - * @param string|null $prefix 索引前缀 - * @param string|null $subfix 索引后缀 - * @return string - */ - public static function indexName($index = '', $prefix = '', $subfix = '') - { - $index = $index ?: static::indexName; - $prefix = $prefix ?: env('ES_INDEX_PREFIX', ''); - $subfix = $subfix ?: env('ES_INDEX_SUFFIX', ''); - if ($prefix) { - $index = rtrim($prefix, '_') . '_' . $index; - } - if ($subfix) { - $index = $index . '_' . ltrim($subfix, '_'); - } - return $index; - } -} diff --git a/app/Module/ElasticSearch/ElasticSearchKeyValue.php b/app/Module/ElasticSearch/ElasticSearchKeyValue.php deleted file mode 100644 index 63228ca41..000000000 --- a/app/Module/ElasticSearch/ElasticSearchKeyValue.php +++ /dev/null @@ -1,204 +0,0 @@ -indexExists()) { - return ['acknowledged' => true, 'message' => '索引已存在']; - } - - // 定义映射 - $mappings = [ - 'properties' => [ - 'key' => ['type' => 'keyword'], - 'value' => ['type' => 'text', 'fields' => ['keyword' => ['type' => 'keyword']]], - 'created_at' => ['type' => 'integer'], - 'updated_at' => ['type' => 'integer'] - ] - ]; - - // 索引设置 - $settings = [ - 'number_of_shards' => 1, - 'number_of_replicas' => 1, - 'refresh_interval' => '1s' - ]; - - return $es->createIndex($settings, $mappings); - } catch (\Exception $e) { - Log::error('创建键值存储索引失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 保存键值对 - * @param string $key 键名 - * @param mixed $value 键值 - * @param string $namespace 命名空间,用于区分不同的键值存储场景 - * @return array - */ - public static function save($key, $value, $namespace = 'default') - { - try { - // 确保索引存在 - self::generateIndex(); - - $es = new self(); - - // 生成文档ID - $docId = "{$namespace}:{$key}"; - - // 准备文档数据 - $document = [ - 'key' => $key, - 'value' => is_array($value) ? json_encode($value, JSON_UNESCAPED_UNICODE) : $value, - 'namespace' => $namespace, - 'created_at' => time(), - 'updated_at' => time() - ]; - - // 索引文档 - $result = $es->indexDocument($document, $docId); - - // 刷新索引以确保立即可见 - $es->refreshIndex(); - - return $result; - } catch (\Exception $e) { - Log::error('保存键值对失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 获取键值 - * @param string $key 键名 - * @param mixed $default 默认值,当键不存在时返回 - * @param string $namespace 命名空间,用于区分不同的键值存储场景 - * @return mixed - */ - public static function get($key, $default = null, $namespace = 'default') - { - try { - $es = new self(); - - // 如果索引不存在,直接返回默认值 - if (!$es->indexExists()) { - return $default; - } - - // 生成文档ID - $docId = "{$namespace}:{$key}"; - - // 查询参数 - $params = [ - 'index' => self::indexName(), - 'id' => $docId - ]; - - try { - // 获取文档 - $response = $es->client->get($params)->asArray(); - - // 获取值 - $value = $response['_source']['value'] ?? $default; - - // 如果值是JSON字符串,尝试解码 - if (is_string($value) && $decoded = json_decode($value, true)) { - if (json_last_error() === JSON_ERROR_NONE) { - return $decoded; - } - } - - return $value; - } catch (\Exception $e) { - // 文档不存在或其他错误,返回默认值 - return $default; - } - } catch (\Exception $e) { - Log::error('获取键值对失败: ' . $e->getMessage()); - return $default; - } - } - - /** - * 获取键值,返回数组 - * @param string $key 键名 - * @param array $default 默认值,当键不存在时返回 - * @param string $namespace 命名空间,用于区分不同的键值存储场景 - * @return array - */ - public static function getArray($key, $default = [], $namespace = 'default') - { - return Base::string2array(self::get($key, $default, $namespace)); - } - - /** - * 删除键值对 - * @param string $key 键名 - * @param string $namespace 命名空间 - * @return array - */ - public static function delete($key, $namespace = 'default') - { - try { - $es = new self(); - - // 如果索引不存在,直接返回成功 - if (!$es->indexExists()) { - return ['result' => 'not_found']; - } - - // 生成文档ID - $docId = "{$namespace}:{$key}"; - - // 删除文档 - $result = $es->deleteDocument($docId); - - // 刷新索引以确保立即生效 - $es->refreshIndex(); - - return $result; - } catch (\Exception $e) { - Log::error('删除键值对失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } -} diff --git a/app/Module/ElasticSearch/ElasticSearchUserMsg.php b/app/Module/ElasticSearch/ElasticSearchUserMsg.php deleted file mode 100644 index ed30d8949..000000000 --- a/app/Module/ElasticSearch/ElasticSearchUserMsg.php +++ /dev/null @@ -1,375 +0,0 @@ - [ - // 共用字段 - 'dialog_id' => ['type' => 'keyword'], - 'created_at' => ['type' => 'date'], - 'updated_at' => ['type' => 'date'], - - // dialog_users 字段 - 'userid' => ['type' => 'keyword'], - 'top_at' => ['type' => 'date'], - 'last_at' => ['type' => 'date'], - 'mark_unread' => ['type' => 'integer'], - 'silence' => ['type' => 'integer'], - 'hide' => ['type' => 'integer'], - 'color' => ['type' => 'keyword'], - - // dialog_msgs 字段 - 'msg_id' => ['type' => 'keyword'], - 'sender_userid' => ['type' => 'keyword'], - 'msg_type' => ['type' => 'keyword'], - 'key' => ['type' => 'text'], - 'bot' => ['type' => 'integer'], - - // Join字段定义父子关系 - 'relationship' => [ - 'type' => 'join', - 'relations' => [ - 'dialog_user' => 'dialog_msg' // dialog_user是父文档,dialog_msg是子文档 - ] - ], - ] - ]; - - // 索引设置 - $settings = [ - 'number_of_shards' => 5, - 'number_of_replicas' => 1, - 'refresh_interval' => '5s' - ]; - - try { - $es = new self(); - return $es->createIndex($settings, $mappings); - } catch (\Exception $e) { - Log::error('创建聊天系统索引失败: ' . $e->getMessage()); - return ['error' => $e->getMessage()]; - } - } - - /** - * 构建对话系统特定的搜索 - 根据用户ID和消息关键词搜索会话 - * @param string $userid 用户ID - * @param string $keyword 消息关键词 - * @param int $size 返回结果数量 - * @return array - */ - public static function searchByKeyword($userid, $keyword, $size = 20) - { - // 注意这里的类型名称要与创建索引时的一致 - $query = [ - 'bool' => [ - 'must' => [ - [ - 'term' => [ - 'userid' => $userid - ] - ], - [ - 'has_child' => [ - 'type' => 'dialog_msg', - 'query' => [ - 'bool' => [ - 'must' => [ - [ - 'match_phrase' => [ - 'key' => $keyword - ] - ], - [ - 'term' => [ - 'bot' => 0 - ] - ] - ] - ] - ], - 'inner_hits' => [ - 'size' => 1, - 'sort' => [ - 'msg_id' => 'desc' - ] - ] - ] - ] - ] - ] - ]; - - // 结果集合 - $searchMap = []; - - try { - // 开始搜索 - $es = new self(); - $results = $es->search($query, 0, $size, ['last_at' => 'desc']); - - // 处理搜索结果 - $hits = $results['hits']['hits'] ?? []; - - foreach ($hits as $hit) { - if (isset($hit['inner_hits']['dialog_msg']['hits']['hits'][0])) { - $msgHit = $hit['inner_hits']['dialog_msg']['hits']['hits'][0]; - $source = $hit['_source']; - $msgSource = $msgHit['_source']; - - $searchMap[] = [ - 'id' => $source['dialog_id'], - 'top_at' => $source['top_at'], - 'last_at' => $source['last_at'], - 'mark_unread' => $source['mark_unread'], - 'silence' => $source['silence'], - 'hide' => $source['hide'], - 'color' => $source['color'], - 'user_at' => $source['updated_at'], - 'search_msg_id' => $msgSource['msg_id'], - ]; - } - } - } catch (\Exception $e) { - Log::error('searchByKeyword: ' . $e->getMessage()); - } - - // 返回搜索结果 - return $searchMap; - } - - /** ******************************************************************************************************** */ - /** *********************************************** 用户 ************************************************** */ - /** ******************************************************************************************************** */ - - /** - * 会话用户 - 生成文档ID - * @param WebSocketDialogUser $dialogUser - * @return string - */ - public static function generateUserDicId(WebSocketDialogUser $dialogUser) - { - return "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}"; - } - - /** - * 会话用户 - 生成文档格式 - * @param WebSocketDialogUser $dialogUser - * @return array - */ - public static function generateUserFormat(WebSocketDialogUser $dialogUser) - { - return [ - 'dialog_id' => $dialogUser->dialog_id, - 'created_at' => $dialogUser->created_at, - 'updated_at' => $dialogUser->updated_at, - - 'userid' => $dialogUser->userid, - 'top_at' => $dialogUser->top_at, - 'last_at' => $dialogUser->last_at, - 'mark_unread' => $dialogUser->mark_unread ? 1 : 0, - 'silence' => $dialogUser->silence ? 1 : 0, - 'hide' => $dialogUser->hide ? 1 : 0, - 'color' => $dialogUser->color, - - 'relationship' => [ - 'name' => 'dialog_user' - ] - ]; - } - - /** - * 会话用户 - 同步到Elasticsearch - * @param WebSocketDialogUser $dialogUser - * @return void - */ - public static function syncUser(WebSocketDialogUser $dialogUser) - { - try { - $es = new self(); - $es->indexDocument(self::generateUserFormat($dialogUser), self::generateUserDicId($dialogUser)); - } catch (\Exception $e) { - Log::error('syncUser: ' . $e->getMessage()); - } - } - - /** - * 会话用户 - 从Elasticsearch删除 - */ - public static function deleteUser(WebSocketDialogUser $dialogUser) - { - try { - $es = new self(); - - $docId = "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}"; - - // 删除用户-会话文档 - $es->deleteDocument($docId); - - // 注意:这里可能还需要删除所有关联的消息文档 - // 但由于父子关系,可以通过查询找到所有子文档并删除 - // 这里为简化,可以选择在后台任务中处理,或者直接依赖ES的级联删除功能 - - } catch (\Exception $e) { - Log::error('deleteUser: ' . $e->getMessage()); - } - } - - /** ******************************************************************************************************** */ - /** *********************************************** 消息 ************************************************** */ - /** ******************************************************************************************************** */ - - /** - * 会话消息 - 生成父文档ID - * @param WebSocketDialogMsg $dialogMsg - * @param $userid - * @return string - */ - public static function generateMsgParentId(WebSocketDialogMsg $dialogMsg, $userid) - { - return "user_{$userid}_dialog_{$dialogMsg->dialog_id}"; - } - - /** - * 会话消息 - 生成文档ID - * @param WebSocketDialogMsg $dialogMsg - * @param $userid - * @return string - */ - public static function generateMsgDicId(WebSocketDialogMsg $dialogMsg, $userid) - { - return "msg_{$dialogMsg->id}_user_{$userid}"; - } - - /** - * 会话消息 - 生成文档格式 - * @param WebSocketDialogMsg $dialogMsg - * @param $userid - * @return array - */ - public static function generateMsgFormat(WebSocketDialogMsg $dialogMsg, $userid) - { - return [ - 'dialog_id' => $dialogMsg->dialog_id, - 'created_at' => $dialogMsg->created_at, - 'updated_at' => $dialogMsg->updated_at, - - 'msg_id' => $dialogMsg->id, - 'sender_userid' => $dialogMsg->userid, - 'msg_type' => $dialogMsg->type, - 'key' => $dialogMsg->key, - 'bot' => $dialogMsg->bot ? 1 : 0, - - 'relationship' => [ - 'name' => 'dialog_msg', - 'parent' => self::generateMsgParentId($dialogMsg, $userid) - ] - ]; - } - - /** - * 会话消息 - 同步到Elasticsearch - */ - public static function syncMsg(WebSocketDialogMsg $dialogMsg) - { - try { - $es = new self(); - - // 获取此会话的所有用户 - $dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get(); - - if ($dialogUsers->isEmpty()) { - return; - } - - $params = ['body' => []]; - - foreach ($dialogUsers as $dialogUser) { - $params['body'][] = [ - 'index' => [ - '_index' => self::indexName(), - '_id' => self::generateMsgDicId($dialogMsg, $dialogUser->userid), - 'routing' => self::generateMsgParentId($dialogMsg, $dialogUser->userid) - ] - ]; - $params['body'][] = self::generateMsgFormat($dialogMsg, $dialogUser->userid); - } - - if (!empty($params['body'])) { - $es->bulk($params); - } - } catch (\Exception $e) { - Log::error('syncMsg: ' . $e->getMessage()); - } - } - - /** - * 会话消息 - 从Elasticsearch删除 - */ - public static function deleteMsg(WebSocketDialogMsg $dialogMsg) - { - try { - $es = new self(); - - // 获取此会话的所有用户 - $dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get(); - - if ($dialogUsers->isEmpty()) { - return; - } - - $params = ['body' => []]; - - foreach ($dialogUsers as $dialogUser) { - $params['body'][] = [ - 'delete' => [ - '_index' => self::indexName(), - '_id' => self::generateMsgDicId($dialogMsg, $dialogUser->userid), - 'routing' => self::generateMsgParentId($dialogMsg, $dialogUser->userid) - ] - ]; - } - - if (!empty($params['body'])) { - $es->bulk($params); - } - } catch (\Exception $e) { - Log::error('deleteMsg: ' . $e->getMessage()); - } - } -} diff --git a/app/Module/ZincSearch/ZincSearchUserMsg.php b/app/Module/ZincSearch/ZincSearchUserMsg.php index 6036ae89a..f44910241 100644 --- a/app/Module/ZincSearch/ZincSearchUserMsg.php +++ b/app/Module/ZincSearch/ZincSearchUserMsg.php @@ -131,7 +131,21 @@ class ZincSearchUserMsg ]; try { - return ZincSearchBase::elasticSearch(self::$indexName, $searchParams); + $result = ZincSearchBase::elasticSearch(self::$indexName, $searchParams); + return array_map(function ($hit) { + $source = $hit['_source']; + return [ + 'id' => $source['dialog_id'], + 'top_at' => $source['top_at'], + 'last_at' => $source['last_at'], + 'mark_unread' => $source['mark_unread'], + 'silence' => $source['silence'], + 'hide' => $source['hide'], + 'color' => $source['color'], + 'user_at' => $source['updated_at'], + 'search_msg_id' => $source['msg_id'], + ]; + }, $result['data']['hits']['hits'] ?? []); } catch (\Exception $e) { Log::error('搜索对话消息失败: ' . $e->getMessage()); return [ diff --git a/app/Observers/WebSocketDialogMsgObserver.php b/app/Observers/WebSocketDialogMsgObserver.php index 3e30aa116..cef5328b1 100644 --- a/app/Observers/WebSocketDialogMsgObserver.php +++ b/app/Observers/WebSocketDialogMsgObserver.php @@ -3,7 +3,7 @@ namespace App\Observers; use App\Models\WebSocketDialogMsg; -use App\Module\ElasticSearch\ElasticSearchUserMsg; +use App\Module\ZincSearch\ZincSearchUserMsg; class WebSocketDialogMsgObserver { @@ -15,7 +15,7 @@ class WebSocketDialogMsgObserver */ public function created(WebSocketDialogMsg $webSocketDialogMsg) { - ElasticSearchUserMsg::syncMsg($webSocketDialogMsg); + ZincSearchUserMsg::syncMsg($webSocketDialogMsg); } /** @@ -26,7 +26,7 @@ class WebSocketDialogMsgObserver */ public function updated(WebSocketDialogMsg $webSocketDialogMsg) { - ElasticSearchUserMsg::syncMsg($webSocketDialogMsg); + ZincSearchUserMsg::syncMsg($webSocketDialogMsg); } /** @@ -37,7 +37,7 @@ class WebSocketDialogMsgObserver */ public function deleted(WebSocketDialogMsg $webSocketDialogMsg) { - ElasticSearchUserMsg::deleteMsg($webSocketDialogMsg); + ZincSearchUserMsg::deleteMsg($webSocketDialogMsg); } /** diff --git a/app/Observers/WebSocketDialogUserObserver.php b/app/Observers/WebSocketDialogUserObserver.php index 785c965c0..61b6d9b71 100644 --- a/app/Observers/WebSocketDialogUserObserver.php +++ b/app/Observers/WebSocketDialogUserObserver.php @@ -4,7 +4,7 @@ namespace App\Observers; use App\Models\Deleted; use App\Models\WebSocketDialogUser; -use App\Module\ElasticSearch\ElasticSearchUserMsg; +use App\Module\ZincSearch\ZincSearchUserMsg; use Carbon\Carbon; class WebSocketDialogUserObserver @@ -30,7 +30,7 @@ class WebSocketDialogUserObserver } } Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); - ElasticSearchUserMsg::syncUser($webSocketDialogUser); + ZincSearchUserMsg::syncUser($webSocketDialogUser); } /** @@ -41,7 +41,7 @@ class WebSocketDialogUserObserver */ public function updated(WebSocketDialogUser $webSocketDialogUser) { - ElasticSearchUserMsg::syncUser($webSocketDialogUser); + ZincSearchUserMsg::syncUser($webSocketDialogUser); } /** @@ -53,7 +53,7 @@ class WebSocketDialogUserObserver public function deleted(WebSocketDialogUser $webSocketDialogUser) { Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); - ElasticSearchUserMsg::deleteUser($webSocketDialogUser); + ZincSearchUserMsg::deleteUser($webSocketDialogUser); } /** diff --git a/app/Tasks/ElasticSearchSyncTask.php b/app/Tasks/ZincSearchSyncTask.php similarity index 56% rename from app/Tasks/ElasticSearchSyncTask.php rename to app/Tasks/ZincSearchSyncTask.php index 1e5726e28..66802abad 100644 --- a/app/Tasks/ElasticSearchSyncTask.php +++ b/app/Tasks/ZincSearchSyncTask.php @@ -6,9 +6,9 @@ use Carbon\Carbon; use Illuminate\Support\Facades\Cache; /** - * 同步聊天数据到Elasticsearch + * 同步聊天数据到ZincSearch */ -class ElasticSearchSyncTask extends AbstractTask +class ZincSearchSyncTask extends AbstractTask { public function __construct() { @@ -18,19 +18,19 @@ class ElasticSearchSyncTask extends AbstractTask public function start() { // 120分钟执行一次 - $time = intval(Cache::get("ElasticSearchSyncTask:Time")); + $time = intval(Cache::get("ZincSearchSyncTask:Time")); if (time() - $time < 120 * 60) { return; } // 执行开始,120分钟后缓存标记失效 - Cache::put("ElasticSearchSyncTask:Time", time(), Carbon::now()->addMinutes(120)); + Cache::put("ZincSearchSyncTask:Time", time(), Carbon::now()->addMinutes(120)); // 开始执行同步 - @shell_exec("php /var/www/artisan elasticsearch:sync-dialog-user-msg --i"); + @shell_exec("php /var/www/artisan zinc:sync-user-msg --i"); // 执行完成,5分钟后缓存标记失效(5分钟任务可重复执行) - Cache::put("ElasticSearchSyncTask:Time", time(), Carbon::now()->addMinutes(5)); + Cache::put("ZincSearchSyncTask:Time", time(), Carbon::now()->addMinutes(5)); } public function end() diff --git a/composer.json b/composer.json index fdb42d880..c82da0797 100644 --- a/composer.json +++ b/composer.json @@ -20,7 +20,6 @@ "ext-simplexml": "*", "ext-zip": "*", "directorytree/ldaprecord-laravel": "^2.7", - "elasticsearch/elasticsearch": "^8.17", "fideloper/proxy": "^4.4.1", "firebase/php-jwt": "^6.9", "fruitcake/laravel-cors": "^2.0.4", diff --git a/docker-compose.yml b/docker-compose.yml index 5faaceb89..d63ede0ff 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -217,21 +217,6 @@ services: ipv4_address: "${APP_IPPR}.14" restart: unless-stopped - es: - container_name: "dootask-es-${APP_ID}" - image: "elasticsearch:8.17.2" - volumes: - - ./docker/es/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml - - ./docker/es/data:/usr/share/elasticsearch/data - environment: - discovery.type: single-node - xpack.security.enabled: false - ES_JAVA_OPTS: "-Xms1g -Xmx1g" - networks: - extnetwork: - ipv4_address: "${APP_IPPR}.15" - restart: unless-stopped - search: container_name: "dootask-search-${APP_ID}" image: "public.ecr.aws/zinclabs/zincsearch:0.4.10" @@ -246,7 +231,7 @@ services: ZINC_FIRST_ADMIN_PASSWORD: "${DB_PASSWORD}" networks: extnetwork: - ipv4_address: "${APP_IPPR}.16" + ipv4_address: "${APP_IPPR}.15" restart: unless-stopped networks: