From e020a80020fd4d1433a8041175f514504e97e21e Mon Sep 17 00:00:00 2001 From: kuaifan Date: Sat, 3 Jan 2026 15:19:23 +0000 Subject: [PATCH] feat: Add batch embedding retrieval and vector update methods for Manticore integration - Implemented `getBatchEmbeddings` method in AI module for retrieving embeddings for multiple texts. - Added vector update methods for messages, files, tasks, projects, and users in ManticoreBase. - Enhanced ManticoreFile, ManticoreMsg, ManticoreProject, ManticoreTask, and ManticoreUser to support vector generation during sync operations. - Introduced `generateVectorsBatch` methods for batch processing of vector generation in Manticore modules. - Updated ManticoreSyncTask to handle incremental updates and vector generation asynchronously. --- .../Commands/GenerateManticoreVectors.php | 226 ++++++++++++++++ app/Module/AI.php | 131 +++++++++ app/Module/Manticore/ManticoreBase.php | 256 ++++++++++++++++++ app/Module/Manticore/ManticoreFile.php | 98 ++++++- app/Module/Manticore/ManticoreMsg.php | 90 +++++- app/Module/Manticore/ManticoreProject.php | 90 +++++- app/Module/Manticore/ManticoreTask.php | 94 ++++++- app/Module/Manticore/ManticoreUser.php | 91 ++++++- app/Tasks/ManticoreSyncTask.php | 42 ++- 9 files changed, 1087 insertions(+), 31 deletions(-) create mode 100644 app/Console/Commands/GenerateManticoreVectors.php diff --git a/app/Console/Commands/GenerateManticoreVectors.php b/app/Console/Commands/GenerateManticoreVectors.php new file mode 100644 index 000000000..63ea50f84 --- /dev/null +++ b/app/Console/Commands/GenerateManticoreVectors.php @@ -0,0 +1,226 @@ + [ + 'syncKey' => 'sync:manticoreMsgLastId', + 'vectorKey' => 'vector:manticoreMsgLastId', + 'class' => ManticoreMsg::class, + 'model' => WebSocketDialogMsg::class, + 'idField' => 'id', + ], + 'file' => [ + 'syncKey' => 'sync:manticoreFileLastId', + 'vectorKey' => 'vector:manticoreFileLastId', + 'class' => ManticoreFile::class, + 'model' => File::class, + 'idField' => 'id', + ], + 'task' => [ + 'syncKey' => 'sync:manticoreTaskLastId', + 'vectorKey' => 'vector:manticoreTaskLastId', + 'class' => ManticoreTask::class, + 'model' => ProjectTask::class, + 'idField' => 'id', + ], + 'project' => [ + 'syncKey' => 'sync:manticoreProjectLastId', + 'vectorKey' => 'vector:manticoreProjectLastId', + 'class' => ManticoreProject::class, + 'model' => Project::class, + 'idField' => 'id', + ], + 'user' => [ + 'syncKey' => 'sync:manticoreUserLastId', + 'vectorKey' => 'vector:manticoreUserLastId', + 'class' => ManticoreUser::class, + 'model' => User::class, + 'idField' => 'userid', + ], + ]; + + public function handle(): int + { + if (!Apps::isInstalled("manticore")) { + $this->error("应用「Manticore Search」未安装"); + return 1; + } + + if (!Apps::isInstalled("ai")) { + $this->error("应用「AI」未安装,无法生成向量"); + return 1; + } + + // 注册信号处理器 + if (extension_loaded('pcntl')) { + pcntl_async_signals(true); + pcntl_signal(SIGINT, [$this, 'handleSignal']); + pcntl_signal(SIGTERM, [$this, 'handleSignal']); + } + + // 检查锁 + $lockInfo = $this->getLock(); + if ($lockInfo) { + $this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}"); + return 1; + } + + $this->setLock(); + + $type = $this->option('type'); + $batchSize = intval($this->option('batch')); + $maxCount = intval($this->option('max')); + $reset = $this->option('reset'); + + if ($type === 'all') { + $types = array_keys(self::TYPE_CONFIG); + } else { + if (!isset(self::TYPE_CONFIG[$type])) { + $this->error("未知类型: {$type}。可用类型: msg, file, task, project, user, all"); + $this->releaseLock(); + return 1; + } + $types = [$type]; + } + + foreach ($types as $t) { + $this->processType($t, $batchSize, $maxCount, $reset); + } + + $this->info("\n向量生成完成"); + $this->releaseLock(); + return 0; + } + + /** + * 处理单个类型的向量生成 + */ + private function processType(string $type, int $batchSize, int $maxCount, bool $reset): void + { + $config = self::TYPE_CONFIG[$type]; + + $this->info("\n========== 处理 {$type} =========="); + + // 获取进度指针 + $syncLastId = intval(ManticoreKeyValue::get($config['syncKey'], 0)); + $vectorLastId = $reset ? 0 : intval(ManticoreKeyValue::get($config['vectorKey'], 0)); + + if ($reset) { + ManticoreKeyValue::set($config['vectorKey'], 0); + $this->info("已重置 {$type} 向量进度指针"); + } + + // 计算待处理范围 + $pendingCount = $syncLastId - $vectorLastId; + if ($pendingCount <= 0) { + $this->info("{$type}: 无待处理数据 (sync={$syncLastId}, vector={$vectorLastId})"); + return; + } + + $this->info("{$type}: 待处理 {$pendingCount} 条 (ID {$vectorLastId} -> {$syncLastId})"); + + // 限制本轮处理数量 + $toProcess = min($pendingCount, $maxCount); + $this->info("{$type}: 本轮处理 {$toProcess} 条"); + + // 获取待处理的 ID 列表 + $modelClass = $config['model']; + $idField = $config['idField']; + + $processedCount = 0; + $currentLastId = $vectorLastId; + + while ($processedCount < $toProcess) { + $remainingCount = min($toProcess - $processedCount, $batchSize * 5); + + // 获取一批 ID + $ids = $modelClass::where($idField, '>', $currentLastId) + ->where($idField, '<=', $syncLastId) + ->orderBy($idField) + ->limit($remainingCount) + ->pluck($idField) + ->toArray(); + + if (empty($ids)) { + break; + } + + // 批量生成向量 + $manticoreClass = $config['class']; + $successCount = $manticoreClass::generateVectorsBatch($ids, $batchSize); + + $processedCount += count($ids); + $currentLastId = end($ids); + + // 更新向量进度指针 + ManticoreKeyValue::set($config['vectorKey'], $currentLastId); + + $this->info("{$type}: 已处理 {$processedCount}/{$toProcess},成功 {$successCount},当前ID: {$currentLastId}"); + + // 刷新锁 + $this->setLock(); + } + + $this->info("{$type}: 完成本轮向量生成,共处理 {$processedCount} 条"); + } + + private function getLock(): ?array + { + $lockKey = 'manticore:generate-vectors:lock'; + return Cache::has($lockKey) ? Cache::get($lockKey) : null; + } + + private function setLock(): void + { + $lockKey = 'manticore:generate-vectors:lock'; + Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600); + } + + private function releaseLock(): void + { + $lockKey = 'manticore:generate-vectors:lock'; + Cache::forget($lockKey); + } + + public function handleSignal(int $signal): void + { + $this->info("\n收到信号,正在退出..."); + $this->releaseLock(); + exit(0); + } +} diff --git a/app/Module/AI.php b/app/Module/AI.php index d75185631..94314cf71 100644 --- a/app/Module/AI.php +++ b/app/Module/AI.php @@ -814,6 +814,137 @@ class AI return $result; } + /** + * 批量获取文本的 Embedding 向量 + * OpenAI API 原生支持批量输入,一次请求处理多个文本 + * + * @param array $texts 文本数组(最多 100 条) + * @param bool $noCache 是否禁用缓存 + * @return array 返回结果,成功时 data 为向量数组的数组(与输入顺序对应) + */ + public static function getBatchEmbeddings(array $texts, $noCache = false) + { + if (!Apps::isInstalled('ai')) { + return Base::retError('应用「AI Assistant」未安装'); + } + + if (empty($texts)) { + return Base::retSuccess("success", []); + } + + // 限制批量大小(OpenAI 最多支持 2048 条,这里限制 100 条) + $texts = array_slice($texts, 0, 100); + + // 准备结果数组,并检查缓存 + $results = []; + $uncachedTexts = []; + $uncachedIndices = []; + + foreach ($texts as $index => $text) { + if (empty($text)) { + $results[$index] = []; + continue; + } + + // 截断过长的文本 + $text = mb_substr($text, 0, 30000); + $texts[$index] = $text; // 更新截断后的文本 + + $cacheKey = "openAIEmbedding::" . md5($text); + + if ($noCache) { + Cache::forget($cacheKey); + } + + // 检查缓存 + if (!$noCache && Cache::has($cacheKey)) { + $cached = Cache::get($cacheKey); + if (Base::isSuccess($cached)) { + $results[$index] = $cached['data']; + continue; + } + } + + // 未命中缓存,加入待请求列表 + $uncachedTexts[] = $text; + $uncachedIndices[] = $index; + } + + // 如果所有文本都在缓存中 + if (empty($uncachedTexts)) { + // 按原始顺序返回 + ksort($results); + return Base::retSuccess("success", array_values($results)); + } + + // 获取 provider + $provider = self::resolveEmbeddingProvider(); + if (!$provider) { + return Base::retError("请先在「AI 助手」设置中配置支持 Embedding 的 AI 服务"); + } + + // 构建批量请求 + $payload = [ + "model" => $provider['model'], + "input" => $uncachedTexts, + ]; + + $supportsDimensions = in_array($provider['vendor'], ['openai', 'zhipu']); + if ($supportsDimensions) { + $payload['dimensions'] = 1536; + } + + $post = json_encode($payload); + + $ai = new self($post); + $ai->setProvider($provider); + $ai->setUrlPath('/embeddings'); + $ai->setTimeout(120); // 批量请求需要更长超时 + + $res = $ai->request(true); + if (Base::isError($res)) { + return Base::retError("批量 Embedding 请求失败", $res); + } + + $resData = Base::json2array($res['data']); + if (empty($resData['data'])) { + return Base::retError("Embedding 接口返回数据格式错误", $resData); + } + + // 处理返回的向量并写入缓存 + foreach ($resData['data'] as $item) { + $itemIndex = $item['index'] ?? null; + if ($itemIndex === null || !isset($uncachedIndices[$itemIndex])) { + continue; + } + + $originalIndex = $uncachedIndices[$itemIndex]; + $embedding = $item['embedding'] ?? []; + + if (!empty($embedding) && is_array($embedding)) { + $results[$originalIndex] = $embedding; + + // 写入缓存 + $text = $uncachedTexts[$itemIndex]; + $cacheKey = "openAIEmbedding::" . md5($text); + Cache::put($cacheKey, Base::retSuccess("success", $embedding), Carbon::now()->addDays(7)); + } else { + $results[$originalIndex] = []; + } + } + + // 填充未获取到向量的位置 + foreach ($uncachedIndices as $i => $originalIndex) { + if (!isset($results[$originalIndex])) { + $results[$originalIndex] = []; + } + } + + // 按原始顺序返回 + ksort($results); + return Base::retSuccess("success", array_values($results)); + } + /** * 获取 Embedding 模型配置 * diff --git a/app/Module/Manticore/ManticoreBase.php b/app/Module/Manticore/ManticoreBase.php index ff6312bc2..89a8d283f 100644 --- a/app/Module/Manticore/ManticoreBase.php +++ b/app/Module/Manticore/ManticoreBase.php @@ -1844,5 +1844,261 @@ class ManticoreBase return $result ? (int) ($result['max_id'] ?? 0) : 0; } + // ============================== + // 向量更新方法(用于异步向量生成) + // ============================== + + /** + * 更新消息的向量(仅更新向量字段) + * + * @param int $msgId 消息ID + * @param string $vectorStr 向量字符串,格式如 '[0.1,0.2,...]' + * @return bool 是否成功 + */ + public static function updateMsgVector(int $msgId, string $vectorStr): bool + { + if ($msgId <= 0 || empty($vectorStr)) { + return false; + } + + $instance = new self(); + + // 查询现有记录 + $existing = $instance->queryOne( + "SELECT * FROM msg_vectors WHERE msg_id = ?", + [$msgId] + ); + + if (!$existing) { + return false; + } + + // 删除旧记录 + $instance->execute("DELETE FROM msg_vectors WHERE msg_id = ?", [$msgId]); + + // Manticore 的向量需要使用 () 格式 + $vectorStr = str_replace(['[', ']'], ['(', ')'], $vectorStr); + + // 构建 allowed_users MVA 值 + $allowedUsersStr = !empty($existing['allowed_users']) + ? '(' . $existing['allowed_users'] . ')' + : '()'; + + // 重新插入(包含向量) + $sql = "INSERT INTO msg_vectors + (id, msg_id, dialog_id, userid, msg_type, content, allowed_users, created_at, content_vector) + VALUES (?, ?, ?, ?, ?, ?, {$allowedUsersStr}, ?, {$vectorStr})"; + + return $instance->execute($sql, [ + $existing['id'], + $existing['msg_id'], + $existing['dialog_id'], + $existing['userid'], + $existing['msg_type'], + $existing['content'], + $existing['created_at'] ?? time(), + ]); + } + + /** + * 更新文件的向量(仅更新向量字段) + * + * @param int $fileId 文件ID + * @param string $vectorStr 向量字符串,格式如 '[0.1,0.2,...]' + * @return bool 是否成功 + */ + public static function updateFileVector(int $fileId, string $vectorStr): bool + { + if ($fileId <= 0 || empty($vectorStr)) { + return false; + } + + $instance = new self(); + + // 查询现有记录 + $existing = $instance->queryOne( + "SELECT * FROM file_vectors WHERE file_id = ?", + [$fileId] + ); + + if (!$existing) { + return false; + } + + // 删除旧记录 + $instance->execute("DELETE FROM file_vectors WHERE file_id = ?", [$fileId]); + + // Manticore 的向量需要使用 () 格式 + $vectorStr = str_replace(['[', ']'], ['(', ')'], $vectorStr); + + // 构建 allowed_users MVA 值 + $allowedUsersStr = !empty($existing['allowed_users']) + ? '(' . $existing['allowed_users'] . ')' + : '()'; + + // 重新插入(包含向量) + $sql = "INSERT INTO file_vectors + (id, file_id, userid, pshare, file_name, file_type, file_ext, content, allowed_users, content_vector) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, {$allowedUsersStr}, {$vectorStr})"; + + return $instance->execute($sql, [ + $existing['id'], + $existing['file_id'], + $existing['userid'], + $existing['pshare'] ?? 0, + $existing['file_name'], + $existing['file_type'], + $existing['file_ext'], + $existing['content'], + ]); + } + + /** + * 更新任务的向量(仅更新向量字段) + * + * @param int $taskId 任务ID + * @param string $vectorStr 向量字符串,格式如 '[0.1,0.2,...]' + * @return bool 是否成功 + */ + public static function updateTaskVector(int $taskId, string $vectorStr): bool + { + if ($taskId <= 0 || empty($vectorStr)) { + return false; + } + + $instance = new self(); + + // 查询现有记录 + $existing = $instance->queryOne( + "SELECT * FROM task_vectors WHERE task_id = ?", + [$taskId] + ); + + if (!$existing) { + return false; + } + + // 删除旧记录 + $instance->execute("DELETE FROM task_vectors WHERE task_id = ?", [$taskId]); + + // Manticore 的向量需要使用 () 格式 + $vectorStr = str_replace(['[', ']'], ['(', ')'], $vectorStr); + + // 构建 allowed_users MVA 值 + $allowedUsersStr = !empty($existing['allowed_users']) + ? '(' . $existing['allowed_users'] . ')' + : '()'; + + // 重新插入(包含向量) + $sql = "INSERT INTO task_vectors + (id, task_id, project_id, visibility, task_name, task_desc, task_content, allowed_users, content_vector) + VALUES (?, ?, ?, ?, ?, ?, ?, {$allowedUsersStr}, {$vectorStr})"; + + return $instance->execute($sql, [ + $existing['id'], + $existing['task_id'], + $existing['project_id'], + $existing['visibility'] ?? 1, + $existing['task_name'], + $existing['task_desc'], + $existing['task_content'], + ]); + } + + /** + * 更新项目的向量(仅更新向量字段) + * + * @param int $projectId 项目ID + * @param string $vectorStr 向量字符串,格式如 '[0.1,0.2,...]' + * @return bool 是否成功 + */ + public static function updateProjectVector(int $projectId, string $vectorStr): bool + { + if ($projectId <= 0 || empty($vectorStr)) { + return false; + } + + $instance = new self(); + + // 查询现有记录 + $existing = $instance->queryOne( + "SELECT * FROM project_vectors WHERE project_id = ?", + [$projectId] + ); + + if (!$existing) { + return false; + } + + // 删除旧记录 + $instance->execute("DELETE FROM project_vectors WHERE project_id = ?", [$projectId]); + + // Manticore 的向量需要使用 () 格式 + $vectorStr = str_replace(['[', ']'], ['(', ')'], $vectorStr); + + // 构建 allowed_users MVA 值 + $allowedUsersStr = !empty($existing['allowed_users']) + ? '(' . $existing['allowed_users'] . ')' + : '()'; + + // 重新插入(包含向量) + $sql = "INSERT INTO project_vectors + (id, project_id, project_name, project_desc, allowed_users, content_vector) + VALUES (?, ?, ?, ?, {$allowedUsersStr}, {$vectorStr})"; + + return $instance->execute($sql, [ + $existing['id'], + $existing['project_id'], + $existing['project_name'], + $existing['project_desc'], + ]); + } + + /** + * 更新用户的向量(仅更新向量字段) + * + * @param int $userid 用户ID + * @param string $vectorStr 向量字符串,格式如 '[0.1,0.2,...]' + * @return bool 是否成功 + */ + public static function updateUserVector(int $userid, string $vectorStr): bool + { + if ($userid <= 0 || empty($vectorStr)) { + return false; + } + + $instance = new self(); + + // 查询现有记录 + $existing = $instance->queryOne( + "SELECT * FROM user_vectors WHERE userid = ?", + [$userid] + ); + + if (!$existing) { + return false; + } + + // 删除旧记录 + $instance->execute("DELETE FROM user_vectors WHERE userid = ?", [$userid]); + + // Manticore 的向量需要使用 () 格式 + $vectorStr = str_replace(['[', ']'], ['(', ')'], $vectorStr); + + // 重新插入(包含向量) + $sql = "INSERT INTO user_vectors + (id, userid, nickname, email, profession, introduction, content_vector) + VALUES (?, ?, ?, ?, ?, ?, {$vectorStr})"; + + return $instance->execute($sql, [ + $existing['id'], + $existing['userid'], + $existing['nickname'], + $existing['email'], + $existing['profession'], + $existing['introduction'], + ]); + } + } diff --git a/app/Module/Manticore/ManticoreFile.php b/app/Module/Manticore/ManticoreFile.php index 0cbc3a9ca..eb0ab8254 100644 --- a/app/Module/Manticore/ManticoreFile.php +++ b/app/Module/Manticore/ManticoreFile.php @@ -242,9 +242,10 @@ class ManticoreFile * 同步单个文件到 Manticore(含 allowed_users) * * @param File $file 文件模型 + * @param bool $withVector 是否同时生成向量(默认 false,向量由后台任务生成) * @return bool 是否成功 */ - public static function sync(File $file): bool + public static function sync(File $file, bool $withVector = false): bool { if (!Apps::isInstalled("manticore")) { return false; @@ -269,9 +270,9 @@ class ManticoreFile // 限制提取后的内容长度 $content = mb_substr($content, 0, self::MAX_CONTENT_LENGTH); - // 获取 embedding(如果有内容且 AI 可用) + // 只有明确要求时才生成向量(默认不生成,由后台任务处理) $embedding = null; - if (!empty($content) && Apps::isInstalled('ai')) { + if ($withVector && !empty($content) && Apps::isInstalled('ai')) { $embeddingResult = self::getEmbedding($content); if (!empty($embeddingResult)) { $embedding = '[' . implode(',', $embeddingResult) . ']'; @@ -339,9 +340,10 @@ class ManticoreFile * 批量同步文件 * * @param iterable $files 文件列表 + * @param bool $withVector 是否同时生成向量 * @return int 成功同步的数量 */ - public static function batchSync(iterable $files): int + public static function batchSync(iterable $files, bool $withVector = false): int { if (!Apps::isInstalled("manticore")) { return 0; @@ -349,7 +351,7 @@ class ManticoreFile $count = 0; foreach ($files as $file) { - if (self::sync($file)) { + if (self::sync($file, $withVector)) { $count++; } } @@ -477,4 +479,90 @@ class ManticoreFile return false; } } + + // ============================== + // 批量向量生成方法 + // ============================== + + /** + * 批量生成文件向量 + * 用于后台异步处理,将已索引文件的向量批量生成 + * + * @param array $fileIds 文件ID数组 + * @param int $batchSize 每批 embedding 数量(默认20) + * @return int 成功处理的数量 + */ + public static function generateVectorsBatch(array $fileIds, int $batchSize = 20): int + { + if (!Apps::isInstalled("manticore") || !Apps::isInstalled("ai") || empty($fileIds)) { + return 0; + } + + try { + // 1. 查询文件信息 + $files = File::whereIn('id', $fileIds) + ->where('type', '!=', 'folder') + ->get(); + + if ($files->isEmpty()) { + return 0; + } + + // 2. 提取每个文件的内容 + $fileContents = []; + foreach ($files as $file) { + // 检查文件大小限制 + $maxSize = self::getMaxFileSizeByExt($file->ext); + if ($file->size > $maxSize) { + continue; + } + + $content = self::extractFileContent($file); + if (!empty($content)) { + // 限制内容长度 + $content = mb_substr($content, 0, self::MAX_CONTENT_LENGTH); + $fileContents[$file->id] = $content; + } + } + + if (empty($fileContents)) { + return 0; + } + + // 3. 分批处理 + $successCount = 0; + $chunks = array_chunk($fileContents, $batchSize, true); + + foreach ($chunks as $chunk) { + $texts = array_values($chunk); + $ids = array_keys($chunk); + + // 4. 批量获取 embedding + $result = AI::getBatchEmbeddings($texts); + if (!Base::isSuccess($result) || empty($result['data'])) { + Log::warning('ManticoreFile: Batch embedding failed', ['file_ids' => $ids]); + continue; + } + + $embeddings = $result['data']; + + // 5. 逐个更新向量到 Manticore + foreach ($ids as $index => $fileId) { + if (!isset($embeddings[$index]) || empty($embeddings[$index])) { + continue; + } + + $vectorStr = '[' . implode(',', $embeddings[$index]) . ']'; + if (ManticoreBase::updateFileVector($fileId, $vectorStr)) { + $successCount++; + } + } + } + + return $successCount; + } catch (\Exception $e) { + Log::error('ManticoreFile generateVectorsBatch error: ' . $e->getMessage()); + return 0; + } + } } diff --git a/app/Module/Manticore/ManticoreMsg.php b/app/Module/Manticore/ManticoreMsg.php index 7f1c375bb..baf41f879 100644 --- a/app/Module/Manticore/ManticoreMsg.php +++ b/app/Module/Manticore/ManticoreMsg.php @@ -355,9 +355,10 @@ class ManticoreMsg * 同步单个消息到 Manticore(含 allowed_users) * * @param WebSocketDialogMsg $msg 消息模型 + * @param bool $withVector 是否同时生成向量(默认 false,向量由后台任务生成) * @return bool 是否成功 */ - public static function sync(WebSocketDialogMsg $msg): bool + public static function sync(WebSocketDialogMsg $msg, bool $withVector = false): bool { if (!Apps::isInstalled("manticore")) { return false; @@ -376,9 +377,9 @@ class ManticoreMsg // 限制内容长度 $content = mb_substr($content, 0, self::MAX_CONTENT_LENGTH); - // 获取 embedding(如果有内容且 AI 可用) + // 只有明确要求时才生成向量(默认不生成,由后台任务处理) $embedding = null; - if (!empty($content) && Apps::isInstalled('ai')) { + if ($withVector && !empty($content) && Apps::isInstalled('ai')) { $embeddingResult = self::getEmbedding($content); if (!empty($embeddingResult)) { $embedding = '[' . implode(',', $embeddingResult) . ']'; @@ -414,9 +415,10 @@ class ManticoreMsg * 批量同步消息 * * @param iterable $msgs 消息列表 + * @param bool $withVector 是否同时生成向量 * @return int 成功同步的数量 */ - public static function batchSync(iterable $msgs): int + public static function batchSync(iterable $msgs, bool $withVector = false): int { if (!Apps::isInstalled("manticore")) { return 0; @@ -424,13 +426,91 @@ class ManticoreMsg $count = 0; foreach ($msgs as $msg) { - if (self::sync($msg)) { + if (self::sync($msg, $withVector)) { $count++; } } return $count; } + /** + * 批量生成向量(供后台任务调用) + * + * @param array $msgIds 消息ID数组 + * @param int $batchSize 每批 embedding 数量 + * @return int 成功生成向量的数量 + */ + public static function generateVectorsBatch(array $msgIds, int $batchSize = 20): int + { + if (!Apps::isInstalled("manticore") || !Apps::isInstalled('ai') || empty($msgIds)) { + return 0; + } + + $count = 0; + + // 分批处理 + foreach (array_chunk($msgIds, $batchSize) as $batchIds) { + // 获取消息 + $msgs = WebSocketDialogMsg::whereIn('id', $batchIds) + ->whereIn('type', self::INDEXABLE_TYPES) + ->where('bot', '!=', 1) + ->whereNotNull('key') + ->where('key', '!=', '') + ->get() + ->keyBy('id'); + + if ($msgs->isEmpty()) { + continue; + } + + // 准备文本 + $texts = []; + $idsArray = []; + foreach ($batchIds as $id) { + if (isset($msgs[$id])) { + $content = mb_substr($msgs[$id]->key ?? '', 0, self::MAX_CONTENT_LENGTH); + if (!empty($content)) { + $texts[] = $content; + $idsArray[] = $id; + } + } + } + + if (empty($texts)) { + continue; + } + + // 批量获取 embeddings + $result = AI::getBatchEmbeddings($texts); + + if (Base::isError($result)) { + Log::warning('ManticoreMsg batch embedding failed: ' . ($result['msg'] ?? 'Unknown error')); + continue; + } + + $embeddings = $result['data'] ?? []; + + // 更新向量 + foreach ($embeddings as $index => $embedding) { + if (empty($embedding) || !is_array($embedding)) { + continue; + } + + $msgId = $idsArray[$index] ?? null; + if (!$msgId) { + continue; + } + + $vectorStr = '[' . implode(',', $embedding) . ']'; + if (ManticoreBase::updateMsgVector($msgId, $vectorStr)) { + $count++; + } + } + } + + return $count; + } + /** * 删除消息索引 * diff --git a/app/Module/Manticore/ManticoreProject.php b/app/Module/Manticore/ManticoreProject.php index 8d567d9cb..98f488f40 100644 --- a/app/Module/Manticore/ManticoreProject.php +++ b/app/Module/Manticore/ManticoreProject.php @@ -148,9 +148,10 @@ class ManticoreProject * 同步单个项目到 Manticore(含 allowed_users) * * @param Project $project 项目模型 + * @param bool $withVector 是否同时生成向量(默认 false,向量由后台任务生成) * @return bool 是否成功 */ - public static function sync(Project $project): bool + public static function sync(Project $project, bool $withVector = false): bool { if (!Apps::isInstalled("manticore")) { return false; @@ -165,9 +166,9 @@ class ManticoreProject // 构建用于搜索的文本内容 $searchableContent = self::buildSearchableContent($project); - // 获取 embedding(如果 AI 可用) + // 只有明确要求时才生成向量(默认不生成,由后台任务处理) $embedding = null; - if (!empty($searchableContent) && Apps::isInstalled('ai')) { + if ($withVector && !empty($searchableContent) && Apps::isInstalled('ai')) { $embeddingResult = self::getEmbedding($searchableContent); if (!empty($embeddingResult)) { $embedding = '[' . implode(',', $embeddingResult) . ']'; @@ -222,9 +223,10 @@ class ManticoreProject * 批量同步项目 * * @param iterable $projects 项目列表 + * @param bool $withVector 是否同时生成向量 * @return int 成功同步的数量 */ - public static function batchSync(iterable $projects): int + public static function batchSync(iterable $projects, bool $withVector = false): int { if (!Apps::isInstalled("manticore")) { return 0; @@ -232,7 +234,7 @@ class ManticoreProject $count = 0; foreach ($projects as $project) { - if (self::sync($project)) { + if (self::sync($project, $withVector)) { $count++; } } @@ -307,4 +309,82 @@ class ManticoreProject return false; } } + + // ============================== + // 批量向量生成方法 + // ============================== + + /** + * 批量生成项目向量 + * 用于后台异步处理,将已索引项目的向量批量生成 + * + * @param array $projectIds 项目ID数组 + * @param int $batchSize 每批 embedding 数量(默认20) + * @return int 成功处理的数量 + */ + public static function generateVectorsBatch(array $projectIds, int $batchSize = 20): int + { + if (!Apps::isInstalled("manticore") || !Apps::isInstalled("ai") || empty($projectIds)) { + return 0; + } + + try { + // 1. 查询项目信息 + $projects = Project::whereIn('id', $projectIds) + ->whereNull('archived_at') + ->get(); + + if ($projects->isEmpty()) { + return 0; + } + + // 2. 提取每个项目的内容 + $projectContents = []; + foreach ($projects as $project) { + $searchableContent = self::buildSearchableContent($project); + if (!empty($searchableContent)) { + $projectContents[$project->id] = $searchableContent; + } + } + + if (empty($projectContents)) { + return 0; + } + + // 3. 分批处理 + $successCount = 0; + $chunks = array_chunk($projectContents, $batchSize, true); + + foreach ($chunks as $chunk) { + $texts = array_values($chunk); + $ids = array_keys($chunk); + + // 4. 批量获取 embedding + $result = AI::getBatchEmbeddings($texts); + if (!Base::isSuccess($result) || empty($result['data'])) { + Log::warning('ManticoreProject: Batch embedding failed', ['project_ids' => $ids]); + continue; + } + + $embeddings = $result['data']; + + // 5. 逐个更新向量到 Manticore + foreach ($ids as $index => $projectId) { + if (!isset($embeddings[$index]) || empty($embeddings[$index])) { + continue; + } + + $vectorStr = '[' . implode(',', $embeddings[$index]) . ']'; + if (ManticoreBase::updateProjectVector($projectId, $vectorStr)) { + $successCount++; + } + } + } + + return $successCount; + } catch (\Exception $e) { + Log::error('ManticoreProject generateVectorsBatch error: ' . $e->getMessage()); + return 0; + } + } } diff --git a/app/Module/Manticore/ManticoreTask.php b/app/Module/Manticore/ManticoreTask.php index 33de4d152..e5a5b4532 100644 --- a/app/Module/Manticore/ManticoreTask.php +++ b/app/Module/Manticore/ManticoreTask.php @@ -207,9 +207,10 @@ class ManticoreTask * 同步单个任务到 Manticore(含 allowed_users) * * @param ProjectTask $task 任务模型 + * @param bool $withVector 是否同时生成向量(默认 false,向量由后台任务生成) * @return bool 是否成功 */ - public static function sync(ProjectTask $task): bool + public static function sync(ProjectTask $task, bool $withVector = false): bool { if (!Apps::isInstalled("manticore")) { return false; @@ -227,9 +228,9 @@ class ManticoreTask // 构建用于搜索的文本内容 $searchableContent = self::buildSearchableContent($task, $taskContent); - // 获取 embedding(如果 AI 可用) + // 只有明确要求时才生成向量(默认不生成,由后台任务处理) $embedding = null; - if (!empty($searchableContent) && Apps::isInstalled('ai')) { + if ($withVector && !empty($searchableContent) && Apps::isInstalled('ai')) { $embeddingResult = self::getEmbedding($searchableContent); if (!empty($embeddingResult)) { $embedding = '[' . implode(',', $embeddingResult) . ']'; @@ -353,9 +354,10 @@ class ManticoreTask * 批量同步任务 * * @param iterable $tasks 任务列表 + * @param bool $withVector 是否同时生成向量 * @return int 成功同步的数量 */ - public static function batchSync(iterable $tasks): int + public static function batchSync(iterable $tasks, bool $withVector = false): int { if (!Apps::isInstalled("manticore")) { return 0; @@ -363,7 +365,7 @@ class ManticoreTask $count = 0; foreach ($tasks as $task) { - if (self::sync($task)) { + if (self::sync($task, $withVector)) { $count++; } } @@ -520,4 +522,86 @@ class ManticoreTask Log::error('Manticore cascadeToChildren error: ' . $e->getMessage(), ['task_id' => $taskId]); } } + + // ============================== + // 批量向量生成方法 + // ============================== + + /** + * 批量生成任务向量 + * 用于后台异步处理,将已索引任务的向量批量生成 + * + * @param array $taskIds 任务ID数组 + * @param int $batchSize 每批 embedding 数量(默认20) + * @return int 成功处理的数量 + */ + public static function generateVectorsBatch(array $taskIds, int $batchSize = 20): int + { + if (!Apps::isInstalled("manticore") || !Apps::isInstalled("ai") || empty($taskIds)) { + return 0; + } + + try { + // 1. 查询任务信息 + $tasks = ProjectTask::whereIn('id', $taskIds) + ->whereNull('deleted_at') + ->whereNull('archived_at') + ->get(); + + if ($tasks->isEmpty()) { + return 0; + } + + // 2. 提取每个任务的内容 + $taskContents = []; + foreach ($tasks as $task) { + $taskContent = self::getTaskContent($task); + $searchableContent = self::buildSearchableContent($task, $taskContent); + if (!empty($searchableContent)) { + // 限制内容长度 + $searchableContent = mb_substr($searchableContent, 0, self::MAX_CONTENT_LENGTH); + $taskContents[$task->id] = $searchableContent; + } + } + + if (empty($taskContents)) { + return 0; + } + + // 3. 分批处理 + $successCount = 0; + $chunks = array_chunk($taskContents, $batchSize, true); + + foreach ($chunks as $chunk) { + $texts = array_values($chunk); + $ids = array_keys($chunk); + + // 4. 批量获取 embedding + $result = AI::getBatchEmbeddings($texts); + if (!Base::isSuccess($result) || empty($result['data'])) { + Log::warning('ManticoreTask: Batch embedding failed', ['task_ids' => $ids]); + continue; + } + + $embeddings = $result['data']; + + // 5. 逐个更新向量到 Manticore + foreach ($ids as $index => $taskId) { + if (!isset($embeddings[$index]) || empty($embeddings[$index])) { + continue; + } + + $vectorStr = '[' . implode(',', $embeddings[$index]) . ']'; + if (ManticoreBase::updateTaskVector($taskId, $vectorStr)) { + $successCount++; + } + } + } + + return $successCount; + } catch (\Exception $e) { + Log::error('ManticoreTask generateVectorsBatch error: ' . $e->getMessage()); + return 0; + } + } } diff --git a/app/Module/Manticore/ManticoreUser.php b/app/Module/Manticore/ManticoreUser.php index 1ee0b95df..1e3df5575 100644 --- a/app/Module/Manticore/ManticoreUser.php +++ b/app/Module/Manticore/ManticoreUser.php @@ -130,9 +130,10 @@ class ManticoreUser * 同步单个用户到 Manticore * * @param User $user 用户模型 + * @param bool $withVector 是否同时生成向量(默认 false,向量由后台任务生成) * @return bool 是否成功 */ - public static function sync(User $user): bool + public static function sync(User $user, bool $withVector = false): bool { if (!Apps::isInstalled("manticore")) { return false; @@ -152,9 +153,9 @@ class ManticoreUser // 构建用于搜索的文本内容 $searchableContent = self::buildSearchableContent($user); - // 获取 embedding(如果 AI 可用) + // 只有明确要求时才生成向量(默认不生成,由后台任务处理) $embedding = null; - if (!empty($searchableContent) && Apps::isInstalled('ai')) { + if ($withVector && !empty($searchableContent) && Apps::isInstalled('ai')) { $embeddingResult = self::getEmbedding($searchableContent); if (!empty($embeddingResult)) { $embedding = '[' . implode(',', $embeddingResult) . ']'; @@ -212,9 +213,10 @@ class ManticoreUser * 批量同步用户 * * @param iterable $users 用户列表 + * @param bool $withVector 是否同时生成向量 * @return int 成功同步的数量 */ - public static function batchSync(iterable $users): int + public static function batchSync(iterable $users, bool $withVector = false): int { if (!Apps::isInstalled("manticore")) { return 0; @@ -222,7 +224,7 @@ class ManticoreUser $count = 0; foreach ($users as $user) { - if (self::sync($user)) { + if (self::sync($user, $withVector)) { $count++; } } @@ -271,5 +273,84 @@ class ManticoreUser return ManticoreBase::getIndexedUserCount(); } + + // ============================== + // 批量向量生成方法 + // ============================== + + /** + * 批量生成用户向量 + * 用于后台异步处理,将已索引用户的向量批量生成 + * + * @param array $userIds 用户ID数组 + * @param int $batchSize 每批 embedding 数量(默认20) + * @return int 成功处理的数量 + */ + public static function generateVectorsBatch(array $userIds, int $batchSize = 20): int + { + if (!Apps::isInstalled("manticore") || !Apps::isInstalled("ai") || empty($userIds)) { + return 0; + } + + try { + // 1. 查询用户信息 + $users = User::whereIn('userid', $userIds) + ->where('bot', 0) + ->whereNull('disable_at') + ->get(); + + if ($users->isEmpty()) { + return 0; + } + + // 2. 提取每个用户的内容 + $userContents = []; + foreach ($users as $user) { + $searchableContent = self::buildSearchableContent($user); + if (!empty($searchableContent)) { + $userContents[$user->userid] = $searchableContent; + } + } + + if (empty($userContents)) { + return 0; + } + + // 3. 分批处理 + $successCount = 0; + $chunks = array_chunk($userContents, $batchSize, true); + + foreach ($chunks as $chunk) { + $texts = array_values($chunk); + $ids = array_keys($chunk); + + // 4. 批量获取 embedding + $result = AI::getBatchEmbeddings($texts); + if (!Base::isSuccess($result) || empty($result['data'])) { + Log::warning('ManticoreUser: Batch embedding failed', ['user_ids' => $ids]); + continue; + } + + $embeddings = $result['data']; + + // 5. 逐个更新向量到 Manticore + foreach ($ids as $index => $userid) { + if (!isset($embeddings[$index]) || empty($embeddings[$index])) { + continue; + } + + $vectorStr = '[' . implode(',', $embeddings[$index]) . ']'; + if (ManticoreBase::updateUserVector($userid, $vectorStr)) { + $successCount++; + } + } + } + + return $successCount; + } catch (\Exception $e) { + Log::error('ManticoreUser generateVectorsBatch error: ' . $e->getMessage()); + return 0; + } + } } diff --git a/app/Tasks/ManticoreSyncTask.php b/app/Tasks/ManticoreSyncTask.php index 21dc5d2f2..56c56a47e 100644 --- a/app/Tasks/ManticoreSyncTask.php +++ b/app/Tasks/ManticoreSyncTask.php @@ -194,14 +194,25 @@ class ManticoreSyncTask extends AbstractTask */ private function incrementalUpdate() { - // 60分钟执行一次 - $time = intval(Cache::get("ManticoreSyncTask:Time")); - if (time() - $time < 60 * 60) { + // 执行增量全文索引同步(10分钟执行一次) + $this->runIncrementalSync(); + + // 执行向量生成(10分钟执行一次,与全文索引独立) + $this->runVectorGeneration(); + } + + /** + * 执行增量全文索引同步 + */ + private function runIncrementalSync(): void + { + $time = intval(Cache::get("ManticoreSyncTask:SyncTime")); + if (time() - $time < 10 * 60) { return; } // 执行开始 - Cache::put("ManticoreSyncTask:Time", time(), Carbon::now()->addMinutes(60)); + Cache::put("ManticoreSyncTask:SyncTime", time(), Carbon::now()->addMinutes(15)); // 执行增量同步(MVA 方案不需要单独同步关系表) @shell_exec("php /var/www/artisan manticore:sync-files --i 2>&1 &"); @@ -209,9 +220,28 @@ class ManticoreSyncTask extends AbstractTask @shell_exec("php /var/www/artisan manticore:sync-projects --i 2>&1 &"); @shell_exec("php /var/www/artisan manticore:sync-tasks --i 2>&1 &"); @shell_exec("php /var/www/artisan manticore:sync-msgs --i 2>&1 &"); + } - // 执行完成 - Cache::put("ManticoreSyncTask:Time", time(), Carbon::now()->addMinutes(5)); + /** + * 执行向量生成(异步批量处理) + */ + private function runVectorGeneration(): void + { + // 检查 AI 是否安装 + if (!Apps::isInstalled("ai")) { + return; + } + + $time = intval(Cache::get("ManticoreSyncTask:VectorTime")); + if (time() - $time < 10 * 60) { + return; + } + + // 执行开始 + Cache::put("ManticoreSyncTask:VectorTime", time(), Carbon::now()->addMinutes(15)); + + // 执行向量生成(批量处理,每轮最多500条) + @shell_exec("php /var/www/artisan manticore:generate-vectors --type=all --batch=20 --max=500 2>&1 &"); } public function end()