NarratoAI/app/services/jianying_draft_builder.py
viccy 7d4bd45f69 feat: 新增视频生成进度追踪与WebUI展示,优化剪映导出功能
- 添加FFmpeg视频合并进度回调支持,实现实时进度上报
- 为进度回调添加参数校验与异常捕获,避免回调失败影响主流程
- 重构任务状态更新逻辑,统一封装任务更新函数减少重复代码
- 重新划分视频生成的6个标准步骤,更新各阶段的状态上报逻辑
- 更新WebUI界面,支持展示当前生成步骤、总步骤与FFmpeg实时进度
- 优化剪映草稿导出功能,不再预先裁剪原视频,直接引用源素材时间戳
- 新增剪映草稿字幕生成功能,并补充对应的单元测试用例
2026-06-08 16:02:20 +08:00

1678 lines
55 KiB
Python

import json
import os
import re
import shutil
import subprocess
import time
import uuid
from typing import Any, Dict, List, Optional, Set, Tuple
from loguru import logger
from app.models.schema import VideoClipParams
from app.services import script_subtitle
# Number of microseconds in one second; used to convert second-based values
# into the integer microsecond values written into the draft.
MICROSECONDS = 1_000_000
# Placeholder that _to_draft_material_path puts in front of relative asset paths.
DRAFT_PATH_PLACEHOLDER = "##_draftpath_placeholder_0E685133-18CE-45ED-8CB8-2904A212EC80_##"
# Matches a leading "##_draftpath_placeholder_<...>_##/" prefix so it can be
# stripped from a stored path (see _normalize_asset_path).
DRAFT_PATH_PLACEHOLDER_PATTERN = re.compile(r"^##_draftpath_placeholder_[^#]+_##/")
# Names of the collections inside a draft's "materials" mapping.
# _create_material_collections creates one empty list per name, and
# _normalize_materials emits exactly these keys.
MATERIAL_COLLECTION_KEYS = [
    "ai_translates",
    "audio_balances",
    "audio_effects",
    "audio_fades",
    "audio_pannings",
    "audio_pitch_shifts",
    "audio_track_indexes",
    "audios",
    "beats",
    "canvases",
    "chromas",
    "color_curves",
    "common_mask",
    "digital_human_model_dressing",
    "digital_humans",
    "drafts",
    "effects",
    "flowers",
    "green_screens",
    "handwrites",
    "hsl",
    "hsl_curves",
    "images",
    "log_color_wheels",
    "loudnesses",
    "manual_beautys",
    "manual_deformations",
    "material_animations",
    "material_colors",
    "multi_language_refs",
    "placeholder_infos",
    "placeholders",
    "plugin_effects",
    "primary_color_wheels",
    "realtime_denoises",
    "shapes",
    "smart_crops",
    "smart_relights",
    "sound_channel_mappings",
    "speeds",
    "stickers",
    "tail_leaders",
    "text_templates",
    "texts",
    "time_marks",
    "transitions",
    "video_effects",
    "video_radius",
    "video_shadows",
    "video_strokes",
    "video_trackings",
    "videos",
    "vocal_beautifys",
    "vocal_separations",
]
# Relative directory names (forward-slash separated) belonging to a draft package.
# NOTE(review): presumably created inside every exported draft folder; the
# code that consumes this list is outside this part of the file - confirm.
DRAFT_PACKAGE_DIRECTORIES = [
    "qr_upload",
    "matting",
    "common_attachment",
    "Resources/audioAlg",
    "Resources/digitalHuman",
    "Resources/restore_lut",
    "Resources/videoAlg",
    "subdraft",
    "adjust_mask",
    "assets/audio",
    "assets/video",
    "smart_crop",
]
# Raw bytes of a minimal JPEG image: the data starts with the SOI marker
# (FF D8), ends with the EOI marker (FF D9), and its frame header (FF C0)
# declares a 1x1 pixel, single-component picture.
# NOTE(review): presumably written as "draft_cover.jpg" when no real cover is
# available; the writer is outside this part of the file - confirm.
DEFAULT_DRAFT_COVER_BYTES = bytes([
    0xFF, 0xD8, 0xFF, 0xDB, 0x00, 0x43, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
    0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
    0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
    0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
    0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
    0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xC0, 0x00, 0x0B, 0x08, 0x00, 0x01, 0x00,
    0x01, 0x01, 0x01, 0x11, 0x00, 0xFF, 0xC4, 0x00, 0x14, 0x00, 0x01, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x01, 0xFF, 0xC4, 0x00, 0x14, 0x10, 0x01, 0x00, 0x00, 0x00,
    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
    0x01, 0xFF, 0xDA, 0x00, 0x08, 0x01, 0x01, 0x00, 0x00, 0x3F, 0x00, 0x7F,
    0xFF, 0xD9,
])
def _write_json_file(file_path: str, data: Dict[str, Any]) -> None:
    """Serialize ``data`` as compact UTF-8 JSON into ``file_path``.

    Missing parent directories are created first. Non-ASCII characters are
    written verbatim (``ensure_ascii=False``) and no whitespace is emitted
    between JSON tokens.

    Args:
        file_path: Destination file; may be a bare file name.
        data: JSON-serializable mapping to write.
    """
    parent_dir = os.path.dirname(file_path)
    # A bare file name has an empty dirname and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, separators=(",", ":"))
def _floor_duration_to_milliseconds(duration: float) -> float:
    """Clamp ``duration`` (seconds) at zero and truncate it to whole milliseconds."""
    non_negative = max(duration, 0.0)
    whole_milliseconds = int(non_negative * 1000)
    return whole_milliseconds / 1000.0
def _seconds_to_microseconds(seconds: float) -> int:
    """Convert ``seconds`` to a non-negative, rounded integer number of microseconds."""
    clamped_seconds = max(seconds, 0.0)
    scaled = clamped_seconds * MICROSECONDS
    return int(round(scaled))
def _get_media_duration_ffprobe(media_file: str) -> float:
    """Return the container duration of ``media_file`` in seconds via ffprobe.

    Raises:
        subprocess.CalledProcessError: ffprobe exited with a non-zero status.
        ValueError: ffprobe's output could not be parsed as a float.
    """
    ffprobe_args = ["ffprobe", "-v", "error"]
    ffprobe_args += ["-show_entries", "format=duration"]
    ffprobe_args += ["-of", "csv=p=0"]
    ffprobe_args.append(media_file)
    completed = subprocess.run(ffprobe_args, capture_output=True, text=True, check=True)
    raw_duration = completed.stdout.strip()
    return float(raw_duration)
def _get_cached_media_duration(media_file: str, duration_cache: Dict[str, float]) -> float:
    """Return the millisecond-floored duration of ``media_file``, probing it at most once.

    The result is stored in ``duration_cache`` under ``media_file`` so later
    calls with the same cache skip ffprobe.
    """
    if media_file in duration_cache:
        return duration_cache[media_file]
    probed_seconds = _get_media_duration_ffprobe(media_file)
    duration_cache[media_file] = _floor_duration_to_milliseconds(probed_seconds)
    return duration_cache[media_file]
def _clamp_duration_to_media(
    requested_duration: float,
    media_file: str,
    duration_cache: Dict[str, float],
    media_label: str,
    source_start_time: float = 0.0,
) -> float:
    """Limit a requested clip duration to what the media file can provide.

    Args:
        requested_duration: Wanted duration in seconds (floored to milliseconds).
        media_file: File whose real duration is looked up (cached).
        duration_cache: Cache shared across calls, keyed by file path.
        media_label: Prefix used in the log messages.
        source_start_time: Offset in seconds into the media where the clip
            starts; negative values are treated as 0.

    Returns:
        The smaller of the requested duration and the duration remaining after
        ``source_start_time``, in seconds.
    """
    requested_duration = _floor_duration_to_milliseconds(requested_duration)
    actual_duration = _get_cached_media_duration(media_file, duration_cache)
    start_offset = max(source_start_time, 0.0)
    remaining = max(actual_duration - start_offset, 0.0)
    available_duration = _floor_duration_to_milliseconds(remaining)
    safe_duration = min(requested_duration, available_duration)

    logger.info(
        f"{media_label}实际时长: {actual_duration:.6f}秒, "
        f"可用时长: {available_duration:.6f}秒, 请求时长: {requested_duration:.3f}"
    )
    if not safe_duration < requested_duration:
        return safe_duration

    # The media is shorter than the script asked for: report the adjustment.
    logger.warning(
        f"{media_label}短于脚本时长,已将剪映片段时长从 "
        f"{requested_duration:.3f}秒 调整为 {safe_duration:.3f}"
    )
    return safe_duration
def _get_video_metadata_ffprobe(
    media_file: str,
    metadata_cache: Dict[str, Tuple[int, int, int]],
) -> Tuple[int, int, int]:
    """Return ``(duration_us, width, height)`` for ``media_file``, caching the result.

    ffprobe is asked for stream width/height plus the container duration as
    JSON. Width and height come from the first stream reporting both and
    default to 1920x1080 when none does. If anything in that step fails, a
    warning is logged and the duration is probed again on its own (that second
    probe may itself raise) and paired with the 1920x1080 default.
    """
    if media_file in metadata_cache:
        return metadata_cache[media_file]

    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "stream=width,height:format=duration",
        "-of", "json",
        media_file,
    ]
    try:
        completed = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        info = json.loads(completed.stdout or "{}")

        # Pick the first stream that carries both dimensions.
        stream: Dict[str, Any] = {}
        for item in info.get("streams", []):
            if item.get("width") and item.get("height"):
                stream = item
                break

        raw_duration = info.get("format", {}).get("duration") or 0.0
        duration = _floor_duration_to_milliseconds(float(raw_duration))
        width = int(stream.get("width") or 1920)
        height = int(stream.get("height") or 1080)
        metadata_cache[media_file] = (_seconds_to_microseconds(duration), width, height)
    except Exception as e:
        logger.warning(f"读取视频元信息失败,将使用默认分辨率: {media_file}, {e}")
        duration = _floor_duration_to_milliseconds(_get_media_duration_ffprobe(media_file))
        metadata_cache[media_file] = (_seconds_to_microseconds(duration), 1920, 1080)
    return metadata_cache[media_file]
def _format_draft_uuid(draft_id: str) -> str:
    """Render a 32-hex-digit id in upper-case 8-4-4-4-12 form.

    Hyphens already present are ignored. Anything that is not exactly 32 hex
    digits once hyphens are removed is returned unchanged.
    """
    compact = draft_id.replace("-", "")
    if re.fullmatch(r"[a-fA-F0-9]{32}", compact) is None:
        return draft_id
    group_bounds = ((0, 8), (8, 12), (12, 16), (16, 20), (20, 32))
    groups = [compact[begin:end] for begin, end in group_bounds]
    return "-".join(groups).upper()
def _detect_platform(draft_root_path: str) -> str:
    """Return ``"windows"`` for drive-letter or UNC style paths, otherwise ``"mac"``."""
    looks_like_windows = re.match(r"^(?:[a-zA-Z]:[\\/]|\\\\)", draft_root_path)
    if looks_like_windows:
        return "windows"
    return "mac"
def _create_platform_info(draft_root_path: str) -> Dict[str, Any]:
    """Build the platform descriptor dict; ``os`` is derived from ``draft_root_path``."""
    detected_os = _detect_platform(draft_root_path)
    info: Dict[str, Any] = {
        "app_id": 3704,
        "app_source": "lv",
        "app_version": "10.6.0",
    }
    # Hardware identifiers are left blank.
    for blank_field in ("device_id", "hard_disk_id", "mac_address"):
        info[blank_field] = ""
    info["os"] = detected_os
    info["os_version"] = ""
    return info
def _default_function_assistant_info() -> Dict[str, Any]:
    """Return a fresh ``function_assistant_info`` mapping with every feature switched off."""
    assistant_info: Dict[str, Any] = {
        "audio_noise_segid_list": [],
        "auto_adjust": False,
        "auto_adjust_fixed": False,
        "auto_adjust_fixed_value": 50.0,
        "auto_adjust_segid_list": [],
        "auto_caption": False,
        "auto_caption_segid_list": [],
        "auto_caption_template_id": "",
        "caption_opt": False,
        "caption_opt_segid_list": [],
        "color_correction": False,
        "color_correction_fixed": False,
        "color_correction_fixed_value": 50.0,
        "color_correction_segid_list": [],
        "deflicker_segid_list": [],
        "enhance_quality": False,
        "enhance_quality_fixed": False,
        "enhance_quality_segid_list": [],
        "enhance_voice_segid_list": [],
        # Key spelling ("enhande") is kept exactly as the original emits it.
        "enhande_voice": False,
        "enhande_voice_fixed": False,
        "eye_correction": False,
        "eye_correction_segid_list": [],
        "fixed_rec_applied": False,
        "fps": {"den": 1, "num": 0},
        "normalize_loudness": False,
        "normalize_loudness_audio_denoise_segid_list": [],
        "normalize_loudness_fixed": False,
        "normalize_loudness_segid_list": [],
        "retouch": False,
        "retouch_fixed": False,
        "retouch_segid_list": [],
        "smart_rec_applied": False,
        "smart_segid_list": [],
        "smooth_slow_motion": False,
        "smooth_slow_motion_fixed": False,
        "video_noise_segid_list": [],
    }
    return assistant_info
def _safe_file_name(file_path: str, fallback: str) -> str:
    """Derive a sanitized file name from ``file_path``.

    The base name is taken, each run of reserved or control characters is
    replaced by one underscore, and surrounding spaces, dots and underscores
    are trimmed. ``fallback`` is used when the base name is empty or nothing
    is left after cleaning.
    """
    base_name = os.path.basename(file_path)
    if not base_name:
        base_name = fallback
    cleaned = re.sub(r'[<>:"|?*\x00-\x1f/\\]+', "_", base_name)
    cleaned = cleaned.strip(" ._")
    if cleaned:
        return cleaned
    return fallback
def _normalize_asset_path(file_path: Optional[str], fallback: str) -> str:
    """Reduce a stored material path to its draft-relative ``assets/...`` form.

    Backslashes become forward slashes, leading ``./``, ``../`` and ``/``
    segments are dropped, and a leading draft-path placeholder is removed.
    If the remainder starts with ``assets/`` it is returned as is; otherwise
    the part starting at the last ``/assets/`` occurrence is returned.

    Args:
        file_path: Path as stored on a material; ``None`` or empty is allowed.
        fallback: Value returned when no ``assets/`` portion can be found.

    Returns:
        A path beginning with ``assets/``, or ``fallback``.
    """
    normalized = (file_path or "").replace("\\", "/")
    # Strip whole leading path segments only. str.lstrip("./") removes any run
    # of "." and "/" characters, so a name such as ".assets/x" used to be
    # turned into "assets/x" and wrongly accepted as a draft asset path.
    normalized = re.sub(r"^(?:\.{0,2}/)+", "", normalized)
    without_draft_placeholder = DRAFT_PATH_PLACEHOLDER_PATTERN.sub("", normalized)
    if without_draft_placeholder.startswith("assets/"):
        return without_draft_placeholder
    assets_index = without_draft_placeholder.rfind("/assets/")
    if assets_index >= 0:
        # +1 skips the slash so the result starts with "assets/".
        return without_draft_placeholder[assets_index + 1:]
    return fallback
def _to_draft_material_path(relative_path: str) -> str:
    """Prefix ``relative_path`` with the draft-path placeholder and a ``/``."""
    return "{}/{}".format(DRAFT_PATH_PLACEHOLDER, relative_path)
def _unique_relative_asset_path(
    directory: str,
    file_name: str,
    used_paths: Set[str],
) -> str:
    """Reserve and return a ``directory/file_name`` path not yet in ``used_paths``.

    On a clash, ``_2``, ``_3``, ... is inserted before the extension until a
    free path is found. The chosen path is added to ``used_paths``.
    """
    stem, extension = os.path.splitext(file_name)
    candidate = f"{directory}/{file_name}"
    next_suffix = 2
    while candidate in used_paths:
        candidate = f"{directory}/{stem}_{next_suffix}{extension}"
        next_suffix += 1
    used_paths.add(candidate)
    return candidate
def _copy_asset_into_draft(source_file: str, draft_path: str, relative_path: str) -> None:
    """Copy ``source_file`` to ``relative_path`` inside the draft folder.

    ``relative_path`` uses ``/`` separators. The destination directory is
    created if needed, and nothing is copied when source and destination
    resolve to the same absolute path.
    """
    path_parts = relative_path.split("/")
    destination = os.path.join(draft_path, *path_parts)
    os.makedirs(os.path.dirname(destination), exist_ok=True)
    if os.path.abspath(source_file) == os.path.abspath(destination):
        return
    shutil.copy2(source_file, destination)
def _register_asset(
    source_file: str,
    draft_path: str,
    asset_dir: str,
    fallback_name: str,
    used_paths: Set[str],
    asset_path_cache: Dict[str, str],
) -> str:
    """Copy ``source_file`` into the draft once and return its draft-relative path.

    Files are keyed by absolute path in ``asset_path_cache``; a source already
    registered returns its earlier path without being copied again.
    """
    source_key = os.path.abspath(source_file)
    if source_key not in asset_path_cache:
        safe_name = _safe_file_name(source_file, fallback_name)
        relative_path = _unique_relative_asset_path(asset_dir, safe_name, used_paths)
        _copy_asset_into_draft(source_file, draft_path, relative_path)
        asset_path_cache[source_key] = relative_path
    return asset_path_cache[source_key]
def _create_unique_draft_path(drafts_root: str, draft_name: str) -> Tuple[str, str]:
    """Choose a folder name under ``drafts_root`` that does not exist yet.

    The name is the sanitized ``draft_name`` (or ``NarratoAI_<unix time>`` when
    that is empty), with ``_2``, ``_3``, ... appended while the path exists.

    Returns:
        ``(folder_name, full_path)``.
    """
    folder_base = _safe_file_name(draft_name, f"NarratoAI_{int(time.time())}")
    folder_name = folder_base
    attempt = 1
    while True:
        draft_path = os.path.join(drafts_root, folder_name)
        if not os.path.exists(draft_path):
            return folder_name, draft_path
        attempt += 1
        folder_name = f"{folder_base}_{attempt}"
def _create_material_collections() -> Dict[str, List[Any]]:
    """Return a mapping with a new empty list for every material collection key."""
    collections: Dict[str, List[Any]] = {}
    for collection_name in MATERIAL_COLLECTION_KEYS:
        collections[collection_name] = []
    return collections
def _create_draft_template(
    draft_id: str,
    draft_name: str,
    draft_root_path: str,
    width: int = 1920,
    height: int = 1080,
) -> Dict[str, Any]:
    """Build an empty draft document (no tracks, empty material collections).

    Args:
        draft_id: Value stored under ``id``.
        draft_name: Value stored under ``name``.
        draft_root_path: Used only to derive the platform info.
        width: Canvas width.
        height: Canvas height.

    Returns:
        The draft dict; ``create_time`` and ``update_time`` hold the current
        time in microseconds.
    """
    now_us = int(time.time() * MICROSECONDS)
    platform_info = _create_platform_info(draft_root_path)

    config_section: Dict[str, Any] = {
        "adjust_max_index": 1,
        "attachment_info": [],
        "combination_max_index": 1,
        "export_range": None,
        "extract_audio_last_index": 1,
        "lyrics_recognition_id": "",
        "lyrics_sync": True,
        "lyrics_taskinfo": [],
        "maintrack_adsorb": True,
        "material_save_mode": 0,
        "multi_language_current": "none",
        "multi_language_list": [],
        "multi_language_main": "none",
        "multi_language_mode": "none",
        "original_sound_last_index": 1,
        "record_audio_last_index": 1,
        "sticker_max_index": 1,
        "subtitle_keywords_config": None,
        "subtitle_recognition_id": "",
        "subtitle_sync": True,
        "subtitle_taskinfo": [],
        "system_font_list": [],
        "video_mute": False,
        "zoom_info_params": None,
    }
    keyframe_kinds = (
        "adjusts", "audios", "effects", "filters",
        "handwrites", "stickers", "texts", "videos",
    )
    keyframes_section = {kind: [] for kind in keyframe_kinds}

    template: Dict[str, Any] = {
        "canvas_config": {"height": height, "ratio": "original", "width": width},
        "color_space": 0,
        "config": config_section,
        "cover": None,
        "create_time": now_us,
        "duration": 0,
        "extra_info": None,
        "fps": 30.0,
        "free_render_index_mode_on": False,
        "group_container": None,
        "id": draft_id,
        "keyframe_graph_list": [],
        "keyframes": keyframes_section,
        "last_modified_platform": platform_info,
        "materials": _create_material_collections(),
        "mutable_config": None,
        "name": draft_name,
        "new_version": "169.0.0",
        "relationships": [],
        "render_index_track_mode_on": True,
        "retouch_cover": None,
        "source": "default",
        "static_cover_image_path": "",
        "time_marks": None,
        "tracks": [],
        "update_time": now_us,
        "version": 360000,
    }
    return template
def _create_track(track_type: str, name: str) -> Dict[str, Any]:
    """Create an empty track of ``track_type`` called ``name`` with a random hex id."""
    track: Dict[str, Any] = {"attribute": 0, "flag": 0}
    track["id"] = uuid.uuid4().hex
    track["is_default_name"] = True
    track["name"] = name
    track["segments"] = []
    track["type"] = track_type
    return track
def _create_video_material(
    relative_path: str,
    duration_us: int,
    width: int,
    height: int,
) -> Dict[str, Any]:
    """Describe a video file as a draft material entry.

    The entry gets a random hex id, the base name of ``relative_path`` as
    ``material_name``, the current time (microseconds) as ``create_time`` and
    a crop rectangle covering the whole 0..1 frame.
    """
    full_frame_crop = {
        "lower_left_x": 0.0,
        "lower_left_y": 1.0,
        "lower_right_x": 1.0,
        "lower_right_y": 1.0,
        "upper_left_x": 0.0,
        "upper_left_y": 0.0,
        "upper_right_x": 1.0,
        "upper_right_y": 0.0,
    }
    material: Dict[str, Any] = {
        "id": uuid.uuid4().hex,
        "path": relative_path,
        "type": "video",
        "duration": duration_us,
        "width": width,
        "height": height,
        "material_name": os.path.basename(relative_path),
        "create_time": int(time.time() * MICROSECONDS),
        "crop": full_frame_crop,
        "extra_type_option": 0,
        "source_platform": 0,
    }
    return material
def _create_audio_material(relative_path: str, duration_us: int) -> Dict[str, Any]:
    """Describe an audio file as a draft material entry of type ``extract_music``.

    One random hex id is generated and reused for ``id``,
    ``local_material_id`` and ``music_id``; ``name`` is the base name of
    ``relative_path``.
    """
    material_id = uuid.uuid4().hex
    display_name = os.path.basename(relative_path)
    material: Dict[str, Any] = {
        "app_id": 0,
        "category_id": "",
        "category_name": "local",
        "check_flag": 1,
        "copyright_limit_type": "none",
        "duration": duration_us,
        "effect_id": "",
        "formula_id": "",
        "id": material_id,
        "intensifies_path": "",
        "is_ai_clone_tone": False,
        "is_text_edit_overdub": False,
        "is_ugc": False,
        "local_material_id": material_id,
        "music_id": material_id,
        "name": display_name,
        "path": relative_path,
        "remote_url": "",
        "query": "",
        "request_id": "",
        "resource_id": "",
        "search_id": "",
        "source_from": "",
        "source_platform": 0,
        "team_id": "",
        "text_id": "",
        "tone_category_id": "",
        "tone_category_name": "",
        "tone_effect_id": "",
        "tone_effect_name": "",
        "tone_platform": "",
        "tone_second_category_id": "",
        "tone_second_category_name": "",
        "tone_speaker": "",
        "tone_type": "",
        "type": "extract_music",
        "video_id": "",
        "wave_points": [],
    }
    return material
def _create_video_segment(
    material_id: str,
    source_start_us: int,
    duration_us: int,
    target_start_us: int,
    volume: float,
) -> Dict[str, Any]:
    """Create a video-track segment that plays part of a material at speed 1.0.

    Args:
        material_id: Id of the video material being referenced.
        source_start_us: Start offset inside the source material.
        duration_us: Length used for both the source and target ranges.
        target_start_us: Start position on the timeline.
        volume: Segment volume.
    """
    target_range = {"start": target_start_us, "duration": duration_us}
    source_range = {"start": source_start_us, "duration": duration_us}
    identity_clip = {
        "alpha": 1.0,
        "flip": {"horizontal": False, "vertical": False},
        "rotation": 0.0,
        "scale": {"x": 1.0, "y": 1.0},
        "transform": {"x": 0.0, "y": 0.0},
    }
    segment: Dict[str, Any] = {
        "id": uuid.uuid4().hex,
        "material_id": material_id,
        "target_timerange": target_range,
        "source_timerange": source_range,
        "speed": 1.0,
        "volume": volume,
        "enable_adjust": True,
        "enable_color_curves": True,
        "enable_color_match_adjust": False,
        "enable_color_wheels": True,
        "enable_lut": True,
        "enable_smart_color_adjust": False,
        "extra_material_refs": [],
        "hdr_settings": {"intensity": 1.0, "mode": 1, "nits": 1000},
        "uniform_scale": {"on": True, "value": 1.0},
        "clip": identity_clip,
        "common_keyframes": [],
    }
    return segment
def _create_audio_segment(
    material_id: str,
    duration_us: int,
    target_start_us: int,
    volume: float,
) -> Dict[str, Any]:
    """Create an audio-track segment that plays a material from its start.

    The source range always begins at 0; the visual fields (``clip``,
    ``hdr_settings``, ``uniform_scale``) are ``None``.
    """
    segment: Dict[str, Any] = {
        "id": uuid.uuid4().hex,
        "material_id": material_id,
        "target_timerange": {"start": target_start_us, "duration": duration_us},
        "source_timerange": {"start": 0, "duration": duration_us},
        "speed": 1.0,
        "volume": volume,
        "extra_material_refs": [],
    }
    for visual_field in ("clip", "hdr_settings", "uniform_scale"):
        segment[visual_field] = None
    segment["common_keyframes"] = []
    return segment
def _normalize_hex_color(color: Optional[str], default: str = "#FFFFFF") -> str:
color = str(color or default).strip()
if not color.startswith("#"):
color = f"#{color}"
if re.fullmatch(r"#[0-9a-fA-F]{3}", color):
color = "#" + "".join(char * 2 for char in color[1:])
if not re.fullmatch(r"#[0-9a-fA-F]{6}", color):
color = default
return color.upper()
def _hex_color_to_rgb_float(color: Optional[str], default: str = "#FFFFFF") -> Tuple[float, float, float]:
    """Convert a hex color into an ``(r, g, b)`` tuple of floats in the 0..1 range."""
    normalized = _normalize_hex_color(color, default)
    # Channels sit at fixed two-character offsets after the leading "#".
    red, green, blue = (int(normalized[start:start + 2], 16) / 255 for start in (1, 3, 5))
    return (red, green, blue)
def _resolve_subtitle_text_size(params: VideoClipParams) -> float:
    """Map ``params.font_size`` to a draft text size: ``font_size / 12`` clamped to 4..10.

    A missing, falsy or non-numeric ``font_size`` is treated as 60.
    """
    configured = getattr(params, "font_size", 60) or 60
    try:
        font_size = float(configured)
    except (TypeError, ValueError):
        font_size = 60.0
    scaled = font_size / 12.0
    return max(4.0, min(10.0, scaled))
def _resolve_subtitle_transform_y(params: VideoClipParams) -> float:
    """Translate ``params.subtitle_position`` into the segment's ``transform.y`` value.

    ``top`` gives 0.82, ``center`` gives 0.0 and any other value gives -0.8.
    ``custom`` reads ``params.custom_position`` as a percentage (default
    85, clamped to 0..100) and maps it to ``1 - 2 * percent / 100`` clamped to
    -0.92..0.92.
    """
    position = str(getattr(params, "subtitle_position", "bottom") or "bottom").lower()

    if position == "custom":
        try:
            y_percent = float(getattr(params, "custom_position", 85.0))
        except (TypeError, ValueError):
            y_percent = 85.0
        y_percent = max(0.0, min(100.0, y_percent))
        mapped = 1.0 - 2.0 * (y_percent / 100.0)
        return max(-0.92, min(0.92, mapped))

    fixed_positions = {"top": 0.82, "center": 0.0}
    return fixed_positions.get(position, -0.8)
def _create_text_material(text: str, params: VideoClipParams) -> Dict[str, Any]:
    """Build a ``subtitle`` text material for ``text`` styled from ``params``.

    Fill color comes from ``params.text_fore_color``, size from
    ``params.font_size``. When ``params.stroke_width`` is positive, a stroke
    using ``params.stroke_color`` is added and bit 8 is set in ``check_flag``.
    The style and text are stored as a JSON string under ``content``.
    """
    material_id = uuid.uuid4().hex
    text = str(text or "")
    text_color = _hex_color_to_rgb_float(getattr(params, "text_fore_color", "#FFFFFF"), "#FFFFFF")
    stroke_color = _hex_color_to_rgb_float(getattr(params, "stroke_color", "#000000"), "#000000")
    try:
        stroke_width = float(getattr(params, "stroke_width", 1.5) or 0)
    except (TypeError, ValueError):
        stroke_width = 1.5

    solid_fill = {"alpha": 1.0, "color": list(text_color)}
    text_style: Dict[str, Any] = {
        "fill": {
            "alpha": 1.0,
            "content": {"render_type": "solid", "solid": solid_fill},
        },
        "range": [0, len(text)],
        "size": _resolve_subtitle_text_size(params),
        "bold": False,
        "italic": False,
        "underline": False,
        "strokes": [],
    }

    check_flag = 7
    if stroke_width > 0:
        # stroke_width is scaled by 0.2 / 100 and capped to the 0..0.2 range.
        scaled_width = max(0.0, min(0.2, stroke_width / 100.0 * 0.2))
        stroke_entry = {
            "content": {"solid": {"alpha": 1.0, "color": list(stroke_color)}},
            "width": scaled_width,
        }
        text_style["strokes"] = [stroke_entry]
        check_flag |= 8

    content_json = json.dumps({"styles": [text_style], "text": text}, ensure_ascii=False)
    return {
        "id": material_id,
        "content": content_json,
        "typesetting": 0,
        "alignment": 1,
        "letter_spacing": 0.0,
        "line_spacing": 0.02,
        "line_feed": 1,
        "line_max_width": 0.82,
        "force_apply_line_max_width": False,
        "check_flag": check_flag,
        "type": "subtitle",
        "global_alpha": 1.0,
    }
def _create_text_segment(
    material_id: str,
    start_us: int,
    duration_us: int,
    params: VideoClipParams,
) -> Dict[str, Any]:
    """Create a text-track segment showing ``material_id`` at the given time.

    The vertical placement is taken from ``params`` via
    ``_resolve_subtitle_transform_y``; the segment has no source range.
    """
    vertical_offset = _resolve_subtitle_transform_y(params)
    clip_settings = {
        "alpha": 1.0,
        "flip": {"horizontal": False, "vertical": False},
        "rotation": 0.0,
        "scale": {"x": 1.0, "y": 1.0},
        "transform": {"x": 0.0, "y": vertical_offset},
    }
    segment: Dict[str, Any] = {
        "id": uuid.uuid4().hex,
        "material_id": material_id,
        "target_timerange": {"start": start_us, "duration": duration_us},
        "source_timerange": None,
        "speed": 1.0,
        "volume": 1.0,
        "extra_material_refs": [],
        "is_tone_modify": False,
        "clip": clip_settings,
        "uniform_scale": {"on": True, "value": 1.0},
        "render_index": 15000,
        "common_keyframes": [],
    }
    return segment
def _parse_srt_entries(subtitle_path: str) -> List[Tuple[float, float, str]]:
    """Read an SRT-like file into ``(start, end, text)`` tuples.

    Blocks are separated by blank lines. In each block the first line
    containing ``-->`` is the timing line and every following non-empty line is
    caption text. Blocks without a timing line, without text, with unparsable
    times, or with ``end <= start`` are skipped. A missing path or empty file
    yields an empty list.
    """
    if not subtitle_path or not os.path.exists(subtitle_path):
        return []

    # "utf-8-sig" transparently drops a leading byte-order mark.
    with open(subtitle_path, "r", encoding="utf-8-sig") as f:
        content = f.read().strip()
    if not content:
        return []

    entries: List[Tuple[float, float, str]] = []
    for block in re.split(r"\n\s*\n", content):
        lines = [stripped for stripped in map(str.strip, block.splitlines()) if stripped]

        # Locate the timing line; skip the block when there is none.
        for time_line_index, line in enumerate(lines):
            if "-->" in line:
                break
        else:
            continue

        caption_lines = lines[time_line_index + 1:]
        if not caption_lines:
            continue

        try:
            start_text, end_text = lines[time_line_index].split("-->", 1)
            start = script_subtitle.parse_srt_like_time(start_text)
            end = script_subtitle.parse_srt_like_time(end_text)
        except Exception as e:
            logger.warning(f"解析剪映字幕时间失败,跳过字幕块: {e}")
            continue

        text = "\n".join(caption_lines).strip()
        if end <= start or not text:
            continue
        entries.append((start, end, text))
    return entries
def _add_subtitle_track_from_srt(
    draft: Dict[str, Any],
    subtitle_path: str,
    params: VideoClipParams,
) -> int:
    """Append a text track built from an SRT file to ``draft``.

    Each subtitle entry becomes one text material (added to
    ``draft["materials"]["texts"]``) and one segment. The track is appended to
    ``draft["tracks"]`` only when it ends up with at least one segment.

    Returns:
        The latest segment end time in microseconds, or 0 when nothing was added.
    """
    entries = _parse_srt_entries(subtitle_path)
    if not entries:
        return 0

    text_track = _create_track("text", "字幕轨道")
    text_track["is_default_name"] = False
    segments = text_track["segments"]
    text_materials = draft["materials"]["texts"]

    max_end_us = 0
    for start, end, text in entries:
        start_us = _seconds_to_microseconds(start)
        duration_us = _seconds_to_microseconds(end - start)
        if duration_us <= 0:
            continue
        text_material = _create_text_material(text, params)
        text_materials.append(text_material)
        segment = _create_text_segment(text_material["id"], start_us, duration_us, params)
        segments.append(segment)
        end_us = start_us + duration_us
        max_end_us = max(max_end_us, end_us)

    if text_track["segments"]:
        draft["tracks"].append(text_track)
        logger.info(f"已写入剪映字幕轨: {len(text_track['segments'])} 条, {subtitle_path}")
    return max_end_us
def _normalize_video_material(material: Dict[str, Any]) -> Dict[str, Any]:
    """Expand a video material with default fields and a placeholder-based path.

    Values present in ``material`` override the defaults. ``path`` is rewritten
    to the draft placeholder plus the ``assets/...`` portion of the original
    path (or ``assets/video/<material_name>`` when none is found), and ``type``
    is forced to ``"video"``.
    """
    fallback_path = f"assets/video/{material.get('material_name') or 'source.mp4'}"
    defaults: Dict[str, Any] = {
        "aigc_history_id": "",
        "aigc_item_id": "",
        "aigc_type": "none",
        "audio_fade": None,
        "beauty_body_auto_preset": None,
        "beauty_body_preset_id": "",
        "beauty_face_auto_preset": None,
        "beauty_face_auto_preset_infos": [],
        "beauty_face_preset_infos": [],
        "cartoon_path": "",
        "category_id": "",
        "category_name": "local",
        "check_flag": 65535,
        "content_feature_info": None,
        "corner_pin": None,
        "crop_ratio": "free",
        "crop_scale": 1.0,
        "formula_id": "",
        "freeze": None,
        "has_audio": True,
        "has_sound_separated": False,
        "intensifies_audio_path": "",
        "intensifies_path": "",
        "is_ai_generate_content": False,
        "is_copyright": False,
        "is_set_beauty_mode": False,
        "is_text_edit_overdub": False,
        "is_unified_beauty_mode": False,
        "live_photo_cover_path": "",
        "live_photo_timestamp": 0,
        "local_id": "",
        "local_material_from": 0,
        "local_material_id": "",
        "material_id": "",
        "material_url": "",
        "matting": None,
        "media_path": "",
        "multi_camera_info": None,
        "object_locked": None,
        "origin_material_id": "",
        "picture_from": "none",
        "picture_set_category_id": "",
        "picture_set_category_name": "",
        "request_id": "",
        "reverse_intensifies_path": "",
        "reverse_path": "",
        "smart_match_info": None,
        "smart_motion": None,
        "source": 0,
        "stable": None,
        "surface_trackings": None,
        "team_id": "",
        "unique_id": "",
        "video_algorithm": None,
        "video_mask_shadow": None,
        "video_mask_stroke": None,
    }
    # Caller-supplied fields win over the defaults.
    normalized = {**defaults, **material}
    relative_path = _normalize_asset_path(material.get("path"), fallback_path)
    normalized["path"] = _to_draft_material_path(relative_path)
    normalized["type"] = "video"
    return normalized
def _normalize_audio_material(material: Dict[str, Any]) -> Dict[str, Any]:
    """Expand an audio material with default fields and a placeholder-based path.

    Values present in ``material`` override the defaults. ``path`` is rewritten
    to the draft placeholder plus the ``assets/...`` portion of the original
    path (or ``assets/audio/<name>`` when none is found), and ``type`` is
    forced to ``"extract_music"``.
    """
    fallback_path = f"assets/audio/{material.get('name') or 'audio.mp3'}"
    defaults: Dict[str, Any] = {
        "ai_music_enter_from": "",
        "ai_music_generate_scene": "",
        "ai_music_type": 0,
        "aigc_history_id": "",
        "aigc_item_id": "",
        "app_id": 0,
        "category_id": "",
        "category_name": "local",
        "check_flag": 1,
        "cloned_model_type": "",
        "copyright_limit_type": "none",
        "effect_id": "",
        "formula_id": "",
        "intensifies_path": "",
        "is_ai_clone_tone": False,
        "is_ai_clone_tone_post": False,
        "is_text_edit_overdub": False,
        "is_ugc": False,
        "lyric_type": 0,
        "mock_tone_speaker": "",
        "moyin_emotion": "",
        "music_source": "",
        "pgc_id": "",
        "pgc_name": "",
        "query": "",
        "request_id": "",
        "resource_id": "",
        "search_id": "",
        "similiar_music_info": None,
        "sound_separate_type": 0,
        "source_from": "",
        "source_platform": 0,
        "team_id": "",
        "text_id": "",
        "third_resource_id": "",
        "tone_category_id": "",
        "tone_category_name": "",
        "tone_effect_id": "",
        "tone_effect_name": "",
        "tone_emotion_name_key": "",
        "tone_emotion_role": "",
        "tone_emotion_scale": 0,
        "tone_emotion_selection": "",
        "tone_emotion_style": "",
        "tone_platform": "",
        "tone_second_category_id": "",
        "tone_second_category_name": "",
        "tone_speaker": "",
        "tone_type": "",
        "tts_benefit_info": None,
        "tts_generate_scene": 0,
        "tts_task_id": "",
        "unique_id": "",
        "video_id": "",
        "wave_points": [],
    }
    # Caller-supplied fields win over the defaults.
    normalized = {**defaults, **material}
    relative_path = _normalize_asset_path(material.get("path"), fallback_path)
    normalized["path"] = _to_draft_material_path(relative_path)
    normalized["type"] = "extract_music"
    return normalized
def _normalize_materials(draft: Dict[str, Any]) -> Dict[str, List[Any]]:
    """Return the draft's material collections in normalized form.

    Every key in ``MATERIAL_COLLECTION_KEYS`` is present in the result; a
    collection that is missing or not a list becomes an empty list. Entries of
    ``videos`` and ``audios`` are additionally passed through
    ``_normalize_video_material`` / ``_normalize_audio_material``.

    Args:
        draft: Draft document; its ``materials`` value is read but not modified.
    """
    source = draft.get("materials")
    if not isinstance(source, dict):
        # A missing or malformed "materials" value is treated as empty.
        source = {}

    def _list_or_empty(key: str) -> List[Any]:
        value = source.get(key, [])
        return value if isinstance(value, list) else []

    materials = {key: _list_or_empty(key) for key in MATERIAL_COLLECTION_KEYS}
    # Apply the same list validation to videos/audios as to every other
    # collection; iterating the raw value crashed when it was e.g. None.
    materials["videos"] = [
        _normalize_video_material(item) for item in _list_or_empty("videos")
    ]
    materials["audios"] = [
        _normalize_audio_material(item) for item in _list_or_empty("audios")
    ]
    return materials
def _create_responsive_layout() -> Dict[str, Any]:
    """Return a disabled ``responsive_layout`` block with all layout values at 0."""
    layout: Dict[str, Any] = {"enable": False}
    layout["horizontal_pos_layout"] = 0
    layout["size_layout"] = 0
    layout["target_follow"] = ""
    layout["vertical_pos_layout"] = 0
    return layout
def _normalize_segment(
    segment: Dict[str, Any],
    track_type: str,
    track_index: int,
    track_attribute: int,
) -> Dict[str, Any]:
    """Expand a segment with default fields and track bookkeeping.

    Defaults are filled in first (a few depend on whether ``track_type`` is
    ``"video"``), then the segment's own fields override them. Finally
    ``track_attribute`` and ``track_render_index`` are set from the arguments.
    """
    on_video_track = track_type == "video"
    defaults: Dict[str, Any] = {
        "caption_info": None,
        "cartoon": False,
        "color_correct_alg_result": "",
        "common_keyframes": [],
        "desc": "",
        "digital_human_template_group_id": "",
        "enable_adjust": on_video_track,
        "enable_adjust_mask": on_video_track,
        "enable_color_adjust_pro": False,
        "enable_color_correct_adjust": False,
        "enable_color_curves": True,
        "enable_color_match_adjust": False,
        "enable_color_wheels": True,
        "enable_hsl": on_video_track,
        "enable_hsl_curves": True,
        "enable_lut": on_video_track,
        "enable_mask_shadow": False,
        "enable_mask_stroke": False,
        "enable_smart_color_adjust": False,
        "enable_video_mask": True,
        "extra_material_refs": [],
        "group_id": "",
        "hdr_settings": segment.get("hdr_settings"),
        "intensifies_audio": False,
        "is_loop": False,
        "is_placeholder": False,
        "is_tone_modify": False,
        "keyframe_refs": [],
        "last_nonzero_volume": 1.0,
        "lyric_keyframes": None,
        "raw_segment_id": "",
        "render_index": 0,
        "render_timerange": {"duration": 0, "start": 0},
        "responsive_layout": _create_responsive_layout(),
        "reverse": False,
        "source": "segmentsourcenormal",
        "state": 0,
        "template_id": "",
        "template_scene": "default",
        "uniform_scale": {"on": True, "value": 1.0},
        "visible": True,
    }
    # Fields supplied by the segment take precedence over the defaults.
    normalized = {**defaults, **segment}
    normalized["track_attribute"] = track_attribute
    normalized["track_render_index"] = track_index
    return normalized
def _normalize_tracks(draft: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return shallow copies of the draft's tracks with every segment normalized.

    Each segment receives the track's type, its position in the track list and
    the track's integer ``attribute`` (0 when missing or falsy).
    """
    normalized_tracks: List[Dict[str, Any]] = []
    for index, track in enumerate(draft.get("tracks", [])):
        track_copy = dict(track)
        track_type = track_copy.get("type", "")
        track_attribute = int(track_copy.get("attribute", 0) or 0)
        normalized_segments = []
        for segment in track.get("segments", []):
            normalized_segments.append(
                _normalize_segment(segment, track_type, index, track_attribute)
            )
        track_copy["segments"] = normalized_segments
        normalized_tracks.append(track_copy)
    return normalized_tracks
def _create_draft_info(
    draft: Dict[str, Any],
    draft_name: str,
    draft_root_path: str,
    new_version: str = "169.0.0",
) -> Dict[str, Any]:
    """Produce the full draft-info document from a draft dict.

    ``draft`` is deep-copied through a JSON round trip and left untouched. The
    copy gets normalized materials and tracks, platform information derived
    from ``draft_root_path``, and a set of fixed extra sections.
    """
    info = json.loads(json.dumps(draft, ensure_ascii=False))
    source_canvas = info.get("canvas_config", {})
    platform_info = _create_platform_info(draft_root_path)

    canvas_section = {
        "background": source_canvas.get("background"),
        "height": source_canvas.get("height", 1080),
        "ratio": source_canvas.get("ratio", "original"),
        "width": source_canvas.get("width", 1920),
    }
    template_info = {
        "composition": "",
        "content": "",
        "order": "",
        "sub_template_info_list": [],
    }
    # Materials and tracks are computed from the copy before it is updated.
    overrides: Dict[str, Any] = {
        "canvas_config": canvas_section,
        "draft_type": "video",
        "function_assistant_info": _default_function_assistant_info(),
        "is_drop_frame_timecode": False,
        "last_modified_platform": platform_info,
        "lyrics_effects": [],
        "materials": _normalize_materials(info),
        "name": draft_name,
        "new_version": new_version,
        "path": "",
        "platform": platform_info,
        "render_index_track_mode_on": True,
        "smart_ads_info": {"draft_url": "", "page_from": "", "routine": ""},
        "tracks": _normalize_tracks(info),
        "uneven_animation_template_info": template_info,
    }
    info.update(overrides)
    return info
def _create_empty_template(draft: Dict[str, Any], draft_root_path: str) -> Dict[str, Any]:
    """Build a content-free draft-info document based on ``draft``.

    A deep copy of ``draft`` has its canvas zeroed, ``color_space`` set to -1,
    and its duration, keyframes, materials and tracks emptied. It is then
    passed through ``_create_draft_info`` with an empty name and version
    ``"75.0.0"``.
    """
    empty_draft = json.loads(json.dumps(draft, ensure_ascii=False))
    keyframe_kinds = (
        "adjusts", "audios", "effects", "filters",
        "handwrites", "stickers", "texts", "videos",
    )
    empty_draft.update({
        "canvas_config": {
            "background": None,
            "height": 0,
            "ratio": "original",
            "width": 0,
        },
        "color_space": -1,
        "duration": 0,
        "keyframes": {kind: [] for kind in keyframe_kinds},
        "materials": _create_material_collections(),
        "tracks": [],
    })
    return _create_draft_info(empty_draft, "", draft_root_path, "75.0.0")
def _create_draft_material_index_item(
    material: Dict[str, Any],
    file_name: str,
    metetype: str,
    width: int,
    height: int,
) -> Dict[str, Any]:
    """Build one material index entry for the draft meta info.

    ``id``, ``path`` and ``duration`` are read from ``material`` (missing or
    falsy duration counts as 0); ``file_name`` is stored under ``extra_info``.
    """
    duration = int(material.get("duration", 0) or 0)
    material_path = material.get("path", "")
    material_id = material.get("id", "")
    entry: Dict[str, Any] = {
        "ai_group_type": "",
        "create_time": -1,
        "duration": duration,
        "enter_from": 0,
        "extra_info": file_name,
        "file_Path": material_path,
        "height": height,
        "id": material_id,
        "import_time": -1,
        "import_time_ms": -1,
        "item_source": 1,
        "md5": "",
        "metetype": metetype,
        "roughcut_time_range": {"duration": duration, "start": 0},
        "sub_time_range": {"duration": -1, "start": -1},
        "type": 0,
        "width": width,
    }
    return entry
def _create_draft_material_index(draft: Dict[str, Any]) -> List[Dict[str, Any]]:
    """List index entries for every video and audio material in ``draft``.

    Videos come first (``metetype`` ``"video"``, with their width/height),
    then audios (``metetype`` ``"music"``, width/height 0). Each entry's path
    is the placeholder form of the material's ``assets/...`` path.
    """
    items: List[Dict[str, Any]] = []

    for video in draft.get("materials", {}).get("videos", []):
        fallback = f"assets/video/{video.get('material_name') or 'source.mp4'}"
        relative_path = _normalize_asset_path(video.get("path"), fallback)
        indexed_video = {**video, "path": _to_draft_material_path(relative_path)}
        video_width = int(video.get("width", 0) or 0)
        video_height = int(video.get("height", 0) or 0)
        items.append(_create_draft_material_index_item(
            indexed_video,
            os.path.basename(relative_path),
            "video",
            video_width,
            video_height,
        ))

    for audio in draft.get("materials", {}).get("audios", []):
        fallback = f"assets/audio/{audio.get('name') or 'audio.mp3'}"
        relative_path = _normalize_asset_path(audio.get("path"), fallback)
        indexed_audio = {**audio, "path": _to_draft_material_path(relative_path)}
        items.append(_create_draft_material_index_item(
            indexed_audio,
            os.path.basename(relative_path),
            "music",
            0,
            0,
        ))

    return items
def _create_meta_info(
    draft: Dict[str, Any],
    draft_name: str,
    draft_id: str,
    draft_root_path: str,
    draft_path: str,
    asset_size: int,
) -> Dict[str, Any]:
    """Build the per-draft meta-info document.

    Args:
        draft: Draft dict; its materials, ``create_time``, ``update_time`` and
            ``duration`` are read.
        draft_name: Stored under ``draft_name``.
        draft_id: Stored under ``draft_id`` after UUID formatting.
        draft_root_path: Stored under ``draft_root_path``.
        draft_path: Stored under ``draft_fold_path``.
        asset_size: Stored under ``draft_timeline_materials_size_``.
    """
    enterprise_info = {
        "draft_enterprise_extra": "",
        "draft_enterprise_id": "",
        "draft_enterprise_name": "",
        "enterprise_material": [],
    }
    # Group 0 carries the material index; the remaining groups stay empty.
    material_groups: List[Dict[str, Any]] = [
        {"type": 0, "value": _create_draft_material_index(draft)},
    ]
    material_groups.extend(
        {"type": group_type, "value": []} for group_type in (1, 2, 3, 6, 7, 8)
    )

    meta_info: Dict[str, Any] = {
        "cloud_draft_cover": False,
        "cloud_draft_sync": False,
        "cloud_package_completed_time": "",
        "draft_cloud_capcut_purchase_info": "",
        "draft_cloud_last_action_download": False,
        "draft_cloud_package_type": "",
        "draft_cloud_purchase_info": "",
        "draft_cloud_template_id": "",
        "draft_cloud_tutorial_info": "",
        "draft_cloud_videocut_purchase_info": "",
        "draft_cover": "draft_cover.jpg",
        "draft_deeplink_url": "",
        "draft_enterprise_info": enterprise_info,
        "draft_fold_path": draft_path,
        "draft_id": _format_draft_uuid(draft_id),
        "draft_is_ae_produce": False,
        "draft_is_ai_packaging_used": False,
        "draft_is_ai_shorts": False,
        "draft_is_ai_translate": False,
        "draft_is_article_video_draft": False,
        "draft_is_cloud_temp_draft": False,
        "draft_is_from_deeplink": "false",
        "draft_is_invisible": False,
        "draft_is_pippit_draft": False,
        "draft_is_web_article_video": False,
        "draft_materials": material_groups,
        "draft_materials_copied_info": [],
        "draft_name": draft_name,
        "draft_need_rename_folder": False,
        "draft_new_version": "",
        "draft_removable_storage_device": "",
        "draft_root_path": draft_root_path,
        "draft_segment_extra_info": [],
        "draft_timeline_materials_size_": asset_size,
        "draft_type": "",
        "draft_web_article_video_enter_from": "",
        "tm_draft_cloud_completed": "",
        "tm_draft_cloud_entry_id": -1,
        "tm_draft_cloud_modified": 0,
        "tm_draft_cloud_parent_entry_id": -1,
        "tm_draft_cloud_space_id": -1,
        "tm_draft_cloud_user_id": -1,
        "tm_draft_create": draft.get("create_time", 0),
        "tm_draft_modified": draft.get("update_time", 0),
        "tm_draft_removed": 0,
        "tm_duration": draft.get("duration", 0),
    }
    return meta_info
def _create_root_meta_entry(
    draft: Dict[str, Any],
    draft_name: str,
    draft_id: str,
    draft_root_path: str,
    draft_path: str,
    asset_size: int,
) -> Dict[str, Any]:
    """Build this draft's entry for the root-level draft store.

    The cover and ``draft_info.json`` locations are built by joining
    ``draft_path`` with the file names; timestamps and duration are read from
    ``draft``.
    """
    cover_path = os.path.join(draft_path, "draft_cover.jpg")
    draft_json_path = os.path.join(draft_path, "draft_info.json")
    formatted_id = _format_draft_uuid(draft_id)

    entry: Dict[str, Any] = {
        "cloud_draft_cover": False,
        "cloud_draft_sync": False,
        "draft_cloud_last_action_download": False,
        "draft_cloud_purchase_info": "",
        "draft_cloud_template_id": "",
        "draft_cloud_tutorial_info": "",
        "draft_cloud_videocut_purchase_info": "",
        "draft_cover": cover_path,
        "draft_fold_path": draft_path,
        "draft_id": formatted_id,
        "draft_is_ai_shorts": False,
        "draft_is_cloud_temp_draft": False,
        "draft_is_invisible": False,
        "draft_is_web_article_video": False,
        "draft_json_file": draft_json_path,
        "draft_name": draft_name,
        "draft_new_version": "",
        "draft_root_path": draft_root_path,
        "draft_timeline_materials_size": asset_size,
        "draft_type": "",
        "draft_web_article_video_enter_from": "",
        "streaming_edit_draft_ready": True,
        "tm_draft_cloud_completed": "",
        "tm_draft_cloud_entry_id": -1,
        "tm_draft_cloud_modified": 0,
        "tm_draft_cloud_parent_entry_id": -1,
        "tm_draft_cloud_space_id": -1,
        "tm_draft_cloud_user_id": -1,
        "tm_draft_create": draft.get("create_time", 0),
        "tm_draft_modified": draft.get("update_time", 0),
        "tm_draft_removed": 0,
        "tm_duration": draft.get("duration", 0),
    }
    return entry
def _merge_root_meta_info(
    existing_value: Any,
    next_entry: Dict[str, Any],
    root_path: str,
) -> Dict[str, Any]:
    """Put ``next_entry`` at the front of a root index, dropping clashing entries.

    Args:
        existing_value: Previously loaded index content. Anything that is not
            a dict is treated as an empty index.
        next_entry: Entry for the draft being written.
        root_path: Root path used when the existing index has no truthy
            ``root_path`` of its own.

    Returns:
        A new index dict. ``all_draft_store`` starts with ``next_entry`` and
        is followed by every previous dict entry that differs from it in
        ``draft_id``, ``draft_fold_path`` *and* ``draft_name``; non-dict
        entries are discarded. ``draft_ids`` keeps the existing value when it
        is an ``int`` and is 1 otherwise.
    """
    current_index = existing_value if isinstance(existing_value, dict) else {}

    previous_entries = current_index.get("all_draft_store")
    if not isinstance(previous_entries, list):
        previous_entries = []

    identity_keys = ("draft_id", "draft_fold_path", "draft_name")

    def _is_distinct(candidate: Dict[str, Any]) -> bool:
        # An old entry survives only if none of its identity fields match.
        return all(
            candidate.get(key) != next_entry.get(key)
            for key in identity_keys
        )

    merged_store = [next_entry]
    for candidate in previous_entries:
        if isinstance(candidate, dict) and _is_distinct(candidate):
            merged_store.append(candidate)

    draft_ids_value = current_index.get("draft_ids")
    if not isinstance(draft_ids_value, int):
        draft_ids_value = 1

    return {
        "all_draft_store": merged_store,
        "draft_ids": draft_ids_value,
        "root_path": current_index.get("root_path") or root_path,
    }
def _create_draft_settings(draft: Dict[str, Any], draft_root_path: str) -> str:
    """Render the INI-style text written to a draft's ``draft_settings`` file.

    Args:
        draft: Draft document; ``create_time`` and ``update_time`` are read
            and divided by ``MICROSECONDS`` to obtain rounded whole seconds.
            Missing or falsy values count as 0.
        draft_root_path: Passed to ``_detect_platform`` to fill
            ``cloud_last_modify_platform``.

    Returns:
        The settings text, one ``key=value`` per line under ``[General]``,
        ending with a newline.
    """
    def _whole_seconds(field: str) -> int:
        return round(int(draft.get(field, 0) or 0) / MICROSECONDS)

    created_at = _whole_seconds("create_time")
    updated_at = _whole_seconds("update_time")
    platform_name = _detect_platform(draft_root_path)

    settings_lines = [
        "[General]",
        f"cloud_last_modify_platform={platform_name}",
        f"draft_create_time={created_at}",
        f"draft_last_edit_time={updated_at}",
        "real_edit_keys=1",
        "real_edit_seconds=0",
        "",  # yields the trailing newline after joining
    ]
    return "\n".join(settings_lines)
def _create_reference_line_attachment() -> Dict[str, Any]:
    """Return the default reference-line attachment payload.

    The payload has no horizontal or vertical lines, is neither locked nor
    visible, and uses ``safe_area_type`` 0. A fresh dict is built per call.
    """
    reference_lines: Dict[str, Any] = {
        "horizontal_lines": [],
        "is_lock": False,
        "is_visible": False,
        "vertical_lines": [],
    }
    attachment: Dict[str, Any] = {
        "reference_lines_config": reference_lines,
        "safe_area_type": 0,
    }
    return attachment
def _create_editing_attachment() -> Dict[str, Any]:
    """Return the default editing attachment payload.

    The result is ``{"editing_draft": {...}}`` where every value is a neutral
    default (empty string, 0, ``False`` or an empty list) except
    ``cover_extra_info.type`` (1) and ``version`` ("1.0.0"). A fresh structure
    is built on every call.
    """
    # These switches sit next to each other in the payload and are all False;
    # the tuple order is the order in which they are emitted.
    disabled_flags = (
        "is_open_expand_player",
        "is_template_text_ai_generate",
        "is_use_adjust",
        "is_use_ai_expand",
        "is_use_ai_remove",
        "is_use_ai_video",
        "is_use_audio_separation",
        "is_use_chroma_key",
        "is_use_curve_speed",
        "is_use_digital_human",
        "is_use_edit_multi_camera",
        "is_use_lip_sync",
        "is_use_lock_object",
        "is_use_loudness_unify",
        "is_use_noise_reduction",
        "is_use_one_click_beauty",
        "is_use_one_click_ultra_hd",
        "is_use_retouch_face",
        "is_use_smart_adjust_color",
        "is_use_smart_body_beautify",
        "is_use_smart_motion",
        "is_use_subtitle_recognition",
        "is_use_text_to_audio",
    )

    cover_extra_info: Dict[str, Any] = {
        "draft_id": "",
        "position": 0,
        "select_segment_id": "",
        "select_segment_source_start": 0,
        "select_segment_target_start": 0,
        "type": 1,
    }
    image_ai_chat_info: Dict[str, Any] = {
        "before_chat_edit": False,
        "draft_modify_time": 0,
        "generate_type": "",
        "keyword_content": "",
        "keyword_type": "",
        "message_id": "",
        "model_name": "",
        "need_restore": False,
        "picture_id": "",
        "prompt_content": "",
        "prompt_from": "",
        "sugs_info": [],
    }
    material_edit_session: Dict[str, Any] = {
        "material_edit_info": [],
        "session_id": "",
        "session_time": 0,
    }

    editing_draft: Dict[str, Any] = {
        "ai_remove_filter_words": {"enter_source": "", "right_id": ""},
        "ai_shorts_info": {"report_params": "", "type": 0},
        "cover_extra_info": cover_extra_info,
        "crop_info_extra": {
            "crop_mirror_type": 0,
            "crop_rotate": 0,
            "crop_rotate_total": 0,
        },
        "digital_human_template_to_video_info": {
            "has_upload_material": False,
            "template_type": 0,
        },
        "draft_used_recommend_function": "",
        "edit_type": 0,
        "eye_correct_enabled_multi_face_time": 0,
        "has_adjusted_render_layer": False,
        "image_ai_chat_info": image_ai_chat_info,
        **{flag: False for flag in disabled_flags},
        "material_edit_session": material_edit_session,
        "paste_segment_list": [],
        "profile_entrance_type": "",
        "publish_enter_from": "",
        "publish_type": "",
        "single_function_type": 0,
        "text_convert_case_types": [],
        "version": "1.0.0",
        "video_recording_create_draft": "",
    }
    return {"editing_draft": editing_draft}
def _create_draft_virtual_store(draft: Dict[str, Any]) -> Dict[str, Any]:
    """Build the ``draft_virtual_store.json`` payload for a draft.

    Args:
        draft: Draft document. The ``videos`` and ``audios`` lists under
            ``materials`` are read; missing keys count as empty.

    Returns:
        A dict with an empty ``draft_materials`` list and three
        ``draft_virtual_store`` sections: type 0 holds one blank placeholder
        record, type 1 holds one ``{"child_id", "parent_id"}`` link per video
        material followed by one per audio material (``child_id`` is the
        material's ``id`` or ``""``), and type 2 is empty.
    """
    materials_section = draft.get("materials", {})

    # Videos first, then audios.
    material_links = []
    for group in ("videos", "audios"):
        for material in materials_section.get(group, []):
            material_links.append(
                {"child_id": material.get("id", ""), "parent_id": ""}
            )

    placeholder_record = {
        "creation_time": 0,
        "display_name": "",
        "filter_type": 0,
        "id": "",
        "import_time": 0,
        "import_time_us": 0,
        "sort_sub_type": 0,
        "sort_type": 0,
        "subdraft_filter_type": 0,
    }
    sections = [
        {"type": 0, "value": [placeholder_record]},
        {"type": 1, "value": material_links},
        {"type": 2, "value": []},
    ]
    return {"draft_materials": [], "draft_virtual_store": sections}
def _write_root_meta_info(draft_root_path: str, root_meta_entry: Dict[str, Any]) -> None:
    """Insert ``root_meta_entry`` into ``root_meta_info.json`` under the drafts root.

    The existing file, when present, is loaded and merged through
    ``_merge_root_meta_info``. If it cannot be read or parsed, a warning is
    logged and the index is rebuilt from an empty one.

    Args:
        draft_root_path: Directory that contains ``root_meta_info.json``.
        root_meta_entry: Entry describing the draft to register.
    """
    index_file = os.path.join(draft_root_path, "root_meta_info.json")

    previous_index: Any = {}
    if os.path.exists(index_file):
        try:
            with open(index_file, "r", encoding="utf-8") as handle:
                previous_index = json.load(handle)
        except Exception as read_error:
            # Best effort: an unreadable index must not block the export.
            logger.warning(f"读取 root_meta_info.json 失败,将重建索引: {read_error}")

    merged_index = _merge_root_meta_info(previous_index, root_meta_entry, draft_root_path)
    _write_json_file(index_file, merged_index)
def _write_plaintext_draft_files(
    draft_root_path: str,
    draft_path: str,
    draft_name: str,
    draft_id: str,
    draft: Dict[str, Any],
    asset_size: int,
) -> None:
    """Write all plaintext files of a draft package, then register the draft.

    Files are written into ``draft_path`` (some into its ``common_attachment``
    sub-directory; no directory is created explicitly here). The last step
    updates ``root_meta_info.json`` in ``draft_root_path``.

    Args:
        draft_root_path: Drafts root directory.
        draft_path: Directory of the draft being written.
        draft_name: Display name of the draft.
        draft_id: Draft id, also used as the timeline id in the layout file.
        draft: Draft document to serialise.
        asset_size: Size value recorded in the meta files.
    """
    def in_draft(*parts: str) -> str:
        return os.path.join(draft_path, *parts)

    draft_info = _create_draft_info(draft, draft_name, draft_root_path)
    empty_template = _create_empty_template(draft, draft_root_path)

    meta_info = _create_meta_info(
        draft, draft_name, draft_id, draft_root_path, draft_path, asset_size
    )
    _write_json_file(in_draft("draft_meta_info.json"), meta_info)

    with open(in_draft("draft_settings"), "w", encoding="utf-8") as settings_file:
        settings_file.write(_create_draft_settings(draft, draft_root_path))
    with open(in_draft("draft_cover.jpg"), "wb") as cover_file:
        cover_file.write(DEFAULT_DRAFT_COVER_BYTES)

    # The same timeline document is stored under two names.
    for timeline_name in ("draft_info.json", "template-2.tmp"):
        _write_json_file(in_draft(timeline_name), draft_info)
    _write_json_file(in_draft("template.tmp"), empty_template)

    _write_json_file(
        in_draft("common_attachment", "attachment_pc_timeline.json"),
        _create_reference_line_attachment(),
    )
    for attachment_name in (
        "attachment_action_scene.json",
        "attachment_script_video.json",
        "attachment_gen_ai_info.json",
    ):
        _write_json_file(in_draft("common_attachment", attachment_name), {})
    _write_json_file(in_draft("attachment_editing.json"), _create_editing_attachment())

    agency_config = {
        "is_auto_agency_enabled": False,
        "is_auto_agency_popup": False,
        "is_single_agency_mode": False,
        "marterials": None,
        "use_converter": False,
        "video_resolution": draft.get("canvas_config", {}).get("height", 1080),
    }
    _write_json_file(in_draft("draft_agency_config.json"), agency_config)

    # draft_biz_config.json is created as an empty file.
    with open(in_draft("draft_biz_config.json"), "w", encoding="utf-8") as biz_config:
        biz_config.write("")

    _write_json_file(in_draft("draft_virtual_store.json"), _create_draft_virtual_store(draft))

    performance_info = {
        "manual_cancle_precombine_segs": None,
        "need_auto_precombine_segs": None,
    }
    _write_json_file(in_draft("performance_opt_info.json"), performance_info)

    timeline_layout = {
        "activeTimeline": draft_id,
        "dockItems": [
            {
                "dockIndex": 0,
                "ratio": 1,
                "timelineIds": [draft_id],
                "timelineNames": ["时间线01"],
            }
        ],
        "layoutOrientation": 1,
    }
    _write_json_file(in_draft("timeline_layout.json"), timeline_layout)

    root_entry = _create_root_meta_entry(
        draft, draft_name, draft_id, draft_root_path, draft_path, asset_size
    )
    _write_root_meta_info(draft_root_path, root_entry)
def _resolve_item_audio_file(item: Dict[str, Any], output_dir: str) -> str:
    """Pick the narration audio path for one script item.

    Args:
        item: Script item; its ``audio`` and ``timestamp`` values are read.
        output_dir: Directory used to build the timestamp-based fallback path.

    Returns:
        ``item["audio"]`` when it is set and exists on disk. Otherwise
        ``<output_dir>/audio_<timestamp>.mp3`` with every ``:`` in the
        timestamp replaced by ``_`` (the fallback is not checked for
        existence), or ``""`` when the item has no timestamp either.
    """
    timestamp = item.get("timestamp", "")
    if timestamp:
        fallback_path = os.path.join(
            output_dir, f"audio_{timestamp.replace(':', '_')}.mp3"
        )
    else:
        fallback_path = ""

    explicit_path = item.get("audio", "")
    use_explicit = bool(explicit_path and os.path.exists(explicit_path))
    return explicit_path if use_explicit else fallback_path
def _resolve_volume_setting(params: Any, attr_name: str, default: float = 1.0) -> float:
    """Read a volume attribute from ``params`` without discarding an explicit zero.

    Only a missing attribute, ``None`` or an empty string falls back to
    ``default``. A configured ``0`` is returned as ``0.0`` so that a track the
    user muted stays muted (``value or default`` would turn it into full volume).
    """
    raw_value = getattr(params, attr_name, None)
    if raw_value is None or raw_value == "":
        return default
    return float(raw_value)


def _append_script_segments(
    draft: Dict[str, Any],
    draft_path: str,
    video_track: Dict[str, Any],
    audio_track: Dict[str, Any],
    new_script_list: List[Dict[str, Any]],
    params: VideoClipParams,
    output_dir: str,
    asset_path_cache: Dict[str, str],
) -> int:
    """Place every usable script item back to back on the video/audio tracks.

    Items whose video file is missing or whose resulting duration is not
    positive are skipped with a warning. Each distinct video file (by absolute
    path) becomes one video material that all of its segments share.

    Args:
        draft: Draft document; materials are appended to
            ``draft["materials"]["videos"]`` / ``["audios"]``.
        draft_path: Draft directory handed to ``_register_asset``.
        video_track: Track dict receiving video segments.
        audio_track: Track dict receiving narration audio segments.
        new_script_list: Script items to place on the timeline.
        params: Clip parameters (``video_origin_path``, ``original_volume``,
            ``tts_volume`` are read).
        output_dir: Directory used to locate per-item narration audio.
        asset_path_cache: Filled by ``_register_asset``; shared with the
            caller, which later iterates its keys as source file paths.

    Returns:
        End position of the assembled timeline in microseconds.
    """
    duration_cache: Dict[str, float] = {}
    metadata_cache: Dict[str, Tuple[int, int, int]] = {}
    used_asset_paths: Set[str] = set()
    video_material_cache: Dict[str, Dict[str, Any]] = {}
    current_time_us = 0

    for item in new_script_list:
        start_time = float(item.get("start_time", 0.0) or 0.0)
        source_start_time = float(item.get("source_start_time", start_time) or 0.0)
        requested_duration = float(item.get("duration", 0.0) or 0.0)
        timestamp = item.get("timestamp", "")
        ost = int(item.get("OST", 0) or 0)
        use_source_timerange = bool(item.get("use_source_timerange", False))
        logger.info(
            f"处理片段: OST={ost}, start_time={start_time}, "
            f"duration={requested_duration}, timestamp={timestamp}"
        )

        video_file = item.get("video", "")
        use_clipped_video = bool(
            video_file and os.path.exists(video_file) and not use_source_timerange
        )
        if not use_clipped_video and not video_file:
            video_file = params.video_origin_path
        if not video_file or not os.path.exists(video_file):
            logger.warning(f"视频素材不存在,跳过片段: {video_file or timestamp}")
            continue

        # When the item's own clip file is used, the segment reads that file
        # from its beginning instead of from the source offset.
        source_start_time = 0.0 if use_clipped_video else source_start_time
        video_duration = _clamp_duration_to_media(
            requested_duration,
            video_file,
            duration_cache,
            "视频素材" if use_clipped_video else "原始视频素材",
            source_start_time=source_start_time,
        )

        # OST 0 and 2 carry a narration track; resolve its file once per item.
        audio_file = _resolve_item_audio_file(item, output_dir)
        wants_narration = ost in [0, 2]
        has_narration_audio = bool(
            wants_narration and audio_file and os.path.exists(audio_file)
        )
        audio_duration = None
        if has_narration_audio:
            audio_duration = _get_cached_media_duration(audio_file, duration_cache)
            logger.info(
                f"音频文件实际时长: {audio_duration:.6f}秒, 视频片段时长: {video_duration:.3f}"
            )

        # The segment is as long as the shorter of the video and the narration.
        segment_duration = min(
            video_duration,
            audio_duration if audio_duration is not None else video_duration,
        )
        segment_duration = _floor_duration_to_milliseconds(segment_duration)
        if segment_duration <= 0:
            logger.warning(f"片段时长无效,跳过: {timestamp}")
            continue
        segment_duration_us = _seconds_to_microseconds(segment_duration)

        video_material_key = os.path.abspath(video_file)
        video_material = video_material_cache.get(video_material_key)
        if video_material is None:
            video_material_duration_us, width, height = _get_video_metadata_ffprobe(
                video_file, metadata_cache
            )
            video_relative_path = _register_asset(
                video_file,
                draft_path,
                "assets/video",
                f"video_{len(video_material_cache) + 1}.mp4",
                used_asset_paths,
                asset_path_cache,
            )
            video_material = _create_video_material(
                video_relative_path, video_material_duration_us, width, height
            )
            draft["materials"]["videos"].append(video_material)
            video_material_cache[video_material_key] = video_material

        # OST 0 silences the source clip; otherwise the configured original
        # volume applies, including an explicit 0.
        video_volume = 0.0 if ost == 0 else _resolve_volume_setting(params, "original_volume")
        video_track["segments"].append(_create_video_segment(
            video_material["id"],
            _seconds_to_microseconds(_floor_duration_to_milliseconds(source_start_time)),
            segment_duration_us,
            current_time_us,
            video_volume,
        ))

        if has_narration_audio:
            audio_relative_path = _register_asset(
                audio_file,
                draft_path,
                "assets/audio",
                f"audio_{len(audio_track['segments']) + 1}.mp3",
                used_asset_paths,
                asset_path_cache,
            )
            audio_material = _create_audio_material(
                audio_relative_path, _seconds_to_microseconds(audio_duration)
            )
            draft["materials"]["audios"].append(audio_material)
            audio_track["segments"].append(_create_audio_segment(
                audio_material["id"],
                segment_duration_us,
                current_time_us,
                _resolve_volume_setting(params, "tts_volume"),
            ))
        elif wants_narration:
            logger.warning(f"音频文件不存在: {audio_file}")

        current_time_us += segment_duration_us

    return current_time_us


def write_plaintext_jianying_draft(
    jianying_draft_path: str,
    draft_name: str,
    new_script_list: List[Dict[str, Any]],
    params: VideoClipParams,
    output_dir: str,
    subtitle_path: str = "",
) -> Tuple[str, str]:
    """Create a plaintext JianYing draft package from a script item list.

    A new, uniquely named draft folder is created under
    ``jianying_draft_path``, the script items are laid out on one video and
    one audio track, an optional subtitle track is added, and all draft files
    are written. If anything fails after the draft folder was created, that
    folder is removed again before the error propagates, so no half-built
    draft is left behind.

    Args:
        jianying_draft_path: Drafts root directory (created when missing).
        draft_name: Display name; when empty, ``NarratoAI_<unix time>`` is used.
        new_script_list: Script items (see ``_append_script_segments``).
        params: Clip parameters. ``subtitle_enabled`` (default ``True``)
            gates the subtitle track.
        output_dir: Directory holding generated narration audio files.
        subtitle_path: Optional SRT path; ignored when empty.

    Returns:
        ``(draft_path, display_name)``.

    Raises:
        FileExistsError: If the chosen draft folder already exists.
        ValueError: If no script item produced a video segment.
    """
    os.makedirs(jianying_draft_path, exist_ok=True)
    display_name = draft_name or f"NarratoAI_{int(time.time())}"
    folder_name, draft_path = _create_unique_draft_path(jianying_draft_path, display_name)
    # exist_ok=False guarantees the folder is ours, which makes the cleanup
    # below safe.
    os.makedirs(draft_path, exist_ok=False)

    try:
        for rel_dir in DRAFT_PACKAGE_DIRECTORIES:
            os.makedirs(os.path.join(draft_path, *rel_dir.split("/")), exist_ok=True)

        draft_id = uuid.uuid4().hex
        draft = _create_draft_template(draft_id, display_name, jianying_draft_path)
        video_track = _create_track("video", "视频轨道")
        audio_track = _create_track("audio", "音频轨道")
        draft["tracks"] = [video_track, audio_track]

        asset_path_cache: Dict[str, str] = {}
        current_time_us = _append_script_segments(
            draft,
            draft_path,
            video_track,
            audio_track,
            new_script_list,
            params,
            output_dir,
            asset_path_cache,
        )
        if not video_track["segments"]:
            raise ValueError("没有可写入剪映草稿的视频片段")

        subtitle_end_us = 0
        if getattr(params, "subtitle_enabled", True) and subtitle_path:
            subtitle_end_us = _add_subtitle_track_from_srt(draft, subtitle_path, params)

        # The canvas follows the first video material; 1920x1080 is the
        # fallback for missing or zero dimensions.
        first_video = draft["materials"]["videos"][0]
        draft["canvas_config"]["width"] = int(first_video.get("width", 1920) or 1920)
        draft["canvas_config"]["height"] = int(first_video.get("height", 1080) or 1080)
        draft["duration"] = max(current_time_us, subtitle_end_us)
        draft["update_time"] = int(time.time() * MICROSECONDS)

        asset_size = sum(
            os.path.getsize(source_file)
            for source_file in asset_path_cache
            if os.path.exists(source_file)
        )
        _write_plaintext_draft_files(
            jianying_draft_path,
            draft_path,
            display_name,
            draft_id,
            draft,
            asset_size,
        )
    except Exception:
        # Do not leave a partially written draft folder in the drafts root.
        shutil.rmtree(draft_path, ignore_errors=True)
        raise

    logger.info(f"剪映明文草稿包已写入: {draft_path} (folder={folder_name})")
    return draft_path, display_name