feat: Enhance Manticore sync commands with incremental processing and sleep options

- Updated sync commands (SyncFileToManticore, SyncMsgToManticore, SyncProjectToManticore, SyncTaskToManticore, SyncUserToManticore) to support continuous incremental updates until completion.
- Added --sleep option to allow a pause between batches in incremental mode.
- Improved signal handling to allow graceful shutdown during processing.
- Adjusted lock duration to 30 minutes for long-running processes.
- Enhanced logging for better visibility of sync progress and completion.
- Updated ManticoreSyncTask to ensure commands run continuously and check for new data every 2 minutes.
This commit is contained in:
kuaifan 2026-01-03 22:41:49 +00:00
parent 9a8304d595
commit 610979f30b
7 changed files with 506 additions and 344 deletions

View File

@ -22,13 +22,18 @@ use Illuminate\Console\Command;
* *
* 用于后台批量生成已索引数据的向量,与全文索引解耦 * 用于后台批量生成已索引数据的向量,与全文索引解耦
* 使用双指针追踪sync:xxxLastId全文已同步 vector:xxxLastId向量已生成 * 使用双指针追踪sync:xxxLastId全文已同步 vector:xxxLastId向量已生成
*
* 运行模式:
* - 持续处理直到所有待处理数据完成
* - 每批处理完成后休眠几秒,避免 API 过载
* - 定时器只作为兜底触发机制
*/ */
class GenerateManticoreVectors extends Command class GenerateManticoreVectors extends Command
{ {
protected $signature = 'manticore:generate-vectors protected $signature = 'manticore:generate-vectors
{--type=all : 类型 (msg/file/task/project/user/all)} {--type=all : 类型 (msg/file/task/project/user/all)}
{--batch=20 : 每批 embedding 数量} {--batch=50 : 每批 embedding 数量}
{--max=500 : 每轮最大处理数量} {--sleep=3 : 每批处理后休眠秒数}
{--reset : 重置向量进度指针}'; {--reset : 重置向量进度指针}';
protected $description = '批量生成 Manticore 已索引数据的向量(异步处理)'; protected $description = '批量生成 Manticore 已索引数据的向量(异步处理)';
@ -74,6 +79,8 @@ class GenerateManticoreVectors extends Command
], ],
]; ];
private bool $shouldStop = false;
public function handle(): int public function handle(): int
{ {
if (!Apps::isInstalled("manticore")) { if (!Apps::isInstalled("manticore")) {
@ -104,7 +111,7 @@ class GenerateManticoreVectors extends Command
$type = $this->option('type'); $type = $this->option('type');
$batchSize = intval($this->option('batch')); $batchSize = intval($this->option('batch'));
$maxCount = intval($this->option('max')); $sleepSeconds = intval($this->option('sleep'));
$reset = $this->option('reset'); $reset = $this->option('reset');
if ($type === 'all') { if ($type === 'all') {
@ -118,85 +125,92 @@ class GenerateManticoreVectors extends Command
$types = [$type]; $types = [$type];
} }
// 持续处理直到所有类型都没有待处理数据
$round = 0;
do {
$round++;
$totalPending = 0;
foreach ($types as $t) { foreach ($types as $t) {
$this->processType($t, $batchSize, $maxCount, $reset); if ($this->shouldStop) {
break;
}
$pending = $this->processType($t, $batchSize, $reset && $round === 1);
$totalPending += $pending;
} }
$this->info("\n向量生成完成"); // 如果还有待处理数据,休眠后继续
if ($totalPending > 0 && !$this->shouldStop) {
$this->info("\n--- 第 {$round} 轮完成,剩余 {$totalPending} 条待处理,{$sleepSeconds} 秒后继续 ---\n");
sleep($sleepSeconds);
$this->setLock(); // 刷新锁
}
} while ($totalPending > 0 && !$this->shouldStop);
$this->info("\n向量生成完成(共 {$round} 轮)");
$this->releaseLock(); $this->releaseLock();
return 0; return 0;
} }
/** /**
* 处理单个类型的向量生成 * 处理单个类型的向量生成(每次处理一批)
*
* @param string $type 类型
* @param int $batchSize 每批数量
* @param bool $reset 是否重置进度
* @return int 剩余待处理数量
*/ */
private function processType(string $type, int $batchSize, int $maxCount, bool $reset): void private function processType(string $type, int $batchSize, bool $reset): int
{ {
$config = self::TYPE_CONFIG[$type]; $config = self::TYPE_CONFIG[$type];
$this->info("\n========== 处理 {$type} ==========");
// 获取进度指针 // 获取进度指针
$syncLastId = intval(ManticoreKeyValue::get($config['syncKey'], 0)); $syncLastId = intval(ManticoreKeyValue::get($config['syncKey'], 0));
$vectorLastId = $reset ? 0 : intval(ManticoreKeyValue::get($config['vectorKey'], 0)); $vectorLastId = $reset ? 0 : intval(ManticoreKeyValue::get($config['vectorKey'], 0));
if ($reset) { if ($reset) {
ManticoreKeyValue::set($config['vectorKey'], 0); ManticoreKeyValue::set($config['vectorKey'], 0);
$this->info("已重置 {$type} 向量进度指针"); $this->info("[{$type}] 已重置向量进度指针");
} }
// 计算待处理范围 // 计算待处理范围
$pendingCount = $syncLastId - $vectorLastId; $pendingCount = $syncLastId - $vectorLastId;
if ($pendingCount <= 0) { if ($pendingCount <= 0) {
$this->info("{$type}: 无待处理数据 (sync={$syncLastId}, vector={$vectorLastId})"); return 0;
return;
} }
$this->info("{$type}: 待处理 {$pendingCount} 条 (ID {$vectorLastId} -> {$syncLastId})"); // 获取待处理的 ID 列表(每次处理 batchSize * 5 条,让 generateVectorsBatch 内部再分批调用 API
// 限制本轮处理数量
$toProcess = min($pendingCount, $maxCount);
$this->info("{$type}: 本轮处理 {$toProcess}");
// 获取待处理的 ID 列表
$modelClass = $config['model']; $modelClass = $config['model'];
$idField = $config['idField']; $idField = $config['idField'];
$fetchCount = $batchSize * 5;
$processedCount = 0; $ids = $modelClass::where($idField, '>', $vectorLastId)
$currentLastId = $vectorLastId;
while ($processedCount < $toProcess) {
$remainingCount = min($toProcess - $processedCount, $batchSize * 5);
// 获取一批 ID
$ids = $modelClass::where($idField, '>', $currentLastId)
->where($idField, '<=', $syncLastId) ->where($idField, '<=', $syncLastId)
->orderBy($idField) ->orderBy($idField)
->limit($remainingCount) ->limit($fetchCount)
->pluck($idField) ->pluck($idField)
->toArray(); ->toArray();
if (empty($ids)) { if (empty($ids)) {
break; return 0;
} }
// 批量生成向量 // 批量生成向量
$manticoreClass = $config['class']; $manticoreClass = $config['class'];
$successCount = $manticoreClass::generateVectorsBatch($ids, $batchSize); $successCount = $manticoreClass::generateVectorsBatch($ids, $batchSize);
$processedCount += count($ids);
$currentLastId = end($ids); $currentLastId = end($ids);
// 更新向量进度指针 // 更新向量进度指针
ManticoreKeyValue::set($config['vectorKey'], $currentLastId); ManticoreKeyValue::set($config['vectorKey'], $currentLastId);
$this->info("{$type}: 已处理 {$processedCount}/{$toProcess},成功 {$successCount}当前ID: {$currentLastId}"); $remaining = $pendingCount - count($ids);
$this->info("[{$type}] 处理 " . count($ids) . " 条,成功 {$successCount}ID: {$vectorLastId} -> {$currentLastId},剩余 {$remaining}");
// 刷新锁 // 刷新锁
$this->setLock(); $this->setLock();
}
$this->info("{$type}: 完成本轮向量生成,共处理 {$processedCount}"); return max(0, $remaining);
} }
private function getLock(): ?array private function getLock(): ?array
@ -208,7 +222,8 @@ class GenerateManticoreVectors extends Command
private function setLock(): void private function setLock(): void
{ {
$lockKey = 'manticore:generate-vectors:lock'; $lockKey = 'manticore:generate-vectors:lock';
Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600); // 锁有效期 30 分钟,持续处理时会不断刷新
Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 1800);
} }
private function releaseLock(): void private function releaseLock(): void
@ -219,8 +234,7 @@ class GenerateManticoreVectors extends Command
public function handleSignal(int $signal): void public function handleSignal(int $signal): void
{ {
$this->info("\n收到信号,正在退出..."); $this->info("\n收到信号,将在当前批次完成后退出...");
$this->releaseLock(); $this->shouldStop = true;
exit(0);
} }
} }

View File

@ -14,15 +14,20 @@ class SyncFileToManticore extends Command
/** /**
* 更新数据MVA 方案allowed_users 在同步时自动写入) * 更新数据MVA 方案allowed_users 在同步时自动写入)
* --f: 全量更新 (默认) * --f: 全量更新 (默认)
* --i: 增量更新从上次更新的最后一个ID接上 * --i: 增量更新从上次更新的最后一个ID接上,持续处理直到完成
* *
* 清理数据 * 清理数据
* --c: 清除索引 * --c: 清除索引
*
* 其他选项
* --sleep: 每批处理完成后休眠秒数(增量模式)
*/ */
protected $signature = 'manticore:sync-files {--f} {--i} {--c} {--batch=100}'; protected $signature = 'manticore:sync-files {--f} {--i} {--c} {--batch=100} {--sleep=3}';
protected $description = '同步文件内容到 Manticore SearchMVA 权限方案)'; protected $description = '同步文件内容到 Manticore SearchMVA 权限方案)';
private bool $shouldStop = false;
/** /**
* @return int * @return int
*/ */
@ -35,9 +40,9 @@ class SyncFileToManticore extends Command
// 注册信号处理器仅在支持pcntl扩展的环境下 // 注册信号处理器仅在支持pcntl扩展的环境下
if (extension_loaded('pcntl')) { if (extension_loaded('pcntl')) {
pcntl_async_signals(true); // 启用异步信号处理 pcntl_async_signals(true);
pcntl_signal(SIGINT, [$this, 'handleSignal']); // Ctrl+C pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']); // kill pcntl_signal(SIGTERM, [$this, 'handleSignal']);
} }
// 检查锁,如果已被占用则退出 // 检查锁,如果已被占用则退出
@ -47,7 +52,6 @@ class SyncFileToManticore extends Command
return 1; return 1;
} }
// 设置锁
$this->setLock(); $this->setLock();
// 清除索引 // 清除索引
@ -61,94 +65,84 @@ class SyncFileToManticore extends Command
} }
$this->info('开始同步文件数据MVA 方案allowed_users 自动内联)...'); $this->info('开始同步文件数据MVA 方案allowed_users 自动内联)...');
// 同步文件数据
$this->syncFiles(); $this->syncFiles();
// 完成
$this->info("\n同步完成"); $this->info("\n同步完成");
$this->releaseLock(); $this->releaseLock();
return 0; return 0;
} }
/**
* 获取锁信息
*
* @return array|null 如果锁存在返回锁信息否则返回null
*/
private function getLock(): ?array private function getLock(): ?array
{ {
$lockKey = md5($this->signature); $lockKey = md5($this->signature);
return Cache::has($lockKey) ? Cache::get($lockKey) : null; return Cache::has($lockKey) ? Cache::get($lockKey) : null;
} }
/**
* 设置锁
*/
private function setLock(): void private function setLock(): void
{ {
$lockKey = md5($this->signature); $lockKey = md5($this->signature);
$lockInfo = [ Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 1800);
'started_at' => date('Y-m-d H:i:s')
];
Cache::put($lockKey, $lockInfo, 600); // 10分钟文件同步可能较慢
} }
/**
* 释放锁
*/
private function releaseLock(): void private function releaseLock(): void
{ {
$lockKey = md5($this->signature); $lockKey = md5($this->signature);
Cache::forget($lockKey); Cache::forget($lockKey);
} }
/**
* 处理终端信号
*
* @param int $signal
* @return void
*/
public function handleSignal(int $signal): void public function handleSignal(int $signal): void
{ {
// 释放锁 $this->info("\n收到信号,将在当前批次完成后退出...");
$this->releaseLock(); $this->shouldStop = true;
exit(0);
} }
/** /**
* 同步文件数据 * 同步文件数据
*
* @return void
*/ */
private function syncFiles(): void private function syncFiles(): void
{ {
// 获取上次同步的最后ID
$lastKey = "sync:manticoreFileLastId"; $lastKey = "sync:manticoreFileLastId";
$lastId = $this->option('i') ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; $isIncremental = $this->option('i');
$sleepSeconds = intval($this->option('sleep'));
if ($lastId > 0) {
$this->info("\n同步文件数据({$lastId}...");
} else {
$this->info("\n同步文件数据...");
}
// 查询条件:排除文件夹,使用最大文件限制
// 具体的文件类型大小检查在 ManticoreFile::sync 中进行
$maxFileSize = ManticoreFile::getMaxFileSize();
$query = File::where('id', '>', $lastId)
->where('type', '!=', 'folder')
->where('size', '<=', $maxFileSize);
$num = 0;
$count = $query->count();
$batchSize = $this->option('batch'); $batchSize = $this->option('batch');
$maxFileSize = ManticoreFile::getMaxFileSize();
$total = 0; $round = 0;
$lastNum = 0;
do { do {
// 获取一批 $round++;
$lastId = $isIncremental ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0;
if ($round === 1) {
if ($lastId > 0) {
$this->info("\n增量同步文件数据从ID {$lastId} 开始)...");
} else {
$this->info("\n全量同步文件数据...");
}
}
$count = File::where('id', '>', $lastId)
->where('type', '!=', 'folder')
->where('size', '<=', $maxFileSize)
->count();
if ($count === 0) {
if ($round === 1) {
$this->info("无待同步数据");
}
break;
}
$this->info("[第 {$round} 轮] 待同步 {$count} 个文件");
$num = 0;
$total = 0;
do {
if ($this->shouldStop) {
break;
}
$files = File::where('id', '>', $lastId) $files = File::where('id', '>', $lastId)
->where('type', '!=', 'folder') ->where('type', '!=', 'folder')
->where('size', '<=', $maxFileSize) ->where('size', '<=', $maxFileSize)
@ -162,24 +156,37 @@ class SyncFileToManticore extends Command
$num += count($files); $num += count($files);
$progress = $count > 0 ? round($num / $count * 100, 2) : 100; $progress = $count > 0 ? round($num / $count * 100, 2) : 100;
if ($progress < 100) { $this->info("{$num}/{$count} ({$progress}%) 文件ID {$files->first()->id} ~ {$files->last()->id}");
$progress = number_format($progress, 2);
}
$this->info("{$num}/{$count} ({$progress}%) 正在同步文件ID {$files->first()->id} ~ {$files->last()->id} ({$total}|{$lastNum})");
// 刷新锁
$this->setLock(); $this->setLock();
// 同步数据 $syncCount = ManticoreFile::batchSync($files);
$lastNum = ManticoreFile::batchSync($files); $total += $syncCount;
$total += $lastNum;
// 更新最后ID
$lastId = $files->last()->id; $lastId = $files->last()->id;
ManticoreKeyValue::set($lastKey, $lastId); ManticoreKeyValue::set($lastKey, $lastId);
} while (count($files) == $batchSize); } while (count($files) == $batchSize && !$this->shouldStop);
$this->info("同步文件结束 - 最后ID {$lastId}"); $this->info("[第 {$round} 轮] 完成,同步 {$total}最后ID {$lastId}");
if ($isIncremental && !$this->shouldStop) {
$newCount = File::where('id', '>', $lastId)
->where('type', '!=', 'folder')
->where('size', '<=', $maxFileSize)
->count();
if ($newCount > 0) {
$this->info("发现 {$newCount} 个新文件,{$sleepSeconds} 秒后继续...");
sleep($sleepSeconds);
continue;
}
}
break;
} while (!$this->shouldStop);
$this->info("同步文件结束(共 {$round} 轮)- 最后ID: " . ManticoreKeyValue::get($lastKey, 0));
$this->info("已索引文件数量: " . ManticoreFile::getIndexedCount()); $this->info("已索引文件数量: " . ManticoreFile::getIndexedCount());
} }
} }

View File

@ -14,18 +14,21 @@ class SyncMsgToManticore extends Command
/** /**
* 更新数据MVA 方案allowed_users 在同步时自动写入) * 更新数据MVA 方案allowed_users 在同步时自动写入)
* --f: 全量更新 (默认) * --f: 全量更新 (默认)
* --i: 增量更新从上次更新的最后一个ID接上 * --i: 增量更新从上次更新的最后一个ID接上,持续处理直到完成
* *
* 清理数据 * 清理数据
* --c: 清除索引 * --c: 清除索引
* *
* 其他选项 * 其他选项
* --dialog: 指定对话ID仅同步该对话的消息 * --dialog: 指定对话ID仅同步该对话的消息
* --sleep: 每批处理完成后休眠秒数(增量模式)
*/ */
protected $signature = 'manticore:sync-msgs {--f} {--i} {--c} {--batch=100} {--dialog=}'; protected $signature = 'manticore:sync-msgs {--f} {--i} {--c} {--batch=100} {--dialog=} {--sleep=3}';
protected $description = '同步消息数据到 Manticore SearchMVA 权限方案)'; protected $description = '同步消息数据到 Manticore SearchMVA 权限方案)';
private bool $shouldStop = false;
/** /**
* @return int * @return int
*/ */
@ -85,7 +88,8 @@ class SyncMsgToManticore extends Command
private function setLock(): void private function setLock(): void
{ {
$lockKey = md5($this->signature); $lockKey = md5($this->signature);
Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600); // 锁有效期 30 分钟,持续处理时会不断刷新
Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 1800);
} }
private function releaseLock(): void private function releaseLock(): void
@ -96,8 +100,8 @@ class SyncMsgToManticore extends Command
public function handleSignal(int $signal): void public function handleSignal(int $signal): void
{ {
$this->releaseLock(); $this->info("\n收到信号,将在当前批次完成后退出...");
exit(0); $this->shouldStop = true;
} }
/** /**
@ -106,32 +110,51 @@ class SyncMsgToManticore extends Command
private function syncMsgs(): void private function syncMsgs(): void
{ {
$lastKey = "sync:manticoreMsgLastId"; $lastKey = "sync:manticoreMsgLastId";
$lastId = $this->option('i') ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; $isIncremental = $this->option('i');
$sleepSeconds = intval($this->option('sleep'));
$batchSize = $this->option('batch');
$round = 0;
// 持续处理循环(增量模式下)
do {
$round++;
$lastId = $isIncremental ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0;
if ($round === 1) {
if ($lastId > 0) { if ($lastId > 0) {
$this->info("\n增量同步消息数据从ID {$lastId} 开始)..."); $this->info("\n增量同步消息数据从ID {$lastId} 开始)...");
} else { } else {
$this->info("\n全量同步消息数据..."); $this->info("\n全量同步消息数据...");
} }
}
// 构建基础查询条件 // 构建基础查询条件
// 排除:软删除、机器人消息、空 key 消息 $count = WebSocketDialogMsg::where('id', '>', $lastId)
// 只包含:可索引的消息类型
$baseQuery = WebSocketDialogMsg::where('id', '>', $lastId)
->whereNull('deleted_at') ->whereNull('deleted_at')
->where('bot', '!=', 1) ->where('bot', '!=', 1)
->whereNotNull('key') ->whereNotNull('key')
->where('key', '!=', '') ->where('key', '!=', '')
->whereIn('type', ManticoreMsg::INDEXABLE_TYPES); ->whereIn('type', ManticoreMsg::INDEXABLE_TYPES)
->count();
if ($count === 0) {
if ($round === 1) {
$this->info("无待同步数据");
}
break;
}
$this->info("[第 {$round} 轮] 待同步 {$count} 条消息");
$num = 0; $num = 0;
$count = $baseQuery->count();
$batchSize = $this->option('batch');
$total = 0; $total = 0;
$lastNum = 0;
do { do {
if ($this->shouldStop) {
break;
}
$msgs = WebSocketDialogMsg::where('id', '>', $lastId) $msgs = WebSocketDialogMsg::where('id', '>', $lastId)
->whereNull('deleted_at') ->whereNull('deleted_at')
->where('bot', '!=', 1) ->where('bot', '!=', 1)
@ -148,21 +171,41 @@ class SyncMsgToManticore extends Command
$num += count($msgs); $num += count($msgs);
$progress = $count > 0 ? round($num / $count * 100, 2) : 100; $progress = $count > 0 ? round($num / $count * 100, 2) : 100;
if ($progress < 100) { $this->info("{$num}/{$count} ({$progress}%) 消息ID {$msgs->first()->id} ~ {$msgs->last()->id}");
$progress = number_format($progress, 2);
}
$this->info("{$num}/{$count} ({$progress}%) 正在同步消息ID {$msgs->first()->id} ~ {$msgs->last()->id} ({$total}|{$lastNum})");
$this->setLock(); $this->setLock();
$lastNum = ManticoreMsg::batchSync($msgs); $syncCount = ManticoreMsg::batchSync($msgs);
$total += $lastNum; $total += $syncCount;
$lastId = $msgs->last()->id; $lastId = $msgs->last()->id;
ManticoreKeyValue::set($lastKey, $lastId); ManticoreKeyValue::set($lastKey, $lastId);
} while (count($msgs) == $batchSize); } while (count($msgs) == $batchSize && !$this->shouldStop);
$this->info("同步消息结束 - 最后ID {$lastId}"); $this->info("[第 {$round} 轮] 完成,同步 {$total}最后ID {$lastId}");
// 增量模式下,检查是否有新数据,有则继续
if ($isIncremental && !$this->shouldStop) {
$newCount = WebSocketDialogMsg::where('id', '>', $lastId)
->whereNull('deleted_at')
->where('bot', '!=', 1)
->whereNotNull('key')
->where('key', '!=', '')
->whereIn('type', ManticoreMsg::INDEXABLE_TYPES)
->count();
if ($newCount > 0) {
$this->info("发现 {$newCount} 条新数据,{$sleepSeconds} 秒后继续...");
sleep($sleepSeconds);
continue;
}
}
break; // 非增量模式或无新数据,退出循环
} while (!$this->shouldStop);
$this->info("同步消息结束(共 {$round} 轮)- 最后ID: " . ManticoreKeyValue::get($lastKey, 0));
$this->info("已索引消息数量: " . ManticoreMsg::getIndexedCount()); $this->info("已索引消息数量: " . ManticoreMsg::getIndexedCount());
} }

View File

@ -14,18 +14,20 @@ class SyncProjectToManticore extends Command
/** /**
* 更新数据MVA 方案allowed_users 在同步时自动写入) * 更新数据MVA 方案allowed_users 在同步时自动写入)
* --f: 全量更新 (默认) * --f: 全量更新 (默认)
* --i: 增量更新从上次更新的最后一个ID接上 * --i: 增量更新从上次更新的最后一个ID接上,持续处理直到完成
* *
* 清理数据 * 清理数据
* --c: 清除索引 * --c: 清除索引
*
* 其他选项
* --sleep: 每批处理完成后休眠秒数(增量模式)
*/ */
protected $signature = 'manticore:sync-projects {--f} {--i} {--c} {--batch=100}'; protected $signature = 'manticore:sync-projects {--f} {--i} {--c} {--batch=100} {--sleep=3}';
protected $description = '同步项目数据到 Manticore SearchMVA 权限方案)'; protected $description = '同步项目数据到 Manticore SearchMVA 权限方案)';
/** private bool $shouldStop = false;
* @return int
*/
public function handle(): int public function handle(): int
{ {
if (!Apps::isInstalled("manticore")) { if (!Apps::isInstalled("manticore")) {
@ -33,14 +35,12 @@ class SyncProjectToManticore extends Command
return 1; return 1;
} }
// 注册信号处理器
if (extension_loaded('pcntl')) { if (extension_loaded('pcntl')) {
pcntl_async_signals(true); pcntl_async_signals(true);
pcntl_signal(SIGINT, [$this, 'handleSignal']); pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']); pcntl_signal(SIGTERM, [$this, 'handleSignal']);
} }
// 检查锁
$lockInfo = $this->getLock(); $lockInfo = $this->getLock();
if ($lockInfo) { if ($lockInfo) {
$this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}"); $this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}");
@ -49,7 +49,6 @@ class SyncProjectToManticore extends Command
$this->setLock(); $this->setLock();
// 清除索引
if ($this->option('c')) { if ($this->option('c')) {
$this->info('清除索引...'); $this->info('清除索引...');
ManticoreProject::clear(); ManticoreProject::clear();
@ -75,7 +74,7 @@ class SyncProjectToManticore extends Command
private function setLock(): void private function setLock(): void
{ {
$lockKey = md5($this->signature); $lockKey = md5($this->signature);
Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600); Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 1800);
} }
private function releaseLock(): void private function releaseLock(): void
@ -86,33 +85,52 @@ class SyncProjectToManticore extends Command
public function handleSignal(int $signal): void public function handleSignal(int $signal): void
{ {
$this->releaseLock(); $this->info("\n收到信号,将在当前批次完成后退出...");
exit(0); $this->shouldStop = true;
} }
private function syncProjects(): void private function syncProjects(): void
{ {
$lastKey = "sync:manticoreProjectLastId"; $lastKey = "sync:manticoreProjectLastId";
$lastId = $this->option('i') ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; $isIncremental = $this->option('i');
$sleepSeconds = intval($this->option('sleep'));
if ($lastId > 0) {
$this->info("\n同步项目数据({$lastId}...");
} else {
$this->info("\n同步项目数据...");
}
// 排除已归档项目
$query = Project::where('id', '>', $lastId)
->whereNull('archived_at');
$num = 0;
$count = $query->count();
$batchSize = $this->option('batch'); $batchSize = $this->option('batch');
$total = 0; $round = 0;
$lastNum = 0;
do { do {
$round++;
$lastId = $isIncremental ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0;
if ($round === 1) {
if ($lastId > 0) {
$this->info("\n增量同步项目数据从ID {$lastId} 开始)...");
} else {
$this->info("\n全量同步项目数据...");
}
}
$count = Project::where('id', '>', $lastId)
->whereNull('archived_at')
->count();
if ($count === 0) {
if ($round === 1) {
$this->info("无待同步数据");
}
break;
}
$this->info("[第 {$round} 轮] 待同步 {$count} 个项目");
$num = 0;
$total = 0;
do {
if ($this->shouldStop) {
break;
}
$projects = Project::where('id', '>', $lastId) $projects = Project::where('id', '>', $lastId)
->whereNull('archived_at') ->whereNull('archived_at')
->orderBy('id') ->orderBy('id')
@ -125,21 +143,36 @@ class SyncProjectToManticore extends Command
$num += count($projects); $num += count($projects);
$progress = $count > 0 ? round($num / $count * 100, 2) : 100; $progress = $count > 0 ? round($num / $count * 100, 2) : 100;
if ($progress < 100) { $this->info("{$num}/{$count} ({$progress}%) 项目ID {$projects->first()->id} ~ {$projects->last()->id}");
$progress = number_format($progress, 2);
}
$this->info("{$num}/{$count} ({$progress}%) 正在同步项目ID {$projects->first()->id} ~ {$projects->last()->id} ({$total}|{$lastNum})");
$this->setLock(); $this->setLock();
$lastNum = ManticoreProject::batchSync($projects); $syncCount = ManticoreProject::batchSync($projects);
$total += $lastNum; $total += $syncCount;
$lastId = $projects->last()->id; $lastId = $projects->last()->id;
ManticoreKeyValue::set($lastKey, $lastId); ManticoreKeyValue::set($lastKey, $lastId);
} while (count($projects) == $batchSize); } while (count($projects) == $batchSize && !$this->shouldStop);
$this->info("同步项目结束 - 最后ID {$lastId}"); $this->info("[第 {$round} 轮] 完成,同步 {$total}最后ID {$lastId}");
if ($isIncremental && !$this->shouldStop) {
$newCount = Project::where('id', '>', $lastId)
->whereNull('archived_at')
->count();
if ($newCount > 0) {
$this->info("发现 {$newCount} 个新项目,{$sleepSeconds} 秒后继续...");
sleep($sleepSeconds);
continue;
}
}
break;
} while (!$this->shouldStop);
$this->info("同步项目结束(共 {$round} 轮)- 最后ID: " . ManticoreKeyValue::get($lastKey, 0));
$this->info("已索引项目数量: " . ManticoreProject::getIndexedCount()); $this->info("已索引项目数量: " . ManticoreProject::getIndexedCount());
} }
} }

View File

@ -14,18 +14,20 @@ class SyncTaskToManticore extends Command
/** /**
* 更新数据MVA 方案allowed_users 在同步时自动写入) * 更新数据MVA 方案allowed_users 在同步时自动写入)
* --f: 全量更新 (默认) * --f: 全量更新 (默认)
* --i: 增量更新从上次更新的最后一个ID接上 * --i: 增量更新从上次更新的最后一个ID接上,持续处理直到完成
* *
* 清理数据 * 清理数据
* --c: 清除索引 * --c: 清除索引
*
* 其他选项
* --sleep: 每批处理完成后休眠秒数(增量模式)
*/ */
protected $signature = 'manticore:sync-tasks {--f} {--i} {--c} {--batch=100}'; protected $signature = 'manticore:sync-tasks {--f} {--i} {--c} {--batch=100} {--sleep=3}';
protected $description = '同步任务数据到 Manticore SearchMVA 权限方案)'; protected $description = '同步任务数据到 Manticore SearchMVA 权限方案)';
/** private bool $shouldStop = false;
* @return int
*/
public function handle(): int public function handle(): int
{ {
if (!Apps::isInstalled("manticore")) { if (!Apps::isInstalled("manticore")) {
@ -33,14 +35,12 @@ class SyncTaskToManticore extends Command
return 1; return 1;
} }
// 注册信号处理器
if (extension_loaded('pcntl')) { if (extension_loaded('pcntl')) {
pcntl_async_signals(true); pcntl_async_signals(true);
pcntl_signal(SIGINT, [$this, 'handleSignal']); pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']); pcntl_signal(SIGTERM, [$this, 'handleSignal']);
} }
// 检查锁
$lockInfo = $this->getLock(); $lockInfo = $this->getLock();
if ($lockInfo) { if ($lockInfo) {
$this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}"); $this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}");
@ -49,7 +49,6 @@ class SyncTaskToManticore extends Command
$this->setLock(); $this->setLock();
// 清除索引
if ($this->option('c')) { if ($this->option('c')) {
$this->info('清除索引...'); $this->info('清除索引...');
ManticoreTask::clear(); ManticoreTask::clear();
@ -75,7 +74,7 @@ class SyncTaskToManticore extends Command
private function setLock(): void private function setLock(): void
{ {
$lockKey = md5($this->signature); $lockKey = md5($this->signature);
Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600); Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 1800);
} }
private function releaseLock(): void private function releaseLock(): void
@ -86,34 +85,53 @@ class SyncTaskToManticore extends Command
public function handleSignal(int $signal): void public function handleSignal(int $signal): void
{ {
$this->releaseLock(); $this->info("\n收到信号,将在当前批次完成后退出...");
exit(0); $this->shouldStop = true;
} }
private function syncTasks(): void private function syncTasks(): void
{ {
$lastKey = "sync:manticoreTaskLastId"; $lastKey = "sync:manticoreTaskLastId";
$lastId = $this->option('i') ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; $isIncremental = $this->option('i');
$sleepSeconds = intval($this->option('sleep'));
if ($lastId > 0) {
$this->info("\n同步任务数据({$lastId}...");
} else {
$this->info("\n同步任务数据...");
}
// 排除已归档和已删除的任务
$query = ProjectTask::where('id', '>', $lastId)
->whereNull('archived_at')
->whereNull('deleted_at');
$num = 0;
$count = $query->count();
$batchSize = $this->option('batch'); $batchSize = $this->option('batch');
$total = 0; $round = 0;
$lastNum = 0;
do { do {
$round++;
$lastId = $isIncremental ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0;
if ($round === 1) {
if ($lastId > 0) {
$this->info("\n增量同步任务数据从ID {$lastId} 开始)...");
} else {
$this->info("\n全量同步任务数据...");
}
}
$count = ProjectTask::where('id', '>', $lastId)
->whereNull('archived_at')
->whereNull('deleted_at')
->count();
if ($count === 0) {
if ($round === 1) {
$this->info("无待同步数据");
}
break;
}
$this->info("[第 {$round} 轮] 待同步 {$count} 个任务");
$num = 0;
$total = 0;
do {
if ($this->shouldStop) {
break;
}
$tasks = ProjectTask::where('id', '>', $lastId) $tasks = ProjectTask::where('id', '>', $lastId)
->whereNull('archived_at') ->whereNull('archived_at')
->whereNull('deleted_at') ->whereNull('deleted_at')
@ -127,21 +145,37 @@ class SyncTaskToManticore extends Command
$num += count($tasks); $num += count($tasks);
$progress = $count > 0 ? round($num / $count * 100, 2) : 100; $progress = $count > 0 ? round($num / $count * 100, 2) : 100;
if ($progress < 100) { $this->info("{$num}/{$count} ({$progress}%) 任务ID {$tasks->first()->id} ~ {$tasks->last()->id}");
$progress = number_format($progress, 2);
}
$this->info("{$num}/{$count} ({$progress}%) 正在同步任务ID {$tasks->first()->id} ~ {$tasks->last()->id} ({$total}|{$lastNum})");
$this->setLock(); $this->setLock();
$lastNum = ManticoreTask::batchSync($tasks); $syncCount = ManticoreTask::batchSync($tasks);
$total += $lastNum; $total += $syncCount;
$lastId = $tasks->last()->id; $lastId = $tasks->last()->id;
ManticoreKeyValue::set($lastKey, $lastId); ManticoreKeyValue::set($lastKey, $lastId);
} while (count($tasks) == $batchSize); } while (count($tasks) == $batchSize && !$this->shouldStop);
$this->info("同步任务结束 - 最后ID {$lastId}"); $this->info("[第 {$round} 轮] 完成,同步 {$total}最后ID {$lastId}");
if ($isIncremental && !$this->shouldStop) {
$newCount = ProjectTask::where('id', '>', $lastId)
->whereNull('archived_at')
->whereNull('deleted_at')
->count();
if ($newCount > 0) {
$this->info("发现 {$newCount} 个新任务,{$sleepSeconds} 秒后继续...");
sleep($sleepSeconds);
continue;
}
}
break;
} while (!$this->shouldStop);
$this->info("同步任务结束(共 {$round} 轮)- 最后ID: " . ManticoreKeyValue::get($lastKey, 0));
$this->info("已索引任务数量: " . ManticoreTask::getIndexedCount()); $this->info("已索引任务数量: " . ManticoreTask::getIndexedCount());
} }
} }

View File

@ -14,18 +14,20 @@ class SyncUserToManticore extends Command
/** /**
* 更新数据 * 更新数据
* --f: 全量更新 (默认) * --f: 全量更新 (默认)
* --i: 增量更新从上次更新的最后一个ID接上 * --i: 增量更新从上次更新的最后一个ID接上,持续处理直到完成
* *
* 清理数据 * 清理数据
* --c: 清除索引 * --c: 清除索引
*
* 其他选项
* --sleep: 每批处理完成后休眠秒数(增量模式)
*/ */
protected $signature = 'manticore:sync-users {--f} {--i} {--c} {--batch=100}'; protected $signature = 'manticore:sync-users {--f} {--i} {--c} {--batch=100} {--sleep=3}';
protected $description = '同步用户数据到 Manticore Search'; protected $description = '同步用户数据到 Manticore Search';
/** private bool $shouldStop = false;
* @return int
*/
public function handle(): int public function handle(): int
{ {
if (!Apps::isInstalled("manticore")) { if (!Apps::isInstalled("manticore")) {
@ -33,14 +35,12 @@ class SyncUserToManticore extends Command
return 1; return 1;
} }
// 注册信号处理器
if (extension_loaded('pcntl')) { if (extension_loaded('pcntl')) {
pcntl_async_signals(true); pcntl_async_signals(true);
pcntl_signal(SIGINT, [$this, 'handleSignal']); pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']); pcntl_signal(SIGTERM, [$this, 'handleSignal']);
} }
// 检查锁
$lockInfo = $this->getLock(); $lockInfo = $this->getLock();
if ($lockInfo) { if ($lockInfo) {
$this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}"); $this->error("命令已在运行中,开始时间: {$lockInfo['started_at']}");
@ -49,7 +49,6 @@ class SyncUserToManticore extends Command
$this->setLock(); $this->setLock();
// 清除索引
if ($this->option('c')) { if ($this->option('c')) {
$this->info('清除索引...'); $this->info('清除索引...');
ManticoreUser::clear(); ManticoreUser::clear();
@ -75,7 +74,7 @@ class SyncUserToManticore extends Command
private function setLock(): void private function setLock(): void
{ {
$lockKey = md5($this->signature); $lockKey = md5($this->signature);
Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 600); Cache::put($lockKey, ['started_at' => date('Y-m-d H:i:s')], 1800);
} }
private function releaseLock(): void private function releaseLock(): void
@ -86,34 +85,53 @@ class SyncUserToManticore extends Command
public function handleSignal(int $signal): void public function handleSignal(int $signal): void
{ {
$this->releaseLock(); $this->info("\n收到信号,将在当前批次完成后退出...");
exit(0); $this->shouldStop = true;
} }
private function syncUsers(): void private function syncUsers(): void
{ {
$lastKey = "sync:manticoreUserLastId"; $lastKey = "sync:manticoreUserLastId";
$lastId = $this->option('i') ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0; $isIncremental = $this->option('i');
$sleepSeconds = intval($this->option('sleep'));
if ($lastId > 0) {
$this->info("\n同步用户数据({$lastId}...");
} else {
$this->info("\n同步用户数据...");
}
// 排除机器人和已禁用账号
$query = User::where('userid', '>', $lastId)
->where('bot', 0)
->whereNull('disable_at');
$num = 0;
$count = $query->count();
$batchSize = $this->option('batch'); $batchSize = $this->option('batch');
$total = 0; $round = 0;
$lastNum = 0;
do { do {
$round++;
$lastId = $isIncremental ? intval(ManticoreKeyValue::get($lastKey, 0)) : 0;
if ($round === 1) {
if ($lastId > 0) {
$this->info("\n增量同步用户数据从ID {$lastId} 开始)...");
} else {
$this->info("\n全量同步用户数据...");
}
}
$count = User::where('userid', '>', $lastId)
->where('bot', 0)
->whereNull('disable_at')
->count();
if ($count === 0) {
if ($round === 1) {
$this->info("无待同步数据");
}
break;
}
$this->info("[第 {$round} 轮] 待同步 {$count} 个用户");
$num = 0;
$total = 0;
do {
if ($this->shouldStop) {
break;
}
$users = User::where('userid', '>', $lastId) $users = User::where('userid', '>', $lastId)
->where('bot', 0) ->where('bot', 0)
->whereNull('disable_at') ->whereNull('disable_at')
@ -127,22 +145,37 @@ class SyncUserToManticore extends Command
$num += count($users); $num += count($users);
$progress = $count > 0 ? round($num / $count * 100, 2) : 100; $progress = $count > 0 ? round($num / $count * 100, 2) : 100;
if ($progress < 100) { $this->info("{$num}/{$count} ({$progress}%) 用户ID {$users->first()->userid} ~ {$users->last()->userid}");
$progress = number_format($progress, 2);
}
$this->info("{$num}/{$count} ({$progress}%) 正在同步用户ID {$users->first()->userid} ~ {$users->last()->userid} ({$total}|{$lastNum})");
$this->setLock(); $this->setLock();
$lastNum = ManticoreUser::batchSync($users); $syncCount = ManticoreUser::batchSync($users);
$total += $lastNum; $total += $syncCount;
$lastId = $users->last()->userid; $lastId = $users->last()->userid;
ManticoreKeyValue::set($lastKey, $lastId); ManticoreKeyValue::set($lastKey, $lastId);
} while (count($users) == $batchSize); } while (count($users) == $batchSize && !$this->shouldStop);
$this->info("同步用户结束 - 最后ID {$lastId}"); $this->info("[第 {$round} 轮] 完成,同步 {$total}最后ID {$lastId}");
if ($isIncremental && !$this->shouldStop) {
$newCount = User::where('userid', '>', $lastId)
->where('bot', 0)
->whereNull('disable_at')
->count();
if ($newCount > 0) {
$this->info("发现 {$newCount} 个新用户,{$sleepSeconds} 秒后继续...");
sleep($sleepSeconds);
continue;
}
}
break;
} while (!$this->shouldStop);
$this->info("同步用户结束(共 {$round} 轮)- 最后ID: " . ManticoreKeyValue::get($lastKey, 0));
$this->info("已索引用户数量: " . ManticoreUser::getIndexedCount()); $this->info("已索引用户数量: " . ManticoreUser::getIndexedCount());
} }
} }

View File

@ -187,34 +187,38 @@ class ManticoreSyncTask extends AbstractTask
} }
/** /**
* 增量更新(定时执行) * 增量更新(定时执行 - 兜底机制)
* 使用 --i 参数执行增量同步,会同步新增的向量数据 *
* 命令本身会持续处理直到完成,定时器只是确保命令在运行
* 如果命令正在运行(有锁),则跳过本次触发
* *
* @return void * @return void
*/ */
private function incrementalUpdate() private function incrementalUpdate()
{ {
// 执行增量全文索引同步10分钟执行一次 // 兜底触发:每 2 分钟检查一次,如果命令没在运行则启动
$time = intval(Cache::get("ManticoreSyncTask:CheckTime"));
if (time() - $time < 2 * 60) {
return;
}
Cache::put("ManticoreSyncTask:CheckTime", time(), Carbon::now()->addMinutes(5));
// 执行增量全文索引同步(命令会持续处理直到完成)
$this->runIncrementalSync(); $this->runIncrementalSync();
// 执行向量生成10分钟执行一次与全文索引独立 // 执行向量生成(命令会持续处理直到完成
$this->runVectorGeneration(); $this->runVectorGeneration();
} }
/** /**
* 执行增量全文索引同步 * 执行增量全文索引同步(兜底触发)
*
* 命令内部有锁机制,如果已在运行会自动跳过
* 命令会持续处理直到无新数据,然后自动退出
*/ */
private function runIncrementalSync(): void private function runIncrementalSync(): void
{ {
$time = intval(Cache::get("ManticoreSyncTask:SyncTime")); // 启动各类型的增量同步命令(命令内部有锁,重复启动会自动跳过)
if (time() - $time < 10 * 60) {
return;
}
// 执行开始
Cache::put("ManticoreSyncTask:SyncTime", time(), Carbon::now()->addMinutes(15));
// 执行增量同步MVA 方案不需要单独同步关系表)
@shell_exec("php /var/www/artisan manticore:sync-files --i 2>&1 &"); @shell_exec("php /var/www/artisan manticore:sync-files --i 2>&1 &");
@shell_exec("php /var/www/artisan manticore:sync-users --i 2>&1 &"); @shell_exec("php /var/www/artisan manticore:sync-users --i 2>&1 &");
@shell_exec("php /var/www/artisan manticore:sync-projects --i 2>&1 &"); @shell_exec("php /var/www/artisan manticore:sync-projects --i 2>&1 &");
@ -223,25 +227,19 @@ class ManticoreSyncTask extends AbstractTask
} }
/** /**
* 执行向量生成(异步批量处理) * 执行向量生成(兜底触发)
*
* 命令内部有锁机制,如果已在运行会自动跳过
* 命令会持续处理直到无待处理数据,然后自动退出
*/ */
private function runVectorGeneration(): void private function runVectorGeneration(): void
{ {
// 检查 AI 是否安装
if (!Apps::isInstalled("ai")) { if (!Apps::isInstalled("ai")) {
return; return;
} }
$time = intval(Cache::get("ManticoreSyncTask:VectorTime")); // 启动向量生成命令(命令内部有锁,重复启动会自动跳过)
if (time() - $time < 10 * 60) { @shell_exec("php /var/www/artisan manticore:generate-vectors --type=all --batch=50 2>&1 &");
return;
}
// 执行开始
Cache::put("ManticoreSyncTask:VectorTime", time(), Carbon::now()->addMinutes(15));
// 执行向量生成批量处理每轮最多500条
@shell_exec("php /var/www/artisan manticore:generate-vectors --type=all --batch=20 --max=500 2>&1 &");
} }
public function end() public function end()