deer-flow/scripts/sandbox_memory_profile.py
Admire 0d0968a364
chore: add sandbox memory profiling tools (#3249)
* chore: add sandbox memory profiling tools

* chore: keep sandbox memory PR profiling-only

* Format sandbox memory profiling script
2026-06-03 22:02:27 +08:00

519 lines
16 KiB
Python

#!/usr/bin/env python3
"""Collect Kubernetes sandbox pod memory snapshots for DeerFlow.
This script is intentionally lightweight: it shells out to ``kubectl`` and
emits either JSON or Markdown so maintainers can compare sandbox backends and
workloads without adding runtime dependencies.
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
from typing import Any
DEFAULT_NAMESPACE = "deer-flow"
DEFAULT_SELECTOR = "app=deer-flow-sandbox"
DEFAULT_KUBECTL_TIMEOUT = 30
@dataclass(frozen=True)
class TopPod:
name: str
cpu_raw: str
memory_raw: str
cpu_millicores: int | None
memory_bytes: int | None
@dataclass(frozen=True)
class ProcessSample:
pid: int
ppid: int | None
rss_kib: int
command: str
@dataclass(frozen=True)
class ProcessSampleResult:
samples: dict[str, list[ProcessSample]]
errors: dict[str, str]
def parse_cpu_millicores(value: str) -> int | None:
value = value.strip()
if not value:
return None
if value.endswith("m"):
number = value[:-1]
return int(number) if number.isdigit() else None
if value.isdigit():
return int(value) * 1000
return None
def parse_memory_bytes(value: str) -> int | None:
value = value.strip()
if not value:
return None
suffixes = {
"Ki": 1024,
"Mi": 1024**2,
"Gi": 1024**3,
"Ti": 1024**4,
"K": 1000,
"M": 1000**2,
"G": 1000**3,
"T": 1000**4,
}
for suffix, multiplier in suffixes.items():
if value.endswith(suffix):
number = value[: -len(suffix)]
try:
return int(Decimal(number) * multiplier)
except InvalidOperation:
return None
try:
return int(value)
except ValueError:
return None
def format_mib(value: int | None) -> str:
if value is None:
return "-"
return f"{value / 1024 / 1024:.1f} MiB"
def run_kubectl(
args: list[str], *, kubectl: str, timeout: int = DEFAULT_KUBECTL_TIMEOUT
) -> str:
completed = subprocess.run(
[kubectl, *args],
check=True,
capture_output=True,
text=True,
timeout=timeout,
)
return completed.stdout
def parse_top_pods(output: str) -> list[TopPod]:
pods: list[TopPod] = []
for raw_line in output.splitlines():
line = raw_line.strip()
if not line:
continue
parts = line.split()
if parts and parts[0].upper() == "NAME":
continue
if len(parts) < 3:
continue
name, cpu_raw, memory_raw = parts[:3]
pods.append(
TopPod(
name=name,
cpu_raw=cpu_raw,
memory_raw=memory_raw,
cpu_millicores=parse_cpu_millicores(cpu_raw),
memory_bytes=parse_memory_bytes(memory_raw),
)
)
return pods
def parse_processes(output: str, *, limit: int) -> list[ProcessSample]:
if limit < 1:
raise ValueError("process limit must be greater than 0")
processes: list[ProcessSample] = []
for raw_line in output.splitlines():
line = raw_line.strip()
if not line:
continue
parts = line.split(maxsplit=3)
if parts and parts[0].upper() == "PID":
continue
if len(parts) < 4:
continue
pid_raw, ppid_raw, rss_raw, command = parts
try:
pid = int(pid_raw)
rss_kib = int(rss_raw)
except ValueError:
continue
try:
ppid = int(ppid_raw)
except ValueError:
ppid = None
processes.append(
ProcessSample(pid=pid, ppid=ppid, rss_kib=rss_kib, command=command)
)
processes.sort(key=lambda process: process.rss_kib, reverse=True)
return processes[:limit]
def _container_resources(pod: dict[str, Any]) -> dict[str, Any]:
resources: dict[str, Any] = {}
for container in pod.get("spec", {}).get("containers", []):
name = container.get("name", "")
if not name:
continue
resources[name] = {
"image": container.get("image", ""),
"requests": container.get("resources", {}).get("requests", {}),
"limits": container.get("resources", {}).get("limits", {}),
}
return resources
def merge_pod_data(
top_pods: list[TopPod], pod_json: dict[str, Any]
) -> list[dict[str, Any]]:
pod_items = pod_json.get("items", []) if isinstance(pod_json, dict) else []
metadata_by_name = {
pod.get("metadata", {}).get("name"): pod
for pod in pod_items
if pod.get("metadata", {}).get("name")
}
rows: list[dict[str, Any]] = []
for top in top_pods:
pod = metadata_by_name.get(top.name, {})
metadata = pod.get("metadata", {})
status = pod.get("status", {})
rows.append(
{
"name": top.name,
"cpu": {
"raw": top.cpu_raw,
"millicores": top.cpu_millicores,
},
"memory": {
"raw": top.memory_raw,
"bytes": top.memory_bytes,
"mib": None
if top.memory_bytes is None
else round(top.memory_bytes / 1024 / 1024, 2),
},
"phase": status.get("phase", ""),
"start_time": status.get("startTime", ""),
"labels": metadata.get("labels", {}),
"containers": _container_resources(pod),
"processes": [],
}
)
return rows
def attach_process_samples(
pods: list[dict[str, Any]],
process_samples: dict[str, list[ProcessSample]],
) -> list[dict[str, Any]]:
for pod in pods:
samples = process_samples.get(pod["name"], [])
pod["processes"] = [
{
"pid": sample.pid,
"ppid": sample.ppid,
"rss_kib": sample.rss_kib,
"rss_mib": round(sample.rss_kib / 1024, 2),
"command": sample.command,
}
for sample in samples
]
return pods
def build_report(
*,
namespace: str,
selector: str,
sample: str,
top_pods: list[TopPod],
pod_json: dict[str, Any],
process_samples: dict[str, list[ProcessSample]] | None = None,
process_errors: dict[str, str] | None = None,
) -> dict[str, Any]:
pods = merge_pod_data(top_pods, pod_json)
if process_samples:
pods = attach_process_samples(pods, process_samples)
memory_values = [
pod["memory"]["bytes"] for pod in pods if pod["memory"]["bytes"] is not None
]
cpu_values = [
pod["cpu"]["millicores"] for pod in pods if pod["cpu"]["millicores"] is not None
]
unparsed_memory_count = len(pods) - len(memory_values)
unparsed_cpu_count = len(pods) - len(cpu_values)
return {
"schema_version": 1,
"captured_at": datetime.now(timezone.utc).isoformat(), # noqa: UP017 - keep Python 3.10 compatibility.
"namespace": namespace,
"selector": selector,
"sample": sample,
"summary": {
"pod_count": len(pods),
"parsed_memory_count": len(memory_values),
"unparsed_memory_count": unparsed_memory_count,
"total_memory_bytes": sum(memory_values),
"total_memory_mib": round(sum(memory_values) / 1024 / 1024, 2),
"average_memory_mib": round(
(sum(memory_values) / len(memory_values)) / 1024 / 1024, 2
)
if memory_values
else None,
"max_memory_mib": round(max(memory_values) / 1024 / 1024, 2)
if memory_values
else None,
"parsed_cpu_count": len(cpu_values),
"unparsed_cpu_count": unparsed_cpu_count,
"total_cpu_millicores": sum(cpu_values),
"pods_with_process_samples": sum(1 for pod in pods if pod["processes"]),
"pods_with_process_sample_errors": len(process_errors or {}),
},
"pods": pods,
"process_errors": process_errors or {},
"notes": [
"kubectl top reports Kubernetes/container working set memory, not exclusive RSS/PSS.",
"Process RSS samples are collected with ps inside the sandbox container and do not include all cgroup memory such as page cache.",
"Compare multiple samples: empty sandbox, after bash, after Python/Node, after artifact generation, and warm reuse.",
"Use identical workloads when comparing AIO with another sandbox backend.",
],
}
def render_markdown(report: dict[str, Any]) -> str:
summary = report["summary"]
lines = [
"# DeerFlow Sandbox Memory Profile",
"",
f"- Captured at: `{report['captured_at']}`",
f"- Namespace: `{report['namespace']}`",
f"- Selector: `{report['selector']}`",
f"- Sample: `{report['sample']}`",
f"- Pods: `{summary['pod_count']}`",
f"- Parsed memory samples: `{summary['parsed_memory_count']}`",
f"- Unparsed memory samples: `{summary['unparsed_memory_count']}`",
f"- Total memory: `{format_mib(summary['total_memory_bytes'])}`",
f"- Average memory: `{summary['average_memory_mib']} MiB`"
if summary["average_memory_mib"] is not None
else "- Average memory: `-`",
f"- Max memory: `{summary['max_memory_mib']} MiB`"
if summary["max_memory_mib"] is not None
else "- Max memory: `-`",
f"- Total CPU: `{summary['total_cpu_millicores']}m`",
f"- Pods with process samples: `{summary['pods_with_process_samples']}`",
f"- Pods with process sample errors: `{summary['pods_with_process_sample_errors']}`",
"",
"| Pod | Phase | CPU | Memory | Start Time |",
"| --- | --- | ---: | ---: | --- |",
]
for pod in report["pods"]:
lines.append(
"| {name} | {phase} | {cpu} | {memory} | {start_time} |".format(
name=pod["name"],
phase=pod["phase"] or "-",
cpu=pod["cpu"]["raw"],
memory=pod["memory"]["raw"],
start_time=pod["start_time"] or "-",
)
)
sampled_pods = [pod for pod in report["pods"] if pod["processes"]]
if sampled_pods:
lines.extend(["", "## Top Processes"])
for pod in sampled_pods:
lines.extend(
[
"",
f"### {pod['name']}",
"",
"| PID | PPID | RSS | Command |",
"| ---: | ---: | ---: | --- |",
]
)
for process in pod["processes"]:
lines.append(
"| {pid} | {ppid} | {rss} | `{command}` |".format(
pid=process["pid"],
ppid=process["ppid"] if process["ppid"] is not None else "-",
rss=format_mib(process["rss_kib"] * 1024),
command=str(process["command"])
.replace("`", "'")
.replace("|", "\\|"),
)
)
if report["process_errors"]:
lines.extend(["", "## Process Sample Errors"])
for pod_name, error in sorted(report["process_errors"].items()):
lines.append(f"- `{pod_name}`: {error}")
lines.extend(["", "## Notes"])
lines.extend(f"- {note}" for note in report["notes"])
lines.append("")
return "\n".join(lines)
def collect_process_samples(
top_pods: list[TopPod],
*,
namespace: str,
kubectl: str,
limit: int,
kubectl_timeout: int = DEFAULT_KUBECTL_TIMEOUT,
) -> ProcessSampleResult:
samples: dict[str, list[ProcessSample]] = {}
errors: dict[str, str] = {}
command = (
"ps -eo pid,ppid,rss,args --sort=-rss 2>/dev/null || ps -eo pid,ppid,rss,args"
)
for pod in top_pods:
try:
output = run_kubectl(
["exec", "-n", namespace, pod.name, "--", "sh", "-c", command],
kubectl=kubectl,
timeout=kubectl_timeout,
)
except subprocess.CalledProcessError as exc:
errors[pod.name] = (exc.stderr or str(exc)).strip()
continue
except subprocess.TimeoutExpired as exc:
errors[pod.name] = f"kubectl exec timed out after {exc.timeout} seconds"
continue
samples[pod.name] = parse_processes(output, limit=limit)
return ProcessSampleResult(samples=samples, errors=errors)
def collect(
namespace: str,
selector: str,
sample: str,
kubectl: str,
*,
include_processes: bool = False,
process_limit: int = 10,
kubectl_timeout: int = DEFAULT_KUBECTL_TIMEOUT,
) -> dict[str, Any]:
if process_limit < 1:
raise ValueError("--process-limit must be greater than 0")
if kubectl_timeout < 1:
raise ValueError("--kubectl-timeout must be greater than 0")
top_output = run_kubectl(
["top", "pod", "-n", namespace, "-l", selector, "--no-headers"],
kubectl=kubectl,
timeout=kubectl_timeout,
)
top_pods = parse_top_pods(top_output)
pods_output = run_kubectl(
["get", "pods", "-n", namespace, "-l", selector, "-o", "json"],
kubectl=kubectl,
timeout=kubectl_timeout,
)
process_result = None
if include_processes:
process_result = collect_process_samples(
top_pods,
namespace=namespace,
kubectl=kubectl,
limit=process_limit,
kubectl_timeout=kubectl_timeout,
)
return build_report(
namespace=namespace,
selector=selector,
sample=sample,
top_pods=top_pods,
pod_json=json.loads(pods_output),
process_samples=process_result.samples if process_result else None,
process_errors=process_result.errors if process_result else None,
)
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--namespace",
default=DEFAULT_NAMESPACE,
help=f"Kubernetes namespace (default: {DEFAULT_NAMESPACE})",
)
parser.add_argument(
"--selector",
default=DEFAULT_SELECTOR,
help=f"Pod label selector (default: {DEFAULT_SELECTOR})",
)
parser.add_argument(
"--sample",
default="unspecified",
help="Human-readable sample label, such as empty, after-bash, after-python",
)
parser.add_argument("--kubectl", default="kubectl", help="kubectl executable path")
parser.add_argument(
"--format",
choices=("json", "markdown"),
default="markdown",
help="Output format",
)
parser.add_argument(
"--include-processes",
action="store_true",
help="Run kubectl exec ps in each sandbox pod and include top process RSS samples",
)
parser.add_argument(
"--process-limit",
type=int,
default=10,
help="Maximum processes to include per pod when --include-processes is set",
)
parser.add_argument(
"--kubectl-timeout",
type=int,
default=DEFAULT_KUBECTL_TIMEOUT,
help=f"Timeout in seconds for each kubectl call (default: {DEFAULT_KUBECTL_TIMEOUT})",
)
return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
args = parse_args(list(sys.argv[1:] if argv is None else argv))
try:
report = collect(
namespace=args.namespace,
selector=args.selector,
sample=args.sample,
kubectl=args.kubectl,
include_processes=args.include_processes,
process_limit=args.process_limit,
kubectl_timeout=args.kubectl_timeout,
)
except subprocess.CalledProcessError as exc:
print(exc.stderr or str(exc), file=sys.stderr)
return exc.returncode or 1
except subprocess.TimeoutExpired as exc:
print(f"kubectl timed out after {exc.timeout} seconds", file=sys.stderr)
return 1
except ValueError as exc:
print(str(exc), file=sys.stderr)
return 2
if args.format == "json":
print(json.dumps(report, indent=2, sort_keys=True))
else:
print(render_markdown(report))
return 0
if __name__ == "__main__":
raise SystemExit(main())