Merge pull request #238 from aw123456dew/feature/export-jianying-draft

Add export-to-Jianying-draft feature
This commit is contained in:
viccy 2026-04-08 15:13:02 +08:00 committed by GitHub
commit de33c6d0bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 395 additions and 0 deletions

View File

@ -196,6 +196,7 @@ class VideoClipParams(BaseModel):
tts_volume: Optional[float] = Field(default=AudioVolumeDefaults.TTS_VOLUME, description="解说语音音量(后处理)")
original_volume: Optional[float] = Field(default=AudioVolumeDefaults.ORIGINAL_VOLUME, description="视频原声音量")
bgm_volume: Optional[float] = Field(default=AudioVolumeDefaults.BGM_VOLUME, description="背景音乐音量")
draft_name: Optional[str] = Field(default="", description="剪映草稿名称")

View File

@ -0,0 +1,241 @@
import json
import os
import subprocess
import time
from os import path
from loguru import logger
from app.config import config
from app.models import const
from app.models.schema import VideoClipParams
from app.services import voice, clip_video, update_script
from app.services import state as sm
from app.utils import utils
def get_audio_duration_ffprobe(audio_file: str) -> float:
    """Probe *audio_file* with ffprobe and return its duration in seconds.

    Uses ``format=duration`` in CSV output mode so stdout is a single float
    (microsecond precision).

    Args:
        audio_file: path to the audio file to inspect

    Returns:
        float: duration in seconds

    Raises:
        subprocess.CalledProcessError: ffprobe exited non-zero
        Exception: any other failure (e.g. unparsable output); re-raised after logging
    """
    probe_cmd = [
        'ffprobe',
        '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'csv=p=0',
        audio_file,
    ]
    try:
        completed = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        duration = float(completed.stdout.strip())
    except subprocess.CalledProcessError as e:
        logger.error(f"ffprobe执行失败: {e.stderr}")
        raise
    except Exception as e:
        logger.error(f"获取音频时长失败: {str(e)}")
        raise
    else:
        logger.debug(f"使用ffprobe获取音频时长: {duration:.6f}")
        return duration
def start_export_jianying_draft(task_id: str, params: VideoClipParams) -> dict:
    """
    Background task: export the current clip script to a Jianying (CapCut CN) draft.

    Pipeline: load the clip-script JSON -> synthesize TTS audio for narrated
    segments -> cut the source video per segment -> assemble a pyJianYingDraft
    draft (one video track + one audio track) and save it into the user's
    configured Jianying drafts folder.

    Args:
        task_id: task ID, used for state tracking and for locating per-task output files
        params: video/TTS parameters (script path, source video, voice settings, draft name)

    Returns:
        dict with keys "draft_path" and "draft_name" on success.

    Raises:
        ValueError: script file missing/unreadable, or the Jianying draft path is not configured
        ImportError: the pyJianYingDraft library is not installed
        Exception: any other export failure, re-raised with context after logging
    """
    logger.info(f"\n\n## 开始导出到剪映草稿任务: {task_id}")
    sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=0)

    """
    1. 加载剪辑脚本
    """
    # Step 1: load the clip-script JSON and split it into narration / OST / timestamp lists.
    logger.info("\n\n## 1. 加载视频脚本")
    video_script_path = path.join(params.video_clip_json_path)
    if path.exists(video_script_path):
        try:
            with open(video_script_path, "r", encoding="utf-8") as f:
                list_script = json.load(f)
                video_list = [i['narration'] for i in list_script]
                video_ost = [i['OST'] for i in list_script]
                time_list = [i['timestamp'] for i in list_script]
                video_script = " ".join(video_list)
                logger.debug(f"解说完整脚本: \n{video_script}")
                logger.debug(f"解说 OST 列表: \n{video_ost}")
                logger.debug(f"解说时间戳列表: \n{time_list}")
        except Exception as e:
            logger.error(f"无法读取视频json脚本请检查脚本格式是否正确")
            raise ValueError("无法读取视频json脚本请检查脚本格式是否正确")
    else:
        logger.error(f"解说脚本文件不存在: {video_script_path},请先点击【保存脚本】按钮保存脚本后再生成视频")
        raise ValueError("解说脚本文件不存在!请先点击【保存脚本】按钮保存脚本后再生成视频。")

    """
    2. 使用 TTS 生成音频素材
    """
    # Step 2: synthesize TTS audio. Only OST 0 and 2 segments carry narration;
    # OST 1 segments keep the original sound and get no TTS.
    logger.info("\n\n## 2. 根据OST设置生成音频列表")
    tts_segments = [
        segment for segment in list_script
        if segment['OST'] in [0, 2]
    ]
    logger.debug(f"需要生成TTS的片段数: {len(tts_segments)}")
    tts_results = voice.tts_multiple(
        task_id=task_id,
        list_script=tts_segments,  # pass only the segments that need TTS
        tts_engine=params.tts_engine,
        voice_name=params.voice_name,
        voice_rate=params.voice_rate,
        voice_pitch=params.voice_pitch,
    )
    sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=20)

    """
    3. 统一视频裁剪 - 基于OST类型的差异化裁剪策略
    """
    # Step 3: cut the source video per segment (strategy differs by OST type),
    # then rewrite the script timestamps against the produced clip/audio files.
    logger.info("\n\n## 3. 统一视频裁剪基于OST类型")
    video_clip_result = clip_video.clip_video_unified(
        video_origin_path=params.video_origin_path,
        script_list=list_script,
        tts_results=tts_results
    )
    # Index TTS outputs by segment id for the timestamp update below.
    tts_clip_result = {tts_result['_id']: tts_result['audio_file'] for tts_result in tts_results}
    subclip_clip_result = {
        tts_result['_id']: tts_result['subtitle_file'] for tts_result in tts_results
    }
    new_script_list = update_script.update_script_timestamps(list_script, video_clip_result, tts_clip_result, subclip_clip_result)
    logger.info(f"统一裁剪完成,处理了 {len(video_clip_result)} 个视频片段")
    sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=60)

    """
    4. 导出到剪映草稿
    """
    # Step 4: build the Jianying draft from the updated script list.
    logger.info("\n\n## 4. 导出到剪映草稿")
    try:
        import pyJianYingDraft
        from pyJianYingDraft import DraftFolder, VideoSegment, AudioSegment, trange, TrackType

        jianying_draft_path = config.ui.get("jianying_draft_path", "")
        if not jianying_draft_path:
            raise ValueError("剪映草稿路径未配置")

        # Create the DraftFolder handle over the user's drafts directory.
        draft_folder = DraftFolder(jianying_draft_path)

        # Use the draft name from params; fall back to a timestamped default.
        draft_name = getattr(params, 'draft_name', "")
        logger.debug(f"从params获取的草稿名称: '{draft_name}' (类型: {type(draft_name)})")
        if not draft_name:
            draft_name = f"NarratoAI_{int(time.time())}"
            logger.debug(f"使用默认草稿名称: '{draft_name}'")

        # Create a new 1920x1080 draft.
        script = draft_folder.create_draft(draft_name, 1920, 1080)

        # Add one video track and one audio track.
        script.add_track(TrackType.video, '视频轨道')
        script.add_track(TrackType.audio, '音频轨道')

        # Walk the script, appending each segment at current_time on the timeline.
        current_time = 0
        output_dir = utils.task_dir(task_id)
        for item in new_script_list:
            # Timing info for this segment.
            start_time = float(item.get('start_time', 0.0))
            duration = float(item.get('duration', 0.0))
            timestamp = item.get('timestamp', '')
            logger.info(f"处理片段: OST={item['OST']}, start_time={start_time}, duration={duration}, timestamp={timestamp}")

            # Derive the expected TTS audio file path from the timestamp.
            audio_file = ""
            if timestamp:
                timestamp_formatted = timestamp.replace(':', '_')
                audio_file = os.path.join(
                    output_dir,
                    f"audio_{timestamp_formatted}.mp3"
                )

            # Prefer the pre-cut clip file when it exists on disk.
            video_file = item.get('video', '')
            if video_file and not os.path.exists(video_file):
                video_file = ""

            # Add the video segment.
            if video_file:
                # Pre-cut clip: target_timerange's second argument is the duration.
                video_segment = VideoSegment(
                    video_file,
                    trange(f"{current_time}s", f"{duration}s")
                )
            else:
                # Original video: source_timerange selects the slice of the source,
                # target_timerange places it on the timeline.
                video_segment = VideoSegment(
                    params.video_origin_path,
                    trange(f"{current_time}s", f"{duration}s"),
                    source_timerange=trange(f"{start_time}s", f"{duration}s")
                )
            script.add_segment(video_segment, '视频轨道')

            # Add TTS audio for narrated segments (OST 0 or 2).
            if item['OST'] in [0, 2]:
                if os.path.exists(audio_file):
                    # Probe the real audio duration with ffprobe — TTS engines can
                    # produce audio slightly longer/shorter than the script duration.
                    actual_audio_duration = get_audio_duration_ffprobe(audio_file)
                    logger.info(f"音频文件实际时长: {actual_audio_duration:.6f}秒, 脚本时长(视频): {duration:.3f}")
                    # Use min(audio, video) so the segment never exceeds the material.
                    safe_duration = min(actual_audio_duration, duration)
                    logger.info(f"使用时长: {safe_duration:.6f}秒 (取音频和视频时长的较小值)")
                    audio_segment = AudioSegment(
                        audio_file,
                        trange(f"{current_time}s", f"{safe_duration}s")
                    )
                    script.add_segment(audio_segment, '音频轨道')
                else:
                    logger.warning(f"音频文件不存在: {audio_file}")
            # OST=1 segments keep the original sound — no extra audio added.

            # Advance the timeline cursor.
            current_time += duration

        # Persist the draft to disk.
        script.save()
        draft_path = os.path.join(jianying_draft_path, draft_name)
        logger.success(f"成功导出到剪映草稿: {draft_name}")
        logger.info(f"草稿已保存到: {draft_path}")
        # Mark the task complete and expose the draft location to the UI.
        sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100, draft_path=draft_path, draft_name=draft_name)
        return {"draft_path": draft_path, "draft_name": draft_name}
    except ImportError as e:
        logger.error(f"导入pyJianYingDraft失败: {e}")
        raise ImportError(f"pyJianYingDraft库导入失败: {e}\n请确保已正确安装该库")
    except Exception as e:
        logger.error(f"导出到剪映草稿失败: {e}")
        import traceback
        logger.error(f"错误详情: {traceback.format_exc()}")
        raise Exception(f"导出到剪映草稿失败: {e}")

View File

@ -35,3 +35,6 @@ tenacity>=9.0.0
# torch>=2.0.0
# torchvision>=0.15.0
# torchaudio>=2.0.0
# 剪映草稿导出依赖
pyJianYingDraft>=0.1.0

141
webui.py
View File

@ -1,6 +1,7 @@
import streamlit as st
import os
import sys
import time
from loguru import logger
from app.config import config
from webui.components import basic_settings, video_settings, audio_settings, subtitle_settings, script_settings, \
@ -221,6 +222,145 @@ def render_generate_button():
time.sleep(0.5)
def get_voice_name_for_tts_engine(tts_engine: str) -> str:
    """Return the user-selected voice for *tts_engine*.

    Prefers the session-state 'voice_name'; otherwise falls back to the
    engine-specific default stored in the UI config (edge defaults apply
    to any unrecognized engine).
    """
    # (config key, hard-coded fallback) per engine; edge is the catch-all.
    engine_defaults = {
        'doubaotts': ('doubaotts_voice_type', 'BV700_streaming'),
        'azure_speech': ('azure_voice_name', 'zh-CN-XiaoxiaoMultilingualNeural'),
    }
    config_key, fallback = engine_defaults.get(
        tts_engine, ('edge_voice_name', 'zh-CN-XiaoxiaoNeural-Female')
    )
    return st.session_state.get('voice_name', config.ui.get(config_key, fallback))
def get_jianying_export_params() -> VideoClipParams:
    """Collect the current UI session settings into a VideoClipParams for Jianying export."""
    session = st.session_state
    engine = session.get('tts_engine', 'azure')
    return VideoClipParams(
        video_clip_json_path=session['video_clip_json_path'],
        video_origin_path=session['video_origin_path'],
        tts_engine=engine,
        voice_name=get_voice_name_for_tts_engine(engine),
        voice_rate=session.get('voice_rate', 1.0),
        voice_pitch=session.get('voice_pitch', 1.0),
        n_threads=config.app.get('n_threads', 4),
        video_aspect=VideoAspect.landscape,
        subtitle_enabled=session.get('subtitle_enabled', False),
        font_name=session.get('font_name', 'Microsoft YaHei'),
        font_size=session.get('font_size', 24),
        text_fore_color=session.get('text_fore_color', '#FFFFFF'),
        subtitle_position=session.get('subtitle_position', 'bottom'),
        custom_position=session.get('custom_position', 70.0),
        tts_volume=session.get('tts_volume', 1.0),
        original_volume=session.get('original_volume', 0.7),
        bgm_volume=session.get('bgm_volume', 0.3),
        draft_name=session.get('draft_name_input', f"NarratoAI_{int(time.time())}")
    )
def render_export_jianying_button():
    """Render the "export to Jianying draft" button and its confirm/cancel flow.

    Two-phase Streamlit flow driven by session state: clicking the main button
    validates preconditions and flips 'show_jianying_export_form'; a later
    rerun then renders the name form, and "confirm" runs the export task
    synchronously under a spinner. Results/errors are kept in session state
    so they survive reruns.
    """
    # Local imports mirror the module-level ones; kept as-is to preserve behavior.
    import os
    import time
    import uuid
    from loguru import logger

    # Initialize session-state keys on first render.
    if 'show_jianying_export_form' not in st.session_state:
        st.session_state['show_jianying_export_form'] = False
    if 'jianying_export_result' not in st.session_state:
        st.session_state['jianying_export_result'] = None
    if 'jianying_export_error' not in st.session_state:
        st.session_state['jianying_export_error'] = None

    if st.button("📤 导出到剪映草稿", use_container_width=True, type="secondary"):
        config.save_config()
        # Precondition checks: script, source video, and a valid drafts folder.
        if not st.session_state.get('video_clip_json_path'):
            st.error("脚本文件不能为空")
            return
        if not st.session_state.get('video_origin_path'):
            st.error("视频文件不能为空")
            return
        jianying_draft_path = config.ui.get("jianying_draft_path", "")
        if not jianying_draft_path:
            st.error("请在基础设置中配置剪映草稿地址")
            return
        if not os.path.exists(jianying_draft_path):
            st.error(f"剪映草稿文件夹不存在: {jianying_draft_path}")
            return
        # Show the export form on the next rerun and clear any stale result/error.
        st.session_state['show_jianying_export_form'] = True
        st.session_state['jianying_export_result'] = None
        st.session_state['jianying_export_error'] = None

    # Render the export form when requested.
    if st.session_state['show_jianying_export_form']:
        st.markdown("---")
        st.subheader("导出到剪映草稿")
        draft_name = st.text_input(
            "请输入剪映草稿名称",
            value=f"NarratoAI_{int(time.time())}",
            key="draft_name_input"
        )
        if st.button("确认导出", key="confirm_export"):
            if not draft_name:
                st.error("请输入草稿名称")
                return
            # Fresh task ID for this export run.
            task_id = str(uuid.uuid4())
            st.session_state['task_id'] = task_id
            # Build task parameters from the current UI state.
            try:
                params = get_jianying_export_params()
            except Exception as e:
                logger.error(f"构建参数失败: {e}")
                st.error(f"参数构建失败: {e}")
                return
            with st.spinner("正在导出到剪映草稿,请稍候..."):
                try:
                    from app.services import jianying_task
                    # Run the export task synchronously (blocks this rerun).
                    result = jianying_task.start_export_jianying_draft(task_id, params)
                    # Log the outcome.
                    logger.info(f"成功导出到剪映草稿: {result['draft_name']}")
                    logger.info(f"草稿已保存到: {result['draft_path']}")
                    # Persist the result and close the form.
                    st.session_state['jianying_export_result'] = result
                    st.session_state['jianying_export_error'] = None
                    st.session_state['show_jianying_export_form'] = False
                    st.success(f"✅ 成功导出到剪映草稿: {result['draft_name']}")
                    st.info(f"📁 草稿已保存到: {result['draft_path']}")
                except Exception as e:
                    logger.error(f"导出到剪映草稿失败: {e}")
                    import traceback
                    logger.error(f"错误详情: {traceback.format_exc()}")
                    st.session_state['jianying_export_error'] = str(e)
                    st.session_state['jianying_export_result'] = None
                    st.error(f"❌ 导出到剪映草稿失败: {e}")
        if st.button("取消", key="cancel_export"):
            # Reset the form state and force an immediate rerun to hide it.
            st.session_state['show_jianying_export_form'] = False
            st.session_state['jianying_export_result'] = None
            st.session_state['jianying_export_error'] = None
            st.rerun()
def main():
"""主函数"""
@ -285,6 +425,7 @@ def main():
# 放到最后渲染生成按钮和处理逻辑
render_generate_button()
render_export_jianying_button()
if __name__ == "__main__":

View File

@ -217,6 +217,15 @@ def render_proxy_settings(tr):
config.proxy["http"] = ""
config.proxy["https"] = ""
# 剪映草稿地址设置
st.subheader("剪映草稿设置")
jianying_draft_path = st.text_input(
"剪映草稿文件夹路径",
value=config.ui.get("jianying_draft_path", ""),
help="剪映草稿文件夹路径例如C:\\Users\\用户名\\Documents\\JianyingPro Drafts"
)
config.ui["jianying_draft_path"] = jianying_draft_path
def test_vision_model_connection(api_key, base_url, model_name, provider, tr):
"""测试视觉模型连接