feat(ffmpeg,webui): 新增 FFmpeg 引擎管理与检测功能

- 新增配置项 ffmpeg_path 及路径应用逻辑,自动配置 FFmpeg 环境变量
- 实现全量 FFmpeg 引擎自动发现、能力检测工具链,支持多来源识别
- 添加 WebUI 系统设置面板,支持选择、测试和保存 FFmpeg 引擎
- 优化视频合并模块的 FFmpeg 调用,新增进度日志与流式输出处理
- 新增 FFmpeg 检测器单元测试覆盖核心功能
This commit is contained in:
viccy 2026-06-08 13:28:27 +08:00
parent dc12f390bb
commit 7a5303aa20
8 changed files with 986 additions and 16 deletions

View File

@ -173,8 +173,43 @@ imagemagick_path = app.get("imagemagick_path", "")
if imagemagick_path and os.path.isfile(imagemagick_path):
os.environ["IMAGEMAGICK_BINARY"] = imagemagick_path
_applied_ffmpeg_dir = None
def apply_ffmpeg_path(ffmpeg_binary: str = "") -> None:
"""Apply the configured FFmpeg binary to this Python process."""
global _applied_ffmpeg_dir
if not ffmpeg_binary or not os.path.isfile(ffmpeg_binary):
return
ffmpeg_binary = os.path.abspath(os.path.expanduser(ffmpeg_binary))
ffmpeg_dir = os.path.dirname(ffmpeg_binary)
os.environ["IMAGEIO_FFMPEG_EXE"] = ffmpeg_binary
current_paths = os.environ.get("PATH", "").split(os.pathsep)
normalized_ffmpeg_dir = os.path.normcase(os.path.abspath(ffmpeg_dir))
normalized_previous_dir = (
os.path.normcase(os.path.abspath(_applied_ffmpeg_dir))
if _applied_ffmpeg_dir
else None
)
filtered_paths = []
for path_item in current_paths:
if not path_item:
continue
normalized_item = os.path.normcase(os.path.abspath(path_item))
if normalized_item == normalized_ffmpeg_dir:
continue
if normalized_previous_dir and normalized_item == normalized_previous_dir:
continue
filtered_paths.append(path_item)
os.environ["PATH"] = os.pathsep.join([ffmpeg_dir, *filtered_paths])
_applied_ffmpeg_dir = ffmpeg_dir
ffmpeg_path = app.get("ffmpeg_path", "")
if ffmpeg_path and os.path.isfile(ffmpeg_path):
os.environ["IMAGEIO_FFMPEG_EXE"] = ffmpeg_path
apply_ffmpeg_path(ffmpeg_path)
logger.info(f"{project_name} v{project_version}")

View File

@ -11,8 +11,8 @@
import os
import json
import re
import shlex
import subprocess
import time
import traceback
import tempfile
from typing import Optional, Dict, Any
@ -327,6 +327,16 @@ def _format_ffmpeg_float(value: float) -> str:
return f"{float(value):.3f}".rstrip("0").rstrip(".")
def _format_duration(seconds: float) -> str:
seconds = max(0, float(seconds or 0))
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
if hours:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"
def _quote_filter_value(value: str) -> str:
escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
return f"'{escaped}'"
@ -435,6 +445,106 @@ def _select_compatible_encoder(preferred_encoder: str) -> str:
return "libx264"
def _parse_ffmpeg_progress_time(progress: Dict[str, str]) -> float:
for key in ("out_time_us", "out_time_ms"):
value = progress.get(key)
if value:
try:
return max(0.0, int(value) / 1_000_000)
except ValueError:
pass
value = progress.get("out_time")
if value:
match = re.match(
r"(?P<hours>\d+):(?P<minutes>\d{2}):(?P<seconds>\d{2})(?:\.(?P<fraction>\d+))?",
value,
)
if match:
fraction = match.group("fraction") or "0"
return (
int(match.group("hours")) * 3600
+ int(match.group("minutes")) * 60
+ int(match.group("seconds"))
+ float(f"0.{fraction}")
)
return 0.0
def _run_ffmpeg_with_progress(cmd: list[str], duration: float) -> tuple[int, str]:
progress_keys = {
"frame",
"fps",
"stream_0_0_q",
"bitrate",
"total_size",
"out_time_us",
"out_time_ms",
"out_time",
"dup_frames",
"drop_frames",
"speed",
"progress",
}
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
progress: Dict[str, str] = {}
output_tail: list[str] = []
last_log_time = 0.0
last_logged_percent = -1.0
assert process.stdout is not None
for raw_line in process.stdout:
line = raw_line.strip()
if not line:
continue
if "=" not in line:
output_tail.append(line)
output_tail = output_tail[-80:]
continue
key, value = line.split("=", 1)
if key not in progress_keys:
output_tail.append(line)
output_tail = output_tail[-80:]
continue
progress[key] = value
if key != "progress":
continue
current = _parse_ffmpeg_progress_time(progress)
if value == "end":
current = duration
percent = min(100.0, (current / duration) * 100) if duration > 0 else 0.0
now = time.monotonic()
should_log = (
value == "end"
or now - last_log_time >= 5
or percent - last_logged_percent >= 5
)
if should_log:
speed = progress.get("speed", "N/A")
logger.info(
"ffmpeg 合并进度: "
f"{percent:.1f}% "
f"({_format_duration(current)}/{_format_duration(duration)}), "
f"speed={speed}"
)
last_log_time = now
last_logged_percent = percent
progress = {}
return_code = process.wait()
return return_code, "\n".join(output_tail[-80:])
def _srt_timestamp_to_seconds(timestamp: str) -> float:
match = re.match(
r"(?P<hours>\d{2}):(?P<minutes>\d{2}):(?P<seconds>\d{2}),(?P<millis>\d{3})",
@ -917,7 +1027,7 @@ def _build_ffmpeg_merge_command(
subtitle_path: Optional[str],
bgm_path: Optional[str],
options: Dict[str, Any],
) -> tuple[list[str], list[str]]:
) -> tuple[list[str], list[str], float]:
from app.utils import ffmpeg_utils
video_meta = _probe_video(video_path)
@ -1118,7 +1228,17 @@ def _build_ffmpeg_merge_command(
filter_parts = [*video_filters, *audio_filters]
ffmpeg_binary = _get_ffmpeg_binary()
cmd = [ffmpeg_binary, "-y", "-hide_banner", "-loglevel", "error", *input_args]
cmd = [
ffmpeg_binary,
"-y",
"-hide_banner",
"-loglevel",
"error",
"-nostats",
"-progress",
"pipe:1",
*input_args,
]
if filter_parts:
cmd.extend(["-filter_complex", ";".join(filter_parts)])
@ -1134,7 +1254,7 @@ def _build_ffmpeg_merge_command(
cmd.append("-an")
cmd.extend(["-t", duration_arg, "-movflags", "+faststart", output_path])
return cmd, temp_files
return cmd, temp_files, duration
def _merge_materials_with_ffmpeg(
@ -1152,7 +1272,7 @@ def _merge_materials_with_ffmpeg(
options = options or {}
temp_files = []
try:
cmd, temp_files = _build_ffmpeg_merge_command(
cmd, temp_files, duration = _build_ffmpeg_merge_command(
video_path=video_path,
audio_path=audio_path,
output_path=output_path,
@ -1160,16 +1280,14 @@ def _merge_materials_with_ffmpeg(
bgm_path=bgm_path,
options=options,
)
logger.info(f"使用 ffmpeg 快速合并素材: {shlex.join(cmd)}")
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
logger.info(
"使用 ffmpeg 快速合并素材: "
f"video={video_path}, audio={audio_path}, output={output_path}, "
f"duration={_format_duration(duration)}"
)
if result.returncode != 0:
logger.warning(f"ffmpeg 快速合并失败,将回退 MoviePy: {result.stderr[-3000:]}")
return_code, ffmpeg_output = _run_ffmpeg_with_progress(cmd, duration)
if return_code != 0:
logger.warning(f"ffmpeg 快速合并失败,将回退 MoviePy: {ffmpeg_output[-3000:]}")
if os.path.exists(output_path):
try:
os.remove(output_path)

View File

@ -0,0 +1,493 @@
"""FFmpeg engine discovery and capability diagnostics."""
from __future__ import annotations
import os
import platform
import re
import shutil
import subprocess
import sys
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any
from loguru import logger
_FFMPEG_EXE = "ffmpeg.exe" if os.name == "nt" else "ffmpeg"
_FFPROBE_EXE = "ffprobe.exe" if os.name == "nt" else "ffprobe"
_SOURCE_PRIORITY = {
"Configured": 0,
"NarratoAI packaged runtime": 1,
"Integrated runtime": 2,
"System PATH": 3,
"Homebrew": 4,
"Python environment": 5,
"Python executable folder": 6,
"IMAGEIO_FFMPEG_EXE": 7,
"imageio-ffmpeg": 8,
"System": 9,
}
@dataclass(frozen=True)
class FFmpegEngine:
"""A discovered FFmpeg executable."""
path: str
source: str
ffprobe_path: str
available: bool
version_line: str
@property
def label(self) -> str:
status = "OK" if self.available else "Unavailable"
version = self.version_line.replace("ffmpeg version", "").strip() or "unknown version"
return f"{self.source} - {version} - {self.path} ({status})"
def to_dict(self) -> dict[str, Any]:
payload = asdict(self)
payload["label"] = self.label
return payload
def _run_command(args: list[str], timeout: int = 10) -> subprocess.CompletedProcess[str]:
return subprocess.run(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
timeout=timeout,
)
def _first_line(text: str) -> str:
for line in (text or "").splitlines():
stripped = line.strip()
if stripped:
return stripped
return ""
def _is_executable(path: str) -> bool:
if not path:
return False
if os.name == "nt":
return os.path.isfile(path)
return os.path.isfile(path) and os.access(path, os.X_OK)
def _normalize_path(path: str) -> str:
return str(Path(path).expanduser().resolve())
def _ffmpeg_version_line(ffmpeg_path: str) -> tuple[bool, str]:
if not _is_executable(ffmpeg_path):
return False, ""
try:
result = _run_command([ffmpeg_path, "-version"], timeout=8)
except Exception as exc:
logger.debug(f"FFmpeg version check failed for {ffmpeg_path}: {exc}")
return False, ""
output = result.stdout or result.stderr
return result.returncode == 0, _first_line(output)
def _paired_ffprobe_path(ffmpeg_path: str) -> str:
ffmpeg = Path(ffmpeg_path)
sibling = ffmpeg.with_name(_FFPROBE_EXE)
if _is_executable(str(sibling)):
return _normalize_path(str(sibling))
scoped_path = os.pathsep.join([str(ffmpeg.parent), os.environ.get("PATH", "")])
discovered = shutil.which(_FFPROBE_EXE, path=scoped_path)
return _normalize_path(discovered) if discovered else ""
def _candidate_paths(root_dir: str = "", include_system: bool = True) -> list[tuple[str, str]]:
candidates: list[tuple[str, str]] = []
root = Path(root_dir).expanduser().resolve() if root_dir else Path.cwd().resolve()
project_parent = root.parent
candidates.extend(
[
("Integrated runtime", str(root / "runtime" / "python" / "bin" / _FFMPEG_EXE)),
("Integrated runtime", str(root.parent / "runtime" / "python" / "bin" / _FFMPEG_EXE)),
(
"NarratoAI packaged runtime",
str(
project_parent
/ "NarratoAI-Pack"
/ "dist"
/ "NarratoAI-macos-arm64"
/ "runtime"
/ "python"
/ "bin"
/ _FFMPEG_EXE
),
),
("Python environment", str(Path(sys.prefix) / "bin" / _FFMPEG_EXE)),
("Python executable folder", str(Path(sys.executable).with_name(_FFMPEG_EXE))),
]
)
env_ffmpeg = os.environ.get("IMAGEIO_FFMPEG_EXE", "")
if env_ffmpeg:
candidates.append(("IMAGEIO_FFMPEG_EXE", env_ffmpeg))
if include_system:
path_ffmpeg = shutil.which(_FFMPEG_EXE)
if path_ffmpeg:
candidates.append(("System PATH", path_ffmpeg))
for source, path in (
("Homebrew", f"/opt/homebrew/bin/{_FFMPEG_EXE}"),
("Homebrew", f"/usr/local/bin/{_FFMPEG_EXE}"),
("System", f"/usr/bin/{_FFMPEG_EXE}"),
):
candidates.append((source, path))
try:
import imageio_ffmpeg
candidates.append(("imageio-ffmpeg", imageio_ffmpeg.get_ffmpeg_exe()))
except Exception as exc:
logger.debug(f"imageio-ffmpeg discovery skipped: {exc}")
return candidates
def discover_ffmpeg_engines(
configured_path: str = "",
root_dir: str = "",
include_system: bool = True,
) -> list[dict[str, Any]]:
"""Discover available FFmpeg engines from config, packaged runtime and PATH."""
candidates: list[tuple[str, str]] = []
if configured_path:
candidates.append(("Configured", configured_path))
candidates.extend(_candidate_paths(root_dir=root_dir, include_system=include_system))
engines: list[FFmpegEngine] = []
seen: set[str] = set()
for source, raw_path in candidates:
if not raw_path:
continue
try:
path = _normalize_path(raw_path)
except Exception:
path = str(Path(raw_path).expanduser())
key = os.path.normcase(path)
if key in seen:
continue
seen.add(key)
available, version_line = _ffmpeg_version_line(path)
if not available and source not in {"Configured", "IMAGEIO_FFMPEG_EXE"}:
continue
engines.append(
FFmpegEngine(
path=path,
source=source,
ffprobe_path=_paired_ffprobe_path(path),
available=available,
version_line=version_line,
)
)
engines.sort(
key=lambda engine: (
not engine.available,
_SOURCE_PRIORITY.get(engine.source, 99),
engine.path,
)
)
return [engine.to_dict() for engine in engines]
def _parse_hwaccels(output: str) -> list[str]:
values: list[str] = []
for line in output.splitlines():
item = line.strip().lower()
if not item or item.startswith("hardware acceleration"):
continue
if re.fullmatch(r"[a-z0-9_]+", item):
values.append(item)
return sorted(set(values))
def _parse_ffmpeg_table_names(output: str) -> set[str]:
names: set[str] = set()
for line in output.splitlines():
match = re.match(r"\s*[A-Z.]{2,}\s+([A-Za-z0-9_]+)\b", line)
if match:
names.add(match.group(1).lower())
return names
def _run_optional(args: list[str], timeout: int = 15, max_output_chars: int = 1200) -> tuple[bool, str]:
try:
result = _run_command(args, timeout=timeout)
except subprocess.TimeoutExpired:
return False, "Command timed out"
except Exception as exc:
return False, str(exc)
output = "\n".join(part for part in (result.stderr, result.stdout) if part)
if max_output_chars > 0:
output = output[-max_output_chars:]
return result.returncode == 0, output
def _hardware_candidates() -> list[tuple[str, str, list[str]]]:
system = platform.system().lower()
if system == "darwin":
return [
("videotoolbox", "h264_videotoolbox", ["-c:v", "h264_videotoolbox", "-q:v", "65"]),
]
if system == "windows":
return [
("nvenc", "h264_nvenc", ["-c:v", "h264_nvenc", "-preset", "fast"]),
("qsv", "h264_qsv", ["-c:v", "h264_qsv", "-preset", "fast"]),
("amf", "h264_amf", ["-c:v", "h264_amf"]),
]
return [
("nvenc", "h264_nvenc", ["-c:v", "h264_nvenc", "-preset", "fast"]),
("qsv", "h264_qsv", ["-vf", "format=nv12", "-c:v", "h264_qsv"]),
("vaapi", "h264_vaapi", ["-vf", "format=nv12,hwupload", "-c:v", "h264_vaapi"]),
]
def _detect_hardware_encoding(ffmpeg_path: str, encoders: set[str]) -> dict[str, Any]:
tested: list[dict[str, Any]] = []
for accel_type, encoder, encoder_args in _hardware_candidates():
if encoder.lower() not in encoders:
tested.append(
{
"type": accel_type,
"encoder": encoder,
"available": False,
"message": "Encoder not listed by this FFmpeg build",
}
)
continue
cmd = [
ffmpeg_path,
"-y",
"-hide_banner",
"-loglevel",
"error",
"-f",
"lavfi",
"-i",
"testsrc=duration=0.5:size=128x72:rate=15",
"-frames:v",
"5",
*encoder_args,
"-pix_fmt",
"yuv420p",
"-f",
"null",
"-",
]
ok, message = _run_optional(cmd, timeout=18)
tested.append(
{
"type": accel_type,
"encoder": encoder,
"available": ok,
"message": "Hardware encode test passed" if ok else message,
}
)
if ok:
return {
"available": True,
"type": accel_type,
"encoder": encoder,
"message": "Hardware encode test passed",
"tested": tested,
}
return {
"available": False,
"type": None,
"encoder": None,
"message": "No hardware encoder passed the runtime test",
"tested": tested,
}
def _escape_filter_path(path: str) -> str:
return path.replace("\\", "\\\\").replace(":", "\\:").replace("'", "\\'")
def _test_subtitle_burn(ffmpeg_path: str, filters: set[str]) -> dict[str, Any]:
filter_status = {
"subtitles": "subtitles" in filters,
"ass": "ass" in filters,
"drawtext": "drawtext" in filters,
"overlay": "overlay" in filters,
}
if filter_status["subtitles"]:
with tempfile.TemporaryDirectory() as tmp_dir:
srt_path = Path(tmp_dir) / "subtitle_test.srt"
srt_path.write_text(
"1\n00:00:00,000 --> 00:00:00,800\nNarratoAI FFmpeg subtitle test\n",
encoding="utf-8",
)
ok, message = _run_optional(
[
ffmpeg_path,
"-y",
"-hide_banner",
"-loglevel",
"error",
"-f",
"lavfi",
"-i",
"color=black:size=320x180:duration=1",
"-vf",
f"subtitles={_escape_filter_path(str(srt_path))}",
"-frames:v",
"1",
"-f",
"null",
"-",
],
timeout=18,
)
if ok:
return {
"available": True,
"method": "subtitles",
"message": "SRT subtitle burn-in test passed",
"filters": filter_status,
}
subtitles_error = message
else:
subtitles_error = "subtitles filter is not listed by this FFmpeg build"
if filter_status["drawtext"]:
ok, message = _run_optional(
[
ffmpeg_path,
"-y",
"-hide_banner",
"-loglevel",
"error",
"-f",
"lavfi",
"-i",
"color=black:size=320x180:duration=1",
"-vf",
"drawtext=text=NarratoAI:x=10:y=10:fontsize=18:fontcolor=white",
"-frames:v",
"1",
"-f",
"null",
"-",
],
timeout=18,
)
if ok:
return {
"available": True,
"method": "drawtext",
"message": "drawtext burn-in fallback test passed",
"filters": filter_status,
}
drawtext_error = message
else:
drawtext_error = "drawtext filter is not listed by this FFmpeg build"
return {
"available": False,
"method": None,
"message": f"{subtitles_error}\n{drawtext_error}".strip(),
"filters": filter_status,
}
def validate_ffmpeg_engine(ffmpeg_path: str) -> dict[str, Any]:
"""Run runtime checks for a selected FFmpeg engine."""
path = _normalize_path(ffmpeg_path)
report: dict[str, Any] = {
"path": path,
"ffmpeg_available": False,
"version_line": "",
"ffprobe_path": "",
"ffprobe_available": False,
"ffprobe_version_line": "",
"hwaccels": [],
"hardware_acceleration": {
"available": False,
"type": None,
"encoder": None,
"message": "",
"tested": [],
},
"subtitle_burn": {
"available": False,
"method": None,
"message": "",
"filters": {},
},
"software_encoder_available": False,
"errors": [],
}
available, version_line = _ffmpeg_version_line(path)
report["ffmpeg_available"] = available
report["version_line"] = version_line
if not available:
report["errors"].append("FFmpeg is not executable or failed to run -version")
return report
ffprobe_path = _paired_ffprobe_path(path)
report["ffprobe_path"] = ffprobe_path
if ffprobe_path:
probe_available, probe_version = _ffmpeg_version_line(ffprobe_path)
report["ffprobe_available"] = probe_available
report["ffprobe_version_line"] = probe_version
ok, hwaccel_output = _run_optional(
[path, "-hide_banner", "-hwaccels"],
timeout=10,
max_output_chars=0,
)
if ok:
report["hwaccels"] = _parse_hwaccels(hwaccel_output)
else:
report["errors"].append(f"Failed to list hardware acceleration methods: {hwaccel_output}")
ok, encoders_output = _run_optional(
[path, "-hide_banner", "-encoders"],
timeout=10,
max_output_chars=0,
)
encoders = _parse_ffmpeg_table_names(encoders_output) if ok else set()
report["software_encoder_available"] = "libx264" in encoders or "libopenh264" in encoders
if not ok:
report["errors"].append(f"Failed to list encoders: {encoders_output}")
ok, filters_output = _run_optional(
[path, "-hide_banner", "-filters"],
timeout=10,
max_output_chars=0,
)
filters = _parse_ffmpeg_table_names(filters_output) if ok else set()
if not ok:
report["errors"].append(f"Failed to list filters: {filters_output}")
report["hardware_acceleration"] = _detect_hardware_encoding(path, encoders)
report["subtitle_burn"] = _test_subtitle_burn(path, filters)
return report

View File

@ -0,0 +1,76 @@
import os
import tempfile
import unittest
from pathlib import Path
from app.utils import ffmpeg_detector
class FFmpegDetectorTests(unittest.TestCase):
def _write_fake_binary(self, path: Path, first_line: str) -> None:
path.write_text(
"#!/bin/sh\n"
"if [ \"$1\" = \"-version\" ]; then\n"
f" echo \"{first_line}\"\n"
" exit 0\n"
"fi\n"
"if [ \"$2\" = \"-hwaccels\" ]; then\n"
" echo \"Hardware acceleration methods:\"\n"
" echo \"videotoolbox\"\n"
" exit 0\n"
"fi\n"
"if [ \"$2\" = \"-encoders\" ]; then\n"
" echo \" V....D h264_videotoolbox Apple VideoToolbox H.264\"\n"
" echo \" V....D h264_nvenc NVIDIA NVENC H.264\"\n"
" echo \" V....D h264_qsv Intel QSV H.264\"\n"
" echo \" V....D libx264 libx264 H.264\"\n"
" exit 0\n"
"fi\n"
"if [ \"$2\" = \"-filters\" ]; then\n"
" echo \" ... subtitles V->V Render text subtitles\"\n"
" echo \" ... drawtext V->V Draw text\"\n"
" echo \" ... overlay VV->V Overlay video\"\n"
" exit 0\n"
"fi\n"
"exit 0\n",
encoding="utf-8",
)
path.chmod(0o755)
@unittest.skipIf(os.name == "nt", "shell fake binaries are POSIX-only")
def test_discover_includes_configured_path(self):
with tempfile.TemporaryDirectory() as tmp_dir:
ffmpeg_path = Path(tmp_dir) / "ffmpeg"
ffprobe_path = Path(tmp_dir) / "ffprobe"
self._write_fake_binary(ffmpeg_path, "ffmpeg version fake-1.0")
self._write_fake_binary(ffprobe_path, "ffprobe version fake-1.0")
engines = ffmpeg_detector.discover_ffmpeg_engines(
configured_path=str(ffmpeg_path),
root_dir=tmp_dir,
include_system=False,
)
self.assertEqual(engines[0]["path"], str(ffmpeg_path.resolve()))
self.assertEqual(engines[0]["ffprobe_path"], str(ffprobe_path.resolve()))
self.assertTrue(engines[0]["available"])
@unittest.skipIf(os.name == "nt", "shell fake binaries are POSIX-only")
def test_validate_reports_hardware_and_subtitle_support(self):
with tempfile.TemporaryDirectory() as tmp_dir:
ffmpeg_path = Path(tmp_dir) / "ffmpeg"
ffprobe_path = Path(tmp_dir) / "ffprobe"
self._write_fake_binary(ffmpeg_path, "ffmpeg version fake-1.0")
self._write_fake_binary(ffprobe_path, "ffprobe version fake-1.0")
report = ffmpeg_detector.validate_ffmpeg_engine(str(ffmpeg_path))
self.assertTrue(report["ffmpeg_available"])
self.assertTrue(report["ffprobe_available"])
self.assertTrue(report["hardware_acceleration"]["available"])
self.assertTrue(report["subtitle_burn"]["available"])
self.assertEqual(report["subtitle_burn"]["method"], "subtitles")
if __name__ == "__main__":
unittest.main()

View File

@ -75,6 +75,10 @@
# WebUI 界面是否显示配置项
hide_config = true
# FFmpeg 引擎路径(可选)
# 为空时使用系统 PATH也可以在系统设置中通过下拉框选择整合包或本机 ffmpeg。
ffmpeg_path = ""
# 官方 OpenAI 默认端点(可选):
# text_openai_base_url = "https://api.openai.com/v1"

View File

@ -3,6 +3,8 @@ import os
import shutil
from loguru import logger
from app.config import config
from app.utils import ffmpeg_detector, ffmpeg_utils
from app.utils.utils import storage_dir
@ -27,6 +29,162 @@ def clear_directory(dir_path, tr):
else:
st.warning(tr("Directory does not exist"))
def _format_engine_label(engines_by_path, tr):
def formatter(path):
engine = engines_by_path.get(path, {})
source = engine.get("source", "")
source_key = f"FFmpeg source {source}"
translated_source = tr(source_key)
if translated_source == source_key:
translated_source = source
version = str(engine.get("version_line", "")).replace("ffmpeg version", "").strip()
version = version or "unknown version"
status = _status_text(engine.get("available"), tr)
return f"{translated_source} - {version} - {path} ({status})"
return formatter
def _status_text(value, tr):
return tr("Available") if value else tr("Unavailable")
def _render_ffmpeg_report(report, tr):
st.write(f"**{tr('FFmpeg detection details')}**")
st.caption(f"{tr('Path')}: {report.get('path', '')}")
if report.get("version_line"):
st.caption(f"{tr('Version')}: {report['version_line']}")
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("FFmpeg", _status_text(report.get("ffmpeg_available"), tr))
with col2:
st.metric("FFprobe", _status_text(report.get("ffprobe_available"), tr))
with col3:
hwaccel = report.get("hardware_acceleration", {})
st.metric(tr("Hardware Acceleration"), _status_text(hwaccel.get("available"), tr))
with col4:
subtitle_burn = report.get("subtitle_burn", {})
st.metric(tr("Subtitle Burn-in"), _status_text(subtitle_burn.get("available"), tr))
if report.get("ffmpeg_available") and report.get("subtitle_burn", {}).get("available"):
if report.get("hardware_acceleration", {}).get("available"):
st.success(tr("FFmpeg engine passed all checks"))
else:
st.warning(tr("FFmpeg engine works but hardware acceleration is unavailable"))
else:
st.error(tr("FFmpeg engine check failed"))
hwaccel = report.get("hardware_acceleration", {})
subtitle_burn = report.get("subtitle_burn", {})
col1, col2 = st.columns(2)
with col1:
st.write(f"**{tr('Hardware acceleration detail')}**")
st.write(f"- {tr('Type')}: {hwaccel.get('type') or '-'}")
st.write(f"- {tr('Encoder')}: {hwaccel.get('encoder') or '-'}")
st.write(f"- {tr('Message')}: {hwaccel.get('message') or '-'}")
hwaccels = report.get("hwaccels") or []
st.write(f"- {tr('Supported Hardware Methods')}: {', '.join(hwaccels) if hwaccels else '-'}")
with col2:
filters = subtitle_burn.get("filters") or {}
st.write(f"**{tr('Subtitle burn-in detail')}**")
st.write(f"- {tr('Method')}: {subtitle_burn.get('method') or '-'}")
st.write(f"- {tr('Message')}: {subtitle_burn.get('message') or '-'}")
st.write(
"- "
+ tr("Subtitle Filters")
+ ": "
+ ", ".join(
f"{name}={_status_text(enabled, tr)}"
for name, enabled in filters.items()
)
)
errors = report.get("errors") or []
if errors:
with st.expander(tr("FFmpeg errors")):
for error in errors:
st.write(f"- {error}")
with st.expander(tr("Raw FFmpeg report")):
st.json(report)
def render_ffmpeg_engine_settings(tr):
"""Render FFmpeg engine discovery, selection and diagnostics."""
st.divider()
st.subheader(tr("FFmpeg Engine Detection"))
engines = ffmpeg_detector.discover_ffmpeg_engines(
configured_path=config.app.get("ffmpeg_path", ""),
root_dir=config.root_dir,
)
engines_by_path = {engine["path"]: engine for engine in engines}
engine_paths = list(engines_by_path.keys())
if not engine_paths:
st.warning(tr("No FFmpeg engines found"))
current_path = config.app.get("ffmpeg_path", "")
selected_index = 0
if current_path in engines_by_path:
selected_index = engine_paths.index(current_path)
selected_path = ""
if engine_paths:
selected_path = st.selectbox(
tr("FFmpeg Engine"),
options=engine_paths,
index=selected_index,
format_func=_format_engine_label(engines_by_path, tr),
help=tr("FFmpeg Engine Help"),
)
custom_path = st.text_input(
tr("Custom FFmpeg Path"),
value="",
help=tr("Custom FFmpeg Path Help"),
placeholder="/path/to/ffmpeg",
).strip()
effective_path = custom_path or selected_path
active_path = config.app.get("ffmpeg_path", "")
if active_path:
st.caption(f"{tr('Current FFmpeg Engine')}: {active_path}")
col1, col2 = st.columns(2)
with col1:
if st.button(tr("Save FFmpeg Engine"), use_container_width=True, disabled=not effective_path):
try:
if not os.path.isfile(effective_path):
st.error(tr("Selected FFmpeg path is invalid"))
else:
config.app["ffmpeg_path"] = effective_path
config.ffmpeg_path = effective_path
config.apply_ffmpeg_path(effective_path)
config.save_config()
ffmpeg_utils.reset_hwaccel_detection()
st.success(tr("FFmpeg engine saved"))
except Exception as e:
st.error(f"{tr('Failed to save config')}: {str(e)}")
logger.error(f"保存 FFmpeg 引擎失败: {e}")
with col2:
if st.button(tr("Test Selected FFmpeg"), use_container_width=True, disabled=not effective_path):
with st.spinner(tr("Testing FFmpeg engine")):
try:
st.session_state["ffmpeg_engine_report"] = ffmpeg_detector.validate_ffmpeg_engine(effective_path)
except Exception as e:
st.error(f"{tr('FFmpeg engine check failed')}: {str(e)}")
logger.error(f"FFmpeg 引擎检测失败: {e}")
report = st.session_state.get("ffmpeg_engine_report")
if report:
_render_ffmpeg_report(report, tr)
def render_system_panel(tr):
"""渲染系统设置面板"""
with st.expander(tr("System settings"), expanded=False):
@ -43,3 +201,5 @@ def render_system_panel(tr):
with col3:
if st.button(tr("Clear tasks"), use_container_width=True):
clear_directory(os.path.join(storage_dir(), "tasks"), tr)
render_ffmpeg_engine_settings(tr)

View File

@ -235,6 +235,48 @@
"Directory cleared": "Directory cleared",
"Directory does not exist": "Directory does not exist",
"Failed to clear directory": "Failed to clear directory",
"FFmpeg Engine Detection": "FFmpeg Engine Detection",
"FFmpeg Engine": "FFmpeg Engine",
"FFmpeg Engine Help": "Choose the ffmpeg executable this app should prefer; the packaged runtime and local PATH are discovered automatically",
"No FFmpeg engines found": "No FFmpeg engines found",
"Custom FFmpeg Path": "Custom FFmpeg Path",
"Custom FFmpeg Path Help": "Paste an absolute path to an ffmpeg executable if the target engine is not listed",
"Current FFmpeg Engine": "Current FFmpeg Engine",
"Save FFmpeg Engine": "Save Engine",
"Test Selected FFmpeg": "Test Selected FFmpeg",
"Testing FFmpeg engine": "Testing FFmpeg engine...",
"FFmpeg engine saved": "FFmpeg engine saved",
"Selected FFmpeg path is invalid": "Selected FFmpeg path is invalid",
"FFmpeg detection details": "FFmpeg detection details",
"FFmpeg source Configured": "Configured",
"FFmpeg source NarratoAI packaged runtime": "NarratoAI packaged runtime",
"FFmpeg source Integrated runtime": "Integrated runtime",
"FFmpeg source System PATH": "System PATH",
"FFmpeg source Homebrew": "Homebrew",
"FFmpeg source Python environment": "Python environment",
"FFmpeg source Python executable folder": "Python executable folder",
"FFmpeg source IMAGEIO_FFMPEG_EXE": "IMAGEIO_FFMPEG_EXE",
"FFmpeg source imageio-ffmpeg": "imageio-ffmpeg",
"FFmpeg source System": "System",
"Version": "Version",
"Path": "Path",
"Available": "Available",
"Unavailable": "Unavailable",
"Hardware Acceleration": "Hardware Acceleration",
"Subtitle Burn-in": "Subtitle Burn-in",
"FFmpeg engine passed all checks": "FFmpeg engine passed all checks: basic execution, hardware acceleration and subtitle burn-in are available",
"FFmpeg engine works but hardware acceleration is unavailable": "FFmpeg and subtitle burn-in work, but hardware acceleration is unavailable; software encoding will be used",
"FFmpeg engine check failed": "FFmpeg engine check failed",
"Hardware acceleration detail": "Hardware acceleration detail",
"Subtitle burn-in detail": "Subtitle burn-in detail",
"Type": "Type",
"Encoder": "Encoder",
"Message": "Message",
"Method": "Method",
"Supported Hardware Methods": "Supported hardware methods",
"Subtitle Filters": "Subtitle filters",
"FFmpeg errors": "FFmpeg errors",
"Raw FFmpeg report": "Raw FFmpeg report",
"Subtitle Preview": "Subtitle Preview",
"One-Click Transcribe": "One-Click Transcribe",
"Transcribing...": "Transcribing...",

View File

@ -223,6 +223,48 @@
"Directory cleared": "目录清理完成",
"Directory does not exist": "目录不存在",
"Failed to clear directory": "清理目录失败",
"FFmpeg Engine Detection": "FFmpeg 引擎检测",
"FFmpeg Engine": "FFmpeg 引擎",
"FFmpeg Engine Help": "选择当前应用优先使用的 ffmpeg 可执行文件;会自动发现整合包运行时和本机 PATH 中的 ffmpeg",
"No FFmpeg engines found": "未发现可用 FFmpeg 引擎",
"Custom FFmpeg Path": "自定义 FFmpeg 路径",
"Custom FFmpeg Path Help": "如果下拉框没有列出目标引擎,可以粘贴 ffmpeg 可执行文件的绝对路径",
"Current FFmpeg Engine": "当前生效引擎",
"Save FFmpeg Engine": "保存引擎",
"Test Selected FFmpeg": "检测所选 FFmpeg",
"Testing FFmpeg engine": "正在检测 FFmpeg 引擎...",
"FFmpeg engine saved": "FFmpeg 引擎已保存",
"Selected FFmpeg path is invalid": "所选 FFmpeg 路径无效",
"FFmpeg detection details": "FFmpeg 检测详情",
"FFmpeg source Configured": "已配置",
"FFmpeg source NarratoAI packaged runtime": "NarratoAI 整合包运行时",
"FFmpeg source Integrated runtime": "内置运行时",
"FFmpeg source System PATH": "系统 PATH",
"FFmpeg source Homebrew": "Homebrew",
"FFmpeg source Python environment": "Python 环境",
"FFmpeg source Python executable folder": "Python 可执行目录",
"FFmpeg source IMAGEIO_FFMPEG_EXE": "IMAGEIO_FFMPEG_EXE",
"FFmpeg source imageio-ffmpeg": "imageio-ffmpeg",
"FFmpeg source System": "系统路径",
"Version": "版本",
"Path": "路径",
"Available": "可用",
"Unavailable": "不可用",
"Hardware Acceleration": "硬件加速",
"Subtitle Burn-in": "字幕烧录",
"FFmpeg engine passed all checks": "FFmpeg 引擎检测通过:基础功能、硬件加速和字幕烧录均可用",
"FFmpeg engine works but hardware acceleration is unavailable": "FFmpeg 基础功能和字幕烧录可用,但硬件加速不可用,将使用软件编码",
"FFmpeg engine check failed": "FFmpeg 引擎检测失败",
"Hardware acceleration detail": "硬件加速详情",
"Subtitle burn-in detail": "字幕烧录详情",
"Type": "类型",
"Encoder": "编码器",
"Message": "信息",
"Method": "方式",
"Supported Hardware Methods": "支持的硬件加速方法",
"Subtitle Filters": "字幕滤镜",
"FFmpeg errors": "FFmpeg 错误",
"Raw FFmpeg report": "原始 FFmpeg 报告",
"Subtitle Preview": "字幕预览",
"One-Click Transcribe": "一键转录",
"Transcribing...": "正在转录中...",