2026-01-07 16:24:01 +08:00

1101 lines
36 KiB
Python
Executable File

"""File-related function tools for model-invoked file access."""
import fnmatch
import locale
import mimetypes
import os
import re
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import (
Annotated,
Any,
Dict,
Iterable,
List,
Literal,
Mapping,
MutableSequence,
Optional,
Sequence,
)
from entity.messages import MessageBlock, MessageBlockType
from utils.attachments import AttachmentStore
from utils.workspace_scanner import iter_workspace_entries
from utils.function_catalog import ParamMeta
class FileToolContext:
"""Helper to read runtime context injected via `_context` kwarg."""
def __init__(self, ctx: Dict[str, Any] | None):
if ctx is None:
raise ValueError("_context is required for file tools")
self._ctx = ctx
self.attachment_store = self._require_store(ctx.get("attachment_store"))
self.workspace_root = self._require_workspace(ctx.get("python_workspace_root"))
self.session_root = self._require_session_root(ctx.get("graph_directory"), self.workspace_root)
@staticmethod
def _require_store(store: Any) -> AttachmentStore:
if not isinstance(store, AttachmentStore):
raise ValueError("attachment_store missing from _context")
return store
@staticmethod
def _require_workspace(root: Any) -> Path:
if root is None:
raise ValueError("python_workspace_root missing from _context")
path = Path(root).resolve()
path.mkdir(parents=True, exist_ok=True)
return path
@staticmethod
def _require_session_root(root: Any, workspace_root: Path) -> Path:
base = root or workspace_root.parent
path = Path(base).resolve()
path.mkdir(parents=True, exist_ok=True)
return path
def resolve_under_workspace(self, relative_path: str | Path) -> Path:
rel = Path(relative_path)
target = rel.resolve() if rel.is_absolute() else (self.workspace_root / rel).resolve()
if self.workspace_root not in target.parents and target != self.workspace_root:
raise ValueError("Path is outside workspace")
return target
def resolve_under_session(self, relative_path: str | Path) -> Path:
raw = Path(relative_path)
candidates = []
if raw.is_absolute():
candidates.append(raw.resolve())
else:
candidates.append((self.session_root / raw).resolve())
candidates.append(raw.resolve())
for target in candidates:
if self.session_root in target.parents or target == self.session_root:
return target
raise ValueError("Path is outside session directory")
def to_session_relative(self, absolute_path: str | Path | None) -> Optional[str]:
if not absolute_path:
return None
target = Path(absolute_path).resolve()
if self.session_root in target.parents or target == self.session_root:
return target.relative_to(self.session_root).as_posix()
return str(target)
def to_workspace_relative(self, absolute_path: str | Path | None) -> Optional[str]:
if not absolute_path:
return None
target = Path(absolute_path).resolve()
if self.workspace_root in target.parents or target == self.workspace_root:
rel = target.relative_to(self.workspace_root)
return rel.as_posix() or "."
return None
def _check_attachments_not_modified(path: str) -> None:
if path.startswith("attachments"):
raise ValueError("Modifications to the attachments directory are not allowed")
def describe_available_files(
*,
recursive: bool = True,
limit: int = 200,
include_hidden: bool = False,
# max_depth: int = 5,
_context: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""
List accessible files from the attachment store and the current code_workspace.
"""
max_depth = 8
ctx = FileToolContext(_context)
entries: List[Dict[str, Any]] = []
total_limit = max(1, limit)
# # Attachment store (user uploads or files registered via load_file)
# for attachment_id, record in ctx.attachment_store.list_records().items():
# ref = record.ref
# workspace_path = ctx.to_workspace_relative(ref.local_path)
# session_path = ctx.to_session_relative(ref.local_path)
# display_path = workspace_path or session_path or ref.local_path
# entries.append(
# {
# "id": attachment_id,
# "name": ref.name,
# "source": record.extra.get("source") if record.extra else "attachment",
# "mime": ref.mime_type,
# "size": ref.size,
# "type": "file",
# "path": display_path,
# }
# )
# if len(entries) >= total_limit:
# return {"files": entries}
# Workspace files (includes attachments directory because it sits inside workspace)
for entry in iter_workspace_entries(
ctx.workspace_root,
recursive=recursive,
max_depth=max_depth,
include_hidden=include_hidden,
):
if len(entries) >= total_limit:
break
abs_path = (ctx.workspace_root / entry.path).resolve()
workspace_path = Path(entry.path)
# session_path = ctx.to_session_relative(abs_path)
entries.append(
{
"id": entry.path,
"name": Path(entry.path).name,
"source": "workspace",
"path": workspace_path,
"absolute_path": abs_path,
"type": entry.type,
"size": entry.size,
"depth": entry.depth,
}
)
return {"files": entries[:total_limit]}
def list_directory(
path: Annotated[str, ParamMeta(description="Workspace-relative directory path")]=".",
*,
recursive: Annotated[bool, ParamMeta(description="Traverse subdirectories")] = False,
max_depth: Annotated[int, ParamMeta(description="Maximum depth when recursive=True")] = 3,
include_hidden: Annotated[bool, ParamMeta(description="Include entries starting with '.'")] = False,
limit: Annotated[int, ParamMeta(description="Maximum entries to return")] = 500,
_context: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""List contents of a workspace-relative directory."""
ctx = FileToolContext(_context)
target = ctx.resolve_under_workspace(path)
if not target.exists():
raise FileNotFoundError(f"Directory not found: {path}")
if not target.is_dir():
raise NotADirectoryError(f"Path is not a directory: {path}")
if limit <= 0:
raise ValueError("limit must be positive")
if recursive and max_depth < 1:
raise ValueError("max_depth must be >= 1 when recursive")
entries: List[Dict[str, Any]] = []
stack: List[tuple[Path, int]] = [(target, 0)]
base_relative = ctx.to_workspace_relative(target) or "."
while stack and len(entries) < limit:
current, depth = stack.pop()
try:
children = sorted(current.iterdir(), key=lambda p: p.name.lower())
except (FileNotFoundError, PermissionError):
continue
for child in children:
rel = child.relative_to(target)
if not include_hidden and _path_is_hidden(rel):
continue
stat_size = None
modified = None
try:
stat = child.stat()
modified = stat.st_mtime
if child.is_file():
stat_size = stat.st_size
except (FileNotFoundError, PermissionError, OSError):
pass
entry = {
"name": child.name,
"relative_path": rel.as_posix(),
"absolute_path": str(child),
"type": "directory" if child.is_dir() else "file",
"size": stat_size,
"modified_ts": modified,
"depth": depth,
}
entries.append(entry)
if len(entries) >= limit:
break
if recursive and child.is_dir() and depth + 1 < max_depth:
stack.append((child, depth + 1))
return {
"directory": base_relative,
"entries": entries[:limit],
"truncated": len(entries) >= limit,
"recursive": recursive,
}
def create_folder(
path: Annotated[str, ParamMeta(description="Workspace-relative folder path")],
*,
parents: Annotated[bool, ParamMeta(description="Create missing parent directories")]
= True,
exist_ok: Annotated[bool, ParamMeta(description="Do not raise if folder already exists")]
= True,
_context: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""Create a directory tree under the workspace."""
if not path:
raise ValueError("path must be provided")
_check_attachments_not_modified(path)
ctx = FileToolContext(_context)
target = ctx.resolve_under_workspace(path)
if target.exists() and not target.is_dir():
raise ValueError("Target exists and is not a directory")
previously_exists = target.exists()
target.mkdir(parents=parents, exist_ok=exist_ok)
return {
"path": ctx.to_workspace_relative(target),
"absolute_path": str(target),
"created": not previously_exists,
}
def delete_path(
path: Annotated[str, ParamMeta(description="Workspace-relative file or folder path")],
*,
recursive: Annotated[
bool,
ParamMeta(description="Allow deleting non-empty directories recursively"),
] = False,
missing_ok: Annotated[bool, ParamMeta(description="Suppress error if path is missing")]
= False,
_context: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""Delete a workspace file or directory."""
if not path:
raise ValueError("path must be provided")
_check_attachments_not_modified(path)
ctx = FileToolContext(_context)
target = ctx.resolve_under_workspace(path)
if not target.exists():
if missing_ok:
return {
"path": ctx.to_workspace_relative(target),
"absolute_path": str(target),
"deleted": False,
"reason": "missing",
}
raise FileNotFoundError(f"Path not found: {path}")
if target.is_dir():
if not recursive:
raise IsADirectoryError("Set recursive=True to delete directories")
shutil.rmtree(target)
deleted_type = "directory"
else:
target.unlink()
deleted_type = "file"
return {
"path": ctx.to_workspace_relative(target),
"absolute_path": str(target),
"deleted": True,
"type": deleted_type,
}
def load_file(
path_or_id: str,
*,
# mime_override: Optional[str] = None,
_context: Dict[str, Any] | None = None,
) -> List[MessageBlock]:
"""
Load an attachment by ID or register a workspace file as a new attachment.
"""
ctx = FileToolContext(_context)
# First, try existing attachment id
record = ctx.attachment_store.get(path_or_id)
if record:
return [record.as_message_block()]
# Otherwise treat as workspace path
target = ctx.resolve_under_workspace(path_or_id)
if not target.exists() or not target.is_file():
raise ValueError(f"Workspace file not found: {path_or_id}")
# mime_type = mime_override or (mimetypes.guess_type(target.name)[0] or "application/octet-stream")
mime_type = mimetypes.guess_type(target.name)[0] or "application/octet-stream"
record = ctx.attachment_store.register_file(
target,
kind=MessageBlockType.from_mime_type(mime_type),
display_name=target.name,
mime_type=mime_type,
copy_file=False,
persist=False,
deduplicate=True,
extra={
"source": "workspace",
"workspace_path": path_or_id,
"storage": "reference",
},
)
return [record.as_message_block()]
def save_file(
path: str,
content: str,
*,
encoding: str = "utf-8",
mode: Literal["overwrite", "append"] = "overwrite",
_context: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""
Persist data to a workspace file while optionally registering it as an attachment.
Args:
path: Relative path where the file will be written.
content: Plain-text payload encoded with `encoding`.
encoding: Text encoding used when `content` is provided.
mode: Whether to replace the file (`overwrite`) or append to it (`append`).
Returns:
A dictionary describing the persisted file, including workspace path, absolute path,
and byte size.
Raises:
ValueError: If arguments are missing/invalid or the path escapes the workspace.
OSError: If the file cannot be written.
"""
if mode not in {"overwrite", "append"}:
raise ValueError("mode must be either 'overwrite' or 'append'")
ctx = FileToolContext(_context)
target = ctx.resolve_under_workspace(path)
if target.exists() and target.is_dir():
raise ValueError("Target path points to a directory")
target.parent.mkdir(parents=True, exist_ok=True)
data = content.encode(encoding)
write_mode = "wb" if mode == "overwrite" else "ab"
try:
with target.open(write_mode) as handle:
handle.write(data)
except OSError as exc:
raise OSError(f"Failed to write file '{target}': {exc}") from exc
size = target.stat().st_size if target.exists() else None
return {
"path": ctx.to_workspace_relative(target),
"absolute_path": str(target),
"size": size,
# "mode": mode,
# "encoding": encoding if content is not None else None,
}
def read_text_file_snippet(
path: str,
*,
offset: int = 0,
limit: int = 4000,
encoding: str = "utf-8",
_context: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""Read a snippet of a workspace text file without loading entire content."""
ctx = FileToolContext(_context)
target: Path | None = None
try:
candidate = ctx.resolve_under_workspace(path)
except ValueError:
candidate = None
if candidate and candidate.exists() and candidate.is_file():
target = candidate
if target is None:
target = ctx.resolve_under_session(path)
if not target.exists() or not target.is_file():
raise ValueError(f"File not found in session attachments/workspace: {path}")
data = target.read_text(encoding=encoding, errors="replace")
snippet = data[offset : offset + limit]
return {
"snippet": snippet,
"truncated": offset + limit < len(data),
"length": len(data),
"offset": offset,
}
def read_file_segment(
path: Annotated[str, ParamMeta(description="Workspace-relative text file path")],
*,
start_line: Annotated[int, ParamMeta(description="1-based line to begin the snippet")]=1,
line_count: Annotated[int, ParamMeta(description="Number of lines to include starting from start_line")]=40,
inline_line_numbers: Annotated[
bool,
ParamMeta(description="If true, prefix each snippet line with its line number inside the snippet"),
] = False,
encoding: Annotated[str, ParamMeta(description="Explicit encoding or 'auto'")]="auto",
include_line_offsets: Annotated[
bool,
ParamMeta(description="Include 1-based line metadata for the returned snippet"),
] = False,
_context: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""Read a line range plus metadata from a workspace file."""
if start_line < 1:
raise ValueError("start_line must be >= 1")
if line_count < 1:
raise ValueError("line_count must be >= 1")
ctx = FileToolContext(_context)
target = ctx.resolve_under_workspace(path)
if not target.exists() or not target.is_file():
raise FileNotFoundError(f"File not found: {path}")
text, used_encoding = _read_text_content(target, encoding)
newline_style = _detect_newline(text)
stat = target.stat()
lines_with_breaks = text.splitlines(keepends=True)
if not lines_with_breaks:
lines_with_breaks = [""]
total_lines = len(lines_with_breaks)
start_idx = start_line - 1
if start_idx >= total_lines:
raise ValueError("start_line is beyond the total number of lines in the file")
lines_returned = min(line_count, total_lines - start_idx)
end_idx = start_idx + lines_returned
segment_lines = lines_with_breaks[start_idx:end_idx]
snippet = "".join(segment_lines)
raw_snippet = snippet
line_starts: List[int] = [0]
for line in lines_with_breaks:
line_starts.append(line_starts[-1] + len(line))
start_char = line_starts[start_idx]
response: Dict[str, Any] = {
"path": ctx.to_workspace_relative(target),
"encoding": used_encoding,
"newline": newline_style,
"start_line": start_line,
"end_line": start_line + lines_returned - 1,
"line_count": line_count,
"lines_returned": lines_returned,
"total_lines": total_lines,
"snippet": raw_snippet,
"truncated": end_idx < total_lines,
"file_size": stat.st_size,
"modified_ts": stat.st_mtime,
"mode": "line_range",
}
if inline_line_numbers:
snippet = _render_snippet_with_line_numbers(
segment_lines,
start_line,
newline_style,
raw_snippet.endswith(("\r\n", "\n", "\r")),
)
response["snippet"] = snippet
if include_line_offsets:
response.update(_describe_segment_line_offsets(text, start_char, raw_snippet))
return response
@dataclass(frozen=True)
class TextEdit:
"""Normalized representation of a single line edit."""
start_line: int
end_line: int
replacement_lines: List[str]
def apply_text_edits(
path: Annotated[str, ParamMeta(description="Workspace-relative file to edit")],
*,
start_line: Annotated[int, ParamMeta(description="1-based line where the replacement should begin")],
end_line: Annotated[
Optional[int],
ParamMeta(description="Last line (>= start_line-1) to replace; defaults to start_line"),
] = None,
replacement: Annotated[
Optional[str],
ParamMeta(description="Text that should replace the selected line range"),
] = "",
encoding: Annotated[str, ParamMeta(description="Text encoding or 'auto'")]="auto",
newline: Annotated[
str,
ParamMeta(description="Newline style: 'preserve', 'lf', 'crlf', or 'cr'"),
]="preserve",
ensure_trailing_newline: Annotated[
Optional[bool],
ParamMeta(description="Force presence/absence of trailing newline; default preserves original"),
] = None,
_context: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""Apply ordered line edits with newline and encoding preservation."""
ctx = FileToolContext(_context)
target = ctx.resolve_under_workspace(path)
if not target.exists() or not target.is_file():
raise FileNotFoundError(f"File not found: {path}")
_check_attachments_not_modified(path)
normalized = _normalize_edits(_build_single_edit(start_line, end_line, replacement))
original_text, used_encoding = _read_text_content(target, encoding)
lines, had_trailing_newline = _split_lines(original_text)
newline_style = _resolve_newline_choice(newline, _detect_newline(original_text))
_apply_edits_in_place(lines, normalized)
if ensure_trailing_newline is None:
final_trailing = had_trailing_newline
else:
final_trailing = ensure_trailing_newline
rendered = newline_style.join(lines)
if final_trailing:
rendered += newline_style
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(rendered, encoding=used_encoding)
stat = target.stat()
return {
"path": ctx.to_workspace_relative(target),
"encoding": used_encoding,
"newline": newline_style,
"line_count": len(lines),
"applied_edits": len(normalized),
"trailing_newline": final_trailing,
"file_size": stat.st_size,
"modified_ts": stat.st_mtime,
}
def rename_path(
src: Annotated[str, ParamMeta(description="Existing workspace-relative path")],
dst: Annotated[str, ParamMeta(description="New workspace-relative path")],
*,
overwrite: Annotated[
bool,
ParamMeta(description="Allow replacing an existing destination"),
] = False,
_context: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""Rename files or directories inside the workspace."""
ctx = FileToolContext(_context)
source = ctx.resolve_under_workspace(src)
destination = ctx.resolve_under_workspace(dst)
_check_attachments_not_modified(src)
_check_attachments_not_modified(dst)
if not source.exists():
raise FileNotFoundError(f"Source does not exist: {src}")
if source == destination:
return {
"path": ctx.to_workspace_relative(destination),
"operation": "rename",
"skipped": True,
}
_clear_destination(destination, overwrite)
destination.parent.mkdir(parents=True, exist_ok=True)
source.rename(destination)
return {
"path": ctx.to_workspace_relative(destination),
"previous_path": ctx.to_workspace_relative(source),
"operation": "rename",
}
def copy_path(
src: Annotated[str, ParamMeta(description="Source workspace-relative path")],
dst: Annotated[str, ParamMeta(description="Destination workspace-relative path")],
*,
overwrite: Annotated[
bool,
ParamMeta(description="Allow replacing destination if it exists"),
] = False,
_context: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""Copy a file tree within the workspace."""
ctx = FileToolContext(_context)
source = ctx.resolve_under_workspace(src)
destination = ctx.resolve_under_workspace(dst)
_check_attachments_not_modified(dst)
if not source.exists():
raise FileNotFoundError(f"Source does not exist: {src}")
if destination.exists():
if not overwrite:
raise FileExistsError(f"Destination already exists: {dst}")
_clear_destination(destination, overwrite=True)
destination.parent.mkdir(parents=True, exist_ok=True)
if source.is_dir():
shutil.copytree(source, destination)
else:
shutil.copy2(source, destination)
return {
"path": ctx.to_workspace_relative(destination),
"source": ctx.to_workspace_relative(source),
"operation": "copy",
}
def move_path(
src: Annotated[str, ParamMeta(description="Source workspace-relative path")],
dst: Annotated[str, ParamMeta(description="Destination workspace-relative path")],
*,
overwrite: Annotated[
bool,
ParamMeta(description="Allow replacing destination path"),
] = False,
_context: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""Move files or directories, mirroring `mv` semantics across platforms."""
ctx = FileToolContext(_context)
source = ctx.resolve_under_workspace(src)
destination = ctx.resolve_under_workspace(dst)
_check_attachments_not_modified(src)
_check_attachments_not_modified(dst)
if not source.exists():
raise FileNotFoundError(f"Source does not exist: {src}")
if source == destination:
return {
"path": ctx.to_workspace_relative(destination),
"operation": "move",
"skipped": True,
}
_clear_destination(destination, overwrite)
destination.parent.mkdir(parents=True, exist_ok=True)
shutil.move(source, destination)
return {
"path": ctx.to_workspace_relative(destination),
"source": ctx.to_workspace_relative(source),
"operation": "move",
}
def search_in_files(
pattern: Annotated[str, ParamMeta(description="Plain text or regex pattern")],
*,
globs: Annotated[
Optional[Sequence[str]],
ParamMeta(description="Restrict search to these glob patterns"),
] = None,
exclude_globs: Annotated[
Optional[Sequence[str]],
ParamMeta(description="Glob patterns to exclude"),
] = None,
use_regex: Annotated[bool, ParamMeta(description="Treat pattern as regex")]=True,
case_sensitive: Annotated[bool, ParamMeta(description="Match case when True")]=False,
max_results: Annotated[int, ParamMeta(description="Stop after this many matches")]=200,
before_context: Annotated[int, ParamMeta(description="Lines to include before match")]=2,
after_context: Annotated[int, ParamMeta(description="Lines to include after match")]=2,
include_hidden: Annotated[bool, ParamMeta(description="Search hidden files/folders")]=False,
_context: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
"""Search workspace files and return structured matches."""
if max_results <= 0:
raise ValueError("max_results must be positive")
ctx = FileToolContext(_context)
include_patterns = _normalize_globs(globs) or ["**/*"]
exclude_patterns = _normalize_globs(exclude_globs)
matches: List[Dict[str, Any]] = []
searched_files = 0
compiled_regex: Optional[re.Pattern[str]] = None
literal = pattern if case_sensitive else pattern.lower()
if use_regex:
flags = re.MULTILINE
if not case_sensitive:
flags |= re.IGNORECASE
compiled_regex = re.compile(pattern, flags)
for candidate in _iter_candidate_files(
ctx.workspace_root,
include_patterns,
exclude_patterns,
include_hidden,
):
searched_files += 1
lines = _read_file_lines_for_search(candidate)
if not lines:
continue
for match in _iter_line_matches(
lines,
compiled_regex,
literal,
pattern,
case_sensitive,
use_regex,
):
before = _slice_context(lines, match["line_number"], before_context, before=True)
after = _slice_context(lines, match["line_number"], after_context, before=False)
matches.append(
{
"file": ctx.to_workspace_relative(candidate),
"line": match["line_number"],
"column": match["column"],
"line_text": match["line_text"],
"before": before,
"after": after,
}
)
if len(matches) >= max_results:
return {
"matches": matches,
"limited": True,
"engine": "python",
"searched_files": searched_files,
}
return {
"matches": matches,
"limited": False,
"engine": "python",
"searched_files": searched_files,
}
def _read_text_content(path: Path, encoding: str) -> tuple[str, str]:
if encoding != "auto":
return path.read_text(encoding=encoding), encoding
raw = path.read_bytes()
for candidate in _candidate_encodings():
try:
return raw.decode(candidate), candidate
except UnicodeDecodeError:
continue
return raw.decode("utf-8", errors="replace"), "utf-8"
def _candidate_encodings() -> List[str]:
preferred = locale.getpreferredencoding(False) or ""
ordered = [
"utf-8-sig",
"utf-8",
preferred,
"utf-16",
"utf-16-le",
"utf-16-be",
"latin-1",
]
seen: set[str] = set()
result: List[str] = []
for item in ordered:
normalized = (item or "").lower()
if not normalized or normalized in seen:
continue
seen.add(normalized)
result.append(item)
return result
_LINE_BREAK_RE = re.compile(r"\r\n|\r|\n")
def _detect_newline(text: str) -> str:
if "\r\n" in text:
return "\r\n"
if "\r" in text and "\n" not in text:
return "\r"
return "\n"
def _split_lines(text: str) -> tuple[List[str], bool]:
if not text:
return [], False
has_trailing = text.endswith(("\r\n", "\n", "\r"))
return text.splitlines(), has_trailing
def _describe_segment_line_offsets(full_text: str, start_index: int, snippet: str) -> Dict[str, Any]:
"""Return 1-based line metadata (columns are 0-based) for a snippet extracted from full_text."""
before_segment = full_text[:start_index]
start_line = 1
last_break_end = 0
for match in _LINE_BREAK_RE.finditer(before_segment):
start_line += 1
last_break_end = match.end()
start_column = start_index - last_break_end
line_offsets: List[Dict[str, int]] = [
{"line": start_line, "offset": 0, "column": start_column},
]
line_number = start_line
last_break_inside = 0
for match in _LINE_BREAK_RE.finditer(snippet):
last_break_inside = match.end()
line_number += 1
line_offsets.append({"line": line_number, "offset": match.end(), "column": 0})
if snippet:
if last_break_inside:
end_column = len(snippet) - last_break_inside
else:
end_column = start_column + len(snippet)
else:
end_column = start_column
return {
"start_line": start_line,
"start_column": start_column,
"end_line": line_number,
"end_column": end_column,
"line_offsets": line_offsets,
}
def _render_snippet_with_line_numbers(
lines: Sequence[str],
start_line: int,
newline_style: str,
preserve_trailing_newline: bool,
) -> str:
numbered: List[str] = []
for idx, line in enumerate(lines):
body = line.rstrip("\r\n")
numbered.append(f"{start_line + idx}:{body}")
rendered = newline_style.join(numbered)
if preserve_trailing_newline and numbered:
rendered += newline_style
return rendered
def _normalize_edits(edits: Sequence[Mapping[str, Any]]) -> List[TextEdit]:
if not edits:
raise ValueError("at least one edit instruction is required")
normalized: List[TextEdit] = []
for item in edits:
if not isinstance(item, Mapping):
raise ValueError("each edit entry must be a mapping object")
try:
start_line = int(item["start_line"])
except (KeyError, TypeError, ValueError) as exc:
raise ValueError("start_line is required for each edit") from exc
end_line_raw = item.get("end_line", start_line)
try:
end_line = int(end_line_raw)
except (TypeError, ValueError) as exc:
raise ValueError("end_line must be an integer") from exc
if start_line < 1:
raise ValueError("start_line must be >= 1")
if end_line < start_line - 1:
raise ValueError("end_line must be >= start_line - 1")
replacement = item.get("replacement", "")
if not isinstance(replacement, str):
raise ValueError("replacement must be a string")
normalized.append(
TextEdit(
start_line=start_line,
end_line=end_line,
replacement_lines=replacement.splitlines(),
)
)
normalized.sort(key=lambda edit: (edit.start_line, edit.end_line))
_validate_edit_ranges(normalized)
return normalized
def _build_single_edit(
start_line: int,
end_line: Optional[int],
replacement: Optional[str],
) -> List[Mapping[str, Any]]:
effective_end = end_line if end_line is not None else start_line
payload = {
"start_line": start_line,
"end_line": effective_end,
"replacement": replacement if replacement is not None else "",
}
return [payload]
def _validate_edit_ranges(edits: Sequence[TextEdit]) -> None:
previous_range_end = 0
for edit in edits:
effective_end = max(edit.end_line, edit.start_line - 1)
if edit.start_line <= previous_range_end and previous_range_end > 0:
raise ValueError("edit ranges overlap; merge them before calling apply_text_edits")
previous_range_end = max(previous_range_end, effective_end)
def _apply_edits_in_place(lines: MutableSequence[str], edits: Sequence[TextEdit]) -> None:
for edit in reversed(edits):
current_line_count = len(lines)
if edit.start_line > current_line_count + 1:
raise ValueError("start_line is beyond the end of the file")
start_idx = min(edit.start_line - 1, current_line_count)
if start_idx > current_line_count:
raise ValueError("start_line is beyond the end of the file")
removal_count = max(edit.end_line - edit.start_line + 1, 0)
if removal_count > 0:
end_line = min(edit.end_line, len(lines))
removal_count = max(end_line - edit.start_line + 1, 0)
end_idx = start_idx + removal_count
lines[start_idx:end_idx] = edit.replacement_lines
def _resolve_newline_choice(preference: str, detected: str) -> str:
normalized = (preference or "").lower()
if normalized == "lf":
return "\n"
if normalized == "crlf":
return "\r\n"
if normalized == "cr":
return "\r"
return detected or os.linesep
def _clear_destination(destination: Path, overwrite: bool) -> None:
if not destination.exists():
return
if not overwrite:
raise FileExistsError(f"Destination already exists: {destination}")
if destination.is_dir():
shutil.rmtree(destination)
else:
destination.unlink()
def _normalize_globs(patterns: Optional[Sequence[str]]) -> List[str]:
if not patterns:
return []
normalized: List[str] = []
for raw in patterns:
if not raw:
continue
normalized.append(str(raw))
return normalized
def _iter_candidate_files(
root: Path,
include_patterns: Sequence[str],
exclude_patterns: Sequence[str],
include_hidden: bool,
) -> Iterable[Path]:
yielded: set[str] = set()
for pattern in include_patterns:
for candidate in root.glob(pattern):
if not candidate.is_file():
continue
rel = candidate.relative_to(root)
rel_key = rel.as_posix()
if rel_key in yielded:
continue
if not include_hidden and _path_is_hidden(rel):
continue
if _is_excluded(rel_key, exclude_patterns):
continue
yielded.add(rel_key)
yield candidate
def _path_is_hidden(path: Path) -> bool:
return any(part.startswith(".") for part in path.parts)
def _is_excluded(relative_posix: str, exclude_patterns: Sequence[str]) -> bool:
for pattern in exclude_patterns:
if fnmatch.fnmatch(relative_posix, pattern):
return True
return False
def _read_file_lines_for_search(path: Path) -> List[str]:
raw, _ = _read_text_content(path, encoding="auto")
return raw.splitlines()
def _iter_line_matches(
lines: Sequence[str],
compiled_regex: Optional[re.Pattern[str]],
literal_lower: str,
original_pattern: str,
case_sensitive: bool,
use_regex: bool,
) -> Iterable[Dict[str, Any]]:
for idx, raw_line in enumerate(lines):
line_number = idx + 1
line_text = raw_line
if use_regex and compiled_regex is not None:
for match in compiled_regex.finditer(line_text):
yield {
"line_number": line_number,
"column": match.start() + 1,
"line_text": line_text,
}
else:
if not original_pattern:
continue
haystack = line_text if case_sensitive else line_text.lower()
needle = original_pattern if case_sensitive else literal_lower
start = haystack.find(needle)
while start != -1:
yield {
"line_number": line_number,
"column": start + 1,
"line_text": line_text,
}
start = haystack.find(needle, start + max(len(needle), 1))
def _slice_context(
lines: Sequence[str],
center_line: int,
span: int,
*,
before: bool,
) -> List[str]:
if span <= 0:
return []
if before:
start_line = max(center_line - span, 1)
end_line = center_line - 1
else:
start_line = center_line + 1
end_line = min(center_line + span, len(lines))
if end_line < start_line:
return []
start_idx = start_line - 1
end_idx = end_line
return list(lines[start_idx:end_idx])