NarratoAI/app/utils/video_processor.py
linyq 2dc83bc18e perf(vision): 优化视觉分析流程和批量处理逻辑
- 移除了 vision_analysis_prompt 配置项
- 优化了 Gemini 和 QwenVL 分析器的批量处理逻辑
- 更新了文档生成脚本和 UI 组件以适应新的分析流程
- 调整了视频帧提取相关函数,移除了不必要的 skip_seconds 参数
- 更新了中文翻译文件,添加了新的批处理大小相关提示
2025-05-07 18:44:37 +08:00

340 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
视频帧提取工具
这个模块提供了简单高效的视频帧提取功能。主要特点:
1. 使用ffmpeg进行视频处理支持硬件加速
2. 按指定时间间隔提取视频关键帧
3. 支持多种视频格式
4. 支持高清视频帧输出
5. 直接从原视频提取高质量关键帧
不依赖OpenCV和sklearn等库只使用ffmpeg作为外部依赖降低了安装和使用的复杂度。
"""
import os
import re
import time
import subprocess
from typing import List, Dict
from loguru import logger
from tqdm import tqdm
class VideoProcessor:
def __init__(self, video_path: str):
"""
初始化视频处理器
Args:
video_path: 视频文件路径
"""
if not os.path.exists(video_path):
raise FileNotFoundError(f"视频文件不存在: {video_path}")
self.video_path = video_path
self.video_info = self._get_video_info()
self.fps = float(self.video_info.get('fps', 25))
self.duration = float(self.video_info.get('duration', 0))
self.width = int(self.video_info.get('width', 0))
self.height = int(self.video_info.get('height', 0))
self.total_frames = int(self.fps * self.duration)
def _get_video_info(self) -> Dict[str, str]:
"""
使用ffprobe获取视频信息
Returns:
Dict[str, str]: 包含视频基本信息的字典
"""
cmd = [
"ffprobe",
"-v", "error",
"-select_streams", "v:0",
"-show_entries", "stream=width,height,r_frame_rate,duration",
"-of", "default=noprint_wrappers=1:nokey=0",
self.video_path
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
lines = result.stdout.strip().split('\n')
info = {}
for line in lines:
if '=' in line:
key, value = line.split('=', 1)
info[key] = value
# 处理帧率(可能是分数形式)
if 'r_frame_rate' in info:
try:
num, den = map(int, info['r_frame_rate'].split('/'))
info['fps'] = str(num / den)
except ValueError:
info['fps'] = info.get('r_frame_rate', '25')
return info
except subprocess.CalledProcessError as e:
logger.error(f"获取视频信息失败: {e.stderr}")
return {
'width': '1280',
'height': '720',
'fps': '25',
'duration': '0'
}
def extract_frames_by_interval(self, output_dir: str, interval_seconds: float = 5.0,
use_hw_accel: bool = True) -> List[int]:
"""
按指定时间间隔提取视频帧
Args:
output_dir: 输出目录
interval_seconds: 帧提取间隔(秒)
use_hw_accel: 是否使用硬件加速
Returns:
List[int]: 提取的帧号列表
"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 计算起始时间和帧提取点
start_time = 0
end_time = self.duration
extraction_times = []
current_time = start_time
while current_time < end_time:
extraction_times.append(current_time)
current_time += interval_seconds
if not extraction_times:
logger.warning("未找到需要提取的帧")
return []
# 确定硬件加速器选项
hw_accel = []
if use_hw_accel:
# 尝试检测可用的硬件加速器
hw_accel_options = self._detect_hw_accelerator()
if hw_accel_options:
hw_accel = hw_accel_options
logger.info(f"使用硬件加速: {' '.join(hw_accel)}")
else:
logger.warning("未检测到可用的硬件加速器,使用软件解码")
# 提取帧
frame_numbers = []
for i, timestamp in enumerate(tqdm(extraction_times, desc="提取视频帧")):
frame_number = int(timestamp * self.fps)
frame_numbers.append(frame_number)
# 格式化时间戳字符串 (HHMMSSmmm)
hours = int(timestamp // 3600)
minutes = int((timestamp % 3600) // 60)
seconds = int(timestamp % 60)
milliseconds = int((timestamp % 1) * 1000)
time_str = f"{hours:02d}{minutes:02d}{seconds:02d}{milliseconds:03d}"
output_path = os.path.join(output_dir, f"keyframe_{frame_number:06d}_{time_str}.jpg")
# 使用ffmpeg提取单帧
cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
]
# 添加硬件加速参数
cmd.extend(hw_accel)
cmd.extend([
"-ss", str(timestamp),
"-i", self.video_path,
"-vframes", "1",
"-q:v", "1", # 最高质量
"-y",
output_path
])
try:
subprocess.run(cmd, check=True, capture_output=True)
except subprocess.CalledProcessError as e:
logger.warning(f"提取帧 {frame_number} 失败: {e.stderr}")
logger.info(f"成功提取了 {len(frame_numbers)} 个视频帧")
return frame_numbers
def _detect_hw_accelerator(self) -> List[str]:
"""
检测系统可用的硬件加速器
Returns:
List[str]: 硬件加速器ffmpeg命令参数
"""
# 检测操作系统
import platform
system = platform.system().lower()
# 测试不同的硬件加速器
accelerators = []
if system == 'darwin': # macOS
# 测试 videotoolbox (Apple 硬件加速)
test_cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
"-hwaccel", "videotoolbox",
"-i", self.video_path,
"-t", "0.1",
"-f", "null",
"-"
]
try:
subprocess.run(test_cmd, capture_output=True, check=True)
return ["-hwaccel", "videotoolbox"]
except subprocess.CalledProcessError:
pass
elif system == 'linux':
# 测试 VAAPI
test_cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
"-hwaccel", "vaapi",
"-i", self.video_path,
"-t", "0.1",
"-f", "null",
"-"
]
try:
subprocess.run(test_cmd, capture_output=True, check=True)
return ["-hwaccel", "vaapi"]
except subprocess.CalledProcessError:
pass
# 尝试 CUDA
test_cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
"-hwaccel", "cuda",
"-i", self.video_path,
"-t", "0.1",
"-f", "null",
"-"
]
try:
subprocess.run(test_cmd, capture_output=True, check=True)
return ["-hwaccel", "cuda"]
except subprocess.CalledProcessError:
pass
elif system == 'windows':
# 测试 CUDA
test_cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
"-hwaccel", "cuda",
"-i", self.video_path,
"-t", "0.1",
"-f", "null",
"-"
]
try:
subprocess.run(test_cmd, capture_output=True, check=True)
return ["-hwaccel", "cuda"]
except subprocess.CalledProcessError:
pass
# 测试 D3D11VA
test_cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
"-hwaccel", "d3d11va",
"-i", self.video_path,
"-t", "0.1",
"-f", "null",
"-"
]
try:
subprocess.run(test_cmd, capture_output=True, check=True)
return ["-hwaccel", "d3d11va"]
except subprocess.CalledProcessError:
pass
# 测试 DXVA2
test_cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
"-hwaccel", "dxva2",
"-i", self.video_path,
"-t", "0.1",
"-f", "null",
"-"
]
try:
subprocess.run(test_cmd, capture_output=True, check=True)
return ["-hwaccel", "dxva2"]
except subprocess.CalledProcessError:
pass
# 如果没有找到可用的硬件加速器
return []
def process_video_pipeline(self,
output_dir: str,
interval_seconds: float = 5.0, # 帧提取间隔(秒)
use_hw_accel: bool = True) -> None:
"""
执行简化的视频处理流程,直接从原视频按固定时间间隔提取帧
Args:
output_dir: 输出目录
interval_seconds: 帧提取间隔(秒)
use_hw_accel: 是否使用硬件加速
"""
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
try:
# 直接从原视频提取关键帧
logger.info(f"从视频间隔 {interval_seconds} 秒提取关键帧...")
self.extract_frames_by_interval(
output_dir,
interval_seconds=interval_seconds,
use_hw_accel=use_hw_accel
)
logger.info(f"处理完成!视频帧已保存在: {output_dir}")
except Exception as e:
import traceback
logger.error(f"视频处理失败: \n{traceback.format_exc()}")
raise
if __name__ == "__main__":
import time
start_time = time.time()
# 使用示例
processor = VideoProcessor("./resource/videos/test.mp4")
# 设置间隔为3秒提取帧
processor.process_video_pipeline(
output_dir="output",
interval_seconds=3.0,
use_hw_accel=True
)
end_time = time.time()
print(f"处理完成!总耗时: {end_time - start_time:.2f}")