mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-25 11:18:22 +00:00
fix: add Windows shell fallback for local sandbox (#1505)
* fix: add Windows shell fallback for local sandbox * fix: handle PowerShell execution on Windows * fix: handle Windows local shell execution --------- Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
parent
92c7a20cb7
commit
68c9e09a7a
@ -1,3 +1,4 @@
|
||||
import ntpath
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
@ -8,6 +9,36 @@ from deerflow.sandbox.sandbox import Sandbox
|
||||
|
||||
|
||||
class LocalSandbox(Sandbox):
|
||||
@staticmethod
|
||||
def _shell_name(shell: str) -> str:
|
||||
"""Return the executable name for a shell path or command."""
|
||||
return shell.replace("\\", "/").rsplit("/", 1)[-1].lower()
|
||||
|
||||
@staticmethod
|
||||
def _is_powershell(shell: str) -> bool:
|
||||
"""Return whether the selected shell is a PowerShell executable."""
|
||||
return LocalSandbox._shell_name(shell) in {"powershell", "powershell.exe", "pwsh", "pwsh.exe"}
|
||||
|
||||
@staticmethod
|
||||
def _is_cmd_shell(shell: str) -> bool:
|
||||
"""Return whether the selected shell is cmd.exe."""
|
||||
return LocalSandbox._shell_name(shell) in {"cmd", "cmd.exe"}
|
||||
|
||||
@staticmethod
|
||||
def _find_first_available_shell(candidates: tuple[str, ...]) -> str | None:
|
||||
"""Return the first executable shell path or command found from candidates."""
|
||||
for shell in candidates:
|
||||
if os.path.isabs(shell):
|
||||
if os.path.isfile(shell) and os.access(shell, os.X_OK):
|
||||
return shell
|
||||
continue
|
||||
|
||||
shell_from_path = shutil.which(shell)
|
||||
if shell_from_path is not None:
|
||||
return shell_from_path
|
||||
|
||||
return None
|
||||
|
||||
def __init__(self, id: str, path_mappings: dict[str, str] | None = None):
|
||||
"""
|
||||
Initialize local sandbox with optional path mappings.
|
||||
@ -137,32 +168,59 @@ class LocalSandbox(Sandbox):
|
||||
|
||||
@staticmethod
|
||||
def _get_shell() -> str:
|
||||
"""Detect available shell executable with fallback.
|
||||
"""Detect available shell executable with fallback."""
|
||||
shell = LocalSandbox._find_first_available_shell(("/bin/zsh", "/bin/bash", "/bin/sh", "sh"))
|
||||
if shell is not None:
|
||||
return shell
|
||||
|
||||
Returns the first available shell in order of preference:
|
||||
/bin/zsh → /bin/bash → /bin/sh → first `sh` found on PATH.
|
||||
Raises a RuntimeError if no suitable shell is found.
|
||||
"""
|
||||
for shell in ("/bin/zsh", "/bin/bash", "/bin/sh"):
|
||||
if os.path.isfile(shell) and os.access(shell, os.X_OK):
|
||||
if os.name == "nt":
|
||||
system_root = os.environ.get("SystemRoot", r"C:\Windows")
|
||||
shell = LocalSandbox._find_first_available_shell(
|
||||
(
|
||||
"pwsh",
|
||||
"pwsh.exe",
|
||||
"powershell",
|
||||
"powershell.exe",
|
||||
ntpath.join(system_root, "System32", "WindowsPowerShell", "v1.0", "powershell.exe"),
|
||||
"cmd.exe",
|
||||
)
|
||||
)
|
||||
if shell is not None:
|
||||
return shell
|
||||
shell_from_path = shutil.which("sh")
|
||||
if shell_from_path is not None:
|
||||
return shell_from_path
|
||||
|
||||
raise RuntimeError("No suitable shell executable found. Tried /bin/zsh, /bin/bash, /bin/sh, `sh` on PATH, then PowerShell and cmd.exe fallbacks for Windows.")
|
||||
|
||||
raise RuntimeError("No suitable shell executable found. Tried /bin/zsh, /bin/bash, /bin/sh, and `sh` on PATH.")
|
||||
|
||||
def execute_command(self, command: str) -> str:
|
||||
# Resolve container paths in command before execution
|
||||
resolved_command = self._resolve_paths_in_command(command)
|
||||
shell = self._get_shell()
|
||||
|
||||
result = subprocess.run(
|
||||
resolved_command,
|
||||
executable=self._get_shell(),
|
||||
shell=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=600,
|
||||
)
|
||||
if os.name == "nt":
|
||||
if self._is_powershell(shell):
|
||||
args = [shell, "-NoProfile", "-Command", resolved_command]
|
||||
elif self._is_cmd_shell(shell):
|
||||
args = [shell, "/c", resolved_command]
|
||||
else:
|
||||
args = [shell, "-c", resolved_command]
|
||||
|
||||
result = subprocess.run(
|
||||
args,
|
||||
shell=False,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=600,
|
||||
)
|
||||
else:
|
||||
result = subprocess.run(
|
||||
resolved_command,
|
||||
executable=shell,
|
||||
shell=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=600,
|
||||
)
|
||||
output = result.stdout
|
||||
if result.stderr:
|
||||
output += f"\nStd Error:\n{result.stderr}" if output else result.stderr
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import builtins
|
||||
from types import SimpleNamespace
|
||||
|
||||
import deerflow.sandbox.local.local_sandbox as local_sandbox
|
||||
from deerflow.sandbox.local.local_sandbox import LocalSandbox
|
||||
@ -31,3 +32,133 @@ def test_write_file_uses_utf8_on_windows_locale(tmp_path, monkeypatch):
|
||||
LocalSandbox("t").write_file(str(path), text)
|
||||
|
||||
assert path.read_text(encoding="utf-8") == text
|
||||
|
||||
|
||||
def test_get_shell_prefers_posix_shell_from_path_before_windows_fallback(monkeypatch):
|
||||
monkeypatch.setattr(local_sandbox.os, "name", "nt")
|
||||
monkeypatch.setattr(LocalSandbox, "_find_first_available_shell", lambda candidates: r"C:\Program Files\Git\bin\sh.exe" if candidates == ("/bin/zsh", "/bin/bash", "/bin/sh", "sh") else None)
|
||||
|
||||
assert LocalSandbox._get_shell() == r"C:\Program Files\Git\bin\sh.exe"
|
||||
|
||||
|
||||
def test_get_shell_uses_powershell_fallback_on_windows(monkeypatch):
|
||||
calls: list[tuple[str, ...]] = []
|
||||
|
||||
def fake_find(candidates: tuple[str, ...]) -> str | None:
|
||||
calls.append(candidates)
|
||||
if candidates == ("/bin/zsh", "/bin/bash", "/bin/sh", "sh"):
|
||||
return None
|
||||
return r"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe"
|
||||
|
||||
monkeypatch.setattr(local_sandbox.os, "name", "nt")
|
||||
monkeypatch.setattr(local_sandbox.os, "environ", {"SystemRoot": r"C:\Windows"})
|
||||
monkeypatch.setattr(LocalSandbox, "_find_first_available_shell", fake_find)
|
||||
|
||||
assert LocalSandbox._get_shell() == r"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe"
|
||||
assert calls[1] == (
|
||||
"pwsh",
|
||||
"pwsh.exe",
|
||||
"powershell",
|
||||
"powershell.exe",
|
||||
r"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe",
|
||||
"cmd.exe",
|
||||
)
|
||||
|
||||
|
||||
def test_get_shell_uses_cmd_as_last_windows_fallback(monkeypatch):
|
||||
def fake_find(candidates: tuple[str, ...]) -> str | None:
|
||||
if candidates == ("/bin/zsh", "/bin/bash", "/bin/sh", "sh"):
|
||||
return None
|
||||
return r"C:\Windows\System32\cmd.exe"
|
||||
|
||||
monkeypatch.setattr(local_sandbox.os, "name", "nt")
|
||||
monkeypatch.setattr(local_sandbox.os, "environ", {"SystemRoot": r"C:\Windows"})
|
||||
monkeypatch.setattr(LocalSandbox, "_find_first_available_shell", fake_find)
|
||||
|
||||
assert LocalSandbox._get_shell() == r"C:\Windows\System32\cmd.exe"
|
||||
|
||||
|
||||
def test_execute_command_uses_powershell_command_mode_on_windows(monkeypatch):
|
||||
calls: list[tuple[object, dict]] = []
|
||||
|
||||
def fake_run(*args, **kwargs):
|
||||
calls.append((args[0], kwargs))
|
||||
return SimpleNamespace(stdout="ok", stderr="", returncode=0)
|
||||
|
||||
monkeypatch.setattr(local_sandbox.os, "name", "nt")
|
||||
monkeypatch.setattr(LocalSandbox, "_get_shell", staticmethod(lambda: r"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe"))
|
||||
monkeypatch.setattr(local_sandbox.subprocess, "run", fake_run)
|
||||
|
||||
output = LocalSandbox("t").execute_command("Write-Output hello")
|
||||
|
||||
assert output == "ok"
|
||||
assert calls == [
|
||||
(
|
||||
[
|
||||
r"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe",
|
||||
"-NoProfile",
|
||||
"-Command",
|
||||
"Write-Output hello",
|
||||
],
|
||||
{
|
||||
"shell": False,
|
||||
"capture_output": True,
|
||||
"text": True,
|
||||
"timeout": 600,
|
||||
},
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_execute_command_uses_posix_shell_command_mode_on_windows(monkeypatch):
|
||||
calls: list[tuple[object, dict]] = []
|
||||
|
||||
def fake_run(*args, **kwargs):
|
||||
calls.append((args[0], kwargs))
|
||||
return SimpleNamespace(stdout="ok", stderr="", returncode=0)
|
||||
|
||||
monkeypatch.setattr(local_sandbox.os, "name", "nt")
|
||||
monkeypatch.setattr(LocalSandbox, "_get_shell", staticmethod(lambda: r"C:\Program Files\Git\bin\sh.exe"))
|
||||
monkeypatch.setattr(local_sandbox.subprocess, "run", fake_run)
|
||||
|
||||
output = LocalSandbox("t").execute_command("echo hello")
|
||||
|
||||
assert output == "ok"
|
||||
assert calls == [
|
||||
(
|
||||
[r"C:\Program Files\Git\bin\sh.exe", "-c", "echo hello"],
|
||||
{
|
||||
"shell": False,
|
||||
"capture_output": True,
|
||||
"text": True,
|
||||
"timeout": 600,
|
||||
},
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_execute_command_uses_cmd_command_mode_on_windows(monkeypatch):
|
||||
calls: list[tuple[object, dict]] = []
|
||||
|
||||
def fake_run(*args, **kwargs):
|
||||
calls.append((args[0], kwargs))
|
||||
return SimpleNamespace(stdout="ok", stderr="", returncode=0)
|
||||
|
||||
monkeypatch.setattr(local_sandbox.os, "name", "nt")
|
||||
monkeypatch.setattr(LocalSandbox, "_get_shell", staticmethod(lambda: r"C:\Windows\System32\cmd.exe"))
|
||||
monkeypatch.setattr(local_sandbox.subprocess, "run", fake_run)
|
||||
|
||||
output = LocalSandbox("t").execute_command("echo hello")
|
||||
|
||||
assert output == "ok"
|
||||
assert calls == [
|
||||
(
|
||||
[r"C:\Windows\System32\cmd.exe", "/c", "echo hello"],
|
||||
{
|
||||
"shell": False,
|
||||
"capture_output": True,
|
||||
"text": True,
|
||||
"timeout": 600,
|
||||
},
|
||||
)
|
||||
]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user