diff --git a/app/services/SDE/short_drama_explanation.py b/app/services/SDE/short_drama_explanation.py
index 3044983..4fc2478 100644
--- a/app/services/SDE/short_drama_explanation.py
+++ b/app/services/SDE/short_drama_explanation.py
@@ -15,6 +15,7 @@ from typing import Dict, Any, Optional
 from loguru import logger
 from app.config import config
 from app.utils.utils import get_uuid, storage_dir
+from app.services.subtitle_text import read_subtitle_text
 
 # 导入新的提示词管理系统
 from app.services.prompts import PromptManager
@@ -309,8 +310,13 @@ class SubtitleAnalyzer:
             }
 
         # 读取文件内容
-        with open(subtitle_file_path, 'r', encoding='utf-8') as f:
-            subtitle_content = f.read()
+        subtitle_content = read_subtitle_text(subtitle_file_path).text
+        if not subtitle_content:
+            return {
+                "status": "error",
+                "message": f"字幕文件内容为空或无法读取: {subtitle_file_path}",
+                "temperature": self.temperature
+            }
 
         # 分析字幕
         return self.analyze_subtitle(subtitle_content)
diff --git a/app/services/SDP/utils/step1_subtitle_analyzer_openai.py b/app/services/SDP/utils/step1_subtitle_analyzer_openai.py
index 2ca5243..2d7a7e7 100644
--- a/app/services/SDP/utils/step1_subtitle_analyzer_openai.py
+++ b/app/services/SDP/utils/step1_subtitle_analyzer_openai.py
@@ -5,7 +5,7 @@ import traceback
 import json
 from loguru import logger
 
-from .utils import load_srt, load_srt_from_content
+from app.services.subtitle_text import has_timecodes, normalize_subtitle_text, read_subtitle_text
 # 导入新的提示词管理系统
 from app.services.prompts import PromptManager
 # 导入统一LLM服务
@@ -38,30 +38,41 @@
         dict: 包含剧情梗概和结构化的时间段分析的字典
     """
     try:
-        # 加载字幕文件或内容
-        if subtitle_content and subtitle_content.strip():
-            subtitles = load_srt_from_content(subtitle_content)
+        # 读取并规范化字幕文本(不依赖结构化 SRT 解析,提升兼容性)
+        if subtitle_content and str(subtitle_content).strip():
+            normalized_subtitle_text = normalize_subtitle_text(subtitle_content)
             source_label = "字幕内容(直接传入)"
         elif srt_path:
-            subtitles = load_srt(srt_path)
-            source_label = f"字幕文件: {srt_path}"
+            decoded = read_subtitle_text(srt_path)
+            normalized_subtitle_text = decoded.text
+            source_label = f"字幕文件: {srt_path} (encoding: {decoded.encoding})"
         else:
             raise ValueError("必须提供 srt_path 或 subtitle_content 参数")
 
-        # 检查字幕是否为空
-        if not subtitles:
+        # 基础校验:必须有内容且包含可用于定位的时间码
+        if not normalized_subtitle_text or len(normalized_subtitle_text.strip()) < 10:
             error_msg = (
-                f"字幕来源 [{source_label}] 解析后无有效内容。\n"
+                f"字幕来源 [{source_label}] 内容为空或过短。\n"
                 f"请检查:\n"
                 f"1. 文件格式是否为标准 SRT\n"
-                f"2. 文件编码是否为 UTF-8、GBK 或 GB2312\n"
+                f"2. 文件编码是否为 UTF-8、UTF-16、GBK 或 GB2312\n"
                 f"3. 文件内容是否为空"
             )
             logger.error(error_msg)
             raise ValueError(error_msg)
 
-        logger.info(f"成功加载字幕来源 [{source_label}],共 {len(subtitles)} 条有效字幕")
-        subtitle_content = "\n".join([f"{sub['timestamp']}\n{sub['text']}" for sub in subtitles])
+        if not has_timecodes(normalized_subtitle_text):
+            error_msg = (
+                f"字幕来源 [{source_label}] 未检测到有效时间码,无法进行时间段定位。\n"
+                f"请确保字幕包含类似以下格式的时间轴:\n"
+                f"00:00:01,000 --> 00:00:02,000\n"
+                f"(若毫秒分隔符为'.',系统会自动规范化为',')"
+            )
+            logger.error(error_msg)
+            raise ValueError(error_msg)
+
+        logger.info(f"成功加载字幕来源 [{source_label}],字符数: {len(normalized_subtitle_text)}")
+        subtitle_content = normalized_subtitle_text
 
         # 如果没有指定provider,根据model_name推断
         if not provider:
@@ -107,11 +118,11 @@
             raise Exception("无法解析LLM返回的JSON数据")
 
         logger.info(f"字幕分析完成,找到 {len(summary_data.get('plot_titles', []))} 个关键情节")
-        print(json.dumps(summary_data, indent=4, ensure_ascii=False))
+        logger.debug(json.dumps(summary_data, indent=4, ensure_ascii=False))
 
         # 构建爆点标题列表
         plot_titles_text = ""
-        print(f"找到 {len(summary_data['plot_titles'])} 个片段")
+        logger.info(f"找到 {len(summary_data.get('plot_titles', []))} 个片段")
         for i, point in enumerate(summary_data['plot_titles'], 1):
             plot_titles_text += f"{i}. {point}\n"
 
@@ -159,4 +170,3 @@
     except Exception as e:
         logger.error(f"分析字幕时发生错误: {str(e)}")
         raise Exception(f"分析字幕时发生错误:{str(e)}\n{traceback.format_exc()}")
-
diff --git a/app/services/SDP/utils/step5_merge_script.py b/app/services/SDP/utils/step5_merge_script.py
index b6e5720..a4d2802 100644
--- a/app/services/SDP/utils/step5_merge_script.py
+++ b/app/services/SDP/utils/step5_merge_script.py
@@ -3,7 +3,7 @@
 """
 import os
 import json
-from typing import List, Dict, Tuple
+from typing import Dict, List
 
 
 def merge_script(
@@ -19,38 +19,12 @@
     Returns:
         str: 最终合并的脚本
     """
-    def parse_timestamp(ts: str) -> Tuple[float, float]:
-        """解析时间戳,返回开始和结束时间(秒)"""
-        start, end = ts.split('-')
-
-        def parse_time(time_str: str) -> float:
-            time_str = time_str.strip()
-            if ',' in time_str:
-                time_parts, ms_parts = time_str.split(',')
-                ms = float(ms_parts) / 1000
-            else:
-                time_parts = time_str
-                ms = 0
-
-            hours, minutes, seconds = map(int, time_parts.split(':'))
-            return hours * 3600 + minutes * 60 + seconds + ms
-
-        return parse_time(start), parse_time(end)
-
-    def format_timestamp(seconds: float) -> str:
-        """将秒数转换为时间戳格式 HH:MM:SS"""
-        hours = int(seconds // 3600)
-        minutes = int((seconds % 3600) // 60)
-        secs = int(seconds % 60)
-        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
-
     # 创建包含所有信息的临时列表
     final_script = []
 
     # 处理原生画面条目
     number = 1
     for plot_point in plot_points:
-        start, end = parse_timestamp(plot_point["timestamp"])
         script_item = {
             "_id": number,
             "timestamp": plot_point["timestamp"],
@@ -62,6 +36,11 @@
         number += 1
 
     # 保存结果
+    if not output_path or not str(output_path).strip():
+        raise ValueError("output_path不能为空")
+
+    output_path = str(output_path)
+    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
     with open(output_path, 'w', encoding='utf-8') as f:
         json.dump(final_script, f, ensure_ascii=False, indent=4)
 
diff --git a/app/services/subtitle_text.py b/app/services/subtitle_text.py
new file mode 100644
index 0000000..7833987
--- /dev/null
+++ b/app/services/subtitle_text.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8 -*-
+
+"""
+Subtitle text utilities.
+
+This module provides a shared, cross-platform way to read and normalize subtitle
+content. Both Short Drama Editing (混剪) and Short Drama Narration (解说) should
+consume subtitle content through this module to avoid platform-specific parsing
+issues (e.g. Windows UTF-16 SRT, timestamp separators, etc.).
+"""
+
+from __future__ import annotations
+
+import os
+import re
+from dataclasses import dataclass
+from typing import Iterable, Optional
+
+
+_SRT_TIME_RE = re.compile(
+    r"\b\d{2}:\d{2}:\d{2}(?:[,.]\d{3})?\s*-->\s*\d{2}:\d{2}:\d{2}(?:[,.]\d{3})?\b"
+)
+_SRT_MS_DOT_RE = re.compile(r"(\b\d{2}:\d{2}:\d{2})\.(\d{3}\b)")
+
+
+@dataclass(frozen=True)
+class DecodedSubtitle:
+    text: str
+    encoding: str
+
+
+def has_timecodes(text: str) -> bool:
+    """Return True if the subtitle text contains at least one SRT timecode."""
+    if not text:
+        return False
+    return _SRT_TIME_RE.search(text) is not None
+
+
+def normalize_subtitle_text(text: str) -> str:
+    """
+    Normalize subtitle text to improve cross-platform reliability.
+
+    - Unifies line endings to LF
+    - Removes BOM and NUL bytes
+    - Normalizes millisecond separators from '.' to ',' in timecodes
+    """
+    if text is None:
+        return ""
+
+    normalized = str(text)
+
+    # Strip BOM.
+    if normalized.startswith("\ufeff"):
+        normalized = normalized.lstrip("\ufeff")
+
+    # Remove NUL bytes (common when UTF-16 is mis-decoded elsewhere).
+    normalized = normalized.replace("\x00", "")
+
+    # Normalize newlines.
+    normalized = normalized.replace("\r\n", "\n").replace("\r", "\n")
+
+    # Normalize timestamp millisecond separator: 00:00:01.000 -> 00:00:01,000
+    normalized = _SRT_MS_DOT_RE.sub(r"\1,\2", normalized)
+
+    return normalized.strip()
+
+
+def decode_subtitle_bytes(
+    data: bytes,
+    *,
+    encodings: Optional[Iterable[str]] = None,
+) -> DecodedSubtitle:
+    """
+    Decode subtitle bytes using a small set of common encodings.
+
+    Preference is given to decodings that yield detectable SRT timecodes.
+    """
+    if data is None:
+        return DecodedSubtitle(text="", encoding="utf-8")
+
+    candidates = list(encodings) if encodings else [
+        "utf-8",
+        "utf-8-sig",
+        "utf-16",
+        "utf-16-le",
+        "utf-16-be",
+        "gbk",
+        "gb2312",
+    ]
+
+    decoded_results: list[DecodedSubtitle] = []
+    for encoding in candidates:
+        try:
+            decoded_text = data.decode(encoding)
+        except UnicodeDecodeError:
+            continue
+        decoded_results.append(
+            DecodedSubtitle(text=normalize_subtitle_text(decoded_text), encoding=encoding)
+        )
+
+        # Fast path: if we already see timecodes, keep the first such decode.
+        if has_timecodes(decoded_results[-1].text):
+            return decoded_results[-1]
+
+    if decoded_results:
+        # Fall back to the first successful decoding.
+        return decoded_results[0]
+
+    # Last resort: replace undecodable bytes.
+    return DecodedSubtitle(text=normalize_subtitle_text(data.decode("utf-8", errors="replace")), encoding="utf-8")
+
+
+def read_subtitle_text(file_path: str) -> DecodedSubtitle:
+    """Read subtitle file from disk, decode and normalize its text."""
+    if not file_path or not str(file_path).strip():
+        return DecodedSubtitle(text="", encoding="utf-8")
+
+    normalized_path = os.path.abspath(str(file_path))
+    with open(normalized_path, "rb") as f:
+        data = f.read()
+
+    return decode_subtitle_bytes(data)
+
diff --git a/webui/components/script_settings.py b/webui/components/script_settings.py
index 2af53ea..f6ee6f5 100644
--- a/webui/components/script_settings.py
+++ b/webui/components/script_settings.py
@@ -8,6 +8,7 @@ from loguru import logger
 
 from app.config import config
 from app.models.schema import VideoClipParams
+from app.services.subtitle_text import decode_subtitle_bytes
 from app.utils import utils, check_script
 from webui.tools.generate_script_docu import generate_script_docu
 from webui.tools.generate_script_short import generate_script_short
@@ -190,8 +191,9 @@
             json_data = json.loads(script_content)
 
             # 保存到脚本目录
-            script_file_path = os.path.join(script_dir, uploaded_file.name)
-            file_name, file_extension = os.path.splitext(uploaded_file.name)
+            safe_filename = os.path.basename(uploaded_file.name)
+            script_file_path = os.path.join(script_dir, safe_filename)
+            file_name, file_extension = os.path.splitext(safe_filename)
 
             # 如果文件已存在,添加时间戳
             if os.path.exists(script_file_path):
@@ -250,8 +252,9 @@
     )
 
     if uploaded_file is not None:
-        video_file_path = os.path.join(utils.video_dir(), uploaded_file.name)
-        file_name, file_extension = os.path.splitext(uploaded_file.name)
+        safe_filename = os.path.basename(uploaded_file.name)
+        video_file_path = os.path.join(utils.video_dir(), safe_filename)
+        file_name, file_extension = os.path.splitext(safe_filename)
 
         if os.path.exists(video_file_path):
             timestamp = time.strftime("%Y%m%d%H%M%S")
@@ -337,6 +340,7 @@
         st.info(f"已上传字幕: {os.path.basename(st.session_state['subtitle_path'])}")
         if st.button(tr("清除已上传字幕")):
             st.session_state['subtitle_path'] = None
+            st.session_state['subtitle_content'] = None
             st.session_state['subtitle_file_processed'] = False
             st.rerun()
 
@@ -346,22 +350,12 @@
     # 检查是否已经处理过此文件,避免重复处理
     if subtitle_file is not None and not st.session_state.get('subtitle_file_processed', False):
         # 清理文件名,防止路径污染和路径遍历攻击
         safe_filename = os.path.basename(subtitle_file.name)
-        # 编码自动检测:依次尝试常见编码
-        encodings = ['utf-8', 'utf-8-sig', 'gbk', 'gb2312']
-        script_content = None
-        detected_encoding = None
+        decoded = decode_subtitle_bytes(subtitle_file.getvalue())
+        script_content = decoded.text
+        detected_encoding = decoded.encoding
 
-        for encoding in encodings:
-            try:
-                subtitle_file.seek(0)  # 重置文件指针
-                script_content = subtitle_file.read().decode(encoding)
-                detected_encoding = encoding
-                break
-            except UnicodeDecodeError:
-                continue
-
-        if script_content is None:
-            st.error(tr("无法读取字幕文件,请检查文件编码(支持 UTF-8、GBK、GB2312)"))
+        if not script_content:
+            st.error(tr("无法读取字幕文件,请检查文件编码(支持 UTF-8、UTF-16、GBK、GB2312)"))
             st.stop()
 
         # 验证字幕内容(简单检查)
@@ -389,6 +383,7 @@
                 f"大小: {len(script_content)} 字符)"
             )
             st.session_state['subtitle_path'] = script_file_path
+            st.session_state['subtitle_content'] = script_content
             st.session_state['subtitle_file_processed'] = True  # 标记已处理
 
             # 避免使用rerun,使用更新状态的方式
diff --git a/webui/tools/generate_script_short.py b/webui/tools/generate_script_short.py
index a6ab013..2f6ef9b 100644
--- a/webui/tools/generate_script_short.py
+++ b/webui/tools/generate_script_short.py
@@ -1,3 +1,4 @@
+import os
 import json
 import time
 import traceback
@@ -6,6 +7,7 @@ from loguru import logger
 
 from app.config import config
 from app.services.upload_validation import ensure_existing_file, InputValidationError
+from app.utils import utils
 
 
 def generate_script_short(tr, params, custom_clips=5):
@@ -81,14 +83,23 @@
         # ========== 调用后端生成脚本 ==========
         from app.services.SDP.generate_script_short import generate_script_result
 
+        output_path = os.path.join(utils.script_dir(), "merged_subtitle.json")
+
+        subtitle_content = st.session_state.get("subtitle_content")
+        subtitle_kwargs = (
+            {"subtitle_content": str(subtitle_content)}
+            if subtitle_content is not None and str(subtitle_content).strip()
+            else {"subtitle_file_path": subtitle_path}
+        )
+
         result = generate_script_result(
             api_key=text_api_key,
             model_name=text_model,
-            output_path="resource/scripts/merged_subtitle.json",
+            output_path=output_path,
             base_url=text_base_url,
             custom_clips=custom_clips,
             provider=text_provider,
-            subtitle_file_path=subtitle_path,
+            **subtitle_kwargs,
         )
 
         if result.get("status") != "success":
diff --git a/webui/tools/generate_short_summary.py b/webui/tools/generate_short_summary.py
index ddd34cc..0ead867 100644
--- a/webui/tools/generate_short_summary.py
+++ b/webui/tools/generate_short_summary.py
@@ -16,6 +16,7 @@ from loguru import logger
 
 from app.config import config
 from app.services.SDE.short_drama_explanation import analyze_subtitle, generate_narration_script
+from app.services.subtitle_text import read_subtitle_text
 # 导入新的LLM服务模块 - 确保提供商被注册
 import app.services.llm  # 这会触发提供商注册
 from app.services.llm.migration_adapter import SubtitleAnalyzerAdapter
@@ -173,8 +174,10 @@ def generate_script_short_sunmmary(params, subtitle_path, video_theme, temperatu
         text_base_url = config.app.get(f'text_{text_provider}_base_url')
 
     # 读取字幕文件内容(无论使用哪种实现都需要)
-    with open(subtitle_path, 'r', encoding='utf-8') as f:
-        subtitle_content = f.read()
+    subtitle_content = read_subtitle_text(subtitle_path).text
+    if not subtitle_content:
+        st.error("字幕文件内容为空或无法读取")
+        return
 
     try:
        # 优先使用新的LLM服务架构
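Usage sketch for the new `app/services/subtitle_text.py` helpers (illustrative only; the in-memory SRT payload below is a made-up example, and it assumes the module added in this diff is importable):

```python
# Illustrative sketch, not part of the change set above: exercises the new
# helpers with in-memory bytes so no real subtitle file is needed.
from app.services.subtitle_text import decode_subtitle_bytes, has_timecodes

# A tiny UTF-16 SRT snippet that uses '.' as the millisecond separator.
raw = "1\n00:00:01.000 --> 00:00:02.500\n你好,世界\n".encode("utf-16")

decoded = decode_subtitle_bytes(raw)
print(decoded.encoding)             # "utf-16" -- utf-8/utf-8-sig fail to decode, so the fast path stops here
print(has_timecodes(decoded.text))  # True
print(decoded.text)                 # separator normalized: "00:00:01,000 --> 00:00:02,500"
```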