mirror of
https://github.com/linyqh/NarratoAI.git
synced 2025-12-12 11:22:51 +00:00
670 lines
23 KiB
Python
670 lines
23 KiB
Python
"""
|
||
视频帧提取工具
|
||
|
||
这个模块提供了简单高效的视频帧提取功能。主要特点:
|
||
1. 使用ffmpeg进行视频处理,支持硬件加速
|
||
2. 按指定时间间隔提取视频关键帧
|
||
3. 支持多种视频格式
|
||
4. 支持高清视频帧输出
|
||
5. 直接从原视频提取高质量关键帧
|
||
|
||
不依赖OpenCV和sklearn等库,只使用ffmpeg作为外部依赖,降低了安装和使用的复杂度。
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import time
|
||
import subprocess
|
||
from typing import List, Dict
|
||
from loguru import logger
|
||
from tqdm import tqdm
|
||
|
||
from app.utils import ffmpeg_utils
|
||
from app.config.ffmpeg_config import FFmpegConfigManager
|
||
|
||
|
||
class VideoProcessor:
|
||
def __init__(self, video_path: str):
|
||
"""
|
||
初始化视频处理器
|
||
|
||
Args:
|
||
video_path: 视频文件路径
|
||
"""
|
||
if not os.path.exists(video_path):
|
||
raise FileNotFoundError(f"视频文件不存在: {video_path}")
|
||
|
||
self.video_path = video_path
|
||
self.video_info = self._get_video_info()
|
||
self.fps = float(self.video_info.get('fps', 25))
|
||
self.duration = float(self.video_info.get('duration', 0))
|
||
self.width = int(self.video_info.get('width', 0))
|
||
self.height = int(self.video_info.get('height', 0))
|
||
self.total_frames = int(self.fps * self.duration)
|
||
|
||
def _get_video_info(self) -> Dict[str, str]:
|
||
"""
|
||
使用ffprobe获取视频信息
|
||
|
||
Returns:
|
||
Dict[str, str]: 包含视频基本信息的字典
|
||
"""
|
||
cmd = [
|
||
"ffprobe",
|
||
"-v", "error",
|
||
"-select_streams", "v:0",
|
||
"-show_entries", "stream=width,height,r_frame_rate,duration",
|
||
"-of", "default=noprint_wrappers=1:nokey=0",
|
||
self.video_path
|
||
]
|
||
|
||
try:
|
||
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
|
||
lines = result.stdout.strip().split('\n')
|
||
info = {}
|
||
for line in lines:
|
||
if '=' in line:
|
||
key, value = line.split('=', 1)
|
||
info[key] = value
|
||
|
||
# 处理帧率(可能是分数形式)
|
||
if 'r_frame_rate' in info:
|
||
try:
|
||
num, den = map(int, info['r_frame_rate'].split('/'))
|
||
info['fps'] = str(num / den)
|
||
except ValueError:
|
||
info['fps'] = info.get('r_frame_rate', '25')
|
||
|
||
return info
|
||
|
||
except subprocess.CalledProcessError as e:
|
||
logger.error(f"获取视频信息失败: {e.stderr}")
|
||
return {
|
||
'width': '1280',
|
||
'height': '720',
|
||
'fps': '25',
|
||
'duration': '0'
|
||
}
|
||
|
||
def extract_frames_by_interval(self, output_dir: str, interval_seconds: float = 5.0,
|
||
use_hw_accel: bool = True) -> List[int]:
|
||
"""
|
||
按指定时间间隔提取视频帧
|
||
|
||
优化了 Windows 系统兼容性,特别是 N 卡硬件加速的滤镜链问题
|
||
|
||
Args:
|
||
output_dir: 输出目录
|
||
interval_seconds: 帧提取间隔(秒)
|
||
use_hw_accel: 是否使用硬件加速
|
||
|
||
Returns:
|
||
List[int]: 提取的帧号列表
|
||
"""
|
||
if not os.path.exists(output_dir):
|
||
os.makedirs(output_dir)
|
||
|
||
# 计算起始时间和帧提取点
|
||
start_time = 0
|
||
end_time = self.duration
|
||
extraction_times = []
|
||
|
||
current_time = start_time
|
||
while current_time < end_time:
|
||
extraction_times.append(current_time)
|
||
current_time += interval_seconds
|
||
|
||
if not extraction_times:
|
||
logger.warning("未找到需要提取的帧")
|
||
return []
|
||
|
||
# 获取硬件加速信息
|
||
hwaccel_info = ffmpeg_utils.get_ffmpeg_hwaccel_info()
|
||
hwaccel_type = hwaccel_info.get("type", "software")
|
||
|
||
# 提取帧 - 使用优化的进度条
|
||
frame_numbers = []
|
||
successful_extractions = 0
|
||
failed_extractions = 0
|
||
|
||
logger.info(f"开始提取 {len(extraction_times)} 个关键帧,使用 {hwaccel_type} 加速")
|
||
|
||
with tqdm(total=len(extraction_times), desc="🎬 提取视频帧", unit="帧",
|
||
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]") as pbar:
|
||
for i, timestamp in enumerate(extraction_times):
|
||
frame_number = int(timestamp * self.fps)
|
||
frame_numbers.append(frame_number)
|
||
|
||
# 格式化时间戳字符串 (HHMMSSmmm)
|
||
hours = int(timestamp // 3600)
|
||
minutes = int((timestamp % 3600) // 60)
|
||
seconds = int(timestamp % 60)
|
||
milliseconds = int((timestamp % 1) * 1000)
|
||
time_str = f"{hours:02d}{minutes:02d}{seconds:02d}{milliseconds:03d}"
|
||
|
||
output_path = os.path.join(output_dir, f"keyframe_{frame_number:06d}_{time_str}.jpg")
|
||
|
||
# 构建 FFmpeg 命令 - 针对 Windows N 卡优化
|
||
success = self._extract_single_frame_optimized(
|
||
timestamp, output_path, use_hw_accel, hwaccel_type
|
||
)
|
||
|
||
if success:
|
||
successful_extractions += 1
|
||
pbar.set_postfix({
|
||
"✅": successful_extractions,
|
||
"❌": failed_extractions,
|
||
"时间": f"{timestamp:.1f}s"
|
||
})
|
||
else:
|
||
failed_extractions += 1
|
||
pbar.set_postfix({
|
||
"✅": successful_extractions,
|
||
"❌": failed_extractions,
|
||
"时间": f"{timestamp:.1f}s"
|
||
})
|
||
|
||
pbar.update(1)
|
||
|
||
# 统计结果
|
||
total_attempts = len(extraction_times)
|
||
success_rate = (successful_extractions / total_attempts) * 100 if total_attempts > 0 else 0
|
||
|
||
logger.info(f"关键帧提取完成: 成功 {successful_extractions}/{total_attempts} 帧 ({success_rate:.1f}%)")
|
||
|
||
if failed_extractions > 0:
|
||
logger.warning(f"有 {failed_extractions} 帧提取失败,可能是硬件加速兼容性问题")
|
||
|
||
# 验证实际生成的文件
|
||
actual_files = [f for f in os.listdir(output_dir) if f.endswith('.jpg')]
|
||
logger.info(f"实际生成文件数量: {len(actual_files)} 个")
|
||
|
||
if len(actual_files) == 0:
|
||
logger.error("未生成任何关键帧文件,可能需要禁用硬件加速")
|
||
raise Exception("关键帧提取完全失败,请检查视频文件和 FFmpeg 配置")
|
||
|
||
return frame_numbers
|
||
|
||
def _extract_single_frame_optimized(self, timestamp: float, output_path: str,
|
||
use_hw_accel: bool, hwaccel_type: str) -> bool:
|
||
"""
|
||
优化的单帧提取方法,解决 Windows N 卡硬件加速兼容性问题
|
||
|
||
Args:
|
||
timestamp: 时间戳(秒)
|
||
output_path: 输出文件路径
|
||
use_hw_accel: 是否使用硬件加速
|
||
hwaccel_type: 硬件加速类型
|
||
|
||
Returns:
|
||
bool: 是否成功提取
|
||
"""
|
||
# 策略1: 优先尝试纯编码器方案(避免硬件解码滤镜链问题)
|
||
if use_hw_accel and hwaccel_type in ["nvenc", "cuda"]:
|
||
# 对于 NVIDIA 显卡,优先使用纯软件解码 + NVENC 编码
|
||
if self._try_extract_with_software_decode(timestamp, output_path):
|
||
return True
|
||
|
||
# 策略2: 尝试标准硬件加速
|
||
if use_hw_accel and ffmpeg_utils.is_ffmpeg_hwaccel_available():
|
||
hw_accel = ffmpeg_utils.get_ffmpeg_hwaccel_args()
|
||
if self._try_extract_with_hwaccel(timestamp, output_path, hw_accel):
|
||
return True
|
||
|
||
# 策略3: 软件方案
|
||
if self._try_extract_with_software(timestamp, output_path):
|
||
return True
|
||
|
||
# 策略4: 超级兼容性方案(Windows 特殊处理)
|
||
return self._try_extract_with_ultra_compatibility(timestamp, output_path)
|
||
|
||
def _try_extract_with_software_decode(self, timestamp: float, output_path: str) -> bool:
|
||
"""
|
||
使用纯软件解码提取帧(推荐用于 Windows N 卡)
|
||
参考 clip_video.py 中的成功实现
|
||
|
||
Args:
|
||
timestamp: 时间戳
|
||
output_path: 输出路径
|
||
|
||
Returns:
|
||
bool: 是否成功
|
||
"""
|
||
# 参考 clip_video.py 中的兼容性方案,专门针对图片输出优化
|
||
cmd = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
"-ss", str(timestamp), # 先定位时间戳
|
||
"-i", self.video_path,
|
||
"-vframes", "1", # 只提取一帧
|
||
"-q:v", "2", # 高质量
|
||
"-pix_fmt", "yuv420p", # 明确指定像素格式
|
||
"-y",
|
||
output_path
|
||
]
|
||
|
||
return self._execute_ffmpeg_command(cmd, f"软件解码提取帧 {timestamp:.1f}s")
|
||
|
||
def _try_extract_with_hwaccel(self, timestamp: float, output_path: str, hw_accel: List[str]) -> bool:
|
||
"""
|
||
使用硬件加速提取帧
|
||
|
||
Args:
|
||
timestamp: 时间戳
|
||
output_path: 输出路径
|
||
hw_accel: 硬件加速参数
|
||
|
||
Returns:
|
||
bool: 是否成功
|
||
"""
|
||
cmd = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
]
|
||
|
||
# 添加硬件加速参数
|
||
cmd.extend(hw_accel)
|
||
|
||
cmd.extend([
|
||
"-ss", str(timestamp),
|
||
"-i", self.video_path,
|
||
"-vframes", "1",
|
||
"-q:v", "2",
|
||
"-pix_fmt", "yuv420p",
|
||
"-y",
|
||
output_path
|
||
])
|
||
|
||
return self._execute_ffmpeg_command(cmd, f"硬件加速提取帧 {timestamp:.1f}s")
|
||
|
||
def _try_extract_with_software(self, timestamp: float, output_path: str) -> bool:
|
||
"""
|
||
使用纯软件方案提取帧(最后的备用方案)
|
||
参考 clip_video.py 中的基本编码方案
|
||
|
||
Args:
|
||
timestamp: 时间戳
|
||
output_path: 输出路径
|
||
|
||
Returns:
|
||
bool: 是否成功
|
||
"""
|
||
# 最基本的兼容性方案,参考 clip_video.py 的 try_basic_fallback
|
||
cmd = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "warning", # 更详细的日志用于调试
|
||
"-ss", str(timestamp),
|
||
"-i", self.video_path,
|
||
"-vframes", "1",
|
||
"-q:v", "3", # 稍微降低质量以提高兼容性
|
||
"-pix_fmt", "yuv420p",
|
||
"-avoid_negative_ts", "make_zero", # 避免时间戳问题
|
||
"-y",
|
||
output_path
|
||
]
|
||
|
||
return self._execute_ffmpeg_command(cmd, f"软件方案提取帧 {timestamp:.1f}s")
|
||
|
||
def _try_extract_with_ultra_compatibility(self, timestamp: float, output_path: str) -> bool:
|
||
"""
|
||
超级兼容性方案,专门解决 Windows 系统的 MJPEG 编码问题
|
||
|
||
Args:
|
||
timestamp: 时间戳
|
||
output_path: 输出路径
|
||
|
||
Returns:
|
||
bool: 是否成功
|
||
"""
|
||
# 方案1: 使用 PNG 格式避免 MJPEG 问题
|
||
png_output = output_path.replace('.jpg', '.png')
|
||
cmd1 = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
"-ss", str(timestamp),
|
||
"-i", self.video_path,
|
||
"-vframes", "1",
|
||
"-f", "image2", # 明确指定图片格式
|
||
"-y",
|
||
png_output
|
||
]
|
||
|
||
if self._execute_ffmpeg_command(cmd1, f"PNG格式提取帧 {timestamp:.1f}s"):
|
||
# 如果 PNG 成功,转换为 JPG
|
||
try:
|
||
from PIL import Image
|
||
with Image.open(png_output) as img:
|
||
# 转换为 RGB 模式(去除 alpha 通道)
|
||
if img.mode in ('RGBA', 'LA'):
|
||
background = Image.new('RGB', img.size, (255, 255, 255))
|
||
background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
|
||
img = background
|
||
img.save(output_path, 'JPEG', quality=90)
|
||
|
||
# 删除临时 PNG 文件
|
||
os.remove(png_output)
|
||
return True
|
||
except Exception as e:
|
||
logger.debug(f"PNG 转 JPG 失败: {e}")
|
||
# 如果转换失败,直接重命名 PNG 为 JPG
|
||
try:
|
||
os.rename(png_output, output_path)
|
||
return True
|
||
except Exception:
|
||
pass
|
||
|
||
# 方案2: 使用最简单的参数
|
||
cmd2 = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
"-i", self.video_path,
|
||
"-ss", str(timestamp), # 把 -ss 放在 -i 后面
|
||
"-vframes", "1",
|
||
"-f", "mjpeg", # 明确指定 MJPEG 格式
|
||
"-q:v", "5", # 降低质量要求
|
||
"-y",
|
||
output_path
|
||
]
|
||
|
||
if self._execute_ffmpeg_command(cmd2, f"MJPEG格式提取帧 {timestamp:.1f}s"):
|
||
return True
|
||
|
||
# 方案3: 最后的尝试 - 使用 BMP 格式
|
||
bmp_output = output_path.replace('.jpg', '.bmp')
|
||
cmd3 = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
"-i", self.video_path,
|
||
"-ss", str(timestamp),
|
||
"-vframes", "1",
|
||
"-f", "bmp",
|
||
"-y",
|
||
bmp_output
|
||
]
|
||
|
||
if self._execute_ffmpeg_command(cmd3, f"BMP格式提取帧 {timestamp:.1f}s"):
|
||
# 尝试转换 BMP 为 JPG
|
||
try:
|
||
from PIL import Image
|
||
with Image.open(bmp_output) as img:
|
||
img.save(output_path, 'JPEG', quality=90)
|
||
os.remove(bmp_output)
|
||
return True
|
||
except Exception:
|
||
# 如果转换失败,直接重命名
|
||
try:
|
||
os.rename(bmp_output, output_path)
|
||
return True
|
||
except Exception:
|
||
pass
|
||
|
||
return False
|
||
|
||
def _execute_ffmpeg_command(self, cmd: List[str], description: str) -> bool:
|
||
"""
|
||
执行 FFmpeg 命令并处理结果
|
||
参考 clip_video.py 中的错误处理机制
|
||
|
||
Args:
|
||
cmd: FFmpeg 命令列表
|
||
description: 操作描述
|
||
|
||
Returns:
|
||
bool: 是否成功
|
||
"""
|
||
try:
|
||
# 参考 clip_video.py 中的 Windows 处理方式
|
||
is_windows = os.name == 'nt'
|
||
process_kwargs = {
|
||
"stdout": subprocess.PIPE,
|
||
"stderr": subprocess.PIPE,
|
||
"text": True,
|
||
"check": True,
|
||
"timeout": 30 # 30秒超时
|
||
}
|
||
|
||
if is_windows:
|
||
process_kwargs["encoding"] = 'utf-8'
|
||
|
||
result = subprocess.run(cmd, **process_kwargs)
|
||
|
||
# 验证输出文件
|
||
output_path = cmd[-1]
|
||
if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
except subprocess.CalledProcessError as e:
|
||
# 简化错误日志,仅记录关键信息
|
||
return False
|
||
except subprocess.TimeoutExpired:
|
||
return False
|
||
except Exception as e:
|
||
return False
|
||
|
||
def _detect_hw_accelerator(self) -> List[str]:
|
||
"""
|
||
检测系统可用的硬件加速器
|
||
|
||
Returns:
|
||
List[str]: 硬件加速器ffmpeg命令参数
|
||
"""
|
||
# 使用集中式硬件加速检测
|
||
if ffmpeg_utils.is_ffmpeg_hwaccel_available():
|
||
return ffmpeg_utils.get_ffmpeg_hwaccel_args()
|
||
return []
|
||
|
||
def process_video_pipeline(self,
|
||
output_dir: str,
|
||
interval_seconds: float = 5.0, # 帧提取间隔(秒)
|
||
use_hw_accel: bool = True) -> None:
|
||
"""
|
||
执行简化的视频处理流程,直接从原视频按固定时间间隔提取帧
|
||
|
||
Args:
|
||
output_dir: 输出目录
|
||
interval_seconds: 帧提取间隔(秒)
|
||
use_hw_accel: 是否使用硬件加速
|
||
"""
|
||
# 创建输出目录
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
try:
|
||
# 直接从原视频提取关键帧
|
||
logger.info(f"从视频间隔 {interval_seconds} 秒提取关键帧...")
|
||
self.extract_frames_by_interval(
|
||
output_dir,
|
||
interval_seconds=interval_seconds,
|
||
use_hw_accel=use_hw_accel
|
||
)
|
||
|
||
logger.info(f"处理完成!视频帧已保存在: {output_dir}")
|
||
|
||
except Exception as e:
|
||
import traceback
|
||
logger.error(f"视频处理失败: \n{traceback.format_exc()}")
|
||
raise
|
||
|
||
def extract_frames_by_interval_ultra_compatible(self, output_dir: str, interval_seconds: float = 5.0) -> List[int]:
|
||
"""
|
||
使用超级兼容性方案按指定时间间隔提取视频帧
|
||
|
||
直接使用PNG格式提取,避免MJPEG编码问题,确保最高兼容性
|
||
|
||
Args:
|
||
output_dir: 输出目录
|
||
interval_seconds: 帧提取间隔(秒)
|
||
|
||
Returns:
|
||
List[int]: 提取的帧号列表
|
||
"""
|
||
if not os.path.exists(output_dir):
|
||
os.makedirs(output_dir)
|
||
|
||
# 计算起始时间和帧提取点
|
||
start_time = 0
|
||
end_time = self.duration
|
||
extraction_times = []
|
||
|
||
current_time = start_time
|
||
while current_time < end_time:
|
||
extraction_times.append(current_time)
|
||
current_time += interval_seconds
|
||
|
||
if not extraction_times:
|
||
logger.warning("未找到需要提取的帧")
|
||
return []
|
||
|
||
# 提取帧 - 使用美化的进度条
|
||
frame_numbers = []
|
||
successful_extractions = 0
|
||
failed_extractions = 0
|
||
|
||
logger.info(f"开始提取 {len(extraction_times)} 个关键帧,使用超级兼容性方案")
|
||
|
||
with tqdm(total=len(extraction_times), desc="🎬 提取关键帧", unit="帧",
|
||
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]") as pbar:
|
||
for i, timestamp in enumerate(extraction_times):
|
||
frame_number = int(timestamp * self.fps)
|
||
frame_numbers.append(frame_number)
|
||
|
||
# 格式化时间戳字符串 (HHMMSSmmm)
|
||
hours = int(timestamp // 3600)
|
||
minutes = int((timestamp % 3600) // 60)
|
||
seconds = int(timestamp % 60)
|
||
milliseconds = int((timestamp % 1) * 1000)
|
||
time_str = f"{hours:02d}{minutes:02d}{seconds:02d}{milliseconds:03d}"
|
||
|
||
output_path = os.path.join(output_dir, f"keyframe_{frame_number:06d}_{time_str}.jpg")
|
||
|
||
# 直接使用超级兼容性方案
|
||
success = self._extract_frame_ultra_compatible(timestamp, output_path)
|
||
|
||
if success:
|
||
successful_extractions += 1
|
||
pbar.set_postfix({
|
||
"✅": successful_extractions,
|
||
"❌": failed_extractions,
|
||
"时间": f"{timestamp:.1f}s"
|
||
})
|
||
else:
|
||
failed_extractions += 1
|
||
pbar.set_postfix({
|
||
"✅": successful_extractions,
|
||
"❌": failed_extractions,
|
||
"时间": f"{timestamp:.1f}s"
|
||
})
|
||
|
||
pbar.update(1)
|
||
|
||
# 统计结果
|
||
total_attempts = len(extraction_times)
|
||
success_rate = (successful_extractions / total_attempts) * 100 if total_attempts > 0 else 0
|
||
|
||
logger.info(f"关键帧提取完成: 成功 {successful_extractions}/{total_attempts} 帧 ({success_rate:.1f}%)")
|
||
|
||
if failed_extractions > 0:
|
||
logger.warning(f"有 {failed_extractions} 帧提取失败")
|
||
|
||
# 验证实际生成的文件
|
||
actual_files = [f for f in os.listdir(output_dir) if f.endswith('.jpg')]
|
||
logger.info(f"实际生成文件数量: {len(actual_files)} 个")
|
||
|
||
if len(actual_files) == 0:
|
||
logger.error("未生成任何关键帧文件")
|
||
raise Exception("关键帧提取完全失败,请检查视频文件")
|
||
|
||
return frame_numbers
|
||
|
||
def _extract_frame_ultra_compatible(self, timestamp: float, output_path: str) -> bool:
|
||
"""
|
||
超级兼容性方案提取单帧
|
||
|
||
Args:
|
||
timestamp: 时间戳(秒)
|
||
output_path: 输出文件路径
|
||
|
||
Returns:
|
||
bool: 是否成功提取
|
||
"""
|
||
# 使用 PNG 格式避免 MJPEG 问题
|
||
png_output = output_path.replace('.jpg', '.png')
|
||
cmd = [
|
||
"ffmpeg",
|
||
"-hide_banner",
|
||
"-loglevel", "error",
|
||
"-ss", str(timestamp),
|
||
"-i", self.video_path,
|
||
"-vframes", "1",
|
||
"-f", "image2", # 明确指定图片格式
|
||
"-y",
|
||
png_output
|
||
]
|
||
|
||
try:
|
||
# 执行FFmpeg命令
|
||
result = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=30)
|
||
|
||
# 验证PNG文件是否成功生成
|
||
if os.path.exists(png_output) and os.path.getsize(png_output) > 0:
|
||
# 转换PNG为JPG
|
||
try:
|
||
from PIL import Image
|
||
with Image.open(png_output) as img:
|
||
# 转换为 RGB 模式(去除 alpha 通道)
|
||
if img.mode in ('RGBA', 'LA'):
|
||
background = Image.new('RGB', img.size, (255, 255, 255))
|
||
background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
|
||
img = background
|
||
img.save(output_path, 'JPEG', quality=90)
|
||
|
||
# 删除临时 PNG 文件
|
||
os.remove(png_output)
|
||
return True
|
||
except Exception as e:
|
||
logger.warning(f"PNG 转 JPG 失败: {e}")
|
||
# 如果转换失败,直接重命名 PNG 为 JPG
|
||
try:
|
||
os.rename(png_output, output_path)
|
||
return True
|
||
except Exception:
|
||
return False
|
||
else:
|
||
return False
|
||
|
||
except subprocess.CalledProcessError as e:
|
||
logger.warning(f"超级兼容性方案提取帧 {timestamp:.1f}s 失败: {e}")
|
||
return False
|
||
except subprocess.TimeoutExpired:
|
||
logger.warning(f"超级兼容性方案提取帧 {timestamp:.1f}s 超时")
|
||
return False
|
||
except Exception as e:
|
||
logger.warning(f"超级兼容性方案提取帧 {timestamp:.1f}s 异常: {e}")
|
||
return False
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import time
|
||
|
||
start_time = time.time()
|
||
|
||
# 使用示例
|
||
processor = VideoProcessor("./resource/videos/test.mp4")
|
||
|
||
# 设置间隔为3秒提取帧
|
||
processor.process_video_pipeline(
|
||
output_dir="output",
|
||
interval_seconds=3.0,
|
||
use_hw_accel=True
|
||
)
|
||
|
||
end_time = time.time()
|
||
print(f"处理完成!总耗时: {end_time - start_time:.2f} 秒")
|