fix(subtitle): 修复字幕处理逻辑并添加有效性检查

处理空字幕文件情况并改进错误处理
确保合并失败时有默认返回值
添加字幕文件有效性检查函数
This commit is contained in:
linyq 2025-08-16 01:24:56 +08:00
parent 1fba4414aa
commit 2569b7fee7
3 changed files with 178 additions and 86 deletions

View File

@ -29,6 +29,40 @@ from app.models.schema import AudioVolumeDefaults
from app.services.audio_normalizer import AudioNormalizer, normalize_audio_for_mixing
def is_valid_subtitle_file(subtitle_path: str) -> bool:
"""
检查字幕文件是否有效
参数:
subtitle_path: 字幕文件路径
返回:
bool: 如果字幕文件存在且包含有效内容则返回True否则返回False
"""
if not subtitle_path or not os.path.exists(subtitle_path):
return False
try:
with open(subtitle_path, 'r', encoding='utf-8') as f:
content = f.read().strip()
# 检查文件是否为空
if not content:
return False
# 检查是否包含时间戳格式SRT格式的基本特征
# SRT格式应该包含类似 "00:00:00,000 --> 00:00:00,000" 的时间戳
import re
time_pattern = r'\d{2}:\d{2}:\d{2},\d{3}\s*-->\s*\d{2}:\d{2}:\d{2},\d{3}'
if not re.search(time_pattern, content):
return False
return True
except Exception as e:
logger.warning(f"检查字幕文件时出错: {str(e)}")
return False
def merge_materials(
video_path: str,
audio_path: str,
@ -318,34 +352,36 @@ def merge_materials(
color=subtitle_color,
)
# 处理字幕 - 修复字幕开关bug
if subtitle_enabled and subtitle_path and os.path.exists(subtitle_path):
logger.info("字幕已启用,开始处理字幕文件")
try:
# 加载字幕文件
sub = SubtitlesClip(
subtitles=subtitle_path,
encoding="utf-8",
make_textclip=make_textclip
)
# 处理字幕 - 修复字幕开关bug和空字幕文件问题
if subtitle_enabled and subtitle_path:
if is_valid_subtitle_file(subtitle_path):
logger.info("字幕已启用,开始处理字幕文件")
try:
# 加载字幕文件
sub = SubtitlesClip(
subtitles=subtitle_path,
encoding="utf-8",
make_textclip=make_textclip
)
# 创建每个字幕片段
text_clips = []
for item in sub.subtitles:
clip = create_text_clip(subtitle_item=item)
text_clips.append(clip)
# 创建每个字幕片段
text_clips = []
for item in sub.subtitles:
clip = create_text_clip(subtitle_item=item)
text_clips.append(clip)
# 合成视频和字幕
video_clip = CompositeVideoClip([video_clip, *text_clips])
logger.info(f"已添加{len(text_clips)}个字幕片段")
except Exception as e:
logger.error(f"处理字幕失败: \n{traceback.format_exc()}")
# 合成视频和字幕
video_clip = CompositeVideoClip([video_clip, *text_clips])
logger.info(f"已添加{len(text_clips)}个字幕片段")
except Exception as e:
logger.error(f"处理字幕失败: \n{traceback.format_exc()}")
logger.warning("字幕处理失败,继续生成无字幕视频")
else:
logger.warning(f"字幕文件无效或为空: {subtitle_path},跳过字幕处理")
elif not subtitle_enabled:
logger.info("字幕已禁用,跳过字幕处理")
elif not subtitle_path:
logger.info("未提供字幕文件路径,跳过字幕处理")
elif not os.path.exists(subtitle_path):
logger.warning(f"字幕文件不存在: {subtitle_path},跳过字幕处理")
# 导出最终视频
try:

View File

@ -62,91 +62,127 @@ def parse_edited_time_range(time_range_str):
def merge_subtitle_files(subtitle_items, output_file=None):
"""
合并多个SRT字幕文件
参数:
subtitle_items: 字典列表每个字典包含subtitle文件路径和editedTimeRange
output_file: 输出文件的路径如果为None则自动生成
返回:
合并后的字幕文件路径
合并后的字幕文件路径如果没有有效字幕则返回None
"""
# 按照editedTimeRange的开始时间排序
sorted_items = sorted(subtitle_items,
sorted_items = sorted(subtitle_items,
key=lambda x: parse_edited_time_range(x.get('editedTimeRange', ''))[0] or timedelta())
merged_subtitles = []
subtitle_index = 1
valid_items_count = 0
for item in sorted_items:
if not item.get('subtitle') or not os.path.exists(item.get('subtitle')):
print(f"跳过项目 {item.get('_id')}:字幕文件不存在或路径为空")
continue
# 从editedTimeRange获取起始时间偏移
offset_time, _ = parse_edited_time_range(item.get('editedTimeRange', ''))
if offset_time is None:
print(f"警告: 无法从项目 {item.get('_id')} 的editedTimeRange中提取时间范围跳过该项")
continue
with open(item['subtitle'], 'r', encoding='utf-8') as file:
content = file.read()
# 解析字幕文件
subtitle_blocks = re.split(r'\n\s*\n', content.strip())
for block in subtitle_blocks:
lines = block.strip().split('\n')
if len(lines) < 3: # 确保块有足够的行数
try:
with open(item['subtitle'], 'r', encoding='utf-8') as file:
content = file.read().strip()
# 检查文件内容是否为空
if not content:
print(f"跳过项目 {item.get('_id')}:字幕文件内容为空")
continue
# 解析时间轴行
time_line = lines[1]
time_parts = time_line.split(' --> ')
if len(time_parts) != 2:
continue
start_time = parse_time(time_parts[0])
end_time = parse_time(time_parts[1])
# 应用时间偏移
adjusted_start_time = start_time + offset_time
adjusted_end_time = end_time + offset_time
# 重建字幕块
adjusted_time_line = f"{format_time(adjusted_start_time)} --> {format_time(adjusted_end_time)}"
text_lines = lines[2:]
new_block = [
str(subtitle_index),
adjusted_time_line,
*text_lines
]
merged_subtitles.append('\n'.join(new_block))
subtitle_index += 1
valid_items_count += 1
# 解析字幕文件
subtitle_blocks = re.split(r'\n\s*\n', content)
for block in subtitle_blocks:
lines = block.strip().split('\n')
if len(lines) < 3: # 确保块有足够的行数
continue
# 解析时间轴行
time_line = lines[1]
time_parts = time_line.split(' --> ')
if len(time_parts) != 2:
continue
start_time = parse_time(time_parts[0])
end_time = parse_time(time_parts[1])
# 应用时间偏移
adjusted_start_time = start_time + offset_time
adjusted_end_time = end_time + offset_time
# 重建字幕块
adjusted_time_line = f"{format_time(adjusted_start_time)} --> {format_time(adjusted_end_time)}"
text_lines = lines[2:]
new_block = [
str(subtitle_index),
adjusted_time_line,
*text_lines
]
merged_subtitles.append('\n'.join(new_block))
subtitle_index += 1
except Exception as e:
print(f"处理项目 {item.get('_id')} 的字幕文件时出错: {str(e)}")
continue
# 检查是否有有效的字幕内容
if not merged_subtitles:
print(f"警告: 没有找到有效的字幕内容,共检查了 {len(subtitle_items)} 个项目,其中 {valid_items_count} 个有有效文件")
return None
# 确定输出文件路径
if output_file is None:
dir_path = os.path.dirname(sorted_items[0]['subtitle'])
# 找到第一个有效的字幕文件来确定目录
valid_item = None
for item in sorted_items:
if item.get('subtitle') and os.path.exists(item.get('subtitle')):
valid_item = item
break
if not valid_item:
print("错误: 无法确定输出目录,没有找到有效的字幕文件")
return None
dir_path = os.path.dirname(valid_item['subtitle'])
first_start = parse_edited_time_range(sorted_items[0]['editedTimeRange'])[0]
last_end = parse_edited_time_range(sorted_items[-1]['editedTimeRange'])[1]
first_start_h, first_start_m, first_start_s = int(first_start.seconds // 3600), int((first_start.seconds % 3600) // 60), int(first_start.seconds % 60)
last_end_h, last_end_m, last_end_s = int(last_end.seconds // 3600), int((last_end.seconds % 3600) // 60), int(last_end.seconds % 60)
first_start_str = f"{first_start_h:02d}_{first_start_m:02d}_{first_start_s:02d}"
last_end_str = f"{last_end_h:02d}_{last_end_m:02d}_{last_end_s:02d}"
output_file = os.path.join(dir_path, f"merged_subtitle_{first_start_str}-{last_end_str}.srt")
if first_start and last_end:
first_start_h, first_start_m, first_start_s = int(first_start.seconds // 3600), int((first_start.seconds % 3600) // 60), int(first_start.seconds % 60)
last_end_h, last_end_m, last_end_s = int(last_end.seconds // 3600), int((last_end.seconds % 3600) // 60), int(last_end.seconds % 60)
first_start_str = f"{first_start_h:02d}_{first_start_m:02d}_{first_start_s:02d}"
last_end_str = f"{last_end_h:02d}_{last_end_m:02d}_{last_end_s:02d}"
output_file = os.path.join(dir_path, f"merged_subtitle_{first_start_str}-{last_end_str}.srt")
else:
output_file = os.path.join(dir_path, f"merged_subtitle.srt")
# 合并所有字幕块
merged_content = '\n\n'.join(merged_subtitles)
# 写入合并后的内容
with open(output_file, 'w', encoding='utf-8') as file:
file.write(merged_content)
return output_file
try:
with open(output_file, 'w', encoding='utf-8') as file:
file.write(merged_content)
print(f"字幕文件合并成功: {output_file},包含 {len(merged_subtitles)} 个字幕条目")
return output_file
except Exception as e:
print(f"写入字幕文件失败: {str(e)}")
return None
if __name__ == '__main__':

View File

@ -136,11 +136,21 @@ def start_subclip(task_id: str, params: VideoClipParams, subclip_path_videos: di
list_script=new_script_list
)
logger.info(f"音频文件合并成功->{merged_audio_path}")
# 合并字幕文件
merged_subtitle_path = subtitle_merger.merge_subtitle_files(new_script_list)
logger.info(f"字幕文件合并成功->{merged_subtitle_path}")
if merged_subtitle_path:
logger.info(f"字幕文件合并成功->{merged_subtitle_path}")
else:
logger.warning("没有有效的字幕内容,将生成无字幕视频")
merged_subtitle_path = ""
except Exception as e:
logger.error(f"合并音频文件失败: {str(e)}")
logger.error(f"合并音频/字幕文件失败: {str(e)}")
# 确保即使合并失败也有默认值
if 'merged_audio_path' not in locals():
merged_audio_path = ""
if 'merged_subtitle_path' not in locals():
merged_subtitle_path = ""
else:
logger.warning("没有需要合并的音频/字幕")
merged_audio_path = ""
@ -351,11 +361,21 @@ def start_subclip_unified(task_id: str, params: VideoClipParams):
list_script=new_script_list
)
logger.info(f"音频文件合并成功->{merged_audio_path}")
# 合并字幕文件
merged_subtitle_path = subtitle_merger.merge_subtitle_files(new_script_list)
logger.info(f"字幕文件合并成功->{merged_subtitle_path}")
if merged_subtitle_path:
logger.info(f"字幕文件合并成功->{merged_subtitle_path}")
else:
logger.warning("没有有效的字幕内容,将生成无字幕视频")
merged_subtitle_path = ""
except Exception as e:
logger.error(f"合并音频文件失败: {str(e)}")
logger.error(f"合并音频/字幕文件失败: {str(e)}")
# 确保即使合并失败也有默认值
if 'merged_audio_path' not in locals():
merged_audio_path = ""
if 'merged_subtitle_path' not in locals():
merged_subtitle_path = ""
else:
logger.warning("没有需要合并的音频/字幕")
merged_audio_path = ""