mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-09 17:12:01 +00:00
fix(agents): offload UploadsMiddleware uploads scan off the event loop (#3311)
UploadsMiddleware defines only the sync `before_agent` hook. LangChain wires a sync-only hook as `RunnableCallable(before_agent, None)`, and LangGraph's `ainvoke` runs it directly on the event loop when `afunc is None` — so the per-message uploads-directory scan (`exists`/`iterdir`/`stat` plus reading sibling `.md` outlines) blocks the asyncio event loop on every message that has an uploads directory. Add `abefore_agent` that offloads the scan to a worker thread via `run_in_executor`; it copies the current context, preserving the `user_id` contextvar read by `get_effective_user_id()`. Add a runtime anchor under `tests/blocking_io/` that drives the real `create_agent` graph via `ainvoke` under the strict Blockbuster gate, so a regression back onto the event loop fails CI. Update blocking-IO docs.
This commit is contained in:
parent
e8e9edcb6e
commit
9f3be2a9fa
@ -126,8 +126,10 @@ Blocking-IO runtime gate (`tests/blocking_io/`):
|
||||
`asyncio.to_thread` offload around `LocalSkillStorage.load_skills`, fix
|
||||
for #1917); `test_sqlite_lifespan.py` (locks the offload around
|
||||
SQLite path resolution plus `ensure_sqlite_parent_dir`, fix for #1912);
|
||||
and `test_jsonl_run_event_store.py` (locks `JsonlRunEventStore`'s async
|
||||
API offloading its file IO via `asyncio.to_thread`, fix #3084).
|
||||
`test_jsonl_run_event_store.py` (locks `JsonlRunEventStore`'s async
|
||||
API offloading its file IO via `asyncio.to_thread`, fix #3084); and
|
||||
`test_uploads_middleware.py` (locks `UploadsMiddleware.abefore_agent`
|
||||
offloading the uploads-directory scan off the event loop).
|
||||
- `test_gate_smoke.py` is a meta-test asserting the gate actually catches
|
||||
unoffloaded blocking IO and that the `@pytest.mark.allow_blocking_io`
|
||||
opt-out works.
|
||||
|
||||
@ -144,6 +144,9 @@ The runtime anchors protect confirmed blocking-IO bug shapes:
|
||||
(fix #3084); this anchor drives the real async API under the gate so any
|
||||
blocking IO reintroduced on the loop fails, not only removal of one
|
||||
`to_thread` call.
|
||||
- `UploadsMiddleware.before_agent` uploads-directory scan: a sync-only middleware
|
||||
hook runs on the event loop under async graph execution, so the scan is
|
||||
offloaded via `abefore_agent` + `run_in_executor`.
|
||||
- Gate health checks: Blockbuster catches unoffloaded calls, opt-out works, and
|
||||
patches are restored after exceptions.
|
||||
|
||||
|
||||
@ -7,6 +7,7 @@ from typing import NotRequired, override
|
||||
from langchain.agents import AgentState
|
||||
from langchain.agents.middleware import AgentMiddleware
|
||||
from langchain_core.messages import HumanMessage
|
||||
from langchain_core.runnables import run_in_executor
|
||||
from langgraph.runtime import Runtime
|
||||
|
||||
from deerflow.config.paths import Paths, get_paths
|
||||
@ -293,3 +294,16 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
|
||||
"uploaded_files": new_files,
|
||||
"messages": messages,
|
||||
}
|
||||
|
||||
@override
|
||||
async def abefore_agent(self, state: UploadsMiddlewareState, runtime: Runtime) -> dict | None:
|
||||
"""Async hook that offloads the synchronous uploads scan off the event loop.
|
||||
|
||||
``before_agent`` performs blocking filesystem IO (directory enumeration,
|
||||
``stat``, reading sibling ``.md`` outlines). When the graph runs async,
|
||||
langgraph would otherwise execute the sync hook directly on the event
|
||||
loop, so it is dispatched to a worker thread via ``run_in_executor``.
|
||||
``run_in_executor`` copies the current context, so the ``user_id``
|
||||
contextvar read by ``get_effective_user_id()`` is preserved.
|
||||
"""
|
||||
return await run_in_executor(None, self.before_agent, state, runtime)
|
||||
|
||||
56
backend/tests/blocking_io/test_uploads_middleware.py
Normal file
56
backend/tests/blocking_io/test_uploads_middleware.py
Normal file
@ -0,0 +1,56 @@
|
||||
"""Regression anchor: UploadsMiddleware must not block the event loop.
|
||||
|
||||
``before_agent`` scans the thread uploads directory (``exists`` / ``iterdir`` /
|
||||
``stat`` plus reading sibling ``.md`` outlines). LangChain wires a sync-only
|
||||
``before_agent`` as ``RunnableCallable(before_agent, None)``; langgraph's
|
||||
``ainvoke`` runs it directly on the event loop when ``afunc is None``. So the
|
||||
filesystem scan must be offloaded (the middleware provides ``abefore_agent``).
|
||||
|
||||
This anchor drives the real ``create_agent`` graph via ``ainvoke`` under the
|
||||
strict Blockbuster gate. If the scan regresses back onto the event loop,
|
||||
Blockbuster raises ``BlockingError`` and this test fails.
|
||||
|
||||
The graph/middleware construction is offloaded with ``asyncio.to_thread`` only
|
||||
because ``Paths.__init__`` resolves paths synchronously; the surface under test
|
||||
(``before_agent``'s directory scan) is exercised on the event loop, not
|
||||
bypassed.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel
|
||||
from langchain_core.messages import AIMessage, HumanMessage
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
|
||||
class _FakeModel(FakeMessagesListChatModel):
|
||||
"""FakeMessagesListChatModel with a no-op ``bind_tools`` for create_agent."""
|
||||
|
||||
def bind_tools(self, tools, **kwargs): # type: ignore[override]
|
||||
return self
|
||||
|
||||
|
||||
async def test_before_agent_uploads_scan_does_not_block_event_loop(tmp_path: Path) -> None:
|
||||
from langchain.agents import create_agent
|
||||
|
||||
from deerflow.agents.middlewares.uploads_middleware import UploadsMiddleware
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
mw = await asyncio.to_thread(UploadsMiddleware, str(tmp_path))
|
||||
uploads_dir = await asyncio.to_thread(mw._paths.sandbox_uploads_dir, "t1", user_id=get_effective_user_id())
|
||||
uploads_dir.mkdir(parents=True, exist_ok=True) # test-side seeding (not in scanned_modules)
|
||||
(uploads_dir / "existing.txt").write_text("hello", encoding="utf-8")
|
||||
|
||||
agent = await asyncio.to_thread(lambda: create_agent(model=_FakeModel(responses=[AIMessage(content="ok")]), tools=[], middleware=[mw]))
|
||||
|
||||
result = await agent.ainvoke(
|
||||
{"messages": [HumanMessage(content="hi")]},
|
||||
{"configurable": {"thread_id": "t1"}},
|
||||
)
|
||||
|
||||
assert result["messages"]
|
||||
Loading…
x
Reference in New Issue
Block a user