From 1af29837e28dc7d9f4d2dbc41495fc54f15cfe1d Mon Sep 17 00:00:00 2001 From: kuaifan Date: Wed, 31 Dec 2025 09:28:10 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=A2=9E=E9=87=8F?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E5=8A=9F=E8=83=BD=E4=BB=A5=E4=BC=98=E5=8C=96?= =?UTF-8?q?=20SeekDB=20=E7=94=A8=E6=88=B7=E5=85=B3=E7=B3=BB=E5=90=8C?= =?UTF-8?q?=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 SyncFileToSeekDB、SyncProjectToSeekDB 和 SyncTaskToSeekDB 中实现增量同步逻辑,支持只同步新增的用户关系。 - 新增 syncFileUsersIncremental、syncProjectUsersIncremental 和 syncTaskUsersIncremental 方法,提升数据同步效率。 - 更新相关命令行输出信息,以清晰指示同步状态和进度。 --- app/Console/Commands/SyncFileToSeekDB.php | 16 ++++- app/Console/Commands/SyncProjectToSeekDB.php | 16 ++++- app/Console/Commands/SyncTaskToSeekDB.php | 16 ++++- app/Module/SeekDB/SeekDBFile.php | 45 ++++++++++++ app/Module/SeekDB/SeekDBProject.php | 46 ++++++++++++ app/Module/SeekDB/SeekDBTask.php | 76 ++++++++++++++++++++ app/Tasks/SeekDBSyncTask.php | 4 +- 7 files changed, 212 insertions(+), 7 deletions(-) diff --git a/app/Console/Commands/SyncFileToSeekDB.php b/app/Console/Commands/SyncFileToSeekDB.php index 1ae62fbb4..588c57c52 100644 --- a/app/Console/Commands/SyncFileToSeekDB.php +++ b/app/Console/Commands/SyncFileToSeekDB.php @@ -79,15 +79,27 @@ class SyncFileToSeekDB extends Command // 同步文件数据 $this->syncFiles(); - // 全量同步时,同步文件用户关系 + // 同步文件用户关系 if ($this->option('f') || (!$this->option('i') && !$this->option('u'))) { - $this->info("\n同步文件用户关系..."); + // 全量同步:清空后重建 + $this->info("\n全量同步文件用户关系..."); $count = SeekDBFile::syncAllFileUsers(function ($count) { if ($count % 1000 === 0) { $this->info(" 已同步 {$count} 条关系..."); } }); $this->info("文件用户关系同步完成,共 {$count} 条"); + } elseif ($this->option('i')) { + // 增量同步:只同步新增的 + $this->info("\n增量同步文件用户关系..."); + $count = SeekDBFile::syncFileUsersIncremental(function ($count) { + if ($count % 1000 === 0) { + $this->info(" 已同步 {$count} 条关系..."); + } + }); + if ($count > 0) { + $this->info("新增文件用户关系 {$count} 条"); + } } // 完成 diff --git a/app/Console/Commands/SyncProjectToSeekDB.php b/app/Console/Commands/SyncProjectToSeekDB.php index 2c904184a..3f4921675 100644 --- a/app/Console/Commands/SyncProjectToSeekDB.php +++ b/app/Console/Commands/SyncProjectToSeekDB.php @@ -76,15 +76,27 @@ class SyncProjectToSeekDB extends Command $this->info('开始同步项目数据...'); $this->syncProjects(); - // 全量同步时,同步项目成员关系 + // 同步项目成员关系 if ($this->option('f') || (!$this->option('i') && !$this->option('u'))) { - $this->info("\n同步项目成员关系..."); + // 全量同步:清空后重建 + $this->info("\n全量同步项目成员关系..."); $count = SeekDBProject::syncAllProjectUsers(function ($count) { if ($count % 1000 === 0) { $this->info(" 已同步 {$count} 条关系..."); } }); $this->info("项目成员关系同步完成,共 {$count} 条"); + } elseif ($this->option('i')) { + // 增量同步:只同步新增的 + $this->info("\n增量同步项目成员关系..."); + $count = SeekDBProject::syncProjectUsersIncremental(function ($count) { + if ($count % 1000 === 0) { + $this->info(" 已同步 {$count} 条关系..."); + } + }); + if ($count > 0) { + $this->info("新增项目成员关系 {$count} 条"); + } } $this->info("\n同步完成"); diff --git a/app/Console/Commands/SyncTaskToSeekDB.php b/app/Console/Commands/SyncTaskToSeekDB.php index 6f8d8281f..216346927 100644 --- a/app/Console/Commands/SyncTaskToSeekDB.php +++ b/app/Console/Commands/SyncTaskToSeekDB.php @@ -76,15 +76,27 @@ class SyncTaskToSeekDB extends Command $this->info('开始同步任务数据...'); $this->syncTasks(); - // 全量同步时,同步任务成员关系 + // 同步任务成员关系 if ($this->option('f') || (!$this->option('i') && !$this->option('u'))) { - $this->info("\n同步任务成员关系..."); + // 全量同步:清空后重建 + $this->info("\n全量同步任务成员关系..."); $count = SeekDBTask::syncAllTaskUsers(function ($count) { if ($count % 1000 === 0) { $this->info(" 已同步 {$count} 条关系..."); } }); $this->info("任务成员关系同步完成,共 {$count} 条"); + } elseif ($this->option('i')) { + // 增量同步:只同步新增的 + $this->info("\n增量同步任务成员关系..."); + $count = SeekDBTask::syncTaskUsersIncremental(function ($count) { + if ($count % 1000 === 0) { + $this->info(" 已同步 {$count} 条关系..."); + } + }); + if ($count > 0) { + $this->info("新增任务成员关系 {$count} 条"); + } } $this->info("\n同步完成"); diff --git a/app/Module/SeekDB/SeekDBFile.php b/app/Module/SeekDB/SeekDBFile.php index bed5b4afc..1aded75c6 100644 --- a/app/Module/SeekDB/SeekDBFile.php +++ b/app/Module/SeekDB/SeekDBFile.php @@ -535,5 +535,50 @@ class SeekDBFile return $count; } + + /** + * 增量同步文件用户关系(只同步新增的) + * + * @param callable|null $progressCallback 进度回调 + * @return int 同步数量 + */ + public static function syncFileUsersIncremental(?callable $progressCallback = null): int + { + if (!Apps::isInstalled("seekdb")) { + return 0; + } + + $count = 0; + $batchSize = 1000; + $lastKey = "sync:seekdbFileUserLastId"; + $lastId = intval(SeekDBKeyValue::get($lastKey, 0)); + + // 分批同步新增的记录 + while (true) { + $records = FileUser::where('id', '>', $lastId) + ->orderBy('id') + ->limit($batchSize) + ->get(); + + if ($records->isEmpty()) { + break; + } + + foreach ($records as $record) { + SeekDBBase::upsertFileUser($record->file_id, $record->userid, $record->permission); + $count++; + $lastId = $record->id; + } + + // 保存进度 + SeekDBKeyValue::set($lastKey, $lastId); + + if ($progressCallback) { + $progressCallback($count); + } + } + + return $count; + } } diff --git a/app/Module/SeekDB/SeekDBProject.php b/app/Module/SeekDB/SeekDBProject.php index 39230f515..8f6176d5b 100644 --- a/app/Module/SeekDB/SeekDBProject.php +++ b/app/Module/SeekDB/SeekDBProject.php @@ -7,6 +7,7 @@ use App\Models\ProjectUser; use App\Module\Apps; use App\Module\Base; use App\Module\AI; +use App\Module\SeekDB\SeekDBKeyValue; use Illuminate\Support\Facades\Log; /** @@ -380,5 +381,50 @@ class SeekDBProject return $count; } + + /** + * 增量同步项目成员关系(只同步新增的) + * + * @param callable|null $progressCallback 进度回调 + * @return int 同步数量 + */ + public static function syncProjectUsersIncremental(?callable $progressCallback = null): int + { + if (!Apps::isInstalled("seekdb")) { + return 0; + } + + $count = 0; + $batchSize = 1000; + $lastKey = "sync:seekdbProjectUserLastId"; + $lastId = intval(SeekDBKeyValue::get($lastKey, 0)); + + // 分批同步新增的记录 + while (true) { + $records = ProjectUser::where('id', '>', $lastId) + ->orderBy('id') + ->limit($batchSize) + ->get(); + + if ($records->isEmpty()) { + break; + } + + foreach ($records as $record) { + SeekDBBase::upsertProjectUser($record->project_id, $record->userid); + $count++; + $lastId = $record->id; + } + + // 保存进度 + SeekDBKeyValue::set($lastKey, $lastId); + + if ($progressCallback) { + $progressCallback($count); + } + } + + return $count; + } } diff --git a/app/Module/SeekDB/SeekDBTask.php b/app/Module/SeekDB/SeekDBTask.php index c06113eb7..21ea7319b 100644 --- a/app/Module/SeekDB/SeekDBTask.php +++ b/app/Module/SeekDB/SeekDBTask.php @@ -9,6 +9,7 @@ use App\Models\ProjectTaskVisibilityUser; use App\Module\Apps; use App\Module\Base; use App\Module\AI; +use App\Module\SeekDB\SeekDBKeyValue; use Illuminate\Support\Facades\Log; /** @@ -567,5 +568,80 @@ class SeekDBTask return $count; } + + /** + * 增量同步任务成员关系(只同步新增的) + * + * @param callable|null $progressCallback 进度回调 + * @return int 同步数量 + */ + public static function syncTaskUsersIncremental(?callable $progressCallback = null): int + { + if (!Apps::isInstalled("seekdb")) { + return 0; + } + + $count = 0; + $batchSize = 1000; + + // 同步 ProjectTaskUser 新增 + $lastKey1 = "sync:seekdbTaskUserLastId"; + $lastId1 = intval(SeekDBKeyValue::get($lastKey1, 0)); + + while (true) { + $records = ProjectTaskUser::where('id', '>', $lastId1) + ->orderBy('id') + ->limit($batchSize) + ->get(); + + if ($records->isEmpty()) { + break; + } + + foreach ($records as $record) { + SeekDBBase::upsertTaskUser($record->task_id, $record->userid); + if ($record->task_pid) { + SeekDBBase::upsertTaskUser($record->task_pid, $record->userid); + } + $count++; + $lastId1 = $record->id; + } + + SeekDBKeyValue::set($lastKey1, $lastId1); + + if ($progressCallback) { + $progressCallback($count); + } + } + + // 同步 ProjectTaskVisibilityUser 新增 + $lastKey2 = "sync:seekdbTaskVisibilityUserLastId"; + $lastId2 = intval(SeekDBKeyValue::get($lastKey2, 0)); + + while (true) { + $records = ProjectTaskVisibilityUser::where('id', '>', $lastId2) + ->orderBy('id') + ->limit($batchSize) + ->get(); + + if ($records->isEmpty()) { + break; + } + + foreach ($records as $record) { + SeekDBBase::upsertTaskUser($record->task_id, $record->userid); + $count++; + $lastId2 = $record->id; + } + + SeekDBKeyValue::set($lastKey2, $lastId2); + + if ($progressCallback) { + $progressCallback($count); + } + } + + return $count; + } } diff --git a/app/Tasks/SeekDBSyncTask.php b/app/Tasks/SeekDBSyncTask.php index b3e43b9f5..44e020dc0 100644 --- a/app/Tasks/SeekDBSyncTask.php +++ b/app/Tasks/SeekDBSyncTask.php @@ -212,6 +212,8 @@ class SeekDBSyncTask extends AbstractTask /** * 增量更新(定时执行) + * 使用 --i 参数执行增量同步,会同步新增的向量数据和用户关系数据 + * * @return void */ private function incrementalUpdate() @@ -225,7 +227,7 @@ class SeekDBSyncTask extends AbstractTask // 执行开始 Cache::put("SeekDBSyncTask:Time", time(), Carbon::now()->addMinutes(60)); - // 执行同步命令(后台运行) + // 执行增量同步(同时同步向量表和用户关系表的新增数据) @shell_exec("php /var/www/artisan seekdb:sync-files --i 2>&1 &"); @shell_exec("php /var/www/artisan seekdb:sync-users --i 2>&1 &"); @shell_exec("php /var/www/artisan seekdb:sync-projects --i 2>&1 &");