findPendingTasks(); foreach ($tasks as $task) { // 为任务创建事件记录 $this->createEventRecords($task); // 投递异步分析任务 Task::deliver(new AiTaskAnalyzeTask($task->id)); } } /** * 查找待处理的任务 */ private function findPendingTasks(): \Illuminate\Support\Collection { $delayTime = Carbon::now()->subSeconds(self::DELAY_SECONDS); // 子查询:已经有 AI 事件记录的任务 $processedTaskIds = ProjectTaskAiEvent::select('task_id') ->distinct() ->pluck('task_id'); // 查询新建任务(未处理过的) $newTasks = ProjectTask::where('parent_id', 0) // 只处理主任务 ->whereNull('deleted_at') ->whereNull('archived_at') ->where('created_at', '<=', $delayTime) // 创建超过延迟时间 ->where('created_at', '>=', Carbon::now()->subDays(1)) // 只处理1天内的 ->whereNotNull('dialog_id') // 有对话ID ->whereNotIn('id', $processedTaskIds) ->orderBy('created_at', 'asc') ->take(self::BATCH_SIZE) ->get(); // 查询需要重试的任务(优先处理较早失败的) $retryTaskIds = ProjectTaskAiEvent::where('status', ProjectTaskAiEvent::STATUS_FAILED) ->where('retry_count', '<', ProjectTaskAiEvent::MAX_RETRY) ->select('task_id') ->distinct() ->orderBy('updated_at', 'asc') ->take(self::BATCH_SIZE - $newTasks->count()) ->pluck('task_id'); $retryTasks = ProjectTask::whereIn('id', $retryTaskIds) ->whereNull('deleted_at') ->get(); return $newTasks->merge($retryTasks)->take(self::BATCH_SIZE); } /** * 为任务创建事件记录 */ private function createEventRecords(ProjectTask $task): void { foreach (ProjectTaskAiEvent::getEventTypes() as $eventType) { ProjectTaskAiEvent::firstOrCreate( [ 'task_id' => $task->id, 'event_type' => $eventType, ], [ 'status' => ProjectTaskAiEvent::STATUS_PENDING, 'retry_count' => 0, ] ); } } public function end() { } }