diff --git a/app/Console/Commands/SyncDialogUserMsgToZincSearch.php b/app/Console/Commands/SyncDialogUserMsgToZincSearch.php index 9f2f6248a..3dc6ea31e 100644 --- a/app/Console/Commands/SyncDialogUserMsgToZincSearch.php +++ b/app/Console/Commands/SyncDialogUserMsgToZincSearch.php @@ -4,10 +4,9 @@ namespace App\Console\Commands; use App\Models\WebSocketDialogMsg; use App\Models\WebSocketDialogUser; -use App\Module\ElasticSearch\ElasticSearchKeyValue; -use App\Module\ElasticSearch\ElasticSearchUserMsg; +use App\Module\ZincSearch\ZincSearchKeyValue; +use App\Module\ZincSearch\ZincSearchUserMsg; use Illuminate\Console\Command; -use Illuminate\Support\Facades\Log; class SyncDialogUserMsgToZincSearch extends Command { @@ -22,7 +21,6 @@ class SyncDialogUserMsgToZincSearch extends Command protected $signature = 'zinc:sync-dialog-user-msg {--f} {--i} {--c} {--batch=500}'; protected $description = '同步聊天会话用户和消息到 ZincSearch'; - protected $client = null; /** * SyncDialogUserMsgToElasticsearch constructor. @@ -30,12 +28,6 @@ class SyncDialogUserMsgToZincSearch extends Command public function __construct() { parent::__construct(); - try { - $this->es = new ElasticSearchUserMsg(); - } catch (\Exception $e) { - $this->error('Elasticsearch连接失败: ' . $e->getMessage()); - exit(1); - } } /** @@ -44,37 +36,16 @@ class SyncDialogUserMsgToZincSearch extends Command */ public function handle() { - $this->info('开始同步聊天数据...'); - // 清除索引 if ($this->option('c')) { $this->info('清除索引...'); - if (!$this->es->indexExists()) { - $this->saveLastId(true); - $this->info('索引不存在'); - return 0; - } - $result = $this->es->deleteIndex(); - if (isset($result['error'])) { - $this->error('删除索引失败: ' . $result['error']); - return 1; - } - $this->saveLastId(true); - $this->info('索引删除成功'); + ZincSearchUserMsg::clear(); + ZincSearchKeyValue::clear(); + $this->info("索引删除成功"); return 0; } - // 判断创建索引 - if (!$this->es->indexExists()) { - $this->info('创建索引...'); - $result = ElasticSearchUserMsg::generateIndex(); - if (isset($result['error'])) { - $this->error('创建索引失败: ' . $result['error']); - return 1; - } - $this->saveLastId(true); - $this->info('索引创建成功'); - } + $this->info('开始同步聊天数据...'); // 同步用户-会话数据 $this->syncDialogUsers($this->option('batch')); @@ -89,18 +60,12 @@ class SyncDialogUserMsgToZincSearch extends Command /** * 保存最后一个ID - * @param string|true $type - * @param integer $lastId + * @param string $type + * @param int $lastId */ - private function saveLastId($type, $lastId = 0) + private function saveLastId(string $type, int $lastId = 0): void { - if ($type === true) { - $setting = []; - } else { - $setting = ElasticSearchKeyValue::getArray('elasticSearch:sync'); - $setting[$type] = $lastId; - } - ElasticSearchKeyValue::save('elasticSearch:sync', $setting); + ZincSearchKeyValue::set("sync", ["{$type}" => $lastId], true); } /** @@ -108,11 +73,11 @@ class SyncDialogUserMsgToZincSearch extends Command * @param $type * @return int */ - private function getLastId($type) + private function getLastId($type): int { - if ($this->option('i')) { - $setting = ElasticSearchKeyValue::getArray('elasticSearch:sync'); - return intval($setting[$type] ?? 0); + if ($this->option("i")) { + $array = ZincSearchKeyValue::getArray("sync"); + return intval($array[$type] ?? 0); } return 0; } @@ -146,24 +111,7 @@ class SyncDialogUserMsgToZincSearch extends Command $this->info("{$num}/{$count} ({$progress}%) 正在同步用户ID {$lastId} ~ {$dialogUsers->last()->id}"); // 批量索引数据 - $params = ['body' => []]; - foreach ($dialogUsers as $dialogUser) { - $params['body'][] = [ - 'index' => [ - '_index' => ElasticSearchUserMsg::indexName(), - '_id' => ElasticSearchUserMsg::generateUserDicId($dialogUser), - ] - ]; - $params['body'][] = ElasticSearchUserMsg::generateUserFormat($dialogUser); - } - - if ($params['body']) { - $result = $this->es->bulk($params); - if (isset($result['errors']) && $result['errors']) { - $this->error('批量索引用户数据部分失败'); - Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items'])); - } - } + ZincSearchUserMsg::batchSyncUsers($dialogUsers); $lastId = $dialogUsers->last()->id; $this->saveLastId('dialog_user', $lastId); @@ -198,52 +146,8 @@ class SyncDialogUserMsgToZincSearch extends Command $progress = round($num / $count * 100, 2); $this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$lastId} ~ {$dialogMsgs->last()->id}"); - // 获取这些消息所属的会话对应的所有用户 - $dialogIds = $dialogMsgs->pluck('dialog_id')->unique()->toArray(); - $userDialogMap = []; - - if (!empty($dialogIds)) { - $dialogUsers = WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get(); - - foreach ($dialogUsers as $dialogUser) { - $userDialogMap[$dialogUser->dialog_id][] = $dialogUser->userid; - } - } - - // 批量索引消息数据 - $params = ['body' => []]; - foreach ($dialogMsgs as $dialogMsg) { - // 如果该会话没有用户,跳过 - if (empty($userDialogMap[$dialogMsg->dialog_id])) { - continue; - } - - // 为每个用户-会话关系创建子文档 - foreach ($userDialogMap[$dialogMsg->dialog_id] as $userid) { - $params['body'][] = [ - 'index' => [ - '_index' => ElasticSearchUserMsg::indexName(), - '_id' => ElasticSearchUserMsg::generateMsgDicId($dialogMsg, $userid), - 'routing' => ElasticSearchUserMsg::generateMsgParentId($dialogMsg, $userid) // 路由到父文档 - ] - ]; - - $params['body'][] = ElasticSearchUserMsg::generateMsgFormat($dialogMsg, $userid); - } - } - - if (!empty($params['body'])) { - // 分批处理 - $chunks = array_chunk($params['body'], 1000); - foreach ($chunks as $chunk) { - $chunkParams = ['body' => $chunk]; - $result = $this->es->bulk($chunkParams); - if (isset($result['errors']) && $result['errors']) { - $this->error('批量索引消息数据部分失败'); - Log::error('Elasticsearch批量索引失败: ' . json_encode($result['items'])); - } - } - } + // 批量索引数据 + ZincSearchUserMsg::batchSyncMsgs($dialogMsgs); $lastId = $dialogMsgs->last()->id; $this->saveLastId('dialog_msg', $lastId); diff --git a/app/Module/ZincSearch.php b/app/Module/ZincSearch/ZincSearchBase.php similarity index 81% rename from app/Module/ZincSearch.php rename to app/Module/ZincSearch/ZincSearchBase.php index 4662d3c0a..93811033d 100644 --- a/app/Module/ZincSearch.php +++ b/app/Module/ZincSearch/ZincSearchBase.php @@ -1,11 +1,11 @@ request("/api/index/{$index}", null, 'GET'); } + /** + * 判断索引是否存在 + */ + public static function indexExists($index): bool + { + $result = self::getIndex($index); + return $result['success'] && isset($result['data']['name']); + } + /** * 获取所有索引 */ @@ -113,6 +122,41 @@ class ZincSearch return (new self())->request("/api/index/{$index}", null, 'DELETE'); } + /** + * 删除所有索引 + */ + public static function deleteAllIndices(): array + { + $instance = new self(); + $result = $instance->request("/api/index", null, 'GET'); + + if (!$result['success']) { + return $result; + } + + $indices = $result['data'] ?? []; + $deleteResults = []; + $success = true; + + foreach ($indices as $index) { + $indexName = $index['name'] ?? ''; + if (!empty($indexName)) { + $deleteResult = $instance->request("/api/index/{$indexName}", null, 'DELETE'); + $deleteResults[$indexName] = $deleteResult; + + if (!$deleteResult['success']) { + $success = false; + } + } + } + + return [ + 'success' => $success, + 'message' => $success ? '所有索引删除成功' : '部分索引删除失败', + 'details' => $deleteResults + ]; + } + /** * 分析文本 */ diff --git a/app/Module/ZincSearch/ZincSearchKeyValue.php b/app/Module/ZincSearch/ZincSearchKeyValue.php new file mode 100644 index 000000000..c1482e4f2 --- /dev/null +++ b/app/Module/ZincSearch/ZincSearchKeyValue.php @@ -0,0 +1,295 @@ + 'logo.png', 'theme' => 'dark']); + * - 合并现有数据: ZincSearchValue::set('site_config', ['footer' => '版权所有'], true); + * - 获取键值: $siteName = ZincSearchValue::get('site_name'); + * - 获取键值带默认值: $theme = ZincSearchValue::get('theme', 'light'); + * - 删除键值: ZincSearchValue::delete('temporary_data'); + * + * 2. 批量操作 + * - 批量设置: ZincSearchValue::batchSet(['user_count' => 100, 'active_users' => 50]); + * - 批量获取: $stats = ZincSearchValue::batchGet(['user_count', 'active_users']); + * + * 3. 其他操作 + * - 清空所有数据: ZincSearchValue::clear(); + */ +class ZincSearchKeyValue +{ + /** + * 索引名称 + */ + protected static string $indexName = 'keyValue'; + + // ============================== + // 基础方法 + // ============================== + + /** + * 确保索引存在 + */ + private static function ensureIndex(): bool + { + if (!ZincSearchBase::indexExists(self::$indexName)) { + $mappings = [ + 'properties' => [ + 'key' => [ + 'type' => 'keyword', + 'index' => true + ], + 'value' => [ + 'type' => 'text', + 'index' => true + ], + 'created_at' => [ + 'type' => 'date', + 'index' => true + ], + 'updated_at' => [ + 'type' => 'date', + 'index' => true + ] + ] + ]; + $result = ZincSearchBase::createIndex(self::$indexName, $mappings); + return $result['success'] ?? false; + } + return true; + } + + // ============================== + // 基本操作 + // ============================== + + /** + * 设置键值 + * + * @param string $key 键名 + * @param mixed $value 值 + * @param bool $merge 是否合并现有数据(如果值是数组) + * @return bool 是否成功 + */ + public static function set(string $key, mixed $value, bool $merge = false): bool + { + if (!self::ensureIndex()) { + return false; + } + + // 检查键是否已存在 + if ($merge && is_array($value)) { + $existingData = self::get($key); + if (is_array($existingData)) { + $value = array_merge($existingData, $value); + } + } + + // 检查是否存在相同键的文档 + $searchResult = ZincSearchBase::search(self::$indexName, $key, 0, 1); + $docs = $searchResult['data']['hits']['hits'] ?? []; + $now = date('Y-m-d H:i:s'); + + if (!empty($docs)) { + $docId = $docs[0]['_id'] ?? null; + if ($docId) { + // 更新现有文档 + $docData = [ + 'key' => $key, + 'value' => $value, + 'updated_at' => $now + ]; + $updateResult = ZincSearchBase::updateDoc(self::$indexName, $docId, $docData); + return $updateResult['success'] ?? false; + } + } + + // 创建新文档 + $docData = [ + 'key' => $key, + 'value' => $value, + 'created_at' => $now, + 'updated_at' => $now + ]; + $addResult = ZincSearchBase::addDoc(self::$indexName, $docData); + return $addResult['success'] ?? false; + } + + /** + * 获取键值 + * + * @param string $key 键名 + * @param mixed $default 默认值 + * @return mixed 值或默认值 + */ + public static function get(string $key, mixed $default = null): mixed + { + if (!self::ensureIndex() || empty($key)) { + return $default; + } + + // 精确匹配键名 + $searchParams = [ + 'search_type' => 'term', + 'query' => [ + 'field' => 'key', + 'term' => $key + ], + 'from' => 0, + 'max_results' => 1 + ]; + + $result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams); + if (!($result['success'] ?? false)) { + return $default; + } + + $hits = $result['data']['hits']['hits'] ?? []; + if (empty($hits)) { + return $default; + } + + return $hits[0]['_source']['value'] ?? $default; + } + + /** + * 获取键值,返回数组 + * @param string $key 键名 + * @param array $default 默认值,当键不存在时返回 + * @return array + */ + public static function getArray(string $key, array $default = []): array + { + return Base::string2array(self::get($key, $default)); + } + + /** + * 删除键值 + * + * @param string $key 键名 + * @return bool 是否成功 + */ + public static function delete(string $key): bool + { + if (!self::ensureIndex() || empty($key)) { + return false; + } + + // 查找文档ID + $searchParams = [ + 'search_type' => 'term', + 'query' => [ + 'field' => 'key', + 'term' => $key + ], + 'from' => 0, + 'max_results' => 1 + ]; + + $result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams); + if (!($result['success'] ?? false)) { + return false; + } + + $hits = $result['data']['hits']['hits'] ?? []; + if (empty($hits)) { + return true; // 不存在视为删除成功 + } + + $docId = $hits[0]['_id'] ?? null; + if (empty($docId)) { + return false; + } + + $deleteResult = ZincSearchBase::deleteDoc(self::$indexName, $docId); + return $deleteResult['success'] ?? false; + } + + // ============================== + // 批量操作 + // ============================== + + /** + * 批量设置键值对 + * + * @param array $keyValues 键值对数组 + * @return bool 是否全部成功 + */ + public static function batchSet(array $keyValues): bool + { + if (!self::ensureIndex() || empty($keyValues)) { + return false; + } + + $docs = []; + $now = date('Y-m-d H:i:s'); + + foreach ($keyValues as $key => $value) { + $docs[] = [ + 'key' => $key, + 'value' => $value, + 'created_at' => $now, + 'updated_at' => $now + ]; + } + + $result = ZincSearchBase::addDocs(self::$indexName, $docs); + return $result['success'] ?? false; + } + + /** + * 批量获取键值 + * + * @param array $keys 键名数组 + * @return array 键值对数组 + */ + public static function batchGet(array $keys): array + { + if (!self::ensureIndex() || empty($keys)) { + return []; + } + + $results = []; + + // 遍历查询每个键 + foreach ($keys as $key) { + $results[$key] = self::get($key); + } + + return $results; + } + + // ============================== + // 其他操作 + // ============================== + + /** + * 清空所有键值 + * + * @return bool 是否成功 + */ + public static function clear(): bool + { + // 检查索引是否存在 + if (!ZincSearchBase::indexExists(self::$indexName)) { + return true; + } + + // 删除再重建索引 + $deleteResult = ZincSearchBase::deleteIndex(self::$indexName); + if (!($deleteResult['success'] ?? false)) { + return false; + } + + return self::ensureIndex(); + } +} diff --git a/app/Module/ZincSearch/ZincSearchUserMsg.php b/app/Module/ZincSearch/ZincSearchUserMsg.php new file mode 100644 index 000000000..f814595fd --- /dev/null +++ b/app/Module/ZincSearch/ZincSearchUserMsg.php @@ -0,0 +1,459 @@ + [ + // 共用字段 + 'dialog_id' => ['type' => 'keyword', 'index' => true], + 'created_at' => ['type' => 'date', 'index' => true], + 'updated_at' => ['type' => 'date', 'index' => true], + + // dialog_users 字段 + 'userid' => ['type' => 'keyword', 'index' => true], + 'top_at' => ['type' => 'date', 'index' => true], + 'last_at' => ['type' => 'date', 'index' => true], + 'mark_unread' => ['type' => 'numeric', 'index' => true], + 'silence' => ['type' => 'numeric', 'index' => true], + 'hide' => ['type' => 'numeric', 'index' => true], + 'color' => ['type' => 'keyword', 'index' => true], + + // dialog_msgs 字段 + 'msg_id' => ['type' => 'keyword', 'index' => true], + 'sender_userid' => ['type' => 'keyword', 'index' => true], + 'msg_type' => ['type' => 'keyword', 'index' => true], + 'key' => ['type' => 'text', 'index' => true], + 'bot' => ['type' => 'numeric', 'index' => true], + + // 关联字段 - ZincSearch不支持父子文档,使用引用字段代替 + 'parent_id' => ['type' => 'keyword', 'index' => true], + 'doc_type' => ['type' => 'keyword', 'index' => true] // dialog_user 或 dialog_msg + ] + ]; + + try { + return ZincSearchBase::createIndex(self::$indexName, $mappings); + } catch (\Exception $e) { + Log::error('创建聊天系统索引失败: ' . $e->getMessage()); + return ['success' => false, 'error' => $e->getMessage()]; + } + } + + /** + * 清空对话系统索引 + * 删除索引并重新创建一个空索引 + * + * @return bool 是否清空成功 + */ + public static function clear(): bool + { + try { + // 检查索引是否存在 + if (!ZincSearchBase::indexExists(self::$indexName)) { + return true; // 索引不存在视为已清空 + } + + // 删除索引 + $deleteResult = ZincSearchBase::deleteIndex(self::$indexName); + if (!($deleteResult['success'] ?? false)) { + Log::error('清空对话系统索引失败: ' . ($deleteResult['error'] ?? '未知错误')); + return false; + } + + // 重新创建索引 + $createResult = self::generateIndex(); + return $createResult['success'] ?? false; + } catch (\Exception $e) { + Log::error('清空对话系统索引异常: ' . $e->getMessage()); + return false; + } + } + + // ============================== + // 搜索相关方法 + // ============================== + + /** + * 构建对话系统特定的搜索 - 根据用户ID和消息关键词搜索会话 + * + * @param string $userid 用户ID + * @param string $keyword 消息关键词 + * @param int $from 起始位置 + * @param int $size 返回结果数量 + * @return array + */ + public static function searchByKeyword(string $userid, string $keyword, int $from = 0, int $size = 20): array + { + // 构建复杂的搜索查询 + $searchParams = [ + 'search_type' => 'querystring', + 'query' => [ + 'term' => "+userid:{$userid} +key:*{$keyword}*" + ], + 'from' => $from, + 'max_results' => $size, + 'sort_fields' => ["updated_at:desc"] + ]; + + try { + return ZincSearchBase::advancedSearch(self::$indexName, $searchParams); + } catch (\Exception $e) { + Log::error('搜索对话消息失败: ' . $e->getMessage()); + return [ + 'success' => false, + 'error' => $e->getMessage(), + 'hits' => ['total' => ['value' => 0], 'hits' => []] + ]; + } + } + + // ============================== + // 会话用户相关方法 + // ============================== + + /** + * 会话用户 - 生成文档ID + * + * @param WebSocketDialogUser $dialogUser + * @return string + */ + public static function generateUserDocId(WebSocketDialogUser $dialogUser): string + { + return "user_{$dialogUser->userid}_dialog_{$dialogUser->dialog_id}"; + } + + /** + * 会话用户 - 生成文档格式 + * + * @param WebSocketDialogUser $dialogUser + * @return array + */ + public static function generateUserFormat(WebSocketDialogUser $dialogUser): array + { + return [ + 'dialog_id' => $dialogUser->dialog_id, + 'created_at' => $dialogUser->created_at, + 'updated_at' => $dialogUser->updated_at, + + 'userid' => $dialogUser->userid, + 'top_at' => $dialogUser->top_at, + 'last_at' => $dialogUser->last_at, + 'mark_unread' => $dialogUser->mark_unread ?: 0, + 'silence' => $dialogUser->silence ?: 0, + 'hide' => $dialogUser->hide ?: 0, + 'color' => $dialogUser->color, + + 'doc_type' => 'dialog_user', + 'parent_id' => '' // 用户文档没有父文档 + ]; + } + + /** + * 会话用户 - 同步到ZincSearch + * + * @param WebSocketDialogUser $dialogUser + * @return void + */ + public static function syncUser(WebSocketDialogUser $dialogUser): void + { + try { + if (!ZincSearchBase::indexExists(self::$indexName)) { + self::generateIndex(); + } + $docFormat = self::generateUserFormat($dialogUser); + ZincSearchBase::addDoc(self::$indexName, $docFormat); + } catch (\Exception $e) { + Log::error('syncUser: ' . $e->getMessage()); + } + } + + /** + * 批量同步会话用户 + * + * @param array|iterable $dialogUsers WebSocketDialogUser对象集合 + * @return int 成功同步的用户数 + */ + public static function batchSyncUsers($dialogUsers): int + { + $count = 0; + try { + if (!ZincSearchBase::indexExists(self::$indexName)) { + self::generateIndex(); + } + + $docs = []; + foreach ($dialogUsers as $dialogUser) { + $docs[] = self::generateUserFormat($dialogUser); + $count++; + } + + if (!empty($docs)) { + ZincSearchBase::addDocs(self::$indexName, $docs); + } + } catch (\Exception $e) { + Log::error('batchSyncUsers: ' . $e->getMessage()); + } + + return $count; + } + + /** + * 会话用户 - 从ZincSearch删除 + * + * @param WebSocketDialogUser $dialogUser + * @return void + */ + public static function deleteUser(WebSocketDialogUser $dialogUser): void + { + try { + $docId = self::generateUserDocId($dialogUser); + + // 首先查询相关消息 + $searchParams = [ + 'search_type' => 'term', + 'query' => [ + 'field' => 'parent_id', + 'term' => $docId + ], + 'from' => 0, + 'max_results' => 1000 // 限制一次查询返回的文档数 + ]; + + $result = ZincSearchBase::advancedSearch(self::$indexName, $searchParams); + $hits = $result['data']['hits']['hits'] ?? []; + + // 批量删除子文档 + $batch = []; + foreach ($hits as $hit) { + if (isset($hit['_id'])) { + ZincSearchBase::deleteDoc(self::$indexName, $hit['_id']); + } + } + + // 删除用户文档 + ZincSearchBase::deleteDoc(self::$indexName, $docId); + } catch (\Exception $e) { + Log::error('deleteUser: ' . $e->getMessage()); + } + } + + // ============================== + // 会话消息相关方法 + // ============================== + + /** + * 会话消息 - 生成父文档ID + * + * @param WebSocketDialogMsg $dialogMsg + * @param string $userid + * @return string + */ + public static function generateMsgParentId(WebSocketDialogMsg $dialogMsg, string $userid): string + { + return "user_{$userid}_dialog_{$dialogMsg->dialog_id}"; + } + + /** + * 会话消息 - 生成文档ID + * + * @param WebSocketDialogMsg $dialogMsg + * @param string $userid + * @return string + */ + public static function generateMsgDocId(WebSocketDialogMsg $dialogMsg, string $userid): string + { + return "msg_{$dialogMsg->id}_user_{$userid}"; + } + + /** + * 会话消息 - 生成文档格式 + * + * @param WebSocketDialogMsg $dialogMsg + * @param string $userid + * @return array + */ + public static function generateMsgFormat(WebSocketDialogMsg $dialogMsg, string $userid): array + { + return [ + 'dialog_id' => $dialogMsg->dialog_id, + 'created_at' => $dialogMsg->created_at, + 'updated_at' => $dialogMsg->updated_at, + + 'msg_id' => $dialogMsg->id, + 'sender_userid' => $dialogMsg->userid, + 'msg_type' => $dialogMsg->type, + 'key' => $dialogMsg->key, + 'bot' => $dialogMsg->bot ? 1 : 0, + + 'doc_type' => 'dialog_msg', + 'parent_id' => self::generateMsgParentId($dialogMsg, $userid) + ]; + } + + /** + * 会话消息 - 同步到ZincSearch + * + * @param WebSocketDialogMsg $dialogMsg + * @return void + */ + public static function syncMsg(WebSocketDialogMsg $dialogMsg): void + { + try { + // 获取此会话的所有用户 + $dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get(); + + if ($dialogUsers->isEmpty()) { + return; + } + + $docs = []; + foreach ($dialogUsers as $dialogUser) { + $docId = self::generateMsgDocId($dialogMsg, $dialogUser->userid); + $docFormat = self::generateMsgFormat($dialogMsg, $dialogUser->userid); + $docs[] = $docFormat; + } + + if (!empty($docs)) { + if (ZincSearchBase::indexExists(self::$indexName)) { + ZincSearchBase::addDocs(self::$indexName, $docs); + } else { + self::generateIndex(); + ZincSearchBase::addDocs(self::$indexName, $docs); + } + } + } catch (\Exception $e) { + Log::error('syncMsg: ' . $e->getMessage()); + } + } + + /** + * 批量同步会话消息 + * + * @param array|iterable $dialogMsgs WebSocketDialogMsg对象集合 + * @return int 成功同步的消息数 + */ + public static function batchSyncMsgs($dialogMsgs): int + { + $count = 0; + try { + $docs = []; + $userDialogs = []; + + // 预处理:收集所有涉及的对话ID + $dialogIds = []; + foreach ($dialogMsgs as $message) { + $dialogIds[] = $message->dialog_id; + } + $dialogIds = array_unique($dialogIds); + + // 获取所有相关的用户-对话关系 + if (!empty($dialogIds)) { + $dialogUsers = WebSocketDialogUser::whereIn('dialog_id', $dialogIds)->get(); + + // 按对话ID组织用户 + foreach ($dialogUsers as $dialogUser) { + $userDialogs[$dialogUser->dialog_id][] = $dialogUser; + } + } + + // 为每条消息准备所有相关用户的文档 + foreach ($dialogMsgs as $message) { + if (isset($userDialogs[$message->dialog_id])) { + foreach ($userDialogs[$message->dialog_id] as $dialogUser) { + $docFormat = self::generateMsgFormat($message, $dialogUser->userid); + $docs[] = $docFormat; + $count++; + } + } + } + + // 批量写入 + if (!empty($docs)) { + if (ZincSearchBase::indexExists(self::$indexName)) { + ZincSearchBase::addDocs(self::$indexName, $docs); + } else { + self::generateIndex(); + ZincSearchBase::addDocs(self::$indexName, $docs); + } + } + } catch (\Exception $e) { + Log::error('batchSyncMsgs: ' . $e->getMessage()); + } + + return $count; + } + + /** + * 会话消息 - 从ZincSearch删除 + * + * @param WebSocketDialogMsg $dialogMsg + * @return void + */ + public static function deleteMsg(WebSocketDialogMsg $dialogMsg): void + { + try { + // 获取此会话的所有用户 + $dialogUsers = WebSocketDialogUser::whereDialogId($dialogMsg->dialog_id)->get(); + + if ($dialogUsers->isEmpty()) { + return; + } + + foreach ($dialogUsers as $dialogUser) { + $docId = self::generateMsgDocId($dialogMsg, $dialogUser->userid); + ZincSearchBase::deleteDoc(self::$indexName, $docId); + } + } catch (\Exception $e) { + Log::error('deleteMsg: ' . $e->getMessage()); + } + } +}