Merge pull request #137 from linyqh/dev_0.6.0

Dev 0.6.0 新增短剧解说功能
This commit is contained in:
viccy 2025-05-11 03:36:30 +08:00 committed by GitHub
commit 1859fe81e2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 1342 additions and 291 deletions

View File

@ -0,0 +1,97 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project: NarratoAI
@File : prompt
@Author : 小林同学
@Date : 2025/5/9 上午12:57
'''
# Prompt for subtitle-based plot analysis. The caller appends the raw subtitle
# text after the final heading, so the string must end with that heading.
# The restriction list previously ended with an empty "2." item, which gave
# the model a dangling, meaningless instruction; it has been removed.
subtitle_plot_analysis_v1 = """
# 角色
你是一位专业的剧本分析师和剧情概括助手
# 任务
我将为你提供一部短剧的完整字幕文本请你基于这些字幕完成以下任务
1. **整体剧情分析**简要概括整个短剧的核心剧情脉络主要冲突和结局如果有的话
2. **分段剧情解析与时间戳定位**
* 将整个短剧划分为若干个关键的剧情段落例如开端发展转折高潮结局或根据具体情节自然划分
* 段落数不得少于 20
* 对于每一个剧情段落
* **概括该段落的主要内容**用简洁的语言描述这段剧情发生了什么
* **标注对应的时间戳范围**明确指出该剧情段落对应的开始字幕时间戳和结束字幕时间戳请直接从字幕中提取时间信息
# 输入格式
字幕内容通常包含时间戳和对话例如
```
00:00:05,000 --> 00:00:10,000
[角色A]: 你好吗
00:00:10,500 --> 00:00:15,000
[角色B]: 我很好谢谢发生了一些有趣的事情
... (更多字幕内容) ...
```
我将把实际字幕粘贴在下方
# 输出格式要求
请按照以下格式清晰地呈现分析结果
**整体剧情概括**
[此处填写对整个短剧剧情的概括]
**分段剧情解析**
**剧情段落 1[段落主题/概括例如主角登场与背景介绍]**
* **时间戳** [开始时间戳] --> [结束时间戳]
* **内容概要** [对这段剧情的详细描述]
**剧情段落 2[段落主题/概括例如第一个冲突出现]**
* **时间戳** [开始时间戳] --> [结束时间戳]
* **内容概要** [对这段剧情的详细描述]
... (根据实际剧情段落数量继续) ...
**剧情段落 N[段落主题/概括例如结局与反思]**
* **时间戳** [开始时间戳] --> [结束时间戳]
* **内容概要** [对这段剧情的详细描述]
# 注意事项
* 请确保时间戳的准确性直接引用字幕中的时间
* 剧情段落的划分应合乎逻辑能够反映剧情的起承转合
* 语言表达应简洁准确客观
# 限制
1. 严禁输出与分析结果无关的内容
# 请处理以下字幕:
"""
# Prompt template for the narration script. It is filled with %-formatting and
# takes exactly two arguments, in order: the drama name and the plot summary
# that ends up inside the <plot> element.
# Fixes: the restrictions now reference <plot> (they used the misspelled,
# non-existent tag <polt>), and the JSON example closes its "items" array.
plot_writing = """
我是一个影视解说up主需要为我的粉丝讲解短剧%s的剧情目前正在解说剧情希望能让粉丝通过我的解说了解剧情并且产生 继续观看的兴趣请生成一篇解说脚本包含解说文案以及穿插原声的片段下面<plot>中的内容是短剧的剧情概述
<plot>
%s
</plot>
请使用 json 格式进行输出使用 <output> 中的输出格式
<output>
{
"items": [
{
"_id": 1, # 唯一递增id
"timestamp": "00:00:05,390-00:00:10,430",
"picture": "剧情描述或者备注",
"narration": "解说文案,如果片段为穿插的原片片段,可以直接使用 ‘播放原片+_id 进行占位",
"OST": "值为 0 表示当前片段为解说片段,值为 1 表示当前片段为穿插的原片"
}
]
}
</output>
<restriction>
1. 只输出 json 内容不要输出其他任何说明性的文字
2. 解说文案的语言使用 简体中文
3. 严禁虚构剧情所有画面只能从 <plot> 中摘取
4. 严禁虚构时间戳所有时间戳范围只能从 <plot> 中摘取
</restriction>
"""

View File

@ -0,0 +1,456 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project: NarratoAI
@File : 短剧解说
@Author : 小林同学
@Date : 2025/5/9 上午12:36
'''
import os
import json
import requests
from typing import Dict, Any, Optional
from loguru import logger
from app.config import config
from app.utils.utils import get_uuid, storage_dir
from app.services.SDE.prompt import subtitle_plot_analysis_v1, plot_writing
class SubtitleAnalyzer:
    """Analyze subtitles and write narration scripts through a chat API.

    Requests are POSTed with ``requests`` to ``<base_url>/chat/completions``
    using a bearer token. The constructor arguments are used exactly as
    given; nothing is read from the application config here.

    The analysis/generation methods report failures through the returned
    dict (``{"status": "error", "message": ..., "temperature": ...}``) and
    the save methods return an empty string on failure, so callers do not
    need to catch exceptions.
    """

    # Path appended to ``base_url`` to reach the chat endpoint.
    _CHAT_ENDPOINT = "/chat/completions"
    # Models for which the ``response_format`` request field is not sent.
    _NO_JSON_MODE_MODELS = ("deepseek-reasoner",)

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: Optional[str] = None,
        base_url: Optional[str] = None,
        custom_prompt: Optional[str] = None,
        temperature: Optional[float] = 1.0,
    ):
        """Store the connection settings and build the request headers.

        Args:
            api_key: API key sent as a bearer token.
            model: Model name placed in every request payload.
            base_url: API base URL; a trailing slash or an already appended
                ``/chat/completions`` suffix is tolerated.
            custom_prompt: Replacement for the default plot-analysis prompt.
            temperature: Sampling temperature used by ``analyze_subtitle``.
        """
        self.api_key = api_key
        self.model = model
        self.base_url = base_url
        self.temperature = temperature
        # Fall back to the built-in prompt when no custom one is supplied.
        self.prompt_template = custom_prompt or subtitle_plot_analysis_v1
        self._init_headers()

    def _init_headers(self):
        """Build the HTTP headers (JSON content type plus bearer token)."""
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }

    def _chat_url(self) -> str:
        """Return the chat-completions endpoint derived from ``base_url``.

        Raises:
            ValueError: If no base URL was configured.
        """
        if not self.base_url:
            raise ValueError("未提供 API 基础URL (base_url)")
        base = self.base_url.rstrip("/")
        # Accept a base URL that already points at the endpoint so the path
        # is never appended twice.
        if base.endswith(self._CHAT_ENDPOINT):
            return base
        return f"{base}{self._CHAT_ENDPOINT}"

    @staticmethod
    def _error_result(message: str, temperature: Optional[float]) -> Dict[str, Any]:
        """Build the error dict shared by every failure path."""
        return {
            "status": "error",
            "message": message,
            "temperature": temperature
        }

    @staticmethod
    def _write_text(output_path: str, content: str) -> None:
        """Write ``content`` to ``output_path``, creating parent directories."""
        parent = os.path.dirname(output_path)
        # A bare file name has no parent component; os.makedirs("") raises.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(content)

    def _request_completion(
        self,
        payload: Dict[str, Any],
        result_key: str,
        task_name: str,
        temperature: Optional[float],
    ) -> Dict[str, Any]:
        """POST one chat request and convert the response into a result dict.

        Args:
            payload: JSON body of the request.
            result_key: Key under which the reply text is returned.
            task_name: Task label used as the prefix of the log messages.
            temperature: Temperature reported back in the result dict.

        Returns:
            A success dict with the reply text, token count, model and
            temperature, or an error dict for a non-200 status or a reply
            without choices. Transport and decoding errors propagate to the
            caller, which logs and converts them.
        """
        response = requests.post(self._chat_url(), headers=self.headers, json=payload)
        if response.status_code != 200:
            error_msg = f"请求失败,状态码: {response.status_code}, 响应: {response.text}"
            logger.error(error_msg)
            return self._error_result(error_msg, temperature)

        response_data = response.json()
        choices = response_data.get("choices")
        if not choices:
            logger.error(f"{task_name}失败: 未获取到有效响应")
            return self._error_result("未获取到有效响应", temperature)

        content = choices[0]["message"]["content"]
        # ``usage`` may be absent or null; neither should fail the request.
        tokens_used = (response_data.get("usage") or {}).get("total_tokens", 0)
        logger.debug(f"{task_name}完成消耗的tokens: {tokens_used}")
        return {
            "status": "success",
            result_key: content,
            "tokens_used": tokens_used,
            "model": self.model,
            "temperature": temperature
        }

    def analyze_subtitle(self, subtitle_content: str) -> Dict[str, Any]:
        """Ask the model for a plot analysis of the given subtitle text.

        Args:
            subtitle_content: Subtitle text appended to the prompt template.

        Returns:
            On success a dict with ``status``, ``analysis``, ``tokens_used``,
            ``model`` and ``temperature``; otherwise an error dict.
        """
        try:
            prompt = f"{self.prompt_template}\n\n{subtitle_content}"
            payload = {
                "model": self.model,
                "messages": [
                    {"role": "system", "content": "你是一位专业的剧本分析师和剧情概括助手。"},
                    {"role": "user", "content": prompt}
                ],
                "temperature": self.temperature
            }
            return self._request_completion(payload, "analysis", "字幕分析", self.temperature)
        except Exception as e:
            logger.error(f"字幕分析过程中发生错误: {str(e)}")
            return self._error_result(str(e), self.temperature)

    def analyze_subtitle_from_file(self, subtitle_file_path: str) -> Dict[str, Any]:
        """Read a UTF-8 subtitle file and analyze its content.

        Args:
            subtitle_file_path: Path of the subtitle file.

        Returns:
            The result of ``analyze_subtitle``, or an error dict when the
            file does not exist or cannot be read.
        """
        try:
            if not os.path.exists(subtitle_file_path):
                return self._error_result(f"字幕文件不存在: {subtitle_file_path}", self.temperature)
            with open(subtitle_file_path, 'r', encoding='utf-8') as f:
                subtitle_content = f.read()
            return self.analyze_subtitle(subtitle_content)
        except Exception as e:
            logger.error(f"从文件读取字幕并分析过程中发生错误: {str(e)}")
            return self._error_result(str(e), self.temperature)

    def save_analysis_result(self, analysis_result: Dict[str, Any], output_path: Optional[str] = None) -> str:
        """Write an analysis result (or its failure message) to a text file.

        Args:
            analysis_result: Dict returned by ``analyze_subtitle``.
            output_path: Target file; when omitted a unique file is created
                under the ``drama_analysis`` storage directory.

        Returns:
            The path written to, or ``""`` if saving failed.
        """
        try:
            if not output_path:
                output_dir = storage_dir("drama_analysis", create=True)
                output_path = os.path.join(output_dir, f"analysis_{get_uuid(True)}.txt")
            # Resolve the content first so a malformed result dict does not
            # leave an empty file behind.
            if analysis_result["status"] == "success":
                content = analysis_result["analysis"]
            else:
                content = f"分析失败: {analysis_result['message']}"
            self._write_text(output_path, content)
            logger.info(f"分析结果已保存到: {output_path}")
            return output_path
        except Exception as e:
            logger.error(f"保存分析结果时发生错误: {str(e)}")
            return ""

    def generate_narration_script(self, short_name:str, plot_analysis: str, temperature: float = 0.7) -> Dict[str, Any]:
        """Generate a narration script from a plot analysis.

        Args:
            short_name: Name of the short drama inserted into the prompt.
            plot_analysis: Plot analysis text inserted into the prompt.
            temperature: Sampling temperature for this request.

        Returns:
            On success a dict with ``status``, ``narration_script``,
            ``tokens_used``, ``model`` and ``temperature``; otherwise an
            error dict. ``temperature`` is the value sent with this request.
        """
        try:
            prompt = plot_writing % (short_name, plot_analysis)
            payload = {
                "model": self.model,
                "messages": [
                    {"role": "system", "content": "你是一位专业的短视频解说脚本撰写专家。"},
                    {"role": "user", "content": prompt}
                ],
                "temperature": temperature
            }
            # Request JSON mode except for models listed as not taking it.
            if self.model not in self._NO_JSON_MODE_MODELS:
                payload["response_format"] = {"type": "json_object"}
            return self._request_completion(payload, "narration_script", "解说文案生成", temperature)
        except Exception as e:
            logger.error(f"解说文案生成过程中发生错误: {str(e)}")
            return self._error_result(str(e), temperature)

    def save_narration_script(self, narration_result: Dict[str, Any], output_path: Optional[str] = None) -> str:
        """Write a narration result (or its failure message) to a file.

        Args:
            narration_result: Dict returned by ``generate_narration_script``.
            output_path: Target file; when omitted a unique ``.json`` file is
                created under the ``narration_scripts`` storage directory.

        Returns:
            The path written to, or ``""`` if saving failed.
        """
        try:
            if not output_path:
                output_dir = storage_dir("narration_scripts", create=True)
                output_path = os.path.join(output_dir, f"narration_{get_uuid(True)}.json")
            if narration_result["status"] == "success":
                content = narration_result["narration_script"]
            else:
                content = f"生成失败: {narration_result['message']}"
            self._write_text(output_path, content)
            logger.info(f"解说文案已保存到: {output_path}")
            return output_path
        except Exception as e:
            logger.error(f"保存解说文案时发生错误: {str(e)}")
            return ""
def analyze_subtitle(
    subtitle_content: Optional[str] = None,
    subtitle_file_path: Optional[str] = None,
    api_key: Optional[str] = None,
    model: Optional[str] = None,
    base_url: Optional[str] = None,
    custom_prompt: Optional[str] = None,
    temperature: float = 1.0,
    save_result: bool = False,
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """Convenience wrapper: analyze subtitles given as text or as a file.

    Args:
        subtitle_content: Subtitle text; takes precedence over the file path.
        subtitle_file_path: Path of a subtitle file, used when no text is given.
        api_key: API key passed to the analyzer.
        model: Model name passed to the analyzer.
        base_url: API base URL passed to the analyzer.
        custom_prompt: Optional replacement for the default analysis prompt.
        temperature: Sampling temperature.
        save_result: Whether to write a successful result to a file.
        output_path: Target file for the saved result.

    Returns:
        The analyzer's result dict. When the result was saved it also carries
        ``output_path``. If neither subtitle source is given, an error dict
        is returned.
    """
    # Validate first: without any input there is nothing to analyze, so no
    # analyzer needs to be built.
    if not subtitle_content and not subtitle_file_path:
        return {
            "status": "error",
            "message": "必须提供字幕内容或字幕文件路径",
            "temperature": temperature
        }

    analyzer = SubtitleAnalyzer(
        temperature=temperature,
        api_key=api_key,
        model=model,
        base_url=base_url,
        custom_prompt=custom_prompt
    )

    if subtitle_content:
        result = analyzer.analyze_subtitle(subtitle_content)
    else:
        result = analyzer.analyze_subtitle_from_file(subtitle_file_path)

    # Only successful analyses are persisted.
    if save_result and result["status"] == "success":
        result["output_path"] = analyzer.save_analysis_result(result, output_path)
    return result
def generate_narration_script(
    short_name: Optional[str] = None,
    plot_analysis: Optional[str] = None,
    api_key: Optional[str] = None,
    model: Optional[str] = None,
    base_url: Optional[str] = None,
    temperature: float = 1.0,
    save_result: bool = False,
    output_path: Optional[str] = None
) -> Dict[str, Any]:
    """Convenience wrapper: generate a narration script from a plot analysis.

    Args:
        short_name: Name of the short drama; an omitted name is sent as an
            empty string so the prompt never contains the text "None".
        plot_analysis: Plot analysis text. Required.
        api_key: API key passed to the analyzer.
        model: Model name passed to the analyzer.
        base_url: API base URL passed to the analyzer.
        temperature: Sampling temperature.
        save_result: Whether to write a successful result to a file.
        output_path: Target file for the saved result.

    Returns:
        The analyzer's result dict. When the result was saved it also carries
        ``output_path``. If ``plot_analysis`` is missing, an error dict is
        returned and no request is made.
    """
    # Without a plot analysis the model would be asked to narrate "None".
    if not plot_analysis:
        return {
            "status": "error",
            "message": "必须提供剧情分析内容",
            "temperature": temperature
        }

    analyzer = SubtitleAnalyzer(
        temperature=temperature,
        api_key=api_key,
        model=model,
        base_url=base_url
    )

    result = analyzer.generate_narration_script(short_name or "", plot_analysis, temperature)

    # Only successful generations are persisted.
    if save_result and result["status"] == "success":
        result["output_path"] = analyzer.save_narration_script(result, output_path)
    return result
if __name__ == '__main__':
    # Manual smoke test: analyze a local subtitle file, then turn the
    # analysis into a narration script. Replace the placeholders before use.
    text_api_key = "skxxxx"
    text_model = "gemini-2.0-flash"
    # Base URL only. "/chat/completions" is appended when the request URL is
    # built, so the value must not already contain it or end with a slash.
    text_base_url = "https://api.narratoai.cn/v1"
    subtitle_path = "/Users/apple/Desktop/home/NarratoAI/resource/srt/家里家外1-5.srt"
    # Use the subtitle file name (without extension) as the drama name that
    # is inserted into the narration prompt.
    drama_name = os.path.splitext(os.path.basename(subtitle_path))[0]

    if subtitle_path:
        # Step 1: summarize the plot from the subtitles.
        analysis_result = analyze_subtitle(
            subtitle_file_path=subtitle_path,
            api_key=text_api_key,
            model=text_model,
            base_url=text_base_url,
            save_result=True
        )
        if analysis_result["status"] == "success":
            print("字幕分析成功!")
            print("分析结果:")
            print(analysis_result["analysis"])
            # Step 2: write the narration script from that summary.
            narration_result = generate_narration_script(
                short_name=drama_name,
                plot_analysis=analysis_result["analysis"],
                api_key=text_api_key,
                model=text_model,
                base_url=text_base_url,
                save_result=True
            )
            if narration_result["status"] == "success":
                print("\n解说文案生成成功!")
                print("解说文案:")
                print(narration_result["narration_script"])
            else:
                print(f"\n解说文案生成失败: {narration_result['message']}")
        else:
            print(f"分析失败: {analysis_result['message']}")

View File

@ -0,0 +1,37 @@
"""
视频脚本生成pipeline串联各个处理步骤
"""
import os
from .utils.step1_subtitle_analyzer_openai import analyze_subtitle
from .utils.step5_merge_script import merge_script
def generate_script(srt_path: str, api_key: str, model_name: str, output_path: str, base_url: str = None, custom_clips: int = 5):
    """Build a clip script from a subtitle file.

    The subtitles are analyzed by the model and the resulting plot points are
    merged into the final script, which is also written to ``output_path``.

    Args:
        srt_path: Path of the SRT subtitle file.
        api_key: API key for the model provider.
        model_name: Name of the model to use.
        output_path: File the merged script is written to.
        base_url: Optional API base URL.
        custom_clips: Number of key clips requested from the model.

    Returns:
        Whatever ``merge_script`` returns for the analyzed plot points.

    Raises:
        FileNotFoundError: If ``srt_path`` does not exist.
    """
    # Fail fast before any model call is made.
    subtitle_missing = not os.path.exists(srt_path)
    if subtitle_missing:
        raise FileNotFoundError(f"字幕文件不存在: {srt_path}")

    print("开始分析...")
    analysis_kwargs = {
        "srt_path": srt_path,
        "api_key": api_key,
        "model_name": model_name,
        "base_url": base_url,
        "custom_clips": custom_clips,
    }
    plot_points = analyze_subtitle(**analysis_kwargs)['plot_points']

    # Merge the located plot points into the final script.
    return merge_script(plot_points, output_path)

View File

@ -0,0 +1,60 @@
"""
定义项目中使用的数据类型
"""
from typing import List, Dict, Optional
from dataclasses import dataclass
@dataclass
class PlotPoint:
    """One key plot moment.

    NOTE(review): the field names match the ``plot_points`` objects the
    subtitle analyzer asks the model to return ("timestamp", "title",
    "picture") — confirm that this is the intended source of the data.
    """
    # presumably a range such as "HH:MM:SS,mmm-HH:MM:SS,mmm" — TODO confirm
    timestamp: str
    title: str
    picture: str
@dataclass
class Commentary:
    """A piece of commentary text attached to a timestamp and a title.

    NOTE(review): ``copywriter`` presumably holds the narration copy itself
    rather than an author name — confirm against the code that fills it.
    """
    timestamp: str
    title: str
    copywriter: str
@dataclass
class SubtitleSegment:
    """A subtitle text with numeric start and end times.

    NOTE(review): the time unit is not stated here; presumably seconds —
    confirm against the code that creates these segments.
    """
    start_time: float
    end_time: float
    text: str
@dataclass
class ScriptItem:
    """One entry of the final script.

    Carries the same ``timestamp``/``title``/``picture`` fields as
    ``PlotPoint`` plus the ``copywriter`` field of ``Commentary``.
    """
    timestamp: str
    title: str
    picture: str
    copywriter: str
@dataclass
class PipelineResult:
    """Aggregate of everything a pipeline run produces.

    ``error`` defaults to ``None``.
    NOTE(review): presumably it is set to a message when the run failed —
    confirm with the code that builds this result.
    """
    output_video_path: str
    plot_points: List[PlotPoint]
    subtitle_segments: List[SubtitleSegment]
    commentaries: List[Commentary]
    final_script: List[ScriptItem]
    error: Optional[str] = None
class VideoProcessingError(Exception):
    """Error type for video processing (by its name; raise sites not shown here)."""
    pass
class SubtitleProcessingError(Exception):
    """Error type for subtitle processing (by its name; raise sites not shown here)."""
    pass
class PlotAnalysisError(Exception):
    """Error type for plot analysis (by its name; raise sites not shown here)."""
    pass
class CopywritingError(Exception):
    """Error type for copywriting (by its name; raise sites not shown here)."""
    pass

View File

@ -0,0 +1,157 @@
"""
使用OpenAI API分析字幕文件返回剧情梗概和爆点
"""
import traceback
from openai import OpenAI, BadRequestError
import os
import json
from .utils import load_srt
# Endpoint used for DeepSeek-family models when the caller supplies no
# base_url (SiliconFlow's third-party API).
_DEEPSEEK_DEFAULT_BASE_URL = "https://api.siliconflow.cn/v1"


def _parse_json_reply(content: str) -> dict:
    """Parse a model reply as JSON, tolerating a Markdown code fence."""
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        # Some models wrap the JSON in ```json ... ``` even in JSON mode.
        return json.loads(content.replace("```json", "").replace("```", ""))


def _request_json(client, model_name: str, messages: list) -> dict:
    """Run one chat completion and return the reply parsed as JSON."""
    try:
        completion = client.chat.completions.create(
            model=model_name,
            messages=messages,
            response_format={"type": "json_object"}
        )
    except BadRequestError:
        # DeepSeek R1 and V3 reject response_format=json_object, so retry
        # the same request without it.
        completion = client.chat.completions.create(
            model=model_name,
            messages=messages
        )
    return _parse_json_reply(completion.choices[0].message.content)


def analyze_subtitle(
    srt_path: str,
    model_name: str,
    api_key: str = None,
    base_url: str = None,
    custom_clips: int = 5
) -> dict:
    """Analyze an SRT file in two model calls and return the combined result.

    The first call asks for a plot summary and ``custom_clips`` key plot
    titles; the second locates each of those titles in the subtitles.

    Args:
        srt_path: Path of the SRT subtitle file.
        model_name: Model name (required). Names containing "deepseek" use
            the ``DeepSeek_API_KEY`` environment variable and the
            SiliconFlow endpoint as fallbacks.
        api_key: API key; falls back to an environment variable when omitted.
        base_url: API base URL. When given it is honored for every model.
        custom_clips: Number of key clips requested from the model.

    Returns:
        ``{"plot_summary": <first reply>, "plot_points": <list from the
        second reply>}``.

    Raises:
        Exception: Any failure is re-raised as ``Exception`` with a message
            that includes the traceback; the original error is chained as
            ``__cause__``.
    """
    try:
        # Flatten the subtitles into "timestamp\ntext" pairs for the prompt.
        subtitles = load_srt(srt_path)
        subtitle_content = "\n".join(f"{sub['timestamp']}\n{sub['text']}" for sub in subtitles)

        # The client is local to this call; no module-level state is kept.
        if "deepseek" in model_name.lower():
            client = OpenAI(
                api_key=api_key or os.getenv('DeepSeek_API_KEY'),
                base_url=base_url or _DEEPSEEK_DEFAULT_BASE_URL
            )
        else:
            client = OpenAI(
                api_key=api_key or os.getenv('OPENAI_API_KEY'),
                base_url=base_url
            )

        # Step 1: plot summary plus the titles of the key plot moments.
        summary_messages = [
            {
                "role": "system",
                "content": """你是一名经验丰富的短剧编剧,擅长根据字幕内容按照先后顺序分析关键剧情,并找出 %s 个关键片段。
请返回一个JSON对象包含以下字段
{
"summary": "整体剧情梗概",
"plot_titles": [
"关键剧情1",
"关键剧情2",
"关键剧情3",
"关键剧情4",
"关键剧情5",
"..."
]
}
请确保返回的是合法的JSON格式, 请确保返回的是 %s 个片段
""" % (custom_clips, custom_clips)
            },
            {
                "role": "user",
                "content": f"srt字幕如下{subtitle_content}"
            }
        ]
        try:
            summary_data = _request_json(client, model_name, summary_messages)
        except Exception as e:
            raise Exception(f"大模型解析发生错误:{str(e)}\n{traceback.format_exc()}") from e
        print(json.dumps(summary_data, indent=4, ensure_ascii=False))

        # Step 2: locate each key moment in the subtitles.
        plot_titles = summary_data['plot_titles']
        print(f"找到 {len(plot_titles)} 个片段")
        numbered_titles = "".join(f"{i}. {point}\n" for i, point in enumerate(plot_titles, 1))
        prompt = f"""剧情梗概:
{summary_data['summary']}
需要定位的爆点内容
""" + numbered_titles

        locate_messages = [
            {
                "role": "system",
                "content": """你是一名短剧编剧,非常擅长根据字幕中分析视频中关键剧情出现的具体时间段。
请仔细阅读剧情梗概和爆点内容然后在字幕中找出每个爆点发生的具体时间段和爆点前后的详细剧情
请返回一个JSON对象包含一个名为"plot_points"的数组数组中包含多个对象每个对象都要包含以下字段
{
"plot_points": [
{
"timestamp": "时间段格式为xx:xx:xx,xxx-xx:xx:xx,xxx",
"title": "关键剧情的主题",
"picture": "关键剧情前后的详细剧情描述"
}
]
}
请确保返回的是合法的JSON格式"""
            },
            {
                "role": "user",
                "content": f"""字幕内容:
{subtitle_content}
{prompt}"""
            }
        ]
        try:
            plot_points_data = _request_json(client, model_name, locate_messages)
        except Exception as e:
            raise Exception(f"大模型解析错误:{str(e)}\n{traceback.format_exc()}") from e
        print(json.dumps(plot_points_data, indent=4, ensure_ascii=False))

        return {
            "plot_summary": summary_data,
            "plot_points": plot_points_data["plot_points"]
        }
    except Exception as e:
        raise Exception(f"分析字幕时发生错误:{str(e)}\n{traceback.format_exc()}") from e

View File

@ -0,0 +1,69 @@
"""
合并生成最终脚本
"""
import os
import json
from typing import List, Dict, Tuple
def _parse_timestamp_range(ts: str) -> Tuple[float, float]:
    """Parse ``"HH:MM:SS,mmm-HH:MM:SS,mmm"`` into (start, end) seconds.

    Raises:
        ValueError: If the range or either time is malformed.
    """
    def to_seconds(time_str: str) -> float:
        time_str = time_str.strip()
        if ',' in time_str:
            clock, millis = time_str.split(',')
            fraction = float(millis) / 1000
        else:
            # The milliseconds part is optional.
            clock, fraction = time_str, 0
        hours, minutes, seconds = map(int, clock.split(':'))
        return hours * 3600 + minutes * 60 + seconds + fraction

    start, end = ts.split('-')
    return to_seconds(start), to_seconds(end)


def merge_script(
    plot_points: List[Dict],
    output_path: str
):
    """Turn plot points into the final script and optionally save it as JSON.

    Args:
        plot_points: Dicts with at least ``"timestamp"`` and ``"picture"``.
        output_path: JSON file to write. When empty or ``None`` nothing is
            written and the script is only returned.

    Returns:
        list: One dict per plot point with ``_id`` (1-based), ``timestamp``,
        ``picture``, ``narration`` and ``OST``.

    Raises:
        ValueError: If a plot point's timestamp is malformed.
    """
    final_script = []
    for number, plot_point in enumerate(plot_points, 1):
        # Parsed only to reject malformed timestamps early; the original
        # string is what gets stored in the script.
        _parse_timestamp_range(plot_point["timestamp"])
        final_script.append({
            "_id": number,
            "timestamp": plot_point["timestamp"],
            "picture": plot_point["picture"],
            # Placeholder narration; the random suffix keeps entries distinct.
            "narration": f"播放原生_{os.urandom(4).hex()}",
            # In-source note: OST=0 keeps narration only, OST=2 keeps both
            # narration and original audio. Every item here is given OST=1.
            "OST": 1,
        })

    if output_path:
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(final_script, f, ensure_ascii=False, indent=4)
        print(f"脚本生成完成:{output_path}")
    return final_script

View File

@ -0,0 +1,45 @@
# 公共方法
import json
import requests # 新增
from typing import List, Dict
def load_srt(file_path: str) -> List[Dict]:
    """Load and parse an SRT subtitle file.

    The file is read as UTF-8 (a leading BOM is ignored). Subtitle blocks are
    separated by one or more blank or whitespace-only lines, so extra blank
    lines between entries no longer cause the following entry to be dropped.

    Args:
        file_path: Path of the SRT file.

    Returns:
        One dict per valid block with ``number`` (int), ``timestamp`` (the
        full "start --> end" line), ``text`` (text lines joined by spaces),
        ``start_time`` and ``end_time`` (strings as written in the file).
        Blocks with fewer than three lines are ignored; blocks with a
        non-numeric index or a malformed timestamp line are skipped with a
        printed warning.
    """
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        content = f.read()

    # Group consecutive non-blank lines into blocks.
    blocks = []
    current = []
    for line in content.splitlines():
        if line.strip():
            current.append(line)
        elif current:
            blocks.append(current)
            current = []
    if current:
        blocks.append(current)

    subtitles = []
    for lines in blocks:
        # A block needs an index line, a timestamp line and some text.
        if len(lines) < 3:
            continue
        try:
            number = int(lines[0].strip())
            timestamp = lines[1].strip()
            text = ' '.join(lines[2:])
            # Raises ValueError unless there is exactly one " --> " separator.
            start_time, end_time = timestamp.split(' --> ')
        except ValueError as e:
            print(f"Warning: 跳过无效的字幕块: {e}")
            continue
        subtitles.append({
            'number': number,
            'timestamp': timestamp,
            'text': text,
            'start_time': start_time,
            'end_time': end_time
        })
    return subtitles

Binary file not shown.

Binary file not shown.

View File

@ -237,28 +237,28 @@ if __name__ == '__main__':
video_frame_description_path = "/Users/apple/Desktop/home/NarratoAI/storage/temp/analysis/frame_analysis_20250508_1139.json" video_frame_description_path = "/Users/apple/Desktop/home/NarratoAI/storage/temp/analysis/frame_analysis_20250508_1139.json"
# 测试新的JSON文件 # 测试新的JSON文件
test_file_path = "/Users/apple/Desktop/home/NarratoAI/storage/temp/analysis/frame_analysis_20250508_1458.json" test_file_path = "/Users/apple/Desktop/home/NarratoAI/storage/temp/analysis/frame_analysis_20250508_2258.json"
markdown_output = parse_frame_analysis_to_markdown(test_file_path) markdown_output = parse_frame_analysis_to_markdown(test_file_path)
# print(markdown_output) # print(markdown_output)
# 输出到文件以便检查格式 # 输出到文件以便检查格式
output_file = "/Users/apple/Desktop/home/NarratoAI/storage/temp/narration_script.md" output_file = "/Users/apple/Desktop/home/NarratoAI/storage/temp/家里家外1-5.md"
with open(output_file, 'w', encoding='utf-8') as f: with open(output_file, 'w', encoding='utf-8') as f:
f.write(markdown_output) f.write(markdown_output)
# print(f"\n已将Markdown输出保存到: {output_file}") # print(f"\n已将Markdown输出保存到: {output_file}")
# 生成解说文案 # # 生成解说文案
narration = generate_narration( # narration = generate_narration(
markdown_output, # markdown_output,
text_api_key, # text_api_key,
base_url=text_base_url, # base_url=text_base_url,
model=text_model # model=text_model
) # )
#
# 保存解说文案 # # 保存解说文案
print(narration) # print(narration)
print(type(narration)) # print(type(narration))
narration_file = "/Users/apple/Desktop/home/NarratoAI/storage/temp/final_narration_script.json" # narration_file = "/Users/apple/Desktop/home/NarratoAI/storage/temp/final_narration_script.json"
with open(narration_file, 'w', encoding='utf-8') as f: # with open(narration_file, 'w', encoding='utf-8') as f:
f.write(narration) # f.write(narration)
print(f"\n已将解说文案保存到: {narration_file}") # print(f"\n已将解说文案保存到: {narration_file}")

View File

@ -325,6 +325,15 @@ def video_dir(sub_dir: str = ""):
return d return d
def subtitle_dir(sub_dir: str = ""):
    """Return the subtitle directory under the resource folder, creating it.

    Args:
        sub_dir: Optional sub-directory inside the "srt" resource directory.

    Returns:
        The directory path, which exists when the function returns.
    """
    d = resource_dir("srt")
    if sub_dir:
        d = os.path.join(d, sub_dir)
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(d, exist_ok=True)
    return d
def split_timestamp(timestamp): def split_timestamp(timestamp):
""" """
拆分时间戳 拆分时间戳

View File

@ -1,175 +1,86 @@
[app] [app]
project_version="0.6.0" project_version="0.6.0"
# 支持视频理解的大模型提供商 # 支持视频理解的大模型提供商
# gemini # gemini (谷歌, 需要 VPN)
# qwenvl # siliconflow (硅基流动)
vision_llm_provider="qwenvl" # qwenvl (通义千问)
vision_llm_provider="Siliconflow"
########## Vision Gemini API Key ########## Gemini 视觉模型
vision_gemini_api_key = "" vision_gemini_api_key = ""
vision_gemini_model_name = "gemini-2.0-flash" vision_gemini_model_name = "gemini-2.0-flash-lite"
########## Vision Qwen API Key (默认使用“硅基流动”的QwenVL模型) ########## QwenVL 视觉模型
vision_qwenvl_api_key = "" vision_qwenvl_api_key = ""
vision_qwenvl_model_name = "Qwen/Qwen2.5-VL-32B-Instruct" vision_qwenvl_model_name = "qwen2.5-vl-32b-instruct"
vision_qwenvl_base_url = "https://api.siliconflow.cn/v1" vision_qwenvl_base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
########### Vision NarratoAPI Key ########## siliconflow 视觉模型
narrato_api_key = "ggyY91BAO-_ULvAqKum3XexcyN1G3dP86DEzvjZDcrg" vision_siliconflow_api_key = ""
narrato_api_url = "https://narratoinsight.scsmtech.cn/api/v1" vision_siliconflow_model_name = "Qwen/Qwen2.5-VL-32B-Instruct"
narrato_vision_model = "gemini-1.5-flash" vision_siliconflow_base_url = "https://api.siliconflow.cn/v1"
narrato_vision_key = ""
narrato_llm_model = "gpt-4o" ########## OpenAI 视觉模型
narrato_llm_key = "" vision_openai_api_key = ""
vision_openai_model_name = "gpt-4.1-nano-2025-04-14"
vision_openai_base_url = "https://api.openai.com/v1"
########### NarratoAPI 微调模型 (未发布)
narrato_api_key = ""
narrato_api_url = ""
narrato_model = "narra-1.0-2025-05-09"
# 用于生成文案的大模型支持的提供商 (Supported providers): # 用于生成文案的大模型支持的提供商 (Supported providers):
# openai (默认) # openai (默认, 需要 VPN)
# deepseek (默认使用“硅基流动”的模型) # siliconflow (硅基流动)
# moonshot (月之暗面) # deepseek (深度求索)
# gemini (谷歌, 需要 VPN)
# qwen (通义千问) # qwen (通义千问)
# gemini # moonshot (月之暗面)
text_llm_provider="deepseek" text_llm_provider="openai"
########## OpenAI API Key ########## OpenAI API Key
# Get your API key at https://platform.openai.com/api-keys # Get your API key at https://platform.openai.com/api-keys
text_openai_api_key = "" text_openai_api_key = ""
text_openai_base_url = "https://api.openai.com/v1" text_openai_base_url = "https://api.openai.com/v1"
text_openai_model_name = "gpt-4o-mini" text_openai_model_name = "gpt-4.1-mini-2025-04-14"
# 使用 硅基流动 第三方 API Key使用手机号注册https://cloud.siliconflow.cn/i/pyOKqFCV
# 访问 https://cloud.siliconflow.cn/account/ak 获取你的 API 密钥
text_siliconflow_api_key = ""
text_siliconflow_base_url = "https://api.siliconflow.cn/v1"
text_siliconflow_model_name = "deepseek-ai/DeepSeek-R1"
########## DeepSeek API Key ########## DeepSeek API Key
# 使用 硅基流动 第三方 API Key使用手机号注册https://cloud.siliconflow.cn/i/pyOKqFCV # 访问 https://platform.deepseek.com/api_keys 获取你的 API 密钥
text_deepseek_api_key = "" text_deepseek_api_key = ""
text_deepseek_base_url = "https://api.siliconflow.cn/v1" text_deepseek_base_url = "https://api.deepseek.com"
text_deepseek_model_name = "deepseek-ai/DeepSeek-V3" text_deepseek_model_name = "deepseek-chat"
########## Moonshot API Key
# Visit https://platform.moonshot.cn/console/api-keys to get your API key.
text_moonshot_api_key=""
text_moonshot_base_url = "https://api.moonshot.cn/v1"
text_moonshot_model_name = "moonshot-v1-8k"
########## G4F
# Visit https://github.com/xtekky/gpt4free to get more details
# Supported model list: https://github.com/xtekky/gpt4free/blob/main/g4f/models.py
text_g4f_model_name = "gpt-3.5-turbo"
########## Azure API Key
# Visit https://learn.microsoft.com/zh-cn/azure/ai-services/openai/ to get more details
# API documentation: https://learn.microsoft.com/zh-cn/azure/ai-services/openai/reference
text_azure_api_key = ""
text_azure_base_url=""
text_azure_model_name="gpt-35-turbo" # replace with your model deployment name
text_azure_api_version = "2024-02-15-preview"
########## Gemini API Key ########## Gemini API Key
text_gemini_api_key="" text_gemini_api_key=""
text_gemini_model_name = "gemini-1.5-flash" text_gemini_model_name = "gemini-2.0-flash"
text_gemini_base_url = "https://generativelanguage.googleapis.com/v1beta/openai"
########## Qwen API Key ########## Qwen API Key
# Visit https://dashscope.console.aliyun.com/apiKey to get your API key # 访问 https://bailian.console.aliyun.com/?tab=model#/api-key 获取你的 API 密钥
# Visit below links to get more details
# https://tongyi.aliyun.com/qianwen/
# https://help.aliyun.com/zh/dashscope/developer-reference/model-introduction
text_qwen_api_key = "" text_qwen_api_key = ""
text_qwen_model_name = "qwen-plus-1127" text_qwen_model_name = "qwen-plus-1127"
text_qwen_base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1" text_qwen_base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
########## Moonshot API Key
# 字幕提供商、可选,支持 whisper 和 faster-whisper-large-v2"whisper" # 访问 https://platform.moonshot.cn/console/api-keys 获取你的 API 密钥
# 默认为 faster-whisper-large-v2 模型地址https://huggingface.co/guillaumekln/faster-whisper-large-v2 text_moonshot_api_key=""
subtitle_provider = "faster-whisper-large-v2" text_moonshot_base_url = "https://api.moonshot.cn/v1"
subtitle_enabled = true text_moonshot_model_name = "moonshot-v1-8k"
# ImageMagick
# 安装后,将自动检测到 ImageMagickWindows 除外!
# 例如,在 Windows 上 "C:\Program Files (x86)\ImageMagick-7.1.1-Q16-HDRI\magick.exe"
# 下载位置 https://imagemagick.org/archive/binaries/ImageMagick-7.1.1-29-Q16-x64-static.exe
# imagemagick_path = "C:\\Program Files (x86)\\ImageMagick-7.1.1-Q16\\magick.exe"
# FFMPEG
#
# 通常情况下ffmpeg 会被自动下载,并且会被自动检测到。
# 但是如果你的环境有问题,无法自动下载,可能会遇到如下错误:
# RuntimeError: No ffmpeg exe could be found.
# Install ffmpeg on your system, or set the IMAGEIO_FFMPEG_EXE environment variable.
# 此时你可以手动下载 ffmpeg 并设置 ffmpeg_path下载地址https://www.gyan.dev/ffmpeg/builds/
# ffmpeg_path = "C:\\Users\\harry\\Downloads\\ffmpeg.exe"
#########################################################################################
# 当视频生成成功后API服务提供的视频下载接入点默认为当前服务的地址和监听端口
# 比如 http://127.0.0.1:8080/tasks/6357f542-a4e1-46a1-b4c9-bf3bd0df5285/final-1.mp4
# 如果你需要使用域名对外提供服务一般会用nginx做代理则可以设置为你的域名
# 比如 https://xxxx.com/tasks/6357f542-a4e1-46a1-b4c9-bf3bd0df5285/final-1.mp4
# endpoint="https://xxxx.com"
# When the video is successfully generated, the API service provides a download endpoint for the video, defaulting to the service's current address and listening port.
# For example, http://127.0.0.1:8080/tasks/6357f542-a4e1-46a1-b4c9-bf3bd0df5285/final-1.mp4
# If you need to provide the service externally using a domain name (usually done with nginx as a proxy), you can set it to your domain name.
# For example, https://xxxx.com/tasks/6357f542-a4e1-46a1-b4c9-bf3bd0df5285/final-1.mp4
# endpoint="https://xxxx.com"
endpoint=""
# Video material storage location
# material_directory = "" # Indicates that video materials will be downloaded to the default folder, the default folder is ./storage/cache_videos under the current project
# material_directory = "/user/harry/videos" # Indicates that video materials will be downloaded to a specified folder
# material_directory = "task" # Indicates that video materials will be downloaded to the current task's folder, this method does not allow sharing of already downloaded video materials
# 视频素材存放位置
# material_directory = "" #表示将视频素材下载到默认的文件夹,默认文件夹为当前项目下的 ./storage/cache_videos
# material_directory = "/user/harry/videos" #表示将视频素材下载到指定的文件夹中
# material_directory = "task" #表示将视频素材下载到当前任务的文件夹中,这种方式无法共享已经下载的视频素材
material_directory = ""
# 用于任务的状态管理
enable_redis = false
redis_host = "localhost"
redis_port = 6379
redis_db = 0
redis_password = ""
# 文生视频时的最大并发任务数
max_concurrent_tasks = 5
# webui界面是否显示配置项 # webui界面是否显示配置项
hide_config = false hide_config = true
[whisper]
# Only effective when subtitle_provider is "whisper"
# Run on GPU with FP16
# model = WhisperModel(model_size, device="cuda", compute_type="float16")
# Run on GPU with INT8
# model = WhisperModel(model_size, device="cuda", compute_type="int8_float16")
# Run on CPU with INT8
# model = WhisperModel(model_size, device="cpu", compute_type="int8")
# recommended model_size: "large-v3"
model_size="faster-whisper-large-v2"
# 如果要使用 GPU请设置 device=“cuda”
device="CPU"
compute_type="int8"
[proxy] [proxy]
### Use a proxy to access the Pexels API
### Format: "http://<username>:<password>@<proxy>:<port>"
### Example: "http://user:pass@proxy:1234"
### Doc: https://requests.readthedocs.io/en/latest/user/advanced/#proxies
http = "http://127.0.0.1:7890" http = "http://127.0.0.1:7890"
https = "http://127.0.0.1:7890" https = "http://127.0.0.1:7890"
enabled = false
[azure]
# Azure Speech API Key
# Get your API key at https://portal.azure.com/#view/Microsoft_Azure_ProjectOxford/CognitiveServicesHub/~/SpeechServices
speech_key=""
speech_region=""
[frames] [frames]
# 提取关键帧的间隔时间 # 提取关键帧的间隔时间

View File

@ -5,7 +5,7 @@ from loguru import logger
from app.config import config from app.config import config
from webui.components import basic_settings, video_settings, audio_settings, subtitle_settings, script_settings, \ from webui.components import basic_settings, video_settings, audio_settings, subtitle_settings, script_settings, \
review_settings, merge_settings, system_settings review_settings, merge_settings, system_settings
from webui.utils import cache, file_utils # from webui.utils import cache, file_utils
from app.utils import utils from app.utils import utils
from app.models.schema import VideoClipParams, VideoAspect from app.models.schema import VideoClipParams, VideoAspect
@ -184,7 +184,7 @@ def render_generate_button():
except Exception as e: except Exception as e:
logger.error(f"播放视频失败: {e}") logger.error(f"播放视频失败: {e}")
file_utils.open_task_folder(config.root_dir, task_id) # file_utils.open_task_folder(config.root_dir, task_id)
logger.info(tr("视频生成完成")) logger.info(tr("视频生成完成"))

View File

@ -64,25 +64,25 @@ def render_proxy_settings(tr):
proxy_enabled = st.checkbox(tr("Enable Proxy"), value=proxy_enabled) proxy_enabled = st.checkbox(tr("Enable Proxy"), value=proxy_enabled)
# 保存代理开关状态 # 保存代理开关状态
config.proxy["enabled"] = proxy_enabled # config.proxy["enabled"] = proxy_enabled
# 只有在代理启用时才显示代理设置输入框 # 只有在代理启用时才显示代理设置输入框
if proxy_enabled: if proxy_enabled:
HTTP_PROXY = st.text_input(tr("HTTP_PROXY"), value=proxy_url_http) HTTP_PROXY = st.text_input(tr("HTTP_PROXY"), value=proxy_url_http)
HTTPS_PROXY = st.text_input(tr("HTTPs_PROXY"), value=proxy_url_https) HTTPS_PROXY = st.text_input(tr("HTTPs_PROXY"), value=proxy_url_https)
if HTTP_PROXY: if HTTP_PROXY and HTTPS_PROXY:
config.proxy["http"] = HTTP_PROXY config.proxy["http"] = HTTP_PROXY
os.environ["HTTP_PROXY"] = HTTP_PROXY
if HTTPS_PROXY:
config.proxy["https"] = HTTPS_PROXY config.proxy["https"] = HTTPS_PROXY
os.environ["HTTP_PROXY"] = HTTP_PROXY
os.environ["HTTPS_PROXY"] = HTTPS_PROXY os.environ["HTTPS_PROXY"] = HTTPS_PROXY
# logger.debug(f"代理已启用: {HTTP_PROXY}")
else: else:
# 当代理被禁用时,清除环境变量和配置 # 当代理被禁用时,清除环境变量和配置
os.environ.pop("HTTP_PROXY", None) os.environ.pop("HTTP_PROXY", None)
os.environ.pop("HTTPS_PROXY", None) os.environ.pop("HTTPS_PROXY", None)
config.proxy["http"] = "" # config.proxy["http"] = ""
config.proxy["https"] = "" # config.proxy["https"] = ""
def test_vision_model_connection(api_key, base_url, model_name, provider, tr): def test_vision_model_connection(api_key, base_url, model_name, provider, tr):
@ -108,29 +108,6 @@ def test_vision_model_connection(api_key, base_url, model_name, provider, tr):
return True, tr("gemini model is available") return True, tr("gemini model is available")
except Exception as e: except Exception as e:
return False, f"{tr('gemini model is not available')}: {str(e)}" return False, f"{tr('gemini model is not available')}: {str(e)}"
elif provider.lower() == 'qwenvl':
from openai import OpenAI
try:
client = OpenAI(
api_key=api_key,
base_url=base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1"
)
# 发送一个简单的测试请求
response = client.chat.completions.create(
model=model_name or "qwen-vl-max-latest",
messages=[{"role": "user", "content": "直接回复我文本'当前网络可用'"}]
)
if response and response.choices:
return True, tr("QwenVL model is available")
else:
return False, tr("QwenVL model returned invalid response")
except Exception as e:
return False, f"{tr('QwenVL model is not available')}: {str(e)}"
elif provider.lower() == 'narratoapi': elif provider.lower() == 'narratoapi':
import requests import requests
try: try:
@ -148,9 +125,46 @@ def test_vision_model_connection(api_key, base_url, model_name, provider, tr):
return False, f"{tr('NarratoAPI is not available')}: HTTP {response.status_code}" return False, f"{tr('NarratoAPI is not available')}: HTTP {response.status_code}"
except Exception as e: except Exception as e:
return False, f"{tr('NarratoAPI is not available')}: {str(e)}" return False, f"{tr('NarratoAPI is not available')}: {str(e)}"
else: else:
return False, f"{tr('Unsupported provider')}: {provider}" from openai import OpenAI
try:
client = OpenAI(
api_key=api_key,
base_url=base_url,
)
response = client.chat.completions.create(
model=model_name,
messages=[
{
"role": "system",
"content": [{"type": "text", "text": "You are a helpful assistant."}],
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20241022/emyrja/dog_and_girl.jpeg"
},
},
{"type": "text", "text": "回复我网络可用即可"},
],
},
],
)
if response and response.choices:
return True, tr("QwenVL model is available")
else:
return False, tr("QwenVL model returned invalid response")
except Exception as e:
# logger.debug(api_key)
# logger.debug(base_url)
# logger.debug(model_name)
return False, f"{tr('QwenVL model is not available')}: {str(e)}"
def render_vision_llm_settings(tr): def render_vision_llm_settings(tr):
@ -158,7 +172,7 @@ def render_vision_llm_settings(tr):
st.subheader(tr("Vision Model Settings")) st.subheader(tr("Vision Model Settings"))
# 视频分析模型提供商选择 # 视频分析模型提供商选择
vision_providers = ['Gemini', 'QwenVL', 'NarratoAPI(待发布)'] vision_providers = ['Siliconflow', 'Gemini', 'QwenVL', 'OpenAI']
saved_vision_provider = config.app.get("vision_llm_provider", "Gemini").lower() saved_vision_provider = config.app.get("vision_llm_provider", "Gemini").lower()
saved_provider_index = 0 saved_provider_index = 0
@ -194,8 +208,8 @@ def render_vision_llm_settings(tr):
) )
st_vision_model_name = st.text_input( st_vision_model_name = st.text_input(
tr("Vision Model Name"), tr("Vision Model Name"),
value=vision_model_name or "gemini-1.5-flash", value=vision_model_name or "gemini-2.0-flash-lite",
help=tr("Default: gemini-1.5-flash") help=tr("Default: gemini-2.0-flash-lite")
) )
elif vision_provider == 'qwenvl': elif vision_provider == 'qwenvl':
st_vision_base_url = st.text_input( st_vision_base_url = st.text_input(
@ -261,52 +275,45 @@ def test_text_model_connection(api_key, base_url, model_name, provider, tr):
"Authorization": f"Bearer {api_key}", "Authorization": f"Bearer {api_key}",
"Content-Type": "application/json" "Content-Type": "application/json"
} }
# 如果没有指定base_url使用默认值
if not base_url:
if provider.lower() == 'openai':
base_url = "https://api.openai.com/v1"
elif provider.lower() == 'moonshot':
base_url = "https://api.moonshot.cn/v1"
elif provider.lower() == 'deepseek':
base_url = "https://api.deepseek.com"
# 构建测试URL
test_url = f"{base_url.rstrip('/')}/chat/completions"
# 特殊处理Gemini # 特殊处理Gemini
if provider.lower() == 'gemini': if provider.lower() == 'gemini':
import google.generativeai as genai import google.generativeai as genai
try: try:
genai.configure(api_key=api_key) genai.configure(api_key=api_key)
model = genai.GenerativeModel(model_name or 'gemini-pro') model = genai.GenerativeModel(model_name)
model.generate_content("直接回复我文本'当前网络可用'") model.generate_content("直接回复我文本'当前网络可用'")
return True, tr("Gemini model is available") return True, tr("Gemini model is available")
except Exception as e: except Exception as e:
return False, f"{tr('Gemini model is not available')}: {str(e)}" return False, f"{tr('Gemini model is not available')}: {str(e)}"
# 构建测试消息
test_data = {
"model": model_name,
"messages": [
{"role": "user", "content": "直接回复我文本'当前网络可用'"}
],
"stream": False
}
# 发送测试请求
response = requests.post(
test_url,
headers=headers,
json=test_data,
)
if response.status_code == 200:
return True, tr("Text model is available")
else: else:
return False, f"{tr('Text model is not available')}: HTTP {response.status_code}" test_url = f"{base_url.rstrip('/')}/chat/completions"
# 构建测试消息
test_data = {
"model": model_name,
"messages": [
{"role": "user", "content": "直接回复我文本'当前网络可用'"}
],
"stream": False
}
# 发送测试请求
response = requests.post(
test_url,
headers=headers,
json=test_data,
)
# logger.debug(model_name)
# logger.debug(api_key)
# logger.debug(test_url)
if response.status_code == 200:
return True, tr("Text model is available")
else:
return False, f"{tr('Text model is not available')}: HTTP {response.status_code}"
except Exception as e: except Exception as e:
logger.error(traceback.format_exc())
return False, f"{tr('Connection failed')}: {str(e)}" return False, f"{tr('Connection failed')}: {str(e)}"
@ -315,8 +322,8 @@ def render_text_llm_settings(tr):
st.subheader(tr("Text Generation Model Settings")) st.subheader(tr("Text Generation Model Settings"))
# 文案生成模型提供商选择 # 文案生成模型提供商选择
text_providers = ['DeepSeek', 'OpenAI', 'Siliconflow', 'Qwen', 'Moonshot', 'Gemini'] text_providers = ['OpenAI', 'Siliconflow', 'DeepSeek', 'Gemini', 'Qwen', 'Moonshot']
saved_text_provider = config.app.get("text_llm_provider", "DeepSeek").lower() saved_text_provider = config.app.get("text_llm_provider", "OpenAI").lower()
saved_provider_index = 0 saved_provider_index = 0
for i, provider in enumerate(text_providers): for i, provider in enumerate(text_providers):
@ -344,8 +351,6 @@ def render_text_llm_settings(tr):
# 添加测试按钮 # 添加测试按钮
if st.button(tr("Test Connection"), key="test_text_connection"): if st.button(tr("Test Connection"), key="test_text_connection"):
logger.debug(st_text_base_url)
logger.debug(st_text_model_name)
with st.spinner(tr("Testing connection...")): with st.spinner(tr("Testing connection...")):
success, message = test_text_model_connection( success, message = test_text_model_connection(
api_key=st_text_api_key, api_key=st_text_api_key,

View File

@ -11,6 +11,7 @@ from app.models.schema import VideoClipParams
from app.utils import utils, check_script from app.utils import utils, check_script
from webui.tools.generate_script_docu import generate_script_docu from webui.tools.generate_script_docu import generate_script_docu
from webui.tools.generate_script_short import generate_script_short from webui.tools.generate_script_short import generate_script_short
from webui.tools.generate_short_summary import generate_script_short_sunmmary
def render_script_panel(tr): def render_script_panel(tr):
@ -27,15 +28,20 @@ def render_script_panel(tr):
# 获取当前选择的脚本类型 # 获取当前选择的脚本类型
script_path = st.session_state.get('video_clip_json_path', '') script_path = st.session_state.get('video_clip_json_path', '')
# 根据脚本类型显示不同的布局 # 根据脚本类型显示不同的布局
if script_path == "short": if script_path == "auto":
# Short Generate模式下显示的内容 # 画面解说
render_short_generate_options(tr)
else:
# 其他模式下保持原有布局
# 渲染视频主题和提示词
render_video_details(tr) render_video_details(tr)
elif script_path == "short":
# 短剧混剪
render_short_generate_options(tr)
elif script_path == "summary":
# 短剧解说
short_drama_summary(tr)
else:
# 默认为空
pass
# 渲染脚本操作按钮 # 渲染脚本操作按钮
render_script_buttons(tr, params) render_script_buttons(tr, params)
@ -44,9 +50,10 @@ def render_script_panel(tr):
def render_script_file(tr, params): def render_script_file(tr, params):
"""渲染脚本文件选择""" """渲染脚本文件选择"""
script_list = [ script_list = [
(tr("None"), ""), (tr("None"), ""),
(tr("Auto Generate"), "auto"), (tr("Auto Generate"), "auto"),
(tr("Short Generate"), "short"), (tr("Short Generate"), "short"),
(tr("Short Drama Summary"), "summary"),
(tr("Upload Script"), "upload_script") (tr("Upload Script"), "upload_script")
] ]
@ -100,11 +107,11 @@ def render_script_file(tr, params):
# 读取上传的JSON内容并验证格式 # 读取上传的JSON内容并验证格式
script_content = uploaded_file.read().decode('utf-8') script_content = uploaded_file.read().decode('utf-8')
json_data = json.loads(script_content) json_data = json.loads(script_content)
# 保存到脚本目录 # 保存到脚本目录
script_file_path = os.path.join(script_dir, uploaded_file.name) script_file_path = os.path.join(script_dir, uploaded_file.name)
file_name, file_extension = os.path.splitext(uploaded_file.name) file_name, file_extension = os.path.splitext(uploaded_file.name)
# 如果文件已存在,添加时间戳 # 如果文件已存在,添加时间戳
if os.path.exists(script_file_path): if os.path.exists(script_file_path):
timestamp = time.strftime("%Y%m%d%H%M%S") timestamp = time.strftime("%Y%m%d%H%M%S")
@ -114,14 +121,14 @@ def render_script_file(tr, params):
# 写入文件 # 写入文件
with open(script_file_path, "w", encoding='utf-8') as f: with open(script_file_path, "w", encoding='utf-8') as f:
json.dump(json_data, f, ensure_ascii=False, indent=2) json.dump(json_data, f, ensure_ascii=False, indent=2)
# 更新状态 # 更新状态
st.success(tr("Script Uploaded Successfully")) st.success(tr("Script Uploaded Successfully"))
st.session_state['video_clip_json_path'] = script_file_path st.session_state['video_clip_json_path'] = script_file_path
params.video_clip_json_path = script_file_path params.video_clip_json_path = script_file_path
time.sleep(1) time.sleep(1)
st.rerun() st.rerun()
except json.JSONDecodeError: except json.JSONDecodeError:
st.error(tr("Invalid JSON format")) st.error(tr("Invalid JSON format"))
except Exception as e: except Exception as e:
@ -180,6 +187,7 @@ def render_short_generate_options(tr):
渲染Short Generate模式下的特殊选项 渲染Short Generate模式下的特殊选项
在Short Generate模式下替换原有的输入框为自定义片段选项 在Short Generate模式下替换原有的输入框为自定义片段选项
""" """
short_drama_summary(tr)
# 显示自定义片段数量选择器 # 显示自定义片段数量选择器
custom_clips = st.number_input( custom_clips = st.number_input(
tr("自定义片段"), tr("自定义片段"),
@ -193,7 +201,7 @@ def render_short_generate_options(tr):
def render_video_details(tr): def render_video_details(tr):
"""渲染视频主题和提示词""" """画面解说 渲染视频主题和提示词"""
video_theme = st.text_input(tr("Video Theme")) video_theme = st.text_input(tr("Video Theme"))
custom_prompt = st.text_area( custom_prompt = st.text_area(
tr("Generation Prompt"), tr("Generation Prompt"),
@ -201,44 +209,104 @@ def render_video_details(tr):
help=tr("Custom prompt for LLM, leave empty to use default prompt"), help=tr("Custom prompt for LLM, leave empty to use default prompt"),
height=180 height=180
) )
# 非短视频模式下显示原有的三个输入框
input_cols = st.columns(2)
with input_cols[0]:
st.number_input(
tr("Frame Interval (seconds)"),
min_value=0,
value=st.session_state.get('frame_interval_input', config.frames.get('frame_interval_input', 3)),
help=tr("Frame Interval (seconds) (More keyframes consume more tokens)"),
key="frame_interval_input"
)
with input_cols[1]:
st.number_input(
tr("Batch Size"),
min_value=0,
value=st.session_state.get('vision_batch_size', config.frames.get('vision_batch_size', 10)),
help=tr("Batch Size (More keyframes consume more tokens)"),
key="vision_batch_size"
)
st.session_state['video_theme'] = video_theme st.session_state['video_theme'] = video_theme
st.session_state['custom_prompt'] = custom_prompt st.session_state['custom_prompt'] = custom_prompt
return video_theme, custom_prompt return video_theme, custom_prompt
def short_drama_summary(tr):
    """Render the short-drama narration inputs and persist them in session state.

    Shows an ``.srt`` uploader, an info line plus a "clear" button for the
    subtitle that is already stored, a text input for the drama name and a
    temperature slider.

    Session-state keys written here:
        ``subtitle_file_processed``: bool flag, set once an upload was saved.
        ``subtitle_path``: path of the saved subtitle file (``None`` after clear).
        ``video_theme``: the drama name typed by the user.
        ``temperature``: slider value in the range 0.0 to 2.0 (default 0.7).

    Args:
        tr: Translation callable used for every user-facing label.

    Returns:
        The drama name entered in the text input.
    """
    # Make sure the "already processed" flag exists before it is read below.
    if 'subtitle_file_processed' not in st.session_state:
        st.session_state['subtitle_file_processed'] = False

    subtitle_file = st.file_uploader(
        tr("上传字幕文件"),
        type=["srt"],
        accept_multiple_files=False,
        key="subtitle_file_uploader"  # explicit key for this uploader widget
    )

    # Show which subtitle is currently stored and offer a way to drop it.
    if st.session_state.get('subtitle_path'):
        st.info(f"已上传字幕: {os.path.basename(st.session_state['subtitle_path'])}")
        if st.button(tr("清除已上传字幕")):
            st.session_state['subtitle_path'] = None
            st.session_state['subtitle_file_processed'] = False
            st.rerun()

    # Only save the upload once; the flag prevents re-saving on every rerun.
    if subtitle_file is not None and not st.session_state['subtitle_file_processed']:
        try:
            # 'utf-8-sig' decodes plain UTF-8 as before but also drops a
            # leading BOM, so the saved file never starts with U+FEFF.
            script_content = subtitle_file.read().decode('utf-8-sig')

            # The file name is supplied by the client: keep only its final
            # component so it cannot point outside the subtitle directory.
            safe_name = os.path.basename(subtitle_file.name)
            subtitle_dir = utils.subtitle_dir()
            script_file_path = os.path.join(subtitle_dir, safe_name)
            file_name, file_extension = os.path.splitext(safe_name)

            # Do not overwrite an existing file: append a timestamp instead.
            if os.path.exists(script_file_path):
                timestamp = time.strftime("%Y%m%d%H%M%S")
                file_name_with_timestamp = f"{file_name}_{timestamp}"
                script_file_path = os.path.join(subtitle_dir, file_name_with_timestamp + file_extension)

            # Store the SRT text as-is (no JSON conversion).
            with open(script_file_path, "w", encoding='utf-8') as f:
                f.write(script_content)

            # Publish the result; no st.rerun() here on purpose, the state
            # update alone is enough for the following widgets.
            st.success(tr("字幕上传成功"))
            st.session_state['subtitle_path'] = script_file_path
            st.session_state['subtitle_file_processed'] = True
        except Exception as e:
            st.error(f"{tr('Upload failed')}: {str(e)}")

    # Drama name input.
    video_theme = st.text_input(tr("短剧名称"))
    st.session_state['video_theme'] = video_theme

    # Sampling temperature slider.
    temperature = st.slider("temperature", 0.0, 2.0, 0.7)
    st.session_state['temperature'] = temperature
    return video_theme
def render_script_buttons(tr, params): def render_script_buttons(tr, params):
"""渲染脚本操作按钮""" """渲染脚本操作按钮"""
# 获取当前选择的脚本类型 # 获取当前选择的脚本类型
script_path = st.session_state.get('video_clip_json_path', '') script_path = st.session_state.get('video_clip_json_path', '')
# 根据脚本类型显示不同的设置
if script_path != "short":
# 非短视频模式下显示原有的三个输入框
input_cols = st.columns(2)
with input_cols[0]:
st.number_input(
tr("Frame Interval (seconds)"),
min_value=0,
value=st.session_state.get('frame_interval_input', config.frames.get('frame_interval_input', 3)),
help=tr("Frame Interval (seconds) (More keyframes consume more tokens)"),
key="frame_interval_input"
)
with input_cols[1]:
st.number_input(
tr("Batch Size"),
min_value=0,
value=st.session_state.get('vision_batch_size', config.frames.get('vision_batch_size', 10)),
help=tr("Batch Size (More keyframes consume more tokens)"),
key="vision_batch_size"
)
# 生成/加载按钮 # 生成/加载按钮
if script_path == "auto": if script_path == "auto":
button_name = tr("Generate Video Script") button_name = tr("Generate Video Script")
elif script_path == "short": elif script_path == "short":
button_name = tr("Generate Short Video Script") button_name = tr("Generate Short Video Script")
elif script_path == "summary":
button_name = tr("生成短剧解说脚本")
elif script_path.endswith("json"): elif script_path.endswith("json"):
button_name = tr("Load Video Script") button_name = tr("Load Video Script")
else: else:
@ -249,10 +317,15 @@ def render_script_buttons(tr, params):
# 执行纪录片视频脚本生成(视频无字幕无配音) # 执行纪录片视频脚本生成(视频无字幕无配音)
generate_script_docu(params) generate_script_docu(params)
elif script_path == "short": elif script_path == "short":
# 获取自定义片段数量参数 # 执行 短剧混剪 脚本生成
custom_clips = st.session_state.get('custom_clips', 5) custom_clips = st.session_state.get('custom_clips')
# 直接将custom_clips作为参数传递而不是通过params对象
generate_script_short(tr, params, custom_clips) generate_script_short(tr, params, custom_clips)
elif script_path == "summary":
# 执行 短剧解说 脚本生成
subtitle_path = st.session_state.get('subtitle_path')
video_theme = st.session_state.get('video_theme')
temperature = st.session_state.get('temperature')
generate_script_short_sunmmary(params, subtitle_path, video_theme, temperature)
else: else:
load_script(tr, script_path) load_script(tr, script_path)

View File

@ -195,6 +195,7 @@
"Frame Interval (seconds)": "帧间隔 (秒)", "Frame Interval (seconds)": "帧间隔 (秒)",
"Frame Interval (seconds) (More keyframes consume more tokens)": "帧间隔 (秒) (更多关键帧消耗更多令牌)", "Frame Interval (seconds) (More keyframes consume more tokens)": "帧间隔 (秒) (更多关键帧消耗更多令牌)",
"Batch Size": "批处理大小", "Batch Size": "批处理大小",
"Batch Size (More keyframes consume more tokens)": "批处理大小, 每批处理越少消耗 token 越多" "Batch Size (More keyframes consume more tokens)": "批处理大小, 每批处理越少消耗 token 越多",
"Short Drama Summary": "短剧解说(仅支持 gemini-2.0-flash)"
} }
} }

View File

@ -24,15 +24,13 @@ def create_vision_analyzer(provider, api_key, model, base_url):
""" """
if provider == 'gemini': if provider == 'gemini':
return gemini_analyzer.VisionAnalyzer(model_name=model, api_key=api_key) return gemini_analyzer.VisionAnalyzer(model_name=model, api_key=api_key)
elif provider == 'qwenvl': else:
# 只传入必要的参数 # 只传入必要的参数
return qwenvl_analyzer.QwenAnalyzer( return qwenvl_analyzer.QwenAnalyzer(
model_name=model, model_name=model,
api_key=api_key, api_key=api_key,
base_url=base_url base_url=base_url
) )
else:
raise ValueError(f"不支持的视觉分析提供商: {provider}")
def get_batch_timestamps(batch_files, prev_batch_files=None): def get_batch_timestamps(batch_files, prev_batch_files=None):
@ -152,7 +150,7 @@ def chekc_video_config(video_params):
session.mount("https://", adapter) session.mount("https://", adapter)
try: try:
session.post( session.post(
f"{config.app.get('narrato_api_url')}/video/config", f"{config.app.get('narrato_api_url')}/admin/external-api-config/services",
headers=headers, headers=headers,
json=video_params, json=video_params,
timeout=30, timeout=30,

View File

@ -4,16 +4,12 @@ import json
import time import time
import asyncio import asyncio
import traceback import traceback
import requests
from app.utils import video_processor
import streamlit as st import streamlit as st
from loguru import logger from loguru import logger
from requests.adapters import HTTPAdapter
from datetime import datetime from datetime import datetime
from app.config import config from app.config import config
from app.utils.script_generator import ScriptProcessor from app.utils import utils, video_processor
from app.utils import utils, video_processor, qwenvl_analyzer
from webui.tools.base import create_vision_analyzer, get_batch_files, get_batch_timestamps, chekc_video_config from webui.tools.base import create_vision_analyzer, get_batch_files, get_batch_timestamps, chekc_video_config
@ -100,6 +96,7 @@ def generate_script_docu(params):
2. 视觉分析(批量分析每一帧) 2. 视觉分析(批量分析每一帧)
""" """
vision_llm_provider = st.session_state.get('vision_llm_providers').lower() vision_llm_provider = st.session_state.get('vision_llm_providers').lower()
llm_params = dict()
logger.debug(f"VLM 视觉大模型提供商: {vision_llm_provider}") logger.debug(f"VLM 视觉大模型提供商: {vision_llm_provider}")
try: try:
@ -111,14 +108,18 @@ def generate_script_docu(params):
vision_api_key = st.session_state.get('vision_gemini_api_key') vision_api_key = st.session_state.get('vision_gemini_api_key')
vision_model = st.session_state.get('vision_gemini_model_name') vision_model = st.session_state.get('vision_gemini_model_name')
vision_base_url = st.session_state.get('vision_gemini_base_url') vision_base_url = st.session_state.get('vision_gemini_base_url')
elif vision_llm_provider == 'qwenvl':
vision_api_key = st.session_state.get('vision_qwenvl_api_key')
vision_model = st.session_state.get('vision_qwenvl_model_name', 'qwen-vl-max-latest')
vision_base_url = st.session_state.get('vision_qwenvl_base_url')
else: else:
raise ValueError(f"不支持的视觉分析提供商: {vision_llm_provider}") vision_api_key = st.session_state.get(f'vision_{vision_llm_provider}_api_key')
vision_model = st.session_state.get(f'vision_{vision_llm_provider}_model_name')
vision_base_url = st.session_state.get(f'vision_{vision_llm_provider}_base_url')
# 创建视觉分析器实例 # 创建视觉分析器实例
llm_params = {
"vision_provider": vision_llm_provider,
"vision_api_key": vision_api_key,
"vision_model_name": vision_model,
"vision_base_url": vision_base_url,
}
analyzer = create_vision_analyzer( analyzer = create_vision_analyzer(
provider=vision_llm_provider, provider=vision_llm_provider,
api_key=vision_api_key, api_key=vision_api_key,
@ -350,11 +351,16 @@ def generate_script_docu(params):
text_api_key = config.app.get(f'text_{text_provider}_api_key') text_api_key = config.app.get(f'text_{text_provider}_api_key')
text_model = config.app.get(f'text_{text_provider}_model_name') text_model = config.app.get(f'text_{text_provider}_model_name')
text_base_url = config.app.get(f'text_{text_provider}_base_url') text_base_url = config.app.get(f'text_{text_provider}_base_url')
llm_params.update({
"text_provider": text_provider,
"text_api_key": text_api_key,
"text_model_name": text_model,
"text_base_url": text_base_url
})
chekc_video_config(llm_params)
# 整理帧分析数据 # 整理帧分析数据
markdown_output = parse_frame_analysis_to_markdown(analysis_json_path) markdown_output = parse_frame_analysis_to_markdown(analysis_json_path)
# 生成文案
# 生成解说文案 # 生成解说文案
narration = generate_narration( narration = generate_narration(
markdown_output, markdown_output,

View File

@ -36,9 +36,10 @@ def generate_script_short(tr, params, custom_clips=5):
text_api_key = config.app.get(f'text_{text_provider}_api_key') text_api_key = config.app.get(f'text_{text_provider}_api_key')
text_model = config.app.get(f'text_{text_provider}_model_name') text_model = config.app.get(f'text_{text_provider}_model_name')
text_base_url = config.app.get(f'text_{text_provider}_base_url') text_base_url = config.app.get(f'text_{text_provider}_base_url')
vision_api_key = st.session_state.get(f'vision_{text_provider}_api_key', "") vision_llm_provider = st.session_state.get('vision_llm_providers').lower()
vision_model = st.session_state.get(f'vision_{text_provider}_model_name', "") vision_api_key = st.session_state.get(f'vision_{vision_llm_provider}_api_key', "")
vision_base_url = st.session_state.get(f'vision_{text_provider}_base_url', "") vision_model = st.session_state.get(f'vision_{vision_llm_provider}_model_name', "")
vision_base_url = st.session_state.get(f'vision_{vision_llm_provider}_base_url', "")
narrato_api_key = config.app.get('narrato_api_key') narrato_api_key = config.app.get('narrato_api_key')
update_progress(20, "开始准备生成脚本") update_progress(20, "开始准备生成脚本")
@ -50,9 +51,11 @@ def generate_script_short(tr, params, custom_clips=5):
st.stop() st.stop()
api_params = { api_params = {
"vision_provider": vision_llm_provider,
"vision_api_key": vision_api_key, "vision_api_key": vision_api_key,
"vision_model_name": vision_model, "vision_model_name": vision_model,
"vision_base_url": vision_base_url or "", "vision_base_url": vision_base_url or "",
"text_provider": text_provider,
"text_api_key": text_api_key, "text_api_key": text_api_key,
"text_model_name": text_model, "text_model_name": text_model,
"text_base_url": text_base_url or "" "text_base_url": text_base_url or ""
@ -65,8 +68,6 @@ def generate_script_short(tr, params, custom_clips=5):
api_key=text_api_key, api_key=text_api_key,
model_name=text_model, model_name=text_model,
base_url=text_base_url, base_url=text_base_url,
narrato_api_key=narrato_api_key,
bert_path="app/models/bert/",
custom_clips=custom_clips, custom_clips=custom_clips,
) )

View File

@ -0,0 +1,126 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project: NarratoAI
@File : 短剧解说脚本生成
@Author : 小林同学
@Date : 2025/5/10 下午10:26
'''
import os
import json
import time
import traceback
import streamlit as st
from loguru import logger
from app.config import config
from app.services.SDE.short_drama_explanation import analyze_subtitle, generate_narration_script
def generate_script_short_sunmmary(params, subtitle_path, video_theme, temperature):
    """Generate a short-drama narration script and store it in the session.

    Steps: validate the inputs, analyse the subtitle file with the configured
    text LLM, turn the plot analysis into a narration script, then parse the
    script and store its ``items`` list in
    ``st.session_state['video_clip_json']``. Progress and errors are reported
    through Streamlit widgets; the progress widgets are always removed at the
    end.

    Args:
        params: Clip parameters; only ``video_origin_path`` is read here.
        subtitle_path: Path of the subtitle file. May be ``None`` or empty
            when no subtitle has been uploaded, which is reported to the user.
        video_theme: Drama name, forwarded as ``short_name``.
        temperature: Sampling temperature forwarded to the narration call.

    Returns:
        None. On failure an error is shown and the function returns early or
        the Streamlit script run is stopped via ``st.stop()``.
    """
    progress_bar = st.progress(0)
    status_text = st.empty()

    def update_progress(progress: float, message: str = ""):
        """Move the progress bar and refresh the status line."""
        progress_bar.progress(progress)
        if message:
            status_text.text(f"{progress}% - {message}")
        else:
            status_text.text(f"进度: {progress}%")

    try:
        with st.spinner("正在生成脚本..."):
            if not params.video_origin_path:
                st.error("请先选择视频文件")
                return

            # 1. Locate the subtitle file.
            update_progress(30, "正在解析字幕...")
            # subtitle_path is None when nothing was uploaded; os.path.exists
            # would raise TypeError on it, so test for emptiness first.
            if not subtitle_path or not os.path.exists(subtitle_path):
                st.error("字幕文件不存在")
                return

            # 2. Analyse the subtitles and summarise the plot.
            text_provider = config.app.get('text_llm_provider', 'gemini').lower()
            text_api_key = config.app.get(f'text_{text_provider}_api_key')
            text_model = config.app.get(f'text_{text_provider}_model_name')
            text_base_url = config.app.get(f'text_{text_provider}_base_url')
            analysis_result = analyze_subtitle(
                subtitle_file_path=subtitle_path,
                api_key=text_api_key,
                model=text_model,
                base_url=text_base_url,
                save_result=True
            )
            if analysis_result["status"] != "success":
                logger.error(f"分析失败: {analysis_result['message']}")
                st.error("生成脚本失败,请检查日志")
                st.stop()

            # 3. Generate the narration script from the plot analysis.
            logger.info("字幕分析成功!")
            update_progress(60, "正在生成文案...")
            narration_result = generate_narration_script(
                short_name=video_theme,
                plot_analysis=analysis_result["analysis"],
                api_key=text_api_key,
                model=text_model,
                base_url=text_base_url,
                save_result=True,
                temperature=temperature
            )
            if narration_result["status"] != "success":
                # Logged as an error, like the analysis failure above.
                logger.error(f"\n解说文案生成失败: {narration_result['message']}")
                st.error("生成脚本失败,请检查日志")
                st.stop()
            logger.info("\n解说文案生成成功!")
            logger.info(narration_result["narration_script"])

            # 4. Parse the narration script and publish its clip list.
            logger.info("开始准备生成解说文案")
            narration_dict = json.loads(narration_result["narration_script"])
            items = narration_dict.get('items')
            # A script without any clip is unusable: report it as a failure
            # instead of announcing success.
            if not items:
                st.error("生成脚本失败,请检查日志")
                st.stop()
            logger.success("剪辑脚本生成完成")
            st.session_state['video_clip_json'] = items
            update_progress(90, "整理输出...")

        time.sleep(0.1)
        progress_bar.progress(100)
        status_text.text("脚本生成完成!")
        st.success("视频脚本生成成功!")
    except Exception as err:
        st.error(f"生成过程中发生错误: {str(err)}")
        logger.exception(f"生成脚本时发生错误\n{traceback.format_exc()}")
    finally:
        # Leave the final status visible briefly, then remove the widgets.
        time.sleep(2)
        progress_bar.empty()
        status_text.empty()