refactor: remove unused code files and port configuration

Clean up unused controllers, test files, and model definitions
Remove the unused port 8080 exposure from the Dockerfile
Delete the commented-out dependencies from requirements.txt
linyq 2025-08-16 01:16:05 +08:00
parent e9d0c013ef
commit 1fba4414aa
21 changed files with 2 additions and 1647 deletions

View File

@@ -55,7 +55,7 @@ RUN git lfs install
COPY . .
# Expose port
EXPOSE 8501 8080
EXPOSE 8501
# Use the script as the entrypoint
COPY docker-entrypoint.sh /usr/local/bin/

View File

@@ -1,90 +0,0 @@
"""Application implementation - ASGI."""
import os
from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from loguru import logger
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from app.config import config
from app.models.exception import HttpException
from app.router import root_api_router
from app.utils import utils
from app.utils import ffmpeg_utils
def exception_handler(request: Request, e: HttpException):
return JSONResponse(
status_code=e.status_code,
content=utils.get_response(e.status_code, e.data, e.message),
)
def validation_exception_handler(request: Request, e: RequestValidationError):
return JSONResponse(
status_code=400,
content=utils.get_response(
status=400, data=e.errors(), message="field required"
),
)
def get_application() -> FastAPI:
"""Initialize FastAPI application.
Returns:
FastAPI: Application object instance.
"""
instance = FastAPI(
title=config.project_name,
description=config.project_description,
version=config.project_version,
debug=False,
)
instance.include_router(root_api_router)
instance.add_exception_handler(HttpException, exception_handler)
instance.add_exception_handler(RequestValidationError, validation_exception_handler)
return instance
app = get_application()
# Configures the CORS middleware for the FastAPI app
cors_allowed_origins_str = os.getenv("CORS_ALLOWED_ORIGINS", "")
origins = cors_allowed_origins_str.split(",") if cors_allowed_origins_str else ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
task_dir = utils.task_dir()
app.mount(
"/tasks", StaticFiles(directory=task_dir, html=True, follow_symlink=True), name=""
)
public_dir = utils.public_dir()
app.mount("/", StaticFiles(directory=public_dir, html=True), name="")
@app.on_event("shutdown")
def shutdown_event():
logger.info("shutdown event")
@app.on_event("startup")
def startup_event():
logger.info("startup event")
# Detect FFmpeg hardware acceleration
hwaccel_info = ffmpeg_utils.detect_hardware_acceleration()
if hwaccel_info["available"]:
logger.info(f"FFmpeg硬件加速检测结果: 可用 | 类型: {hwaccel_info['type']} | 编码器: {hwaccel_info['encoder']} | 独立显卡: {hwaccel_info['is_dedicated_gpu']} | 参数: {hwaccel_info['hwaccel_args']}")
else:
logger.warning(f"FFmpeg硬件加速不可用: {hwaccel_info['message']}, 将使用CPU软件编码")
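For reference, the removed ASGI module above would typically have been served with uvicorn. A minimal sketch, assuming the module lived at app/asgi.py (the file path is not shown in this view); the port simply mirrors the EXPOSE entry this commit removes from the Dockerfile:

import uvicorn

if __name__ == "__main__":
    # "app.asgi:app" is an assumed import string; adjust it to the real module path.
    uvicorn.run("app.asgi:app", host="0.0.0.0", port=8080)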

View File

@@ -1,31 +0,0 @@
from uuid import uuid4
from fastapi import Request
from app.config import config
from app.models.exception import HttpException
def get_task_id(request: Request):
task_id = request.headers.get("x-task-id")
if not task_id:
task_id = uuid4()
return str(task_id)
def get_api_key(request: Request):
api_key = request.headers.get("x-api-key")
return api_key
def verify_token(request: Request):
token = get_api_key(request)
if token != config.app.get("api_key", ""):
request_id = get_task_id(request)
request_url = request.url
user_agent = request.headers.get("user-agent")
raise HttpException(
task_id=request_id,
status_code=401,
message=f"invalid token: {request_url}, {user_agent}",
)
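As a usage note, a client would authenticate against verify_token by sending the key in an x-api-key header, optionally pinning a request id via x-task-id. A minimal sketch with the requests library; the base URL, port, endpoint, and key value are placeholders, and the check is only enforced when a router is created with this dependency:

import requests

headers = {
    "x-api-key": "<api-key-from-config>",  # must match config.app["api_key"]
    "x-task-id": "debug-0001",             # optional; a UUID is generated otherwise
}
resp = requests.get("http://localhost:8080/ping", headers=headers)
print(resp.status_code, resp.text)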

View File

@@ -1,64 +0,0 @@
import threading
from typing import Callable, Any, Dict
class TaskManager:
def __init__(self, max_concurrent_tasks: int):
self.max_concurrent_tasks = max_concurrent_tasks
self.current_tasks = 0
self.lock = threading.Lock()
self.queue = self.create_queue()
def create_queue(self):
raise NotImplementedError()
def add_task(self, func: Callable, *args: Any, **kwargs: Any):
with self.lock:
if self.current_tasks < self.max_concurrent_tasks:
print(f"add task: {func.__name__}, current_tasks: {self.current_tasks}")
self.execute_task(func, *args, **kwargs)
else:
print(
f"enqueue task: {func.__name__}, current_tasks: {self.current_tasks}"
)
self.enqueue({"func": func, "args": args, "kwargs": kwargs})
def execute_task(self, func: Callable, *args: Any, **kwargs: Any):
thread = threading.Thread(
target=self.run_task, args=(func, *args), kwargs=kwargs
)
thread.start()
def run_task(self, func: Callable, *args: Any, **kwargs: Any):
try:
with self.lock:
self.current_tasks += 1
func(*args, **kwargs)  # call the function here, forwarding *args and **kwargs
finally:
self.task_done()
def check_queue(self):
with self.lock:
if (
self.current_tasks < self.max_concurrent_tasks
and not self.is_queue_empty()
):
task_info = self.dequeue()
func = task_info["func"]
args = task_info.get("args", ())
kwargs = task_info.get("kwargs", {})
self.execute_task(func, *args, **kwargs)
def task_done(self):
with self.lock:
self.current_tasks -= 1
self.check_queue()
def enqueue(self, task: Dict):
raise NotImplementedError()
def dequeue(self):
raise NotImplementedError()
def is_queue_empty(self):
raise NotImplementedError()

View File

@@ -1,18 +0,0 @@
from queue import Queue
from typing import Dict
from app.controllers.manager.base_manager import TaskManager
class InMemoryTaskManager(TaskManager):
def create_queue(self):
return Queue()
def enqueue(self, task: Dict):
self.queue.put(task)
def dequeue(self):
return self.queue.get()
def is_queue_empty(self):
return self.queue.empty()
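Together, the two classes above form a small in-process task queue: add_task starts a job immediately while fewer than max_concurrent_tasks are running, otherwise it enqueues the job, and task_done drains the queue as jobs finish. A minimal usage sketch; the worker function and its arguments are made up for illustration:

import time
from app.controllers.manager.memory_manager import InMemoryTaskManager

def fake_job(name: str):
    # stand-in for a real task function such as task.start
    time.sleep(1)
    print(f"{name} finished")

manager = InMemoryTaskManager(max_concurrent_tasks=2)
for i in range(5):
    # jobs beyond the concurrency limit are queued and run as earlier ones complete
    manager.add_task(fake_job, name=f"job-{i}")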

View File

@@ -1,56 +0,0 @@
import json
from typing import Dict
import redis
from app.controllers.manager.base_manager import TaskManager
from app.models.schema import VideoParams
from app.services import task as tm
FUNC_MAP = {
"start": tm.start,
# 'start_test': tm.start_test
}
class RedisTaskManager(TaskManager):
def __init__(self, max_concurrent_tasks: int, redis_url: str):
self.redis_client = redis.Redis.from_url(redis_url)
super().__init__(max_concurrent_tasks)
def create_queue(self):
return "task_queue"
def enqueue(self, task: Dict):
task_with_serializable_params = task.copy()
if "params" in task["kwargs"] and isinstance(
task["kwargs"]["params"], VideoParams
):
task_with_serializable_params["kwargs"]["params"] = task["kwargs"][
"params"
].dict()
# Convert the function object to its name
task_with_serializable_params["func"] = task["func"].__name__
self.redis_client.rpush(self.queue, json.dumps(task_with_serializable_params))
def dequeue(self):
task_json = self.redis_client.lpop(self.queue)
if task_json:
task_info = json.loads(task_json)
# Convert the function name back to the function object
task_info["func"] = FUNC_MAP[task_info["func"]]
if "params" in task_info["kwargs"] and isinstance(
task_info["kwargs"]["params"], dict
):
task_info["kwargs"]["params"] = VideoParams(
**task_info["kwargs"]["params"]
)
return task_info
return None
def is_queue_empty(self):
return self.redis_client.llen(self.queue) == 0
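The Redis-backed variant stores queued tasks as JSON, replacing the function object with its name via FUNC_MAP on enqueue and resolving it back on dequeue. A minimal construction sketch, assuming an unauthenticated local Redis instance; the video controller later in this diff builds the URL as redis://:password@host:port/db from config:

from app.controllers.manager.redis_manager import RedisTaskManager

# Placeholder URL for a local Redis without a password.
manager = RedisTaskManager(
    max_concurrent_tasks=5,
    redis_url="redis://localhost:6379/0",
)
# Tasks are added exactly as with InMemoryTaskManager; only functions registered
# in FUNC_MAP (currently "start") survive the JSON round trip.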

View File

@@ -1,14 +0,0 @@
from fastapi import APIRouter
from fastapi import Request
router = APIRouter()
@router.get(
"/ping",
tags=["Health Check"],
description="检查服务可用性",
response_description="pong",
)
def ping(request: Request) -> str:
return "pong"

View File

@@ -1,11 +0,0 @@
from fastapi import APIRouter, Depends
def new_router(dependencies=None):
router = APIRouter()
router.tags = ["V1"]
router.prefix = "/api/v1"
# Apply the auth dependency to all routes
if dependencies:
router.dependencies = dependencies
return router

View File

@@ -1,93 +0,0 @@
from fastapi import Request, File, UploadFile
import os
from app.controllers.v1.base import new_router
from app.models.schema import (
VideoScriptResponse,
VideoScriptRequest,
VideoTermsResponse,
VideoTermsRequest,
VideoTranscriptionRequest,
VideoTranscriptionResponse,
)
from app.services import llm
from app.utils import utils
from app.config import config
# Auth dependency
# router = new_router(dependencies=[Depends(base.verify_token)])
router = new_router()
# Define the upload directory
UPLOAD_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "uploads")
@router.post(
"/scripts",
response_model=VideoScriptResponse,
summary="Create a script for the video",
)
def generate_video_script(request: Request, body: VideoScriptRequest):
video_script = llm.generate_script(
video_subject=body.video_subject,
language=body.video_language,
paragraph_number=body.paragraph_number,
)
response = {"video_script": video_script}
return utils.get_response(200, response)
@router.post(
"/terms",
response_model=VideoTermsResponse,
summary="Generate video terms based on the video script",
)
def generate_video_terms(request: Request, body: VideoTermsRequest):
video_terms = llm.generate_terms(
video_subject=body.video_subject,
video_script=body.video_script,
amount=body.amount,
)
response = {"video_terms": video_terms}
return utils.get_response(200, response)
@router.post(
"/transcription",
response_model=VideoTranscriptionResponse,
summary="Transcribe video content using Gemini"
)
async def transcribe_video(
request: Request,
video_name: str,
language: str = "zh-CN",
video_file: UploadFile = File(...)
):
"""
Transcribe the video content with Gemini, including timestamped scene descriptions and speech
Args:
video_name: name of the video
language: language code, defaults to zh-CN
video_file: the uploaded video file
"""
# Create a temporary directory for the uploaded video
os.makedirs(UPLOAD_DIR, exist_ok=True)
# Save the uploaded video file
video_path = os.path.join(UPLOAD_DIR, video_file.filename)
with open(video_path, "wb") as buffer:
content = await video_file.read()
buffer.write(content)
try:
transcription = llm.gemini_video_transcription(
video_name=video_name,
video_path=video_path,
language=language,
llm_provider_video=config.app.get("video_llm_provider", "gemini")
)
response = {"transcription": transcription}
return utils.get_response(200, response)
finally:
# Delete the temporary file once processing is done
if os.path.exists(video_path):
os.remove(video_path)
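For illustration, a call to the removed /scripts endpoint would have looked roughly like the sketch below; the base URL and port are placeholders, the payload fields mirror the VideoScriptParams defaults shown later in the schema diff, and the status/message/data envelope follows the response examples there:

import requests

payload = {
    "video_subject": "春天的花海",  # example subject, matching the schema default
    "video_language": "",
    "paragraph_number": 1,
}
resp = requests.post("http://localhost:8080/api/v1/scripts", json=payload)
print(resp.json()["data"]["video_script"])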

View File

@@ -1,271 +0,0 @@
import glob
import os
import pathlib
import shutil
from typing import Union
from fastapi import BackgroundTasks, Depends, Path, Request, UploadFile
from fastapi.params import File
from fastapi.responses import FileResponse, StreamingResponse
from loguru import logger
from app.config import config
from app.controllers import base
from app.controllers.manager.memory_manager import InMemoryTaskManager
from app.controllers.manager.redis_manager import RedisTaskManager
from app.controllers.v1.base import new_router
from app.models.exception import HttpException
from app.models.schema import (
AudioRequest,
BgmRetrieveResponse,
BgmUploadResponse,
SubtitleRequest,
TaskDeletionResponse,
TaskQueryRequest,
TaskQueryResponse,
TaskResponse,
TaskVideoRequest,
)
from app.services import state as sm
from app.services import task as tm
from app.utils import utils
# Auth dependency
# router = new_router(dependencies=[Depends(base.verify_token)])
router = new_router()
_enable_redis = config.app.get("enable_redis", False)
_redis_host = config.app.get("redis_host", "localhost")
_redis_port = config.app.get("redis_port", 6379)
_redis_db = config.app.get("redis_db", 0)
_redis_password = config.app.get("redis_password", None)
_max_concurrent_tasks = config.app.get("max_concurrent_tasks", 5)
redis_url = f"redis://:{_redis_password}@{_redis_host}:{_redis_port}/{_redis_db}"
# Choose the appropriate task manager based on configuration
if _enable_redis:
task_manager = RedisTaskManager(
max_concurrent_tasks=_max_concurrent_tasks, redis_url=redis_url
)
else:
task_manager = InMemoryTaskManager(max_concurrent_tasks=_max_concurrent_tasks)
@router.post("/videos", response_model=TaskResponse, summary="Generate a short video")
def create_video(
background_tasks: BackgroundTasks, request: Request, body: TaskVideoRequest
):
return create_task(request, body, stop_at="video")
@router.post("/subtitle", response_model=TaskResponse, summary="Generate subtitle only")
def create_subtitle(
background_tasks: BackgroundTasks, request: Request, body: SubtitleRequest
):
return create_task(request, body, stop_at="subtitle")
@router.post("/audio", response_model=TaskResponse, summary="Generate audio only")
def create_audio(
background_tasks: BackgroundTasks, request: Request, body: AudioRequest
):
return create_task(request, body, stop_at="audio")
def create_task(
request: Request,
body: Union[TaskVideoRequest, SubtitleRequest, AudioRequest],
stop_at: str,
):
task_id = utils.get_uuid()
request_id = base.get_task_id(request)
try:
task = {
"task_id": task_id,
"request_id": request_id,
"params": body.model_dump(),
}
sm.state.update_task(task_id)
task_manager.add_task(tm.start, task_id=task_id, params=body, stop_at=stop_at)
logger.success(f"Task created: {utils.to_json(task)}")
return utils.get_response(200, task)
except ValueError as e:
raise HttpException(
task_id=task_id, status_code=400, message=f"{request_id}: {str(e)}"
)
@router.get(
"/tasks/{task_id}", response_model=TaskQueryResponse, summary="Query task status"
)
def get_task(
request: Request,
task_id: str = Path(..., description="Task ID"),
query: TaskQueryRequest = Depends(),
):
endpoint = config.app.get("endpoint", "")
if not endpoint:
endpoint = str(request.base_url)
endpoint = endpoint.rstrip("/")
request_id = base.get_task_id(request)
task = sm.state.get_task(task_id)
if task:
task_dir = utils.task_dir()
def file_to_uri(file):
if not file.startswith(endpoint):
_uri_path = file.replace(task_dir, "tasks").replace("\\", "/")
_uri_path = f"{endpoint}/{_uri_path}"
else:
_uri_path = file
return _uri_path
if "videos" in task:
videos = task["videos"]
urls = []
for v in videos:
urls.append(file_to_uri(v))
task["videos"] = urls
if "combined_videos" in task:
combined_videos = task["combined_videos"]
urls = []
for v in combined_videos:
urls.append(file_to_uri(v))
task["combined_videos"] = urls
return utils.get_response(200, task)
raise HttpException(
task_id=task_id, status_code=404, message=f"{request_id}: task not found"
)
@router.delete(
"/tasks/{task_id}",
response_model=TaskDeletionResponse,
summary="Delete a generated short video task",
)
def delete_video(request: Request, task_id: str = Path(..., description="Task ID")):
request_id = base.get_task_id(request)
task = sm.state.get_task(task_id)
if task:
tasks_dir = utils.task_dir()
current_task_dir = os.path.join(tasks_dir, task_id)
if os.path.exists(current_task_dir):
shutil.rmtree(current_task_dir)
sm.state.delete_task(task_id)
logger.success(f"video deleted: {utils.to_json(task)}")
return utils.get_response(200)
raise HttpException(
task_id=task_id, status_code=404, message=f"{request_id}: task not found"
)
# @router.get(
# "/musics", response_model=BgmRetrieveResponse, summary="Retrieve local BGM files"
# )
# def get_bgm_list(request: Request):
# suffix = "*.mp3"
# song_dir = utils.song_dir()
# files = glob.glob(os.path.join(song_dir, suffix))
# bgm_list = []
# for file in files:
# bgm_list.append(
# {
# "name": os.path.basename(file),
# "size": os.path.getsize(file),
# "file": file,
# }
# )
# response = {"files": bgm_list}
# return utils.get_response(200, response)
#
# @router.post(
# "/musics",
# response_model=BgmUploadResponse,
# summary="Upload the BGM file to the songs directory",
# )
# def upload_bgm_file(request: Request, file: UploadFile = File(...)):
# request_id = base.get_task_id(request)
# # check file ext
# if file.filename.endswith("mp3"):
# song_dir = utils.song_dir()
# save_path = os.path.join(song_dir, file.filename)
# # save file
# with open(save_path, "wb+") as buffer:
# # If the file already exists, it will be overwritten
# file.file.seek(0)
# buffer.write(file.file.read())
# response = {"file": save_path}
# return utils.get_response(200, response)
#
# raise HttpException(
# "", status_code=400, message=f"{request_id}: Only *.mp3 files can be uploaded"
# )
#
#
# @router.get("/stream/{file_path:path}")
# async def stream_video(request: Request, file_path: str):
# tasks_dir = utils.task_dir()
# video_path = os.path.join(tasks_dir, file_path)
# range_header = request.headers.get("Range")
# video_size = os.path.getsize(video_path)
# start, end = 0, video_size - 1
#
# length = video_size
# if range_header:
# range_ = range_header.split("bytes=")[1]
# start, end = [int(part) if part else None for part in range_.split("-")]
# if start is None:
# start = video_size - end
# end = video_size - 1
# if end is None:
# end = video_size - 1
# length = end - start + 1
#
# def file_iterator(file_path, offset=0, bytes_to_read=None):
# with open(file_path, "rb") as f:
# f.seek(offset, os.SEEK_SET)
# remaining = bytes_to_read or video_size
# while remaining > 0:
# bytes_to_read = min(4096, remaining)
# data = f.read(bytes_to_read)
# if not data:
# break
# remaining -= len(data)
# yield data
#
# response = StreamingResponse(
# file_iterator(video_path, start, length), media_type="video/mp4"
# )
# response.headers["Content-Range"] = f"bytes {start}-{end}/{video_size}"
# response.headers["Accept-Ranges"] = "bytes"
# response.headers["Content-Length"] = str(length)
# response.status_code = 206 # Partial Content
#
# return response
#
#
# @router.get("/download/{file_path:path}")
# async def download_video(_: Request, file_path: str):
# """
# download video
# :param _: Request request
# :param file_path: video file path, eg: /cd1727ed-3473-42a2-a7da-4faafafec72b/final-1.mp4
# :return: video file
# """
# tasks_dir = utils.task_dir()
# video_path = os.path.join(tasks_dir, file_path)
# file_path = pathlib.Path(video_path)
# filename = file_path.stem
# extension = file_path.suffix
# headers = {"Content-Disposition": f"attachment; filename={filename}{extension}"}
# return FileResponse(
# path=video_path,
# headers=headers,
# filename=f"{filename}{extension}",
# media_type=f"video/{extension[1:]}",
# )
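As an illustration, polling the removed task-status endpoint would have looked roughly like this; the base URL and task id are placeholders, and the response shape follows the TaskQueryResponse example in the schema diff below:

import requests

task_id = "6c85c8cc-a77a-42b9-bc30-947815aa0558"  # placeholder task id
resp = requests.get(f"http://localhost:8080/api/v1/tasks/{task_id}")
body = resp.json()
print(body["data"]["progress"], body["data"].get("videos", []))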

View File

@@ -1,11 +0,0 @@
from fastapi import APIRouter, Depends
def v2_router(dependencies=None):
router = APIRouter()
router.tags = ["V2"]
router.prefix = "/api/v2"
# Apply the auth dependency to all routes
if dependencies:
router.dependencies = dependencies
return router

View File

@@ -1,170 +0,0 @@
from fastapi import APIRouter, BackgroundTasks
from loguru import logger
import os
from app.models.schema_v2 import (
GenerateScriptRequest,
GenerateScriptResponse,
CropVideoRequest,
CropVideoResponse,
DownloadVideoRequest,
DownloadVideoResponse,
StartSubclipRequest,
StartSubclipResponse
)
from app.models.schema import VideoClipParams
from app.services.script_service import ScriptGenerator
from app.services.video_service import VideoService
from app.utils import utils
from app.controllers.v2.base import v2_router
from app.models.schema import VideoClipParams
from app.services.youtube_service import YoutubeService
from app.services import task as task_service
router = v2_router()
@router.post(
"/scripts/generate",
response_model=GenerateScriptResponse,
summary="同步请求;生成视频脚本 (V2)"
)
async def generate_script(
request: GenerateScriptRequest,
background_tasks: BackgroundTasks
):
"""
V2 API for generating a video script
"""
task_id = utils.get_uuid()
try:
generator = ScriptGenerator()
script = await generator.generate_script(
video_path=request.video_path,
video_theme=request.video_theme,
custom_prompt=request.custom_prompt,
skip_seconds=request.skip_seconds,
threshold=request.threshold,
vision_batch_size=request.vision_batch_size,
vision_llm_provider=request.vision_llm_provider
)
return {
"task_id": task_id,
"script": script
}
except Exception as e:
logger.exception(f"Generate script failed: {str(e)}")
raise
@router.post(
"/scripts/crop",
response_model=CropVideoResponse,
summary="同步请求;裁剪视频 (V2)"
)
async def crop_video(
request: CropVideoRequest,
background_tasks: BackgroundTasks
):
"""
V2 API for cropping a video according to the script
"""
try:
# Call the video cropping service
video_service = VideoService()
task_id, subclip_videos = await video_service.crop_video(
video_path=request.video_origin_path,
video_script=request.video_script
)
logger.debug(f"裁剪视频成功,视频片段路径: {subclip_videos}")
logger.debug(type(subclip_videos))
return {
"task_id": task_id,
"subclip_videos": subclip_videos
}
except Exception as e:
logger.exception(f"Crop video failed: {str(e)}")
raise
@router.post(
"/youtube/download",
response_model=DownloadVideoResponse,
summary="同步请求下载YouTube视频 (V2)"
)
async def download_youtube_video(
request: DownloadVideoRequest,
background_tasks: BackgroundTasks
):
"""
Download a YouTube video at the specified resolution
"""
try:
youtube_service = YoutubeService()
task_id, output_path, filename = await youtube_service.download_video(
url=request.url,
resolution=request.resolution,
output_format=request.output_format,
rename=request.rename
)
return {
"task_id": task_id,
"output_path": output_path,
"resolution": request.resolution,
"format": request.output_format,
"filename": filename
}
except Exception as e:
logger.exception(f"Download YouTube video failed: {str(e)}")
raise
@router.post(
"/scripts/start-subclip",
response_model=StartSubclipResponse,
summary="异步请求;开始视频剪辑任务 (V2)"
)
async def start_subclip(
request: VideoClipParams,
task_id: str,
subclip_videos: dict,
background_tasks: BackgroundTasks
):
"""
V2 API for starting a video clipping task
"""
try:
# Build the parameter object
params = VideoClipParams(
video_origin_path=request.video_origin_path,
video_clip_json_path=request.video_clip_json_path,
voice_name=request.voice_name,
voice_rate=request.voice_rate,
voice_pitch=request.voice_pitch,
subtitle_enabled=request.subtitle_enabled,
video_aspect=request.video_aspect,
n_threads=request.n_threads
)
# Run the video clipping in a background task
background_tasks.add_task(
task_service.start_subclip,
task_id=task_id,
params=params,
subclip_path_videos=subclip_videos
)
return {
"task_id": task_id,
"state": "PROCESSING"  # initial state
}
except Exception as e:
logger.exception(f"Start subclip task failed: {str(e)}")
raise
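As an illustration, the removed /scripts/generate endpoint would have been called roughly as follows; the base URL and the video path are placeholders, and the fields mirror GenerateScriptRequest from schema_v2 later in this diff:

import requests

payload = {
    "video_path": "/data/videos/demo.mp4",  # placeholder path on the server
    "video_theme": "demo",
    "skip_seconds": 0,
    "threshold": 30,
    "vision_batch_size": 5,
    "vision_llm_provider": "gemini",
}
resp = requests.post("http://localhost:8080/api/v2/scripts/generate", json=payload)
data = resp.json()
print(data["task_id"], len(data["script"]))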

View File

@@ -154,203 +154,8 @@ class VideoParams(BaseModel):
paragraph_number: Optional[int] = 1
class SubtitleRequest(BaseModel):
video_script: str
video_language: Optional[str] = ""
voice_name: Optional[str] = "zh-CN-XiaoxiaoNeural-Female"
voice_volume: Optional[float] = 1.0
voice_rate: Optional[float] = 1.2
bgm_type: Optional[str] = "random"
bgm_file: Optional[str] = ""
bgm_volume: Optional[float] = 0.2
subtitle_position: Optional[str] = "bottom"
font_name: Optional[str] = "STHeitiMedium.ttc"
text_fore_color: Optional[str] = "#FFFFFF"
text_background_color: Optional[str] = "transparent"
font_size: int = 60
stroke_color: Optional[str] = "#000000"
stroke_width: float = 1.5
video_source: Optional[str] = "local"
subtitle_enabled: Optional[str] = "true"
class AudioRequest(BaseModel):
video_script: str
video_language: Optional[str] = ""
voice_name: Optional[str] = "zh-CN-XiaoxiaoNeural-Female"
voice_volume: Optional[float] = AudioVolumeDefaults.VOICE_VOLUME
voice_rate: Optional[float] = 1.2
bgm_type: Optional[str] = "random"
bgm_file: Optional[str] = ""
bgm_volume: Optional[float] = AudioVolumeDefaults.BGM_VOLUME
video_source: Optional[str] = "local"
class VideoScriptParams:
"""
{
"video_subject": "春天的花海",
"video_language": "",
"paragraph_number": 1
}
"""
video_subject: Optional[str] = "春天的花海"
video_language: Optional[str] = ""
paragraph_number: Optional[int] = 1
class VideoTermsParams:
"""
{
"video_subject": "",
"video_script": "",
"amount": 5
}
"""
video_subject: Optional[str] = "春天的花海"
video_script: Optional[str] = (
"春天的花海,如诗如画般展现在眼前。万物复苏的季节里,大地披上了一袭绚丽多彩的盛装。金黄的迎春、粉嫩的樱花、洁白的梨花、艳丽的郁金香……"
)
amount: Optional[int] = 5
class BaseResponse(BaseModel):
status: int = 200
message: Optional[str] = "success"
data: Any = None
class TaskVideoRequest(VideoParams, BaseModel):
pass
class TaskQueryRequest(BaseModel):
pass
class VideoScriptRequest(VideoScriptParams, BaseModel):
pass
class VideoTermsRequest(VideoTermsParams, BaseModel):
pass
######################################################################################################
######################################################################################################
######################################################################################################
######################################################################################################
class TaskResponse(BaseResponse):
class TaskResponseData(BaseModel):
task_id: str
data: TaskResponseData
class Config:
json_schema_extra = {
"example": {
"status": 200,
"message": "success",
"data": {"task_id": "6c85c8cc-a77a-42b9-bc30-947815aa0558"},
},
}
class TaskQueryResponse(BaseResponse):
class Config:
json_schema_extra = {
"example": {
"status": 200,
"message": "success",
"data": {
"state": 1,
"progress": 100,
"videos": [
"http://127.0.0.1:8080/tasks/6c85c8cc-a77a-42b9-bc30-947815aa0558/final-1.mp4"
],
"combined_videos": [
"http://127.0.0.1:8080/tasks/6c85c8cc-a77a-42b9-bc30-947815aa0558/combined-1.mp4"
],
},
},
}
class TaskDeletionResponse(BaseResponse):
class Config:
json_schema_extra = {
"example": {
"status": 200,
"message": "success",
"data": {
"state": 1,
"progress": 100,
"videos": [
"http://127.0.0.1:8080/tasks/6c85c8cc-a77a-42b9-bc30-947815aa0558/final-1.mp4"
],
"combined_videos": [
"http://127.0.0.1:8080/tasks/6c85c8cc-a77a-42b9-bc30-947815aa0558/combined-1.mp4"
],
},
},
}
class VideoScriptResponse(BaseResponse):
class Config:
json_schema_extra = {
"example": {
"status": 200,
"message": "success",
"data": {
"video_script": "春天的花海,是大自然的一幅美丽画卷。在这个季节里,大地复苏,万物生长,花朵争相绽放,形成了一片五彩斑斓的花海..."
},
},
}
class VideoTermsResponse(BaseResponse):
class Config:
json_schema_extra = {
"example": {
"status": 200,
"message": "success",
"data": {"video_terms": ["sky", "tree"]},
},
}
class BgmRetrieveResponse(BaseResponse):
class Config:
json_schema_extra = {
"example": {
"status": 200,
"message": "success",
"data": {
"files": [
{
"name": "output013.mp3",
"size": 1891269,
"file": "/NarratoAI/resource/songs/output013.mp3",
}
]
},
},
}
class BgmUploadResponse(BaseResponse):
class Config:
json_schema_extra = {
"example": {
"status": 200,
"message": "success",
"data": {"file": "/NarratoAI/resource/songs/example.mp3"},
},
}
class VideoClipParams(BaseModel):
"""
@@ -393,16 +198,7 @@ class VideoClipParams(BaseModel):
bgm_volume: Optional[float] = Field(default=AudioVolumeDefaults.BGM_VOLUME, description="背景音乐音量")
class VideoTranscriptionRequest(BaseModel):
video_name: str
language: str = "zh-CN"
class Config:
arbitrary_types_allowed = True
class VideoTranscriptionResponse(BaseModel):
transcription: str
class SubtitlePosition(str, Enum):

View File

@@ -1,63 +0,0 @@
from typing import Optional, List
from pydantic import BaseModel
class GenerateScriptRequest(BaseModel):
video_path: str
video_theme: Optional[str] = ""
custom_prompt: Optional[str] = ""
frame_interval_input: Optional[int] = 5
skip_seconds: Optional[int] = 0
threshold: Optional[int] = 30
vision_batch_size: Optional[int] = 5
vision_llm_provider: Optional[str] = "gemini"
class GenerateScriptResponse(BaseModel):
task_id: str
script: List[dict]
class CropVideoRequest(BaseModel):
video_origin_path: str
video_script: List[dict]
class CropVideoResponse(BaseModel):
task_id: str
subclip_videos: dict
class DownloadVideoRequest(BaseModel):
url: str
resolution: str
output_format: Optional[str] = "mp4"
rename: Optional[str] = None
class DownloadVideoResponse(BaseModel):
task_id: str
output_path: str
resolution: str
format: str
filename: str
class StartSubclipRequest(BaseModel):
task_id: str
video_origin_path: str
video_clip_json_path: str
voice_name: Optional[str] = None
voice_rate: Optional[int] = 0
voice_pitch: Optional[int] = 0
subtitle_enabled: Optional[bool] = True
video_aspect: Optional[str] = "16:9"
n_threads: Optional[int] = 4
subclip_videos: list  # video segments returned by the crop endpoint
class StartSubclipResponse(BaseModel):
task_id: str
state: str
videos: Optional[List[str]] = None
combined_videos: Optional[List[str]] = None

View File

@@ -1,21 +0,0 @@
"""Application configuration - root APIRouter.
Defines all FastAPI application endpoints.
Resources:
1. https://fastapi.tiangolo.com/tutorial/bigger-applications
"""
from fastapi import APIRouter
from app.controllers.v1 import llm, video
from app.controllers.v2 import script
root_api_router = APIRouter()
# v1
root_api_router.include_router(video.router)
root_api_router.include_router(llm.router)
# v2
root_api_router.include_router(script.router)

View File

@@ -1,14 +0,0 @@
import google.generativeai as genai
from app.config import config
import os
os.environ["HTTP_PROXY"] = config.proxy.get("http")
os.environ["HTTPS_PROXY"] = config.proxy.get("https")
genai.configure(api_key="")
model = genai.GenerativeModel("gemini-1.5-pro")
for i in range(50):
response = model.generate_content("直接回复我文本'当前网络可用'")
print(i, response.text)

View File

@@ -1,122 +0,0 @@
"""
Clip a video at specified timestamps using moviepy, with hour/minute/second/millisecond precision
"""
from moviepy.editor import VideoFileClip
from datetime import datetime
import os
def time_str_to_seconds(time_str: str) -> float:
"""
Convert a time string to seconds
Args:
time_str: a time string in "HH:MM:SS,mmm" format, e.g. "00:01:23,456"
Returns:
the converted number of seconds (float)
"""
try:
# Split the time part from the milliseconds
time_part, ms_part = time_str.split(',')
# Parse hours, minutes and seconds
time_obj = datetime.strptime(time_part, "%H:%M:%S")
# Compute the total number of seconds
total_seconds = time_obj.hour * 3600 + time_obj.minute * 60 + time_obj.second
# Add the millisecond part
total_seconds += int(ms_part) / 1000
return total_seconds
except ValueError as e:
raise ValueError("时间格式错误,请使用 HH:MM:SS,mmm 格式,例如 00:01:23,456") from e
def format_duration(seconds: float) -> str:
"""
Convert seconds to a readable time format
Args:
seconds: number of seconds
Returns:
formatted time string (HH:MM:SS,mmm)
"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
seconds_remain = seconds % 60
whole_seconds = int(seconds_remain)
milliseconds = int((seconds_remain - whole_seconds) * 1000)
return f"{hours:02d}:{minutes:02d}:{whole_seconds:02d},{milliseconds:03d}"
def cut_video(video_path: str, start_time: str, end_time: str, output_path: str) -> None:
"""
Cut a clip from the video
Args:
video_path: path to the video file
start_time: start time (format: "HH:MM:SS,mmm")
end_time: end time (format: "HH:MM:SS,mmm")
output_path: path of the output file
"""
try:
# Make sure the output directory exists
output_dir = os.path.dirname(output_path)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# If the output file already exists, try to delete it first
if os.path.exists(output_path):
try:
os.remove(output_path)
except PermissionError:
print(f"无法删除已存在的文件:{output_path},请确保文件未被其他程序占用")
return
# Convert the time strings to seconds
start_seconds = time_str_to_seconds(start_time)
end_seconds = time_str_to_seconds(end_time)
# Load the video file
video = VideoFileClip(video_path)
# Validate the time range
if start_seconds >= video.duration or end_seconds > video.duration:
raise ValueError(f"剪辑时间超出视频长度!视频总长度为: {format_duration(video.duration)}")
if start_seconds >= end_seconds:
raise ValueError("结束时间必须大于开始时间!")
# Compute the clip duration
clip_duration = end_seconds - start_seconds
print(f"原视频总长度: {format_duration(video.duration)}")
print(f"剪辑时长: {format_duration(clip_duration)}")
print(f"剪辑区间: {start_time} -> {end_time}")
# Cut the clip
video = video.subclip(start_seconds, end_seconds)
# Write the output with error handling
try:
video.write_videofile(
output_path,
codec='libx264',
audio_codec='aac',
temp_audiofile='temp-audio.m4a',
remove_temp=True
)
except IOError as e:
print(f"写入视频文件时发生错误:{str(e)}")
raise
finally:
# Make sure resources are released
video.close()
except Exception as e:
print(f"视频剪辑过程中发生错误:{str(e)}")
raise
if __name__ == "__main__":
cut_video(
video_path="/Users/apple/Desktop/NarratoAI/resource/videos/duanju_yuansp.mp4",
start_time="00:00:00,789",
end_time="00:02:00,123",
output_path="/Users/apple/Desktop/NarratoAI/resource/videos/duanju_yuansp_cut3.mp4"
)
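A quick worked check of the conversion above, which could be appended to this script: "00:01:23,456" splits into "00:01:23" plus 456 ms, i.e. 0*3600 + 1*60 + 23 + 456/1000 = 83.456 seconds.

# Sanity check of time_str_to_seconds, worked out by hand.
assert abs(time_str_to_seconds("00:01:23,456") - 83.456) < 1e-6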

View File

@@ -1,143 +0,0 @@
"""
Merge video, audio, subtitles and background music using moviepy
"""
from moviepy.editor import (
VideoFileClip,
AudioFileClip,
TextClip,
CompositeVideoClip,
concatenate_videoclips
)
# from moviepy.config import change_settings
import os
# Font file path (used to render Chinese subtitles)
FONT_PATH = "../../resource/fonts/STHeitiMedium.ttc"  # make sure the font file exists at this path
# change_settings(
#     {"IMAGEMAGICK_BINARY": r"C:\Program Files\ImageMagick-7.1.1-Q16\magick.exe"})  # on Windows the ImageMagick path must be set
class VideoMerger:
"""Video merging helper class"""
def __init__(self, output_path: str = "../../resource/videos/merged_video.mp4"):
"""
Initialize the video merger
Args:
output_path: path of the output file
"""
self.output_path = output_path
self.video_clips = []
self.background_music = None
self.subtitles = []
def add_video(self, video_path: str, start_time: str = None, end_time: str = None) -> None:
"""
Add a video segment
Args:
video_path: path to the video file
start_time: start time (format: "MM:SS")
end_time: end time (format: "MM:SS")
"""
video = VideoFileClip(video_path)
if start_time and end_time:
video = video.subclip(self._time_to_seconds(start_time),
self._time_to_seconds(end_time))
self.video_clips.append(video)
def add_audio(self, audio_path: str, volume: float = 1.0) -> None:
"""
Add background music
Args:
audio_path: path to the audio file
volume: volume level (0.0-1.0)
"""
self.background_music = AudioFileClip(audio_path).volumex(volume)
def add_subtitle(self, text: str, start_time: str, end_time: str,
position: tuple = ('center', 'bottom'), fontsize: int = 24) -> None:
"""
Add a subtitle
Args:
text: subtitle text
start_time: start time (format: "MM:SS")
end_time: end time (format: "MM:SS")
position: subtitle position
fontsize: font size
"""
subtitle = TextClip(
text,
font=FONT_PATH,
fontsize=fontsize,
color='white',
stroke_color='black',
stroke_width=2
)
subtitle = subtitle.set_position(position).set_duration(
self._time_to_seconds(end_time) - self._time_to_seconds(start_time)
).set_start(self._time_to_seconds(start_time))
self.subtitles.append(subtitle)
def merge(self) -> None:
"""Merge all media elements and export the video"""
if not self.video_clips:
raise ValueError("至少需要添加一个视频片段")
# Concatenate the video segments
final_video = concatenate_videoclips(self.video_clips)
# If background music is set, match its duration to the video
if self.background_music:
self.background_music = self.background_music.set_duration(final_video.duration)
final_video = final_video.set_audio(self.background_music)
# Add subtitles
if self.subtitles:
final_video = CompositeVideoClip([final_video] + self.subtitles)
# Export the final video
final_video.write_videofile(
self.output_path,
fps=24,
codec='libx264',
audio_codec='aac'
)
# Release resources
final_video.close()
for clip in self.video_clips:
clip.close()
if self.background_music:
self.background_music.close()
@staticmethod
def _time_to_seconds(time_str: str) -> float:
"""Convert a time string to seconds"""
minutes, seconds = map(int, time_str.split(':'))
return minutes * 60 + seconds
def test_merge_video():
"""Test the video merging functionality"""
merger = VideoMerger()
# Add two video segments
merger.add_video("../../resource/videos/cut_video.mp4", "00:00", "01:00")
merger.add_video("../../resource/videos/demo.mp4", "00:00", "00:30")
# Add background music
merger.add_audio("../../resource/songs/output000.mp3", volume=0.3)
# Add subtitles
merger.add_subtitle("第一个精彩片段", "00:00", "00:05")
merger.add_subtitle("第二个精彩片段", "01:00", "01:05")
# Merge and export
merger.merge()
if __name__ == "__main__":
test_merge_video()

View File

@@ -1,142 +0,0 @@
"""
Example of optimizing video processing speed with moviepy
Covers video speed-up, multi-core processing, encoder preset tuning, and more
"""
from moviepy.editor import VideoFileClip
from moviepy.video.fx.speedx import speedx
import multiprocessing as mp
import time
class VideoSpeedProcessor:
"""Video speed processor"""
def __init__(self, input_path: str, output_path: str):
self.input_path = input_path
self.output_path = output_path
# Get the number of CPU cores
self.cpu_cores = mp.cpu_count()
def process_with_optimization(self, speed_factor: float = 1.0) -> None:
"""
Process the video with optimized parameters
Args:
speed_factor: speed multiplier (1.0 = original speed, 2.0 = double speed)
"""
start_time = time.time()
# Load the video with optimized parameters
video = VideoFileClip(
self.input_path,
audio=True,  # can be set to False if audio is not needed
target_resolution=(720, None),  # lowering the resolution speeds up processing
resize_algorithm='fast_bilinear'  # use a fast resizing algorithm
)
# Apply the speed change
if speed_factor != 1.0:
video = speedx(video, factor=speed_factor)
# Export the video with optimized parameters
video.write_videofile(
self.output_path,
codec='libx264',  # use H.264 encoding
audio_codec='aac',  # audio codec
temp_audiofile='temp-audio.m4a',  # temporary audio file
remove_temp=True,  # delete temporary files when done
write_logfile=False,  # disable the log file
threads=self.cpu_cores,  # use multiple cores
preset='ultrafast',  # fastest encoding preset
ffmpeg_params=[
'-brand', 'mp42',
'-crf', '23',  # compression range 0-51; higher values mean stronger compression
]
)
# Release resources
video.close()
end_time = time.time()
print(f"处理完成!用时: {end_time - start_time:.2f}")
def batch_process_segments(self, segment_times: list, speed_factor: float = 1.0) -> None:
"""
Batch-process video segments in parallel
Args:
segment_times: a list of (start, end) time tuples
speed_factor: speed multiplier
"""
start_time = time.time()
# Create a process pool
with mp.Pool(processes=self.cpu_cores) as pool:
# Prepare the arguments
args = [(self.input_path, start, end, speed_factor, i)
for i, (start, end) in enumerate(segment_times)]
# Process the segments in parallel
pool.starmap(self._process_segment, args)
end_time = time.time()
print(f"批量处理完成!总用时: {end_time - start_time:.2f}")
@staticmethod
def _process_segment(video_path: str, start: str, end: str,
speed_factor: float, index: int) -> None:
"""Process a single video segment"""
# Convert the time format
start_sec = VideoSpeedProcessor._time_to_seconds(start)
end_sec = VideoSpeedProcessor._time_to_seconds(end)
# Load and process the video segment
video = VideoFileClip(
video_path,
audio=True,
target_resolution=(720, None)
).subclip(start_sec, end_sec)
# Apply the speed change
if speed_factor != 1.0:
video = speedx(video, factor=speed_factor)
# Save the processed segment
output_path = f"../../resource/videos/segment_{index}.mp4"
video.write_videofile(
output_path,
codec='libx264',
audio_codec='aac',
preset='ultrafast',
threads=2  # number of threads per process
)
video.close()
@staticmethod
def _time_to_seconds(time_str: str) -> float:
"""Convert a time string (MM:SS) to seconds"""
minutes, seconds = map(int, time_str.split(':'))
return minutes * 60 + seconds
def test_video_speed():
"""Test accelerated video processing"""
processor = VideoSpeedProcessor(
"../../resource/videos/best.mp4",
"../../resource/videos/speed_up.mp4"
)
# Test 1: simple speed-up
processor.process_with_optimization(speed_factor=1.5)  # 1.5x speed
# Test 2: process multiple segments in parallel
segments = [
("00:00", "01:00"),
("01:00", "02:00"),
("02:00", "03:00")
]
processor.batch_process_segments(segments, speed_factor=2.0)  # 2x speed
if __name__ == "__main__":
test_video_speed()

View File

@@ -1,105 +0,0 @@
import os
import traceback
import json
from openai import OpenAI
from pydantic import BaseModel
from typing import List
from app.utils import utils
from app.services.subtitle import extract_audio_and_create_subtitle
class Step(BaseModel):
timestamp: str
picture: str
narration: str
OST: int
new_timestamp: str
class MathReasoning(BaseModel):
result: List[Step]
def chat_with_qwen(prompt: str, system_message: str, subtitle_path: str) -> str:
"""
Chat with the Tongyi Qianwen (Qwen) AI model
Args:
prompt (str): the user's question or prompt
system_message (str): system prompt that sets the assistant's behavior, defaults to "You are a helpful assistant."
subtitle_path (str): path to the subtitle file
Returns:
str: the assistant's reply
Raises:
Exception: raised when the API call fails
"""
try:
client = OpenAI(
api_key="sk-a1acd853d88d41d3ae92777d7bfa2612",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
# Read the subtitle file
with open(subtitle_path, "r", encoding="utf-8") as file:
subtitle_content = file.read()
completion = client.chat.completions.create(
model="qwen-turbo-2024-11-01",
messages=[
{'role': 'system', 'content': system_message},
{'role': 'user', 'content': prompt + subtitle_content}
]
)
return completion.choices[0].message.content
except Exception as e:
error_message = f"调用千问API时发生错误{str(e)}"
print(error_message)
print("请参考文档https://help.aliyun.com/zh/model-studio/developer-reference/error-code")
raise Exception(error_message)
# Usage example
if __name__ == "__main__":
try:
video_path = utils.video_dir("duanju_yuansp.mp4")
# # Check whether the video file exists
# if not os.path.exists(video_path):
# print(f"视频文件不存在:{video_path}")
# exit(1)
# Extract subtitles
subtitle_path = os.path.join(utils.video_dir(""), f"duanju_yuan.srt")
extract_audio_and_create_subtitle(video_file=video_path, subtitle_file=subtitle_path)
# Analyze the subtitles
system_message = """
你是一个视频srt字幕分析剪辑器, 输入视频的srt字幕, 分析其中的精彩且尽可能连续的片段并裁剪出来, 注意确保文字与时间戳的正确匹配
输出需严格按照如下 json 格式:
[
{
"timestamp": "00:00:50,020-00,01:44,000",
"picture": "画面1",
"narration": "播放原声",
"OST": 0,
"new_timestamp": "00:00:00,000-00:00:54,020"
},
{
"timestamp": "01:49-02:30",
"picture": "画面2",
"narration": "播放原声",
"OST": 2,
"new_timestamp": "00:54-01:35"
},
]
"""
prompt = "字幕如下:\n"
response = chat_with_qwen(prompt, system_message, subtitle_path)
print(response)
# Save the JSON (note: the timestamps need to be converted to minutes:seconds; currently "timestamp": "00:00:00,020-00:00:01,660" would need to become "timestamp": "00:00-01:66")
# response = json.loads(response)
# for item in response:
# item["timestamp"] = item["timestamp"].replace(":", "-")
# with open(os.path.join(utils.video_dir(""), "duanju_yuan.json"), "w", encoding="utf-8") as file:
# json.dump(response, file, ensure_ascii=False)
except Exception as e:
print(traceback.format_exc())

View File

@@ -17,9 +17,7 @@ azure-cognitiveservices-speech~=1.37.0
# opencv-python==4.11.0.86
# scikit-learn==1.6.1
# fastapi~=0.115.4
# uvicorn~=0.27.1
# pydantic~=2.11.4
# faster-whisper~=1.0.1
# tomli~=2.0.1