feat(skill): add blocking-io-guard — SOP skill for blocking-IO triage and runtime anchors (#3503)

* feat(blocking-io): add changed-lines blocking-IO scanner (L1)

* feat(blocking-io): add scan-changed CLI wrapper

* feat(skill): add blocking-io-guard developer SOP skill

* docs(blocking-io): point contributors at the blocking-io-guard skill

* style(blocking-io): apply ruff format to scanner and tests

* docs(backend): document changed-lines blocking-IO scanner in CLAUDE.md

* feat(skill): add post-fix re-scan check and PR batching policy

* refactor(skill): fix SOP step ordering, align template with repo conventions

- Move re-scan into an explicit 'apply the fix' step (was wedged after
  anchor generation while telling you to go back before the anchor)
- Renumber steps 0-6; drop undefined 'L1' jargon
- Mode A: document that the diff is <base>...HEAD (commit first)
- Mode B: prefer make detect-blocking-io + findings JSON file
- anchor template: module-level pytestmark per tests/blocking_io convention
- CLAUDE.md: fix 'git diff --base' phrasing

* fix(skill): catch findings introduced without touching the blocking line

Review follow-up: changed-line intersection alone misses the case where a
new async caller exposes an old sync helper — the static finding sits on
the untouched blocking line, so Mode A returned empty and the SOP stopped
on a false 'no blocking-IO surface'.

Selection is now a union over the changed files:
- findings on added lines of git diff <base>...HEAD (kept: a second
  identical symbol in an already-flagged function collides on the stable
  key and only this selection sees it);
- findings new versus the merge base, matched by (path, function,
  symbol) — never line numbers.

Base sources are materialized via git show <merge-base>:<path>; files
absent at base count every head finding as new. SKILL.md now states the
residual same-file-only blind spot (cross-file async callers) instead of
treating an empty list as proof of zero exposure, and only requires
reading sop-skeleton.md when generalizing to another detector domain.

* docs(skill): examples teach test-writing, the teeth check defines the rule

All examples in the references/template are filesystem-flavored; make
explicit that they are instances, not the SOP's boundary — the same rules
apply to every detector category (FILE_IO, HTTP, SUBPROCESS, SLEEP) and
acceptance is always red/green teeth, never similarity to an example.
Neutralize the template's arrange comment accordingly.

* fix(blocking-io): harden changed-lines scanner per review

- Dedup the union selection by the stable key (path, function, symbol)
  instead of dict identity, so a future selector returning copied dicts
  cannot silently empty the result.
- parse_changed_lines now handles any unified diff: context lines advance
  the new-file counter, \-markers and deletions do not, and the counter
  resets at each +++ header. Previously correct only for --unified=0.
- Add blocking_io_static.scan_source (in-memory scan); base-version
  comparison no longer round-trips through temp files.
- Empty Mode A report now prints the same-file-only reachability caveat
  at the point of use instead of relying on the SOP text alone.

* docs(skill): bound best-effort cleanup when the offload sits in finally

Lesson from the #3505 review: the SOP routinely drives 'offload the
cleanup branch' transformations, and an awaited cleanup in finally can
mask or stall the primary exception. One sentence in Step 2 closes that
gap at the point where the fix is written.
This commit is contained in:
AochenShen99 2026-06-12 10:20:38 +08:00 committed by GitHub
parent 330a2ff8c5
commit dc2ababf00
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 703 additions and 4 deletions

View File

@ -0,0 +1,141 @@
---
name: blocking-io-guard
description: Ensure async-path backend code that could block the asyncio event loop is protected by a teeth-verified runtime anchor in tests/blocking_io/. Use when changing backend Python under app/, packages/harness/deerflow/, or scripts/, when running a blocking-IO triage round over the whole repo, or when a reviewer/CI asks for blocking-IO coverage. Runs a deterministic scan (changed-lines or full-repo), routes each candidate, drafts/extends an anchor, and proves it fails when the blocking IO regresses.
---
# Blocking-IO Guard Skill
Help a contributor ship backend async changes together with the runtime anchor
that lets DeerFlow's blocking-IO CI gate actually see the new code. The dynamic
detector only catches blocking IO on paths a test executes — this skill closes
that gap, either for your own diff or for a repo-wide triage round.
Read `references/good-anchor-rules.md` before writing any anchor.
Only read `references/sop-skeleton.md` when generalizing this SOP to another
detector domain — it is not needed to execute the steps below.
## When to use
- Your change touches Python under `backend/app/`,
`backend/packages/harness/deerflow/`, or `backend/scripts/` and may run on
the async event loop (Mode A). If unsure, run Step 0 — it answers
deterministically.
- You are doing a maintenance triage round over the existing codebase
(Mode B).
## SOP (router)
### Step 0 — Scope (deterministic)
**Mode A — your own diff** (default, pre-PR). From repo root:
```bash
uv run --project backend python scripts/scan_changed_blocking_io.py --base origin/main
```
Lists blocking-IO candidates your change introduces: findings on lines the
diff added, **plus** findings that are new versus the merge base — the latter
catches a new async caller exposing an old sync helper whose blocking line is
not in the diff. The diff is `<base>...HEAD`, so **commit your work first**
uncommitted lines are not selected.
If the list is empty, this change introduces no blocking-IO surface *that the
static detector can see in the changed files*. One residual blind spot
remains: reachability is same-file only, so a new async caller of a sync
helper **defined in another file** is invisible to both selections. If your
diff adds an async call into a helper that lives elsewhere, check that helper
manually (codegraph or `git grep`) before stopping.
**Mode B — full-repo triage round.** From repo root:
```bash
make detect-blocking-io
```
Prints a summary and writes the complete structured finding list to
`.deer-flow/blocking-io-findings.json`. Work HIGH priority first; do not start
MEDIUM until every HIGH is dispositioned (fixed, guarded, or recorded
NO-ACTION).
**Batching policy (PR sizing).** One **fix unit** per PR while any HIGH
remains: a fix unit is one root cause — usually a single HIGH, but two HIGHs
resolved by the same one-place fix belong together. Once no HIGH remains,
MEDIUM/LOW may be batched (about five per round, grouped by module or by
disposition) so each PR stays reviewable. A new Blockbuster rule is never
batched with anything — it always ships alone (see Step 5).
Both modes emit the same JSON shape per finding: `priority`, `location`
(path/line/function), `blocking_call` (category/operation/symbol),
`event_loop_exposure`, `reason`, `code`. Priority is a deterministic review
ordering, not proof of a bug — Step 1 makes the actual call.
### Step 1 — Judge each candidate (router)
Read the code around each candidate and route it:
- **Already offloaded** (`asyncio.to_thread`, `run_in_executor`, async client) →
**GUARD**: add/extend an anchor that locks the offload so a future edit cannot
move it back onto the loop.
- **On the loop, not offloaded****FIX+ANCHOR**: offload the production code
(your fix), then add an anchor that guards it.
- **Not actually exposed / acceptable** (rare: scanner false positive,
startup-only code) → **NO-ACTION**: record one line of why.
- **Cross-file caveat**: the scanner's async reachability is same-file only
(`ASYNC_REACHABLE_SAME_FILE`). If the candidate is a *sync helper*, check for
async callers in other files (codegraph or `git grep`) before deciding
NO-ACTION.
### Step 2 — Apply the fix, then re-scan (FIX+ANCHOR only)
Offload the blocking call in production code, then re-run the Step 0 scan and
confirm the candidate no longer appears. If the offloaded call sits in a
`finally` / cleanup path, keep it best-effort and bounded (swallow-and-log,
`asyncio.wait_for`) so a failing or hung cleanup cannot mask the primary
exception. Match by the stable key
**(path, function, symbol)** — line numbers shift after edits, so never
compare by line.
- The finding must disappear. If it still shows, the fix did not remove the
blocking pattern (e.g. the call is still a direct call, not offloaded) —
go back before touching any test.
- GUARD / NO-ACTION routes skip this step: a residual finding there is
*expected* (the raw call still exists inside a sync helper with the offload
at the caller, or the exposure was judged acceptable).
This is pattern-level feedback in seconds; it complements but never replaces
Step 5 — only the runtime gate proves the event loop is actually protected.
### Step 3 — Check existing anchors
Look in `backend/tests/blocking_io/` for a test that drives the production async
entry point reaching this candidate's branch.
- Covers this branch already → go to Step 5 (re-verify teeth).
- Covers the entry point but not this branch (e.g. happy path covered,
cleanup/404/409 not) → **extend** that anchor.
- None → create one from `templates/anchor.template.py`.
### Step 4 — Generate / extend the anchor
Follow `references/good-anchor-rules.md`. Drive the *specific* branch (e.g. force
the create failure that hits the cleanup `shutil.rmtree`). Never bypass the
blocking surface with a test-only `asyncio.to_thread` wrapper.
### Step 5 — Verify teeth (mandatory; also the anchor-vs-rule discriminator)
1. Reintroduce the block (GUARD: temporarily revert the offload; FIX+ANCHOR: run
against the pre-fix code).
2. Run `cd backend && make test-blocking-io` (or target the one test). It **must
go RED**.
3. Restore the fix. It **must go GREEN**.
A real block that stays GREEN means Blockbuster has no rule for that
primitive — that is the **RULE** route; see `references/good-anchor-rules.md`
for the admission criteria before adding one.
### Step 6 — Deliver
Commit the anchor(s) with your change; `make test-blocking-io` green. In the PR,
note: candidates found, each disposition, the re-scan result (Step 2), and
the teeth evidence (red→green). Include the reason for any NO-ACTION. A new
Blockbuster rule, if any, goes in its own commit with the evidence from Step 5.

View File

@ -0,0 +1,65 @@
# Good anchor rules + teeth (blocking-IO fill)
Distilled from `backend/docs/BLOCKING_IO_DETECTION.md`. An anchor lives in
`backend/tests/blocking_io/`; the suite's conftest runs each test under the
strict Blockbuster gate scoped to `app.*` / `deerflow.*`.
The examples in this file and in `templates/` are all filesystem-flavored.
They demonstrate how to *write* the test, not what the SOP covers: the same
rules apply to every category the detector reports (FILE_IO, HTTP,
SUBPROCESS, SLEEP), and the acceptance criterion is always the teeth check
below — never similarity to an example.
## A good anchor
- Calls the **real production async entry point** — not a low-level helper,
unless that helper *is* the entry point production executes.
- Does **not** bypass the blocking surface with a test-only
`asyncio.to_thread` / `run_in_executor` wrapper.
- Uses **real local filesystem** inputs when the bug shape is filesystem IO.
- Mocks **only** the external dependency boundary (network service, third-party
saver), never the offload being guarded.
- Drives the **specific branch** you are protecting (error / cleanup / 404 /
409), not just the happy path.
## Teeth (the acceptance test)
An anchor only counts if the gate actually fires when the code blocks:
1. Reintroduce the block (revert the offload, or run pre-fix code).
2. `cd backend && make test-blocking-io` → the anchor **must fail** (RED).
3. Restore the fix → the anchor **must pass** (GREEN).
A green-on-happy-path anchor with no proven red is fake coverage. Don't ship it.
## The RULE route (rare; strict admission criteria)
Blockbuster's built-in rules cover the common blocking primitives well. The
two deliberate openings in this SOP are:
1. **Coverage opening** (the normal case): the rules already see the
primitive — you only need an anchor so runtime detection executes the real
business path and CI prevents regression.
2. **Rule opening** (rare): you reintroduced a *real* block and the gate
stayed GREEN — Blockbuster has no rule for that primitive.
A project rule lives in `_PROJECT_BLOCKING_RULES` inside
`backend/tests/support/detectors/blocking_io_runtime.py` and changes detection
for the **entire** blocking-IO suite — global blast radius. Admission criteria
for adding one:
- You have the **fails-to-fail anchor** as evidence: a good anchor (per the
rules above) that drives a genuinely blocking path and stays green. No
evidence, no rule.
- The primitive is a real blocking call (verified against its implementation
or docs), not a false positive of the static detector.
- The rule ships in its **own commit**, naming the primitive, the anchor that
exposed the gap, and the suite-wide impact. Run the full
`make test-blocking-io` suite after adding it — a new rule can turn other
previously-green tests red, and each such red is either a real latent bug
(fix it) or rule overreach (narrow the rule).
- If you are not in a position to own that blast radius (e.g. external
contributor), escalate to a maintainer with the evidence instead.
**Never add a runtime rule just because a path is untested** — that case needs
an anchor, not a rule.

View File

@ -0,0 +1,34 @@
# SOP skeleton (generic shape — extraction seam)
This is the domain-agnostic shape the blocking-IO skill instantiates. It exists
so a second detector/gate domain can reuse the flow without copying it. Do not
add machinery for that until a second domain actually appears (YAGNI).
A domain provides:
- a **static detector** that can scan a diff (or the whole tree) and emit
located candidates,
- a **CI gate** that fails when the bad pattern executes,
- a **test location** for guard tests,
- **good-test rules** for that gate,
- a **teeth definition** (how to make the gate fire on purpose).
Steps:
1. **Scope (deterministic):** intersect the diff's added lines with the
detector's findings → candidates this change introduced/touched. (Or, in
triage mode, take the full finding list ordered by priority.)
2. **Judge (router):** per candidate — guard existing fix / fix + guard /
no-action / rule (the gate cannot see the primitive).
3. **Fix + re-scope (fixes only):** apply the fix, re-run the detector; the
fixed candidate must vanish from the findings (match by a stable key, not
line numbers). Pattern-level feedback in seconds — complements, never
replaces, step 5.
4. **Generate:** draft or extend a guard test per the good-test rules, driving
the specific branch.
5. **Verify teeth:** make the bad pattern happen → gate must fail; restore →
gate must pass. A pattern that stays green while genuinely bad is the
"rule" signal, not a coverage success.
6. **Deliver:** commit the verified guard test; any gate-rule change ships in
its own commit with the fails-to-fail evidence attached.
To add a domain: supply a new fill doc (like `good-anchor-rules.md`) + detector,
and promote this file into a parent skill the instances point at.

View File

@ -0,0 +1,32 @@
"""Template: a tests/blocking_io/ runtime anchor.
Copy into backend/tests/blocking_io/test_<area>.py and adapt. The suite's
conftest already wraps every test here in the strict Blockbuster gate, so you do
NOT import or activate the detector just drive the real async entry point.
Teeth check before you commit (see references/good-anchor-rules.md):
1. reintroduce the block -> `cd backend && make test-blocking-io` must FAIL
2. restore the fix -> it must PASS
"""
from __future__ import annotations
from pathlib import Path
import pytest
# from app.<module> import <real_async_entry_point>
pytestmark = pytest.mark.asyncio
async def test_<entry_point>_offloads_blocking_io_on_<branch>(tmp_path: Path) -> None:
# Arrange: real inputs at the boundary the code blocks on (FS -> tmp_path;
# HTTP/subprocess -> stub the external service). Mock ONLY the external
# boundary, never the offload under test.
# Act + Assert: call the REAL production async entry point and drive the
# specific branch you are guarding (e.g. force a failure to hit the cleanup
# path). If the entry point performs blocking IO on the loop, the gate fails.
# await <real_async_entry_point>(...)
raise NotImplementedError("Replace with the real async entry point call.")

View File

@ -112,6 +112,14 @@ calls are resolved by function name, so duplicate helper names in one file can
conservatively over-report async reachability. It is intentionally
informational and is not run from CI in this round.
For a diff-scoped view of the same findings, `scripts/scan_changed_blocking_io.py`
(repo root) reports findings on the added lines of `git diff <base>...HEAD`
plus findings new versus the merge base (so a new async caller exposing an
untouched sync helper in the same file is still reported) — used by the
`blocking-io-guard` skill (`.agent/skills/blocking-io-guard/`) as the
deterministic scope step before routing each candidate to a fix and/or a
`tests/blocking_io/` runtime anchor.
Regression tests related to Docker/provisioner behavior:
- `tests/test_docker_sandbox_mode_detection.py` (mode detection from `config.yaml`)
- `tests/test_provisioner_kubeconfig.py` (kubeconfig file/directory handling)

View File

@ -67,6 +67,11 @@ The normal workflow is:
3. Add or update a focused runtime anchor in `backend/tests/blocking_io/`.
4. Let CI prevent that path from regressing.
Contributors changing backend async code can run the `blocking-io-guard` skill
(`.agent/skills/blocking-io-guard/`) to execute steps 13 for their own diff: it
scans the change for blocking-IO candidates, drafts or extends a runtime anchor,
and verifies the anchor fails when the blocking IO regresses.
Runtime detection has two maintenance paths.
### Add a runtime rule

View File

@ -0,0 +1,213 @@
"""Intersect a git diff with static blocking-IO findings.
Wraps the static detector (`blocking_io_static`) to answer a narrower question:
which blocking-IO candidates does THIS change introduce? A candidate qualifies
when its blocking line is on an added line of the diff, or when the finding is
new versus the merge base the latter catches exposure created without
touching the blocking line itself (a new async caller making an old sync
helper async-reachable). Used by the `blocking-io-guard` skill as the
deterministic scope step.
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from collections import defaultdict
from collections.abc import Sequence
from pathlib import Path
from support.detectors import blocking_io_static as static
REPO_ROOT = Path(__file__).resolve().parents[4]
SCAN_ROOTS = (
"backend/app",
"backend/packages/harness/deerflow",
"backend/scripts",
)
_HUNK_RE = re.compile(r"^@@ -\d+(?:,\d+)? \+(\d+)(?:,\d+)? @@")
def parse_changed_lines(diff_text: str) -> dict[str, set[int]]:
"""Map repo-relative path -> set of added line numbers in the new file.
Accepts any unified diff (with or without `--unified=0`): context lines
advance the new-file counter, deletions (`-`) and `\\ No newline` markers
do not. Records only added lines (`+`, not the `+++` header), numbered
from each hunk's new-file start line; deleted files (`+++ /dev/null`) are
skipped.
"""
changed: dict[str, set[int]] = defaultdict(set)
current_path: str | None = None
next_line = 0
for raw in diff_text.splitlines():
if raw.startswith("+++ "):
target = raw[4:].strip()
if target == "/dev/null":
current_path = None
else:
current_path = target[2:] if target.startswith("b/") else target
next_line = 0
continue
match = _HUNK_RE.match(raw)
if match:
next_line = int(match.group(1))
continue
if not current_path:
continue
if raw.startswith("+"):
changed[current_path].add(next_line)
next_line += 1
elif raw.startswith(" ") or raw == "":
next_line += 1
return dict(changed)
def changed_python_lines(base: str, repo_root: Path = REPO_ROOT) -> dict[str, set[int]]:
"""Diff `base...HEAD` over scan roots and return added .py lines."""
cmd = [
"git",
"-C",
str(repo_root),
"diff",
"--unified=0",
"--no-color",
f"{base}...HEAD",
"--",
*SCAN_ROOTS,
]
diff_text = subprocess.run(cmd, capture_output=True, text=True, check=True).stdout
return {path: lines for path, lines in parse_changed_lines(diff_text).items() if path.endswith(".py")}
def select_findings_on_changed_lines(
findings: Sequence[dict[str, object]],
changed_lines: dict[str, set[int]],
) -> list[dict[str, object]]:
"""Keep findings whose (path, line) falls on a changed line."""
selected: list[dict[str, object]] = []
for finding in findings:
location = finding["location"] # type: ignore[index]
path = location["path"] # type: ignore[index]
line = location["line"] # type: ignore[index]
if line in changed_lines.get(path, set()):
selected.append(finding)
return selected
def base_python_contents(base: str, paths: Sequence[str], repo_root: Path = REPO_ROOT) -> dict[str, str]:
"""Return each path's content at the merge base of `base` and HEAD.
Files absent at the merge base (newly added) are omitted, so every head
finding in them counts as new.
"""
merge_base = subprocess.run(
["git", "-C", str(repo_root), "merge-base", base, "HEAD"],
capture_output=True,
text=True,
check=True,
).stdout.strip()
contents: dict[str, str] = {}
for path in paths:
shown = subprocess.run(
["git", "-C", str(repo_root), "show", f"{merge_base}:{path}"],
capture_output=True,
text=True,
)
if shown.returncode == 0:
contents[path] = shown.stdout
return contents
def scan_python_contents(contents: dict[str, str]) -> list[dict[str, object]]:
"""Run the static detector over in-memory sources (repo-relative path -> code)."""
findings: list[dict[str, object]] = []
for rel_path in sorted(contents):
findings.extend(finding.to_dict() for finding in static.scan_source(contents[rel_path], rel_path))
return findings
def _stable_key(finding: dict[str, object]) -> tuple[str, str, str]:
location = finding["location"] # type: ignore[index]
call = finding["blocking_call"] # type: ignore[index]
return (location["path"], location["function"], call["symbol"]) # type: ignore[index]
def select_findings_new_vs_base(
head_findings: Sequence[dict[str, object]],
base_findings: Sequence[dict[str, object]],
) -> list[dict[str, object]]:
"""Keep head findings whose stable key (path, function, symbol) is absent at base.
Line numbers shift between revisions, so matching is by stable key only.
A second identical symbol added inside a function that already had a
finding collides on the key and is NOT reported here that case is
covered by the changed-line selection instead.
"""
base_keys = {_stable_key(finding) for finding in base_findings}
return [finding for finding in head_findings if _stable_key(finding) not in base_keys]
def find_changed_blocking_io(base: str, repo_root: Path = REPO_ROOT) -> list[dict[str, object]]:
"""Return static findings this change introduces or touches.
Union over the changed files of:
- findings whose blocking line is on an added line of the diff;
- findings new versus the merge base (a new async caller can expose an
untouched sync helper the blocking line itself is not in the diff).
"""
changed_lines = changed_python_lines(base, repo_root)
if not changed_lines:
return []
files = [repo_root / path for path in changed_lines]
head_findings = [finding.to_dict() for finding in static.scan_paths(files, repo_root=repo_root)]
on_changed_lines = select_findings_on_changed_lines(head_findings, changed_lines)
base_findings = scan_python_contents(base_python_contents(base, sorted(changed_lines), repo_root))
new_vs_base = select_findings_new_vs_base(head_findings, base_findings)
selected_keys = {_stable_key(finding) for finding in (*on_changed_lines, *new_vs_base)}
return [finding for finding in head_findings if _stable_key(finding) in selected_keys]
def format_report(findings: Sequence[dict[str, object]], base: str) -> str:
if not findings:
return (
f"No blocking-IO candidates introduced by this change (base: {base}).\n"
"Note: async reachability is resolved within each file only. If this change\n"
"adds an async call into a sync helper defined in another file, check that\n"
"helper manually (codegraph or git grep) before relying on this empty result."
)
lines = [
f"Blocking-IO candidates introduced/touched by this change (base: {base}): {len(findings)}",
"",
]
order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
for finding in sorted(findings, key=lambda f: order.get(str(f["priority"]), 9)):
location = finding["location"] # type: ignore[index]
call = finding["blocking_call"] # type: ignore[index]
lines.append(f"{finding['priority']} {call['category']}/{call['operation']} {location['path']}:{location['line']} in {location['function']} exposure={finding['event_loop_exposure']}")
lines.append(f" symbol: {call['symbol']}")
if finding.get("code"):
lines.append(f" code: {finding['code']}")
return "\n".join(lines)
def main(argv: Sequence[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="List blocking-IO candidates this change introduces: findings on added lines plus findings new versus the merge base (diff against --base).")
parser.add_argument("--base", default="origin/main", help="Base ref to diff against (default: origin/main).")
parser.add_argument("--format", choices=("text", "json"), default="text", help="Output format.")
args = parser.parse_args(argv)
findings = find_changed_blocking_io(args.base)
if args.format == "json":
print(json.dumps(findings, indent=2))
else:
print(format_report(findings, args.base))
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -717,12 +717,11 @@ def _finalize_findings(visitor: BlockingIOStaticVisitor) -> list[BlockingIOStati
return findings
def scan_file(path: Path, *, repo_root: Path = REPO_ROOT) -> list[BlockingIOStaticFinding]:
source = path.read_text(encoding="utf-8")
def scan_source(source: str, relative_path: str) -> list[BlockingIOStaticFinding]:
"""Scan one in-memory Python source; `relative_path` is reported verbatim in findings."""
source_lines = source.splitlines()
relative_path = relative_to_repo(path, repo_root)
try:
tree = ast.parse(source, filename=str(path))
tree = ast.parse(source, filename=relative_path)
except SyntaxError as exc:
line = exc.lineno or 0
code = _source_snippet(source_lines, line)
@ -746,6 +745,10 @@ def scan_file(path: Path, *, repo_root: Path = REPO_ROOT) -> list[BlockingIOStat
return sorted(_finalize_findings(visitor), key=lambda finding: (finding.path, finding.line, finding.column, finding.category))
def scan_file(path: Path, *, repo_root: Path = REPO_ROOT) -> list[BlockingIOStaticFinding]:
return scan_source(path.read_text(encoding="utf-8"), relative_to_repo(path, repo_root))
def is_ignored_path(path: Path) -> bool:
return any(part in IGNORED_DIR_NAMES for part in path.parts)

View File

@ -0,0 +1,175 @@
from __future__ import annotations
import textwrap
from pathlib import Path
from support.detectors import blocking_io_changed as changed
from support.detectors import blocking_io_static as static
def _write_python(path: Path, source: str) -> Path:
path.write_text(textwrap.dedent(source).strip() + "\n", encoding="utf-8")
return path
_CLEANUP_BRANCH_SOURCE = """
import shutil
from pathlib import Path
async def create_agent(path: Path) -> None:
path.mkdir()
try:
await _save(path)
except Exception:
shutil.rmtree(path)
raise
"""
def test_parse_changed_lines_records_added_lines_only() -> None:
diff = textwrap.dedent(
"""\
diff --git a/backend/app/x.py b/backend/app/x.py
--- a/backend/app/x.py
+++ b/backend/app/x.py
@@ -10,0 +11,2 @@ def f():
+ a = 1
+ b = 2
@@ -20 +22,0 @@ def g():
- gone = 1
"""
)
assert changed.parse_changed_lines(diff) == {"backend/app/x.py": {11, 12}}
def test_parse_changed_lines_handles_context_diffs() -> None:
diff = textwrap.dedent(
"""\
diff --git a/backend/app/x.py b/backend/app/x.py
--- a/backend/app/x.py
+++ b/backend/app/x.py
@@ -8,7 +8,8 @@ def f():
ctx1
ctx2
- removed
+ added_one
ctx3
+ added_two
ctx4
\\ No newline at end of file
"""
)
assert changed.parse_changed_lines(diff) == {"backend/app/x.py": {10, 12}}
def test_parse_changed_lines_ignores_deleted_files() -> None:
diff = textwrap.dedent(
"""\
diff --git a/x.py b/x.py
+++ /dev/null
@@ -1,2 +0,0 @@
-gone
"""
)
assert changed.parse_changed_lines(diff) == {}
def test_select_findings_keeps_only_touched_candidates(tmp_path: Path) -> None:
src = _write_python(tmp_path / "agents.py", _CLEANUP_BRANCH_SOURCE)
findings = [f.to_dict() for f in static.scan_file(src, repo_root=tmp_path)]
rmtree = next(f for f in findings if f["blocking_call"]["symbol"] == "shutil.rmtree")
other = next(f for f in findings if f["blocking_call"]["symbol"] != "shutil.rmtree")
changed_lines = {"agents.py": {rmtree["location"]["line"]}}
selected = changed.select_findings_on_changed_lines(findings, changed_lines)
assert [f["blocking_call"]["symbol"] for f in selected] == ["shutil.rmtree"]
assert other not in selected
def test_find_changed_blocking_io_surfaces_only_changed_candidate(tmp_path: Path, monkeypatch) -> None:
src = _write_python(tmp_path / "agents.py", _CLEANUP_BRANCH_SOURCE)
all_findings = [f.to_dict() for f in static.scan_file(src, repo_root=tmp_path)]
rmtree_line = next(f["location"]["line"] for f in all_findings if f["blocking_call"]["symbol"] == "shutil.rmtree")
# Stub only the git boundary; the static scan runs for real against tmp_path.
monkeypatch.setattr(
changed,
"changed_python_lines",
lambda base, repo_root: {"agents.py": {rmtree_line}},
)
# Base content identical to head: every finding already existed, so only
# the changed-line selection contributes (and the union must not double).
monkeypatch.setattr(
changed,
"base_python_contents",
lambda base, paths, repo_root: {"agents.py": src.read_text(encoding="utf-8")},
)
result = changed.find_changed_blocking_io("origin/main", repo_root=tmp_path)
assert [f["blocking_call"]["symbol"] for f in result] == ["shutil.rmtree"]
_SYNC_HELPER_BASE = """
from pathlib import Path
def load(path: Path) -> str:
return path.read_text()
"""
_SYNC_HELPER_HEAD = """
from pathlib import Path
def load(path: Path) -> str:
return path.read_text()
async def route(path: Path) -> str:
return load(path)
"""
def test_new_async_caller_exposing_old_sync_helper_is_reported(tmp_path: Path, monkeypatch) -> None:
"""The blocking line is NOT in the diff — only the new async caller is.
The finding sits on the untouched `read_text` line, so changed-line
selection alone would return empty; the new-vs-base comparison must
surface it.
"""
src = _write_python(tmp_path / "mod.py", _SYNC_HELPER_HEAD)
head_findings = [f.to_dict() for f in static.scan_file(src, repo_root=tmp_path)]
read_text_line = next(f["location"]["line"] for f in head_findings if f["blocking_call"]["symbol"] == "path.read_text")
added_lines = {line for line in range(1, len(src.read_text().splitlines()) + 1) if line > read_text_line}
monkeypatch.setattr(changed, "changed_python_lines", lambda base, repo_root: {"mod.py": added_lines})
monkeypatch.setattr(
changed,
"base_python_contents",
lambda base, paths, repo_root: {"mod.py": textwrap.dedent(_SYNC_HELPER_BASE).strip() + "\n"},
)
result = changed.find_changed_blocking_io("origin/main", repo_root=tmp_path)
assert len(result) == 1
assert result[0]["blocking_call"]["symbol"] == "path.read_text"
assert result[0]["event_loop_exposure"] == "ASYNC_REACHABLE_SAME_FILE"
def test_select_findings_new_vs_base_matches_by_stable_key(tmp_path: Path) -> None:
head = _write_python(tmp_path / "mod.py", _SYNC_HELPER_HEAD)
head_findings = [f.to_dict() for f in static.scan_file(head, repo_root=tmp_path)]
base_findings = changed.scan_python_contents({"mod.py": textwrap.dedent(_SYNC_HELPER_BASE).strip() + "\n"})
assert base_findings == [] # no async exposure at base -> detector is silent
new = changed.select_findings_new_vs_base(head_findings, base_findings)
assert [f["blocking_call"]["symbol"] for f in new] == ["path.read_text"]
# Same content at base and head -> nothing is new, regardless of line drift.
assert changed.select_findings_new_vs_base(head_findings, head_findings) == []
def test_format_report_empty_warns_about_cross_file_blind_spot() -> None:
report = changed.format_report([], base="origin/main")
assert "No blocking-IO candidates" in report
assert "defined in another file" in report

View File

@ -0,0 +1,23 @@
#!/usr/bin/env python3
"""CLI wrapper for the changed-lines blocking IO scanner."""
from __future__ import annotations
import sys
from collections.abc import Sequence
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[1]
TEST_SUPPORT_PATH = REPO_ROOT / "backend" / "tests"
if str(TEST_SUPPORT_PATH) not in sys.path:
sys.path.insert(0, str(TEST_SUPPORT_PATH))
def main(argv: Sequence[str] | None = None) -> int:
from support.detectors.blocking_io_changed import main as scanner_main
return scanner_main(argv)
if __name__ == "__main__":
sys.exit(main())