From 7d4bd45f692df311cb29f828d330e7ab1828d5be Mon Sep 17 00:00:00 2001 From: viccy Date: Mon, 8 Jun 2026 16:02:20 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E7=94=9F=E6=88=90=E8=BF=9B=E5=BA=A6=E8=BF=BD=E8=B8=AA=E4=B8=8E?= =?UTF-8?q?WebUI=E5=B1=95=E7=A4=BA=EF=BC=8C=E4=BC=98=E5=8C=96=E5=89=AA?= =?UTF-8?q?=E6=98=A0=E5=AF=BC=E5=87=BA=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 添加FFmpeg视频合并进度回调支持,实现实时进度上报 - 为进度回调添加参数校验与异常捕获,避免回调失败影响主流程 - 重构任务状态更新逻辑,统一封装任务更新函数减少重复代码 - 重新划分视频生成的6个标准步骤,更新各阶段的状态上报逻辑 - 更新WebUI界面,支持展示当前生成步骤、总步骤与FFmpeg实时进度 - 优化剪映草稿导出功能,不再预先裁剪原视频,直接引用源素材时间戳 - 新增剪映草稿字幕生成功能,并补充对应的单元测试用例 --- app/services/generate_video.py | 36 ++- app/services/jianying_draft_builder.py | 255 ++++++++++++++++++-- app/services/jianying_task.py | 167 +++++++++++-- app/services/task.py | 109 ++++++--- app/services/test_jianying_task_unittest.py | 171 +++++++++++++ webui.py | 94 +++++++- 6 files changed, 762 insertions(+), 70 deletions(-) diff --git a/app/services/generate_video.py b/app/services/generate_video.py index 0d2c11d..1fe41fd 100644 --- a/app/services/generate_video.py +++ b/app/services/generate_video.py @@ -15,7 +15,7 @@ import subprocess import time import traceback import tempfile -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, Callable from loguru import logger import numpy as np from moviepy import ( @@ -471,7 +471,23 @@ def _parse_ffmpeg_progress_time(progress: Dict[str, str]) -> float: return 0.0 -def _run_ffmpeg_with_progress(cmd: list[str], duration: float) -> tuple[int, str]: +def _emit_ffmpeg_progress( + progress_callback: Optional[Callable[[float], None]], + percent: float, +) -> None: + if not progress_callback: + return + try: + progress_callback(max(0.0, min(100.0, float(percent)))) + except Exception as e: + logger.debug(f"ffmpeg 进度回调失败: {e}") + + +def _run_ffmpeg_with_progress( + cmd: list[str], + duration: float, + progress_callback: Optional[Callable[[float], None]] = None, +) -> tuple[int, str]: progress_keys = { "frame", "fps", @@ -497,6 +513,7 @@ def _run_ffmpeg_with_progress(cmd: list[str], duration: float) -> tuple[int, str output_tail: list[str] = [] last_log_time = 0.0 last_logged_percent = -1.0 + _emit_ffmpeg_progress(progress_callback, 0) assert process.stdout is not None for raw_line in process.stdout: @@ -537,11 +554,14 @@ def _run_ffmpeg_with_progress(cmd: list[str], duration: float) -> tuple[int, str f"({_format_duration(current)}/{_format_duration(duration)}), " f"speed={speed}" ) + _emit_ffmpeg_progress(progress_callback, percent) last_log_time = now last_logged_percent = percent progress = {} return_code = process.wait() + if return_code == 0: + _emit_ffmpeg_progress(progress_callback, 100) return return_code, "\n".join(output_tail[-80:]) @@ -1264,6 +1284,7 @@ def _merge_materials_with_ffmpeg( subtitle_path: Optional[str] = None, bgm_path: Optional[str] = None, options: Optional[Dict[str, Any]] = None, + progress_callback: Optional[Callable[[float], None]] = None, ) -> bool: ffmpeg_binary = _get_ffmpeg_binary() if not _check_ffmpeg_binary(ffmpeg_binary): @@ -1285,7 +1306,11 @@ def _merge_materials_with_ffmpeg( f"video={video_path}, audio={audio_path}, output={output_path}, " f"duration={_format_duration(duration)}" ) - return_code, ffmpeg_output = _run_ffmpeg_with_progress(cmd, duration) + return_code, ffmpeg_output = _run_ffmpeg_with_progress( + cmd, + duration, + progress_callback=progress_callback, + ) if return_code != 0: logger.warning(f"ffmpeg 快速合并失败,将回退 MoviePy: {ffmpeg_output[-3000:]}") if os.path.exists(output_path): @@ -1315,7 +1340,8 @@ def merge_materials( output_path: str, subtitle_path: Optional[str] = None, bgm_path: Optional[str] = None, - options: Optional[Dict[str, Any]] = None + options: Optional[Dict[str, Any]] = None, + progress_callback: Optional[Callable[[float], None]] = None, ) -> str: """ 合并视频、音频、BGM和字幕素材生成最终视频 @@ -1342,6 +1368,7 @@ def merge_materials( - threads: 处理线程数,默认2 - fps: 输出帧率,默认30 - subtitle_enabled: 是否启用字幕,默认True + progress_callback: ffmpeg 快速合并进度回调,参数为 0-100 的百分比 返回: 输出视频的路径 @@ -1439,6 +1466,7 @@ def merge_materials( subtitle_path=subtitle_path, bgm_path=bgm_path, options=ffmpeg_options, + progress_callback=progress_callback, ): return output_path logger.warning("ffmpeg 快速合并失败,继续使用 MoviePy 兼容路径") diff --git a/app/services/jianying_draft_builder.py b/app/services/jianying_draft_builder.py index 3f00422..c998099 100644 --- a/app/services/jianying_draft_builder.py +++ b/app/services/jianying_draft_builder.py @@ -10,6 +10,7 @@ from typing import Any, Dict, List, Optional, Set, Tuple from loguru import logger from app.models.schema import VideoClipParams +from app.services import script_subtitle MICROSECONDS = 1_000_000 @@ -567,6 +568,213 @@ def _create_audio_segment( } +def _normalize_hex_color(color: Optional[str], default: str = "#FFFFFF") -> str: + color = str(color or default).strip() + if not color.startswith("#"): + color = f"#{color}" + if re.fullmatch(r"#[0-9a-fA-F]{3}", color): + color = "#" + "".join(char * 2 for char in color[1:]) + if not re.fullmatch(r"#[0-9a-fA-F]{6}", color): + color = default + return color.upper() + + +def _hex_color_to_rgb_float(color: Optional[str], default: str = "#FFFFFF") -> Tuple[float, float, float]: + normalized = _normalize_hex_color(color, default) + return ( + int(normalized[1:3], 16) / 255, + int(normalized[3:5], 16) / 255, + int(normalized[5:7], 16) / 255, + ) + + +def _resolve_subtitle_text_size(params: VideoClipParams) -> float: + raw_size = getattr(params, "font_size", 60) or 60 + try: + font_size = float(raw_size) + except (TypeError, ValueError): + font_size = 60.0 + return max(4.0, min(10.0, font_size / 12.0)) + + +def _resolve_subtitle_transform_y(params: VideoClipParams) -> float: + subtitle_position = str(getattr(params, "subtitle_position", "bottom") or "bottom").lower() + if subtitle_position == "top": + return 0.82 + if subtitle_position == "center": + return 0.0 + if subtitle_position == "custom": + try: + y_percent = float(getattr(params, "custom_position", 85.0)) + except (TypeError, ValueError): + y_percent = 85.0 + y_percent = max(0.0, min(100.0, y_percent)) + return max(-0.92, min(0.92, 1.0 - 2.0 * (y_percent / 100.0))) + return -0.8 + + +def _create_text_material(text: str, params: VideoClipParams) -> Dict[str, Any]: + material_id = uuid.uuid4().hex + text = str(text or "") + text_color = _hex_color_to_rgb_float(getattr(params, "text_fore_color", "#FFFFFF"), "#FFFFFF") + stroke_color = _hex_color_to_rgb_float(getattr(params, "stroke_color", "#000000"), "#000000") + try: + stroke_width = float(getattr(params, "stroke_width", 1.5) or 0) + except (TypeError, ValueError): + stroke_width = 1.5 + + text_style = { + "fill": { + "alpha": 1.0, + "content": { + "render_type": "solid", + "solid": { + "alpha": 1.0, + "color": list(text_color), + }, + }, + }, + "range": [0, len(text)], + "size": _resolve_subtitle_text_size(params), + "bold": False, + "italic": False, + "underline": False, + "strokes": [], + } + check_flag = 7 + if stroke_width > 0: + text_style["strokes"] = [ + { + "content": { + "solid": { + "alpha": 1.0, + "color": list(stroke_color), + } + }, + "width": max(0.0, min(0.2, stroke_width / 100.0 * 0.2)), + } + ] + check_flag |= 8 + + return { + "id": material_id, + "content": json.dumps( + { + "styles": [text_style], + "text": text, + }, + ensure_ascii=False, + ), + "typesetting": 0, + "alignment": 1, + "letter_spacing": 0.0, + "line_spacing": 0.02, + "line_feed": 1, + "line_max_width": 0.82, + "force_apply_line_max_width": False, + "check_flag": check_flag, + "type": "subtitle", + "global_alpha": 1.0, + } + + +def _create_text_segment( + material_id: str, + start_us: int, + duration_us: int, + params: VideoClipParams, +) -> Dict[str, Any]: + return { + "id": uuid.uuid4().hex, + "material_id": material_id, + "target_timerange": {"start": start_us, "duration": duration_us}, + "source_timerange": None, + "speed": 1.0, + "volume": 1.0, + "extra_material_refs": [], + "is_tone_modify": False, + "clip": { + "alpha": 1.0, + "flip": {"horizontal": False, "vertical": False}, + "rotation": 0.0, + "scale": {"x": 1.0, "y": 1.0}, + "transform": {"x": 0.0, "y": _resolve_subtitle_transform_y(params)}, + }, + "uniform_scale": {"on": True, "value": 1.0}, + "render_index": 15000, + "common_keyframes": [], + } + + +def _parse_srt_entries(subtitle_path: str) -> List[Tuple[float, float, str]]: + if not subtitle_path or not os.path.exists(subtitle_path): + return [] + + with open(subtitle_path, "r", encoding="utf-8-sig") as f: + content = f.read().strip() + if not content: + return [] + + entries: List[Tuple[float, float, str]] = [] + for block in re.split(r"\n\s*\n", content): + lines = [line.strip() for line in block.splitlines() if line.strip()] + time_line_index = next( + (index for index, line in enumerate(lines) if "-->" in line), + None, + ) + if time_line_index is None or time_line_index + 1 >= len(lines): + continue + + try: + start_text, end_text = lines[time_line_index].split("-->", 1) + start = script_subtitle.parse_srt_like_time(start_text) + end = script_subtitle.parse_srt_like_time(end_text) + except Exception as e: + logger.warning(f"解析剪映字幕时间失败,跳过字幕块: {e}") + continue + + text = "\n".join(lines[time_line_index + 1:]).strip() + if end <= start or not text: + continue + entries.append((start, end, text)) + + return entries + + +def _add_subtitle_track_from_srt( + draft: Dict[str, Any], + subtitle_path: str, + params: VideoClipParams, +) -> int: + entries = _parse_srt_entries(subtitle_path) + if not entries: + return 0 + + text_track = _create_track("text", "字幕轨道") + text_track["is_default_name"] = False + max_end_us = 0 + for start, end, text in entries: + start_us = _seconds_to_microseconds(start) + duration_us = _seconds_to_microseconds(end - start) + if duration_us <= 0: + continue + + text_material = _create_text_material(text, params) + draft["materials"]["texts"].append(text_material) + text_track["segments"].append(_create_text_segment( + text_material["id"], + start_us, + duration_us, + params, + )) + max_end_us = max(max_end_us, start_us + duration_us) + + if text_track["segments"]: + draft["tracks"].append(text_track) + logger.info(f"已写入剪映字幕轨: {len(text_track['segments'])} 条, {subtitle_path}") + return max_end_us + + def _normalize_video_material(material: Dict[str, Any]) -> Dict[str, Any]: fallback_path = f"assets/video/{material.get('material_name') or 'source.mp4'}" result = { @@ -1313,6 +1521,7 @@ def write_plaintext_jianying_draft( new_script_list: List[Dict[str, Any]], params: VideoClipParams, output_dir: str, + subtitle_path: str = "", ) -> Tuple[str, str]: os.makedirs(jianying_draft_path, exist_ok=True) @@ -1332,13 +1541,16 @@ def write_plaintext_jianying_draft( metadata_cache: Dict[str, Tuple[int, int, int]] = {} used_asset_paths: Set[str] = set() asset_path_cache: Dict[str, str] = {} + video_material_cache: Dict[str, Dict[str, Any]] = {} current_time_us = 0 for item in new_script_list: start_time = float(item.get("start_time", 0.0) or 0.0) + source_start_time = float(item.get("source_start_time", start_time) or 0.0) requested_duration = float(item.get("duration", 0.0) or 0.0) timestamp = item.get("timestamp", "") ost = int(item.get("OST", 0) or 0) + use_source_timerange = bool(item.get("use_source_timerange", False)) logger.info( f"处理片段: OST={ost}, start_time={start_time}, " @@ -1346,15 +1558,15 @@ def write_plaintext_jianying_draft( ) video_file = item.get("video", "") - use_clipped_video = bool(video_file and os.path.exists(video_file)) - if not use_clipped_video: + use_clipped_video = bool(video_file and os.path.exists(video_file) and not use_source_timerange) + if not use_clipped_video and not video_file: video_file = params.video_origin_path if not video_file or not os.path.exists(video_file): logger.warning(f"视频素材不存在,跳过片段: {video_file or timestamp}") continue - source_start_time = 0.0 if use_clipped_video else start_time + source_start_time = 0.0 if use_clipped_video else source_start_time video_duration = _clamp_duration_to_media( requested_duration, video_file, @@ -1381,23 +1593,32 @@ def write_plaintext_jianying_draft( continue segment_duration_us = _seconds_to_microseconds(segment_duration) - video_material_duration_us, width, height = _get_video_metadata_ffprobe(video_file, metadata_cache) - video_relative_path = _register_asset( - video_file, - draft_path, - "assets/video", - f"video_{len(video_track['segments']) + 1}.mp4", - used_asset_paths, - asset_path_cache, + video_material_key = os.path.abspath(video_file) + video_material = video_material_cache.get(video_material_key) + if video_material is None: + video_material_duration_us, width, height = _get_video_metadata_ffprobe(video_file, metadata_cache) + video_relative_path = _register_asset( + video_file, + draft_path, + "assets/video", + f"video_{len(video_material_cache) + 1}.mp4", + used_asset_paths, + asset_path_cache, + ) + video_material = _create_video_material(video_relative_path, video_material_duration_us, width, height) + draft["materials"]["videos"].append(video_material) + video_material_cache[video_material_key] = video_material + video_volume = ( + 0.0 + if ost == 0 + else float(getattr(params, "original_volume", 1.0) or 1.0) ) - video_material = _create_video_material(video_relative_path, video_material_duration_us, width, height) - draft["materials"]["videos"].append(video_material) video_track["segments"].append(_create_video_segment( video_material["id"], _seconds_to_microseconds(_floor_duration_to_milliseconds(source_start_time)), segment_duration_us, current_time_us, - float(getattr(params, "original_volume", 1.0) or 1.0), + video_volume, )) if ost in [0, 2] and audio_file and os.path.exists(audio_file): @@ -1428,10 +1649,14 @@ def write_plaintext_jianying_draft( if not video_track["segments"]: raise ValueError("没有可写入剪映草稿的视频片段") + subtitle_end_us = 0 + if getattr(params, "subtitle_enabled", True) and subtitle_path: + subtitle_end_us = _add_subtitle_track_from_srt(draft, subtitle_path, params) + first_video = draft["materials"]["videos"][0] draft["canvas_config"]["width"] = int(first_video.get("width", 1920) or 1920) draft["canvas_config"]["height"] = int(first_video.get("height", 1080) or 1080) - draft["duration"] = current_time_us + draft["duration"] = max(current_time_us, subtitle_end_us) draft["update_time"] = int(time.time() * MICROSECONDS) asset_size = sum( diff --git a/app/services/jianying_task.py b/app/services/jianying_task.py index a24304c..21e2c01 100644 --- a/app/services/jianying_task.py +++ b/app/services/jianying_task.py @@ -9,7 +9,7 @@ from loguru import logger from app.config import config from app.models import const from app.models.schema import VideoClipParams -from app.services import voice, clip_video, update_script +from app.services import voice, clip_video, script_subtitle from app.services.jianying_draft_builder import write_plaintext_jianying_draft from app.services import state as sm from app.utils import utils @@ -141,6 +141,141 @@ def _normalize_indextts_reference_audio(params: VideoClipParams) -> None: raise ValueError(f"{display_name} 参考音频不存在,请在音频设置中上传或选择有效的参考音频") +def _index_tts_results(tts_results: list[Dict]) -> Dict: + indexed = {} + for tts_result in tts_results or []: + item_id = tts_result.get("_id") + timestamp = tts_result.get("timestamp") + if item_id is not None: + indexed[item_id] = tts_result + if timestamp: + indexed[timestamp] = tts_result + return indexed + + +def _get_video_source_paths(params: VideoClipParams) -> list[str]: + return clip_video._normalize_video_origin_paths( + getattr(params, "video_origin_path", ""), + getattr(params, "video_origin_paths", []), + ) + + +def _resolve_script_video_path(item: Dict, video_source_paths: list[str]) -> str: + if not video_source_paths: + return "" + return clip_video._resolve_script_video_path(item, video_source_paths) + + +def _resolve_tts_result(item: Dict, tts_map: Dict) -> Dict: + item_id = item.get("_id") + timestamp = item.get("timestamp") + if item_id is not None and item_id in tts_map: + return tts_map[item_id] + if timestamp in tts_map: + return tts_map[timestamp] + return {} + + +def _build_jianying_draft_script( + list_script: list[Dict], + params: VideoClipParams, + tts_results: list[Dict], +) -> list[Dict]: + video_source_paths = _get_video_source_paths(params) + if not video_source_paths: + raise ValueError("视频文件不能为空") + + tts_map = _index_tts_results(tts_results) + draft_script = [] + accumulated_duration = 0.0 + + for item in list_script: + item_copy = dict(item) + timestamp = item_copy.get("timestamp", "") + try: + source_start, source_end = script_subtitle.parse_time_range(timestamp) + except ValueError as e: + logger.warning(f"解析剪映片段时间戳失败,跳过片段 {item_copy.get('_id')}: {e}") + continue + + timestamp_duration = _floor_duration_to_milliseconds(source_end - source_start) + if timestamp_duration <= 0: + logger.warning(f"剪映片段时长无效,跳过片段 {item_copy.get('_id')}: {timestamp}") + continue + + ost = int(item_copy.get("OST", 0) or 0) + tts_result = _resolve_tts_result(item_copy, tts_map) if ost in [0, 2] else {} + item_duration = timestamp_duration + if tts_result.get("duration"): + item_duration = _floor_duration_to_milliseconds(float(tts_result.get("duration") or 0.0)) + if item_duration <= 0: + item_duration = timestamp_duration + + item_copy.update({ + "video": _resolve_script_video_path(item_copy, video_source_paths), + "audio": tts_result.get("audio_file", ""), + "subtitle": tts_result.get("subtitle_file", ""), + "sourceTimeRange": timestamp, + "start_time": source_start, + "source_start_time": source_start, + "duration": item_duration, + "use_source_timerange": True, + "editedTimeRange": ( + f"{script_subtitle.format_srt_time(accumulated_duration)}-" + f"{script_subtitle.format_srt_time(accumulated_duration + item_duration)}" + ), + }) + accumulated_duration += item_duration + draft_script.append(item_copy) + + if not draft_script: + raise ValueError("没有可写入剪映草稿的视频片段") + + return draft_script + + +def _get_original_subtitle_paths(params: VideoClipParams) -> list[str]: + subtitle_paths = getattr(params, "original_subtitle_paths", []) or [] + if isinstance(subtitle_paths, str): + subtitle_paths = [subtitle_paths] + + normalized_paths = [] + seen = set() + for subtitle_path in subtitle_paths: + if not isinstance(subtitle_path, str): + continue + subtitle_path = subtitle_path.strip() + if subtitle_path and subtitle_path not in seen: + normalized_paths.append(subtitle_path) + seen.add(subtitle_path) + + single_subtitle_path = str(getattr(params, "original_subtitle_path", "") or "").strip() + if single_subtitle_path and single_subtitle_path not in seen: + normalized_paths.insert(0, single_subtitle_path) + + return normalized_paths + + +def _create_jianying_subtitle_file( + task_id: str, + draft_script: list[Dict], + params: VideoClipParams, +) -> str: + if not getattr(params, "subtitle_enabled", True): + return "" + + try: + return script_subtitle.create_script_subtitle_file( + task_id=task_id, + list_script=draft_script, + original_subtitle_paths=_get_original_subtitle_paths(params), + video_origin_paths=_get_video_source_paths(params), + ) + except Exception as e: + logger.warning(f"剪映草稿字幕生成失败,将导出无字幕草稿: {e}") + return "" + + def start_export_jianying_draft(task_id: str, params: VideoClipParams): """ 导出到剪映草稿的后台任务 @@ -200,23 +335,15 @@ def start_export_jianying_draft(task_id: str, params: VideoClipParams): sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=20) """ - 3. 统一视频裁剪 - 基于OST类型的差异化裁剪策略 + 3. 准备剪映草稿时间线 - 直接引用原视频素材和源时间戳 """ - logger.info("\n\n## 3. 统一视频裁剪(基于OST类型)") - video_clip_result = clip_video.clip_video_unified( - video_origin_path=params.video_origin_path, - video_origin_paths=getattr(params, "video_origin_paths", []), - script_list=list_script, - tts_results=tts_results - ) + logger.info("\n\n## 3. 准备剪映草稿时间线(不裁剪视频)") + new_script_list = _build_jianying_draft_script(list_script, params, tts_results) + subtitle_path = _create_jianying_subtitle_file(task_id, new_script_list, params) - tts_clip_result = {tts_result['_id']: tts_result['audio_file'] for tts_result in tts_results} - subclip_clip_result = { - tts_result['_id']: tts_result['subtitle_file'] for tts_result in tts_results - } - new_script_list = update_script.update_script_timestamps(list_script, video_clip_result, tts_clip_result, subclip_clip_result) - - logger.info(f"统一裁剪完成,处理了 {len(video_clip_result)} 个视频片段") + logger.info(f"剪映草稿时间线准备完成,处理了 {len(new_script_list)} 个视频片段") + if subtitle_path: + logger.info(f"剪映草稿字幕文件: {subtitle_path}") sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=60) @@ -245,15 +372,19 @@ def start_export_jianying_draft(task_id: str, params: VideoClipParams): new_script_list=new_script_list, params=params, output_dir=output_dir, + subtitle_path=subtitle_path, ) logger.success(f"成功导出到剪映草稿: {draft_name}") logger.info(f"草稿已保存到: {draft_path}") # 更新任务状态 - sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100, draft_path=draft_path, draft_name=draft_name) + task_kwargs = {"draft_path": draft_path, "draft_name": draft_name} + if subtitle_path: + task_kwargs["subtitles"] = [subtitle_path] + sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100, **task_kwargs) - return {"draft_path": draft_path, "draft_name": draft_name} + return task_kwargs except Exception as e: logger.error(f"导出到剪映草稿失败: {e}") import traceback diff --git a/app/services/task.py b/app/services/task.py index d7aa1c9..b23a5b9 100644 --- a/app/services/task.py +++ b/app/services/task.py @@ -24,6 +24,38 @@ from app.services import state as sm from app.utils import utils +VIDEO_GENERATION_TOTAL_STEPS = 6 + + +def _update_video_generation_task( + task_id: str, + progress: int, + message: str, + step_current: int = 0, + ffmpeg_progress: float | None = None, + state: int = const.TASK_STATE_PROCESSING, + **kwargs, +) -> None: + task_fields = { + "message": message, + "step_current": step_current, + "step_total": VIDEO_GENERATION_TOTAL_STEPS, + **kwargs, + } + if ffmpeg_progress is not None: + task_fields["ffmpeg_progress"] = round( + max(0.0, min(100.0, float(ffmpeg_progress))), + 1, + ) + + sm.state.update_task( + task_id, + state=state, + progress=progress, + **task_fields, + ) + + def _is_auto_transcription_enabled(params: VideoClipParams) -> bool: return bool( getattr(params, "subtitle_enabled", True) @@ -583,22 +615,22 @@ def start_subclip_unified(task_id: str, params: VideoClipParams): global merged_audio_path, merged_subtitle_path logger.info(f"\n\n## 开始统一视频处理任务: {task_id}") - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=0, message="正在初始化视频生成任务", + step_current=0, ) """ 1. 加载剪辑脚本 """ logger.info("\n\n## 1. 加载视频脚本") - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=5, message="正在加载剪辑脚本", + step_current=1, ) video_script_path = path.join(params.video_clip_json_path) @@ -625,11 +657,11 @@ def start_subclip_unified(task_id: str, params: VideoClipParams): 2. 使用 TTS 生成音频素材 """ logger.info("\n\n## 2. 根据OST设置生成音频列表") - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=10, message="正在生成 TTS 配音", + step_current=2, ) # 只为OST=0 or 2的判断生成音频, OST=0 仅保留解说 OST=2 保留解说和原声 tts_segments = [ @@ -647,22 +679,22 @@ def start_subclip_unified(task_id: str, params: VideoClipParams): voice_pitch=params.voice_pitch, ) - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=20, message="TTS 配音生成完成", + step_current=2, ) """ 3. 统一视频裁剪 - 基于OST类型的差异化裁剪策略 """ logger.info("\n\n## 3. 统一视频裁剪(基于OST类型)") - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=30, message="正在按脚本裁剪视频片段", + step_current=3, ) # 使用新的统一裁剪策略 @@ -682,22 +714,22 @@ def start_subclip_unified(task_id: str, params: VideoClipParams): logger.info(f"统一裁剪完成,处理了 {len(video_clip_result)} 个视频片段") - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=60, message="视频片段裁剪完成", + step_current=3, ) """ 4. 合并音频和字幕 """ logger.info("\n\n## 4. 合并音频和字幕") - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=65, message="正在合并配音和字幕", + step_current=4, ) total_duration = sum([script["duration"] for script in new_script_list]) if tts_segments: @@ -750,11 +782,11 @@ def start_subclip_unified(task_id: str, params: VideoClipParams): ) except Exception as e: logger.warning(f"程序化字幕生成失败: {e}") - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=70, message="配音和字幕合并完成", + step_current=4, ) """ @@ -765,11 +797,11 @@ def start_subclip_unified(task_id: str, params: VideoClipParams): combined_video_path = path.join(utils.task_dir(task_id), f"merger.mp4") logger.info(f"\n\n## 5. 合并视频: => {combined_video_path}") - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=75, message="正在合并视频片段", + step_current=5, ) # 使用统一裁剪后的视频片段 @@ -790,11 +822,11 @@ def start_subclip_unified(task_id: str, params: VideoClipParams): video_aspect=params.video_aspect, threads=params.n_threads ) - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=80, message="视频片段合并完成", + step_current=5, ) """ @@ -810,11 +842,12 @@ def start_subclip_unified(task_id: str, params: VideoClipParams): else output_video_path ) logger.info(f"\n\n## 6. 最后一步: 合并字幕/BGM/配音/视频 -> {merge_output_video_path}") - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=85, message="正在合成最终视频", + step_current=6, + ffmpeg_progress=0, ) bgm_path = utils.get_bgm_file( @@ -858,30 +891,47 @@ def start_subclip_unified(task_id: str, params: VideoClipParams): 'threads': params.n_threads, **_build_subtitle_mask_options(params, enabled=not auto_transcription_enabled), } + final_merge_progress_start = 85 + final_merge_progress_end = 89 if auto_transcription_enabled else 99 + + def update_final_merge_progress(ffmpeg_progress: float): + progress_span = final_merge_progress_end - final_merge_progress_start + overall_progress = final_merge_progress_start + int( + round((max(0.0, min(100.0, float(ffmpeg_progress))) / 100) * progress_span) + ) + _update_video_generation_task( + task_id, + progress=overall_progress, + message="正在合成最终视频", + step_current=6, + ffmpeg_progress=ffmpeg_progress, + ) + generate_video.merge_materials( video_path=combined_video_path, audio_path=merged_audio_path, subtitle_path=merged_subtitle_path, bgm_path=bgm_path, output_path=merge_output_video_path, - options=options + options=options, + progress_callback=update_final_merge_progress, ) auto_subtitle_path = "" if auto_transcription_enabled: - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=90, message="正在自动转录最终视频", + step_current=6, ) logger.info("\n\n## 7. 自动转录最终视频字幕") auto_subtitle_path = _transcribe_final_video(task_id, merge_output_video_path, params) - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_PROCESSING, progress=95, message="正在压入自动转录字幕", + step_current=6, ) logger.info(f"\n\n## 8. 压入自动转录字幕 -> {output_video_path}") _merge_auto_transcribed_subtitles( @@ -902,11 +952,12 @@ def start_subclip_unified(task_id: str, params: VideoClipParams): } if auto_subtitle_path: kwargs["subtitles"] = [auto_subtitle_path] - sm.state.update_task( + _update_video_generation_task( task_id, - state=const.TASK_STATE_COMPLETE, progress=100, message="视频生成完成", + step_current=VIDEO_GENERATION_TOTAL_STEPS, + state=const.TASK_STATE_COMPLETE, **kwargs ) return kwargs diff --git a/app/services/test_jianying_task_unittest.py b/app/services/test_jianying_task_unittest.py index 0a1660f..e977242 100644 --- a/app/services/test_jianying_task_unittest.py +++ b/app/services/test_jianying_task_unittest.py @@ -193,6 +193,177 @@ class JianyingTaskTests(unittest.TestCase): self.assertEqual("NarratoAI_test", root_meta["all_draft_store"][0]["draft_name"]) self.assertEqual(str(draft_dir / "draft_info.json"), root_meta["all_draft_store"][0]["draft_json_file"]) + def test_write_plaintext_jianying_draft_uses_source_timerange_and_writes_subtitles(self): + with tempfile.TemporaryDirectory() as temp_dir: + root_path = Path(temp_dir) / "drafts" + output_dir = Path(temp_dir) / "task" + root_path.mkdir() + output_dir.mkdir() + video_path = output_dir / "source.mp4" + audio_path = output_dir / "audio_00_00_02,000-00_00_04,000.mp3" + subtitle_path = output_dir / "script_subtitles.srt" + video_path.write_bytes(b"fake source video") + audio_path.write_bytes(b"fake audio") + subtitle_path.write_text( + "1\n00:00:00,000 --> 00:00:01,500\n测试字幕\n", + encoding="utf-8", + ) + + params = VideoClipParams( + video_origin_path=str(video_path), + original_volume=0.4, + tts_volume=0.9, + subtitle_enabled=True, + font_size=60, + text_fore_color="#FFFFFF", + ) + script = [ + { + "OST": 0, + "start_time": 2.0, + "source_start_time": 2.0, + "duration": 3.0, + "timestamp": "00:00:02,000-00:00:05,000", + "video": str(video_path), + "audio": str(audio_path), + "use_source_timerange": True, + } + ] + + def fake_duration(file_path): + return 10.0 if file_path == str(video_path) else 3.0 + + with ( + patch.object(jianying_draft_builder, "_get_media_duration_ffprobe", side_effect=fake_duration), + patch.object( + jianying_draft_builder, + "_get_video_metadata_ffprobe", + return_value=(10_000_000, 1920, 1080), + ), + ): + draft_path, _ = jianying_draft_builder.write_plaintext_jianying_draft( + str(root_path), + "NarratoAI_source", + script, + params, + str(output_dir), + subtitle_path=str(subtitle_path), + ) + + draft_info = json.loads((Path(draft_path) / "draft_info.json").read_text(encoding="utf-8")) + self.assertEqual(1, len(draft_info["materials"]["videos"])) + self.assertEqual(1, len(draft_info["materials"]["texts"])) + self.assertIn("测试字幕", draft_info["materials"]["texts"][0]["content"]) + + video_segment = draft_info["tracks"][0]["segments"][0] + self.assertEqual(2_000_000, video_segment["source_timerange"]["start"]) + self.assertEqual(3_000_000, video_segment["source_timerange"]["duration"]) + self.assertEqual(0.0, video_segment["volume"]) + + text_tracks = [track for track in draft_info["tracks"] if track["type"] == "text"] + self.assertEqual(1, len(text_tracks)) + self.assertEqual(1, len(text_tracks[0]["segments"])) + self.assertEqual(1_500_000, text_tracks[0]["segments"][0]["target_timerange"]["duration"]) + + def test_build_jianying_draft_script_references_original_video(self): + with tempfile.TemporaryDirectory() as temp_dir: + video_one = Path(temp_dir) / "one.mp4" + video_two = Path(temp_dir) / "two.mp4" + audio_path = Path(temp_dir) / "audio.mp3" + video_one.write_bytes(b"one") + video_two.write_bytes(b"two") + audio_path.write_bytes(b"audio") + + params = VideoClipParams( + video_origin_path=str(video_one), + video_origin_paths=[str(video_one), str(video_two)], + ) + script = [ + { + "_id": 9, + "video_id": 2, + "timestamp": "00:00:05,000-00:00:07,000", + "narration": "解说", + "OST": 0, + } + ] + tts_results = [ + { + "_id": 9, + "timestamp": "00:00:05,000-00:00:07,000", + "audio_file": str(audio_path), + "subtitle_file": "", + "duration": 1.25, + } + ] + + draft_script = jianying_task._build_jianying_draft_script(script, params, tts_results) + + self.assertEqual(str(video_two), draft_script[0]["video"]) + self.assertEqual(str(audio_path), draft_script[0]["audio"]) + self.assertEqual(5.0, draft_script[0]["source_start_time"]) + self.assertEqual(1.25, draft_script[0]["duration"]) + self.assertTrue(draft_script[0]["use_source_timerange"]) + + def test_start_export_jianying_draft_does_not_clip_video(self): + with tempfile.TemporaryDirectory() as temp_dir: + root_path = Path(temp_dir) / "drafts" + task_dir = Path(temp_dir) / "task" + root_path.mkdir() + task_dir.mkdir() + video_path = Path(temp_dir) / "source.mp4" + audio_path = task_dir / "audio.mp3" + script_path = Path(temp_dir) / "script.json" + subtitle_path = task_dir / "script_subtitles.srt" + video_path.write_bytes(b"video") + audio_path.write_bytes(b"audio") + script_path.write_text( + json.dumps([ + { + "_id": 1, + "timestamp": "00:00:01,000-00:00:03,000", + "narration": "测试解说", + "OST": 0, + } + ], ensure_ascii=False), + encoding="utf-8", + ) + + params = VideoClipParams( + video_clip_json_path=str(script_path), + video_origin_path=str(video_path), + tts_engine="edge_tts", + voice_name="zh-CN-YunjianNeural", + subtitle_enabled=True, + draft_name="NarratoAI_no_clip", + ) + tts_results = [ + { + "_id": 1, + "timestamp": "00:00:01,000-00:00:03,000", + "audio_file": str(audio_path), + "subtitle_file": "", + "duration": 1.5, + } + ] + + with ( + patch.dict(jianying_task.config.ui, {"jianying_draft_path": str(root_path)}, clear=False), + patch.object(jianying_task.utils, "task_dir", return_value=str(task_dir)), + patch.object(jianying_task.voice, "tts_multiple", return_value=tts_results), + patch.object(jianying_task, "_create_jianying_subtitle_file", return_value=str(subtitle_path)), + patch.object(jianying_task, "write_plaintext_jianying_draft", return_value=(str(root_path / "draft"), "NarratoAI_no_clip")) as write_draft, + patch.object(jianying_task.clip_video, "clip_video_unified") as clip_video_unified, + ): + result = jianying_task.start_export_jianying_draft("task-id", params) + + clip_video_unified.assert_not_called() + write_kwargs = write_draft.call_args.kwargs + self.assertTrue(write_kwargs["new_script_list"][0]["use_source_timerange"]) + self.assertEqual(str(audio_path), write_kwargs["new_script_list"][0]["audio"]) + self.assertEqual(str(subtitle_path), write_kwargs["subtitle_path"]) + self.assertEqual(str(subtitle_path), result["subtitles"][0]) + if __name__ == "__main__": unittest.main() diff --git a/webui.py b/webui.py index bf9dd71..7897fbb 100644 --- a/webui.py +++ b/webui.py @@ -10,6 +10,7 @@ from webui.components import basic_settings, video_settings, audio_settings, sub # from webui.utils import cache, file_utils from app.utils import utils from app.utils import ffmpeg_utils +from app.models import const from app.models.schema import VideoClipParams, VideoAspect @@ -129,6 +130,77 @@ def tr(key): return loc.get("Translation", {}).get(key, key) +VIDEO_GENERATION_STEP_LABELS = [ + "正在加载剪辑脚本", + "正在生成 TTS 配音", + "正在按脚本裁剪视频片段", + "正在合并配音和字幕", + "正在合并视频片段", + "正在合成最终视频", +] + + +def _safe_int(value, default=0): + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _format_optional_percent(value): + try: + percent = max(0.0, min(100.0, float(value))) + except (TypeError, ValueError): + return None + if percent.is_integer(): + return str(int(percent)) + return f"{percent:.1f}" + + +def _render_generation_status(task: dict | None) -> str: + task = task or {} + state = task.get("state") + current_step = _safe_int(task.get("step_current"), 0) + step_total = _safe_int(task.get("step_total"), len(VIDEO_GENERATION_STEP_LABELS)) + message = str(task.get("message") or "") + ffmpeg_percent = _format_optional_percent(task.get("ffmpeg_progress")) + + if current_step <= 0: + return f"
{escape(message or '正在生成视频,请稍候...')}
" + + lines = [] + for index, default_label in enumerate(VIDEO_GENERATION_STEP_LABELS, start=1): + is_current = index == current_step + is_complete = state == const.TASK_STATE_COMPLETE + is_done = is_complete or index < current_step + label = message if is_current and message else default_label + + suffix = f"{index}/{step_total}" + if ( + is_current + and index == step_total + and ffmpeg_percent is not None + and not is_complete + ): + suffix = f"{suffix},ffmpeg {ffmpeg_percent}%" + + color = "#262730" if is_current else "#8b9099" if is_done else "#b9bec7" + weight = "650" if is_current else "500" + lines.append( + "
" + f"{escape(label)} ({escape(suffix)})" + "
" + ) + + return "".join(lines) + + def get_help_text(): """返回带当前项目版本号的帮助文案""" return tr("Get Help").replace("🎉🎉🎉", f" v{config.project_version}") @@ -198,7 +270,12 @@ def render_generate_button(): progress_bar = st.progress(0) status_panel = st.status(tr("Generating Video"), expanded=True) - status_panel.write(tr("Generating Video")) + with status_panel: + status_placeholder = st.empty() + status_placeholder.markdown( + _render_generation_status(None), + unsafe_allow_html=True, + ) def run_task(): try: @@ -238,10 +315,19 @@ def render_generate_button(): # 更新进度条和阶段状态 progress_bar.progress(progress / 100) current_message = task.get("message") or f"Processing... {progress}%" - status_label = f"{current_message} ({progress}%)" - status_key = (state, progress, current_message) + status_key = ( + state, + progress, + current_message, + task.get("step_current"), + task.get("step_total"), + task.get("ffmpeg_progress"), + ) if status_key != last_status_key: - status_panel.write(status_label) + status_placeholder.markdown( + _render_generation_status(task), + unsafe_allow_html=True, + ) last_status_key = status_key if state == const.TASK_STATE_COMPLETE: