mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-09 17:12:01 +00:00
* fix(middleware): offload memory injection off event loop to prevent tiktoken blocking (#3402) DynamicContextMiddleware.abefore_agent() called _inject() synchronously on the asyncio event loop. The first time memory is injected (second request), _inject() → format_memory_for_injection() → _count_tokens() → tiktoken.get_encoding("cl100k_base") needs to download the BPE data from openaipublic.blob.core.windows.net. In network-restricted environments this download blocks until the OS TCP timeout (~26 min), starving ALL concurrent handlers including /api/v1/auth/me. Fix: - abefore_agent now uses asyncio.to_thread(self._inject, state) so file I/O and tiktoken never block the event loop. - Extract _get_tiktoken_encoding() with a module-level cache so tiktoken.get_encoding() is called at most once per encoding name. - Add warm_tiktoken_cache() startup helper; gateway lifespan pre-warms the cache via asyncio.to_thread so the first request never triggers a cold download. - _count_tokens falls back to len(text) // 4 on any encoding failure. Tests: - tests/test_tiktoken_cache_and_count_tokens.py (12 tests): cache hit/miss, fallback paths, warm-up helper. - tests/blocking_io/test_dynamic_context_middleware.py (2 tests): Blockbuster gate verifies abefore_agent does not block the event loop; async/sync parity check. Fixes #3402 * Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * fix the lint error * fix(memory): use future annotations to avoid NameError when tiktoken is absent Add `from __future__ import annotations` to prompt.py so that tiktoken.Encoding type hints are never evaluated at runtime. Without this, environments where tiktoken is not installed could raise NameError on the module-level cache and function return annotations. Addresses Copilot review comment on PR #3411. 
* fix(middleware): bound abefore_agent injection with timeout to prevent hung requests Wrap the asyncio.to_thread(self._inject) offload in asyncio.wait_for() with a 5-second cap. If the startup warm-up failed silently (e.g. network blip during deploy), a cold tiktoken BPE download on the first request can block until the OS TCP timeout (~26 min). The bounded timeout ensures the request degrades gracefully (no memory/date context for that turn) rather than hanging. Adds test_abefore_agent_returns_none_on_timeout to the blocking-IO regression anchors. Addresses review feedback from xg-gh-25 on PR #3411. --------- Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
125 lines
4.4 KiB
Python
125 lines
4.4 KiB
Python
"""Regression anchor: DynamicContextMiddleware must not block the event loop.
|
|
|
|
``_inject`` performs synchronous file I/O (memory JSON loading) and
potentially blocking network calls (tiktoken encoding download on first
use — see issue #3402). ``abefore_agent`` offloads the call via
``asyncio.to_thread`` so the event loop stays responsive.

This anchor drives the real ``create_agent`` graph via ``ainvoke`` under
the strict Blockbuster gate. If the offload regresses and the blocking
I/O runs on the event loop, Blockbuster raises ``BlockingError`` and
this test fails.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from types import SimpleNamespace
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
from langchain.agents import create_agent
|
|
from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel
|
|
from langchain_core.messages import AIMessage, HumanMessage
|
|
|
|
from deerflow.agents.middlewares.dynamic_context_middleware import DynamicContextMiddleware
|
|
|
|
# Module-level pytest marker: applies ``pytest.mark.asyncio`` to every test
# collected from this file, so the ``async def`` tests are awaited by the
# asyncio test plugin instead of being skipped as un-awaited coroutines.
pytestmark = pytest.mark.asyncio
|
|
|
|
|
|
class _FakeModel(FakeMessagesListChatModel):
    """Scripted fake chat model usable with ``create_agent``.

    Behaves like ``FakeMessagesListChatModel`` except that ``bind_tools``
    is a no-op, so building an agent around it needs no tool binding.
    """

    def bind_tools(self, tools, **kwargs):  # type: ignore[override]
        """Ignore ``tools`` and ``kwargs`` and return this same model."""
        return self
|
|
|
|
|
|
async def test_abefore_agent_does_not_block_event_loop() -> None:
    """Run a real agent graph and require ``_inject()`` to stay off the loop.

    ``_build_full_reminder`` is replaced by a wrapper that performs a short
    synchronous sleep before delegating to the real implementation. The
    sleep stands in for slow file I/O / a tiktoken download; if it ever ran
    on the event-loop thread the Blockbuster gate would flag it.
    """
    import time

    middleware = DynamicContextMiddleware()
    real_build_reminder = middleware._build_full_reminder

    def delayed_build_reminder():
        # 50 ms synchronous sleep: stalls whichever thread executes it.
        time.sleep(0.05)
        return real_build_reminder()

    def build_agent():
        # Graph construction is itself pushed to a worker thread below.
        return create_agent(
            model=_FakeModel(responses=[AIMessage(content="ok")]),
            tools=[],
            middleware=[middleware],
        )

    reminder_patch = mock.patch.object(
        middleware, "_build_full_reminder", delayed_build_reminder
    )
    memory_patch = mock.patch(
        "deerflow.agents.lead_agent.prompt._get_memory_context", return_value=""
    )

    with reminder_patch, memory_patch:
        agent = await asyncio.to_thread(build_agent)

        invoke_input = {"messages": [HumanMessage(content="hi")]}
        invoke_config = {"configurable": {"thread_id": "test-thread"}}
        result = await agent.ainvoke(invoke_input, invoke_config)

    assert result["messages"]
|
|
|
|
|
|
async def test_abefore_agent_returns_same_result_as_before_agent() -> None:
    """Check parity between the sync and async middleware hooks.

    ``before_agent`` (sync, kept for backward compatibility) and
    ``abefore_agent`` (async, offloaded to a thread) are called with the same
    state and runtime; both must yield the same keys, two messages each, and
    pairwise-identical message ids.
    """
    middleware = DynamicContextMiddleware()
    state = {"messages": [HumanMessage(content="Hello", id="msg-1")]}
    runtime = SimpleNamespace(context={})

    memory_patch = mock.patch(
        "deerflow.agents.lead_agent.prompt._get_memory_context", return_value=""
    )
    datetime_patch = mock.patch(
        "deerflow.agents.middlewares.dynamic_context_middleware.datetime"
    )

    with memory_patch, datetime_patch as fake_datetime:
        # Freeze the formatted date so both code paths see the same value.
        fake_datetime.now.return_value.strftime.return_value = "2026-06-05, Friday"

        # Synchronous hook first, then the thread-offloaded async hook.
        from_sync = middleware.before_agent(state, runtime)
        from_async = await middleware.abefore_agent(state, runtime)

    assert from_sync is not None
    assert from_async is not None
    assert from_sync.keys() == from_async.keys()

    sync_messages = from_sync["messages"]
    async_messages = from_async["messages"]

    # Both return 2 messages: reminder + user content
    assert len(sync_messages) == 2
    assert len(async_messages) == 2

    # Ids must agree position by position.
    for sync_msg, async_msg in zip(sync_messages, async_messages):
        assert sync_msg.id == async_msg.id
|
|
|
|
|
|
async def test_abefore_agent_returns_none_on_timeout() -> None:
    """If ``_inject()`` exceeds the timeout, ``abefore_agent`` returns None.

    ``_inject`` is replaced by a stand-in that parks its thread far longer
    than the patched ``_INJECT_TIMEOUT_SECONDS`` (0.1 s), so the middleware
    has to give up and degrade gracefully instead of hanging.

    The stand-in waits on a ``threading.Event`` rather than calling
    ``time.sleep(10)``: a worker thread cannot be cancelled, so an
    unconditional sleep would keep running for ~10 s after the test has
    already finished and can stall executor / loop / interpreter shutdown.
    The event is set as soon as ``abefore_agent`` returns, releasing the
    worker immediately while still giving a 10 s ceiling if it never is.
    """
    import threading

    mw = DynamicContextMiddleware()
    release_worker = threading.Event()

    def blocking_inject(state):
        # Simulate a blocking call that far exceeds the timeout; returns
        # early only once the test releases the worker.
        release_worker.wait(timeout=10)
        return {"messages": [HumanMessage(content="should not reach")]}

    with (
        mock.patch.object(mw, "_inject", blocking_inject),
        mock.patch(
            "deerflow.agents.middlewares.dynamic_context_middleware._INJECT_TIMEOUT_SECONDS",
            0.1,
        ),
    ):
        state = {"messages": [HumanMessage(content="Hello", id="msg-1")]}
        runtime = SimpleNamespace(context={})
        try:
            result = await mw.abefore_agent(state, runtime)
        finally:
            # Always unblock the background thread, even if the call raised,
            # so no worker is left parked after this test.
            release_worker.set()

    assert result is None
|