Merge pull request #167 from linyqh/dev0.7

Dev0.7
This commit is contained in:
viccy 2025-08-03 21:34:21 +08:00 committed by GitHub
commit cb02f2c897
30 changed files with 1795 additions and 1186 deletions

View File

@ -48,6 +48,7 @@ def save_config():
with open(config_file, "w", encoding="utf-8") as f:
_cfg["app"] = app
_cfg["azure"] = azure
_cfg["soulvoice"] = soulvoice
_cfg["ui"] = ui
f.write(toml.dumps(_cfg))
@ -57,6 +58,7 @@ app = _cfg.get("app", {})
whisper = _cfg.get("whisper", {})
proxy = _cfg.get("proxy", {})
azure = _cfg.get("azure", {})
soulvoice = _cfg.get("soulvoice", {})
ui = _cfg.get("ui", {})
frames = _cfg.get("frames", {})

View File

@ -546,6 +546,359 @@ def try_fallback_encoding(
return execute_simple_command(fallback_cmd, timestamp, "Generic Fallback")
def _process_narration_only_segment(
video_origin_path: str,
script_item: Dict,
tts_map: Dict,
output_dir: str,
encoder_config: Dict,
hwaccel_args: List[str]
) -> Optional[str]:
"""
处理OST=0的纯解说片段
- 根据TTS音频时长动态裁剪
- 移除原声生成静音视频
"""
_id = script_item["_id"]
timestamp = script_item["timestamp"]
# 获取对应的TTS结果
tts_item = tts_map.get(_id)
if not tts_item:
logger.error(f"未找到片段 {_id} 的TTS结果")
return None
# 解析起始时间使用TTS音频时长计算结束时间
start_time, _ = parse_timestamp(timestamp)
duration = tts_item["duration"]
calculated_end_time = calculate_end_time(start_time, duration, extra_seconds=0)
# 转换为FFmpeg兼容的时间格式
ffmpeg_start_time = start_time.replace(',', '.')
ffmpeg_end_time = calculated_end_time.replace(',', '.')
# 生成输出文件名
safe_start_time = start_time.replace(':', '-').replace(',', '-')
safe_end_time = calculated_end_time.replace(':', '-').replace(',', '-')
output_filename = f"ost0_vid_{safe_start_time}@{safe_end_time}.mp4"
output_path = os.path.join(output_dir, output_filename)
# 构建FFmpeg命令 - 移除音频
cmd = _build_ffmpeg_command_with_audio_control(
video_origin_path, output_path, ffmpeg_start_time, ffmpeg_end_time,
encoder_config, hwaccel_args, remove_audio=True
)
# 执行命令
success = execute_ffmpeg_with_fallback(
cmd, timestamp, video_origin_path, output_path,
ffmpeg_start_time, ffmpeg_end_time
)
return output_path if success else None
def _process_original_audio_segment(
video_origin_path: str,
script_item: Dict,
output_dir: str,
encoder_config: Dict,
hwaccel_args: List[str]
) -> Optional[str]:
"""
处理OST=1的纯原声片段
- 严格按照脚本timestamp精确裁剪
- 保持原声不变
"""
_id = script_item["_id"]
timestamp = script_item["timestamp"]
# 严格按照timestamp进行裁剪
start_time, end_time = parse_timestamp(timestamp)
# 转换为FFmpeg兼容的时间格式
ffmpeg_start_time = start_time.replace(',', '.')
ffmpeg_end_time = end_time.replace(',', '.')
# 生成输出文件名
safe_start_time = start_time.replace(':', '-').replace(',', '-')
safe_end_time = end_time.replace(':', '-').replace(',', '-')
output_filename = f"ost1_vid_{safe_start_time}@{safe_end_time}.mp4"
output_path = os.path.join(output_dir, output_filename)
# 构建FFmpeg命令 - 保持原声
cmd = _build_ffmpeg_command_with_audio_control(
video_origin_path, output_path, ffmpeg_start_time, ffmpeg_end_time,
encoder_config, hwaccel_args, remove_audio=False
)
# 执行命令
success = execute_ffmpeg_with_fallback(
cmd, timestamp, video_origin_path, output_path,
ffmpeg_start_time, ffmpeg_end_time
)
return output_path if success else None
def _process_mixed_segment(
video_origin_path: str,
script_item: Dict,
tts_map: Dict,
output_dir: str,
encoder_config: Dict,
hwaccel_args: List[str]
) -> Optional[str]:
"""
处理OST=2的解说+原声混合片段
- 根据TTS音频时长动态裁剪
- 保持原声确保视频时长等于TTS音频时长
"""
_id = script_item["_id"]
timestamp = script_item["timestamp"]
# 获取对应的TTS结果
tts_item = tts_map.get(_id)
if not tts_item:
logger.error(f"未找到片段 {_id} 的TTS结果")
return None
# 解析起始时间使用TTS音频时长计算结束时间
start_time, _ = parse_timestamp(timestamp)
duration = tts_item["duration"]
calculated_end_time = calculate_end_time(start_time, duration, extra_seconds=0)
# 转换为FFmpeg兼容的时间格式
ffmpeg_start_time = start_time.replace(',', '.')
ffmpeg_end_time = calculated_end_time.replace(',', '.')
# 生成输出文件名
safe_start_time = start_time.replace(':', '-').replace(',', '-')
safe_end_time = calculated_end_time.replace(':', '-').replace(',', '-')
output_filename = f"ost2_vid_{safe_start_time}@{safe_end_time}.mp4"
output_path = os.path.join(output_dir, output_filename)
# 构建FFmpeg命令 - 保持原声
cmd = _build_ffmpeg_command_with_audio_control(
video_origin_path, output_path, ffmpeg_start_time, ffmpeg_end_time,
encoder_config, hwaccel_args, remove_audio=False
)
# 执行命令
success = execute_ffmpeg_with_fallback(
cmd, timestamp, video_origin_path, output_path,
ffmpeg_start_time, ffmpeg_end_time
)
return output_path if success else None
def _build_ffmpeg_command_with_audio_control(
input_path: str,
output_path: str,
start_time: str,
end_time: str,
encoder_config: Dict[str, str],
hwaccel_args: List[str] = None,
remove_audio: bool = False
) -> List[str]:
"""
构建支持音频控制的FFmpeg命令
Args:
input_path: 输入视频路径
output_path: 输出视频路径
start_time: 开始时间
end_time: 结束时间
encoder_config: 编码器配置
hwaccel_args: 硬件加速参数
remove_audio: 是否移除音频OST=0时为True
Returns:
List[str]: ffmpeg命令列表
"""
cmd = ["ffmpeg", "-y"]
# 硬件加速设置(参考原有逻辑)
if encoder_config["video_codec"] == "h264_nvenc":
# 对于NVENC不使用硬件解码以避免滤镜链问题
pass
elif hwaccel_args:
cmd.extend(hwaccel_args)
# 输入文件
cmd.extend(["-i", input_path])
# 时间范围
cmd.extend(["-ss", start_time, "-to", end_time])
# 视频编码器设置
cmd.extend(["-c:v", encoder_config["video_codec"]])
# 音频处理
if remove_audio:
# OST=0: 移除音频
cmd.extend(["-an"]) # -an 表示不包含音频流
logger.debug("OST=0: 移除音频流")
else:
# OST=1,2: 保持原声
cmd.extend(["-c:a", encoder_config["audio_codec"]])
cmd.extend(["-ar", "44100", "-ac", "2"])
logger.debug("OST=1/2: 保持原声")
# 像素格式
cmd.extend(["-pix_fmt", encoder_config["pixel_format"]])
# 质量和预设参数(参考原有逻辑)
if encoder_config["video_codec"] == "h264_nvenc":
cmd.extend(["-preset", encoder_config["preset"]])
cmd.extend(["-cq", encoder_config["quality_value"]])
cmd.extend(["-profile:v", "main"])
elif encoder_config["video_codec"] == "h264_amf":
cmd.extend(["-quality", encoder_config["preset"]])
cmd.extend(["-qp_i", encoder_config["quality_value"]])
elif encoder_config["video_codec"] == "h264_qsv":
cmd.extend(["-preset", encoder_config["preset"]])
cmd.extend(["-global_quality", encoder_config["quality_value"]])
elif encoder_config["video_codec"] == "h264_videotoolbox":
cmd.extend(["-profile:v", "high"])
cmd.extend(["-b:v", encoder_config["quality_value"]])
else:
# 软件编码器libx264
cmd.extend(["-preset", encoder_config["preset"]])
cmd.extend(["-crf", encoder_config["quality_value"]])
# 优化参数
cmd.extend(["-avoid_negative_ts", "make_zero"])
cmd.extend(["-movflags", "+faststart"])
# 输出文件
cmd.append(output_path)
return cmd
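For orientation, a minimal sketch of the argument list this builder emits for a software-encoded OST=0 clip; the paths, preset, and CRF value below are illustrative assumptions, not values from the codebase:

# Hypothetical output of _build_ffmpeg_command_with_audio_control for OST=0 with libx264
expected_cmd = [
    "ffmpeg", "-y",
    "-i", "origin.mp4",                            # input video (illustrative path)
    "-ss", "00:00:00.600", "-to", "00:00:07.559",  # clip window, commas already mapped to dots
    "-c:v", "libx264",                             # software-encoder branch
    "-an",                                         # OST=0: audio stream removed
    "-pix_fmt", "yuv420p",                         # assumed pixel format
    "-preset", "medium", "-crf", "23",             # assumed preset/quality values
    "-avoid_negative_ts", "make_zero",
    "-movflags", "+faststart",
    "ost0_vid_00-00-00-600@00-00-07-559.mp4",
]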
def clip_video_unified(
video_origin_path: str,
script_list: List[Dict],
tts_results: List[Dict],
output_dir: Optional[str] = None,
task_id: Optional[str] = None
) -> Dict[str, str]:
"""
基于OST类型的统一视频裁剪策略 - 消除双重裁剪问题
Args:
video_origin_path: 原始视频的路径
script_list: 完整的脚本列表包含所有片段信息
tts_results: TTS结果列表仅包含OST=0和OST=2的片段
output_dir: 输出目录路径默认为None时会自动生成
task_id: 任务ID用于生成唯一的输出目录默认为None时会自动生成
Returns:
Dict[str, str]: 片段ID到裁剪后视频路径的映射
"""
# 检查视频文件是否存在
if not os.path.exists(video_origin_path):
raise FileNotFoundError(f"视频文件不存在: {video_origin_path}")
# 如果未提供task_id则根据输入生成一个唯一ID
if task_id is None:
content_for_hash = f"{video_origin_path}_{json.dumps(script_list)}"
task_id = hashlib.md5(content_for_hash.encode()).hexdigest()
# 设置输出目录
if output_dir is None:
output_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"storage", "temp", "clip_video_unified", task_id
)
# 确保输出目录存在
Path(output_dir).mkdir(parents=True, exist_ok=True)
# 创建TTS结果的快速查找映射
tts_map = {item['_id']: item for item in tts_results}
# 获取硬件加速支持
hwaccel_type = check_hardware_acceleration()
hwaccel_args = []
if hwaccel_type:
hwaccel_args = ffmpeg_utils.get_ffmpeg_hwaccel_args()
hwaccel_info = ffmpeg_utils.get_ffmpeg_hwaccel_info()
logger.info(f"🚀 使用硬件加速: {hwaccel_type} ({hwaccel_info.get('message', '')})")
else:
logger.info("🔧 使用软件编码")
# 获取编码器配置
encoder_config = get_safe_encoder_config(hwaccel_type)
logger.debug(f"编码器配置: {encoder_config}")
# 统计信息
total_clips = len(script_list)
result = {}
failed_clips = []
success_count = 0
logger.info(f"📹 开始统一视频裁剪,总共{total_clips}个片段")
for i, script_item in enumerate(script_list, 1):
_id = script_item.get("_id")
ost = script_item.get("OST", 0)
timestamp = script_item["timestamp"]
logger.info(f"📹 [{i}/{total_clips}] Processing segment ID:{_id}, OST:{ost}, timestamp:{timestamp}")
try:
if ost == 0:  # narration-only segment
output_path = _process_narration_only_segment(
video_origin_path, script_item, tts_map, output_dir,
encoder_config, hwaccel_args
)
elif ost == 1:  # original-audio-only segment
output_path = _process_original_audio_segment(
video_origin_path, script_item, output_dir,
encoder_config, hwaccel_args
)
elif ost == 2:  # mixed narration + original-audio segment
output_path = _process_mixed_segment(
video_origin_path, script_item, tts_map, output_dir,
encoder_config, hwaccel_args
)
else:
logger.warning(f"Unknown OST type: {ost}, skipping segment {_id}")
continue
if output_path and os.path.exists(output_path) and os.path.getsize(output_path) > 0:
result[_id] = output_path
success_count += 1
logger.info(f"✅ [{i}/{total_clips}] Segment processed successfully: OST={ost}, ID={_id}")
else:
failed_clips.append(f"ID:{_id}, OST:{ost}")
logger.error(f"❌ [{i}/{total_clips}] Segment processing failed: OST={ost}, ID={_id}")
except Exception as e:
failed_clips.append(f"ID:{_id}, OST:{ost}")
logger.error(f"❌ [{i}/{total_clips}] Segment processing raised an exception: OST={ost}, ID={_id}, error: {str(e)}")
# Final statistics
logger.info(f"📊 Unified video clipping finished: {success_count}/{total_clips} succeeded, {len(failed_clips)} failed")
# Report any failed segments
if failed_clips:
logger.warning(f"⚠️ The following segments failed: {failed_clips}")
if len(failed_clips) == total_clips:
raise RuntimeError("All video segments failed to process; check the video file and the ffmpeg configuration")
elif len(failed_clips) > total_clips / 2:
logger.warning(f"⚠️ More than half of the segments failed ({len(failed_clips)}/{total_clips}); check the hardware-acceleration configuration")
if success_count > 0:
logger.info(f"🎉 Unified video clipping task complete! Output directory: {output_dir}")
return result
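A minimal usage sketch of clip_video_unified; the field values below are illustrative assumptions showing only the keys the function reads:

# Hypothetical inputs; only the fields this function reads are shown
script_list = [
    {"_id": 1, "OST": 0, "timestamp": "00:00:00,600-00:00:07,559"},  # narration-only
    {"_id": 2, "OST": 1, "timestamp": "00:00:07,560-00:00:12,000"},  # original audio
]
tts_results = [
    {"_id": 1, "duration": 6.4},  # seconds of generated narration audio
]
clips = clip_video_unified(
    video_origin_path="origin.mp4",
    script_list=script_list,
    tts_results=tts_results,
)
# clips maps segment IDs to clipped file paths, e.g. {1: ".../ost0_vid_....mp4", 2: ".../ost1_vid_....mp4"}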
def clip_video(
video_origin_path: str,
tts_result: List[Dict],
@ -613,6 +966,49 @@ def clip_video(
# Compute the real end time from the duration, adding a 1-second margin
duration = item["duration"]
# Sanity-check and correct the duration
if duration <= 0 or duration > 300:  # anything over 5 minutes is treated as implausible
logger.warning(f"Detected an abnormal duration of {duration}s for segment: {timestamp}")
# Try to derive the actual duration from the timestamp
try:
start_time_str, end_time_str = timestamp.split('-')
# Parse the start time
if ',' in start_time_str:
time_part, ms_part = start_time_str.split(',')
h1, m1, s1 = map(int, time_part.split(':'))
ms1 = int(ms_part)
else:
h1, m1, s1 = map(int, start_time_str.split(':'))
ms1 = 0
# Parse the end time
if ',' in end_time_str:
time_part, ms_part = end_time_str.split(',')
h2, m2, s2 = map(int, time_part.split(':'))
ms2 = int(ms_part)
else:
h2, m2, s2 = map(int, end_time_str.split(':'))
ms2 = 0
# Compute the actual duration
start_total_ms = (h1 * 3600 + m1 * 60 + s1) * 1000 + ms1
end_total_ms = (h2 * 3600 + m2 * 60 + s2) * 1000 + ms2
actual_duration = (end_total_ms - start_total_ms) / 1000.0
if actual_duration > 0 and actual_duration <= 300:
duration = actual_duration
logger.info(f"Using the duration derived from the timestamp: {duration:.3f}s")
else:
duration = 5.0  # default to 5 seconds
logger.warning(f"Timestamp-derived duration is also abnormal; falling back to the default: {duration}s")
except Exception as e:
duration = 5.0  # default to 5 seconds
logger.warning(f"Duration correction failed; falling back to the default: {duration}s, error: {str(e)}")
calculated_end_time = calculate_end_time(start_time, duration)
# Convert to the FFmpeg-compatible time format (comma replaced with a dot)
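The inline parsing above amounts to a small helper; a standalone sketch of the same arithmetic (the helper name is illustrative, not part of the codebase):

def timestamp_to_seconds(ts: str) -> float:
    """Convert 'HH:MM:SS,mmm' (or 'HH:MM:SS') to seconds, mirroring the logic above."""
    if ',' in ts:
        time_part, ms_part = ts.split(',')
        ms = int(ms_part)
    else:
        time_part, ms = ts, 0
    h, m, s = map(int, time_part.split(':'))
    return (h * 3600 + m * 60 + s) + ms / 1000.0

# "00:00:00,600-00:00:07,559" -> 7.559 - 0.600 = 6.959 seconds
start_s, end_s = (timestamp_to_seconds(t) for t in "00:00:00,600-00:00:07,559".split('-'))
assert abs((end_s - start_s) - 6.959) < 1e-6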

View File

@ -57,14 +57,33 @@ class BaseLLMProvider(ABC):
"""验证配置参数"""
if not self.api_key:
raise ConfigurationError("API密钥不能为空", "api_key")
if not self.model_name:
raise ConfigurationError("模型名称不能为空", "model_name")
if self.model_name not in self.supported_models:
from .exceptions import ModelNotSupportedError
raise ModelNotSupportedError(self.model_name, self.provider_name)
# 检查模型支持情况
self._validate_model_support()
def _validate_model_support(self):
"""验证模型支持情况"""
from app.config import config
from .exceptions import ModelNotSupportedError
from loguru import logger
# 获取模型验证模式配置
strict_model_validation = config.app.get('strict_model_validation', True)
if self.model_name not in self.supported_models:
if strict_model_validation:
# 严格模式:抛出异常
raise ModelNotSupportedError(self.model_name, self.provider_name)
else:
# 宽松模式:仅记录警告
logger.warning(
f"模型 {self.model_name} 未在供应商 {self.provider_name} 的预定义支持列表中,"
f"但已启用宽松验证模式。支持的模型列表: {self.supported_models}"
)
def _initialize(self):
"""初始化提供商特定设置,子类可重写"""
pass
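A compact sketch of the two validation modes introduced here; the config key strict_model_validation and its default of true come from this diff, while the model names below are stand-ins:

supported_models = ["gemini-2.0-flash", "gemini-2.5-flash"]
model_name = "gemini-2.5-pro"      # hypothetical newly released model
strict_model_validation = False    # lenient mode, as set in config.toml

if model_name not in supported_models:
    if strict_model_validation:
        raise ValueError(f"{model_name} is not supported")  # strict: hard failure
    else:
        print(f"warning: {model_name} not in the predefined list; continuing anyway")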
@ -77,11 +96,15 @@ class BaseLLMProvider(ABC):
def _handle_api_error(self, status_code: int, response_text: str) -> LLMServiceError:
"""Map an API error to the appropriate exception"""
from .exceptions import APICallError, RateLimitError, AuthenticationError
if status_code == 401:
return AuthenticationError()
elif status_code == 429:
return RateLimitError()
elif status_code in [502, 503, 504]:
return APICallError(f"Server error HTTP {status_code}", status_code, response_text)
elif status_code == 524:
return APICallError(f"Server processing timeout HTTP {status_code}", status_code, response_text)
else:
return APICallError(f"HTTP {status_code}", status_code, response_text)

View File

@ -213,7 +213,8 @@ class LLMConfigValidator:
"确保所有API密钥都已正确配置",
"建议为每个提供商配置base_url以提高稳定性",
"定期检查模型名称是否为最新版本",
"建议配置多个提供商作为备用方案"
"建议配置多个提供商作为备用方案",
"如果使用新发布的模型遇到MODEL_NOT_SUPPORTED错误可以设置 strict_model_validation = false 启用宽松验证模式"
]
}
@ -252,8 +253,8 @@ class LLMConfigValidator:
"""获取示例模型名称"""
examples = {
"gemini": {
"vision": ["gemini-2.0-flash-lite", "gemini-2.0-flash"],
"text": ["gemini-2.0-flash", "gemini-1.5-pro"]
"vision": ["gemini-2.5-flash", "gemini-2.0-flash-lite", "gemini-2.0-flash"],
"text": ["gemini-2.5-flash", "gemini-2.0-flash", "gemini-1.5-pro"]
},
"openai": {
"vision": [],

View File

@ -27,6 +27,7 @@ class GeminiOpenAIVisionProvider(VisionModelProvider):
@property
def supported_models(self) -> List[str]:
return [
"gemini-2.5-flash",
"gemini-2.0-flash-lite",
"gemini-2.0-flash",
"gemini-1.5-pro",
@ -137,6 +138,7 @@ class GeminiOpenAITextProvider(TextModelProvider):
@property
def supported_models(self) -> List[str]:
return [
"gemini-2.5-flash",
"gemini-2.0-flash-lite",
"gemini-2.0-flash",
"gemini-1.5-pro",

View File

@ -27,6 +27,7 @@ class GeminiVisionProvider(VisionModelProvider):
@property
def supported_models(self) -> List[str]:
return [
"gemini-2.5-flash",
"gemini-2.0-flash-lite",
"gemini-2.0-flash",
"gemini-1.5-pro",
@ -136,25 +137,72 @@ class GeminiVisionProvider(VisionModelProvider):
return base64.b64encode(img_bytes).decode('utf-8')
async def _make_api_call(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""执行原生Gemini API调用"""
"""执行原生Gemini API调用包含重试机制"""
from app.config import config
url = f"{self.base_url}/models/{self.model_name}:generateContent?key={self.api_key}"
response = await asyncio.to_thread(
requests.post,
url,
json=payload,
headers={
"Content-Type": "application/json",
"User-Agent": "NarratoAI/1.0"
},
timeout=120
)
if response.status_code != 200:
error = self._handle_api_error(response.status_code, response.text)
raise error
return response.json()
max_retries = config.app.get('llm_max_retries', 3)
base_timeout = config.app.get('llm_vision_timeout', 120)
for attempt in range(max_retries):
try:
# Scale the timeout with the attempt number
timeout = base_timeout * (attempt + 1)
logger.debug(f"Gemini API call attempt {attempt + 1}/{max_retries}, timeout: {timeout}s")
response = await asyncio.to_thread(
requests.post,
url,
json=payload,
headers={
"Content-Type": "application/json",
"User-Agent": "NarratoAI/1.0"
},
timeout=timeout
)
if response.status_code == 200:
return response.json()
# Handle specific error status codes
if response.status_code == 429:
# Rate limited; wait and retry
wait_time = 30 * (attempt + 1)
logger.warning(f"Gemini API rate limited, retrying in {wait_time}s")
await asyncio.sleep(wait_time)
continue
elif response.status_code in [502, 503, 504, 524]:
# Server error or timeout; retryable
if attempt < max_retries - 1:
wait_time = 10 * (attempt + 1)
logger.warning(f"Gemini API server error {response.status_code}, retrying in {wait_time}s")
await asyncio.sleep(wait_time)
continue
# Any other error: raise immediately
error = self._handle_api_error(response.status_code, response.text)
raise error
except requests.exceptions.Timeout:
if attempt < max_retries - 1:
wait_time = 15 * (attempt + 1)
logger.warning(f"Gemini API request timed out, retrying in {wait_time}s")
await asyncio.sleep(wait_time)
continue
else:
raise APICallError("Gemini API request timed out; maximum retries reached")
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
wait_time = 10 * (attempt + 1)
logger.warning(f"Gemini API network error: {str(e)}, retrying in {wait_time}s")
await asyncio.sleep(wait_time)
continue
else:
raise APICallError(f"Gemini API network error: {str(e)}")
# Every retry failed
raise APICallError("Gemini API call failed; maximum retries reached")
def _parse_vision_response(self, response_data: Dict[str, Any]) -> str:
"""解析视觉分析响应"""
@ -192,6 +240,7 @@ class GeminiTextProvider(TextModelProvider):
@property
def supported_models(self) -> List[str]:
return [
"gemini-2.5-flash",
"gemini-2.0-flash-lite",
"gemini-2.0-flash",
"gemini-1.5-pro",
@ -278,25 +327,72 @@ class GeminiTextProvider(TextModelProvider):
return self._parse_text_response(response_data)
async def _make_api_call(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""执行原生Gemini API调用"""
"""执行原生Gemini API调用包含重试机制"""
from app.config import config
url = f"{self.base_url}/models/{self.model_name}:generateContent?key={self.api_key}"
response = await asyncio.to_thread(
requests.post,
url,
json=payload,
headers={
"Content-Type": "application/json",
"User-Agent": "NarratoAI/1.0"
},
timeout=120
)
if response.status_code != 200:
error = self._handle_api_error(response.status_code, response.text)
raise error
return response.json()
max_retries = config.app.get('llm_max_retries', 3)
base_timeout = config.app.get('llm_text_timeout', 180)  # text-generation tasks use a longer base timeout
for attempt in range(max_retries):
try:
# Scale the timeout with the attempt number
timeout = base_timeout * (attempt + 1)
logger.debug(f"Gemini text API call attempt {attempt + 1}/{max_retries}, timeout: {timeout}s")
response = await asyncio.to_thread(
requests.post,
url,
json=payload,
headers={
"Content-Type": "application/json",
"User-Agent": "NarratoAI/1.0"
},
timeout=timeout
)
if response.status_code == 200:
return response.json()
# Handle specific error status codes
if response.status_code == 429:
# Rate limited; wait and retry
wait_time = 30 * (attempt + 1)
logger.warning(f"Gemini API rate limited, retrying in {wait_time}s")
await asyncio.sleep(wait_time)
continue
elif response.status_code in [502, 503, 504, 524]:
# Server error or timeout; retryable
if attempt < max_retries - 1:
wait_time = 15 * (attempt + 1)
logger.warning(f"Gemini API server error {response.status_code}, retrying in {wait_time}s")
await asyncio.sleep(wait_time)
continue
# Any other error: raise immediately
error = self._handle_api_error(response.status_code, response.text)
raise error
except requests.exceptions.Timeout:
if attempt < max_retries - 1:
wait_time = 20 * (attempt + 1)
logger.warning(f"Gemini text API request timed out, retrying in {wait_time}s")
await asyncio.sleep(wait_time)
continue
else:
raise APICallError("Gemini text API request timed out; maximum retries reached")
except requests.exceptions.RequestException as e:
if attempt < max_retries - 1:
wait_time = 15 * (attempt + 1)
logger.warning(f"Gemini text API network error: {str(e)}, retrying in {wait_time}s")
await asyncio.sleep(wait_time)
continue
else:
raise APICallError(f"Gemini text API network error: {str(e)}")
# Every retry failed
raise APICallError("Gemini text API call failed; maximum retries reached")
def _parse_text_response(self, response_data: Dict[str, Any]) -> str:
"""解析文本生成响应"""

View File

@ -15,13 +15,19 @@ from app.services import state as sm
from app.utils import utils
def start_subclip(task_id: str, params: VideoClipParams, subclip_path_videos: dict):
def start_subclip(task_id: str, params: VideoClipParams, subclip_path_videos: dict = None):
"""
后台任务自动剪辑视频进行剪辑
后台任务统一视频裁剪处理- 优化版本
实施基于OST类型的统一视频裁剪策略消除双重裁剪问题
- OST=0: 根据TTS音频时长动态裁剪移除原声
- OST=1: 严格按照脚本timestamp精确裁剪保持原声
- OST=2: 根据TTS音频时长动态裁剪保持原声
Args:
task_id: 任务ID
params: 视频参数
subclip_path_videos: 视频片段路径
subclip_path_videos: 视频片段路径可选仅作为备用方案
"""
global merged_audio_path, merged_subtitle_path
@ -94,17 +100,26 @@ def start_subclip(task_id: str, params: VideoClipParams, subclip_path_videos: di
# sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=40)
"""
3. 裁剪视频 - 将超出音频长度的视频进行裁剪
3. 统一视频裁剪 - 基于OST类型的差异化裁剪策略
"""
logger.info("\n\n## 3. 裁剪视频")
video_clip_result = clip_video.clip_video(params.video_origin_path, tts_results)
# 更新 list_script 中的时间戳
logger.info("\n\n## 3. 统一视频裁剪基于OST类型")
# 使用新的统一裁剪策略
video_clip_result = clip_video.clip_video_unified(
video_origin_path=params.video_origin_path,
script_list=list_script,
tts_results=tts_results
)
# 更新 list_script 中的时间戳和路径信息
tts_clip_result = {tts_result['_id']: tts_result['audio_file'] for tts_result in tts_results}
subclip_clip_result = {
tts_result['_id']: tts_result['subtitle_file'] for tts_result in tts_results
}
new_script_list = update_script.update_script_timestamps(list_script, video_clip_result, tts_clip_result, subclip_clip_result)
logger.info(f"统一裁剪完成,处理了 {len(video_clip_result)} 个视频片段")
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=60)
"""
@ -139,8 +154,27 @@ def start_subclip(task_id: str, params: VideoClipParams, subclip_path_videos: di
combined_video_path = path.join(utils.task_dir(task_id), f"merger.mp4")
logger.info(f"\n\n## 5. Merging video: => {combined_video_path}")
# If an entry in new_script_list has no video, fall back to subclip_path_videos
video_clips = [new_script['video'] if new_script.get('video') else subclip_path_videos.get(new_script.get('_id', '')) for new_script in new_script_list]
# Use the segments produced by unified clipping
video_clips = []
for new_script in new_script_list:
video_path = new_script.get('video')
if video_path and os.path.exists(video_path):
video_clips.append(video_path)
else:
logger.warning(f"Video file for segment {new_script.get('_id')} is missing or was not generated: {video_path}")
# If unified clipping failed, try the fallback (when subclip_path_videos was provided)
if subclip_path_videos and new_script.get('_id') in subclip_path_videos:
backup_video = subclip_path_videos[new_script.get('_id')]
if os.path.exists(backup_video):
video_clips.append(backup_video)
logger.info(f"Using fallback video: {backup_video}")
else:
logger.error(f"Fallback video is also missing: {backup_video}")
else:
logger.error(f"Could not find a video file for segment {new_script.get('_id')}")
logger.info(f"Preparing to merge {len(video_clips)} video segments")
merger_video.combine_clip_videos(
output_video_path=combined_video_path,
@ -208,6 +242,199 @@ def start_subclip(task_id: str, params: VideoClipParams, subclip_path_videos: di
return kwargs
def start_subclip_unified(task_id: str, params: VideoClipParams):
"""
统一视频裁剪处理函数 - 完全基于OST类型的新实现
这是优化后的版本完全移除了对预裁剪视频的依赖
实现真正的统一裁剪策略
Args:
task_id: 任务ID
params: 视频参数
"""
global merged_audio_path, merged_subtitle_path
logger.info(f"\n\n## 开始统一视频处理任务: {task_id}")
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=0)
"""
1. 加载剪辑脚本
"""
logger.info("\n\n## 1. 加载视频脚本")
video_script_path = path.join(params.video_clip_json_path)
if path.exists(video_script_path):
try:
with open(video_script_path, "r", encoding="utf-8") as f:
list_script = json.load(f)
video_list = [i['narration'] for i in list_script]
video_ost = [i['OST'] for i in list_script]
time_list = [i['timestamp'] for i in list_script]
video_script = " ".join(video_list)
logger.debug(f"解说完整脚本: \n{video_script}")
logger.debug(f"解说 OST 列表: \n{video_ost}")
logger.debug(f"解说时间戳列表: \n{time_list}")
except Exception as e:
logger.error(f"无法读取视频json脚本请检查脚本格式是否正确")
raise ValueError("无法读取视频json脚本请检查脚本格式是否正确")
else:
logger.error(f"video_script_path: {video_script_path}")
raise ValueError("解说脚本不存在!请检查配置是否正确。")
"""
2. 使用 TTS 生成音频素材
"""
logger.info("\n\n## 2. 根据OST设置生成音频列表")
# 只为OST=0 or 2的判断生成音频 OST=0 仅保留解说 OST=2 保留解说和原声
tts_segments = [
segment for segment in list_script
if segment['OST'] in [0, 2]
]
logger.debug(f"需要生成TTS的片段数: {len(tts_segments)}")
tts_results = voice.tts_multiple(
task_id=task_id,
list_script=tts_segments, # 只传入需要TTS的片段
voice_name=params.voice_name,
voice_rate=params.voice_rate,
voice_pitch=params.voice_pitch,
)
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=20)
"""
3. 统一视频裁剪 - 基于OST类型的差异化裁剪策略
"""
logger.info("\n\n## 3. 统一视频裁剪基于OST类型")
# 使用新的统一裁剪策略
video_clip_result = clip_video.clip_video_unified(
video_origin_path=params.video_origin_path,
script_list=list_script,
tts_results=tts_results
)
# 更新 list_script 中的时间戳和路径信息
tts_clip_result = {tts_result['_id']: tts_result['audio_file'] for tts_result in tts_results}
subclip_clip_result = {
tts_result['_id']: tts_result['subtitle_file'] for tts_result in tts_results
}
new_script_list = update_script.update_script_timestamps(list_script, video_clip_result, tts_clip_result, subclip_clip_result)
logger.info(f"统一裁剪完成,处理了 {len(video_clip_result)} 个视频片段")
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=60)
"""
4. 合并音频和字幕
"""
logger.info("\n\n## 4. 合并音频和字幕")
total_duration = sum([script["duration"] for script in new_script_list])
if tts_segments:
try:
# 合并音频文件
merged_audio_path = audio_merger.merge_audio_files(
task_id=task_id,
total_duration=total_duration,
list_script=new_script_list
)
logger.info(f"音频文件合并成功->{merged_audio_path}")
# 合并字幕文件
merged_subtitle_path = subtitle_merger.merge_subtitle_files(new_script_list)
logger.info(f"字幕文件合并成功->{merged_subtitle_path}")
except Exception as e:
logger.error(f"合并音频文件失败: {str(e)}")
else:
logger.warning("没有需要合并的音频/字幕")
merged_audio_path = ""
merged_subtitle_path = ""
"""
5. 合并视频
"""
final_video_paths = []
combined_video_paths = []
combined_video_path = path.join(utils.task_dir(task_id), f"merger.mp4")
logger.info(f"\n\n## 5. 合并视频: => {combined_video_path}")
# 使用统一裁剪后的视频片段
video_clips = []
for new_script in new_script_list:
video_path = new_script.get('video')
if video_path and os.path.exists(video_path):
video_clips.append(video_path)
else:
logger.error(f"片段 {new_script.get('_id')} 的视频文件不存在: {video_path}")
logger.info(f"准备合并 {len(video_clips)} 个视频片段")
merger_video.combine_clip_videos(
output_video_path=combined_video_path,
video_paths=video_clips,
video_ost_list=video_ost,
video_aspect=params.video_aspect,
threads=params.n_threads
)
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=80)
"""
6. 合并字幕/BGM/配音/视频
"""
output_video_path = path.join(utils.task_dir(task_id), f"combined.mp4")
logger.info(f"\n\n## 6. 最后一步: 合并字幕/BGM/配音/视频 -> {output_video_path}")
bgm_path = utils.get_bgm_file()
# 获取优化的音量配置
optimized_volumes = get_recommended_volumes_for_content('mixed')
# 应用用户设置和优化建议的组合
final_tts_volume = params.tts_volume if hasattr(params, 'tts_volume') and params.tts_volume != 1.0 else optimized_volumes['tts_volume']
final_original_volume = params.original_volume if hasattr(params, 'original_volume') and params.original_volume != 0.7 else optimized_volumes['original_volume']
final_bgm_volume = params.bgm_volume if hasattr(params, 'bgm_volume') and params.bgm_volume != 0.3 else optimized_volumes['bgm_volume']
logger.info(f"音量配置 - TTS: {final_tts_volume}, 原声: {final_original_volume}, BGM: {final_bgm_volume}")
# 调用示例
options = {
'voice_volume': final_tts_volume,
'bgm_volume': final_bgm_volume,
'original_audio_volume': final_original_volume,
'keep_original_audio': True,
'subtitle_enabled': params.subtitle_enabled,
'subtitle_font': params.font_name,
'subtitle_font_size': params.font_size,
'subtitle_color': params.text_fore_color,
'subtitle_bg_color': None,
'subtitle_position': params.subtitle_position,
'custom_position': params.custom_position,
'threads': params.n_threads
}
generate_video.merge_materials(
video_path=combined_video_path,
audio_path=merged_audio_path,
subtitle_path=merged_subtitle_path,
bgm_path=bgm_path,
output_path=output_video_path,
options=options
)
final_video_paths.append(output_video_path)
combined_video_paths.append(combined_video_path)
logger.success(f"统一处理任务 {task_id} 已完成, 生成 {len(final_video_paths)} 个视频.")
kwargs = {
"videos": final_video_paths,
"combined_videos": combined_video_paths
}
sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100, **kwargs)
return kwargs
def validate_params(video_path, audio_path, output_file, params):
"""
Validate input parameters

View File

@ -4,19 +4,42 @@ import json
import traceback
import edge_tts
import asyncio
import requests
from loguru import logger
from typing import List, Union
from typing import List, Union, Tuple
from datetime import datetime
from xml.sax.saxutils import unescape
from edge_tts import submaker, SubMaker
from edge_tts.submaker import mktimestamp
# from edge_tts.submaker import mktimestamp  # the function may not exist; we implement it ourselves
from moviepy.video.tools import subtitles
try:
from moviepy import AudioFileClip
MOVIEPY_AVAILABLE = True
except ImportError:
MOVIEPY_AVAILABLE = False
logger.warning("moviepy 未安装,将使用估算方法计算音频时长")
import time
from app.config import config
from app.utils import utils
def mktimestamp(time_seconds: float) -> str:
"""
Convert seconds to a subtitle timestamp string.
Args:
time_seconds: time in seconds
Returns:
str: timestamp such as "00:01:23.456"
"""
hours = int(time_seconds // 3600)
minutes = int((time_seconds % 3600) // 60)
seconds = time_seconds % 60
return f"{hours:02d}:{minutes:02d}:{seconds:06.3f}"
def get_all_azure_voices(filter_locals=None) -> list[str]:
if filter_locals is None:
filter_locals = ["zh-CN", "en-US", "zh-HK", "zh-TW", "vi-VN"]
@ -1035,11 +1058,39 @@ def is_azure_v2_voice(voice_name: str):
return ""
def should_use_azure_speech_services(voice_name: str) -> bool:
"""Decide whether a voice should be routed to Azure Speech Services"""
if not voice_name or is_soulvoice_voice(voice_name):
return False
voice_name = voice_name.strip()
# A -V2 suffix always means Azure Speech Services
if voice_name.endswith("-V2"):
return True
# Check for the official Azure voice format (e.g. zh-CN-YunzeNeural)
# Azure voices usually follow: [language]-[region]-[name]Neural
import re
pattern = r'^[a-z]{2}-[A-Z]{2}-\w+Neural$'
if re.match(pattern, voice_name):
return True
return False
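A few illustrative inputs against the routing rules above; the voice names are examples of the formats involved, not an exhaustive list:

import re

pattern = r'^[a-z]{2}-[A-Z]{2}-\w+Neural$'
print(bool(re.match(pattern, "zh-CN-YunzeNeural")))              # True  -> Azure Speech Services
print(bool(re.match(pattern, "en-US-AvaMultilingualNeural")))    # True  -> Azure Speech Services
print(bool(re.match(pattern, "zh-CN-XiaoyiNeural-Female")))      # False -> falls through to Edge TTS
print("speech:mcg3fdnx:...".startswith("speech:"))               # True  -> SoulVoice, checked first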
def tts(
text: str, voice_name: str, voice_rate: float, voice_pitch: float, voice_file: str
) -> Union[SubMaker, None]:
if is_azure_v2_voice(voice_name):
# Check for the SoulVoice engine
if is_soulvoice_voice(voice_name):
return soulvoice_tts(text, voice_name, voice_file, speed=voice_rate)
# Check whether Azure Speech Services should be used
if should_use_azure_speech_services(voice_name):
return azure_tts_v2(text, voice_name, voice_file)
# Default to Edge TTS (Azure V1)
return azure_tts_v1(text, voice_name, voice_rate, voice_pitch, voice_file)
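The resulting dispatch order, summarized with hypothetical voice names:

# SoulVoice prefixes are checked first, then the Azure formats, then Edge TTS as the default
routing_examples = {
    "soulvoice:speech:abc:def:ghi": "soulvoice_tts",   # soulvoice:/speech: prefix
    "zh-CN-YunzeNeural": "azure_tts_v2",               # official Azure Neural format
    "zh-CN-XiaoyiNeural-Female-V2": "azure_tts_v2",    # explicit -V2 suffix
    "zh-CN-XiaoyiNeural-Female": "azure_tts_v1",       # default: Edge TTS
}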
@ -1110,12 +1161,22 @@ def azure_tts_v1(
def azure_tts_v2(text: str, voice_name: str, voice_file: str) -> Union[SubMaker, None]:
voice_name = is_azure_v2_voice(voice_name)
if not voice_name:
logger.error(f"invalid voice name: {voice_name}")
raise ValueError(f"invalid voice name: {voice_name}")
# Use the official voice name directly; no -V2 suffix validation needed
# Azure Speech Services voice names look like zh-CN-YunzeNeural or en-US-AvaMultilingualNeural
processed_voice_name = voice_name.strip()
if not processed_voice_name:
logger.error(f"invalid voice name: {voice_name} (empty)")
raise ValueError(f"invalid voice name: {voice_name} (empty)")
text = text.strip()
# Check that the Azure Speech SDK is available
try:
import azure.cognitiveservices.speech as speechsdk
except ImportError as e:
logger.error("Azure Speech SDK is not installed. Run: pip install azure-cognitiveservices-speech")
logger.error("Alternatively, use the Edge TTS engine instead")
return None
def _format_duration_to_offset(duration) -> int:
if isinstance(duration, str):
time_obj = datetime.strptime(duration, "%H:%M:%S.%f")
@ -1134,9 +1195,7 @@ def azure_tts_v2(text: str, voice_name: str, voice_file: str) -> Union[SubMaker,
for i in range(3):
try:
logger.info(f"start, voice name: {voice_name}, try: {i + 1}")
import azure.cognitiveservices.speech as speechsdk
logger.info(f"start, voice name: {processed_voice_name}, try: {i + 1}")
sub_maker = SubMaker()
@ -1155,7 +1214,7 @@ def azure_tts_v2(text: str, voice_name: str, voice_file: str) -> Union[SubMaker,
speech_config = speechsdk.SpeechConfig(
subscription=speech_key, region=service_region
)
speech_config.speech_synthesis_voice_name = voice_name
speech_config.speech_synthesis_voice_name = processed_voice_name
# speech_config.set_property(property_id=speechsdk.PropertyId.SpeechServiceResponse_RequestSentenceBoundary,
# value='true')
speech_config.set_property(
@ -1368,6 +1427,10 @@ def create_subtitle(sub_maker: submaker.SubMaker, text: str, subtitle_file: str)
if start_time < 0:
start_time = _start_time
# Convert from 100-nanosecond units to seconds
start_time_seconds = start_time / 10000000
end_time_seconds = end_time / 10000000
sub = unescape(sub)
sub_line += sub
sub_text = match_line(sub_line, sub_index)
@ -1375,8 +1438,8 @@ def create_subtitle(sub_maker: submaker.SubMaker, text: str, subtitle_file: str)
sub_index += 1
line = formatter(
idx=sub_index,
start_time=start_time,
end_time=end_time,
start_time=start_time_seconds,
end_time=end_time_seconds,
sub_text=sub_text,
)
sub_items.append(line)
@ -1402,9 +1465,13 @@ def create_subtitle(sub_maker: submaker.SubMaker, text: str, subtitle_file: str)
f"\nsub_items:{json.dumps(sub_items, indent=4, ensure_ascii=False)}"
f"\nscript_lines:{json.dumps(script_lines, indent=4, ensure_ascii=False)}"
)
# Return defaults to avoid None errors downstream
return subtitle_file, 3.0
except Exception as e:
logger.error(f"failed, error: {str(e)}")
# Return defaults to avoid None errors downstream
return subtitle_file, 3.0
def get_audio_duration(sub_maker: submaker.SubMaker):
@ -1453,8 +1520,21 @@ def tts_multiple(task_id: str, list_script: list, voice_name: str, voice_rate: f
f"或者使用其他 tts 引擎")
continue
else:
# Generate a subtitle file for the current segment
_, duration = create_subtitle(sub_maker=sub_maker, text=text, subtitle_file=subtitle_file)
# The SoulVoice engine does not generate subtitle files
if is_soulvoice_voice(voice_name):
# Get the duration of the actual audio file
duration = get_audio_duration_from_file(audio_file)
if duration <= 0:
# If the file duration is unavailable, try the SubMaker
duration = get_audio_duration(sub_maker)
if duration <= 0:
# Last-resort fallback: estimate from the text length
duration = max(1.0, len(text) / 3.0)
logger.warning(f"Unable to determine the audio duration; estimating from text length: {duration:.2f}s")
# Do not create a subtitle file
subtitle_file = ""
else:
_, duration = create_subtitle(sub_maker=sub_maker, text=text, subtitle_file=subtitle_file)
tts_results.append({
"_id": item['_id'],
@ -1467,3 +1547,168 @@ def tts_multiple(task_id: str, list_script: list, voice_name: str, voice_rate: f
logger.info(f"已生成音频文件: {audio_file}")
return tts_results
def get_audio_duration_from_file(audio_file: str) -> float:
"""
获取音频文件的时长
"""
if MOVIEPY_AVAILABLE:
try:
audio_clip = AudioFileClip(audio_file)
duration = audio_clip.duration
audio_clip.close()
return duration
except Exception as e:
logger.error(f"使用 moviepy 获取音频时长失败: {str(e)}")
# Fallback: 使用更准确的估算方法
try:
import os
file_size = os.path.getsize(audio_file)
# 更准确的 MP3 时长估算
# 假设 MP3 平均比特率为 128kbps = 16KB/s
# 但实际文件还包含头部信息,所以调整系数
estimated_duration = max(1.0, file_size / 20000) # 调整为更保守的估算
# 对于中文语音,根据文本长度进行二次校正
# 一般中文语音速度约为 3-4 字/秒
logger.warning(f"使用文件大小估算音频时长: {estimated_duration:.2f}")
return estimated_duration
except Exception as e:
logger.error(f"获取音频时长失败: {str(e)}")
# 如果所有方法都失败,返回一个基于文本长度的估算
return 3.0 # 默认3秒避免返回0
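A worked example of the size-based fallback, which assumes roughly 20 KB of file per second of audio:

file_size = 200_000                       # bytes, hypothetical MP3
estimated = max(1.0, file_size / 20000)   # -> 10.0 seconds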
def is_soulvoice_voice(voice_name: str) -> bool:
"""
检查是否为 SoulVoice 语音
"""
return voice_name.startswith("soulvoice:") or voice_name.startswith("speech:")
def parse_soulvoice_voice(voice_name: str) -> str:
"""
解析 SoulVoice 语音名称
支持格式
- soulvoice:speech:mcg3fdnx:clzkyf4vy00e5qr6hywum4u84:bzznlkuhcjzpbosexitr
- speech:mcg3fdnx:clzkyf4vy00e5qr6hywum4u84:bzznlkuhcjzpbosexitr
"""
if voice_name.startswith("soulvoice:"):
return voice_name[10:] # 移除 "soulvoice:" 前缀
return voice_name
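Both accepted spellings reduce to the same voice URI (the URI segments here are placeholders):

print(parse_soulvoice_voice("soulvoice:speech:abc:def:ghi"))  # speech:abc:def:ghi
print(parse_soulvoice_voice("speech:abc:def:ghi"))            # speech:abc:def:ghi (already unprefixed)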
def soulvoice_tts(text: str, voice_name: str, voice_file: str, speed: float = 1.0) -> Union[SubMaker, None]:
"""
使用 SoulVoice API 进行文本转语音
Args:
text: 要转换的文本
voice_name: 语音名称
voice_file: 输出音频文件路径
speed: 语音速度
Returns:
SubMaker: 包含时间戳信息的字幕制作器失败时返回 None
"""
# 获取配置
api_key = config.soulvoice.get("api_key", "")
api_url = config.soulvoice.get("api_url", "https://tts.scsmtech.cn/tts")
default_model = config.soulvoice.get("model", "FunAudioLLM/CosyVoice2-0.5B")
if not api_key:
logger.error("SoulVoice API key 未配置")
return None
# 解析语音名称
parsed_voice = parse_soulvoice_voice(voice_name)
# 准备请求数据
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
data = {
'text': text.strip(),
'model': default_model,
'voice': parsed_voice,
'speed': speed
}
# 重试机制
for attempt in range(3):
try:
logger.info(f"{attempt + 1} 次调用 SoulVoice API")
# 设置代理
proxies = {}
if config.proxy.get("http"):
proxies = {
'http': config.proxy.get("http"),
'https': config.proxy.get("https", config.proxy.get("http"))
}
# 调用 API
response = requests.post(
api_url,
headers=headers,
json=data,
proxies=proxies,
timeout=60
)
if response.status_code == 200:
# 保存音频文件
with open(voice_file, 'wb') as f:
f.write(response.content)
logger.info(f"SoulVoice TTS 成功生成音频: {voice_file}")
# SoulVoice 不支持精确字幕生成,返回简单的 SubMaker 对象
sub_maker = SubMaker()
sub_maker.subs = [text] # 整个文本作为一个段落
sub_maker.offset = [(0, 0)] # 占位时间戳
return sub_maker
else:
logger.error(f"SoulVoice API 调用失败: {response.status_code} - {response.text}")
except requests.exceptions.Timeout:
logger.error(f"SoulVoice API 调用超时 (尝试 {attempt + 1}/3)")
except requests.exceptions.RequestException as e:
logger.error(f"SoulVoice API 网络错误: {str(e)} (尝试 {attempt + 1}/3)")
except Exception as e:
logger.error(f"SoulVoice TTS 处理错误: {str(e)} (尝试 {attempt + 1}/3)")
if attempt < 2: # 不是最后一次尝试
time.sleep(2) # 等待2秒后重试
logger.error("SoulVoice TTS 生成失败,已达到最大重试次数")
return None

View File

@ -1,4 +1,5 @@
import json
import re
from typing import Dict, Any
def check_format(script_content: str) -> Dict[str, Any]:
@ -6,76 +7,104 @@ def check_format(script_content: str) -> Dict[str, Any]:
Args:
script_content: the script content
Returns:
Dict: {'success': bool, 'message': str}
Dict: {'success': bool, 'message': str, 'details': str}
"""
try:
# Check that the content is valid JSON
data = json.loads(script_content)
# Check that it is a list
if not isinstance(data, list):
return {
'success': False,
'message': 'The script must be a JSON array'
'message': 'The script must be a JSON array',
'details': 'The correct format is: [{"_id": 1, "timestamp": "...", ...}, ...]'
}
# The array must not be empty
if len(data) == 0:
return {
'success': False,
'message': 'The script array must not be empty',
'details': 'At least one script segment is required'
}
# Validate each segment
for i, clip in enumerate(data):
# Each element must be an object
if not isinstance(clip, dict):
return {
'success': False,
'message': f'Element {i+1} must be an object',
'details': f'Current type: {type(clip).__name__}'
}
# Check required fields
required_fields = ['narration', 'picture', 'timestamp']
required_fields = ['_id', 'timestamp', 'picture', 'narration', 'OST']
for field in required_fields:
if field not in clip:
return {
'success': False,
'message': f'Segment {i+1} is missing the required field: {field}'
'message': f'Segment {i+1} is missing the required field: {field}',
'details': f'Required fields: {", ".join(required_fields)}'
}
# Check field types
if not isinstance(clip['narration'], str):
# Validate the _id field
if not isinstance(clip['_id'], int) or clip['_id'] <= 0:
return {
'success': False,
'message': f'The narration of segment {i+1} must be a string'
'message': f'The _id of segment {i+1} must be a positive integer',
'details': f'Current value: {clip["_id"]} (type: {type(clip["_id"]).__name__})'
}
if not isinstance(clip['picture'], str):
# Validate the timestamp format
timestamp_pattern = r'^\d{2}:\d{2}:\d{2},\d{3}-\d{2}:\d{2}:\d{2},\d{3}$'
if not isinstance(clip['timestamp'], str) or not re.match(timestamp_pattern, clip['timestamp']):
return {
'success': False,
'message': f'The picture of segment {i+1} must be a string'
'message': f'The timestamp of segment {i+1} has an invalid format',
'details': f'Correct format: "HH:MM:SS,mmm-HH:MM:SS,mmm", e.g. "00:00:00,600-00:00:07,559"'
}
if not isinstance(clip['timestamp'], str):
# Validate the picture field
if not isinstance(clip['picture'], str) or not clip['picture'].strip():
return {
'success': False,
'message': f'The timestamp of segment {i+1} must be a string'
'message': f'The picture of segment {i+1} must be a non-empty string',
'details': f'Current value: {clip.get("picture", "undefined")}'
}
# Field content must not be empty
if not clip['narration'].strip():
# Validate the narration field
if not isinstance(clip['narration'], str) or not clip['narration'].strip():
return {
'success': False,
'message': f'The narration of segment {i+1} must not be empty'
'message': f'The narration of segment {i+1} must be a non-empty string',
'details': f'Current value: {clip.get("narration", "undefined")}'
}
if not clip['picture'].strip():
# Validate the OST field
if not isinstance(clip['OST'], int):
return {
'success': False,
'message': f'The picture of segment {i+1} must not be empty'
}
if not clip['timestamp'].strip():
return {
'success': False,
'message': f'The timestamp of segment {i+1} must not be empty'
'message': f'The OST of segment {i+1} must be an integer',
'details': f'Current value: {clip["OST"]} (type: {type(clip["OST"]).__name__}), common values: 0, 1, 2'
}
return {
'success': True,
'message': 'Script format check passed'
'message': 'Script format check passed',
'details': f'Validated {len(data)} script segments; the format is correct'
}
except json.JSONDecodeError as e:
return {
'success': False,
'message': f'JSON format error: {str(e)}'
'message': f'JSON format error: {str(e)}',
'details': 'Check the JSON syntax; make sure all brackets, quotes, and commas are correct'
}
except Exception as e:
return {
'success': False,
'message': f'An error occurred during the check: {str(e)}'
'message': f'An error occurred during the check: {str(e)}',
'details': 'Please contact technical support'
}
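A minimal invocation sketch with a script object that satisfies every rule above; the field values are illustrative:

sample = json.dumps([{
    "_id": 1,
    "timestamp": "00:00:00,600-00:00:07,559",
    "picture": "A wide shot of the city at dawn",
    "narration": "Our story opens at sunrise.",
    "OST": 0,
}])
print(check_format(sample))
# -> {'success': True, 'message': 'Script format check passed', ...}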

View File

@ -509,6 +509,12 @@ def clean_model_output(output):
def cut_video(params, progress_callback=None):
"""
Legacy video clipping function - deprecated.
Note: this function has been superseded by the unified clipping strategy and is no longer recommended.
For the new implementation, use task.start_subclip_unified() instead.
"""
try:
task_id = str(uuid4())
st.session_state['task_id'] = task_id

View File

@ -1,17 +0,0 @@
from git_changelog.cli import build_and_render
# Run this script to generate CHANGELOG.md automatically
build_and_render(
repository=".",
output="CHANGELOG.md",
convention="angular",
provider="github",
template="keepachangelog",
parse_trailers=True,
parse_refs=False,
sections=["build", "deps", "feat", "fix", "refactor"],
versioning="pep440",
bump="1.1.2", # 指定bump版本
in_place=True,
)

Binary file not shown.

View File

@ -1,5 +1,19 @@
[app]
project_version="0.6.8"
project_version="0.7.0"
# Model validation mode
# true: strict mode; only models on the predefined support list are allowed (default)
# false: lenient mode; any model name is allowed and only a warning is logged
strict_model_validation = true
# LLM API timeout settings (seconds)
# Base timeout for vision models
llm_vision_timeout = 120
# Base timeout for text models (complex tasks such as narration-script generation need longer)
llm_text_timeout = 180
# Number of API retries
llm_max_retries = 3
# LLM providers with video-understanding support
# gemini (Google; requires a VPN)
# siliconflow (SiliconFlow)
@ -77,6 +91,37 @@
# whether the WebUI shows the configuration options
hide_config = true
[azure]
# Azure TTS configuration
speech_key = ""
speech_region = ""
[soulvoice]
# SoulVoice TTS API key
api_key = ""
# Voice URI (required)
voice_uri = "speech:mcg3fdnx:clzkyf4vy00e5qr6hywum4u84:bzznlkuhcjzpbosexitr"
# API endpoint (optional; the default is shown)
api_url = "https://tts.scsmtech.cn/tts"
# Default model (optional)
model = "FunAudioLLM/CosyVoice2-0.5B"
[ui]
# TTS engine selection (edge_tts, azure_speech, soulvoice)
tts_engine = "edge_tts"
# Edge TTS configuration
edge_voice_name = "zh-CN-XiaoyiNeural-Female"
edge_volume = 80
edge_rate = 1.0
edge_pitch = 0
# Azure Speech Services configuration
azure_voice_name = "zh-CN-XiaoyiNeural-Female"
azure_volume = 80
azure_rate = 1.0
azure_pitch = 0
[proxy]
# clash default address: http://127.0.0.1:7890
http = ""

main.py
View File

@ -1,19 +0,0 @@
import os
import uvicorn
from loguru import logger
from app.config import config
if __name__ == "__main__":
logger.info(
"start server, docs: http://127.0.0.1:" + str(config.listen_port) + "/docs"
)
os.environ["HTTP_PROXY"] = config.proxy.get("http")
os.environ["HTTPS_PROXY"] = config.proxy.get("https")
uvicorn.run(
app="app.asgi:app",
host=config.listen_host,
port=config.listen_port,
reload=config.reload_debug,
log_level="warning",
)

View File

@ -1 +1 @@
0.6.8
0.7.0

View File

@ -1,17 +0,0 @@
# Release Notes
## Latest Changes
* docs(README): update README. PR [#138](https://github.com/linyqh/NarratoAI/pull/138) by [@linyqh](https://github.com/linyqh).
* Dev 0.6.0. PR [#137](https://github.com/linyqh/NarratoAI/pull/137) by [@linyqh](https://github.com/linyqh).
* Dev 0.6.0. PR [#134](https://github.com/linyqh/NarratoAI/pull/134) by [@linyqh](https://github.com/linyqh).
* Dev-0.3.9. PR [#73](https://github.com/linyqh/NarratoAI/pull/73) by [@linyqh](https://github.com/linyqh).
* 0.3.9 release. PR [#71](https://github.com/linyqh/NarratoAI/pull/71) by [@linyqh](https://github.com/linyqh).
* docs: add Japanese README. PR [#66](https://github.com/linyqh/NarratoAI/pull/66) by [@eltociear](https://github.com/eltociear).
* docs: test release 2. PR [#62](https://github.com/linyqh/NarratoAI/pull/62) by [@linyqh](https://github.com/linyqh).
* docs: test release. PR [#61](https://github.com/linyqh/NarratoAI/pull/61) by [@linyqh](https://github.com/linyqh).
* docs: test commit. PR [#60](https://github.com/linyqh/NarratoAI/pull/60) by [@linyqh](https://github.com/linyqh).
* Dev. PR [#59](https://github.com/linyqh/NarratoAI/pull/59) by [@linyqh](https://github.com/linyqh).
* 0.2.0 pre-release. PR [#37](https://github.com/linyqh/NarratoAI/pull/37) by [@linyqh](https://github.com/linyqh).
* v0.3.6. PR [#58](https://github.com/linyqh/NarratoAI/pull/58) by [@linyqh](https://github.com/linyqh).
* 0.3.4 fix assorted bugs. PR [#49](https://github.com/linyqh/NarratoAI/pull/49) by [@linyqh](https://github.com/linyqh).

View File

@ -11,6 +11,7 @@ pysrt==1.1.2
openai~=1.77.0
google-generativeai>=0.8.5
azure-cognitiveservices-speech~=1.37.0
# Items pending optimization
# opencv-python==4.11.0.86
@ -29,7 +30,6 @@ google-generativeai>=0.8.5
# python-multipart~=0.0.9
# redis==5.0.3
# opencv-python~=4.10.0.84
# azure-cognitiveservices-speech~=1.37.0
# git-changelog~=2.5.2
# watchdog==5.0.2
# pydub==0.25.1

View File

@ -1,88 +0,0 @@
@echo off
:: Set the console code page to UTF-8 to fix Chinese character display
chcp 65001 >nul
:: Turn off command echo for cleaner script output
:: Grab the directory this script lives in and store it in a variable
set "CURRENT_DIR=%~dp0"
echo ***** Current working directory: %CURRENT_DIR% *****
:: ==================== FFmpeg setup ====================
:: Full path to the FFmpeg executable
set "FFMPEG_BINARY=%CURRENT_DIR%lib\ffmpeg\ffmpeg-7.0-essentials_build\ffmpeg.exe"
set "FFMPEG_PATH=%CURRENT_DIR%lib\ffmpeg\ffmpeg-7.0-essentials_build"
echo ***** FFmpeg executable path: %FFMPEG_BINARY% *****
:: Add the FFmpeg directory to PATH so it can be invoked directly from the command line
set "PATH=%FFMPEG_PATH%;%PATH%"
:: ==================== ImageMagick setup ====================
:: Full path to the ImageMagick executable (used for image processing)
set "IMAGEMAGICK_BINARY=%CURRENT_DIR%lib\imagemagic\ImageMagick-7.1.1-29-portable-Q16-x64\magick.exe"
set "IMAGEMAGICK_PATH=%CURRENT_DIR%lib\imagemagic\ImageMagick-7.1.1-29-portable-Q16-x64"
echo ***** ImageMagick executable path: %IMAGEMAGICK_BINARY% *****
:: Add the ImageMagick directory to PATH
set "PATH=%IMAGEMAGICK_PATH%;%PATH%"
:: ==================== Python environment setup ====================
:: Set the Python module search path so project modules import correctly
set "PYTHONPATH=%CURRENT_DIR%NarratoAI;%PYTHONPATH%"
echo ***** Python module search path: %PYTHONPATH% *****
:: ==================== Project-specific environment variables ====================
:: Paths to the project root and bundled tools, for use inside the application
set "NARRATO_ROOT=%CURRENT_DIR%NarratoAI"
set "NARRATO_FFMPEG=%FFMPEG_BINARY%"
set "NARRATO_IMAGEMAGICK=%IMAGEMAGICK_BINARY%"
:: ==================== Streamlit setup ====================
:: Path to the configuration file for Streamlit (the Python web-app framework)
set "USER_HOME=%USERPROFILE%"
set "STREAMLIT_DIR=%USER_HOME%\.streamlit"
set "CREDENTIAL_FILE=%STREAMLIT_DIR%\credentials.toml"
echo ***** Streamlit credentials file path: %CREDENTIAL_FILE% *****
:: Create the Streamlit config directory and credentials file if they do not exist
if not exist "%STREAMLIT_DIR%" (
echo Creating the Streamlit config directory...
mkdir "%STREAMLIT_DIR%"
(
echo [general]
echo email=""
) > "%CREDENTIAL_FILE%"
echo Streamlit config file created!
)
:: ==================== Dependency checks ====================
:: Verify the required external tools exist so the application can run
if not exist "%FFMPEG_BINARY%" (
echo Error: FFmpeg executable not found at: %FFMPEG_BINARY%
echo Make sure FFmpeg is installed correctly or check the path configuration
pause
exit /b 1
)
if not exist "%IMAGEMAGICK_BINARY%" (
echo Error: ImageMagick executable not found at: %IMAGEMAGICK_BINARY%
echo Make sure ImageMagick is installed correctly or check the path configuration
pause
exit /b 1
)
:: ==================== Launch the application ====================
:: Switch to the project directory and start the application
echo ***** Switching working directory to: %CURRENT_DIR%NarratoAI *****
cd /d "%CURRENT_DIR%NarratoAI"
echo ***** Starting the NarratoAI application... *****
:: Launch the Streamlit app with the project's bundled Python interpreter
"%CURRENT_DIR%lib\python\python.exe" -m streamlit run webui.py --browser.serverAddress="127.0.0.1" --server.enableCORS=True --server.maxUploadSize=2048 --browser.gatherUsageStats=False
:: Flag notes:
:: --browser.serverAddress="127.0.0.1" - bind the server to the local address
:: --server.enableCORS=True - enable cross-origin resource sharing
:: --server.maxUploadSize=2048 - cap uploads at 2048 MB
:: --browser.gatherUsageStats=False - disable usage-statistics collection
:: Pause after the app exits so the user can see the final output
pause

View File

@ -1,112 +0,0 @@
@echo off
chcp 65001 >nul
setlocal EnableDelayedExpansion
set "CURRENT_DIR=%~dp0"
echo ***** Current directory: %CURRENT_DIR% *****
REM Clear environment variables that could interfere
set PYTHONPATH=
set PYTHONHOME=
REM Start with empty proxy settings
set "HTTP_PROXY="
set "HTTPS_PROXY="
:git_pull
echo Updating the code, please wait...
REM Pull the latest code with git and check the result
"%CURRENT_DIR%lib\git\bin\git.exe" -C "%CURRENT_DIR%NarratoAI" pull > "%TEMP%\git_output.txt" 2>&1
set GIT_EXIT_CODE=%ERRORLEVEL%
if %GIT_EXIT_CODE% NEQ 0 (
echo [Error] Code update failed! Exit code: %GIT_EXIT_CODE%
type "%TEMP%\git_output.txt"
findstr /C:"error: 403" /C:"fatal: unable to access" /C:"The requested URL returned error: 403" "%TEMP%\git_output.txt" >nul
if !ERRORLEVEL! EQU 0 (
echo.
echo [Hint] Detected a GitHub 403 error, likely caused by a network problem.
if not defined HTTP_PROXY (
echo.
echo Enter a proxy address, e.g. http://127.0.0.1:7890, or press Enter to skip:
set /p PROXY_INPUT="> "
if not "!PROXY_INPUT!"=="" (
set "HTTP_PROXY=!PROXY_INPUT!"
set "HTTPS_PROXY=!PROXY_INPUT!"
echo.
echo [Info] Proxy set to: !PROXY_INPUT!
echo Retrying with the proxy...
goto git_pull
) else (
echo.
echo [Warning] No proxy configured. Suggestions:
echo - Set a system proxy manually
echo - Use a VPN or another network tool
echo - Re-run this script and enter a proxy address
)
) else (
echo.
echo [Warning] Still failing with proxy !HTTP_PROXY!.
echo You can:
echo 1. Enter a new proxy address, or press Enter to keep the current one: !HTTP_PROXY!
echo 2. Enter "clear" to remove the proxy settings
set /p PROXY_INPUT="> "
if "!PROXY_INPUT!"=="clear" (
set "HTTP_PROXY="
set "HTTPS_PROXY="
echo [Info] Proxy settings cleared
goto end
) else if not "!PROXY_INPUT!"=="" (
set "HTTP_PROXY=!PROXY_INPUT!"
set "HTTPS_PROXY=!PROXY_INPUT!"
echo [Info] Proxy updated to: !PROXY_INPUT!
echo Retrying with the new proxy...
goto git_pull
) else (
echo [Info] Keeping the current proxy: !HTTP_PROXY!
echo You can try again later or resolve the network issue manually
)
)
) else (
echo.
echo [Warning] Hit a different error; check the output above for details.
)
goto end
) else (
echo [Success] Code updated successfully!
)
echo Updating pip, please wait...
"%CURRENT_DIR%lib\python\python.exe" -m pip install --upgrade pip >nul 2>&1
if %ERRORLEVEL% NEQ 0 (
echo [Warning] pip update failed; continuing with the current version.
) else (
echo [Success] pip updated to the latest version!
)
echo Installing dependencies, please wait...
REM Make sure the correct Python and pip are used
"%CURRENT_DIR%lib\python\python.exe" -m pip install -q -r "%CURRENT_DIR%NarratoAI\requirements.txt"
if %ERRORLEVEL% NEQ 0 (
echo [Error] Dependency installation failed; check that requirements.txt exists.
goto end
) else (
echo [Success] Dependencies installed!
)
echo ===================================
echo ✓ Program update complete
echo ===================================
:end
if exist "%TEMP%\git_output.txt" del "%TEMP%\git_output.txt"
REM Clear the proxy environment variables set in this session
if defined HTTP_PROXY (
echo [Info] This session's proxy settings have been cleared
set "HTTP_PROXY="
set "HTTPS_PROXY="
)
pause

View File

@ -1,178 +0,0 @@
import requests
import json
import os
import time
from typing import Dict, Any
class VideoPipeline:
def __init__(self, base_url: str = "http://127.0.0.1:8080"):
self.base_url = base_url
def download_video(self, url: str, resolution: str = "1080p",
output_format: str = "mp4", rename: str = None) -> Dict[str, Any]:
"""下载视频的第一步"""
endpoint = f"{self.base_url}/api/v2/youtube/download"
payload = {
"url": url,
"resolution": resolution,
"output_format": output_format,
"rename": rename or time.strftime("%Y-%m-%d")
}
response = requests.post(endpoint, json=payload)
response.raise_for_status()
return response.json()
def generate_script(self, video_path: str, skip_seconds: int = 0,
threshold: int = 30, vision_batch_size: int = 10,
vision_llm_provider: str = "gemini") -> Dict[str, Any]:
"""生成脚本的第二步"""
endpoint = f"{self.base_url}/api/v2/scripts/generate"
payload = {
"video_path": video_path,
"skip_seconds": skip_seconds,
"threshold": threshold,
"vision_batch_size": vision_batch_size,
"vision_llm_provider": vision_llm_provider
}
response = requests.post(endpoint, json=payload)
response.raise_for_status()
return response.json()
def crop_video(self, video_path: str, script: list) -> Dict[str, Any]:
"""剪辑视频的第三步"""
endpoint = f"{self.base_url}/api/v2/scripts/crop"
payload = {
"video_origin_path": video_path,
"video_script": script
}
response = requests.post(endpoint, json=payload)
response.raise_for_status()
return response.json()
def generate_final_video(self, task_id: str, video_path: str,
script_path: str, script: list, subclip_videos: Dict[str, str], voice_name: str) -> Dict[str, Any]:
"""生成最终视频的第四步"""
endpoint = f"{self.base_url}/api/v2/scripts/start-subclip"
request_data = {
"video_clip_json": script,
"video_clip_json_path": script_path,
"video_origin_path": video_path,
"video_aspect": "16:9",
"video_language": "zh-CN",
"voice_name": voice_name,
"voice_volume": 1,
"voice_rate": 1.2,
"voice_pitch": 1,
"bgm_name": "random",
"bgm_type": "random",
"bgm_file": "",
"bgm_volume": 0.3,
"subtitle_enabled": True,
"subtitle_position": "bottom",
"font_name": "STHeitiMedium.ttc",
"text_fore_color": "#FFFFFF",
"text_background_color": "transparent",
"font_size": 75,
"stroke_color": "#000000",
"stroke_width": 1.5,
"custom_position": 70,
"n_threads": 8
}
payload = {
"request": request_data,
"subclip_videos": subclip_videos
}
params = {"task_id": task_id}
response = requests.post(endpoint, params=params, json=payload)
response.raise_for_status()
return response.json()
def save_script_to_json(self, script: list, script_path: str) -> str:
"""保存脚本到json文件"""
try:
with open(script_path, 'w', encoding='utf-8') as f:
json.dump(script, f, ensure_ascii=False, indent=2)
print(f"脚本已保存到: {script_path}")
return script_path
except Exception as e:
print(f"保存脚本失败: {str(e)}")
raise
def run_pipeline(self, task_id: str, script_name: str, youtube_url: str, video_name: str="null", skip_seconds: int = 0, threshold: int = 30, vision_batch_size: int = 10, vision_llm_provider: str = "gemini", voice_name: str = "zh-CN-YunjianNeural") -> Dict[str, Any]:
"""运行完整的pipeline"""
try:
current_path = os.path.dirname(os.path.abspath(__file__))
video_path = os.path.join(current_path, "resource", "videos", f"{video_name}.mp4")
# 判断视频是否存在
if not os.path.exists(video_path):
# 1. 下载视频
print(f"视频不存在, 开始下载视频: {video_path}")
download_result = self.download_video(url=youtube_url, resolution="1080p", output_format="mp4", rename=video_name)
video_path = download_result["output_path"]
else:
print(f"视频已存在: {video_path}")
# 2. 判断script_name是否存在
# 2.1.1 拼接脚本路径 NarratoAI/resource/scripts
script_path = os.path.join(current_path, "resource", "scripts", script_name)
if os.path.exists(script_path):
script = json.load(open(script_path, "r", encoding="utf-8"))
else:
# 2.1.2 生成脚本
print("开始生成脚本...")
script_result = self.generate_script(video_path=video_path, skip_seconds=skip_seconds, threshold=threshold, vision_batch_size=vision_batch_size, vision_llm_provider=vision_llm_provider)
script = script_result["script"]
# 2.2 保存脚本到json文件
print("保存脚本到json文件...")
self.save_script_to_json(script=script, script_path=script_path)
# 3. 剪辑视频
print("开始剪辑视频...")
crop_result = self.crop_video(video_path=video_path, script=script)
subclip_videos = crop_result["subclip_videos"]
# 4. 生成最终视频
print("开始生成最终视频...")
self.generate_final_video(
task_id=task_id,
video_path=video_path,
script_path=script_path,
script=script,
subclip_videos=subclip_videos,
voice_name=voice_name
)
return {
"status": "等待异步生成视频",
"path": os.path.join(current_path, "storage", "tasks", task_id)
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
# Usage example
if __name__ == "__main__":
pipeline = VideoPipeline()
result = pipeline.run_pipeline(
task_id="test_111901",
script_name="test.json",
youtube_url="https://www.youtube.com/watch?v=vLJ7Yed6FQ4",
video_name="2024-11-19-01",
skip_seconds=50,
threshold=35,
vision_batch_size=10,
vision_llm_provider="gemini",
voice_name="zh-CN-YunjianNeural",
)
print(result)

View File

@ -4,7 +4,7 @@ import sys
from loguru import logger
from app.config import config
from webui.components import basic_settings, video_settings, audio_settings, subtitle_settings, script_settings, \
review_settings, merge_settings, system_settings
system_settings
# from webui.utils import cache, file_utils
from app.utils import utils
from app.utils import ffmpeg_utils
@ -106,8 +106,7 @@ def init_global_state():
st.session_state['video_plot'] = ''
if 'ui_language' not in st.session_state:
st.session_state['ui_language'] = config.ui.get("language", utils.get_system_locale())
if 'subclip_videos' not in st.session_state:
st.session_state['subclip_videos'] = {}
# Removed the subclip_videos initialization - the unified clipping strategy is used now
def tr(key):
@ -136,11 +135,9 @@ def render_generate_button():
logger.add(log_received)
config.save_config()
task_id = st.session_state.get('task_id')
if not task_id:
st.error(tr("Please clip the video first"))
return
# Removed the task_id check - the unified clipping strategy no longer needs pre-clipping
# Just verify that the required files exist
if not st.session_state.get('video_clip_json_path'):
st.error(tr("The script file must not be empty"))
return
@ -168,10 +165,14 @@ def render_generate_button():
# Build the parameter object
params = VideoClipParams(**all_params)
result = tm.start_subclip(
# Use the new unified clipping strategy; pre-clipped subclip_videos are no longer needed
# Generate a fresh task_id for this run
import uuid
task_id = str(uuid.uuid4())
result = tm.start_subclip_unified(
task_id=task_id,
params=params,
subclip_path_videos=st.session_state['subclip_videos']
params=params
)
video_files = result.get("videos", [])
@ -220,22 +221,17 @@ def main():
# First render the UI parts that do not depend on PyTorch
# Render the basic settings panel
basic_settings.render_basic_settings(tr)
# Render the merge settings
merge_settings.render_merge_settings(tr)
# Render the main panels
panel = st.columns(3)
with panel[0]:
script_settings.render_script_panel(tr)
with panel[1]:
video_settings.render_video_panel(tr)
audio_settings.render_audio_panel(tr)
with panel[2]:
video_settings.render_video_panel(tr)
subtitle_settings.render_subtitle_panel(tr)
# Render the video review panel
review_settings.render_review_panel(tr)
# Render last the parts that may use PyTorch
# Render the system settings panel
with panel[2]:

View File

@ -3,13 +3,11 @@ from .script_settings import render_script_panel
from .video_settings import render_video_panel
from .audio_settings import render_audio_panel
from .subtitle_settings import render_subtitle_panel
from .review_settings import render_review_panel
__all__ = [
'render_basic_settings',
'render_script_panel',
'render_video_panel',
'render_audio_panel',
'render_subtitle_panel',
'render_review_panel'
]
'render_subtitle_panel'
]

View File

@ -8,6 +8,64 @@ from app.utils import utils
from webui.utils.cache import get_songs_cache
def get_soulvoice_voices():
"""获取 SoulVoice 语音列表"""
# 检查是否配置了 SoulVoice API key
api_key = config.soulvoice.get("api_key", "")
if not api_key:
return []
# 只返回一个 SoulVoice 选项,音色通过输入框自定义
return ["soulvoice:custom"]
def get_tts_engine_options():
"""获取TTS引擎选项"""
return {
"edge_tts": "Edge TTS",
"azure_speech": "Azure Speech Services",
"soulvoice": "SoulVoice"
}
def get_tts_engine_descriptions():
"""获取TTS引擎详细描述"""
return {
"edge_tts": {
"title": "Edge TTS",
"features": "完全免费,但服务稳定性一般,不支持语音克隆功能",
"use_case": "测试和轻量级使用",
"registration": None
},
"azure_speech": {
"title": "Azure Speech Services",
"features": "提供一定免费额度,超出后按量付费,需要绑定海外信用卡",
"use_case": "企业级应用,需要稳定服务",
"registration": "https://portal.azure.com/#view/Microsoft_Azure_ProjectOxford/CognitiveServicesHub/~/SpeechServices"
},
"soulvoice": {
"title": "SoulVoice",
"features": "提供免费额度,支持语音克隆,支持微信购买额度,无需信用卡,性价比极高",
"use_case": "个人用户和中小企业,需要语音克隆功能",
"registration": "https://soulvoice.scsmtech.cn/"
}
}
def is_valid_azure_voice_name(voice_name: str) -> bool:
"""检查是否为有效的Azure音色名称格式"""
if not voice_name or not isinstance(voice_name, str):
return False
voice_name = voice_name.strip()
# Azure音色名称通常格式为: [语言]-[地区]-[名称]Neural
# 例如: zh-CN-YunzeNeural, en-US-AvaMultilingualNeural
import re
pattern = r'^[a-z]{2}-[A-Z]{2}-\w+Neural$'
return bool(re.match(pattern, voice_name))
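# Editor sketch (not part of this commit): quick sanity check of the pattern
# above - two lower-case language letters, two upper-case region letters, and
# a name ending in "Neural".
def _demo_voice_name_check() -> None:
    for name, expected in [
        ("zh-CN-YunzeNeural", True),
        ("en-US-AvaMultilingualNeural", True),
        ("zh-cn-YunzeNeural", False),   # region code must be upper-case
        ("zh-CN-Yunze", False),         # missing the Neural suffix
    ]:
        assert is_valid_azure_voice_name(name) is expected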
def render_audio_panel(tr):
"""渲染音频设置面板"""
with st.container(border=True):
@ -22,37 +80,91 @@ def render_audio_panel(tr):
def render_tts_settings(tr):
"""渲染TTS(文本转语音)设置"""
# 1. TTS引擎选择器
# st.subheader("🎤 TTS引擎选择")
engine_options = get_tts_engine_options()
engine_descriptions = get_tts_engine_descriptions()
# 获取保存的TTS引擎设置
saved_tts_engine = config.ui.get("tts_engine", "edge_tts")
# 确保保存的引擎在可用选项中
if saved_tts_engine not in engine_options:
saved_tts_engine = "edge_tts"
# TTS引擎选择下拉框
selected_engine = st.selectbox(
"选择TTS引擎",
options=list(engine_options.keys()),
format_func=lambda x: engine_options[x],
index=list(engine_options.keys()).index(saved_tts_engine),
help="选择您要使用的文本转语音引擎"
)
# 保存TTS引擎选择
config.ui["tts_engine"] = selected_engine
# 2. 显示引擎详细说明
if selected_engine in engine_descriptions:
desc = engine_descriptions[selected_engine]
with st.expander(f"📋 {desc['title']} 详细说明", expanded=True):
st.markdown(f"**特点:** {desc['features']}")
st.markdown(f"**适用场景:** {desc['use_case']}")
if desc['registration']:
st.markdown(f"**注册地址:** [{desc['registration']}]({desc['registration']})")
# 3. 根据选择的引擎渲染对应的配置界面
# st.subheader("⚙️ 引擎配置")
if selected_engine == "edge_tts":
render_edge_tts_settings(tr)
elif selected_engine == "azure_speech":
render_azure_speech_settings(tr)
elif selected_engine == "soulvoice":
render_soulvoice_engine_settings(tr)
# 4. 试听功能
render_voice_preview_new(tr, selected_engine)
def render_edge_tts_settings(tr):
"""渲染 Edge TTS 引擎设置"""
# 获取支持的语音列表
support_locales = ["zh-CN", "en-US"]
voices = voice.get_all_azure_voices(filter_locals=support_locales)
all_voices = voice.get_all_azure_voices(filter_locals=support_locales)
# 只保留标准版本的语音Edge TTS专用不包含V2
edge_voices = [v for v in all_voices if "-V2" not in v]
# 创建友好的显示名称
friendly_names = {
v: v.replace("Female", tr("Female"))
.replace("Male", tr("Male"))
.replace("Neural", "")
for v in voices
}
friendly_names = {}
for v in edge_voices:
friendly_names[v] = v.replace("Female", tr("Female")).replace("Male", tr("Male")).replace("Neural", "")
# 获取保存的语音设置
saved_voice_name = config.ui.get("voice_name", "")
saved_voice_name_index = 0
saved_voice_name = config.ui.get("edge_voice_name", "zh-CN-XiaoxiaoNeural-Female")
if saved_voice_name in friendly_names:
saved_voice_name_index = list(friendly_names.keys()).index(saved_voice_name)
else:
# 如果没有保存的设置选择与UI语言匹配的第一个语音
for i, v in enumerate(voices):
if (v.lower().startswith(st.session_state["ui_language"].lower())
and "V2" not in v):
saved_voice_name_index = i
# 确保保存的音色在可用列表中
if saved_voice_name not in friendly_names:
# 选择与UI语言匹配的第一个语音
for v in edge_voices:
if v.lower().startswith(st.session_state.get("ui_language", "zh-CN").lower()):
saved_voice_name = v
break
else:
# 如果没找到匹配的,使用第一个
saved_voice_name = edge_voices[0] if edge_voices else ""
# 语音选择下拉框
# 选择下拉框Edge TTS音色相对较少保留下拉框
selected_friendly_name = st.selectbox(
tr("Speech Synthesis"),
"音色选择",
options=list(friendly_names.values()),
index=saved_voice_name_index,
index=list(friendly_names.keys()).index(saved_voice_name) if saved_voice_name in friendly_names else 0,
help="选择Edge TTS音色"
)
# 获取实际的语音名称
@ -60,22 +172,323 @@ def render_tts_settings(tr):
list(friendly_names.values()).index(selected_friendly_name)
]
# 保存设置
config.ui["voice_name"] = voice_name
# 显示音色信息
with st.expander("💡 Edge TTS 音色说明", expanded=False):
st.write("**中文音色:**")
zh_voices = [v for v in edge_voices if v.startswith("zh-CN")]
for v in zh_voices:
gender = "女声" if "Female" in v else "男声"
name = v.replace("-Female", "").replace("-Male", "").replace("zh-CN-", "").replace("Neural", "")
st.write(f"{name} ({gender})")
# Azure V2语音特殊处理
if voice.is_azure_v2_voice(voice_name):
render_azure_v2_settings(tr)
st.write("")
st.write("**英文音色:**")
en_voices = [v for v in edge_voices if v.startswith("en-US")][:5] # 只显示前5个
for v in en_voices:
gender = "女声" if "Female" in v else "男声"
name = v.replace("-Female", "").replace("-Male", "").replace("en-US-", "").replace("Neural", "")
st.write(f"{name} ({gender})")
# 语音参数设置
render_voice_parameters(tr)
if len([v for v in edge_voices if v.startswith("en-US")]) > 5:
st.write("• ... 更多英文音色")
# 试听按钮
render_voice_preview(tr, voice_name)
config.ui["edge_voice_name"] = voice_name
config.ui["voice_name"] = voice_name # 兼容性
# 音量调节
voice_volume = st.slider(
"音量调节",
min_value=0,
max_value=100,
value=int(config.ui.get("edge_volume", 80)),
step=1,
help="调节语音音量 (0-100)"
)
config.ui["edge_volume"] = voice_volume
st.session_state['voice_volume'] = voice_volume / 100.0
# 语速调节
voice_rate = st.slider(
"语速调节",
min_value=0.5,
max_value=2.0,
value=config.ui.get("edge_rate", 1.0),
step=0.1,
help="调节语音速度 (0.5-2.0倍速)"
)
config.ui["edge_rate"] = voice_rate
st.session_state['voice_rate'] = voice_rate
# 语调调节
voice_pitch = st.slider(
"语调调节",
min_value=-50,
max_value=50,
value=int(config.ui.get("edge_pitch", 0)),
step=5,
help="调节语音音调 (-50%到+50%)"
)
config.ui["edge_pitch"] = voice_pitch
# 转换为比例值
st.session_state['voice_pitch'] = 1.0 + (voice_pitch / 100.0)
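# Editor sketch (not part of this commit): the three sliders above keep raw UI
# values and store ratios in session_state - volume 0..100 maps to 0.0..1.0,
# rate is already a multiplier, and pitch -50..+50 maps to 0.5..1.5.
def _pitch_ratio(pitch_percent: int) -> float:
    return 1.0 + pitch_percent / 100.0

assert _pitch_ratio(-50) == 0.5   # lowest slider position
assert _pitch_ratio(0) == 1.0     # unchanged pitch
assert _pitch_ratio(50) == 1.5    # highest slider position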
def render_azure_speech_settings(tr):
"""渲染 Azure Speech Services 引擎设置"""
# 服务区域配置
azure_speech_region = st.text_input(
"服务区域",
value=config.azure.get("speech_region", ""),
placeholder="例如eastus",
help="Azure Speech Services 服务区域eastus, westus2, eastasia 等"
)
# API Key配置
azure_speech_key = st.text_input(
"API Key",
value=config.azure.get("speech_key", ""),
type="password",
help="Azure Speech Services API 密钥"
)
# 保存Azure配置
config.azure["speech_region"] = azure_speech_region
config.azure["speech_key"] = azure_speech_key
# 音色名称输入框
saved_voice_name = config.ui.get("azure_voice_name", "zh-CN-XiaoxiaoMultilingualNeural")
# 音色名称输入
voice_name = st.text_input(
"音色名称",
value=saved_voice_name,
help="输入Azure Speech Services音色名称直接使用官方音色名称即可。例如zh-CN-YunzeNeural",
placeholder="zh-CN-YunzeNeural"
)
# 显示常用音色示例
with st.expander("💡 常用音色参考", expanded=False):
st.write("**中文音色:**")
st.write("• zh-CN-XiaoxiaoMultilingualNeural (女声,多语言)")
st.write("• zh-CN-YunzeNeural (男声)")
st.write("• zh-CN-YunxiNeural (男声)")
st.write("• zh-CN-XiaochenNeural (女声)")
st.write("")
st.write("**英文音色:**")
st.write("• en-US-AndrewMultilingualNeural (男声,多语言)")
st.write("• en-US-AvaMultilingualNeural (女声,多语言)")
st.write("• en-US-BrianMultilingualNeural (男声,多语言)")
st.write("• en-US-EmmaMultilingualNeural (女声,多语言)")
st.write("")
st.info("💡 更多音色请参考 [Azure Speech Services 官方文档](https://docs.microsoft.com/en-us/azure/cognitive-services/speech-service/language-support)")
# 快速选择按钮
st.write("**快速选择:**")
cols = st.columns(3)
with cols[0]:
if st.button("中文女声", help="zh-CN-XiaoxiaoMultilingualNeural"):
voice_name = "zh-CN-XiaoxiaoMultilingualNeural"
st.rerun()
with cols[1]:
if st.button("中文男声", help="zh-CN-YunzeNeural"):
voice_name = "zh-CN-YunzeNeural"
st.rerun()
with cols[2]:
if st.button("英文女声", help="en-US-AvaMultilingualNeural"):
voice_name = "en-US-AvaMultilingualNeural"
st.rerun()
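# Editor sketch (not part of this commit): assigning to the local voice_name
# before st.rerun() does not survive the rerun - the script starts over and
# text_input re-reads its saved default. The usual fix is a keyed widget plus
# an on_click callback (the key name below is hypothetical):
#
#     def _set_voice(name: str) -> None:
#         st.session_state["azure_voice_name_input"] = name
#
#     st.button("中文男声", on_click=_set_voice, args=("zh-CN-YunzeNeural",))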
# 验证音色名称并显示状态
if voice_name.strip():
# 检查是否为有效的Azure音色格式
if is_valid_azure_voice_name(voice_name):
st.success(f"✅ 音色名称有效: {voice_name}")
else:
st.warning(f"⚠️ 音色名称格式可能不正确: {voice_name}")
st.info("💡 Azure音色名称通常格式为: [语言]-[地区]-[名称]Neural")
# 保存配置
config.ui["azure_voice_name"] = voice_name
config.ui["voice_name"] = voice_name # 兼容性
# 音量调节
voice_volume = st.slider(
"音量调节",
min_value=0,
max_value=100,
value=int(config.ui.get("azure_volume", 80)),
step=1,
help="调节语音音量 (0-100)"
)
config.ui["azure_volume"] = voice_volume
st.session_state['voice_volume'] = voice_volume / 100.0
# 语速调节
voice_rate = st.slider(
"语速调节",
min_value=0.5,
max_value=2.0,
value=config.ui.get("azure_rate", 1.0),
step=0.1,
help="调节语音速度 (0.5-2.0倍速)"
)
config.ui["azure_rate"] = voice_rate
st.session_state['voice_rate'] = voice_rate
# 语调调节
voice_pitch = st.slider(
"语调调节",
min_value=-50,
max_value=50,
value=int(config.ui.get("azure_pitch", 0)),
step=5,
help="调节语音音调 (-50%到+50%)"
)
config.ui["azure_pitch"] = voice_pitch
# 转换为比例值
st.session_state['voice_pitch'] = 1.0 + (voice_pitch / 100.0)
# 显示配置状态
if azure_speech_region and azure_speech_key:
st.success("✅ Azure Speech Services 配置已设置")
elif not azure_speech_region:
st.warning("⚠️ 请配置服务区域")
elif not azure_speech_key:
st.warning("⚠️ 请配置 API Key")
def render_soulvoice_engine_settings(tr):
"""渲染 SoulVoice 引擎设置"""
# API Key 输入
api_key = st.text_input(
"API Key",
value=config.soulvoice.get("api_key", ""),
type="password",
help="请输入您的 SoulVoice API 密钥"
)
# 音色 URI 输入
voice_uri = st.text_input(
"音色URI",
value=config.soulvoice.get("voice_uri", "speech:mcg3fdnx:clzkyf4vy00e5qr6hywum4u84:bzznlkuhcjzpbosexitr"),
help="请输入 SoulVoice 音色标识符",
placeholder="speech:mcg3fdnx:clzkyf4vy00e5qr6hywum4u84:bzznlkuhcjzpbosexitr"
)
# 模型名称选择
model_options = [
"FunAudioLLM/CosyVoice2-0.5B",
"FunAudioLLM/CosyVoice-300M",
"FunAudioLLM/CosyVoice-300M-SFT",
"FunAudioLLM/CosyVoice-300M-Instruct"
]
saved_model = config.soulvoice.get("model", "FunAudioLLM/CosyVoice2-0.5B")
if saved_model not in model_options:
model_options.append(saved_model)
model = st.selectbox(
"模型名称",
options=model_options,
index=model_options.index(saved_model),
help="选择使用的 TTS 模型"
)
# 高级设置
with st.expander("高级设置", expanded=False):
api_url = st.text_input(
"API 地址",
value=config.soulvoice.get("api_url", "https://tts.scsmtech.cn/tts"),
help="SoulVoice API 接口地址"
)
# 保存配置
config.soulvoice["api_key"] = api_key
config.soulvoice["voice_uri"] = voice_uri
config.soulvoice["model"] = model
config.soulvoice["api_url"] = api_url
# 设置兼容性配置
if voice_uri:
# 确保音色 URI 有正确的前缀
if not voice_uri.startswith("soulvoice:") and not voice_uri.startswith("speech:"):
voice_name = f"soulvoice:{voice_uri}"
else:
voice_name = voice_uri if voice_uri.startswith("soulvoice:") else f"soulvoice:{voice_uri}"
config.ui["voice_name"] = voice_name
# 显示配置状态
if api_key and voice_uri:
st.success("✅ SoulVoice 配置已设置")
elif not api_key:
st.warning("⚠️ 请配置 SoulVoice API Key")
elif not voice_uri:
st.warning("⚠️ 请配置音色 URI")
def render_voice_preview_new(tr, selected_engine):
"""渲染新的语音试听功能"""
if st.button("🎵 试听语音合成", use_container_width=True):
play_content = "感谢关注 NarratoAI有任何问题或建议可以关注微信公众号求助或讨论"
# 根据选择的引擎获取对应的语音配置
voice_name = ""
voice_rate = 1.0
voice_pitch = 1.0
if selected_engine == "edge_tts":
voice_name = config.ui.get("edge_voice_name", "zh-CN-XiaoyiNeural-Female")
voice_rate = config.ui.get("edge_rate", 1.0)
voice_pitch = 1.0 + (config.ui.get("edge_pitch", 0) / 100.0)
elif selected_engine == "azure_speech":
voice_name = config.ui.get("azure_voice_name", "zh-CN-XiaoxiaoMultilingualNeural")
voice_rate = config.ui.get("azure_rate", 1.0)
voice_pitch = 1.0 + (config.ui.get("azure_pitch", 0) / 100.0)
elif selected_engine == "soulvoice":
voice_uri = config.soulvoice.get("voice_uri", "")
if voice_uri:
if not voice_uri.startswith("soulvoice:") and not voice_uri.startswith("speech:"):
voice_name = f"soulvoice:{voice_uri}"
else:
voice_name = voice_uri if voice_uri.startswith("soulvoice:") else f"soulvoice:{voice_uri}"
voice_rate = 1.0 # SoulVoice 使用默认语速
voice_pitch = 1.0 # SoulVoice 不支持音调调节
if not voice_name:
st.error("请先配置语音设置")
return
with st.spinner("正在合成语音..."):
temp_dir = utils.storage_dir("temp", create=True)
audio_file = os.path.join(temp_dir, f"tmp-voice-{str(uuid4())}.mp3")
sub_maker = voice.tts(
text=play_content,
voice_name=voice_name,
voice_rate=voice_rate,
voice_pitch=voice_pitch,
voice_file=audio_file,
)
if sub_maker and os.path.exists(audio_file):
st.success("✅ 语音合成成功!")
# 播放音频
with open(audio_file, 'rb') as audio_file_obj:
audio_bytes = audio_file_obj.read()
st.audio(audio_bytes, format='audio/mp3')
# 清理临时文件
try:
os.remove(audio_file)
except OSError:
pass
else:
st.error("❌ 语音合成失败,请检查配置")
def render_azure_v2_settings(tr):
"""渲染Azure V2语音设置"""
"""渲染Azure V2语音设置(保留兼容性)"""
saved_azure_speech_region = config.azure.get("speech_region", "")
saved_azure_speech_key = config.azure.get("speech_key", "")
@ -93,8 +506,60 @@ def render_azure_v2_settings(tr):
config.azure["speech_key"] = azure_speech_key
def render_voice_parameters(tr):
"""渲染语音参数设置"""
def render_soulvoice_settings(tr):
"""渲染 SoulVoice 语音设置(保留兼容性)"""
saved_api_key = config.soulvoice.get("api_key", "")
saved_api_url = config.soulvoice.get("api_url", "https://tts.scsmtech.cn/tts")
saved_model = config.soulvoice.get("model", "FunAudioLLM/CosyVoice2-0.5B")
saved_voice_uri = config.soulvoice.get("voice_uri", "speech:mcg3fdnx:clzkyf4vy00e5qr6hywum4u84:bzznlkuhcjzpbosexitr")
# API Key 输入
api_key = st.text_input(
"SoulVoice API Key",
value=saved_api_key,
type="password",
help="请输入您的 SoulVoice API 密钥"
)
# 音色 URI 输入
voice_uri = st.text_input(
"音色 URI",
value=saved_voice_uri,
help="请输入 SoulVoice 音色标识符格式如speech:mcg3fdnx:clzkyf4vy00e5qr6hywum4u84:bzznlkuhcjzpbosexitr",
placeholder="speech:mcg3fdnx:clzkyf4vy00e5qr6hywum4u84:bzznlkuhcjzpbosexitr"
)
# API URL 输入(可选)
with st.expander("高级设置", expanded=False):
api_url = st.text_input(
"API 地址",
value=saved_api_url,
help="SoulVoice API 接口地址"
)
model = st.text_input(
"模型名称",
value=saved_model,
help="使用的 TTS 模型"
)
# 保存配置
config.soulvoice["api_key"] = api_key
config.soulvoice["voice_uri"] = voice_uri
config.soulvoice["api_url"] = api_url
config.soulvoice["model"] = model
# 显示配置状态
if api_key and voice_uri:
st.success("✅ SoulVoice 配置已设置")
elif not api_key:
st.warning("⚠️ 请配置 SoulVoice API Key")
elif not voice_uri:
st.warning("⚠️ 请配置音色 URI")
def render_voice_parameters(tr, voice_name):
"""渲染语音参数设置(保留兼容性)"""
# 音量 - 使用统一的默认值
voice_volume = st.slider(
tr("Speech Volume"),
@ -106,22 +571,41 @@ def render_voice_parameters(tr):
)
st.session_state['voice_volume'] = voice_volume
# 检查是否为 SoulVoice 引擎
is_soulvoice = voice.is_soulvoice_voice(voice_name)
# 语速
voice_rate = st.selectbox(
tr("Speech Rate"),
options=[0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.5, 1.8, 2.0],
index=2,
)
if is_soulvoice:
# SoulVoice 支持更精细的语速控制
voice_rate = st.slider(
tr("Speech Rate"),
min_value=0.5,
max_value=2.0,
value=1.0,
step=0.1,
help="SoulVoice 语音速度控制"
)
else:
# Azure TTS 使用预设选项
voice_rate = st.selectbox(
tr("Speech Rate"),
options=[0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.5, 1.8, 2.0],
index=2,
)
st.session_state['voice_rate'] = voice_rate
# 音调
voice_pitch = st.selectbox(
tr("Speech Pitch"),
options=[0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.5, 1.8, 2.0],
index=2,
)
st.session_state['voice_pitch'] = voice_pitch
# 音调 - SoulVoice 不支持音调调节
if not is_soulvoice:
voice_pitch = st.selectbox(
tr("Speech Pitch"),
options=[0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.5, 1.8, 2.0],
index=2,
)
st.session_state['voice_pitch'] = voice_pitch
else:
# SoulVoice 不支持音调调节,设置默认值
st.session_state['voice_pitch'] = 1.0
st.info(" SoulVoice 引擎不支持音调调节")
def render_voice_preview(tr, voice_name):
@ -157,9 +641,12 @@ def render_voice_preview(tr, voice_name):
)
if sub_maker and os.path.exists(audio_file):
st.success(tr("Voice synthesis successful"))
st.audio(audio_file, format="audio/mp3")
if os.path.exists(audio_file):
os.remove(audio_file)
else:
st.error(tr("Voice synthesis failed"))
def render_bgm_settings(tr):

View File

@ -1,296 +0,0 @@
import os
import time
import streamlit as st
from loguru import logger
from typing import List, Dict
from dataclasses import dataclass
from streamlit.runtime.uploaded_file_manager import UploadedFile
from webui.utils.merge_video import merge_videos_and_subtitles
from app.utils.utils import video_dir, srt_dir
# 定义临时目录路径
TEMP_MERGE_DIR = os.path.join("storage", "temp", "merge")
# 确保临时目录存在
os.makedirs(TEMP_MERGE_DIR, exist_ok=True)
@dataclass
class VideoSubtitlePair:
video_file: UploadedFile | None
subtitle_file: str | None
base_name: str
order: int = 0
def save_uploaded_file(uploaded_file: UploadedFile, target_dir: str) -> str:
"""Save uploaded file to target directory and return the file path"""
file_path = os.path.join(target_dir, uploaded_file.name)
# 如果文件已存在,先删除它
if os.path.exists(file_path):
os.remove(file_path)
with open(file_path, "wb") as f:
f.write(uploaded_file.getvalue())
return file_path
def clean_temp_dir():
"""清空临时目录"""
if os.path.exists(TEMP_MERGE_DIR):
for file in os.listdir(TEMP_MERGE_DIR):
file_path = os.path.join(TEMP_MERGE_DIR, file)
try:
if os.path.isfile(file_path):
os.unlink(file_path)
except Exception as e:
logger.error(f"清理临时文件失败: {str(e)}")
def group_files(files: List[UploadedFile]) -> Dict[str, VideoSubtitlePair]:
"""Group uploaded files by their base names"""
pairs = {}
order_counter = 0
# 首先处理所有视频文件
for file in files:
base_name = os.path.splitext(file.name)[0]
ext = os.path.splitext(file.name)[1].lower()
if ext == ".mp4":
if base_name not in pairs:
pairs[base_name] = VideoSubtitlePair(None, None, base_name, order_counter)
order_counter += 1
pairs[base_name].video_file = file
# 保存视频文件到临时目录
video_path = save_uploaded_file(file, TEMP_MERGE_DIR)
# 然后处理所有字幕文件
for file in files:
base_name = os.path.splitext(file.name)[0]
ext = os.path.splitext(file.name)[1].lower()
if ext == ".srt":
# 即使没有对应视频也保存字幕文件
subtitle_path = os.path.join(TEMP_MERGE_DIR, f"{base_name}.srt")
save_uploaded_file(file, TEMP_MERGE_DIR)
if base_name in pairs: # 如果有对应的视频
pairs[base_name].subtitle_file = subtitle_path
return pairs
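# Editor sketch (not part of the original file): the pairing above reduces to
# grouping by base name; plain strings stand in for Streamlit's UploadedFile.
def _demo_grouping() -> None:
    names = ["ep1.mp4", "ep1.srt", "ep2.mp4"]
    videos = {os.path.splitext(n)[0] for n in names if n.endswith(".mp4")}
    subs = {os.path.splitext(n)[0] for n in names if n.endswith(".srt")}
    assert videos & subs == {"ep1"}   # complete, mergeable pair
    assert videos - subs == {"ep2"}   # video still missing its subtitle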
def render_merge_settings(tr):
"""Render the merge settings section"""
with st.expander(tr("Video Subtitle Merge"), expanded=False):
# 上传文件区域
uploaded_files = st.file_uploader(
tr("Upload Video and Subtitle Files"),
type=["mp4", "srt"],
accept_multiple_files=True,
key="merge_files"
)
if uploaded_files:
all_pairs = group_files(uploaded_files)
if all_pairs:
st.write(tr("All Uploaded Files"))
# 初始化或更新session state中的排序信息
if 'file_orders' not in st.session_state:
st.session_state.file_orders = {
name: pair.order for name, pair in all_pairs.items()
}
st.session_state.needs_reorder = False
# 确保所有新文件都有排序值
for name, pair in all_pairs.items():
if name not in st.session_state.file_orders:
st.session_state.file_orders[name] = pair.order
# 移除不存在的文件的排序值
st.session_state.file_orders = {
k: v for k, v in st.session_state.file_orders.items()
if k in all_pairs
}
# 按照排序值对文件对进行排序
sorted_pairs = sorted(
all_pairs.items(),
key=lambda x: st.session_state.file_orders[x[0]]
)
# 计算需要多少行来显示所有视频每行5个
num_pairs = len(sorted_pairs)
num_rows = (num_pairs + 4) // 5 # 向上取整,每行5个
# 遍历每一行
for row in range(num_rows):
# 创建5列
cols = st.columns(5)
# 在这一行中填充视频最多5个
for col_idx in range(5):
pair_idx = row * 5 + col_idx
if pair_idx < num_pairs:
base_name, pair = sorted_pairs[pair_idx]
with cols[col_idx]:
st.caption(base_name)
# 显示视频预览(如果存在)
video_path = os.path.join(TEMP_MERGE_DIR, f"{base_name}.mp4")
if os.path.exists(video_path):
st.video(video_path)
else:
st.warning(tr("Missing Video"))
# 显示字幕预览(如果存在)
subtitle_path = os.path.join(TEMP_MERGE_DIR, f"{base_name}.srt")
if os.path.exists(subtitle_path):
with open(subtitle_path, 'r', encoding='utf-8') as f:
subtitle_content = f.read()
st.markdown(tr("Subtitle Preview"))
st.text_area(
"Subtitle Content",
value=subtitle_content,
height=100, # 减小高度以适应5列布局
label_visibility="collapsed",
key=f"subtitle_preview_{base_name}"
)
else:
st.warning(tr("Missing Subtitle"))
# 如果有视频但没有字幕,显示一键转录按钮
# if os.path.exists(video_path):
# if st.button(tr("One-Click Transcribe"), key=f"transcribe_{base_name}"):
# with st.spinner(tr("Transcribing...")):
# try:
# # 生成字幕文件
# result = extract_audio_and_create_subtitle(video_path, subtitle_path)
# if result:
# # 读取生成的字幕文件内容并显示预览
# with open(subtitle_path, 'r', encoding='utf-8') as f:
# subtitle_content = f.read()
# st.markdown(tr("Subtitle Preview"))
# st.text_area(
# "Subtitle Content",
# value=subtitle_content,
# height=150,
# label_visibility="collapsed",
# key=f"subtitle_preview_transcribed_{base_name}"
# )
# st.success(tr("Transcription Complete!"))
# # 更新pair的字幕文件路径
# pair.subtitle_file = subtitle_path
# else:
# st.error(tr("Transcription Failed. Please try again."))
# except Exception as e:
# error_message = str(e)
# logger.error(traceback.format_exc())
# if "rate limit exceeded" in error_message.lower():
# st.error(tr("API rate limit exceeded. Please wait about an hour and try again."))
# elif "resource_exhausted" in error_message.lower():
# st.error(tr("Resources exhausted. Please try again later."))
# else:
# st.error(f"{tr('Transcription Failed')}: {str(e)}")
# 排序输入框
order = st.number_input(
tr("Order"),
min_value=0,
value=st.session_state.file_orders[base_name],
key=f"order_{base_name}",
on_change=lambda: setattr(st.session_state, 'needs_reorder', True)
)
if order != st.session_state.file_orders[base_name]:
st.session_state.file_orders[base_name] = order
st.session_state.needs_reorder = True
# 如果需要重新排序,重新加载页面
if st.session_state.needs_reorder:
st.session_state.needs_reorder = False
st.rerun()
# 找出有完整视频和字幕的文件对
complete_pairs = {
k: v for k, v in all_pairs.items()
if os.path.exists(os.path.join(TEMP_MERGE_DIR, f"{k}.mp4")) and
os.path.exists(os.path.join(TEMP_MERGE_DIR, f"{k}.srt"))
}
# 合并按钮和结果显示
cols = st.columns([1, 2, 1])
with cols[0]:
st.write(f"{tr('Mergeable Files')}: {len(complete_pairs)}")
merge_videos_result = None
with cols[1]:
if st.button(tr("Merge All Files"), type="primary", use_container_width=True):
try:
# 获取排序后的完整文件对
sorted_complete_pairs = sorted(
[(k, v) for k, v in complete_pairs.items()],
key=lambda x: st.session_state.file_orders[x[0]]
)
video_paths = []
subtitle_paths = []
for base_name, _ in sorted_complete_pairs:
video_paths.append(os.path.join(TEMP_MERGE_DIR, f"{base_name}.mp4"))
subtitle_paths.append(os.path.join(TEMP_MERGE_DIR, f"{base_name}.srt"))
# 获取输出文件路径
output_video = os.path.join(video_dir(), f"merged_video_{time.strftime('%M%S')}.mp4")
output_subtitle = os.path.join(srt_dir(), f"merged_subtitle_{time.strftime('%M%S')}.srt")
with st.spinner(tr("Merging files...")):
# 合并文件
merge_videos_and_subtitles(
video_paths,
subtitle_paths,
output_video,
output_subtitle
)
success = True
error_msg = ""
# 检查输出文件是否成功生成
if not os.path.exists(output_video):
success = False
error_msg += tr("Failed to generate merged video. ")
if not os.path.exists(output_subtitle):
success = False
error_msg += tr("Failed to generate merged subtitle. ")
if success:
# 显示成功消息
st.success(tr("Merge completed!"))
merge_videos_result = (output_video, output_subtitle)
# 清理临时目录
clean_temp_dir()
else:
st.error(error_msg)
except Exception as e:
error_message = str(e)
if "moviepy" in error_message.lower():
st.error(tr("Error processing video files. Please check if the videos are valid MP4 files."))
# elif "pysrt" in error_message.lower():
# st.error(tr("Error processing subtitle files. Please check if the subtitles are valid SRT files."))
else:
st.error(f"{tr('Error during merge')}: {error_message}")
# 合并结果预览放在合并按钮下方
if merge_videos_result:
st.markdown(f"<h3 style='text-align: center'>{tr('Merge Result Preview')}</h3>", unsafe_allow_html=True)
# 使用列布局使视频居中
col1, col2, col3 = st.columns([1,2,1])
with col2:
st.video(merge_videos_result[0])
st.code(f"{tr('Video Path')}: {merge_videos_result[0]}")
st.code(f"{tr('Subtitle Path')}: {merge_videos_result[1]}")
else:
st.warning(tr("No Files Found"))

View File

@ -1,88 +0,0 @@
import streamlit as st
import os
from loguru import logger
def render_review_panel(tr):
"""渲染视频审查面板"""
with st.expander(tr("Video Check"), expanded=False):
try:
video_list = st.session_state.get('video_clip_json', [])
subclip_videos = st.session_state.get('subclip_videos', {})
except KeyError:
video_list = []
subclip_videos = {}
# 计算列数和行数
num_videos = len(video_list)
cols_per_row = 3
rows = (num_videos + cols_per_row - 1) // cols_per_row # 向上取整计算行数
# 使用容器展示视频
for row in range(rows):
cols = st.columns(cols_per_row)
for col in range(cols_per_row):
index = row * cols_per_row + col
if index < num_videos:
with cols[col]:
render_video_item(tr, video_list, subclip_videos, index)
def render_video_item(tr, video_list, subclip_videos, index):
"""渲染单个视频项"""
video_script = video_list[index]
# 显示时间戳
timestamp = video_script.get('_id', '')
st.text_area(
tr("Timestamp"),
value=timestamp,
height=70,
disabled=True,
key=f"timestamp_{index}"
)
# 显示视频播放器
video_path = subclip_videos.get(timestamp)
if video_path and os.path.exists(video_path):
try:
st.video(video_path)
except Exception as e:
logger.error(f"加载视频失败 {video_path}: {e}")
st.error(f"无法加载视频: {os.path.basename(video_path)}")
else:
st.warning(tr("视频文件未找到"))
# 显示画面描述
st.text_area(
tr("Picture Description"),
value=video_script.get('picture', ''),
height=150,
disabled=True,
key=f"picture_{index}"
)
# 显示旁白文本
narration = st.text_area(
tr("Narration"),
value=video_script.get('narration', ''),
height=150,
key=f"narration_{index}"
)
# 保存修改后的旁白文本
if narration != video_script.get('narration', ''):
video_script['narration'] = narration
st.session_state['video_clip_json'] = video_list
# 显示剪辑模式
ost = st.selectbox(
tr("Clip Mode"),
options=range(0, 3),
index=video_script.get('OST', 0),
key=f"ost_{index}",
help=tr("0: Keep the audio only, 1: Keep the original sound only, 2: Keep the original sound and audio")
)
# 保存修改后的剪辑模式
if ost != video_script.get('OST', 0):
video_script['OST'] = ost
st.session_state['video_clip_json'] = video_list

View File

@ -333,38 +333,12 @@ def render_script_buttons(tr, params):
video_clip_json_details = st.text_area(
tr("Video Script"),
value=json.dumps(st.session_state.get('video_clip_json', []), indent=2, ensure_ascii=False),
height=180
height=500
)
# 操作按钮行
button_cols = st.columns(3)
with button_cols[0]:
if st.button(tr("Check Format"), key="check_format", use_container_width=True):
check_script_format(tr, video_clip_json_details)
with button_cols[1]:
if st.button(tr("Save Script"), key="save_script", use_container_width=True):
save_script(tr, video_clip_json_details)
with button_cols[2]:
script_valid = st.session_state.get('script_format_valid', False)
if st.button(tr("Crop Video"), key="crop_video", disabled=not script_valid, use_container_width=True):
crop_video(tr, params)
def check_script_format(tr, script_content):
"""检查脚本格式"""
try:
result = check_script.check_format(script_content)
if result.get('success'):
st.success(tr("Script format check passed"))
st.session_state['script_format_valid'] = True
else:
st.error(f"{tr('Script format check failed')}: {result.get('message')}")
st.session_state['script_format_valid'] = False
except Exception as e:
st.error(f"{tr('Script format check error')}: {str(e)}")
st.session_state['script_format_valid'] = False
# 操作按钮行 - 合并格式检查和保存功能
if st.button(tr("Save Script"), key="save_script", use_container_width=True):
save_script_with_validation(tr, video_clip_json_details)
def load_script(tr, script_path):
@ -381,12 +355,52 @@ def load_script(tr, script_path):
st.error(f"{tr('Failed to load script')}: {str(e)}")
def save_script(tr, video_clip_json_details):
"""保存视频脚本"""
def save_script_with_validation(tr, video_clip_json_details):
"""保存视频脚本(包含格式验证)"""
if not video_clip_json_details:
st.error(tr("请输入视频脚本"))
st.stop()
# 第一步:格式验证
with st.spinner("正在验证脚本格式..."):
try:
result = check_script.check_format(video_clip_json_details)
if not result.get('success'):
# 格式验证失败,显示详细错误信息
error_message = result.get('message', '未知错误')
error_details = result.get('details', '')
st.error(f"**脚本格式验证失败**")
st.error(f"**错误信息:** {error_message}")
if error_details:
st.error(f"**详细说明:** {error_details}")
# 显示正确格式示例
st.info("**正确的脚本格式示例:**")
example_script = [
{
"_id": 1,
"timestamp": "00:00:00,600-00:00:07,559",
"picture": "工地上,蔡晓艳奋力救人,场面混乱",
"narration": "灾后重建,工地上险象环生!泼辣女工蔡晓艳挺身而出,救人第一!",
"OST": 0
},
{
"_id": 2,
"timestamp": "00:00:08,240-00:00:12,359",
"picture": "领导视察,蔡晓艳不屑一顾",
"narration": "播放原片4",
"OST": 1
}
]
st.code(json.dumps(example_script, ensure_ascii=False, indent=2), language='json')
st.stop()
except Exception as e:
st.error(f"格式验证过程中发生错误: {str(e)}")
st.stop()
# 第二步:保存脚本
with st.spinner(tr("Save Script")):
script_dir = utils.script_dir()
timestamp = time.strftime("%Y-%m%d-%H%M%S")
@ -403,7 +417,7 @@ def save_script(tr, video_clip_json_details):
config.app["video_clip_json_path"] = save_path
# 显示成功消息
st.success(tr("Script saved successfully"))
st.success("✅ 脚本格式验证通过,保存成功!")
# 强制重新加载页面更新选择框
time.sleep(0.5) # 给一点时间让用户看到成功消息
@ -414,26 +428,7 @@ def save_script(tr, video_clip_json_details):
st.stop()
def crop_video(tr, params):
"""裁剪视频"""
progress_bar = st.progress(0)
status_text = st.empty()
def update_progress(progress):
progress_bar.progress(progress)
status_text.text(f"剪辑进度: {progress}%")
try:
utils.cut_video(params, update_progress)
time.sleep(0.5)
progress_bar.progress(100)
st.success("视频剪辑成功完成!")
except Exception as e:
st.error(f"剪辑过程中发生错误: {str(e)}")
finally:
time.sleep(1)
progress_bar.empty()
status_text.empty()
# crop_video函数已移除 - 现在使用统一裁剪策略,不再需要预裁剪步骤
def get_script_params():

View File

@ -9,14 +9,35 @@ def render_subtitle_panel(tr):
with st.container(border=True):
st.write(tr("Subtitle Settings"))
# 启用字幕选项
enable_subtitles = st.checkbox(tr("Enable Subtitles"), value=True)
st.session_state['subtitle_enabled'] = enable_subtitles
# 检查是否选择了 SoulVoice 引擎
from app.services import voice
current_voice = st.session_state.get('voice_name', '')
is_soulvoice = voice.is_soulvoice_voice(current_voice)
if enable_subtitles:
render_font_settings(tr)
render_position_settings(tr)
render_style_settings(tr)
if is_soulvoice:
# SoulVoice 引擎时显示禁用提示
st.warning("⚠️ SoulVoice TTS 不支持精确字幕生成")
st.info("💡 建议使用专业剪辑工具如剪映、PR等手动添加字幕")
# 强制禁用字幕
st.session_state['subtitle_enabled'] = False
# 显示禁用状态的复选框
st.checkbox(
tr("Enable Subtitles"),
value=False,
disabled=True,
help="SoulVoice 引擎不支持字幕生成,请使用其他 TTS 引擎"
)
else:
# 其他引擎正常显示字幕选项
enable_subtitles = st.checkbox(tr("Enable Subtitles"), value=True)
st.session_state['subtitle_enabled'] = enable_subtitles
if enable_subtitles:
render_font_settings(tr)
render_position_settings(tr)
render_style_settings(tr)
def render_font_settings(tr):

View File

@ -29,7 +29,7 @@
"Clip Duration": "Maximum Clip Duration (Seconds) (**Not the total length of the video**, refers to the length of each **composite segment**)",
"Number of Videos Generated Simultaneously": "Number of Videos Generated Simultaneously",
"Audio Settings": "**Audio Settings**",
"Speech Synthesis": "Speech Synthesis Voice (:red[**Keep consistent with the script language**. Note: V2 version performs better, but requires an API KEY])",
"Speech Synthesis": "Speech Synthesis Voice (:red[**Keep consistent with the script language**. Note: V2 version performs better, but requires an API KEY; SoulVoice provides high-quality Chinese voices])",
"Speech Region": "Service Region (:red[Required, [Click to Get](https://portal.azure.com/#view/Microsoft_Azure_ProjectOxford/CognitiveServicesHub/~/SpeechServices)])",
"Speech Key": "API Key (:red[Required, either Key 1 or Key 2 is acceptable [Click to Get](https://portal.azure.com/#view/Microsoft_Azure_ProjectOxford/CognitiveServicesHub/~/SpeechServices)])",
"Speech Volume": "Speech Volume (1.0 represents 100%)",

View File

@ -11,7 +11,6 @@
"Video Theme": "视频主题",
"Generation Prompt": "自定义提示词",
"Save Script": "保存脚本",
"Crop Video": "裁剪视频",
"Video File": "视频文件(:blue[1⃣支持上传视频文件(限制2G) 2⃣大文件建议直接导入 ./resource/videos 目录]",
"Plot Description": "剧情描述 (:blue[可从 https://www.tvmao.com/ 获取])",
"Generate Video Keywords": "点击使用AI根据**文案**生成【视频关键】",
@ -29,7 +28,7 @@
"Clip Duration": "视频片段最大时长(秒)**不是视频总长度**,是指每个**合成片段**的长度)",
"Number of Videos Generated Simultaneously": "同时生成视频数量",
"Audio Settings": "**音频设置**",
"Speech Synthesis": "朗读声音(:red[**与文案语言保持一致**。注意V2版效果更好但是需要API KEY]",
"Speech Synthesis": "朗读声音(:red[**与文案语言保持一致**。注意V2版效果更好但是需要API KEYSoulVoice 提供高质量中文语音]",
"Speech Region": "服务区域 (:red[必填,[点击获取](https://portal.azure.com/#view/Microsoft_Azure_ProjectOxford/CognitiveServicesHub/~/SpeechServices)])",
"Speech Key": "API Key (:red[必填密钥1 或 密钥2 均可 [点击获取](https://portal.azure.com/#view/Microsoft_Azure_ProjectOxford/CognitiveServicesHub/~/SpeechServices)])",
"Speech Volume": "朗读音量1.0表示100%",
@ -82,7 +81,6 @@
"TTS Provider": "语音合成提供商",
"Hide Log": "隐藏日志",
"Upload Local Files": "上传本地文件",
"Video Check": "视频审查",
"File Uploaded Successfully": "文件上传成功",
"timestamp": "时间戳",
"Picture description": "图片描述",
@ -137,31 +135,6 @@
"Script Uploaded Successfully": "脚本上传成功",
"Invalid JSON format": "无效的JSON格式",
"Upload failed": "上传失败",
"Video Subtitle Merge": "**合并视频与字幕**",
"Upload Video and Subtitle Files": "上传视频和字幕文件",
"Matched File Pairs": "已匹配的文件对",
"Merge All Files": "合并所有文件",
"Merge Function Not Implemented": "合并功能待实现",
"No Matched Pairs Found": "未找到匹配的文件对",
"Missing Subtitle": "缺少对应的字幕文件, 请使用其他软件完成字幕转录,比如剪映等",
"Missing Video": "缺少对应的视频文件",
"All Uploaded Files": "所有上传的文件",
"Order": "排序序号",
"Reorder": "重新排序",
"Merging files...": "正在合并文件...",
"Merge completed!": "合并完成!",
"Download Merged Video": "下载合并后的视频",
"Download Merged Subtitle": "下载合并后的字幕",
"Error during merge": "合并过程中出错",
"Failed to generate merged video.": "生成合并视频失败。",
"Failed to generate merged subtitle.": "生成合并字幕失败。",
"Error reading merged video file": "读取合并后的视频文件时出错",
"Error reading merged subtitle file": "读取合并后的字幕文件时出错",
"Error processing video files. Please check if the videos are valid MP4 files.": "处理视频文件时出错。请检查视频是否为有效的MP4文件。",
"Error processing subtitle files. Please check if the subtitles are valid SRT files.": "处理字幕文件时出错。请检查字幕是否为有效的SRT文件。",
"Preview Merged Video": "预览合并后的视频",
"Video Path": "视频路径",
"Subtitle Path": "字幕路径",
"Enable Proxy": "启用代理",
"QwenVL model is available": "QwenVL 模型可用",
"QwenVL model is not available": "QwenVL 模型不可用",
@ -184,9 +157,6 @@
"API rate limit exceeded. Please wait about an hour and try again.": "API 调用次数已达到限制,请等待约一小时后再试。",
"Resources exhausted. Please try again later.": "资源已耗尽,请稍后再试。",
"Transcription Failed": "转录失败",
"Mergeable Files": "可合并文件数",
"Subtitle Content": "字幕内容",
"Merge Result Preview": "合并结果预览",
"Short Generate": "短剧混剪 (高燃剪辑)",
"Generate Short Video Script": "AI生成短剧混剪脚本",
"Adjust the volume of the original audio": "调整原始音频的音量",

View File

@ -1,115 +0,0 @@
"""
合并视频和字幕文件
"""
import os
import pysrt
from moviepy import VideoFileClip, concatenate_videoclips
def get_video_duration(video_path):
"""获取视频时长(秒)"""
video = VideoFileClip(video_path)
duration = video.duration
video.close()
return duration
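# Editor sketch (not part of the original file): moviepy clips also support
# the context-manager protocol, which releases the underlying reader even if
# reading the duration raises.
def get_video_duration_cm(video_path):
    with VideoFileClip(video_path) as video:
        return video.duration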
def adjust_subtitle_timing(subtitle_path, time_offset):
"""调整字幕时间戳"""
subs = pysrt.open(subtitle_path)
# 为每个字幕项添加时间偏移
for sub in subs:
sub.start.hours += int(time_offset / 3600)
sub.start.minutes += int((time_offset % 3600) / 60)
sub.start.seconds += int(time_offset % 60)
sub.start.milliseconds += int((time_offset * 1000) % 1000)
sub.end.hours += int(time_offset / 3600)
sub.end.minutes += int((time_offset % 3600) / 60)
sub.end.seconds += int(time_offset % 60)
sub.end.milliseconds += int((time_offset * 1000) % 1000)
return subs
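# Editor sketch (not part of the original file): pysrt ships a shift() helper
# that applies the same offset in one call instead of adjusting each time
# field by hand.
def adjust_subtitle_timing_shift(subtitle_path, time_offset):
    subs = pysrt.open(subtitle_path)
    # time_offset is a positive duration in seconds, as in the function above.
    subs.shift(milliseconds=int(round(time_offset * 1000)))
    return subs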
def merge_videos_and_subtitles(video_paths, subtitle_paths, output_video_path, output_subtitle_path):
"""合并视频和字幕文件"""
if len(video_paths) != len(subtitle_paths):
raise ValueError("视频文件数量与字幕文件数量不匹配")
# 1. 合并视频
video_clips = []
accumulated_duration = 0
merged_subs = pysrt.SubRipFile()
try:
# 处理所有视频和字幕
for i, (video_path, subtitle_path) in enumerate(zip(video_paths, subtitle_paths)):
# 添加视频
print(f"处理视频 {i + 1}/{len(video_paths)}: {video_path}")
video_clip = VideoFileClip(video_path)
video_clips.append(video_clip)
# 处理字幕
print(f"处理字幕 {i + 1}/{len(subtitle_paths)}: {subtitle_path}")
if i == 0:
# 第一个字幕文件直接读取
current_subs = pysrt.open(subtitle_path)
else:
# 后续字幕文件需要调整时间戳
current_subs = adjust_subtitle_timing(subtitle_path, accumulated_duration)
# 合并字幕
merged_subs.extend(current_subs)
# 更新累计时长
accumulated_duration += video_clip.duration
# 判断视频是否存在,若已经存在不重复合并
if not os.path.exists(output_video_path):
print("合并视频中...")
final_video = concatenate_videoclips(video_clips)
# 保存合并后的视频
print("保存合并后的视频...")
final_video.write_videofile(output_video_path, audio_codec='aac')
# 保存合并后的字幕
print("保存合并后的字幕...")
merged_subs.save(output_subtitle_path, encoding='utf-8')
print("合并完成")
finally:
# 清理资源
for clip in video_clips:
clip.close()
def main():
# 示例用法
video_paths = [
"temp/1.mp4",
"temp/2.mp4",
"temp/3.mp4",
"temp/4.mp4",
"temp/5.mp4",
]
subtitle_paths = [
"temp/1.srt",
"temp/2.srt",
"temp/3.srt",
"temp/4.srt",
"temp/5.srt",
]
output_video_path = "temp/merged_video.mp4"
output_subtitle_path = "temp/merged_subtitle.srt"
merge_videos_and_subtitles(video_paths, subtitle_paths, output_video_path, output_subtitle_path)
if __name__ == "__main__":
main()