import json import os import subprocess import time from os import path from loguru import logger from app.config import config from app.models import const from app.models.schema import VideoClipParams from app.services import voice, clip_video, update_script from app.services import state as sm from app.utils import utils def get_audio_duration_ffprobe(audio_file: str) -> float: """ 使用ffprobe获取音频文件的精确时长(秒) Args: audio_file: 音频文件路径 Returns: float: 音频时长(秒),精确到微秒 """ try: cmd = [ 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'csv=p=0', audio_file ] result = subprocess.run(cmd, capture_output=True, text=True, check=True) duration = float(result.stdout.strip()) logger.debug(f"使用ffprobe获取音频时长: {duration:.6f}秒") return duration except subprocess.CalledProcessError as e: logger.error(f"ffprobe执行失败: {e.stderr}") raise except Exception as e: logger.error(f"获取音频时长失败: {str(e)}") raise def start_export_jianying_draft(task_id: str, params: VideoClipParams): """ 导出到剪映草稿的后台任务 Args: task_id: 任务ID params: 视频参数 """ logger.info(f"\n\n## 开始导出到剪映草稿任务: {task_id}") sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=0) """ 1. 加载剪辑脚本 """ logger.info("\n\n## 1. 加载视频脚本") video_script_path = path.join(params.video_clip_json_path) if path.exists(video_script_path): try: with open(video_script_path, "r", encoding="utf-8") as f: list_script = json.load(f) video_list = [i['narration'] for i in list_script] video_ost = [i['OST'] for i in list_script] time_list = [i['timestamp'] for i in list_script] video_script = " ".join(video_list) logger.debug(f"解说完整脚本: \n{video_script}") logger.debug(f"解说 OST 列表: \n{video_ost}") logger.debug(f"解说时间戳列表: \n{time_list}") except Exception as e: logger.error(f"无法读取视频json脚本,请检查脚本格式是否正确") raise ValueError("无法读取视频json脚本,请检查脚本格式是否正确") else: logger.error(f"解说脚本文件不存在: {video_script_path},请先点击【保存脚本】按钮保存脚本后再生成视频") raise ValueError("解说脚本文件不存在!请先点击【保存脚本】按钮保存脚本后再生成视频。") """ 2. 使用 TTS 生成音频素材 """ logger.info("\n\n## 2. 根据OST设置生成音频列表") tts_segments = [ segment for segment in list_script if segment['OST'] in [0, 2] ] logger.debug(f"需要生成TTS的片段数: {len(tts_segments)}") tts_results = voice.tts_multiple( task_id=task_id, list_script=tts_segments, # 只传入需要TTS的片段 tts_engine=params.tts_engine, voice_name=params.voice_name, voice_rate=params.voice_rate, voice_pitch=params.voice_pitch, ) sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=20) """ 3. 统一视频裁剪 - 基于OST类型的差异化裁剪策略 """ logger.info("\n\n## 3. 统一视频裁剪(基于OST类型)") video_clip_result = clip_video.clip_video_unified( video_origin_path=params.video_origin_path, script_list=list_script, tts_results=tts_results ) tts_clip_result = {tts_result['_id']: tts_result['audio_file'] for tts_result in tts_results} subclip_clip_result = { tts_result['_id']: tts_result['subtitle_file'] for tts_result in tts_results } new_script_list = update_script.update_script_timestamps(list_script, video_clip_result, tts_clip_result, subclip_clip_result) logger.info(f"统一裁剪完成,处理了 {len(video_clip_result)} 个视频片段") sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=60) """ 4. 导出到剪映草稿 """ logger.info("\n\n## 4. 导出到剪映草稿") try: import pyJianYingDraft from pyJianYingDraft import DraftFolder, VideoSegment, AudioSegment, trange, TrackType jianying_draft_path = config.ui.get("jianying_draft_path", "") if not jianying_draft_path: raise ValueError("剪映草稿路径未配置") # 创建DraftFolder实例 draft_folder = DraftFolder(jianying_draft_path) # 使用从参数中获取的草稿名称,如果为空则使用默认名称 draft_name = getattr(params, 'draft_name', "") logger.debug(f"从params获取的草稿名称: '{draft_name}' (类型: {type(draft_name)})") if not draft_name: draft_name = f"NarratoAI_{int(time.time())}" logger.debug(f"使用默认草稿名称: '{draft_name}'") # 创建新草稿 script = draft_folder.create_draft(draft_name, 1920, 1080) # 添加视频轨道和音频轨道 script.add_track(TrackType.video, '视频轨道') script.add_track(TrackType.audio, '音频轨道') # 处理脚本数据 current_time = 0 output_dir = utils.task_dir(task_id) for item in new_script_list: # 获取时间信息 start_time = float(item.get('start_time', 0.0)) duration = float(item.get('duration', 0.0)) timestamp = item.get('timestamp', '') logger.info(f"处理片段: OST={item['OST']}, start_time={start_time}, duration={duration}, timestamp={timestamp}") # 生成音频文件路径 audio_file = "" if timestamp: timestamp_formatted = timestamp.replace(':', '_') audio_file = os.path.join( output_dir, f"audio_{timestamp_formatted}.mp3" ) # 检查是否有裁剪后的视频文件 video_file = item.get('video', '') if video_file and not os.path.exists(video_file): video_file = "" # 添加视频片段 if video_file: # 使用裁剪后的视频文件 # 对于裁剪后的视频,target_timerange的第二个参数是持续时间 video_segment = VideoSegment( video_file, trange(f"{current_time}s", f"{duration}s") ) else: # 使用原始视频文件 # source_timerange是从原始视频中截取的部分 # target_timerange是片段在时间轴上的位置 video_segment = VideoSegment( params.video_origin_path, trange(f"{current_time}s", f"{duration}s"), source_timerange=trange(f"{start_time}s", f"{duration}s") ) script.add_segment(video_segment, '视频轨道') # 处理音频 if item['OST'] in [0, 2]: # 需要TTS的片段 if os.path.exists(audio_file): # 使用ffprobe获取精确的音频时长,避免因TTS引擎差异导致时长不匹配 actual_audio_duration = get_audio_duration_ffprobe(audio_file) logger.info(f"音频文件实际时长: {actual_audio_duration:.6f}秒, 脚本时长(视频): {duration:.3f}秒") # 使用音频实际时长和视频时长中的较小值,确保不超过素材时长 # 当TTS语速调整时,音频可能比视频长或短,取较小值可以避免超出素材 safe_duration = min(actual_audio_duration, duration) logger.info(f"使用时长: {safe_duration:.6f}秒 (取音频和视频时长的较小值)") audio_segment = AudioSegment( audio_file, trange(f"{current_time}s", f"{safe_duration}s") ) script.add_segment(audio_segment, '音频轨道') else: logger.warning(f"音频文件不存在: {audio_file}") # OST=1的片段保留原声,不需要添加额外音频 # 更新当前时间 current_time += duration # 保存草稿 script.save() draft_path = os.path.join(jianying_draft_path, draft_name) logger.success(f"成功导出到剪映草稿: {draft_name}") logger.info(f"草稿已保存到: {draft_path}") # 更新任务状态 sm.state.update_task(task_id, state=const.TASK_STATE_COMPLETE, progress=100, draft_path=draft_path, draft_name=draft_name) return {"draft_path": draft_path, "draft_name": draft_name} except ImportError as e: logger.error(f"导入pyJianYingDraft失败: {e}") raise ImportError(f"pyJianYingDraft库导入失败: {e}\n请确保已正确安装该库") except Exception as e: logger.error(f"导出到剪映草稿失败: {e}") import traceback logger.error(f"错误详情: {traceback.format_exc()}") raise Exception(f"导出到剪映草稿失败: {e}")