import type { AIMessage } from "@langchain/langgraph-sdk";
import type { ThreadsClient } from "@langchain/langgraph-sdk/client";
import { useStream } from "@langchain/langgraph-sdk/react";
import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
import { useCallback, useEffect, useState } from "react";
import { toast } from "sonner";

import type { PromptInputMessage } from "@/components/ai-elements/prompt-input";

import { getAPIClient } from "../api";
import type { LocalSettings } from "../settings";
import { useUpdateSubtask } from "../tasks/context";
import { uploadFiles } from "../uploads";

import type { AgentThread, AgentThreadState } from "./types";

/**
 * Query-key prefix shared by every cached thread-search result. Used both to
 * build the `useThreads` key and to invalidate / patch all cached searches.
 */
const THREADS_SEARCH_QUERY_KEY = ["threads", "search"] as const;

/** Payload forwarded to `onToolEnd` when an `on_tool_end` event is streamed. */
export type ToolEndEvent = {
  name: string;
  data: unknown;
};

/** Options accepted by {@link useThreadStream}. */
export type ThreadStreamOptions = {
  threadId?: string | null | undefined;
  context: LocalSettings["context"];
  isMock?: boolean;
  /** Called with the thread id reported by the stream's `onCreated` callback. */
  onStart?: (threadId: string) => void;
  /** Called with the final state values when the stream finishes. */
  onFinish?: (state: AgentThreadState) => void;
  /** Called for every streamed `on_tool_end` event. */
  onToolEnd?: (event: ToolEndEvent) => void;
};

/**
 * Opens a stream against the `lead_agent` assistant and exposes a
 * `sendMessage` helper that uploads attachments before submitting.
 *
 * @returns A `[thread, sendMessage]` tuple. `sendMessage` rejects (after
 *   showing an error toast) when any attachment cannot be prepared or
 *   uploaded; in that case nothing is submitted.
 */
export function useThreadStream({
  threadId,
  context,
  isMock,
  onStart,
  onFinish,
  onToolEnd,
}: ThreadStreamOptions) {
  const [_threadId, setThreadId] = useState<string | null>(threadId ?? null);

  // Follow the `threadId` prop whenever it diverges from the local id. The
  // sync only happens while a local id is set; a null local id is left alone.
  useEffect(() => {
    if (_threadId && _threadId !== threadId) {
      setThreadId(threadId ?? null);
    }
  }, [threadId, _threadId]);

  const queryClient = useQueryClient();
  const updateSubtask = useUpdateSubtask();

  const thread = useStream<AgentThreadState>({
    client: getAPIClient(isMock),
    assistantId: "lead_agent",
    threadId: _threadId,
    reconnectOnMount: true,
    fetchStateHistory: { limit: 1 },
    onCreated(meta) {
      setThreadId(meta.thread_id);
      onStart?.(meta.thread_id);
    },
    onLangChainEvent(event) {
      if (event.event === "on_tool_end") {
        onToolEnd?.({
          name: event.name,
          data: event.data,
        });
      }
    },
    onCustomEvent(event: unknown) {
      if (
        typeof event === "object" &&
        event !== null &&
        "type" in event &&
        event.type === "task_running"
      ) {
        // Only the `type` tag is verified at runtime; the remaining fields
        // are assumed to follow the `task_running` event shape.
        const e = event as {
          type: "task_running";
          task_id: string;
          message: AIMessage;
        };
        updateSubtask({ id: e.task_id, latestMessage: e.message });
      }
    },
    onFinish(state) {
      onFinish?.(state.values);
      void queryClient.invalidateQueries({
        queryKey: THREADS_SEARCH_QUERY_KEY,
      });
    },
  });

  const sendMessage = useCallback(
    async (
      threadId: string,
      message: PromptInputMessage,
      extraContext?: Record<string, unknown>,
    ) => {
      const text = message.text.trim();

      // Upload files first, if any.
      if (message.files && message.files.length > 0) {
        try {
          // Convert each FileUIPart into a File by fetching its URL.
          const filePromises = message.files.map(async (fileUIPart) => {
            if (fileUIPart.url && fileUIPart.filename) {
              try {
                const response = await fetch(fileUIPart.url);
                // Do not turn an HTTP error body into an attachment; treat
                // it as a failed conversion instead.
                if (!response.ok) {
                  throw new Error(
                    `Unexpected response status ${response.status}`,
                  );
                }
                const blob = await response.blob();
                return new File([blob], fileUIPart.filename, {
                  type: fileUIPart.mediaType || blob.type,
                });
              } catch (error) {
                console.error(
                  `Failed to fetch file ${fileUIPart.filename}:`,
                  error,
                );
                return null;
              }
            }
            return null;
          });

          const conversionResults = await Promise.all(filePromises);
          const files = conversionResults.filter(
            (file): file is File => file !== null,
          );

          // Refuse to send a message with only part of its attachments.
          const failedConversions = conversionResults.length - files.length;
          if (failedConversions > 0) {
            throw new Error(
              `Failed to prepare ${failedConversions} attachment(s) for upload. Please retry.`,
            );
          }

          if (!threadId) {
            throw new Error("Thread is not ready for file upload.");
          }

          if (files.length > 0) {
            await uploadFiles(threadId, files);
          }
        } catch (error) {
          console.error("Failed to upload files:", error);
          const errorMessage =
            error instanceof Error ? error.message : "Failed to upload files.";
          toast.error(errorMessage);
          throw error;
        }
      }

      await thread.submit(
        {
          messages: [
            {
              type: "human",
              content: [
                {
                  type: "text",
                  text,
                },
              ],
            },
          ],
        },
        {
          threadId: threadId,
          streamSubgraphs: true,
          streamResumable: true,
          streamMode: ["values", "messages-tuple", "custom"],
          config: {
            recursion_limit: 1000,
          },
          // Later spreads win: the hook's `context` overrides `extraContext`,
          // and the mode-derived flags below override both.
          context: {
            ...extraContext,
            ...context,
            thinking_enabled: context.mode !== "flash",
            is_plan_mode: context.mode === "pro" || context.mode === "ultra",
            subagent_enabled: context.mode === "ultra",
            thread_id: threadId,
          },
        },
      );

      void queryClient.invalidateQueries({
        queryKey: THREADS_SEARCH_QUERY_KEY,
      });
    },
    [thread, context, queryClient],
  );

  return [thread, sendMessage] as const;
}

/**
 * Queries the thread list through `threads.search`.
 *
 * @param params Search parameters passed to `threads.search`; by default the
 *   50 most recently updated threads with a reduced field selection.
 */
export function useThreads(
  params: Parameters<ThreadsClient["search"]>[0] = {
    limit: 50,
    sortBy: "updated_at",
    sortOrder: "desc",
    select: ["thread_id", "updated_at", "values"],
  },
) {
  const apiClient = getAPIClient();
  return useQuery({
    queryKey: [...THREADS_SEARCH_QUERY_KEY, params],
    queryFn: async () => {
      const response = await apiClient.threads.search(params);
      return response as AgentThread[];
    },
    refetchOnWindowFocus: false,
  });
}

/**
 * Applies `update` to every cached thread-search result.
 *
 * Cache entries that hold no data yet (for example a search that is still
 * loading) are skipped: the updater returns `undefined` for them, which
 * leaves the entry untouched instead of throwing on `undefined`.
 */
function updateCachedThreads(
  queryClient: ReturnType<typeof useQueryClient>,
  update: (threads: AgentThread[]) => AgentThread[],
): void {
  queryClient.setQueriesData<AgentThread[]>(
    {
      queryKey: THREADS_SEARCH_QUERY_KEY,
      exact: false,
    },
    (cached: AgentThread[] | undefined) =>
      cached === undefined ? undefined : update(cached),
  );
}

/**
 * Mutation that deletes a thread and, on success, removes it from every
 * cached thread-search result.
 */
export function useDeleteThread() {
  const queryClient = useQueryClient();
  const apiClient = getAPIClient();
  return useMutation({
    mutationFn: async ({ threadId }: { threadId: string }) => {
      await apiClient.threads.delete(threadId);
    },
    onSuccess(_, { threadId }) {
      updateCachedThreads(queryClient, (threads) =>
        threads.filter((t) => t.thread_id !== threadId),
      );
    },
  });
}

/**
 * Mutation that writes a new `title` into a thread's state and, on success,
 * mirrors the title into every cached thread-search result.
 */
export function useRenameThread() {
  const queryClient = useQueryClient();
  const apiClient = getAPIClient();
  return useMutation({
    mutationFn: async ({
      threadId,
      title,
    }: {
      threadId: string;
      title: string;
    }) => {
      await apiClient.threads.updateState(threadId, {
        values: { title },
      });
    },
    onSuccess(_, { threadId, title }) {
      updateCachedThreads(queryClient, (threads) =>
        threads.map((t) =>
          t.thread_id === threadId
            ? {
                ...t,
                values: {
                  ...t.values,
                  title,
                },
              }
            : t,
        ),
      );
    },
  });
}