#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project: NarratoAI
@File   : merger_video
@Author : 小林同学
@Date   : 2025/5/6 下午7:38
'''
import os
import subprocess
import logging
from enum import Enum
from typing import List, Optional, Tuple, Dict, Any
import shutil

# Module-level logger (configured by the application, not here).
logger = logging.getLogger(__name__)


class VideoAspect(Enum):
    """Supported output aspect ratios."""
    portrait = "portrait"    # vertical 9:16
    landscape = "landscape"  # horizontal 16:9
    square = "square"        # square 1:1

    def to_resolution(self) -> Tuple[int, int]:
        """Return the standard (width, height) for this aspect ratio.

        Unknown members fall back to portrait 1080x1920.
        """
        if self == VideoAspect.portrait:
            return 1080, 1920
        elif self == VideoAspect.landscape:
            return 1920, 1080
        elif self == VideoAspect.square:
            return 1080, 1080
        else:
            return 1080, 1920


def check_ffmpeg_installation() -> bool:
    """Check whether ffmpeg is installed and reachable on PATH.

    Returns:
        bool: True if ``ffmpeg -version`` runs successfully, False otherwise.
    """
    try:
        subprocess.run(['ffmpeg', '-version'],
                       stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE,
                       check=True)
        return True
    except (subprocess.SubprocessError, FileNotFoundError):
        logger.error("ffmpeg未安装或不在系统PATH中,请安装ffmpeg")
        return False


def get_hardware_acceleration_option() -> Optional[str]:
    """Pick a hardware acceleration method based on what ffmpeg reports.

    Parses the output of ``ffmpeg -hwaccels`` and returns the first match in
    priority order (cuda, nvenc, qsv, videotoolbox, vaapi).

    Returns:
        Optional[str]: the hwaccel keyword, or None to use software encoding.
    """
    try:
        hwaccel_probe = subprocess.run(
            ['ffmpeg', '-hide_banner', '-hwaccels'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        output = hwaccel_probe.stdout.lower()

        if 'cuda' in output:
            return 'cuda'
        elif 'nvenc' in output:
            return 'nvenc'
        elif 'qsv' in output:          # Intel Quick Sync
            return 'qsv'
        elif 'videotoolbox' in output:  # macOS
            return 'videotoolbox'
        elif 'vaapi' in output:         # Linux VA-API
            return 'vaapi'
        else:
            logger.info("没有找到支持的硬件加速器,将使用软件编码")
            return None
    except Exception as e:
        logger.warning(f"检测硬件加速器时出错: {str(e)},将使用软件编码")
        return None


def check_video_has_audio(video_path: str) -> bool:
    """Check whether a video file contains at least one audio stream.

    Args:
        video_path: path to the video file.

    Returns:
        bool: True if ffprobe reports an audio stream, False otherwise
        (including when the file does not exist or probing fails).
    """
    if not os.path.exists(video_path):
        logger.warning(f"视频文件不存在: {video_path}")
        return False

    probe_cmd = [
        'ffprobe',
        '-v', 'error',
        '-select_streams', 'a:0',
        '-show_entries', 'stream=codec_type',
        '-of', 'csv=p=0',
        video_path
    ]

    try:
        result = subprocess.run(probe_cmd,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                text=True,
                                check=False)
        return result.stdout.strip() == 'audio'
    except Exception as e:
        logger.warning(f"检测视频音频流时出错: {str(e)}")
        return False


def create_ffmpeg_concat_file(video_paths: List[str], concat_file_path: str) -> str:
    """Write a concat-demuxer list file for ffmpeg.

    Each line has the form ``file '<absolute path>'``.  Inside the single
    quotes only the single-quote character itself needs escaping (as
    ``'\\''``); escaping backslashes or colons there would corrupt the path.

    Args:
        video_paths: videos to merge, in order.
        concat_file_path: where to write the list file.

    Returns:
        str: ``concat_file_path`` (for chaining).
    """
    with open(concat_file_path, 'w', encoding='utf-8') as f:
        for video_path in video_paths:
            abs_path = os.path.abspath(video_path).replace("'", "'\\''")
            f.write(f"file '{abs_path}'\n")
    return concat_file_path


def process_single_video(
        input_path: str,
        output_path: str,
        target_width: int,
        target_height: int,
        keep_audio: bool = True,
        hwaccel: Optional[str] = None
) -> str:
    """Re-encode a single video: scale/pad to the target resolution at 30fps.

    Args:
        input_path: source video path.
        output_path: destination path for the processed video.
        target_width: target frame width.
        target_height: target frame height.
        keep_audio: keep the audio track (re-encoded to AAC) if present.
        hwaccel: hardware acceleration keyword from
            :func:`get_hardware_acceleration_option`, or None for libx264.

    Returns:
        str: ``output_path``.

    Raises:
        FileNotFoundError: if ``input_path`` does not exist.
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"找不到视频文件: {input_path}")

    command = ['ffmpeg', '-y']

    # Hardware decode acceleration (input side).
    if hwaccel:
        if hwaccel == 'cuda' or hwaccel == 'nvenc':
            command.extend(['-hwaccel', 'cuda'])
        elif hwaccel == 'qsv':
            command.extend(['-hwaccel', 'qsv'])
        elif hwaccel == 'videotoolbox':
            command.extend(['-hwaccel', 'videotoolbox'])
        elif hwaccel == 'vaapi':
            # NOTE(review): assumes the default render node; may need to be
            # configurable on multi-GPU systems.
            command.extend(['-hwaccel', 'vaapi', '-vaapi_device', '/dev/dri/renderD128'])

    command.extend(['-i', input_path])

    # Audio handling: drop it, or re-encode to AAC when the source has a stream.
    if not keep_audio:
        command.append('-an')
    else:
        has_audio = check_video_has_audio(input_path)
        if has_audio:
            command.extend(['-c:a', 'aac', '-b:a', '128k'])
        else:
            logger.warning(f"视频 {input_path} 没有音频流,将会忽略音频设置")
            command.append('-an')

    # Scale down preserving aspect ratio, then pad to the exact target size.
    scale_filter = f"scale={target_width}:{target_height}:force_original_aspect_ratio=decrease"
    pad_filter = f"pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2"
    command.extend([
        '-vf', f"{scale_filter},{pad_filter}",
        '-r', '30',  # normalize frame rate so concat segments match
    ])

    # Video encoder matching the chosen acceleration method.
    if hwaccel == 'cuda' or hwaccel == 'nvenc':
        command.extend(['-c:v', 'h264_nvenc', '-preset', 'p4', '-profile:v', 'high'])
    elif hwaccel == 'qsv':
        command.extend(['-c:v', 'h264_qsv', '-preset', 'medium'])
    elif hwaccel == 'videotoolbox':
        command.extend(['-c:v', 'h264_videotoolbox', '-profile:v', 'high'])
    elif hwaccel == 'vaapi':
        command.extend(['-c:v', 'h264_vaapi', '-profile', '100'])
    else:
        command.extend(['-c:v', 'libx264', '-preset', 'medium', '-profile:v', 'high'])

    # Rate control and a widely compatible pixel format.
    command.extend([
        '-b:v', '5M',
        '-maxrate', '8M',
        '-bufsize', '10M',
        '-pix_fmt', 'yuv420p',
    ])

    command.append(output_path)

    try:
        logger.info(f"处理视频 {input_path} -> {output_path}")
        subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return output_path
    except subprocess.CalledProcessError as e:
        logger.error(f"处理视频失败: {e.stderr.decode() if e.stderr else str(e)}")
        raise RuntimeError(f"处理视频失败: {str(e)}")


def combine_clip_videos(
        output_video_path: str,
        video_paths: List[str],
        video_ost_list: List[int],
        video_aspect: VideoAspect = VideoAspect.portrait,
        threads: int = 4,
) -> str:
    """Merge clip videos into a single output file.

    Pipeline: normalize every clip to a common resolution/frame rate, concat
    the video streams without audio, then rebuild an audio track by laying the
    kept original audio segments over a silent base at their correct offsets,
    and finally mux video and audio together.  On failure of the audio path a
    simple no-audio concat is used as a fallback.

    Args:
        output_video_path: destination path for the merged video.
        video_paths: clip paths, in playback order.
        video_ost_list: per-clip original-sound flags
            (0: drop original audio, 1: keep only original audio,
             2: keep original audio alongside narration).
        video_aspect: output aspect ratio.
        threads: ffmpeg thread count for the concat step.

    Returns:
        str: ``output_video_path``.

    Raises:
        RuntimeError: if ffmpeg is missing or merging fails entirely.
        ValueError: if no input clip could be processed.
    """
    if not check_ffmpeg_installation():
        raise RuntimeError("未找到ffmpeg,请先安装")

    # Prepare the output directory (may be '' for a bare filename → cwd).
    output_dir = os.path.dirname(output_video_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    aspect = VideoAspect(video_aspect)
    video_width, video_height = aspect.to_resolution()

    hwaccel = get_hardware_acceleration_option()
    if hwaccel:
        logger.info(f"将使用 {hwaccel} 硬件加速")

    # Build a per-clip configuration list.
    video_segments = []

    # Reconcile mismatched list lengths by truncating to the shorter one.
    if len(video_paths) != len(video_ost_list):
        logger.warning(f"视频路径列表({len(video_paths)})和原声设置列表({len(video_ost_list)})长度不匹配")
        min_length = min(len(video_paths), len(video_ost_list))
        video_paths = video_paths[:min_length]
        video_ost_list = video_ost_list[:min_length]

    for i, (video_path, video_ost) in enumerate(zip(video_paths, video_ost_list)):
        if not os.path.exists(video_path):
            logger.warning(f"视频不存在,跳过: {video_path}")
            continue

        has_audio = check_video_has_audio(video_path)

        segment = {
            "index": i,
            "path": video_path,
            "ost": video_ost,
            "has_audio": has_audio,
            # Keep audio only when requested AND the stream actually exists.
            "keep_audio": video_ost > 0 and has_audio
        }

        if video_ost > 0 and not has_audio:
            logger.warning(f"视频 {video_path} 设置为保留原声(ost={video_ost}),但该视频没有音频流")

        video_segments.append(segment)

    processed_videos = []
    temp_dir = os.path.join(output_dir, "temp_videos")
    os.makedirs(temp_dir, exist_ok=True)

    try:
        # Stage 1: normalize every clip into an intermediate file.
        for segment in video_segments:
            temp_output = os.path.join(temp_dir, f"processed_{segment['index']}.mp4")
            try:
                process_single_video(
                    input_path=segment['path'],
                    output_path=temp_output,
                    target_width=video_width,
                    target_height=video_height,
                    keep_audio=segment['keep_audio'],
                    hwaccel=hwaccel
                )
                processed_videos.append({
                    "index": segment["index"],
                    "path": temp_output,
                    "keep_audio": segment["keep_audio"]
                })
                logger.info(f"视频 {segment['index'] + 1}/{len(video_segments)} 处理完成")
            except Exception as e:
                # Best-effort: skip broken clips instead of aborting the merge.
                logger.error(f"处理视频 {segment['path']} 时出错: {str(e)}")
                continue

        if not processed_videos:
            raise ValueError("没有有效的视频片段可以合并")

        # Restore playback order after the (possibly lossy) processing loop.
        processed_videos.sort(key=lambda x: x["index"])

        # Stage 2: step-wise merge, avoiding one huge filter_complex graph.
        try:
            # 1. Concat all normalized video streams, audio stripped.
            video_paths_only = [video["path"] for video in processed_videos]
            video_concat_path = os.path.join(temp_dir, "video_concat.mp4")

            concat_file = os.path.join(temp_dir, "concat_list.txt")
            create_ffmpeg_concat_file(video_paths_only, concat_file)

            concat_cmd = [
                'ffmpeg', '-y',
                '-f', 'concat',
                '-safe', '0',
                '-i', concat_file,
                '-c:v', 'libx264',
                '-preset', 'medium',
                '-profile:v', 'high',
                '-an',  # video only
                '-threads', str(threads),
                video_concat_path
            ]
            subprocess.run(concat_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            logger.info("视频流合并完成")

            # 2. Extract the audio of every clip that keeps its original sound.
            audio_segments = [video for video in processed_videos if video["keep_audio"]]

            if not audio_segments:
                # No audio anywhere → the video-only concat is the final result.
                shutil.copy(video_concat_path, output_video_path)
                logger.info("无音频视频合并完成")
                return output_video_path

            audio_files = []
            for i, segment in enumerate(audio_segments):
                audio_file = os.path.join(temp_dir, f"audio_{i}.aac")
                extract_audio_cmd = [
                    'ffmpeg', '-y',
                    '-i', segment["path"],
                    '-vn',  # audio only
                    '-c:a', 'aac',
                    '-b:a', '128k',
                    audio_file
                ]
                subprocess.run(extract_audio_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                audio_files.append({
                    "index": segment["index"],
                    "path": audio_file
                })
                logger.info(f"提取音频 {i+1}/{len(audio_segments)} 完成")

            # 3. Compute the start offset of each kept audio segment by
            #    accumulating the durations of all preceding clips.
            audio_timings = []
            current_time = 0.0

            for i, video in enumerate(processed_videos):
                duration_cmd = [
                    'ffprobe',
                    '-v', 'error',
                    '-show_entries', 'format=duration',
                    '-of', 'csv=p=0',
                    video["path"]
                ]
                result = subprocess.run(duration_cmd, check=True, stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE, text=True)
                duration = float(result.stdout.strip())

                if video["keep_audio"]:
                    for audio in audio_files:
                        if audio["index"] == video["index"]:
                            audio_timings.append({
                                "file": audio["path"],
                                "start": current_time,
                                "index": video["index"]
                            })
                            break

                current_time += duration

            # 4. Generate a full-length silent track as the mixing base.
            silence_audio = os.path.join(temp_dir, "silence.aac")
            create_silence_cmd = [
                'ffmpeg', '-y',
                '-f', 'lavfi',
                '-i', 'anullsrc=r=44100:cl=stereo',
                '-t', str(current_time),  # total duration
                '-c:a', 'aac',
                '-b:a', '128k',
                silence_audio
            ]
            subprocess.run(create_silence_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

            # 5. Write the filter script: delay each segment to its offset,
            #    then amix everything over the silent base.
            # NOTE(review): amix normalizes input volumes by default, which may
            # attenuate the kept audio as the number of inputs grows — confirm
            # loudness on multi-segment merges.
            filter_script = os.path.join(temp_dir, "filter_script.txt")
            with open(filter_script, 'w') as f:
                f.write("[0:a]volume=0.0[silence];\n")  # mute the base track

                for i, timing in enumerate(audio_timings):
                    delay_ms = int(timing['start'] * 1000)
                    f.write(f"[{i+1}:a]adelay={delay_ms}|{delay_ms}[a{i}];\n")

                mix_str = "[silence]"
                for i in range(len(audio_timings)):
                    mix_str += f"[a{i}]"
                mix_str += f"amix=inputs={len(audio_timings)+1}:duration=longest[aout]"
                f.write(mix_str)

            # 6. Mix all delayed segments into a single audio file.
            audio_inputs = ['-i', silence_audio]
            for timing in audio_timings:
                audio_inputs.extend(['-i', timing["file"]])

            mixed_audio = os.path.join(temp_dir, "mixed_audio.aac")
            audio_mix_cmd = [
                'ffmpeg', '-y'
            ] + audio_inputs + [
                '-filter_complex_script', filter_script,
                '-map', '[aout]',
                '-c:a', 'aac',
                '-b:a', '128k',
                mixed_audio
            ]
            subprocess.run(audio_mix_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            logger.info("音频混合完成")

            # 7. Mux the concatenated video with the mixed audio track.
            final_cmd = [
                'ffmpeg', '-y',
                '-i', video_concat_path,
                '-i', mixed_audio,
                '-c:v', 'copy',
                '-c:a', 'aac',
                '-map', '0:v:0',
                '-map', '1:a:0',
                '-shortest',
                output_video_path
            ]
            subprocess.run(final_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            logger.info("视频最终合并完成")

            return output_video_path

        except subprocess.CalledProcessError as e:
            logger.error(f"合并视频过程中出错: {e.stderr.decode() if e.stderr else str(e)}")

            # Fallback: the simplest possible merge — video only, stream copy.
            logger.info("尝试备用合并方法 - 无音频合并")
            try:
                concat_file = os.path.join(temp_dir, "concat_list.txt")
                video_paths_only = [video["path"] for video in processed_videos]
                create_ffmpeg_concat_file(video_paths_only, concat_file)

                backup_cmd = [
                    'ffmpeg', '-y',
                    '-f', 'concat',
                    '-safe', '0',
                    '-i', concat_file,
                    '-c:v', 'copy',
                    '-an',  # no audio
                    output_video_path
                ]
                subprocess.run(backup_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                logger.warning("使用备用方法(无音频)成功合并视频")
                return output_video_path
            except Exception as backup_error:
                logger.error(f"备用合并方法也失败: {str(backup_error)}")
                raise RuntimeError(f"无法合并视频: {str(backup_error)}")

    except Exception as e:
        logger.error(f"合并视频时出错: {str(e)}")
        raise
    finally:
        # Always clean up intermediate files, even on failure.
        try:
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
                logger.info("已清理临时文件")
        except Exception as e:
            logger.warning(f"清理临时文件时出错: {str(e)}")


if __name__ == '__main__':
    video_paths = [
        '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/0ac14d474144b54d614c26a5c87cffe7/vid-00-00-00-00-00-26.mp4',
        '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/0ac14d474144b54d614c26a5c87cffe7/vid-00-01-15-00-01-29.mp4',
        '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-04-41-00-04-58.mp4',
        '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/0ac14d474144b54d614c26a5c87cffe7/vid-00-04-58-00-05-20.mp4',
        '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/0ac14d474144b54d614c26a5c87cffe7/vid-00-05-45-00-05-53.mp4',
        '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-06-00-00-06-03.mp4'
    ]
    combine_clip_videos(
        output_video_path="/Users/apple/Desktop/home/NarratoAI/storage/temp/merge/merged_123.mp4",
        video_paths=video_paths,
        video_ost_list=[1, 0, 1, 0, 0, 1],
        video_aspect=VideoAspect.portrait
    )