mirror of
https://github.com/linyqh/NarratoAI.git
synced 2025-12-12 03:02:48 +00:00
128 lines
4.7 KiB
Python
128 lines
4.7 KiB
Python
import os
|
||
import json
|
||
import subprocess
|
||
import edge_tts
|
||
from edge_tts import submaker
|
||
from pydub import AudioSegment
|
||
from typing import List, Dict
|
||
from loguru import logger
|
||
from app.utils import utils
|
||
|
||
|
||
def check_ffmpeg():
    """Check whether a working FFmpeg executable is available.

    Runs ``ffmpeg -version`` with its output discarded.

    :return: True if the command could be launched and exited with status 0;
        False if it could not be launched or exited with a non-zero status.
    """
    try:
        completed = subprocess.run(
            ['ffmpeg', '-version'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError:
        # FileNotFoundError (nothing named ffmpeg on PATH) and PermissionError
        # (found but not executable) are both OSError subclasses; in either
        # case FFmpeg cannot be used.
        return False
    # A binary that starts but fails its own version query is not usable either.
    return completed.returncode == 0
|
||
|
||
|
||
def merge_audio_files(task_id: str, audio_file_paths: List[str], total_duration: int, video_script: list):
    """
    Overlay several audio clips onto one silent track of a fixed total length.

    Each clip is placed at the start offset taken from its file name via
    ``extract_timestamp``. A clip is logged and skipped when its file does not
    exist, when its file name cannot be split into timestamps, or when it
    cannot be read. The mixed track is exported to the task directory as
    ``audio.wav``; if that export raises, ``audio.mp3`` is tried instead.
    No subtitle data is produced by this function.

    :param task_id: task ID, passed to ``utils.task_dir`` to get the output directory
    :param audio_file_paths: paths of the audio clips to merge
    :param total_duration: length of the final audio track, in seconds
    :param video_script: video script; not used by the current implementation
    :return: path of the exported audio file on success. When FFmpeg is not
        available or both exports fail, the tuple ``(None, None)`` is returned
        instead. NOTE(review): the success and failure shapes differ; they are
        kept as-is so existing callers keep working.
    """
    output_dir = utils.task_dir(task_id)

    if not check_ffmpeg():
        logger.error("错误:FFmpeg未安装。请安装FFmpeg后再运行此脚本。")
        return None, None

    # Start from silence of the requested length; pydub durations are in milliseconds.
    blank_audio = AudioSegment.silent(duration=total_duration * 1000)

    for audio_path in audio_file_paths:
        if not os.path.exists(audio_path):
            logger.info(f"警告:文件 {audio_path} 不存在,已跳过。")
            continue

        # The start offset is encoded in the file name; the end offset is not needed here.
        filename = os.path.basename(audio_path)
        try:
            start_time, _ = extract_timestamp(filename)
        except (IndexError, ValueError) as e:
            # A file name without the expected "_" / "-" separators must not
            # abort the whole merge; skip it like any other unusable clip.
            logger.error(f"错误:无法从文件名 {filename} 中解析时间戳,已跳过。错误信息:{str(e)}")
            continue

        # Best effort: any failure while reading a clip only skips that clip.
        try:
            audio = AudioSegment.from_mp3(audio_path)
        except Exception as e:
            logger.error(f"错误:无法读取文件 {audio_path}。错误信息:{str(e)}")
            continue

        # Mix the clip into the track at its start offset (milliseconds).
        blank_audio = blank_audio.overlay(audio, position=start_time * 1000)

    # Prefer WAV; fall back to MP3 if the WAV export raises.
    try:
        output_file = os.path.join(output_dir, "audio.wav")
        blank_audio.export(output_file, format="wav")
        logger.info(f"音频合并完成,已保存为 {output_file}")
    except Exception as e:
        logger.info(f"导出为WAV格式失败,尝试使用MP3格式:{str(e)}")
        try:
            output_file = os.path.join(output_dir, "audio.mp3")
            blank_audio.export(output_file, format="mp3", codec="libmp3lame")
            logger.info(f"音频合并完成,已保存为 {output_file}")
        except Exception as e:
            logger.error(f"导出音频失败:{str(e)}")
            return None, None

    return output_file
|
||
|
||
def parse_timestamp(timestamp: str):
    """Convert a timestamp string to a number of seconds.

    Underscores are replaced with colons before the value is handed to
    ``time_to_seconds``, whose result is returned unchanged.
    """
    colon_separated = timestamp.replace('_', ':')
    seconds = time_to_seconds(colon_separated)
    return seconds
|
||
|
||
def extract_timestamp(filename):
    """Extract the start and end offsets, in seconds, from a clip file name.

    The expected shape is ``"audio_00_06-00_24.mp3"``: everything after the
    first underscore and before the first dot is the time range, and a hyphen
    separates the start from the end.

    :param filename: file name (without directories) to parse
    :return: tuple ``(start_seconds, end_seconds)``
    """
    # "audio_00_06-00_24.mp3" -> "00_06-00_24.mp3" -> "00_06-00_24"
    after_prefix = filename.split('_', 1)[1]
    time_range = after_prefix.split('.')[0]

    # "00_06-00_24" -> "00_06", "00_24"
    raw_start, raw_end = time_range.split('-')

    # Switch each half back to colon form and convert it to seconds.
    return (
        time_to_seconds(raw_start.replace('_', ':')),
        time_to_seconds(raw_end.replace('_', ':')),
    )
|
||
|
||
|
||
def time_to_seconds(time_str):
    """Convert a two-field time such as "00:06" or "00_06" to total seconds.

    The first field is multiplied by 60 and added to the second. Input that
    does not have exactly two fields, or whose fields are not integers, is
    logged as an error and yields 0.

    :param time_str: time string using ":" or "_" as the separator
    :return: total number of seconds as an int, or 0 on invalid input
    """
    # Treat "_" and ":" as the same separator.
    normalized = time_str.replace('_', ':')
    try:
        fields = normalized.split(':')
        if len(fields) == 2:
            minutes, seconds = fields
            return int(minutes) * 60 + int(seconds)
        logger.error(f"Invalid time format: {normalized}")
        return 0
    except (ValueError, IndexError) as err:
        logger.error(f"Error parsing time {normalized}: {str(err)}")
        return 0
|
||
|
||
|
||
if __name__ == "__main__":
    # Example usage: load a script file, then merge five clips for task "test456".
    video_script_path = "/Users/apple/Desktop/home/NarratoAI/resource/scripts/test003.json"
    with open(video_script_path, "r", encoding="utf-8") as script_file:
        video_script = json.load(script_file)

    # Length of the merged track, in seconds.
    total_duration = 38
    audio_files = [
        "/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_00:06-00:24.mp3",
        "/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_00:32-00:38.mp3",
        "/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_00:43-00:52.mp3",
        "/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_00:52-01:09.mp3",
        "/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_01:13-01:15.mp3",
    ]

    output_file = merge_audio_files(
        task_id="test456",
        audio_file_paths=audio_files,
        total_duration=total_duration,
        video_script=video_script,
    )
    print(output_file)
|