From 2569b7fee7e89666f8670e5a2ac5c265654a82a9 Mon Sep 17 00:00:00 2001 From: linyq Date: Sat, 16 Aug 2025 01:24:56 +0800 Subject: [PATCH] =?UTF-8?q?fix(subtitle):=20=E4=BF=AE=E5=A4=8D=E5=AD=97?= =?UTF-8?q?=E5=B9=95=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=E5=B9=B6=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E6=9C=89=E6=95=88=E6=80=A7=E6=A3=80=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 处理空字幕文件情况并改进错误处理 确保合并失败时有默认返回值 添加字幕文件有效性检查函数 --- app/services/generate_video.py | 80 +++++++++++----- app/services/subtitle_merger.py | 156 ++++++++++++++++++++------------ app/services/task.py | 28 +++++- 3 files changed, 178 insertions(+), 86 deletions(-) diff --git a/app/services/generate_video.py b/app/services/generate_video.py index 9b77a2c..2eb633f 100644 --- a/app/services/generate_video.py +++ b/app/services/generate_video.py @@ -29,6 +29,40 @@ from app.models.schema import AudioVolumeDefaults from app.services.audio_normalizer import AudioNormalizer, normalize_audio_for_mixing +def is_valid_subtitle_file(subtitle_path: str) -> bool: + """ + 检查字幕文件是否有效 + + 参数: + subtitle_path: 字幕文件路径 + + 返回: + bool: 如果字幕文件存在且包含有效内容则返回True,否则返回False + """ + if not subtitle_path or not os.path.exists(subtitle_path): + return False + + try: + with open(subtitle_path, 'r', encoding='utf-8') as f: + content = f.read().strip() + + # 检查文件是否为空 + if not content: + return False + + # 检查是否包含时间戳格式(SRT格式的基本特征) + # SRT格式应该包含类似 "00:00:00,000 --> 00:00:00,000" 的时间戳 + import re + time_pattern = r'\d{2}:\d{2}:\d{2},\d{3}\s*-->\s*\d{2}:\d{2}:\d{2},\d{3}' + if not re.search(time_pattern, content): + return False + + return True + except Exception as e: + logger.warning(f"检查字幕文件时出错: {str(e)}") + return False + + def merge_materials( video_path: str, audio_path: str, @@ -318,34 +352,36 @@ def merge_materials( color=subtitle_color, ) - # 处理字幕 - 修复字幕开关bug - if subtitle_enabled and subtitle_path and os.path.exists(subtitle_path): - logger.info("字幕已启用,开始处理字幕文件") - try: - # 加载字幕文件 - sub = SubtitlesClip( - subtitles=subtitle_path, - encoding="utf-8", - make_textclip=make_textclip - ) + # 处理字幕 - 修复字幕开关bug和空字幕文件问题 + if subtitle_enabled and subtitle_path: + if is_valid_subtitle_file(subtitle_path): + logger.info("字幕已启用,开始处理字幕文件") + try: + # 加载字幕文件 + sub = SubtitlesClip( + subtitles=subtitle_path, + encoding="utf-8", + make_textclip=make_textclip + ) - # 创建每个字幕片段 - text_clips = [] - for item in sub.subtitles: - clip = create_text_clip(subtitle_item=item) - text_clips.append(clip) + # 创建每个字幕片段 + text_clips = [] + for item in sub.subtitles: + clip = create_text_clip(subtitle_item=item) + text_clips.append(clip) - # 合成视频和字幕 - video_clip = CompositeVideoClip([video_clip, *text_clips]) - logger.info(f"已添加{len(text_clips)}个字幕片段") - except Exception as e: - logger.error(f"处理字幕失败: \n{traceback.format_exc()}") + # 合成视频和字幕 + video_clip = CompositeVideoClip([video_clip, *text_clips]) + logger.info(f"已添加{len(text_clips)}个字幕片段") + except Exception as e: + logger.error(f"处理字幕失败: \n{traceback.format_exc()}") + logger.warning("字幕处理失败,继续生成无字幕视频") + else: + logger.warning(f"字幕文件无效或为空: {subtitle_path},跳过字幕处理") elif not subtitle_enabled: logger.info("字幕已禁用,跳过字幕处理") elif not subtitle_path: logger.info("未提供字幕文件路径,跳过字幕处理") - elif not os.path.exists(subtitle_path): - logger.warning(f"字幕文件不存在: {subtitle_path},跳过字幕处理") # 导出最终视频 try: diff --git a/app/services/subtitle_merger.py b/app/services/subtitle_merger.py index 9097586..221ea5c 100644 --- a/app/services/subtitle_merger.py +++ b/app/services/subtitle_merger.py @@ -62,91 +62,127 @@ def parse_edited_time_range(time_range_str): def merge_subtitle_files(subtitle_items, output_file=None): """ 合并多个SRT字幕文件 - + 参数: subtitle_items: 字典列表,每个字典包含subtitle文件路径和editedTimeRange output_file: 输出文件的路径,如果为None则自动生成 - + 返回: - 合并后的字幕文件路径 + 合并后的字幕文件路径,如果没有有效字幕则返回None """ # 按照editedTimeRange的开始时间排序 - sorted_items = sorted(subtitle_items, + sorted_items = sorted(subtitle_items, key=lambda x: parse_edited_time_range(x.get('editedTimeRange', ''))[0] or timedelta()) - + merged_subtitles = [] subtitle_index = 1 - + valid_items_count = 0 + for item in sorted_items: if not item.get('subtitle') or not os.path.exists(item.get('subtitle')): + print(f"跳过项目 {item.get('_id')}:字幕文件不存在或路径为空") continue - + # 从editedTimeRange获取起始时间偏移 offset_time, _ = parse_edited_time_range(item.get('editedTimeRange', '')) - + if offset_time is None: print(f"警告: 无法从项目 {item.get('_id')} 的editedTimeRange中提取时间范围,跳过该项") continue - - with open(item['subtitle'], 'r', encoding='utf-8') as file: - content = file.read() - - # 解析字幕文件 - subtitle_blocks = re.split(r'\n\s*\n', content.strip()) - - for block in subtitle_blocks: - lines = block.strip().split('\n') - if len(lines) < 3: # 确保块有足够的行数 + + try: + with open(item['subtitle'], 'r', encoding='utf-8') as file: + content = file.read().strip() + + # 检查文件内容是否为空 + if not content: + print(f"跳过项目 {item.get('_id')}:字幕文件内容为空") continue - - # 解析时间轴行 - time_line = lines[1] - time_parts = time_line.split(' --> ') - if len(time_parts) != 2: - continue - - start_time = parse_time(time_parts[0]) - end_time = parse_time(time_parts[1]) - - # 应用时间偏移 - adjusted_start_time = start_time + offset_time - adjusted_end_time = end_time + offset_time - - # 重建字幕块 - adjusted_time_line = f"{format_time(adjusted_start_time)} --> {format_time(adjusted_end_time)}" - text_lines = lines[2:] - - new_block = [ - str(subtitle_index), - adjusted_time_line, - *text_lines - ] - - merged_subtitles.append('\n'.join(new_block)) - subtitle_index += 1 - + + valid_items_count += 1 + + # 解析字幕文件 + subtitle_blocks = re.split(r'\n\s*\n', content) + + for block in subtitle_blocks: + lines = block.strip().split('\n') + if len(lines) < 3: # 确保块有足够的行数 + continue + + # 解析时间轴行 + time_line = lines[1] + time_parts = time_line.split(' --> ') + if len(time_parts) != 2: + continue + + start_time = parse_time(time_parts[0]) + end_time = parse_time(time_parts[1]) + + # 应用时间偏移 + adjusted_start_time = start_time + offset_time + adjusted_end_time = end_time + offset_time + + # 重建字幕块 + adjusted_time_line = f"{format_time(adjusted_start_time)} --> {format_time(adjusted_end_time)}" + text_lines = lines[2:] + + new_block = [ + str(subtitle_index), + adjusted_time_line, + *text_lines + ] + + merged_subtitles.append('\n'.join(new_block)) + subtitle_index += 1 + except Exception as e: + print(f"处理项目 {item.get('_id')} 的字幕文件时出错: {str(e)}") + continue + + # 检查是否有有效的字幕内容 + if not merged_subtitles: + print(f"警告: 没有找到有效的字幕内容,共检查了 {len(subtitle_items)} 个项目,其中 {valid_items_count} 个有有效文件") + return None + # 确定输出文件路径 if output_file is None: - dir_path = os.path.dirname(sorted_items[0]['subtitle']) + # 找到第一个有效的字幕文件来确定目录 + valid_item = None + for item in sorted_items: + if item.get('subtitle') and os.path.exists(item.get('subtitle')): + valid_item = item + break + + if not valid_item: + print("错误: 无法确定输出目录,没有找到有效的字幕文件") + return None + + dir_path = os.path.dirname(valid_item['subtitle']) first_start = parse_edited_time_range(sorted_items[0]['editedTimeRange'])[0] last_end = parse_edited_time_range(sorted_items[-1]['editedTimeRange'])[1] - - first_start_h, first_start_m, first_start_s = int(first_start.seconds // 3600), int((first_start.seconds % 3600) // 60), int(first_start.seconds % 60) - last_end_h, last_end_m, last_end_s = int(last_end.seconds // 3600), int((last_end.seconds % 3600) // 60), int(last_end.seconds % 60) - - first_start_str = f"{first_start_h:02d}_{first_start_m:02d}_{first_start_s:02d}" - last_end_str = f"{last_end_h:02d}_{last_end_m:02d}_{last_end_s:02d}" - - output_file = os.path.join(dir_path, f"merged_subtitle_{first_start_str}-{last_end_str}.srt") - + + if first_start and last_end: + first_start_h, first_start_m, first_start_s = int(first_start.seconds // 3600), int((first_start.seconds % 3600) // 60), int(first_start.seconds % 60) + last_end_h, last_end_m, last_end_s = int(last_end.seconds // 3600), int((last_end.seconds % 3600) // 60), int(last_end.seconds % 60) + + first_start_str = f"{first_start_h:02d}_{first_start_m:02d}_{first_start_s:02d}" + last_end_str = f"{last_end_h:02d}_{last_end_m:02d}_{last_end_s:02d}" + + output_file = os.path.join(dir_path, f"merged_subtitle_{first_start_str}-{last_end_str}.srt") + else: + output_file = os.path.join(dir_path, f"merged_subtitle.srt") + # 合并所有字幕块 merged_content = '\n\n'.join(merged_subtitles) - + # 写入合并后的内容 - with open(output_file, 'w', encoding='utf-8') as file: - file.write(merged_content) - - return output_file + try: + with open(output_file, 'w', encoding='utf-8') as file: + file.write(merged_content) + print(f"字幕文件合并成功: {output_file},包含 {len(merged_subtitles)} 个字幕条目") + return output_file + except Exception as e: + print(f"写入字幕文件失败: {str(e)}") + return None if __name__ == '__main__': diff --git a/app/services/task.py b/app/services/task.py index bec38be..3914df5 100644 --- a/app/services/task.py +++ b/app/services/task.py @@ -136,11 +136,21 @@ def start_subclip(task_id: str, params: VideoClipParams, subclip_path_videos: di list_script=new_script_list ) logger.info(f"音频文件合并成功->{merged_audio_path}") + # 合并字幕文件 merged_subtitle_path = subtitle_merger.merge_subtitle_files(new_script_list) - logger.info(f"字幕文件合并成功->{merged_subtitle_path}") + if merged_subtitle_path: + logger.info(f"字幕文件合并成功->{merged_subtitle_path}") + else: + logger.warning("没有有效的字幕内容,将生成无字幕视频") + merged_subtitle_path = "" except Exception as e: - logger.error(f"合并音频文件失败: {str(e)}") + logger.error(f"合并音频/字幕文件失败: {str(e)}") + # 确保即使合并失败也有默认值 + if 'merged_audio_path' not in locals(): + merged_audio_path = "" + if 'merged_subtitle_path' not in locals(): + merged_subtitle_path = "" else: logger.warning("没有需要合并的音频/字幕") merged_audio_path = "" @@ -351,11 +361,21 @@ def start_subclip_unified(task_id: str, params: VideoClipParams): list_script=new_script_list ) logger.info(f"音频文件合并成功->{merged_audio_path}") + # 合并字幕文件 merged_subtitle_path = subtitle_merger.merge_subtitle_files(new_script_list) - logger.info(f"字幕文件合并成功->{merged_subtitle_path}") + if merged_subtitle_path: + logger.info(f"字幕文件合并成功->{merged_subtitle_path}") + else: + logger.warning("没有有效的字幕内容,将生成无字幕视频") + merged_subtitle_path = "" except Exception as e: - logger.error(f"合并音频文件失败: {str(e)}") + logger.error(f"合并音频/字幕文件失败: {str(e)}") + # 确保即使合并失败也有默认值 + if 'merged_audio_path' not in locals(): + merged_audio_path = "" + if 'merged_subtitle_path' not in locals(): + merged_subtitle_path = "" else: logger.warning("没有需要合并的音频/字幕") merged_audio_path = ""