mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-25 11:18:22 +00:00
* feat(sandbox): add grep and glob tools * refactor(aio-sandbox): use native file search APIs * fix(sandbox): address review issues in grep/glob tools - aio_sandbox: use should_ignore_path() instead of should_ignore_name() for include_dirs=True branch to filter nested ignored paths correctly - aio_sandbox: add early exit when max_results reached in glob loop - aio_sandbox: guard entry.path.startswith(path) before stripping prefix - aio_sandbox: validate regex locally before sending to remote API - search: skip lines exceeding max_line_chars to prevent ReDoS - search: remove resolve() syscall in os.walk loop - tools: avoid double get_thread_data() call in glob_tool/grep_tool - tests: add 6 new cases covering the above code paths - tests: patch get_app_config in truncation test to isolate config * Fix sandbox grep/glob review feedback * Remove unrelated Langfuse RFC from PR
211 lines
5.9 KiB
Python
211 lines
5.9 KiB
Python
import fnmatch
|
|
import os
|
|
import re
|
|
from dataclasses import dataclass
|
|
from pathlib import Path, PurePosixPath
|
|
|
|
# fnmatch-style patterns tested against a single path segment (file or
# directory name). Any segment matching one of these is excluded from search.
IGNORE_PATTERNS = [
    # Version-control directories.
    ".git",
    ".svn",
    ".hg",
    ".bzr",
    # Dependency trees, bytecode caches and virtual environments.
    "node_modules",
    "__pycache__",
    ".venv",
    "venv",
    ".env",
    "env",
    ".tox",
    ".nox",
    ".eggs",
    "*.egg-info",
    "site-packages",
    # Build and framework output directories.
    "dist",
    "build",
    ".next",
    ".nuxt",
    ".output",
    ".turbo",
    "target",
    "out",
    # Editor / IDE state and swap files.
    ".idea",
    ".vscode",
    "*.swp",
    "*.swo",
    "*~",
    ".project",
    ".classpath",
    ".settings",
    # Operating-system artefacts.
    ".DS_Store",
    "Thumbs.db",
    "desktop.ini",
    "*.lnk",
    # Logs, temporary files, backups and generic caches.
    "*.log",
    "*.tmp",
    "*.temp",
    "*.bak",
    "*.cache",
    ".cache",
    "logs",
    # Test, coverage and linter caches.
    ".coverage",
    "coverage",
    ".nyc_output",
    "htmlcov",
    ".pytest_cache",
    ".mypy_cache",
    ".ruff_cache",
]

# Default upper bound, in bytes, on the size of a file that grep will read.
DEFAULT_MAX_FILE_SIZE_BYTES = 1_000_000
# Default maximum number of characters kept from a matching line.
DEFAULT_LINE_SUMMARY_LENGTH = 200
|
|
|
|
|
|
@dataclass(frozen=True)
class GrepMatch:
    """Immutable record describing one line that matched a grep search."""

    # Path of the file containing the match, as a string.
    path: str
    # Line number of the match within that file.
    line_number: int
    # Text of the matching line.
    line: str
|
|
|
|
|
|
def should_ignore_name(name: str) -> bool:
    """Return True when *name* matches at least one entry of IGNORE_PATTERNS.

    *name* is expected to be a single path segment (a file or directory name).
    Matching is delegated to ``fnmatch.fnmatch``.
    """
    return any(fnmatch.fnmatch(name, ignored) for ignored in IGNORE_PATTERNS)
|
|
|
|
|
|
def should_ignore_path(path: str) -> bool:
    """Return True when any segment of *path* is an ignored name.

    Backslashes are treated as separators, and empty segments (leading,
    trailing or doubled separators) are skipped.
    """
    normalized = path.replace("\\", "/")
    for segment in normalized.split("/"):
        if segment and should_ignore_name(segment):
            return True
    return False
|
|
|
|
|
|
def _glob_segments_match(pattern_parts: tuple[str, ...], path_parts: tuple[str, ...]) -> bool:
    """Return True when *pattern_parts* consume all of *path_parts*.

    Every pattern segment is a case-sensitive fnmatch glob for exactly one
    path segment, except ``**`` which stands for zero or more whole segments.
    """
    # reachable[j] is True when the pattern segments processed so far can
    # consume exactly the first j path segments.
    reachable = [True] + [False] * len(path_parts)
    for segment_pattern in pattern_parts:
        if segment_pattern == "**":
            # Zero or more segments: every position at or after a reachable
            # one becomes reachable.
            seen = False
            for index, flag in enumerate(reachable):
                seen = seen or flag
                reachable[index] = seen
        else:
            advanced = [False] * len(reachable)
            for index, segment in enumerate(path_parts):
                if reachable[index] and fnmatch.fnmatchcase(segment, segment_pattern):
                    advanced[index + 1] = True
            reachable = advanced
        if not any(reachable):
            return False
    return reachable[-1]


def path_matches(pattern: str, rel_path: str) -> bool:
    """Return True when the POSIX-style *rel_path* matches the glob *pattern*.

    Relative patterns are anchored at the right, so ``*.py`` matches
    ``pkg/mod.py`` and ``src/*.py`` matches ``x/src/a.py``. Absolute patterns
    must match the whole path. ``**`` matches zero or more directory levels
    anywhere in the pattern, so ``src/**/*.py`` matches both ``src/a.py`` and
    ``src/x/y/a.py``. ``PurePosixPath.match`` treats ``**`` like a single
    ``*``, which is why matching is done segment by segment here.

    Raises:
        ValueError: if *pattern* is empty (contains no path segments).
    """
    parsed_pattern = PurePosixPath(pattern)
    pattern_parts = parsed_pattern.parts
    if not pattern_parts:
        raise ValueError("empty pattern")

    path_parts = PurePosixPath(rel_path).parts
    if not path_parts:
        return False

    if parsed_pattern.is_absolute():
        return _glob_segments_match(pattern_parts, path_parts)
    # An implicit leading "**" lets a relative pattern match any suffix of the path.
    return _glob_segments_match(("**", *pattern_parts), path_parts)
|
|
|
|
|
|
def truncate_line(line: str, max_chars: int = DEFAULT_LINE_SUMMARY_LENGTH) -> str:
    """Strip trailing newline characters and cap *line* at *max_chars* characters.

    Lines that already fit are returned unchanged (minus trailing ``\\n`` /
    ``\\r``). Longer lines are cut and end with ``"..."`` so the result is
    exactly *max_chars* long. When *max_chars* is below 3 there is no room for
    the marker, so the line is hard-cut to *max_chars* characters (an empty
    string for zero or negative limits).
    """
    line = line.rstrip("\n\r")
    if len(line) <= max_chars:
        return line
    if max_chars < 3:
        # A negative slice bound would keep almost the whole line and return
        # more than max_chars characters; cut hard instead.
        return line[: max(max_chars, 0)]
    return line[: max_chars - 3] + "..."
|
|
|
|
|
|
def is_binary_file(path: Path, sample_size: int = 8192) -> bool:
    """Report whether the file at *path* should be treated as binary.

    The first *sample_size* bytes are read; the file counts as binary when
    that sample contains a NUL byte. A file that cannot be opened or read
    (``OSError``) is also reported as binary.
    """
    try:
        with path.open("rb") as stream:
            head = stream.read(sample_size)
    except OSError:
        return True
    return b"\0" in head
|
|
|
|
|
|
def find_glob_matches(root: Path, pattern: str, *, include_dirs: bool = False, max_results: int = 200) -> tuple[list[str], bool]:
    """Walk *root* and collect paths whose root-relative form matches *pattern*.

    Directories and files whose names are ignored (``should_ignore_name``) are
    skipped, and ignored directories are not descended into. Traversal is
    sorted by name so the result, and which entries survive truncation, does
    not depend on filesystem enumeration order.

    Args:
        root: Directory to search; it is resolved before walking.
        pattern: Glob tested with ``path_matches`` against each POSIX-style
            path relative to *root*.
        include_dirs: When True, matching directories are reported as well as
            files. Within one directory they are listed before its files.
        max_results: Stop once this many matches are collected. The limit is
            checked after a match is recorded, so values below 1 act like 1.

    Returns:
        ``(matches, truncated)``. *matches* holds the matched paths as strings
        under the resolved root. *truncated* is True when the search stopped
        because *max_results* was reached; further matches may or may not exist.

    Raises:
        FileNotFoundError: if *root* does not exist.
        NotADirectoryError: if *root* is not a directory.
    """
    matches: list[str] = []
    root = root.resolve()

    if not root.exists():
        raise FileNotFoundError(root)
    if not root.is_dir():
        raise NotADirectoryError(root)

    for current_root, dirs, files in os.walk(root):
        # In-place assignment both prunes ignored directories from the walk and
        # fixes the order in which os.walk visits the remaining ones.
        dirs[:] = sorted(name for name in dirs if not should_ignore_name(name))
        # root is already resolved; os.walk builds current_root by joining under root,
        # so relative_to() works without an extra stat()/resolve() per directory.
        current_dir = Path(current_root)
        rel_dir = current_dir.relative_to(root)

        candidates = list(dirs) if include_dirs else []
        candidates.extend(sorted(name for name in files if not should_ignore_name(name)))

        for name in candidates:
            if not path_matches(pattern, (rel_dir / name).as_posix()):
                continue
            matches.append(str(current_dir / name))
            if len(matches) >= max_results:
                return matches, True

    return matches, False
|
|
|
|
|
|
def find_grep_matches(
    root: Path,
    pattern: str,
    *,
    glob_pattern: str | None = None,
    literal: bool = False,
    case_sensitive: bool = False,
    max_results: int = 100,
    max_file_size: int = DEFAULT_MAX_FILE_SIZE_BYTES,
    line_summary_length: int = DEFAULT_LINE_SUMMARY_LENGTH,
) -> tuple[list[GrepMatch], bool]:
    """Search text files under *root* for lines matching *pattern*.

    Ignored names (``should_ignore_name``) are skipped and ignored directories
    are not descended into. A file is also skipped when it does not match
    *glob_pattern*, is a symlink, resolves outside *root*, is not a regular
    file, is larger than *max_file_size*, looks binary (``is_binary_file``),
    or raises ``OSError`` while being inspected or read. Traversal is sorted
    by name so results do not depend on filesystem enumeration order.

    Args:
        root: Directory to search; it is resolved before walking.
        pattern: Regular expression, or plain text when *literal* is True.
        glob_pattern: Optional glob (``path_matches``) that a file's
            root-relative POSIX path must satisfy.
        literal: Escape *pattern* so it is matched verbatim.
        case_sensitive: Match case-sensitively; the default ignores case.
        max_results: Stop once this many matches are collected. The limit is
            checked after a match is recorded, so values below 1 act like 1.
        max_file_size: Files larger than this many bytes are skipped.
        line_summary_length: Maximum length of the stored line text. Lines
            longer than ten times this value are not searched at all.

    Returns:
        ``(matches, truncated)``. Each match carries the resolved file path,
        the 1-based line number and the shortened line text. *truncated* is
        True when the search stopped because *max_results* was reached.

    Raises:
        FileNotFoundError: if *root* does not exist.
        NotADirectoryError: if *root* is not a directory.
        re.error: if *pattern* is not a valid regular expression.
    """
    matches: list[GrepMatch] = []
    root = root.resolve()

    if not root.exists():
        raise FileNotFoundError(root)
    if not root.is_dir():
        raise NotADirectoryError(root)

    regex_source = re.escape(pattern) if literal else pattern
    flags = 0 if case_sensitive else re.IGNORECASE
    regex = re.compile(regex_source, flags)

    # Skip lines longer than this to prevent ReDoS on minified / no-newline files.
    max_line_chars = line_summary_length * 10

    for current_root, dirs, files in os.walk(root):
        # In-place assignment both prunes ignored directories from the walk and
        # fixes the order in which os.walk visits the remaining ones.
        dirs[:] = sorted(name for name in dirs if not should_ignore_name(name))
        current_dir = Path(current_root)
        rel_dir = current_dir.relative_to(root)

        for name in sorted(files):
            if should_ignore_name(name):
                continue

            candidate_path = current_dir / name
            rel_path = (rel_dir / name).as_posix()

            if glob_pattern is not None and not path_matches(glob_pattern, rel_path):
                continue

            try:
                if candidate_path.is_symlink():
                    continue
                file_path = candidate_path.resolve()
                if not file_path.is_relative_to(root):
                    continue
                # FIFOs, sockets and device nodes are listed by os.walk as
                # "files"; opening one for reading can block indefinitely.
                if not file_path.is_file():
                    continue
                if file_path.stat().st_size > max_file_size or is_binary_file(file_path):
                    continue
                with file_path.open(encoding="utf-8", errors="replace") as handle:
                    for line_number, line in enumerate(handle, start=1):
                        if len(line) > max_line_chars:
                            continue
                        if regex.search(line):
                            matches.append(
                                GrepMatch(
                                    path=str(file_path),
                                    line_number=line_number,
                                    line=truncate_line(line, line_summary_length),
                                )
                            )
                            if len(matches) >= max_results:
                                return matches, True
            except OSError:
                # Best effort: an unreadable file must not abort the search.
                continue

    return matches, False
|