diff --git a/app/Console/Commands/SyncMsgToManticore.php b/app/Console/Commands/SyncMsgToManticore.php new file mode 100644 index 000000000..f3060e73f --- /dev/null +++ b/app/Console/Commands/SyncMsgToManticore.php @@ -0,0 +1,227 @@ +error("应用「Manticore Search」未安装"); + return 1; + } + + // 注册信号处理器 + if (extension_loaded('pcntl')) { + pcntl_async_signals(true); + pcntl_signal(SIGINT, [$this, 'handleSignal']); + pcntl_signal(SIGTERM, [$this, 'handleSignal']); + } + + // 检查锁 + $lockInfo = $this->getLock(); + if ($lockInfo) { + $this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}"); + return 1; + } + + $this->setLock(); + + // 清除索引 + if ($this->option('c')) { + $this->info('清除索引...'); + ManticoreMsg::clear(); + $this->info("索引删除成功"); + $this->releaseLock(); + return 0; + } + + $dialogId = $this->option('dialog') ? intval($this->option('dialog')) : 0; + + if ($dialogId > 0) { + $this->info("开始同步对话 {$dialogId} 的消息数据(MVA 方案:allowed_users 自动内联)..."); + $this->syncDialogMsgs($dialogId); + } else { + $this->info('开始同步消息数据(MVA 方案:allowed_users 自动内联)...'); + $this->syncMsgs(); + } + + $this->info("\n同步完成"); + $this->releaseLock(); + return 0; + } + + private function getLock(): ?array + { + $lockKey = md5($this->signature); + return Cache::has($lockKey) ? Cache::get($lockKey) : null; + } + + private function setLock(): void + { + $lockKey = md5($this->signature); + Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600); + } + + private function releaseLock(): void + { + $lockKey = md5($this->signature); + Cache::forget($lockKey); + } + + public function handleSignal(int $signal): void + { + $this->releaseLock(); + exit(0); + } + + /** + * 同步所有消息 + */ + private function syncMsgs(): void + { + $lastKey = "sync:manticoreMsgLastId"; + $lastId = $this->option('i') ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; + + if ($lastId > 0) { + $this->info("\n增量同步消息数据(从ID {$lastId} 开始)..."); + } else { + $this->info("\n全量同步消息数据..."); + } + + // 构建基础查询条件 + // 排除:软删除、机器人消息、空 key 消息 + // 只包含:可索引的消息类型 + $baseQuery = WebSocketDialogMsg::where('id', '>', $lastId) + ->whereNull('deleted_at') + ->where('bot', '!=', 1) + ->whereNotNull('key') + ->where('key', '!=', '') + ->whereIn('type', ManticoreMsg::INDEXABLE_TYPES); + + $num = 0; + $count = $baseQuery->count(); + $batchSize = $this->option('batch'); + + $total = 0; + $lastNum = 0; + + do { + $msgs = WebSocketDialogMsg::where('id', '>', $lastId) + ->whereNull('deleted_at') + ->where('bot', '!=', 1) + ->whereNotNull('key') + ->where('key', '!=', '') + ->whereIn('type', ManticoreMsg::INDEXABLE_TYPES) + ->orderBy('id') + ->limit($batchSize) + ->get(); + + if ($msgs->isEmpty()) { + break; + } + + $num += count($msgs); + $progress = $count > 0 ? round($num / $count * 100, 2) : 100; + if ($progress < 100) { + $progress = number_format($progress, 2); + } + $this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$msgs->first()->id} ~ {$msgs->last()->id} ({$total}|{$lastNum})"); + + $this->setLock(); + + $lastNum = ManticoreMsg::batchSync($msgs); + $total += $lastNum; + + $lastId = $msgs->last()->id; + ManticoreKeyValue::set($lastKey, $lastId); + } while (count($msgs) == $batchSize); + + $this->info("同步消息结束 - 最后ID {$lastId}"); + $this->info("已索引消息数量: " . ManticoreMsg::getIndexedCount()); + } + + /** + * 同步指定对话的消息 + * + * @param int $dialogId 对话ID + */ + private function syncDialogMsgs(int $dialogId): void + { + $this->info("\n同步对话 {$dialogId} 的消息数据..."); + + $baseQuery = WebSocketDialogMsg::where('dialog_id', $dialogId) + ->whereNull('deleted_at') + ->where('bot', '!=', 1) + ->whereNotNull('key') + ->where('key', '!=', '') + ->whereIn('type', ManticoreMsg::INDEXABLE_TYPES); + + $num = 0; + $count = $baseQuery->count(); + $batchSize = $this->option('batch'); + $lastId = 0; + + $total = 0; + $lastNum = 0; + + do { + $msgs = WebSocketDialogMsg::where('dialog_id', $dialogId) + ->where('id', '>', $lastId) + ->whereNull('deleted_at') + ->where('bot', '!=', 1) + ->whereNotNull('key') + ->where('key', '!=', '') + ->whereIn('type', ManticoreMsg::INDEXABLE_TYPES) + ->orderBy('id') + ->limit($batchSize) + ->get(); + + if ($msgs->isEmpty()) { + break; + } + + $num += count($msgs); + $progress = $count > 0 ? round($num / $count * 100, 2) : 100; + if ($progress < 100) { + $progress = number_format($progress, 2); + } + $this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$msgs->first()->id} ~ {$msgs->last()->id} ({$total}|{$lastNum})"); + + $this->setLock(); + + $lastNum = ManticoreMsg::batchSync($msgs); + $total += $lastNum; + + $lastId = $msgs->last()->id; + } while (count($msgs) == $batchSize); + + $this->info("同步对话 {$dialogId} 消息结束"); + $this->info("该对话已索引消息数量: " . \App\Module\Manticore\ManticoreBase::getDialogIndexedMsgCount($dialogId)); + } +} diff --git a/app/Http/Controllers/Api/SearchController.php b/app/Http/Controllers/Api/SearchController.php index 689171f4f..bf693d370 100644 --- a/app/Http/Controllers/Api/SearchController.php +++ b/app/Http/Controllers/Api/SearchController.php @@ -5,12 +5,14 @@ namespace App\Http\Controllers\Api; use Request; use App\Models\File; use App\Models\User; +use App\Models\WebSocketDialogMsg; use App\Module\Base; use App\Module\Apps; use App\Module\Manticore\ManticoreFile; use App\Module\Manticore\ManticoreUser; use App\Module\Manticore\ManticoreProject; use App\Module\Manticore\ManticoreTask; +use App\Module\Manticore\ManticoreMsg; /** * @apiDefine search @@ -242,5 +244,73 @@ class SearchController extends AbstractController return Base::retSuccess('success', []); } + + /** + * @api {get} api/search/message AI 搜索消息 + * + * @apiDescription 需要token身份,需要安装 Manticore Search 应用 + * @apiVersion 1.0.0 + * @apiGroup search + * @apiName message + * + * @apiParam {String} key 搜索关键词 + * @apiParam {String} [search_type] 搜索类型(text/vector/hybrid,默认:hybrid) + * @apiParam {Number} [take] 获取数量(默认:20,最大:50) + * + * @apiSuccess {Number} ret 返回状态码(1正确、0错误) + * @apiSuccess {String} msg 返回信息(错误描述) + * @apiSuccess {Object} data 返回数据 + */ + public function message() + { + $user = User::auth(); + + if (!Apps::isInstalled('manticore')) { + return Base::retError('Manticore Search 应用未安装'); + } + + $key = trim(Request::input('key')); + $searchType = Request::input('search_type', 'hybrid'); + $take = min(50, max(1, intval(Request::input('take', 20)))); + + if (empty($key)) { + return Base::retSuccess('success', []); + } + + $results = ManticoreMsg::search($user->userid, $key, $searchType, 0, $take); + + // 补充消息完整信息 + $msgIds = array_column($results, 'msg_id'); + if (!empty($msgIds)) { + $msgs = WebSocketDialogMsg::whereIn('id', $msgIds) + ->with(['user' => function ($query) { + $query->select(User::$basicField); + }]) + ->get() + ->keyBy('id'); + + $formattedResults = []; + foreach ($results as $item) { + $msgData = $msgs->get($item['msg_id']); + if ($msgData) { + $formattedResults[] = [ + 'id' => $msgData->id, + 'msg_id' => $msgData->id, + 'dialog_id' => $msgData->dialog_id, + 'userid' => $msgData->userid, + 'type' => $msgData->type, + 'msg' => $msgData->msg, + 'created_at' => $msgData->created_at, + 'user' => $msgData->user, + 'relevance' => $item['relevance'] ?? 0, + 'content_preview' => $item['content_preview'] ?? null, + ]; + } + } + return Base::retSuccess('success', $formattedResults); + } + + return Base::retSuccess('success', []); + } } diff --git a/app/Module/Manticore/ManticoreBase.php b/app/Module/Manticore/ManticoreBase.php index 3359c0693..bbb9a39fa 100644 --- a/app/Module/Manticore/ManticoreBase.php +++ b/app/Module/Manticore/ManticoreBase.php @@ -143,6 +143,21 @@ class ManticoreBase ) charset_table='chinese' morphology='icu_chinese' "); + // 创建消息向量表(含 allowed_users MVA 权限字段) + $pdo->exec(" + CREATE TABLE IF NOT EXISTS msg_vectors ( + id BIGINT, + msg_id BIGINT, + dialog_id BIGINT, + userid BIGINT, + msg_type STRING, + content TEXT, + allowed_users MULTI, + created_at BIGINT, + content_vector float_vector knn_type='hnsw' knn_dims='1536' hnsw_similarity='cosine' + ) charset_table='chinese' morphology='icu_chinese' + "); + Log::info('Manticore tables initialized successfully'); } catch (PDOException $e) { Log::warning('Manticore initialization warning: ' . $e->getMessage()); @@ -1493,5 +1508,338 @@ class ManticoreBase return $result ? (int) $result['cnt'] : 0; } + // ============================== + // 消息向量方法 + // ============================== + + /** + * 消息全文搜索(使用 MVA allowed_users 权限过滤) + * + * @param string $keyword 关键词 + * @param int $userid 用户ID(权限过滤) + * @param int $limit 返回数量 + * @param int $offset 偏移量 + * @return array 搜索结果 + */ + public static function msgFullTextSearch(string $keyword, int $userid = 0, int $limit = 20, int $offset = 0): array + { + if (empty($keyword)) { + return []; + } + + $instance = new self(); + $escapedKeyword = self::escapeMatch($keyword); + + if ($userid > 0) { + // 使用 MVA 权限过滤 + $sql = " + SELECT + id, + msg_id, + dialog_id, + userid, + msg_type, + content, + created_at, + WEIGHT() as relevance + FROM msg_vectors + WHERE MATCH('@content {$escapedKeyword}') + AND allowed_users = " . (int)$userid . " + ORDER BY relevance DESC + LIMIT " . (int)$limit . " OFFSET " . (int)$offset; + } else { + $sql = " + SELECT + id, + msg_id, + dialog_id, + userid, + msg_type, + content, + created_at, + WEIGHT() as relevance + FROM msg_vectors + WHERE MATCH('@content {$escapedKeyword}') + ORDER BY relevance DESC + LIMIT " . (int)$limit . " OFFSET " . (int)$offset; + } + + return $instance->query($sql); + } + + /** + * 消息向量搜索(使用 MVA allowed_users 权限过滤) + * + * @param array $queryVector 查询向量 + * @param int $userid 用户ID(权限过滤) + * @param int $limit 返回数量 + * @return array 搜索结果 + */ + public static function msgVectorSearch(array $queryVector, int $userid = 0, int $limit = 20): array + { + if (empty($queryVector)) { + return []; + } + + $instance = new self(); + $vectorStr = '(' . implode(',', $queryVector) . ')'; + + // KNN 搜索需要先获取更多结果,再在应用层过滤权限 + $fetchLimit = $userid > 0 ? $limit * 5 : $limit; + + $sql = " + SELECT + id, + msg_id, + dialog_id, + userid, + msg_type, + content, + created_at, + KNN_DIST() as distance + FROM msg_vectors + WHERE KNN(content_vector, " . (int)$fetchLimit . ", {$vectorStr}) + ORDER BY distance ASC + "; + + $results = $instance->query($sql); + + foreach ($results as &$item) { + $item['similarity'] = 1 - ($item['distance'] ?? 0); + } + + // MVA 权限过滤 + if ($userid > 0 && !empty($results)) { + $allowedMsgIds = $instance->query( + "SELECT msg_id FROM msg_vectors WHERE allowed_users = ? LIMIT 100000", + [$userid] + ); + $allowedIds = array_column($allowedMsgIds, 'msg_id'); + + $results = array_filter($results, function ($item) use ($allowedIds) { + return in_array($item['msg_id'], $allowedIds); + }); + $results = array_values($results); + } + + return array_slice($results, 0, $limit); + } + + /** + * 消息混合搜索 + * + * @param string $keyword 关键词 + * @param array $queryVector 查询向量 + * @param int $userid 用户ID(权限过滤) + * @param int $limit 返回数量 + * @return array 搜索结果 + */ + public static function msgHybridSearch(string $keyword, array $queryVector, int $userid = 0, int $limit = 20): array + { + $textResults = self::msgFullTextSearch($keyword, $userid, 50, 0); + $vectorResults = !empty($queryVector) ? self::msgVectorSearch($queryVector, $userid, 50) : []; + + $scores = []; + $items = []; + $k = 60; + + foreach ($textResults as $rank => $item) { + $id = $item['msg_id']; + $scores[$id] = ($scores[$id] ?? 0) + 0.5 / ($k + $rank + 1); + $items[$id] = $item; + } + + foreach ($vectorResults as $rank => $item) { + $id = $item['msg_id']; + $scores[$id] = ($scores[$id] ?? 0) + 0.5 / ($k + $rank + 1); + if (!isset($items[$id])) { + $items[$id] = $item; + } + } + + arsort($scores); + + $results = []; + $count = 0; + foreach ($scores as $id => $score) { + if ($count >= $limit) break; + $item = $items[$id]; + $item['rrf_score'] = $score; + $results[] = $item; + $count++; + } + + return $results; + } + + /** + * 插入或更新消息向量(含 allowed_users MVA 权限字段) + * + * @param array $data 消息数据,包含: + * - msg_id: 消息ID + * - dialog_id: 对话ID + * - userid: 发送者ID + * - msg_type: 消息类型 + * - content: 消息内容 + * - content_vector: 向量值 + * - allowed_users: 有权限的用户ID数组 + * - created_at: 创建时间戳 + * @return bool 是否成功 + */ + public static function upsertMsgVector(array $data): bool + { + $instance = new self(); + + $msgId = $data['msg_id'] ?? 0; + if ($msgId <= 0) { + return false; + } + + // 先删除已存在的记录 + $instance->execute("DELETE FROM msg_vectors WHERE msg_id = ?", [$msgId]); + + // 构建 allowed_users MVA 值 + $allowedUsers = $data['allowed_users'] ?? []; + $allowedUsersStr = !empty($allowedUsers) ? '(' . implode(',', array_map('intval', $allowedUsers)) . ')' : '()'; + + // 插入新记录 + $vectorValue = $data['content_vector'] ?? null; + if ($vectorValue) { + $vectorValue = str_replace(['[', ']'], ['(', ')'], $vectorValue); + $sql = "INSERT INTO msg_vectors + (id, msg_id, dialog_id, userid, msg_type, content, allowed_users, created_at, content_vector) + VALUES (?, ?, ?, ?, ?, ?, {$allowedUsersStr}, ?, {$vectorValue})"; + } else { + $sql = "INSERT INTO msg_vectors + (id, msg_id, dialog_id, userid, msg_type, content, allowed_users, created_at) + VALUES (?, ?, ?, ?, ?, ?, {$allowedUsersStr}, ?)"; + } + + $params = [ + $msgId, + $msgId, + $data['dialog_id'] ?? 0, + $data['userid'] ?? 0, + $data['msg_type'] ?? 'text', + $data['content'] ?? '', + $data['created_at'] ?? time() + ]; + + return $instance->execute($sql, $params); + } + + /** + * 更新对话的 allowed_users 权限列表(批量更新该对话下所有消息) + * + * @param int $dialogId 对话ID + * @param array $userids 有权限的用户ID数组 + * @return int 更新的消息数量 + */ + public static function updateDialogAllowedUsers(int $dialogId, array $userids): int + { + if ($dialogId <= 0) { + return 0; + } + + $instance = new self(); + $allowedUsersStr = !empty($userids) ? '(' . implode(',', array_map('intval', $userids)) . ')' : '()'; + + // Manticore 支持按条件批量更新 + return $instance->executeWithRowCount( + "UPDATE msg_vectors SET allowed_users = {$allowedUsersStr} WHERE dialog_id = ?", + [$dialogId] + ); + } + + /** + * 删除消息向量 + * + * @param int $msgId 消息ID + * @return bool 是否成功 + */ + public static function deleteMsgVector(int $msgId): bool + { + if ($msgId <= 0) { + return false; + } + + $instance = new self(); + return $instance->execute("DELETE FROM msg_vectors WHERE msg_id = ?", [$msgId]); + } + + /** + * 批量删除对话下的所有消息向量 + * + * @param int $dialogId 对话ID + * @return int 删除数量 + */ + public static function deleteDialogMsgVectors(int $dialogId): int + { + if ($dialogId <= 0) { + return 0; + } + + $instance = new self(); + return $instance->executeWithRowCount( + "DELETE FROM msg_vectors WHERE dialog_id = ?", + [$dialogId] + ); + } + + /** + * 清空所有消息向量 + * + * @return bool 是否成功 + */ + public static function clearAllMsgVectors(): bool + { + $instance = new self(); + return $instance->execute("TRUNCATE TABLE msg_vectors"); + } + + /** + * 获取已索引的消息数量 + * + * @return int 消息数量 + */ + public static function getIndexedMsgCount(): int + { + $instance = new self(); + $result = $instance->queryOne("SELECT COUNT(*) as cnt FROM msg_vectors"); + return $result ? (int) $result['cnt'] : 0; + } + + /** + * 获取对话的已索引消息数量 + * + * @param int $dialogId 对话ID + * @return int 消息数量 + */ + public static function getDialogIndexedMsgCount(int $dialogId): int + { + if ($dialogId <= 0) { + return 0; + } + + $instance = new self(); + $result = $instance->queryOne( + "SELECT COUNT(*) as cnt FROM msg_vectors WHERE dialog_id = ?", + [$dialogId] + ); + return $result ? (int) $result['cnt'] : 0; + } + + /** + * 获取最后索引的消息ID + * + * @return int 消息ID + */ + public static function getLastIndexedMsgId(): int + { + $instance = new self(); + $result = $instance->queryOne("SELECT MAX(msg_id) as max_id FROM msg_vectors"); + return $result ? (int) ($result['max_id'] ?? 0) : 0; + } + } diff --git a/app/Module/Manticore/ManticoreMsg.php b/app/Module/Manticore/ManticoreMsg.php new file mode 100644 index 000000000..e7c9ecbe5 --- /dev/null +++ b/app/Module/Manticore/ManticoreMsg.php @@ -0,0 +1,360 @@ +bot === 1) { + return false; + } + + // 2. 检查消息类型 + if (!in_array($msg->type, self::INDEXABLE_TYPES)) { + return false; + } + + // 3. 排除 key 为空的消息 + if (empty($msg->key)) { + return false; + } + + return true; + } + + /** + * 搜索消息(支持全文、向量、混合搜索) + * + * @param int $userid 用户ID + * @param string $keyword 搜索关键词 + * @param string $searchType 搜索类型: text/vector/hybrid + * @param int $from 起始位置 + * @param int $size 返回数量 + * @return array 搜索结果 + */ + public static function search(int $userid, string $keyword, string $searchType = 'hybrid', int $from = 0, int $size = 20): array + { + if (empty($keyword)) { + return []; + } + + if (!Apps::isInstalled("manticore")) { + return []; + } + + try { + switch ($searchType) { + case 'text': + // 纯全文搜索 + return self::formatSearchResults( + ManticoreBase::msgFullTextSearch($keyword, $userid, $size, $from) + ); + + case 'vector': + // 纯向量搜索(需要先获取 embedding) + $embedding = self::getEmbedding($keyword); + if (empty($embedding)) { + // embedding 获取失败,降级到全文搜索 + return self::formatSearchResults( + ManticoreBase::msgFullTextSearch($keyword, $userid, $size, $from) + ); + } + return self::formatSearchResults( + ManticoreBase::msgVectorSearch($embedding, $userid, $size) + ); + + case 'hybrid': + default: + // 混合搜索 + $embedding = self::getEmbedding($keyword); + return self::formatSearchResults( + ManticoreBase::msgHybridSearch($keyword, $embedding, $userid, $size) + ); + } + } catch (\Exception $e) { + Log::error('Manticore msg search error: ' . $e->getMessage()); + return []; + } + } + + /** + * 获取文本的 Embedding 向量 + * + * @param string $text 文本 + * @return array 向量数组(空数组表示失败) + */ + private static function getEmbedding(string $text): array + { + if (empty($text)) { + return []; + } + + try { + // 调用 AI 模块获取 embedding + $result = AI::getEmbedding($text); + if (Base::isSuccess($result)) { + return $result['data'] ?? []; + } + } catch (\Exception $e) { + Log::warning('Get embedding error: ' . $e->getMessage()); + } + + return []; + } + + /** + * 格式化搜索结果 + * + * @param array $results Manticore 返回的结果 + * @return array 格式化后的结果 + */ + private static function formatSearchResults(array $results): array + { + $formatted = []; + foreach ($results as $item) { + $formatted[] = [ + 'id' => $item['msg_id'], + 'msg_id' => $item['msg_id'], + 'dialog_id' => $item['dialog_id'], + 'userid' => $item['userid'], + 'msg_type' => $item['msg_type'], + 'content_preview' => isset($item['content']) ? mb_substr($item['content'], 0, 200) : null, + 'created_at' => $item['created_at'] ?? null, + 'relevance' => $item['relevance'] ?? $item['similarity'] ?? $item['rrf_score'] ?? 0, + ]; + } + return $formatted; + } + + // ============================== + // 权限计算方法(MVA 方案核心) + // ============================== + + /** + * 获取消息的 allowed_users 列表 + * + * 对话的所有成员都有权限查看该对话的消息 + * + * @param WebSocketDialogMsg $msg 消息模型 + * @return array 有权限的用户ID数组 + */ + public static function getAllowedUsers(WebSocketDialogMsg $msg): array + { + return self::getDialogUserIds($msg->dialog_id); + } + + /** + * 获取对话的所有成员ID + * + * @param int $dialogId 对话ID + * @return array 成员用户ID数组 + */ + public static function getDialogUserIds(int $dialogId): array + { + if ($dialogId <= 0) { + return []; + } + + return WebSocketDialogUser::where('dialog_id', $dialogId) + ->pluck('userid') + ->toArray(); + } + + // ============================== + // 同步方法 + // ============================== + + /** + * 同步单个消息到 Manticore(含 allowed_users) + * + * @param WebSocketDialogMsg $msg 消息模型 + * @return bool 是否成功 + */ + public static function sync(WebSocketDialogMsg $msg): bool + { + if (!Apps::isInstalled("manticore")) { + return false; + } + + // 检查是否应该索引 + if (!self::shouldIndex($msg)) { + // 不符合索引条件,尝试删除已存在的索引 + return ManticoreBase::deleteMsgVector($msg->id); + } + + try { + // 提取消息内容(使用 key 字段) + $content = $msg->key ?? ''; + + // 限制内容长度 + $content = mb_substr($content, 0, self::MAX_CONTENT_LENGTH); + + // 获取 embedding(如果有内容且 AI 可用) + $embedding = null; + if (!empty($content) && Apps::isInstalled('ai')) { + $embeddingResult = self::getEmbedding($content); + if (!empty($embeddingResult)) { + $embedding = '[' . implode(',', $embeddingResult) . ']'; + } + } + + // 获取消息的 allowed_users + $allowedUsers = self::getAllowedUsers($msg); + + // 写入 Manticore(含 allowed_users) + $result = ManticoreBase::upsertMsgVector([ + 'msg_id' => $msg->id, + 'dialog_id' => $msg->dialog_id, + 'userid' => $msg->userid, + 'msg_type' => $msg->type, + 'content' => $content, + 'content_vector' => $embedding, + 'allowed_users' => $allowedUsers, + 'created_at' => $msg->created_at ? $msg->created_at->timestamp : time(), + ]); + + return $result; + } catch (\Exception $e) { + Log::error('Manticore msg sync error: ' . $e->getMessage(), [ + 'msg_id' => $msg->id, + 'dialog_id' => $msg->dialog_id, + ]); + return false; + } + } + + /** + * 批量同步消息 + * + * @param iterable $msgs 消息列表 + * @return int 成功同步的数量 + */ + public static function batchSync(iterable $msgs): int + { + if (!Apps::isInstalled("manticore")) { + return 0; + } + + $count = 0; + foreach ($msgs as $msg) { + if (self::sync($msg)) { + $count++; + } + } + return $count; + } + + /** + * 删除消息索引 + * + * @param int $msgId 消息ID + * @return bool 是否成功 + */ + public static function delete(int $msgId): bool + { + if (!Apps::isInstalled("manticore")) { + return false; + } + + return ManticoreBase::deleteMsgVector($msgId); + } + + /** + * 清空所有索引 + * + * @return bool 是否成功 + */ + public static function clear(): bool + { + if (!Apps::isInstalled("manticore")) { + return false; + } + + return ManticoreBase::clearAllMsgVectors(); + } + + /** + * 获取已索引消息数量 + * + * @return int 数量 + */ + public static function getIndexedCount(): int + { + if (!Apps::isInstalled("manticore")) { + return 0; + } + + return ManticoreBase::getIndexedMsgCount(); + } + + // ============================== + // 权限更新方法(MVA 方案) + // ============================== + + /** + * 更新对话下所有消息的 allowed_users 权限列表 + * 从 MySQL 获取最新的对话成员并更新到 Manticore + * + * @param int $dialogId 对话ID + * @return int 更新的消息数量 + */ + public static function updateDialogAllowedUsers(int $dialogId): int + { + if (!Apps::isInstalled("manticore") || $dialogId <= 0) { + return 0; + } + + try { + $userids = self::getDialogUserIds($dialogId); + return ManticoreBase::updateDialogAllowedUsers($dialogId, $userids); + } catch (\Exception $e) { + Log::error('Manticore updateDialogAllowedUsers error: ' . $e->getMessage(), ['dialog_id' => $dialogId]); + return 0; + } + } +} diff --git a/app/Observers/WebSocketDialogMsgObserver.php b/app/Observers/WebSocketDialogMsgObserver.php index 7d220ff24..244fd27e6 100644 --- a/app/Observers/WebSocketDialogMsgObserver.php +++ b/app/Observers/WebSocketDialogMsgObserver.php @@ -3,6 +3,9 @@ namespace App\Observers; use App\Models\WebSocketDialogMsg; +use App\Module\Apps; +use App\Module\Manticore\ManticoreMsg; +use App\Tasks\ManticoreSyncTask; use App\Tasks\ZincSearchSyncTask; class WebSocketDialogMsgObserver extends AbstractObserver @@ -15,7 +18,13 @@ class WebSocketDialogMsgObserver extends AbstractObserver */ public function created(WebSocketDialogMsg $webSocketDialogMsg) { + // ZincSearch 同步 self::taskDeliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg->toArray())); + + // Manticore 同步(仅在安装 Manticore 且符合索引条件时) + if (Apps::isInstalled('manticore') && ManticoreMsg::shouldIndex($webSocketDialogMsg)) { + self::taskDeliver(new ManticoreSyncTask('msg_sync', ['msg_id' => $webSocketDialogMsg->id])); + } } /** @@ -26,7 +35,13 @@ class WebSocketDialogMsgObserver extends AbstractObserver */ public function updated(WebSocketDialogMsg $webSocketDialogMsg) { + // ZincSearch 同步 self::taskDeliver(new ZincSearchSyncTask('sync', $webSocketDialogMsg->toArray())); + + // Manticore 同步(更新可能使消息符合或不再符合索引条件,由 sync 方法处理) + if (Apps::isInstalled('manticore')) { + self::taskDeliver(new ManticoreSyncTask('msg_sync', ['msg_id' => $webSocketDialogMsg->id])); + } } /** @@ -37,7 +52,13 @@ class WebSocketDialogMsgObserver extends AbstractObserver */ public function deleted(WebSocketDialogMsg $webSocketDialogMsg) { + // ZincSearch 删除 self::taskDeliver(new ZincSearchSyncTask('delete', $webSocketDialogMsg->toArray())); + + // Manticore 删除 + if (Apps::isInstalled('manticore')) { + self::taskDeliver(new ManticoreSyncTask('msg_delete', ['msg_id' => $webSocketDialogMsg->id])); + } } /** diff --git a/app/Observers/WebSocketDialogUserObserver.php b/app/Observers/WebSocketDialogUserObserver.php index 743b2b70c..7ed29705d 100644 --- a/app/Observers/WebSocketDialogUserObserver.php +++ b/app/Observers/WebSocketDialogUserObserver.php @@ -5,6 +5,8 @@ namespace App\Observers; use App\Models\Deleted; use App\Models\UserBot; use App\Models\WebSocketDialogUser; +use App\Module\Apps; +use App\Tasks\ManticoreSyncTask; use App\Tasks\ZincSearchSyncTask; use Carbon\Carbon; @@ -32,6 +34,14 @@ class WebSocketDialogUserObserver extends AbstractObserver } Deleted::forget('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); self::taskDeliver(new ZincSearchSyncTask('userSync', $webSocketDialogUser->toArray())); + + // Manticore: 更新对话下所有消息的 allowed_users + if (Apps::isInstalled('manticore')) { + self::taskDeliver(new ManticoreSyncTask('update_dialog_allowed_users', [ + 'dialog_id' => $webSocketDialogUser->dialog_id + ])); + } + // $dialog = $webSocketDialogUser->webSocketDialog; if ($dialog) { @@ -60,6 +70,14 @@ class WebSocketDialogUserObserver extends AbstractObserver { Deleted::record('dialog', $webSocketDialogUser->dialog_id, $webSocketDialogUser->userid); self::taskDeliver(new ZincSearchSyncTask('deleteUser', $webSocketDialogUser->toArray())); + + // Manticore: 更新对话下所有消息的 allowed_users + if (Apps::isInstalled('manticore')) { + self::taskDeliver(new ManticoreSyncTask('update_dialog_allowed_users', [ + 'dialog_id' => $webSocketDialogUser->dialog_id + ])); + } + // $dialog = $webSocketDialogUser->webSocketDialog; if ($dialog) { diff --git a/app/Tasks/ManticoreSyncTask.php b/app/Tasks/ManticoreSyncTask.php index 8c1bd3eab..21dc5d2f2 100644 --- a/app/Tasks/ManticoreSyncTask.php +++ b/app/Tasks/ManticoreSyncTask.php @@ -6,12 +6,14 @@ use App\Models\File; use App\Models\User; use App\Models\Project; use App\Models\ProjectTask; +use App\Models\WebSocketDialogMsg; use App\Module\Apps; use App\Module\Manticore\ManticoreBase; use App\Module\Manticore\ManticoreFile; use App\Module\Manticore\ManticoreUser; use App\Module\Manticore\ManticoreProject; use App\Module\Manticore\ManticoreTask; +use App\Module\Manticore\ManticoreMsg; use Carbon\Carbon; use Illuminate\Support\Facades\Cache; @@ -152,6 +154,31 @@ class ManticoreSyncTask extends AbstractTask } break; + // ============================== + // 消息同步动作 + // ============================== + case 'msg_sync': + $msg = WebSocketDialogMsg::find($this->data['msg_id'] ?? 0); + if ($msg) { + ManticoreMsg::sync($msg); + } + break; + + case 'msg_delete': + $msgId = $this->data['msg_id'] ?? 0; + if ($msgId > 0) { + ManticoreMsg::delete($msgId); + } + break; + + case 'update_dialog_allowed_users': + // 更新对话下所有消息的 allowed_users(成员变更时调用) + $dialogId = $this->data['dialog_id'] ?? 0; + if ($dialogId > 0) { + ManticoreMsg::updateDialogAllowedUsers($dialogId); + } + break; + default: // 增量更新(定时任务调用) $this->incrementalUpdate(); @@ -181,6 +208,7 @@ class ManticoreSyncTask extends AbstractTask @shell_exec("php /var/www/artisan manticore:sync-users --i 2>&1 &"); @shell_exec("php /var/www/artisan manticore:sync-projects --i 2>&1 &"); @shell_exec("php /var/www/artisan manticore:sync-tasks --i 2>&1 &"); + @shell_exec("php /var/www/artisan manticore:sync-msgs --i 2>&1 &"); // 执行完成 Cache::put("ManticoreSyncTask:Time", time(), Carbon::now()->addMinutes(5));