mirror of
https://github.com/linyqh/NarratoAI.git
synced 2026-01-03 03:38:11 +00:00
refactor(video): 重构视频合并功能
- 移除了 video.py 中的 combine_clip_videos 函数- 新增 generate_video.py 文件,实现更强大的视频生成功能 - 新增 merge_materials 函数,支持合并视频、音频、BGM 和字幕素材 - 优化了字幕处理和音频混合逻辑 - 增加了更多可配置选项,提高灵活性
This commit is contained in:
parent
e40046d05d
commit
4bc20c2902
383
app/services/generate_video.py
Normal file
383
app/services/generate_video.py
Normal file
@ -0,0 +1,383 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
'''
|
||||
@Project: NarratoAI
|
||||
@File : generate_video
|
||||
@Author : 小林同学
|
||||
@Date : 2025/5/7 上午11:55
|
||||
'''
|
||||
|
||||
import os
|
||||
import traceback
|
||||
from typing import Optional, Dict, Any, Union
|
||||
from loguru import logger
|
||||
from moviepy import (
|
||||
VideoFileClip,
|
||||
AudioFileClip,
|
||||
CompositeAudioClip,
|
||||
CompositeVideoClip,
|
||||
TextClip,
|
||||
concatenate_videoclips,
|
||||
afx
|
||||
)
|
||||
from moviepy.video.tools.subtitles import SubtitlesClip
|
||||
from PIL import ImageFont
|
||||
|
||||
from app.utils import utils
|
||||
|
||||
|
||||
def merge_materials(
|
||||
video_path: str,
|
||||
audio_path: str,
|
||||
output_path: str,
|
||||
subtitle_path: Optional[str] = None,
|
||||
bgm_path: Optional[str] = None,
|
||||
options: Optional[Dict[str, Any]] = None
|
||||
) -> str:
|
||||
"""
|
||||
合并视频、音频、BGM和字幕素材生成最终视频
|
||||
|
||||
参数:
|
||||
video_path: 视频文件路径
|
||||
audio_path: 音频文件路径
|
||||
output_path: 输出文件路径
|
||||
subtitle_path: 字幕文件路径,可选
|
||||
bgm_path: 背景音乐文件路径,可选
|
||||
options: 其他选项配置,可包含以下字段:
|
||||
- voice_volume: 人声音量,默认1.0
|
||||
- bgm_volume: 背景音乐音量,默认0.3
|
||||
- original_audio_volume: 原始音频音量,默认0.0
|
||||
- keep_original_audio: 是否保留原始音频,默认False
|
||||
- subtitle_font: 字幕字体,默认None,系统会使用默认字体
|
||||
- subtitle_font_size: 字幕字体大小,默认40
|
||||
- subtitle_color: 字幕颜色,默认白色
|
||||
- subtitle_bg_color: 字幕背景颜色,默认透明
|
||||
- subtitle_position: 字幕位置,可选值'bottom', 'top', 'center',默认'bottom'
|
||||
- stroke_color: 描边颜色,默认黑色
|
||||
- stroke_width: 描边宽度,默认1
|
||||
- threads: 处理线程数,默认2
|
||||
- fps: 输出帧率,默认30
|
||||
|
||||
返回:
|
||||
输出视频的路径
|
||||
"""
|
||||
# 合并选项默认值
|
||||
if options is None:
|
||||
options = {}
|
||||
|
||||
# 设置默认参数值
|
||||
voice_volume = options.get('voice_volume', 1.0)
|
||||
bgm_volume = options.get('bgm_volume', 0.3)
|
||||
original_audio_volume = options.get('original_audio_volume', 0.0) # 默认为0,即不保留原声
|
||||
keep_original_audio = options.get('keep_original_audio', False) # 是否保留原声
|
||||
subtitle_font = options.get('subtitle_font', None)
|
||||
subtitle_font_size = options.get('subtitle_font_size', 40)
|
||||
subtitle_color = options.get('subtitle_color', '#FFFFFF')
|
||||
subtitle_bg_color = options.get('subtitle_bg_color', 'transparent')
|
||||
subtitle_position = options.get('subtitle_position', 'bottom')
|
||||
stroke_color = options.get('stroke_color', '#000000')
|
||||
stroke_width = options.get('stroke_width', 1)
|
||||
threads = options.get('threads', 2)
|
||||
fps = options.get('fps', 30)
|
||||
|
||||
# 处理透明背景色问题 - MoviePy 2.1.1不支持'transparent'值
|
||||
if subtitle_bg_color == 'transparent':
|
||||
subtitle_bg_color = None # None在新版MoviePy中表示透明背景
|
||||
|
||||
# 创建输出目录(如果不存在)
|
||||
output_dir = os.path.dirname(output_path)
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
logger.info(f"开始合并素材...")
|
||||
logger.info(f" ① 视频: {video_path}")
|
||||
logger.info(f" ② 音频: {audio_path}")
|
||||
if subtitle_path:
|
||||
logger.info(f" ③ 字幕: {subtitle_path}")
|
||||
if bgm_path:
|
||||
logger.info(f" ④ 背景音乐: {bgm_path}")
|
||||
logger.info(f" ⑤ 输出: {output_path}")
|
||||
|
||||
# 加载视频
|
||||
try:
|
||||
video_clip = VideoFileClip(video_path)
|
||||
logger.info(f"视频尺寸: {video_clip.size[0]}x{video_clip.size[1]}, 时长: {video_clip.duration}秒")
|
||||
|
||||
# 提取视频原声(如果需要)
|
||||
original_audio = None
|
||||
if keep_original_audio and original_audio_volume > 0:
|
||||
try:
|
||||
original_audio = video_clip.audio
|
||||
if original_audio:
|
||||
original_audio = original_audio.with_effects([afx.MultiplyVolume(original_audio_volume)])
|
||||
logger.info(f"已提取视频原声,音量设置为: {original_audio_volume}")
|
||||
else:
|
||||
logger.warning("视频没有音轨,无法提取原声")
|
||||
except Exception as e:
|
||||
logger.error(f"提取视频原声失败: {str(e)}")
|
||||
original_audio = None
|
||||
|
||||
# 移除原始音轨,稍后会合并新的音频
|
||||
video_clip = video_clip.without_audio()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"加载视频失败: {str(e)}")
|
||||
raise
|
||||
|
||||
# 处理背景音乐和所有音频轨道合成
|
||||
audio_tracks = []
|
||||
|
||||
# 先添加主音频(配音)
|
||||
if audio_path and os.path.exists(audio_path):
|
||||
try:
|
||||
voice_audio = AudioFileClip(audio_path).with_effects([afx.MultiplyVolume(voice_volume)])
|
||||
audio_tracks.append(voice_audio)
|
||||
logger.info(f"已添加配音音频,音量: {voice_volume}")
|
||||
except Exception as e:
|
||||
logger.error(f"加载配音音频失败: {str(e)}")
|
||||
|
||||
# 添加原声(如果需要)
|
||||
if original_audio is not None:
|
||||
audio_tracks.append(original_audio)
|
||||
logger.info(f"已添加视频原声,音量: {original_audio_volume}")
|
||||
|
||||
# 添加背景音乐(如果有)
|
||||
if bgm_path and os.path.exists(bgm_path):
|
||||
try:
|
||||
bgm_clip = AudioFileClip(bgm_path).with_effects([
|
||||
afx.MultiplyVolume(bgm_volume),
|
||||
afx.AudioFadeOut(3),
|
||||
afx.AudioLoop(duration=video_clip.duration),
|
||||
])
|
||||
audio_tracks.append(bgm_clip)
|
||||
logger.info(f"已添加背景音乐,音量: {bgm_volume}")
|
||||
except Exception as e:
|
||||
logger.error(f"添加背景音乐失败: \n{traceback.format_exc()}")
|
||||
|
||||
# 合成最终的音频轨道
|
||||
if audio_tracks:
|
||||
final_audio = CompositeAudioClip(audio_tracks)
|
||||
video_clip = video_clip.with_audio(final_audio)
|
||||
logger.info(f"已合成所有音频轨道,共{len(audio_tracks)}个")
|
||||
else:
|
||||
logger.warning("没有可用的音频轨道,输出视频将没有声音")
|
||||
|
||||
# 处理字体路径
|
||||
font_path = None
|
||||
if subtitle_path and subtitle_font:
|
||||
font_path = os.path.join(utils.font_dir(), subtitle_font)
|
||||
if os.name == "nt":
|
||||
font_path = font_path.replace("\\", "/")
|
||||
logger.info(f"使用字体: {font_path}")
|
||||
|
||||
# 处理视频尺寸
|
||||
video_width, video_height = video_clip.size
|
||||
|
||||
# 字幕处理函数
|
||||
def create_text_clip(subtitle_item):
|
||||
"""创建单个字幕片段"""
|
||||
phrase = subtitle_item[1]
|
||||
max_width = video_width * 0.9
|
||||
|
||||
# 如果有字体路径,进行文本换行处理
|
||||
wrapped_txt = phrase
|
||||
txt_height = 0
|
||||
if font_path:
|
||||
wrapped_txt, txt_height = wrap_text(
|
||||
phrase,
|
||||
max_width=max_width,
|
||||
font=font_path,
|
||||
fontsize=subtitle_font_size
|
||||
)
|
||||
|
||||
# 创建文本片段
|
||||
try:
|
||||
_clip = TextClip(
|
||||
text=wrapped_txt,
|
||||
font=font_path,
|
||||
font_size=subtitle_font_size,
|
||||
color=subtitle_color,
|
||||
bg_color=subtitle_bg_color, # 这里已经在前面处理过,None表示透明
|
||||
stroke_color=stroke_color,
|
||||
stroke_width=stroke_width,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"创建字幕片段失败: {str(e)}, 使用简化参数重试")
|
||||
# 如果上面的方法失败,尝试使用更简单的参数
|
||||
_clip = TextClip(
|
||||
text=wrapped_txt,
|
||||
font=font_path,
|
||||
font_size=subtitle_font_size,
|
||||
color=subtitle_color,
|
||||
)
|
||||
|
||||
# 设置字幕时间
|
||||
duration = subtitle_item[0][1] - subtitle_item[0][0]
|
||||
_clip = _clip.with_start(subtitle_item[0][0])
|
||||
_clip = _clip.with_end(subtitle_item[0][1])
|
||||
_clip = _clip.with_duration(duration)
|
||||
|
||||
# 设置字幕位置
|
||||
if subtitle_position == "bottom":
|
||||
_clip = _clip.with_position(("center", video_height * 0.95 - _clip.h))
|
||||
elif subtitle_position == "top":
|
||||
_clip = _clip.with_position(("center", video_height * 0.05))
|
||||
else: # center
|
||||
_clip = _clip.with_position(("center", "center"))
|
||||
|
||||
return _clip
|
||||
|
||||
# 创建TextClip工厂函数
|
||||
def make_textclip(text):
|
||||
return TextClip(
|
||||
text=text,
|
||||
font=font_path,
|
||||
font_size=subtitle_font_size,
|
||||
color=subtitle_color,
|
||||
)
|
||||
|
||||
# 处理字幕
|
||||
if subtitle_path and os.path.exists(subtitle_path):
|
||||
try:
|
||||
# 加载字幕文件
|
||||
sub = SubtitlesClip(
|
||||
subtitles=subtitle_path,
|
||||
encoding="utf-8",
|
||||
make_textclip=make_textclip
|
||||
)
|
||||
|
||||
# 创建每个字幕片段
|
||||
text_clips = []
|
||||
for item in sub.subtitles:
|
||||
clip = create_text_clip(subtitle_item=item)
|
||||
text_clips.append(clip)
|
||||
|
||||
# 合成视频和字幕
|
||||
video_clip = CompositeVideoClip([video_clip, *text_clips])
|
||||
logger.info(f"已添加{len(text_clips)}个字幕片段")
|
||||
except Exception as e:
|
||||
logger.error(f"处理字幕失败: \n{traceback.format_exc()}")
|
||||
|
||||
# 导出最终视频
|
||||
try:
|
||||
video_clip.write_videofile(
|
||||
output_path,
|
||||
audio_codec="aac",
|
||||
temp_audiofile_path=output_dir,
|
||||
threads=threads,
|
||||
fps=fps,
|
||||
)
|
||||
logger.success(f"素材合并完成: {output_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"导出视频失败: {str(e)}")
|
||||
raise
|
||||
finally:
|
||||
# 释放资源
|
||||
video_clip.close()
|
||||
del video_clip
|
||||
|
||||
return output_path
|
||||
|
||||
|
||||
def wrap_text(text, max_width, font="Arial", fontsize=60):
|
||||
"""
|
||||
文本换行函数,使长文本适应指定宽度
|
||||
|
||||
参数:
|
||||
text: 需要换行的文本
|
||||
max_width: 最大宽度(像素)
|
||||
font: 字体路径
|
||||
fontsize: 字体大小
|
||||
|
||||
返回:
|
||||
换行后的文本和文本高度
|
||||
"""
|
||||
# 创建ImageFont对象
|
||||
try:
|
||||
font_obj = ImageFont.truetype(font, fontsize)
|
||||
except:
|
||||
# 如果无法加载指定字体,使用默认字体
|
||||
font_obj = ImageFont.load_default()
|
||||
|
||||
def get_text_size(inner_text):
|
||||
inner_text = inner_text.strip()
|
||||
left, top, right, bottom = font_obj.getbbox(inner_text)
|
||||
return right - left, bottom - top
|
||||
|
||||
width, height = get_text_size(text)
|
||||
if width <= max_width:
|
||||
return text, height
|
||||
|
||||
processed = True
|
||||
|
||||
_wrapped_lines_ = []
|
||||
words = text.split(" ")
|
||||
_txt_ = ""
|
||||
for word in words:
|
||||
_before = _txt_
|
||||
_txt_ += f"{word} "
|
||||
_width, _height = get_text_size(_txt_)
|
||||
if _width <= max_width:
|
||||
continue
|
||||
else:
|
||||
if _txt_.strip() == word.strip():
|
||||
processed = False
|
||||
break
|
||||
_wrapped_lines_.append(_before)
|
||||
_txt_ = f"{word} "
|
||||
_wrapped_lines_.append(_txt_)
|
||||
if processed:
|
||||
_wrapped_lines_ = [line.strip() for line in _wrapped_lines_]
|
||||
result = "\n".join(_wrapped_lines_).strip()
|
||||
height = len(_wrapped_lines_) * height
|
||||
return result, height
|
||||
|
||||
_wrapped_lines_ = []
|
||||
chars = list(text)
|
||||
_txt_ = ""
|
||||
for word in chars:
|
||||
_txt_ += word
|
||||
_width, _height = get_text_size(_txt_)
|
||||
if _width <= max_width:
|
||||
continue
|
||||
else:
|
||||
_wrapped_lines_.append(_txt_)
|
||||
_txt_ = ""
|
||||
_wrapped_lines_.append(_txt_)
|
||||
result = "\n".join(_wrapped_lines_).strip()
|
||||
height = len(_wrapped_lines_) * height
|
||||
return result, height
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
merger_mp4 = '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/merger.mp4'
|
||||
merger_sub = '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/merged_subtitle_00_00_00-00_01_30.srt'
|
||||
merger_audio = '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/merger_audio.mp3'
|
||||
bgm_path = '/Users/apple/Desktop/home/NarratoAI/resource/songs/bgm.mp3'
|
||||
output_video = '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/combined_test.mp4'
|
||||
|
||||
# 调用示例
|
||||
options = {
|
||||
'voice_volume': 1.0, # 配音音量
|
||||
'bgm_volume': 0.1, # 背景音乐音量
|
||||
'original_audio_volume': 1.0, # 视频原声音量,0表示不保留
|
||||
'keep_original_audio': True, # 是否保留原声
|
||||
'subtitle_font': 'MicrosoftYaHeiNormal.ttc', # 这里使用相对字体路径,会自动在 font_dir() 目录下查找
|
||||
'subtitle_font_size': 40,
|
||||
'subtitle_color': '#FFFFFF',
|
||||
'subtitle_bg_color': None, # 直接使用None表示透明背景
|
||||
'subtitle_position': 'bottom',
|
||||
'threads': 2
|
||||
}
|
||||
|
||||
try:
|
||||
merge_materials(
|
||||
video_path=merger_mp4,
|
||||
audio_path=merger_audio,
|
||||
subtitle_path=merger_sub,
|
||||
bgm_path=bgm_path,
|
||||
output_path=output_video,
|
||||
options=options
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"合并素材失败: \n{traceback.format_exc()}")
|
||||
@ -105,86 +105,6 @@ def manage_clip(clip):
|
||||
del clip
|
||||
|
||||
|
||||
def combine_clip_videos(combined_video_path: str,
|
||||
video_paths: List[str],
|
||||
video_ost_list: List[int],
|
||||
list_script: list,
|
||||
video_aspect: VideoAspect = VideoAspect.portrait,
|
||||
threads: int = 2,
|
||||
) -> str:
|
||||
"""
|
||||
合并子视频
|
||||
Args:
|
||||
combined_video_path: 合并后的存储路径
|
||||
video_paths: 子视频路径列表
|
||||
video_ost_list: 原声播放列表 (0: 不保留原声, 1: 只保留原声, 2: 保留原声并保留解说)
|
||||
list_script: 剪辑脚本
|
||||
video_aspect: 屏幕比例
|
||||
threads: 线程数
|
||||
|
||||
Returns:
|
||||
str: 合并后的视频路径
|
||||
"""
|
||||
from app.utils.utils import calculate_total_duration
|
||||
audio_duration = calculate_total_duration(list_script)
|
||||
logger.info(f"音频的最大持续时间: {audio_duration} s")
|
||||
|
||||
output_dir = os.path.dirname(combined_video_path)
|
||||
aspect = VideoAspect(video_aspect)
|
||||
video_width, video_height = aspect.to_resolution()
|
||||
|
||||
clips = []
|
||||
for video_path, video_ost in zip(video_paths, video_ost_list):
|
||||
try:
|
||||
clip = VideoFileClip(video_path)
|
||||
|
||||
if video_ost == 0: # 不保留原声
|
||||
clip = clip.without_audio()
|
||||
# video_ost 为 1 或 2 时都保留原声,不需要特殊处理
|
||||
|
||||
clip = clip.set_fps(30)
|
||||
|
||||
# 处理视频尺寸
|
||||
clip_w, clip_h = clip.size
|
||||
if clip_w != video_width or clip_h != video_height:
|
||||
clip = resize_video_with_padding(
|
||||
clip,
|
||||
target_width=video_width,
|
||||
target_height=video_height
|
||||
)
|
||||
logger.info(f"视频 {video_path} 已调整尺寸为 {video_width} x {video_height}")
|
||||
|
||||
clips.append(clip)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"处理视频 {video_path} 时出错: {str(e)}")
|
||||
continue
|
||||
|
||||
if not clips:
|
||||
raise ValueError("没有有效的视频片段可以合并")
|
||||
|
||||
try:
|
||||
video_clip = concatenate_videoclips(clips)
|
||||
video_clip = video_clip.set_fps(30)
|
||||
|
||||
logger.info("开始合并视频... (过程中出现 UserWarning: 不必理会)")
|
||||
video_clip.write_videofile(
|
||||
filename=combined_video_path,
|
||||
threads=threads,
|
||||
audio_codec="aac",
|
||||
fps=30,
|
||||
temp_audiofile=os.path.join(output_dir, "temp-audio.m4a")
|
||||
)
|
||||
finally:
|
||||
# 确保资源被正确放
|
||||
video_clip.close()
|
||||
for clip in clips:
|
||||
clip.close()
|
||||
|
||||
logger.success("视频合并完成")
|
||||
return combined_video_path
|
||||
|
||||
|
||||
def resize_video_with_padding(clip, target_width: int, target_height: int):
|
||||
"""
|
||||
调整视频尺寸并添加黑边
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user