feat(subtitle): 添加从视频提取音频并生成字幕的功能

- 新增 extract_audio_and_create_subtitle 函数,用于从视频文件中提取音频并生成字幕文件
- 更新 video_pipeline.py:视频路径自动补全 .mp4 后缀,download_video 调用改用 url 参数,并调整示例运行参数
This commit is contained in:
linyqh 2024-11-20 00:34:11 +08:00 committed by linyq
parent 38f23983ef
commit 1be304a696
2 changed files with 84 additions and 25 deletions

View File

@ -8,6 +8,8 @@ from faster_whisper import WhisperModel
from timeit import default_timer as timer from timeit import default_timer as timer
from loguru import logger from loguru import logger
import google.generativeai as genai import google.generativeai as genai
from moviepy.editor import VideoFileClip
import os
from app.config import config from app.config import config
from app.utils import utils from app.utils import utils
@ -362,29 +364,86 @@ def create_with_gemini(audio_file: str, subtitle_file: str = "", api_key: Option
return None return None
def extract_audio_and_create_subtitle(video_file: str, subtitle_file: str = "") -> Optional[str]:
    """
    Extract the audio track of a video file and generate a subtitle file from it.

    The audio is written as 16-bit PCM WAV to ``<video_name>_audio.wav`` in the
    same directory as the video, handed to ``create`` together with the
    subtitle path, and removed again afterwards (also when a later step fails).

    Args:
        video_file: Path of the video file to process (e.g. an MP4 file).
        subtitle_file: Path of the subtitle file to write. When empty, it
            defaults to ``<video_name>.srt`` next to the video file.

    Returns:
        The subtitle file path on success, or ``None`` if any step failed
        (the error and traceback are logged, not raised).
    """
    video = None
    audio_file = None
    # Only delete the WAV if this call started writing it, so a pre-existing
    # file with the same name is not removed when we fail before extraction.
    audio_started = False
    try:
        video_dir = os.path.dirname(video_file)
        video_name = os.path.splitext(os.path.basename(video_file))[0]
        audio_file = os.path.join(video_dir, f"{video_name}_audio.wav")

        # Derive the subtitle path from the video name when none was given.
        if not subtitle_file:
            subtitle_file = os.path.join(video_dir, f"{video_name}.srt")

        logger.info(f"开始从视频提取音频: {video_file}")
        video = VideoFileClip(video_file)

        # A clip without an audio track has ``audio`` set to None; report that
        # explicitly instead of failing with an opaque AttributeError.
        if video.audio is None:
            logger.error(f"处理视频文件时出错: 视频不包含音频轨道: {video_file}")
            return None

        logger.info(f"正在提取音频到: {audio_file}")
        audio_started = True
        video.audio.write_audiofile(audio_file, codec='pcm_s16le')

        # Release the video handle before the subtitle step starts.
        video.close()
        video = None

        logger.info("音频提取完成,开始生成字幕")
        create(audio_file, subtitle_file)
        return subtitle_file
    except Exception as e:
        # Top-level boundary: callers expect None instead of an exception.
        logger.error(f"处理视频文件时出错: {str(e)}")
        logger.error(traceback.format_exc())
        return None
    finally:
        # Close the clip if extraction failed before the explicit close above.
        if video is not None:
            try:
                video.close()
            except Exception as close_error:
                logger.warning(f"关闭视频文件失败: {str(close_error)}")
        # Best-effort removal of the temporary audio file on every exit path.
        if audio_started and audio_file and os.path.exists(audio_file):
            try:
                os.remove(audio_file)
                logger.info("已清理临时音频文件")
            except OSError as cleanup_error:
                logger.warning(f"清理临时音频文件失败: {str(cleanup_error)}")
if __name__ == "__main__": if __name__ == "__main__":
task_id = "test456" task_id = "12121"
task_dir = utils.task_dir(task_id) task_dir = utils.task_dir(task_id)
subtitle_file = f"{task_dir}/subtitle.srt" subtitle_file = f"{task_dir}/subtitle.srt"
audio_file = f"{task_dir}/audio.wav" audio_file = f"{task_dir}/audio.wav"
video_file = f"{task_dir}/duanju_demo.mp4"
subtitles = file_to_subtitles(subtitle_file) extract_audio_and_create_subtitle(video_file, subtitle_file)
print(subtitles)
# script_file = f"{task_dir}/script.json" # subtitles = file_to_subtitles(subtitle_file)
# with open(script_file, "r") as f: # print(subtitles)
# script_content = f.read()
# s = json.loads(script_content)
# script = s.get("script")
#
# correct(subtitle_file, script)
subtitle_file = f"{task_dir}/subtitle111.srt" # # script_file = f"{task_dir}/script.json"
create(audio_file, subtitle_file) # # with open(script_file, "r") as f:
# # script_content = f.read()
# # s = json.loads(script_content)
# # script = s.get("script")
# #
# # correct(subtitle_file, script)
# # 使用Gemini模型处理音频 # subtitle_file = f"{task_dir}/subtitle111.srt"
# gemini_api_key = config.app.get("gemini_api_key") # 请替换为实际的API密钥 # create(audio_file, subtitle_file)
# gemini_subtitle_file = create_with_gemini(audio_file, api_key=gemini_api_key)
# # # # 使用Gemini模型处理音频
# if gemini_subtitle_file: # # gemini_api_key = config.app.get("gemini_api_key") # 请替换为实际的API密钥
# print(f"Gemini生成的字幕文件: {gemini_subtitle_file}") # # gemini_subtitle_file = create_with_gemini(audio_file, api_key=gemini_api_key)
# #
# # if gemini_subtitle_file:
# # print(f"Gemini生成的字幕文件: {gemini_subtitle_file}")

View File

@ -110,12 +110,12 @@ class VideoPipeline:
"""运行完整的pipeline""" """运行完整的pipeline"""
try: try:
current_path = os.path.dirname(os.path.abspath(__file__)) current_path = os.path.dirname(os.path.abspath(__file__))
video_path = os.path.join(current_path, "resource", "videos", video_name) video_path = os.path.join(current_path, "resource", "videos", f"{video_name}.mp4")
# 判断视频是否存在 # 判断视频是否存在
if not os.path.exists(video_path): if not os.path.exists(video_path):
# 1. 下载视频 # 1. 下载视频
print(f"视频不存在, 开始下载视频: {video_path}") print(f"视频不存在, 开始下载视频: {video_path}")
download_result = self.download_video(youtube_url=youtube_url, resolution="1080p", output_format="mp4", rename=video_name) download_result = self.download_video(url=youtube_url, resolution="1080p", output_format="mp4", rename=video_name)
video_path = download_result["output_path"] video_path = download_result["output_path"]
else: else:
print(f"视频已存在: {video_path}") print(f"视频已存在: {video_path}")
@ -168,12 +168,12 @@ class VideoPipeline:
if __name__ == "__main__": if __name__ == "__main__":
pipeline = VideoPipeline() pipeline = VideoPipeline()
result = pipeline.run_pipeline( result = pipeline.run_pipeline(
task_id="test_123", task_id="test_111901",
script_name="test.json", script_name="test.json",
youtube_url="https://www.youtube.com/watch?v=Kenm35gdqtk", youtube_url="https://www.youtube.com/watch?v=vLJ7Yed6FQ4",
video_name="test.mp4", video_name="2024-11-19-01",
skip_seconds=0, skip_seconds=50,
threshold=30, threshold=35,
vision_batch_size=10, vision_batch_size=10,
vision_llm_provider="gemini", vision_llm_provider="gemini",
voice_name="zh-CN-YunjianNeural", voice_name="zh-CN-YunjianNeural",