#!/usr/bin/env python # -*- coding: UTF-8 -*- ''' @Project: NarratoAI @File : merger_video @Author : Viccy同学 @Date : 2025/5/6 下午7:38 ''' import os import shutil import subprocess from enum import Enum from typing import List, Optional, Tuple from loguru import logger from app.utils import ffmpeg_utils class VideoAspect(Enum): """视频宽高比枚举""" landscape = "16:9" # 横屏 16:9 landscape_2 = "4:3" portrait = "9:16" # 竖屏 9:16 portrait_2 = "3:4" square = "1:1" # 方形 1:1 def to_resolution(self) -> Tuple[int, int]: """根据宽高比返回标准分辨率""" if self == VideoAspect.portrait: return 1080, 1920 # 竖屏 9:16 elif self == VideoAspect.portrait_2: return 720, 1280 # 竖屏 4:3 elif self == VideoAspect.landscape: return 1920, 1080 # 横屏 16:9 elif self == VideoAspect.landscape_2: return 1280, 720 # 横屏 4:3 elif self == VideoAspect.square: return 1080, 1080 # 方形 1:1 else: return 1080, 1920 # 默认竖屏 def check_ffmpeg_installation() -> bool: """ 检查ffmpeg是否已安装 Returns: bool: 如果安装则返回True,否则返回False """ try: subprocess.run(['ffmpeg', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) return True except (subprocess.SubprocessError, FileNotFoundError): logger.error("ffmpeg未安装或不在系统PATH中,请安装ffmpeg") return False def get_hardware_acceleration_option() -> Optional[str]: """ 根据系统环境选择合适的硬件加速选项 Returns: Optional[str]: 硬件加速参数,如果不支持则返回None """ # 使用新的硬件加速检测API return ffmpeg_utils.get_ffmpeg_hwaccel_type() def check_video_has_audio(video_path: str) -> bool: """ 检查视频是否包含音频流 Args: video_path: 视频文件路径 Returns: bool: 如果视频包含音频流则返回True,否则返回False """ if not os.path.exists(video_path): logger.warning(f"视频文件不存在: {video_path}") return False probe_cmd = [ 'ffprobe', '-v', 'error', '-select_streams', 'a:0', '-show_entries', 'stream=codec_type', '-of', 'csv=p=0', video_path ] try: result = subprocess.run(probe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False) return result.stdout.strip() == 'audio' except Exception as e: logger.warning(f"检测视频音频流时出错: {str(e)}") return False def create_ffmpeg_concat_file(video_paths: List[str], concat_file_path: str) -> str: """ 创建ffmpeg合并所需的concat文件 Args: video_paths: 需要合并的视频文件路径列表 concat_file_path: concat文件的输出路径 Returns: str: concat文件的路径 """ with open(concat_file_path, 'w', encoding='utf-8') as f: for video_path in video_paths: # 获取绝对路径 abs_path = os.path.abspath(video_path) # 在Windows上将反斜杠替换为正斜杠 if os.name == 'nt': # Windows系统 abs_path = abs_path.replace('\\', '/') else: # Unix/Mac系统 # 转义特殊字符 abs_path = abs_path.replace('\\', '\\\\').replace(':', '\\:') # 处理路径中的单引号 (如果有) abs_path = abs_path.replace("'", "\\'") f.write(f"file '{abs_path}'\n") return concat_file_path def process_single_video( input_path: str, output_path: str, target_width: int, target_height: int, keep_audio: bool = True, hwaccel: Optional[str] = None ) -> str: """ 处理单个视频:调整分辨率、帧率等 重要修复:避免在视频滤镜处理时使用CUDA硬件解码, 因为这会导致滤镜链格式转换错误。使用纯NVENC编码器获得最佳兼容性。 Args: input_path: 输入视频路径 output_path: 输出视频路径 target_width: 目标宽度 target_height: 目标高度 keep_audio: 是否保留音频 hwaccel: 硬件加速选项 Returns: str: 处理后的视频路径 """ if not os.path.exists(input_path): raise FileNotFoundError(f"找不到视频文件: {input_path}") # 构建基本命令 command = ['ffmpeg', '-y'] # 安全检查:如果在Windows上,则慎用硬件加速 is_windows = os.name == 'nt' if is_windows and hwaccel: logger.info("在Windows系统上检测到硬件加速请求,将进行额外的兼容性检查") try: # 对视频进行快速探测,检测其基本信息 probe_cmd = [ 'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=codec_name,width,height', '-of', 'csv=p=0', input_path ] result = subprocess.run(probe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False) # 如果探测成功,使用硬件加速;否则降级到软件编码 if result.returncode != 0: logger.warning(f"视频探测失败,为安全起见,禁用硬件加速: {result.stderr}") hwaccel = None except Exception as e: logger.warning(f"视频探测出错,禁用硬件加速: {str(e)}") hwaccel = None # 关键修复:对于涉及滤镜处理的场景,不使用CUDA硬件解码 # 这避免了 "Impossible to convert between the formats" 错误 # 我们将只使用纯NVENC编码器来获得硬件加速优势 # 输入文件(不添加硬件解码参数) command.extend(['-i', input_path]) # 处理音频 if not keep_audio: command.extend(['-an']) # 移除音频 else: # 检查输入视频是否有音频流 has_audio = check_video_has_audio(input_path) if has_audio: command.extend(['-c:a', 'aac', '-b:a', '128k']) # 音频编码为AAC else: logger.warning(f"视频 {input_path} 没有音频流,将会忽略音频设置") command.extend(['-an']) # 没有音频流时移除音频设置 # 视频处理参数:缩放并添加填充以保持比例 scale_filter = f"scale={target_width}:{target_height}:force_original_aspect_ratio=decrease" pad_filter = f"pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2" command.extend([ '-vf', f"{scale_filter},{pad_filter}", '-r', '30', # 设置帧率为30fps ]) # 关键修复:选择编码器时优先使用纯NVENC(无硬件解码) if hwaccel: try: # 检查是否为NVIDIA硬件加速 hwaccel_info = ffmpeg_utils.detect_hardware_acceleration() if hwaccel_info.get("type") in ["cuda", "nvenc"] and hwaccel_info.get("encoder") == "h264_nvenc": # 使用纯NVENC编码器(最佳兼容性) logger.info("使用纯NVENC编码器(避免滤镜链问题)") command.extend(['-c:v', 'h264_nvenc']) command.extend(['-preset', 'medium', '-cq', '23', '-profile:v', 'main']) else: # 其他硬件编码器 encoder = ffmpeg_utils.get_optimal_ffmpeg_encoder() # logger.info(f"使用硬件编码器: {encoder}") command.extend(['-c:v', encoder]) # 根据编码器类型添加特定参数 if "amf" in encoder: command.extend(['-quality', 'balanced']) elif "qsv" in encoder: command.extend(['-preset', 'medium']) elif "videotoolbox" in encoder: command.extend(['-profile:v', 'high']) else: command.extend(['-preset', 'medium', '-profile:v', 'high']) except Exception as e: logger.warning(f"硬件编码器检测失败: {str(e)},将使用软件编码") hwaccel = None if not hwaccel: logger.info("使用软件编码器(libx264)") command.extend(['-c:v', 'libx264', '-preset', 'medium', '-profile:v', 'high']) # 设置视频比特率和其他参数 command.extend([ '-b:v', '5M', '-maxrate', '8M', '-bufsize', '10M', '-pix_fmt', 'yuv420p', # 兼容性更好的颜色格式 ]) # 输出文件 command.append(output_path) # 执行命令 try: # logger.info(f"执行FFmpeg命令: {' '.join(command)}") process = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # logger.info(f"视频处理成功: {output_path}") return output_path except subprocess.CalledProcessError as e: error_msg = e.stderr.decode() if e.stderr else str(e) logger.error(f"处理视频失败: {error_msg}") # 如果使用硬件加速失败,尝试使用软件编码 if hwaccel: logger.info("硬件加速失败,尝试使用软件编码作为备选方案") try: # 强制使用软件编码 ffmpeg_utils.force_software_encoding() # 构建新的命令,使用软件编码 fallback_cmd = ['ffmpeg', '-y', '-i', input_path] # 保持原有的音频设置 if not keep_audio: fallback_cmd.extend(['-an']) else: has_audio = check_video_has_audio(input_path) if has_audio: fallback_cmd.extend(['-c:a', 'aac', '-b:a', '128k']) else: fallback_cmd.extend(['-an']) # 保持原有的视频过滤器 fallback_cmd.extend([ '-vf', f"{scale_filter},{pad_filter}", '-r', '30', '-c:v', 'libx264', '-preset', 'medium', '-profile:v', 'high', '-b:v', '5M', '-maxrate', '8M', '-bufsize', '10M', '-pix_fmt', 'yuv420p', output_path ]) logger.info("执行软件编码备选方案") subprocess.run(fallback_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) logger.info(f"使用软件编码成功处理视频: {output_path}") return output_path except subprocess.CalledProcessError as fallback_error: fallback_error_msg = fallback_error.stderr.decode() if fallback_error.stderr else str(fallback_error) logger.error(f"软件编码备选方案也失败: {fallback_error_msg}") # 尝试最基本的编码参数 try: logger.info("尝试最基本的编码参数") basic_cmd = [ 'ffmpeg', '-y', '-i', input_path, '-c:v', 'libx264', '-preset', 'ultrafast', '-crf', '23', '-pix_fmt', 'yuv420p', output_path ] subprocess.run(basic_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) logger.info(f"使用基本编码参数成功处理视频: {output_path}") return output_path except subprocess.CalledProcessError as basic_error: basic_error_msg = basic_error.stderr.decode() if basic_error.stderr else str(basic_error) logger.error(f"基本编码参数也失败: {basic_error_msg}") raise RuntimeError(f"无法处理视频 {input_path}: 所有编码方案都失败") # 如果不是硬件加速导致的问题,或者备选方案也失败了,抛出原始错误 raise RuntimeError(f"处理视频失败: {error_msg}") def combine_clip_videos( output_video_path: str, video_paths: List[str], video_ost_list: List[int], video_aspect: VideoAspect = VideoAspect.portrait, threads: int = 4, force_software_encoding: bool = False, # 新参数,强制使用软件编码 ) -> str: """ 合并子视频 Args: output_video_path: 合并后的存储路径 video_paths: 子视频路径列表 video_ost_list: 原声播放列表 (0: 不保留原声, 1: 只保留原声, 2: 保留原声并保留解说) video_aspect: 屏幕比例 threads: 线程数 force_software_encoding: 是否强制使用软件编码(忽略硬件加速检测) Returns: str: 合并后的视频路径 """ # 检查ffmpeg是否安装 if not check_ffmpeg_installation(): raise RuntimeError("未找到ffmpeg,请先安装") # 准备输出目录 output_dir = os.path.dirname(output_video_path) os.makedirs(output_dir, exist_ok=True) # 获取目标分辨率 aspect = VideoAspect(video_aspect) video_width, video_height = aspect.to_resolution() # 检测可用的硬件加速选项 hwaccel = None if force_software_encoding else get_hardware_acceleration_option() if hwaccel: logger.info(f"将使用 {hwaccel} 硬件加速") elif force_software_encoding: logger.info("已强制使用软件编码,跳过硬件加速检测") else: logger.info("未检测到兼容的硬件加速,将使用软件编码") # Windows系统上,默认使用软件编码以提高兼容性 if os.name == 'nt' and hwaccel: logger.warning("在Windows系统上检测到硬件加速,但为了提高兼容性,建议使用软件编码") # 不强制禁用hwaccel,而是在process_single_video中进行额外安全检查 # 重组视频路径和原声设置为一个字典列表结构 video_segments = [] # 检查视频路径和原声设置列表长度是否匹配 if len(video_paths) != len(video_ost_list): logger.warning(f"视频路径列表({len(video_paths)})和原声设置列表({len(video_ost_list)})长度不匹配") # 调整长度以匹配较短的列表 min_length = min(len(video_paths), len(video_ost_list)) video_paths = video_paths[:min_length] video_ost_list = video_ost_list[:min_length] # 创建视频处理配置字典列表 for i, (video_path, video_ost) in enumerate(zip(video_paths, video_ost_list)): if not os.path.exists(video_path): logger.warning(f"视频不存在,跳过: {video_path}") continue # 检查是否有音频流 has_audio = check_video_has_audio(video_path) # 构建视频片段配置 segment = { "index": i, "path": video_path, "ost": video_ost, "has_audio": has_audio, "keep_audio": video_ost > 0 and has_audio # 只有当ost>0且实际有音频时才保留 } # 记录日志 if video_ost > 0 and not has_audio: logger.warning(f"视频 {video_path} 设置为保留原声(ost={video_ost}),但该视频没有音频流") video_segments.append(segment) # 处理每个视频片段 processed_videos = [] temp_dir = os.path.join(output_dir, "temp_videos") os.makedirs(temp_dir, exist_ok=True) try: # 第一阶段:处理所有视频片段到中间文件 for segment in video_segments: # 处理单个视频,去除或保留音频 temp_output = os.path.join(temp_dir, f"processed_{segment['index']}.mp4") try: process_single_video( input_path=segment['path'], output_path=temp_output, target_width=video_width, target_height=video_height, keep_audio=segment['keep_audio'], hwaccel=hwaccel ) processed_videos.append({ "index": segment["index"], "path": temp_output, "keep_audio": segment["keep_audio"] }) logger.info(f"视频 {segment['index'] + 1}/{len(video_segments)} 处理完成") except Exception as e: logger.error(f"处理视频 {segment['path']} 时出错: {str(e)}") # 如果使用硬件加速失败,尝试使用软件编码 if hwaccel and not force_software_encoding: logger.info(f"尝试使用软件编码处理视频 {segment['path']}") try: process_single_video( input_path=segment['path'], output_path=temp_output, target_width=video_width, target_height=video_height, keep_audio=segment['keep_audio'], hwaccel=None # 使用软件编码 ) processed_videos.append({ "index": segment["index"], "path": temp_output, "keep_audio": segment["keep_audio"] }) logger.info(f"使用软件编码成功处理视频 {segment['index'] + 1}/{len(video_segments)}") except Exception as fallback_error: logger.error(f"使用软件编码处理视频 {segment['path']} 也失败: {str(fallback_error)}") continue else: continue if not processed_videos: raise ValueError("没有有效的视频片段可以合并") # 按原始索引排序处理后的视频 processed_videos.sort(key=lambda x: x["index"]) # 第二阶段:分步骤合并视频 - 避免复杂的filter_complex滤镜 try: # 1. 首先,将所有没有音频的视频或音频被禁用的视频合并到一个临时文件中 video_paths_only = [video["path"] for video in processed_videos] video_concat_path = os.path.join(temp_dir, "video_concat.mp4") # 创建concat文件,用于合并视频流 concat_file = os.path.join(temp_dir, "concat_list.txt") create_ffmpeg_concat_file(video_paths_only, concat_file) # 合并所有视频流,但不包含音频 concat_cmd = [ 'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', concat_file, '-c:v', 'libx264', '-preset', 'medium', '-profile:v', 'high', '-an', # 不包含音频 '-threads', str(threads), video_concat_path ] subprocess.run(concat_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) logger.info("视频流合并完成") # 2. 提取并合并有音频的片段 audio_segments = [video for video in processed_videos if video["keep_audio"]] if not audio_segments: # 如果没有音频片段,直接使用无音频的合并视频作为最终结果 shutil.copy(video_concat_path, output_video_path) logger.info("无音频视频合并完成") return output_video_path # 创建音频中间文件 audio_files = [] for i, segment in enumerate(audio_segments): # 提取音频 audio_file = os.path.join(temp_dir, f"audio_{i}.aac") extract_audio_cmd = [ 'ffmpeg', '-y', '-i', segment["path"], '-vn', # 不包含视频 '-c:a', 'aac', '-b:a', '128k', audio_file ] subprocess.run(extract_audio_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) audio_files.append({ "index": segment["index"], "path": audio_file }) logger.info(f"提取音频 {i+1}/{len(audio_segments)} 完成") # 3. 计算每个音频片段的时间位置 audio_timings = [] current_time = 0.0 # 获取每个视频片段的时长 for i, video in enumerate(processed_videos): duration_cmd = [ 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', video["path"] ] result = subprocess.run(duration_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) duration = float(result.stdout.strip()) # 如果当前片段需要保留音频,记录时间位置 if video["keep_audio"]: for audio in audio_files: if audio["index"] == video["index"]: audio_timings.append({ "file": audio["path"], "start": current_time, "index": video["index"] }) break current_time += duration # 4. 创建静音音频轨道作为基础 silence_audio = os.path.join(temp_dir, "silence.aac") create_silence_cmd = [ 'ffmpeg', '-y', '-f', 'lavfi', '-i', f'anullsrc=r=44100:cl=stereo', '-t', str(current_time), # 总时长 '-c:a', 'aac', '-b:a', '128k', silence_audio ] subprocess.run(create_silence_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # 5. 创建复杂滤镜命令以混合音频 filter_script = os.path.join(temp_dir, "filter_script.txt") with open(filter_script, 'w') as f: f.write(f"[0:a]volume=0.0[silence];\n") # 首先静音背景轨道 # 添加每个音频文件,并补偿amix的音量稀释 # amix会将n个输入的音量平均分配,所以我们需要将每个输入的音量提高n倍来保持原始音量 num_inputs = len(audio_timings) + 1 # +1 for silence track volume_compensation = num_inputs # 补偿系数 for i, timing in enumerate(audio_timings): # 为每个音频添加音量补偿,确保原声保持原始音量 f.write(f"[{i+1}:a]volume={volume_compensation},adelay={int(timing['start']*1000)}|{int(timing['start']*1000)}[a{i}];\n") # 混合所有音频 mix_str = "[silence]" for i in range(len(audio_timings)): mix_str += f"[a{i}]" mix_str += f"amix=inputs={len(audio_timings)+1}:duration=longest[aout]" f.write(mix_str) # 6. 构建音频合并命令 audio_inputs = ['-i', silence_audio] for timing in audio_timings: audio_inputs.extend(['-i', timing["file"]]) mixed_audio = os.path.join(temp_dir, "mixed_audio.aac") audio_mix_cmd = [ 'ffmpeg', '-y' ] + audio_inputs + [ '-filter_complex_script', filter_script, '-map', '[aout]', '-c:a', 'aac', '-b:a', '128k', mixed_audio ] subprocess.run(audio_mix_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) logger.info("音频混合完成") # 7. 将合并的视频和混合的音频组合在一起 final_cmd = [ 'ffmpeg', '-y', '-i', video_concat_path, '-i', mixed_audio, '-c:v', 'copy', '-c:a', 'aac', '-map', '0:v:0', '-map', '1:a:0', '-shortest', output_video_path ] subprocess.run(final_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) logger.info("视频最终合并完成") return output_video_path except subprocess.CalledProcessError as e: logger.error(f"合并视频过程中出错: {e.stderr.decode() if e.stderr else str(e)}") # 尝试备用合并方法 - 最简单的无音频合并 logger.info("尝试备用合并方法 - 无音频合并") try: concat_file = os.path.join(temp_dir, "concat_list.txt") video_paths_only = [video["path"] for video in processed_videos] create_ffmpeg_concat_file(video_paths_only, concat_file) backup_cmd = [ 'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', concat_file, '-c:v', 'copy', '-an', # 无音频 output_video_path ] subprocess.run(backup_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) logger.warning("使用备用方法(无音频)成功合并视频") return output_video_path except Exception as backup_error: logger.error(f"备用合并方法也失败: {str(backup_error)}") raise RuntimeError(f"无法合并视频: {str(backup_error)}") except Exception as e: logger.error(f"合并视频时出错: {str(e)}") raise finally: # 清理临时文件 try: if os.path.exists(temp_dir): shutil.rmtree(temp_dir) logger.info("已清理临时文件") except Exception as e: logger.warning(f"清理临时文件时出错: {str(e)}") if __name__ == '__main__': video_paths = [ '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E02_00_14_09_440.mp4', '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E08_00_27_11_110.mp4', '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E08_00_34_44_480.mp4', '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E08_00_42_47_630.mp4', '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E09_00_29_48_160.mp4' ] combine_clip_videos( output_video_path="/Users/apple/Desktop/home/NarratoAI/storage/temp/merge/merged_123.mp4", video_paths=video_paths, video_ost_list=[1, 1, 1,1,1], video_aspect=VideoAspect.portrait, force_software_encoding=False # 默认不强制使用软件编码,让系统自动决定 )