mirror of
https://github.com/linyqh/NarratoAI.git
synced 2025-12-11 10:32:49 +00:00
678 lines
27 KiB
Python
678 lines
27 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: UTF-8 -*-
|
||
|
||
'''
|
||
@Project: NarratoAI
|
||
@File : merger_video
|
||
@Author : Viccy同学
|
||
@Date : 2025/5/6 下午7:38
|
||
'''
|
||
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
from enum import Enum
|
||
from typing import List, Optional, Tuple
|
||
from loguru import logger
|
||
|
||
from app.utils import ffmpeg_utils
|
||
|
||
|
||
class VideoAspect(Enum):
|
||
"""视频宽高比枚举"""
|
||
landscape = "16:9" # 横屏 16:9
|
||
landscape_2 = "4:3"
|
||
portrait = "9:16" # 竖屏 9:16
|
||
portrait_2 = "3:4"
|
||
square = "1:1" # 方形 1:1
|
||
|
||
def to_resolution(self) -> Tuple[int, int]:
|
||
"""根据宽高比返回标准分辨率"""
|
||
if self == VideoAspect.portrait:
|
||
return 1080, 1920 # 竖屏 9:16
|
||
elif self == VideoAspect.portrait_2:
|
||
return 720, 1280 # 竖屏 4:3
|
||
elif self == VideoAspect.landscape:
|
||
return 1920, 1080 # 横屏 16:9
|
||
elif self == VideoAspect.landscape_2:
|
||
return 1280, 720 # 横屏 4:3
|
||
elif self == VideoAspect.square:
|
||
return 1080, 1080 # 方形 1:1
|
||
else:
|
||
return 1080, 1920 # 默认竖屏
|
||
|
||
|
||
def check_ffmpeg_installation() -> bool:
|
||
"""
|
||
检查ffmpeg是否已安装
|
||
|
||
Returns:
|
||
bool: 如果安装则返回True,否则返回False
|
||
"""
|
||
try:
|
||
subprocess.run(['ffmpeg', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
|
||
return True
|
||
except (subprocess.SubprocessError, FileNotFoundError):
|
||
logger.error("ffmpeg未安装或不在系统PATH中,请安装ffmpeg")
|
||
return False
|
||
|
||
|
||
def get_hardware_acceleration_option() -> Optional[str]:
|
||
"""
|
||
根据系统环境选择合适的硬件加速选项
|
||
|
||
Returns:
|
||
Optional[str]: 硬件加速参数,如果不支持则返回None
|
||
"""
|
||
# 使用新的硬件加速检测API
|
||
return ffmpeg_utils.get_ffmpeg_hwaccel_type()
|
||
|
||
|
||
def check_video_has_audio(video_path: str) -> bool:
|
||
"""
|
||
检查视频是否包含音频流
|
||
|
||
Args:
|
||
video_path: 视频文件路径
|
||
|
||
Returns:
|
||
bool: 如果视频包含音频流则返回True,否则返回False
|
||
"""
|
||
if not os.path.exists(video_path):
|
||
logger.warning(f"视频文件不存在: {video_path}")
|
||
return False
|
||
|
||
probe_cmd = [
|
||
'ffprobe', '-v', 'error',
|
||
'-select_streams', 'a:0',
|
||
'-show_entries', 'stream=codec_type',
|
||
'-of', 'csv=p=0',
|
||
video_path
|
||
]
|
||
|
||
try:
|
||
result = subprocess.run(probe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
|
||
return result.stdout.strip() == 'audio'
|
||
except Exception as e:
|
||
logger.warning(f"检测视频音频流时出错: {str(e)}")
|
||
return False
|
||
|
||
|
||
def create_ffmpeg_concat_file(video_paths: List[str], concat_file_path: str) -> str:
|
||
"""
|
||
创建ffmpeg合并所需的concat文件
|
||
|
||
Args:
|
||
video_paths: 需要合并的视频文件路径列表
|
||
concat_file_path: concat文件的输出路径
|
||
|
||
Returns:
|
||
str: concat文件的路径
|
||
"""
|
||
with open(concat_file_path, 'w', encoding='utf-8') as f:
|
||
for video_path in video_paths:
|
||
# 获取绝对路径
|
||
abs_path = os.path.abspath(video_path)
|
||
# 在Windows上将反斜杠替换为正斜杠
|
||
if os.name == 'nt': # Windows系统
|
||
abs_path = abs_path.replace('\\', '/')
|
||
else: # Unix/Mac系统
|
||
# 转义特殊字符
|
||
abs_path = abs_path.replace('\\', '\\\\').replace(':', '\\:')
|
||
|
||
# 处理路径中的单引号 (如果有)
|
||
abs_path = abs_path.replace("'", "\\'")
|
||
|
||
f.write(f"file '{abs_path}'\n")
|
||
return concat_file_path
|
||
|
||
|
||
def process_single_video(
|
||
input_path: str,
|
||
output_path: str,
|
||
target_width: int,
|
||
target_height: int,
|
||
keep_audio: bool = True,
|
||
hwaccel: Optional[str] = None
|
||
) -> str:
|
||
"""
|
||
处理单个视频:调整分辨率、帧率等
|
||
|
||
重要修复:避免在视频滤镜处理时使用CUDA硬件解码,
|
||
因为这会导致滤镜链格式转换错误。使用纯NVENC编码器获得最佳兼容性。
|
||
|
||
Args:
|
||
input_path: 输入视频路径
|
||
output_path: 输出视频路径
|
||
target_width: 目标宽度
|
||
target_height: 目标高度
|
||
keep_audio: 是否保留音频
|
||
hwaccel: 硬件加速选项
|
||
|
||
Returns:
|
||
str: 处理后的视频路径
|
||
"""
|
||
if not os.path.exists(input_path):
|
||
raise FileNotFoundError(f"找不到视频文件: {input_path}")
|
||
|
||
# 构建基本命令
|
||
command = ['ffmpeg', '-y']
|
||
|
||
# 安全检查:如果在Windows上,则慎用硬件加速
|
||
is_windows = os.name == 'nt'
|
||
if is_windows and hwaccel:
|
||
logger.info("在Windows系统上检测到硬件加速请求,将进行额外的兼容性检查")
|
||
try:
|
||
# 对视频进行快速探测,检测其基本信息
|
||
probe_cmd = [
|
||
'ffprobe', '-v', 'error',
|
||
'-select_streams', 'v:0',
|
||
'-show_entries', 'stream=codec_name,width,height',
|
||
'-of', 'csv=p=0',
|
||
input_path
|
||
]
|
||
result = subprocess.run(probe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
|
||
|
||
# 如果探测成功,使用硬件加速;否则降级到软件编码
|
||
if result.returncode != 0:
|
||
logger.warning(f"视频探测失败,为安全起见,禁用硬件加速: {result.stderr}")
|
||
hwaccel = None
|
||
except Exception as e:
|
||
logger.warning(f"视频探测出错,禁用硬件加速: {str(e)}")
|
||
hwaccel = None
|
||
|
||
# 关键修复:对于涉及滤镜处理的场景,不使用CUDA硬件解码
|
||
# 这避免了 "Impossible to convert between the formats" 错误
|
||
# 我们将只使用纯NVENC编码器来获得硬件加速优势
|
||
|
||
# 输入文件(不添加硬件解码参数)
|
||
command.extend(['-i', input_path])
|
||
|
||
# 处理音频
|
||
if not keep_audio:
|
||
command.extend(['-an']) # 移除音频
|
||
else:
|
||
# 检查输入视频是否有音频流
|
||
has_audio = check_video_has_audio(input_path)
|
||
if has_audio:
|
||
command.extend(['-c:a', 'aac', '-b:a', '128k']) # 音频编码为AAC
|
||
else:
|
||
logger.warning(f"视频 {input_path} 没有音频流,将会忽略音频设置")
|
||
command.extend(['-an']) # 没有音频流时移除音频设置
|
||
|
||
# 视频处理参数:缩放并添加填充以保持比例
|
||
scale_filter = f"scale={target_width}:{target_height}:force_original_aspect_ratio=decrease"
|
||
pad_filter = f"pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2"
|
||
command.extend([
|
||
'-vf', f"{scale_filter},{pad_filter}",
|
||
'-r', '30', # 设置帧率为30fps
|
||
])
|
||
|
||
# 关键修复:选择编码器时优先使用纯NVENC(无硬件解码)
|
||
if hwaccel:
|
||
try:
|
||
# 检查是否为NVIDIA硬件加速
|
||
hwaccel_info = ffmpeg_utils.detect_hardware_acceleration()
|
||
if hwaccel_info.get("type") in ["cuda", "nvenc"] and hwaccel_info.get("encoder") == "h264_nvenc":
|
||
# 使用纯NVENC编码器(最佳兼容性)
|
||
logger.info("使用纯NVENC编码器(避免滤镜链问题)")
|
||
command.extend(['-c:v', 'h264_nvenc'])
|
||
command.extend(['-preset', 'medium', '-cq', '23', '-profile:v', 'main'])
|
||
else:
|
||
# 其他硬件编码器
|
||
encoder = ffmpeg_utils.get_optimal_ffmpeg_encoder()
|
||
# logger.info(f"使用硬件编码器: {encoder}")
|
||
command.extend(['-c:v', encoder])
|
||
|
||
# 根据编码器类型添加特定参数
|
||
if "amf" in encoder:
|
||
command.extend(['-quality', 'balanced'])
|
||
elif "qsv" in encoder:
|
||
command.extend(['-preset', 'medium'])
|
||
elif "videotoolbox" in encoder:
|
||
command.extend(['-profile:v', 'high'])
|
||
else:
|
||
command.extend(['-preset', 'medium', '-profile:v', 'high'])
|
||
except Exception as e:
|
||
logger.warning(f"硬件编码器检测失败: {str(e)},将使用软件编码")
|
||
hwaccel = None
|
||
|
||
if not hwaccel:
|
||
logger.info("使用软件编码器(libx264)")
|
||
command.extend(['-c:v', 'libx264', '-preset', 'medium', '-profile:v', 'high'])
|
||
|
||
# 设置视频比特率和其他参数
|
||
command.extend([
|
||
'-b:v', '5M',
|
||
'-maxrate', '8M',
|
||
'-bufsize', '10M',
|
||
'-pix_fmt', 'yuv420p', # 兼容性更好的颜色格式
|
||
])
|
||
|
||
# 输出文件
|
||
command.append(output_path)
|
||
|
||
# 执行命令
|
||
try:
|
||
# logger.info(f"执行FFmpeg命令: {' '.join(command)}")
|
||
process = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
# logger.info(f"视频处理成功: {output_path}")
|
||
return output_path
|
||
except subprocess.CalledProcessError as e:
|
||
error_msg = e.stderr.decode() if e.stderr else str(e)
|
||
logger.error(f"处理视频失败: {error_msg}")
|
||
|
||
# 如果使用硬件加速失败,尝试使用软件编码
|
||
if hwaccel:
|
||
logger.info("硬件加速失败,尝试使用软件编码作为备选方案")
|
||
try:
|
||
# 强制使用软件编码
|
||
ffmpeg_utils.force_software_encoding()
|
||
|
||
# 构建新的命令,使用软件编码
|
||
fallback_cmd = ['ffmpeg', '-y', '-i', input_path]
|
||
|
||
# 保持原有的音频设置
|
||
if not keep_audio:
|
||
fallback_cmd.extend(['-an'])
|
||
else:
|
||
has_audio = check_video_has_audio(input_path)
|
||
if has_audio:
|
||
fallback_cmd.extend(['-c:a', 'aac', '-b:a', '128k'])
|
||
else:
|
||
fallback_cmd.extend(['-an'])
|
||
|
||
# 保持原有的视频过滤器
|
||
fallback_cmd.extend([
|
||
'-vf', f"{scale_filter},{pad_filter}",
|
||
'-r', '30',
|
||
'-c:v', 'libx264',
|
||
'-preset', 'medium',
|
||
'-profile:v', 'high',
|
||
'-b:v', '5M',
|
||
'-maxrate', '8M',
|
||
'-bufsize', '10M',
|
||
'-pix_fmt', 'yuv420p',
|
||
output_path
|
||
])
|
||
|
||
logger.info("执行软件编码备选方案")
|
||
subprocess.run(fallback_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
logger.info(f"使用软件编码成功处理视频: {output_path}")
|
||
return output_path
|
||
except subprocess.CalledProcessError as fallback_error:
|
||
fallback_error_msg = fallback_error.stderr.decode() if fallback_error.stderr else str(fallback_error)
|
||
logger.error(f"软件编码备选方案也失败: {fallback_error_msg}")
|
||
|
||
# 尝试最基本的编码参数
|
||
try:
|
||
logger.info("尝试最基本的编码参数")
|
||
basic_cmd = [
|
||
'ffmpeg', '-y', '-i', input_path,
|
||
'-c:v', 'libx264', '-preset', 'ultrafast',
|
||
'-crf', '23', '-pix_fmt', 'yuv420p',
|
||
output_path
|
||
]
|
||
subprocess.run(basic_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
logger.info(f"使用基本编码参数成功处理视频: {output_path}")
|
||
return output_path
|
||
except subprocess.CalledProcessError as basic_error:
|
||
basic_error_msg = basic_error.stderr.decode() if basic_error.stderr else str(basic_error)
|
||
logger.error(f"基本编码参数也失败: {basic_error_msg}")
|
||
raise RuntimeError(f"无法处理视频 {input_path}: 所有编码方案都失败")
|
||
|
||
# 如果不是硬件加速导致的问题,或者备选方案也失败了,抛出原始错误
|
||
raise RuntimeError(f"处理视频失败: {error_msg}")
|
||
|
||
|
||
def combine_clip_videos(
|
||
output_video_path: str,
|
||
video_paths: List[str],
|
||
video_ost_list: List[int],
|
||
video_aspect: VideoAspect = VideoAspect.portrait,
|
||
threads: int = 4,
|
||
force_software_encoding: bool = False, # 新参数,强制使用软件编码
|
||
) -> str:
|
||
"""
|
||
合并子视频
|
||
Args:
|
||
output_video_path: 合并后的存储路径
|
||
video_paths: 子视频路径列表
|
||
video_ost_list: 原声播放列表 (0: 不保留原声, 1: 只保留原声, 2: 保留原声并保留解说)
|
||
video_aspect: 屏幕比例
|
||
threads: 线程数
|
||
force_software_encoding: 是否强制使用软件编码(忽略硬件加速检测)
|
||
|
||
Returns:
|
||
str: 合并后的视频路径
|
||
"""
|
||
# 检查ffmpeg是否安装
|
||
if not check_ffmpeg_installation():
|
||
raise RuntimeError("未找到ffmpeg,请先安装")
|
||
|
||
# 准备输出目录
|
||
output_dir = os.path.dirname(output_video_path)
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
# 获取目标分辨率
|
||
aspect = VideoAspect(video_aspect)
|
||
video_width, video_height = aspect.to_resolution()
|
||
|
||
# 检测可用的硬件加速选项
|
||
hwaccel = None if force_software_encoding else get_hardware_acceleration_option()
|
||
if hwaccel:
|
||
logger.info(f"将使用 {hwaccel} 硬件加速")
|
||
elif force_software_encoding:
|
||
logger.info("已强制使用软件编码,跳过硬件加速检测")
|
||
else:
|
||
logger.info("未检测到兼容的硬件加速,将使用软件编码")
|
||
|
||
# Windows系统上,默认使用软件编码以提高兼容性
|
||
if os.name == 'nt' and hwaccel:
|
||
logger.warning("在Windows系统上检测到硬件加速,但为了提高兼容性,建议使用软件编码")
|
||
# 不强制禁用hwaccel,而是在process_single_video中进行额外安全检查
|
||
|
||
# 重组视频路径和原声设置为一个字典列表结构
|
||
video_segments = []
|
||
|
||
# 检查视频路径和原声设置列表长度是否匹配
|
||
if len(video_paths) != len(video_ost_list):
|
||
logger.warning(f"视频路径列表({len(video_paths)})和原声设置列表({len(video_ost_list)})长度不匹配")
|
||
# 调整长度以匹配较短的列表
|
||
min_length = min(len(video_paths), len(video_ost_list))
|
||
video_paths = video_paths[:min_length]
|
||
video_ost_list = video_ost_list[:min_length]
|
||
|
||
# 创建视频处理配置字典列表
|
||
for i, (video_path, video_ost) in enumerate(zip(video_paths, video_ost_list)):
|
||
if not os.path.exists(video_path):
|
||
logger.warning(f"视频不存在,跳过: {video_path}")
|
||
continue
|
||
|
||
# 检查是否有音频流
|
||
has_audio = check_video_has_audio(video_path)
|
||
|
||
# 构建视频片段配置
|
||
segment = {
|
||
"index": i,
|
||
"path": video_path,
|
||
"ost": video_ost,
|
||
"has_audio": has_audio,
|
||
"keep_audio": video_ost > 0 and has_audio # 只有当ost>0且实际有音频时才保留
|
||
}
|
||
|
||
# 记录日志
|
||
if video_ost > 0 and not has_audio:
|
||
logger.warning(f"视频 {video_path} 设置为保留原声(ost={video_ost}),但该视频没有音频流")
|
||
|
||
video_segments.append(segment)
|
||
|
||
# 处理每个视频片段
|
||
processed_videos = []
|
||
temp_dir = os.path.join(output_dir, "temp_videos")
|
||
os.makedirs(temp_dir, exist_ok=True)
|
||
|
||
try:
|
||
# 第一阶段:处理所有视频片段到中间文件
|
||
for segment in video_segments:
|
||
# 处理单个视频,去除或保留音频
|
||
temp_output = os.path.join(temp_dir, f"processed_{segment['index']}.mp4")
|
||
try:
|
||
process_single_video(
|
||
input_path=segment['path'],
|
||
output_path=temp_output,
|
||
target_width=video_width,
|
||
target_height=video_height,
|
||
keep_audio=segment['keep_audio'],
|
||
hwaccel=hwaccel
|
||
)
|
||
processed_videos.append({
|
||
"index": segment["index"],
|
||
"path": temp_output,
|
||
"keep_audio": segment["keep_audio"]
|
||
})
|
||
logger.info(f"视频 {segment['index'] + 1}/{len(video_segments)} 处理完成")
|
||
except Exception as e:
|
||
logger.error(f"处理视频 {segment['path']} 时出错: {str(e)}")
|
||
# 如果使用硬件加速失败,尝试使用软件编码
|
||
if hwaccel and not force_software_encoding:
|
||
logger.info(f"尝试使用软件编码处理视频 {segment['path']}")
|
||
try:
|
||
process_single_video(
|
||
input_path=segment['path'],
|
||
output_path=temp_output,
|
||
target_width=video_width,
|
||
target_height=video_height,
|
||
keep_audio=segment['keep_audio'],
|
||
hwaccel=None # 使用软件编码
|
||
)
|
||
processed_videos.append({
|
||
"index": segment["index"],
|
||
"path": temp_output,
|
||
"keep_audio": segment["keep_audio"]
|
||
})
|
||
logger.info(f"使用软件编码成功处理视频 {segment['index'] + 1}/{len(video_segments)}")
|
||
except Exception as fallback_error:
|
||
logger.error(f"使用软件编码处理视频 {segment['path']} 也失败: {str(fallback_error)}")
|
||
continue
|
||
else:
|
||
continue
|
||
|
||
if not processed_videos:
|
||
raise ValueError("没有有效的视频片段可以合并")
|
||
|
||
# 按原始索引排序处理后的视频
|
||
processed_videos.sort(key=lambda x: x["index"])
|
||
|
||
# 第二阶段:分步骤合并视频 - 避免复杂的filter_complex滤镜
|
||
try:
|
||
# 1. 首先,将所有没有音频的视频或音频被禁用的视频合并到一个临时文件中
|
||
video_paths_only = [video["path"] for video in processed_videos]
|
||
video_concat_path = os.path.join(temp_dir, "video_concat.mp4")
|
||
|
||
# 创建concat文件,用于合并视频流
|
||
concat_file = os.path.join(temp_dir, "concat_list.txt")
|
||
create_ffmpeg_concat_file(video_paths_only, concat_file)
|
||
|
||
# 合并所有视频流,但不包含音频
|
||
concat_cmd = [
|
||
'ffmpeg', '-y',
|
||
'-f', 'concat',
|
||
'-safe', '0',
|
||
'-i', concat_file,
|
||
'-c:v', 'libx264',
|
||
'-preset', 'medium',
|
||
'-profile:v', 'high',
|
||
'-an', # 不包含音频
|
||
'-threads', str(threads),
|
||
video_concat_path
|
||
]
|
||
|
||
subprocess.run(concat_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
logger.info("视频流合并完成")
|
||
|
||
# 2. 提取并合并有音频的片段
|
||
audio_segments = [video for video in processed_videos if video["keep_audio"]]
|
||
|
||
if not audio_segments:
|
||
# 如果没有音频片段,直接使用无音频的合并视频作为最终结果
|
||
shutil.copy(video_concat_path, output_video_path)
|
||
logger.info("无音频视频合并完成")
|
||
return output_video_path
|
||
|
||
# 创建音频中间文件
|
||
audio_files = []
|
||
for i, segment in enumerate(audio_segments):
|
||
# 提取音频
|
||
audio_file = os.path.join(temp_dir, f"audio_{i}.aac")
|
||
extract_audio_cmd = [
|
||
'ffmpeg', '-y',
|
||
'-i', segment["path"],
|
||
'-vn', # 不包含视频
|
||
'-c:a', 'aac',
|
||
'-b:a', '128k',
|
||
audio_file
|
||
]
|
||
subprocess.run(extract_audio_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
audio_files.append({
|
||
"index": segment["index"],
|
||
"path": audio_file
|
||
})
|
||
logger.info(f"提取音频 {i+1}/{len(audio_segments)} 完成")
|
||
|
||
# 3. 计算每个音频片段的时间位置
|
||
audio_timings = []
|
||
current_time = 0.0
|
||
|
||
# 获取每个视频片段的时长
|
||
for i, video in enumerate(processed_videos):
|
||
duration_cmd = [
|
||
'ffprobe', '-v', 'error',
|
||
'-show_entries', 'format=duration',
|
||
'-of', 'csv=p=0',
|
||
video["path"]
|
||
]
|
||
result = subprocess.run(duration_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||
duration = float(result.stdout.strip())
|
||
|
||
# 如果当前片段需要保留音频,记录时间位置
|
||
if video["keep_audio"]:
|
||
for audio in audio_files:
|
||
if audio["index"] == video["index"]:
|
||
audio_timings.append({
|
||
"file": audio["path"],
|
||
"start": current_time,
|
||
"index": video["index"]
|
||
})
|
||
break
|
||
|
||
current_time += duration
|
||
|
||
# 4. 创建静音音频轨道作为基础
|
||
silence_audio = os.path.join(temp_dir, "silence.aac")
|
||
create_silence_cmd = [
|
||
'ffmpeg', '-y',
|
||
'-f', 'lavfi',
|
||
'-i', f'anullsrc=r=44100:cl=stereo',
|
||
'-t', str(current_time), # 总时长
|
||
'-c:a', 'aac',
|
||
'-b:a', '128k',
|
||
silence_audio
|
||
]
|
||
subprocess.run(create_silence_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
|
||
# 5. 创建复杂滤镜命令以混合音频
|
||
filter_script = os.path.join(temp_dir, "filter_script.txt")
|
||
with open(filter_script, 'w') as f:
|
||
f.write(f"[0:a]volume=0.0[silence];\n") # 首先静音背景轨道
|
||
|
||
# 添加每个音频文件,并补偿amix的音量稀释
|
||
# amix会将n个输入的音量平均分配,所以我们需要将每个输入的音量提高n倍来保持原始音量
|
||
num_inputs = len(audio_timings) + 1 # +1 for silence track
|
||
volume_compensation = num_inputs # 补偿系数
|
||
|
||
for i, timing in enumerate(audio_timings):
|
||
# 为每个音频添加音量补偿,确保原声保持原始音量
|
||
f.write(f"[{i+1}:a]volume={volume_compensation},adelay={int(timing['start']*1000)}|{int(timing['start']*1000)}[a{i}];\n")
|
||
|
||
# 混合所有音频
|
||
mix_str = "[silence]"
|
||
for i in range(len(audio_timings)):
|
||
mix_str += f"[a{i}]"
|
||
mix_str += f"amix=inputs={len(audio_timings)+1}:duration=longest[aout]"
|
||
f.write(mix_str)
|
||
|
||
# 6. 构建音频合并命令
|
||
audio_inputs = ['-i', silence_audio]
|
||
for timing in audio_timings:
|
||
audio_inputs.extend(['-i', timing["file"]])
|
||
|
||
mixed_audio = os.path.join(temp_dir, "mixed_audio.aac")
|
||
audio_mix_cmd = [
|
||
'ffmpeg', '-y'
|
||
] + audio_inputs + [
|
||
'-filter_complex_script', filter_script,
|
||
'-map', '[aout]',
|
||
'-c:a', 'aac',
|
||
'-b:a', '128k',
|
||
mixed_audio
|
||
]
|
||
|
||
subprocess.run(audio_mix_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
logger.info("音频混合完成")
|
||
|
||
# 7. 将合并的视频和混合的音频组合在一起
|
||
final_cmd = [
|
||
'ffmpeg', '-y',
|
||
'-i', video_concat_path,
|
||
'-i', mixed_audio,
|
||
'-c:v', 'copy',
|
||
'-c:a', 'aac',
|
||
'-map', '0:v:0',
|
||
'-map', '1:a:0',
|
||
'-shortest',
|
||
output_video_path
|
||
]
|
||
|
||
subprocess.run(final_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
logger.info("视频最终合并完成")
|
||
|
||
return output_video_path
|
||
|
||
except subprocess.CalledProcessError as e:
|
||
logger.error(f"合并视频过程中出错: {e.stderr.decode() if e.stderr else str(e)}")
|
||
|
||
# 尝试备用合并方法 - 最简单的无音频合并
|
||
logger.info("尝试备用合并方法 - 无音频合并")
|
||
try:
|
||
concat_file = os.path.join(temp_dir, "concat_list.txt")
|
||
video_paths_only = [video["path"] for video in processed_videos]
|
||
create_ffmpeg_concat_file(video_paths_only, concat_file)
|
||
|
||
backup_cmd = [
|
||
'ffmpeg', '-y',
|
||
'-f', 'concat',
|
||
'-safe', '0',
|
||
'-i', concat_file,
|
||
'-c:v', 'copy',
|
||
'-an', # 无音频
|
||
output_video_path
|
||
]
|
||
|
||
subprocess.run(backup_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
logger.warning("使用备用方法(无音频)成功合并视频")
|
||
return output_video_path
|
||
except Exception as backup_error:
|
||
logger.error(f"备用合并方法也失败: {str(backup_error)}")
|
||
raise RuntimeError(f"无法合并视频: {str(backup_error)}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"合并视频时出错: {str(e)}")
|
||
raise
|
||
finally:
|
||
# 清理临时文件
|
||
try:
|
||
if os.path.exists(temp_dir):
|
||
shutil.rmtree(temp_dir)
|
||
logger.info("已清理临时文件")
|
||
except Exception as e:
|
||
logger.warning(f"清理临时文件时出错: {str(e)}")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
video_paths = [
|
||
'/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E02_00_14_09_440.mp4',
|
||
'/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E08_00_27_11_110.mp4',
|
||
'/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E08_00_34_44_480.mp4',
|
||
'/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E08_00_42_47_630.mp4',
|
||
'/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E09_00_29_48_160.mp4'
|
||
]
|
||
|
||
combine_clip_videos(
|
||
output_video_path="/Users/apple/Desktop/home/NarratoAI/storage/temp/merge/merged_123.mp4",
|
||
video_paths=video_paths,
|
||
video_ost_list=[1, 1, 1,1,1],
|
||
video_aspect=VideoAspect.portrait,
|
||
force_software_encoding=False # 默认不强制使用软件编码,让系统自动决定
|
||
)
|