From 610979f30b09a9842b2befa47286f9700bf37e60 Mon Sep 17 00:00:00 2001 From: kuaifan Date: Sat, 3 Jan 2026 22:41:49 +0000 Subject: [PATCH] feat: Enhance Manticore sync commands with incremental processing and sleep options - Updated sync commands (SyncFileToManticore, SyncMsgToManticore, SyncProjectToManticore, SyncTaskToManticore, SyncUserToManticore) to support continuous incremental updates until completion. - Added --sleep option to allow a pause between batches in incremental mode. - Improved signal handling to allow graceful shutdown during processing. - Adjusted lock duration to 30 minutes for long-running processes. - Enhanced logging for better visibility of sync progress and completion. - Updated ManticoreSyncTask to ensure commands run continuously and check for new data every 2 minutes. --- .../Commands/GenerateManticoreVectors.php | 128 +++++++------- app/Console/Commands/SyncFileToManticore.php | 161 +++++++++--------- app/Console/Commands/SyncMsgToManticore.php | 133 ++++++++++----- .../Commands/SyncProjectToManticore.php | 125 +++++++++----- app/Console/Commands/SyncTaskToManticore.php | 126 +++++++++----- app/Console/Commands/SyncUserToManticore.php | 127 +++++++++----- app/Tasks/ManticoreSyncTask.php | 50 +++--- 7 files changed, 506 insertions(+), 344 deletions(-) diff --git a/app/Console/Commands/GenerateManticoreVectors.php b/app/Console/Commands/GenerateManticoreVectors.php index 63ea50f84..cc5062194 100644 --- a/app/Console/Commands/GenerateManticoreVectors.php +++ b/app/Console/Commands/GenerateManticoreVectors.php @@ -22,13 +22,18 @@ use Illuminate\Console\Command; * * 用于后台批量生成已索引数据的向量,与全文索引解耦 * 使用双指针追踪:sync:xxxLastId(全文已同步)和 vector:xxxLastId(向量已生成) + * + * 运行模式: + * - 持续处理直到所有待处理数据完成 + * - 每批处理完成后休眠几秒,避免 API 过载 + * - 定时器只作为兜底触发机制 */ class GenerateManticoreVectors extends Command { protected $signature = 'manticore:generate-vectors {--type=all : 类型 (msg/file/task/project/user/all)} - {--batch=20 : 每批 embedding 数量} - {--max=500 : 每轮最大处理数量} + {--batch=50 : 每批 embedding 数量} + {--sleep=3 : 每批处理后休眠秒数} {--reset : 重置向量进度指针}'; protected $description = '批量生成 Manticore 已索引数据的向量(异步处理)'; @@ -74,6 +79,8 @@ class GenerateManticoreVectors extends Command ], ]; + private bool $shouldStop = false; + public function handle(): int { if (!Apps::isInstalled("manticore")) { @@ -104,7 +111,7 @@ class GenerateManticoreVectors extends Command $type = $this->option('type'); $batchSize = intval($this->option('batch')); - $maxCount = intval($this->option('max')); + $sleepSeconds = intval($this->option('sleep')); $reset = $this->option('reset'); if ($type === 'all') { @@ -118,85 +125,92 @@ class GenerateManticoreVectors extends Command $types = [$type]; } - foreach ($types as $t) { - $this->processType($t, $batchSize, $maxCount, $reset); - } + // 持续处理直到所有类型都没有待处理数据 + $round = 0; + do { + $round++; + $totalPending = 0; - $this->info("\n向量生成完成"); + foreach ($types as $t) { + if ($this->shouldStop) { + break; + } + $pending = $this->processType($t, $batchSize, $reset && $round === 1); + $totalPending += $pending; + } + + // 如果还有待处理数据,休眠后继续 + if ($totalPending > 0 && !$this->shouldStop) { + $this->info("\n--- 第 {$round} 轮完成,剩余 {$totalPending} 条待处理,{$sleepSeconds} 秒后继续 ---\n"); + sleep($sleepSeconds); + $this->setLock(); // 刷新锁 + } + } while ($totalPending > 0 && !$this->shouldStop); + + $this->info("\n向量生成完成(共 {$round} 轮)"); $this->releaseLock(); return 0; } /** - * 处理单个类型的向量生成 + * 处理单个类型的向量生成(每次处理一批) + * + * @param string $type 类型 + * @param int $batchSize 每批数量 + * @param bool $reset 是否重置进度 + * @return int 剩余待处理数量 */ - private function processType(string $type, int $batchSize, int $maxCount, bool $reset): void + private function processType(string $type, int $batchSize, bool $reset): int { $config = self::TYPE_CONFIG[$type]; - $this->info("\n========== 处理 {$type} =========="); - // 获取进度指针 $syncLastId = intval(ManticoreKeyValue::get($config['syncKey'], 0)); $vectorLastId = $reset ? 0 : intval(ManticoreKeyValue::get($config['vectorKey'], 0)); if ($reset) { ManticoreKeyValue::set($config['vectorKey'], 0); - $this->info("已重置 {$type} 向量进度指针"); + $this->info("[{$type}] 已重置向量进度指针"); } // 计算待处理范围 $pendingCount = $syncLastId - $vectorLastId; if ($pendingCount <= 0) { - $this->info("{$type}: 无待处理数据 (sync={$syncLastId}, vector={$vectorLastId})"); - return; + return 0; } - $this->info("{$type}: 待处理 {$pendingCount} 条 (ID {$vectorLastId} -> {$syncLastId})"); - - // 限制本轮处理数量 - $toProcess = min($pendingCount, $maxCount); - $this->info("{$type}: 本轮处理 {$toProcess} 条"); - - // 获取待处理的 ID 列表 + // 获取待处理的 ID 列表(每次处理 batchSize * 5 条,让 generateVectorsBatch 内部再分批调用 API) $modelClass = $config['model']; $idField = $config['idField']; + $fetchCount = $batchSize * 5; - $processedCount = 0; - $currentLastId = $vectorLastId; + $ids = $modelClass::where($idField, '>', $vectorLastId) + ->where($idField, '<=', $syncLastId) + ->orderBy($idField) + ->limit($fetchCount) + ->pluck($idField) + ->toArray(); - while ($processedCount < $toProcess) { - $remainingCount = min($toProcess - $processedCount, $batchSize * 5); - - // 获取一批 ID - $ids = $modelClass::where($idField, '>', $currentLastId) - ->where($idField, '<=', $syncLastId) - ->orderBy($idField) - ->limit($remainingCount) - ->pluck($idField) - ->toArray(); - - if (empty($ids)) { - break; - } - - // 批量生成向量 - $manticoreClass = $config['class']; - $successCount = $manticoreClass::generateVectorsBatch($ids, $batchSize); - - $processedCount += count($ids); - $currentLastId = end($ids); - - // 更新向量进度指针 - ManticoreKeyValue::set($config['vectorKey'], $currentLastId); - - $this->info("{$type}: 已处理 {$processedCount}/{$toProcess},成功 {$successCount},当前ID: {$currentLastId}"); - - // 刷新锁 - $this->setLock(); + if (empty($ids)) { + return 0; } - $this->info("{$type}: 完成本轮向量生成,共处理 {$processedCount} 条"); + // 批量生成向量 + $manticoreClass = $config['class']; + $successCount = $manticoreClass::generateVectorsBatch($ids, $batchSize); + + $currentLastId = end($ids); + + // 更新向量进度指针 + ManticoreKeyValue::set($config['vectorKey'], $currentLastId); + + $remaining = $pendingCount - count($ids); + $this->info("[{$type}] 处理 " . count($ids) . " 条,成功 {$successCount},ID: {$vectorLastId} -> {$currentLastId},剩余 {$remaining}"); + + // 刷新锁 + $this->setLock(); + + return max(0, $remaining); } private function getLock(): ?array @@ -208,7 +222,8 @@ class GenerateManticoreVectors extends Command private function setLock(): void { $lockKey = 'manticore:generate-vectors:lock'; - Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600); + // 锁有效期 30 分钟,持续处理时会不断刷新 + Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 1800); } private function releaseLock(): void @@ -219,8 +234,7 @@ class GenerateManticoreVectors extends Command public function handleSignal(int $signal): void { - $this->info("\n收到信号,正在退出..."); - $this->releaseLock(); - exit(0); + $this->info("\n收到信号,将在当前批次完成后退出..."); + $this->shouldStop = true; } } diff --git a/app/Console/Commands/SyncFileToManticore.php b/app/Console/Commands/SyncFileToManticore.php index 393a41089..2b364ccbf 100644 --- a/app/Console/Commands/SyncFileToManticore.php +++ b/app/Console/Commands/SyncFileToManticore.php @@ -14,15 +14,20 @@ class SyncFileToManticore extends Command /** * 更新数据(MVA 方案:allowed_users 在同步时自动写入) * --f: 全量更新 (默认) - * --i: 增量更新(从上次更新的最后一个ID接上) + * --i: 增量更新(从上次更新的最后一个ID接上,持续处理直到完成) * * 清理数据 * --c: 清除索引 + * + * 其他选项 + * --sleep: 每批处理完成后休眠秒数(增量模式) */ - protected $signature = 'manticore:sync-files {--f} {--i} {--c} {--batch=100}'; + protected $signature = 'manticore:sync-files {--f} {--i} {--c} {--batch=100} {--sleep=3}'; protected $description = '同步文件内容到 Manticore Search(MVA 权限方案)'; + private bool $shouldStop = false; + /** * @return int */ @@ -35,9 +40,9 @@ class SyncFileToManticore extends Command // 注册信号处理器(仅在支持pcntl扩展的环境下) if (extension_loaded('pcntl')) { - pcntl_async_signals(true); // 启用异步信号处理 - pcntl_signal(SIGINT, [$this, 'handleSignal']); // Ctrl+C - pcntl_signal(SIGTERM, [$this, 'handleSignal']); // kill + pcntl_async_signals(true); + pcntl_signal(SIGINT, [$this, 'handleSignal']); + pcntl_signal(SIGTERM, [$this, 'handleSignal']); } // 检查锁,如果已被占用则退出 @@ -47,7 +52,6 @@ class SyncFileToManticore extends Command return 1; } - // 设置锁 $this->setLock(); // 清除索引 @@ -61,125 +65,128 @@ class SyncFileToManticore extends Command } $this->info('开始同步文件数据(MVA 方案:allowed_users 自动内联)...'); - - // 同步文件数据 $this->syncFiles(); - // 完成 $this->info("\n同步完成"); $this->releaseLock(); return 0; } - /** - * 获取锁信息 - * - * @return array|null 如果锁存在返回锁信息,否则返回null - */ private function getLock(): ?array { $lockKey = md5($this->signature); return Cache::has($lockKey) ? Cache::get($lockKey) : null; } - /** - * 设置锁 - */ private function setLock(): void { $lockKey = md5($this->signature); - $lockInfo = [ - 'started_at' => date('Y-m-d H:i:s') - ]; - Cache::put($lockKey, $lockInfo, 600); // 10分钟(文件同步可能较慢) + Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 1800); } - /** - * 释放锁 - */ private function releaseLock(): void { $lockKey = md5($this->signature); Cache::forget($lockKey); } - /** - * 处理终端信号 - * - * @param int $signal - * @return void - */ public function handleSignal(int $signal): void { - // 释放锁 - $this->releaseLock(); - exit(0); + $this->info("\n收到信号,将在当前批次完成后退出..."); + $this->shouldStop = true; } /** * 同步文件数据 - * - * @return void */ private function syncFiles(): void { - // 获取上次同步的最后ID $lastKey = "sync:manticoreFileLastId"; - $lastId = $this->option('i') ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; - - if ($lastId > 0) { - $this->info("\n同步文件数据({$lastId})..."); - } else { - $this->info("\n同步文件数据..."); - } - - // 查询条件:排除文件夹,使用最大文件限制 - // 具体的文件类型大小检查在 ManticoreFile::sync 中进行 - $maxFileSize = ManticoreFile::getMaxFileSize(); - $query = File::where('id', '>', $lastId) - ->where('type', '!=', 'folder') - ->where('size', '<=', $maxFileSize); - - $num = 0; - $count = $query->count(); + $isIncremental = $this->option('i'); + $sleepSeconds = intval($this->option('sleep')); $batchSize = $this->option('batch'); + $maxFileSize = ManticoreFile::getMaxFileSize(); - $total = 0; - $lastNum = 0; + $round = 0; do { - // 获取一批 - $files = File::where('id', '>', $lastId) + $round++; + $lastId = $isIncremental ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; + + if ($round === 1) { + if ($lastId > 0) { + $this->info("\n增量同步文件数据(从ID {$lastId} 开始)..."); + } else { + $this->info("\n全量同步文件数据..."); + } + } + + $count = File::where('id', '>', $lastId) ->where('type', '!=', 'folder') ->where('size', '<=', $maxFileSize) - ->orderBy('id') - ->limit($batchSize) - ->get(); + ->count(); - if ($files->isEmpty()) { + if ($count === 0) { + if ($round === 1) { + $this->info("无待同步数据"); + } break; } - $num += count($files); - $progress = $count > 0 ? round($num / $count * 100, 2) : 100; - if ($progress < 100) { - $progress = number_format($progress, 2); + $this->info("[第 {$round} 轮] 待同步 {$count} 个文件"); + + $num = 0; + $total = 0; + + do { + if ($this->shouldStop) { + break; + } + + $files = File::where('id', '>', $lastId) + ->where('type', '!=', 'folder') + ->where('size', '<=', $maxFileSize) + ->orderBy('id') + ->limit($batchSize) + ->get(); + + if ($files->isEmpty()) { + break; + } + + $num += count($files); + $progress = $count > 0 ? round($num / $count * 100, 2) : 100; + $this->info("{$num}/{$count} ({$progress}%) 文件ID {$files->first()->id} ~ {$files->last()->id}"); + + $this->setLock(); + + $syncCount = ManticoreFile::batchSync($files); + $total += $syncCount; + + $lastId = $files->last()->id; + ManticoreKeyValue::set($lastKey, $lastId); + } while (count($files) == $batchSize && !$this->shouldStop); + + $this->info("[第 {$round} 轮] 完成,同步 {$total} 个,最后ID {$lastId}"); + + if ($isIncremental && !$this->shouldStop) { + $newCount = File::where('id', '>', $lastId) + ->where('type', '!=', 'folder') + ->where('size', '<=', $maxFileSize) + ->count(); + + if ($newCount > 0) { + $this->info("发现 {$newCount} 个新文件,{$sleepSeconds} 秒后继续..."); + sleep($sleepSeconds); + continue; + } } - $this->info("{$num}/{$count} ({$progress}%) 正在同步文件ID {$files->first()->id} ~ {$files->last()->id} ({$total}|{$lastNum})"); - // 刷新锁 - $this->setLock(); + break; - // 同步数据 - $lastNum = ManticoreFile::batchSync($files); - $total += $lastNum; + } while (!$this->shouldStop); - // 更新最后ID - $lastId = $files->last()->id; - ManticoreKeyValue::set($lastKey, $lastId); - } while (count($files) == $batchSize); - - $this->info("同步文件结束 - 最后ID {$lastId}"); + $this->info("同步文件结束(共 {$round} 轮)- 最后ID: " . ManticoreKeyValue::get($lastKey, 0)); $this->info("已索引文件数量: " . ManticoreFile::getIndexedCount()); } } diff --git a/app/Console/Commands/SyncMsgToManticore.php b/app/Console/Commands/SyncMsgToManticore.php index f3060e73f..ad6063e8d 100644 --- a/app/Console/Commands/SyncMsgToManticore.php +++ b/app/Console/Commands/SyncMsgToManticore.php @@ -14,18 +14,21 @@ class SyncMsgToManticore extends Command /** * 更新数据(MVA 方案:allowed_users 在同步时自动写入) * --f: 全量更新 (默认) - * --i: 增量更新(从上次更新的最后一个ID接上) + * --i: 增量更新(从上次更新的最后一个ID接上,持续处理直到完成) * * 清理数据 * --c: 清除索引 * * 其他选项 * --dialog: 指定对话ID(仅同步该对话的消息) + * --sleep: 每批处理完成后休眠秒数(增量模式) */ - protected $signature = 'manticore:sync-msgs {--f} {--i} {--c} {--batch=100} {--dialog=}'; + protected $signature = 'manticore:sync-msgs {--f} {--i} {--c} {--batch=100} {--dialog=} {--sleep=3}'; protected $description = '同步消息数据到 Manticore Search(MVA 权限方案)'; + private bool $shouldStop = false; + /** * @return int */ @@ -85,7 +88,8 @@ class SyncMsgToManticore extends Command private function setLock(): void { $lockKey = md5($this->signature); - Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600); + // 锁有效期 30 分钟,持续处理时会不断刷新 + Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 1800); } private function releaseLock(): void @@ -96,8 +100,8 @@ class SyncMsgToManticore extends Command public function handleSignal(int $signal): void { - $this->releaseLock(); - exit(0); + $this->info("\n收到信号,将在当前批次完成后退出..."); + $this->shouldStop = true; } /** @@ -106,63 +110,102 @@ class SyncMsgToManticore extends Command private function syncMsgs(): void { $lastKey = "sync:manticoreMsgLastId"; - $lastId = $this->option('i') ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; - - if ($lastId > 0) { - $this->info("\n增量同步消息数据(从ID {$lastId} 开始)..."); - } else { - $this->info("\n全量同步消息数据..."); - } - - // 构建基础查询条件 - // 排除:软删除、机器人消息、空 key 消息 - // 只包含:可索引的消息类型 - $baseQuery = WebSocketDialogMsg::where('id', '>', $lastId) - ->whereNull('deleted_at') - ->where('bot', '!=', 1) - ->whereNotNull('key') - ->where('key', '!=', '') - ->whereIn('type', ManticoreMsg::INDEXABLE_TYPES); - - $num = 0; - $count = $baseQuery->count(); + $isIncremental = $this->option('i'); + $sleepSeconds = intval($this->option('sleep')); $batchSize = $this->option('batch'); - $total = 0; - $lastNum = 0; + $round = 0; + // 持续处理循环(增量模式下) do { - $msgs = WebSocketDialogMsg::where('id', '>', $lastId) + $round++; + $lastId = $isIncremental ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; + + if ($round === 1) { + if ($lastId > 0) { + $this->info("\n增量同步消息数据(从ID {$lastId} 开始)..."); + } else { + $this->info("\n全量同步消息数据..."); + } + } + + // 构建基础查询条件 + $count = WebSocketDialogMsg::where('id', '>', $lastId) ->whereNull('deleted_at') ->where('bot', '!=', 1) ->whereNotNull('key') ->where('key', '!=', '') ->whereIn('type', ManticoreMsg::INDEXABLE_TYPES) - ->orderBy('id') - ->limit($batchSize) - ->get(); + ->count(); - if ($msgs->isEmpty()) { + if ($count === 0) { + if ($round === 1) { + $this->info("无待同步数据"); + } break; } - $num += count($msgs); - $progress = $count > 0 ? round($num / $count * 100, 2) : 100; - if ($progress < 100) { - $progress = number_format($progress, 2); + $this->info("[第 {$round} 轮] 待同步 {$count} 条消息"); + + $num = 0; + $total = 0; + + do { + if ($this->shouldStop) { + break; + } + + $msgs = WebSocketDialogMsg::where('id', '>', $lastId) + ->whereNull('deleted_at') + ->where('bot', '!=', 1) + ->whereNotNull('key') + ->where('key', '!=', '') + ->whereIn('type', ManticoreMsg::INDEXABLE_TYPES) + ->orderBy('id') + ->limit($batchSize) + ->get(); + + if ($msgs->isEmpty()) { + break; + } + + $num += count($msgs); + $progress = $count > 0 ? round($num / $count * 100, 2) : 100; + $this->info("{$num}/{$count} ({$progress}%) 消息ID {$msgs->first()->id} ~ {$msgs->last()->id}"); + + $this->setLock(); + + $syncCount = ManticoreMsg::batchSync($msgs); + $total += $syncCount; + + $lastId = $msgs->last()->id; + ManticoreKeyValue::set($lastKey, $lastId); + } while (count($msgs) == $batchSize && !$this->shouldStop); + + $this->info("[第 {$round} 轮] 完成,同步 {$total} 条,最后ID {$lastId}"); + + // 增量模式下,检查是否有新数据,有则继续 + if ($isIncremental && !$this->shouldStop) { + $newCount = WebSocketDialogMsg::where('id', '>', $lastId) + ->whereNull('deleted_at') + ->where('bot', '!=', 1) + ->whereNotNull('key') + ->where('key', '!=', '') + ->whereIn('type', ManticoreMsg::INDEXABLE_TYPES) + ->count(); + + if ($newCount > 0) { + $this->info("发现 {$newCount} 条新数据,{$sleepSeconds} 秒后继续..."); + sleep($sleepSeconds); + continue; + } } - $this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$msgs->first()->id} ~ {$msgs->last()->id} ({$total}|{$lastNum})"); - $this->setLock(); + break; // 非增量模式或无新数据,退出循环 - $lastNum = ManticoreMsg::batchSync($msgs); - $total += $lastNum; + } while (!$this->shouldStop); - $lastId = $msgs->last()->id; - ManticoreKeyValue::set($lastKey, $lastId); - } while (count($msgs) == $batchSize); - - $this->info("同步消息结束 - 最后ID {$lastId}"); + $this->info("同步消息结束(共 {$round} 轮)- 最后ID: " . ManticoreKeyValue::get($lastKey, 0)); $this->info("已索引消息数量: " . ManticoreMsg::getIndexedCount()); } diff --git a/app/Console/Commands/SyncProjectToManticore.php b/app/Console/Commands/SyncProjectToManticore.php index 1d0ffb3af..11371c108 100644 --- a/app/Console/Commands/SyncProjectToManticore.php +++ b/app/Console/Commands/SyncProjectToManticore.php @@ -14,18 +14,20 @@ class SyncProjectToManticore extends Command /** * 更新数据(MVA 方案:allowed_users 在同步时自动写入) * --f: 全量更新 (默认) - * --i: 增量更新(从上次更新的最后一个ID接上) + * --i: 增量更新(从上次更新的最后一个ID接上,持续处理直到完成) * * 清理数据 * --c: 清除索引 + * + * 其他选项 + * --sleep: 每批处理完成后休眠秒数(增量模式) */ - protected $signature = 'manticore:sync-projects {--f} {--i} {--c} {--batch=100}'; + protected $signature = 'manticore:sync-projects {--f} {--i} {--c} {--batch=100} {--sleep=3}'; protected $description = '同步项目数据到 Manticore Search(MVA 权限方案)'; - /** - * @return int - */ + private bool $shouldStop = false; + public function handle(): int { if (!Apps::isInstalled("manticore")) { @@ -33,14 +35,12 @@ class SyncProjectToManticore extends Command return 1; } - // 注册信号处理器 if (extension_loaded('pcntl')) { pcntl_async_signals(true); pcntl_signal(SIGINT, [$this, 'handleSignal']); pcntl_signal(SIGTERM, [$this, 'handleSignal']); } - // 检查锁 $lockInfo = $this->getLock(); if ($lockInfo) { $this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}"); @@ -49,7 +49,6 @@ class SyncProjectToManticore extends Command $this->setLock(); - // 清除索引 if ($this->option('c')) { $this->info('清除索引...'); ManticoreProject::clear(); @@ -75,7 +74,7 @@ class SyncProjectToManticore extends Command private function setLock(): void { $lockKey = md5($this->signature); - Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600); + Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 1800); } private function releaseLock(): void @@ -86,60 +85,94 @@ class SyncProjectToManticore extends Command public function handleSignal(int $signal): void { - $this->releaseLock(); - exit(0); + $this->info("\n收到信号,将在当前批次完成后退出..."); + $this->shouldStop = true; } private function syncProjects(): void { $lastKey = "sync:manticoreProjectLastId"; - $lastId = $this->option('i') ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; - - if ($lastId > 0) { - $this->info("\n同步项目数据({$lastId})..."); - } else { - $this->info("\n同步项目数据..."); - } - - // 排除已归档项目 - $query = Project::where('id', '>', $lastId) - ->whereNull('archived_at'); - - $num = 0; - $count = $query->count(); + $isIncremental = $this->option('i'); + $sleepSeconds = intval($this->option('sleep')); $batchSize = $this->option('batch'); - $total = 0; - $lastNum = 0; + $round = 0; do { - $projects = Project::where('id', '>', $lastId) - ->whereNull('archived_at') - ->orderBy('id') - ->limit($batchSize) - ->get(); + $round++; + $lastId = $isIncremental ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; - if ($projects->isEmpty()) { + if ($round === 1) { + if ($lastId > 0) { + $this->info("\n增量同步项目数据(从ID {$lastId} 开始)..."); + } else { + $this->info("\n全量同步项目数据..."); + } + } + + $count = Project::where('id', '>', $lastId) + ->whereNull('archived_at') + ->count(); + + if ($count === 0) { + if ($round === 1) { + $this->info("无待同步数据"); + } break; } - $num += count($projects); - $progress = $count > 0 ? round($num / $count * 100, 2) : 100; - if ($progress < 100) { - $progress = number_format($progress, 2); + $this->info("[第 {$round} 轮] 待同步 {$count} 个项目"); + + $num = 0; + $total = 0; + + do { + if ($this->shouldStop) { + break; + } + + $projects = Project::where('id', '>', $lastId) + ->whereNull('archived_at') + ->orderBy('id') + ->limit($batchSize) + ->get(); + + if ($projects->isEmpty()) { + break; + } + + $num += count($projects); + $progress = $count > 0 ? round($num / $count * 100, 2) : 100; + $this->info("{$num}/{$count} ({$progress}%) 项目ID {$projects->first()->id} ~ {$projects->last()->id}"); + + $this->setLock(); + + $syncCount = ManticoreProject::batchSync($projects); + $total += $syncCount; + + $lastId = $projects->last()->id; + ManticoreKeyValue::set($lastKey, $lastId); + } while (count($projects) == $batchSize && !$this->shouldStop); + + $this->info("[第 {$round} 轮] 完成,同步 {$total} 个,最后ID {$lastId}"); + + if ($isIncremental && !$this->shouldStop) { + $newCount = Project::where('id', '>', $lastId) + ->whereNull('archived_at') + ->count(); + + if ($newCount > 0) { + $this->info("发现 {$newCount} 个新项目,{$sleepSeconds} 秒后继续..."); + sleep($sleepSeconds); + continue; + } } - $this->info("{$num}/{$count} ({$progress}%) 正在同步项目ID {$projects->first()->id} ~ {$projects->last()->id} ({$total}|{$lastNum})"); - $this->setLock(); + break; - $lastNum = ManticoreProject::batchSync($projects); - $total += $lastNum; + } while (!$this->shouldStop); - $lastId = $projects->last()->id; - ManticoreKeyValue::set($lastKey, $lastId); - } while (count($projects) == $batchSize); - - $this->info("同步项目结束 - 最后ID {$lastId}"); + $this->info("同步项目结束(共 {$round} 轮)- 最后ID: " . ManticoreKeyValue::get($lastKey, 0)); $this->info("已索引项目数量: " . ManticoreProject::getIndexedCount()); } } diff --git a/app/Console/Commands/SyncTaskToManticore.php b/app/Console/Commands/SyncTaskToManticore.php index a14d47e82..7a230dbb9 100644 --- a/app/Console/Commands/SyncTaskToManticore.php +++ b/app/Console/Commands/SyncTaskToManticore.php @@ -14,18 +14,20 @@ class SyncTaskToManticore extends Command /** * 更新数据(MVA 方案:allowed_users 在同步时自动写入) * --f: 全量更新 (默认) - * --i: 增量更新(从上次更新的最后一个ID接上) + * --i: 增量更新(从上次更新的最后一个ID接上,持续处理直到完成) * * 清理数据 * --c: 清除索引 + * + * 其他选项 + * --sleep: 每批处理完成后休眠秒数(增量模式) */ - protected $signature = 'manticore:sync-tasks {--f} {--i} {--c} {--batch=100}'; + protected $signature = 'manticore:sync-tasks {--f} {--i} {--c} {--batch=100} {--sleep=3}'; protected $description = '同步任务数据到 Manticore Search(MVA 权限方案)'; - /** - * @return int - */ + private bool $shouldStop = false; + public function handle(): int { if (!Apps::isInstalled("manticore")) { @@ -33,14 +35,12 @@ class SyncTaskToManticore extends Command return 1; } - // 注册信号处理器 if (extension_loaded('pcntl')) { pcntl_async_signals(true); pcntl_signal(SIGINT, [$this, 'handleSignal']); pcntl_signal(SIGTERM, [$this, 'handleSignal']); } - // 检查锁 $lockInfo = $this->getLock(); if ($lockInfo) { $this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}"); @@ -49,7 +49,6 @@ class SyncTaskToManticore extends Command $this->setLock(); - // 清除索引 if ($this->option('c')) { $this->info('清除索引...'); ManticoreTask::clear(); @@ -75,7 +74,7 @@ class SyncTaskToManticore extends Command private function setLock(): void { $lockKey = md5($this->signature); - Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600); + Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 1800); } private function releaseLock(): void @@ -86,62 +85,97 @@ class SyncTaskToManticore extends Command public function handleSignal(int $signal): void { - $this->releaseLock(); - exit(0); + $this->info("\n收到信号,将在当前批次完成后退出..."); + $this->shouldStop = true; } private function syncTasks(): void { $lastKey = "sync:manticoreTaskLastId"; - $lastId = $this->option('i') ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; - - if ($lastId > 0) { - $this->info("\n同步任务数据({$lastId})..."); - } else { - $this->info("\n同步任务数据..."); - } - - // 排除已归档和已删除的任务 - $query = ProjectTask::where('id', '>', $lastId) - ->whereNull('archived_at') - ->whereNull('deleted_at'); - - $num = 0; - $count = $query->count(); + $isIncremental = $this->option('i'); + $sleepSeconds = intval($this->option('sleep')); $batchSize = $this->option('batch'); - $total = 0; - $lastNum = 0; + $round = 0; do { - $tasks = ProjectTask::where('id', '>', $lastId) + $round++; + $lastId = $isIncremental ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; + + if ($round === 1) { + if ($lastId > 0) { + $this->info("\n增量同步任务数据(从ID {$lastId} 开始)..."); + } else { + $this->info("\n全量同步任务数据..."); + } + } + + $count = ProjectTask::where('id', '>', $lastId) ->whereNull('archived_at') ->whereNull('deleted_at') - ->orderBy('id') - ->limit($batchSize) - ->get(); + ->count(); - if ($tasks->isEmpty()) { + if ($count === 0) { + if ($round === 1) { + $this->info("无待同步数据"); + } break; } - $num += count($tasks); - $progress = $count > 0 ? round($num / $count * 100, 2) : 100; - if ($progress < 100) { - $progress = number_format($progress, 2); + $this->info("[第 {$round} 轮] 待同步 {$count} 个任务"); + + $num = 0; + $total = 0; + + do { + if ($this->shouldStop) { + break; + } + + $tasks = ProjectTask::where('id', '>', $lastId) + ->whereNull('archived_at') + ->whereNull('deleted_at') + ->orderBy('id') + ->limit($batchSize) + ->get(); + + if ($tasks->isEmpty()) { + break; + } + + $num += count($tasks); + $progress = $count > 0 ? round($num / $count * 100, 2) : 100; + $this->info("{$num}/{$count} ({$progress}%) 任务ID {$tasks->first()->id} ~ {$tasks->last()->id}"); + + $this->setLock(); + + $syncCount = ManticoreTask::batchSync($tasks); + $total += $syncCount; + + $lastId = $tasks->last()->id; + ManticoreKeyValue::set($lastKey, $lastId); + } while (count($tasks) == $batchSize && !$this->shouldStop); + + $this->info("[第 {$round} 轮] 完成,同步 {$total} 个,最后ID {$lastId}"); + + if ($isIncremental && !$this->shouldStop) { + $newCount = ProjectTask::where('id', '>', $lastId) + ->whereNull('archived_at') + ->whereNull('deleted_at') + ->count(); + + if ($newCount > 0) { + $this->info("发现 {$newCount} 个新任务,{$sleepSeconds} 秒后继续..."); + sleep($sleepSeconds); + continue; + } } - $this->info("{$num}/{$count} ({$progress}%) 正在同步任务ID {$tasks->first()->id} ~ {$tasks->last()->id} ({$total}|{$lastNum})"); - $this->setLock(); + break; - $lastNum = ManticoreTask::batchSync($tasks); - $total += $lastNum; + } while (!$this->shouldStop); - $lastId = $tasks->last()->id; - ManticoreKeyValue::set($lastKey, $lastId); - } while (count($tasks) == $batchSize); - - $this->info("同步任务结束 - 最后ID {$lastId}"); + $this->info("同步任务结束(共 {$round} 轮)- 最后ID: " . ManticoreKeyValue::get($lastKey, 0)); $this->info("已索引任务数量: " . ManticoreTask::getIndexedCount()); } } diff --git a/app/Console/Commands/SyncUserToManticore.php b/app/Console/Commands/SyncUserToManticore.php index 28a5e7093..b0cfad037 100644 --- a/app/Console/Commands/SyncUserToManticore.php +++ b/app/Console/Commands/SyncUserToManticore.php @@ -14,18 +14,20 @@ class SyncUserToManticore extends Command /** * 更新数据 * --f: 全量更新 (默认) - * --i: 增量更新(从上次更新的最后一个ID接上) + * --i: 增量更新(从上次更新的最后一个ID接上,持续处理直到完成) * * 清理数据 * --c: 清除索引 + * + * 其他选项 + * --sleep: 每批处理完成后休眠秒数(增量模式) */ - protected $signature = 'manticore:sync-users {--f} {--i} {--c} {--batch=100}'; + protected $signature = 'manticore:sync-users {--f} {--i} {--c} {--batch=100} {--sleep=3}'; protected $description = '同步用户数据到 Manticore Search'; - /** - * @return int - */ + private bool $shouldStop = false; + public function handle(): int { if (!Apps::isInstalled("manticore")) { @@ -33,14 +35,12 @@ class SyncUserToManticore extends Command return 1; } - // 注册信号处理器 if (extension_loaded('pcntl')) { pcntl_async_signals(true); pcntl_signal(SIGINT, [$this, 'handleSignal']); pcntl_signal(SIGTERM, [$this, 'handleSignal']); } - // 检查锁 $lockInfo = $this->getLock(); if ($lockInfo) { $this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}"); @@ -49,7 +49,6 @@ class SyncUserToManticore extends Command $this->setLock(); - // 清除索引 if ($this->option('c')) { $this->info('清除索引...'); ManticoreUser::clear(); @@ -75,7 +74,7 @@ class SyncUserToManticore extends Command private function setLock(): void { $lockKey = md5($this->signature); - Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600); + Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 1800); } private function releaseLock(): void @@ -86,63 +85,97 @@ class SyncUserToManticore extends Command public function handleSignal(int $signal): void { - $this->releaseLock(); - exit(0); + $this->info("\n收到信号,将在当前批次完成后退出..."); + $this->shouldStop = true; } private function syncUsers(): void { $lastKey = "sync:manticoreUserLastId"; - $lastId = $this->option('i') ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; - - if ($lastId > 0) { - $this->info("\n同步用户数据({$lastId})..."); - } else { - $this->info("\n同步用户数据..."); - } - - // 排除机器人和已禁用账号 - $query = User::where('userid', '>', $lastId) - ->where('bot', 0) - ->whereNull('disable_at'); - - $num = 0; - $count = $query->count(); + $isIncremental = $this->option('i'); + $sleepSeconds = intval($this->option('sleep')); $batchSize = $this->option('batch'); - $total = 0; - $lastNum = 0; + $round = 0; do { - $users = User::where('userid', '>', $lastId) + $round++; + $lastId = $isIncremental ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; + + if ($round === 1) { + if ($lastId > 0) { + $this->info("\n增量同步用户数据(从ID {$lastId} 开始)..."); + } else { + $this->info("\n全量同步用户数据..."); + } + } + + $count = User::where('userid', '>', $lastId) ->where('bot', 0) ->whereNull('disable_at') - ->orderBy('userid') - ->limit($batchSize) - ->get(); + ->count(); - if ($users->isEmpty()) { + if ($count === 0) { + if ($round === 1) { + $this->info("无待同步数据"); + } break; } - $num += count($users); - $progress = $count > 0 ? round($num / $count * 100, 2) : 100; - if ($progress < 100) { - $progress = number_format($progress, 2); + $this->info("[第 {$round} 轮] 待同步 {$count} 个用户"); + + $num = 0; + $total = 0; + + do { + if ($this->shouldStop) { + break; + } + + $users = User::where('userid', '>', $lastId) + ->where('bot', 0) + ->whereNull('disable_at') + ->orderBy('userid') + ->limit($batchSize) + ->get(); + + if ($users->isEmpty()) { + break; + } + + $num += count($users); + $progress = $count > 0 ? round($num / $count * 100, 2) : 100; + $this->info("{$num}/{$count} ({$progress}%) 用户ID {$users->first()->userid} ~ {$users->last()->userid}"); + + $this->setLock(); + + $syncCount = ManticoreUser::batchSync($users); + $total += $syncCount; + + $lastId = $users->last()->userid; + ManticoreKeyValue::set($lastKey, $lastId); + } while (count($users) == $batchSize && !$this->shouldStop); + + $this->info("[第 {$round} 轮] 完成,同步 {$total} 个,最后ID {$lastId}"); + + if ($isIncremental && !$this->shouldStop) { + $newCount = User::where('userid', '>', $lastId) + ->where('bot', 0) + ->whereNull('disable_at') + ->count(); + + if ($newCount > 0) { + $this->info("发现 {$newCount} 个新用户,{$sleepSeconds} 秒后继续..."); + sleep($sleepSeconds); + continue; + } } - $this->info("{$num}/{$count} ({$progress}%) 正在同步用户ID {$users->first()->userid} ~ {$users->last()->userid} ({$total}|{$lastNum})"); - $this->setLock(); + break; - $lastNum = ManticoreUser::batchSync($users); - $total += $lastNum; + } while (!$this->shouldStop); - $lastId = $users->last()->userid; - ManticoreKeyValue::set($lastKey, $lastId); - } while (count($users) == $batchSize); - - $this->info("同步用户结束 - 最后ID {$lastId}"); + $this->info("同步用户结束(共 {$round} 轮)- 最后ID: " . ManticoreKeyValue::get($lastKey, 0)); $this->info("已索引用户数量: " . ManticoreUser::getIndexedCount()); } } - diff --git a/app/Tasks/ManticoreSyncTask.php b/app/Tasks/ManticoreSyncTask.php index 56c56a47e..d001a0c46 100644 --- a/app/Tasks/ManticoreSyncTask.php +++ b/app/Tasks/ManticoreSyncTask.php @@ -187,34 +187,38 @@ class ManticoreSyncTask extends AbstractTask } /** - * 增量更新(定时执行) - * 使用 --i 参数执行增量同步,会同步新增的向量数据 + * 增量更新(定时执行 - 兜底机制) + * + * 命令本身会持续处理直到完成,定时器只是确保命令在运行 + * 如果命令正在运行(有锁),则跳过本次触发 * * @return void */ private function incrementalUpdate() { - // 执行增量全文索引同步(10分钟执行一次) + // 兜底触发:每 2 分钟检查一次,如果命令没在运行则启动 + $time = intval(Cache::get("ManticoreSyncTask:CheckTime")); + if (time() - $time < 2 * 60) { + return; + } + Cache::put("ManticoreSyncTask:CheckTime", time(), Carbon::now()->addMinutes(5)); + + // 执行增量全文索引同步(命令会持续处理直到完成) $this->runIncrementalSync(); - // 执行向量生成(10分钟执行一次,与全文索引独立) + // 执行向量生成(命令会持续处理直到完成) $this->runVectorGeneration(); } /** - * 执行增量全文索引同步 + * 执行增量全文索引同步(兜底触发) + * + * 命令内部有锁机制,如果已在运行会自动跳过 + * 命令会持续处理直到无新数据,然后自动退出 */ private function runIncrementalSync(): void { - $time = intval(Cache::get("ManticoreSyncTask:SyncTime")); - if (time() - $time < 10 * 60) { - return; - } - - // 执行开始 - Cache::put("ManticoreSyncTask:SyncTime", time(), Carbon::now()->addMinutes(15)); - - // 执行增量同步(MVA 方案不需要单独同步关系表) + // 启动各类型的增量同步命令(命令内部有锁,重复启动会自动跳过) @shell_exec("php /var/www/artisan manticore:sync-files --i 2>&1 &"); @shell_exec("php /var/www/artisan manticore:sync-users --i 2>&1 &"); @shell_exec("php /var/www/artisan manticore:sync-projects --i 2>&1 &"); @@ -223,25 +227,19 @@ class ManticoreSyncTask extends AbstractTask } /** - * 执行向量生成(异步批量处理) + * 执行向量生成(兜底触发) + * + * 命令内部有锁机制,如果已在运行会自动跳过 + * 命令会持续处理直到无待处理数据,然后自动退出 */ private function runVectorGeneration(): void { - // 检查 AI 是否安装 if (!Apps::isInstalled("ai")) { return; } - $time = intval(Cache::get("ManticoreSyncTask:VectorTime")); - if (time() - $time < 10 * 60) { - return; - } - - // 执行开始 - Cache::put("ManticoreSyncTask:VectorTime", time(), Carbon::now()->addMinutes(15)); - - // 执行向量生成(批量处理,每轮最多500条) - @shell_exec("php /var/www/artisan manticore:generate-vectors --type=all --batch=20 --max=500 2>&1 &"); + // 启动向量生成命令(命令内部有锁,重复启动会自动跳过) + @shell_exec("php /var/www/artisan manticore:generate-vectors --type=all --batch=50 2>&1 &"); } public function end()