diff --git a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py
index ef7917104..2e4d32cec 100644
--- a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py
+++ b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py
@@ -542,6 +542,14 @@ combined with a FastAPI gateway for REST API access [citation:FastAPI](https://f
{subagent_reminder}- Skill First: Always load the relevant skill before starting **complex** tasks.
- Progressive Loading: Load resources incrementally as referenced in skills
- Output Files: Final deliverables must be in `/mnt/user-data/outputs`
+- File Editing Workflow: When revising an existing file, prefer
+ `str_replace` over `write_file` — it sends only the diff and avoids
+ re-emitting the whole file (mirrors Claude Code's Edit and Codex's
+ apply_patch). When writing long new content from scratch, split it
+ into sections: the first `write_file` call creates the file, then use
+ `write_file` with append=True to extend it section by section. This
+ keeps each tool call small and avoids mid-stream chunk-gap timeouts
+ on oversized single-shot writes. (See issue #3189.)
- Clarity: Be direct and helpful, avoid unnecessary meta-commentary
- Including Images and Mermaid: Images and Mermaid diagrams are always welcomed in the Markdown format, and you're encouraged to use `\n\n` or "```mermaid" to display images in response or Markdown files
- Multi-task: Better utilize parallel tool calling to call multiple tools at one time for better performance
diff --git a/backend/packages/harness/deerflow/agents/middlewares/llm_error_handling_middleware.py b/backend/packages/harness/deerflow/agents/middlewares/llm_error_handling_middleware.py
index a489d90d0..4ec3b7fe6 100644
--- a/backend/packages/harness/deerflow/agents/middlewares/llm_error_handling_middleware.py
+++ b/backend/packages/harness/deerflow/agents/middlewares/llm_error_handling_middleware.py
@@ -62,6 +62,41 @@ _AUTH_PATTERNS = (
"未授权",
)
+# Per-exception retry budget overrides.
+#
+# Some transient errors are retriable in principle but expensive to retry at
+# the default budget. StreamChunkTimeoutError in particular fires after the
+# upstream provider has already stalled for `stream_chunk_timeout` seconds
+# (typically 120-240s); a full 3-attempt loop can therefore stack 6-12 minutes
+# of dead air before surfacing the failure to the user. We keep exactly one
+# retry (cheap reconnect that catches genuine transient TCP blips) and then
+# fail fast — the same buffered payload is overwhelmingly likely to fail
+# again at the upstream provider for the same reason.
+#
+# Keys are exception class *names* (not classes) so we don't introduce
+# import-time coupling on optional dependencies like langchain-openai. The
+# value is the absolute max attempt count, NOT additional retries — so a
+# value of 2 means "1 first attempt + 1 retry" (the CR-requested
+# "keep one retry" behavior).
+_RETRY_BUDGET_OVERRIDES: dict[str, int] = {
+ "StreamChunkTimeoutError": 2,
+}
+
+# Exception class names that indicate the upstream stream-chunk watchdog
+# fired because the model stalled mid-flight. These deserve a more specific
+# user-facing message than the generic "temporarily unavailable" copy,
+# because the typical root cause is a long tool-call serialization stalling
+# the upstream stream — and the most actionable advice we can give the user
+# is "ask for a shorter / split output" rather than "wait and retry".
+# Generic connection drops (httpx RemoteProtocolError / ReadError) are
+# intentionally excluded: they routinely fire on transient network blips
+# with normal payloads, where the "split the work" guidance is misleading.
+_STREAM_DROP_EXCEPTIONS: frozenset[str] = frozenset(
+ {
+ "StreamChunkTimeoutError",
+ }
+)
+
class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
"""Retry transient LLM errors and surface graceful assistant messages."""
@@ -83,6 +118,18 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
self._circuit_state = "closed"
self._circuit_probe_in_flight = False
+ def _max_attempts_for(self, exc: BaseException) -> int:
+ """Return the effective max attempt count for this exception.
+
+ Falls back to `self.retry_max_attempts` unless the exception class name
+ appears in the per-exception override table.
+ """
+ override = _RETRY_BUDGET_OVERRIDES.get(type(exc).__name__)
+ if override is None:
+ return self.retry_max_attempts
+
+ return min(override, self.retry_max_attempts)
+
def _check_circuit(self) -> bool:
"""Returns True if circuit is OPEN (fast fail), False otherwise."""
with self._circuit_lock:
@@ -153,6 +200,7 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
"InternalServerError",
"ReadError", # httpx.ReadError: connection dropped mid-stream
"RemoteProtocolError", # httpx: server closed connection unexpectedly
+ "StreamChunkTimeoutError", # langchain-openai: chunk gap exceeded stream_chunk_timeout
}:
return True, "transient"
if status_code in _RETRIABLE_STATUS_CODES:
@@ -202,6 +250,20 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
if reason == "auth":
return "The configured LLM provider rejected the request because authentication or access is invalid. Please check the provider credentials and try again."
if reason in {"busy", "transient"}:
+ # Stream-drop failures (chunk-gap timeout, peer-closed connection,
+ # raw read error) almost always point at a single oversized
+ # tool-call payload — the model spent so long serializing JSON
+ # arguments that the upstream provider buffered and the stream
+ # gap exceeded `stream_chunk_timeout`. Surfacing this distinct
+ # cause lets the user split or shorten their next request
+ # instead of helplessly retrying the same prompt.
+ if type(exc).__name__ in _STREAM_DROP_EXCEPTIONS:
+ return (
+ "The model's streaming response was interrupted before it could "
+ "finish. This usually happens when a single response or tool call "
+ "is very large — please ask the assistant to split the work into "
+ "smaller steps, or shorten the requested output, and try again."
+ )
return "The configured LLM provider is temporarily unavailable after multiple retries. Please wait a moment and continue the conversation."
return f"LLM request failed: {detail}"
@@ -259,7 +321,8 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
raise
except Exception as exc:
retriable, reason = self._classify_error(exc)
- if retriable and attempt < self.retry_max_attempts:
+ max_attempts = self._max_attempts_for(exc)
+ if retriable and attempt < max_attempts:
wait_ms = self._build_retry_delay_ms(attempt, exc)
logger.warning(
"Transient LLM error on attempt %d/%d; retrying in %dms: %s",
@@ -310,7 +373,8 @@ class LLMErrorHandlingMiddleware(AgentMiddleware[AgentState]):
raise
except Exception as exc:
retriable, reason = self._classify_error(exc)
- if retriable and attempt < self.retry_max_attempts:
+ max_attempts = self._max_attempts_for(exc)
+ if retriable and attempt < max_attempts:
wait_ms = self._build_retry_delay_ms(attempt, exc)
logger.warning(
"Transient LLM error on attempt %d/%d; retrying in %dms: %s",
diff --git a/backend/packages/harness/deerflow/config/model_config.py b/backend/packages/harness/deerflow/config/model_config.py
index e9a3e1c16..b747eec99 100644
--- a/backend/packages/harness/deerflow/config/model_config.py
+++ b/backend/packages/harness/deerflow/config/model_config.py
@@ -32,6 +32,16 @@ class ModelConfig(BaseModel):
description="Extra settings to be passed to the model when thinking is disabled",
)
supports_vision: bool = Field(default_factory=lambda: False, description="Whether the model supports vision/image inputs")
+ stream_chunk_timeout: float | None = Field(
+ default=None,
+ description=(
+ "Maximum seconds to wait between successive streaming chunks before "
+ "langchain-openai raises StreamChunkTimeoutError. None means use the "
+ "factory default (240s for OpenAI-compatible clients). Tune higher for "
+ "reasoning models with long thinking pauses; lower for latency-sensitive "
+ "interactive endpoints. Has no effect on non-OpenAI-compatible providers."
+ ),
+ )
thinking: dict | None = Field(
default_factory=lambda: None,
description=(
diff --git a/backend/packages/harness/deerflow/models/factory.py b/backend/packages/harness/deerflow/models/factory.py
index c6a3573f8..57eb15001 100644
--- a/backend/packages/harness/deerflow/models/factory.py
+++ b/backend/packages/harness/deerflow/models/factory.py
@@ -47,6 +47,38 @@ def _enable_stream_usage_by_default(model_use_path: str, model_settings_from_con
model_settings_from_config["stream_usage"] = True
+# Default chunk-gap budget for OpenAI-compatible streaming responses.
+#
+# langchain-openai raises ``StreamChunkTimeoutError`` after this many seconds
+# without receiving a chunk. Its own default is 60s, which is too aggressive for
+# reasoning models (DeepSeek-R1, Doubao-thinking, GPT-5) whose first chunk can
+# legitimately take 90~150s. We default to 240s so the streaming layer rarely
+# trips on long thinking pauses; the LLMErrorHandlingMiddleware still retries
+# (budget=2) if a real stall happens. Users can override per-model in config.yaml.
+_DEFAULT_STREAM_CHUNK_TIMEOUT_SECONDS: float = 240.0
+
+
+def _apply_stream_chunk_timeout_default(model_use_path: str, model_settings_from_config: dict) -> None:
+ """Inject a generous ``stream_chunk_timeout`` for OpenAI-compatible clients.
+
+ The ``stream_chunk_timeout`` kwarg is specific to ``langchain_openai:ChatOpenAI``
+ and is rejected by other providers' constructors as an unexpected keyword
+ argument. Behaviour:
+
+ * OpenAI-compatible path: an explicit value in ``config.yaml`` is preserved.
+ An explicit ``null`` is dropped upstream by ``model_dump(exclude_none=True)``
+ and therefore treated as "unset", so the default is injected.
+ * Non-OpenAI path: drop the key so it is never forwarded to an incompatible
+ constructor (which would raise ``TypeError: unexpected keyword argument``).
+ """
+ if model_use_path != "langchain_openai:ChatOpenAI":
+ model_settings_from_config.pop("stream_chunk_timeout", None)
+ return
+ if "stream_chunk_timeout" in model_settings_from_config:
+ return
+ model_settings_from_config["stream_chunk_timeout"] = _DEFAULT_STREAM_CHUNK_TIMEOUT_SECONDS
+
+
def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *, app_config: AppConfig | None = None, attach_tracing: bool = True, **kwargs) -> BaseChatModel:
"""Create a chat model instance from the config.
@@ -128,6 +160,7 @@ def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *
model_settings_from_config.pop("reasoning_effort", None)
_enable_stream_usage_by_default(model_config.use, model_settings_from_config)
+ _apply_stream_chunk_timeout_default(model_config.use, model_settings_from_config)
# For Codex Responses API models: map thinking mode to reasoning_effort
from deerflow.models.openai_codex_provider import CodexChatModel
diff --git a/backend/packages/harness/deerflow/sandbox/tools.py b/backend/packages/harness/deerflow/sandbox/tools.py
index 6edc88882..4c04e3ac7 100644
--- a/backend/packages/harness/deerflow/sandbox/tools.py
+++ b/backend/packages/harness/deerflow/sandbox/tools.py
@@ -1,4 +1,5 @@
import asyncio
+import os
import posixpath
import re
import shlex
@@ -43,6 +44,16 @@ _MAX_GLOB_MAX_RESULTS = 1000
_DEFAULT_GREP_MAX_RESULTS = 100
_MAX_GREP_MAX_RESULTS = 500
_DEFAULT_WRITE_FILE_ERROR_MAX_CHARS = 2000
+
+# Maximum bytes accepted in a single non-append write_file call (issue #3189).
+# Oversized single-shot writes correlate with LLM streaming chunk-gap timeouts
+# because the tool-call JSON payload (which the model must emit as one
+# continuous stream) grows past the safe window. 80 KB ≈ 20K tokens, a
+# comfortable headroom under the factory-default 240s stream_chunk_timeout.
+# Deployments can override via env var DEERFLOW_WRITE_FILE_MAX_BYTES; set to
+# 0 (or negative) to disable the guard entirely.
+_WRITE_FILE_CONTENT_MAX_BYTES = 80 * 1024
+_WRITE_FILE_MAX_BYTES_ENV = "DEERFLOW_WRITE_FILE_MAX_BYTES"
_LOCAL_BASH_CWD_COMMANDS = {"cd", "pushd"}
_LOCAL_BASH_COMMAND_WRAPPERS = {"command", "builtin"}
_LOCAL_BASH_COMMAND_PREFIX_KEYWORDS = {"!", "{", "case", "do", "elif", "else", "for", "if", "select", "then", "time", "until", "while"}
@@ -1671,6 +1682,23 @@ async def _read_file_tool_async(
read_file_tool.coroutine = _read_file_tool_async
+def _effective_write_file_max_bytes() -> int:
+ """Return the active size cap for non-append write_file calls.
+
+ Reads ``DEERFLOW_WRITE_FILE_MAX_BYTES`` at call time (not import time)
+ so tests and runtime tweaks take effect without restart. Falls back to
+ the default on missing/malformed values. A non-positive value disables
+ the guard.
+ """
+ raw = os.environ.get(_WRITE_FILE_MAX_BYTES_ENV)
+ if raw is None:
+ return _WRITE_FILE_CONTENT_MAX_BYTES
+ try:
+ return int(raw)
+ except ValueError:
+ return _WRITE_FILE_CONTENT_MAX_BYTES
+
+
@tool("write_file", parse_docstring=True)
def write_file_tool(
runtime: Runtime,
@@ -1679,14 +1707,47 @@ def write_file_tool(
content: str,
append: bool = False,
) -> str:
- """Write text content to a file. By default this overwrites the target file; set append to true to add content to the end without replacing existing content.
+ """Write text content to a file. By default this overwrites the target file; set append=True to add content to the end without replacing existing content.
+
+ SIZE POLICY (issue #3189):
+ A single non-append write_file call must not exceed 80 KB of UTF-8 content.
+ Oversized single-shot writes correlate with LLM streaming chunk-gap
+ timeouts because the tool-call JSON payload — which the model must emit as
+ one continuous stream — grows past the safe window. For larger documents,
+ use ONE of these strategies (write_file rejects oversized payloads with an
+ actionable error):
+
+ 1. INCREMENTAL EDIT (preferred for revisions): after the initial write,
+ use `str_replace` to surgically update sections. This is the same
+ pattern Claude Code's Write+Edit and OpenAI Codex's apply_patch use,
+ and keeps each tool call's payload small.
+ 2. APPEND-IN-CHUNKS (for new long-form content): split the document into
+ sections, each well under 80 KB. First call uses append=False to
+ create the file; subsequent calls use append=True. The 80 KB cap does
+ NOT apply to append=True calls.
+
+ Operators can override the cap via env var `DEERFLOW_WRITE_FILE_MAX_BYTES`
+ (0 disables the guard entirely). Raising it risks streaming timeouts.
Args:
description: Explain why you are writing to this file in short words. ALWAYS PROVIDE THIS PARAMETER FIRST.
path: The **absolute** path to the file to write to. ALWAYS PROVIDE THIS PARAMETER SECOND.
content: The content to write to the file. ALWAYS PROVIDE THIS PARAMETER THIRD.
- append: Whether to append content to the end of the file instead of overwriting it. Defaults to false.
+ append: Whether to append content to the end of the file instead of overwriting it. Defaults to False.
"""
+ if not append:
+ max_bytes = _effective_write_file_max_bytes()
+ if max_bytes > 0:
+ content_bytes = len(content.encode("utf-8"))
+ if content_bytes > max_bytes:
+ return (
+ f"Error: write_file content ({content_bytes} bytes) exceeds the "
+ f"{max_bytes}-byte single-call limit. Split the content into smaller "
+ "pieces: either (a) write the first section now, then use `str_replace` "
+ "for further edits, or (b) call write_file again with append=True "
+ "carrying the next section. See SIZE POLICY in the tool docstring "
+ "or issue #3189 for the rationale."
+ )
try:
requested_path = path
sandbox = ensure_sandbox_initialized(runtime)
diff --git a/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py b/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py
index 08f0c7593..176194729 100644
--- a/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py
+++ b/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py
@@ -24,6 +24,17 @@ Do NOT use for simple, single-step operations.""",
- Do NOT ask for clarification - work with the information provided
+
+When revising an existing file, prefer `str_replace` over `write_file` —
+it sends only the diff and avoids re-emitting the whole file (mirrors
+Claude Code's Edit and Codex's apply_patch). When writing long new
+content from scratch, split it into sections: the first `write_file`
+call creates the file, then use `write_file` with append=True to extend
+it section by section. This keeps each tool call small and avoids
+mid-stream chunk-gap timeouts on oversized single-shot writes.
+(See issue #3189.)
+
+
When you complete the task, provide:
1. A brief summary of what was accomplished
diff --git a/backend/tests/test_lead_agent_prompt.py b/backend/tests/test_lead_agent_prompt.py
index a03fa02b5..78a5739f3 100644
--- a/backend/tests/test_lead_agent_prompt.py
+++ b/backend/tests/test_lead_agent_prompt.py
@@ -373,3 +373,44 @@ def test_warm_enabled_skills_cache_logs_on_timeout(monkeypatch, caplog):
assert warmed is False
assert "Timed out waiting" in caplog.text
+
+
+def test_system_prompt_template_contains_file_editing_workflow_rule():
+ """The File Editing Workflow rule must remain in the system prompt
+ template so the planner picks the right tool (str_replace for edits,
+ write_file + append=True for long new content) and avoids mid-stream
+ chunk-gap timeouts on oversized single-shot writes. See issue #3189
+ / PR #3195.
+
+ We deliberately do NOT assert on any specific byte / word threshold
+ here — that would re-introduce the docstring-lock-in pattern the
+ reviewers flagged. The numeric cap lives in the server-side guard
+ (see test_write_file_tool_size_guard.py), which is where it belongs.
+ """
+ template = prompt_module.SYSTEM_PROMPT_TEMPLATE
+ # Section anchor — keeps the rule discoverable in the assembled prompt.
+ assert "File Editing Workflow" in template
+ # Behavioural anchors — if either of these disappears, the model will
+ # silently regress to single-shot write_file calls for long content.
+ assert "str_replace" in template
+ assert "append=True" in template
+
+
+def test_system_prompt_template_preserves_placeholders():
+ """Ensure the chunking-rule edit didn't drop any f-string placeholder
+ consumed by apply_prompt_template(). A missing placeholder would
+ crash prompt rendering at runtime.
+ """
+ template = prompt_module.SYSTEM_PROMPT_TEMPLATE
+ for ph in (
+ "{agent_name}",
+ "{soul}",
+ "{self_update_section}",
+ "{subagent_thinking}",
+ "{skills_section}",
+ "{deferred_tools_section}",
+ "{subagent_section}",
+ "{acp_section}",
+ "{subagent_reminder}",
+ ):
+ assert ph in template, f"placeholder {ph} accidentally removed"
diff --git a/backend/tests/test_llm_error_handling_middleware.py b/backend/tests/test_llm_error_handling_middleware.py
index 9d2c0fa77..7cce9a4cf 100644
--- a/backend/tests/test_llm_error_handling_middleware.py
+++ b/backend/tests/test_llm_error_handling_middleware.py
@@ -373,7 +373,11 @@ def test_sync_read_error_triggers_retry_loop(monkeypatch: pytest.MonkeyPatch) ->
result = middleware.wrap_model_call(SimpleNamespace(), handler)
assert isinstance(result, AIMessage)
+ # ReadError is a generic connection drop, not a chunk-gap timeout, so
+ # it must fall back to the legacy transient copy rather than the
+ # specialized "split the work into smaller steps" guidance (#3195 CR).
assert "temporarily unavailable" in result.content
+ assert "streaming response was interrupted" not in result.content
assert attempts == 3 # exhausted all retries
assert len(waits) == 2 # slept between attempts 1→2 and 2→3
@@ -397,7 +401,11 @@ async def test_async_read_error_triggers_retry_loop(monkeypatch: pytest.MonkeyPa
result = await middleware.awrap_model_call(SimpleNamespace(), handler)
assert isinstance(result, AIMessage)
+ # ReadError is a generic connection drop, not a chunk-gap timeout, so
+ # it must fall back to the legacy transient copy rather than the
+ # specialized "split the work into smaller steps" guidance (#3195 CR).
assert "temporarily unavailable" in result.content
+ assert "streaming response was interrupted" not in result.content
assert attempts == 3 # exhausted all retries
assert len(waits) == 2 # slept between attempts 1→2 and 2→3
@@ -462,3 +470,211 @@ async def test_async_circuit_breaker_trips_and_recovers(monkeypatch: pytest.Monk
assert result.content == "Success"
assert middleware._circuit_failure_count == 0 # RESET
assert middleware._check_circuit() is False
+
+
+class _StreamChunkTimeoutError(Exception):
+ """Local stand-in for langchain_openai's StreamChunkTimeoutError —
+ matched by class name, no langchain-openai import needed (mirrors
+ how this file already stubs httpx.ReadError / RemoteProtocolError).
+ """
+
+
+_StreamChunkTimeoutError.__name__ = "StreamChunkTimeoutError"
+
+
+def test_classify_error_stream_chunk_timeout_is_retriable() -> None:
+ """StreamChunkTimeoutError must be classified as transient/retriable."""
+ middleware = _build_middleware()
+ exc = _StreamChunkTimeoutError("No streaming chunk received for 120.0s (model=mimo-v2.5, chunks_received=58).")
+ exc.__class__.__name__ = "StreamChunkTimeoutError"
+ retriable, reason = middleware._classify_error(exc)
+ assert retriable is True
+ assert reason == "transient"
+
+
+def test_sync_stream_chunk_timeout_retries_once(
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ """Sync handler raising StreamChunkTimeoutError is retried exactly once —
+ the per-exception override caps it at 2 total attempts (1 first call + 1
+ retry) even when retry_max_attempts=3.
+ Same-payload retry on a chunk-gap timeout buffers the same way upstream;
+ a full 3-attempt loop would stack 6-12 minutes of dead air before
+ surfacing failure. We keep one cheap reconnect for genuine transient TCP
+ blips, then surface the failure so the model can re-plan on its next turn.
+ """
+ middleware = _build_middleware(
+ retry_max_attempts=3,
+ retry_base_delay_ms=10,
+ retry_cap_delay_ms=10,
+ )
+ attempts = 0
+ waits: list[float] = []
+ monkeypatch.setattr("time.sleep", lambda d: waits.append(d))
+
+ def handler(_request) -> AIMessage:
+ nonlocal attempts
+ attempts += 1
+ raise _StreamChunkTimeoutError("No streaming chunk received for 120.0s")
+
+ result = middleware.wrap_model_call(SimpleNamespace(), handler)
+
+ assert isinstance(result, AIMessage)
+ assert "streaming response was interrupted" in result.content
+ # Override caps StreamChunkTimeoutError at 2 attempts (1 first call + 1 retry).
+ assert attempts == 2
+ # Exactly one sleep between the first attempt and the single retry.
+ assert len(waits) == 1
+
+
+@pytest.mark.anyio
+async def test_async_stream_chunk_timeout_retries_once(
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ """Async mirror of the sync test: StreamChunkTimeoutError is capped at
+ 2 attempts (1 first call + 1 retry) so we don't stack 6-12 minutes of
+ dead air on a same-payload buffering failure.
+ """
+ middleware = _build_middleware(
+ retry_max_attempts=3,
+ retry_base_delay_ms=10,
+ retry_cap_delay_ms=10,
+ )
+ attempts = 0
+ waits: list[float] = []
+
+ async def fake_sleep(d: float) -> None:
+ waits.append(d)
+
+ monkeypatch.setattr(asyncio, "sleep", fake_sleep)
+
+ async def handler(_request) -> AIMessage:
+ nonlocal attempts
+ attempts += 1
+ raise _StreamChunkTimeoutError("No streaming chunk received for 120.0s")
+
+ result = await middleware.awrap_model_call(SimpleNamespace(), handler)
+
+ assert isinstance(result, AIMessage)
+ assert "streaming response was interrupted" in result.content
+ assert attempts == 2
+ # Exactly one sleep between the first attempt and the single retry.
+ assert len(waits) == 1
+
+
+def test_max_attempts_for_returns_override_for_stream_chunk_timeout() -> None:
+ """StreamChunkTimeoutError must use the tightened budget (2 = "keep one retry"),
+ not the default of 3."""
+ middleware = _build_middleware(retry_max_attempts=3)
+ exc = _StreamChunkTimeoutError("upstream stalled")
+ exc.__class__.__name__ = "StreamChunkTimeoutError"
+
+ assert middleware._max_attempts_for(exc) == 2
+
+
+def test_max_attempts_for_falls_back_to_default_for_unlisted_exception() -> None:
+ """ReadError / RemoteProtocolError keep the full retry budget — only
+ StreamChunkTimeoutError pays for stalling upstream for `stream_chunk_timeout`
+ seconds per attempt, so only it gets the tighter cap.
+ """
+ middleware = _build_middleware(retry_max_attempts=3)
+
+ read_err = _ReadError("conn reset")
+ read_err.__class__.__name__ = "ReadError"
+ proto_err = _RemoteProtocolError("peer closed")
+ proto_err.__class__.__name__ = "RemoteProtocolError"
+
+ assert middleware._max_attempts_for(read_err) == 3
+ assert middleware._max_attempts_for(proto_err) == 3
+ assert middleware._max_attempts_for(FakeError("boom")) == 3
+
+
+def test_max_attempts_for_override_never_exceeds_user_cap() -> None:
+ """If the operator lowered retry_max_attempts below the override default,
+ the user-configured cap wins — overrides only ever *tighten*, never loosen.
+ """
+ middleware = _build_middleware(retry_max_attempts=1)
+ exc = _StreamChunkTimeoutError("upstream stalled")
+ exc.__class__.__name__ = "StreamChunkTimeoutError"
+
+ assert middleware._max_attempts_for(exc) == 1
+
+
+def test_user_message_for_stream_chunk_timeout_mentions_split_or_shorten() -> None:
+ """When the retry budget for StreamChunkTimeoutError is exhausted, the user
+ message must guide the user toward splitting / shortening the request
+ instead of suggesting a generic retry. This is the actionable advice
+ Reviewer B asked for in the follow-up CR (issue #3189).
+ """
+ middleware = _build_middleware()
+ exc = _StreamChunkTimeoutError("No streaming chunk received for 120.0s")
+ exc.__class__.__name__ = "StreamChunkTimeoutError"
+
+ message = middleware._build_user_message(exc, reason="transient")
+
+ assert "streaming response was interrupted" in message
+ assert "split" in message or "shorten" in message
+ # The old generic "streaming response was interrupted" wording must NOT appear here,
+ # otherwise the actionable guidance is buried.
+ assert "temporarily unavailable" not in message
+
+
+def test_user_message_for_remote_protocol_error_uses_generic_transient_copy() -> None:
+ """RemoteProtocolError is a generic connection drop that can fire on
+ transient network blips with perfectly normal payloads. The
+ "split the work into smaller steps" guidance only applies when the
+ upstream chunk-gap watchdog fires (StreamChunkTimeoutError), so
+ RemoteProtocolError must fall back to the legacy transient copy.
+ Regression guard for the #3195 CR feedback.
+ """
+ middleware = _build_middleware()
+ exc = _RemoteProtocolError("Server closed connection unexpectedly")
+ exc.__class__.__name__ = "RemoteProtocolError"
+
+ message = middleware._build_user_message(exc, reason="transient")
+
+ assert "temporarily unavailable" in message
+ assert "streaming response was interrupted" not in message
+
+
+def test_user_message_for_read_error_uses_generic_transient_copy() -> None:
+ """httpx.ReadError is symmetric to RemoteProtocolError: a generic
+ connection drop that must NOT receive the "split the work" guidance.
+ Regression guard for the #3195 CR feedback.
+ """
+ middleware = _build_middleware()
+ exc = FakeError("connection dropped mid-stream")
+ exc.__class__.__name__ = "ReadError"
+
+ message = middleware._build_user_message(exc, reason="transient")
+
+ assert "temporarily unavailable" in message
+ assert "streaming response was interrupted" not in message
+
+
+def test_user_message_for_generic_transient_keeps_legacy_copy() -> None:
+ """Generic transient errors (HTTP 503, 'cluster busy', etc.) must keep
+ the original 'streaming response was interrupted' message — only stream-drop
+ exceptions get the new specialized copy. This prevents regression on
+ callers who already rely on the legacy wording.
+ """
+ middleware = _build_middleware()
+ exc = FakeError("server busy", status_code=503)
+
+ message = middleware._build_user_message(exc, reason="transient")
+
+ assert "temporarily unavailable" in message
+ assert "streaming response was interrupted" not in message
+
+
+def test_user_message_for_quota_unchanged() -> None:
+ """Sanity check: the quota / auth branches must remain untouched by the
+ stream-drop refactor.
+ """
+ middleware = _build_middleware()
+ exc = FakeError("insufficient_quota", status_code=429, code="insufficient_quota")
+
+ message = middleware._build_user_message(exc, reason="quota")
+
+ assert "out of quota" in message
+ assert "streaming response was interrupted" not in message
diff --git a/backend/tests/test_model_factory.py b/backend/tests/test_model_factory.py
index f8b84331c..562c8874c 100644
--- a/backend/tests/test_model_factory.py
+++ b/backend/tests/test_model_factory.py
@@ -1069,3 +1069,116 @@ def test_no_duplicate_kwarg_when_reasoning_effort_in_config_and_thinking_disable
# kwargs (runtime) takes precedence: thinking-disabled path sets reasoning_effort=minimal
assert captured.get("reasoning_effort") == "minimal"
+
+
+# ---------------------------------------------------------------------------
+# stream_chunk_timeout default injection (issue #3189)
+# ---------------------------------------------------------------------------
+
+
+def test_stream_chunk_timeout_defaults_to_240_for_openai_compatible_model(monkeypatch):
+ """OpenAI-compatible clients must receive a generous 240s chunk-gap budget by
+ default, so reasoning models with long thinking pauses don't trip
+ langchain-openai's aggressive 60s built-in default.
+ """
+ model = _make_model(use="langchain_openai:ChatOpenAI")
+ cfg = _make_app_config([model])
+
+ captured: dict = {}
+
+ class CapturingModel(FakeChatModel):
+ def __init__(self, **kwargs):
+ captured.update(kwargs)
+ BaseChatModel.__init__(self, **kwargs)
+
+ _patch_factory(monkeypatch, cfg, model_class=CapturingModel)
+ factory_module.create_chat_model(name="test-model")
+
+ assert captured.get("stream_chunk_timeout") == 240.0
+
+
+def test_stream_chunk_timeout_user_value_not_overridden(monkeypatch):
+ """If the user explicitly sets stream_chunk_timeout in config.yaml, the
+ factory must not overwrite it with the default — even if the value is
+ smaller (60s) or larger (600s) than the default.
+ """
+ model = ModelConfig(
+ name="custom-timeout-model",
+ display_name="Custom Timeout",
+ description=None,
+ use="langchain_openai:ChatOpenAI",
+ model="gpt-4o-mini",
+ stream_chunk_timeout=60.0, # user-set explicit value
+ )
+ cfg = _make_app_config([model])
+
+ captured: dict = {}
+
+ class CapturingModel(FakeChatModel):
+ def __init__(self, **kwargs):
+ captured.update(kwargs)
+ BaseChatModel.__init__(self, **kwargs)
+
+ _patch_factory(monkeypatch, cfg, model_class=CapturingModel)
+ factory_module.create_chat_model(name="custom-timeout-model")
+
+ assert captured.get("stream_chunk_timeout") == 60.0
+
+
+def test_stream_chunk_timeout_not_injected_for_non_openai_provider(monkeypatch):
+ """Only langchain_openai:ChatOpenAI receives the default. Anthropic / Vertex /
+ other clients that don't understand this kwarg must not be polluted with it.
+ """
+ model = _make_model(use="langchain_anthropic:ChatAnthropic")
+ cfg = _make_app_config([model])
+
+ captured: dict = {}
+
+ class CapturingModel(FakeChatModel):
+ def __init__(self, **kwargs):
+ captured.update(kwargs)
+ BaseChatModel.__init__(self, **kwargs)
+
+ _patch_factory(monkeypatch, cfg, model_class=CapturingModel)
+ factory_module.create_chat_model(name="test-model")
+
+ assert "stream_chunk_timeout" not in captured
+
+
+def test_stream_chunk_timeout_default_constant_is_documented():
+ """Lock the default value at 240s. If we ever want to change this, the
+ deliberate update here (and the docstring on _apply_stream_chunk_timeout_default)
+ forces a paired review of the rationale comment block above the constant.
+ """
+ assert factory_module._DEFAULT_STREAM_CHUNK_TIMEOUT_SECONDS == 240.0
+
+
+def test_stream_chunk_timeout_popped_for_non_openai_provider_when_user_set_it(monkeypatch):
+ """Regression for CR feedback on issue #3189: if a user accidentally sets
+ ``stream_chunk_timeout`` on a non-OpenAI provider, the factory must drop
+ the kwarg before forwarding it to the model constructor. Otherwise the
+ third-party client raises ``TypeError: unexpected keyword argument
+ 'stream_chunk_timeout'`` because the parameter is specific to
+ ``langchain_openai:ChatOpenAI``.
+ """
+ model = ModelConfig(
+ name="anthropic-with-stray-timeout",
+ display_name="Anthropic With Stray Timeout",
+ description=None,
+ use="langchain_anthropic:ChatAnthropic",
+ model="claude-sonnet-4",
+ stream_chunk_timeout=60.0, # user-set on a non-OpenAI provider — must be dropped
+ )
+ cfg = _make_app_config([model])
+
+ captured: dict = {}
+
+ class CapturingModel(FakeChatModel):
+ def __init__(self, **kwargs):
+ captured.update(kwargs)
+ BaseChatModel.__init__(self, **kwargs)
+
+ _patch_factory(monkeypatch, cfg, model_class=CapturingModel)
+ factory_module.create_chat_model(name="anthropic-with-stray-timeout")
+
+ assert "stream_chunk_timeout" not in captured
diff --git a/backend/tests/test_write_file_tool_size_guard.py b/backend/tests/test_write_file_tool_size_guard.py
new file mode 100644
index 000000000..21f20a6bd
--- /dev/null
+++ b/backend/tests/test_write_file_tool_size_guard.py
@@ -0,0 +1,116 @@
+"""Size-guard tests for write_file_tool (issue #3189, PR #3195).
+
+These tests verify that write_file_tool rejects oversized single-shot payloads
+with an actionable message, while leaving append-mode and env-override paths
+untouched. They run purely against the tool's internal guard — no real sandbox
+or filesystem is exercised, so they're fast and hermetic.
+"""
+
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from deerflow.sandbox import tools as tools_module
+from deerflow.sandbox.tools import write_file_tool
+
+
+def _call_write_file(*, content: str, append: bool = False) -> str:
+ """Invoke write_file_tool via its underlying callable.
+
+ We patch the sandbox initialisation chain to a no-op MagicMock so the test
+ focuses purely on the size guard. The guard runs BEFORE any sandbox call,
+ so when the guard rejects we never enter the patched path; when the guard
+ passes, the patched sandbox.write_file returns silently and the tool
+ returns "OK".
+ """
+ fn = getattr(write_file_tool, "func", write_file_tool)
+ runtime = MagicMock()
+
+ with (
+ patch.object(tools_module, "ensure_sandbox_initialized") as mock_ensure,
+ patch.object(tools_module, "ensure_thread_directories_exist"),
+ patch.object(tools_module, "is_local_sandbox", return_value=False),
+ patch.object(tools_module, "get_file_operation_lock") as mock_lock,
+ ):
+ sandbox = MagicMock()
+ sandbox.write_file = MagicMock()
+ mock_ensure.return_value = sandbox
+ mock_lock.return_value.__enter__ = MagicMock(return_value=None)
+ mock_lock.return_value.__exit__ = MagicMock(return_value=False)
+
+ return fn(
+ runtime=runtime,
+ description="test write",
+ path="/tmp/test.txt",
+ content=content,
+ append=append,
+ )
+
+
+def test_below_cap_succeeds():
+ """A 79 KB payload sits comfortably under the 80 KB default and must pass
+ straight through to the sandbox layer.
+ """
+ payload = "a" * (79 * 1024)
+ result = _call_write_file(content=payload)
+ assert result == "OK"
+
+
+def test_above_cap_returns_actionable_error():
+ """An 81 KB payload trips the guard. The error message must name the
+ cap, the actual size, and steer the LLM toward str_replace / append=True
+ — these are the exact handles Reviewer A/B asked for in PR #3195.
+ """
+ payload = "a" * (81 * 1024)
+ result = _call_write_file(content=payload)
+
+ assert result.startswith("Error: write_file content")
+ assert "81920 bytes" in result or "82944 bytes" in result, "Error must report the actual content size so the LLM/operator can judge how much to trim or chunk."
+ assert "str_replace" in result, "Error must point to str_replace as the preferred incremental-edit path."
+ assert "append=True" in result, "Error must also surface the append-in-chunks alternative."
+
+
+def test_above_cap_with_append_true_bypasses_guard():
+ """append=True is the *correct* way to write a large document in chunks,
+ so the guard must not block it. The 80 KB cap intentionally applies only
+ to single-shot overwrite calls.
+ """
+ payload = "a" * (200 * 1024) # 200 KB
+ result = _call_write_file(content=payload, append=True)
+ assert result == "OK", f"append=True must bypass the size guard, got: {result!r}"
+
+
+def test_env_override_raises_cap(monkeypatch: pytest.MonkeyPatch):
+ """Setting DEERFLOW_WRITE_FILE_MAX_BYTES lets deployments accept larger
+ payloads when the underlying LLM/network can demonstrably handle them.
+ """
+ monkeypatch.setenv("DEERFLOW_WRITE_FILE_MAX_BYTES", str(300 * 1024))
+ payload = "a" * (150 * 1024) # 150 KB — would normally trip the 80 KB cap
+ result = _call_write_file(content=payload)
+ assert result == "OK"
+
+
+def test_env_override_zero_disables_guard(monkeypatch: pytest.MonkeyPatch):
+ """Setting the env var to 0 is the documented escape hatch for operators
+ who want to opt out of the guard entirely (e.g. when running models with
+ very large stream_chunk_timeout values).
+ """
+ monkeypatch.setenv("DEERFLOW_WRITE_FILE_MAX_BYTES", "0")
+ payload = "a" * (500 * 1024) # 500 KB
+ result = _call_write_file(content=payload)
+ assert result == "OK"
+
+
+def test_env_override_malformed_falls_back_to_default(monkeypatch: pytest.MonkeyPatch):
+ """A typo in the env var (e.g. 'lots') must not crash the tool — fall
+ back silently to the safe 80 KB default. Crashing on every write because
+ of a misconfigured env var would be far worse than ignoring it.
+ """
+ monkeypatch.setenv("DEERFLOW_WRITE_FILE_MAX_BYTES", "lots")
+ # 100 KB should still be rejected because the malformed value falls back
+ # to the 80 KB default.
+ payload = "a" * (100 * 1024)
+ result = _call_write_file(content=payload)
+ assert result.startswith("Error: write_file content")