From b62c5a7b5bb1034d8fcfe16cf8314ee650efb417 Mon Sep 17 00:00:00 2001 From: ly-wang19 <94427531+ly-wang19@users.noreply.github.com> Date: Tue, 9 Jun 2026 22:24:53 +0800 Subject: [PATCH] fix(agents): offload blocking filesystem IO in the custom-agent router off the event loop (#3457) * fix(agents): offload blocking filesystem IO in delete_agent off the event loop delete_agent is an async route handler but resolved the agent directory (Paths.base_dir -> Path.resolve), probed it (Path.exists), and removed it (shutil.rmtree) directly on the event loop, blocking it for the duration of every delete. Surfaced by 'make detect-blocking-io'. Move the resolve/exists/rmtree sequence into a sync helper run via asyncio.to_thread, mapping its outcome back to the existing 404/409/500 responses (behavior unchanged). Adds a tests/blocking_io/ regression anchor under the strict Blockbuster gate, mirroring test_skills_load.py (#1917). Co-Authored-By: Claude Opus 4.8 (1M context) * fix(agents): offload blocking filesystem IO in create_agent_endpoint too Like delete_agent, the async create_agent_endpoint resolved and created the agent directory and wrote config.yaml + SOUL.md (with rmtree cleanup on failure) directly on the event loop. Move the whole create-or-409 sequence into a sync helper run via asyncio.to_thread; behavior is unchanged (201 / 409 / 500). Extends the blocking_io regression anchor to cover create as well as delete and renames it to test_agents_router.py. Co-Authored-By: Claude Opus 4.8 (1M context) * Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: ly-wang19 Co-authored-by: Claude Opus 4.8 (1M context) Co-authored-by: Willem Jiang Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- backend/app/gateway/routers/agents.py | 115 +++++++++++------- .../tests/blocking_io/test_agents_router.py | 64 ++++++++++ 2 files changed, 134 insertions(+), 45 deletions(-) create mode 100644 backend/tests/blocking_io/test_agents_router.py diff --git a/backend/app/gateway/routers/agents.py b/backend/app/gateway/routers/agents.py index 8769e9834..933dd4211 100644 --- a/backend/app/gateway/routers/agents.py +++ b/backend/app/gateway/routers/agents.py @@ -1,5 +1,6 @@ """CRUD API for custom agents.""" +import asyncio import logging import re import shutil @@ -213,48 +214,61 @@ async def create_agent_endpoint(request: AgentCreateRequest) -> AgentResponse: user_id = get_effective_user_id() paths = get_paths() - agent_dir = paths.user_agent_dir(user_id, normalized_name) - legacy_dir = paths.agent_dir(normalized_name) + def _create_agent() -> AgentResponse | None: + # Worker thread: base-dir resolution, existence checks, directory/file + # creation, read-back, and failure cleanup are all blocking filesystem + # IO that must stay off the event loop. + agent_dir = paths.user_agent_dir(user_id, normalized_name) + legacy_dir = paths.agent_dir(normalized_name) - if agent_dir.exists() or legacy_dir.exists(): - raise HTTPException(status_code=409, detail=f"Agent '{normalized_name}' already exists") + if legacy_dir.exists(): + return None # signals 409 to the caller + + try: + try: + agent_dir.mkdir(parents=True, exist_ok=False) + except FileExistsError: + return None # signals 409 to the caller + # Write config.yaml + config_data: dict = {"name": normalized_name} + if request.description: + config_data["description"] = request.description + if request.model is not None: + config_data["model"] = request.model + if request.tool_groups is not None: + config_data["tool_groups"] = request.tool_groups + if request.skills is not None: + config_data["skills"] = request.skills + + config_file = agent_dir / "config.yaml" + with open(config_file, "w", encoding="utf-8") as f: + yaml.dump(config_data, f, default_flow_style=False, allow_unicode=True) + + # Write SOUL.md + soul_file = agent_dir / "SOUL.md" + soul_file.write_text(request.soul, encoding="utf-8") + + logger.info(f"Created agent '{normalized_name}' at {agent_dir}") + + agent_cfg = load_agent_config(normalized_name, user_id=user_id) + return _agent_config_to_response(agent_cfg, include_soul=True, user_id=user_id) + except Exception: + # Clean up partial state on failure before surfacing the error. + if agent_dir.exists(): + shutil.rmtree(agent_dir) + raise try: - agent_dir.mkdir(parents=True, exist_ok=True) - - # Write config.yaml - config_data: dict = {"name": normalized_name} - if request.description: - config_data["description"] = request.description - if request.model is not None: - config_data["model"] = request.model - if request.tool_groups is not None: - config_data["tool_groups"] = request.tool_groups - if request.skills is not None: - config_data["skills"] = request.skills - - config_file = agent_dir / "config.yaml" - with open(config_file, "w", encoding="utf-8") as f: - yaml.dump(config_data, f, default_flow_style=False, allow_unicode=True) - - # Write SOUL.md - soul_file = agent_dir / "SOUL.md" - soul_file.write_text(request.soul, encoding="utf-8") - - logger.info(f"Created agent '{normalized_name}' at {agent_dir}") - - agent_cfg = load_agent_config(normalized_name, user_id=user_id) - return _agent_config_to_response(agent_cfg, include_soul=True, user_id=user_id) - - except HTTPException: - raise + response = await asyncio.to_thread(_create_agent) except Exception as e: - # Clean up on failure - if agent_dir.exists(): - shutil.rmtree(agent_dir) logger.error(f"Failed to create agent '{request.name}': {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to create agent: {str(e)}") + if response is None: + raise HTTPException(status_code=409, detail=f"Agent '{normalized_name}' already exists") + + return response + @router.put( "/agents/{name}", @@ -428,19 +442,30 @@ async def delete_agent(name: str) -> None: name = _normalize_agent_name(name) user_id = get_effective_user_id() paths = get_paths() - agent_dir = paths.user_agent_dir(user_id, name) - if not agent_dir.exists(): - if paths.agent_dir(name).exists(): - raise HTTPException( - status_code=409, - detail=(f"Agent '{name}' only exists in the legacy shared layout and is not scoped to a user. Run scripts/migrate_user_isolation.py to move legacy agents into the per-user layout before deleting."), - ) - raise HTTPException(status_code=404, detail=f"Agent '{name}' not found") + def _remove_agent_dir() -> tuple[str, str]: + # Runs in a worker thread: resolving the base dir, probing the directory + # (`exists`), and removing it (`rmtree`) are all blocking filesystem IO + # that must stay off the event loop. + agent_dir = paths.user_agent_dir(user_id, name) + if not agent_dir.exists(): + outcome = "legacy" if paths.agent_dir(name).exists() else "missing" + return outcome, str(agent_dir) + shutil.rmtree(agent_dir) + return "deleted", str(agent_dir) try: - shutil.rmtree(agent_dir) - logger.info(f"Deleted agent '{name}' from {agent_dir}") + outcome, agent_dir = await asyncio.to_thread(_remove_agent_dir) except Exception as e: logger.error(f"Failed to delete agent '{name}': {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to delete agent: {str(e)}") + + if outcome == "legacy": + raise HTTPException( + status_code=409, + detail=(f"Agent '{name}' only exists in the legacy shared layout and is not scoped to a user. Run scripts/migrate_user_isolation.py to move legacy agents into the per-user layout before deleting."), + ) + if outcome == "missing": + raise HTTPException(status_code=404, detail=f"Agent '{name}' not found") + + logger.info(f"Deleted agent '{name}' from {agent_dir}") diff --git a/backend/tests/blocking_io/test_agents_router.py b/backend/tests/blocking_io/test_agents_router.py new file mode 100644 index 000000000..1f7787397 --- /dev/null +++ b/backend/tests/blocking_io/test_agents_router.py @@ -0,0 +1,64 @@ +"""Regression anchors: the custom-agent router must not block the event loop. + +``app.gateway.routers.agents.create_agent_endpoint`` and ``delete_agent`` are +async route handlers that resolve the agent directory (``Paths.base_dir`` calls +``Path.resolve``), probe it (``Path.exists``), and create/remove it (``mkdir``, +config/SOUL writes, ``shutil.rmtree``) — all blocking IO. Both offload that work +via ``asyncio.to_thread``; if any of it regresses back onto the event loop, the +strict Blockbuster gate raises ``BlockingError`` and these tests fail. + +Imports live at module scope so the one-time FastAPI app construction (which +reads files while building OpenAPI schemas) happens at collection time, not on +the event loop under test. Test-side path resolution is itself offloaded with +``asyncio.to_thread`` (matching ``test_uploads_middleware``) so only the +handlers' own filesystem access is exercised on the loop. +""" + +from __future__ import annotations + +import asyncio +from pathlib import Path + +import pytest + +from app.gateway.routers.agents import AgentCreateRequest, create_agent_endpoint, delete_agent +from deerflow.config.agents_api_config import load_agents_api_config_from_dict +from deerflow.config.paths import get_paths +from deerflow.runtime.user_context import get_effective_user_id + +pytestmark = pytest.mark.asyncio + + +async def test_create_agent_does_not_block_event_loop(tmp_path: Path, monkeypatch) -> None: + monkeypatch.setenv("DEER_FLOW_HOME", str(tmp_path)) + monkeypatch.setattr("deerflow.config.paths._paths", None) + load_agents_api_config_from_dict({"enabled": True}) + try: + response = await create_agent_endpoint(AgentCreateRequest(name="loop-make-agent", soul="You are a test agent.")) + assert response is not None + + user_id = get_effective_user_id() + # test-side check (resolution offloaded; not exercised on the loop) + agent_dir = await asyncio.to_thread(get_paths().user_agent_dir, user_id, "loop-make-agent") + assert await asyncio.to_thread((agent_dir / "config.yaml").exists) + finally: + load_agents_api_config_from_dict({}) + + +async def test_delete_agent_does_not_block_event_loop(tmp_path: Path, monkeypatch) -> None: + monkeypatch.setenv("DEER_FLOW_HOME", str(tmp_path)) + monkeypatch.setattr("deerflow.config.paths._paths", None) + load_agents_api_config_from_dict({"enabled": True}) + try: + user_id = get_effective_user_id() + user_id = get_effective_user_id() + # test-side seeding (resolution offloaded; not exercised on the loop) + agent_dir = await asyncio.to_thread(get_paths().user_agent_dir, user_id, "loop-test-agent") + await asyncio.to_thread(agent_dir.mkdir, parents=True, exist_ok=True) + await asyncio.to_thread((agent_dir / "config.yaml").write_text, "name: loop-test-agent\n", encoding="utf-8") + + await delete_agent("loop-test-agent") + + assert not await asyncio.to_thread(agent_dir.exists) + finally: + load_agents_api_config_from_dict({})