mirror of
https://github.com/linyqh/NarratoAI.git
synced 2026-06-29 18:52:07 +00:00
- 新增配置项 ffmpeg_path 及路径应用逻辑,自动配置 FFmpeg 环境变量 - 实现全量 FFmpeg 引擎自动发现、能力检测工具链,支持多来源识别 - 添加 WebUI 系统设置面板,支持选择、测试和保存 FFmpeg 引擎 - 优化视频合并模块的 FFmpeg 调用,新增进度日志与流式输出处理 - 新增 FFmpeg 检测器单元测试覆盖核心功能
494 lines
15 KiB
Python
494 lines
15 KiB
Python
"""FFmpeg engine discovery and capability diagnostics."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import platform
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from dataclasses import asdict, dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from loguru import logger
|
|
|
|
|
|
_FFMPEG_EXE = "ffmpeg.exe" if os.name == "nt" else "ffmpeg"
|
|
_FFPROBE_EXE = "ffprobe.exe" if os.name == "nt" else "ffprobe"
|
|
_SOURCE_PRIORITY = {
|
|
"Configured": 0,
|
|
"NarratoAI packaged runtime": 1,
|
|
"Integrated runtime": 2,
|
|
"System PATH": 3,
|
|
"Homebrew": 4,
|
|
"Python environment": 5,
|
|
"Python executable folder": 6,
|
|
"IMAGEIO_FFMPEG_EXE": 7,
|
|
"imageio-ffmpeg": 8,
|
|
"System": 9,
|
|
}
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class FFmpegEngine:
|
|
"""A discovered FFmpeg executable."""
|
|
|
|
path: str
|
|
source: str
|
|
ffprobe_path: str
|
|
available: bool
|
|
version_line: str
|
|
|
|
@property
|
|
def label(self) -> str:
|
|
status = "OK" if self.available else "Unavailable"
|
|
version = self.version_line.replace("ffmpeg version", "").strip() or "unknown version"
|
|
return f"{self.source} - {version} - {self.path} ({status})"
|
|
|
|
def to_dict(self) -> dict[str, Any]:
|
|
payload = asdict(self)
|
|
payload["label"] = self.label
|
|
return payload
|
|
|
|
|
|
def _run_command(args: list[str], timeout: int = 10) -> subprocess.CompletedProcess[str]:
|
|
return subprocess.run(
|
|
args,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
check=False,
|
|
timeout=timeout,
|
|
)
|
|
|
|
|
|
def _first_line(text: str) -> str:
|
|
for line in (text or "").splitlines():
|
|
stripped = line.strip()
|
|
if stripped:
|
|
return stripped
|
|
return ""
|
|
|
|
|
|
def _is_executable(path: str) -> bool:
|
|
if not path:
|
|
return False
|
|
if os.name == "nt":
|
|
return os.path.isfile(path)
|
|
return os.path.isfile(path) and os.access(path, os.X_OK)
|
|
|
|
|
|
def _normalize_path(path: str) -> str:
|
|
return str(Path(path).expanduser().resolve())
|
|
|
|
|
|
def _ffmpeg_version_line(ffmpeg_path: str) -> tuple[bool, str]:
|
|
if not _is_executable(ffmpeg_path):
|
|
return False, ""
|
|
try:
|
|
result = _run_command([ffmpeg_path, "-version"], timeout=8)
|
|
except Exception as exc:
|
|
logger.debug(f"FFmpeg version check failed for {ffmpeg_path}: {exc}")
|
|
return False, ""
|
|
|
|
output = result.stdout or result.stderr
|
|
return result.returncode == 0, _first_line(output)
|
|
|
|
|
|
def _paired_ffprobe_path(ffmpeg_path: str) -> str:
|
|
ffmpeg = Path(ffmpeg_path)
|
|
sibling = ffmpeg.with_name(_FFPROBE_EXE)
|
|
if _is_executable(str(sibling)):
|
|
return _normalize_path(str(sibling))
|
|
|
|
scoped_path = os.pathsep.join([str(ffmpeg.parent), os.environ.get("PATH", "")])
|
|
discovered = shutil.which(_FFPROBE_EXE, path=scoped_path)
|
|
return _normalize_path(discovered) if discovered else ""
|
|
|
|
|
|
def _candidate_paths(root_dir: str = "", include_system: bool = True) -> list[tuple[str, str]]:
|
|
candidates: list[tuple[str, str]] = []
|
|
root = Path(root_dir).expanduser().resolve() if root_dir else Path.cwd().resolve()
|
|
project_parent = root.parent
|
|
|
|
candidates.extend(
|
|
[
|
|
("Integrated runtime", str(root / "runtime" / "python" / "bin" / _FFMPEG_EXE)),
|
|
("Integrated runtime", str(root.parent / "runtime" / "python" / "bin" / _FFMPEG_EXE)),
|
|
(
|
|
"NarratoAI packaged runtime",
|
|
str(
|
|
project_parent
|
|
/ "NarratoAI-Pack"
|
|
/ "dist"
|
|
/ "NarratoAI-macos-arm64"
|
|
/ "runtime"
|
|
/ "python"
|
|
/ "bin"
|
|
/ _FFMPEG_EXE
|
|
),
|
|
),
|
|
("Python environment", str(Path(sys.prefix) / "bin" / _FFMPEG_EXE)),
|
|
("Python executable folder", str(Path(sys.executable).with_name(_FFMPEG_EXE))),
|
|
]
|
|
)
|
|
|
|
env_ffmpeg = os.environ.get("IMAGEIO_FFMPEG_EXE", "")
|
|
if env_ffmpeg:
|
|
candidates.append(("IMAGEIO_FFMPEG_EXE", env_ffmpeg))
|
|
|
|
if include_system:
|
|
path_ffmpeg = shutil.which(_FFMPEG_EXE)
|
|
if path_ffmpeg:
|
|
candidates.append(("System PATH", path_ffmpeg))
|
|
|
|
for source, path in (
|
|
("Homebrew", f"/opt/homebrew/bin/{_FFMPEG_EXE}"),
|
|
("Homebrew", f"/usr/local/bin/{_FFMPEG_EXE}"),
|
|
("System", f"/usr/bin/{_FFMPEG_EXE}"),
|
|
):
|
|
candidates.append((source, path))
|
|
|
|
try:
|
|
import imageio_ffmpeg
|
|
|
|
candidates.append(("imageio-ffmpeg", imageio_ffmpeg.get_ffmpeg_exe()))
|
|
except Exception as exc:
|
|
logger.debug(f"imageio-ffmpeg discovery skipped: {exc}")
|
|
|
|
return candidates
|
|
|
|
|
|
def discover_ffmpeg_engines(
|
|
configured_path: str = "",
|
|
root_dir: str = "",
|
|
include_system: bool = True,
|
|
) -> list[dict[str, Any]]:
|
|
"""Discover available FFmpeg engines from config, packaged runtime and PATH."""
|
|
|
|
candidates: list[tuple[str, str]] = []
|
|
if configured_path:
|
|
candidates.append(("Configured", configured_path))
|
|
candidates.extend(_candidate_paths(root_dir=root_dir, include_system=include_system))
|
|
|
|
engines: list[FFmpegEngine] = []
|
|
seen: set[str] = set()
|
|
for source, raw_path in candidates:
|
|
if not raw_path:
|
|
continue
|
|
try:
|
|
path = _normalize_path(raw_path)
|
|
except Exception:
|
|
path = str(Path(raw_path).expanduser())
|
|
key = os.path.normcase(path)
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
|
|
available, version_line = _ffmpeg_version_line(path)
|
|
if not available and source not in {"Configured", "IMAGEIO_FFMPEG_EXE"}:
|
|
continue
|
|
engines.append(
|
|
FFmpegEngine(
|
|
path=path,
|
|
source=source,
|
|
ffprobe_path=_paired_ffprobe_path(path),
|
|
available=available,
|
|
version_line=version_line,
|
|
)
|
|
)
|
|
|
|
engines.sort(
|
|
key=lambda engine: (
|
|
not engine.available,
|
|
_SOURCE_PRIORITY.get(engine.source, 99),
|
|
engine.path,
|
|
)
|
|
)
|
|
return [engine.to_dict() for engine in engines]
|
|
|
|
|
|
def _parse_hwaccels(output: str) -> list[str]:
|
|
values: list[str] = []
|
|
for line in output.splitlines():
|
|
item = line.strip().lower()
|
|
if not item or item.startswith("hardware acceleration"):
|
|
continue
|
|
if re.fullmatch(r"[a-z0-9_]+", item):
|
|
values.append(item)
|
|
return sorted(set(values))
|
|
|
|
|
|
def _parse_ffmpeg_table_names(output: str) -> set[str]:
|
|
names: set[str] = set()
|
|
for line in output.splitlines():
|
|
match = re.match(r"\s*[A-Z.]{2,}\s+([A-Za-z0-9_]+)\b", line)
|
|
if match:
|
|
names.add(match.group(1).lower())
|
|
return names
|
|
|
|
|
|
def _run_optional(args: list[str], timeout: int = 15, max_output_chars: int = 1200) -> tuple[bool, str]:
|
|
try:
|
|
result = _run_command(args, timeout=timeout)
|
|
except subprocess.TimeoutExpired:
|
|
return False, "Command timed out"
|
|
except Exception as exc:
|
|
return False, str(exc)
|
|
|
|
output = "\n".join(part for part in (result.stderr, result.stdout) if part)
|
|
if max_output_chars > 0:
|
|
output = output[-max_output_chars:]
|
|
return result.returncode == 0, output
|
|
|
|
|
|
def _hardware_candidates() -> list[tuple[str, str, list[str]]]:
|
|
system = platform.system().lower()
|
|
if system == "darwin":
|
|
return [
|
|
("videotoolbox", "h264_videotoolbox", ["-c:v", "h264_videotoolbox", "-q:v", "65"]),
|
|
]
|
|
if system == "windows":
|
|
return [
|
|
("nvenc", "h264_nvenc", ["-c:v", "h264_nvenc", "-preset", "fast"]),
|
|
("qsv", "h264_qsv", ["-c:v", "h264_qsv", "-preset", "fast"]),
|
|
("amf", "h264_amf", ["-c:v", "h264_amf"]),
|
|
]
|
|
return [
|
|
("nvenc", "h264_nvenc", ["-c:v", "h264_nvenc", "-preset", "fast"]),
|
|
("qsv", "h264_qsv", ["-vf", "format=nv12", "-c:v", "h264_qsv"]),
|
|
("vaapi", "h264_vaapi", ["-vf", "format=nv12,hwupload", "-c:v", "h264_vaapi"]),
|
|
]
|
|
|
|
|
|
def _detect_hardware_encoding(ffmpeg_path: str, encoders: set[str]) -> dict[str, Any]:
|
|
tested: list[dict[str, Any]] = []
|
|
for accel_type, encoder, encoder_args in _hardware_candidates():
|
|
if encoder.lower() not in encoders:
|
|
tested.append(
|
|
{
|
|
"type": accel_type,
|
|
"encoder": encoder,
|
|
"available": False,
|
|
"message": "Encoder not listed by this FFmpeg build",
|
|
}
|
|
)
|
|
continue
|
|
|
|
cmd = [
|
|
ffmpeg_path,
|
|
"-y",
|
|
"-hide_banner",
|
|
"-loglevel",
|
|
"error",
|
|
"-f",
|
|
"lavfi",
|
|
"-i",
|
|
"testsrc=duration=0.5:size=128x72:rate=15",
|
|
"-frames:v",
|
|
"5",
|
|
*encoder_args,
|
|
"-pix_fmt",
|
|
"yuv420p",
|
|
"-f",
|
|
"null",
|
|
"-",
|
|
]
|
|
ok, message = _run_optional(cmd, timeout=18)
|
|
tested.append(
|
|
{
|
|
"type": accel_type,
|
|
"encoder": encoder,
|
|
"available": ok,
|
|
"message": "Hardware encode test passed" if ok else message,
|
|
}
|
|
)
|
|
if ok:
|
|
return {
|
|
"available": True,
|
|
"type": accel_type,
|
|
"encoder": encoder,
|
|
"message": "Hardware encode test passed",
|
|
"tested": tested,
|
|
}
|
|
|
|
return {
|
|
"available": False,
|
|
"type": None,
|
|
"encoder": None,
|
|
"message": "No hardware encoder passed the runtime test",
|
|
"tested": tested,
|
|
}
|
|
|
|
|
|
def _escape_filter_path(path: str) -> str:
|
|
return path.replace("\\", "\\\\").replace(":", "\\:").replace("'", "\\'")
|
|
|
|
|
|
def _test_subtitle_burn(ffmpeg_path: str, filters: set[str]) -> dict[str, Any]:
|
|
filter_status = {
|
|
"subtitles": "subtitles" in filters,
|
|
"ass": "ass" in filters,
|
|
"drawtext": "drawtext" in filters,
|
|
"overlay": "overlay" in filters,
|
|
}
|
|
|
|
if filter_status["subtitles"]:
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
srt_path = Path(tmp_dir) / "subtitle_test.srt"
|
|
srt_path.write_text(
|
|
"1\n00:00:00,000 --> 00:00:00,800\nNarratoAI FFmpeg subtitle test\n",
|
|
encoding="utf-8",
|
|
)
|
|
ok, message = _run_optional(
|
|
[
|
|
ffmpeg_path,
|
|
"-y",
|
|
"-hide_banner",
|
|
"-loglevel",
|
|
"error",
|
|
"-f",
|
|
"lavfi",
|
|
"-i",
|
|
"color=black:size=320x180:duration=1",
|
|
"-vf",
|
|
f"subtitles={_escape_filter_path(str(srt_path))}",
|
|
"-frames:v",
|
|
"1",
|
|
"-f",
|
|
"null",
|
|
"-",
|
|
],
|
|
timeout=18,
|
|
)
|
|
if ok:
|
|
return {
|
|
"available": True,
|
|
"method": "subtitles",
|
|
"message": "SRT subtitle burn-in test passed",
|
|
"filters": filter_status,
|
|
}
|
|
subtitles_error = message
|
|
else:
|
|
subtitles_error = "subtitles filter is not listed by this FFmpeg build"
|
|
|
|
if filter_status["drawtext"]:
|
|
ok, message = _run_optional(
|
|
[
|
|
ffmpeg_path,
|
|
"-y",
|
|
"-hide_banner",
|
|
"-loglevel",
|
|
"error",
|
|
"-f",
|
|
"lavfi",
|
|
"-i",
|
|
"color=black:size=320x180:duration=1",
|
|
"-vf",
|
|
"drawtext=text=NarratoAI:x=10:y=10:fontsize=18:fontcolor=white",
|
|
"-frames:v",
|
|
"1",
|
|
"-f",
|
|
"null",
|
|
"-",
|
|
],
|
|
timeout=18,
|
|
)
|
|
if ok:
|
|
return {
|
|
"available": True,
|
|
"method": "drawtext",
|
|
"message": "drawtext burn-in fallback test passed",
|
|
"filters": filter_status,
|
|
}
|
|
drawtext_error = message
|
|
else:
|
|
drawtext_error = "drawtext filter is not listed by this FFmpeg build"
|
|
|
|
return {
|
|
"available": False,
|
|
"method": None,
|
|
"message": f"{subtitles_error}\n{drawtext_error}".strip(),
|
|
"filters": filter_status,
|
|
}
|
|
|
|
|
|
def validate_ffmpeg_engine(ffmpeg_path: str) -> dict[str, Any]:
|
|
"""Run runtime checks for a selected FFmpeg engine."""
|
|
|
|
path = _normalize_path(ffmpeg_path)
|
|
report: dict[str, Any] = {
|
|
"path": path,
|
|
"ffmpeg_available": False,
|
|
"version_line": "",
|
|
"ffprobe_path": "",
|
|
"ffprobe_available": False,
|
|
"ffprobe_version_line": "",
|
|
"hwaccels": [],
|
|
"hardware_acceleration": {
|
|
"available": False,
|
|
"type": None,
|
|
"encoder": None,
|
|
"message": "",
|
|
"tested": [],
|
|
},
|
|
"subtitle_burn": {
|
|
"available": False,
|
|
"method": None,
|
|
"message": "",
|
|
"filters": {},
|
|
},
|
|
"software_encoder_available": False,
|
|
"errors": [],
|
|
}
|
|
|
|
available, version_line = _ffmpeg_version_line(path)
|
|
report["ffmpeg_available"] = available
|
|
report["version_line"] = version_line
|
|
if not available:
|
|
report["errors"].append("FFmpeg is not executable or failed to run -version")
|
|
return report
|
|
|
|
ffprobe_path = _paired_ffprobe_path(path)
|
|
report["ffprobe_path"] = ffprobe_path
|
|
if ffprobe_path:
|
|
probe_available, probe_version = _ffmpeg_version_line(ffprobe_path)
|
|
report["ffprobe_available"] = probe_available
|
|
report["ffprobe_version_line"] = probe_version
|
|
|
|
ok, hwaccel_output = _run_optional(
|
|
[path, "-hide_banner", "-hwaccels"],
|
|
timeout=10,
|
|
max_output_chars=0,
|
|
)
|
|
if ok:
|
|
report["hwaccels"] = _parse_hwaccels(hwaccel_output)
|
|
else:
|
|
report["errors"].append(f"Failed to list hardware acceleration methods: {hwaccel_output}")
|
|
|
|
ok, encoders_output = _run_optional(
|
|
[path, "-hide_banner", "-encoders"],
|
|
timeout=10,
|
|
max_output_chars=0,
|
|
)
|
|
encoders = _parse_ffmpeg_table_names(encoders_output) if ok else set()
|
|
report["software_encoder_available"] = "libx264" in encoders or "libopenh264" in encoders
|
|
if not ok:
|
|
report["errors"].append(f"Failed to list encoders: {encoders_output}")
|
|
|
|
ok, filters_output = _run_optional(
|
|
[path, "-hide_banner", "-filters"],
|
|
timeout=10,
|
|
max_output_chars=0,
|
|
)
|
|
filters = _parse_ffmpeg_table_names(filters_output) if ok else set()
|
|
if not ok:
|
|
report["errors"].append(f"Failed to list filters: {filters_output}")
|
|
|
|
report["hardware_acceleration"] = _detect_hardware_encoding(path, encoders)
|
|
report["subtitle_burn"] = _test_subtitle_burn(path, filters)
|
|
return report
|