From 88e36d968682846d1c04fa38f59dac3d0734cf6e Mon Sep 17 00:00:00 2001 From: Huixin615 Date: Sun, 7 Jun 2026 17:47:11 +0800 Subject: [PATCH] fix(#3189): prevent write_file streaming timeout on long reports (#3195) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(#3189): prevent write_file streaming timeout on long reports Adds a layered defense against StreamChunkTimeoutError caused by oversized single-shot write_file tool calls: - factory: default stream_chunk_timeout to 240s for OpenAI-compatible clients (overridable via ModelConfig.stream_chunk_timeout in config.yaml) - sandbox/tools: server-side 80 KB length guard on non-append write_file calls (configurable via DEERFLOW_WRITE_FILE_MAX_BYTES env var, 0 disables); rejects oversized payloads with a structured error pointing the model at str_replace or append=True - middleware: classify StreamChunkTimeoutError as transient but cap retries at 1 via per-exception _RETRY_BUDGET_OVERRIDES (same-payload retry on a chunk-gap timeout buffers the same way upstream; full 3-attempt loop would stack 6-12 min of dead air) - middleware: surface an actionable user-facing message for stream-drop exceptions instead of leaking the raw langchain stack - prompts: add a routing-style File Editing Workflow hint to both lead_agent and general_purpose subagent prompts, pointing the model at str_replace for incremental edits (mirrors Claude Code's Edit / Codex's apply_patch) - tests: behavioural coverage for size guard, retry budget override, stream-drop user message, factory default injection Refs #3189 * fix(#3189): drop stream_chunk_timeout for non-OpenAI providers Address CR feedback on PR #3195: - factory: pop `stream_chunk_timeout` from kwargs for any model_use_path other than `langchain_openai:ChatOpenAI` instead of returning early. `ModelConfig.stream_chunk_timeout` is part of the shared schema, so a user-supplied value on a non-OpenAI provider would otherwise be forwarded to its constructor and raise `TypeError: unexpected keyword argument`. - factory: rewrite docstring to describe the actual `exclude_none=True` behaviour (explicit null is excluded and falls back to the default) instead of the misleading "None falling out via exclude_none=True keeps its value". - tests: add regression coverage asserting the kwarg is stripped before reaching a non-OpenAI provider's constructor. Refs: bytedance#3189 * fix(#3189): restrict stream-drop user copy to StreamChunkTimeoutError only Per CR on #3195: narrow _STREAM_DROP_EXCEPTIONS to StreamChunkTimeoutError. Generic httpx RemoteProtocolError / ReadError fall back to the standard 'temporarily unavailable' copy, since they routinely fire on transient network blips where the 'split the output' guidance is misleading. Retry/backoff classification is unchanged — both remain transient/retriable. Tests updated to reflect new copy, plus a symmetric regression test for ReadError. --------- Co-authored-by: Willem Jiang --- .../deerflow/agents/lead_agent/prompt.py | 8 + .../llm_error_handling_middleware.py | 68 +++++- .../harness/deerflow/config/model_config.py | 10 + .../harness/deerflow/models/factory.py | 33 +++ .../harness/deerflow/sandbox/tools.py | 65 +++++- .../subagents/builtins/general_purpose.py | 11 + backend/tests/test_lead_agent_prompt.py | 41 ++++ .../test_llm_error_handling_middleware.py | 216 ++++++++++++++++++ backend/tests/test_model_factory.py | 113 +++++++++ .../tests/test_write_file_tool_size_guard.py | 116 ++++++++++ 10 files changed, 677 insertions(+), 4 deletions(-) create mode 100644 backend/tests/test_write_file_tool_size_guard.py diff --git a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py index ef7917104..2e4d32cec 100644 --- a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py +++ b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py @@ -542,6 +542,14 @@ combined with a FastAPI gateway for REST API access [citation:FastAPI](https://f {subagent_reminder}- Skill First: Always load the relevant skill before starting **complex** tasks. - Progressive Loading: Load resources incrementally as referenced in skills - Output Files: Final deliverables must be in `/mnt/user-data/outputs` +- File Editing Workflow: When revising an existing file, prefer + `str_replace` over `write_file` — it sends only the diff and avoids + re-emitting the whole file (mirrors Claude Code's Edit and Codex's + apply_patch). When writing long new content from scratch, split it + into sections: the first `write_file` call creates the file, then use + `write_file` with append=True to extend it section by section. This + keeps each tool call small and avoids mid-stream chunk-gap timeouts + on oversized single-shot writes. (See issue #3189.) - Clarity: Be direct and helpful, avoid unnecessary meta-commentary - Including Images and Mermaid: Images and Mermaid diagrams are always welcomed in the Markdown format, and you're encouraged to use `![Image Description](image_path)\n\n` or "```mermaid" to display images in response or Markdown files - Multi-task: Better utilize parallel tool calling to call multiple tools at one time for better performance diff --git a/backend/packages/harness/deerflow/agents/middlewares/llm_error_handling_middleware.py b/backend/packages/harness/deerflow/agents/middlewares/llm_error_handling_middleware.py index a489d90d0..4ec3b7fe6 100644 --- a/backend/packages/harness/deerflow/agents/middlewares/llm_error_handling_middleware.py +++ b/backend/packages/harness/deerflow/agents/middlewares/llm_error_handling_middleware.py @@ -62,6 +62,41 @@ _AUTH_PATTERNS = ( "未授权", ) +# Per-exception retry budget overrides. +# +# Some transient errors are retriable in principle but expensive to retry at +# the default budget. StreamChunkTimeoutError in particular fires after the +# upstream provider has already stalled for `stream_chunk_timeout` seconds +# (typically 120-240s); a full 3-attempt loop can therefore stack 6-12 minutes +# of dead air before surfacing the failure to the user. We keep exactly one +# retry (cheap reconnect that catches genuine transient TCP blips) and then +# fail fast — the same buffered payload is overwhelmingly likely to fail +# again at the upstream provider for the same reason. +# +# Keys are exception class *names* (not classes) so we don't introduce +# import-time coupling on optional dependencies like langchain-openai. The +# value is the absolute max attempt count, NOT additional retries — so a +# value of 2 means "1 first attempt + 1 retry" (the CR-requested +# "keep one retry" behavior). +_RETRY_BUDGET_OVERRIDES: dict[str, int] = { + "StreamChunkTimeoutError": 2, +} + +# Exception class names that indicate the upstream stream-chunk watchdog +# fired because the model stalled mid-flight. These deserve a more specific +# user-facing message than the generic "temporarily unavailable" copy, +# because the typical root cause is a long tool-call serialization stalling +# the upstream stream — and the most actionable advice we can give the user +# is "ask for a shorter / split output" rather than "wait and retry". +# Generic connection drops (httpx RemoteProtocolError / ReadError) are +# intentionally excluded: they routinely fire on transient network blips +# with normal payloads, where the "split the work" guidance is misleading. +_STREAM_DROP_EXCEPTIONS: frozenset[str] = frozenset( + { + "StreamChunkTimeoutError", + } +) + class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]): """Retry transient LLM errors and surface graceful assistant messages.""" @@ -83,6 +118,18 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]): self._circuit_state = "closed" self._circuit_probe_in_flight = False + def _max_attempts_for(self, exc: BaseException) -> int: + """Return the effective max attempt count for this exception. + + Falls back to `self.retry_max_attempts` unless the exception class name + appears in the per-exception override table. + """ + override = _RETRY_BUDGET_OVERRIDES.get(type(exc).__name__) + if override is None: + return self.retry_max_attempts + + return min(override, self.retry_max_attempts) + def _check_circuit(self) -> bool: """Returns True if circuit is OPEN (fast fail), False otherwise.""" with self._circuit_lock: @@ -153,6 +200,7 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]): "InternalServerError", "ReadError", # httpx.ReadError: connection dropped mid-stream "RemoteProtocolError", # httpx: server closed connection unexpectedly + "StreamChunkTimeoutError", # langchain-openai: chunk gap exceeded stream_chunk_timeout }: return True, "transient" if status_code in _RETRIABLE_STATUS_CODES: @@ -202,6 +250,20 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]): if reason == "auth": return "The configured LLM provider rejected the request because authentication or access is invalid. Please check the provider credentials and try again." if reason in {"busy", "transient"}: + # Stream-drop failures (chunk-gap timeout, peer-closed connection, + # raw read error) almost always point at a single oversized + # tool-call payload — the model spent so long serializing JSON + # arguments that the upstream provider buffered and the stream + # gap exceeded `stream_chunk_timeout`. Surfacing this distinct + # cause lets the user split or shorten their next request + # instead of helplessly retrying the same prompt. + if type(exc).__name__ in _STREAM_DROP_EXCEPTIONS: + return ( + "The model's streaming response was interrupted before it could " + "finish. This usually happens when a single response or tool call " + "is very large — please ask the assistant to split the work into " + "smaller steps, or shorten the requested output, and try again." + ) return "The configured LLM provider is temporarily unavailable after multiple retries. Please wait a moment and continue the conversation." return f"LLM request failed: {detail}" @@ -259,7 +321,8 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]): raise except Exception as exc: retriable, reason = self._classify_error(exc) - if retriable and attempt < self.retry_max_attempts: + max_attempts = self._max_attempts_for(exc) + if retriable and attempt < max_attempts: wait_ms = self._build_retry_delay_ms(attempt, exc) logger.warning( "Transient LLM error on attempt %d/%d; retrying in %dms: %s", @@ -310,7 +373,8 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]): raise except Exception as exc: retriable, reason = self._classify_error(exc) - if retriable and attempt < self.retry_max_attempts: + max_attempts = self._max_attempts_for(exc) + if retriable and attempt < max_attempts: wait_ms = self._build_retry_delay_ms(attempt, exc) logger.warning( "Transient LLM error on attempt %d/%d; retrying in %dms: %s", diff --git a/backend/packages/harness/deerflow/config/model_config.py b/backend/packages/harness/deerflow/config/model_config.py index e9a3e1c16..b747eec99 100644 --- a/backend/packages/harness/deerflow/config/model_config.py +++ b/backend/packages/harness/deerflow/config/model_config.py @@ -32,6 +32,16 @@ class ModelConfig(BaseModel): description="Extra settings to be passed to the model when thinking is disabled", ) supports_vision: bool = Field(default_factory=lambda: False, description="Whether the model supports vision/image inputs") + stream_chunk_timeout: float | None = Field( + default=None, + description=( + "Maximum seconds to wait between successive streaming chunks before " + "langchain-openai raises StreamChunkTimeoutError. None means use the " + "factory default (240s for OpenAI-compatible clients). Tune higher for " + "reasoning models with long thinking pauses; lower for latency-sensitive " + "interactive endpoints. Has no effect on non-OpenAI-compatible providers." + ), + ) thinking: dict | None = Field( default_factory=lambda: None, description=( diff --git a/backend/packages/harness/deerflow/models/factory.py b/backend/packages/harness/deerflow/models/factory.py index c6a3573f8..57eb15001 100644 --- a/backend/packages/harness/deerflow/models/factory.py +++ b/backend/packages/harness/deerflow/models/factory.py @@ -47,6 +47,38 @@ def _enable_stream_usage_by_default(model_use_path: str, model_settings_from_con model_settings_from_config["stream_usage"] = True +# Default chunk-gap budget for OpenAI-compatible streaming responses. +# +# langchain-openai raises ``StreamChunkTimeoutError`` after this many seconds +# without receiving a chunk. Its own default is 60s, which is too aggressive for +# reasoning models (DeepSeek-R1, Doubao-thinking, GPT-5) whose first chunk can +# legitimately take 90~150s. We default to 240s so the streaming layer rarely +# trips on long thinking pauses; the LLMErrorHandlingMiddleware still retries +# (budget=2) if a real stall happens. Users can override per-model in config.yaml. +_DEFAULT_STREAM_CHUNK_TIMEOUT_SECONDS: float = 240.0 + + +def _apply_stream_chunk_timeout_default(model_use_path: str, model_settings_from_config: dict) -> None: + """Inject a generous ``stream_chunk_timeout`` for OpenAI-compatible clients. + + The ``stream_chunk_timeout`` kwarg is specific to ``langchain_openai:ChatOpenAI`` + and is rejected by other providers' constructors as an unexpected keyword + argument. Behaviour: + + * OpenAI-compatible path: an explicit value in ``config.yaml`` is preserved. + An explicit ``null`` is dropped upstream by ``model_dump(exclude_none=True)`` + and therefore treated as "unset", so the default is injected. + * Non-OpenAI path: drop the key so it is never forwarded to an incompatible + constructor (which would raise ``TypeError: unexpected keyword argument``). + """ + if model_use_path != "langchain_openai:ChatOpenAI": + model_settings_from_config.pop("stream_chunk_timeout", None) + return + if "stream_chunk_timeout" in model_settings_from_config: + return + model_settings_from_config["stream_chunk_timeout"] = _DEFAULT_STREAM_CHUNK_TIMEOUT_SECONDS + + def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *, app_config: AppConfig | None = None, attach_tracing: bool = True, **kwargs) -> BaseChatModel: """Create a chat model instance from the config. @@ -128,6 +160,7 @@ def create_chat_model(name: str | None = None, thinking_enabled: bool = False, * model_settings_from_config.pop("reasoning_effort", None) _enable_stream_usage_by_default(model_config.use, model_settings_from_config) + _apply_stream_chunk_timeout_default(model_config.use, model_settings_from_config) # For Codex Responses API models: map thinking mode to reasoning_effort from deerflow.models.openai_codex_provider import CodexChatModel diff --git a/backend/packages/harness/deerflow/sandbox/tools.py b/backend/packages/harness/deerflow/sandbox/tools.py index 6edc88882..4c04e3ac7 100644 --- a/backend/packages/harness/deerflow/sandbox/tools.py +++ b/backend/packages/harness/deerflow/sandbox/tools.py @@ -1,4 +1,5 @@ import asyncio +import os import posixpath import re import shlex @@ -43,6 +44,16 @@ _MAX_GLOB_MAX_RESULTS = 1000 _DEFAULT_GREP_MAX_RESULTS = 100 _MAX_GREP_MAX_RESULTS = 500 _DEFAULT_WRITE_FILE_ERROR_MAX_CHARS = 2000 + +# Maximum bytes accepted in a single non-append write_file call (issue #3189). +# Oversized single-shot writes correlate with LLM streaming chunk-gap timeouts +# because the tool-call JSON payload (which the model must emit as one +# continuous stream) grows past the safe window. 80 KB ≈ 20K tokens, a +# comfortable headroom under the factory-default 240s stream_chunk_timeout. +# Deployments can override via env var DEERFLOW_WRITE_FILE_MAX_BYTES; set to +# 0 (or negative) to disable the guard entirely. +_WRITE_FILE_CONTENT_MAX_BYTES = 80 * 1024 +_WRITE_FILE_MAX_BYTES_ENV = "DEERFLOW_WRITE_FILE_MAX_BYTES" _LOCAL_BASH_CWD_COMMANDS = {"cd", "pushd"} _LOCAL_BASH_COMMAND_WRAPPERS = {"command", "builtin"} _LOCAL_BASH_COMMAND_PREFIX_KEYWORDS = {"!", "{", "case", "do", "elif", "else", "for", "if", "select", "then", "time", "until", "while"} @@ -1671,6 +1682,23 @@ async def _read_file_tool_async( read_file_tool.coroutine = _read_file_tool_async +def _effective_write_file_max_bytes() -> int: + """Return the active size cap for non-append write_file calls. + + Reads ``DEERFLOW_WRITE_FILE_MAX_BYTES`` at call time (not import time) + so tests and runtime tweaks take effect without restart. Falls back to + the default on missing/malformed values. A non-positive value disables + the guard. + """ + raw = os.environ.get(_WRITE_FILE_MAX_BYTES_ENV) + if raw is None: + return _WRITE_FILE_CONTENT_MAX_BYTES + try: + return int(raw) + except ValueError: + return _WRITE_FILE_CONTENT_MAX_BYTES + + @tool("write_file", parse_docstring=True) def write_file_tool( runtime: Runtime, @@ -1679,14 +1707,47 @@ def write_file_tool( content: str, append: bool = False, ) -> str: - """Write text content to a file. By default this overwrites the target file; set append to true to add content to the end without replacing existing content. + """Write text content to a file. By default this overwrites the target file; set append=True to add content to the end without replacing existing content. + + SIZE POLICY (issue #3189): + A single non-append write_file call must not exceed 80 KB of UTF-8 content. + Oversized single-shot writes correlate with LLM streaming chunk-gap + timeouts because the tool-call JSON payload — which the model must emit as + one continuous stream — grows past the safe window. For larger documents, + use ONE of these strategies (write_file rejects oversized payloads with an + actionable error): + + 1. INCREMENTAL EDIT (preferred for revisions): after the initial write, + use `str_replace` to surgically update sections. This is the same + pattern Claude Code's Write+Edit and OpenAI Codex's apply_patch use, + and keeps each tool call's payload small. + 2. APPEND-IN-CHUNKS (for new long-form content): split the document into + sections, each well under 80 KB. First call uses append=False to + create the file; subsequent calls use append=True. The 80 KB cap does + NOT apply to append=True calls. + + Operators can override the cap via env var `DEERFLOW_WRITE_FILE_MAX_BYTES` + (0 disables the guard entirely). Raising it risks streaming timeouts. Args: description: Explain why you are writing to this file in short words. ALWAYS PROVIDE THIS PARAMETER FIRST. path: The **absolute** path to the file to write to. ALWAYS PROVIDE THIS PARAMETER SECOND. content: The content to write to the file. ALWAYS PROVIDE THIS PARAMETER THIRD. - append: Whether to append content to the end of the file instead of overwriting it. Defaults to false. + append: Whether to append content to the end of the file instead of overwriting it. Defaults to False. """ + if not append: + max_bytes = _effective_write_file_max_bytes() + if max_bytes > 0: + content_bytes = len(content.encode("utf-8")) + if content_bytes > max_bytes: + return ( + f"Error: write_file content ({content_bytes} bytes) exceeds the " + f"{max_bytes}-byte single-call limit. Split the content into smaller " + "pieces: either (a) write the first section now, then use `str_replace` " + "for further edits, or (b) call write_file again with append=True " + "carrying the next section. See SIZE POLICY in the tool docstring " + "or issue #3189 for the rationale." + ) try: requested_path = path sandbox = ensure_sandbox_initialized(runtime) diff --git a/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py b/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py index 08f0c7593..176194729 100644 --- a/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py +++ b/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py @@ -24,6 +24,17 @@ Do NOT use for simple, single-step operations.""", - Do NOT ask for clarification - work with the information provided + +When revising an existing file, prefer `str_replace` over `write_file` — +it sends only the diff and avoids re-emitting the whole file (mirrors +Claude Code's Edit and Codex's apply_patch). When writing long new +content from scratch, split it into sections: the first `write_file` +call creates the file, then use `write_file` with append=True to extend +it section by section. This keeps each tool call small and avoids +mid-stream chunk-gap timeouts on oversized single-shot writes. +(See issue #3189.) + + When you complete the task, provide: 1. A brief summary of what was accomplished diff --git a/backend/tests/test_lead_agent_prompt.py b/backend/tests/test_lead_agent_prompt.py index a03fa02b5..78a5739f3 100644 --- a/backend/tests/test_lead_agent_prompt.py +++ b/backend/tests/test_lead_agent_prompt.py @@ -373,3 +373,44 @@ def test_warm_enabled_skills_cache_logs_on_timeout(monkeypatch, caplog): assert warmed is False assert "Timed out waiting" in caplog.text + + +def test_system_prompt_template_contains_file_editing_workflow_rule(): + """The File Editing Workflow rule must remain in the system prompt + template so the planner picks the right tool (str_replace for edits, + write_file + append=True for long new content) and avoids mid-stream + chunk-gap timeouts on oversized single-shot writes. See issue #3189 + / PR #3195. + + We deliberately do NOT assert on any specific byte / word threshold + here — that would re-introduce the docstring-lock-in pattern the + reviewers flagged. The numeric cap lives in the server-side guard + (see test_write_file_tool_size_guard.py), which is where it belongs. + """ + template = prompt_module.SYSTEM_PROMPT_TEMPLATE + # Section anchor — keeps the rule discoverable in the assembled prompt. + assert "File Editing Workflow" in template + # Behavioural anchors — if either of these disappears, the model will + # silently regress to single-shot write_file calls for long content. + assert "str_replace" in template + assert "append=True" in template + + +def test_system_prompt_template_preserves_placeholders(): + """Ensure the chunking-rule edit didn't drop any f-string placeholder + consumed by apply_prompt_template(). A missing placeholder would + crash prompt rendering at runtime. + """ + template = prompt_module.SYSTEM_PROMPT_TEMPLATE + for ph in ( + "{agent_name}", + "{soul}", + "{self_update_section}", + "{subagent_thinking}", + "{skills_section}", + "{deferred_tools_section}", + "{subagent_section}", + "{acp_section}", + "{subagent_reminder}", + ): + assert ph in template, f"placeholder {ph} accidentally removed" diff --git a/backend/tests/test_llm_error_handling_middleware.py b/backend/tests/test_llm_error_handling_middleware.py index 9d2c0fa77..7cce9a4cf 100644 --- a/backend/tests/test_llm_error_handling_middleware.py +++ b/backend/tests/test_llm_error_handling_middleware.py @@ -373,7 +373,11 @@ def test_sync_read_error_triggers_retry_loop(monkeypatch: pytest.MonkeyPatch) -> result = middleware.wrap_model_call(SimpleNamespace(), handler) assert isinstance(result, AIMessage) + # ReadError is a generic connection drop, not a chunk-gap timeout, so + # it must fall back to the legacy transient copy rather than the + # specialized "split the work into smaller steps" guidance (#3195 CR). assert "temporarily unavailable" in result.content + assert "streaming response was interrupted" not in result.content assert attempts == 3 # exhausted all retries assert len(waits) == 2 # slept between attempts 1→2 and 2→3 @@ -397,7 +401,11 @@ async def test_async_read_error_triggers_retry_loop(monkeypatch: pytest.MonkeyPa result = await middleware.awrap_model_call(SimpleNamespace(), handler) assert isinstance(result, AIMessage) + # ReadError is a generic connection drop, not a chunk-gap timeout, so + # it must fall back to the legacy transient copy rather than the + # specialized "split the work into smaller steps" guidance (#3195 CR). assert "temporarily unavailable" in result.content + assert "streaming response was interrupted" not in result.content assert attempts == 3 # exhausted all retries assert len(waits) == 2 # slept between attempts 1→2 and 2→3 @@ -462,3 +470,211 @@ async def test_async_circuit_breaker_trips_and_recovers(monkeypatch: pytest.Monk assert result.content == "Success" assert middleware._circuit_failure_count == 0 # RESET assert middleware._check_circuit() is False + + +class _StreamChunkTimeoutError(Exception): + """Local stand-in for langchain_openai's StreamChunkTimeoutError — + matched by class name, no langchain-openai import needed (mirrors + how this file already stubs httpx.ReadError / RemoteProtocolError). + """ + + +_StreamChunkTimeoutError.__name__ = "StreamChunkTimeoutError" + + +def test_classify_error_stream_chunk_timeout_is_retriable() -> None: + """StreamChunkTimeoutError must be classified as transient/retriable.""" + middleware = _build_middleware() + exc = _StreamChunkTimeoutError("No streaming chunk received for 120.0s (model=mimo-v2.5, chunks_received=58).") + exc.__class__.__name__ = "StreamChunkTimeoutError" + retriable, reason = middleware._classify_error(exc) + assert retriable is True + assert reason == "transient" + + +def test_sync_stream_chunk_timeout_retries_once( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Sync handler raising StreamChunkTimeoutError is retried exactly once — + the per-exception override caps it at 2 total attempts (1 first call + 1 + retry) even when retry_max_attempts=3. + Same-payload retry on a chunk-gap timeout buffers the same way upstream; + a full 3-attempt loop would stack 6-12 minutes of dead air before + surfacing failure. We keep one cheap reconnect for genuine transient TCP + blips, then surface the failure so the model can re-plan on its next turn. + """ + middleware = _build_middleware( + retry_max_attempts=3, + retry_base_delay_ms=10, + retry_cap_delay_ms=10, + ) + attempts = 0 + waits: list[float] = [] + monkeypatch.setattr("time.sleep", lambda d: waits.append(d)) + + def handler(_request) -> AIMessage: + nonlocal attempts + attempts += 1 + raise _StreamChunkTimeoutError("No streaming chunk received for 120.0s") + + result = middleware.wrap_model_call(SimpleNamespace(), handler) + + assert isinstance(result, AIMessage) + assert "streaming response was interrupted" in result.content + # Override caps StreamChunkTimeoutError at 2 attempts (1 first call + 1 retry). + assert attempts == 2 + # Exactly one sleep between the first attempt and the single retry. + assert len(waits) == 1 + + +@pytest.mark.anyio +async def test_async_stream_chunk_timeout_retries_once( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Async mirror of the sync test: StreamChunkTimeoutError is capped at + 2 attempts (1 first call + 1 retry) so we don't stack 6-12 minutes of + dead air on a same-payload buffering failure. + """ + middleware = _build_middleware( + retry_max_attempts=3, + retry_base_delay_ms=10, + retry_cap_delay_ms=10, + ) + attempts = 0 + waits: list[float] = [] + + async def fake_sleep(d: float) -> None: + waits.append(d) + + monkeypatch.setattr(asyncio, "sleep", fake_sleep) + + async def handler(_request) -> AIMessage: + nonlocal attempts + attempts += 1 + raise _StreamChunkTimeoutError("No streaming chunk received for 120.0s") + + result = await middleware.awrap_model_call(SimpleNamespace(), handler) + + assert isinstance(result, AIMessage) + assert "streaming response was interrupted" in result.content + assert attempts == 2 + # Exactly one sleep between the first attempt and the single retry. + assert len(waits) == 1 + + +def test_max_attempts_for_returns_override_for_stream_chunk_timeout() -> None: + """StreamChunkTimeoutError must use the tightened budget (2 = "keep one retry"), + not the default of 3.""" + middleware = _build_middleware(retry_max_attempts=3) + exc = _StreamChunkTimeoutError("upstream stalled") + exc.__class__.__name__ = "StreamChunkTimeoutError" + + assert middleware._max_attempts_for(exc) == 2 + + +def test_max_attempts_for_falls_back_to_default_for_unlisted_exception() -> None: + """ReadError / RemoteProtocolError keep the full retry budget — only + StreamChunkTimeoutError pays for stalling upstream for `stream_chunk_timeout` + seconds per attempt, so only it gets the tighter cap. + """ + middleware = _build_middleware(retry_max_attempts=3) + + read_err = _ReadError("conn reset") + read_err.__class__.__name__ = "ReadError" + proto_err = _RemoteProtocolError("peer closed") + proto_err.__class__.__name__ = "RemoteProtocolError" + + assert middleware._max_attempts_for(read_err) == 3 + assert middleware._max_attempts_for(proto_err) == 3 + assert middleware._max_attempts_for(FakeError("boom")) == 3 + + +def test_max_attempts_for_override_never_exceeds_user_cap() -> None: + """If the operator lowered retry_max_attempts below the override default, + the user-configured cap wins — overrides only ever *tighten*, never loosen. + """ + middleware = _build_middleware(retry_max_attempts=1) + exc = _StreamChunkTimeoutError("upstream stalled") + exc.__class__.__name__ = "StreamChunkTimeoutError" + + assert middleware._max_attempts_for(exc) == 1 + + +def test_user_message_for_stream_chunk_timeout_mentions_split_or_shorten() -> None: + """When the retry budget for StreamChunkTimeoutError is exhausted, the user + message must guide the user toward splitting / shortening the request + instead of suggesting a generic retry. This is the actionable advice + Reviewer B asked for in the follow-up CR (issue #3189). + """ + middleware = _build_middleware() + exc = _StreamChunkTimeoutError("No streaming chunk received for 120.0s") + exc.__class__.__name__ = "StreamChunkTimeoutError" + + message = middleware._build_user_message(exc, reason="transient") + + assert "streaming response was interrupted" in message + assert "split" in message or "shorten" in message + # The old generic "streaming response was interrupted" wording must NOT appear here, + # otherwise the actionable guidance is buried. + assert "temporarily unavailable" not in message + + +def test_user_message_for_remote_protocol_error_uses_generic_transient_copy() -> None: + """RemoteProtocolError is a generic connection drop that can fire on + transient network blips with perfectly normal payloads. The + "split the work into smaller steps" guidance only applies when the + upstream chunk-gap watchdog fires (StreamChunkTimeoutError), so + RemoteProtocolError must fall back to the legacy transient copy. + Regression guard for the #3195 CR feedback. + """ + middleware = _build_middleware() + exc = _RemoteProtocolError("Server closed connection unexpectedly") + exc.__class__.__name__ = "RemoteProtocolError" + + message = middleware._build_user_message(exc, reason="transient") + + assert "temporarily unavailable" in message + assert "streaming response was interrupted" not in message + + +def test_user_message_for_read_error_uses_generic_transient_copy() -> None: + """httpx.ReadError is symmetric to RemoteProtocolError: a generic + connection drop that must NOT receive the "split the work" guidance. + Regression guard for the #3195 CR feedback. + """ + middleware = _build_middleware() + exc = FakeError("connection dropped mid-stream") + exc.__class__.__name__ = "ReadError" + + message = middleware._build_user_message(exc, reason="transient") + + assert "temporarily unavailable" in message + assert "streaming response was interrupted" not in message + + +def test_user_message_for_generic_transient_keeps_legacy_copy() -> None: + """Generic transient errors (HTTP 503, 'cluster busy', etc.) must keep + the original 'streaming response was interrupted' message — only stream-drop + exceptions get the new specialized copy. This prevents regression on + callers who already rely on the legacy wording. + """ + middleware = _build_middleware() + exc = FakeError("server busy", status_code=503) + + message = middleware._build_user_message(exc, reason="transient") + + assert "temporarily unavailable" in message + assert "streaming response was interrupted" not in message + + +def test_user_message_for_quota_unchanged() -> None: + """Sanity check: the quota / auth branches must remain untouched by the + stream-drop refactor. + """ + middleware = _build_middleware() + exc = FakeError("insufficient_quota", status_code=429, code="insufficient_quota") + + message = middleware._build_user_message(exc, reason="quota") + + assert "out of quota" in message + assert "streaming response was interrupted" not in message diff --git a/backend/tests/test_model_factory.py b/backend/tests/test_model_factory.py index f8b84331c..562c8874c 100644 --- a/backend/tests/test_model_factory.py +++ b/backend/tests/test_model_factory.py @@ -1069,3 +1069,116 @@ def test_no_duplicate_kwarg_when_reasoning_effort_in_config_and_thinking_disable # kwargs (runtime) takes precedence: thinking-disabled path sets reasoning_effort=minimal assert captured.get("reasoning_effort") == "minimal" + + +# --------------------------------------------------------------------------- +# stream_chunk_timeout default injection (issue #3189) +# --------------------------------------------------------------------------- + + +def test_stream_chunk_timeout_defaults_to_240_for_openai_compatible_model(monkeypatch): + """OpenAI-compatible clients must receive a generous 240s chunk-gap budget by + default, so reasoning models with long thinking pauses don't trip + langchain-openai's aggressive 60s built-in default. + """ + model = _make_model(use="langchain_openai:ChatOpenAI") + cfg = _make_app_config([model]) + + captured: dict = {} + + class CapturingModel(FakeChatModel): + def __init__(self, **kwargs): + captured.update(kwargs) + BaseChatModel.__init__(self, **kwargs) + + _patch_factory(monkeypatch, cfg, model_class=CapturingModel) + factory_module.create_chat_model(name="test-model") + + assert captured.get("stream_chunk_timeout") == 240.0 + + +def test_stream_chunk_timeout_user_value_not_overridden(monkeypatch): + """If the user explicitly sets stream_chunk_timeout in config.yaml, the + factory must not overwrite it with the default — even if the value is + smaller (60s) or larger (600s) than the default. + """ + model = ModelConfig( + name="custom-timeout-model", + display_name="Custom Timeout", + description=None, + use="langchain_openai:ChatOpenAI", + model="gpt-4o-mini", + stream_chunk_timeout=60.0, # user-set explicit value + ) + cfg = _make_app_config([model]) + + captured: dict = {} + + class CapturingModel(FakeChatModel): + def __init__(self, **kwargs): + captured.update(kwargs) + BaseChatModel.__init__(self, **kwargs) + + _patch_factory(monkeypatch, cfg, model_class=CapturingModel) + factory_module.create_chat_model(name="custom-timeout-model") + + assert captured.get("stream_chunk_timeout") == 60.0 + + +def test_stream_chunk_timeout_not_injected_for_non_openai_provider(monkeypatch): + """Only langchain_openai:ChatOpenAI receives the default. Anthropic / Vertex / + other clients that don't understand this kwarg must not be polluted with it. + """ + model = _make_model(use="langchain_anthropic:ChatAnthropic") + cfg = _make_app_config([model]) + + captured: dict = {} + + class CapturingModel(FakeChatModel): + def __init__(self, **kwargs): + captured.update(kwargs) + BaseChatModel.__init__(self, **kwargs) + + _patch_factory(monkeypatch, cfg, model_class=CapturingModel) + factory_module.create_chat_model(name="test-model") + + assert "stream_chunk_timeout" not in captured + + +def test_stream_chunk_timeout_default_constant_is_documented(): + """Lock the default value at 240s. If we ever want to change this, the + deliberate update here (and the docstring on _apply_stream_chunk_timeout_default) + forces a paired review of the rationale comment block above the constant. + """ + assert factory_module._DEFAULT_STREAM_CHUNK_TIMEOUT_SECONDS == 240.0 + + +def test_stream_chunk_timeout_popped_for_non_openai_provider_when_user_set_it(monkeypatch): + """Regression for CR feedback on issue #3189: if a user accidentally sets + ``stream_chunk_timeout`` on a non-OpenAI provider, the factory must drop + the kwarg before forwarding it to the model constructor. Otherwise the + third-party client raises ``TypeError: unexpected keyword argument + 'stream_chunk_timeout'`` because the parameter is specific to + ``langchain_openai:ChatOpenAI``. + """ + model = ModelConfig( + name="anthropic-with-stray-timeout", + display_name="Anthropic With Stray Timeout", + description=None, + use="langchain_anthropic:ChatAnthropic", + model="claude-sonnet-4", + stream_chunk_timeout=60.0, # user-set on a non-OpenAI provider — must be dropped + ) + cfg = _make_app_config([model]) + + captured: dict = {} + + class CapturingModel(FakeChatModel): + def __init__(self, **kwargs): + captured.update(kwargs) + BaseChatModel.__init__(self, **kwargs) + + _patch_factory(monkeypatch, cfg, model_class=CapturingModel) + factory_module.create_chat_model(name="anthropic-with-stray-timeout") + + assert "stream_chunk_timeout" not in captured diff --git a/backend/tests/test_write_file_tool_size_guard.py b/backend/tests/test_write_file_tool_size_guard.py new file mode 100644 index 000000000..21f20a6bd --- /dev/null +++ b/backend/tests/test_write_file_tool_size_guard.py @@ -0,0 +1,116 @@ +"""Size-guard tests for write_file_tool (issue #3189, PR #3195). + +These tests verify that write_file_tool rejects oversized single-shot payloads +with an actionable message, while leaving append-mode and env-override paths +untouched. They run purely against the tool's internal guard — no real sandbox +or filesystem is exercised, so they're fast and hermetic. +""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +from deerflow.sandbox import tools as tools_module +from deerflow.sandbox.tools import write_file_tool + + +def _call_write_file(*, content: str, append: bool = False) -> str: + """Invoke write_file_tool via its underlying callable. + + We patch the sandbox initialisation chain to a no-op MagicMock so the test + focuses purely on the size guard. The guard runs BEFORE any sandbox call, + so when the guard rejects we never enter the patched path; when the guard + passes, the patched sandbox.write_file returns silently and the tool + returns "OK". + """ + fn = getattr(write_file_tool, "func", write_file_tool) + runtime = MagicMock() + + with ( + patch.object(tools_module, "ensure_sandbox_initialized") as mock_ensure, + patch.object(tools_module, "ensure_thread_directories_exist"), + patch.object(tools_module, "is_local_sandbox", return_value=False), + patch.object(tools_module, "get_file_operation_lock") as mock_lock, + ): + sandbox = MagicMock() + sandbox.write_file = MagicMock() + mock_ensure.return_value = sandbox + mock_lock.return_value.__enter__ = MagicMock(return_value=None) + mock_lock.return_value.__exit__ = MagicMock(return_value=False) + + return fn( + runtime=runtime, + description="test write", + path="/tmp/test.txt", + content=content, + append=append, + ) + + +def test_below_cap_succeeds(): + """A 79 KB payload sits comfortably under the 80 KB default and must pass + straight through to the sandbox layer. + """ + payload = "a" * (79 * 1024) + result = _call_write_file(content=payload) + assert result == "OK" + + +def test_above_cap_returns_actionable_error(): + """An 81 KB payload trips the guard. The error message must name the + cap, the actual size, and steer the LLM toward str_replace / append=True + — these are the exact handles Reviewer A/B asked for in PR #3195. + """ + payload = "a" * (81 * 1024) + result = _call_write_file(content=payload) + + assert result.startswith("Error: write_file content") + assert "81920 bytes" in result or "82944 bytes" in result, "Error must report the actual content size so the LLM/operator can judge how much to trim or chunk." + assert "str_replace" in result, "Error must point to str_replace as the preferred incremental-edit path." + assert "append=True" in result, "Error must also surface the append-in-chunks alternative." + + +def test_above_cap_with_append_true_bypasses_guard(): + """append=True is the *correct* way to write a large document in chunks, + so the guard must not block it. The 80 KB cap intentionally applies only + to single-shot overwrite calls. + """ + payload = "a" * (200 * 1024) # 200 KB + result = _call_write_file(content=payload, append=True) + assert result == "OK", f"append=True must bypass the size guard, got: {result!r}" + + +def test_env_override_raises_cap(monkeypatch: pytest.MonkeyPatch): + """Setting DEERFLOW_WRITE_FILE_MAX_BYTES lets deployments accept larger + payloads when the underlying LLM/network can demonstrably handle them. + """ + monkeypatch.setenv("DEERFLOW_WRITE_FILE_MAX_BYTES", str(300 * 1024)) + payload = "a" * (150 * 1024) # 150 KB — would normally trip the 80 KB cap + result = _call_write_file(content=payload) + assert result == "OK" + + +def test_env_override_zero_disables_guard(monkeypatch: pytest.MonkeyPatch): + """Setting the env var to 0 is the documented escape hatch for operators + who want to opt out of the guard entirely (e.g. when running models with + very large stream_chunk_timeout values). + """ + monkeypatch.setenv("DEERFLOW_WRITE_FILE_MAX_BYTES", "0") + payload = "a" * (500 * 1024) # 500 KB + result = _call_write_file(content=payload) + assert result == "OK" + + +def test_env_override_malformed_falls_back_to_default(monkeypatch: pytest.MonkeyPatch): + """A typo in the env var (e.g. 'lots') must not crash the tool — fall + back silently to the safe 80 KB default. Crashing on every write because + of a misconfigured env var would be far worse than ignoring it. + """ + monkeypatch.setenv("DEERFLOW_WRITE_FILE_MAX_BYTES", "lots") + # 100 KB should still be rejected because the malformed value falls back + # to the 80 KB default. + payload = "a" * (100 * 1024) + result = _call_write_file(content=payload) + assert result.startswith("Error: write_file content")