diff --git a/app/services/generate_video.py b/app/services/generate_video.py new file mode 100644 index 0000000..04db992 --- /dev/null +++ b/app/services/generate_video.py @@ -0,0 +1,383 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- + +''' +@Project: NarratoAI +@File : generate_video +@Author : 小林同学 +@Date : 2025/5/7 上午11:55 +''' + +import os +import traceback +from typing import Optional, Dict, Any, Union +from loguru import logger +from moviepy import ( + VideoFileClip, + AudioFileClip, + CompositeAudioClip, + CompositeVideoClip, + TextClip, + concatenate_videoclips, + afx +) +from moviepy.video.tools.subtitles import SubtitlesClip +from PIL import ImageFont + +from app.utils import utils + + +def merge_materials( + video_path: str, + audio_path: str, + output_path: str, + subtitle_path: Optional[str] = None, + bgm_path: Optional[str] = None, + options: Optional[Dict[str, Any]] = None +) -> str: + """ + 合并视频、音频、BGM和字幕素材生成最终视频 + + 参数: + video_path: 视频文件路径 + audio_path: 音频文件路径 + output_path: 输出文件路径 + subtitle_path: 字幕文件路径,可选 + bgm_path: 背景音乐文件路径,可选 + options: 其他选项配置,可包含以下字段: + - voice_volume: 人声音量,默认1.0 + - bgm_volume: 背景音乐音量,默认0.3 + - original_audio_volume: 原始音频音量,默认0.0 + - keep_original_audio: 是否保留原始音频,默认False + - subtitle_font: 字幕字体,默认None,系统会使用默认字体 + - subtitle_font_size: 字幕字体大小,默认40 + - subtitle_color: 字幕颜色,默认白色 + - subtitle_bg_color: 字幕背景颜色,默认透明 + - subtitle_position: 字幕位置,可选值'bottom', 'top', 'center',默认'bottom' + - stroke_color: 描边颜色,默认黑色 + - stroke_width: 描边宽度,默认1 + - threads: 处理线程数,默认2 + - fps: 输出帧率,默认30 + + 返回: + 输出视频的路径 + """ + # 合并选项默认值 + if options is None: + options = {} + + # 设置默认参数值 + voice_volume = options.get('voice_volume', 1.0) + bgm_volume = options.get('bgm_volume', 0.3) + original_audio_volume = options.get('original_audio_volume', 0.0) # 默认为0,即不保留原声 + keep_original_audio = options.get('keep_original_audio', False) # 是否保留原声 + subtitle_font = options.get('subtitle_font', None) + subtitle_font_size = options.get('subtitle_font_size', 40) + subtitle_color = options.get('subtitle_color', '#FFFFFF') + subtitle_bg_color = options.get('subtitle_bg_color', 'transparent') + subtitle_position = options.get('subtitle_position', 'bottom') + stroke_color = options.get('stroke_color', '#000000') + stroke_width = options.get('stroke_width', 1) + threads = options.get('threads', 2) + fps = options.get('fps', 30) + + # 处理透明背景色问题 - MoviePy 2.1.1不支持'transparent'值 + if subtitle_bg_color == 'transparent': + subtitle_bg_color = None # None在新版MoviePy中表示透明背景 + + # 创建输出目录(如果不存在) + output_dir = os.path.dirname(output_path) + os.makedirs(output_dir, exist_ok=True) + + logger.info(f"开始合并素材...") + logger.info(f" ① 视频: {video_path}") + logger.info(f" ② 音频: {audio_path}") + if subtitle_path: + logger.info(f" ③ 字幕: {subtitle_path}") + if bgm_path: + logger.info(f" ④ 背景音乐: {bgm_path}") + logger.info(f" ⑤ 输出: {output_path}") + + # 加载视频 + try: + video_clip = VideoFileClip(video_path) + logger.info(f"视频尺寸: {video_clip.size[0]}x{video_clip.size[1]}, 时长: {video_clip.duration}秒") + + # 提取视频原声(如果需要) + original_audio = None + if keep_original_audio and original_audio_volume > 0: + try: + original_audio = video_clip.audio + if original_audio: + original_audio = original_audio.with_effects([afx.MultiplyVolume(original_audio_volume)]) + logger.info(f"已提取视频原声,音量设置为: {original_audio_volume}") + else: + logger.warning("视频没有音轨,无法提取原声") + except Exception as e: + logger.error(f"提取视频原声失败: {str(e)}") + original_audio = None + + # 移除原始音轨,稍后会合并新的音频 + video_clip = video_clip.without_audio() + + except Exception as e: + logger.error(f"加载视频失败: {str(e)}") + raise + + # 处理背景音乐和所有音频轨道合成 + audio_tracks = [] + + # 先添加主音频(配音) + if audio_path and os.path.exists(audio_path): + try: + voice_audio = AudioFileClip(audio_path).with_effects([afx.MultiplyVolume(voice_volume)]) + audio_tracks.append(voice_audio) + logger.info(f"已添加配音音频,音量: {voice_volume}") + except Exception as e: + logger.error(f"加载配音音频失败: {str(e)}") + + # 添加原声(如果需要) + if original_audio is not None: + audio_tracks.append(original_audio) + logger.info(f"已添加视频原声,音量: {original_audio_volume}") + + # 添加背景音乐(如果有) + if bgm_path and os.path.exists(bgm_path): + try: + bgm_clip = AudioFileClip(bgm_path).with_effects([ + afx.MultiplyVolume(bgm_volume), + afx.AudioFadeOut(3), + afx.AudioLoop(duration=video_clip.duration), + ]) + audio_tracks.append(bgm_clip) + logger.info(f"已添加背景音乐,音量: {bgm_volume}") + except Exception as e: + logger.error(f"添加背景音乐失败: \n{traceback.format_exc()}") + + # 合成最终的音频轨道 + if audio_tracks: + final_audio = CompositeAudioClip(audio_tracks) + video_clip = video_clip.with_audio(final_audio) + logger.info(f"已合成所有音频轨道,共{len(audio_tracks)}个") + else: + logger.warning("没有可用的音频轨道,输出视频将没有声音") + + # 处理字体路径 + font_path = None + if subtitle_path and subtitle_font: + font_path = os.path.join(utils.font_dir(), subtitle_font) + if os.name == "nt": + font_path = font_path.replace("\\", "/") + logger.info(f"使用字体: {font_path}") + + # 处理视频尺寸 + video_width, video_height = video_clip.size + + # 字幕处理函数 + def create_text_clip(subtitle_item): + """创建单个字幕片段""" + phrase = subtitle_item[1] + max_width = video_width * 0.9 + + # 如果有字体路径,进行文本换行处理 + wrapped_txt = phrase + txt_height = 0 + if font_path: + wrapped_txt, txt_height = wrap_text( + phrase, + max_width=max_width, + font=font_path, + fontsize=subtitle_font_size + ) + + # 创建文本片段 + try: + _clip = TextClip( + text=wrapped_txt, + font=font_path, + font_size=subtitle_font_size, + color=subtitle_color, + bg_color=subtitle_bg_color, # 这里已经在前面处理过,None表示透明 + stroke_color=stroke_color, + stroke_width=stroke_width, + ) + except Exception as e: + logger.error(f"创建字幕片段失败: {str(e)}, 使用简化参数重试") + # 如果上面的方法失败,尝试使用更简单的参数 + _clip = TextClip( + text=wrapped_txt, + font=font_path, + font_size=subtitle_font_size, + color=subtitle_color, + ) + + # 设置字幕时间 + duration = subtitle_item[0][1] - subtitle_item[0][0] + _clip = _clip.with_start(subtitle_item[0][0]) + _clip = _clip.with_end(subtitle_item[0][1]) + _clip = _clip.with_duration(duration) + + # 设置字幕位置 + if subtitle_position == "bottom": + _clip = _clip.with_position(("center", video_height * 0.95 - _clip.h)) + elif subtitle_position == "top": + _clip = _clip.with_position(("center", video_height * 0.05)) + else: # center + _clip = _clip.with_position(("center", "center")) + + return _clip + + # 创建TextClip工厂函数 + def make_textclip(text): + return TextClip( + text=text, + font=font_path, + font_size=subtitle_font_size, + color=subtitle_color, + ) + + # 处理字幕 + if subtitle_path and os.path.exists(subtitle_path): + try: + # 加载字幕文件 + sub = SubtitlesClip( + subtitles=subtitle_path, + encoding="utf-8", + make_textclip=make_textclip + ) + + # 创建每个字幕片段 + text_clips = [] + for item in sub.subtitles: + clip = create_text_clip(subtitle_item=item) + text_clips.append(clip) + + # 合成视频和字幕 + video_clip = CompositeVideoClip([video_clip, *text_clips]) + logger.info(f"已添加{len(text_clips)}个字幕片段") + except Exception as e: + logger.error(f"处理字幕失败: \n{traceback.format_exc()}") + + # 导出最终视频 + try: + video_clip.write_videofile( + output_path, + audio_codec="aac", + temp_audiofile_path=output_dir, + threads=threads, + fps=fps, + ) + logger.success(f"素材合并完成: {output_path}") + except Exception as e: + logger.error(f"导出视频失败: {str(e)}") + raise + finally: + # 释放资源 + video_clip.close() + del video_clip + + return output_path + + +def wrap_text(text, max_width, font="Arial", fontsize=60): + """ + 文本换行函数,使长文本适应指定宽度 + + 参数: + text: 需要换行的文本 + max_width: 最大宽度(像素) + font: 字体路径 + fontsize: 字体大小 + + 返回: + 换行后的文本和文本高度 + """ + # 创建ImageFont对象 + try: + font_obj = ImageFont.truetype(font, fontsize) + except: + # 如果无法加载指定字体,使用默认字体 + font_obj = ImageFont.load_default() + + def get_text_size(inner_text): + inner_text = inner_text.strip() + left, top, right, bottom = font_obj.getbbox(inner_text) + return right - left, bottom - top + + width, height = get_text_size(text) + if width <= max_width: + return text, height + + processed = True + + _wrapped_lines_ = [] + words = text.split(" ") + _txt_ = "" + for word in words: + _before = _txt_ + _txt_ += f"{word} " + _width, _height = get_text_size(_txt_) + if _width <= max_width: + continue + else: + if _txt_.strip() == word.strip(): + processed = False + break + _wrapped_lines_.append(_before) + _txt_ = f"{word} " + _wrapped_lines_.append(_txt_) + if processed: + _wrapped_lines_ = [line.strip() for line in _wrapped_lines_] + result = "\n".join(_wrapped_lines_).strip() + height = len(_wrapped_lines_) * height + return result, height + + _wrapped_lines_ = [] + chars = list(text) + _txt_ = "" + for word in chars: + _txt_ += word + _width, _height = get_text_size(_txt_) + if _width <= max_width: + continue + else: + _wrapped_lines_.append(_txt_) + _txt_ = "" + _wrapped_lines_.append(_txt_) + result = "\n".join(_wrapped_lines_).strip() + height = len(_wrapped_lines_) * height + return result, height + + +if __name__ == '__main__': + merger_mp4 = '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/merger.mp4' + merger_sub = '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/merged_subtitle_00_00_00-00_01_30.srt' + merger_audio = '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/merger_audio.mp3' + bgm_path = '/Users/apple/Desktop/home/NarratoAI/resource/songs/bgm.mp3' + output_video = '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/combined_test.mp4' + + # 调用示例 + options = { + 'voice_volume': 1.0, # 配音音量 + 'bgm_volume': 0.1, # 背景音乐音量 + 'original_audio_volume': 1.0, # 视频原声音量,0表示不保留 + 'keep_original_audio': True, # 是否保留原声 + 'subtitle_font': 'MicrosoftYaHeiNormal.ttc', # 这里使用相对字体路径,会自动在 font_dir() 目录下查找 + 'subtitle_font_size': 40, + 'subtitle_color': '#FFFFFF', + 'subtitle_bg_color': None, # 直接使用None表示透明背景 + 'subtitle_position': 'bottom', + 'threads': 2 + } + + try: + merge_materials( + video_path=merger_mp4, + audio_path=merger_audio, + subtitle_path=merger_sub, + bgm_path=bgm_path, + output_path=output_video, + options=options + ) + except Exception as e: + logger.error(f"合并素材失败: \n{traceback.format_exc()}") diff --git a/app/services/video.py b/app/services/video.py index dbe7986..e588a8b 100644 --- a/app/services/video.py +++ b/app/services/video.py @@ -105,86 +105,6 @@ def manage_clip(clip): del clip -def combine_clip_videos(combined_video_path: str, - video_paths: List[str], - video_ost_list: List[int], - list_script: list, - video_aspect: VideoAspect = VideoAspect.portrait, - threads: int = 2, - ) -> str: - """ - 合并子视频 - Args: - combined_video_path: 合并后的存储路径 - video_paths: 子视频路径列表 - video_ost_list: 原声播放列表 (0: 不保留原声, 1: 只保留原声, 2: 保留原声并保留解说) - list_script: 剪辑脚本 - video_aspect: 屏幕比例 - threads: 线程数 - - Returns: - str: 合并后的视频路径 - """ - from app.utils.utils import calculate_total_duration - audio_duration = calculate_total_duration(list_script) - logger.info(f"音频的最大持续时间: {audio_duration} s") - - output_dir = os.path.dirname(combined_video_path) - aspect = VideoAspect(video_aspect) - video_width, video_height = aspect.to_resolution() - - clips = [] - for video_path, video_ost in zip(video_paths, video_ost_list): - try: - clip = VideoFileClip(video_path) - - if video_ost == 0: # 不保留原声 - clip = clip.without_audio() - # video_ost 为 1 或 2 时都保留原声,不需要特殊处理 - - clip = clip.set_fps(30) - - # 处理视频尺寸 - clip_w, clip_h = clip.size - if clip_w != video_width or clip_h != video_height: - clip = resize_video_with_padding( - clip, - target_width=video_width, - target_height=video_height - ) - logger.info(f"视频 {video_path} 已调整尺寸为 {video_width} x {video_height}") - - clips.append(clip) - - except Exception as e: - logger.error(f"处理视频 {video_path} 时出错: {str(e)}") - continue - - if not clips: - raise ValueError("没有有效的视频片段可以合并") - - try: - video_clip = concatenate_videoclips(clips) - video_clip = video_clip.set_fps(30) - - logger.info("开始合并视频... (过程中出现 UserWarning: 不必理会)") - video_clip.write_videofile( - filename=combined_video_path, - threads=threads, - audio_codec="aac", - fps=30, - temp_audiofile=os.path.join(output_dir, "temp-audio.m4a") - ) - finally: - # 确保资源被正确放 - video_clip.close() - for clip in clips: - clip.close() - - logger.success("视频合并完成") - return combined_video_path - - def resize_video_with_padding(clip, target_width: int, target_height: int): """ 调整视频尺寸并添加黑边