refactor(video_pipeline): 重构视频处理管道

- 新增参数以支持更灵活的配置
-优化脚本保存逻辑,支持自定义脚本名称
-增加视频下载和脚本生成的条件判断,提高效率
- 异步生成最终视频,返回任务路径
- 更新示例使用新的管道配置
This commit is contained in:
linyq 2024-11-19 14:50:30 +08:00
parent b34d9fe14c
commit 38f23983ef

View File

@ -1,5 +1,6 @@
import requests
import json
import os
import time
from typing import Dict, Any
@ -52,7 +53,7 @@ class VideoPipeline:
return response.json()
def generate_final_video(self, task_id: str, video_path: str,
script_path: str, script: list, subclip_videos: Dict[str, str]) -> Dict[str, Any]:
script_path: str, script: list, subclip_videos: Dict[str, str], voice_name: str) -> Dict[str, Any]:
"""生成最终视频的第四步"""
endpoint = f"{self.base_url}/api/v2/scripts/start-subclip"
@ -62,7 +63,7 @@ class VideoPipeline:
"video_origin_path": video_path,
"video_aspect": "16:9",
"video_language": "zh-CN",
"voice_name": "zh-CN-YunjianNeural",
"voice_name": voice_name,
"voice_volume": 1,
"voice_rate": 1.2,
"voice_pitch": 1,
@ -92,10 +93,9 @@ class VideoPipeline:
response.raise_for_status()
return response.json()
def save_script_to_json(self, script: list) -> str:
def save_script_to_json(self, script: list, script_name: str) -> str:
"""保存脚本到json文件"""
timestamp = time.strftime("%Y-%m%d-%H%M%S")
script_path = f"E:\\projects\\NarratoAI\\resource\\scripts\\{timestamp}.json"
script_path = f"E:\\projects\\NarratoAI\\resource\\scripts\\{script_name}.json"
try:
with open(script_path, 'w', encoding='utf-8') as f:
@ -106,45 +106,55 @@ class VideoPipeline:
print(f"保存脚本失败: {str(e)}")
raise
def run_pipeline(self, youtube_url: str) -> Dict[str, Any]:
def run_pipeline(self, task_id: str, script_name: str, youtube_url: str, video_name: str="null", skip_seconds: int = 0, threshold: int = 30, vision_batch_size: int = 10, vision_llm_provider: str = "gemini", voice_name: str = "zh-CN-YunjianNeural") -> Dict[str, Any]:
"""运行完整的pipeline"""
try:
# 1. 下载视频
print("开始下载视频...")
download_result = self.download_video(youtube_url)
video_path = download_result["output_path"]
current_path = os.path.dirname(os.path.abspath(__file__))
video_path = os.path.join(current_path, "resource", "videos", video_name)
# 判断视频是否存在
if not os.path.exists(video_path):
# 1. 下载视频
print(f"视频不存在, 开始下载视频: {video_path}")
download_result = self.download_video(youtube_url=youtube_url, resolution="1080p", output_format="mp4", rename=video_name)
video_path = download_result["output_path"]
else:
print(f"视频已存在: {video_path}")
# 2. 生成脚本
print("开始生成脚本...")
script_result = self.generate_script(video_path)
script = script_result["script"]
# 2. 判断script_name是否存在
# 2.1.1 拼接脚本路径 NarratoAI/resource/scripts
script_path = os.path.join(current_path, "resource", "scripts", script_name)
if os.path.exists(script_path):
script = json.load(open(script_path, "r", encoding="utf-8"))
else:
# 2.1.2 生成脚本
print("开始生成脚本...")
script_result = self.generate_script(video_path=video_path, skip_seconds=skip_seconds, threshold=threshold, vision_batch_size=vision_batch_size, vision_llm_provider=vision_llm_provider)
script = script_result["script"]
# 2.1 保存脚本到json文件
# 2.2 保存脚本到json文件
print("保存脚本到json文件...")
script_path = self.save_script_to_json(script)
script_path = self.save_script_to_json(script, script_name)
script_result["script_path"] = script_path
# 3. 剪辑视频
print("开始剪辑视频...")
crop_result = self.crop_video(video_path, script)
crop_result = self.crop_video(video_path=video_path, script=script)
subclip_videos = crop_result["subclip_videos"]
# 4. 生成最终视频
print("开始生成最终视频...")
final_result = self.generate_final_video(
crop_result["task_id"],
video_path,
script_path,
script,
subclip_videos
task_id=task_id,
video_path=video_path,
script_path=script_path,
script=script,
subclip_videos=subclip_videos,
voice_name=voice_name
)
return {
"status": "success",
"download_result": download_result,
"script_result": script_result,
"crop_result": crop_result,
"final_result": final_result
"status": "等待异步生成视频",
"path": os.path.join(current_path, "storage", "tasks", task_id)
}
except Exception as e:
@ -153,10 +163,19 @@ class VideoPipeline:
"error": str(e)
}
# 使用示例
if __name__ == "__main__":
pipeline = VideoPipeline()
result = pipeline.run_pipeline("https://www.youtube.com/watch?v=Kenm35gdqtk")
print(json.dumps(result, indent=2, ensure_ascii=False))
result2 = pipeline.run_pipeline("https://www.youtube.com/watch?v=aEsHAcedzgw")
print(json.dumps(result2, indent=2, ensure_ascii=False))
result = pipeline.run_pipeline(
task_id="test_123",
script_name="test.json",
youtube_url="https://www.youtube.com/watch?v=Kenm35gdqtk",
video_name="test.mp4",
skip_seconds=0,
threshold=30,
vision_batch_size=10,
vision_llm_provider="gemini",
voice_name="zh-CN-YunjianNeural",
)
print(result)