diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 12fd14983..a45b14253 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -232,7 +232,7 @@ Proxied through nginx: `/api/langgraph/*` → LangGraph, all other `/api/*` → - `ls` - Directory listing (tree format, max 2 levels) - `read_file` - Read file contents with optional line range - `write_file` - Write/append to files, creates directories -- `str_replace` - Substring replacement (single or all occurrences) +- `str_replace` - Substring replacement (single or all occurrences); same-path serialization is scoped to `(sandbox.id, path)` so isolated sandboxes do not contend on identical virtual paths inside one process ### Subagent System (`packages/harness/deerflow/subagents/`) diff --git a/backend/README.md b/backend/README.md index 2296fcfe2..158540946 100644 --- a/backend/README.md +++ b/backend/README.md @@ -78,6 +78,7 @@ Per-thread isolated execution with virtual path translation: - **Virtual paths**: `/mnt/user-data/{workspace,uploads,outputs}` → thread-specific physical directories - **Skills path**: `/mnt/skills` → `deer-flow/skills/` directory - **Skills loading**: Recursively discovers nested `SKILL.md` files under `skills/{public,custom}` and preserves nested container paths +- **File-write safety**: `str_replace` serializes read-modify-write per `(sandbox.id, path)` so isolated sandboxes keep concurrency even when virtual paths match - **Tools**: `bash`, `ls`, `read_file`, `write_file`, `str_replace` (`bash` is disabled by default when using `LocalSandboxProvider`; use `AioSandboxProvider` for isolated shell access) ### Subagent System diff --git a/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox.py b/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox.py index 599462da7..7f8784492 100644 --- a/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox.py +++ b/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox.py @@ -124,16 +124,16 @@ class AioSandbox(Sandbox): content: The text content to write to the file. append: Whether to append the content to the file. """ - try: - if append: - # Read existing content first and append - existing = self.read_file(path) - if not existing.startswith("Error:"): - content = existing + content - self._client.file.write_file(file=path, content=content) - except Exception as e: - logger.error(f"Failed to write file in sandbox: {e}") - raise + with self._lock: + try: + if append: + existing = self.read_file(path) + if not existing.startswith("Error:"): + content = existing + content + self._client.file.write_file(file=path, content=content) + except Exception as e: + logger.error(f"Failed to write file in sandbox: {e}") + raise def update_file(self, path: str, content: bytes) -> None: """Update a file with binary content in the sandbox. @@ -142,9 +142,10 @@ class AioSandbox(Sandbox): path: The absolute path of the file to update. content: The binary content to write to the file. """ - try: - base64_content = base64.b64encode(content).decode("utf-8") - self._client.file.write_file(file=path, content=base64_content, encoding="base64") - except Exception as e: - logger.error(f"Failed to update file in sandbox: {e}") - raise + with self._lock: + try: + base64_content = base64.b64encode(content).decode("utf-8") + self._client.file.write_file(file=path, content=base64_content, encoding="base64") + except Exception as e: + logger.error(f"Failed to update file in sandbox: {e}") + raise diff --git a/backend/packages/harness/deerflow/sandbox/file_operation_lock.py b/backend/packages/harness/deerflow/sandbox/file_operation_lock.py new file mode 100644 index 000000000..2464015c0 --- /dev/null +++ b/backend/packages/harness/deerflow/sandbox/file_operation_lock.py @@ -0,0 +1,23 @@ +import threading + +from deerflow.sandbox.sandbox import Sandbox + +_FILE_OPERATION_LOCKS: dict[tuple[str, str], threading.Lock] = {} +_FILE_OPERATION_LOCKS_GUARD = threading.Lock() + + +def get_file_operation_lock_key(sandbox: Sandbox, path: str) -> tuple[str, str]: + sandbox_id = getattr(sandbox, "id", None) + if not sandbox_id: + sandbox_id = f"instance:{id(sandbox)}" + return sandbox_id, path + + +def get_file_operation_lock(sandbox: Sandbox, path: str) -> threading.Lock: + lock_key = get_file_operation_lock_key(sandbox, path) + with _FILE_OPERATION_LOCKS_GUARD: + lock = _FILE_OPERATION_LOCKS.get(lock_key) + if lock is None: + lock = threading.Lock() + _FILE_OPERATION_LOCKS[lock_key] = lock + return lock diff --git a/backend/packages/harness/deerflow/sandbox/tools.py b/backend/packages/harness/deerflow/sandbox/tools.py index 7cdaa26da..f7c079322 100644 --- a/backend/packages/harness/deerflow/sandbox/tools.py +++ b/backend/packages/harness/deerflow/sandbox/tools.py @@ -13,6 +13,7 @@ from deerflow.sandbox.exceptions import ( SandboxNotFoundError, SandboxRuntimeError, ) +from deerflow.sandbox.file_operation_lock import get_file_operation_lock from deerflow.sandbox.sandbox import Sandbox from deerflow.sandbox.sandbox_provider import get_sandbox_provider from deerflow.sandbox.security import LOCAL_HOST_BASH_DISABLED_MESSAGE, is_host_bash_allowed @@ -971,7 +972,8 @@ def write_file_tool( thread_data = get_thread_data(runtime) validate_local_tool_path(path, thread_data) path = _resolve_and_validate_user_data_path(path, thread_data) - sandbox.write_file(path, content, append) + with get_file_operation_lock(sandbox, path): + sandbox.write_file(path, content, append) return "OK" except SandboxError as e: return f"Error: {e}" @@ -1012,16 +1014,17 @@ def str_replace_tool( thread_data = get_thread_data(runtime) validate_local_tool_path(path, thread_data) path = _resolve_and_validate_user_data_path(path, thread_data) - content = sandbox.read_file(path) - if not content: - return "OK" - if old_str not in content: - return f"Error: String to replace not found in file: {requested_path}" - if replace_all: - content = content.replace(old_str, new_str) - else: - content = content.replace(old_str, new_str, 1) - sandbox.write_file(path, content) + with get_file_operation_lock(sandbox, path): + content = sandbox.read_file(path) + if not content: + return "OK" + if old_str not in content: + return f"Error: String to replace not found in file: {requested_path}" + if replace_all: + content = content.replace(old_str, new_str) + else: + content = content.replace(old_str, new_str, 1) + sandbox.write_file(path, content) return "OK" except SandboxError as e: return f"Error: {e}" diff --git a/backend/tests/test_aio_sandbox.py b/backend/tests/test_aio_sandbox.py index c86259432..789fbde20 100644 --- a/backend/tests/test_aio_sandbox.py +++ b/backend/tests/test_aio_sandbox.py @@ -131,3 +131,53 @@ class TestListDirSerialization: result = sandbox.list_dir("/test") assert result == ["/a", "/b"] assert lock_was_held == [True], "list_dir must hold the lock during exec_command" + + +class TestConcurrentFileWrites: + """Verify file write paths do not lose concurrent updates.""" + + def test_append_should_preserve_both_parallel_writes(self, sandbox): + storage = {"content": "seed\n"} + active_reads = 0 + state_lock = threading.Lock() + overlap_detected = threading.Event() + + def overlapping_read_file(path): + nonlocal active_reads + with state_lock: + active_reads += 1 + snapshot = storage["content"] + if active_reads == 2: + overlap_detected.set() + + overlap_detected.wait(0.05) + + with state_lock: + active_reads -= 1 + + return snapshot + + def write_back(*, file, content, **kwargs): + storage["content"] = content + return SimpleNamespace(data=SimpleNamespace()) + + sandbox.read_file = overlapping_read_file + sandbox._client.file.write_file = write_back + + barrier = threading.Barrier(2) + + def writer(payload: str): + barrier.wait() + sandbox.write_file("/tmp/shared.log", payload, append=True) + + threads = [ + threading.Thread(target=writer, args=("A\n",)), + threading.Thread(target=writer, args=("B\n",)), + ] + + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert storage["content"] in {"seed\nA\nB\n", "seed\nB\nA\n"} diff --git a/backend/tests/test_sandbox_tools_security.py b/backend/tests/test_sandbox_tools_security.py index bfcff5d72..982b5d389 100644 --- a/backend/tests/test_sandbox_tools_security.py +++ b/backend/tests/test_sandbox_tools_security.py @@ -1,3 +1,4 @@ +import threading from pathlib import Path from types import SimpleNamespace from unittest.mock import patch @@ -17,8 +18,10 @@ from deerflow.sandbox.tools import ( mask_local_paths_in_output, replace_virtual_path, replace_virtual_paths_in_command, + str_replace_tool, validate_local_bash_command_paths, validate_local_tool_path, + write_file_tool, ) _THREAD_DATA = { @@ -512,3 +515,221 @@ def test_validate_local_bash_command_paths_allows_mcp_filesystem_paths() -> None with patch("deerflow.config.extensions_config.get_extensions_config", return_value=disabled_config): with pytest.raises(PermissionError, match="Unsafe absolute paths"): validate_local_bash_command_paths("ls /mnt/d/workspace", _THREAD_DATA) + + +def test_str_replace_parallel_updates_should_preserve_both_edits(monkeypatch) -> None: + class SharedSandbox: + def __init__(self) -> None: + self.content = "alpha\nbeta\n" + self._active_reads = 0 + self._state_lock = threading.Lock() + self._overlap_detected = threading.Event() + + def read_file(self, path: str) -> str: + with self._state_lock: + self._active_reads += 1 + snapshot = self.content + if self._active_reads == 2: + self._overlap_detected.set() + + self._overlap_detected.wait(0.05) + + with self._state_lock: + self._active_reads -= 1 + + return snapshot + + def write_file(self, path: str, content: str, append: bool = False) -> None: + self.content = content + + sandbox = SharedSandbox() + runtimes = [ + SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={}), + SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={}), + ] + failures: list[BaseException] = [] + + monkeypatch.setattr("deerflow.sandbox.tools.ensure_sandbox_initialized", lambda runtime: sandbox) + monkeypatch.setattr("deerflow.sandbox.tools.ensure_thread_directories_exist", lambda runtime: None) + monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: False) + + def worker(runtime: SimpleNamespace, old_str: str, new_str: str) -> None: + try: + result = str_replace_tool.func( + runtime=runtime, + description="并发替换同一文件", + path="/mnt/user-data/workspace/shared.txt", + old_str=old_str, + new_str=new_str, + ) + assert result == "OK" + except BaseException as exc: # pragma: no cover - failure is asserted below + failures.append(exc) + + threads = [ + threading.Thread(target=worker, args=(runtimes[0], "alpha", "ALPHA")), + threading.Thread(target=worker, args=(runtimes[1], "beta", "BETA")), + ] + + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert failures == [] + assert "ALPHA" in sandbox.content + assert "BETA" in sandbox.content + + +def test_str_replace_parallel_updates_in_isolated_sandboxes_should_not_share_path_lock(monkeypatch) -> None: + class IsolatedSandbox: + def __init__(self, sandbox_id: str, shared_state: dict[str, object]) -> None: + self.id = sandbox_id + self.content = "alpha\nbeta\n" + self._shared_state = shared_state + + def read_file(self, path: str) -> str: + state_lock = self._shared_state["state_lock"] + with state_lock: + active_reads = self._shared_state["active_reads"] + self._shared_state["active_reads"] = active_reads + 1 + snapshot = self.content + if self._shared_state["active_reads"] == 2: + overlap_detected = self._shared_state["overlap_detected"] + overlap_detected.set() + + overlap_detected = self._shared_state["overlap_detected"] + overlap_detected.wait(0.05) + + with state_lock: + active_reads = self._shared_state["active_reads"] + self._shared_state["active_reads"] = active_reads - 1 + + return snapshot + + def write_file(self, path: str, content: str, append: bool = False) -> None: + self.content = content + + shared_state: dict[str, object] = { + "active_reads": 0, + "state_lock": threading.Lock(), + "overlap_detected": threading.Event(), + } + sandboxes = { + "sandbox-a": IsolatedSandbox("sandbox-a", shared_state), + "sandbox-b": IsolatedSandbox("sandbox-b", shared_state), + } + runtimes = [ + SimpleNamespace(state={}, context={"thread_id": "thread-1", "sandbox_key": "sandbox-a"}, config={}), + SimpleNamespace(state={}, context={"thread_id": "thread-2", "sandbox_key": "sandbox-b"}, config={}), + ] + failures: list[BaseException] = [] + + monkeypatch.setattr( + "deerflow.sandbox.tools.ensure_sandbox_initialized", + lambda runtime: sandboxes[runtime.context["sandbox_key"]], + ) + monkeypatch.setattr("deerflow.sandbox.tools.ensure_thread_directories_exist", lambda runtime: None) + monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: False) + + def worker(runtime: SimpleNamespace, old_str: str, new_str: str) -> None: + try: + result = str_replace_tool.func( + runtime=runtime, + description="隔离 sandbox 并发替换同一路径", + path="/mnt/user-data/workspace/shared.txt", + old_str=old_str, + new_str=new_str, + ) + assert result == "OK" + except BaseException as exc: # pragma: no cover - failure is asserted below + failures.append(exc) + + threads = [ + threading.Thread(target=worker, args=(runtimes[0], "alpha", "ALPHA")), + threading.Thread(target=worker, args=(runtimes[1], "beta", "BETA")), + ] + + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + assert failures == [] + assert sandboxes["sandbox-a"].content == "ALPHA\nbeta\n" + assert sandboxes["sandbox-b"].content == "alpha\nBETA\n" + assert shared_state["overlap_detected"].is_set() + + +def test_str_replace_and_append_on_same_path_should_preserve_both_updates(monkeypatch) -> None: + class SharedSandbox: + def __init__(self) -> None: + self.id = "sandbox-1" + self.content = "alpha\n" + self.state_lock = threading.Lock() + self.str_replace_has_snapshot = threading.Event() + self.append_finished = threading.Event() + + def read_file(self, path: str) -> str: + with self.state_lock: + snapshot = self.content + self.str_replace_has_snapshot.set() + self.append_finished.wait(0.05) + return snapshot + + def write_file(self, path: str, content: str, append: bool = False) -> None: + with self.state_lock: + if append: + self.content += content + self.append_finished.set() + else: + self.content = content + + sandbox = SharedSandbox() + runtimes = [ + SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={}), + SimpleNamespace(state={}, context={"thread_id": "thread-1"}, config={}), + ] + failures: list[BaseException] = [] + + monkeypatch.setattr("deerflow.sandbox.tools.ensure_sandbox_initialized", lambda runtime: sandbox) + monkeypatch.setattr("deerflow.sandbox.tools.ensure_thread_directories_exist", lambda runtime: None) + monkeypatch.setattr("deerflow.sandbox.tools.is_local_sandbox", lambda runtime: False) + + def replace_worker() -> None: + try: + result = str_replace_tool.func( + runtime=runtimes[0], + description="替换旧内容", + path="/mnt/user-data/workspace/shared.txt", + old_str="alpha", + new_str="ALPHA", + ) + assert result == "OK" + except BaseException as exc: # pragma: no cover - failure is asserted below + failures.append(exc) + + def append_worker() -> None: + try: + sandbox.str_replace_has_snapshot.wait(0.05) + result = write_file_tool.func( + runtime=runtimes[1], + description="追加新内容", + path="/mnt/user-data/workspace/shared.txt", + content="tail\n", + append=True, + ) + assert result == "OK" + except BaseException as exc: # pragma: no cover - failure is asserted below + failures.append(exc) + + replace_thread = threading.Thread(target=replace_worker) + append_thread = threading.Thread(target=append_worker) + + replace_thread.start() + append_thread.start() + replace_thread.join() + append_thread.join() + + assert failures == [] + assert sandbox.content == "ALPHA\ntail\n"