diff --git a/app/test/test_moviepy_merge.py b/app/test/test_moviepy_merge.py new file mode 100644 index 0000000..da08e6c --- /dev/null +++ b/app/test/test_moviepy_merge.py @@ -0,0 +1,143 @@ +""" +使用 moviepy 合并视频、音频、字幕和背景音乐 +""" + +from moviepy.editor import ( + VideoFileClip, + AudioFileClip, + TextClip, + CompositeVideoClip, + concatenate_videoclips +) +# from moviepy.config import change_settings +import os + +# 设置字体文件路径(用于中文字幕显示) +FONT_PATH = "../../resource/fonts/STHeitiMedium.ttc" # 请确保此路径下有对应字体文件 +# change_settings( +# {"IMAGEMAGICK_BINARY": r"C:\Program Files\ImageMagick-7.1.1-Q16\magick.exe"}) # Windows系统需要设置 ImageMagick 路径 + + +class VideoMerger: + """视频合并处理类""" + + def __init__(self, output_path: str = "../../resource/videos/merged_video.mp4"): + """ + 初始化视频合并器 + 参数: + output_path: 输出文件路径 + """ + self.output_path = output_path + self.video_clips = [] + self.background_music = None + self.subtitles = [] + + def add_video(self, video_path: str, start_time: str = None, end_time: str = None) -> None: + """ + 添加视频片段 + 参数: + video_path: 视频文件路径 + start_time: 开始时间 (格式: "MM:SS") + end_time: 结束时间 (格式: "MM:SS") + """ + video = VideoFileClip(video_path) + if start_time and end_time: + video = video.subclip(self._time_to_seconds(start_time), + self._time_to_seconds(end_time)) + self.video_clips.append(video) + + def add_audio(self, audio_path: str, volume: float = 1.0) -> None: + """ + 添加背景音乐 + 参数: + audio_path: 音频文件路径 + volume: 音量大小 (0.0-1.0) + """ + self.background_music = AudioFileClip(audio_path).volumex(volume) + + def add_subtitle(self, text: str, start_time: str, end_time: str, + position: tuple = ('center', 'bottom'), fontsize: int = 24) -> None: + """ + 添加字幕 + 参数: + text: 字幕文本 + start_time: 开始时间 (格式: "MM:SS") + end_time: 结束时间 (格式: "MM:SS") + position: 字幕位置 + fontsize: 字体大小 + """ + subtitle = TextClip( + text, + font=FONT_PATH, + fontsize=fontsize, + color='white', + stroke_color='black', + stroke_width=2 + ) + + subtitle = subtitle.set_position(position).set_duration( + self._time_to_seconds(end_time) - self._time_to_seconds(start_time) + ).set_start(self._time_to_seconds(start_time)) + + self.subtitles.append(subtitle) + + def merge(self) -> None: + """合并所有媒体元素并导出视频""" + if not self.video_clips: + raise ValueError("至少需要添加一个视频片段") + + # 合并视频片段 + final_video = concatenate_videoclips(self.video_clips) + + # 如果有背景音乐,设置其持续时间与视频相同 + if self.background_music: + self.background_music = self.background_music.set_duration(final_video.duration) + final_video = final_video.set_audio(self.background_music) + + # 添加字幕 + if self.subtitles: + final_video = CompositeVideoClip([final_video] + self.subtitles) + + # 导出最终视频 + final_video.write_videofile( + self.output_path, + fps=24, + codec='libx264', + audio_codec='aac' + ) + + # 释放资源 + final_video.close() + for clip in self.video_clips: + clip.close() + if self.background_music: + self.background_music.close() + + @staticmethod + def _time_to_seconds(time_str: str) -> float: + """将时间字符串转换为秒数""" + minutes, seconds = map(int, time_str.split(':')) + return minutes * 60 + seconds + + +def test_merge_video(): + """测试视频合并功能""" + merger = VideoMerger() + + # 添加两个视频片段 + merger.add_video("../../resource/videos/cut_video.mp4", "00:00", "01:00") + merger.add_video("../../resource/videos/demo.mp4", "00:00", "00:30") + + # 添加背景音乐 + merger.add_audio("../../resource/songs/output000.mp3", volume=0.3) + + # 添加字幕 + merger.add_subtitle("第一个精彩片段", "00:00", "00:05") + merger.add_subtitle("第二个精彩片段", "01:00", "01:05") + + # 合并并导出 + merger.merge() + + +if __name__ == "__main__": + test_merge_video() diff --git a/app/test/test_moviepy_speed.py b/app/test/test_moviepy_speed.py new file mode 100644 index 0000000..3697ba2 --- /dev/null +++ b/app/test/test_moviepy_speed.py @@ -0,0 +1,142 @@ +""" +使用 moviepy 优化视频处理速度的示例 +包含:视频加速、多核处理、预设参数优化等 +""" + +from moviepy.editor import VideoFileClip +from moviepy.video.fx.speedx import speedx +import multiprocessing as mp +import time + + +class VideoSpeedProcessor: + """视频速度处理器""" + + def __init__(self, input_path: str, output_path: str): + self.input_path = input_path + self.output_path = output_path + # 获取CPU核心数 + self.cpu_cores = mp.cpu_count() + + def process_with_optimization(self, speed_factor: float = 1.0) -> None: + """ + 使用优化参数处理视频 + 参数: + speed_factor: 速度倍数 (1.0 为原速, 2.0 为双倍速) + """ + start_time = time.time() + + # 加载视频时使用优化参数 + video = VideoFileClip( + self.input_path, + audio=True, # 如果不需要音频可以设为False + target_resolution=(720, None), # 可以降低分辨率加快处理 + resize_algorithm='fast_bilinear' # 使用快速的重置算法 + ) + + # 应用速度变化 + if speed_factor != 1.0: + video = speedx(video, factor=speed_factor) + + # 使用优化参数导出视频 + video.write_videofile( + self.output_path, + codec='libx264', # 使用h264编码 + audio_codec='aac', # 音频编码 + temp_audiofile='temp-audio.m4a', # 临时音频文件 + remove_temp=True, # 处理完成后删除临时文件 + write_logfile=False, # 关闭日志文件 + threads=self.cpu_cores, # 使用多核处理 + preset='ultrafast', # 使用最快的编码预设 + ffmpeg_params=[ + '-brand', 'mp42', + '-crf', '23', # 压缩率,范围0-51,数值越大压缩率越高 + ] + ) + + # 释放资源 + video.close() + + end_time = time.time() + print(f"处理完成!用时: {end_time - start_time:.2f} 秒") + + def batch_process_segments(self, segment_times: list, speed_factor: float = 1.0) -> None: + """ + 批量处理视频片段(并行处理) + 参数: + segment_times: 列表,包含多个(start, end)时间元组 + speed_factor: 速度倍数 + """ + start_time = time.time() + + # 创建进程池 + with mp.Pool(processes=self.cpu_cores) as pool: + # 准备参数 + args = [(self.input_path, start, end, speed_factor, i) + for i, (start, end) in enumerate(segment_times)] + + # 并行处理片段 + pool.starmap(self._process_segment, args) + + end_time = time.time() + print(f"批量处理完成!总用时: {end_time - start_time:.2f} 秒") + + @staticmethod + def _process_segment(video_path: str, start: str, end: str, + speed_factor: float, index: int) -> None: + """处理单个视频片段""" + # 转换时间格式 + start_sec = VideoSpeedProcessor._time_to_seconds(start) + end_sec = VideoSpeedProcessor._time_to_seconds(end) + + # 加载并处理视频片段 + video = VideoFileClip( + video_path, + audio=True, + target_resolution=(720, None) + ).subclip(start_sec, end_sec) + + # 应用速度变化 + if speed_factor != 1.0: + video = speedx(video, factor=speed_factor) + + # 保存处理后的片段 + output_path = f"../../resource/videos/segment_{index}.mp4" + video.write_videofile( + output_path, + codec='libx264', + audio_codec='aac', + preset='ultrafast', + threads=2 # 每个进程使用的线程数 + ) + + video.close() + + @staticmethod + def _time_to_seconds(time_str: str) -> float: + """将时间字符串(MM:SS)转换为秒数""" + minutes, seconds = map(int, time_str.split(':')) + return minutes * 60 + seconds + + +def test_video_speed(): + """测试视频加速处理""" + processor = VideoSpeedProcessor( + "../../resource/videos/best.mp4", + "../../resource/videos/speed_up.mp4" + ) + + # 测试1:简单加速 + processor.process_with_optimization(speed_factor=1.5) # 1.5倍速 + + # 测试2:并行处理多个片段 + segments = [ + ("00:00", "01:00"), + ("01:00", "02:00"), + ("02:00", "03:00") + ] + processor.batch_process_segments(segments, speed_factor=2.0) # 2倍速 + + +if __name__ == "__main__": + test_video_speed()