NarratoAI/app/services/clip_video.py
2025-05-19 02:41:30 +08:00

226 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project: NarratoAI
@File : clip_video
@Author : 小林同学
@Date : 2025/5/6 下午6:14
'''
import os
import subprocess
import json
import hashlib
from loguru import logger
from typing import Dict, List, Optional
from pathlib import Path
from app.utils import ffmpeg_utils
def parse_timestamp(timestamp: str) -> tuple:
"""
解析时间戳字符串,返回开始和结束时间
Args:
timestamp: 格式为'HH:MM:SS-HH:MM:SS''HH:MM:SS,sss-HH:MM:SS,sss'的时间戳字符串
Returns:
tuple: (开始时间, 结束时间) 格式为'HH:MM:SS''HH:MM:SS,sss'
"""
start_time, end_time = timestamp.split('-')
return start_time, end_time
def calculate_end_time(start_time: str, duration: float, extra_seconds: float = 1.0) -> str:
"""
根据开始时间和持续时间计算结束时间
Args:
start_time: 开始时间,格式为'HH:MM:SS''HH:MM:SS,sss'(带毫秒)
duration: 持续时间,单位为秒
extra_seconds: 额外添加的秒数默认为1秒
Returns:
str: 计算后的结束时间,格式与输入格式相同
"""
# 检查是否包含毫秒
has_milliseconds = ',' in start_time
milliseconds = 0
if has_milliseconds:
time_part, ms_part = start_time.split(',')
h, m, s = map(int, time_part.split(':'))
milliseconds = int(ms_part)
else:
h, m, s = map(int, start_time.split(':'))
# 转换为总毫秒数
total_milliseconds = ((h * 3600 + m * 60 + s) * 1000 + milliseconds +
int((duration + extra_seconds) * 1000))
# 计算新的时、分、秒、毫秒
ms_new = total_milliseconds % 1000
total_seconds = total_milliseconds // 1000
h_new = int(total_seconds // 3600)
m_new = int((total_seconds % 3600) // 60)
s_new = int(total_seconds % 60)
# 返回与输入格式一致的时间字符串
if has_milliseconds:
return f"{h_new:02d}:{m_new:02d}:{s_new:02d},{ms_new:03d}"
else:
return f"{h_new:02d}:{m_new:02d}:{s_new:02d}"
def check_hardware_acceleration() -> Optional[str]:
"""
检查系统支持的硬件加速选项
Returns:
Optional[str]: 硬件加速参数如果不支持则返回None
"""
# 使用集中式硬件加速检测
return ffmpeg_utils.get_ffmpeg_hwaccel_type()
def clip_video(
video_origin_path: str,
tts_result: List[Dict],
output_dir: Optional[str] = None,
task_id: Optional[str] = None
) -> Dict[str, str]:
"""
根据时间戳裁剪视频
Args:
video_origin_path: 原始视频的路径
tts_result: 包含时间戳和持续时间信息的列表
output_dir: 输出目录路径默认为None时会自动生成
task_id: 任务ID用于生成唯一的输出目录默认为None时会自动生成
Returns:
Dict[str, str]: 时间戳到裁剪后视频路径的映射
"""
# 检查视频文件是否存在
if not os.path.exists(video_origin_path):
raise FileNotFoundError(f"视频文件不存在: {video_origin_path}")
# 如果未提供task_id则根据输入生成一个唯一ID
if task_id is None:
content_for_hash = f"{video_origin_path}_{json.dumps(tts_result)}"
task_id = hashlib.md5(content_for_hash.encode()).hexdigest()
# 设置输出目录
if output_dir is None:
output_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"storage", "temp", "clip_video", task_id
)
# 确保输出目录存在
Path(output_dir).mkdir(parents=True, exist_ok=True)
# 获取硬件加速支持
hwaccel = check_hardware_acceleration()
hwaccel_args = []
if hwaccel:
hwaccel_args = ffmpeg_utils.get_ffmpeg_hwaccel_args()
# 存储裁剪结果
result = {}
for item in tts_result:
_id = item.get("_id", item.get("timestamp", "unknown"))
timestamp = item["timestamp"]
start_time, _ = parse_timestamp(timestamp)
# 根据持续时间计算真正的结束时间加上1秒余量
duration = item["duration"]
calculated_end_time = calculate_end_time(start_time, duration)
# 转换为FFmpeg兼容的时间格式逗号替换为点
ffmpeg_start_time = start_time.replace(',', '.')
ffmpeg_end_time = calculated_end_time.replace(',', '.')
# 格式化输出文件名(使用连字符替代冒号和逗号)
safe_start_time = start_time.replace(':', '-').replace(',', '-')
safe_end_time = calculated_end_time.replace(':', '-').replace(',', '-')
output_filename = f"vid_{safe_start_time}@{safe_end_time}.mp4"
output_path = os.path.join(output_dir, output_filename)
# 构建FFmpeg命令
ffmpeg_cmd = [
"ffmpeg", "-y", *hwaccel_args,
"-i", video_origin_path,
"-ss", ffmpeg_start_time,
"-to", ffmpeg_end_time,
"-c:v", "h264_videotoolbox" if hwaccel == "videotoolbox" else "libx264",
"-c:a", "aac",
"-strict", "experimental",
output_path
]
# 执行FFmpeg命令
try:
logger.info(f"裁剪视频片段: {timestamp} -> {ffmpeg_start_time}{ffmpeg_end_time}")
# logger.debug(f"执行命令: {' '.join(ffmpeg_cmd)}")
process = subprocess.run(
ffmpeg_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True
)
result[_id] = output_path
except subprocess.CalledProcessError as e:
logger.error(f"裁剪视频片段失败: {timestamp}")
logger.error(f"错误信息: {e.stderr}")
raise RuntimeError(f"视频裁剪失败: {e.stderr}")
return result
if __name__ == "__main__":
video_origin_path = "/Users/apple/Desktop/home/NarratoAI/resource/videos/qyn2-2无片头片尾.mp4"
tts_result = [{'timestamp': '00:00:00-00:01:15',
'audio_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/audio_00_00_00-00_01_15.mp3',
'subtitle_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/subtitle_00_00_00-00_01_15.srt',
'duration': 25.55,
'text': '好的各位,欢迎回到我的频道!《庆余年 2》刚开播就给了我们一个王炸范闲在北齐""了?这怎么可能!上集片尾那个巨大的悬念,这一集就立刻揭晓了!范闲假死归来,他面临的第一个,也是最大的难关,就是如何面对他最敬爱的,同时也是最可怕的那个人——庆帝!'},
{'timestamp': '00:01:15-00:04:40',
'audio_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/audio_00_01_15-00_04_40.mp3',
'subtitle_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/subtitle_00_01_15-00_04_40.srt',
'duration': 13.488,
'text': '但我们都知道,他绝不可能就这么轻易退场!第二集一开场,范闲就已经秘密回到了京都。他的生死传闻,可不像我们想象中那样只是小范围流传,而是…'},
{'timestamp': '00:04:58-00:05:45',
'audio_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/audio_00_04_58-00_05_45.mp3',
'subtitle_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/subtitle_00_04_58-00_05_45.srt',
'duration': 21.363,
'text': '"欺君之罪"!在封建王朝,这可是抄家灭族的大罪!搁一般人,肯定脚底抹油溜之大吉了。但范闲是谁啊?他偏要反其道而行之!他竟然决定,直接去见庆帝!冒着天大的风险,用"假死"这个事实去赌庆帝的态度!'},
{'timestamp': '00:05:45-00:06:00',
'audio_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/audio_00_05_45-00_06_00.mp3',
'subtitle_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/subtitle_00_05_45-00_06_00.srt',
'duration': 7.675, 'text': '但想见庆帝,哪有那么容易?范闲艺高人胆大,竟然选择了最激进的方式——闯宫!'}]
subclip_path_videos = {
'00:00:00-00:01:15': '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-00-00-00-01-15.mp4',
'00:01:15-00:04:40': '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-01-15-00-04-40.mp4',
'00:04:41-00:04:58': '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-04-41-00-04-58.mp4',
'00:04:58-00:05:45': '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-04-58-00-05-45.mp4',
'00:05:45-00:06:00': '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-05-45-00-06-00.mp4',
'00:06:00-00:06:03': '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-06-00-00-06-03.mp4',
}
# 使用方法示例
try:
result = clip_video(video_origin_path, tts_result, subclip_path_videos)
print("裁剪结果:")
print(json.dumps(result, indent=4, ensure_ascii=False))
except Exception as e:
print(f"发生错误: {e}")