fix: load paginated run history messages (#3305)

This commit is contained in:
Eilen Shin 2026-06-01 15:50:39 +08:00 committed by GitHub
parent 031d6fbcbe
commit 019bd16a06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 267 additions and 14 deletions

View File

@ -0,0 +1,15 @@
"""Shared pagination helpers for gateway routers."""
from __future__ import annotations
def trim_run_message_page(rows: list[dict], *, limit: int, after_seq: int | None) -> tuple[list[dict], bool]:
"""Trim a ``limit + 1`` run-message page while preserving page boundaries."""
has_more = len(rows) > limit
if not has_more:
return rows, False
if after_seq is not None:
return rows[:limit], True
return rows[-limit:], True

View File

@ -15,6 +15,7 @@ from fastapi.responses import StreamingResponse
from app.gateway.authz import require_permission
from app.gateway.deps import get_checkpointer, get_feedback_repo, get_run_event_store, get_run_manager, get_run_store, get_stream_bridge
from app.gateway.pagination import trim_run_message_page
from app.gateway.routers.thread_runs import RunCreateRequest
from app.gateway.services import sse_consumer, start_run, wait_for_run_completion
from deerflow.runtime import serialize_channel_values
@ -129,8 +130,7 @@ async def run_messages(
before_seq=before_seq,
after_seq=after_seq,
)
has_more = len(rows) > limit
data = rows[:limit] if has_more else rows
data, has_more = trim_run_message_page(rows, limit=limit, after_seq=after_seq)
return {"data": data, "has_more": has_more}

View File

@ -21,6 +21,7 @@ from pydantic import BaseModel, Field
from app.gateway.authz import require_permission
from app.gateway.deps import get_checkpointer, get_current_user, get_feedback_repo, get_run_event_store, get_run_manager, get_run_store, get_stream_bridge
from app.gateway.pagination import trim_run_message_page
from app.gateway.services import sse_consumer, start_run, wait_for_run_completion
from deerflow.runtime import RunRecord, RunStatus, serialize_channel_values
@ -402,8 +403,7 @@ async def list_run_messages(
before_seq=before_seq,
after_seq=after_seq,
)
has_more = len(rows) > limit
data = rows[:limit] if has_more else rows
data, has_more = trim_run_message_page(rows, limit=limit, after_seq=after_seq)
return {"data": data, "has_more": has_more}

View File

@ -0,0 +1,16 @@
from fastapi.testclient import TestClient
def assert_run_message_page(
client: TestClient,
url: str,
*,
expected_seq: list[int],
has_more: bool = True,
) -> None:
response = client.get(url)
assert response.status_code == 200
body = response.json()
assert body["has_more"] is has_more
assert [m["seq"] for m in body["data"]] == expected_seq

View File

@ -5,6 +5,7 @@ from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
from _router_auth_helpers import make_authed_test_app
from _run_message_pagination_helpers import assert_run_message_page
from fastapi.testclient import TestClient
from app.gateway.routers import runs
@ -97,6 +98,51 @@ def test_run_messages_has_more_true_when_extra_row_returned():
body = response.json()
assert body["has_more"] is True
assert len(body["data"]) == 50 # trimmed to limit
assert [m["seq"] for m in body["data"]] == list(range(2, 52))
def test_run_messages_default_page_keeps_newest_messages_when_extra_row_returned():
"""Default latest-page trimming drops the older sentinel row, not the newest message."""
rows = [_make_message(i) for i in range(16, 67)]
run_record = {"run_id": "run-2", "thread_id": "thread-2"}
app = _make_app(
run_store=_make_run_store(run_record),
event_store=_make_event_store(rows),
)
with TestClient(app) as client:
assert_run_message_page(client, "/api/runs/run-2/messages", expected_seq=list(range(17, 67)))
def test_run_messages_before_seq_page_keeps_newest_side_when_extra_row_returned():
"""Backward pagination trims the older sentinel so adjacent pages do not miss the boundary message."""
rows = [_make_message(i) for i in range(1, 18)]
run_record = {"run_id": "run-2", "thread_id": "thread-2"}
app = _make_app(
run_store=_make_run_store(run_record),
event_store=_make_event_store(rows),
)
with TestClient(app) as client:
assert_run_message_page(
client,
"/api/runs/run-2/messages?before_seq=18&limit=16",
expected_seq=list(range(2, 18)),
)
def test_run_messages_after_seq_page_keeps_oldest_side_when_extra_row_returned():
"""Forward pagination still trims the newer sentinel row."""
rows = [_make_message(i) for i in range(11, 62)]
run_record = {"run_id": "run-2", "thread_id": "thread-2"}
app = _make_app(
run_store=_make_run_store(run_record),
event_store=_make_event_store(rows),
)
with TestClient(app) as client:
assert_run_message_page(
client,
"/api/runs/run-2/messages?after_seq=10",
expected_seq=list(range(11, 61)),
)
def test_run_messages_passes_after_seq_to_event_store():

View File

@ -6,6 +6,7 @@ import asyncio
from unittest.mock import AsyncMock, MagicMock
from _router_auth_helpers import make_authed_test_app
from _run_message_pagination_helpers import assert_run_message_page
from fastapi.testclient import TestClient
from app.gateway.routers import thread_runs
@ -88,6 +89,43 @@ def test_has_more_true_when_extra_row_returned():
body = response.json()
assert body["has_more"] is True
assert len(body["data"]) == 50 # trimmed to limit
assert [m["seq"] for m in body["data"]] == list(range(2, 52))
def test_default_page_keeps_newest_messages_when_extra_row_returned():
"""Default latest-page trimming drops the older sentinel row, not the newest message."""
rows = [_make_message(i) for i in range(16, 67)]
app = _make_app(event_store=_make_event_store(rows))
with TestClient(app) as client:
assert_run_message_page(
client,
"/api/threads/thread-2/runs/run-2/messages",
expected_seq=list(range(17, 67)),
)
def test_before_seq_page_keeps_newest_side_when_extra_row_returned():
"""Backward pagination trims the older sentinel so adjacent pages do not miss the boundary message."""
rows = [_make_message(i) for i in range(1, 18)]
app = _make_app(event_store=_make_event_store(rows))
with TestClient(app) as client:
assert_run_message_page(
client,
"/api/threads/thread-2/runs/run-2/messages?before_seq=18&limit=16",
expected_seq=list(range(2, 18)),
)
def test_after_seq_page_keeps_oldest_side_when_extra_row_returned():
"""Forward pagination still trims the newer sentinel row."""
rows = [_make_message(i) for i in range(11, 62)]
app = _make_app(event_store=_make_event_store(rows))
with TestClient(app) as client:
assert_run_message_page(
client,
"/api/threads/thread-2/runs/run-2/messages?after_seq=10",
expected_seq=list(range(11, 61)),
)
def test_after_seq_forwarded_to_event_store():

View File

@ -119,6 +119,55 @@ function findLatestUnloadedRunIndex(
return -1;
}
type RunMessagesPageResponse = {
data: RunMessage[];
has_more?: boolean;
hasMore?: boolean;
};
export function runMessagesPageHasMore(result: RunMessagesPageResponse) {
return result.has_more ?? result.hasMore ?? false;
}
export function getOldestRunMessageSeq(messages: RunMessage[]) {
let oldestSeq: number | null = null;
for (const message of messages) {
if (typeof message.seq !== "number") {
continue;
}
oldestSeq =
oldestSeq === null ? message.seq : Math.min(oldestSeq, message.seq);
}
return oldestSeq;
}
export function getNextRunMessagesBeforeSeq(
result: RunMessagesPageResponse,
): number | null | undefined {
if (!runMessagesPageHasMore(result)) {
return null;
}
return getOldestRunMessageSeq(result.data) ?? undefined;
}
export function buildRunMessagesUrl(
baseUrl: string,
threadId: string,
runId: string,
beforeSeq?: number,
) {
const normalizedBaseUrl = baseUrl.replace(/\/$/, "");
const path = `/api/threads/${encodeURIComponent(threadId)}/runs/${encodeURIComponent(runId)}/messages`;
const url = new URL(
`${normalizedBaseUrl}${path}`,
typeof window !== "undefined" ? window.location.origin : "http://localhost",
);
if (beforeSeq !== undefined) {
url.searchParams.set("before_seq", String(beforeSeq));
}
return normalizedBaseUrl ? url.toString() : `${url.pathname}${url.search}`;
}
export function mergeMessages(
historyMessages: Message[],
threadMessages: Message[],
@ -801,6 +850,7 @@ export function useThreadHistory(threadId: string) {
const pendingLoadRef = useRef(false);
const loadingRunIdRef = useRef<string | null>(null);
const loadedRunIdsRef = useRef<Set<string>>(new Set());
const runBeforeSeqRef = useRef<Map<string, number>>(new Map());
const [loading, setLoading] = useState(false);
const [messages, setMessages] = useState<Message[]>([]);
@ -841,16 +891,20 @@ export function useThreadHistory(threadId: string) {
const requestThreadId = threadIdRef.current;
loadingRunIdRef.current = run.run_id;
const result: { data: RunMessage[]; hasMore: boolean } = await fetch(
`${getBackendBaseURL()}/api/threads/${encodeURIComponent(requestThreadId)}/runs/${encodeURIComponent(run.run_id)}/messages`,
{
method: "GET",
headers: {
"Content-Type": "application/json",
},
credentials: "include",
const beforeSeq = runBeforeSeqRef.current.get(run.run_id);
const url = buildRunMessagesUrl(
getBackendBaseURL(),
requestThreadId,
run.run_id,
beforeSeq,
);
const result: RunMessagesPageResponse = await fetch(url, {
method: "GET",
headers: {
"Content-Type": "application/json",
},
).then((res) => {
credentials: "include",
}).then((res) => {
return res.json();
});
const _messages = result.data
@ -862,7 +916,18 @@ export function useThreadHistory(threadId: string) {
setMessages((prev) =>
dedupeMessagesByIdentity([..._messages, ...prev]),
);
loadedRunIdsRef.current.add(run.run_id);
const nextBeforeSeq = getNextRunMessagesBeforeSeq(result);
if (typeof nextBeforeSeq === "number") {
runBeforeSeqRef.current.set(run.run_id, nextBeforeSeq);
pendingLoadRef.current = true;
} else if (nextBeforeSeq === undefined) {
console.warn(
`Run ${run.run_id} returned has_more without message seq values; leaving it pending for retry.`,
);
} else {
runBeforeSeqRef.current.delete(run.run_id);
loadedRunIdsRef.current.add(run.run_id);
}
indexRef.current = findLatestUnloadedRunIndex(
runsRef.current,
loadedRunIdsRef.current,
@ -886,6 +951,7 @@ export function useThreadHistory(threadId: string) {
pendingLoadRef.current = false;
loadingRunIdRef.current = null;
loadedRunIdsRef.current = new Set();
runBeforeSeqRef.current = new Map();
loadingRef.current = false;
setLoading(false);
setMessages([]);

View File

@ -25,6 +25,7 @@ export interface AgentThread extends Thread<AgentThreadState> {
export interface RunMessage {
run_id: string;
seq?: number;
content: Message;
metadata: {
caller: string;

View File

@ -2,10 +2,25 @@ import type { Message } from "@langchain/langgraph-sdk";
import { expect, test } from "vitest";
import {
buildRunMessagesUrl,
getNextRunMessagesBeforeSeq,
getOldestRunMessageSeq,
getSummarizationMiddlewareMessages,
getVisibleOptimisticMessages,
mergeMessages,
runMessagesPageHasMore,
} from "@/core/threads/hooks";
import type { RunMessage } from "@/core/threads/types";
function runMessage(seq?: number): RunMessage {
return {
run_id: "run-1",
...(seq === undefined ? {} : { seq }),
content: {} as Message,
metadata: { caller: "" },
created_at: "2026-05-22T00:00:00Z",
};
}
test("mergeMessages removes duplicate messages already present in history", () => {
const human = {
@ -254,3 +269,59 @@ test("getVisibleOptimisticMessages hides optimistic user input after later serve
optimisticHuman,
]);
});
test("runMessagesPageHasMore reads backend snake_case pagination field", () => {
expect(runMessagesPageHasMore({ data: [], has_more: true })).toBe(true);
expect(runMessagesPageHasMore({ data: [], has_more: false })).toBe(false);
});
test("runMessagesPageHasMore keeps compatibility with camelCase pagination field", () => {
expect(runMessagesPageHasMore({ data: [], hasMore: true })).toBe(true);
});
test("getOldestRunMessageSeq returns the cursor for the next older run page", () => {
expect(
getOldestRunMessageSeq([runMessage(8), runMessage(9), runMessage(10)]),
).toBe(8);
});
test("getOldestRunMessageSeq ignores rows without seq", () => {
expect(getOldestRunMessageSeq([runMessage()])).toBeNull();
});
test("getNextRunMessagesBeforeSeq keeps runs pending when has_more lacks seq", () => {
expect(
getNextRunMessagesBeforeSeq({ data: [runMessage()], has_more: true }),
).toBeUndefined();
});
test("getNextRunMessagesBeforeSeq marks runs loaded when no more pages exist", () => {
expect(
getNextRunMessagesBeforeSeq({ data: [runMessage()], has_more: false }),
).toBeNull();
});
test("buildRunMessagesUrl encodes path segments and optional before_seq", () => {
expect(
buildRunMessagesUrl(
"https://api.example.test/",
"thread/with space",
"run?one",
18,
),
).toBe(
"https://api.example.test/api/threads/thread%2Fwith%20space/runs/run%3Fone/messages?before_seq=18",
);
});
test("buildRunMessagesUrl omits before_seq when loading the latest page", () => {
expect(
buildRunMessagesUrl("https://api.example.test", "thread-1", "run-1"),
).toBe("https://api.example.test/api/threads/thread-1/runs/run-1/messages");
});
test("buildRunMessagesUrl returns a relative URL when using the nginx proxy", () => {
expect(buildRunMessagesUrl("", "thread-1", "run-1", 42)).toBe(
"/api/threads/thread-1/runs/run-1/messages?before_seq=42",
);
});