feat: 新增视频生成进度追踪与WebUI展示,优化剪映导出功能

- 添加FFmpeg视频合并进度回调支持,实现实时进度上报
- 为进度回调添加参数校验与异常捕获,避免回调失败影响主流程
- 重构任务状态更新逻辑,统一封装任务更新函数减少重复代码
- 重新划分视频生成的6个标准步骤,更新各阶段的状态上报逻辑
- 更新WebUI界面,支持展示当前生成步骤、总步骤与FFmpeg实时进度
- 优化剪映草稿导出功能,不再预先裁剪原视频,直接引用源素材时间戳
- 新增剪映草稿字幕生成功能,并补充对应的单元测试用例
This commit is contained in:
viccy 2026-06-08 16:02:20 +08:00
parent 7a5303aa20
commit 7d4bd45f69
6 changed files with 762 additions and 70 deletions

View File

@ -15,7 +15,7 @@ import subprocess
import time
import traceback
import tempfile
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, Callable
from loguru import logger
import numpy as np
from moviepy import (
@ -471,7 +471,23 @@ def _parse_ffmpeg_progress_time(progress: Dict[str, str]) -> float:
return 0.0
def _run_ffmpeg_with_progress(cmd: list[str], duration: float) -> tuple[int, str]:
def _emit_ffmpeg_progress(
progress_callback: Optional[Callable[[float], None]],
percent: float,
) -> None:
if not progress_callback:
return
try:
progress_callback(max(0.0, min(100.0, float(percent))))
except Exception as e:
logger.debug(f"ffmpeg 进度回调失败: {e}")
def _run_ffmpeg_with_progress(
cmd: list[str],
duration: float,
progress_callback: Optional[Callable[[float], None]] = None,
) -> tuple[int, str]:
progress_keys = {
"frame",
"fps",
@ -497,6 +513,7 @@ def _run_ffmpeg_with_progress(cmd: list[str], duration: float) -> tuple[int, str
output_tail: list[str] = []
last_log_time = 0.0
last_logged_percent = -1.0
_emit_ffmpeg_progress(progress_callback, 0)
assert process.stdout is not None
for raw_line in process.stdout:
@ -537,11 +554,14 @@ def _run_ffmpeg_with_progress(cmd: list[str], duration: float) -> tuple[int, str
f"({_format_duration(current)}/{_format_duration(duration)}), "
f"speed={speed}"
)
_emit_ffmpeg_progress(progress_callback, percent)
last_log_time = now
last_logged_percent = percent
progress = {}
return_code = process.wait()
if return_code == 0:
_emit_ffmpeg_progress(progress_callback, 100)
return return_code, "\n".join(output_tail[-80:])
@ -1264,6 +1284,7 @@ def _merge_materials_with_ffmpeg(
subtitle_path: Optional[str] = None,
bgm_path: Optional[str] = None,
options: Optional[Dict[str, Any]] = None,
progress_callback: Optional[Callable[[float], None]] = None,
) -> bool:
ffmpeg_binary = _get_ffmpeg_binary()
if not _check_ffmpeg_binary(ffmpeg_binary):
@ -1285,7 +1306,11 @@ def _merge_materials_with_ffmpeg(
f"video={video_path}, audio={audio_path}, output={output_path}, "
f"duration={_format_duration(duration)}"
)
return_code, ffmpeg_output = _run_ffmpeg_with_progress(cmd, duration)
return_code, ffmpeg_output = _run_ffmpeg_with_progress(
cmd,
duration,
progress_callback=progress_callback,
)
if return_code != 0:
logger.warning(f"ffmpeg 快速合并失败,将回退 MoviePy: {ffmpeg_output[-3000:]}")
if os.path.exists(output_path):
@ -1315,7 +1340,8 @@ def merge_materials(
output_path: str,
subtitle_path: Optional[str] = None,
bgm_path: Optional[str] = None,
options: Optional[Dict[str, Any]] = None
options: Optional[Dict[str, Any]] = None,
progress_callback: Optional[Callable[[float], None]] = None,
) -> str:
"""
合并视频音频BGM和字幕素材生成最终视频
@ -1342,6 +1368,7 @@ def merge_materials(
- threads: 处理线程数默认2
- fps: 输出帧率默认30
- subtitle_enabled: 是否启用字幕默认True
progress_callback: ffmpeg 快速合并进度回调参数为 0-100 的百分比
返回:
输出视频的路径
@ -1439,6 +1466,7 @@ def merge_materials(
subtitle_path=subtitle_path,
bgm_path=bgm_path,
options=ffmpeg_options,
progress_callback=progress_callback,
):
return output_path
logger.warning("ffmpeg 快速合并失败,继续使用 MoviePy 兼容路径")

View File

@ -10,6 +10,7 @@ from typing import Any, Dict, List, Optional, Set, Tuple
from loguru import logger
from app.models.schema import VideoClipParams
from app.services import script_subtitle
# Number of microseconds in one second.
MICROSECONDS = 1_000_000
@ -567,6 +568,213 @@ def _create_audio_segment(
}
def _normalize_hex_color(color: Optional[str], default: str = "#FFFFFF") -> str:
color = str(color or default).strip()
if not color.startswith("#"):
color = f"#{color}"
if re.fullmatch(r"#[0-9a-fA-F]{3}", color):
color = "#" + "".join(char * 2 for char in color[1:])
if not re.fullmatch(r"#[0-9a-fA-F]{6}", color):
color = default
return color.upper()
def _hex_color_to_rgb_float(color: Optional[str], default: str = "#FFFFFF") -> Tuple[float, float, float]:
    """Convert a hex color into an ``(r, g, b)`` tuple of floats.

    The color is first passed through ``_normalize_hex_color``; each two-digit
    hex channel is then parsed and divided by 255.

    Args:
        color: Candidate hex color string, possibly ``None``.
        default: Fallback color handed to ``_normalize_hex_color``.

    Returns:
        The red, green and blue channels as floats.
    """
    hex_text = _normalize_hex_color(color, default)
    # Channels sit at offsets 1, 3 and 5, right after the leading '#'.
    red, green, blue = (
        int(hex_text[offset:offset + 2], 16) / 255 for offset in (1, 3, 5)
    )
    return (red, green, blue)
def _resolve_subtitle_text_size(params: VideoClipParams) -> float:
raw_size = getattr(params, "font_size", 60) or 60
try:
font_size = float(raw_size)
except (TypeError, ValueError):
font_size = 60.0
return max(4.0, min(10.0, font_size / 12.0))
def _resolve_subtitle_transform_y(params: VideoClipParams) -> float:
subtitle_position = str(getattr(params, "subtitle_position", "bottom") or "bottom").lower()
if subtitle_position == "top":
return 0.82
if subtitle_position == "center":
return 0.0
if subtitle_position == "custom":
try:
y_percent = float(getattr(params, "custom_position", 85.0))
except (TypeError, ValueError):
y_percent = 85.0
y_percent = max(0.0, min(100.0, y_percent))
return max(-0.92, min(0.92, 1.0 - 2.0 * (y_percent / 100.0)))
return -0.8
def _create_text_material(text: str, params: VideoClipParams) -> Dict[str, Any]:
    """Build the text material dict for one subtitle caption.

    The caption and its style are serialized to a JSON string stored under
    ``content``. Fill color, stroke color, stroke width and size are read from
    ``params`` with ``getattr`` defaults.

    Args:
        text: Caption text; falsy values become an empty string.
        params: Object that may expose ``text_fore_color``, ``stroke_color``,
            ``stroke_width`` and ``font_size``.

    Returns:
        A material dict with a freshly generated hex ``id`` and ``type``
        ``"subtitle"``.
    """
    new_id = uuid.uuid4().hex
    caption = str(text or "")
    fill_rgb = _hex_color_to_rgb_float(getattr(params, "text_fore_color", "#FFFFFF"), "#FFFFFF")
    outline_rgb = _hex_color_to_rgb_float(getattr(params, "stroke_color", "#000000"), "#000000")

    try:
        outline_width = float(getattr(params, "stroke_width", 1.5) or 0)
    except (TypeError, ValueError):
        outline_width = 1.5
    has_outline = outline_width > 0

    strokes = []
    if has_outline:
        strokes.append(
            {
                "content": {
                    "solid": {
                        "alpha": 1.0,
                        "color": list(outline_rgb),
                    }
                },
                # Scale the configured width down and cap it at 0.2.
                "width": max(0.0, min(0.2, outline_width / 100.0 * 0.2)),
            }
        )

    style = {
        "fill": {
            "alpha": 1.0,
            "content": {
                "render_type": "solid",
                "solid": {
                    "alpha": 1.0,
                    "color": list(fill_rgb),
                },
            },
        },
        "range": [0, len(caption)],
        "size": _resolve_subtitle_text_size(params),
        "bold": False,
        "italic": False,
        "underline": False,
        "strokes": strokes,
    }
    payload = json.dumps({"styles": [style], "text": caption}, ensure_ascii=False)

    # Base flag value is 7; bit 8 is added when a stroke is present (7 | 8 == 15).
    flags = 15 if has_outline else 7

    return {
        "id": new_id,
        "content": payload,
        "typesetting": 0,
        "alignment": 1,
        "letter_spacing": 0.0,
        "line_spacing": 0.02,
        "line_feed": 1,
        "line_max_width": 0.82,
        "force_apply_line_max_width": False,
        "check_flag": flags,
        "type": "subtitle",
        "global_alpha": 1.0,
    }
def _create_text_segment(
    material_id: str,
    start_us: int,
    duration_us: int,
    params: VideoClipParams,
) -> Dict[str, Any]:
    """Build a text-track segment that places a text material on the timeline.

    Args:
        material_id: Id of the text material the segment refers to.
        start_us: Start of the segment on the target timeline.
        duration_us: Length of the segment on the target timeline.
        params: Object used by ``_resolve_subtitle_transform_y`` to pick the
            vertical position.

    Returns:
        A segment dict with a freshly generated hex ``id``.
    """
    segment_id = uuid.uuid4().hex
    timeline_range = {"start": start_us, "duration": duration_us}
    placement = {
        "alpha": 1.0,
        "flip": {"horizontal": False, "vertical": False},
        "rotation": 0.0,
        "scale": {"x": 1.0, "y": 1.0},
        "transform": {"x": 0.0, "y": _resolve_subtitle_transform_y(params)},
    }
    return {
        "id": segment_id,
        "material_id": material_id,
        "target_timerange": timeline_range,
        "source_timerange": None,
        "speed": 1.0,
        "volume": 1.0,
        "extra_material_refs": [],
        "is_tone_modify": False,
        "clip": placement,
        "uniform_scale": {"on": True, "value": 1.0},
        "render_index": 15000,
        "common_keyframes": [],
    }
def _parse_srt_entries(subtitle_path: str) -> List[Tuple[float, float, str]]:
if not subtitle_path or not os.path.exists(subtitle_path):
return []
with open(subtitle_path, "r", encoding="utf-8-sig") as f:
content = f.read().strip()
if not content:
return []
entries: List[Tuple[float, float, str]] = []
for block in re.split(r"\n\s*\n", content):
lines = [line.strip() for line in block.splitlines() if line.strip()]
time_line_index = next(
(index for index, line in enumerate(lines) if "-->" in line),
None,
)
if time_line_index is None or time_line_index + 1 >= len(lines):
continue
try:
start_text, end_text = lines[time_line_index].split("-->", 1)
start = script_subtitle.parse_srt_like_time(start_text)
end = script_subtitle.parse_srt_like_time(end_text)
except Exception as e:
logger.warning(f"解析剪映字幕时间失败,跳过字幕块: {e}")
continue
text = "\n".join(lines[time_line_index + 1:]).strip()
if end <= start or not text:
continue
entries.append((start, end, text))
return entries
def _add_subtitle_track_from_srt(
    draft: Dict[str, Any],
    subtitle_path: str,
    params: VideoClipParams,
) -> int:
    """Append a subtitle text track built from an SRT file to the draft.

    One text material and one text segment are created per parsed cue. The
    materials are appended to ``draft["materials"]["texts"]`` and, when at
    least one segment was produced, the track is appended to
    ``draft["tracks"]``.

    Args:
        draft: Draft dict that is mutated in place.
        subtitle_path: Path of the SRT file to read.
        params: Object carrying subtitle styling options.

    Returns:
        The largest segment end time in microseconds, or 0 when no cue was
        written.
    """
    cues = _parse_srt_entries(subtitle_path)
    if not cues:
        return 0

    subtitle_track = _create_track("text", "字幕轨道")
    subtitle_track["is_default_name"] = False
    segments = subtitle_track["segments"]

    latest_end_us = 0
    for cue_start, cue_end, cue_text in cues:
        begin_us = _seconds_to_microseconds(cue_start)
        length_us = _seconds_to_microseconds(cue_end - cue_start)
        if length_us <= 0:
            continue

        material = _create_text_material(cue_text, params)
        draft["materials"]["texts"].append(material)
        segment = _create_text_segment(material["id"], begin_us, length_us, params)
        subtitle_track["segments"].append(segment)
        latest_end_us = max(latest_end_us, begin_us + length_us)

    # Only register the track when it actually contains segments.
    if subtitle_track["segments"]:
        draft["tracks"].append(subtitle_track)
        segment_count = len(subtitle_track["segments"])
        logger.info(f"已写入剪映字幕轨: {segment_count} 条, {subtitle_path}")
    return latest_end_us
def _normalize_video_material(material: Dict[str, Any]) -> Dict[str, Any]:
fallback_path = f"assets/video/{material.get('material_name') or 'source.mp4'}"
result = {
@ -1313,6 +1521,7 @@ def write_plaintext_jianying_draft(
new_script_list: List[Dict[str, Any]],
params: VideoClipParams,
output_dir: str,
subtitle_path: str = "",
) -> Tuple[str, str]:
os.makedirs(jianying_draft_path, exist_ok=True)
@ -1332,13 +1541,16 @@ def write_plaintext_jianying_draft(
metadata_cache: Dict[str, Tuple[int, int, int]] = {}
used_asset_paths: Set[str] = set()
asset_path_cache: Dict[str, str] = {}
video_material_cache: Dict[str, Dict[str, Any]] = {}
current_time_us = 0
for item in new_script_list:
start_time = float(item.get("start_time", 0.0) or 0.0)
source_start_time = float(item.get("source_start_time", start_time) or 0.0)
requested_duration = float(item.get("duration", 0.0) or 0.0)
timestamp = item.get("timestamp", "")
ost = int(item.get("OST", 0) or 0)
use_source_timerange = bool(item.get("use_source_timerange", False))
logger.info(
f"处理片段: OST={ost}, start_time={start_time}, "
@ -1346,15 +1558,15 @@ def write_plaintext_jianying_draft(
)
video_file = item.get("video", "")
use_clipped_video = bool(video_file and os.path.exists(video_file))
if not use_clipped_video:
use_clipped_video = bool(video_file and os.path.exists(video_file) and not use_source_timerange)
if not use_clipped_video and not video_file:
video_file = params.video_origin_path
if not video_file or not os.path.exists(video_file):
logger.warning(f"视频素材不存在,跳过片段: {video_file or timestamp}")
continue
source_start_time = 0.0 if use_clipped_video else start_time
source_start_time = 0.0 if use_clipped_video else source_start_time
video_duration = _clamp_duration_to_media(
requested_duration,
video_file,
@ -1381,23 +1593,32 @@ def write_plaintext_jianying_draft(
continue
segment_duration_us = _seconds_to_microseconds(segment_duration)
video_material_duration_us, width, height = _get_video_metadata_ffprobe(video_file, metadata_cache)
video_relative_path = _register_asset(
video_file,
draft_path,
"assets/video",
f"video_{len(video_track['segments']) + 1}.mp4",
used_asset_paths,
asset_path_cache,
video_material_key = os.path.abspath(video_file)
video_material = video_material_cache.get(video_material_key)
if video_material is None:
video_material_duration_us, width, height = _get_video_metadata_ffprobe(video_file, metadata_cache)
video_relative_path = _register_asset(
video_file,
draft_path,
"assets/video",
f"video_{len(video_material_cache) + 1}.mp4",
used_asset_paths,
asset_path_cache,
)
video_material = _create_video_material(video_relative_path, video_material_duration_us, width, height)
draft["materials"]["videos"].append(video_material)
video_material_cache[video_material_key] = video_material
video_volume = (
0.0
if ost == 0
else float(getattr(params, "original_volume", 1.0) or 1.0)
)
video_material = _create_video_material(video_relative_path, video_material_duration_us, width, height)
draft["materials"]["videos"].append(video_material)
video_track["segments"].append(_create_video_segment(
video_material["id"],
_seconds_to_microseconds(_floor_duration_to_milliseconds(source_start_time)),
segment_duration_us,
current_time_us,
float(getattr(params, "original_volume", 1.0) or 1.0),
video_volume,
))
if ost in [0, 2] and audio_file and os.path.exists(audio_file):
@ -1428,10 +1649,14 @@ def write_plaintext_jianying_draft(
if not video_track["segments"]:
raise ValueError("没有可写入剪映草稿的视频片段")
subtitle_end_us = 0
if getattr(params, "subtitle_enabled", True) and subtitle_path:
subtitle_end_us = _add_subtitle_track_from_srt(draft, subtitle_path, params)
first_video = draft["materials"]["videos"][0]
draft["canvas_config"]["width"] = int(first_video.get("width", 1920) or 1920)
draft["canvas_config"]["height"] = int(first_video.get("height", 1080) or 1080)
draft["duration"] = current_time_us
draft["duration"] = max(current_time_us, subtitle_end_us)
draft["update_time"] = int(time.time() * MICROSECONDS)
asset_size = sum(

View File

@ -9,7 +9,7 @@ from loguru import logger
from app.config import config
from app.models import const
from app.models.schema import VideoClipParams
from app.services import voice, clip_video, update_script
from app.services import voice, clip_video, script_subtitle
from app.services.jianying_draft_builder import write_plaintext_jianying_draft
from app.services import state as sm
from app.utils import utils
@ -141,6 +141,141 @@ def _normalize_indextts_reference_audio(params: VideoClipParams) -> None:
raise ValueError(f"{display_name} 参考音频不存在,请在音频设置中上传或选择有效的参考音频")
def _index_tts_results(tts_results: list[Dict]) -> Dict:
indexed = {}
for tts_result in tts_results or []:
item_id = tts_result.get("_id")
timestamp = tts_result.get("timestamp")
if item_id is not None:
indexed[item_id] = tts_result
if timestamp:
indexed[timestamp] = tts_result
return indexed
def _get_video_source_paths(params: VideoClipParams) -> list[str]:
    """Return the source video paths configured on ``params``.

    Reads ``video_origin_path`` and ``video_origin_paths`` (defaulting to an
    empty string and an empty list) and hands them to
    ``clip_video._normalize_video_origin_paths``.
    """
    primary_path = getattr(params, "video_origin_path", "")
    additional_paths = getattr(params, "video_origin_paths", [])
    return clip_video._normalize_video_origin_paths(primary_path, additional_paths)
def _resolve_script_video_path(item: Dict, video_source_paths: list[str]) -> str:
if not video_source_paths:
return ""
return clip_video._resolve_script_video_path(item, video_source_paths)
def _resolve_tts_result(item: Dict, tts_map: Dict) -> Dict:
item_id = item.get("_id")
timestamp = item.get("timestamp")
if item_id is not None and item_id in tts_map:
return tts_map[item_id]
if timestamp in tts_map:
return tts_map[timestamp]
return {}
def _build_jianying_draft_script(
    list_script: list[Dict],
    params: VideoClipParams,
    tts_results: list[Dict],
) -> list[Dict]:
    """Build the script list used to write a Jianying draft without clipping.

    Each script item is copied and extended with the resolved source video,
    the TTS audio/subtitle files, its source start time and duration, and an
    ``editedTimeRange`` describing its position on the output timeline. Items
    whose timestamp cannot be parsed or whose duration is not positive are
    skipped with a warning.

    Args:
        list_script: Script items; each is expected to carry a ``timestamp``.
        params: Object providing the source video paths.
        tts_results: TTS result dicts, matched by ``_id`` or ``timestamp``.

    Returns:
        The list of prepared script items.

    Raises:
        ValueError: If no source video is configured or no item is usable.
    """
    source_paths = _get_video_source_paths(params)
    if not source_paths:
        raise ValueError("视频文件不能为空")

    tts_lookup = _index_tts_results(tts_results)
    prepared: list[Dict] = []
    timeline_cursor = 0.0

    for script_item in list_script:
        entry = dict(script_item)
        timestamp = entry.get("timestamp", "")
        try:
            source_start, source_end = script_subtitle.parse_time_range(timestamp)
        except ValueError as e:
            logger.warning(f"解析剪映片段时间戳失败,跳过片段 {entry.get('_id')}: {e}")
            continue

        source_span = _floor_duration_to_milliseconds(source_end - source_start)
        if source_span <= 0:
            logger.warning(f"剪映片段时长无效,跳过片段 {entry.get('_id')}: {timestamp}")
            continue

        # Only OST 0 and 2 items carry narration, so only they use a TTS result.
        ost = int(entry.get("OST", 0) or 0)
        if ost in [0, 2]:
            tts_entry = _resolve_tts_result(entry, tts_lookup)
        else:
            tts_entry = {}

        # Prefer the TTS duration; fall back to the timestamp span when the
        # TTS duration is missing or not positive.
        clip_length = source_span
        if tts_entry.get("duration"):
            clip_length = _floor_duration_to_milliseconds(float(tts_entry.get("duration") or 0.0))
        if clip_length <= 0:
            clip_length = source_span

        video_path = _resolve_script_video_path(entry, source_paths)
        range_start = script_subtitle.format_srt_time(timeline_cursor)
        range_end = script_subtitle.format_srt_time(timeline_cursor + clip_length)

        entry.update({
            "video": video_path,
            "audio": tts_entry.get("audio_file", ""),
            "subtitle": tts_entry.get("subtitle_file", ""),
            "sourceTimeRange": timestamp,
            "start_time": source_start,
            "source_start_time": source_start,
            "duration": clip_length,
            "use_source_timerange": True,
            "editedTimeRange": f"{range_start}-{range_end}",
        })
        timeline_cursor += clip_length
        prepared.append(entry)

    if not prepared:
        raise ValueError("没有可写入剪映草稿的视频片段")
    return prepared
def _get_original_subtitle_paths(params: VideoClipParams) -> list[str]:
subtitle_paths = getattr(params, "original_subtitle_paths", []) or []
if isinstance(subtitle_paths, str):
subtitle_paths = [subtitle_paths]
normalized_paths = []
seen = set()
for subtitle_path in subtitle_paths:
if not isinstance(subtitle_path, str):
continue
subtitle_path = subtitle_path.strip()
if subtitle_path and subtitle_path not in seen:
normalized_paths.append(subtitle_path)
seen.add(subtitle_path)
single_subtitle_path = str(getattr(params, "original_subtitle_path", "") or "").strip()
if single_subtitle_path and single_subtitle_path not in seen:
normalized_paths.insert(0, single_subtitle_path)
return normalized_paths
def _create_jianying_subtitle_file(
task_id: str,
draft_script: list[Dict],
params: VideoClipParams,
) -> str:
if not getattr(params, "subtitle_enabled", True):
return ""
try:
return script_subtitle.create_script_subtitle_file(
task_id=task_id,
list_script=draft_script,
original_subtitle_paths=_get_original_subtitle_paths(params),
video_origin_paths=_get_video_source_paths(params),
)
except Exception as e:
logger.warning(f"剪映草稿字幕生成失败,将导出无字幕草稿: {e}")
return ""
def start_export_jianying_draft(task_id: str, params: VideoClipParams):
"""
导出到剪映草稿的后台任务
@ -200,23 +335,15 @@ def start_export_jianying_draft(task_id: str, params: VideoClipParams):
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=20)
"""
3. 统一视频裁剪 - 基于OST类型的差异化裁剪策略
3. 准备剪映草稿时间线 - 直接引用原视频素材和源时间戳
"""
logger.info("\n\n## 3. 统一视频裁剪基于OST类型")
video_clip_result = clip_video.clip_video_unified(
video_origin_path=params.video_origin_path,
video_origin_paths=getattr(params, "video_origin_paths", []),
script_list=list_script,
tts_results=tts_results
)
logger.info("\n\n## 3. 准备剪映草稿时间线(不裁剪视频)")
new_script_list = _build_jianying_draft_script(list_script, params, tts_results)
subtitle_path = _create_jianying_subtitle_file(task_id, new_script_list, params)
tts_clip_result = {tts_result['_id']: tts_result['audio_file'] for tts_result in tts_results}
subclip_clip_result = {
tts_result['_id']: tts_result['subtitle_file'] for tts_result in tts_results
}
new_script_list = update_script.update_script_timestamps(list_script, video_clip_result, tts_clip_result, subclip_clip_result)
logger.info(f"统一裁剪完成,处理了 {len(video_clip_result)} 个视频片段")
logger.info(f"剪映草稿时间线准备完成,处理了 {len(new_script_list)} 个视频片段")
if subtitle_path:
logger.info(f"剪映草稿字幕文件: {subtitle_path}")
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=60)
@ -245,15 +372,19 @@ def start_export_jianying_draft(task_id: str, params: VideoClipParams):
new_script_list=new_script_list,
params=params,
output_dir=output_dir,
subtitle_path=subtitle_path,
)
logger.success(f"成功导出到剪映草稿: {draft_name}")
logger.info(f"草稿已保存到: {draft_path}")
# 更新任务状态
sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100, draft_path=draft_path, draft_name=draft_name)
task_kwargs = {"draft_path": draft_path, "draft_name": draft_name}
if subtitle_path:
task_kwargs["subtitles"] = [subtitle_path]
sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100, **task_kwargs)
return {"draft_path": draft_path, "draft_name": draft_name}
return task_kwargs
except Exception as e:
logger.error(f"导出到剪映草稿失败: {e}")
import traceback

View File

@ -24,6 +24,38 @@ from app.services import state as sm
from app.utils import utils
# Total number of steps reported as "step_total" in video generation task updates.
VIDEO_GENERATION_TOTAL_STEPS = 6
def _update_video_generation_task(
    task_id: str,
    progress: int,
    message: str,
    step_current: int = 0,
    ffmpeg_progress: float | None = None,
    state: int = const.TASK_STATE_PROCESSING,
    **kwargs,
) -> None:
    """Push a video generation status update to the task state store.

    Every update carries the message, the current step and the total step
    count. ``ffmpeg_progress`` is only included when given; it is clamped to
    0-100 and rounded to one decimal place.

    Args:
        task_id: Id of the task being updated.
        progress: Overall progress value forwarded as ``progress``.
        message: Status message for the current stage.
        step_current: Index of the current step.
        ffmpeg_progress: Optional ffmpeg progress percentage.
        state: Task state constant; defaults to the processing state.
        **kwargs: Extra task fields forwarded unchanged.
    """
    fields = {
        "message": message,
        "step_current": step_current,
        "step_total": VIDEO_GENERATION_TOTAL_STEPS,
    }
    fields.update(kwargs)

    if ffmpeg_progress is not None:
        bounded = max(0.0, min(100.0, float(ffmpeg_progress)))
        fields["ffmpeg_progress"] = round(bounded, 1)

    sm.state.update_task(task_id, state=state, progress=progress, **fields)
def _is_auto_transcription_enabled(params: VideoClipParams) -> bool:
return bool(
getattr(params, "subtitle_enabled", True)
@ -583,22 +615,22 @@ def start_subclip_unified(task_id: str, params: VideoClipParams):
global merged_audio_path, merged_subtitle_path
logger.info(f"\n\n## 开始统一视频处理任务: {task_id}")
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=0,
message="正在初始化视频生成任务",
step_current=0,
)
"""
1. 加载剪辑脚本
"""
logger.info("\n\n## 1. 加载视频脚本")
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=5,
message="正在加载剪辑脚本",
step_current=1,
)
video_script_path = path.join(params.video_clip_json_path)
@ -625,11 +657,11 @@ def start_subclip_unified(task_id: str, params: VideoClipParams):
2. 使用 TTS 生成音频素材
"""
logger.info("\n\n## 2. 根据OST设置生成音频列表")
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=10,
message="正在生成 TTS 配音",
step_current=2,
)
# 只为OST=0 or 2的判断生成音频 OST=0 仅保留解说 OST=2 保留解说和原声
tts_segments = [
@ -647,22 +679,22 @@ def start_subclip_unified(task_id: str, params: VideoClipParams):
voice_pitch=params.voice_pitch,
)
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=20,
message="TTS 配音生成完成",
step_current=2,
)
"""
3. 统一视频裁剪 - 基于OST类型的差异化裁剪策略
"""
logger.info("\n\n## 3. 统一视频裁剪基于OST类型")
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=30,
message="正在按脚本裁剪视频片段",
step_current=3,
)
# 使用新的统一裁剪策略
@ -682,22 +714,22 @@ def start_subclip_unified(task_id: str, params: VideoClipParams):
logger.info(f"统一裁剪完成,处理了 {len(video_clip_result)} 个视频片段")
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=60,
message="视频片段裁剪完成",
step_current=3,
)
"""
4. 合并音频和字幕
"""
logger.info("\n\n## 4. 合并音频和字幕")
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=65,
message="正在合并配音和字幕",
step_current=4,
)
total_duration = sum([script["duration"] for script in new_script_list])
if tts_segments:
@ -750,11 +782,11 @@ def start_subclip_unified(task_id: str, params: VideoClipParams):
)
except Exception as e:
logger.warning(f"程序化字幕生成失败: {e}")
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=70,
message="配音和字幕合并完成",
step_current=4,
)
"""
@ -765,11 +797,11 @@ def start_subclip_unified(task_id: str, params: VideoClipParams):
combined_video_path = path.join(utils.task_dir(task_id), f"merger.mp4")
logger.info(f"\n\n## 5. 合并视频: => {combined_video_path}")
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=75,
message="正在合并视频片段",
step_current=5,
)
# 使用统一裁剪后的视频片段
@ -790,11 +822,11 @@ def start_subclip_unified(task_id: str, params: VideoClipParams):
video_aspect=params.video_aspect,
threads=params.n_threads
)
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=80,
message="视频片段合并完成",
step_current=5,
)
"""
@ -810,11 +842,12 @@ def start_subclip_unified(task_id: str, params: VideoClipParams):
else output_video_path
)
logger.info(f"\n\n## 6. 最后一步: 合并字幕/BGM/配音/视频 -> {merge_output_video_path}")
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=85,
message="正在合成最终视频",
step_current=6,
ffmpeg_progress=0,
)
bgm_path = utils.get_bgm_file(
@ -858,30 +891,47 @@ def start_subclip_unified(task_id: str, params: VideoClipParams):
'threads': params.n_threads,
**_build_subtitle_mask_options(params, enabled=not auto_transcription_enabled),
}
final_merge_progress_start = 85
final_merge_progress_end = 89 if auto_transcription_enabled else 99
def update_final_merge_progress(ffmpeg_progress: float):
progress_span = final_merge_progress_end - final_merge_progress_start
overall_progress = final_merge_progress_start + int(
round((max(0.0, min(100.0, float(ffmpeg_progress))) / 100) * progress_span)
)
_update_video_generation_task(
task_id,
progress=overall_progress,
message="正在合成最终视频",
step_current=6,
ffmpeg_progress=ffmpeg_progress,
)
generate_video.merge_materials(
video_path=combined_video_path,
audio_path=merged_audio_path,
subtitle_path=merged_subtitle_path,
bgm_path=bgm_path,
output_path=merge_output_video_path,
options=options
options=options,
progress_callback=update_final_merge_progress,
)
auto_subtitle_path = ""
if auto_transcription_enabled:
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=90,
message="正在自动转录最终视频",
step_current=6,
)
logger.info("\n\n## 7. 自动转录最终视频字幕")
auto_subtitle_path = _transcribe_final_video(task_id, merge_output_video_path, params)
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_PROCESSING,
progress=95,
message="正在压入自动转录字幕",
step_current=6,
)
logger.info(f"\n\n## 8. 压入自动转录字幕 -> {output_video_path}")
_merge_auto_transcribed_subtitles(
@ -902,11 +952,12 @@ def start_subclip_unified(task_id: str, params: VideoClipParams):
}
if auto_subtitle_path:
kwargs["subtitles"] = [auto_subtitle_path]
sm.state.update_task(
_update_video_generation_task(
task_id,
state=const.TASK_STATE_COMPLETE,
progress=100,
message="视频生成完成",
step_current=VIDEO_GENERATION_TOTAL_STEPS,
state=const.TASK_STATE_COMPLETE,
**kwargs
)
return kwargs

View File

@ -193,6 +193,177 @@ class JianyingTaskTests(unittest.TestCase):
self.assertEqual("NarratoAI_test", root_meta["all_draft_store"][0]["draft_name"])
self.assertEqual(str(draft_dir / "draft_info.json"), root_meta["all_draft_store"][0]["draft_json_file"])
def test_write_plaintext_jianying_draft_uses_source_timerange_and_writes_subtitles(self):
    """The draft references the source video by time range and gets a subtitle track."""
    with tempfile.TemporaryDirectory() as workspace:
        workspace_dir = Path(workspace)
        drafts_root = workspace_dir / "drafts"
        task_output = workspace_dir / "task"
        drafts_root.mkdir()
        task_output.mkdir()

        # Placeholder media files: only their existence matters because the
        # ffprobe helpers are patched below.
        source_video = task_output / "source.mp4"
        narration_audio = task_output / "audio_00_00_02,000-00_00_04,000.mp3"
        srt_file = task_output / "script_subtitles.srt"
        source_video.write_bytes(b"fake source video")
        narration_audio.write_bytes(b"fake audio")
        srt_file.write_text(
            "1\n00:00:00,000 --> 00:00:01,500\n测试字幕\n",
            encoding="utf-8",
        )

        params = VideoClipParams(
            video_origin_path=str(source_video),
            original_volume=0.4,
            tts_volume=0.9,
            subtitle_enabled=True,
            font_size=60,
            text_fore_color="#FFFFFF",
        )
        segment = {
            "OST": 0,
            "start_time": 2.0,
            "source_start_time": 2.0,
            "duration": 3.0,
            "timestamp": "00:00:02,000-00:00:05,000",
            "video": str(source_video),
            "audio": str(narration_audio),
            "use_source_timerange": True,
        }
        script = [segment]

        def fake_duration(file_path):
            # The source video is reported as 10s long, everything else as 3s.
            if file_path == str(source_video):
                return 10.0
            return 3.0

        with (
            patch.object(jianying_draft_builder, "_get_media_duration_ffprobe", side_effect=fake_duration),
            patch.object(
                jianying_draft_builder,
                "_get_video_metadata_ffprobe",
                return_value=(10_000_000, 1920, 1080),
            ),
        ):
            draft_path, _ = jianying_draft_builder.write_plaintext_jianying_draft(
                str(drafts_root),
                "NarratoAI_source",
                script,
                params,
                str(task_output),
                subtitle_path=str(srt_file),
            )

        draft_json = (Path(draft_path) / "draft_info.json").read_text(encoding="utf-8")
        draft_info = json.loads(draft_json)
        materials = draft_info["materials"]

        self.assertEqual(1, len(materials["videos"]))
        self.assertEqual(1, len(materials["texts"]))
        self.assertIn("测试字幕", materials["texts"][0]["content"])

        first_video_segment = draft_info["tracks"][0]["segments"][0]
        source_range = first_video_segment["source_timerange"]
        self.assertEqual(2_000_000, source_range["start"])
        self.assertEqual(3_000_000, source_range["duration"])
        self.assertEqual(0.0, first_video_segment["volume"])

        subtitle_tracks = [track for track in draft_info["tracks"] if track["type"] == "text"]
        self.assertEqual(1, len(subtitle_tracks))
        subtitle_segments = subtitle_tracks[0]["segments"]
        self.assertEqual(1, len(subtitle_segments))
        self.assertEqual(1_500_000, subtitle_segments[0]["target_timerange"]["duration"])
def test_build_jianying_draft_script_references_original_video(self):
    """The draft script points at the original source video and the TTS audio."""
    with tempfile.TemporaryDirectory() as workspace:
        workspace_dir = Path(workspace)
        first_video = workspace_dir / "one.mp4"
        second_video = workspace_dir / "two.mp4"
        narration_audio = workspace_dir / "audio.mp3"
        first_video.write_bytes(b"one")
        second_video.write_bytes(b"two")
        narration_audio.write_bytes(b"audio")

        segment_timestamp = "00:00:05,000-00:00:07,000"
        params = VideoClipParams(
            video_origin_path=str(first_video),
            video_origin_paths=[str(first_video), str(second_video)],
        )
        script = [
            {
                "_id": 9,
                "video_id": 2,
                "timestamp": segment_timestamp,
                "narration": "解说",
                "OST": 0,
            }
        ]
        tts_results = [
            {
                "_id": 9,
                "timestamp": segment_timestamp,
                "audio_file": str(narration_audio),
                "subtitle_file": "",
                "duration": 1.25,
            }
        ]

        draft_script = jianying_task._build_jianying_draft_script(script, params, tts_results)
        first_entry = draft_script[0]

        self.assertEqual(str(second_video), first_entry["video"])
        self.assertEqual(str(narration_audio), first_entry["audio"])
        self.assertEqual(5.0, first_entry["source_start_time"])
        self.assertEqual(1.25, first_entry["duration"])
        self.assertTrue(first_entry["use_source_timerange"])
def test_start_export_jianying_draft_does_not_clip_video(self):
    """Exporting a draft must not call the unified video clipping step."""
    with tempfile.TemporaryDirectory() as workspace:
        workspace_dir = Path(workspace)
        drafts_root = workspace_dir / "drafts"
        task_dir = workspace_dir / "task"
        drafts_root.mkdir()
        task_dir.mkdir()

        source_video = workspace_dir / "source.mp4"
        narration_audio = task_dir / "audio.mp3"
        script_file = workspace_dir / "script.json"
        srt_file = task_dir / "script_subtitles.srt"
        source_video.write_bytes(b"video")
        narration_audio.write_bytes(b"audio")

        segment_timestamp = "00:00:01,000-00:00:03,000"
        script_items = [
            {
                "_id": 1,
                "timestamp": segment_timestamp,
                "narration": "测试解说",
                "OST": 0,
            }
        ]
        script_file.write_text(
            json.dumps(script_items, ensure_ascii=False),
            encoding="utf-8",
        )

        params = VideoClipParams(
            video_clip_json_path=str(script_file),
            video_origin_path=str(source_video),
            tts_engine="edge_tts",
            voice_name="zh-CN-YunjianNeural",
            subtitle_enabled=True,
            draft_name="NarratoAI_no_clip",
        )
        tts_results = [
            {
                "_id": 1,
                "timestamp": segment_timestamp,
                "audio_file": str(narration_audio),
                "subtitle_file": "",
                "duration": 1.5,
            }
        ]
        written_draft = (str(drafts_root / "draft"), "NarratoAI_no_clip")

        # TTS, subtitle creation and draft writing are all patched out, so the
        # test only observes how the export task wires them together.
        with (
            patch.dict(jianying_task.config.ui, {"jianying_draft_path": str(drafts_root)}, clear=False),
            patch.object(jianying_task.utils, "task_dir", return_value=str(task_dir)),
            patch.object(jianying_task.voice, "tts_multiple", return_value=tts_results),
            patch.object(jianying_task, "_create_jianying_subtitle_file", return_value=str(srt_file)),
            patch.object(jianying_task, "write_plaintext_jianying_draft", return_value=written_draft) as write_draft,
            patch.object(jianying_task.clip_video, "clip_video_unified") as clip_video_unified,
        ):
            result = jianying_task.start_export_jianying_draft("task-id", params)

        clip_video_unified.assert_not_called()
        write_kwargs = write_draft.call_args.kwargs
        first_entry = write_kwargs["new_script_list"][0]
        self.assertTrue(first_entry["use_source_timerange"])
        self.assertEqual(str(narration_audio), first_entry["audio"])
        self.assertEqual(str(srt_file), write_kwargs["subtitle_path"])
        self.assertEqual(str(srt_file), result["subtitles"][0])
if __name__ == "__main__":
unittest.main()

View File

@ -10,6 +10,7 @@ from webui.components import basic_settings, video_settings, audio_settings, sub
# from webui.utils import cache, file_utils
from app.utils import utils
from app.utils import ffmpeg_utils
from app.models import const
from app.models.schema import VideoClipParams, VideoAspect
@ -129,6 +130,77 @@ def tr(key):
return loc.get("Translation", {}).get(key, key)
# Default label for each video generation step, in step order (step 1 first).
# A step's label is replaced by the task message while that step is current.
VIDEO_GENERATION_STEP_LABELS = [
    "正在加载剪辑脚本",
    "正在生成 TTS 配音",
    "正在按脚本裁剪视频片段",
    "正在合并配音和字幕",
    "正在合并视频片段",
    "正在合成最终视频",
]
def _safe_int(value, default=0):
try:
return int(value)
except (TypeError, ValueError):
return default
def _format_optional_percent(value):
try:
percent = max(0.0, min(100.0, float(value)))
except (TypeError, ValueError):
return None
if percent.is_integer():
return str(int(percent))
return f"{percent:.1f}"
def _render_generation_status(task: dict | None) -> str:
    """Render the video generation status as an HTML snippet.

    Before the first step has started (``step_current`` <= 0) a single line
    with the task message, or a generic placeholder, is returned. Otherwise
    one line per entry of ``VIDEO_GENERATION_STEP_LABELS`` is rendered: the
    current step shows the task message and is emphasized, completed steps
    are dimmed, and pending steps are dimmed further. While the last step is
    running and an ffmpeg progress value is available, it is appended to that
    step's ``index/total`` suffix.

    Args:
        task: Task state dict; ``None`` is treated as an empty dict.

    Returns:
        HTML markup; all dynamic text is passed through ``escape``.
    """
    task = task or {}
    state = task.get("state")
    current_step = _safe_int(task.get("step_current"), 0)
    step_total = _safe_int(task.get("step_total"), len(VIDEO_GENERATION_STEP_LABELS))
    message = str(task.get("message") or "")
    ffmpeg_percent = _format_optional_percent(task.get("ffmpeg_progress"))

    if current_step <= 0:
        return f"<div style='font-weight:650;color:#262730;'>{escape(message or '正在生成视频,请稍候...')}</div>"

    # The completion flag does not depend on the step, so compute it once.
    is_complete = state == const.TASK_STATE_COMPLETE

    lines = []
    for index, default_label in enumerate(VIDEO_GENERATION_STEP_LABELS, start=1):
        is_current = index == current_step
        is_done = is_complete or index < current_step
        label = message if is_current and message else default_label
        suffix = f"{index}/{step_total}"
        if (
            is_current
            and index == step_total
            and ffmpeg_percent is not None
            and not is_complete
        ):
            # Separate the step counter from the ffmpeg percentage so the
            # suffix reads "6/6, ffmpeg 42%" instead of "6/6ffmpeg 42%".
            suffix = f"{suffix}, ffmpeg {ffmpeg_percent}%"

        color = "#262730" if is_current else "#8b9099" if is_done else "#b9bec7"
        weight = "650" if is_current else "500"
        lines.append(
            "<div style='"
            "font-size:1.02rem;"
            "line-height:1.85;"
            "margin:0.28rem 0;"
            f"color:{color};"
            f"font-weight:{weight};"
            "'>"
            f"{escape(label)} <span style='white-space:nowrap;'>({escape(suffix)})</span>"
            "</div>"
        )
    return "".join(lines)
def get_help_text():
    """Return the help text with the current project version embedded."""
    help_template = tr("Get Help")
    version_label = f" v{config.project_version}"
    return help_template.replace("🎉🎉🎉", version_label)
@ -198,7 +270,12 @@ def render_generate_button():
progress_bar = st.progress(0)
status_panel = st.status(tr("Generating Video"), expanded=True)
status_panel.write(tr("Generating Video"))
with status_panel:
status_placeholder = st.empty()
status_placeholder.markdown(
_render_generation_status(None),
unsafe_allow_html=True,
)
def run_task():
try:
@ -238,10 +315,19 @@ def render_generate_button():
# 更新进度条和阶段状态
progress_bar.progress(progress / 100)
current_message = task.get("message") or f"Processing... {progress}%"
status_label = f"{current_message} ({progress}%)"
status_key = (state, progress, current_message)
status_key = (
state,
progress,
current_message,
task.get("step_current"),
task.get("step_total"),
task.get("ffmpeg_progress"),
)
if status_key != last_status_key:
status_panel.write(status_label)
status_placeholder.markdown(
_render_generation_status(task),
unsafe_allow_html=True,
)
last_status_key = status_key
if state == const.TASK_STATE_COMPLETE: