feat(webui): 添加短剧解说功能

- 新增短剧解说页面和相关功能
- 实现字幕文件上传和处理逻辑- 添加剧情分析和解说文案生成功能
- 优化用户交互和错误处理
This commit is contained in:
linyq 2025-05-10 23:37:58 +08:00
parent 048b2ff39a
commit f5c4e93fcd
6 changed files with 712 additions and 11 deletions

View File

@ -0,0 +1,97 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project: NarratoAI
@File : prompt
@Author : 小林同学
@Date : 2025/5/9 上午12:57
'''
# 字幕剧情分析提示词
subtitle_plot_analysis_v1 = """
# 角色
你是一位专业的剧本分析师和剧情概括助手
# 任务
我将为你提供一部短剧的完整字幕文本请你基于这些字幕完成以下任务
1. **整体剧情分析**简要概括整个短剧的核心剧情脉络主要冲突和结局如果有的话
2. **分段剧情解析与时间戳定位**
* 将整个短剧划分为若干个关键的剧情段落例如开端发展转折高潮结局或根据具体情节自然划分
* 段落数不得少于 20
* 对于每一个剧情段落
* **概括该段落的主要内容**用简洁的语言描述这段剧情发生了什么
* **标注对应的时间戳范围**明确指出该剧情段落对应的开始字幕时间戳和结束字幕时间戳请直接从字幕中提取时间信息
# 输入格式
字幕内容通常包含时间戳和对话例如
```
00:00:05,000 --> 00:00:10,000
[角色A]: 你好吗
00:00:10,500 --> 00:00:15,000
[角色B]: 我很好谢谢发生了一些有趣的事情
... (更多字幕内容) ...
```
我将把实际字幕粘贴在下方
# 输出格式要求
请按照以下格式清晰地呈现分析结果
**整体剧情概括**
[此处填写对整个短剧剧情的概括]
**分段剧情解析**
**剧情段落 1[段落主题/概括例如主角登场与背景介绍]**
* **时间戳** [开始时间戳] --> [结束时间戳]
* **内容概要** [对这段剧情的详细描述]
**剧情段落 2[段落主题/概括例如第一个冲突出现]**
* **时间戳** [开始时间戳] --> [结束时间戳]
* **内容概要** [对这段剧情的详细描述]
... (根据实际剧情段落数量继续) ...
**剧情段落 N[段落主题/概括例如结局与反思]**
* **时间戳** [开始时间戳] --> [结束时间戳]
* **内容概要** [对这段剧情的详细描述]
# 注意事项
* 请确保时间戳的准确性直接引用字幕中的时间
* 剧情段落的划分应合乎逻辑能够反映剧情的起承转合
* 语言表达应简洁准确客观
# 限制
1. 严禁输出与分析结果无关的内容
2.
# 请处理以下字幕:
"""
plot_writing = """
我是一个影视解说up主需要为我的粉丝讲解短剧家里家外的剧情目前正在解说1-5集的剧情希望能让粉丝通过我的解说了解剧情并且产生 继续观看的兴趣请生成一篇解说脚本包含解说文案以及穿插原声的片段下面<plot>中的内容是短剧的剧情概述
<plot>
%s
</plot>
请使用 json 格式进行输出使用 <output> 中的输出格式
<output>
{
"items": [
{
"_id": 1, # 唯一递增id
"timestamp": "00:00:05,390-00:00:10,430",
"picture": "剧情描述或者备注",
"narration": "解说文案,如果片段为穿插的原片片段,可以直接使用 ‘播放原片+_id 进行占位",
"OST": "值为 0 表示当前片段为解说片段,值为 1 表示当前片段为穿插的原片"
}
}
</output>
<restriction>
1. 只输出 json 内容不要输出其他任何说明性的文字
2. 解说文案的语言使用 简体中文
3. 严禁虚构剧情所有画面只能从 <polt> 中摘取
4. 严禁虚构时间戳所有时间戳范围只能从 <polt> 中摘取
</restriction>
"""

View File

@ -0,0 +1,454 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project: NarratoAI
@File : 短剧解说
@Author : 小林同学
@Date : 2025/5/9 上午12:36
'''
import os
import json
import requests
from typing import Dict, Any, Optional
from loguru import logger
from app.config import config
from app.utils.utils import get_uuid, storage_dir
from app.services.SDE.prompt import subtitle_plot_analysis_v1, plot_writing
class SubtitleAnalyzer:
"""字幕剧情分析器,负责分析字幕内容并提取关键剧情段落"""
def __init__(
self,
api_key: Optional[str] = None,
model: Optional[str] = None,
base_url: Optional[str] = None,
custom_prompt: Optional[str] = None,
temperature: Optional[float] = 1.0,
):
"""
初始化字幕分析器
Args:
api_key: API密钥如果不提供则从配置中读取
model: 模型名称如果不提供则从配置中读取
base_url: API基础URL如果不提供则从配置中读取或使用默认值
custom_prompt: 自定义提示词如果不提供则使用默认值
temperature: 模型温度
"""
# 使用传入的参数或从配置中获取
self.api_key = api_key
self.model = model
self.base_url = base_url
self.temperature = temperature
# 设置提示词模板
self.prompt_template = custom_prompt or subtitle_plot_analysis_v1
# 初始化HTTP请求所需的头信息
self._init_headers()
def _init_headers(self):
"""初始化HTTP请求头"""
try:
# 基础请求头包含API密钥和内容类型
self.headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
logger.info(f"初始化成功 - API Key: {self.api_key[:8]}... - Base URL: {self.base_url}")
except Exception as e:
logger.error(f"初始化请求头失败: {str(e)}")
raise
def analyze_subtitle(self, subtitle_content: str) -> Dict[str, Any]:
"""
分析字幕内容
Args:
subtitle_content: 字幕内容文本
Returns:
Dict[str, Any]: 包含分析结果的字典
"""
try:
# 构建完整提示词
prompt = f"{self.prompt_template}\n\n{subtitle_content}"
# 构建请求体数据
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": "你是一位专业的剧本分析师和剧情概括助手。"},
{"role": "user", "content": prompt}
],
"temperature": self.temperature
}
# 构建请求地址
url = f"{self.base_url}/chat/completions"
# 发送HTTP请求
response = requests.post(url, headers=self.headers, json=payload)
# 解析响应
if response.status_code == 200:
response_data = response.json()
# 提取响应内容
if "choices" in response_data and len(response_data["choices"]) > 0:
analysis_result = response_data["choices"][0]["message"]["content"]
logger.debug(f"字幕分析完成消耗的tokens: {response_data.get('usage', {}).get('total_tokens', 0)}")
# 返回结果
return {
"status": "success",
"analysis": analysis_result,
"tokens_used": response_data.get("usage", {}).get("total_tokens", 0),
"model": self.model,
"temperature": self.temperature
}
else:
logger.error("字幕分析失败: 未获取到有效响应")
return {
"status": "error",
"message": "未获取到有效响应",
"temperature": self.temperature
}
else:
error_msg = f"请求失败,状态码: {response.status_code}, 响应: {response.text}"
logger.error(error_msg)
return {
"status": "error",
"message": error_msg,
"temperature": self.temperature
}
except Exception as e:
logger.error(f"字幕分析过程中发生错误: {str(e)}")
return {
"status": "error",
"message": str(e),
"temperature": self.temperature
}
def analyze_subtitle_from_file(self, subtitle_file_path: str) -> Dict[str, Any]:
"""
从文件读取字幕并分析
Args:
subtitle_file_path: 字幕文件的路径
Returns:
Dict[str, Any]: 包含分析结果的字典
"""
try:
# 检查文件是否存在
if not os.path.exists(subtitle_file_path):
return {
"status": "error",
"message": f"字幕文件不存在: {subtitle_file_path}",
"temperature": self.temperature
}
# 读取文件内容
with open(subtitle_file_path, 'r', encoding='utf-8') as f:
subtitle_content = f.read()
# 分析字幕
return self.analyze_subtitle(subtitle_content)
except Exception as e:
logger.error(f"从文件读取字幕并分析过程中发生错误: {str(e)}")
return {
"status": "error",
"message": str(e),
"temperature": self.temperature
}
def save_analysis_result(self, analysis_result: Dict[str, Any], output_path: Optional[str] = None) -> str:
"""
保存分析结果到文件
Args:
analysis_result: 分析结果
output_path: 输出文件路径如果不提供则自动生成
Returns:
str: 输出文件的路径
"""
try:
# 如果未提供输出路径,则自动生成
if not output_path:
output_dir = storage_dir("drama_analysis", create=True)
output_path = os.path.join(output_dir, f"analysis_{get_uuid(True)}.txt")
# 确保目录存在
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# 保存结果
with open(output_path, 'w', encoding='utf-8') as f:
if analysis_result["status"] == "success":
f.write(analysis_result["analysis"])
else:
f.write(f"分析失败: {analysis_result['message']}")
logger.info(f"分析结果已保存到: {output_path}")
return output_path
except Exception as e:
logger.error(f"保存分析结果时发生错误: {str(e)}")
return ""
def generate_narration_script(self, plot_analysis: str, temperature: float = 0.7) -> Dict[str, Any]:
"""
根据剧情分析生成解说文案
Args:
plot_analysis: 剧情分析内容
temperature: 生成温度控制创造性默认0.7
Returns:
Dict[str, Any]: 包含生成结果的字典
"""
try:
# 构建完整提示词
prompt = plot_writing % plot_analysis
# 构建请求体数据
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": "你是一位专业的短视频解说脚本撰写专家。"},
{"role": "user", "content": prompt}
],
"temperature": temperature
}
# 对特定模型添加响应格式设置
if self.model not in ["deepseek-reasoner"]:
payload["response_format"] = {"type": "json_object"}
# 构建请求地址
url = f"{self.base_url}/chat/completions"
# 发送HTTP请求
response = requests.post(url, headers=self.headers, json=payload)
# 解析响应
if response.status_code == 200:
response_data = response.json()
# 提取响应内容
if "choices" in response_data and len(response_data["choices"]) > 0:
narration_script = response_data["choices"][0]["message"]["content"]
logger.debug(f"解说文案生成完成消耗的tokens: {response_data.get('usage', {}).get('total_tokens', 0)}")
# 返回结果
return {
"status": "success",
"narration_script": narration_script,
"tokens_used": response_data.get("usage", {}).get("total_tokens", 0),
"model": self.model,
"temperature": self.temperature
}
else:
logger.error("解说文案生成失败: 未获取到有效响应")
return {
"status": "error",
"message": "未获取到有效响应",
"temperature": self.temperature
}
else:
error_msg = f"请求失败,状态码: {response.status_code}, 响应: {response.text}"
logger.error(error_msg)
return {
"status": "error",
"message": error_msg,
"temperature": self.temperature
}
except Exception as e:
logger.error(f"解说文案生成过程中发生错误: {str(e)}")
return {
"status": "error",
"message": str(e),
"temperature": self.temperature
}
def save_narration_script(self, narration_result: Dict[str, Any], output_path: Optional[str] = None) -> str:
"""
保存解说文案到文件
Args:
narration_result: 解说文案生成结果
output_path: 输出文件路径如果不提供则自动生成
Returns:
str: 输出文件的路径
"""
try:
# 如果未提供输出路径,则自动生成
if not output_path:
output_dir = storage_dir("narration_scripts", create=True)
output_path = os.path.join(output_dir, f"narration_{get_uuid(True)}.json")
# 确保目录存在
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# 保存结果
with open(output_path, 'w', encoding='utf-8') as f:
if narration_result["status"] == "success":
f.write(narration_result["narration_script"])
else:
f.write(f"生成失败: {narration_result['message']}")
logger.info(f"解说文案已保存到: {output_path}")
return output_path
except Exception as e:
logger.error(f"保存解说文案时发生错误: {str(e)}")
return ""
def analyze_subtitle(
subtitle_content: str = None,
subtitle_file_path: str = None,
api_key: Optional[str] = None,
model: Optional[str] = None,
base_url: Optional[str] = None,
custom_prompt: Optional[str] = None,
temperature: float = 1.0,
save_result: bool = False,
output_path: Optional[str] = None
) -> Dict[str, Any]:
"""
分析字幕内容的便捷函数
Args:
subtitle_content: 字幕内容文本
subtitle_file_path: 字幕文件路径
custom_prompt: 自定义提示词
api_key: API密钥
model: 模型名称
base_url: API基础URL
temperature: 模型温度
save_result: 是否保存结果到文件
output_path: 输出文件路径
Returns:
Dict[str, Any]: 包含分析结果的字典
"""
# 初始化分析器
analyzer = SubtitleAnalyzer(
temperature=temperature,
api_key=api_key,
model=model,
base_url=base_url,
custom_prompt=custom_prompt
)
# 分析字幕
if subtitle_content:
result = analyzer.analyze_subtitle(subtitle_content)
elif subtitle_file_path:
result = analyzer.analyze_subtitle_from_file(subtitle_file_path)
else:
return {
"status": "error",
"message": "必须提供字幕内容或字幕文件路径",
"temperature": temperature
}
# 保存结果
if save_result and result["status"] == "success":
result["output_path"] = analyzer.save_analysis_result(result, output_path)
return result
def generate_narration_script(
plot_analysis: str = None,
api_key: Optional[str] = None,
model: Optional[str] = None,
base_url: Optional[str] = None,
temperature: float = 1.0,
save_result: bool = False,
output_path: Optional[str] = None
) -> Dict[str, Any]:
"""
根据剧情分析生成解说文案的便捷函数
Args:
plot_analysis: 剧情分析内容直接提供
api_key: API密钥
model: 模型名称
base_url: API基础URL
temperature: 生成温度控制创造性
save_result: 是否保存结果到文件
output_path: 输出文件路径
Returns:
Dict[str, Any]: 包含生成结果的字典
"""
# 初始化分析器
analyzer = SubtitleAnalyzer(
temperature=temperature,
api_key=api_key,
model=model,
base_url=base_url
)
# 生成解说文案
result = analyzer.generate_narration_script(plot_analysis, temperature)
# 保存结果
if save_result and result["status"] == "success":
result["output_path"] = analyzer.save_narration_script(result, output_path)
return result
if __name__ == '__main__':
# text_api_key = "sk-2dfe00cab68d4baf8e142cbbefea0f44"
text_api_key = "sk-narra-ZPA07ethehTb2VQ5mMI1P8tt"
text_model = "gemini-2.0-flash"
text_base_url = "http://47.252.0.22:7001/v1/chat/completions" # 确保URL不以斜杠结尾便于后续拼接
subtitle_path = "/Users/apple/Desktop/home/NarratoAI/resource/srt/家里家外1-5.srt"
# 示例用法
if subtitle_path:
# 分析字幕总结剧情
analysis_result = analyze_subtitle(
subtitle_file_path=subtitle_path,
api_key=text_api_key,
model=text_model,
base_url=text_base_url,
save_result=True
)
if analysis_result["status"] == "success":
print("字幕分析成功!")
print("分析结果:")
print(analysis_result["analysis"])
# 根据剧情生成解说文案
narration_result = generate_narration_script(
plot_analysis=analysis_result["analysis"],
api_key=text_api_key,
model=text_model,
base_url=text_base_url,
save_result=True
)
if narration_result["status"] == "success":
print("\n解说文案生成成功!")
print("解说文案:")
print(narration_result["narration_script"])
else:
print(f"\n解说文案生成失败: {narration_result['message']}")
else:
print(f"分析失败: {analysis_result['message']}")

View File

@ -325,6 +325,15 @@ def video_dir(sub_dir: str = ""):
return d
def subtitle_dir(sub_dir: str = ""):
d = resource_dir(f"srt")
if sub_dir:
d = os.path.join(d, sub_dir)
if not os.path.exists(d):
os.makedirs(d)
return d
def split_timestamp(timestamp):
"""
拆分时间戳

View File

@ -235,17 +235,32 @@ def render_video_details(tr):
def short_drama_summary(tr):
"""短剧解说 渲染视频主题和提示词"""
# 检查是否已经处理过字幕文件
if 'subtitle_file_processed' not in st.session_state:
st.session_state['subtitle_file_processed'] = False
subtitle_file = st.file_uploader(
tr("上传字幕文件"),
type=["srt"],
accept_multiple_files=False,
key="subtitle_file_uploader" # 添加唯一key
)
if subtitle_file is not None:
# 显示当前已上传的字幕文件路径
if 'subtitle_path' in st.session_state and st.session_state['subtitle_path']:
st.info(f"已上传字幕: {os.path.basename(st.session_state['subtitle_path'])}")
if st.button(tr("清除已上传字幕")):
st.session_state['subtitle_path'] = None
st.session_state['subtitle_file_processed'] = False
st.rerun()
# 只有当有文件上传且尚未处理时才执行处理逻辑
if subtitle_file is not None and not st.session_state['subtitle_file_processed']:
try:
# 读取上传的JSON内容并验证格式
# 读取上传的SRT内容
script_content = subtitle_file.read().decode('utf-8')
# 保存到脚本目录
# 保存到字幕目录
script_file_path = os.path.join(utils.subtitle_dir(), subtitle_file.name)
file_name, file_extension = os.path.splitext(subtitle_file.name)
@ -255,20 +270,21 @@ def short_drama_summary(tr):
file_name_with_timestamp = f"{file_name}_{timestamp}"
script_file_path = os.path.join(utils.subtitle_dir(), file_name_with_timestamp + file_extension)
# 写入文件
# 直接写入SRT内容不进行JSON转换
with open(script_file_path, "w", encoding='utf-8') as f:
json.dump(script_content, f, ensure_ascii=False, indent=2)
f.write(script_content)
# 更新状态
st.success(tr("字幕上传成功"))
st.session_state['subtitle_path'] = script_file_path
time.sleep(0.1)
st.rerun()
except json.JSONDecodeError:
st.error(tr("Invalid JSON format"))
st.session_state['subtitle_file_processed'] = True # 标记已处理
# 避免使用rerun使用更新状态的方式
# st.rerun()
except Exception as e:
st.error(f"{tr('Upload failed')}: {str(e)}")
video_theme = st.text_input(tr("短剧名称"))
st.session_state['video_theme'] = video_theme
return video_theme

View File

@ -195,6 +195,7 @@
"Frame Interval (seconds)": "帧间隔 (秒)",
"Frame Interval (seconds) (More keyframes consume more tokens)": "帧间隔 (秒) (更多关键帧消耗更多令牌)",
"Batch Size": "批处理大小",
"Batch Size (More keyframes consume more tokens)": "批处理大小, 每批处理越少消耗 token 越多"
"Batch Size (More keyframes consume more tokens)": "批处理大小, 每批处理越少消耗 token 越多",
"Short Drama Summary": "短剧解说"
}
}

View File

@ -0,0 +1,124 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project: NarratoAI
@File : 短剧解说脚本生成
@Author : 小林同学
@Date : 2025/5/10 下午10:26
'''
import os
import json
import time
import traceback
import streamlit as st
from loguru import logger
from app.config import config
from app.services.SDE.short_drama_explanation import analyze_subtitle, generate_narration_script
def generate_script_short_sunmmary(params, subtitle_path):
"""
生成 短剧解说 视频脚本
要求: 提供高质量短剧字幕
适合场景: 短剧
"""
progress_bar = st.progress(0)
status_text = st.empty()
def update_progress(progress: float, message: str = ""):
progress_bar.progress(progress)
if message:
status_text.text(f"{progress}% - {message}")
else:
status_text.text(f"进度: {progress}%")
try:
with st.spinner("正在生成脚本..."):
if not params.video_origin_path:
st.error("请先选择视频文件")
return
"""
1. 获取字幕
"""
update_progress(30, "正在解析字幕...")
# 判断字幕文件是否存在
if not os.path.exists(subtitle_path):
st.error("字幕文件不存在")
return
"""
2. 分析字幕总结剧情
"""
text_provider = config.app.get('text_llm_provider', 'gemini').lower()
text_api_key = config.app.get(f'text_{text_provider}_api_key')
text_model = config.app.get(f'text_{text_provider}_model_name')
text_base_url = config.app.get(f'text_{text_provider}_base_url')
analysis_result = analyze_subtitle(
subtitle_file_path=subtitle_path,
api_key=text_api_key,
model=text_model,
base_url=text_base_url,
save_result=True
)
"""
3. 根据剧情生成解说文案
"""
if analysis_result["status"] == "success":
logger.info("字幕分析成功!")
update_progress(60, "正在生成文案...")
# 根据剧情生成解说文案
narration_result = generate_narration_script(
plot_analysis=analysis_result["analysis"],
api_key=text_api_key,
model=text_model,
base_url=text_base_url,
save_result=True
)
if narration_result["status"] == "success":
logger.info("\n解说文案生成成功!")
logger.info(narration_result["narration_script"])
else:
logger.info(f"\n解说文案生成失败: {narration_result['message']}")
st.error("生成脚本失败,请检查日志")
st.stop()
else:
logger.error(f"分析失败: {analysis_result['message']}")
st.error("生成脚本失败,请检查日志")
st.stop()
"""
4. 生成文案
"""
logger.info("开始准备生成解说文案")
# 结果转换为JSON字符串
narration_script = narration_result["narration_script"]
narration_dict = json.loads(narration_script)
script = json.dumps(narration_dict['items'], ensure_ascii=False, indent=2)
if script is None:
st.error("生成脚本失败,请检查日志")
st.stop()
logger.success(f"剪辑脚本生成完成")
if isinstance(script, list):
st.session_state['video_clip_json'] = script
elif isinstance(script, str):
st.session_state['video_clip_json'] = json.loads(script)
update_progress(90, "整理输出...")
time.sleep(0.1)
progress_bar.progress(100)
status_text.text("脚本生成完成!")
st.success("视频脚本生成成功!")
except Exception as err:
st.error(f"生成过程中发生错误: {str(err)}")
logger.exception(f"生成脚本时发生错误\n{traceback.format_exc()}")
finally:
time.sleep(2)
progress_bar.empty()
status_text.empty()