NarratoAI/app/utils/ffmpeg_detector.py
viccy 7a5303aa20 feat(ffmpeg,webui): 新增 FFmpeg 引擎管理与检测功能
- 新增配置项 ffmpeg_path 及路径应用逻辑,自动配置 FFmpeg 环境变量
- 实现全量 FFmpeg 引擎自动发现、能力检测工具链,支持多来源识别
- 添加 WebUI 系统设置面板,支持选择、测试和保存 FFmpeg 引擎
- 优化视频合并模块的 FFmpeg 调用,新增进度日志与流式输出处理
- 新增 FFmpeg 检测器单元测试覆盖核心功能
2026-06-08 13:28:27 +08:00

494 lines
15 KiB
Python

"""FFmpeg engine discovery and capability diagnostics."""
from __future__ import annotations
import os
import platform
import re
import shutil
import subprocess
import sys
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any
from loguru import logger
_FFMPEG_EXE = "ffmpeg.exe" if os.name == "nt" else "ffmpeg"
_FFPROBE_EXE = "ffprobe.exe" if os.name == "nt" else "ffprobe"
_SOURCE_PRIORITY = {
"Configured": 0,
"NarratoAI packaged runtime": 1,
"Integrated runtime": 2,
"System PATH": 3,
"Homebrew": 4,
"Python environment": 5,
"Python executable folder": 6,
"IMAGEIO_FFMPEG_EXE": 7,
"imageio-ffmpeg": 8,
"System": 9,
}
@dataclass(frozen=True)
class FFmpegEngine:
"""A discovered FFmpeg executable."""
path: str
source: str
ffprobe_path: str
available: bool
version_line: str
@property
def label(self) -> str:
status = "OK" if self.available else "Unavailable"
version = self.version_line.replace("ffmpeg version", "").strip() or "unknown version"
return f"{self.source} - {version} - {self.path} ({status})"
def to_dict(self) -> dict[str, Any]:
payload = asdict(self)
payload["label"] = self.label
return payload
def _run_command(args: list[str], timeout: int = 10) -> subprocess.CompletedProcess[str]:
return subprocess.run(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
timeout=timeout,
)
def _first_line(text: str) -> str:
for line in (text or "").splitlines():
stripped = line.strip()
if stripped:
return stripped
return ""
def _is_executable(path: str) -> bool:
if not path:
return False
if os.name == "nt":
return os.path.isfile(path)
return os.path.isfile(path) and os.access(path, os.X_OK)
def _normalize_path(path: str) -> str:
return str(Path(path).expanduser().resolve())
def _ffmpeg_version_line(ffmpeg_path: str) -> tuple[bool, str]:
if not _is_executable(ffmpeg_path):
return False, ""
try:
result = _run_command([ffmpeg_path, "-version"], timeout=8)
except Exception as exc:
logger.debug(f"FFmpeg version check failed for {ffmpeg_path}: {exc}")
return False, ""
output = result.stdout or result.stderr
return result.returncode == 0, _first_line(output)
def _paired_ffprobe_path(ffmpeg_path: str) -> str:
ffmpeg = Path(ffmpeg_path)
sibling = ffmpeg.with_name(_FFPROBE_EXE)
if _is_executable(str(sibling)):
return _normalize_path(str(sibling))
scoped_path = os.pathsep.join([str(ffmpeg.parent), os.environ.get("PATH", "")])
discovered = shutil.which(_FFPROBE_EXE, path=scoped_path)
return _normalize_path(discovered) if discovered else ""
def _candidate_paths(root_dir: str = "", include_system: bool = True) -> list[tuple[str, str]]:
candidates: list[tuple[str, str]] = []
root = Path(root_dir).expanduser().resolve() if root_dir else Path.cwd().resolve()
project_parent = root.parent
candidates.extend(
[
("Integrated runtime", str(root / "runtime" / "python" / "bin" / _FFMPEG_EXE)),
("Integrated runtime", str(root.parent / "runtime" / "python" / "bin" / _FFMPEG_EXE)),
(
"NarratoAI packaged runtime",
str(
project_parent
/ "NarratoAI-Pack"
/ "dist"
/ "NarratoAI-macos-arm64"
/ "runtime"
/ "python"
/ "bin"
/ _FFMPEG_EXE
),
),
("Python environment", str(Path(sys.prefix) / "bin" / _FFMPEG_EXE)),
("Python executable folder", str(Path(sys.executable).with_name(_FFMPEG_EXE))),
]
)
env_ffmpeg = os.environ.get("IMAGEIO_FFMPEG_EXE", "")
if env_ffmpeg:
candidates.append(("IMAGEIO_FFMPEG_EXE", env_ffmpeg))
if include_system:
path_ffmpeg = shutil.which(_FFMPEG_EXE)
if path_ffmpeg:
candidates.append(("System PATH", path_ffmpeg))
for source, path in (
("Homebrew", f"/opt/homebrew/bin/{_FFMPEG_EXE}"),
("Homebrew", f"/usr/local/bin/{_FFMPEG_EXE}"),
("System", f"/usr/bin/{_FFMPEG_EXE}"),
):
candidates.append((source, path))
try:
import imageio_ffmpeg
candidates.append(("imageio-ffmpeg", imageio_ffmpeg.get_ffmpeg_exe()))
except Exception as exc:
logger.debug(f"imageio-ffmpeg discovery skipped: {exc}")
return candidates
def discover_ffmpeg_engines(
configured_path: str = "",
root_dir: str = "",
include_system: bool = True,
) -> list[dict[str, Any]]:
"""Discover available FFmpeg engines from config, packaged runtime and PATH."""
candidates: list[tuple[str, str]] = []
if configured_path:
candidates.append(("Configured", configured_path))
candidates.extend(_candidate_paths(root_dir=root_dir, include_system=include_system))
engines: list[FFmpegEngine] = []
seen: set[str] = set()
for source, raw_path in candidates:
if not raw_path:
continue
try:
path = _normalize_path(raw_path)
except Exception:
path = str(Path(raw_path).expanduser())
key = os.path.normcase(path)
if key in seen:
continue
seen.add(key)
available, version_line = _ffmpeg_version_line(path)
if not available and source not in {"Configured", "IMAGEIO_FFMPEG_EXE"}:
continue
engines.append(
FFmpegEngine(
path=path,
source=source,
ffprobe_path=_paired_ffprobe_path(path),
available=available,
version_line=version_line,
)
)
engines.sort(
key=lambda engine: (
not engine.available,
_SOURCE_PRIORITY.get(engine.source, 99),
engine.path,
)
)
return [engine.to_dict() for engine in engines]
def _parse_hwaccels(output: str) -> list[str]:
values: list[str] = []
for line in output.splitlines():
item = line.strip().lower()
if not item or item.startswith("hardware acceleration"):
continue
if re.fullmatch(r"[a-z0-9_]+", item):
values.append(item)
return sorted(set(values))
def _parse_ffmpeg_table_names(output: str) -> set[str]:
names: set[str] = set()
for line in output.splitlines():
match = re.match(r"\s*[A-Z.]{2,}\s+([A-Za-z0-9_]+)\b", line)
if match:
names.add(match.group(1).lower())
return names
def _run_optional(args: list[str], timeout: int = 15, max_output_chars: int = 1200) -> tuple[bool, str]:
try:
result = _run_command(args, timeout=timeout)
except subprocess.TimeoutExpired:
return False, "Command timed out"
except Exception as exc:
return False, str(exc)
output = "\n".join(part for part in (result.stderr, result.stdout) if part)
if max_output_chars > 0:
output = output[-max_output_chars:]
return result.returncode == 0, output
def _hardware_candidates() -> list[tuple[str, str, list[str]]]:
system = platform.system().lower()
if system == "darwin":
return [
("videotoolbox", "h264_videotoolbox", ["-c:v", "h264_videotoolbox", "-q:v", "65"]),
]
if system == "windows":
return [
("nvenc", "h264_nvenc", ["-c:v", "h264_nvenc", "-preset", "fast"]),
("qsv", "h264_qsv", ["-c:v", "h264_qsv", "-preset", "fast"]),
("amf", "h264_amf", ["-c:v", "h264_amf"]),
]
return [
("nvenc", "h264_nvenc", ["-c:v", "h264_nvenc", "-preset", "fast"]),
("qsv", "h264_qsv", ["-vf", "format=nv12", "-c:v", "h264_qsv"]),
("vaapi", "h264_vaapi", ["-vf", "format=nv12,hwupload", "-c:v", "h264_vaapi"]),
]
def _detect_hardware_encoding(ffmpeg_path: str, encoders: set[str]) -> dict[str, Any]:
tested: list[dict[str, Any]] = []
for accel_type, encoder, encoder_args in _hardware_candidates():
if encoder.lower() not in encoders:
tested.append(
{
"type": accel_type,
"encoder": encoder,
"available": False,
"message": "Encoder not listed by this FFmpeg build",
}
)
continue
cmd = [
ffmpeg_path,
"-y",
"-hide_banner",
"-loglevel",
"error",
"-f",
"lavfi",
"-i",
"testsrc=duration=0.5:size=128x72:rate=15",
"-frames:v",
"5",
*encoder_args,
"-pix_fmt",
"yuv420p",
"-f",
"null",
"-",
]
ok, message = _run_optional(cmd, timeout=18)
tested.append(
{
"type": accel_type,
"encoder": encoder,
"available": ok,
"message": "Hardware encode test passed" if ok else message,
}
)
if ok:
return {
"available": True,
"type": accel_type,
"encoder": encoder,
"message": "Hardware encode test passed",
"tested": tested,
}
return {
"available": False,
"type": None,
"encoder": None,
"message": "No hardware encoder passed the runtime test",
"tested": tested,
}
def _escape_filter_path(path: str) -> str:
return path.replace("\\", "\\\\").replace(":", "\\:").replace("'", "\\'")
def _test_subtitle_burn(ffmpeg_path: str, filters: set[str]) -> dict[str, Any]:
filter_status = {
"subtitles": "subtitles" in filters,
"ass": "ass" in filters,
"drawtext": "drawtext" in filters,
"overlay": "overlay" in filters,
}
if filter_status["subtitles"]:
with tempfile.TemporaryDirectory() as tmp_dir:
srt_path = Path(tmp_dir) / "subtitle_test.srt"
srt_path.write_text(
"1\n00:00:00,000 --> 00:00:00,800\nNarratoAI FFmpeg subtitle test\n",
encoding="utf-8",
)
ok, message = _run_optional(
[
ffmpeg_path,
"-y",
"-hide_banner",
"-loglevel",
"error",
"-f",
"lavfi",
"-i",
"color=black:size=320x180:duration=1",
"-vf",
f"subtitles={_escape_filter_path(str(srt_path))}",
"-frames:v",
"1",
"-f",
"null",
"-",
],
timeout=18,
)
if ok:
return {
"available": True,
"method": "subtitles",
"message": "SRT subtitle burn-in test passed",
"filters": filter_status,
}
subtitles_error = message
else:
subtitles_error = "subtitles filter is not listed by this FFmpeg build"
if filter_status["drawtext"]:
ok, message = _run_optional(
[
ffmpeg_path,
"-y",
"-hide_banner",
"-loglevel",
"error",
"-f",
"lavfi",
"-i",
"color=black:size=320x180:duration=1",
"-vf",
"drawtext=text=NarratoAI:x=10:y=10:fontsize=18:fontcolor=white",
"-frames:v",
"1",
"-f",
"null",
"-",
],
timeout=18,
)
if ok:
return {
"available": True,
"method": "drawtext",
"message": "drawtext burn-in fallback test passed",
"filters": filter_status,
}
drawtext_error = message
else:
drawtext_error = "drawtext filter is not listed by this FFmpeg build"
return {
"available": False,
"method": None,
"message": f"{subtitles_error}\n{drawtext_error}".strip(),
"filters": filter_status,
}
def validate_ffmpeg_engine(ffmpeg_path: str) -> dict[str, Any]:
"""Run runtime checks for a selected FFmpeg engine."""
path = _normalize_path(ffmpeg_path)
report: dict[str, Any] = {
"path": path,
"ffmpeg_available": False,
"version_line": "",
"ffprobe_path": "",
"ffprobe_available": False,
"ffprobe_version_line": "",
"hwaccels": [],
"hardware_acceleration": {
"available": False,
"type": None,
"encoder": None,
"message": "",
"tested": [],
},
"subtitle_burn": {
"available": False,
"method": None,
"message": "",
"filters": {},
},
"software_encoder_available": False,
"errors": [],
}
available, version_line = _ffmpeg_version_line(path)
report["ffmpeg_available"] = available
report["version_line"] = version_line
if not available:
report["errors"].append("FFmpeg is not executable or failed to run -version")
return report
ffprobe_path = _paired_ffprobe_path(path)
report["ffprobe_path"] = ffprobe_path
if ffprobe_path:
probe_available, probe_version = _ffmpeg_version_line(ffprobe_path)
report["ffprobe_available"] = probe_available
report["ffprobe_version_line"] = probe_version
ok, hwaccel_output = _run_optional(
[path, "-hide_banner", "-hwaccels"],
timeout=10,
max_output_chars=0,
)
if ok:
report["hwaccels"] = _parse_hwaccels(hwaccel_output)
else:
report["errors"].append(f"Failed to list hardware acceleration methods: {hwaccel_output}")
ok, encoders_output = _run_optional(
[path, "-hide_banner", "-encoders"],
timeout=10,
max_output_chars=0,
)
encoders = _parse_ffmpeg_table_names(encoders_output) if ok else set()
report["software_encoder_available"] = "libx264" in encoders or "libopenh264" in encoders
if not ok:
report["errors"].append(f"Failed to list encoders: {encoders_output}")
ok, filters_output = _run_optional(
[path, "-hide_banner", "-filters"],
timeout=10,
max_output_chars=0,
)
filters = _parse_ffmpeg_table_names(filters_output) if ok else set()
if not ok:
report["errors"].append(f"Failed to list filters: {filters_output}")
report["hardware_acceleration"] = _detect_hardware_encoding(path, encoders)
report["subtitle_burn"] = _test_subtitle_burn(path, filters)
return report