NarratoAI/app/services/generate_video.py
viccy 99fcd45704 feat(subtitle, ui): 新增字幕安全区预览,优化字体与字幕配置
- 新增竖屏/横屏字幕安全区预览背景图,支持切换预览比例
- 将项目版本从0.8.1升级至0.8.2
- 扩展字体搜索候选列表,新增SourceHanSerifSC-SemiBold.otf和LXGWWenKaiScreen.ttf两款字体
- 修改默认字幕字体为SourceHanSansCN-Regular.otf,替换原Microsoft YaHei默认值
- 新增内置字体检测逻辑,检测到resource/fonts目录有有效字体时跳过下载
- 更新中英文多语言文案,优化字幕位置提示文本
- 重构字幕设置面板,合并位置控制到预览区域并精简标签页
- 调整字体大小滑块范围从20-100扩展至20-160,新增数值边界校验
2026-06-10 12:05:05 +08:00

1857 lines
65 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project: NarratoAI
@File : generate_video
@Author : Viccy同学
@Date : 2025/5/7 上午11:55
'''
import os
import json
import re
import subprocess
import time
import traceback
import tempfile
from typing import Optional, Dict, Any, Callable
from loguru import logger
import numpy as np
from moviepy import (
VideoFileClip,
AudioFileClip,
CompositeAudioClip,
CompositeVideoClip,
TextClip,
afx
)
from moviepy.video.tools.subtitles import SubtitlesClip
from PIL import ImageFont, Image, ImageDraw, ImageEnhance, ImageFilter
from app.utils import utils
from app.models.schema import AudioVolumeDefaults
from app.services.audio_normalizer import AudioNormalizer, normalize_audio_for_mixing
# Default subtitle-mask settings per video orientation.
# The ``*_percent`` position/size values are percentages of the frame width or
# height. ``blur_radius`` is rescaled by the frame height relative to 1080
# (landscape) or 1920 (portrait) when the mask region is resolved, and
# ``opacity_percent`` is converted to a 0-1 factor at that point.
SUBTITLE_MASK_DEFAULTS = {
    "landscape": {
        "x_percent": 10.0,
        "y_percent": 78.0,
        "width_percent": 80.0,
        "height_percent": 14.0,
        "blur_radius": 18,
        "opacity_percent": 82,
    },
    "portrait": {
        "x_percent": 8.0,
        "y_percent": 79.0,
        "width_percent": 84.0,
        "height_percent": 16.0,
        "blur_radius": 26,
        "opacity_percent": 84,
    },
}
# Memoized results of ffmpeg capability probes.
# Keys are (ffmpeg binary, filter name) -> whether the filter is available.
_FFMPEG_FILTER_CACHE: Dict[tuple[str, str], bool] = {}
# Keys are (ffmpeg binary, encoder name) -> whether the encoder is available.
_FFMPEG_ENCODER_CACHE: Dict[tuple[str, str], bool] = {}
def _clamp(value, minimum, maximum):
    """Limit ``value`` to the range bounded by ``minimum`` and ``maximum``.

    The lower bound is applied first and the upper bound last, so if the
    bounds are inverted (``minimum > maximum``) the result is ``maximum``.
    """
    floored = max(value, minimum)
    return min(floored, maximum)
def _get_numeric_option(options, key, default, integer=False):
    """Read a numeric option, falling back to ``default`` when it is unusable.

    Args:
        options: Mapping of option names to raw values (numbers or strings).
        key: Name of the option to read.
        default: Value used when the option is missing, cannot be converted
            to a float, or is not a finite number.
        integer: When true, the result is rounded and returned as ``int``.

    Returns:
        The option as a ``float`` (or ``int`` when ``integer`` is true).
    """
    import math  # local import: math is not imported at file level

    try:
        value = float(options.get(key, default))
    except (TypeError, ValueError, OverflowError):
        value = None
    # "nan"/"inf" convert without error but would crash int(round(...)) here
    # or poison the clamping/rounding done by callers, so treat them as invalid.
    if value is None or not math.isfinite(value):
        value = float(default)
    return int(round(value)) if integer else value
def _get_subtitle_mask_region_options(options, orientation):
    """Collect the clamped subtitle-mask settings for one orientation.

    Each setting is read from ``options`` under the key
    ``subtitle_mask_<orientation>_<name>`` and falls back to the matching
    entry of ``SUBTITLE_MASK_DEFAULTS``.

    Args:
        options: Mapping of raw option values.
        orientation: ``"landscape"`` or ``"portrait"``.

    Returns:
        Dict with ``x_percent``, ``y_percent``, ``width_percent``,
        ``height_percent``, ``blur_radius`` and ``opacity_percent``.
    """
    defaults = SUBTITLE_MASK_DEFAULTS[orientation]
    prefix = f"subtitle_mask_{orientation}_"

    def read(name, integer=False):
        return _get_numeric_option(
            options, f"{prefix}{name}", defaults[name], integer=integer
        )

    # The origin is clamped first because the size limits depend on it.
    x_percent = _clamp(read("x_percent"), 0, 99)
    y_percent = _clamp(read("y_percent"), 0, 99)
    return {
        "x_percent": x_percent,
        "y_percent": y_percent,
        "width_percent": _clamp(read("width_percent"), 2, 100 - x_percent),
        "height_percent": _clamp(read("height_percent"), 2, 100 - y_percent),
        "blur_radius": _clamp(read("blur_radius", integer=True), 0, 200),
        "opacity_percent": _clamp(read("opacity_percent", integer=True), 0, 100),
    }
def _resolve_subtitle_mask_region(video_width, video_height, options):
    """Convert percentage-based mask options into pixel geometry.

    Args:
        video_width: Frame width in pixels.
        video_height: Frame height in pixels.
        options: Mapping of raw option values.

    Returns:
        Dict with the orientation, the mask rectangle (``x``, ``y``,
        ``width``, ``height``), the scaled ``blur_radius``, ``opacity`` as a
        0-1 factor, ``corner_radius``, ``feather`` and the padded rectangle
        (``padded_*``) that extends the mask by the blur radius.
    """
    is_portrait = video_height > video_width
    orientation = "portrait" if is_portrait else "landscape"
    percents = _get_subtitle_mask_region_options(options, orientation)

    def scaled(total, percent_key):
        return round(total * percents[percent_key] / 100)

    x = _clamp(scaled(video_width, "x_percent"), 0, max(0, video_width - 2))
    y = _clamp(scaled(video_height, "y_percent"), 0, max(0, video_height - 2))
    width = _clamp(scaled(video_width, "width_percent"), 2, max(2, video_width - x))
    height = _clamp(scaled(video_height, "height_percent"), 2, max(2, video_height - y))

    configured_blur = percents["blur_radius"]
    if configured_blur == 0:
        blur_radius = 0
    else:
        # The configured radius is relative to a 1920/1080 pixel tall frame.
        reference_height = 1920 if is_portrait else 1080
        blur_radius = max(1, round(configured_blur * (video_height / reference_height)))

    # With no blur the corner radius falls back to a fraction of the height.
    corner_basis = blur_radius * 1.4 or height * 0.24
    corner_radius = max(8, round(min(height * 0.32, corner_basis)))
    feather = max(6, round(max(blur_radius * 0.85, 8)))

    padding = blur_radius
    padded_x = max(0, x - padding)
    padded_y = max(0, y - padding)
    padded_width = _clamp(width + padding * 2, 2, video_width - padded_x)
    padded_height = _clamp(height + padding * 2, 2, video_height - padded_y)

    return {
        "orientation": orientation,
        "x": int(x),
        "y": int(y),
        "width": int(width),
        "height": int(height),
        "blur_radius": int(blur_radius),
        "opacity": _clamp(percents["opacity_percent"] / 100, 0, 1),
        "corner_radius": int(corner_radius),
        "feather": int(feather),
        "padded_x": int(padded_x),
        "padded_y": int(padded_y),
        "padded_width": int(padded_width),
        "padded_height": int(padded_height),
    }
def _build_subtitle_mask_alpha(region):
    """Render the single-channel alpha image for the subtitle mask.

    The image has the size of the padded region and contains a filled
    rounded rectangle at the mask position, softened with a Gaussian blur
    when ``region["feather"]`` is positive.

    Args:
        region: Geometry dict as returned by ``_resolve_subtitle_mask_region``.

    Returns:
        A PIL image in mode ``"L"``.
    """
    canvas_size = (region["padded_width"], region["padded_height"])
    alpha = Image.new("L", canvas_size, 0)

    # Mask rectangle expressed relative to the padded canvas.
    left = region["x"] - region["padded_x"]
    top = region["y"] - region["padded_y"]
    box = (left, top, left + region["width"], top + region["height"])
    ImageDraw.Draw(alpha).rounded_rectangle(
        box,
        radius=region["corner_radius"],
        fill=255,
    )

    feather = region["feather"]
    if feather <= 0:
        return alpha
    return alpha.filter(ImageFilter.GaussianBlur(radius=max(1, feather / 2)))
def apply_subtitle_mask(video_clip, options):
    """Apply a Speclip-style blurred subtitle mask before subtitle burn-in.

    When ``options["subtitle_mask_enabled"]`` is falsy the clip is returned
    unchanged. Otherwise every frame gets the padded mask region replaced by
    a downscaled, blurred, slightly brightened copy of itself, blended in
    through a feathered alpha mask with a faint white tint.

    Args:
        video_clip: Clip exposing ``size`` and ``transform`` (MoviePy clip).
        options: Mapping of raw option values.

    Returns:
        The original clip, or a transformed clip with the mask applied.
    """
    if not options.get("subtitle_mask_enabled", False):
        return video_clip
    video_width, video_height = video_clip.size
    region = _resolve_subtitle_mask_region(video_width, video_height, options)
    logger.info(
        "字幕遮罩已启用: "
        f"{region['orientation']} x={region['x']} y={region['y']} "
        f"w={region['width']} h={region['height']} blur={region['blur_radius']}"
    )
    alpha = _build_subtitle_mask_alpha(region)
    tint_alpha = _clamp(round((0.05 + region["opacity"] * 0.07) * 100) / 100, 0.05, 0.14)
    blur_sigma = (
        max(4, round(region["blur_radius"] * (0.9 + region["opacity"] * 0.35)))
        if region["blur_radius"] > 0
        else 0
    )
    brightness = 1.0 + 0.03 + region["opacity"] * 0.04
    contrast = 0.975 - region["opacity"] * 0.035
    saturation = 1.0 + region["opacity"] * 0.03
    obliterate_size = (
        max(24, round(region["padded_width"] * 0.12)),
        max(12, round(region["padded_height"] * 0.18)),
    )

    # Everything below depends only on the region, not on the frame, so it is
    # built once here instead of once per frame inside mask_frame.
    padded_size = (region["padded_width"], region["padded_height"])
    padded_origin = (region["padded_x"], region["padded_y"])
    crop_box = (
        region["padded_x"],
        region["padded_y"],
        region["padded_x"] + region["padded_width"],
        region["padded_y"] + region["padded_height"],
    )
    # White overlay whose alpha is the mask alpha scaled by tint_alpha.
    tint = Image.new("RGBA", padded_size, (255, 255, 255, 0))
    tint.putalpha(alpha.point(lambda value: int(value * tint_alpha)))

    def mask_frame(get_frame, t):
        frame = np.asarray(get_frame(t))
        if frame.dtype != np.uint8:
            frame = np.clip(frame, 0, 255).astype(np.uint8)
        image = Image.fromarray(frame).convert("RGB")

        # Downscale then upscale to wipe out detail before blurring.
        mask_image = image.crop(crop_box)
        mask_image = mask_image.resize(
            obliterate_size,
            Image.Resampling.BICUBIC,
        ).resize(
            padded_size,
            Image.Resampling.LANCZOS,
        )
        if blur_sigma > 0:
            mask_image = mask_image.filter(ImageFilter.GaussianBlur(radius=blur_sigma))
        mask_image = mask_image.filter(ImageFilter.BoxBlur(4))
        mask_image = ImageEnhance.Brightness(mask_image).enhance(brightness)
        mask_image = ImageEnhance.Contrast(mask_image).enhance(contrast)
        mask_image = ImageEnhance.Color(mask_image).enhance(saturation)

        blurred = mask_image.convert("RGBA")
        blurred.putalpha(alpha)
        masked_region = Image.alpha_composite(blurred, tint)

        output = image.convert("RGBA")
        output.alpha_composite(masked_region, dest=padded_origin)
        return np.asarray(output.convert("RGB"))

    return video_clip.transform(mask_frame)
def _resolve_orientation_subtitle_y_percent(video_width, video_height, options):
    """Return the orientation-specific subtitle Y position, if configured.

    Looks up ``subtitle_position_<orientation>_y_percent`` in ``options``.

    Returns:
        ``None`` when the key is absent; otherwise the value clamped to
        0-99, with 85 (landscape) or 82 (portrait) used for unusable values.
    """
    if video_height > video_width:
        orientation, fallback = "portrait", 82
    else:
        orientation, fallback = "landscape", 85
    key = f"subtitle_position_{orientation}_y_percent"
    if key not in options:
        return None
    return _clamp(_get_numeric_option(options, key, fallback), 0, 99)
def is_valid_subtitle_file(subtitle_path: str) -> bool:
    """Check whether a subtitle file exists and looks like SRT.

    Args:
        subtitle_path: Path to the subtitle file.

    Returns:
        True when the file exists, is not empty after stripping whitespace,
        and contains at least one SRT timestamp range such as
        ``00:00:00,000 --> 00:00:00,000``. False otherwise, including when
        reading the file fails.
    """
    if not (subtitle_path and os.path.exists(subtitle_path)):
        return False
    try:
        with open(subtitle_path, 'r', encoding='utf-8') as f:
            content = f.read().strip()
        # A timestamp range is the basic marker of the SRT format.
        time_pattern = r'\d{2}:\d{2}:\d{2},\d{3}\s*-->\s*\d{2}:\d{2}:\d{2},\d{3}'
        return bool(content) and re.search(time_pattern, content) is not None
    except Exception as e:
        logger.warning(f"检查字幕文件时出错: {str(e)}")
        return False
def _has_existing_file(file_path: Optional[str]) -> bool:
    """Return True when ``file_path`` is non-empty and exists on disk."""
    if not file_path:
        return False
    return os.path.exists(file_path)
def _get_ffmpeg_binary() -> str:
    """Locate the ffmpeg executable to use.

    Tries, in order: the ``NARRATO_FFMPEG_EXE`` and ``IMAGEIO_FFMPEG_EXE``
    environment variables (when they point at an existing file), the binary
    reported by ``imageio_ffmpeg``, and finally the bare name ``"ffmpeg"``.
    """
    env_values = (
        os.environ.get(name, "").strip()
        for name in ("NARRATO_FFMPEG_EXE", "IMAGEIO_FFMPEG_EXE")
    )
    for configured in env_values:
        if configured and os.path.isfile(configured):
            return configured

    try:
        import imageio_ffmpeg

        bundled = imageio_ffmpeg.get_ffmpeg_exe()
        if bundled and os.path.isfile(bundled):
            return bundled
    except Exception as e:
        logger.debug(f"未找到 imageio-ffmpeg 二进制: {e}")

    return "ffmpeg"
def _get_ffprobe_binary(ffmpeg_binary: Optional[str] = None) -> str:
    """Locate the ffprobe executable to use.

    Tries, in order: the ``NARRATO_FFPROBE_EXE`` and ``IMAGEIO_FFPROBE_EXE``
    environment variables (when they point at an existing file), an
    ``ffprobe`` / ``ffprobe.exe`` file next to ``ffmpeg_binary``, and
    finally the bare name ``"ffprobe"``.

    Args:
        ffmpeg_binary: Path of the ffmpeg executable whose directory is
            searched for a sibling ffprobe. Skipped when empty.

    Returns:
        Path or command name of ffprobe.
    """
    for env_name in ("NARRATO_FFPROBE_EXE", "IMAGEIO_FFPROBE_EXE"):
        candidate = os.environ.get(env_name, "").strip()
        if candidate and os.path.isfile(candidate):
            return candidate
    if ffmpeg_binary:
        binary_dir = os.path.dirname(ffmpeg_binary)
        # Windows builds ship ffprobe.exe, which the bare name does not match.
        for sibling_name in ("ffprobe", "ffprobe.exe"):
            sibling = os.path.join(binary_dir, sibling_name)
            if os.path.isfile(sibling):
                return sibling
    return "ffprobe"
def _check_ffmpeg_binary(ffmpeg_binary: str) -> bool:
    """Return True when ``ffmpeg_binary -version`` runs successfully.

    Any failure to launch the binary (missing file, no execute permission,
    wrong executable format) or a non-zero exit status is logged and
    reported as False.

    Args:
        ffmpeg_binary: Path or command name of the ffmpeg executable.
    """
    try:
        subprocess.run(
            [ffmpeg_binary, "-version"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
        return True
    # OSError covers FileNotFoundError as well as PermissionError and other
    # launch failures, which previously escaped this availability check.
    except (subprocess.SubprocessError, OSError) as e:
        logger.error(f"ffmpeg 不可用: {ffmpeg_binary}, {e}")
        return False
def _format_ffmpeg_float(value: float) -> str:
    """Format a number with at most three decimals and no trailing zeros."""
    fixed = f"{float(value):.3f}"
    without_zeros = fixed.rstrip("0")
    return without_zeros.rstrip(".")
def _format_duration(seconds: float) -> str:
    """Format a duration as ``MM:SS``, or ``HH:MM:SS`` from one hour up.

    Falsy and negative inputs are treated as zero; fractions of a second
    are truncated.
    """
    total = max(0, float(seconds or 0))
    hours = int(total // 3600)
    minutes = int((total % 3600) // 60)
    secs = int(total % 60)
    clock = f"{minutes:02d}:{secs:02d}"
    return f"{hours:02d}:{clock}" if hours else clock
def _quote_filter_value(value: str) -> str:
    """Wrap a value in single quotes for use inside an ffmpeg filter string.

    Backslashes are doubled first, then single quotes are prefixed with a
    backslash.
    """
    text = str(value)
    text = text.replace("\\", "\\\\")
    text = text.replace("'", "\\'")
    return "'" + text + "'"
def _probe_video(video_path: str) -> Dict[str, Any]:
    """Read basic video metadata with ffprobe.

    Args:
        video_path: Path of the video file to inspect.

    Returns:
        Dict with ``width``, ``height``, ``duration`` (seconds, taken from
        the first video stream or else from the container format) and
        ``has_audio``.

    Raises:
        RuntimeError: If ffprobe exits with an error, no video stream is
            found, or no positive duration is reported.
    """
    ffprobe_binary = _get_ffprobe_binary(_get_ffmpeg_binary())
    completed = subprocess.run(
        [
            ffprobe_binary,
            "-v",
            "error",
            "-print_format",
            "json",
            "-show_streams",
            "-show_format",
            video_path,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        check=False,
    )
    if completed.returncode != 0:
        raise RuntimeError(f"ffprobe 读取视频失败: {completed.stderr.strip()}")

    payload = json.loads(completed.stdout or "{}")
    streams = payload.get("streams", [])

    video_stream = None
    for stream in streams:
        if stream.get("codec_type") == "video":
            video_stream = stream
            break
    if not video_stream:
        raise RuntimeError("ffprobe 未找到视频流")

    # Prefer the stream duration; fall back to the container duration.
    raw_duration = (
        video_stream.get("duration")
        or payload.get("format", {}).get("duration")
        or 0
    )
    duration = float(raw_duration)
    if duration <= 0:
        raise RuntimeError("ffprobe 未获取到有效视频时长")

    width = int(video_stream["width"])
    height = int(video_stream["height"])
    has_audio = any(stream.get("codec_type") == "audio" for stream in streams)
    return {
        "width": width,
        "height": height,
        "duration": duration,
        "has_audio": has_audio,
    }
def _ffmpeg_filter_available(filter_name: str) -> bool:
    """Report whether the selected ffmpeg binary lists ``filter_name``.

    Runs ``ffmpeg -hide_banner -filters`` and looks for a line whose second
    whitespace-separated column equals ``filter_name``. The outcome,
    including failures (stored as False), is cached per
    (binary, filter name).
    """
    ffmpeg_binary = _get_ffmpeg_binary()
    cache_key = (ffmpeg_binary, filter_name)
    if cache_key in _FFMPEG_FILTER_CACHE:
        return _FFMPEG_FILTER_CACHE[cache_key]

    try:
        completed = subprocess.run(
            [ffmpeg_binary, "-hide_banner", "-filters"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=False,
        )
        available = completed.returncode == 0 and any(
            len(columns) >= 2 and columns[1] == filter_name
            for columns in (line.split() for line in completed.stdout.splitlines())
        )
    except Exception:
        available = False

    _FFMPEG_FILTER_CACHE[cache_key] = available
    return available
def _ffmpeg_encoder_available(encoder_name: str) -> bool:
    """Report whether the selected ffmpeg binary lists ``encoder_name``.

    Runs ``ffmpeg -hide_banner -encoders`` and looks for a line whose second
    whitespace-separated column (the encoder name) equals ``encoder_name``.
    The outcome, including failures (stored as False), is cached per
    (binary, encoder name).

    Args:
        encoder_name: Encoder to look for, e.g. ``"h264_nvenc"``.

    Returns:
        True when the encoder is listed, False otherwise.
    """
    ffmpeg_binary = _get_ffmpeg_binary()
    cache_key = (ffmpeg_binary, encoder_name)
    if cache_key in _FFMPEG_ENCODER_CACHE:
        return _FFMPEG_ENCODER_CACHE[cache_key]

    available = False
    try:
        result = subprocess.run(
            [ffmpeg_binary, "-hide_banner", "-encoders"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=False,
        )
        if result.returncode == 0:
            # Compare the name column exactly. A substring test over the whole
            # output would accept "libx264" because "libx264rgb" is listed,
            # or match text in an encoder description.
            for line in result.stdout.splitlines():
                parts = line.split()
                if len(parts) >= 2 and parts[1] == encoder_name:
                    available = True
                    break
    except Exception as e:
        logger.debug(f"ffmpeg 编码器检测失败: {encoder_name}, {e}")
        available = False

    _FFMPEG_ENCODER_CACHE[cache_key] = available
    return available
def _select_compatible_encoder(preferred_encoder: str) -> str:
    """Return ``preferred_encoder`` if ffmpeg supports it, else ``"libx264"``.

    A warning is logged when the fallback is used.
    """
    if not _ffmpeg_encoder_available(preferred_encoder):
        logger.warning(f"当前 ffmpeg 二进制不支持编码器 {preferred_encoder},回退 libx264")
        return "libx264"
    return preferred_encoder
def _parse_ffmpeg_progress_time(progress: Dict[str, str]) -> float:
    """Extract the processed time in seconds from an ffmpeg progress record.

    ``out_time_us`` and then ``out_time_ms`` are tried first; both are
    parsed as integers and divided by 1,000,000. If neither is usable, the
    ``out_time`` clock string (``H:MM:SS[.fraction]``) is parsed.

    Returns:
        Seconds as a float, or 0.0 when no field can be parsed.
    """
    for key in ("out_time_us", "out_time_ms"):
        raw = progress.get(key)
        if not raw:
            continue
        try:
            ticks = int(raw)
        except ValueError:
            continue
        return max(0.0, ticks / 1_000_000)

    clock = progress.get("out_time")
    if not clock:
        return 0.0
    match = re.match(
        r"(?P<hours>\d+):(?P<minutes>\d{2}):(?P<seconds>\d{2})(?:\.(?P<fraction>\d+))?",
        clock,
    )
    if match is None:
        return 0.0

    fraction = match.group("fraction") or "0"
    whole_hours = int(match.group("hours")) * 3600
    whole_minutes = int(match.group("minutes")) * 60
    whole_seconds = int(match.group("seconds"))
    return whole_hours + whole_minutes + whole_seconds + float(f"0.{fraction}")
def _emit_ffmpeg_progress(
    progress_callback: Optional[Callable[[float], None]],
    percent: float,
) -> None:
    """Report progress to the callback, clamped to the 0-100 range.

    Does nothing when no callback is given. Errors raised while converting
    the value or inside the callback are logged at debug level and swallowed.
    """
    if progress_callback:
        try:
            capped = min(100.0, float(percent))
            progress_callback(max(0.0, capped))
        except Exception as e:
            logger.debug(f"ffmpeg 进度回调失败: {e}")
def _run_ffmpeg_with_progress(
    cmd: list[str],
    duration: float,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> tuple[int, str]:
    """Run an ffmpeg command and report its progress.

    The combined stdout/stderr stream is read line by line. ``key=value``
    lines whose key is a known progress field are collected; each
    ``progress=...`` line closes one progress record. Progress is logged and
    passed to ``progress_callback`` when the record is the final one, at
    least 5 seconds have passed, or the percentage advanced by at least 5
    points. All other lines are kept as diagnostic output.

    Args:
        cmd: Full ffmpeg command line.
        duration: Expected output duration in seconds, used to compute the
            percentage. Non-positive values yield 0%.
        progress_callback: Optional callable receiving a 0-100 percentage.

    Returns:
        Tuple of the process return code and the last 80 non-progress
        output lines joined by newlines.
    """
    progress_keys = {
        "frame",
        "fps",
        "stream_0_0_q",
        "bitrate",
        "total_size",
        "out_time_us",
        "out_time_ms",
        "out_time",
        "dup_frames",
        "drop_frames",
        "speed",
        "progress",
    }
    progress: Dict[str, str] = {}
    output_tail: list[str] = []
    last_log_time = 0.0
    last_logged_percent = -1.0

    # Decode explicitly and tolerate undecodable bytes: with the locale
    # default encoding, file names echoed in ffmpeg's output can raise
    # UnicodeDecodeError in the middle of the read loop.
    # The context manager closes the pipe once the process is done.
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        encoding="utf-8",
        errors="replace",
        bufsize=1,
    ) as process:
        _emit_ffmpeg_progress(progress_callback, 0)
        assert process.stdout is not None
        try:
            for raw_line in process.stdout:
                line = raw_line.strip()
                if not line:
                    continue
                key, separator, value = line.partition("=")
                if not separator or key not in progress_keys:
                    # Not a progress field: keep it as diagnostic output.
                    output_tail.append(line)
                    if len(output_tail) > 80:
                        del output_tail[0]
                    continue
                progress[key] = value
                if key != "progress":
                    continue

                current = _parse_ffmpeg_progress_time(progress)
                if value == "end":
                    current = duration
                percent = min(100.0, (current / duration) * 100) if duration > 0 else 0.0
                now = time.monotonic()
                should_log = (
                    value == "end"
                    or now - last_log_time >= 5
                    or percent - last_logged_percent >= 5
                )
                if should_log:
                    speed = progress.get("speed", "N/A")
                    logger.info(
                        "ffmpeg 合并进度: "
                        f"{percent:.1f}% "
                        f"({_format_duration(current)}/{_format_duration(duration)}), "
                        f"speed={speed}"
                    )
                    _emit_ffmpeg_progress(progress_callback, percent)
                    last_log_time = now
                    last_logged_percent = percent
                progress = {}
        except BaseException:
            # Do not leave ffmpeg running if reading its output failed.
            process.kill()
            raise
        return_code = process.wait()

    if return_code == 0:
        _emit_ffmpeg_progress(progress_callback, 100)
    return return_code, "\n".join(output_tail[-80:])
def _srt_timestamp_to_seconds(timestamp: str) -> float:
    """Convert an SRT timestamp (``HH:MM:SS,mmm``) to seconds.

    Surrounding whitespace is ignored.

    Raises:
        ValueError: If the text does not start with a valid SRT timestamp.
    """
    match = re.match(
        r"(?P<hours>\d{2}):(?P<minutes>\d{2}):(?P<seconds>\d{2}),(?P<millis>\d{3})",
        timestamp.strip(),
    )
    if match is None:
        raise ValueError(f"无效 SRT 时间戳: {timestamp}")
    hours, minutes, seconds, millis = (
        int(match.group(name)) for name in ("hours", "minutes", "seconds", "millis")
    )
    return hours * 3600 + minutes * 60 + seconds + millis / 1000
def _parse_srt_subtitles(subtitle_path: str) -> list[tuple[float, float, str]]:
    """Parse an SRT file into ``(start, end, text)`` cues.

    The file is read as UTF-8 with an optional BOM. Blocks are separated by
    blank lines; within a block, the first line containing a timestamp range
    marks the timing and all following non-blank lines form the text. Blocks
    without a timing line or without text are skipped.

    Args:
        subtitle_path: Path of the SRT file.

    Returns:
        List of ``(start_seconds, end_seconds, text)`` tuples in file order.
    """
    with open(subtitle_path, "r", encoding="utf-8-sig") as file:
        content = file.read().strip()
    if not content:
        return []

    time_pattern = re.compile(
        r"(?P<start>\d{2}:\d{2}:\d{2},\d{3})\s*-->\s*"
        r"(?P<end>\d{2}:\d{2}:\d{2},\d{3})"
    )
    cues = []
    for block in re.split(r"\n\s*\n", content):
        lines = [line.strip("\ufeff") for line in block.splitlines() if line.strip()]

        # Find the first line carrying a timestamp range; skip the block if none.
        for time_index, line in enumerate(lines):
            timing = time_pattern.search(line)
            if timing:
                break
        else:
            continue

        text = "\n".join(lines[time_index + 1:]).strip()
        if not text:
            continue
        start = _srt_timestamp_to_seconds(timing.group("start"))
        end = _srt_timestamp_to_seconds(timing.group("end"))
        cues.append((start, end, text))
    return cues
def _normalize_hex_color(color: Optional[str], default: str) -> str:
    """Normalize a color to upper-case ``#RRGGBB``.

    Accepts a few English color names, ``#RGB`` and ``#RRGGBB``. When
    ``color`` is empty, ``default`` is normalized instead.

    Returns:
        The normalized color, or ``default`` unchanged when the value cannot
        be interpreted.
    """
    named_colors = {
        "white": "#FFFFFF",
        "black": "#000000",
        "red": "#FF0000",
        "green": "#008000",
        "blue": "#0000FF",
        "yellow": "#FFFF00",
        "cyan": "#00FFFF",
        "magenta": "#FF00FF",
    }
    candidate = (color or default or "").strip()
    candidate = named_colors.get(candidate.lower(), candidate)
    if not candidate.startswith("#"):
        return default

    digits = candidate[1:]
    if len(digits) == 3:
        # Expand shorthand "#RGB" to "#RRGGBB".
        digits = "".join(char + char for char in digits)
    if len(digits) != 6:
        return default
    try:
        int(digits, 16)
    except ValueError:
        return default
    return "#" + digits.upper()
def _css_color_to_ass(color: Optional[str], default: str) -> str:
    """Convert a CSS-style color to the ``&H00BBGGRR`` form.

    The color is normalized first; the channels are then emitted in
    blue-green-red order after a fixed ``00`` byte.
    """
    digits = _normalize_hex_color(color, default)[1:]
    red, green, blue = (int(digits[start:start + 2], 16) for start in (0, 2, 4))
    return f"&H00{blue:02X}{green:02X}{red:02X}"
def _resolve_font_path(subtitle_font: str) -> Optional[str]:
    """Find a usable font file for subtitles.

    Lookup order: ``subtitle_font`` as an existing absolute path, then
    ``subtitle_font`` inside the project font directory, then a fixed list
    of bundled and system fonts.

    Returns:
        Path of the first font file that exists, or ``None``.
    """
    if subtitle_font:
        if os.path.isabs(subtitle_font) and os.path.exists(subtitle_font):
            return subtitle_font
        in_font_dir = os.path.join(utils.font_dir(), subtitle_font)
        if os.path.exists(in_font_dir):
            return in_font_dir

    bundled_names = (
        "SourceHanSansCN-Regular.otf",
        "SourceHanSerifSC-SemiBold.otf",
        "LXGWWenKaiScreen.ttf",
        "SimHei.ttf",
    )
    fallbacks = [os.path.join(utils.font_dir(), name) for name in bundled_names]
    fallbacks += [
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
        "/System/Library/Fonts/STHeiti Medium.ttc",
        "/System/Library/Fonts/Hiragino Sans GB.ttc",
    ]
    return next((path for path in fallbacks if os.path.exists(path)), None)
def _resolve_font_family(font_path: Optional[str], subtitle_font: str) -> str:
    """Determine the font family name to put into the subtitle style.

    Args:
        font_path: Resolved font file, or ``None`` if none was found.
        subtitle_font: Font file name or path as configured by the user.

    Returns:
        The family name read from ``font_path`` when that succeeds;
        otherwise the base name of ``subtitle_font`` without its extension;
        otherwise ``"Arial"``.
    """
    if font_path:
        try:
            family = ImageFont.truetype(font_path, 12).getname()[0]
        except Exception as e:
            # Best effort: record why the font could not be read instead of
            # hiding it, then fall back to the file name below.
            logger.debug(f"无法读取字体名称: {font_path}, {e}")
        else:
            # Skip an empty family name so the style never gets a blank font.
            if family:
                return family
    if subtitle_font:
        return os.path.splitext(os.path.basename(subtitle_font))[0]
    return "Arial"
def _estimate_subtitle_margin(
    video_height: int,
    font_size: int,
    subtitle_position: str,
    custom_position: float,
    orientation_subtitle_y_percent: Optional[float],
) -> tuple[int, int]:
    """Compute the alignment code and vertical margin for subtitles.

    ``"top"`` and ``"center"`` use fixed alignments (8 and 5). Otherwise the
    alignment is 2 and the margin is derived from a Y percentage: the
    orientation-specific value when given, else ``custom_position`` for the
    ``"custom"`` position. Without any percentage a 5% margin is used.

    Returns:
        Tuple ``(alignment, margin)``; the margin is never below 10.
    """
    if subtitle_position == "center":
        return 5, 10
    if subtitle_position == "top":
        return 8, max(10, round(video_height * 0.05))

    y_percent = orientation_subtitle_y_percent
    if y_percent is None and subtitle_position == "custom":
        y_percent = custom_position
    if y_percent is None:
        return 2, max(10, round(video_height * 0.05))

    # Approximate the rendered text height from the font size.
    text_height = max(24, round(font_size * 1.35))
    top_edge = (video_height - text_height) * (y_percent / 100)
    bottom_gap = video_height - top_edge - text_height
    return 2, max(10, round(bottom_gap))
def _build_subtitle_filter(
    subtitle_path: str,
    font_path: Optional[str],
    subtitle_font: str,
    subtitle_font_size: int,
    subtitle_color: str,
    stroke_color: str,
    stroke_width: float,
    video_width: int,
    video_height: int,
    subtitle_position: str,
    custom_position: float,
    orientation_subtitle_y_percent: Optional[float],
) -> str:
    """Build the ffmpeg ``subtitles=`` filter string for an SRT file.

    The filter gets the subtitle file name, the video size as
    ``original_size``, the font directory (when a font file is known) and a
    ``force_style`` override with font, colors, outline, alignment and
    vertical margin.

    Returns:
        The complete filter expression.
    """
    alignment, margin_v = _estimate_subtitle_margin(
        video_height=video_height,
        font_size=subtitle_font_size,
        subtitle_position=subtitle_position,
        custom_position=custom_position,
        orientation_subtitle_y_percent=orientation_subtitle_y_percent,
    )
    style_fields = {
        "Fontname": _resolve_font_family(font_path, subtitle_font),
        "Fontsize": subtitle_font_size,
        "PrimaryColour": _css_color_to_ass(subtitle_color, '#FFFFFF'),
        "OutlineColour": _css_color_to_ass(stroke_color, '#000000'),
        "BorderStyle": 1,
        "Outline": stroke_width,
        "Shadow": 0,
        "Alignment": alignment,
        "MarginV": margin_v,
    }
    force_style = ",".join(f"{name}={value}" for name, value in style_fields.items())

    filter_args = [
        f"filename={_quote_filter_value(subtitle_path)}",
        f"original_size={video_width}x{video_height}",
    ]
    if font_path:
        fonts_dir = os.path.dirname(font_path)
        filter_args.append(f"fontsdir={_quote_filter_value(fonts_dir)}")
    filter_args.append(f"force_style={_quote_filter_value(force_style)}")
    return "subtitles=" + ":".join(filter_args)
def _css_color_to_drawtext(color: Optional[str], default: str) -> str:
    """Convert a CSS-style color to the ``0xRRGGBB`` form."""
    normalized = _normalize_hex_color(color, default)
    return "0x" + normalized[1:]
def _escape_drawtext_text(text: str) -> str:
    """Escape subtitle text for the drawtext ``text`` option.

    Backslashes and percent signs are backslash-escaped, and every line
    break (``\\r\\n``, ``\\r`` or ``\\n``) becomes the two characters ``\\n``.
    """
    # Order matters: backslashes must be doubled before new ones are added.
    replacements = (
        ("\\", "\\\\"),
        ("%", "\\%"),
        ("\r\n", "\n"),
        ("\r", "\n"),
        ("\n", "\\n"),
    )
    escaped = text
    for needle, substitute in replacements:
        escaped = escaped.replace(needle, substitute)
    return escaped
def _resolve_drawtext_y_expression(
    subtitle_position: str,
    custom_position: float,
    orientation_subtitle_y_percent: Optional[float],
) -> str:
    """Return the drawtext ``y`` expression for the subtitle position.

    ``"top"`` and ``"center"`` map to fixed expressions. Otherwise a Y
    percentage is used: the orientation-specific value when given, else
    ``custom_position`` for the ``"custom"`` position. Without a percentage
    the text is placed at 95% of the height.
    """
    fixed_expressions = {
        "top": "h*0.05",
        "center": "(h-text_h)/2",
    }
    if subtitle_position in fixed_expressions:
        return fixed_expressions[subtitle_position]

    y_percent = orientation_subtitle_y_percent
    if y_percent is None and subtitle_position == "custom":
        y_percent = custom_position
    if y_percent is None:
        return "h*0.95-text_h"
    return f"(h-text_h)*{_format_ffmpeg_float(y_percent / 100)}"
def _build_drawtext_filters(
    subtitle_path: str,
    font_path: Optional[str],
    subtitle_font_size: int,
    subtitle_color: str,
    stroke_color: str,
    stroke_width: float,
    subtitle_position: str,
    custom_position: float,
    orientation_subtitle_y_percent: Optional[float],
    video_width: int,
) -> list[str]:
    """Build one ffmpeg ``drawtext`` filter per subtitle cue.

    Each cue is wrapped to 90% of the video width (only when a font file is
    available), centered horizontally and enabled for its time range.

    Returns:
        List of ``drawtext=...`` filter expressions in cue order.

    Raises:
        RuntimeError: If the SRT file yields no cues.
    """
    cues = _parse_srt_subtitles(subtitle_path)
    if not cues:
        raise RuntimeError("SRT 字幕解析结果为空,无法使用 drawtext 快路径")

    y_expr = _resolve_drawtext_y_expression(
        subtitle_position=subtitle_position,
        custom_position=custom_position,
        orientation_subtitle_y_percent=orientation_subtitle_y_percent,
    )
    max_width = video_width * 0.9

    # Options shared by every cue.
    leading_args = [f"fontfile={_quote_filter_value(font_path)}"] if font_path else []
    font_color = _css_color_to_drawtext(subtitle_color, '#FFFFFF')
    border_color = _css_color_to_drawtext(stroke_color, '#000000')

    filters = []
    for start, end, text in cues:
        if font_path:
            display_text, _ = wrap_text(
                text,
                max_width=max_width,
                font=font_path,
                fontsize=subtitle_font_size,
            )
        else:
            display_text = text
        window = f"between(t,{_format_ffmpeg_float(start)},{_format_ffmpeg_float(end)})"
        cue_args = leading_args + [
            f"text={_quote_filter_value(_escape_drawtext_text(display_text))}",
            f"fontcolor={font_color}",
            f"fontsize={subtitle_font_size}",
            f"borderw={stroke_width}",
            f"bordercolor={border_color}",
            "x=(w-text_w)/2",
            f"y={y_expr}",
            f"enable={_quote_filter_value(window)}",
        ]
        filters.append("drawtext=" + ":".join(cue_args))
    return filters
def _hex_to_rgba(color: Optional[str], default: str, alpha: int = 255) -> tuple[int, int, int, int]:
    """Convert a CSS-style color to an ``(R, G, B, A)`` tuple.

    The color is normalized first; ``alpha`` is passed through unchanged.
    """
    digits = _normalize_hex_color(color, default)[1:]
    red, green, blue = (int(digits[start:start + 2], 16) for start in (0, 2, 4))
    return (red, green, blue, alpha)
def _create_subtitle_png_file(
    text: str,
    font_path: Optional[str],
    subtitle_font_size: int,
    subtitle_color: str,
    stroke_color: str,
    stroke_width: float,
    video_width: int,
    output_dir: str,
) -> str:
    """Render one subtitle cue to a transparent PNG file.

    The text is wrapped to 90% of the video width, measured, and drawn
    centered with the requested fill and stroke on a canvas padded around
    the text.

    Args:
        output_dir: Directory in which the temporary PNG is created.

    Returns:
        Path of the PNG file. The file is not deleted automatically.
    """
    if font_path:
        font = ImageFont.truetype(font_path, subtitle_font_size)
    else:
        font = ImageFont.load_default()
    wrapped_text, _ = wrap_text(
        text,
        max_width=video_width * 0.9,
        font=font_path or "Arial",
        fontsize=subtitle_font_size,
    )
    stroke_px = max(0, int(round(float(stroke_width))))
    padding = max(8, stroke_px * 3 + 6)

    # Measure the text on a throwaway 1x1 canvas.
    measurer = ImageDraw.Draw(Image.new("RGBA", (1, 1), (0, 0, 0, 0)))
    left, top, right, bottom = measurer.multiline_textbbox(
        (0, 0),
        wrapped_text,
        font=font,
        spacing=4,
        stroke_width=stroke_px,
        align="center",
    )
    text_width = max(1, right - left)
    text_height = max(1, bottom - top)

    canvas = Image.new(
        "RGBA",
        (text_width + padding * 2, text_height + padding * 2),
        (0, 0, 0, 0),
    )
    ImageDraw.Draw(canvas).multiline_text(
        (canvas.width / 2, padding - top),
        wrapped_text,
        font=font,
        fill=_hex_to_rgba(subtitle_color, "#FFFFFF"),
        anchor="ma",
        spacing=4,
        align="center",
        stroke_width=stroke_px,
        stroke_fill=_hex_to_rgba(stroke_color, "#000000"),
    )

    # Reserve a unique file name, close the handle, then write the PNG.
    with tempfile.NamedTemporaryFile(
        suffix=".png",
        prefix="subtitle_text_",
        dir=output_dir,
        delete=False,
    ) as handle:
        png_path = handle.name
    canvas.save(png_path)
    return png_path
def _resolve_overlay_y_expression(
    subtitle_position: str,
    custom_position: float,
    orientation_subtitle_y_percent: Optional[float],
) -> str:
    """Return the overlay ``y`` expression for the subtitle position.

    ``"top"`` and ``"center"`` map to fixed expressions. Otherwise a Y
    percentage is used: the orientation-specific value when given, else
    ``custom_position`` for the ``"custom"`` position. Without a percentage
    the overlay is placed at 95% of the main height.
    """
    fixed_expressions = {
        "top": "main_h*0.05",
        "center": "(main_h-overlay_h)/2",
    }
    if subtitle_position in fixed_expressions:
        return fixed_expressions[subtitle_position]

    y_percent = orientation_subtitle_y_percent
    if y_percent is None and subtitle_position == "custom":
        y_percent = custom_position
    if y_percent is None:
        return "main_h*0.95-overlay_h"
    return f"(main_h-overlay_h)*{_format_ffmpeg_float(y_percent / 100)}"
def _create_subtitle_mask_alpha_file(region: Dict[str, Any], output_dir: str) -> str:
    """Write the subtitle-mask alpha image to a temporary PNG file.

    Args:
        region: Geometry dict describing the mask.
        output_dir: Directory in which the temporary PNG is created.

    Returns:
        Path of the PNG file. The file is not deleted automatically.
    """
    alpha = _build_subtitle_mask_alpha(region)
    # Reserve a unique file name, close the handle, then write the PNG.
    with tempfile.NamedTemporaryFile(
        suffix=".png",
        prefix="subtitle_mask_",
        dir=output_dir,
        delete=False,
    ) as handle:
        png_path = handle.name
    alpha.save(png_path)
    return png_path
def _build_mask_filter(
    input_label: str,
    mask_input_index: int,
    region: Dict[str, Any],
    output_label: str,
) -> list[str]:
    """Build the ffmpeg filter chains that apply the subtitle mask.

    The input is split; one copy is cropped to the padded region, scaled
    down and back up, blurred and color-adjusted, then merged with the alpha
    image from input ``mask_input_index`` and overlaid on the other copy.

    Args:
        input_label: Label of the video stream to mask, e.g. ``"[0:v]"``.
        mask_input_index: Index of the ffmpeg input holding the alpha PNG.
        region: Geometry dict describing the mask.
        output_label: Label for the masked output stream, e.g. ``"[v1]"``.

    Returns:
        List of filter chain strings.
    """
    opacity = region["opacity"]
    blur_radius = region["blur_radius"]
    pad_w, pad_h = region["padded_width"], region["padded_height"]
    pad_x, pad_y = region["padded_x"], region["padded_y"]

    if blur_radius > 0:
        blur_sigma = max(4, round(blur_radius * (0.9 + opacity * 0.35)))
    else:
        blur_sigma = 0
    brightness = 1.0 + 0.03 + opacity * 0.04
    contrast = 0.975 - opacity * 0.035
    saturation = 1.0 + opacity * 0.03
    small_w = max(24, round(pad_w * 0.12))
    small_h = max(12, round(pad_h * 0.18))

    stages = [
        f"[masksrc]crop={pad_w}:{pad_h}:{pad_x}:{pad_y}",
        f"scale={small_w}:{small_h}:flags=bicubic",
        f"scale={pad_w}:{pad_h}:flags=lanczos",
    ]
    if blur_sigma > 0:
        stages.append(f"gblur=sigma={blur_sigma}")
    stages.append("boxblur=4")
    # The eq filter's brightness is an offset, hence the "- 1.0".
    stages.append(
        f"eq=brightness={brightness - 1.0:.3f}:"
        f"contrast={contrast:.3f}:saturation={saturation:.3f}"
    )
    stages.append("format=rgba[maskblur]")

    return [
        f"{input_label}split[maskbase][masksrc]",
        ",".join(stages),
        f"[{mask_input_index}:v]format=gray,scale={pad_w}:{pad_h}[maskalpha]",
        "[maskblur][maskalpha]alphamerge[masked]",
        f"[maskbase][masked]overlay={pad_x}:{pad_y}:format=auto{output_label}",
    ]
def _build_video_encoder_args(encoder: str, threads: int) -> list[str]:
    """Build the ffmpeg video encoder arguments for the chosen encoder.

    ``h264_vaapi`` is replaced by ``libx264`` with a warning. Known hardware
    encoders get their own quality flags; any other encoder gets the
    software preset, CRF and thread count.

    Returns:
        Argument list starting with ``-c:v <encoder>``.
    """
    if encoder == "h264_vaapi":
        logger.warning("当前合成滤镜链暂不使用 VAAPI 编码,回退到 libx264")
        encoder = "libx264"

    hardware_tuning = {
        "h264_nvenc": ["-preset", "fast", "-cq", "23"],
        "h264_videotoolbox": ["-q:v", "65"],
        "h264_qsv": ["-preset", "veryfast", "-global_quality", "23"],
        "h264_amf": ["-quality", "speed", "-qp_i", "23", "-qp_p", "23"],
    }
    tuning = hardware_tuning.get(encoder)
    if tuning is None:
        tuning = ["-preset", "veryfast", "-crf", "23", "-threads", str(threads)]
    return ["-c:v", encoder, *tuning]
def _build_moviepy_encoder_options() -> tuple[str, list[str]]:
    """Choose the encoder and extra ffmpeg parameters for the MoviePy path.

    The project's preferred encoder is checked against the ffmpeg binary in
    use. ``h264_vaapi`` and any encoder without a dedicated profile fall
    back to ``libx264``.

    Returns:
        Tuple of the encoder name and its ffmpeg parameters, which always
        end with ``-pix_fmt yuv420p``.
    """
    from app.utils import ffmpeg_utils

    encoder = _select_compatible_encoder(ffmpeg_utils.get_optimal_ffmpeg_encoder())
    if encoder == "h264_vaapi":
        logger.warning("MoviePy 兼容路径暂不使用 VAAPI 编码,回退到 libx264")
        encoder = "libx264"

    pixel_format = ["-pix_fmt", "yuv420p"]
    hardware_tuning = {
        "h264_nvenc": ["-preset", "fast", "-cq", "23"],
        "h264_videotoolbox": ["-q:v", "65"],
        "h264_qsv": ["-preset", "veryfast", "-global_quality", "23"],
        "h264_amf": ["-quality", "speed", "-qp_i", "23", "-qp_p", "23"],
    }
    if encoder in hardware_tuning:
        return encoder, hardware_tuning[encoder] + pixel_format
    return "libx264", ["-preset", "veryfast", "-crf", "23"] + pixel_format
def _build_ffmpeg_merge_command(
    video_path: str,
    audio_path: str,
    output_path: str,
    subtitle_path: Optional[str],
    bgm_path: Optional[str],
    options: Dict[str, Any],
) -> tuple[list[str], list[str], float]:
    """Assemble the single-pass ffmpeg command that mixes audio and burns subtitles.

    Audio inputs (original track, narration, looped BGM) are volume-adjusted,
    trimmed to the video duration and mixed into ``[aout]``. Video filters
    (optional subtitle mask, then subtitles via drawtext / subtitles / PNG
    overlays, in that order of preference) are chained into ``[vout]``. When no
    video filter is needed the video stream is copied instead of re-encoded.

    Args:
        video_path: Source video; probed for duration, size and audio presence.
        audio_path: Narration audio; used only if the file exists.
        output_path: Destination file; its directory also hosts temp images.
        subtitle_path: Subtitle file, or ``None``.
        bgm_path: Background music file, or ``None``.
        options: Merge options (volumes, subtitle styling, fps, threads, ...).

    Returns:
        ``(cmd, temp_files, duration)`` where ``temp_files`` are the helper
        images the caller must delete once ffmpeg has finished.

    Raises:
        RuntimeError: If subtitles are requested but ffmpeg has none of the
            ``subtitles`` / ``drawtext`` / ``overlay`` filters.
        Exception: Anything raised by the helper functions is propagated; temp
            files created before the failure are removed first.
    """
    from app.utils import ffmpeg_utils

    video_meta = _probe_video(video_path)
    output_dir = os.path.dirname(output_path)
    duration = float(video_meta["duration"])
    duration_arg = _format_ffmpeg_float(duration)
    video_width = int(video_meta["width"])
    video_height = int(video_meta["height"])

    voice_volume = options.get("voice_volume", AudioVolumeDefaults.VOICE_VOLUME)
    bgm_volume = options.get("bgm_volume", AudioVolumeDefaults.BGM_VOLUME)
    original_audio_volume = options.get("original_audio_volume", AudioVolumeDefaults.ORIGINAL_VOLUME)
    keep_original_audio = options.get("keep_original_audio", True)
    subtitle_font = options.get("subtitle_font", "")
    subtitle_font_size = int(options.get("subtitle_font_size", 40))
    subtitle_color = options.get("subtitle_color", "#FFFFFF")
    subtitle_position = options.get("subtitle_position", "bottom")
    custom_position = float(options.get("custom_position", 70))
    stroke_color = options.get("stroke_color", "#000000")
    stroke_width = options.get("stroke_width", 1)
    threads = int(options.get("threads", 2))
    fps = options.get("fps", 30)
    subtitle_enabled = options.get("subtitle_enabled", True)
    subtitle_mask_enabled = bool(options.get("subtitle_mask_enabled", False))

    # Input 0 is always the source video; further inputs are appended in order.
    input_args = ["-i", video_path]
    next_input_index = 1
    audio_filters = []
    audio_labels = []
    temp_files: list[str] = []

    # --- audio graph -------------------------------------------------------
    if keep_original_audio and original_audio_volume > 0 and video_meta["has_audio"]:
        label = f"a{len(audio_labels)}"
        audio_filters.append(
            f"[0:a]volume={original_audio_volume},atrim=0:{duration_arg},"
            f"asetpts=PTS-STARTPTS[{label}]"
        )
        audio_labels.append(f"[{label}]")

    if _has_existing_file(audio_path):
        voice_input_index = next_input_index
        next_input_index += 1
        input_args.extend(["-i", audio_path])
        label = f"a{len(audio_labels)}"
        audio_filters.append(
            f"[{voice_input_index}:a]volume={voice_volume},atrim=0:{duration_arg},"
            f"asetpts=PTS-STARTPTS[{label}]"
        )
        audio_labels.append(f"[{label}]")

    if _has_existing_file(bgm_path) and bgm_volume > 0:
        bgm_input_index = next_input_index
        next_input_index += 1
        # Loop the BGM endlessly; atrim below cuts it to the video duration.
        input_args.extend(["-stream_loop", "-1", "-i", bgm_path])
        fade_start = max(0.0, duration - 3.0)
        label = f"a{len(audio_labels)}"
        audio_filters.append(
            f"[{bgm_input_index}:a]volume={bgm_volume},atrim=0:{duration_arg},"
            f"afade=t=out:st={_format_ffmpeg_float(fade_start)}:d=3,"
            f"asetpts=PTS-STARTPTS[{label}]"
        )
        audio_labels.append(f"[{label}]")

    if len(audio_labels) == 1:
        audio_filters.append(
            f"{audio_labels[0]}atrim=0:{duration_arg},asetpts=PTS-STARTPTS[aout]"
        )
    elif len(audio_labels) > 1:
        audio_filters.append(
            f"{''.join(audio_labels)}amix=inputs={len(audio_labels)}:"
            f"duration=longest:dropout_transition=0:normalize=0,"
            f"atrim=0:{duration_arg},asetpts=PTS-STARTPTS[aout]"
        )

    # --- subtitle capability detection -------------------------------------
    valid_subtitle = bool(
        subtitle_enabled
        and subtitle_path
        and is_valid_subtitle_file(subtitle_path)
    )
    has_subtitles_filter = _ffmpeg_filter_available("subtitles") if valid_subtitle else False
    has_drawtext_filter = _ffmpeg_filter_available("drawtext") if valid_subtitle else False
    if valid_subtitle and not has_subtitles_filter and not has_drawtext_filter:
        if not _ffmpeg_filter_available("overlay"):
            raise RuntimeError("当前 ffmpeg 缺少 subtitles/drawtext/overlay 字幕处理滤镜")
        logger.warning("当前 ffmpeg 缺少 subtitles/drawtext改用 PNG 字幕叠加快路径")

    # Everything below may create temp images. The caller only receives their
    # paths through the return value, so on failure they are removed here.
    try:
        # --- video graph ---------------------------------------------------
        video_filters = []
        current_video_label = "[0:v]"

        if subtitle_enabled and subtitle_mask_enabled:
            region = _resolve_subtitle_mask_region(video_width, video_height, options)
            mask_path = _create_subtitle_mask_alpha_file(region, output_dir)
            temp_files.append(mask_path)
            mask_input_index = next_input_index
            next_input_index += 1
            input_args.extend(["-loop", "1", "-t", duration_arg, "-i", mask_path])
            logger.info(
                "ffmpeg 字幕遮罩已启用: "
                f"{region['orientation']} x={region['x']} y={region['y']} "
                f"w={region['width']} h={region['height']} blur={region['blur_radius']}"
            )
            video_filters.extend(
                _build_mask_filter(
                    input_label=current_video_label,
                    mask_input_index=mask_input_index,
                    region=region,
                    output_label="[v_masked]",
                )
            )
            current_video_label = "[v_masked]"

        if valid_subtitle:
            font_path = _resolve_font_path(subtitle_font)
            if font_path:
                logger.info(f"ffmpeg 使用字幕字体: {font_path}")
            orientation_subtitle_y_percent = _resolve_orientation_subtitle_y_percent(
                video_width,
                video_height,
                options,
            )
            if has_drawtext_filter:
                drawtext_filters = _build_drawtext_filters(
                    subtitle_path=subtitle_path,
                    font_path=font_path,
                    subtitle_font_size=subtitle_font_size,
                    subtitle_color=subtitle_color,
                    stroke_color=stroke_color,
                    stroke_width=stroke_width,
                    subtitle_position=subtitle_position,
                    custom_position=custom_position,
                    orientation_subtitle_y_percent=orientation_subtitle_y_percent,
                    video_width=video_width,
                )
                # Chain each drawtext filter onto the previous output label.
                for index, drawtext_filter in enumerate(drawtext_filters):
                    next_label = f"[v_drawtext_{index}]"
                    video_filters.append(f"{current_video_label}{drawtext_filter}{next_label}")
                    current_video_label = next_label
            elif has_subtitles_filter:
                subtitle_filter = _build_subtitle_filter(
                    subtitle_path=subtitle_path,
                    font_path=font_path,
                    subtitle_font=subtitle_font,
                    subtitle_font_size=subtitle_font_size,
                    subtitle_color=subtitle_color,
                    stroke_color=stroke_color,
                    stroke_width=stroke_width,
                    video_width=video_width,
                    video_height=video_height,
                    subtitle_position=subtitle_position,
                    custom_position=custom_position,
                    orientation_subtitle_y_percent=orientation_subtitle_y_percent,
                )
                video_filters.append(f"{current_video_label}{subtitle_filter}[v_subtitled]")
                current_video_label = "[v_subtitled]"
            else:
                # Last resort: one rendered PNG per cue, overlaid during its time window.
                y_expr = _resolve_overlay_y_expression(
                    subtitle_position=subtitle_position,
                    custom_position=custom_position,
                    orientation_subtitle_y_percent=orientation_subtitle_y_percent,
                )
                for index, (start, end, text) in enumerate(_parse_srt_subtitles(subtitle_path)):
                    png_path = _create_subtitle_png_file(
                        text=text,
                        font_path=font_path,
                        subtitle_font_size=subtitle_font_size,
                        subtitle_color=subtitle_color,
                        stroke_color=stroke_color,
                        stroke_width=stroke_width,
                        video_width=video_width,
                        output_dir=output_dir,
                    )
                    temp_files.append(png_path)
                    subtitle_input_index = next_input_index
                    next_input_index += 1
                    input_args.extend(["-loop", "1", "-t", duration_arg, "-i", png_path])
                    next_label = f"[v_subtitle_png_{index}]"
                    enable_expr = (
                        f"between(t,{_format_ffmpeg_float(start)},{_format_ffmpeg_float(end)})"
                    )
                    video_filters.append(
                        f"{current_video_label}[{subtitle_input_index}:v]"
                        f"overlay=x=(main_w-overlay_w)/2:y={y_expr}:"
                        f"enable={_quote_filter_value(enable_expr)}:format=auto{next_label}"
                    )
                    current_video_label = next_label
        elif subtitle_enabled and subtitle_path:
            logger.warning(f"字幕文件无效或为空: {subtitle_path}ffmpeg 快路径跳过字幕")

        has_video_filter = bool(video_filters)
        if has_video_filter:
            final_video_filters = []
            if fps:
                final_video_filters.append(f"fps={fps}")
            final_video_filters.append("format=yuv420p")
            video_filters.append(
                f"{current_video_label}{','.join(final_video_filters)}[vout]"
            )

        # --- command assembly ----------------------------------------------
        filter_parts = [*video_filters, *audio_filters]
        ffmpeg_binary = _get_ffmpeg_binary()
        cmd = [
            ffmpeg_binary,
            "-y",
            "-hide_banner",
            "-loglevel",
            "error",
            "-nostats",
            "-progress",
            "pipe:1",
            *input_args,
        ]
        if filter_parts:
            cmd.extend(["-filter_complex", ";".join(filter_parts)])

        if has_video_filter:
            encoder = _select_compatible_encoder(ffmpeg_utils.get_optimal_ffmpeg_encoder())
            cmd.extend(["-map", "[vout]", *_build_video_encoder_args(encoder, threads)])
        else:
            # No filtering needed: copy the video stream without re-encoding.
            cmd.extend(["-map", "0:v:0", "-c:v", "copy"])

        if audio_labels:
            cmd.extend(["-map", "[aout]", "-c:a", "aac", "-b:a", "192k"])
        else:
            cmd.append("-an")

        cmd.extend(["-t", duration_arg, "-movflags", "+faststart", output_path])
        return cmd, temp_files, duration
    except Exception:
        for temp_file in temp_files:
            try:
                os.remove(temp_file)
            except OSError:
                pass
        raise
def _merge_materials_with_ffmpeg(
    video_path: str,
    audio_path: str,
    output_path: str,
    subtitle_path: Optional[str] = None,
    bgm_path: Optional[str] = None,
    options: Optional[Dict[str, Any]] = None,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> bool:
    """Try to merge all materials with one ffmpeg invocation (the fast path).

    Never raises: every failure is logged and reported as ``False`` so the
    caller can fall back to another engine. Temp images produced while
    building the command are always removed, and a partially written output
    file is removed whenever the ffmpeg run itself fails.

    Args:
        video_path: Source video file.
        audio_path: Narration audio file.
        output_path: Destination video file.
        subtitle_path: Optional subtitle file.
        bgm_path: Optional background music file.
        options: Merge options forwarded to the command builder.
        progress_callback: Optional progress callback forwarded to the runner.

    Returns:
        ``True`` if ffmpeg exited with code 0, otherwise ``False``.
    """
    ffmpeg_binary = _get_ffmpeg_binary()
    if not _check_ffmpeg_binary(ffmpeg_binary):
        return False

    def _remove_quietly(path: str) -> None:
        # Best-effort delete; a leftover file must not mask the real outcome.
        try:
            if os.path.exists(path):
                os.remove(path)
        except OSError:
            pass

    options = options or {}
    temp_files: list[str] = []
    ffmpeg_started = False
    try:
        cmd, temp_files, duration = _build_ffmpeg_merge_command(
            video_path=video_path,
            audio_path=audio_path,
            output_path=output_path,
            subtitle_path=subtitle_path,
            bgm_path=bgm_path,
            options=options,
        )
        logger.info(
            "使用 ffmpeg 快速合并素材: "
            f"video={video_path}, audio={audio_path}, output={output_path}, "
            f"duration={_format_duration(duration)}"
        )
        ffmpeg_started = True
        return_code, ffmpeg_output = _run_ffmpeg_with_progress(
            cmd,
            duration,
            progress_callback=progress_callback,
        )
        if return_code != 0:
            logger.warning(f"ffmpeg 快速合并失败,将回退 MoviePy: {ffmpeg_output[-3000:]}")
            _remove_quietly(output_path)
            return False
        logger.success(f"ffmpeg 素材合并完成: {output_path}")
        return True
    except Exception as e:
        logger.warning(f"ffmpeg 快速合并不可用,将回退 MoviePy: {e}")
        # Only discard the output if ffmpeg may have started writing it; a
        # failure while building the command never touched the file.
        if ffmpeg_started:
            _remove_quietly(output_path)
        return False
    finally:
        for temp_file in temp_files:
            _remove_quietly(temp_file)
def merge_materials(
    video_path: str,
    audio_path: str,
    output_path: str,
    subtitle_path: Optional[str] = None,
    bgm_path: Optional[str] = None,
    options: Optional[Dict[str, Any]] = None,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> str:
    """
    Merge video, narration audio, BGM and subtitles into the final video.

    The ffmpeg fast path is tried first (unless disabled through ``options``);
    if it reports failure the function falls back to the MoviePy pipeline.

    Args:
        video_path: Source video file.
        audio_path: Narration audio file.
        output_path: Destination video file.
        subtitle_path: Subtitle file, optional.
        bgm_path: Background music file, optional.
        options: Optional settings. Recognised keys:
            - voice_volume: narration volume (default AudioVolumeDefaults.VOICE_VOLUME)
            - bgm_volume: BGM volume (default AudioVolumeDefaults.BGM_VOLUME)
            - original_audio_volume: volume of the video's own audio track
              (default AudioVolumeDefaults.ORIGINAL_VOLUME)
            - keep_original_audio: keep the video's own audio (default True)
            - subtitle_font: subtitle font (default '', resolved by the font helper)
            - subtitle_font_size: subtitle font size (default 40)
            - subtitle_color: subtitle colour (default '#FFFFFF')
            - subtitle_bg_color: subtitle background ('transparent' -> None)
            - subtitle_position: 'bottom', 'top', 'center' or 'custom' (default 'bottom')
            - custom_position: vertical position in percent for 'custom' (default 70)
            - stroke_color: outline colour (default '#000000')
            - stroke_width: outline width (default 1)
            - threads: worker threads (default 2)
            - fps: output frame rate (default 30)
            - subtitle_enabled: render subtitles (default True)
            - subtitle_mask_enabled: blur/mask the subtitle area (default False)
            - merge_engine: 'ffmpeg' (default) or 'moviepy'
            - use_ffmpeg_merge: allow the ffmpeg fast path (default True)
        progress_callback: Progress callback for the ffmpeg fast path.

    Returns:
        The path of the written output video (``output_path``).

    Raises:
        Exception: Re-raises whatever loading the video or exporting the final
            file raises. Audio and subtitle problems are logged and skipped.
    """
    if options is None:
        options = {}

    # Defaults come from the shared volume configuration.
    voice_volume = options.get('voice_volume', AudioVolumeDefaults.VOICE_VOLUME)
    bgm_volume = options.get('bgm_volume', AudioVolumeDefaults.BGM_VOLUME)
    original_audio_volume = options.get('original_audio_volume', AudioVolumeDefaults.ORIGINAL_VOLUME)
    keep_original_audio = options.get('keep_original_audio', True)
    subtitle_font = options.get('subtitle_font', '')
    subtitle_font_size = options.get('subtitle_font_size', 40)
    subtitle_color = options.get('subtitle_color', '#FFFFFF')
    subtitle_bg_color = options.get('subtitle_bg_color', 'transparent')
    subtitle_position = options.get('subtitle_position', 'bottom')
    custom_position = options.get('custom_position', 70)
    stroke_color = options.get('stroke_color', '#000000')
    stroke_width = options.get('stroke_width', 1)
    threads = options.get('threads', 2)
    fps = options.get('fps', 30)
    subtitle_enabled = options.get('subtitle_enabled', True)
    subtitle_mask_enabled = bool(options.get('subtitle_mask_enabled', False))

    # Log the effective configuration to ease debugging.
    logger.info("音量配置详情:")
    logger.info(f" - 配音音量: {voice_volume}")
    logger.info(f" - 背景音乐音量: {bgm_volume}")
    logger.info(f" - 原声音量: {original_audio_volume}")
    logger.info(f" - 是否保留原声: {keep_original_audio}")
    logger.info("字幕配置详情:")
    logger.info(f" - 是否启用字幕: {subtitle_enabled}")
    logger.info(f" - 是否启用字幕遮罩: {subtitle_mask_enabled}")
    logger.info(f" - 字幕文件路径: {subtitle_path}")

    def validate_volume(volume, name):
        """Clamp ``volume`` into the configured range, warning when it is out of range."""
        if not (AudioVolumeDefaults.MIN_VOLUME <= volume <= AudioVolumeDefaults.MAX_VOLUME):
            logger.warning(f"{name}音量 {volume} 超出有效范围 [{AudioVolumeDefaults.MIN_VOLUME}, {AudioVolumeDefaults.MAX_VOLUME}],将被限制")
            return max(AudioVolumeDefaults.MIN_VOLUME, min(volume, AudioVolumeDefaults.MAX_VOLUME))
        return volume

    voice_volume = validate_volume(voice_volume, "配音")
    bgm_volume = validate_volume(bgm_volume, "背景音乐")
    original_audio_volume = validate_volume(original_audio_volume, "原声")

    # TextClip does not accept 'transparent'; None means a transparent background.
    if subtitle_bg_color == 'transparent':
        subtitle_bg_color = None

    # A bare file name has an empty dirname, and os.makedirs('') would raise.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    logger.info("开始合并素材...")
    logger.info(f" ① 视频: {video_path}")
    logger.info(f" ② 音频: {audio_path}")
    if subtitle_path:
        logger.info(f" ③ 字幕: {subtitle_path}")
    if bgm_path:
        logger.info(f" ④ 背景音乐: {bgm_path}")
    logger.info(f" ⑤ 输出: {output_path}")

    # --- ffmpeg fast path ---------------------------------------------------
    merge_engine = str(options.get("merge_engine", "ffmpeg")).lower()
    use_ffmpeg_merge = bool(options.get("use_ffmpeg_merge", True))
    if use_ffmpeg_merge and merge_engine != "moviepy":
        # Forward the validated values so both engines see the same settings.
        ffmpeg_options = dict(options)
        ffmpeg_options.update(
            {
                "voice_volume": voice_volume,
                "bgm_volume": bgm_volume,
                "original_audio_volume": original_audio_volume,
                "keep_original_audio": keep_original_audio,
                "subtitle_font": subtitle_font,
                "subtitle_font_size": subtitle_font_size,
                "subtitle_color": subtitle_color,
                "subtitle_bg_color": subtitle_bg_color,
                "subtitle_position": subtitle_position,
                "custom_position": custom_position,
                "stroke_color": stroke_color,
                "stroke_width": stroke_width,
                "threads": threads,
                "fps": fps,
                "subtitle_enabled": subtitle_enabled,
                "subtitle_mask_enabled": subtitle_mask_enabled,
            }
        )
        if _merge_materials_with_ffmpeg(
            video_path=video_path,
            audio_path=audio_path,
            output_path=output_path,
            subtitle_path=subtitle_path,
            bgm_path=bgm_path,
            options=ffmpeg_options,
            progress_callback=progress_callback,
        ):
            return output_path
        logger.warning("ffmpeg 快速合并失败,继续使用 MoviePy 兼容路径")

    # --- MoviePy fallback: load the video -----------------------------------
    # Clips that own file readers; they are closed once the export is done.
    opened_clips = []
    try:
        source_clip = VideoFileClip(video_path)
        opened_clips.append(source_clip)
        logger.info(f"视频尺寸: {source_clip.size[0]}x{source_clip.size[1]}, 时长: {source_clip.duration}")

        # Grab the untouched original track. Its volume is applied exactly once
        # further down, after the optional smart adjustment.
        original_audio = None
        if keep_original_audio and original_audio_volume > 0:
            try:
                original_audio = source_clip.audio
                if original_audio is not None:
                    logger.info("已提取视频原声")
                else:
                    logger.warning("视频没有音轨,无法提取原声")
            except Exception as e:
                logger.error(f"提取视频原声失败: {str(e)}")
                original_audio = None

        # Drop the embedded track; the mixed audio is attached later.
        video_clip = source_clip.without_audio()
    except Exception as e:
        logger.error(f"加载视频失败: {str(e)}")
        raise

    audio_tracks = []

    # --- optional smart volume balancing ------------------------------------
    if (
        AudioVolumeDefaults.ENABLE_SMART_VOLUME
        and audio_path
        and os.path.exists(audio_path)
        and original_audio is not None
    ):
        try:
            import shutil

            normalizer = AudioNormalizer()
            temp_dir = tempfile.mkdtemp()
            try:
                # The normalizer works on files, so dump the original track first.
                temp_original_path = os.path.join(temp_dir, "temp_original.wav")
                original_audio.write_audiofile(temp_original_path, logger=None)
                tts_adjustment, original_adjustment = normalizer.calculate_volume_adjustment(
                    audio_path, temp_original_path
                )
            finally:
                # Always remove the scratch directory, even when analysis fails.
                shutil.rmtree(temp_dir, ignore_errors=True)

            # Scale the user's settings (keeping their ratio), then clamp so the
            # automatic adjustment cannot go to extremes.
            smart_voice_volume = max(0.1, min(1.5, voice_volume * tts_adjustment))
            smart_original_volume = max(0.1, min(2.0, original_audio_volume * original_adjustment))
            voice_volume = smart_voice_volume
            original_audio_volume = smart_original_volume
            logger.info(f"智能音量调整 - TTS: {voice_volume:.2f}, 原声: {original_audio_volume:.2f}")
        except Exception as e:
            logger.warning(f"智能音量分析失败,使用原始设置: {e}")

    # --- audio tracks: narration, original audio, BGM -----------------------
    if audio_path and os.path.exists(audio_path):
        try:
            voice_source = AudioFileClip(audio_path)
            opened_clips.append(voice_source)
            audio_tracks.append(voice_source.with_effects([afx.MultiplyVolume(voice_volume)]))
            logger.info(f"已添加配音音频,音量: {voice_volume}")
        except Exception as e:
            logger.error(f"加载配音音频失败: {str(e)}")

    if original_audio is not None:
        # Single application of the final volume; 1.0 leaves the track untouched.
        if abs(original_audio_volume - 1.0) > 0.001:
            original_audio = original_audio.with_effects([afx.MultiplyVolume(original_audio_volume)])
        audio_tracks.append(original_audio)
        logger.info(f"已添加视频原声,最终音量: {original_audio_volume}")

    if bgm_path and os.path.exists(bgm_path):
        try:
            bgm_source = AudioFileClip(bgm_path)
            opened_clips.append(bgm_source)
            # Loop to the video length first, then fade: the fade-out must land
            # at the end of the video, as it does in the ffmpeg path.
            bgm_clip = bgm_source.with_effects([
                afx.MultiplyVolume(bgm_volume),
                afx.AudioLoop(duration=video_clip.duration),
                afx.AudioFadeOut(3),
            ])
            audio_tracks.append(bgm_clip)
            logger.info(f"已添加背景音乐,音量: {bgm_volume}")
        except Exception:
            logger.error(f"添加背景音乐失败: \n{traceback.format_exc()}")

    if audio_tracks:
        final_audio = CompositeAudioClip(audio_tracks)
        video_clip = video_clip.with_audio(final_audio)
        logger.info(f"已合成所有音频轨道,共{len(audio_tracks)}")
    else:
        logger.warning("没有可用的音频轨道,输出视频将没有声音")

    # --- subtitles ----------------------------------------------------------
    font_path = _resolve_font_path(subtitle_font) if subtitle_path else None
    if font_path:
        if os.name == "nt":
            # Forward slashes keep Windows font paths usable downstream.
            font_path = font_path.replace("\\", "/")
        logger.info(f"使用字体: {font_path}")

    video_width, video_height = video_clip.size
    orientation_subtitle_y_percent = _resolve_orientation_subtitle_y_percent(video_width, video_height, options)

    if subtitle_enabled and subtitle_mask_enabled:
        video_clip = apply_subtitle_mask(video_clip, options)

    def create_text_clip(subtitle_item):
        """Build one timed, positioned TextClip from ``((start, end), text)``."""
        phrase = subtitle_item[1]
        max_width = video_width * 0.9

        # Wrapping needs a real font file to measure text with.
        wrapped_txt = phrase
        if font_path:
            wrapped_txt, _ = wrap_text(
                phrase,
                max_width=max_width,
                font=font_path,
                fontsize=subtitle_font_size
            )

        try:
            text_clip_kwargs = {
                "text": wrapped_txt,
                "font_size": subtitle_font_size,
                "color": subtitle_color,
                "bg_color": subtitle_bg_color,
                "stroke_color": stroke_color,
                "stroke_width": stroke_width,
            }
            if font_path:
                text_clip_kwargs["font"] = font_path
            _clip = TextClip(**text_clip_kwargs)
        except Exception as e:
            logger.error(f"创建字幕片段失败: {str(e)}, 使用简化参数重试")
            # Retry with the minimal parameter set.
            fallback_kwargs = {
                "text": wrapped_txt,
                "font_size": subtitle_font_size,
                "color": subtitle_color,
            }
            if font_path:
                fallback_kwargs["font"] = font_path
            _clip = TextClip(**fallback_kwargs)

        start, end = subtitle_item[0][0], subtitle_item[0][1]
        _clip = _clip.with_start(start)
        _clip = _clip.with_end(end)
        _clip = _clip.with_duration(end - start)

        def clamped_y(percent):
            # Percent of the free vertical space, kept 10 px inside the frame.
            margin = 10
            max_y = video_height - _clip.h - margin
            min_y = margin
            y = (video_height - _clip.h) * (percent / 100)
            return max(min_y, min(y, max_y))

        # An orientation-specific position overrides the named positions.
        if orientation_subtitle_y_percent is not None:
            _clip = _clip.with_position(("center", clamped_y(orientation_subtitle_y_percent)))
        elif subtitle_position == "bottom":
            _clip = _clip.with_position(("center", video_height * 0.95 - _clip.h))
        elif subtitle_position == "top":
            _clip = _clip.with_position(("center", video_height * 0.05))
        elif subtitle_position == "custom":
            _clip = _clip.with_position(("center", clamped_y(custom_position)))
        else:  # center
            _clip = _clip.with_position(("center", "center"))
        return _clip

    def make_textclip(text):
        """TextClip factory handed to SubtitlesClip."""
        text_clip_kwargs = {
            "text": text,
            "font_size": subtitle_font_size,
            "color": subtitle_color,
        }
        if font_path:
            text_clip_kwargs["font"] = font_path
        return TextClip(**text_clip_kwargs)

    if subtitle_enabled and subtitle_path:
        if is_valid_subtitle_file(subtitle_path):
            logger.info("字幕已启用,开始处理字幕文件")
            try:
                sub = SubtitlesClip(
                    subtitles=subtitle_path,
                    encoding="utf-8",
                    make_textclip=make_textclip
                )
                text_clips = [create_text_clip(subtitle_item=item) for item in sub.subtitles]
                video_clip = CompositeVideoClip([video_clip, *text_clips])
                logger.info(f"已添加{len(text_clips)}个字幕片段")
            except Exception:
                # Subtitles are optional: log and continue without them.
                logger.error(f"处理字幕失败: \n{traceback.format_exc()}")
                logger.warning("字幕处理失败,继续生成无字幕视频")
        else:
            logger.warning(f"字幕文件无效或为空: {subtitle_path},跳过字幕处理")
    elif not subtitle_enabled:
        logger.info("字幕已禁用,跳过字幕处理")
    elif not subtitle_path:
        logger.info("未提供字幕文件路径,跳过字幕处理")

    # --- export -------------------------------------------------------------
    try:
        encoder, ffmpeg_params = _build_moviepy_encoder_options()
        logger.info(f"MoviePy 导出编码器: {encoder}, 参数: {ffmpeg_params}")
        try:
            video_clip.write_videofile(
                output_path,
                codec=encoder,
                audio_codec="aac",
                temp_audiofile_path=output_dir,
                threads=threads,
                fps=fps,
                ffmpeg_params=ffmpeg_params,
            )
        except Exception:
            if encoder == "libx264":
                raise
            # The hardware encoder failed: retry once with software x264.
            logger.warning(f"MoviePy 使用 {encoder} 导出失败,回退 libx264: {traceback.format_exc()}")
            video_clip.write_videofile(
                output_path,
                codec="libx264",
                audio_codec="aac",
                temp_audiofile_path=output_dir,
                threads=threads,
                fps=fps,
                ffmpeg_params=["-preset", "veryfast", "-crf", "23", "-pix_fmt", "yuv420p"],
            )
        logger.success(f"素材合并完成: {output_path}")
    except Exception as e:
        logger.error(f"导出视频失败: {str(e)}")
        raise
    finally:
        # Release the final clip, then every source clip that owns a reader.
        video_clip.close()
        del video_clip
        for opened_clip in opened_clips:
            try:
                opened_clip.close()
            except Exception as close_error:
                logger.debug(f"close clip failed: {close_error}")

    return output_path
def wrap_text(text, max_width, font="Arial", fontsize=60):
    """
    Wrap ``text`` so that each rendered line fits within ``max_width`` pixels.

    Wrapping is attempted at spaces first. If a single word is already wider
    than ``max_width`` when it starts the text (typical for text without
    spaces, such as Chinese), the text is wrapped character by character.

    Args:
        text: Text to wrap.
        max_width: Maximum line width in pixels.
        font: Font path or name passed to ``ImageFont.truetype``.
        fontsize: Font size used for measuring.

    Returns:
        ``(wrapped_text, height)``: lines joined with newlines, and the number
        of lines multiplied by the measured height of the unwrapped text.
    """
    try:
        font_obj = ImageFont.truetype(font, fontsize)
    except Exception:
        # Unknown or unreadable font: measure with Pillow's built-in font.
        # (Narrower than a bare except, so Ctrl-C / SystemExit still propagate.)
        font_obj = ImageFont.load_default()

    def get_text_size(inner_text):
        inner_text = inner_text.strip()
        left, top, right, bottom = font_obj.getbbox(inner_text)
        return right - left, bottom - top

    width, height = get_text_size(text)
    if width <= max_width:
        return text, height

    # Pass 1: break at spaces.
    word_wrap_ok = True
    word_lines = []
    current = ""
    for word in text.split(" "):
        previous = current
        current += f"{word} "
        line_width, _ = get_text_size(current)
        if line_width <= max_width:
            continue
        if current.strip() == word.strip():
            # A lone word that does not fit: fall back to character wrapping.
            word_wrap_ok = False
            break
        word_lines.append(previous)
        current = f"{word} "
    word_lines.append(current)

    if word_wrap_ok:
        word_lines = [line.strip() for line in word_lines]
        return "\n".join(word_lines).strip(), len(word_lines) * height

    # Pass 2: break between characters. The line is flushed *before* the
    # character that would overflow, so no line exceeds max_width (unless a
    # single character is wider than max_width on its own).
    char_lines = []
    current = ""
    for char in text:
        candidate = current + char
        line_width, _ = get_text_size(candidate)
        if line_width <= max_width or not current:
            current = candidate
            continue
        char_lines.append(current)
        current = char
    if current:
        # Skip an empty tail so it does not inflate the reported height.
        char_lines.append(current)

    return "\n".join(char_lines).strip(), len(char_lines) * height
if __name__ == '__main__':
    # Manual smoke test with the author's local sample files.
    merger_mp4 = '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/merger.mp4'
    merger_sub = '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/merged_subtitle_00_00_00-00_01_30.srt'
    merger_audio = '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/merger_audio.mp3'
    bgm_path = '/Users/apple/Desktop/home/NarratoAI/resource/songs/bgm.mp3'
    output_video = '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/combined_test.mp4'

    # Example option set passed to merge_materials.
    options = dict(
        voice_volume=1.0,                              # narration volume
        bgm_volume=0.1,                                # background music volume
        original_audio_volume=1.0,                     # volume of the video's own audio
        keep_original_audio=True,                      # keep the video's own audio
        subtitle_enabled=True,                         # render subtitles
        subtitle_font='MicrosoftYaHeiNormal.ttc',      # relative font name
        subtitle_font_size=40,
        subtitle_color='#FFFFFF',
        subtitle_bg_color=None,                        # None means transparent background
        subtitle_position='bottom',
        threads=2,
    )

    try:
        merge_materials(
            merger_mp4,
            merger_audio,
            output_video,
            subtitle_path=merger_sub,
            bgm_path=bgm_path,
            options=options,
        )
    except Exception:
        logger.error(f"合并素材失败: \n{traceback.format_exc()}")