diff --git a/backend/docs/REPLAY_E2E.md b/backend/docs/REPLAY_E2E.md index cd9920b4c..881c768b7 100644 --- a/backend/docs/REPLAY_E2E.md +++ b/backend/docs/REPLAY_E2E.md @@ -50,18 +50,22 @@ gateway's own run/event stores using the request's auth context, so the real ## How replay works `tests/replay_provider.py::ReplayChatModel` returns recorded assistant turns keyed -by a **normalized hash of the conversation** (human / ai / tool messages — role, -text, tool-call name+args; with ``, dates, UUIDs, tmp paths -stripped). A miss raises loudly rather than passing silently. +by a **normalized hash of the model caller + conversation**. The conversation is +human / ai / tool messages — role, text, tool-call name+args; with +``, dates, UUIDs, tmp paths stripped. The caller is the stable +source of the model call (`lead_agent`, `middleware:title`, `suggest_agent`, +`subagent:*`, etc.). A miss raises loudly rather than passing silently. **The system prompt is excluded from the match key.** The lead-agent system prompt is a living, frequently-edited implementation detail — its wording changes across PRs (e.g. #3195 added a "File Editing Workflow" section). Hashing it would make every fixture go stale and red-fail unrelated PRs the moment anyone edits the prompt. The conversation flow (user input → tool calls → results → answer) is the -stable contract that identifies a recorded turn. (This mirrors how open-design's -mock picker keys on the user prompt, not the system internals.) Combined with -pinning skills + extensions empty and disabling memory/summarization +stable contract that identifies a recorded turn. The caller still stays in the +key so two different model users with identical conversation text do not compete +for the same replay bucket. (This mirrors how open-design's mock picker keys on +the user prompt, not the system internals.) Combined with pinning skills + +extensions empty and disabling memory/summarization (`tests/_replay_fixture.py::build_config_yaml`), a fixture replays the same across machines, days, prompt edits, and CI. Replaying needs **no API key**. diff --git a/backend/scripts/build_fixture_from_jsonl.py b/backend/scripts/build_fixture_from_jsonl.py index 9bd7e1f93..6fcdba405 100644 --- a/backend/scripts/build_fixture_from_jsonl.py +++ b/backend/scripts/build_fixture_from_jsonl.py @@ -36,7 +36,8 @@ def main() -> int: for index, turn in enumerate(turns): data = turn["output"].get("data", {}) tool_calls = [tc.get("name") for tc in (data.get("tool_calls") or [])] - print(f" turn {index}: hash={turn['input_hash'][:12]} tool_calls={tool_calls} content={str(data.get('content'))[:50]!r}") + caller = turn.get("caller", "legacy") + print(f" turn {index}: caller={caller} hash={turn['input_hash'][:12]} tool_calls={tool_calls} content={str(data.get('content'))[:50]!r}") return 0 diff --git a/backend/scripts/record_gateway.py b/backend/scripts/record_gateway.py index ecab4b6cd..105c8bab7 100644 --- a/backend/scripts/record_gateway.py +++ b/backend/scripts/record_gateway.py @@ -28,27 +28,45 @@ sys.path.insert(0, str(_BACKEND / "tests")) def _install_capture(out_path: Path) -> None: from langchain_core.callbacks import BaseCallbackHandler from langchain_core.messages import messages_to_dict - from replay_provider import hash_messages + from replay_provider import caller_identity, hash_messages, hash_replay_input import deerflow.models.factory as factory_mod class Capture(BaseCallbackHandler): def __init__(self) -> None: - self.inputs: dict[str, list] = {} + self.inputs: dict[str, tuple[list, str]] = {} - def on_chat_model_start(self, serialized, messages, *, run_id=None, **kwargs): # noqa: ANN001 - self.inputs[str(run_id)] = messages[0] if messages else [] + def on_chat_model_start( # noqa: ANN001 + self, + serialized, + messages, + *, + run_id=None, + tags=None, + name=None, + **kwargs, + ): + self.inputs[str(run_id)] = ( + messages[0] if messages else [], + caller_identity(name=name, tags=tags), + ) def on_llm_end(self, response, *, run_id=None, **kwargs): # noqa: ANN001 - inp = self.inputs.pop(str(run_id), None) - if inp is None: + captured = self.inputs.pop(str(run_id), None) + if captured is None: return + inp, caller = captured for batch in response.generations: for gen in batch: message = getattr(gen, "message", None) if message is None: continue - record = {"input_hash": hash_messages(inp), "output": messages_to_dict([message])[0]} + record = { + "caller": caller, + "conversation_hash": hash_messages(inp), + "input_hash": hash_replay_input(inp, caller=caller), + "output": messages_to_dict([message])[0], + } with open(out_path, "a", encoding="utf-8") as handle: handle.write(json.dumps(record, ensure_ascii=False) + "\n") handle.flush() diff --git a/backend/tests/_replay_fixture.py b/backend/tests/_replay_fixture.py index 56f1a080a..053fc6ae4 100644 --- a/backend/tests/_replay_fixture.py +++ b/backend/tests/_replay_fixture.py @@ -32,7 +32,8 @@ REPLAY_MODEL_BLOCK = """\ - name: scenario-model display_name: Scenario Model use: replay_provider:ReplayChatModel - model: replay""" + model: replay + supports_thinking: true""" def real_model_block(model: str) -> str: diff --git a/backend/tests/fixtures/replay/write_read_file.ultra.json b/backend/tests/fixtures/replay/write_read_file.ultra.json index 95cce6ce8..b8cbe142c 100644 --- a/backend/tests/fixtures/replay/write_read_file.ultra.json +++ b/backend/tests/fixtures/replay/write_read_file.ultra.json @@ -12,7 +12,9 @@ }, "turns": [ { - "input_hash": "9c50eda6ab7e8593dabccbdeadc70a4a7bf778b2c0c3f275f1f96cf2c8ab58db", + "caller": "lead_agent", + "conversation_hash": "9c50eda6ab7e8593dabccbdeadc70a4a7bf778b2c0c3f275f1f96cf2c8ab58db", + "input_hash": "27aeb4c11bff2c3ebc182fe52a06556823c21928620a400c7f26be9733c31f3f", "output": { "type": "ai", "data": { @@ -56,7 +58,9 @@ } }, { - "input_hash": "3598aeb87e221ca8f554e4d61ce6d5e8801754606fa5c95a89c38bd6cb623045", + "caller": "middleware:title", + "conversation_hash": "3598aeb87e221ca8f554e4d61ce6d5e8801754606fa5c95a89c38bd6cb623045", + "input_hash": "75101f9faa453b1a35deff920b1e3c1a9f0b013a7627fbbaa03436752776b953", "output": { "type": "ai", "data": { @@ -89,7 +93,9 @@ } }, { - "input_hash": "6af134379b2a9efa01b4f63032f88211d5f38f459f8bed621eb6c65e8e05c1f9", + "caller": "lead_agent", + "conversation_hash": "6af134379b2a9efa01b4f63032f88211d5f38f459f8bed621eb6c65e8e05c1f9", + "input_hash": "f7468603a43d301fcc0167c2f7cd10e53137bfc584f1b3d776614b7a612ed7a6", "output": { "type": "ai", "data": { @@ -132,7 +138,9 @@ } }, { - "input_hash": "04751c4f7b0107b78b5c97d417063883fd586f5ebcbc4acf79be6cb3c0cdaec1", + "caller": "lead_agent", + "conversation_hash": "04751c4f7b0107b78b5c97d417063883fd586f5ebcbc4acf79be6cb3c0cdaec1", + "input_hash": "218645dabc6926a1dbdf45dd20fba8a41e1e690cef78d7752566db3acf5a36ce", "output": { "type": "ai", "data": { @@ -165,7 +173,9 @@ } }, { - "input_hash": "8b98ebdbb53e88f000556c4753adede8eaa076ff6fd7b8a1285bfd18aee8144d", + "caller": "suggest_agent", + "conversation_hash": "8b98ebdbb53e88f000556c4753adede8eaa076ff6fd7b8a1285bfd18aee8144d", + "input_hash": "dcd855d389d7179a1e4bc7074fa9ba7ce697570af8947225d6bacb538f14a0cb", "output": { "type": "ai", "data": { @@ -230,4 +240,4 @@ } } ] -} \ No newline at end of file +} diff --git a/backend/tests/replay_provider.py b/backend/tests/replay_provider.py index ab2ef3791..035889305 100644 --- a/backend/tests/replay_provider.py +++ b/backend/tests/replay_provider.py @@ -2,14 +2,19 @@ record/replay e2e (mirrors open-design's ``mocks/`` golden traces). A fixture is a JSON file capturing the *real* model calls of one scenario, -keyed by a normalized hash of the **input** each call received:: +keyed by a normalized hash of the **caller + input** each call received:: { "scenario": "write_read_file", "mode": "ultra", "model": "gpt-5.5", "turns": [ - {"input_hash": "", "input_preview": "...", "output": }, + { + "caller": "lead_agent", + "conversation_hash": "", + "input_hash": "", + "output": , + }, ... ] } @@ -21,8 +26,11 @@ A real run makes model calls from several callers — the lead agent's own turns and their count/order is not something we want a replay to depend on. Matching by a normalized hash of the *input messages* means each call gets back exactly the output that was recorded for that input, regardless of order or which middleware -issued it. That keeps the in-graph, deterministic title call part of the -recording; memory/summarization, by contrast, are disabled in the replay config +issued it. The caller name (``lead_agent``, ``middleware:title``, +``suggest_agent``, ``subagent:*``, ...) is included so two different model +callers with the same conversation text do not compete for the same replay +bucket. That keeps the in-graph, deterministic title call part of the recording; +memory/summarization, by contrast, are disabled in the replay config (``_replay_fixture.py``) because their background, debounced timing is not reproducible across runs. @@ -67,7 +75,7 @@ from collections import deque from collections.abc import Iterator from typing import Any -from langchain_core.callbacks import CallbackManagerForLLMRun +from langchain_core.callbacks import BaseCallbackHandler, CallbackManagerForLLMRun from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.messages import AIMessage, AIMessageChunk, BaseMessage, messages_from_dict from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult @@ -75,6 +83,14 @@ from langchain_core.runnables import Runnable from pydantic import PrivateAttr _FIXTURE_ENV = "DEERFLOW_REPLAY_FIXTURE" +_DEFAULT_CALLER = "lead_agent" +_CALLER_TAG_PREFIXES = ("middleware:", "subagent:") +_CALLER_NAME_ALIASES = { + # TitleMiddleware uses this run_name and tags the call as middleware:title. + # Some execution paths do not preserve the tag down to the model callback, + # so keep the run_name and tag in the same replay namespace. + "title_agent": "middleware:title", +} # Process-wide record of replay misses. A miss raises inside the model, but the # gateway's LLMErrorHandlingMiddleware swallows it into a normal assistant error @@ -94,6 +110,30 @@ def reset_replay_misses() -> None: _replay_misses.clear() +def _normalize_caller(caller: str | None) -> str: + value = _normalize_text(str(caller or "").strip()) + if not value: + return _DEFAULT_CALLER + return _CALLER_NAME_ALIASES.get(value, value) + + +def _caller_from_tags(tags: list[str] | None) -> str | None: + for tag in tags or []: + if isinstance(tag, str) and (tag == _DEFAULT_CALLER or tag.startswith(_CALLER_TAG_PREFIXES)): + return tag + return None + + +def caller_identity(*, name: str | None = None, tags: list[str] | None = None) -> str: + """Stable model-caller identity shared by record and replay. + + Tags win because graph middleware and subagents already use them as the + explicit caller marker. ``run_name`` is exposed to callbacks as ``name`` and + covers route-level callers such as ``suggest_agent``. + """ + return _normalize_caller(_caller_from_tags(tags) or name) + + # Volatile substrings that differ between a recording run and a replay run but # carry no semantic weight for matching. Normalized to stable placeholders # before hashing so the same logical input hashes identically across processes. @@ -172,10 +212,30 @@ def _canonical_messages(messages: list[BaseMessage]) -> str: def hash_messages(messages: list[BaseMessage]) -> str: - """Stable hash of a model call's input. Shared by recorder and replayer.""" + """Legacy stable hash of only a model call's conversation input.""" return hashlib.sha256(_canonical_messages(messages).encode("utf-8")).hexdigest() +def hash_replay_input(messages: list[BaseMessage], *, caller: str | None) -> str: + """Stable replay key for a caller-specific model input.""" + return hash_input_key(hash_messages(messages), caller=caller) + + +def hash_input_key(conversation_hash: str, *, caller: str | None) -> str: + """Namespace a conversation hash by caller identity. + + Keeping this as ``hash(caller + legacy_conversation_hash)`` lets existing + fixtures migrate without a live-model re-record: their old ``input_hash`` is + exactly the conversation hash. + """ + payload = json.dumps( + {"caller": _normalize_caller(caller), "conversation_hash": conversation_hash}, + sort_keys=True, + ensure_ascii=False, + ) + return hashlib.sha256(payload.encode("utf-8")).hexdigest() + + def _load_fixture(fixture_path: str) -> dict[str, deque[AIMessage]]: with open(fixture_path, encoding="utf-8") as handle: payload = json.load(handle) @@ -199,24 +259,54 @@ class ReplayChatModel(BaseChatModel): _table: dict[str, deque] = PrivateAttr(default_factory=dict) _fixture_path: str = PrivateAttr(default="") + _run_callers: dict[str, str] = PrivateAttr(default_factory=dict) def __init__(self, **kwargs: Any) -> None: # Ignore provider noise the factory forwards from config (model, api_key, # base_url, ...). Fixture path comes from the ``fixture`` kwarg or env. fixture_path = kwargs.pop("fixture", None) or os.environ.get(_FIXTURE_ENV) - super().__init__() + callbacks = kwargs.pop("callbacks", None) + super().__init__(callbacks=callbacks) if not fixture_path: raise ValueError(f"ReplayChatModel needs a fixture path via the ``fixture`` kwarg or ${_FIXTURE_ENV}") self._fixture_path = fixture_path self._table = _load_fixture(fixture_path) + self.callbacks = [*(self.callbacks or []), _ReplayCallerCapture(self._run_callers)] @property def _llm_type(self) -> str: return "deerflow-replay" - def _match(self, messages: list[BaseMessage]) -> AIMessage: - key = hash_messages(messages) + def _caller_from_run_manager(self, run_manager: CallbackManagerForLLMRun | None) -> str: + if run_manager is None: + if len(self._run_callers) == 1: + # Some async LangGraph paths fire on_chat_model_start with the + # caller metadata but invoke the model implementation without a + # run_manager. When there is only one pending start event, it is + # the current call; use it so record/replay share the same + # caller key. + return self._run_callers.pop(next(iter(self._run_callers))) + return _DEFAULT_CALLER + run_id = str(getattr(run_manager, "run_id", "")) + caller = self._run_callers.pop(run_id, None) + if caller: + return caller + return caller_identity( + name=getattr(run_manager, "run_name", None) or getattr(run_manager, "name", None), + tags=getattr(run_manager, "tags", None), + ) + + def _match(self, messages: list[BaseMessage], run_manager: CallbackManagerForLLMRun | None = None) -> AIMessage: + caller = self._caller_from_run_manager(run_manager) + key = hash_replay_input(messages, caller=caller) bucket = self._table.get(key) + if not bucket: + # Backward compatibility for fixtures recorded before caller-aware + # keys. New recordings write caller-aware ``input_hash`` values. + legacy_key = hash_messages(messages) + bucket = self._table.get(legacy_key) + if bucket: + key = legacy_key if not bucket: _replay_misses.append(key) preview = _canonical_messages(messages) @@ -224,6 +314,7 @@ class ReplayChatModel(BaseChatModel): f"replay miss: no recorded output for input hash {key} in {self._fixture_path!r}. " "The replayed run diverged from the recording (graph changed, a non-deterministic tool result " "altered a downstream input, or a volatile field slipped past normalization). " + f"Caller: {caller!r}. " f"Known hashes: {sorted(self._table)}. " f"Normalized input (first 800 chars): {preview[:800]!r}" ) @@ -236,7 +327,7 @@ class ReplayChatModel(BaseChatModel): run_manager: CallbackManagerForLLMRun | None = None, **kwargs: Any, ) -> ChatResult: - return ChatResult(generations=[ChatGeneration(message=self._match(messages))]) + return ChatResult(generations=[ChatGeneration(message=self._match(messages, run_manager))]) def _stream( self, @@ -245,9 +336,16 @@ class ReplayChatModel(BaseChatModel): run_manager: CallbackManagerForLLMRun | None = None, **kwargs: Any, ) -> Iterator[ChatGenerationChunk]: - turn = self._match(messages) + turn = self._match(messages, run_manager) text = turn.content if isinstance(turn.content, str) else "" - chunk = ChatGenerationChunk(message=AIMessageChunk(content=turn.content, tool_calls=turn.tool_calls, additional_kwargs=turn.additional_kwargs, id=turn.id)) + chunk = ChatGenerationChunk( + message=AIMessageChunk( + content=turn.content, + tool_calls=turn.tool_calls, + additional_kwargs=turn.additional_kwargs, + id=turn.id, + ) + ) if run_manager is not None and text: run_manager.on_llm_new_token(text, chunk=chunk) yield chunk @@ -256,5 +354,31 @@ class ReplayChatModel(BaseChatModel): return self +class _ReplayCallerCapture(BaseCallbackHandler): + def __init__(self, run_callers: dict[str, str]) -> None: + self._run_callers = run_callers + + def on_chat_model_start( + self, + serialized: dict, + messages: list[list[BaseMessage]], + *, + run_id: Any = None, + tags: list[str] | None = None, + name: str | None = None, + **kwargs: Any, + ) -> None: + if run_id is not None: + self._run_callers[str(run_id)] = caller_identity(name=name, tags=tags) + + # Re-export so the recorder shares the exact hashing logic. -__all__ = ["ReplayChatModel", "hash_messages", "replay_misses", "reset_replay_misses"] +__all__ = [ + "ReplayChatModel", + "caller_identity", + "hash_input_key", + "hash_messages", + "hash_replay_input", + "replay_misses", + "reset_replay_misses", +] diff --git a/backend/tests/test_replay_provider.py b/backend/tests/test_replay_provider.py new file mode 100644 index 000000000..e87f93cfb --- /dev/null +++ b/backend/tests/test_replay_provider.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +import json +from pathlib import Path + +from langchain_core.messages import AIMessage, HumanMessage, messages_to_dict +from replay_provider import ReplayChatModel, caller_identity, hash_messages, hash_replay_input + + +def _write_fixture(path: Path, turns: list[dict]) -> None: + path.write_text( + json.dumps( + { + "scenario": "unit", + "mode": "unit", + "model": "replay", + "prompt": "unit", + "context": {}, + "turns": turns, + } + ), + encoding="utf-8", + ) + + +def test_replay_key_includes_caller_identity(tmp_path: Path): + messages = [HumanMessage(content="same conversation")] + lead_output = AIMessage(content="lead") + suggest_output = AIMessage(content="suggest") + fixture_path = tmp_path / "fixture.json" + + _write_fixture( + fixture_path, + [ + { + "caller": "lead_agent", + "conversation_hash": hash_messages(messages), + "input_hash": hash_replay_input(messages, caller="lead_agent"), + "output": messages_to_dict([lead_output])[0], + }, + { + "caller": "suggest_agent", + "conversation_hash": hash_messages(messages), + "input_hash": hash_replay_input(messages, caller="suggest_agent"), + "output": messages_to_dict([suggest_output])[0], + }, + ], + ) + + model = ReplayChatModel(fixture=str(fixture_path)) + + assert model.invoke(messages, config={"run_name": "suggest_agent"}).content == "suggest" + assert model.invoke(messages, config={"run_name": "lead_agent"}).content == "lead" + + +def test_replay_supports_legacy_conversation_only_fixture(tmp_path: Path): + messages = [HumanMessage(content="legacy conversation")] + fixture_path = tmp_path / "legacy.json" + + _write_fixture( + fixture_path, + [ + { + "input_hash": hash_messages(messages), + "output": messages_to_dict([AIMessage(content="legacy")])[0], + } + ], + ) + + model = ReplayChatModel(fixture=str(fixture_path)) + + assert model.invoke(messages, config={"run_name": "suggest_agent"}).content == "legacy" + + +def test_title_run_name_uses_middleware_caller_namespace(tmp_path: Path): + messages = [HumanMessage(content="title prompt")] + fixture_path = tmp_path / "fixture.json" + + _write_fixture( + fixture_path, + [ + { + "caller": "middleware:title", + "conversation_hash": hash_messages(messages), + "input_hash": hash_replay_input(messages, caller="middleware:title"), + "output": messages_to_dict([AIMessage(content="generated title")])[0], + } + ], + ) + + model = ReplayChatModel(fixture=str(fixture_path)) + + assert caller_identity(name="title_agent") == "middleware:title" + assert model.invoke(messages, config={"run_name": "title_agent"}).content == "generated title" + + +def test_replay_uses_single_pending_capture_when_run_manager_is_missing(tmp_path: Path): + messages = [HumanMessage(content="title prompt")] + fixture_path = tmp_path / "fixture.json" + + _write_fixture( + fixture_path, + [ + { + "caller": "middleware:title", + "conversation_hash": hash_messages(messages), + "input_hash": hash_replay_input(messages, caller="middleware:title"), + "output": messages_to_dict([AIMessage(content="generated title")])[0], + } + ], + ) + + model = ReplayChatModel(fixture=str(fixture_path)) + model._run_callers["captured-run"] = caller_identity(name="title_agent", tags=["middleware:title"]) + + assert model._match(messages, run_manager=None).content == "generated title"