refactor(voice): 优化 Edge TTS 音频生成逻辑- 重构了 Edge TTS音频生成函数,提高了代码可读性和错误处理能力

-增加了重试机制,提高了生成音频的可靠性
-优化了日志输出,提供了更详细的错误信息和生成进度
- 删除了不必要的测试代码和注释,精简了代码结构
This commit is contained in:
linyqh 2024-12-03 23:24:20 +08:00
parent 9efccea97f
commit 0bb811ea79
3 changed files with 76 additions and 71 deletions

View File

@ -438,7 +438,6 @@ def clip_videos(task_id: str, timestamp_terms: List[str], origin_video: str, pro
elif material_directory and not os.path.isdir(material_directory):
material_directory = video_clips_dir # 如果没有指定material_directory,使用缓存目录
logger.debug("material_directory:",material_directory)
try:
saved_video_path = save_clip_video(timestamp=item, origin_video=origin_video, save_dir=material_directory)
if saved_video_path:
@ -511,7 +510,3 @@ def merge_videos(video_paths, ost_list):
os.remove(silent_video)
return output_file
if __name__ == "__main__":
save_clip_video('00:50-01:41', 'E:\\projects\\NarratoAI\\resource\\videos\\WeChat_20241110144511.mp4')

View File

@ -351,7 +351,7 @@ def process_subtitles(subtitle_path, video_clip, video_duration, create_text_cli
for item in sub.subtitles:
clip = create_text_clip(subtitle_item=item)
# 时间范围<EFBFBD><EFBFBD>
# 时间范围
start_time = max(clip.start, 0)
if start_time >= video_duration:
continue
@ -520,9 +520,9 @@ def combine_clip_videos(combined_video_path: str,
def extract_timestamp_from_filename(filename: str) -> tuple:
"""
从文件名中提取时间戳支持多种格式
- "vid-00_06,500-00_24,800.mp4" -> (6.5, 24.8)
- "vid-00_00_00-020-00_00_10-400.mp4" -> (0.02, 10.4)
从文件名中提取时间戳支持格式
- "vid-00-00-10_000-00-00-43_039.mp4" -> (10.0, 43.039)
表示 00时00分10秒000毫秒 00时00分43秒039毫秒
"""
try:
# 提取时间戳部分
@ -533,35 +533,37 @@ def extract_timestamp_from_filename(filename: str) -> tuple:
timestamp = match.group(1)
# 处理包含毫秒的格式 (00_00_00-020-00_00_10-400)
if timestamp.count('-') == 3:
parts = timestamp.split('-')
start_time = f"{parts[0]}-{parts[1]}" # 组合开始时间和毫秒
end_time = f"{parts[2]}-{parts[3]}" # 组合结束时间和毫秒
# 转换开始时间
start_time_str = start_time.replace('_', ':')
if start_time_str.count(':') == 2: # 如果是 00:00:00-020 格式
start_base = utils.time_to_seconds(start_time_str.split('-')[0])
start_ms = int(start_time_str.split('-')[1]) / 1000
start_seconds = start_base + start_ms
else:
start_seconds = utils.time_to_seconds(start_time_str)
# 转换结束时间
end_time_str = end_time.replace('_', ':')
if end_time_str.count(':') == 2: # 如果是 00:00:10-400 格式
end_base = utils.time_to_seconds(end_time_str.split('-')[0])
end_ms = int(end_time_str.split('-')[1]) / 1000
end_seconds = end_base + end_ms
else:
end_seconds = utils.time_to_seconds(end_time_str)
def parse_timestamp(time_str: str) -> float:
"""解析单个时间戳字符串为秒数"""
try:
# 处理 "00-00-10_000" 格式
main_time, milliseconds = time_str.rsplit('_', 1) # 从右边分割,处理可能存在的多个下划线
time_components = main_time.split('-')
# 处理简单格式 (00_06-00_24)
else:
start_str, end_str = timestamp.split('-')
start_seconds = utils.time_to_seconds(start_str.replace('_', ':'))
end_seconds = utils.time_to_seconds(end_str.replace('_', ':'))
if len(time_components) != 3:
raise ValueError(f"时间格式错误: {main_time}")
hours = int(time_components[0])
minutes = int(time_components[1])
seconds = int(time_components[2])
ms = int(milliseconds)
# 转换为秒数
total_seconds = hours * 3600 + minutes * 60 + seconds + ms / 1000
return total_seconds
except Exception as e:
raise ValueError(f"解析时间戳失败 {time_str}: {str(e)}")
# 分割起始和结束时间戳
timestamps = timestamp.split('-', 5) # 最多分割5次处理 00-00-10_000-00-00-43_039 格式
if len(timestamps) != 6: # 应该得到 ['00', '00', '10_000', '00', '00', '43_039']
raise ValueError(f"时间戳格式错误,无法分割: {timestamp}")
start_str = '-'.join(timestamps[0:3]) # 组合开始时间 "00-00-10_000"
end_str = '-'.join(timestamps[3:6]) # 组合结束时间 "00-00-43_039"
start_seconds = parse_timestamp(start_str)
end_seconds = parse_timestamp(end_str)
logger.debug(f"从文件名 {filename} 提取时间戳: {start_seconds:.3f} - {end_seconds:.3f}")
return start_seconds, end_seconds
@ -661,9 +663,11 @@ def add_subtitles(video_clip, subtitle_path, font_size, font_name, font_color, p
size=(video_clip.w * 0.9, None) # 限制字幕宽度
)
# 使用 SubtitlesClip但明确指定 UTF-8 编码
subtitles = SubtitlesClip(
subtitle_path,
subtitle_generator
subtitle_generator,
encoding='utf-8' # 明确指定使用 UTF-8 编码
)
# 添加字幕到视频
@ -692,7 +696,7 @@ if __name__ == "__main__":
# {
# "picture": "夜晚,一个小孩在树林里奔跑,后面有人拿着火把在追赶",
# "timestamp": "00:00-00:03",
# "narration": "夜<EFBFBD><EFBFBD><EFBFBD>风高的树林,一个小孩在拼命奔跑,后面的人穷追不舍!",
# "narration": "夜风高的树林,一个小孩在拼命奔跑,后面的人穷追不舍!",
# "OST": False,
# "new_timestamp": "00:00-00:03"
# },

View File

@ -11,6 +11,7 @@ from edge_tts.submaker import mktimestamp
from xml.sax.saxutils import unescape
from edge_tts import submaker, SubMaker
from moviepy.video.tools import subtitles
import time
from app.config import config
from app.utils import utils
@ -1071,33 +1072,47 @@ def azure_tts_v1(
pitch_str = convert_pitch_to_percent(voice_pitch)
for i in range(3):
try:
logger.info(f"start, voice name: {voice_name}, try: {i + 1}")
logger.info(f"{i+1} 次使用 edge_tts 生成音频")
async def _do() -> SubMaker:
async def _do() -> tuple[SubMaker, bytes]:
communicate = edge_tts.Communicate(text, voice_name, rate=rate_str, pitch=pitch_str, proxy=config.proxy.get("http"))
sub_maker = edge_tts.SubMaker()
with open(voice_file, "wb") as file:
async for chunk in communicate.stream():
if chunk["type"] == "audio":
file.write(chunk["data"])
elif chunk["type"] == "WordBoundary":
sub_maker.create_sub(
(chunk["offset"], chunk["duration"]), chunk["text"]
)
return sub_maker
# 判断音频文件是否一件存在
audio_data = bytes() # 用于存储音频数据
async for chunk in communicate.stream():
if chunk["type"] == "audio":
audio_data += chunk["data"]
elif chunk["type"] == "WordBoundary":
sub_maker.create_sub(
(chunk["offset"], chunk["duration"]), chunk["text"]
)
return sub_maker, audio_data
# 判断音频文件是否已存在
if os.path.exists(voice_file):
logger.info(f"voice file exists, skip tts: {voice_file}")
continue
sub_maker = asyncio.run(_do())
if not sub_maker or not sub_maker.subs:
logger.warning(f"failed, sub_maker is None or sub_maker.subs is None")
# 获取音频数据和字幕信息
sub_maker, audio_data = asyncio.run(_do())
# 验证数据是否有效
if not sub_maker or not sub_maker.subs or not audio_data:
logger.warning(f"failed, invalid data generated")
if i < 2:
time.sleep(1)
continue
# 数据有效,写入文件
with open(voice_file, "wb") as file:
file.write(audio_data)
logger.info(f"completed, output file: {voice_file}")
return sub_maker
except Exception as e:
logger.error(f"failed, error: {str(e)}")
logger.error(f"生成音频文件时出错: {str(e)}")
if i < 2:
time.sleep(1)
return None
@ -1133,14 +1148,6 @@ def azure_tts_v2(text: str, voice_name: str, voice_file: str) -> [SubMaker, None
sub_maker = SubMaker()
def speech_synthesizer_word_boundary_cb(evt: speechsdk.SessionEventArgs):
# print('WordBoundary event:')
# print('\tBoundaryType: {}'.format(evt.boundary_type))
# print('\tAudioOffset: {}ms'.format((evt.audio_offset + 5000)))
# print('\tDuration: {}'.format(evt.duration))
# print('\tText: {}'.format(evt.text))
# print('\tTextOffset: {}'.format(evt.text_offset))
# print('\tWordLength: {}'.format(evt.word_length))
duration = _format_duration_to_offset(str(evt.duration))
offset = _format_duration_to_offset(evt.audio_offset)
sub_maker.subs.append(evt.text)
@ -1186,9 +1193,13 @@ def azure_tts_v2(text: str, voice_name: str, voice_file: str) -> [SubMaker, None
logger.error(
f"azure v2 speech synthesis error: {cancellation_details.error_details}"
)
if i < 2: # 如果不是最后一次重试则等待1秒
time.sleep(1)
logger.info(f"completed, output file: {voice_file}")
except Exception as e:
logger.error(f"failed, error: {str(e)}")
if i < 2: # 如果不是最后一次重试则等待1秒
time.sleep(1)
return None
@ -1446,7 +1457,7 @@ def tts_multiple(task_id: str, list_script: list, voice_name: str, voice_rate: f
if sub_maker is None:
logger.error(f"无法为时间戳 {timestamp} 生成音频; "
f"如果您在中国请使用VPN。或者手动选择 zh-CN-YunyangNeural 等角色;"
f"如果您在中国请使用VPN; "
f"或者使用其他 tts 引擎")
continue
@ -1463,17 +1474,12 @@ if __name__ == "__main__":
voice_name = parse_voice_name(voice_name)
print(voice_name)
with open("../../resource/scripts/test.json", 'r', encoding='utf-8') as f:
with open("../../resource/scripts/2024-1203-205442.json", 'r', encoding='utf-8') as f:
data = json.load(f)
audio_files, sub_maker_list = tts_multiple(task_id="12312312", list_script=data, voice_name=voice_name, voice_rate=1)
audio_files, sub_maker_list = tts_multiple(task_id="12312312", list_script=data, voice_name=voice_name, voice_rate=1, voice_pitch=1)
full_text = " ".join([item['narration'] for item in data if not item['OST']])
subtitle_file = os.path.join(utils.task_dir("12312312"), "subtitle_multiple.srt")
create_subtitle_from_multiple(full_text, sub_maker_list, data, subtitle_file)
print(f"生成的音频文件列表: {audio_files}")
print(f"生成的字幕文件: {subtitle_file}")
# text = " ".join([item['narration'] for item in data])
# sub_marks = tts(text=text, voice_name=voice_name, voice_rate=1, voice_file="../../storage/tasks/12312312/aaa.mp3")
# create_subtitle(text=text, sub_maker=sub_marks, subtitle_file="../../storage/tasks/12312312/subtitle_123.srt")