#!/usr/bin/env python
# -*- coding: UTF-8 -*-

'''
@Project: NarratoAI
@File   : clip_video
@Author : 小林同学
@Date   : 2025/5/6 18:14

Cut clips out of a source video with ffmpeg.

Each clip starts at the start time of a script timestamp and lasts for the
duration of the matching narration audio (plus a small safety margin).
'''

import os
import subprocess
import json
import hashlib
from loguru import logger
from typing import Dict, List, Optional, Tuple
from pathlib import Path

from app.utils import ffmpeg_utils


# Hardware-acceleration type -> H.264 encoder to use with it.
# Types that are missing here (including the Windows decode-only back-ends
# "d3d11va" and "dxva2") keep the software encoder libx264.
_HW_VIDEO_ENCODERS = {
    "cuda": "h264_nvenc",
    "qsv": "h264_qsv",
    "videotoolbox": "h264_videotoolbox",
}


def parse_timestamp(timestamp: str) -> Tuple[str, str]:
    """
    Split a timestamp range into its start and end parts.

    Args:
        timestamp: Range formatted as 'HH:MM:SS-HH:MM:SS' or
            'HH:MM:SS,sss-HH:MM:SS,sss'.

    Returns:
        tuple: (start_time, end_time), each still in the input's format.

    Raises:
        ValueError: If the string does not contain exactly one '-'.
    """
    start_time, end_time = timestamp.split('-')
    return start_time, end_time


def calculate_end_time(start_time: str, duration: float, extra_seconds: float = 1.0) -> str:
    """
    Compute an end time from a start time and a duration.

    Args:
        start_time: Start time as 'HH:MM:SS' or 'HH:MM:SS,sss' (milliseconds).
        duration: Length in seconds.
        extra_seconds: Extra padding in seconds added on top of ``duration``;
            defaults to 1 second.

    Returns:
        str: The end time, in the same format as ``start_time``. When the
        input has no millisecond part, the fractional part is dropped.
    """
    has_milliseconds = ',' in start_time
    milliseconds = 0

    if has_milliseconds:
        time_part, ms_part = start_time.split(',')
        milliseconds = int(ms_part)
    else:
        time_part = start_time
    h, m, s = map(int, time_part.split(':'))

    # Work in integer milliseconds so the carry into seconds/minutes/hours
    # is exact.
    total_milliseconds = ((h * 3600 + m * 60 + s) * 1000 + milliseconds +
                          int((duration + extra_seconds) * 1000))

    total_seconds, ms_new = divmod(total_milliseconds, 1000)
    h_new, remainder = divmod(total_seconds, 3600)
    m_new, s_new = divmod(remainder, 60)

    if has_milliseconds:
        return f"{h_new:02d}:{m_new:02d}:{s_new:02d},{ms_new:03d}"
    return f"{h_new:02d}:{m_new:02d}:{s_new:02d}"


def check_hardware_acceleration() -> Optional[str]:
    """
    Return the hardware-acceleration type reported by ``ffmpeg_utils``.

    Detection is delegated to ``ffmpeg_utils.get_ffmpeg_hwaccel_type()``.

    Returns:
        Optional[str]: The acceleration type; presumably None when no
        hardware acceleration is available (confirm against ffmpeg_utils).
    """
    return ffmpeg_utils.get_ffmpeg_hwaccel_type()


def get_safe_encoder_config(hwaccel_type: Optional[str] = None) -> Dict[str, str]:
    """
    Build a conservative encoder configuration for the given acceleration type.

    Args:
        hwaccel_type: Hardware-acceleration type, or None for pure software.

    Returns:
        Dict[str, str]: Keys 'video_codec', 'audio_codec', 'pixel_format',
        'preset' and 'crf'. Only 'video_codec' depends on ``hwaccel_type``.
    """
    return {
        "video_codec": _HW_VIDEO_ENCODERS.get(hwaccel_type, "libx264"),
        "audio_codec": "aac",
        "pixel_format": "yuv420p",
        "preset": "fast",
        "crf": "23",
    }


def build_ffmpeg_command(
    input_path: str,
    output_path: str,
    start_time: str,
    end_time: str,
    encoder_config: Dict[str, str],
    hwaccel_args: Optional[List[str]] = None
) -> List[str]:
    """
    Assemble the primary ffmpeg command for cutting one clip.

    Args:
        input_path: Source video path.
        output_path: Destination clip path.
        start_time: Clip start, in a format ffmpeg accepts for ``-ss``.
        end_time: Clip end, in a format ffmpeg accepts for ``-to``.
        encoder_config: Mapping as returned by ``get_safe_encoder_config``.
        hwaccel_args: Optional hardware-acceleration arguments, placed before
            the input so they apply to it.

    Returns:
        List[str]: The argument list, ready for ``subprocess.run``.
    """
    cmd = ["ffmpeg", "-y"]

    if hwaccel_args:
        cmd.extend(hwaccel_args)

    cmd.extend(["-i", input_path])

    # -ss/-to come after -i, so they act as output options.
    cmd.extend(["-ss", start_time, "-to", end_time])

    video_codec = encoder_config["video_codec"]
    cmd.extend(["-c:v", video_codec])
    cmd.extend(["-c:a", encoder_config["audio_codec"]])

    # Explicit pixel format (the original author notes this avoids
    # filter-chain problems).
    cmd.extend(["-pix_fmt", encoder_config["pixel_format"]])

    # Quality flags differ per encoder; the configured "crf" value is reused
    # as the quality number for each of them. Other encoders (e.g.
    # h264_videotoolbox) get no preset/quality flags.
    if video_codec == "libx264":
        cmd.extend(["-preset", encoder_config["preset"]])
        cmd.extend(["-crf", encoder_config["crf"]])
    elif video_codec == "h264_nvenc":
        cmd.extend(["-preset", encoder_config["preset"]])
        cmd.extend(["-rc", "vbr", "-cq", encoder_config["crf"]])
    elif video_codec == "h264_qsv":
        cmd.extend(["-preset", encoder_config["preset"]])
        cmd.extend(["-global_quality", encoder_config["crf"]])

    # Normalise audio to 44.1 kHz stereo.
    cmd.extend(["-ar", "44100", "-ac", "2"])

    cmd.extend(["-avoid_negative_ts", "make_zero"])

    cmd.append(output_path)

    return cmd


def _run_ffmpeg(cmd: List[str]) -> None:
    """Run ``cmd``, capturing its output; raise CalledProcessError on a non-zero exit."""
    process_kwargs = {
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "text": True,
        "check": True,
        # ffmpeg may echo bytes (e.g. file names or metadata) that are not
        # valid in the chosen encoding; a decode error must not be mistaken
        # for a failed encode.
        "errors": "replace",
    }

    # On Windows, decode the output as UTF-8 instead of the locale default.
    if os.name == 'nt':
        process_kwargs["encoding"] = 'utf-8'

    subprocess.run(cmd, **process_kwargs)


def _output_is_valid(output_path: str) -> bool:
    """Return True when ``output_path`` exists and is not empty."""
    return os.path.exists(output_path) and os.path.getsize(output_path) > 0


def execute_ffmpeg_with_fallback(
    cmd: List[str],
    timestamp: str,
    input_path: str,
    output_path: str,
    start_time: str,
    end_time: str
) -> bool:
    """
    Run the primary ffmpeg command, falling back to software encoding.

    The fallback is attempted only when ffmpeg exits with a non-zero status.
    An empty or missing output file after a zero exit status, or any other
    exception, is reported as a failure without retrying.

    Args:
        cmd: The primary ffmpeg command.
        timestamp: Timestamp of the clip (used for logging only).
        input_path: Source video path (used by the fallback).
        output_path: Destination clip path.
        start_time: Clip start (used by the fallback).
        end_time: Clip end (used by the fallback).

    Returns:
        bool: True when a non-empty output file was produced.
    """
    try:
        logger.debug(f"执行ffmpeg命令: {' '.join(cmd)}")
        _run_ffmpeg(cmd)

        if _output_is_valid(output_path):
            logger.info(f"视频裁剪成功: {timestamp}")
            return True

        logger.warning(f"输出文件无效: {output_path}")
        return False

    except subprocess.CalledProcessError as e:
        error_msg = e.stderr if e.stderr else str(e)
        logger.warning(f"主要命令失败: {error_msg}")

        # Retry with the plain software-encoding command.
        logger.info(f"尝试fallback方案: {timestamp}")
        return try_fallback_encoding(input_path, output_path, start_time, end_time, timestamp)

    except Exception as e:
        logger.error(f"执行ffmpeg命令时发生异常: {str(e)}")
        return False


def try_fallback_encoding(
    input_path: str,
    output_path: str,
    start_time: str,
    end_time: str,
    timestamp: str
) -> bool:
    """
    Cut the clip with a minimal, software-only ffmpeg command.

    Args:
        input_path: Source video path.
        output_path: Destination clip path.
        start_time: Clip start.
        end_time: Clip end.
        timestamp: Timestamp of the clip (used for logging only).

    Returns:
        bool: True when a non-empty output file was produced; False on any
        failure (no exception escapes this function).
    """
    fallback_cmd = [
        "ffmpeg", "-y",
        "-i", input_path,
        "-ss", start_time,
        "-to", end_time,
        "-c:v", "libx264",
        "-c:a", "aac",
        "-pix_fmt", "yuv420p",
        "-preset", "ultrafast",  # fastest preset
        "-crf", "28",  # slightly lower quality than the primary command's 23
        "-avoid_negative_ts", "make_zero",
        "-movflags", "+faststart",
        output_path
    ]

    try:
        logger.debug(f"执行fallback命令: {' '.join(fallback_cmd)}")
        _run_ffmpeg(fallback_cmd)

        if _output_is_valid(output_path):
            logger.info(f"Fallback编码成功: {timestamp}")
            return True

        logger.error(f"Fallback编码失败,输出文件无效: {output_path}")
        return False

    except subprocess.CalledProcessError as e:
        error_msg = e.stderr if e.stderr else str(e)
        logger.error(f"Fallback编码也失败: {error_msg}")
        return False
    except Exception as e:
        logger.error(f"Fallback编码异常: {str(e)}")
        return False


def clip_video(
    video_origin_path: str,
    tts_result: List[Dict],
    output_dir: Optional[str] = None,
    task_id: Optional[str] = None
) -> Dict[str, str]:
    """
    Cut one clip per entry of ``tts_result`` out of the source video.

    Each clip starts at the start time of the entry's 'timestamp' and ends
    ``duration`` seconds later plus the default padding of
    ``calculate_end_time``; the end time inside 'timestamp' is ignored.

    Args:
        video_origin_path: Path of the source video.
        tts_result: List of dicts. Each needs 'timestamp' and 'duration';
            an optional '_id' is used as the key in the result.
        output_dir: Directory for the clips. When None, a directory
            'storage/temp/clip_video/<task_id>' is used, located three
            directory levels above this file.
        task_id: Identifier used in the default output directory. When None,
            an MD5 hash of the video path and ``tts_result`` is used.

    Returns:
        Dict[str, str]: Maps each successful entry's '_id' (or its
        'timestamp' when '_id' is absent) to the path of its clip.

    Raises:
        FileNotFoundError: If ``video_origin_path`` does not exist.
        RuntimeError: If at least one clip was requested and every clip failed.
    """
    if not os.path.exists(video_origin_path):
        raise FileNotFoundError(f"视频文件不存在: {video_origin_path}")

    # Derive a deterministic id from the inputs when none is supplied.
    if task_id is None:
        content_for_hash = f"{video_origin_path}_{json.dumps(tts_result)}"
        task_id = hashlib.md5(content_for_hash.encode()).hexdigest()

    if output_dir is None:
        output_dir = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
            "storage", "temp", "clip_video", task_id
        )

    Path(output_dir).mkdir(parents=True, exist_ok=True)

    hwaccel_type = check_hardware_acceleration()
    hwaccel_args = []

    if hwaccel_type:
        hwaccel_args = ffmpeg_utils.get_ffmpeg_hwaccel_args()
        logger.info(f"使用硬件加速: {hwaccel_type}")
    else:
        logger.info("使用软件编码")

    encoder_config = get_safe_encoder_config(hwaccel_type)
    logger.debug(f"编码器配置: {encoder_config}")

    result = {}
    failed_clips = []

    for item in tts_result:
        _id = item.get("_id", item.get("timestamp", "unknown"))
        timestamp = item["timestamp"]
        start_time, _ = parse_timestamp(timestamp)

        # The real end time comes from the narration duration (plus padding),
        # not from the end of the timestamp range.
        duration = item["duration"]
        calculated_end_time = calculate_end_time(start_time, duration)

        # ffmpeg expects '.' as the millisecond separator, not ','.
        ffmpeg_start_time = start_time.replace(',', '.')
        ffmpeg_end_time = calculated_end_time.replace(',', '.')

        # ':' and ',' are replaced with '-' for the file name.
        safe_start_time = start_time.replace(':', '-').replace(',', '-')
        safe_end_time = calculated_end_time.replace(':', '-').replace(',', '-')
        output_filename = f"vid_{safe_start_time}@{safe_end_time}.mp4"
        output_path = os.path.join(output_dir, output_filename)

        ffmpeg_cmd = build_ffmpeg_command(
            video_origin_path,
            output_path,
            ffmpeg_start_time,
            ffmpeg_end_time,
            encoder_config,
            hwaccel_args
        )

        logger.info(f"裁剪视频片段: {timestamp} -> {ffmpeg_start_time}到{ffmpeg_end_time}")

        success = execute_ffmpeg_with_fallback(
            ffmpeg_cmd,
            timestamp,
            video_origin_path,
            output_path,
            ffmpeg_start_time,
            ffmpeg_end_time
        )

        if success:
            result[_id] = output_path
        else:
            failed_clips.append(timestamp)
            logger.error(f"裁剪视频片段失败: {timestamp}")

    if failed_clips:
        logger.warning(f"以下片段裁剪失败: {failed_clips}")
        if len(failed_clips) == len(tts_result):
            raise RuntimeError("所有视频片段裁剪都失败了,请检查视频文件和ffmpeg配置")

    logger.info(f"视频裁剪完成,成功: {len(result)}, 失败: {len(failed_clips)}")
    return result


if __name__ == "__main__":
    # Manual smoke test with hard-coded paths from the author's machine.
    video_origin_path = "/Users/apple/Desktop/home/NarratoAI/resource/videos/qyn2-2无片头片尾.mp4"

    tts_result = [
        {
            'timestamp': '00:00:00-00:01:15',
            'audio_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/audio_00_00_00-00_01_15.mp3',
            'subtitle_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/subtitle_00_00_00-00_01_15.srt',
            'duration': 25.55,
            'text': '好的各位,欢迎回到我的频道!《庆余年 2》刚开播就给了我们一个王炸!范闲在北齐"死"了?这怎么可能!上集片尾那个巨大的悬念,这一集就立刻揭晓了!范闲假死归来,他面临的第一个,也是最大的难关,就是如何面对他最敬爱的,同时也是最可怕的那个人——庆帝!',
        },
        {
            'timestamp': '00:01:15-00:04:40',
            'audio_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/audio_00_01_15-00_04_40.mp3',
            'subtitle_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/subtitle_00_01_15-00_04_40.srt',
            'duration': 13.488,
            'text': '但我们都知道,他绝不可能就这么轻易退场!第二集一开场,范闲就已经秘密回到了京都。他的生死传闻,可不像我们想象中那样只是小范围流传,而是…',
        },
        {
            'timestamp': '00:04:58-00:05:45',
            'audio_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/audio_00_04_58-00_05_45.mp3',
            'subtitle_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/subtitle_00_04_58-00_05_45.srt',
            'duration': 21.363,
            'text': '"欺君之罪"!在封建王朝,这可是抄家灭族的大罪!搁一般人,肯定脚底抹油溜之大吉了。但范闲是谁啊?他偏要反其道而行之!他竟然决定,直接去见庆帝!冒着天大的风险,用"假死"这个事实去赌庆帝的态度!',
        },
        {
            'timestamp': '00:05:45-00:06:00',
            'audio_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/audio_00_05_45-00_06_00.mp3',
            'subtitle_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/subtitle_00_05_45-00_06_00.srt',
            'duration': 7.675,
            'text': '但想见庆帝,哪有那么容易?范闲艺高人胆大,竟然选择了最激进的方式——闯宫!',
        },
    ]

    # clip_video's third positional parameter is output_dir (a path). The demo
    # used to pass a timestamp->path dict there, which made Path(output_dir)
    # raise TypeError; let clip_video pick its default directory instead.
    try:
        result = clip_video(video_origin_path, tts_result)
        print("裁剪结果:")
        print(json.dumps(result, indent=4, ensure_ascii=False))
    except Exception as e:
        print(f"发生错误: {e}")