mirror of
https://github.com/linyqh/NarratoAI.git
synced 2025-12-12 11:22:51 +00:00
- 修改支持的语音列表,仅保留中文语音 - 在主程序中添加代理配置环境变量 -优化剪辑视频函数,改为返回字典类型 - 更新任务服务中的剪辑视频函数,适应新的参数类型 - 修改测试用例中的视频剪辑函数,增加输出路径参数 - 更新脚本控制器中的剪辑视频函数,集成任务 ID 和子视频字典
163 lines
5.9 KiB
Python
163 lines
5.9 KiB
Python
import requests
|
|
import json
|
|
import time
|
|
from typing import Dict, Any
|
|
|
|
class VideoPipeline:
|
|
def __init__(self, base_url: str = "http://127.0.0.1:8080"):
|
|
self.base_url = base_url
|
|
|
|
def download_video(self, url: str, resolution: str = "1080p",
|
|
output_format: str = "mp4", rename: str = None) -> Dict[str, Any]:
|
|
"""下载视频的第一步"""
|
|
endpoint = f"{self.base_url}/api/v2/youtube/download"
|
|
payload = {
|
|
"url": url,
|
|
"resolution": resolution,
|
|
"output_format": output_format,
|
|
"rename": rename or time.strftime("%Y-%m-%d")
|
|
}
|
|
|
|
response = requests.post(endpoint, json=payload)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def generate_script(self, video_path: str, skip_seconds: int = 0,
|
|
threshold: int = 30, vision_batch_size: int = 10,
|
|
vision_llm_provider: str = "gemini") -> Dict[str, Any]:
|
|
"""生成脚本的第二步"""
|
|
endpoint = f"{self.base_url}/api/v2/scripts/generate"
|
|
payload = {
|
|
"video_path": video_path,
|
|
"skip_seconds": skip_seconds,
|
|
"threshold": threshold,
|
|
"vision_batch_size": vision_batch_size,
|
|
"vision_llm_provider": vision_llm_provider
|
|
}
|
|
|
|
response = requests.post(endpoint, json=payload)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def crop_video(self, video_path: str, script: list) -> Dict[str, Any]:
|
|
"""剪辑视频的第三步"""
|
|
endpoint = f"{self.base_url}/api/v2/scripts/crop"
|
|
payload = {
|
|
"video_origin_path": video_path,
|
|
"video_script": script
|
|
}
|
|
|
|
response = requests.post(endpoint, json=payload)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def generate_final_video(self, task_id: str, video_path: str,
|
|
script_path: str, script: list, subclip_videos: Dict[str, str]) -> Dict[str, Any]:
|
|
"""生成最终视频的第四步"""
|
|
endpoint = f"{self.base_url}/api/v2/scripts/start-subclip"
|
|
|
|
request_data = {
|
|
"video_clip_json": script,
|
|
"video_clip_json_path": script_path,
|
|
"video_origin_path": video_path,
|
|
"video_aspect": "16:9",
|
|
"video_language": "zh-CN",
|
|
"voice_name": "zh-CN-YunjianNeural",
|
|
"voice_volume": 1,
|
|
"voice_rate": 1.2,
|
|
"voice_pitch": 1,
|
|
"bgm_name": "random",
|
|
"bgm_type": "random",
|
|
"bgm_file": "",
|
|
"bgm_volume": 0.3,
|
|
"subtitle_enabled": True,
|
|
"subtitle_position": "bottom",
|
|
"font_name": "STHeitiMedium.ttc",
|
|
"text_fore_color": "#FFFFFF",
|
|
"text_background_color": "transparent",
|
|
"font_size": 75,
|
|
"stroke_color": "#000000",
|
|
"stroke_width": 1.5,
|
|
"custom_position": 70,
|
|
"n_threads": 8
|
|
}
|
|
|
|
payload = {
|
|
"request": request_data,
|
|
"subclip_videos": subclip_videos
|
|
}
|
|
|
|
params = {"task_id": task_id}
|
|
response = requests.post(endpoint, params=params, json=payload)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
def save_script_to_json(self, script: list) -> str:
|
|
"""保存脚本到json文件"""
|
|
timestamp = time.strftime("%Y-%m%d-%H%M%S")
|
|
script_path = f"E:\\projects\\NarratoAI\\resource\\scripts\\{timestamp}.json"
|
|
|
|
try:
|
|
with open(script_path, 'w', encoding='utf-8') as f:
|
|
json.dump(script, f, ensure_ascii=False, indent=2)
|
|
print(f"脚本已保存到: {script_path}")
|
|
return script_path
|
|
except Exception as e:
|
|
print(f"保存脚本失败: {str(e)}")
|
|
raise
|
|
|
|
def run_pipeline(self, youtube_url: str) -> Dict[str, Any]:
|
|
"""运行完整的pipeline"""
|
|
try:
|
|
# 1. 下载视频
|
|
print("开始下载视频...")
|
|
download_result = self.download_video(youtube_url)
|
|
video_path = download_result["output_path"]
|
|
|
|
# 2. 生成脚本
|
|
print("开始生成脚本...")
|
|
script_result = self.generate_script(video_path)
|
|
script = script_result["script"]
|
|
|
|
# 2.1 保存脚本到json文件
|
|
print("保存脚本到json文件...")
|
|
script_path = self.save_script_to_json(script)
|
|
script_result["script_path"] = script_path
|
|
|
|
# 3. 剪辑视频
|
|
print("开始剪辑视频...")
|
|
crop_result = self.crop_video(video_path, script)
|
|
subclip_videos = crop_result["subclip_videos"]
|
|
|
|
# 4. 生成最终视频
|
|
print("开始生成最终视频...")
|
|
final_result = self.generate_final_video(
|
|
crop_result["task_id"],
|
|
video_path,
|
|
script_path,
|
|
script,
|
|
subclip_videos
|
|
)
|
|
|
|
return {
|
|
"status": "success",
|
|
"download_result": download_result,
|
|
"script_result": script_result,
|
|
"crop_result": crop_result,
|
|
"final_result": final_result
|
|
}
|
|
|
|
except Exception as e:
|
|
return {
|
|
"status": "error",
|
|
"error": str(e)
|
|
}
|
|
|
|
# 使用示例
|
|
if __name__ == "__main__":
|
|
pipeline = VideoPipeline()
|
|
result = pipeline.run_pipeline("https://www.youtube.com/watch?v=Kenm35gdqtk")
|
|
print(json.dumps(result, indent=2, ensure_ascii=False))
|
|
result2 = pipeline.run_pipeline("https://www.youtube.com/watch?v=aEsHAcedzgw")
|
|
print(json.dumps(result2, indent=2, ensure_ascii=False))
|