mirror of
https://github.com/linyqh/NarratoAI.git
synced 2025-12-12 03:02:48 +00:00
- 在 GenerateScriptRequest 和 ScriptGenerator 中新增 frame_interval_input 参数,默认为 5 - 更新 VideoProcessor 的 process_video_pipeline 方法,调整帧提取间隔逻辑 - 修改前端界面,增加帧间隔输入框并更新相关翻译文本 - 优化 generate_script_docu 函数,使用新的帧间隔参数进行视频处理
344 lines
11 KiB
Python
344 lines
11 KiB
Python
"""
|
||
视频帧提取工具
|
||
|
||
这个模块提供了简单高效的视频帧提取功能。主要特点:
|
||
1. 使用ffmpeg进行视频处理,支持硬件加速
|
||
2. 按指定时间间隔提取视频关键帧
|
||
3. 支持多种视频格式
|
||
4. 支持高清视频帧输出
|
||
5. 直接从原视频提取高质量关键帧
|
||
|
||
不依赖OpenCV和sklearn等库,只使用ffmpeg作为外部依赖,降低了安装和使用的复杂度。
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import time
|
||
import subprocess
|
||
from typing import List, Dict
|
||
from loguru import logger
|
||
from tqdm import tqdm
|
||
|
||
|
||
class VideoProcessor:
|
||
def __init__(self, video_path: str):
|
||
"""
|
||
初始化视频处理器
|
||
|
||
Args:
|
||
video_path: 视频文件路径
|
||
"""
|
||
if not os.path.exists(video_path):
|
||
raise FileNotFoundError(f"视频文件不存在: {video_path}")
|
||
|
||
self.video_path = video_path
|
||
self.video_info = self._get_video_info()
|
||
self.fps = float(self.video_info.get('fps', 25))
|
||
self.duration = float(self.video_info.get('duration', 0))
|
||
self.width = int(self.video_info.get('width', 0))
|
||
self.height = int(self.video_info.get('height', 0))
|
||
self.total_frames = int(self.fps * self.duration)
|
||
|
||
def _get_video_info(self) -> Dict[str, str]:
|
||
"""
|
||
使用ffprobe获取视频信息
|
||
|
||
Returns:
|
||
Dict[str, str]: 包含视频基本信息的字典
|
||
"""
|
||
cmd = [
|
||
"ffprobe",
|
||
"-v", "error",
|
||
"-select_streams", "v:0",
|
||
"-show_entries", "stream=width,height,r_frame_rate,duration",
|
||
"-of", "default=noprint_wrappers=1:nokey=0",
|
||
self.video_path
|
||
]
|
||
|
||
try:
|
||
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
||
lines = result.stdout.strip().split('\n')
|
||
info = {}
|
||
for line in lines:
|
||
if '=' in line:
|
||
key, value = line.split('=', 1)
|
||
info[key] = value
|
||
|
||
# 处理帧率(可能是分数形式)
|
||
if 'r_frame_rate' in info:
|
||
try:
|
||
num, den = map(int, info['r_frame_rate'].split('/'))
|
||
info['fps'] = str(num / den)
|
||
except ValueError:
|
||
info['fps'] = info.get('r_frame_rate', '25')
|
||
|
||
return info
|
||
|
||
except subprocess.CalledProcessError as e:
|
||
logger.error(f"获取视频信息失败: {e.stderr}")
|
||
return {
|
||
'width': '1280',
|
||
'height': '720',
|
||
'fps': '25',
|
||
'duration': '0'
|
||
}
|
||
|
||
def extract_frames_by_interval(self, output_dir: str, interval_seconds: float = 5.0,
|
||
use_hw_accel: bool = True, skip_seconds: float = 0.0) -> List[int]:
|
||
"""
|
||
按指定时间间隔提取视频帧
|
||
|
||
Args:
|
||
output_dir: 输出目录
|
||
interval_seconds: 帧提取间隔(秒)
|
||
use_hw_accel: 是否使用硬件加速
|
||
skip_seconds: 跳过视频开头的秒数
|
||
|
||
Returns:
|
||
List[int]: 提取的帧号列表
|
||
"""
|
||
if not os.path.exists(output_dir):
|
||
os.makedirs(output_dir)
|
||
|
||
# 计算起始时间和帧提取点
|
||
start_time = skip_seconds
|
||
end_time = self.duration
|
||
extraction_times = []
|
||
|
||
current_time = start_time
|
||
while current_time < end_time:
|
||
extraction_times.append(current_time)
|
||
current_time += interval_seconds
|
||
|
||
if not extraction_times:
|
||
logger.warning("未找到需要提取的帧")
|
||
return []
|
||
|
||
# 确定硬件加速器选项
|
||
hw_accel = []
|
||
if use_hw_accel:
|
||
# 尝试检测可用的硬件加速器
|
||
hw_accel_options = self._detect_hw_accelerator()
|
||
if hw_accel_options:
|
||
hw_accel = hw_accel_options
|
||
logger.info(f"使用硬件加速: {' '.join(hw_accel)}")
|
||
else:
|
||
logger.warning("未检测到可用的硬件加速器,使用软件解码")
|
||
|
||
# 提取帧
|
||
frame_numbers = []
|
||
for i, timestamp in enumerate(tqdm(extraction_times, desc="提取视频帧")):
|
||
frame_number = int(timestamp * self.fps)
|
||
frame_numbers.append(frame_number)
|
||
|
||
# 格式化时间戳字符串 (HHMMSSmmm)
|
||
hours = int(timestamp // 3600)
|
||
minutes = int((timestamp % 3600) // 60)
|
||
seconds = int(timestamp % 60)
|
||
milliseconds = int((timestamp % 1) * 1000)
|
||
time_str = f"{hours:02d}{minutes:02d}{seconds:02d}{milliseconds:03d}"
|
||
|
||
output_path = os.path.join(output_dir, f"keyframe_{frame_number:06d}_{time_str}.jpg")
|
||
|
||
# 使用ffmpeg提取单帧
|
||
cmd = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
]
|
||
|
||
# 添加硬件加速参数
|
||
cmd.extend(hw_accel)
|
||
|
||
cmd.extend([
|
||
"-ss", str(timestamp),
|
||
"-i", self.video_path,
|
||
"-vframes", "1",
|
||
"-q:v", "1", # 最高质量
|
||
"-y",
|
||
output_path
|
||
])
|
||
|
||
try:
|
||
subprocess.run(cmd, check=True, capture_output=True)
|
||
except subprocess.CalledProcessError as e:
|
||
logger.warning(f"提取帧 {frame_number} 失败: {e.stderr}")
|
||
|
||
logger.info(f"成功提取了 {len(frame_numbers)} 个视频帧")
|
||
return frame_numbers
|
||
|
||
def _detect_hw_accelerator(self) -> List[str]:
|
||
"""
|
||
检测系统可用的硬件加速器
|
||
|
||
Returns:
|
||
List[str]: 硬件加速器ffmpeg命令参数
|
||
"""
|
||
# 检测操作系统
|
||
import platform
|
||
system = platform.system().lower()
|
||
|
||
# 测试不同的硬件加速器
|
||
accelerators = []
|
||
|
||
if system == 'darwin': # macOS
|
||
# 测试 videotoolbox (Apple 硬件加速)
|
||
test_cmd = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
"-hwaccel", "videotoolbox",
|
||
"-i", self.video_path,
|
||
"-t", "0.1",
|
||
"-f", "null",
|
||
"-"
|
||
]
|
||
try:
|
||
subprocess.run(test_cmd, capture_output=True, check=True)
|
||
return ["-hwaccel", "videotoolbox"]
|
||
except subprocess.CalledProcessError:
|
||
pass
|
||
|
||
elif system == 'linux':
|
||
# 测试 VAAPI
|
||
test_cmd = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
"-hwaccel", "vaapi",
|
||
"-i", self.video_path,
|
||
"-t", "0.1",
|
||
"-f", "null",
|
||
"-"
|
||
]
|
||
try:
|
||
subprocess.run(test_cmd, capture_output=True, check=True)
|
||
return ["-hwaccel", "vaapi"]
|
||
except subprocess.CalledProcessError:
|
||
pass
|
||
|
||
# 尝试 CUDA
|
||
test_cmd = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
"-hwaccel", "cuda",
|
||
"-i", self.video_path,
|
||
"-t", "0.1",
|
||
"-f", "null",
|
||
"-"
|
||
]
|
||
try:
|
||
subprocess.run(test_cmd, capture_output=True, check=True)
|
||
return ["-hwaccel", "cuda"]
|
||
except subprocess.CalledProcessError:
|
||
pass
|
||
|
||
elif system == 'windows':
|
||
# 测试 CUDA
|
||
test_cmd = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
"-hwaccel", "cuda",
|
||
"-i", self.video_path,
|
||
"-t", "0.1",
|
||
"-f", "null",
|
||
"-"
|
||
]
|
||
try:
|
||
subprocess.run(test_cmd, capture_output=True, check=True)
|
||
return ["-hwaccel", "cuda"]
|
||
except subprocess.CalledProcessError:
|
||
pass
|
||
|
||
# 测试 D3D11VA
|
||
test_cmd = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
"-hwaccel", "d3d11va",
|
||
"-i", self.video_path,
|
||
"-t", "0.1",
|
||
"-f", "null",
|
||
"-"
|
||
]
|
||
try:
|
||
subprocess.run(test_cmd, capture_output=True, check=True)
|
||
return ["-hwaccel", "d3d11va"]
|
||
except subprocess.CalledProcessError:
|
||
pass
|
||
|
||
# 测试 DXVA2
|
||
test_cmd = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
"-hwaccel", "dxva2",
|
||
"-i", self.video_path,
|
||
"-t", "0.1",
|
||
"-f", "null",
|
||
"-"
|
||
]
|
||
try:
|
||
subprocess.run(test_cmd, capture_output=True, check=True)
|
||
return ["-hwaccel", "dxva2"]
|
||
except subprocess.CalledProcessError:
|
||
pass
|
||
|
||
# 如果没有找到可用的硬件加速器
|
||
return []
|
||
|
||
def process_video_pipeline(self,
|
||
output_dir: str,
|
||
skip_seconds: float = 0.0,
|
||
interval_seconds: float = 5.0, # 帧提取间隔(秒)
|
||
use_hw_accel: bool = True) -> None:
|
||
"""
|
||
执行简化的视频处理流程,直接从原视频按固定时间间隔提取帧
|
||
|
||
Args:
|
||
output_dir: 输出目录
|
||
skip_seconds: 跳过视频开头的秒数
|
||
interval_seconds: 帧提取间隔(秒)
|
||
use_hw_accel: 是否使用硬件加速
|
||
"""
|
||
# 创建输出目录
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
try:
|
||
# 直接从原视频提取关键帧
|
||
logger.info(f"从视频间隔 {interval_seconds} 秒提取关键帧...")
|
||
self.extract_frames_by_interval(
|
||
output_dir,
|
||
interval_seconds=interval_seconds,
|
||
use_hw_accel=use_hw_accel,
|
||
skip_seconds=skip_seconds
|
||
)
|
||
|
||
logger.info(f"处理完成!视频帧已保存在: {output_dir}")
|
||
|
||
except Exception as e:
|
||
import traceback
|
||
logger.error(f"视频处理失败: \n{traceback.format_exc()}")
|
||
raise
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import time
|
||
|
||
start_time = time.time()
|
||
|
||
# 使用示例
|
||
processor = VideoProcessor("./resource/videos/test.mp4")
|
||
|
||
# 设置间隔为3秒提取帧
|
||
processor.process_video_pipeline(
|
||
output_dir="output",
|
||
interval_seconds=3.0,
|
||
use_hw_accel=True
|
||
)
|
||
|
||
end_time = time.time()
|
||
print(f"处理完成!总耗时: {end_time - start_time:.2f} 秒")
|