NarratoAI/webui/components/script_settings.py
viccy 0bd001ce33 feat(webui, llm, subtitle): 新增字幕校准、多视频支持与LLM生成参数配置
- 添加字幕校准服务,支持通过LLM校对SRT格式字幕文件,支持批量处理
- 为视频参数模型新增video_origin_paths字段,支持多视频上传与批量处理
- 为OpenAI兼容LLM提供商添加temperature、top_p、max_tokens和thinking_level参数配置支持
- 重构WebUI模型设置页面,将通用生成参数配置拆分到各模型的独立配置项中
- 更新示例配置文件与默认配置,新增对应参数的默认值
- 完善多语言国际化文案,添加批量操作与字幕校准相关翻译
- 添加相关单元测试以覆盖新功能与配置项
2026-06-05 23:15:11 +08:00

1239 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import glob
import json
import math
import time
import traceback
import pandas as pd
import streamlit as st
from loguru import logger
from app.config import config
from app.models.schema import VideoClipParams
from app.services.subtitle_text import decode_subtitle_bytes, read_subtitle_text
from app.utils import utils, check_script
from webui.tools.generate_script_docu import generate_script_docu
from webui.tools.generate_script_short import generate_script_short
from webui.tools.generate_short_summary import analyze_short_drama_plot, generate_script_short_sunmmary
SCRIPT_TABLE_BASE_COLUMNS = ["_id", "timestamp", "picture", "narration", "OST"]
VIDEO_UPLOAD_TYPES = ["mp4", "mov", "avi", "flv", "mkv", "mpeg4"]
VIDEO_GLOB_PATTERNS = [f"*.{suffix}" for suffix in VIDEO_UPLOAD_TYPES]
def _normalize_video_paths(paths):
if isinstance(paths, str):
paths = [paths]
if not paths:
return []
normalized_paths = []
seen = set()
for path in paths:
if not isinstance(path, str):
continue
path = path.strip()
if not path or path in seen:
continue
normalized_paths.append(path)
seen.add(path)
return normalized_paths
def _set_video_origin_state(paths, params=None):
video_paths = _normalize_video_paths(paths)
first_video_path = video_paths[0] if video_paths else ""
st.session_state['video_origin_paths'] = video_paths
st.session_state['video_origin_path'] = first_video_path
if params is not None:
params.video_origin_path = first_video_path
params.video_origin_paths = video_paths
def _selected_video_paths():
video_paths = _normalize_video_paths(st.session_state.get('video_origin_paths', []))
if not video_paths:
video_paths = _normalize_video_paths(st.session_state.get('video_origin_path', ''))
return video_paths
def _uploaded_files_signature(uploaded_files):
return "|".join(f"{uploaded_file.name}:{uploaded_file.size}" for uploaded_file in uploaded_files)
def _unique_file_path(directory, filename):
safe_filename = os.path.basename(filename).strip()
if not safe_filename:
safe_filename = f"video_{int(time.time())}.mp4"
os.makedirs(directory, exist_ok=True)
file_name, file_extension = os.path.splitext(safe_filename)
candidate_path = os.path.join(directory, safe_filename)
if not os.path.exists(candidate_path):
return candidate_path
timestamp = time.strftime("%Y%m%d%H%M%S")
counter = 1
while True:
suffix = f"_{timestamp}" if counter == 1 else f"_{timestamp}_{counter}"
candidate_path = os.path.join(directory, f"{file_name}{suffix}{file_extension}")
if not os.path.exists(candidate_path):
return candidate_path
counter += 1
def _format_file_list_for_display(paths, max_items=3):
file_names = [os.path.basename(path) for path in _normalize_video_paths(paths)]
if len(file_names) <= max_items:
return ", ".join(file_names)
visible_names = ", ".join(file_names[:max_items])
return f"{visible_names} +{len(file_names) - max_items}"
def _read_subtitle_file(path):
try:
return read_subtitle_text(path).text
except Exception:
with open(path, "r", encoding="utf-8") as f:
return f.read()
def _build_combined_subtitle_content(subtitle_paths):
sections = []
subtitle_contents = {}
for subtitle_path in subtitle_paths:
if not subtitle_path or not os.path.exists(subtitle_path):
continue
content = _read_subtitle_file(subtitle_path)
subtitle_contents[subtitle_path] = content
sections.append(f"# {os.path.basename(subtitle_path)}\n{content}".strip())
return "\n\n".join(sections), subtitle_contents
def _selected_subtitle_paths():
subtitle_paths = _normalize_video_paths(st.session_state.get('subtitle_paths', []))
if not subtitle_paths:
subtitle_paths = _normalize_video_paths(st.session_state.get('subtitle_path', ''))
return subtitle_paths
def _set_subtitle_state(subtitle_paths):
subtitle_paths = _normalize_video_paths(subtitle_paths)
subtitle_content, subtitle_contents = _build_combined_subtitle_content(subtitle_paths)
st.session_state['subtitle_path'] = subtitle_paths[0] if subtitle_paths else None
st.session_state['subtitle_paths'] = subtitle_paths
st.session_state['subtitle_content'] = subtitle_content if subtitle_content else None
st.session_state['subtitle_contents'] = subtitle_contents
st.session_state['subtitle_file_processed'] = bool(subtitle_paths)
def render_script_panel(tr):
"""渲染脚本配置面板"""
with st.container(border=True):
st.write(tr("Video Script Configuration"))
params = VideoClipParams()
# 渲染脚本文件选择
render_script_file(tr, params)
# 渲染视频文件选择
render_video_file(tr, params)
# 获取当前选择的脚本类型
script_path = st.session_state.get('video_clip_json_path', '')
# 根据脚本类型显示不同的布局
if script_path == "auto":
# 画面解说
render_video_details(tr)
elif script_path == "short":
# 短剧混剪
render_short_generate_options(tr)
elif script_path == "summary":
# 短剧解说
short_drama_summary(tr)
else:
# 默认为空
pass
# 渲染脚本操作按钮
render_script_buttons(tr, params)
def render_script_file(tr, params):
"""渲染脚本文件选择"""
# 定义功能模式
MODE_FILE = "file_selection"
MODE_AUTO = "auto"
MODE_SHORT = "short"
MODE_SUMMARY = "summary"
# 模式选项映射,按工作流优先级展示
mode_options = {
tr("Short Drama Summary"): MODE_SUMMARY,
tr("Auto Generate"): MODE_AUTO,
tr("Short Generate"): MODE_SHORT,
tr("Select/Upload Script"): MODE_FILE,
}
# 获取当前状态
current_path = st.session_state.get('video_clip_json_path', '')
# 确定当前选中的模式索引
default_index = 0
mode_keys = list(mode_options.keys())
if current_path == "auto":
default_index = mode_keys.index(tr("Auto Generate"))
elif current_path == "short":
default_index = mode_keys.index(tr("Short Generate"))
elif current_path == "summary":
default_index = mode_keys.index(tr("Short Drama Summary"))
elif current_path:
default_index = mode_keys.index(tr("Select/Upload Script"))
else:
default_index = 0
# 1. 渲染功能选择组件
default_mode_label = mode_keys[default_index]
default_mode = mode_options[default_mode_label]
if st.session_state.get('_switch_to_file_mode'):
st.session_state['script_mode_selection'] = tr("Select/Upload Script")
del st.session_state['_switch_to_file_mode']
elif (
'script_mode_selection' not in st.session_state
or st.session_state['script_mode_selection'] not in mode_options
):
st.session_state['script_mode_selection'] = default_mode_label
elif mode_options[st.session_state['script_mode_selection']] != default_mode:
st.session_state['script_mode_selection'] = default_mode_label
# 定义回调函数来处理状态更新
def update_script_mode():
# 获取当前选中的标签
selected_label = st.session_state.script_mode_selection
if selected_label:
# 更新实际的 path 状态
new_mode = mode_options[selected_label]
st.session_state.video_clip_json_path = new_mode
params.video_clip_json_path = new_mode
else:
st.session_state.video_clip_json_path = default_mode
params.video_clip_json_path = default_mode
# 渲染组件
selected_mode_label = st.selectbox(
tr("Video Type"),
options=mode_keys,
index=None,
key="script_mode_selection",
on_change=update_script_mode,
)
# 处理旧状态为空的兜底情况
if not selected_mode_label:
selected_mode_label = default_mode_label
selected_mode = mode_options[selected_mode_label]
# 2. 根据选择的模式处理逻辑
if selected_mode == MODE_FILE:
# --- 文件选择模式 ---
script_list = [
(tr("None"), ""),
(tr("Upload Script"), "upload_script")
]
# 获取已有脚本文件
suffix = "*.json"
script_dir = utils.script_dir()
files = glob.glob(os.path.join(script_dir, suffix))
file_list = []
for file in files:
file_list.append({
"name": os.path.basename(file),
"file": file,
"ctime": os.path.getctime(file)
})
file_list.sort(key=lambda x: x["ctime"], reverse=True)
for file in file_list:
display_name = file['file'].replace(config.root_dir, "")
script_list.append((display_name, file['file']))
# 找到保存的脚本文件在列表中的索引
# 如果当前path是特殊值(auto/short/summary),则重置为空
saved_script_path = current_path if current_path not in [MODE_AUTO, MODE_SHORT, MODE_SUMMARY] else ""
selected_index = 0
for i, (_, path) in enumerate(script_list):
if path == saved_script_path:
selected_index = i
break
# 如果找到了保存的脚本,同步更新 selectbox 的 key 状态
if saved_script_path and selected_index > 0:
st.session_state['script_file_selection'] = selected_index
selected_script_index = st.selectbox(
tr("Script Files"),
index=selected_index,
options=range(len(script_list)),
format_func=lambda x: script_list[x][0],
key="script_file_selection"
)
script_path = script_list[selected_script_index][1]
# 只有当用户实际选择了脚本时才更新路径,避免覆盖已保存的路径
if script_path:
st.session_state['video_clip_json_path'] = script_path
params.video_clip_json_path = script_path
elif saved_script_path:
# 如果用户选择了 "None" 但之前有保存的脚本,保持原有路径
st.session_state['video_clip_json_path'] = saved_script_path
params.video_clip_json_path = saved_script_path
# 处理脚本上传
if script_path == "upload_script":
uploaded_file = st.file_uploader(
tr("Upload Script File"),
type=["json"],
accept_multiple_files=False,
)
if uploaded_file is not None:
try:
# 读取上传的JSON内容并验证格式
script_content = uploaded_file.read().decode('utf-8')
json_data = json.loads(script_content)
# 保存到脚本目录
safe_filename = os.path.basename(uploaded_file.name)
script_file_path = os.path.join(script_dir, safe_filename)
file_name, file_extension = os.path.splitext(safe_filename)
# 如果文件已存在,添加时间戳
if os.path.exists(script_file_path):
timestamp = time.strftime("%Y%m%d%H%M%S")
file_name_with_timestamp = f"{file_name}_{timestamp}"
script_file_path = os.path.join(script_dir, file_name_with_timestamp + file_extension)
# 写入文件
with open(script_file_path, "w", encoding='utf-8') as f:
json.dump(json_data, f, ensure_ascii=False, indent=2)
# 更新状态
st.success(tr("Script Uploaded Successfully"))
st.session_state['video_clip_json_path'] = script_file_path
params.video_clip_json_path = script_file_path
time.sleep(1)
st.rerun()
except json.JSONDecodeError:
st.error(tr("Invalid JSON format"))
except Exception as e:
st.error(f"{tr('Upload failed')}: {str(e)}")
else:
# --- 功能生成模式 ---
st.session_state['video_clip_json_path'] = selected_mode
params.video_clip_json_path = selected_mode
def render_video_file(tr, params):
"""渲染视频文件选择"""
source_options = {
tr("Upload Local Files"): "upload",
tr("Select from resource directory"): "resource",
}
source_labels = list(source_options.keys())
default_source_label = source_labels[0]
source_default_version = "upload_first_v2"
if st.session_state.get('_video_source_default_version') != source_default_version:
if (
st.session_state.get('video_source_selection') not in source_options
or not _selected_video_paths()
):
st.session_state['video_source_selection'] = default_source_label
st.session_state['_video_source_default_version'] = source_default_version
elif st.session_state.get('video_source_selection') not in source_options:
st.session_state['video_source_selection'] = default_source_label
current_source = st.session_state['video_source_selection']
source_caption = (
tr("Select a video from resource videos directory")
if source_options[current_source] == "resource"
else tr("Upload new video files up to 2GB each")
)
st.markdown(f"**{tr('Video Source')}** :gray[{source_caption}]")
source = st.pills(
tr("Video Source"),
options=source_labels,
selection_mode="single",
key="video_source_selection",
label_visibility="collapsed",
width="stretch",
)
if not source:
source = default_source_label
if source_options[source] == "resource":
video_files = []
for suffix in VIDEO_GLOB_PATTERNS:
video_files.extend(glob.glob(os.path.join(utils.video_dir(), suffix)))
video_files = sorted(video_files, key=os.path.getctime, reverse=True)
saved_video_path = st.session_state.get('video_origin_path', '')
selected_video_path = st.session_state.get('resource_video_selection')
if selected_video_path not in video_files:
st.session_state['resource_video_selection'] = (
saved_video_path if saved_video_path in video_files else None
)
def format_video_name(path):
return path.replace(config.root_dir, "")
video_path = st.selectbox(
tr("Select Video"),
options=video_files,
index=None,
placeholder=tr("Choose a video file"),
format_func=format_video_name,
key="resource_video_selection",
)
if video_path:
_set_video_origin_state([video_path], params)
else:
_set_video_origin_state([], params)
if not video_files:
st.info(tr("No video files found in resource videos directory"))
return
if source_options[source] == "upload":
uploaded_files = st.file_uploader(
tr("Upload Video"),
type=VIDEO_UPLOAD_TYPES,
accept_multiple_files=True,
key="video_file_uploader",
)
if not uploaded_files:
_set_video_origin_state([], params)
st.session_state['video_file_processed'] = False
st.session_state['uploaded_video_path'] = ""
st.session_state['uploaded_video_paths'] = []
st.session_state['uploaded_video_signature'] = ""
else:
uploaded_signature = _uploaded_files_signature(uploaded_files)
uploaded_video_paths = _normalize_video_paths(st.session_state.get('uploaded_video_paths', []))
is_processed = (
st.session_state.get('video_file_processed', False)
and st.session_state.get('uploaded_video_signature') == uploaded_signature
and uploaded_video_paths
and all(os.path.exists(path) for path in uploaded_video_paths)
)
if is_processed:
_set_video_origin_state(uploaded_video_paths, params)
else:
video_paths = []
for uploaded_file in uploaded_files:
video_file_path = _unique_file_path(utils.video_dir(), uploaded_file.name)
with open(video_file_path, "wb") as f:
f.write(uploaded_file.read())
video_paths.append(video_file_path)
_set_video_origin_state(video_paths, params)
st.session_state['uploaded_video_path'] = video_paths[0] if video_paths else ""
st.session_state['uploaded_video_paths'] = video_paths
st.session_state['uploaded_video_signature'] = uploaded_signature
st.session_state['video_file_processed'] = True
current_video_paths = _selected_video_paths()
if current_video_paths:
st.info(
tr("Selected videos for processing").format(
count=len(current_video_paths),
files=_format_file_list_for_display(current_video_paths),
)
)
def render_short_generate_options(tr):
"""
渲染Short Generate模式下的特殊选项
在Short Generate模式下替换原有的输入框为自定义片段选项
"""
short_drama_summary(tr)
# 显示自定义片段数量选择器
custom_clips = st.number_input(
tr("自定义片段"),
min_value=1,
max_value=20,
value=st.session_state.get('custom_clips', 5),
help=tr("设置需要生成的短视频片段数量"),
key="custom_clips_input"
)
st.session_state['custom_clips'] = custom_clips
def render_video_details(tr):
"""画面解说 渲染视频主题和提示词"""
video_theme = st.text_input(tr("Video Theme"))
custom_prompt = st.text_area(
tr("Generation Prompt"),
value=st.session_state.get('video_plot', ''),
help=tr("Custom prompt for LLM, leave empty to use default prompt"),
height=180
)
# 非短视频模式下显示原有的三个输入框
input_cols = st.columns(2)
with input_cols[0]:
st.number_input(
tr("Frame Interval (seconds)"),
min_value=0,
value=st.session_state.get('frame_interval_input', config.frames.get('frame_interval_input', 3)),
help=tr("Frame Interval (seconds) (More keyframes consume more tokens)"),
key="frame_interval_input"
)
with input_cols[1]:
st.number_input(
tr("Batch Size"),
min_value=0,
value=st.session_state.get('vision_batch_size', config.frames.get('vision_batch_size', 10)),
help=tr("Batch Size (More keyframes consume more tokens)"),
key="vision_batch_size"
)
st.session_state['video_theme'] = video_theme
st.session_state['custom_prompt'] = custom_prompt
return video_theme, custom_prompt
def short_drama_summary(tr):
"""短剧解说 渲染视频主题和提示词"""
# 检查是否已经处理过字幕文件
if 'subtitle_file_processed' not in st.session_state:
st.session_state['subtitle_file_processed'] = False
render_fun_asr_transcription(tr)
render_subtitle_preview(tr)
current_subtitle_path = st.session_state.get('subtitle_path', '')
plot_analysis_source = st.session_state.get('short_drama_plot_analysis_subtitle_path')
if plot_analysis_source and plot_analysis_source != current_subtitle_path:
st.session_state['short_drama_plot_analysis'] = ""
st.session_state['short_drama_plot_analysis_subtitle_path'] = ""
name_cols = st.columns([4, 1.2], vertical_alignment="bottom")
with name_cols[0]:
video_theme = st.text_input(tr("短剧名称"))
with name_cols[1]:
analyze_plot_clicked = st.button(
tr("剧情理解"),
key="short_drama_plot_analysis_button",
disabled=not current_subtitle_path,
use_container_width=True,
)
st.session_state['video_theme'] = video_theme
if analyze_plot_clicked:
with st.spinner(tr("Analyzing plot...")):
plot_analysis = analyze_short_drama_plot(
current_subtitle_path,
st.session_state.get('temperature', 0.7),
tr,
subtitle_content=st.session_state.get('subtitle_content', ''),
)
if plot_analysis:
st.session_state['short_drama_plot_analysis'] = plot_analysis
st.session_state['short_drama_plot_analysis_subtitle_path'] = current_subtitle_path
st.success(tr("Plot analysis completed"))
if st.session_state.get('short_drama_plot_analysis'):
st.text_area(
tr("剧情理解结果"),
key="short_drama_plot_analysis",
height=240,
)
return video_theme
def render_subtitle_preview(tr):
"""渲染可折叠的当前字幕预览;没有字幕时提示用户先转写或上传。"""
subtitle_paths = _selected_subtitle_paths()
subtitle_content = st.session_state.get('subtitle_content', '')
subtitle_contents = st.session_state.get('subtitle_contents', {})
if not isinstance(subtitle_contents, dict):
subtitle_contents = {}
if subtitle_paths and (not subtitle_content or not subtitle_contents):
subtitle_content, subtitle_contents = _build_combined_subtitle_content(subtitle_paths)
st.session_state['subtitle_content'] = subtitle_content
st.session_state['subtitle_contents'] = subtitle_contents
with st.expander(tr("Subtitle Preview"), expanded=False):
if not subtitle_paths or not subtitle_content:
st.info(tr("Please transcribe or upload subtitles first"))
return
if len(subtitle_paths) > 1:
for index, path in enumerate(subtitle_paths, start=1):
content = subtitle_contents.get(path, "")
if not content and os.path.exists(path):
content = _read_subtitle_file(path)
st.markdown(f"**{index}. {os.path.basename(path)}**")
st.text_area(
tr("Subtitle Preview"),
value=content,
height=180,
label_visibility="collapsed",
disabled=True,
key=f"subtitle_content_preview_{index}",
)
return
st.text_area(
tr("Subtitle Preview"),
key="subtitle_content",
height=180,
label_visibility="collapsed",
)
def render_subtitle_upload(tr):
"""上传并保存用户提供的 SRT 字幕文件。"""
subtitle_dir_label = utils.subtitle_dir().replace(config.root_dir, ".")
st.markdown(
f"**{tr('上传字幕文件')}** "
f":gray[{tr('Transcribed subtitles storage hint').format(path=subtitle_dir_label)}]"
)
subtitle_file = st.file_uploader(
tr("上传字幕文件"),
type=["srt"],
accept_multiple_files=False,
key="subtitle_file_uploader", # 添加唯一key
label_visibility="collapsed",
)
# 显示当前已上传的字幕文件路径
if 'subtitle_path' in st.session_state and st.session_state['subtitle_path']:
st.info(tr("Uploaded subtitle").format(file=os.path.basename(st.session_state['subtitle_path'])))
if st.button(tr("清除已上传字幕")):
_set_subtitle_state([])
st.rerun()
# 只有当有文件上传且尚未处理时才执行处理逻辑
if subtitle_file is not None and not st.session_state['subtitle_file_processed']:
try:
# 清理文件名,防止路径污染和路径遍历攻击
safe_filename = os.path.basename(subtitle_file.name)
decoded = decode_subtitle_bytes(subtitle_file.getvalue())
script_content = decoded.text
detected_encoding = decoded.encoding
if not script_content:
st.error(tr("无法读取字幕文件,请检查文件编码(支持 UTF-8、UTF-16、GBK、GB2312"))
st.stop()
# 验证字幕内容(简单检查)
if len(script_content.strip()) < 10:
st.warning(tr("字幕文件内容似乎为空,请检查文件"))
# 保存到字幕目录
script_file_path = os.path.join(utils.subtitle_dir(), safe_filename)
file_name, file_extension = os.path.splitext(safe_filename)
# 如果文件已存在,添加时间戳
if os.path.exists(script_file_path):
timestamp = time.strftime("%Y%m%d%H%M%S")
file_name_with_timestamp = f"{file_name}_{timestamp}"
script_file_path = os.path.join(utils.subtitle_dir(), file_name_with_timestamp + file_extension)
# 直接写入SRT内容统一使用 UTF-8
with open(script_file_path, "w", encoding='utf-8') as f:
f.write(script_content)
# 更新状态
st.success(
f"{tr('字幕上传成功')} "
f"({tr('Encoding')}: {detected_encoding.upper()}, "
f"{tr('Size')}: {len(script_content)} {tr('Characters')})"
)
_set_subtitle_state([script_file_path])
# 避免使用rerun使用更新状态的方式
# st.rerun()
except Exception as e:
st.error(f"{tr('Upload failed')}: {str(e)}")
def _is_blank_table_value(value):
if value is None:
return True
if isinstance(value, float) and math.isnan(value):
return True
if isinstance(value, str) and not value.strip():
return True
return False
def _ordered_script_columns(script_rows):
columns = []
for column in SCRIPT_TABLE_BASE_COLUMNS:
columns.append(column)
for row in script_rows:
if not isinstance(row, dict):
continue
for column in row.keys():
if column not in columns:
columns.append(column)
return columns
def _script_json_to_table(script_data):
if not isinstance(script_data, list):
script_data = []
if not script_data:
return pd.DataFrame(columns=SCRIPT_TABLE_BASE_COLUMNS)
if not all(isinstance(item, dict) for item in script_data):
rows = [
{"value": json.dumps(item, ensure_ascii=False)}
for item in script_data
]
return pd.DataFrame(rows, columns=["value"])
columns = _ordered_script_columns(script_data)
return pd.DataFrame(script_data, columns=columns)
def _normalize_script_table_value(column, value):
if _is_blank_table_value(value):
return ""
if column in {"_id", "OST"}:
try:
return int(value)
except (TypeError, ValueError):
return value
return value
def _script_table_to_json(edited_data):
if isinstance(edited_data, pd.DataFrame):
records = edited_data.to_dict("records")
elif isinstance(edited_data, list):
records = edited_data
else:
records = pd.DataFrame(edited_data).to_dict("records")
script_data = []
for row in records:
if not isinstance(row, dict):
continue
if all(_is_blank_table_value(value) for value in row.values()):
continue
cleaned_row = {}
for column, value in row.items():
if not column:
continue
normalized_value = _normalize_script_table_value(column, value)
if _is_blank_table_value(normalized_value) and column not in SCRIPT_TABLE_BASE_COLUMNS:
continue
cleaned_row[column] = normalized_value
if cleaned_row:
script_data.append(cleaned_row)
return json.dumps(script_data, indent=2, ensure_ascii=False)
def render_video_script_editor(tr):
"""使用弹窗和表格编辑视频脚本 JSON。"""
@st.dialog(tr("Video Script"), width="large")
def video_script_dialog():
script_data = st.session_state.get('video_clip_json', [])
table_data = _script_json_to_table(script_data)
column_order = list(table_data.columns)
st.caption(tr("Video script table help"))
edited_table = st.data_editor(
table_data,
key="video_script_table_editor",
hide_index=True,
num_rows="dynamic",
use_container_width=True,
height=520,
row_height=72,
column_order=column_order,
column_config={
"_id": st.column_config.NumberColumn(tr("Script Column ID"), step=1, format="%d", width=52),
"timestamp": st.column_config.TextColumn(tr("Script Column Timestamp"), width=200),
"picture": st.column_config.TextColumn(tr("Script Column Picture"), width=320),
"narration": st.column_config.TextColumn(tr("Script Column Narration"), width=480),
"OST": st.column_config.NumberColumn(
tr("Script Column OST"),
min_value=0,
max_value=2,
step=1,
format="%d",
width=52,
),
},
)
video_clip_json_details = _script_table_to_json(edited_table)
with st.expander(tr("Raw JSON Preview"), expanded=False):
st.code(video_clip_json_details, language="json")
if st.button(tr("Save Script"), key="save_script_from_dialog", use_container_width=True):
save_script_with_validation(tr, video_clip_json_details)
script_data = st.session_state.get('video_clip_json', [])
script_count = len(script_data) if isinstance(script_data, list) else 0
st.markdown(f"**{tr('Video Script')}** :gray[{tr('Video script row count').format(count=script_count)}]")
if st.button(tr("Edit Video Script"), key="open_video_script_editor", use_container_width=True):
video_script_dialog()
def render_fun_asr_transcription(tr):
"""使用 Fun-ASR 从本地音视频转写生成字幕。"""
def clear_fun_asr_subtitle_state():
_set_subtitle_state([])
from app.services import fun_asr_subtitle
backend_options = {
tr("Local FunASR-Pack API"): "local",
tr("Ali Bailian Online Fun-ASR"): "bailian",
tr("上传字幕文件"): "upload",
}
saved_backend = str(config.fun_asr.get("backend", "")).strip().lower()
if saved_backend not in {"local", "bailian", "upload"}:
saved_backend = (
"bailian"
if config.fun_asr.get("api_key") and not config.fun_asr.get("api_url")
else "local"
)
backend_values = list(backend_options.values())
backend_labels = list(backend_options.keys())
backend = saved_backend
api_key = ""
api_url = config.fun_asr.get("api_url", fun_asr_subtitle.LOCAL_FUN_ASR_API_URL)
hotword = config.fun_asr.get("hotword", "")
enable_spk = bool(config.fun_asr.get("enable_spk", False))
media_paths = _selected_video_paths()
subtitle_cols = st.columns([3, 2], vertical_alignment="top")
with subtitle_cols[0]:
with st.expander(tr("Ali Bailian Fun-ASR Subtitle Transcription"), expanded=False):
backend_label = st.radio(
tr("Subtitle Processing Method"),
options=backend_labels,
index=backend_values.index(saved_backend),
horizontal=True,
key="fun_asr_backend",
)
backend = backend_options[backend_label]
if backend == "upload":
render_subtitle_upload(tr)
elif backend == "local":
st.caption(tr("Local Fun-ASR upload caption"))
api_url = st.text_input(
tr("Local FunASR-Pack API URL"),
value=api_url,
help=tr("Local FunASR-Pack API URL Help"),
key="fun_asr_api_url",
)
hotword = st.text_input(
tr("Fun-ASR Hotword"),
value=hotword,
help=tr("Fun-ASR Hotword Help"),
key="fun_asr_hotword",
)
enable_spk = st.checkbox(
tr("Enable speaker diarization"),
value=enable_spk,
help=tr("Enable speaker diarization Help"),
key="fun_asr_enable_spk",
)
else:
st.caption(tr("Fun-ASR upload caption"))
st.markdown(
f"{tr('API Key URL')}: "
"[https://bailian.console.aliyun.com/?tab=model#/api-key]"
"(https://bailian.console.aliyun.com/?tab=model#/api-key)"
)
api_key = st.text_input(
tr("Ali Bailian API Key"),
value=config.fun_asr.get("api_key", ""),
type="password",
help=tr("Ali Bailian API Key Help"),
key="fun_asr_api_key",
)
if backend != "upload":
if media_paths:
if len(media_paths) == 1:
st.info(
tr("Using selected video for subtitle transcription").format(
file=os.path.basename(media_paths[0])
)
)
else:
st.info(
tr("Using selected videos for subtitle transcription").format(
count=len(media_paths),
files=_format_file_list_for_display(media_paths),
)
)
else:
st.warning(tr("Please select or upload a video first"))
# 上传字幕面板会在本轮渲染中更新 session_state这里重新读取一次保证按钮状态同步。
subtitle_paths = _selected_subtitle_paths()
can_transcribe = backend != "upload" and bool(media_paths)
can_correct_subtitles = bool(subtitle_paths)
with subtitle_cols[1]:
action_cols = st.columns(2)
with action_cols[0]:
transcribe_clicked = st.button(
tr("Transcribe subtitles"),
key="fun_asr_transcribe",
disabled=not can_transcribe,
use_container_width=True,
)
with action_cols[1]:
correct_clicked = st.button(
tr("Calibrate subtitles"),
key="subtitle_correct",
disabled=not can_correct_subtitles,
use_container_width=True,
)
if correct_clicked:
from app.services import subtitle_corrector
text_provider = config.app.get('text_llm_provider', 'openai').lower()
text_api_key = config.app.get(f'text_{text_provider}_api_key')
text_base_url = config.app.get(f'text_{text_provider}_base_url')
corrected_paths = []
try:
spinner_text = tr("Calibrating subtitles...")
with st.spinner(spinner_text):
progress_bar = st.progress(0) if len(subtitle_paths) > 1 else None
for index, subtitle_path in enumerate(subtitle_paths, start=1):
subtitle_name = f"{os.path.splitext(os.path.basename(subtitle_path))[0]}_corrected.srt"
output_path = _unique_file_path(utils.subtitle_dir(), subtitle_name)
corrected_path = subtitle_corrector.correct_subtitle_file(
subtitle_file=subtitle_path,
output_file=output_path,
provider=text_provider,
api_key=text_api_key,
base_url=text_base_url,
)
corrected_paths.append(corrected_path)
if progress_bar:
progress_bar.progress(index / len(subtitle_paths))
if progress_bar:
progress_bar.empty()
_set_subtitle_state(corrected_paths)
success_placeholder = st.empty()
if len(corrected_paths) == 1:
success_placeholder.success(
tr("Subtitle calibration succeeded").format(file=os.path.basename(corrected_paths[0]))
)
else:
success_placeholder.success(
tr("Subtitle calibration succeeded for multiple files").format(
count=len(corrected_paths),
files=_format_file_list_for_display(corrected_paths),
)
)
time.sleep(3)
success_placeholder.empty()
except Exception as e:
logger.error(f"字幕校准失败: {traceback.format_exc()}")
st.error(f"{tr('Subtitle calibration failed')}: {str(e)}")
return
if not transcribe_clicked:
return
if backend == "bailian" and not api_key.strip():
clear_fun_asr_subtitle_state()
st.error(tr("Please enter Ali Bailian API Key"))
return
if backend == "local" and not str(api_url).strip():
clear_fun_asr_subtitle_state()
st.error(tr("Please enter local FunASR-Pack API URL"))
return
missing_paths = [path for path in media_paths if not os.path.exists(path)]
if not media_paths or missing_paths:
clear_fun_asr_subtitle_state()
if missing_paths:
st.error(
tr("Selected video files do not exist").format(
files=_format_file_list_for_display(missing_paths)
)
)
else:
st.error(tr("Selected video file does not exist"))
return
try:
clear_fun_asr_subtitle_state()
config.fun_asr["backend"] = backend
config.fun_asr["api_url"] = str(api_url).strip()
config.fun_asr["api_key"] = api_key.strip()
config.fun_asr["hotword"] = str(hotword).strip()
config.fun_asr["enable_spk"] = bool(enable_spk)
config.fun_asr["model"] = "fun-asr"
config.save_config()
spinner_text = (
tr("Transcribing with local FunASR-Pack...")
if backend == "local"
else tr("Transcribing with Fun-ASR...")
)
with st.spinner(spinner_text):
progress_bar = st.progress(0) if len(media_paths) > 1 else None
generated_paths = []
for index, media_path in enumerate(media_paths, start=1):
subtitle_name = f"{os.path.splitext(os.path.basename(media_path))[0]}_fun_asr.srt"
subtitle_path = _unique_file_path(utils.subtitle_dir(), subtitle_name)
if backend == "local":
generated_path = fun_asr_subtitle.create_with_local_fun_asr(
local_file=media_path,
subtitle_file=subtitle_path,
api_url=str(api_url).strip(),
hotword=str(hotword).strip(),
enable_spk=bool(enable_spk),
)
else:
generated_path = fun_asr_subtitle.create_with_fun_asr(
local_file=media_path,
subtitle_file=subtitle_path,
api_key=api_key.strip(),
)
if not generated_path or not os.path.exists(generated_path):
raise RuntimeError(tr("Fun-ASR failed without subtitle file"))
generated_paths.append(generated_path)
if progress_bar:
progress_bar.progress(index / len(media_paths))
if progress_bar:
progress_bar.empty()
if not generated_paths:
clear_fun_asr_subtitle_state()
st.error(tr("Fun-ASR failed without subtitle file"))
return
subtitle_content, subtitle_contents = _build_combined_subtitle_content(generated_paths)
if not subtitle_content.strip():
clear_fun_asr_subtitle_state()
st.error(tr("Fun-ASR failed without subtitle file"))
return
_set_subtitle_state(generated_paths)
success_placeholder = st.empty()
if len(generated_paths) == 1:
success_placeholder.success(
tr("Subtitle transcription succeeded").format(file=os.path.basename(generated_paths[0]))
)
else:
success_placeholder.success(
tr("Subtitle transcription succeeded for multiple files").format(
count=len(generated_paths),
files=_format_file_list_for_display(generated_paths),
)
)
time.sleep(3)
success_placeholder.empty()
st.rerun()
except Exception as e:
clear_fun_asr_subtitle_state()
logger.error(f"Fun-ASR 字幕转写失败: {traceback.format_exc()}")
st.error(f"{tr('Fun-ASR transcription failed')}: {str(e)}")
def render_script_buttons(tr, params):
"""渲染脚本操作按钮"""
# 获取当前选择的脚本类型
script_path = st.session_state.get('video_clip_json_path', '')
# 生成/加载按钮
if script_path == "auto":
button_name = tr("Generate Video Script")
elif script_path == "short":
button_name = tr("Generate Short Video Script")
elif script_path == "summary":
button_name = tr("生成短剧解说脚本")
elif script_path.endswith("json"):
button_name = tr("Load Video Script")
else:
button_name = tr("Please Select Script File")
if st.button(button_name, key="script_action", disabled=not script_path):
if script_path == "auto":
# 执行纪录片视频脚本生成(视频无字幕无配音)
generate_script_docu(params, tr)
elif script_path == "short":
# 执行 短剧混剪 脚本生成
custom_clips = st.session_state.get('custom_clips')
generate_script_short(tr, params, custom_clips)
elif script_path == "summary":
# 执行 短剧解说 脚本生成
subtitle_path = st.session_state.get('subtitle_path')
video_theme = st.session_state.get('video_theme')
temperature = st.session_state.get('temperature')
plot_analysis = ""
if st.session_state.get('short_drama_plot_analysis_subtitle_path') == subtitle_path:
plot_analysis = st.session_state.get('short_drama_plot_analysis', '')
generate_script_short_sunmmary(
params,
subtitle_path,
video_theme,
temperature,
tr,
plot_analysis=plot_analysis,
subtitle_content=st.session_state.get('subtitle_content', ''),
)
else:
load_script(tr, script_path)
render_video_script_editor(tr)
def load_script(tr, script_path):
"""加载脚本文件"""
try:
with open(script_path, 'r', encoding='utf-8') as f:
script = f.read()
script = utils.clean_model_output(script)
st.session_state['video_clip_json'] = json.loads(script)
st.success(tr("Script loaded successfully"))
st.rerun()
except Exception as e:
logger.error(f"加载脚本文件时发生错误\n{traceback.format_exc()}")
st.error(f"{tr('Failed to load script')}: {str(e)}")
def save_script_with_validation(tr, video_clip_json_details):
"""保存视频脚本(包含格式验证)"""
if not video_clip_json_details:
st.error(tr("请输入视频脚本"))
st.stop()
# 第一步:格式验证
with st.spinner(tr("Validating script format...")):
try:
result = check_script.check_format(video_clip_json_details)
if not result.get('success'):
# 格式验证失败,显示详细错误信息
error_message = result.get('message', '未知错误')
error_details = result.get('details', '')
st.error(f"**{tr('Script format validation failed')}**")
st.error(f"**{tr('Error Message')}:** {error_message}")
if error_details:
st.error(f"**{tr('Details')}:** {error_details}")
# 显示正确格式示例
st.info(f"**{tr('Correct script format example')}:**")
example_script = [
{
"_id": 1,
"timestamp": "00:00:00,600-00:00:07,559",
"picture": "工地上,蔡晓艳奋力救人,场面混乱",
"narration": "灾后重建,工地上险象环生!泼辣女工蔡晓艳挺身而出,救人第一!",
"OST": 0
},
{
"_id": 2,
"timestamp": "00:00:08,240-00:00:12,359",
"picture": "领导视察,蔡晓艳不屑一顾",
"narration": "播放原片4",
"OST": 1
}
]
st.code(json.dumps(example_script, ensure_ascii=False, indent=2), language='json')
st.stop()
except Exception as e:
st.error(f"{tr('Script format validation error')}: {str(e)}")
st.stop()
# 第二步:保存脚本
with st.spinner(tr("Save Script")):
script_dir = utils.script_dir()
timestamp = time.strftime("%Y-%m%d-%H%M%S")
save_path = os.path.join(script_dir, f"{timestamp}.json")
try:
data = json.loads(video_clip_json_details)
with open(save_path, 'w', encoding='utf-8') as file:
json.dump(data, file, ensure_ascii=False, indent=4)
st.session_state['video_clip_json'] = data
st.session_state['video_clip_json_path'] = save_path
# 标记需要切换到文件选择模式(在下次渲染前处理)
st.session_state['_switch_to_file_mode'] = True
# 更新配置
config.app["video_clip_json_path"] = save_path
# 显示成功消息
st.success(tr("Script validated and saved successfully"))
# 强制重新加载页面更新选择框
time.sleep(0.5) # 给一点时间让用户看到成功消息
st.rerun()
except Exception as err:
st.error(f"{tr('Failed to save script')}: {str(err)}")
st.stop()
# crop_video函数已移除 - 现在使用统一裁剪策略,不再需要预裁剪步骤
def get_script_params():
"""获取脚本参数"""
return {
'video_language': st.session_state.get('video_language', ''),
'video_clip_json_path': st.session_state.get('video_clip_json_path', ''),
'video_origin_path': st.session_state.get('video_origin_path', ''),
'video_origin_paths': _selected_video_paths(),
'video_name': st.session_state.get('video_name', ''),
'video_plot': st.session_state.get('video_plot', '')
}