[ 'syncKey' => 'sync:manticoreMsgLastId', 'vectorKey' => 'vector:manticoreMsgLastId', 'class' => ManticoreMsg::class, 'model' => WebSocketDialogMsg::class, 'idField' => 'id', ], 'file' => [ 'syncKey' => 'sync:manticoreFileLastId', 'vectorKey' => 'vector:manticoreFileLastId', 'class' => ManticoreFile::class, 'model' => File::class, 'idField' => 'id', ], 'task' => [ 'syncKey' => 'sync:manticoreTaskLastId', 'vectorKey' => 'vector:manticoreTaskLastId', 'class' => ManticoreTask::class, 'model' => ProjectTask::class, 'idField' => 'id', ], 'project' => [ 'syncKey' => 'sync:manticoreProjectLastId', 'vectorKey' => 'vector:manticoreProjectLastId', 'class' => ManticoreProject::class, 'model' => Project::class, 'idField' => 'id', ], 'user' => [ 'syncKey' => 'sync:manticoreUserLastId', 'vectorKey' => 'vector:manticoreUserLastId', 'class' => ManticoreUser::class, 'model' => User::class, 'idField' => 'userid', ], ]; public function handle(): int { if (!Apps::isInstalled("manticore")) { $this->error("应用「Manticore Search」未安装"); return 1; } if (!Apps::isInstalled("ai")) { $this->error("应用「AI」未安装,无法生成向量"); return 1; } $this->registerSignalHandlers(); if (!$this->acquireLock()) { return 1; } $type = $this->option('type'); $batchSize = intval($this->option('batch')); $sleepSeconds = intval($this->option('sleep')); $reset = $this->option('reset'); if ($type === 'all') { $types = array_keys(self::TYPE_CONFIG); } else { if (!isset(self::TYPE_CONFIG[$type])) { $this->error("未知类型: {$type}。可用类型: msg, file, task, project, user, all"); $this->releaseLock(); return 1; } $types = [$type]; } // 持续处理直到所有类型都没有待处理数据 $round = 0; do { $round++; $totalPending = 0; foreach ($types as $t) { if ($this->shouldStop) { break; } $pending = $this->processType($t, $batchSize, $reset && $round === 1); $totalPending += $pending; } // 如果还有待处理数据,休眠后继续 if ($totalPending > 0 && !$this->shouldStop) { $this->info("\n--- 第 {$round} 轮完成,剩余 {$totalPending} 条待处理,{$sleepSeconds} 秒后继续 ---\n"); sleep($sleepSeconds); $this->setLock(); // 刷新锁 } } while ($totalPending > 0 && !$this->shouldStop); $this->info("\n向量生成完成(共 {$round} 轮)"); $this->releaseLock(); return 0; } /** * 处理单个类型的向量生成(每次处理一批) * * @param string $type 类型 * @param int $batchSize 每批数量 * @param bool $reset 是否重置进度 * @return int 剩余待处理数量 */ private function processType(string $type, int $batchSize, bool $reset): int { $config = self::TYPE_CONFIG[$type]; // 获取进度指针 $syncLastId = intval(ManticoreKeyValue::get($config['syncKey'], 0)); $vectorLastId = $reset ? 0 : intval(ManticoreKeyValue::get($config['vectorKey'], 0)); if ($reset) { ManticoreKeyValue::set($config['vectorKey'], 0); $this->info("[{$type}] 已重置向量进度指针"); } // 计算待处理范围 $pendingCount = $syncLastId - $vectorLastId; if ($pendingCount <= 0) { return 0; } // 获取待处理的 ID 列表(每次处理 batchSize * 5 条,让 generateVectorsBatch 内部再分批调用 API) $modelClass = $config['model']; $idField = $config['idField']; $fetchCount = $batchSize * 5; $ids = $modelClass::where($idField, '>', $vectorLastId) ->where($idField, '<=', $syncLastId) ->orderBy($idField) ->limit($fetchCount) ->pluck($idField) ->toArray(); if (empty($ids)) { return 0; } // 批量生成向量 $manticoreClass = $config['class']; $successCount = $manticoreClass::generateVectorsBatch($ids, $batchSize); $currentLastId = end($ids); // 更新向量进度指针 ManticoreKeyValue::set($config['vectorKey'], $currentLastId); $remaining = $pendingCount - count($ids); $this->info("[{$type}] 处理 " . count($ids) . " 条,成功 {$successCount},ID: {$vectorLastId} -> {$currentLastId},剩余 {$remaining}"); // 刷新锁 $this->setLock(); return max(0, $remaining); } }