mirror of
https://github.com/penpot/penpot.git
synced 2026-05-16 05:23:39 +00:00
371 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
gh.py — Multi-purpose CLI helper for penpot/penpot GitHub operations.
|
|
|
|
Uses GitHub GraphQL and REST APIs via the authenticated ``gh`` CLI.
|
|
|
|
Subcommands:
|
|
issues List issues in a milestone
|
|
prs Fetch details for one or more PRs
|
|
|
|
Usage:
|
|
python3 tools/gh.py issues <milestone-title> (default: state=closed)
|
|
python3 tools/gh.py issues "2.16.0" --state all
|
|
python3 tools/gh.py issues "2.16.0" --exclude "release blocker,no changelog"
|
|
python3 tools/gh.py issues "2.16.0" --compare CHANGES.md
|
|
python3 tools/gh.py prs 9179 9204 9311
|
|
python3 tools/gh.py prs --file prs.txt
|
|
cat prs.txt | python3 tools/gh.py prs --stdin
|
|
|
|
Prerequisites:
|
|
- gh CLI authenticated (gh auth status)
|
|
- Python 3.9+ (function annotations use builtin generics such as list[dict])
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from typing import Any
|
|
|
|
|
|
# Repository coordinates used by every REST and GraphQL call below.
REPO = "penpot/penpot"  # full "owner/name" slug (used in user-facing messages)
OWNER = "penpot"        # repository owner for API calls
REPO_NAME = "penpot"    # repository name for API calls
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Shared helpers
|
|
# ─────────────────────────────────────────────
|
|
|
|
|
|
def run_gh(method: str, endpoint: str, **kwargs: Any) -> Any:
    """Invoke ``gh api`` against *endpoint* and return the decoded JSON body.

    Extra keyword arguments are forwarded as ``-f key=value`` fields;
    ``None`` values are skipped. Exits the process with status 1 when the
    gh CLI reports a failure.
    """
    args = ["gh", "api", endpoint, "--method", method]
    args += [
        part
        for key, val in kwargs.items()
        if val is not None
        for part in ("-f", f"{key}={val}")
    ]
    proc = subprocess.run(args, capture_output=True, text=True)
    if proc.returncode != 0:
        print(f"gh error: {proc.stderr}", file=sys.stderr)
        sys.exit(1)
    return json.loads(proc.stdout)
|
|
|
|
|
|
def run_gh_graphql(query: str, variables: dict) -> Any:
    """Execute a GraphQL *query* through ``gh api graphql --input -``.

    The query and its variables are serialized to JSON and fed to gh on
    stdin. Exits with status 1 on CLI failure or when the response carries
    an ``errors`` key; otherwise returns the ``data`` portion.
    """
    request = json.dumps({"query": query, "variables": variables})
    proc = subprocess.run(
        ["gh", "api", "graphql", "--input", "-"],
        input=request,
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        print(f"gh error: {proc.stderr}", file=sys.stderr)
        sys.exit(1)
    body = json.loads(proc.stdout)
    # Presence of "errors" (even an empty list) is treated as fatal.
    if "errors" in body:
        for err in body["errors"]:
            print(f"GraphQL error: {err.get('message')}", file=sys.stderr)
        sys.exit(1)
    return body["data"]
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Subcommand: issues
|
|
# ─────────────────────────────────────────────
|
|
|
|
GQL_ISSUES_QUERY = """\
|
|
query($owner: String!, $repo: String!, $milestone: Int!, $cursor: String) {
|
|
repository(owner: $owner, name: $repo) {
|
|
milestone(number: $milestone) {
|
|
issues(first: 100, after: $cursor, states: __STATES__) {
|
|
totalCount
|
|
pageInfo { hasNextPage endCursor }
|
|
nodes {
|
|
... on Issue {
|
|
number
|
|
title
|
|
state
|
|
labels(first: 20) { nodes { name } }
|
|
closedByPullRequestsReferences(first: 5) { nodes { number } }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def find_milestone(title: str) -> dict:
    """Resolve a milestone by exact *title* via the REST API.

    Returns a dict with ``number``, ``title``, ``open_issues`` and
    ``closed_issues``; exits with status 1 when no milestone matches.
    """
    milestones = run_gh("GET", f"repos/{OWNER}/{REPO_NAME}/milestones?per_page=100&state=all")
    match = next((ms for ms in milestones if ms["title"] == title), None)
    if match is None:
        print(f"ERROR: Milestone \"{title}\" not found in {REPO}", file=sys.stderr)
        sys.exit(1)
    return {key: match[key] for key in ("number", "title", "open_issues", "closed_issues")}
|
|
|
|
|
|
def fetch_milestone_issues(milestone_num: int, states: str) -> list[dict]:
    """
    Collect every issue in a milestone by walking paginated GraphQL results.

    Args:
        milestone_num: milestone number
        states: GraphQL states enum array literal, e.g. ``"[CLOSED]"`` or
            ``"[OPEN CLOSED]"``

    Returns:
        List of {number, title, state, labels: [str], closing_prs: [int]}
    """
    query = GQL_ISSUES_QUERY.replace("__STATES__", states)
    collected: list[dict] = []
    cursor: str | None = None

    has_more = True
    while has_more:
        data = run_gh_graphql(query, {
            "owner": OWNER,
            "repo": REPO_NAME,
            "milestone": milestone_num,
            "cursor": cursor,
        })
        page = data["repository"]["milestone"]["issues"]

        # Flatten each GraphQL node into the plain dict shape we emit.
        collected.extend(
            {
                "number": node["number"],
                "title": node["title"],
                "state": node["state"],
                "labels": [lbl["name"] for lbl in node["labels"]["nodes"]],
                "closing_prs": [pr["number"] for pr in node["closedByPullRequestsReferences"]["nodes"]],
            }
            for node in page["nodes"]
            if node is not None
        )

        print(f" ... fetched {len(collected)} issues so far", file=sys.stderr)

        info = page["pageInfo"]
        has_more = info["hasNextPage"]
        cursor = info["endCursor"]

    return collected
|
|
|
|
|
|
def load_existing_issue_numbers(filepath: str) -> set[int]:
    """Collect every ``#NNNN`` issue reference (3-5 digits) found in *filepath*."""
    issue_ref = re.compile(r"#(\d{3,5})\b")
    with open(filepath) as fh:
        return {int(num) for line in fh for num in issue_ref.findall(line)}
|
|
|
|
|
|
def cmd_issues(args: argparse.Namespace) -> None:
    """Handle the ``issues`` subcommand."""

    # Resolve the milestone title to its number via the REST API.
    print(f"Looking up milestone \"{args.milestone}\"...", file=sys.stderr)
    ms = find_milestone(args.milestone)
    print(f"Milestone #{ms['number']}: {ms['open_issues']} open, {ms['closed_issues']} closed",
          file=sys.stderr)

    # Translate the --state choice into a GraphQL enum array literal.
    gql_states = {"open": "[OPEN]", "closed": "[CLOSED]", "all": "[OPEN CLOSED]"}[args.state]

    print(f"Fetching {args.state} issues via GraphQL...", file=sys.stderr)
    issues = fetch_milestone_issues(ms["number"], gql_states)
    print(f"Fetched {len(issues)} issues total", file=sys.stderr)

    # Drop issues carrying any excluded label.
    if args.exclude:
        excluded = {label.strip() for label in args.exclude.split(",")}
        issues = [issue for issue in issues if excluded.isdisjoint(issue["labels"])]
        print(f"After excluding labels: {len(issues)} issues", file=sys.stderr)

    # With --compare, keep only issues not already referenced in that file.
    if args.compare:
        known = load_existing_issue_numbers(args.compare)
        issues = sorted(
            (iss for iss in issues if iss["number"] not in known),
            key=lambda iss: iss["number"],
        )
        print(f"Issues not yet in changelog: {len(issues)}", file=sys.stderr)

    print(json.dumps(issues, indent=2))
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Subcommand: prs
|
|
# ─────────────────────────────────────────────
|
|
|
|
# Number of PRs requested per GraphQL round-trip.
PRS_BATCH_SIZE = 50

# Per-PR query fragment, aliased as pr_<num> so many PRs fit in one query.
# Braces are doubled to survive str.format(), which fills in {num}.
GQL_PRS_QUERY_ITEM = """\
pr_{num}: pullRequest(number: {num}) {{
  number
  title
  body
  state
  mergedAt
  createdAt
  author {{ login }}
  labels(first: 20) {{ nodes {{ name }} }}
  closingIssuesReferences(first: 5) {{ nodes {{ number }} }}
}}
"""

# Outer query shell; {items} is replaced by the joined per-PR fragments.
GQL_PRS_QUERY_WRAPPER = """\
query($owner: String!, $repo: String!) {{
  repository(owner: $owner, name: $repo) {{
    {items}
  }}
}}
"""
|
|
|
|
|
|
def fetch_prs_batch(pr_numbers: list[int]) -> list[dict]:
    """
    Fetch details for a list of PR numbers in a single GraphQL query.

    Each PR is looked up under a numbered alias (pr_1234, pr_5678, ...) so
    the whole batch resolves in one round-trip. The result list mirrors the
    input order; a PR the API returns as null becomes
    ``{"number": n, "error": "not_found"}``.
    """
    aliased = "\n".join(GQL_PRS_QUERY_ITEM.format(num=n) for n in pr_numbers)
    data = run_gh_graphql(
        GQL_PRS_QUERY_WRAPPER.format(items=aliased),
        {"owner": OWNER, "repo": REPO_NAME},
    )
    repo = data["repository"]

    out: list[dict] = []
    for num in pr_numbers:
        node = repo.get(f"pr_{num}")
        if node is None:
            out.append({"number": num, "error": "not_found"})
            continue
        out.append({
            "number": node["number"],
            "title": node["title"],
            "body": node.get("body"),
            "state": node["state"],
            "merged_at": node.get("mergedAt"),
            "created_at": node.get("createdAt"),
            "author": node["author"]["login"] if node["author"] else None,
            "labels": [lbl["name"] for lbl in node["labels"]["nodes"]],
            "closing_issues": [iss["number"] for iss in node["closingIssuesReferences"]["nodes"]],
        })
    return out
|
|
|
|
|
|
def cmd_prs(args: argparse.Namespace) -> None:
|
|
"""Handle the ``prs`` subcommand."""
|
|
|
|
# Collect PR numbers from args / file / stdin
|
|
pr_numbers: list[int] = []
|
|
|
|
if args.numbers:
|
|
pr_numbers.extend(args.numbers)
|
|
|
|
if args.file:
|
|
with open(args.file) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line:
|
|
pr_numbers.append(int(line))
|
|
|
|
if args.stdin:
|
|
for line in sys.stdin:
|
|
line = line.strip()
|
|
if line:
|
|
pr_numbers.append(int(line))
|
|
|
|
if not pr_numbers:
|
|
print("ERROR: no PR numbers provided (pass numbers, --file, or --stdin)",
|
|
file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Deduplicate while preserving order
|
|
seen: set[int] = set()
|
|
pr_numbers = [n for n in pr_numbers if not (n in seen or seen.add(n))]
|
|
|
|
print(f"Fetching {len(pr_numbers)} PRs in batches of {PRS_BATCH_SIZE}...",
|
|
file=sys.stderr)
|
|
|
|
all_results: list[dict] = []
|
|
for i in range(0, len(pr_numbers), PRS_BATCH_SIZE):
|
|
batch = pr_numbers[i : i + PRS_BATCH_SIZE]
|
|
print(f" batch {i // PRS_BATCH_SIZE + 1}: PRs {batch[0]}..{batch[-1]}",
|
|
file=sys.stderr)
|
|
all_results.extend(fetch_prs_batch(batch))
|
|
|
|
print(json.dumps(all_results, indent=2))
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# CLI entrypoint
|
|
# ─────────────────────────────────────────────
|
|
|
|
|
|
def main() -> None:
    """Build the CLI parser and dispatch to the chosen subcommand handler."""
    parser = argparse.ArgumentParser(
        description="Multi-purpose CLI helper for penpot/penpot GitHub operations"
    )
    subparsers = parser.add_subparsers(dest="command", required=True, title="subcommands")

    # --- issues ---
    issues_parser = subparsers.add_parser("issues", help="List issues in a milestone")
    issues_parser.add_argument("milestone", help="Milestone title, e.g. '2.16.0'")
    issues_parser.add_argument(
        "--state",
        choices=["open", "closed", "all"],
        default="closed",
        help="Issue state filter (default: closed)",
    )
    issues_parser.add_argument(
        "--exclude",
        "--exclude-labels",
        help="Comma-separated labels to exclude, e.g. 'release blocker,no changelog'",
    )
    issues_parser.add_argument(
        "--compare",
        help="Path to CHANGES.md; only show issues NOT yet referenced in that file",
    )
    issues_parser.set_defaults(func=cmd_issues)

    # --- prs ---
    prs_parser = subparsers.add_parser("prs", help="Fetch details for one or more PRs")
    prs_parser.add_argument(
        "numbers",
        type=int,
        nargs="*",
        help="PR numbers to fetch (space-separated)",
    )
    prs_parser.add_argument(
        "--file",
        type=str,
        help="File with one PR number per line",
    )
    prs_parser.add_argument(
        "--stdin",
        action="store_true",
        help="Read PR numbers from stdin (one per line)",
    )
    prs_parser.set_defaults(func=cmd_prs)

    args = parser.parse_args()
    args.func(args)
|
|
|
|
|
|
# Script entry point: dispatch to the selected subcommand.
if __name__ == "__main__":
    main()
|