add export jianying draft feature

This commit is contained in:
aw123456dew 2026-04-07 11:33:12 +08:00
parent be653c5748
commit d45c1858c9
5 changed files with 338 additions and 0 deletions

View File

@ -196,6 +196,7 @@ class VideoClipParams(BaseModel):
tts_volume: Optional[float] = Field(default=AudioVolumeDefaults.TTS_VOLUME, description="解说语音音量(后处理)")
original_volume: Optional[float] = Field(default=AudioVolumeDefaults.ORIGINAL_VOLUME, description="视频原声音量")
bgm_volume: Optional[float] = Field(default=AudioVolumeDefaults.BGM_VOLUME, description="背景音乐音量")
draft_name: Optional[str] = Field(default="", description="剪映草稿名称")

View File

@ -0,0 +1,203 @@
import json
import os
import time
from os import path
from loguru import logger
from app.config import config
from app.models import const
from app.models.schema import VideoClipParams
from app.services import voice, clip_video, update_script
from app.services import state as sm
from app.utils import utils
def start_export_jianying_draft(task_id: str, params: VideoClipParams):
"""
导出到剪映草稿的后台任务
Args:
task_id: 任务ID
params: 视频参数
"""
logger.info(f"\n\n## 开始导出到剪映草稿任务: {task_id}")
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=0)
"""
1. 加载剪辑脚本
"""
logger.info("\n\n## 1. 加载视频脚本")
video_script_path = path.join(params.video_clip_json_path)
if path.exists(video_script_path):
try:
with open(video_script_path, "r", encoding="utf-8") as f:
list_script = json.load(f)
video_list = [i['narration'] for i in list_script]
video_ost = [i['OST'] for i in list_script]
time_list = [i['timestamp'] for i in list_script]
video_script = " ".join(video_list)
logger.debug(f"解说完整脚本: \n{video_script}")
logger.debug(f"解说 OST 列表: \n{video_ost}")
logger.debug(f"解说时间戳列表: \n{time_list}")
except Exception as e:
logger.error(f"无法读取视频json脚本请检查脚本格式是否正确")
raise ValueError("无法读取视频json脚本请检查脚本格式是否正确")
else:
logger.error(f"解说脚本文件不存在: {video_script_path},请先点击【保存脚本】按钮保存脚本后再生成视频")
raise ValueError("解说脚本文件不存在!请先点击【保存脚本】按钮保存脚本后再生成视频。")
"""
2. 使用 TTS 生成音频素材
"""
logger.info("\n\n## 2. 根据OST设置生成音频列表")
tts_segments = [
segment for segment in list_script
if segment['OST'] in [0, 2]
]
logger.debug(f"需要生成TTS的片段数: {len(tts_segments)}")
tts_results = voice.tts_multiple(
task_id=task_id,
list_script=tts_segments, # 只传入需要TTS的片段
tts_engine=params.tts_engine,
voice_name=params.voice_name,
voice_rate=params.voice_rate,
voice_pitch=params.voice_pitch,
)
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=20)
"""
3. 统一视频裁剪 - 基于OST类型的差异化裁剪策略
"""
logger.info("\n\n## 3. 统一视频裁剪基于OST类型")
video_clip_result = clip_video.clip_video_unified(
video_origin_path=params.video_origin_path,
script_list=list_script,
tts_results=tts_results
)
tts_clip_result = {tts_result['_id']: tts_result['audio_file'] for tts_result in tts_results}
subclip_clip_result = {
tts_result['_id']: tts_result['subtitle_file'] for tts_result in tts_results
}
new_script_list = update_script.update_script_timestamps(list_script, video_clip_result, tts_clip_result, subclip_clip_result)
logger.info(f"统一裁剪完成,处理了 {len(video_clip_result)} 个视频片段")
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=60)
"""
4. 导出到剪映草稿
"""
logger.info("\n\n## 4. 导出到剪映草稿")
try:
import pyJianYingDraft
from pyJianYingDraft import DraftFolder, VideoSegment, AudioSegment, trange, TrackType
jianying_draft_path = config.ui.get("jianying_draft_path", "")
if not jianying_draft_path:
raise ValueError("剪映草稿路径未配置")
# 创建DraftFolder实例
draft_folder = DraftFolder(jianying_draft_path)
# 使用从参数中获取的草稿名称,如果为空则使用默认名称
draft_name = getattr(params, 'draft_name', "")
logger.debug(f"从params获取的草稿名称: '{draft_name}' (类型: {type(draft_name)})")
if not draft_name:
draft_name = f"NarratoAI_{int(time.time())}"
logger.debug(f"使用默认草稿名称: '{draft_name}'")
# 创建新草稿
script = draft_folder.create_draft(draft_name, 1920, 1080)
# 添加视频轨道和音频轨道
script.add_track(TrackType.video, '视频轨道')
script.add_track(TrackType.audio, '音频轨道')
# 处理脚本数据
current_time = 0
output_dir = utils.task_dir(task_id)
for item in new_script_list:
# 获取时间信息
start_time = float(item.get('start_time', 0.0))
duration = float(item.get('duration', 0.0))
timestamp = item.get('timestamp', '')
logger.info(f"处理片段: OST={item['OST']}, start_time={start_time}, duration={duration}, timestamp={timestamp}")
# 生成音频文件路径
audio_file = ""
if timestamp:
timestamp_formatted = timestamp.replace(':', '_')
audio_file = os.path.join(
output_dir,
f"audio_{timestamp_formatted}.mp3"
)
# 检查是否有裁剪后的视频文件
video_file = item.get('video', '')
if video_file and not os.path.exists(video_file):
video_file = ""
# 添加视频片段
if video_file:
# 使用裁剪后的视频文件
# 对于裁剪后的视频target_timerange的第二个参数是持续时间
video_segment = VideoSegment(
video_file,
trange(f"{current_time}s", f"{duration}s")
)
else:
# 使用原始视频文件
# source_timerange是从原始视频中截取的部分
# target_timerange是片段在时间轴上的位置
video_segment = VideoSegment(
params.video_origin_path,
trange(f"{current_time}s", f"{duration}s"),
source_timerange=trange(f"{start_time}s", f"{duration}s")
)
script.add_segment(video_segment, '视频轨道')
# 处理音频
if item['OST'] in [0, 2]: # 需要TTS的片段
if os.path.exists(audio_file):
# 添加TTS音频片段
# 对于音频片段target_timerange的第二个参数是持续时间
audio_segment = AudioSegment(
audio_file,
trange(f"{current_time}s", f"{duration}s")
)
script.add_segment(audio_segment, '音频轨道')
else:
logger.warning(f"音频文件不存在: {audio_file}")
# OST=1的片段保留原声不需要添加额外音频
# 更新当前时间
current_time += duration
# 保存草稿
script.save()
draft_path = os.path.join(jianying_draft_path, draft_name)
logger.success(f"成功导出到剪映草稿: {draft_name}")
logger.info(f"草稿已保存到: {draft_path}")
# 更新任务状态
sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100, draft_path=draft_path, draft_name=draft_name)
return {"draft_path": draft_path, "draft_name": draft_name}
except ImportError as e:
logger.error(f"导入pyJianYingDraft失败: {e}")
raise ImportError(f"pyJianYingDraft库导入失败: {e}\n请确保已正确安装该库")
except Exception as e:
logger.error(f"导出到剪映草稿失败: {e}")
import traceback
logger.error(f"错误详情: {traceback.format_exc()}")
raise Exception(f"导出到剪映草稿失败: {e}")

View File

@ -35,3 +35,6 @@ tenacity>=9.0.0
# torch>=2.0.0
# torchvision>=0.15.0
# torchaudio>=2.0.0
# 剪映草稿导出依赖
pyJianYingDraft>=0.1.0

122
webui.py
View File

@ -221,6 +221,127 @@ def render_generate_button():
time.sleep(0.5)
def render_export_jianying_button():
"""渲染导出到剪映草稿按钮和处理逻辑"""
import os
import time
import uuid
from loguru import logger
# 初始化session state
if 'show_jianying_export_form' not in st.session_state:
st.session_state['show_jianying_export_form'] = False
if 'jianying_export_result' not in st.session_state:
st.session_state['jianying_export_result'] = None
if 'jianying_export_error' not in st.session_state:
st.session_state['jianying_export_error'] = None
if st.button("📤 导出到剪映草稿", use_container_width=True, type="secondary"):
config.save_config()
if not st.session_state.get('video_clip_json_path'):
st.error("脚本文件不能为空")
return
if not st.session_state.get('video_origin_path'):
st.error("视频文件不能为空")
return
jianying_draft_path = config.ui.get("jianying_draft_path", "")
if not jianying_draft_path:
st.error("请在基础设置中配置剪映草稿地址")
return
if not os.path.exists(jianying_draft_path):
st.error(f"剪映草稿文件夹不存在: {jianying_draft_path}")
return
# 显示导出表单
st.session_state['show_jianying_export_form'] = True
st.session_state['jianying_export_result'] = None
st.session_state['jianying_export_error'] = None
# 显示导出表单
if st.session_state['show_jianying_export_form']:
st.markdown("---")
st.subheader("导出到剪映草稿")
draft_name = st.text_input(
"请输入剪映草稿名称",
value=f"NarratoAI_{int(time.time())}",
key="draft_name_input"
)
if st.button("确认导出", key="confirm_export"):
if not draft_name:
st.error("请输入草稿名称")
return
# 获取音频设置
tts_engine = st.session_state.get('tts_engine', 'azure')
voice_name = st.session_state.get('voice_name', 'zh-CN-YunjianNeural')
voice_rate = st.session_state.get('voice_rate', 1.0)
voice_pitch = st.session_state.get('voice_pitch', 1.0)
# 创建任务ID
task_id = str(uuid.uuid4())
st.session_state['task_id'] = task_id
# 构建参数
logger.debug(f"准备创建VideoClipParams草稿名称: '{draft_name}'")
params = VideoClipParams(
video_clip_json_path=st.session_state['video_clip_json_path'],
video_origin_path=st.session_state['video_origin_path'],
tts_engine=tts_engine,
voice_name=voice_name,
voice_rate=voice_rate,
voice_pitch=voice_pitch,
n_threads=config.app.get('n_threads', 4),
video_aspect=VideoAspect.landscape,
subtitle_enabled=st.session_state.get('subtitle_enabled', False),
font_name=st.session_state.get('font_name', 'Microsoft YaHei'),
font_size=st.session_state.get('font_size', 24),
text_fore_color=st.session_state.get('text_fore_color', '#FFFFFF'),
subtitle_position=st.session_state.get('subtitle_position', 'bottom'),
custom_position=st.session_state.get('custom_position', 70.0),
tts_volume=st.session_state.get('tts_volume', 1.0),
original_volume=st.session_state.get('original_volume', 0.7),
bgm_volume=st.session_state.get('bgm_volume', 0.3),
draft_name=draft_name
)
with st.spinner("正在导出到剪映草稿,请稍候..."):
try:
from app.services import jianying_task
# 调用导出到剪映草稿的任务
result = jianying_task.start_export_jianying_draft(task_id, params)
# 记录日志
logger.info(f"成功导出到剪映草稿: {result['draft_name']}")
logger.info(f"草稿已保存到: {result['draft_path']}")
# 保存结果到session state
st.session_state['jianying_export_result'] = result
st.session_state['jianying_export_error'] = None
st.session_state['show_jianying_export_form'] = False
st.success(f"✅ 成功导出到剪映草稿: {result['draft_name']}")
st.info(f"📁 草稿已保存到: {result['draft_path']}")
except Exception as e:
logger.error(f"导出到剪映草稿失败: {e}")
import traceback
logger.error(f"错误详情: {traceback.format_exc()}")
st.session_state['jianying_export_error'] = str(e)
st.session_state['jianying_export_result'] = None
st.error(f"❌ 导出到剪映草稿失败: {e}")
if st.button("取消", key="cancel_export"):
st.session_state['show_jianying_export_form'] = False
st.session_state['jianying_export_result'] = None
st.session_state['jianying_export_error'] = None
st.rerun()
def main():
"""主函数"""
@ -285,6 +406,7 @@ def main():
# 放到最后渲染生成按钮和处理逻辑
render_generate_button()
render_export_jianying_button()
if __name__ == "__main__":

View File

@ -217,6 +217,15 @@ def render_proxy_settings(tr):
config.proxy["http"] = ""
config.proxy["https"] = ""
# 剪映草稿地址设置
st.subheader("剪映草稿设置")
jianying_draft_path = st.text_input(
"剪映草稿文件夹路径",
value=config.ui.get("jianying_draft_path", ""),
help="剪映草稿文件夹路径例如C:\\Users\\用户名\\Documents\\JianyingPro Drafts"
)
config.ui["jianying_draft_path"] = jianying_draft_path
def test_vision_model_connection(api_key, base_url, model_name, provider, tr):
"""测试视觉模型连接