feat: 增加增量同步功能以优化 SeekDB 用户关系同步

- 在 SyncFileToSeekDB、SyncProjectToSeekDB 和 SyncTaskToSeekDB 中实现增量同步逻辑,支持只同步新增的用户关系。
- 新增 syncFileUsersIncremental、syncProjectUsersIncremental 和 syncTaskUsersIncremental 方法,提升数据同步效率。
- 更新相关命令行输出信息,以清晰指示同步状态和进度。
This commit is contained in:
kuaifan 2025-12-31 09:28:10 +00:00
parent 986c4871df
commit 1af29837e2
7 changed files with 212 additions and 7 deletions

View File

@ -79,15 +79,27 @@ class SyncFileToSeekDB extends Command
// 同步文件数据
$this->syncFiles();
// 全量同步时,同步文件用户关系
// 同步文件用户关系
if ($this->option('f') || (!$this->option('i') && !$this->option('u'))) {
$this->info("\n同步文件用户关系...");
// 全量同步:清空后重建
$this->info("\n全量同步文件用户关系...");
$count = SeekDBFile::syncAllFileUsers(function ($count) {
if ($count % 1000 === 0) {
$this->info(" 已同步 {$count} 条关系...");
}
});
$this->info("文件用户关系同步完成,共 {$count}");
} elseif ($this->option('i')) {
// 增量同步:只同步新增的
$this->info("\n增量同步文件用户关系...");
$count = SeekDBFile::syncFileUsersIncremental(function ($count) {
if ($count % 1000 === 0) {
$this->info(" 已同步 {$count} 条关系...");
}
});
if ($count > 0) {
$this->info("新增文件用户关系 {$count}");
}
}
// 完成

View File

@ -76,15 +76,27 @@ class SyncProjectToSeekDB extends Command
$this->info('开始同步项目数据...');
$this->syncProjects();
// 全量同步时,同步项目成员关系
// 同步项目成员关系
if ($this->option('f') || (!$this->option('i') && !$this->option('u'))) {
$this->info("\n同步项目成员关系...");
// 全量同步:清空后重建
$this->info("\n全量同步项目成员关系...");
$count = SeekDBProject::syncAllProjectUsers(function ($count) {
if ($count % 1000 === 0) {
$this->info(" 已同步 {$count} 条关系...");
}
});
$this->info("项目成员关系同步完成,共 {$count}");
} elseif ($this->option('i')) {
// 增量同步:只同步新增的
$this->info("\n增量同步项目成员关系...");
$count = SeekDBProject::syncProjectUsersIncremental(function ($count) {
if ($count % 1000 === 0) {
$this->info(" 已同步 {$count} 条关系...");
}
});
if ($count > 0) {
$this->info("新增项目成员关系 {$count}");
}
}
$this->info("\n同步完成");

View File

@ -76,15 +76,27 @@ class SyncTaskToSeekDB extends Command
$this->info('开始同步任务数据...');
$this->syncTasks();
// 全量同步时,同步任务成员关系
// 同步任务成员关系
if ($this->option('f') || (!$this->option('i') && !$this->option('u'))) {
$this->info("\n同步任务成员关系...");
// 全量同步:清空后重建
$this->info("\n全量同步任务成员关系...");
$count = SeekDBTask::syncAllTaskUsers(function ($count) {
if ($count % 1000 === 0) {
$this->info(" 已同步 {$count} 条关系...");
}
});
$this->info("任务成员关系同步完成,共 {$count}");
} elseif ($this->option('i')) {
// 增量同步:只同步新增的
$this->info("\n增量同步任务成员关系...");
$count = SeekDBTask::syncTaskUsersIncremental(function ($count) {
if ($count % 1000 === 0) {
$this->info(" 已同步 {$count} 条关系...");
}
});
if ($count > 0) {
$this->info("新增任务成员关系 {$count}");
}
}
$this->info("\n同步完成");

View File

@ -535,5 +535,50 @@ class SeekDBFile
return $count;
}
/**
* 增量同步文件用户关系(只同步新增的)
*
* @param callable|null $progressCallback 进度回调
* @return int 同步数量
*/
public static function syncFileUsersIncremental(?callable $progressCallback = null): int
{
if (!Apps::isInstalled("seekdb")) {
return 0;
}
$count = 0;
$batchSize = 1000;
$lastKey = "sync:seekdbFileUserLastId";
$lastId = intval(SeekDBKeyValue::get($lastKey, 0));
// 分批同步新增的记录
while (true) {
$records = FileUser::where('id', '>', $lastId)
->orderBy('id')
->limit($batchSize)
->get();
if ($records->isEmpty()) {
break;
}
foreach ($records as $record) {
SeekDBBase::upsertFileUser($record->file_id, $record->userid, $record->permission);
$count++;
$lastId = $record->id;
}
// 保存进度
SeekDBKeyValue::set($lastKey, $lastId);
if ($progressCallback) {
$progressCallback($count);
}
}
return $count;
}
}

View File

@ -7,6 +7,7 @@ use App\Models\ProjectUser;
use App\Module\Apps;
use App\Module\Base;
use App\Module\AI;
use App\Module\SeekDB\SeekDBKeyValue;
use Illuminate\Support\Facades\Log;
/**
@ -380,5 +381,50 @@ class SeekDBProject
return $count;
}
/**
* 增量同步项目成员关系(只同步新增的)
*
* @param callable|null $progressCallback 进度回调
* @return int 同步数量
*/
public static function syncProjectUsersIncremental(?callable $progressCallback = null): int
{
if (!Apps::isInstalled("seekdb")) {
return 0;
}
$count = 0;
$batchSize = 1000;
$lastKey = "sync:seekdbProjectUserLastId";
$lastId = intval(SeekDBKeyValue::get($lastKey, 0));
// 分批同步新增的记录
while (true) {
$records = ProjectUser::where('id', '>', $lastId)
->orderBy('id')
->limit($batchSize)
->get();
if ($records->isEmpty()) {
break;
}
foreach ($records as $record) {
SeekDBBase::upsertProjectUser($record->project_id, $record->userid);
$count++;
$lastId = $record->id;
}
// 保存进度
SeekDBKeyValue::set($lastKey, $lastId);
if ($progressCallback) {
$progressCallback($count);
}
}
return $count;
}
}

View File

@ -9,6 +9,7 @@ use App\Models\ProjectTaskVisibilityUser;
use App\Module\Apps;
use App\Module\Base;
use App\Module\AI;
use App\Module\SeekDB\SeekDBKeyValue;
use Illuminate\Support\Facades\Log;
/**
@ -567,5 +568,80 @@ class SeekDBTask
return $count;
}
/**
* 增量同步任务成员关系(只同步新增的)
*
* @param callable|null $progressCallback 进度回调
* @return int 同步数量
*/
public static function syncTaskUsersIncremental(?callable $progressCallback = null): int
{
if (!Apps::isInstalled("seekdb")) {
return 0;
}
$count = 0;
$batchSize = 1000;
// 同步 ProjectTaskUser 新增
$lastKey1 = "sync:seekdbTaskUserLastId";
$lastId1 = intval(SeekDBKeyValue::get($lastKey1, 0));
while (true) {
$records = ProjectTaskUser::where('id', '>', $lastId1)
->orderBy('id')
->limit($batchSize)
->get();
if ($records->isEmpty()) {
break;
}
foreach ($records as $record) {
SeekDBBase::upsertTaskUser($record->task_id, $record->userid);
if ($record->task_pid) {
SeekDBBase::upsertTaskUser($record->task_pid, $record->userid);
}
$count++;
$lastId1 = $record->id;
}
SeekDBKeyValue::set($lastKey1, $lastId1);
if ($progressCallback) {
$progressCallback($count);
}
}
// 同步 ProjectTaskVisibilityUser 新增
$lastKey2 = "sync:seekdbTaskVisibilityUserLastId";
$lastId2 = intval(SeekDBKeyValue::get($lastKey2, 0));
while (true) {
$records = ProjectTaskVisibilityUser::where('id', '>', $lastId2)
->orderBy('id')
->limit($batchSize)
->get();
if ($records->isEmpty()) {
break;
}
foreach ($records as $record) {
SeekDBBase::upsertTaskUser($record->task_id, $record->userid);
$count++;
$lastId2 = $record->id;
}
SeekDBKeyValue::set($lastKey2, $lastId2);
if ($progressCallback) {
$progressCallback($count);
}
}
return $count;
}
}

View File

@ -212,6 +212,8 @@ class SeekDBSyncTask extends AbstractTask
/**
* 增量更新(定时执行)
* 使用 --i 参数执行增量同步,会同步新增的向量数据和用户关系数据
*
* @return void
*/
private function incrementalUpdate()
@ -225,7 +227,7 @@ class SeekDBSyncTask extends AbstractTask
// 执行开始
Cache::put("SeekDBSyncTask:Time", time(), Carbon::now()->addMinutes(60));
// 执行同步命令(后台运行
// 执行增量同步(同时同步向量表和用户关系表的新增数据
@shell_exec("php /var/www/artisan seekdb:sync-files --i 2>&1 &");
@shell_exec("php /var/www/artisan seekdb:sync-users --i 2>&1 &");
@shell_exec("php /var/www/artisan seekdb:sync-projects --i 2>&1 &");