优化视频帧提取功能,新增超级兼容性方案以提高提取成功率,增强错误处理和用户反馈。在generate_script_docu.py中更新进度显示和错误提示,提升用户体验。

This commit is contained in:
linyqh 2025-07-07 21:33:25 +08:00
parent 6270224d45
commit c61462d706
2 changed files with 197 additions and 69 deletions

View File

@ -129,7 +129,8 @@ class VideoProcessor:
logger.info(f"开始提取 {len(extraction_times)} 个关键帧,使用 {hwaccel_type} 加速")
with tqdm(total=len(extraction_times), desc="提取视频帧", unit="") as pbar:
with tqdm(total=len(extraction_times), desc="🎬 提取视频帧", unit="",
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]") as pbar:
for i, timestamp in enumerate(extraction_times):
frame_number = int(timestamp * self.fps)
frame_numbers.append(frame_number)
@ -151,16 +152,16 @@ class VideoProcessor:
if success:
successful_extractions += 1
pbar.set_postfix({
"成功": successful_extractions,
"失败": failed_extractions,
"当前": f"{timestamp:.1f}s"
"": successful_extractions,
"": failed_extractions,
"时间": f"{timestamp:.1f}s"
})
else:
failed_extractions += 1
pbar.set_postfix({
"成功": successful_extractions,
"失败": failed_extractions,
"当前": f"{timestamp:.1f}s (失败)"
"": successful_extractions,
"": failed_extractions,
"时间": f"{timestamp:.1f}s"
})
pbar.update(1)
@ -203,19 +204,16 @@ class VideoProcessor:
# 对于 NVIDIA 显卡,优先使用纯软件解码 + NVENC 编码
if self._try_extract_with_software_decode(timestamp, output_path):
return True
logger.debug(f"纯软件解码方案失败,尝试其他方案")
# 策略2: 尝试标准硬件加速
if use_hw_accel and ffmpeg_utils.is_ffmpeg_hwaccel_available():
hw_accel = ffmpeg_utils.get_ffmpeg_hwaccel_args()
if self._try_extract_with_hwaccel(timestamp, output_path, hw_accel):
return True
logger.debug(f"硬件加速方案失败,回退到软件方案")
# 策略3: 软件方案
if self._try_extract_with_software(timestamp, output_path):
return True
logger.debug(f"软件方案失败,尝试超级兼容性方案")
# 策略4: 超级兼容性方案Windows 特殊处理)
return self._try_extract_with_ultra_compatibility(timestamp, output_path)
@ -434,37 +432,21 @@ class VideoProcessor:
if is_windows:
process_kwargs["encoding"] = 'utf-8'
logger.debug(f"执行命令: {' '.join(cmd)}")
result = subprocess.run(cmd, **process_kwargs)
# 验证输出文件
output_path = cmd[-1]
if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
logger.debug(f"{description} - 成功")
return True
else:
logger.debug(f"{description} - 输出文件无效: {output_path}")
return False
except subprocess.CalledProcessError as e:
error_msg = e.stderr if hasattr(e, 'stderr') and e.stderr else str(e)
# 分析错误类型,提供更好的调试信息
if "mjpeg" in error_msg.lower() and "non full-range yuv" in error_msg.lower():
logger.debug(f"{description} - MJPEG YUV 格式问题: {error_msg[:200]}")
elif "codec avOption" in error_msg.lower():
logger.debug(f"{description} - 编码器参数问题: {error_msg[:200]}")
elif "filter" in error_msg.lower():
logger.debug(f"{description} - 滤镜链问题: {error_msg[:200]}")
else:
logger.debug(f"{description} - 命令执行失败: {error_msg[:200]}")
# 简化错误日志,仅记录关键信息
return False
except subprocess.TimeoutExpired:
logger.debug(f"{description} - 命令执行超时")
return False
except Exception as e:
logger.debug(f"{description} - 未知错误: {str(e)}")
return False
def _detect_hw_accelerator(self) -> List[str]:
@ -510,6 +492,163 @@ class VideoProcessor:
logger.error(f"视频处理失败: \n{traceback.format_exc()}")
raise
def extract_frames_by_interval_ultra_compatible(self, output_dir: str, interval_seconds: float = 5.0) -> List[int]:
    """Extract one frame every ``interval_seconds`` via the ultra-compatible path.

    Every sampled timestamp is handed to ``self._extract_frame_ultra_compatible``
    and written into ``output_dir`` as ``keyframe_<frame>_<HHMMSSmmm>.jpg``.

    Args:
        output_dir: Directory receiving the frames; created when missing.
        interval_seconds: Spacing between sampled timestamps, in seconds.
            Must be a positive number.

    Returns:
        The frame number (``int(timestamp * self.fps)``) of every sampled
        timestamp, including timestamps whose extraction failed. An empty
        list when ``self.duration`` yields no sample point.

    Raises:
        ValueError: If ``interval_seconds`` is ``None`` or not positive.
        RuntimeError: If ``output_dir`` contains no ``.jpg`` file afterwards.
    """
    # Validate before touching the filesystem: a zero/negative interval would
    # otherwise never advance the sampling loop, and None would only fail
    # later with an opaque TypeError. "not x > 0" also rejects NaN.
    if interval_seconds is None or not interval_seconds > 0:
        raise ValueError(f"帧提取间隔必须为正数,当前值: {interval_seconds!r}")

    os.makedirs(output_dir, exist_ok=True)

    # Derive each timestamp from its index instead of accumulating additions,
    # so floating-point drift cannot shift samples or add a spurious last one.
    extraction_times = []
    index = 0
    while index * interval_seconds < self.duration:
        extraction_times.append(index * interval_seconds)
        index += 1

    if not extraction_times:
        logger.warning("未找到需要提取的帧")
        return []

    frame_numbers = []
    successful_extractions = 0
    failed_extractions = 0
    total_attempts = len(extraction_times)

    logger.info(f"开始提取 {len(extraction_times)} 个关键帧,使用超级兼容性方案")

    with tqdm(total=total_attempts, desc="🎬 提取关键帧", unit="",
              bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]") as pbar:
        for timestamp in extraction_times:
            frame_number = int(timestamp * self.fps)
            frame_numbers.append(frame_number)

            # Timestamp component of the file name, formatted as HHMMSSmmm.
            hours = int(timestamp // 3600)
            minutes = int((timestamp % 3600) // 60)
            seconds = int(timestamp % 60)
            milliseconds = int((timestamp % 1) * 1000)
            time_str = f"{hours:02d}{minutes:02d}{seconds:02d}{milliseconds:03d}"

            output_path = os.path.join(output_dir, f"keyframe_{frame_number:06d}_{time_str}.jpg")

            if self._extract_frame_ultra_compatible(timestamp, output_path):
                successful_extractions += 1
            else:
                failed_extractions += 1

            # Distinct keys are required: with two identical keys the dict
            # keeps only the last value and the success counter is never shown.
            pbar.set_postfix({
                "成功": successful_extractions,
                "失败": failed_extractions,
                "时间": f"{timestamp:.1f}s"
            })
            pbar.update(1)

    # Summary (total_attempts is guaranteed non-zero at this point).
    success_rate = (successful_extractions / total_attempts) * 100
    logger.info(f"关键帧提取完成: 成功 {successful_extractions}/{total_attempts} 帧 ({success_rate:.1f}%)")

    if failed_extractions > 0:
        logger.warning(f"{failed_extractions} 帧提取失败")

    # Cross-check against what is actually on disk.
    actual_files = [f for f in os.listdir(output_dir) if f.endswith('.jpg')]
    logger.info(f"实际生成文件数量: {len(actual_files)}")

    if not actual_files:
        logger.error("未生成任何关键帧文件")
        raise RuntimeError("关键帧提取完全失败,请检查视频文件")

    return frame_numbers
def _extract_frame_ultra_compatible(self, timestamp: float, output_path: str) -> bool:
    """Extract a single frame through an intermediate PNG file.

    FFmpeg writes the frame at ``timestamp`` as a PNG next to ``output_path``;
    the PNG is then re-encoded to JPEG with Pillow and removed. If the
    re-encode fails, the PNG is moved to ``output_path`` unchanged as a
    best-effort fallback (the file then holds PNG data under the requested
    name).

    Args:
        timestamp: Seek position passed to FFmpeg's ``-ss`` option.
        output_path: Destination path of the final image.

    Returns:
        True when a file was produced at ``output_path``, False otherwise.
    """
    # Swap only the real extension. A plain str.replace('.jpg', '.png') would
    # also rewrite '.jpg' inside directory names, and for a path without
    # '.jpg' it would make the temp file identical to the output file, which
    # the cleanup step would then delete.
    base, ext = os.path.splitext(output_path)
    png_output = base + ".png"
    if ext.lower() == ".png":
        png_output = base + ".tmp.png"

    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel", "error",
        "-ss", str(timestamp),
        "-i", self.video_path,
        "-vframes", "1",
        "-f", "image2",  # image muxer; the codec follows the .png extension
        "-y",
        png_output
    ]

    try:
        # Decode FFmpeg output as UTF-8 with replacement so an unexpected byte
        # in stderr cannot raise on platforms with a non-UTF-8 locale codec.
        subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            check=True,
            timeout=30,
        )

        if not (os.path.exists(png_output) and os.path.getsize(png_output) > 0):
            return False

        try:
            from PIL import Image
            with Image.open(png_output) as img:
                if img.mode in ('RGBA', 'LA'):
                    # Flatten transparency onto white, using the alpha band
                    # (always the last band) as the paste mask.
                    background = Image.new('RGB', img.size, (255, 255, 255))
                    background.paste(img.convert('RGB'), mask=img.split()[-1])
                    img = background
                elif img.mode not in ('RGB', 'L'):
                    # Palette / high-bit-depth modes cannot be saved as JPEG.
                    img = img.convert('RGB')
                img.save(output_path, 'JPEG', quality=90)

            os.remove(png_output)
            return True
        except Exception as e:
            logger.warning(f"PNG 转 JPG 失败: {e}")
            # Best-effort fallback: keep the PNG data under the target name.
            # os.replace overwrites an existing target on every platform,
            # whereas os.rename fails on Windows when the target exists.
            try:
                os.replace(png_output, output_path)
                return True
            except OSError:
                return False

    except subprocess.CalledProcessError as e:
        logger.warning(f"超级兼容性方案提取帧 {timestamp:.1f}s 失败: {e}")
        if e.stderr:
            logger.debug(f"FFmpeg stderr: {e.stderr.strip()[:200]}")
        return False
    except subprocess.TimeoutExpired:
        logger.warning(f"超级兼容性方案提取帧 {timestamp:.1f}s 超时")
        return False
    except Exception as e:
        logger.warning(f"超级兼容性方案提取帧 {timestamp:.1f}s 异常: {e}")
        return False
    finally:
        # Do not leave a partial or orphaned temporary PNG behind on failure.
        if os.path.exists(png_output):
            try:
                os.remove(png_output)
            except OSError as cleanup_err:
                logger.debug(f"清理临时 PNG 失败: {cleanup_err}")
if __name__ == "__main__":
import time

View File

@ -25,9 +25,9 @@ def generate_script_docu(params):
def update_progress(progress: float, message: str = ""):
progress_bar.progress(progress)
if message:
status_text.text(f"{progress}% - {message}")
status_text.text(f"🎬 {message}")
else:
status_text.text(f"进度: {progress}%")
status_text.text(f"📊 进度: {progress}%")
try:
with st.spinner("正在生成脚本..."):
@ -54,7 +54,7 @@ def generate_script_docu(params):
if keyframe_files:
logger.info(f"使用已缓存的关键帧: {video_keyframes_dir}")
st.info(f"使用已缓存的关键帧,如需重新提取请删除目录: {video_keyframes_dir}")
st.info(f"✅ 使用已缓存关键帧,共 {len(keyframe_files)}")
update_progress(20, f"使用已缓存关键帧,共 {len(keyframe_files)}")
# 如果没有缓存的关键帧,则进行提取
@ -67,30 +67,30 @@ def generate_script_docu(params):
processor = video_processor.VideoProcessor(params.video_origin_path)
# 显示视频信息
st.info(f"视频信息: {processor.width}x{processor.height}, {processor.fps:.1f}fps, {processor.duration:.1f}")
st.info(f"📹 视频信息: {processor.width}x{processor.height}, {processor.fps:.1f}fps, {processor.duration:.1f}")
# 处理视频并提取关键帧
update_progress(15, "正在提取关键帧...")
# 处理视频并提取关键帧 - 直接使用超级兼容性方案
update_progress(15, "正在提取关键帧(使用超级兼容性方案)...")
try:
processor.process_video_pipeline(
# 使用优化的关键帧提取方法
processor.extract_frames_by_interval_ultra_compatible(
output_dir=video_keyframes_dir,
interval_seconds=st.session_state.get('frame_interval_input'),
)
except Exception as extract_error:
# 如果硬件加速失败,尝试强制使用软件方案
logger.warning(f"硬件加速提取失败: {extract_error}")
st.warning("硬件加速提取失败,正在尝试软件方案...")
logger.error(f"关键帧提取失败: {extract_error}")
# 提供详细的错误信息和解决建议
error_msg = str(extract_error)
if "权限" in error_msg or "permission" in error_msg.lower():
suggestion = "建议:检查输出目录权限,或更换输出位置"
elif "空间" in error_msg or "space" in error_msg.lower():
suggestion = "建议:检查磁盘空间是否足够"
else:
suggestion = "建议:检查视频文件是否损坏,或尝试转换为标准格式"
# 强制使用软件编码重试
from app.utils import ffmpeg_utils
ffmpeg_utils.force_software_encoding()
processor.process_video_pipeline(
output_dir=video_keyframes_dir,
interval_seconds=st.session_state.get('frame_interval_input'),
use_hw_accel=False # 明确禁用硬件加速
)
raise Exception(f"关键帧提取失败: {error_msg}\n{suggestion}")
# 获取所有关键文件路径
for filename in sorted(os.listdir(video_keyframes_dir)):
@ -101,7 +101,7 @@ def generate_script_docu(params):
# 检查目录中是否有其他文件
all_files = os.listdir(video_keyframes_dir)
logger.error(f"关键帧目录内容: {all_files}")
raise Exception("未提取到任何关键帧文件,可能是 FFmpeg 兼容性问题")
raise Exception("未提取到任何关键帧文件,请检查视频文件格式")
update_progress(20, f"关键帧提取完成,共 {len(keyframe_files)}")
st.success(f"✅ 成功提取 {len(keyframe_files)} 个关键帧")
@ -115,23 +115,14 @@ def generate_script_docu(params):
except Exception as cleanup_err:
logger.error(f"清理失败的关键帧目录时出错: {cleanup_err}")
# 提供更详细的错误信息和解决建议
error_msg = str(e)
if "滤镜链" in error_msg or "filter" in error_msg.lower():
suggestion = "建议:这可能是硬件加速兼容性问题,请尝试在设置中禁用硬件加速"
elif "cuda" in error_msg.lower() or "nvenc" in error_msg.lower():
suggestion = "建议NVIDIA 显卡驱动可能需要更新,或尝试禁用硬件加速"
else:
suggestion = "建议:检查视频文件是否损坏,或尝试转换为标准格式"
raise Exception(f"关键帧提取失败: {error_msg}\n{suggestion}")
raise Exception(f"关键帧提取失败: {str(e)}")
"""
2. 视觉分析(批量分析每一帧)
"""
vision_llm_provider = st.session_state.get('vision_llm_providers').lower()
llm_params = dict()
logger.debug(f"VLM 视觉大模型提供商: {vision_llm_provider}")
logger.info(f"使用 {vision_llm_provider.upper()} 进行视觉分析")
try:
# ===================初始化视觉分析器===================
@ -212,7 +203,7 @@ def generate_script_docu(params):
overall_activity_summaries = [] # 合并所有批次的整体总结
prev_batch_files = None
frame_counter = 1 # 初始化帧计数器,用于给所有帧分配连续的序号
# logger.debug(json.dumps(results, indent=4, ensure_ascii=False))
# 确保分析目录存在
analysis_dir = os.path.join(utils.storage_dir(), "temp", "analysis")
os.makedirs(analysis_dir, exist_ok=True)
@ -228,11 +219,9 @@ def generate_script_docu(params):
# 获取当前批次的文件列表
batch_files = get_batch_files(keyframe_files, result, vision_batch_size)
logger.debug(f"批次 {result['batch_index']} 处理完成,共 {len(batch_files)} 张图片")
# 获取批次的时间戳范围
first_timestamp, last_timestamp, timestamp_range = get_batch_timestamps(batch_files, prev_batch_files)
logger.debug(f"处理时间戳: {first_timestamp}-{last_timestamp}")
# 解析响应中的JSON数据
response_text = result['response']
@ -377,8 +366,8 @@ def generate_script_docu(params):
"""
4. 生成文案
"""
logger.info("开始准备生成解说文案")
update_progress(80, "正在生成文案...")
logger.info("开始生成解说文案")
update_progress(80, "正在生成解说文案...")
from app.services.generate_narration_script import parse_frame_analysis_to_markdown, generate_narration
# 从配置中获取文本生成相关配置
text_provider = config.app.get('text_llm_provider', 'gemini').lower()
@ -414,7 +403,7 @@ def generate_script_docu(params):
narration_dict = narration_data['items']
# 为 narration_dict 中每个 item 新增一个 OST: 2 的字段, 代表保留原声和配音
narration_dict = [{**item, "OST": 2} for item in narration_dict]
logger.debug(f"解说文案创作完成:\n{"\n".join([item['narration'] for item in narration_dict])}")
logger.info(f"解说文案生成完成,共 {len(narration_dict)} 个片段")
# 结果转换为JSON字符串
script = json.dumps(narration_dict, ensure_ascii=False, indent=2)
@ -425,20 +414,20 @@ def generate_script_docu(params):
if script is None:
st.error("生成脚本失败,请检查日志")
st.stop()
logger.success(f"剪辑脚本生成完成")
logger.info(f"纪录片解说脚本生成完成")
if isinstance(script, list):
st.session_state['video_clip_json'] = script
elif isinstance(script, str):
st.session_state['video_clip_json'] = json.loads(script)
update_progress(80, "脚本生成完成")
update_progress(100, "脚本生成完成")
time.sleep(0.1)
progress_bar.progress(100)
status_text.text("脚本生成完成!")
st.success("视频脚本生成成功!")
status_text.text("🎉 脚本生成完成!")
st.success("视频脚本生成成功!")
except Exception as err:
st.error(f"生成过程中发生错误: {str(err)}")
st.error(f"生成过程中发生错误: {str(err)}")
logger.exception(f"生成脚本时发生错误\n{traceback.format_exc()}")
finally:
time.sleep(2)