feat(v2): 新增视频裁剪和YouTube视频下载功能

- 在 schema_v2.py 中添加了新的请求和响应模型
- 在 script.py 中实现了 /scripts/crop 和 /youtube/download 接口- 新增 video_service.py 和 youtube_service.py 服务模块
- 更新 utils.py 中的工具函数以支持新功能
This commit is contained in:
linyq 2024-11-18 17:38:30 +08:00
parent 8dd4b27fc3
commit bd1ce5c7b9
5 changed files with 304 additions and 7 deletions

View File

@ -1,18 +1,29 @@
from fastapi import APIRouter, BackgroundTasks
from loguru import logger
import os
from app.models.schema_v2 import GenerateScriptRequest, GenerateScriptResponse
from app.models.schema_v2 import (
GenerateScriptRequest,
GenerateScriptResponse,
CropVideoRequest,
CropVideoResponse,
DownloadVideoRequest,
DownloadVideoResponse
)
from app.services.script_service import ScriptGenerator
from app.services.video_service import VideoService
from app.utils import utils
from app.controllers.v2.base import v2_router
from app.models.schema import VideoClipParams
from app.services.youtube_service import YoutubeService
# router = APIRouter(prefix="/api/v2", tags=["Script Generation V2"])
router = v2_router()
@router.post(
"/scripts/generate",
response_model=GenerateScriptResponse,
summary="生成视频脚本 (V2)"
summary="同步请求;生成视频脚本 (V2)"
)
async def generate_script(
request: GenerateScriptRequest,
@ -42,4 +53,69 @@ async def generate_script(
except Exception as e:
logger.exception(f"Generate script failed: {str(e)}")
raise
raise
@router.post(
"/scripts/crop",
response_model=CropVideoResponse,
summary="同步请求;裁剪视频 (V2)"
)
async def crop_video(
request: CropVideoRequest,
background_tasks: BackgroundTasks
):
"""
根据脚本裁剪视频的V2版本API
"""
try:
# 调用视频裁剪服务
video_service = VideoService()
task_id, subclip_videos = await video_service.crop_video(
video_path=request.video_origin_path,
video_script=request.video_script
)
logger.debug(f"裁剪视频成功,视频片段路径: {subclip_videos}")
logger.debug(type(subclip_videos))
return {
"task_id": task_id,
"subclip_videos": subclip_videos
}
except Exception as e:
logger.exception(f"Crop video failed: {str(e)}")
raise
@router.post(
"/youtube/download",
response_model=DownloadVideoResponse,
summary="同步请求下载YouTube视频 (V2)"
)
async def download_youtube_video(
request: DownloadVideoRequest,
background_tasks: BackgroundTasks
):
"""
下载指定分辨率的YouTube视频
"""
try:
youtube_service = YoutubeService()
task_id, output_path, filename = await youtube_service.download_video(
url=request.url,
resolution=request.resolution,
output_format=request.output_format,
rename=request.rename
)
return {
"task_id": task_id,
"output_path": output_path,
"resolution": request.resolution,
"format": request.output_format,
"filename": filename
}
except Exception as e:
logger.exception(f"Download YouTube video failed: {str(e)}")
raise

View File

@ -1,6 +1,7 @@
from typing import Optional, List
from pydantic import BaseModel
class GenerateScriptRequest(BaseModel):
video_path: str
video_theme: Optional[str] = ""
@ -9,7 +10,33 @@ class GenerateScriptRequest(BaseModel):
threshold: Optional[int] = 30
vision_batch_size: Optional[int] = 5
vision_llm_provider: Optional[str] = "gemini"
class GenerateScriptResponse(BaseModel):
task_id: str
script: List[dict]
script: List[dict]
class CropVideoRequest(BaseModel):
video_origin_path: str
video_script: List[dict]
class CropVideoResponse(BaseModel):
task_id: str
subclip_videos: dict
class DownloadVideoRequest(BaseModel):
url: str
resolution: str
output_format: Optional[str] = "mp4"
rename: Optional[str] = None
class DownloadVideoResponse(BaseModel):
task_id: str
output_path: str
resolution: str
format: str
filename: str

View File

@ -0,0 +1,58 @@
import os
from uuid import uuid4
from loguru import logger
from typing import Dict, List, Optional, Tuple
from app.services import material
from app.models.schema import VideoClipParams
from app.utils import utils
class VideoService:
@staticmethod
async def crop_video(
video_path: str,
video_script: List[dict]
) -> Tuple[str, Dict[str, str]]:
"""
裁剪视频服务
Args:
video_path: 视频文件路径
video_script: 视频脚本列表
Returns:
Tuple[str, Dict[str, str]]: (task_id, 裁剪后的视频片段字典)
视频片段字典格式: {timestamp: video_path}
"""
try:
task_id = str(uuid4())
# 从脚本中提取时间戳列表
time_list = [scene['timestamp'] for scene in video_script]
# 调用裁剪服务
subclip_videos = material.clip_videos(
task_id=task_id,
timestamp_terms=time_list,
origin_video=video_path
)
if subclip_videos is None:
raise ValueError("裁剪视频失败")
# 更新脚本中的视频路径
for scene in video_script:
try:
scene['path'] = subclip_videos[scene['timestamp']]
except KeyError as err:
logger.error(f"更新视频路径失败: {err}")
logger.debug(f"裁剪视频成功,共生成 {len(time_list)} 个视频片段")
logger.debug(f"视频片段路径: {subclip_videos}")
return task_id, subclip_videos
except Exception as e:
logger.exception("裁剪视频失败")
raise

View File

@ -0,0 +1,135 @@
import yt_dlp
import os
from typing import List, Dict, Optional, Tuple
from loguru import logger
from uuid import uuid4
from app.utils import utils
class YoutubeService:
def __init__(self):
self.supported_formats = ['mp4', 'mkv', 'webm', 'flv', 'avi']
def _get_video_formats(self, url: str) -> List[Dict]:
"""获取视频可用的格式列表"""
ydl_opts = {
'quiet': True,
'no_warnings': True
}
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
formats = info.get('formats', [])
format_list = []
for f in formats:
format_info = {
'format_id': f.get('format_id', 'N/A'),
'ext': f.get('ext', 'N/A'),
'resolution': f.get('format_note', 'N/A'),
'filesize': f.get('filesize', 'N/A'),
'vcodec': f.get('vcodec', 'N/A'),
'acodec': f.get('acodec', 'N/A')
}
format_list.append(format_info)
return format_list
except Exception as e:
logger.error(f"获取视频格式失败: {str(e)}")
raise
def _validate_format(self, output_format: str) -> None:
"""验证输出格式是否支持"""
if output_format.lower() not in self.supported_formats:
raise ValueError(
f"不支持的视频格式: {output_format}"
f"支持的格式: {', '.join(self.supported_formats)}"
)
async def download_video(
self,
url: str,
resolution: str,
output_format: str = 'mp4',
rename: Optional[str] = None
) -> Tuple[str, str, str]:
"""
下载指定分辨率的视频
Args:
url: YouTube视频URL
resolution: 目标分辨率 ('2160p', '1440p', '1080p', '720p' etc.)
output_format: 输出视频格式
rename: 可选的重命名
Returns:
Tuple[str, str, str]: (task_id, output_path, filename)
"""
try:
task_id = str(uuid4())
self._validate_format(output_format)
# 获取所有可用格式
formats = self._get_video_formats(url)
# 查找指定分辨率的最佳视频格式
target_format = None
for fmt in formats:
if fmt['resolution'] == resolution and fmt['vcodec'] != 'none':
target_format = fmt
break
if target_format is None:
available_resolutions = set(
fmt['resolution'] for fmt in formats
if fmt['resolution'] != 'N/A' and fmt['vcodec'] != 'none'
)
raise ValueError(
f"未找到 {resolution} 分辨率的视频。"
f"可用分辨率: {', '.join(sorted(available_resolutions))}"
)
# 创建输出目录
output_dir = utils.video_dir()
os.makedirs(output_dir, exist_ok=True)
# 设置下载选项
if rename:
# 如果指定了重命名,直接使用新名字
filename = f"{rename}.{output_format}"
output_template = os.path.join(output_dir, filename)
else:
# 否则使用任务ID和原标题
output_template = os.path.join(output_dir, f'{task_id}_%(title)s.%(ext)s')
ydl_opts = {
'format': f"{target_format['format_id']}+bestaudio[ext=m4a]/best",
'outtmpl': output_template,
'merge_output_format': output_format.lower(),
'postprocessors': [{
'key': 'FFmpegVideoConvertor',
'preferedformat': output_format.lower(),
}]
}
# 执行下载
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
if rename:
# 如果指定了重命名,使用新文件名
output_path = output_template
filename = os.path.basename(output_path)
else:
# 否则使用原始标题
video_title = info.get('title', task_id)
filename = f"{task_id}_{video_title}.{output_format}"
output_path = os.path.join(output_dir, filename)
logger.info(f"视频下载成功: {output_path}")
return task_id, output_path, filename
except Exception as e:
logger.exception("下载视频失败")
raise

View File

@ -31,4 +31,5 @@ python-dotenv~=1.0.1
openai~=1.53.0
tqdm>=4.66.6
tenacity>=9.0.0
tiktoken==0.8.0
tiktoken==0.8.0
yt-dlp==2024.11.18