feat(test): 添加 video 处理测试脚本

- 新增 test_moviepy_merge.py,实现视频合并功能
- 新增 test_moviepy_speed.py,实现视频加速处理- 添加 VideoMerger 和 VideoSpeedProcessor 类,分别用于视频合并和速度处理
- 测试脚本展示了如何使用 moviepy 进行视频编辑
This commit is contained in:
linyq 2024-11-15 18:29:29 +08:00
parent cc44aab181
commit 3d4a58e5cf
2 changed files with 285 additions and 0 deletions

View File

@ -0,0 +1,143 @@
"""
使用 moviepy 合并视频音频字幕和背景音乐
"""
from moviepy.editor import (
VideoFileClip,
AudioFileClip,
TextClip,
CompositeVideoClip,
concatenate_videoclips
)
# from moviepy.config import change_settings
import os
# 设置字体文件路径(用于中文字幕显示)
FONT_PATH = "../../resource/fonts/STHeitiMedium.ttc" # 请确保此路径下有对应字体文件
# change_settings(
# {"IMAGEMAGICK_BINARY": r"C:\Program Files\ImageMagick-7.1.1-Q16\magick.exe"}) # Windows系统需要设置 ImageMagick 路径
class VideoMerger:
"""视频合并处理类"""
def __init__(self, output_path: str = "../../resource/videos/merged_video.mp4"):
"""
初始化视频合并器
参数:
output_path: 输出文件路径
"""
self.output_path = output_path
self.video_clips = []
self.background_music = None
self.subtitles = []
def add_video(self, video_path: str, start_time: str = None, end_time: str = None) -> None:
"""
添加视频片段
参数:
video_path: 视频文件路径
start_time: 开始时间 (格式: "MM:SS")
end_time: 结束时间 (格式: "MM:SS")
"""
video = VideoFileClip(video_path)
if start_time and end_time:
video = video.subclip(self._time_to_seconds(start_time),
self._time_to_seconds(end_time))
self.video_clips.append(video)
def add_audio(self, audio_path: str, volume: float = 1.0) -> None:
"""
添加背景音乐
参数:
audio_path: 音频文件路径
volume: 音量大小 (0.0-1.0)
"""
self.background_music = AudioFileClip(audio_path).volumex(volume)
def add_subtitle(self, text: str, start_time: str, end_time: str,
position: tuple = ('center', 'bottom'), fontsize: int = 24) -> None:
"""
添加字幕
参数:
text: 字幕文本
start_time: 开始时间 (格式: "MM:SS")
end_time: 结束时间 (格式: "MM:SS")
position: 字幕位置
fontsize: 字体大小
"""
subtitle = TextClip(
text,
font=FONT_PATH,
fontsize=fontsize,
color='white',
stroke_color='black',
stroke_width=2
)
subtitle = subtitle.set_position(position).set_duration(
self._time_to_seconds(end_time) - self._time_to_seconds(start_time)
).set_start(self._time_to_seconds(start_time))
self.subtitles.append(subtitle)
def merge(self) -> None:
"""合并所有媒体元素并导出视频"""
if not self.video_clips:
raise ValueError("至少需要添加一个视频片段")
# 合并视频片段
final_video = concatenate_videoclips(self.video_clips)
# 如果有背景音乐,设置其持续时间与视频相同
if self.background_music:
self.background_music = self.background_music.set_duration(final_video.duration)
final_video = final_video.set_audio(self.background_music)
# 添加字幕
if self.subtitles:
final_video = CompositeVideoClip([final_video] + self.subtitles)
# 导出最终视频
final_video.write_videofile(
self.output_path,
fps=24,
codec='libx264',
audio_codec='aac'
)
# 释放资源
final_video.close()
for clip in self.video_clips:
clip.close()
if self.background_music:
self.background_music.close()
@staticmethod
def _time_to_seconds(time_str: str) -> float:
"""将时间字符串转换为秒数"""
minutes, seconds = map(int, time_str.split(':'))
return minutes * 60 + seconds
def test_merge_video():
"""测试视频合并功能"""
merger = VideoMerger()
# 添加两个视频片段
merger.add_video("../../resource/videos/cut_video.mp4", "00:00", "01:00")
merger.add_video("../../resource/videos/demo.mp4", "00:00", "00:30")
# 添加背景音乐
merger.add_audio("../../resource/songs/output000.mp3", volume=0.3)
# 添加字幕
merger.add_subtitle("第一个精彩片段", "00:00", "00:05")
merger.add_subtitle("第二个精彩片段", "01:00", "01:05")
# 合并并导出
merger.merge()
if __name__ == "__main__":
test_merge_video()

View File

@ -0,0 +1,142 @@
"""
使用 moviepy 优化视频处理速度的示例
包含视频加速多核处理预设参数优化等
"""
from moviepy.editor import VideoFileClip
from moviepy.video.fx.speedx import speedx
import multiprocessing as mp
import time
class VideoSpeedProcessor:
"""视频速度处理器"""
def __init__(self, input_path: str, output_path: str):
self.input_path = input_path
self.output_path = output_path
# 获取CPU核心数
self.cpu_cores = mp.cpu_count()
def process_with_optimization(self, speed_factor: float = 1.0) -> None:
"""
使用优化参数处理视频
参数:
speed_factor: 速度倍数 (1.0 为原速, 2.0 为双倍速)
"""
start_time = time.time()
# 加载视频时使用优化参数
video = VideoFileClip(
self.input_path,
audio=True, # 如果不需要音频可以设为False
target_resolution=(720, None), # 可以降低分辨率加快处理
resize_algorithm='fast_bilinear' # 使用快速的重置算法
)
# 应用速度变化
if speed_factor != 1.0:
video = speedx(video, factor=speed_factor)
# 使用优化参数导出视频
video.write_videofile(
self.output_path,
codec='libx264', # 使用h264编码
audio_codec='aac', # 音频编码
temp_audiofile='temp-audio.m4a', # 临时音频文件
remove_temp=True, # 处理完成后删除临时文件
write_logfile=False, # 关闭日志文件
threads=self.cpu_cores, # 使用多核处理
preset='ultrafast', # 使用最快的编码预设
ffmpeg_params=[
'-brand', 'mp42',
'-crf', '23', # 压缩率范围0-51数值越大压缩率越高
]
)
# 释放资源
video.close()
end_time = time.time()
print(f"处理完成!用时: {end_time - start_time:.2f}")
def batch_process_segments(self, segment_times: list, speed_factor: float = 1.0) -> None:
"""
批量处理视频片段并行处理
参数:
segment_times: 列表包含多个(start, end)时间元组
speed_factor: 速度倍数
"""
start_time = time.time()
# 创建进程池
with mp.Pool(processes=self.cpu_cores) as pool:
# 准备参数
args = [(self.input_path, start, end, speed_factor, i)
for i, (start, end) in enumerate(segment_times)]
# 并行处理片段
pool.starmap(self._process_segment, args)
end_time = time.time()
print(f"批量处理完成!总用时: {end_time - start_time:.2f}")
@staticmethod
def _process_segment(video_path: str, start: str, end: str,
speed_factor: float, index: int) -> None:
"""处理单个视频片段"""
# 转换时间格式
start_sec = VideoSpeedProcessor._time_to_seconds(start)
end_sec = VideoSpeedProcessor._time_to_seconds(end)
# 加载并处理视频片段
video = VideoFileClip(
video_path,
audio=True,
target_resolution=(720, None)
).subclip(start_sec, end_sec)
# 应用速度变化
if speed_factor != 1.0:
video = speedx(video, factor=speed_factor)
# 保存处理后的片段
output_path = f"../../resource/videos/segment_{index}.mp4"
video.write_videofile(
output_path,
codec='libx264',
audio_codec='aac',
preset='ultrafast',
threads=2 # 每个进程使用的线程数
)
video.close()
@staticmethod
def _time_to_seconds(time_str: str) -> float:
"""将时间字符串(MM:SS)转换为秒数"""
minutes, seconds = map(int, time_str.split(':'))
return minutes * 60 + seconds
def test_video_speed():
"""测试视频加速处理"""
processor = VideoSpeedProcessor(
"../../resource/videos/best.mp4",
"../../resource/videos/speed_up.mp4"
)
# 测试1简单加速
processor.process_with_optimization(speed_factor=1.5) # 1.5倍速
# 测试2并行处理多个片段
segments = [
("00:00", "01:00"),
("01:00", "02:00"),
("02:00", "03:00")
]
processor.batch_process_segments(segments, speed_factor=2.0) # 2倍速
if __name__ == "__main__":
test_video_speed()