diff --git a/app/controllers/v2/script.py b/app/controllers/v2/script.py index 85f4238..94a337b 100644 --- a/app/controllers/v2/script.py +++ b/app/controllers/v2/script.py @@ -1,18 +1,29 @@ from fastapi import APIRouter, BackgroundTasks from loguru import logger +import os -from app.models.schema_v2 import GenerateScriptRequest, GenerateScriptResponse +from app.models.schema_v2 import ( + GenerateScriptRequest, + GenerateScriptResponse, + CropVideoRequest, + CropVideoResponse, + DownloadVideoRequest, + DownloadVideoResponse +) from app.services.script_service import ScriptGenerator +from app.services.video_service import VideoService from app.utils import utils from app.controllers.v2.base import v2_router +from app.models.schema import VideoClipParams +from app.services.youtube_service import YoutubeService -# router = APIRouter(prefix="/api/v2", tags=["Script Generation V2"]) router = v2_router() + @router.post( "/scripts/generate", response_model=GenerateScriptResponse, - summary="生成视频脚本 (V2)" + summary="同步请求;生成视频脚本 (V2)" ) async def generate_script( request: GenerateScriptRequest, @@ -42,4 +53,69 @@ async def generate_script( except Exception as e: logger.exception(f"Generate script failed: {str(e)}") - raise \ No newline at end of file + raise + + +@router.post( + "/scripts/crop", + response_model=CropVideoResponse, + summary="同步请求;裁剪视频 (V2)" +) +async def crop_video( + request: CropVideoRequest, + background_tasks: BackgroundTasks +): + """ + 根据脚本裁剪视频的V2版本API + """ + try: + # 调用视频裁剪服务 + video_service = VideoService() + task_id, subclip_videos = await video_service.crop_video( + video_path=request.video_origin_path, + video_script=request.video_script + ) + logger.debug(f"裁剪视频成功,视频片段路径: {subclip_videos}") + logger.debug(type(subclip_videos)) + return { + "task_id": task_id, + "subclip_videos": subclip_videos + } + + except Exception as e: + logger.exception(f"Crop video failed: {str(e)}") + raise + + +@router.post( + "/youtube/download", + response_model=DownloadVideoResponse, + summary="同步请求;下载YouTube视频 (V2)" +) +async def download_youtube_video( + request: DownloadVideoRequest, + background_tasks: BackgroundTasks +): + """ + 下载指定分辨率的YouTube视频 + """ + try: + youtube_service = YoutubeService() + task_id, output_path, filename = await youtube_service.download_video( + url=request.url, + resolution=request.resolution, + output_format=request.output_format, + rename=request.rename + ) + + return { + "task_id": task_id, + "output_path": output_path, + "resolution": request.resolution, + "format": request.output_format, + "filename": filename + } + + except Exception as e: + logger.exception(f"Download YouTube video failed: {str(e)}") + raise diff --git a/app/models/schema_v2.py b/app/models/schema_v2.py index 786c018..9894d89 100644 --- a/app/models/schema_v2.py +++ b/app/models/schema_v2.py @@ -1,6 +1,7 @@ from typing import Optional, List from pydantic import BaseModel + class GenerateScriptRequest(BaseModel): video_path: str video_theme: Optional[str] = "" @@ -9,7 +10,33 @@ class GenerateScriptRequest(BaseModel): threshold: Optional[int] = 30 vision_batch_size: Optional[int] = 5 vision_llm_provider: Optional[str] = "gemini" - + + class GenerateScriptResponse(BaseModel): task_id: str - script: List[dict] \ No newline at end of file + script: List[dict] + + +class CropVideoRequest(BaseModel): + video_origin_path: str + video_script: List[dict] + + +class CropVideoResponse(BaseModel): + task_id: str + subclip_videos: dict + + +class DownloadVideoRequest(BaseModel): + url: str + resolution: str + output_format: Optional[str] = "mp4" + rename: Optional[str] = None + + +class DownloadVideoResponse(BaseModel): + task_id: str + output_path: str + resolution: str + format: str + filename: str diff --git a/app/services/video_service.py b/app/services/video_service.py new file mode 100644 index 0000000..2a0a9a6 --- /dev/null +++ b/app/services/video_service.py @@ -0,0 +1,58 @@ +import os +from uuid import uuid4 +from loguru import logger +from typing import Dict, List, Optional, Tuple + +from app.services import material +from app.models.schema import VideoClipParams +from app.utils import utils + + +class VideoService: + @staticmethod + async def crop_video( + video_path: str, + video_script: List[dict] + ) -> Tuple[str, Dict[str, str]]: + """ + 裁剪视频服务 + + Args: + video_path: 视频文件路径 + video_script: 视频脚本列表 + + Returns: + Tuple[str, Dict[str, str]]: (task_id, 裁剪后的视频片段字典) + 视频片段字典格式: {timestamp: video_path} + """ + try: + task_id = str(uuid4()) + + # 从脚本中提取时间戳列表 + time_list = [scene['timestamp'] for scene in video_script] + + # 调用裁剪服务 + subclip_videos = material.clip_videos( + task_id=task_id, + timestamp_terms=time_list, + origin_video=video_path + ) + + if subclip_videos is None: + raise ValueError("裁剪视频失败") + + # 更新脚本中的视频路径 + for scene in video_script: + try: + scene['path'] = subclip_videos[scene['timestamp']] + except KeyError as err: + logger.error(f"更新视频路径失败: {err}") + + logger.debug(f"裁剪视频成功,共生成 {len(time_list)} 个视频片段") + logger.debug(f"视频片段路径: {subclip_videos}") + + return task_id, subclip_videos + + except Exception as e: + logger.exception("裁剪视频失败") + raise \ No newline at end of file diff --git a/app/services/youtube_service.py b/app/services/youtube_service.py new file mode 100644 index 0000000..d478198 --- /dev/null +++ b/app/services/youtube_service.py @@ -0,0 +1,135 @@ +import yt_dlp +import os +from typing import List, Dict, Optional, Tuple +from loguru import logger +from uuid import uuid4 + +from app.utils import utils + + +class YoutubeService: + def __init__(self): + self.supported_formats = ['mp4', 'mkv', 'webm', 'flv', 'avi'] + + def _get_video_formats(self, url: str) -> List[Dict]: + """获取视频可用的格式列表""" + ydl_opts = { + 'quiet': True, + 'no_warnings': True + } + + try: + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + info = ydl.extract_info(url, download=False) + formats = info.get('formats', []) + + format_list = [] + for f in formats: + format_info = { + 'format_id': f.get('format_id', 'N/A'), + 'ext': f.get('ext', 'N/A'), + 'resolution': f.get('format_note', 'N/A'), + 'filesize': f.get('filesize', 'N/A'), + 'vcodec': f.get('vcodec', 'N/A'), + 'acodec': f.get('acodec', 'N/A') + } + format_list.append(format_info) + + return format_list + except Exception as e: + logger.error(f"获取视频格式失败: {str(e)}") + raise + + def _validate_format(self, output_format: str) -> None: + """验证输出格式是否支持""" + if output_format.lower() not in self.supported_formats: + raise ValueError( + f"不支持的视频格式: {output_format}。" + f"支持的格式: {', '.join(self.supported_formats)}" + ) + + async def download_video( + self, + url: str, + resolution: str, + output_format: str = 'mp4', + rename: Optional[str] = None + ) -> Tuple[str, str, str]: + """ + 下载指定分辨率的视频 + + Args: + url: YouTube视频URL + resolution: 目标分辨率 ('2160p', '1440p', '1080p', '720p' etc.) + output_format: 输出视频格式 + rename: 可选的重命名 + + Returns: + Tuple[str, str, str]: (task_id, output_path, filename) + """ + try: + task_id = str(uuid4()) + self._validate_format(output_format) + + # 获取所有可用格式 + formats = self._get_video_formats(url) + + # 查找指定分辨率的最佳视频格式 + target_format = None + for fmt in formats: + if fmt['resolution'] == resolution and fmt['vcodec'] != 'none': + target_format = fmt + break + + if target_format is None: + available_resolutions = set( + fmt['resolution'] for fmt in formats + if fmt['resolution'] != 'N/A' and fmt['vcodec'] != 'none' + ) + raise ValueError( + f"未找到 {resolution} 分辨率的视频。" + f"可用分辨率: {', '.join(sorted(available_resolutions))}" + ) + + # 创建输出目录 + output_dir = utils.video_dir() + os.makedirs(output_dir, exist_ok=True) + + # 设置下载选项 + if rename: + # 如果指定了重命名,直接使用新名字 + filename = f"{rename}.{output_format}" + output_template = os.path.join(output_dir, filename) + else: + # 否则使用任务ID和原标题 + output_template = os.path.join(output_dir, f'{task_id}_%(title)s.%(ext)s') + + ydl_opts = { + 'format': f"{target_format['format_id']}+bestaudio[ext=m4a]/best", + 'outtmpl': output_template, + 'merge_output_format': output_format.lower(), + 'postprocessors': [{ + 'key': 'FFmpegVideoConvertor', + 'preferedformat': output_format.lower(), + }] + } + + # 执行下载 + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + info = ydl.extract_info(url, download=True) + if rename: + # 如果指定了重命名,使用新文件名 + output_path = output_template + filename = os.path.basename(output_path) + else: + # 否则使用原始标题 + video_title = info.get('title', task_id) + filename = f"{task_id}_{video_title}.{output_format}" + output_path = os.path.join(output_dir, filename) + + logger.info(f"视频下载成功: {output_path}") + return task_id, output_path, filename + + except Exception as e: + logger.exception("下载视频失败") + raise diff --git a/requirements.txt b/requirements.txt index 2ae1f29..3024e71 100644 --- a/requirements.txt +++ b/requirements.txt @@ -31,4 +31,5 @@ python-dotenv~=1.0.1 openai~=1.53.0 tqdm>=4.66.6 tenacity>=9.0.0 -tiktoken==0.8.0 \ No newline at end of file +tiktoken==0.8.0 +yt-dlp==2024.11.18