diff --git a/app/services/SDE/prompt.py b/app/services/SDE/prompt.py new file mode 100644 index 0000000..7ddcfc0 --- /dev/null +++ b/app/services/SDE/prompt.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- + +''' +@Project: NarratoAI +@File : prompt +@Author : 小林同学 +@Date : 2025/5/9 上午12:57 +''' +# 字幕剧情分析提示词 +subtitle_plot_analysis_v1 = """ +# 角色 +你是一位专业的剧本分析师和剧情概括助手。 + +# 任务 +我将为你提供一部短剧的完整字幕文本。请你基于这些字幕,完成以下任务: +1. **整体剧情分析**:简要概括整个短剧的核心剧情脉络、主要冲突和结局(如果有的话)。 +2. **分段剧情解析与时间戳定位**: + * 将整个短剧划分为若干个关键的剧情段落(例如:开端、发展、转折、高潮、结局,或根据具体情节自然划分)。 + * 段落数不得少于 20 个 + * 对于每一个剧情段落: + * **概括该段落的主要内容**:用简洁的语言描述这段剧情发生了什么。 + * **标注对应的时间戳范围**:明确指出该剧情段落对应的开始字幕时间戳和结束字幕时间戳。请直接从字幕中提取时间信息。 + +# 输入格式 +字幕内容通常包含时间戳和对话,例如: +``` +00:00:05,000 --> 00:00:10,000 +[角色A]: 你好吗? +00:00:10,500 --> 00:00:15,000 +[角色B]: 我很好,谢谢。发生了一些有趣的事情。 +... (更多字幕内容) ... +``` +我将把实际字幕粘贴在下方。 + +# 输出格式要求 +请按照以下格式清晰地呈现分析结果: + +**一、整体剧情概括:** +[此处填写对整个短剧剧情的概括] + +**二、分段剧情解析:** + +**剧情段落 1:[段落主题/概括,例如:主角登场与背景介绍]** +* **时间戳:** [开始时间戳] --> [结束时间戳] +* **内容概要:** [对这段剧情的详细描述] + +**剧情段落 2:[段落主题/概括,例如:第一个冲突出现]** +* **时间戳:** [开始时间戳] --> [结束时间戳] +* **内容概要:** [对这段剧情的详细描述] + +... (根据实际剧情段落数量继续) ... + +**剧情段落 N:[段落主题/概括,例如:结局与反思]** +* **时间戳:** [开始时间戳] --> [结束时间戳] +* **内容概要:** [对这段剧情的详细描述] + +# 注意事项 +* 请确保时间戳的准确性,直接引用字幕中的时间。 +* 剧情段落的划分应合乎逻辑,能够反映剧情的起承转合。 +* 语言表达应简洁、准确、客观。 + +# 限制 +1. 严禁输出与分析结果无关的内容 +2. + +# 请处理以下字幕: +""" + +plot_writing = """ +我是一个影视解说up主,需要为我的粉丝讲解短剧《家里家外》的剧情,目前正在解说1-5集的剧情,希望能让粉丝通过我的解说了解剧情,并且产生 继续观看的兴趣,请生成一篇解说脚本,包含解说文案,以及穿插原声的片段,下面中的内容是短剧的剧情概述: + + +%s + + +请使用 json 格式进行输出;使用 中的输出格式: + +{ + "items": [ + { + "_id": 1, # 唯一递增id + "timestamp": "00:00:05,390-00:00:10,430", + "picture": "剧情描述或者备注", + "narration": "解说文案,如果片段为穿插的原片片段,可以直接使用 ‘播放原片+_id‘ 进行占位", + "OST": "值为 0 表示当前片段为解说片段,值为 1 表示当前片段为穿插的原片" + } +} + + + +1. 只输出 json 内容,不要输出其他任何说明性的文字 +2. 解说文案的语言使用 简体中文 +3. 严禁虚构剧情,所有画面只能从 中摘取 +4. 严禁虚构时间戳,所有时间戳范围只能从 中摘取 + +""" \ No newline at end of file diff --git a/app/services/SDE/short_drama_explanation.py b/app/services/SDE/short_drama_explanation.py new file mode 100644 index 0000000..0034421 --- /dev/null +++ b/app/services/SDE/short_drama_explanation.py @@ -0,0 +1,454 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- + +''' +@Project: NarratoAI +@File : 短剧解说 +@Author : 小林同学 +@Date : 2025/5/9 上午12:36 +''' + +import os +import json +import requests +from typing import Dict, Any, Optional +from loguru import logger +from app.config import config +from app.utils.utils import get_uuid, storage_dir +from app.services.SDE.prompt import subtitle_plot_analysis_v1, plot_writing + + +class SubtitleAnalyzer: + """字幕剧情分析器,负责分析字幕内容并提取关键剧情段落""" + + def __init__( + self, + api_key: Optional[str] = None, + model: Optional[str] = None, + base_url: Optional[str] = None, + custom_prompt: Optional[str] = None, + temperature: Optional[float] = 1.0, + ): + """ + 初始化字幕分析器 + + Args: + api_key: API密钥,如果不提供则从配置中读取 + model: 模型名称,如果不提供则从配置中读取 + base_url: API基础URL,如果不提供则从配置中读取或使用默认值 + custom_prompt: 自定义提示词,如果不提供则使用默认值 + temperature: 模型温度 + """ + # 使用传入的参数或从配置中获取 + self.api_key = api_key + self.model = model + self.base_url = base_url + self.temperature = temperature + + # 设置提示词模板 + self.prompt_template = custom_prompt or subtitle_plot_analysis_v1 + + # 初始化HTTP请求所需的头信息 + self._init_headers() + + def _init_headers(self): + """初始化HTTP请求头""" + try: + # 基础请求头,包含API密钥和内容类型 + self.headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.api_key}" + } + logger.info(f"初始化成功 - API Key: {self.api_key[:8]}... - Base URL: {self.base_url}") + except Exception as e: + logger.error(f"初始化请求头失败: {str(e)}") + raise + + def analyze_subtitle(self, subtitle_content: str) -> Dict[str, Any]: + """ + 分析字幕内容 + + Args: + subtitle_content: 字幕内容文本 + + Returns: + Dict[str, Any]: 包含分析结果的字典 + """ + try: + # 构建完整提示词 + prompt = f"{self.prompt_template}\n\n{subtitle_content}" + + # 构建请求体数据 + payload = { + "model": self.model, + "messages": [ + {"role": "system", "content": "你是一位专业的剧本分析师和剧情概括助手。"}, + {"role": "user", "content": prompt} + ], + "temperature": self.temperature + } + + # 构建请求地址 + url = f"{self.base_url}/chat/completions" + + # 发送HTTP请求 + response = requests.post(url, headers=self.headers, json=payload) + + # 解析响应 + if response.status_code == 200: + response_data = response.json() + + # 提取响应内容 + if "choices" in response_data and len(response_data["choices"]) > 0: + analysis_result = response_data["choices"][0]["message"]["content"] + logger.debug(f"字幕分析完成,消耗的tokens: {response_data.get('usage', {}).get('total_tokens', 0)}") + + # 返回结果 + return { + "status": "success", + "analysis": analysis_result, + "tokens_used": response_data.get("usage", {}).get("total_tokens", 0), + "model": self.model, + "temperature": self.temperature + } + else: + logger.error("字幕分析失败: 未获取到有效响应") + return { + "status": "error", + "message": "未获取到有效响应", + "temperature": self.temperature + } + else: + error_msg = f"请求失败,状态码: {response.status_code}, 响应: {response.text}" + logger.error(error_msg) + return { + "status": "error", + "message": error_msg, + "temperature": self.temperature + } + + except Exception as e: + logger.error(f"字幕分析过程中发生错误: {str(e)}") + return { + "status": "error", + "message": str(e), + "temperature": self.temperature + } + + def analyze_subtitle_from_file(self, subtitle_file_path: str) -> Dict[str, Any]: + """ + 从文件读取字幕并分析 + + Args: + subtitle_file_path: 字幕文件的路径 + + Returns: + Dict[str, Any]: 包含分析结果的字典 + """ + try: + # 检查文件是否存在 + if not os.path.exists(subtitle_file_path): + return { + "status": "error", + "message": f"字幕文件不存在: {subtitle_file_path}", + "temperature": self.temperature + } + + # 读取文件内容 + with open(subtitle_file_path, 'r', encoding='utf-8') as f: + subtitle_content = f.read() + + # 分析字幕 + return self.analyze_subtitle(subtitle_content) + + except Exception as e: + logger.error(f"从文件读取字幕并分析过程中发生错误: {str(e)}") + return { + "status": "error", + "message": str(e), + "temperature": self.temperature + } + + def save_analysis_result(self, analysis_result: Dict[str, Any], output_path: Optional[str] = None) -> str: + """ + 保存分析结果到文件 + + Args: + analysis_result: 分析结果 + output_path: 输出文件路径,如果不提供则自动生成 + + Returns: + str: 输出文件的路径 + """ + try: + # 如果未提供输出路径,则自动生成 + if not output_path: + output_dir = storage_dir("drama_analysis", create=True) + output_path = os.path.join(output_dir, f"analysis_{get_uuid(True)}.txt") + + # 确保目录存在 + os.makedirs(os.path.dirname(output_path), exist_ok=True) + + # 保存结果 + with open(output_path, 'w', encoding='utf-8') as f: + if analysis_result["status"] == "success": + f.write(analysis_result["analysis"]) + else: + f.write(f"分析失败: {analysis_result['message']}") + + logger.info(f"分析结果已保存到: {output_path}") + return output_path + + except Exception as e: + logger.error(f"保存分析结果时发生错误: {str(e)}") + return "" + + def generate_narration_script(self, plot_analysis: str, temperature: float = 0.7) -> Dict[str, Any]: + """ + 根据剧情分析生成解说文案 + + Args: + plot_analysis: 剧情分析内容 + temperature: 生成温度,控制创造性,默认0.7 + + Returns: + Dict[str, Any]: 包含生成结果的字典 + """ + try: + # 构建完整提示词 + prompt = plot_writing % plot_analysis + + # 构建请求体数据 + payload = { + "model": self.model, + "messages": [ + {"role": "system", "content": "你是一位专业的短视频解说脚本撰写专家。"}, + {"role": "user", "content": prompt} + ], + "temperature": temperature + } + + # 对特定模型添加响应格式设置 + if self.model not in ["deepseek-reasoner"]: + payload["response_format"] = {"type": "json_object"} + + # 构建请求地址 + url = f"{self.base_url}/chat/completions" + + # 发送HTTP请求 + response = requests.post(url, headers=self.headers, json=payload) + + # 解析响应 + if response.status_code == 200: + response_data = response.json() + + # 提取响应内容 + if "choices" in response_data and len(response_data["choices"]) > 0: + narration_script = response_data["choices"][0]["message"]["content"] + logger.debug(f"解说文案生成完成,消耗的tokens: {response_data.get('usage', {}).get('total_tokens', 0)}") + + # 返回结果 + return { + "status": "success", + "narration_script": narration_script, + "tokens_used": response_data.get("usage", {}).get("total_tokens", 0), + "model": self.model, + "temperature": self.temperature + } + else: + logger.error("解说文案生成失败: 未获取到有效响应") + return { + "status": "error", + "message": "未获取到有效响应", + "temperature": self.temperature + } + else: + error_msg = f"请求失败,状态码: {response.status_code}, 响应: {response.text}" + logger.error(error_msg) + return { + "status": "error", + "message": error_msg, + "temperature": self.temperature + } + + except Exception as e: + logger.error(f"解说文案生成过程中发生错误: {str(e)}") + return { + "status": "error", + "message": str(e), + "temperature": self.temperature + } + + def save_narration_script(self, narration_result: Dict[str, Any], output_path: Optional[str] = None) -> str: + """ + 保存解说文案到文件 + + Args: + narration_result: 解说文案生成结果 + output_path: 输出文件路径,如果不提供则自动生成 + + Returns: + str: 输出文件的路径 + """ + try: + # 如果未提供输出路径,则自动生成 + if not output_path: + output_dir = storage_dir("narration_scripts", create=True) + output_path = os.path.join(output_dir, f"narration_{get_uuid(True)}.json") + + # 确保目录存在 + os.makedirs(os.path.dirname(output_path), exist_ok=True) + + # 保存结果 + with open(output_path, 'w', encoding='utf-8') as f: + if narration_result["status"] == "success": + f.write(narration_result["narration_script"]) + else: + f.write(f"生成失败: {narration_result['message']}") + + logger.info(f"解说文案已保存到: {output_path}") + return output_path + + except Exception as e: + logger.error(f"保存解说文案时发生错误: {str(e)}") + return "" + + +def analyze_subtitle( + subtitle_content: str = None, + subtitle_file_path: str = None, + api_key: Optional[str] = None, + model: Optional[str] = None, + base_url: Optional[str] = None, + custom_prompt: Optional[str] = None, + temperature: float = 1.0, + save_result: bool = False, + output_path: Optional[str] = None +) -> Dict[str, Any]: + """ + 分析字幕内容的便捷函数 + + Args: + subtitle_content: 字幕内容文本 + subtitle_file_path: 字幕文件路径 + custom_prompt: 自定义提示词 + api_key: API密钥 + model: 模型名称 + base_url: API基础URL + temperature: 模型温度 + save_result: 是否保存结果到文件 + output_path: 输出文件路径 + + Returns: + Dict[str, Any]: 包含分析结果的字典 + """ + # 初始化分析器 + analyzer = SubtitleAnalyzer( + temperature=temperature, + api_key=api_key, + model=model, + base_url=base_url, + custom_prompt=custom_prompt + ) + + # 分析字幕 + if subtitle_content: + result = analyzer.analyze_subtitle(subtitle_content) + elif subtitle_file_path: + result = analyzer.analyze_subtitle_from_file(subtitle_file_path) + else: + return { + "status": "error", + "message": "必须提供字幕内容或字幕文件路径", + "temperature": temperature + } + + # 保存结果 + if save_result and result["status"] == "success": + result["output_path"] = analyzer.save_analysis_result(result, output_path) + + return result + + +def generate_narration_script( + plot_analysis: str = None, + api_key: Optional[str] = None, + model: Optional[str] = None, + base_url: Optional[str] = None, + temperature: float = 1.0, + save_result: bool = False, + output_path: Optional[str] = None +) -> Dict[str, Any]: + """ + 根据剧情分析生成解说文案的便捷函数 + + Args: + plot_analysis: 剧情分析内容,直接提供 + api_key: API密钥 + model: 模型名称 + base_url: API基础URL + temperature: 生成温度,控制创造性 + save_result: 是否保存结果到文件 + output_path: 输出文件路径 + + Returns: + Dict[str, Any]: 包含生成结果的字典 + """ + # 初始化分析器 + analyzer = SubtitleAnalyzer( + temperature=temperature, + api_key=api_key, + model=model, + base_url=base_url + ) + + # 生成解说文案 + result = analyzer.generate_narration_script(plot_analysis, temperature) + + # 保存结果 + if save_result and result["status"] == "success": + result["output_path"] = analyzer.save_narration_script(result, output_path) + + return result + + +if __name__ == '__main__': + # text_api_key = "sk-2dfe00cab68d4baf8e142cbbefea0f44" + text_api_key = "sk-narra-ZPA07ethehTb2VQ5mMI1P8tt" + text_model = "gemini-2.0-flash" + text_base_url = "http://47.252.0.22:7001/v1/chat/completions" # 确保URL不以斜杠结尾,便于后续拼接 + subtitle_path = "/Users/apple/Desktop/home/NarratoAI/resource/srt/家里家外1-5.srt" + + # 示例用法 + if subtitle_path: + # 分析字幕总结剧情 + analysis_result = analyze_subtitle( + subtitle_file_path=subtitle_path, + api_key=text_api_key, + model=text_model, + base_url=text_base_url, + save_result=True + ) + + if analysis_result["status"] == "success": + print("字幕分析成功!") + print("分析结果:") + print(analysis_result["analysis"]) + + # 根据剧情生成解说文案 + narration_result = generate_narration_script( + plot_analysis=analysis_result["analysis"], + api_key=text_api_key, + model=text_model, + base_url=text_base_url, + save_result=True + ) + + if narration_result["status"] == "success": + print("\n解说文案生成成功!") + print("解说文案:") + print(narration_result["narration_script"]) + else: + print(f"\n解说文案生成失败: {narration_result['message']}") + else: + print(f"分析失败: {analysis_result['message']}") diff --git a/app/utils/utils.py b/app/utils/utils.py index 56eba09..1dbf7e3 100644 --- a/app/utils/utils.py +++ b/app/utils/utils.py @@ -325,6 +325,15 @@ def video_dir(sub_dir: str = ""): return d +def subtitle_dir(sub_dir: str = ""): + d = resource_dir(f"srt") + if sub_dir: + d = os.path.join(d, sub_dir) + if not os.path.exists(d): + os.makedirs(d) + return d + + def split_timestamp(timestamp): """ 拆分时间戳 diff --git a/webui/components/script_settings.py b/webui/components/script_settings.py index 4984bb8..9a66ca1 100644 --- a/webui/components/script_settings.py +++ b/webui/components/script_settings.py @@ -235,17 +235,32 @@ def render_video_details(tr): def short_drama_summary(tr): """短剧解说 渲染视频主题和提示词""" + # 检查是否已经处理过字幕文件 + if 'subtitle_file_processed' not in st.session_state: + st.session_state['subtitle_file_processed'] = False + subtitle_file = st.file_uploader( tr("上传字幕文件"), type=["srt"], accept_multiple_files=False, + key="subtitle_file_uploader" # 添加唯一key ) - if subtitle_file is not None: + + # 显示当前已上传的字幕文件路径 + if 'subtitle_path' in st.session_state and st.session_state['subtitle_path']: + st.info(f"已上传字幕: {os.path.basename(st.session_state['subtitle_path'])}") + if st.button(tr("清除已上传字幕")): + st.session_state['subtitle_path'] = None + st.session_state['subtitle_file_processed'] = False + st.rerun() + + # 只有当有文件上传且尚未处理时才执行处理逻辑 + if subtitle_file is not None and not st.session_state['subtitle_file_processed']: try: - # 读取上传的JSON内容并验证格式 + # 读取上传的SRT内容 script_content = subtitle_file.read().decode('utf-8') - # 保存到脚本目录 + # 保存到字幕目录 script_file_path = os.path.join(utils.subtitle_dir(), subtitle_file.name) file_name, file_extension = os.path.splitext(subtitle_file.name) @@ -255,20 +270,21 @@ def short_drama_summary(tr): file_name_with_timestamp = f"{file_name}_{timestamp}" script_file_path = os.path.join(utils.subtitle_dir(), file_name_with_timestamp + file_extension) - # 写入文件 + # 直接写入SRT内容,不进行JSON转换 with open(script_file_path, "w", encoding='utf-8') as f: - json.dump(script_content, f, ensure_ascii=False, indent=2) + f.write(script_content) # 更新状态 st.success(tr("字幕上传成功")) st.session_state['subtitle_path'] = script_file_path - time.sleep(0.1) - st.rerun() - - except json.JSONDecodeError: - st.error(tr("Invalid JSON format")) + st.session_state['subtitle_file_processed'] = True # 标记已处理 + + # 避免使用rerun,使用更新状态的方式 + # st.rerun() + except Exception as e: st.error(f"{tr('Upload failed')}: {str(e)}") + video_theme = st.text_input(tr("短剧名称")) st.session_state['video_theme'] = video_theme return video_theme diff --git a/webui/i18n/zh.json b/webui/i18n/zh.json index 6aa7fbc..fd99fc8 100644 --- a/webui/i18n/zh.json +++ b/webui/i18n/zh.json @@ -195,6 +195,7 @@ "Frame Interval (seconds)": "帧间隔 (秒)", "Frame Interval (seconds) (More keyframes consume more tokens)": "帧间隔 (秒) (更多关键帧消耗更多令牌)", "Batch Size": "批处理大小", - "Batch Size (More keyframes consume more tokens)": "批处理大小, 每批处理越少消耗 token 越多" + "Batch Size (More keyframes consume more tokens)": "批处理大小, 每批处理越少消耗 token 越多", + "Short Drama Summary": "短剧解说" } } \ No newline at end of file diff --git a/webui/tools/generate_short_summary.py b/webui/tools/generate_short_summary.py new file mode 100644 index 0000000..e351992 --- /dev/null +++ b/webui/tools/generate_short_summary.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- + +''' +@Project: NarratoAI +@File : 短剧解说脚本生成 +@Author : 小林同学 +@Date : 2025/5/10 下午10:26 +''' +import os +import json +import time +import traceback +import streamlit as st +from loguru import logger + +from app.config import config +from app.services.SDE.short_drama_explanation import analyze_subtitle, generate_narration_script + + +def generate_script_short_sunmmary(params, subtitle_path): + """ + 生成 短剧解说 视频脚本 + 要求: 提供高质量短剧字幕 + 适合场景: 短剧 + """ + progress_bar = st.progress(0) + status_text = st.empty() + + def update_progress(progress: float, message: str = ""): + progress_bar.progress(progress) + if message: + status_text.text(f"{progress}% - {message}") + else: + status_text.text(f"进度: {progress}%") + + try: + with st.spinner("正在生成脚本..."): + if not params.video_origin_path: + st.error("请先选择视频文件") + return + """ + 1. 获取字幕 + """ + update_progress(30, "正在解析字幕...") + # 判断字幕文件是否存在 + if not os.path.exists(subtitle_path): + st.error("字幕文件不存在") + return + + """ + 2. 分析字幕总结剧情 + """ + text_provider = config.app.get('text_llm_provider', 'gemini').lower() + text_api_key = config.app.get(f'text_{text_provider}_api_key') + text_model = config.app.get(f'text_{text_provider}_model_name') + text_base_url = config.app.get(f'text_{text_provider}_base_url') + analysis_result = analyze_subtitle( + subtitle_file_path=subtitle_path, + api_key=text_api_key, + model=text_model, + base_url=text_base_url, + save_result=True + ) + """ + 3. 根据剧情生成解说文案 + """ + if analysis_result["status"] == "success": + logger.info("字幕分析成功!") + update_progress(60, "正在生成文案...") + + # 根据剧情生成解说文案 + narration_result = generate_narration_script( + plot_analysis=analysis_result["analysis"], + api_key=text_api_key, + model=text_model, + base_url=text_base_url, + save_result=True + ) + + if narration_result["status"] == "success": + logger.info("\n解说文案生成成功!") + logger.info(narration_result["narration_script"]) + else: + logger.info(f"\n解说文案生成失败: {narration_result['message']}") + st.error("生成脚本失败,请检查日志") + st.stop() + else: + logger.error(f"分析失败: {analysis_result['message']}") + st.error("生成脚本失败,请检查日志") + st.stop() + + """ + 4. 生成文案 + """ + logger.info("开始准备生成解说文案") + + # 结果转换为JSON字符串 + narration_script = narration_result["narration_script"] + narration_dict = json.loads(narration_script) + script = json.dumps(narration_dict['items'], ensure_ascii=False, indent=2) + + if script is None: + st.error("生成脚本失败,请检查日志") + st.stop() + logger.success(f"剪辑脚本生成完成") + if isinstance(script, list): + st.session_state['video_clip_json'] = script + elif isinstance(script, str): + st.session_state['video_clip_json'] = json.loads(script) + update_progress(90, "整理输出...") + + time.sleep(0.1) + progress_bar.progress(100) + status_text.text("脚本生成完成!") + st.success("视频脚本生成成功!") + + except Exception as err: + st.error(f"生成过程中发生错误: {str(err)}") + logger.exception(f"生成脚本时发生错误\n{traceback.format_exc()}") + finally: + time.sleep(2) + progress_bar.empty() + status_text.empty()