NarratoAI/webui/components/script_settings.py
viccy e6d15fe246 feat(webui): 新增短剧剧情分析、可视化脚本编辑器与通用生成参数设置
- 抽离通用生成参数设置组件,统一管理temperature等LLM生成参数
- 新增短剧字幕剧情分析功能,支持一键分析与手动编辑分析结果
- 重构短剧脚本生成逻辑,支持传入预先生成的剧情分析内容
- 新增可视化视频脚本表格编辑器,支持增删编辑行与原始JSON预览
- 优化多语言翻译、UI交互细节与字幕相关提示文案
2026-06-05 19:31:35 +08:00

1013 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import glob
import json
import math
import time
import traceback
import pandas as pd
import streamlit as st
from loguru import logger
from app.config import config
from app.models.schema import VideoClipParams
from app.services.subtitle_text import decode_subtitle_bytes, read_subtitle_text
from app.utils import utils, check_script
from webui.tools.generate_script_docu import generate_script_docu
from webui.tools.generate_script_short import generate_script_short
from webui.tools.generate_short_summary import analyze_short_drama_plot, generate_script_short_sunmmary
# Columns that always lead the script table, in this order; any additional
# keys found in the script rows are appended after them.
SCRIPT_TABLE_BASE_COLUMNS = ["_id", "timestamp", "picture", "narration", "OST"]
def render_script_panel(tr):
    """Render the video script configuration panel.

    Args:
        tr: Translation callable that maps a message key to display text.
    """
    # Mode-specific detail sections, keyed by the mode value stored in
    # ``st.session_state['video_clip_json_path']``.
    detail_renderers = {
        "auto": render_video_details,  # scene narration
        "short": render_short_generate_options,  # short-drama mashup
        "summary": short_drama_summary,  # short-drama commentary
    }

    with st.container(border=True):
        st.write(tr("Video Script Configuration"))
        params = VideoClipParams()

        # Script source selection first, then the video file selection.
        render_script_file(tr, params)
        render_video_file(tr, params)

        # Any other value (empty, or a concrete script path) has no extra section.
        current_mode = st.session_state.get('video_clip_json_path', '')
        render_details = detail_renderers.get(current_mode)
        if render_details is not None:
            render_details(tr)

        # Script action buttons.
        render_script_buttons(tr, params)
def render_script_file(tr, params):
    """Render the script mode selector and, in file mode, the script picker/uploader.

    The selected mode (``"auto"``, ``"short"``, ``"summary"``) or the selected
    script file path is written to both
    ``st.session_state['video_clip_json_path']`` and
    ``params.video_clip_json_path``.

    Args:
        tr: Translation callable that maps a message key to display text.
        params: Object whose ``video_clip_json_path`` attribute is updated.
    """
    # Supported modes.
    MODE_FILE = "file_selection"
    MODE_AUTO = "auto"
    MODE_SHORT = "short"
    MODE_SUMMARY = "summary"

    # Label -> mode mapping, listed in workflow priority order.
    mode_options = {
        tr("Short Drama Summary"): MODE_SUMMARY,
        tr("Auto Generate"): MODE_AUTO,
        tr("Short Generate"): MODE_SHORT,
        tr("Select/Upload Script"): MODE_FILE,
    }

    # Current stored state.
    current_path = st.session_state.get('video_clip_json_path', '')

    # Work out which mode label matches the stored state.
    default_index = 0
    mode_keys = list(mode_options.keys())
    if current_path == "auto":
        default_index = mode_keys.index(tr("Auto Generate"))
    elif current_path == "short":
        default_index = mode_keys.index(tr("Short Generate"))
    elif current_path == "summary":
        default_index = mode_keys.index(tr("Short Drama Summary"))
    elif current_path:
        # Any other non-empty value is treated as file mode.
        default_index = mode_keys.index(tr("Select/Upload Script"))
    else:
        default_index = 0

    # 1. Render the mode selector.
    default_mode_label = mode_keys[default_index]
    default_mode = mode_options[default_mode_label]

    # Keep the selectbox widget state in sync with the stored path before the
    # widget is instantiated.
    if st.session_state.get('_switch_to_file_mode'):
        # One-shot flag set after a script has been saved.
        st.session_state['script_mode_selection'] = tr("Select/Upload Script")
        del st.session_state['_switch_to_file_mode']
    elif (
        'script_mode_selection' not in st.session_state
        or st.session_state['script_mode_selection'] not in mode_options
    ):
        st.session_state['script_mode_selection'] = default_mode_label
    elif mode_options[st.session_state['script_mode_selection']] != default_mode:
        st.session_state['script_mode_selection'] = default_mode_label

    def update_script_mode():
        """Selectbox callback: copy the selected mode into the path state."""
        selected_label = st.session_state.script_mode_selection
        if selected_label:
            new_mode = mode_options[selected_label]
            st.session_state.video_clip_json_path = new_mode
            params.video_clip_json_path = new_mode
        else:
            st.session_state.video_clip_json_path = default_mode
            params.video_clip_json_path = default_mode

    selected_mode_label = st.selectbox(
        tr("Video Type"),
        options=mode_keys,
        index=None,
        key="script_mode_selection",
        on_change=update_script_mode,
    )

    # Fallback when the widget reports no selection.
    if not selected_mode_label:
        selected_mode_label = default_mode_label
    selected_mode = mode_options[selected_mode_label]

    # 2. Act on the selected mode.
    if selected_mode == MODE_FILE:
        # --- File selection mode ---
        script_list = [
            (tr("None"), ""),
            (tr("Upload Script"), "upload_script")
        ]

        # Existing script files, newest first.
        script_dir = utils.script_dir()
        script_files = sorted(
            glob.glob(os.path.join(script_dir, "*.json")),
            key=os.path.getctime,
            reverse=True,
        )
        for script_file in script_files:
            display_name = script_file.replace(config.root_dir, "")
            script_list.append((display_name, script_file))

        # Locate the previously saved script in the list; a stored mode value
        # (auto/short/summary) is not a file path and resets to empty.
        saved_script_path = current_path if current_path not in [MODE_AUTO, MODE_SHORT, MODE_SUMMARY] else ""
        selected_index = next(
            (i for i, (_, path) in enumerate(script_list) if path == saved_script_path),
            0,
        )

        # When the saved script was found, sync the selectbox widget state.
        if saved_script_path and selected_index > 0:
            st.session_state['script_file_selection'] = selected_index

        selected_script_index = st.selectbox(
            tr("Script Files"),
            index=selected_index,
            options=range(len(script_list)),
            format_func=lambda x: script_list[x][0],
            key="script_file_selection"
        )
        script_path = script_list[selected_script_index][1]

        # Only overwrite the path when an entry with a value was chosen, so a
        # previously saved path is not lost.
        if script_path:
            st.session_state['video_clip_json_path'] = script_path
            params.video_clip_json_path = script_path
        elif saved_script_path:
            # "None" selected but a path was saved earlier: keep it.
            st.session_state['video_clip_json_path'] = saved_script_path
            params.video_clip_json_path = saved_script_path

        # Script upload.
        if script_path == "upload_script":
            uploaded_file = st.file_uploader(
                tr("Upload Script File"),
                type=["json"],
                accept_multiple_files=False,
            )
            if uploaded_file is not None:
                try:
                    # getvalue() does not depend on the stream position, and
                    # "utf-8-sig" strips a leading BOM that json.loads rejects.
                    script_content = uploaded_file.getvalue().decode('utf-8-sig')
                    json_data = json.loads(script_content)

                    # Save into the script directory; basename() drops any
                    # directory components from the client-supplied name.
                    safe_filename = os.path.basename(uploaded_file.name)
                    script_file_path = os.path.join(script_dir, safe_filename)
                    file_name, file_extension = os.path.splitext(safe_filename)

                    # Avoid overwriting an existing file: append a timestamp.
                    if os.path.exists(script_file_path):
                        timestamp = time.strftime("%Y%m%d%H%M%S")
                        file_name_with_timestamp = f"{file_name}_{timestamp}"
                        script_file_path = os.path.join(script_dir, file_name_with_timestamp + file_extension)

                    with open(script_file_path, "w", encoding='utf-8') as f:
                        json.dump(json_data, f, ensure_ascii=False, indent=2)

                    # Update state, then rerun so the new file shows in the list.
                    st.success(tr("Script Uploaded Successfully"))
                    st.session_state['video_clip_json_path'] = script_file_path
                    params.video_clip_json_path = script_file_path
                    time.sleep(1)
                    st.rerun()
                except json.JSONDecodeError:
                    st.error(tr("Invalid JSON format"))
                except Exception as e:
                    st.error(f"{tr('Upload failed')}: {str(e)}")
    else:
        # --- Generation modes ---
        st.session_state['video_clip_json_path'] = selected_mode
        params.video_clip_json_path = selected_mode
def render_video_file(tr, params):
    """Render the video source selector (upload or resource directory).

    The resulting video path (or an empty string) is written to both
    ``st.session_state['video_origin_path']`` and ``params.video_origin_path``.

    Args:
        tr: Translation callable that maps a message key to display text.
        params: Object whose ``video_origin_path`` attribute is updated.
    """
    source_options = {
        tr("Upload Local Files"): "upload",
        tr("Select from resource directory"): "resource",
    }
    source_labels = list(source_options.keys())
    default_source_label = source_labels[0]

    # Versioned default: the first time this version marker is seen, reset the
    # selection to the default source unless a valid selection with a stored
    # video path already exists.
    source_default_version = "upload_first_v1"
    if st.session_state.get('_video_source_default_version') != source_default_version:
        if (
            st.session_state.get('video_source_selection') not in source_options
            or not st.session_state.get('video_origin_path')
        ):
            st.session_state['video_source_selection'] = default_source_label
        st.session_state['_video_source_default_version'] = source_default_version
    elif st.session_state.get('video_source_selection') not in source_options:
        st.session_state['video_source_selection'] = default_source_label

    current_source = st.session_state['video_source_selection']
    source_caption = (
        tr("Select a video from resource videos directory")
        if source_options[current_source] == "resource"
        else tr("Upload a new video file up to 2GB")
    )
    st.markdown(f"**{tr('Video Source')}** :gray[{source_caption}]")

    source = st.pills(
        tr("Video Source"),
        options=source_labels,
        selection_mode="single",
        key="video_source_selection",
        label_visibility="collapsed",
        width="stretch",
    )
    if not source:
        source = default_source_label

    if source_options[source] == "resource":
        # Collect videos from the resource directory, newest first.
        video_files = []
        for suffix in ["*.mp4", "*.mov", "*.avi", "*.flv", "*.mkv", "*.mpeg4"]:
            video_files.extend(glob.glob(os.path.join(utils.video_dir(), suffix)))
        video_files = sorted(video_files, key=os.path.getctime, reverse=True)

        # Drop a stale widget selection, falling back to the saved path when
        # that file is still available.
        saved_video_path = st.session_state.get('video_origin_path', '')
        selected_video_path = st.session_state.get('resource_video_selection')
        if selected_video_path not in video_files:
            st.session_state['resource_video_selection'] = (
                saved_video_path if saved_video_path in video_files else None
            )

        def format_video_name(path):
            """Show the path without the project root prefix."""
            return path.replace(config.root_dir, "")

        video_path = st.selectbox(
            tr("Select Video"),
            options=video_files,
            index=None,
            placeholder=tr("Choose a video file"),
            format_func=format_video_name,
            key="resource_video_selection",
        )
        if video_path:
            st.session_state['video_origin_path'] = video_path
            params.video_origin_path = video_path
        else:
            st.session_state['video_origin_path'] = ""
            params.video_origin_path = ""
        if not video_files:
            st.info(tr("No video files found in resource videos directory"))
        return

    if source_options[source] == "upload":
        uploaded_file = st.file_uploader(
            tr("Upload Video"),
            type=["mp4", "mov", "avi", "flv", "mkv", "mpeg4"],
            accept_multiple_files=False,
            key="video_file_uploader",
        )
        if uploaded_file is None:
            # Nothing uploaded: clear every upload-related state entry.
            st.session_state['video_origin_path'] = ""
            params.video_origin_path = ""
            st.session_state['video_file_processed'] = False
            st.session_state['uploaded_video_path'] = ""
            st.session_state['uploaded_video_signature'] = ""
        else:
            # name:size identifies an upload so the same file is not written
            # to disk again on every rerun.
            uploaded_signature = f"{uploaded_file.name}:{uploaded_file.size}"
            uploaded_video_path = st.session_state.get('uploaded_video_path', '')
            is_processed = (
                st.session_state.get('video_file_processed', False)
                and st.session_state.get('uploaded_video_signature') == uploaded_signature
                and uploaded_video_path
                # Re-save when the previously written file no longer exists.
                and os.path.exists(uploaded_video_path)
            )
            if is_processed:
                st.session_state['video_origin_path'] = uploaded_video_path
                params.video_origin_path = uploaded_video_path
            else:
                # basename() drops any directory components from the
                # client-supplied name.
                safe_filename = os.path.basename(uploaded_file.name)
                video_file_path = os.path.join(utils.video_dir(), safe_filename)
                file_name, file_extension = os.path.splitext(safe_filename)
                # Avoid overwriting an existing file: append a timestamp.
                if os.path.exists(video_file_path):
                    timestamp = time.strftime("%Y%m%d%H%M%S")
                    file_name_with_timestamp = f"{file_name}_{timestamp}"
                    video_file_path = os.path.join(utils.video_dir(), file_name_with_timestamp + file_extension)
                with open(video_file_path, "wb") as f:
                    # getbuffer() exposes the uploaded bytes without copying
                    # them and does not depend on the stream position.
                    f.write(uploaded_file.getbuffer())
                st.session_state['video_origin_path'] = video_file_path
                params.video_origin_path = video_file_path
                st.session_state['uploaded_video_path'] = video_file_path
                st.session_state['uploaded_video_signature'] = uploaded_signature
                st.session_state['video_file_processed'] = True
def render_short_generate_options(tr):
    """Render the extra options shown in "Short Generate" mode.

    Renders the shared short-drama inputs first, then a clip-count selector
    whose value is stored in ``st.session_state['custom_clips']``.

    Args:
        tr: Translation callable that maps a message key to display text.
    """
    short_drama_summary(tr)

    # Clip-count selector (1-20); starts from the stored value, or 5.
    st.session_state['custom_clips'] = st.number_input(
        tr("自定义片段"),
        min_value=1,
        max_value=20,
        value=st.session_state.get('custom_clips', 5),
        help=tr("设置需要生成的短视频片段数量"),
        key="custom_clips_input"
    )
def render_video_details(tr):
    """Render the theme, prompt and frame-sampling inputs for scene narration.

    The theme and prompt are stored in ``st.session_state['video_theme']`` and
    ``st.session_state['custom_prompt']``.

    Args:
        tr: Translation callable that maps a message key to display text.

    Returns:
        A ``(video_theme, custom_prompt)`` tuple with the entered values.
    """
    theme_text = st.text_input(tr("Video Theme"))
    prompt_text = st.text_area(
        tr("Generation Prompt"),
        value=st.session_state.get('video_plot', ''),
        help=tr("Custom prompt for LLM, leave empty to use default prompt"),
        height=180
    )

    # Frame sampling settings, side by side.
    interval_col, batch_col = st.columns(2)
    with interval_col:
        st.number_input(
            tr("Frame Interval (seconds)"),
            min_value=0,
            value=st.session_state.get('frame_interval_input', config.frames.get('frame_interval_input', 3)),
            help=tr("Frame Interval (seconds) (More keyframes consume more tokens)"),
            key="frame_interval_input"
        )
    with batch_col:
        st.number_input(
            tr("Batch Size"),
            min_value=0,
            value=st.session_state.get('vision_batch_size', config.frames.get('vision_batch_size', 10)),
            help=tr("Batch Size (More keyframes consume more tokens)"),
            key="vision_batch_size"
        )

    st.session_state['video_theme'] = theme_text
    st.session_state['custom_prompt'] = prompt_text
    return theme_text, prompt_text
def short_drama_summary(tr):
    """Render the short-drama inputs: subtitles, drama name and plot analysis.

    Args:
        tr: Translation callable that maps a message key to display text.

    Returns:
        The drama name entered by the user.
    """
    # Make sure the "subtitle already processed" flag exists.
    if 'subtitle_file_processed' not in st.session_state:
        st.session_state['subtitle_file_processed'] = False

    render_fun_asr_transcription(tr)
    render_subtitle_preview(tr)

    # Discard a stored plot analysis that was produced for a different subtitle.
    subtitle_in_use = st.session_state.get('subtitle_path', '')
    analysed_subtitle = st.session_state.get('short_drama_plot_analysis_subtitle_path')
    if analysed_subtitle and analysed_subtitle != subtitle_in_use:
        st.session_state['short_drama_plot_analysis'] = ""
        st.session_state['short_drama_plot_analysis_subtitle_path'] = ""

    name_col, button_col = st.columns([4, 1.2], vertical_alignment="bottom")
    with name_col:
        video_theme = st.text_input(tr("短剧名称"))
    with button_col:
        # The analysis button is disabled until a subtitle is available.
        wants_analysis = st.button(
            tr("剧情理解"),
            key="short_drama_plot_analysis_button",
            disabled=not subtitle_in_use,
            use_container_width=True,
        )
    st.session_state['video_theme'] = video_theme

    if wants_analysis:
        with st.spinner(tr("Analyzing plot...")):
            analysis_text = analyze_short_drama_plot(
                subtitle_in_use,
                st.session_state.get('temperature', 0.7),
                tr,
                subtitle_content=st.session_state.get('subtitle_content', ''),
            )
        if analysis_text:
            # Remember which subtitle the analysis belongs to.
            st.session_state['short_drama_plot_analysis'] = analysis_text
            st.session_state['short_drama_plot_analysis_subtitle_path'] = subtitle_in_use
            st.success(tr("Plot analysis completed"))

    # Show the editable analysis only when there is one.
    if st.session_state.get('short_drama_plot_analysis'):
        st.text_area(
            tr("剧情理解结果"),
            key="short_drama_plot_analysis",
            height=240,
        )
    return video_theme
def render_subtitle_preview(tr):
    """Render a collapsible preview of the current subtitle text.

    When no subtitle is available, a hint asks the user to transcribe or
    upload one first.

    Args:
        tr: Translation callable that maps a message key to display text.
    """
    path_in_state = st.session_state.get('subtitle_path', '')
    text_in_state = st.session_state.get('subtitle_content', '')

    # A path is known but the text is not: load it from disk when possible.
    needs_loading = path_in_state and not text_in_state
    if needs_loading and os.path.exists(path_in_state):
        text_in_state = read_subtitle_text(path_in_state).text
        st.session_state['subtitle_content'] = text_in_state

    with st.expander(tr("Subtitle Preview"), expanded=False):
        if path_in_state and text_in_state:
            st.text_area(
                tr("Subtitle Preview"),
                key="subtitle_content",
                height=180,
                label_visibility="collapsed",
            )
        else:
            st.info(tr("Please transcribe or upload subtitles first"))
def render_subtitle_upload(tr):
    """Render the SRT uploader and save an uploaded subtitle file.

    On success the saved path and decoded text are stored in
    ``st.session_state['subtitle_path']`` / ``['subtitle_content']`` and
    ``['subtitle_file_processed']`` is set to ``True``.

    Args:
        tr: Translation callable that maps a message key to display text.
    """
    subtitle_dir_label = utils.subtitle_dir().replace(config.root_dir, ".")
    st.markdown(
        f"**{tr('上传字幕文件')}** "
        f":gray[{tr('Transcribed subtitles storage hint').format(path=subtitle_dir_label)}]"
    )
    subtitle_file = st.file_uploader(
        tr("上传字幕文件"),
        type=["srt"],
        accept_multiple_files=False,
        key="subtitle_file_uploader",  # unique widget key
        label_visibility="collapsed",
    )

    # Show the subtitle currently in use, with a button to clear it.
    current_subtitle_path = st.session_state.get('subtitle_path')
    if current_subtitle_path:
        st.info(tr("Uploaded subtitle").format(file=os.path.basename(current_subtitle_path)))
        if st.button(tr("清除已上传字幕")):
            st.session_state['subtitle_path'] = None
            st.session_state['subtitle_content'] = None
            st.session_state['subtitle_file_processed'] = False
            st.rerun()

    # Process the upload only once. The flag is read with a default so this
    # function also works when no caller has initialised it yet.
    if subtitle_file is not None and not st.session_state.get('subtitle_file_processed', False):
        try:
            # basename() drops any directory components from the
            # client-supplied name.
            safe_filename = os.path.basename(subtitle_file.name)
            decoded = decode_subtitle_bytes(subtitle_file.getvalue())
            script_content = decoded.text
            detected_encoding = decoded.encoding
            if not script_content:
                st.error(tr("无法读取字幕文件,请检查文件编码(支持 UTF-8、UTF-16、GBK、GB2312"))
                st.stop()

            # Simple sanity check on the content length (warning only).
            if len(script_content.strip()) < 10:
                st.warning(tr("字幕文件内容似乎为空,请检查文件"))

            # Save into the subtitle directory.
            script_file_path = os.path.join(utils.subtitle_dir(), safe_filename)
            file_name, file_extension = os.path.splitext(safe_filename)

            # Avoid overwriting an existing file: append a timestamp.
            if os.path.exists(script_file_path):
                timestamp = time.strftime("%Y%m%d%H%M%S")
                file_name_with_timestamp = f"{file_name}_{timestamp}"
                script_file_path = os.path.join(utils.subtitle_dir(), file_name_with_timestamp + file_extension)

            # The decoded text is always written back as UTF-8.
            with open(script_file_path, "w", encoding='utf-8') as f:
                f.write(script_content)

            st.success(
                f"{tr('字幕上传成功')} "
                f"({tr('Encoding')}: {detected_encoding.upper()}, "
                f"{tr('Size')}: {len(script_content)} {tr('Characters')})"
            )
            # State is updated in place; no rerun is triggered here.
            st.session_state['subtitle_path'] = script_file_path
            st.session_state['subtitle_content'] = script_content
            st.session_state['subtitle_file_processed'] = True  # mark as processed
        except Exception as e:
            st.error(f"{tr('Upload failed')}: {str(e)}")
def _is_blank_table_value(value):
if value is None:
return True
if isinstance(value, float) and math.isnan(value):
return True
if isinstance(value, str) and not value.strip():
return True
return False
def _ordered_script_columns(script_rows):
    """Return the table column order for the given script rows.

    The base columns come first; keys found in the dict rows follow in
    first-seen order. Rows that are not dicts are ignored.
    """
    # A dict keeps insertion order and ignores keys it already holds.
    ordered = dict.fromkeys(SCRIPT_TABLE_BASE_COLUMNS)
    for entry in script_rows:
        if isinstance(entry, dict):
            ordered.update(dict.fromkeys(entry))
    return list(ordered)
def _script_json_to_table(script_data):
    """Convert script JSON data into a DataFrame for the table editor.

    Anything that is not a list is treated as an empty script. A list that
    contains non-dict items becomes a single ``value`` column holding each
    item serialised as JSON text.
    """
    items = script_data if isinstance(script_data, list) else []
    if not items:
        return pd.DataFrame(columns=SCRIPT_TABLE_BASE_COLUMNS)

    if any(not isinstance(item, dict) for item in items):
        serialised = [{"value": json.dumps(item, ensure_ascii=False)} for item in items]
        return pd.DataFrame(serialised, columns=["value"])

    return pd.DataFrame(items, columns=_ordered_script_columns(items))
def _normalize_script_table_value(column, value):
    """Normalise one table cell before it is written back to JSON.

    Blank values become ``""``. Values in the ``_id`` and ``OST`` columns are
    converted to ``int`` when possible; everything else is returned unchanged.
    """
    if _is_blank_table_value(value):
        return ""
    if column not in {"_id", "OST"}:
        return value
    try:
        return int(value)
    except (TypeError, ValueError):
        # Not convertible: keep what the user entered.
        return value
def _script_table_to_json(edited_data):
    """Serialise edited table data to a pretty-printed JSON string.

    Rows that are not dicts, or whose values are all blank, are dropped.
    Blank cells are dropped too, unless they belong to a base column.
    """
    if isinstance(edited_data, list):
        records = edited_data
    else:
        frame = edited_data if isinstance(edited_data, pd.DataFrame) else pd.DataFrame(edited_data)
        records = frame.to_dict("records")

    script_data = []
    for record in records:
        if not isinstance(record, dict):
            continue
        if all(_is_blank_table_value(cell) for cell in record.values()):
            continue

        cleaned = {}
        for name, cell in record.items():
            if not name:
                continue
            normalized = _normalize_script_table_value(name, cell)
            # Base columns are always kept, even when blank.
            if name in SCRIPT_TABLE_BASE_COLUMNS or not _is_blank_table_value(normalized):
                cleaned[name] = normalized

        if cleaned:
            script_data.append(cleaned)

    return json.dumps(script_data, indent=2, ensure_ascii=False)
def render_video_script_editor(tr):
    """Render the script row count and a button that opens the table editor dialog.

    Args:
        tr: Translation callable that maps a message key to display text.
    """
    @st.dialog(tr("Video Script"), width="large")
    def video_script_dialog():
        """Dialog body: edit the script as a table, preview the JSON, save on demand."""
        table = _script_json_to_table(st.session_state.get('video_clip_json', []))
        st.caption(tr("Video script table help"))

        # Per-column editor configuration for the base columns.
        id_column = st.column_config.NumberColumn(tr("Script Column ID"), step=1, format="%d", width=52)
        timestamp_column = st.column_config.TextColumn(tr("Script Column Timestamp"), width=200)
        picture_column = st.column_config.TextColumn(tr("Script Column Picture"), width=320)
        narration_column = st.column_config.TextColumn(tr("Script Column Narration"), width=480)
        ost_column = st.column_config.NumberColumn(
            tr("Script Column OST"),
            min_value=0,
            max_value=2,
            step=1,
            format="%d",
            width=52,
        )

        edited = st.data_editor(
            table,
            key="video_script_table_editor",
            hide_index=True,
            num_rows="dynamic",
            use_container_width=True,
            height=520,
            row_height=72,
            column_order=list(table.columns),
            column_config={
                "_id": id_column,
                "timestamp": timestamp_column,
                "picture": picture_column,
                "narration": narration_column,
                "OST": ost_column,
            },
        )

        script_json_text = _script_table_to_json(edited)
        with st.expander(tr("Raw JSON Preview"), expanded=False):
            st.code(script_json_text, language="json")
        if st.button(tr("Save Script"), key="save_script_from_dialog", use_container_width=True):
            save_script_with_validation(tr, script_json_text)

    # Summary line with the number of rows in the loaded script.
    loaded_script = st.session_state.get('video_clip_json', [])
    row_count = 0
    if isinstance(loaded_script, list):
        row_count = len(loaded_script)
    st.markdown(f"**{tr('Video Script')}** :gray[{tr('Video script row count').format(count=row_count)}]")

    if st.button(tr("Edit Video Script"), key="open_video_script_editor", use_container_width=True):
        video_script_dialog()
def render_fun_asr_transcription(tr):
    """Render the subtitle source controls and run Fun-ASR transcription on demand.

    The user picks a backend (local FunASR-Pack API, Ali Bailian online
    Fun-ASR, or uploading a subtitle file). When the transcribe button is
    clicked, the settings are saved to ``config.fun_asr``, the selected video
    is transcribed, and the result is stored in
    ``st.session_state['subtitle_path']`` / ``['subtitle_content']``.

    Args:
        tr: Translation callable that maps a message key to display text.
    """
    def clear_fun_asr_subtitle_state():
        """Reset the subtitle-related session state."""
        st.session_state['subtitle_path'] = None
        st.session_state['subtitle_content'] = None
        st.session_state['subtitle_file_processed'] = False

    from app.services import fun_asr_subtitle

    backend_options = {
        tr("Local FunASR-Pack API"): "local",
        tr("Ali Bailian Online Fun-ASR"): "bailian",
        tr("上传字幕文件"): "upload",
    }

    # Use the saved backend; when it is missing or unknown, pick "bailian"
    # if only an API key is configured and "local" otherwise.
    saved_backend = str(config.fun_asr.get("backend", "")).strip().lower()
    if saved_backend not in {"local", "bailian", "upload"}:
        saved_backend = (
            "bailian"
            if config.fun_asr.get("api_key") and not config.fun_asr.get("api_url")
            else "local"
        )
    backend_values = list(backend_options.values())
    backend_labels = list(backend_options.keys())
    backend = saved_backend

    # Start from the stored key so that saving the settings while another
    # backend is active does not erase it.
    api_key = str(config.fun_asr.get("api_key", "") or "")
    api_url = config.fun_asr.get("api_url", fun_asr_subtitle.LOCAL_FUN_ASR_API_URL)
    hotword = config.fun_asr.get("hotword", "")
    enable_spk = bool(config.fun_asr.get("enable_spk", False))
    media_path = st.session_state.get('video_origin_path', '')

    subtitle_cols = st.columns([3, 2], vertical_alignment="top")
    with subtitle_cols[0]:
        with st.expander(tr("Ali Bailian Fun-ASR Subtitle Transcription"), expanded=False):
            backend_label = st.radio(
                tr("Subtitle Processing Method"),
                options=backend_labels,
                index=backend_values.index(saved_backend),
                horizontal=True,
                key="fun_asr_backend",
            )
            backend = backend_options[backend_label]

            if backend == "upload":
                render_subtitle_upload(tr)
            elif backend == "local":
                st.caption(tr("Local Fun-ASR upload caption"))
                api_url = st.text_input(
                    tr("Local FunASR-Pack API URL"),
                    value=api_url,
                    help=tr("Local FunASR-Pack API URL Help"),
                    key="fun_asr_api_url",
                )
                hotword = st.text_input(
                    tr("Fun-ASR Hotword"),
                    value=hotword,
                    help=tr("Fun-ASR Hotword Help"),
                    key="fun_asr_hotword",
                )
                enable_spk = st.checkbox(
                    tr("Enable speaker diarization"),
                    value=enable_spk,
                    help=tr("Enable speaker diarization Help"),
                    key="fun_asr_enable_spk",
                )
            else:
                st.caption(tr("Fun-ASR upload caption"))
                st.markdown(
                    f"{tr('API Key URL')}: "
                    "[https://bailian.console.aliyun.com/?tab=model#/api-key]"
                    "(https://bailian.console.aliyun.com/?tab=model#/api-key)"
                )
                api_key = st.text_input(
                    tr("Ali Bailian API Key"),
                    value=api_key,
                    type="password",
                    help=tr("Ali Bailian API Key Help"),
                    key="fun_asr_api_key",
                )

            # Transcription backends work on the currently selected video.
            if backend != "upload":
                if media_path:
                    st.info(
                        tr("Using selected video for subtitle transcription").format(
                            file=os.path.basename(media_path)
                        )
                    )
                else:
                    st.warning(tr("Please select or upload a video first"))

    can_transcribe = backend != "upload" and bool(media_path)
    with subtitle_cols[1]:
        transcribe_clicked = st.button(
            tr("Transcribe subtitles"),
            key="fun_asr_transcribe",
            disabled=not can_transcribe,
            use_container_width=True,
        )
    if not transcribe_clicked:
        return

    # Validate the inputs; each failure also clears any previous subtitle state.
    if backend == "bailian" and not api_key.strip():
        clear_fun_asr_subtitle_state()
        st.error(tr("Please enter Ali Bailian API Key"))
        return
    if backend == "local" and not str(api_url).strip():
        clear_fun_asr_subtitle_state()
        st.error(tr("Please enter local FunASR-Pack API URL"))
        return
    if not media_path or not os.path.exists(media_path):
        clear_fun_asr_subtitle_state()
        st.error(tr("Selected video file does not exist"))
        return

    try:
        clear_fun_asr_subtitle_state()

        # Persist the settings used for this run.
        config.fun_asr["backend"] = backend
        config.fun_asr["api_url"] = str(api_url).strip()
        config.fun_asr["api_key"] = api_key.strip()
        config.fun_asr["hotword"] = str(hotword).strip()
        config.fun_asr["enable_spk"] = bool(enable_spk)
        config.fun_asr["model"] = "fun-asr"
        config.save_config()

        # Output file: <video name>_fun_asr.srt in the subtitle directory.
        subtitle_name = f"{os.path.splitext(os.path.basename(media_path))[0]}_fun_asr.srt"
        subtitle_path = os.path.join(utils.subtitle_dir(), subtitle_name)
        spinner_text = (
            tr("Transcribing with local FunASR-Pack...")
            if backend == "local"
            else tr("Transcribing with Fun-ASR...")
        )
        with st.spinner(spinner_text):
            if backend == "local":
                generated_path = fun_asr_subtitle.create_with_local_fun_asr(
                    local_file=media_path,
                    subtitle_file=subtitle_path,
                    api_url=str(api_url).strip(),
                    hotword=str(hotword).strip(),
                    enable_spk=bool(enable_spk),
                )
            else:
                generated_path = fun_asr_subtitle.create_with_fun_asr(
                    local_file=media_path,
                    subtitle_file=subtitle_path,
                    api_key=api_key.strip(),
                )

        if not generated_path or not os.path.exists(generated_path):
            clear_fun_asr_subtitle_state()
            st.error(tr("Fun-ASR failed without subtitle file"))
            return

        with open(generated_path, "r", encoding="utf-8") as f:
            subtitle_content = f.read()
        st.session_state['subtitle_path'] = generated_path
        st.session_state['subtitle_content'] = subtitle_content
        st.session_state['subtitle_file_processed'] = True

        # Show the success message for three seconds, then remove it.
        success_placeholder = st.empty()
        success_placeholder.success(
            tr("Subtitle transcription succeeded").format(file=os.path.basename(generated_path))
        )
        time.sleep(3)
        success_placeholder.empty()
    except Exception as e:
        clear_fun_asr_subtitle_state()
        logger.error(f"Fun-ASR 字幕转写失败: {traceback.format_exc()}")
        st.error(f"{tr('Fun-ASR transcription failed')}: {str(e)}")
def render_script_buttons(tr, params):
    """Render the generate/load button for the current mode and the script editor.

    Args:
        tr: Translation callable that maps a message key to display text.
        params: Parameter object passed on to the script generators.
    """
    script_path = st.session_state.get('video_clip_json_path', '')

    # Button label keys for the generation modes; other values are handled below.
    mode_label_keys = {
        "auto": "Generate Video Script",
        "short": "Generate Short Video Script",
        "summary": "生成短剧解说脚本",
    }
    if script_path in mode_label_keys:
        label_key = mode_label_keys[script_path]
    elif script_path.endswith("json"):
        label_key = "Load Video Script"
    else:
        label_key = "Please Select Script File"
    button_name = tr(label_key)

    if st.button(button_name, key="script_action", disabled=not script_path):
        if script_path == "auto":
            # Documentary-style script generation (video without subtitles or dubbing).
            generate_script_docu(params, tr)
        elif script_path == "short":
            # Short-drama mashup script generation.
            generate_script_short(tr, params, st.session_state.get('custom_clips'))
        elif script_path == "summary":
            # Short-drama commentary script generation.
            subtitle_path = st.session_state.get('subtitle_path')
            video_theme = st.session_state.get('video_theme')
            temperature = st.session_state.get('temperature')

            # Only pass a plot analysis that was produced for the current subtitle.
            analysed_subtitle = st.session_state.get('short_drama_plot_analysis_subtitle_path')
            if analysed_subtitle == subtitle_path:
                plot_analysis = st.session_state.get('short_drama_plot_analysis', '')
            else:
                plot_analysis = ""

            generate_script_short_sunmmary(
                params,
                subtitle_path,
                video_theme,
                temperature,
                tr,
                plot_analysis=plot_analysis,
                subtitle_content=st.session_state.get('subtitle_content', ''),
            )
        else:
            load_script(tr, script_path)

    render_video_script_editor(tr)
def load_script(tr, script_path):
    """Load a script file into ``st.session_state['video_clip_json']``.

    The file text is passed through ``utils.clean_model_output`` and parsed as
    JSON; on success the page is rerun. Any failure is logged and shown as an
    error message.

    Args:
        tr: Translation callable that maps a message key to display text.
        script_path: Path of the script file to load.
    """
    try:
        with open(script_path, 'r', encoding='utf-8') as script_file:
            raw_text = script_file.read()
        cleaned_text = utils.clean_model_output(raw_text)
        st.session_state['video_clip_json'] = json.loads(cleaned_text)
        st.success(tr("Script loaded successfully"))
        st.rerun()
    except Exception as e:
        logger.error(f"加载脚本文件时发生错误\n{traceback.format_exc()}")
        st.error(f"{tr('Failed to load script')}: {str(e)}")
def save_script_with_validation(tr, video_clip_json_details):
    """Validate the script JSON text and, when valid, save it as a new script file.

    Validation or save failures are shown as error messages and stop the
    current script run. A successful save updates the session state and
    config with the new path and reruns the page.

    Args:
        tr: Translation callable that maps a message key to display text.
        video_clip_json_details: The script as a JSON string.
    """
    if not video_clip_json_details:
        st.error(tr("请输入视频脚本"))
        st.stop()

    # Step 1: format validation.
    with st.spinner(tr("Validating script format...")):
        try:
            check_result = check_script.check_format(video_clip_json_details)
            if not check_result.get('success'):
                # Validation failed: show the message and any details.
                failure_message = check_result.get('message', '未知错误')
                failure_details = check_result.get('details', '')
                st.error(f"**{tr('Script format validation failed')}**")
                st.error(f"**{tr('Error Message')}:** {failure_message}")
                if failure_details:
                    st.error(f"**{tr('Details')}:** {failure_details}")

                # Show an example of the expected format.
                st.info(f"**{tr('Correct script format example')}:**")
                first_example_row = {
                    "_id": 1,
                    "timestamp": "00:00:00,600-00:00:07,559",
                    "picture": "工地上,蔡晓艳奋力救人,场面混乱",
                    "narration": "灾后重建,工地上险象环生!泼辣女工蔡晓艳挺身而出,救人第一!",
                    "OST": 0
                }
                second_example_row = {
                    "_id": 2,
                    "timestamp": "00:00:08,240-00:00:12,359",
                    "picture": "领导视察,蔡晓艳不屑一顾",
                    "narration": "播放原片4",
                    "OST": 1
                }
                example_text = json.dumps(
                    [first_example_row, second_example_row],
                    ensure_ascii=False,
                    indent=2,
                )
                st.code(example_text, language='json')
                st.stop()
        except Exception as e:
            st.error(f"{tr('Script format validation error')}: {str(e)}")
            st.stop()

    # Step 2: save the script under a timestamped file name.
    with st.spinner(tr("Save Script")):
        script_dir = utils.script_dir()
        file_stamp = time.strftime("%Y-%m%d-%H%M%S")
        save_path = os.path.join(script_dir, f"{file_stamp}.json")
        try:
            parsed_script = json.loads(video_clip_json_details)
            with open(save_path, 'w', encoding='utf-8') as out_file:
                json.dump(parsed_script, out_file, ensure_ascii=False, indent=4)

            st.session_state['video_clip_json'] = parsed_script
            st.session_state['video_clip_json_path'] = save_path
            # Ask the next render to switch the selector to file mode.
            st.session_state['_switch_to_file_mode'] = True
            config.app["video_clip_json_path"] = save_path

            st.success(tr("Script validated and saved successfully"))
            # Brief pause so the message is visible before the rerun.
            time.sleep(0.5)
            st.rerun()
        except Exception as err:
            st.error(f"{tr('Failed to save script')}: {str(err)}")
            st.stop()
# The crop_video function has been removed: a unified cropping strategy is now used, so a separate pre-cropping step is no longer needed.
def get_script_params():
    """Return the script-related values from the session state.

    Returns:
        A dict with one entry per script parameter; a key missing from the
        session state maps to an empty string.
    """
    param_keys = (
        'video_language',
        'video_clip_json_path',
        'video_origin_path',
        'video_name',
        'video_plot',
    )
    return {key: st.session_state.get(key, '') for key in param_keys}