feat(manticore): 添加同步失败自动重试机制

- 新增 ManticoreSyncFailure 模型记录同步失败的条目
  - 添加 RetryManticoreSync 命令实现失败重试逻辑
  - ManticoreBase 增加 runWithRetry 包装器,连接断开时自动重连
  - 统一 deleteVector 方法,减少重复代码
  - 修复 quoteValue 传入非字符串的类型问题
This commit is contained in:
kuaifan 2026-01-15 08:28:55 +00:00
parent fb7731ddcd
commit 07360a8d2c
5 changed files with 532 additions and 116 deletions

View File

@ -0,0 +1,188 @@
<?php
namespace App\Console\Commands;
use App\Console\Commands\Traits\ManticoreSyncLock;
use App\Models\File;
use App\Models\ManticoreSyncFailure;
use App\Models\Project;
use App\Models\ProjectTask;
use App\Models\User;
use App\Models\WebSocketDialogMsg;
use App\Module\Apps;
use App\Module\Manticore\ManticoreBase;
use App\Module\Manticore\ManticoreFile;
use App\Module\Manticore\ManticoreMsg;
use App\Module\Manticore\ManticoreProject;
use App\Module\Manticore\ManticoreTask;
use App\Module\Manticore\ManticoreUser;
use Illuminate\Console\Command;
class RetryManticoreSync extends Command
{
use ManticoreSyncLock;
protected $signature = 'manticore:retry-failures {--limit=100 : 每次处理的最大数量} {--stats : 显示统计信息}';
protected $description = '重试 Manticore 同步失败的记录';
public function handle(): int
{
if (!Apps::isInstalled("search")) {
$this->error("应用「Manticore Search」未安装");
return 1;
}
// 显示统计信息
if ($this->option('stats')) {
$this->showStats();
return 0;
}
$this->registerSignalHandlers();
if (!$this->acquireLock()) {
return 1;
}
$this->info('开始重试失败的同步任务...');
$limit = intval($this->option('limit'));
$failures = ManticoreSyncFailure::getPendingRetries($limit);
if ($failures->isEmpty()) {
$this->info('无待重试的记录');
$this->releaseLock();
return 0;
}
$this->info("找到 {$failures->count()} 条待重试记录");
$successCount = 0;
$failCount = 0;
foreach ($failures as $failure) {
if ($this->shouldStop) {
$this->info('收到停止信号,退出处理');
break;
}
$this->setLock();
$result = $this->retryOne($failure);
if ($result) {
$successCount++;
$this->info(" [成功] {$failure->data_type}:{$failure->data_id} ({$failure->action})");
} else {
$failCount++;
$this->warn(" [失败] {$failure->data_type}:{$failure->data_id} ({$failure->action}) - 第 {$failure->retry_count}");
}
}
$this->info("\n重试完成: 成功 {$successCount}, 失败 {$failCount}");
$this->releaseLock();
return 0;
}
/**
* 重试单条失败记录
*/
private function retryOne(ManticoreSyncFailure $failure): bool
{
$type = $failure->data_type;
$id = $failure->data_id;
$action = $failure->action;
try {
if ($action === 'delete') {
// 删除操作直接调用通用删除方法
return ManticoreBase::deleteVector($type, $id);
}
// sync 操作需要根据类型获取模型并同步
return $this->retrySyncByType($type, $id);
} catch (\Throwable $e) {
// 记录失败(会自动更新重试次数和时间)
ManticoreSyncFailure::recordFailure($type, $id, $action, $e->getMessage());
return false;
}
}
/**
* 根据类型重试同步
*/
private function retrySyncByType(string $type, int $id): bool
{
switch ($type) {
case 'msg':
$model = WebSocketDialogMsg::find($id);
if (!$model) {
// 数据已删除,移除失败记录
ManticoreSyncFailure::removeSuccess($type, $id, 'sync');
return true;
}
return ManticoreMsg::sync($model);
case 'file':
$model = File::find($id);
if (!$model) {
ManticoreSyncFailure::removeSuccess($type, $id, 'sync');
return true;
}
return ManticoreFile::sync($model);
case 'task':
$model = ProjectTask::find($id);
if (!$model) {
ManticoreSyncFailure::removeSuccess($type, $id, 'sync');
return true;
}
return ManticoreTask::sync($model);
case 'project':
$model = Project::find($id);
if (!$model) {
ManticoreSyncFailure::removeSuccess($type, $id, 'sync');
return true;
}
return ManticoreProject::sync($model);
case 'user':
$model = User::find($id);
if (!$model) {
ManticoreSyncFailure::removeSuccess($type, $id, 'sync');
return true;
}
return ManticoreUser::sync($model);
default:
return false;
}
}
/**
* 显示统计信息
*/
private function showStats(): void
{
$stats = ManticoreSyncFailure::getStats();
$this->info('Manticore 同步失败统计:');
$this->info(" 总数: {$stats['total']}");
if (!empty($stats['by_type'])) {
$this->info(' 按类型:');
foreach ($stats['by_type'] as $type => $count) {
$this->info(" - {$type}: {$count}");
}
}
if (!empty($stats['by_action'])) {
$this->info(' 按操作:');
foreach ($stats['by_action'] as $action => $count) {
$this->info(" - {$action}: {$count}");
}
}
}
}

View File

@ -0,0 +1,132 @@
<?php
namespace App\Models;
/**
* Manticore 同步失败记录
*
* @property int $id
* @property string $data_type 数据类型: msg/file/task/project/user
* @property int $data_id 数据ID
* @property string $action 操作类型: sync/delete
* @property string|null $error_message 错误信息
* @property int $retry_count 重试次数
* @property \Carbon\Carbon|null $last_retry_at 最后重试时间
* @property \Carbon\Carbon $created_at
* @property \Carbon\Carbon $updated_at
*/
class ManticoreSyncFailure extends AbstractModel
{
protected $table = 'manticore_sync_failures';
protected $fillable = [
'data_type',
'data_id',
'action',
'error_message',
'retry_count',
'last_retry_at',
];
protected $dates = [
'last_retry_at',
'created_at',
'updated_at',
];
/**
* 记录同步失败
*
* @param string $dataType 数据类型
* @param int $dataId 数据ID
* @param string $action 操作类型 sync/delete
* @param string $errorMessage 错误信息
*/
public static function recordFailure(string $dataType, int $dataId, string $action, string $errorMessage = ''): void
{
self::updateOrCreate(
[
'data_type' => $dataType,
'data_id' => $dataId,
'action' => $action,
],
[
'error_message' => mb_substr($errorMessage, 0, 500),
'retry_count' => \DB::raw('retry_count + 1'),
'last_retry_at' => now(),
]
);
}
/**
* 删除成功记录
*
* @param string $dataType 数据类型
* @param int $dataId 数据ID
* @param string $action 操作类型
*/
public static function removeSuccess(string $dataType, int $dataId, string $action): void
{
self::where('data_type', $dataType)
->where('data_id', $dataId)
->where('action', $action)
->delete();
}
/**
* 获取待重试的记录
* 根据重试次数决定间隔1次=1分钟2次=5分钟3次=15分钟4次+=30分钟
*
* @param int $limit 数量限制
* @return \Illuminate\Database\Eloquent\Collection
*/
public static function getPendingRetries(int $limit = 100)
{
return self::where(function ($query) {
$query->whereNull('last_retry_at')
->orWhere(function ($q) {
// 根据重试次数决定间隔
$q->where(function ($sub) {
// 重试1次等待1分钟
$sub->where('retry_count', 1)
->where('last_retry_at', '<', now()->subMinutes(1));
})->orWhere(function ($sub) {
// 重试2次等待5分钟
$sub->where('retry_count', 2)
->where('last_retry_at', '<', now()->subMinutes(5));
})->orWhere(function ($sub) {
// 重试3次等待15分钟
$sub->where('retry_count', 3)
->where('last_retry_at', '<', now()->subMinutes(15));
})->orWhere(function ($sub) {
// 重试4次以上等待30分钟
$sub->where('retry_count', '>=', 4)
->where('last_retry_at', '<', now()->subMinutes(30));
});
});
})
->orderBy('last_retry_at')
->limit($limit)
->get();
}
/**
* 获取统计信息
*
* @return array
*/
public static function getStats(): array
{
return [
'total' => self::count(),
'by_type' => self::selectRaw('data_type, COUNT(*) as count')
->groupBy('data_type')
->pluck('count', 'data_type')
->toArray(),
'by_action' => self::selectRaw('action, COUNT(*) as count')
->groupBy('action')
->pluck('count', 'action')
->toArray(),
];
}
}

View File

@ -2,6 +2,7 @@
namespace App\Module\Manticore;
use App\Models\ManticoreSyncFailure;
use App\Module\Apps;
use App\Module\Base;
use App\Module\AI;
@ -173,6 +174,71 @@ class ManticoreBase
self::$initialized = false;
}
/**
* 判断是否为连接断开错误
* 参考 Laravel Illuminate\Database\DetectsLostConnections
*/
private function isConnectionLostError(PDOException $e): bool
{
$message = $e->getMessage();
return stripos($message, 'server has gone away') !== false
|| stripos($message, 'no connection to the server') !== false
|| stripos($message, 'Lost connection') !== false
|| stripos($message, 'is dead or not enabled') !== false
|| stripos($message, 'Error while sending') !== false
|| stripos($message, 'decryption failed or bad record mac') !== false
|| stripos($message, 'server closed the connection unexpectedly') !== false
|| stripos($message, 'SSL connection has been closed unexpectedly') !== false
|| stripos($message, 'Error writing data to the connection') !== false
|| stripos($message, 'Resource deadlock avoided') !== false
|| stripos($message, 'Transaction() on null') !== false
|| stripos($message, 'child connection forced to terminate') !== false
|| stripos($message, 'query_wait_timeout') !== false
|| stripos($message, 'reset by peer') !== false
|| stripos($message, 'Physical connection is not usable') !== false
|| stripos($message, 'Packets out of order') !== false
|| stripos($message, 'Adaptive Server connection failed') !== false
|| stripos($message, 'Connection was killed') !== false
|| stripos($message, 'Broken pipe') !== false;
}
/**
* 带重试的执行包装器
* 正常情况零开销,仅在连接断开时重试一次
*
* @param callable $callback 执行回调,接收 PDO 参数
* @param mixed $failureReturn 失败时的返回值
* @param array $logContext 日志上下文
* @return mixed
*/
private function runWithRetry(callable $callback, $failureReturn = false, array $logContext = [])
{
$pdo = $this->getConnection();
if (!$pdo) {
return $failureReturn;
}
try {
return $callback($pdo);
} catch (PDOException $e) {
// 如果是连接断开错误,重置连接并重试一次
if ($this->isConnectionLostError($e)) {
self::resetConnection();
$pdo = $this->getConnection();
if ($pdo) {
try {
return $callback($pdo);
} catch (PDOException $retryException) {
Log::error('Manticore retry failed: ' . $retryException->getMessage(), $logContext);
return $failureReturn;
}
}
}
Log::error('Manticore error: ' . $e->getMessage(), $logContext);
return $failureReturn;
}
}
/**
* 检查是否已安装
*/
@ -190,20 +256,14 @@ class ManticoreBase
*/
public function executeRaw(string $sql): bool
{
$pdo = $this->getConnection();
if (!$pdo) {
return false;
}
try {
$pdo->exec($sql);
return true;
} catch (PDOException $e) {
Log::error('Manticore executeRaw error: ' . $e->getMessage(), [
'sql' => $sql,
]);
return false;
}
return $this->runWithRetry(
function (PDO $pdo) use ($sql) {
$pdo->exec($sql);
return true;
},
false,
['sql' => $sql]
);
}
/**
@ -247,22 +307,15 @@ class ManticoreBase
*/
public function execute(string $sql, array $params = []): bool
{
$pdo = $this->getConnection();
if (!$pdo) {
return false;
}
try {
$stmt = $pdo->prepare($sql);
$this->bindParams($stmt, $params);
return $stmt->execute();
} catch (PDOException $e) {
Log::error('Manticore execute error: ' . $e->getMessage(), [
'sql' => $sql,
'params' => $params
]);
return false;
}
return $this->runWithRetry(
function (PDO $pdo) use ($sql, $params) {
$stmt = $pdo->prepare($sql);
$this->bindParams($stmt, $params);
return $stmt->execute();
},
false,
['sql' => $sql, 'params' => $params]
);
}
/**
@ -274,23 +327,16 @@ class ManticoreBase
*/
public function executeWithRowCount(string $sql, array $params = []): int
{
$pdo = $this->getConnection();
if (!$pdo) {
return -1;
}
try {
$stmt = $pdo->prepare($sql);
$this->bindParams($stmt, $params);
$stmt->execute();
return $stmt->rowCount();
} catch (PDOException $e) {
Log::error('Manticore execute error: ' . $e->getMessage(), [
'sql' => $sql,
'params' => $params
]);
return -1;
}
return $this->runWithRetry(
function (PDO $pdo) use ($sql, $params) {
$stmt = $pdo->prepare($sql);
$this->bindParams($stmt, $params);
$stmt->execute();
return $stmt->rowCount();
},
-1,
['sql' => $sql, 'params' => $params]
);
}
/**
@ -302,23 +348,16 @@ class ManticoreBase
*/
public function query(string $sql, array $params = []): array
{
$pdo = $this->getConnection();
if (!$pdo) {
return [];
}
try {
$stmt = $pdo->prepare($sql);
$this->bindParams($stmt, $params);
$stmt->execute();
return $this->convertNumericTypes($stmt->fetchAll());
} catch (PDOException $e) {
Log::error('Manticore query error: ' . $e->getMessage(), [
'sql' => $sql,
'params' => $params
]);
return [];
}
return $this->runWithRetry(
function (PDO $pdo) use ($sql, $params) {
$stmt = $pdo->prepare($sql);
$this->bindParams($stmt, $params);
$stmt->execute();
return $this->convertNumericTypes($stmt->fetchAll());
},
[],
['sql' => $sql, 'params' => $params]
);
}
/**
@ -330,24 +369,17 @@ class ManticoreBase
*/
public function queryOne(string $sql, array $params = []): ?array
{
$pdo = $this->getConnection();
if (!$pdo) {
return null;
}
try {
$stmt = $pdo->prepare($sql);
$this->bindParams($stmt, $params);
$stmt->execute();
$result = $stmt->fetch();
return $result ? $this->convertNumericTypesRow($result) : null;
} catch (PDOException $e) {
Log::error('Manticore queryOne error: ' . $e->getMessage(), [
'sql' => $sql,
'params' => $params
]);
return null;
}
return $this->runWithRetry(
function (PDO $pdo) use ($sql, $params) {
$stmt = $pdo->prepare($sql);
$this->bindParams($stmt, $params);
$stmt->execute();
$result = $stmt->fetch();
return $result ? $this->convertNumericTypesRow($result) : null;
},
null,
['sql' => $sql, 'params' => $params]
);
}
/**
@ -670,12 +702,7 @@ class ManticoreBase
*/
public static function deleteFileVector(int $fileId): bool
{
if ($fileId <= 0) {
return false;
}
$instance = new self();
return $instance->execute("DELETE FROM file_vectors WHERE file_id = ?", [$fileId]);
return self::deleteVector('file', $fileId);
}
/**
@ -909,12 +936,7 @@ class ManticoreBase
*/
public static function deleteUserVector(int $userid): bool
{
if ($userid <= 0) {
return false;
}
$instance = new self();
return $instance->execute("DELETE FROM user_vectors WHERE userid = ?", [$userid]);
return self::deleteVector('user', $userid);
}
/**
@ -1151,12 +1173,7 @@ class ManticoreBase
*/
public static function deleteProjectVector(int $projectId): bool
{
if ($projectId <= 0) {
return false;
}
$instance = new self();
return $instance->execute("DELETE FROM project_vectors WHERE project_id = ?", [$projectId]);
return self::deleteVector('project', $projectId);
}
/**
@ -1421,12 +1438,7 @@ class ManticoreBase
*/
public static function deleteTaskVector(int $taskId): bool
{
if ($taskId <= 0) {
return false;
}
$instance = new self();
return $instance->execute("DELETE FROM task_vectors WHERE task_id = ?", [$taskId]);
return self::deleteVector('task', $taskId);
}
/**
@ -1671,12 +1683,7 @@ class ManticoreBase
*/
public static function deleteMsgVector(int $msgId): bool
{
if ($msgId <= 0) {
return false;
}
$instance = new self();
return $instance->execute("DELETE FROM msg_vectors WHERE msg_id = ?", [$msgId]);
return self::deleteVector('msg', $msgId);
}
/**
@ -1847,7 +1854,7 @@ class ManticoreBase
if (in_array($field, self::NUMERIC_FIELDS)) {
$valueList[] = (int)$value;
} else {
$valueList[] = $instance->quoteValue($value);
$valueList[] = $instance->quoteValue((string)$value);
}
}
@ -1870,7 +1877,50 @@ class ManticoreBase
// 构建并执行 SQL
$sql = "INSERT INTO {$table} (" . implode(', ', $fieldList) . ") VALUES (" . implode(', ', $valueList) . ")";
return $instance->executeRaw($sql);
$result = $instance->executeRaw($sql);
// 记录同步结果
if ($result) {
// 成功则删除失败记录(如果有)
ManticoreSyncFailure::removeSuccess($type, $pkValue, 'sync');
} else {
// 失败则记录
ManticoreSyncFailure::recordFailure($type, $pkValue, 'sync', "INSERT failed for {$table}");
}
return $result;
}
/**
* 通用向量删除方法
*
* @param string $type 类型: msg/file/task/project/user
* @param int $id 数据ID
* @return bool 是否成功
*/
public static function deleteVector(string $type, int $id): bool
{
if (!isset(self::VECTOR_TABLE_CONFIG[$type]) || $id <= 0) {
return false;
}
$config = self::VECTOR_TABLE_CONFIG[$type];
$table = $config['table'];
$pk = $config['pk'];
$instance = new self();
$result = $instance->execute("DELETE FROM {$table} WHERE {$pk} = ?", [$id]);
// 记录删除结果
if ($result) {
// 成功则删除失败记录(如果有)
ManticoreSyncFailure::removeSuccess($type, $id, 'delete');
} else {
// 失败则记录
ManticoreSyncFailure::recordFailure($type, $id, 'delete', "DELETE failed for {$table}");
}
return $result;
}
/**
@ -1955,7 +2005,7 @@ class ManticoreBase
if (in_array($field, self::NUMERIC_FIELDS)) {
$quotedValues[] = (int)$value;
} else {
$quotedValues[] = $instance->quoteValue($value);
$quotedValues[] = $instance->quoteValue((string)$value);
}
}
@ -1996,8 +2046,11 @@ class ManticoreBase
foreach ($insertStatements as $stmt) {
if ($instance->executeRaw($stmt['sql'])) {
$successCount++;
// 成功则删除失败记录(如果有)
ManticoreSyncFailure::removeSuccess($type, $stmt['pk'], 'sync');
} else {
// 插入失败,数据已被删除,需要重新同步
// 失败则记录
ManticoreSyncFailure::recordFailure($type, $stmt['pk'], 'sync', "Batch INSERT failed for {$table}");
}
}

View File

@ -256,6 +256,9 @@ class ManticoreSyncTask extends AbstractTask
@shell_exec("php /var/www/artisan manticore:sync-projects --i 2>&1 &");
@shell_exec("php /var/www/artisan manticore:sync-tasks --i 2>&1 &");
@shell_exec("php /var/www/artisan manticore:sync-msgs --i 2>&1 &");
// 启动失败重试命令
@shell_exec("php /var/www/artisan manticore:retry-failures 2>&1 &");
}
/**

View File

@ -0,0 +1,40 @@
<?php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
class CreateManticoreSyncFailuresTable extends Migration
{
/**
* Run the migrations.
*
* @return void
*/
public function up()
{
Schema::create('manticore_sync_failures', function (Blueprint $table) {
$table->id();
$table->string('data_type', 20)->comment('数据类型: msg/file/task/project/user');
$table->bigInteger('data_id')->comment('数据ID');
$table->string('action', 20)->comment('操作类型: sync/delete');
$table->string('error_message', 500)->nullable()->comment('错误信息');
$table->integer('retry_count')->default(0)->comment('重试次数');
$table->timestamp('last_retry_at')->nullable()->comment('最后重试时间');
$table->timestamps();
$table->unique(['data_type', 'data_id', 'action'], 'uk_type_id_action');
$table->index(['last_retry_at', 'retry_count'], 'idx_retry');
});
}
/**
* Reverse the migrations.
*
* @return void
*/
public function down()
{
Schema::dropIfExists('manticore_sync_failures');
}
}