feat(webui): 视频合并添加一键转录功能

-改进文件上传和预览逻辑,支持视频和字幕文件的独立上传
- 添加字幕预览功能,可显示已上传字幕文件的内容
- 实现一键转录功能,为没有字幕的视频生成字幕
-优化合并文件的流程,提高合并效率
- 增加合并结果预览,方便用户查看合并后的视频和字幕
-重构代码,提高可维护性和可扩展性
This commit is contained in:
linyq 2024-12-06 18:01:14 +08:00
parent 4e590380f5
commit 65d5a681ac
4 changed files with 204 additions and 107 deletions

1
.gitignore vendored
View File

@ -29,5 +29,6 @@ resource/songs/*.mp3
resource/songs/*.flac
resource/fonts/*.ttc
resource/fonts/*.ttf
resource/fonts/*.otf
resource/srt/*.srt
app/models/faster-whisper-large-v2/*

View File

@ -419,11 +419,11 @@ def extract_audio_and_create_subtitle(video_file: str, subtitle_file: str = "")
if __name__ == "__main__":
task_id = "12121"
task_id = "123456"
task_dir = utils.task_dir(task_id)
subtitle_file = f"{task_dir}/subtitle.srt"
subtitle_file = f"{task_dir}/subtitle_123456.srt"
audio_file = f"{task_dir}/audio.wav"
video_file = f"{task_dir}/duanju_demo.mp4"
video_file = "/Users/apple/Desktop/home/NarratoAI/resource/videos/merged_video_1702.mp4"
extract_audio_and_create_subtitle(video_file, subtitle_file)

View File

@ -3,48 +3,87 @@ import time
import math
import sys
import tempfile
import traceback
import shutil
import streamlit as st
from pathlib import Path
from loguru import logger
from typing import List, Dict, Tuple
from dataclasses import dataclass
from streamlit.runtime.uploaded_file_manager import UploadedFile
from webui.utils.merge_video import merge_videos_and_subtitles
from app.utils.utils import video_dir, srt_dir
from app.services.subtitle import extract_audio_and_create_subtitle
# 定义临时目录路径
TEMP_MERGE_DIR = os.path.join("storage", "temp", "merge")
# 确保临时目录存在
os.makedirs(TEMP_MERGE_DIR, exist_ok=True)
@dataclass
class VideoSubtitlePair:
video_file: UploadedFile | None
subtitle_file: UploadedFile | None
subtitle_file: str | None
base_name: str
order: int = 0
def save_uploaded_file(uploaded_file: UploadedFile, temp_dir: str) -> str:
"""Save uploaded file to temporary directory and return the file path"""
file_path = os.path.join(temp_dir, uploaded_file.name)
def save_uploaded_file(uploaded_file: UploadedFile, target_dir: str) -> str:
"""Save uploaded file to target directory and return the file path"""
file_path = os.path.join(target_dir, uploaded_file.name)
# 如果文件已存在,先删除它
if os.path.exists(file_path):
os.remove(file_path)
with open(file_path, "wb") as f:
f.write(uploaded_file.getvalue())
return file_path
def clean_temp_dir():
"""清空临时目录"""
if os.path.exists(TEMP_MERGE_DIR):
for file in os.listdir(TEMP_MERGE_DIR):
file_path = os.path.join(TEMP_MERGE_DIR, file)
try:
if os.path.isfile(file_path):
os.unlink(file_path)
except Exception as e:
logger.error(f"清理临时文件失败: {str(e)}")
def group_files(files: List[UploadedFile]) -> Dict[str, VideoSubtitlePair]:
"""Group uploaded files by their base names"""
pairs = {}
order_counter = 0
# 首先处理所有视频文件
for file in files:
base_name = os.path.splitext(file.name)[0]
ext = os.path.splitext(file.name)[1].lower()
if base_name not in pairs:
pairs[base_name] = VideoSubtitlePair(None, None, base_name, order_counter)
order_counter += 1
if ext == ".mp4":
if base_name not in pairs:
pairs[base_name] = VideoSubtitlePair(None, None, base_name, order_counter)
order_counter += 1
pairs[base_name].video_file = file
elif ext == ".srt":
pairs[base_name].subtitle_file = file
# 保存视频文件到临时目录
video_path = save_uploaded_file(file, TEMP_MERGE_DIR)
# 然后处理所有字幕文件
for file in files:
base_name = os.path.splitext(file.name)[0]
ext = os.path.splitext(file.name)[1].lower()
if ext == ".srt":
# 即使没有对应视频也保存字幕文件
subtitle_path = os.path.join(TEMP_MERGE_DIR, f"{base_name}.srt")
save_uploaded_file(file, TEMP_MERGE_DIR)
if base_name in pairs: # 如果有对应的视频
pairs[base_name].subtitle_file = subtitle_path
return pairs
@ -92,7 +131,7 @@ def render_merge_settings(tr):
# 计算需要多少行来显示所有视频每行5个
num_pairs = len(sorted_pairs)
num_rows = (num_pairs + 4) // 5 # 向上取整
num_rows = (num_pairs + 4) // 5 # 向上取整,每行5个
# 遍历每一行
for row in range(num_rows):
@ -107,13 +146,63 @@ def render_merge_settings(tr):
with cols[col_idx]:
st.caption(base_name)
# 显示视频(如果存在)
if pair.video_file:
st.video(pair.video_file)
# 显示视频预览(如果存在)
video_path = os.path.join(TEMP_MERGE_DIR, f"{base_name}.mp4")
if os.path.exists(video_path):
st.video(video_path)
else:
st.warning(tr("Missing Video"))
# 添加排序输入框
# 显示字幕预览(如果存在)
subtitle_path = os.path.join(TEMP_MERGE_DIR, f"{base_name}.srt")
if os.path.exists(subtitle_path):
with open(subtitle_path, 'r', encoding='utf-8') as f:
subtitle_content = f.read()
st.markdown(tr("Subtitle Preview"))
st.text_area(
"Subtitle Content",
value=subtitle_content,
height=100, # 减高度以适应5列布局
label_visibility="collapsed",
key=f"subtitle_preview_{base_name}"
)
else:
st.warning(tr("Missing Subtitle"))
# 如果有视频但没有字幕,显示一键转录按钮
if os.path.exists(video_path):
if st.button(tr("One-Click Transcribe"), key=f"transcribe_{base_name}"):
with st.spinner(tr("Transcribing...")):
try:
# 生成字幕文件
result = extract_audio_and_create_subtitle(video_path, subtitle_path)
if result:
# 读取生成的字幕文件内容并显示预览
with open(subtitle_path, 'r', encoding='utf-8') as f:
subtitle_content = f.read()
st.markdown(tr("Subtitle Preview"))
st.text_area(
"Subtitle Content",
value=subtitle_content,
height=150,
label_visibility="collapsed",
key=f"subtitle_preview_transcribed_{base_name}"
)
st.success(tr("Transcription Complete!"))
# 更新pair的字幕文件路径
pair.subtitle_file = subtitle_path
else:
st.error(tr("Transcription Failed. Please try again."))
except Exception as e:
error_message = str(e)
logger.error(traceback.format_exc())
if "rate limit exceeded" in error_message.lower():
st.error(tr("API rate limit exceeded. Please wait about an hour and try again."))
elif "resource_exhausted" in error_message.lower():
st.error(tr("Resources exhausted. Please try again later."))
else:
st.error(f"{tr('Transcription Failed')}: {str(e)}")
# 排序输入框
order = st.number_input(
tr("Order"),
min_value=0,
@ -124,95 +213,91 @@ def render_merge_settings(tr):
if order != st.session_state.file_orders[base_name]:
st.session_state.file_orders[base_name] = order
st.session_state.needs_reorder = True
# 显示字幕(如果存在)
if pair.subtitle_file:
subtitle_content = pair.subtitle_file.getvalue().decode('utf-8')
st.text_area(
"字幕预览",
value=subtitle_content,
height=150,
label_visibility="collapsed"
)
else:
st.warning(tr("Missing Subtitle"))
# 合并后的视频预览
# 如果需要重新排序,重新加载页面
if st.session_state.needs_reorder:
st.session_state.needs_reorder = False
st.rerun()
# 找出有完整视频和字幕的文件对
complete_pairs = {
k: v for k, v in all_pairs.items()
if os.path.exists(os.path.join(TEMP_MERGE_DIR, f"{k}.mp4")) and
os.path.exists(os.path.join(TEMP_MERGE_DIR, f"{k}.srt"))
}
# 合并按钮和结果显示
cols = st.columns([1, 2, 1])
with cols[0]:
st.write(f"{tr('Mergeable Files')}: {len(complete_pairs)}")
merge_videos_result = None
# 只有当存在完整的配对时才显示按钮
complete_pairs = {k: v for k, v in all_pairs.items() if v.video_file and v.subtitle_file}
if complete_pairs:
# 创建按钮
cols = st.columns([1, 1, 3, 3, 3])
with cols[0]:
if st.button(tr("Reorder"), disabled=not st.session_state.needs_reorder, use_container_width=True):
st.session_state.needs_reorder = False
st.rerun()
with cols[1]:
if st.button(tr("Merge All Files"), type="primary", use_container_width=True):
try:
# 获取排序后的完整文件对
sorted_complete_pairs = sorted(
[(k, v) for k, v in complete_pairs.items()],
key=lambda x: st.session_state.file_orders[x[0]]
with cols[1]:
if st.button(tr("Merge All Files"), type="primary", use_container_width=True):
try:
# 获取排序后的完整文件对
sorted_complete_pairs = sorted(
[(k, v) for k, v in complete_pairs.items()],
key=lambda x: st.session_state.file_orders[x[0]]
)
video_paths = []
subtitle_paths = []
for base_name, _ in sorted_complete_pairs:
video_paths.append(os.path.join(TEMP_MERGE_DIR, f"{base_name}.mp4"))
subtitle_paths.append(os.path.join(TEMP_MERGE_DIR, f"{base_name}.srt"))
# 获取输出文件路径
output_video = os.path.join(video_dir(), f"merged_video_{time.strftime('%M%S')}.mp4")
output_subtitle = os.path.join(srt_dir(), f"merged_subtitle_{time.strftime('%M%S')}.srt")
with st.spinner(tr("Merging files...")):
# 合并文件
merge_videos_and_subtitles(
video_paths,
subtitle_paths,
output_video,
output_subtitle
)
# 创建临时目录保存文件
with tempfile.TemporaryDirectory() as temp_dir:
# 保存上传的文件到临时目录
video_paths = []
subtitle_paths = []
for _, pair in sorted_complete_pairs:
video_path = save_uploaded_file(pair.video_file, temp_dir)
subtitle_path = save_uploaded_file(pair.subtitle_file, temp_dir)
video_paths.append(video_path)
subtitle_paths.append(subtitle_path)
# 获取输出目录, 文件名添加 MMSS 时间戳
output_video = os.path.join(video_dir(), f"merged_video_{time.strftime('%M%S')}.mp4")
output_subtitle = os.path.join(srt_dir(), f"merged_subtitle_{time.strftime('%M%S')}.srt")
with st.spinner(tr("Merging files...")):
# 合并文件
merge_videos_and_subtitles(
video_paths,
subtitle_paths,
output_video,
output_subtitle
)
success = True
error_msg = ""
# 检查输出文件是否成功生成
if not os.path.exists(output_video):
success = False
error_msg += tr("Failed to generate merged video. ")
if not os.path.exists(output_subtitle):
success = False
error_msg += tr("Failed to generate merged subtitle. ")
if success:
# 显示成功消息
st.success(tr("Merge completed!"))
merge_videos_result = (output_video, output_subtitle)
else:
st.error(error_msg)
except Exception as e:
error_message = str(e)
if "moviepy" in error_message.lower():
st.error(tr("Error processing video files. Please check if the videos are valid MP4 files."))
elif "pysrt" in error_message.lower():
st.error(tr("Error processing subtitle files. Please check if the subtitles are valid SRT files."))
success = True
error_msg = ""
# 检查输出文件是否成功生成
if not os.path.exists(output_video):
success = False
error_msg += tr("Failed to generate merged video. ")
if not os.path.exists(output_subtitle):
success = False
error_msg += tr("Failed to generate merged subtitle. ")
if success:
# 显示成功消息
st.success(tr("Merge completed!"))
merge_videos_result = (output_video, output_subtitle)
# 清理临时目录
clean_temp_dir()
else:
st.error(f"{tr('Error during merge')}: {error_message}")
with cols[2]:
# 提供视频和字幕的预览
if merge_videos_result:
with st.popover(tr("Preview Merged Video")):
st.video(merge_videos_result[0], subtitles=merge_videos_result[1])
st.code(f"{tr('Video Path')}: {merge_videos_result[0]}")
st.code(f"{tr('Subtitle Path')}: {merge_videos_result[1]}")
st.error(error_msg)
except Exception as e:
error_message = str(e)
if "moviepy" in error_message.lower():
st.error(tr("Error processing video files. Please check if the videos are valid MP4 files."))
elif "pysrt" in error_message.lower():
st.error(tr("Error processing subtitle files. Please check if the subtitles are valid SRT files."))
else:
st.error(f"{tr('Error during merge')}: {error_message}")
# 合并结果预览放在合并按钮下方
if merge_videos_result:
st.markdown(f"<h3 style='text-align: center'>{tr('Merge Result Preview')}</h3>", unsafe_allow_html=True)
# 使用列布局使视频居中
col1, col2, col3 = st.columns([1,2,1])
with col2:
st.video(merge_videos_result[0])
st.code(f"{tr('Video Path')}: {merge_videos_result[0]}")
st.code(f"{tr('Subtitle Path')}: {merge_videos_result[1]}")
else:
st.warning(tr("No Matched Pairs Found"))
st.warning(tr("No Files Found"))

View File

@ -177,6 +177,17 @@
"Clear tasks": "清理任务",
"Directory cleared": "目录清理完成",
"Directory does not exist": "目录不存在",
"Failed to clear directory": "清理目录失败"
"Failed to clear directory": "清理目录失败",
"Subtitle Preview": "字幕预览",
"One-Click Transcribe": "一键转录",
"Transcribing...": "正在转录中...",
"Transcription Complete!": "转录完成!",
"Transcription Failed. Please try again.": "转录失败,请重试。",
"API rate limit exceeded. Please wait about an hour and try again.": "API 调用次数已达到限制,请等待约一小时后再试。",
"Resources exhausted. Please try again later.": "资源已耗尽,请稍后再试。",
"Transcription Failed": "转录失败",
"Mergeable Files": "可合并文件数",
"Subtitle Content": "字幕内容",
"Merge Result Preview": "合并结果预览"
}
}