未完成 generate_video_v2 功能

This commit is contained in:
linyq 2024-09-19 18:23:54 +08:00
parent d1da23e37f
commit 2bc94651a2
4 changed files with 253 additions and 193 deletions

View File

@ -428,7 +428,7 @@ def start_subclip(task_id, params: VideoClipParams, subclip_path_videos):
combined_video_path=combined_video_path, combined_video_path=combined_video_path,
video_paths=subclip_videos, video_paths=subclip_videos,
video_ost_list=video_ost, video_ost_list=video_ost,
audio_file=audio_file, list_script=list_script,
video_aspect=params.video_aspect, video_aspect=params.video_aspect,
threads=n_threads threads=n_threads
) )

View File

@ -1,3 +1,4 @@
import re
import glob import glob
import random import random
from typing import List from typing import List
@ -216,9 +217,7 @@ def generate_video(
logger.info(f" ③ subtitle: {subtitle_path}") logger.info(f" ③ subtitle: {subtitle_path}")
logger.info(f" ④ output: {output_file}") logger.info(f" ④ output: {output_file}")
# https://github.com/harry0703/NarratoAI/issues/217 # 写入与输出文件相同的目录
# PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'final-1.mp4.tempTEMP_MPY_wvf_snd.mp3'
# write into the same directory as the output file
output_dir = os.path.dirname(output_file) output_dir = os.path.dirname(output_file)
font_path = "" font_path = ""
@ -303,6 +302,133 @@ def generate_video(
logger.success("completed") logger.success("completed")
def generate_video_v2(
video_path: str,
audio_paths: List[str],
subtitle_path: str,
output_file: str,
params: Union[VideoParams, VideoClipParams],
):
aspect = VideoAspect(params.video_aspect)
video_width, video_height = aspect.to_resolution()
logger.info(f"开始,视频尺寸: {video_width} x {video_height}")
logger.info(f" ① 视频: {video_path}")
logger.info(f" ② 音频文件数量: {len(audio_paths)}")
logger.info(f" ③ 字幕: {subtitle_path}")
logger.info(f" ④ 输出: {output_file}")
# 写入与输出文件相同的目录
output_dir = os.path.dirname(output_file)
# 字体设置部分保持不变
font_path = ""
if params.subtitle_enabled:
if not params.font_name:
params.font_name = "STHeitiMedium.ttc"
font_path = os.path.join(utils.font_dir(), params.font_name)
if os.name == "nt":
font_path = font_path.replace("\\", "/")
logger.info(f"使用字体: {font_path}")
# create_text_clip 函数保持不变
def create_text_clip(subtitle_item):
phrase = subtitle_item[1]
max_width = video_width * 0.9
wrapped_txt, txt_height = wrap_text(
phrase, max_width=max_width, font=font_path, fontsize=params.font_size
)
_clip = TextClip(
wrapped_txt,
font=font_path,
fontsize=params.font_size,
color=params.text_fore_color,
bg_color=params.text_background_color,
stroke_color=params.stroke_color,
stroke_width=params.stroke_width,
print_cmd=False,
)
duration = subtitle_item[0][1] - subtitle_item[0][0]
_clip = _clip.set_start(subtitle_item[0][0])
_clip = _clip.set_end(subtitle_item[0][1])
_clip = _clip.set_duration(duration)
if params.subtitle_position == "bottom":
_clip = _clip.set_position(("center", video_height * 0.95 - _clip.h))
elif params.subtitle_position == "top":
_clip = _clip.set_position(("center", video_height * 0.05))
elif params.subtitle_position == "custom":
# 确保字幕完全在屏幕内
margin = 10 # 额外的边距,单位为像素
max_y = video_height - _clip.h - margin
min_y = margin
custom_y = (video_height - _clip.h) * (params.custom_position / 100)
custom_y = max(min_y, min(custom_y, max_y)) # 限制 y 值在有效范围内
_clip = _clip.set_position(("center", custom_y))
else: # center
_clip = _clip.set_position(("center", "center"))
return _clip
video_clip = VideoFileClip(video_path)
# 处理多个音频文件
audio_clips = []
for audio_path in audio_paths:
# 从文件名中提取时间信息
match = re.search(r'audio_(\d{2}-\d{2}-\d{2}-\d{2})\.mp3', os.path.basename(audio_path))
if match:
time_str = match.group(1)
start, end = time_str.split('-')[:2], time_str.split('-')[2:]
start_time = sum(int(x) * 60 ** i for i, x in enumerate(reversed(start)))
end_time = sum(int(x) * 60 ** i for i, x in enumerate(reversed(end)))
audio_clip = AudioFileClip(audio_path).volumex(params.voice_volume)
audio_clip = audio_clip.set_start(start_time).set_end(end_time)
audio_clips.append(audio_clip)
else:
logger.warning(f"无法从文件名解析时间信息: {audio_path}")
# 合并所有音频剪辑
if audio_clips:
audio_clip = CompositeAudioClip(audio_clips)
else:
logger.warning("没有有效的音频文件")
audio_clip = AudioClip(lambda t: 0, duration=video_clip.duration)
# 字幕处理部分保持不变
if subtitle_path and os.path.exists(subtitle_path):
sub = SubtitlesClip(subtitles=subtitle_path, encoding="utf-8")
text_clips = []
for item in sub.subtitles:
clip = create_text_clip(subtitle_item=item)
text_clips.append(clip)
video_clip = CompositeVideoClip([video_clip, *text_clips])
# 背景音乐处理部分保持不变
bgm_file = get_bgm_file(bgm_type=params.bgm_type, bgm_file=params.bgm_file)
if bgm_file:
try:
bgm_clip = (
AudioFileClip(bgm_file).volumex(params.bgm_volume).audio_fadeout(3)
)
bgm_clip = afx.audio_loop(bgm_clip, duration=video_clip.duration)
audio_clip = CompositeAudioClip([audio_clip, bgm_clip])
except Exception as e:
logger.error(f"添加背景音乐失败: {str(e)}")
video_clip = video_clip.set_audio(audio_clip)
video_clip.write_videofile(
output_file,
audio_codec="aac",
temp_audiofile_path=output_dir,
threads=params.n_threads or 2,
logger=None,
fps=30,
)
video_clip.close()
del video_clip
logger.success("完成")
def preprocess_video(materials: List[MaterialInfo], clip_duration=4): def preprocess_video(materials: List[MaterialInfo], clip_duration=4):
for material in materials: for material in materials:
if not material.url: if not material.url:
@ -352,8 +478,8 @@ def preprocess_video(materials: List[MaterialInfo], clip_duration=4):
def combine_clip_videos(combined_video_path: str, def combine_clip_videos(combined_video_path: str,
video_paths: List[str], video_paths: List[str],
video_ost_list: List[str], video_ost_list: List[bool],
audio_file: str, list_script: list,
video_aspect: VideoAspect = VideoAspect.portrait, video_aspect: VideoAspect = VideoAspect.portrait,
threads: int = 2, threads: int = 2,
) -> str: ) -> str:
@ -369,8 +495,8 @@ def combine_clip_videos(combined_video_path: str,
Returns: Returns:
""" """
audio_clip = AudioFileClip(audio_file) from app.utils.utils import calculate_total_duration
audio_duration = audio_clip.duration audio_duration = calculate_total_duration(list_script)
logger.info(f"音频的最大持续时间: {audio_duration} s") logger.info(f"音频的最大持续时间: {audio_duration} s")
# 每个剪辑所需的持续时间 # 每个剪辑所需的持续时间
req_dur = audio_duration / len(video_paths) req_dur = audio_duration / len(video_paths)
@ -384,62 +510,52 @@ def combine_clip_videos(combined_video_path: str,
clips = [] clips = []
video_duration = 0 video_duration = 0
# 一遍又一遍地添加下载的剪辑,直到达到音频的持续时间 max_duration # 一遍又一遍地添加下载的剪辑,直到达到音频的持续时间 max_duration
while video_duration < audio_duration: # while video_duration < audio_duration:
for video_path, video_ost in zip(video_paths, video_ost_list): for video_path, video_ost in zip(video_paths, video_ost_list):
clip = VideoFileClip(video_path) clip = VideoFileClip(video_path)
if video_ost: # 通过 ost 字段判断是否播放原声
clip = clip.set_audio(audio_clip) if not video_ost:
clip = clip.without_audio()
# # 检查剪辑是否比剩余音频长
# if (audio_duration - video_duration) < clip.duration:
# clip = clip.subclip(0, (audio_duration - video_duration))
# # 仅当计算出的剪辑长度 req_dur 短于实际剪辑时,才缩短剪辑以防止静止图像
# elif req_dur < clip.duration:
# clip = clip.subclip(0, req_dur)
clip = clip.set_fps(30)
# 并非所有视频的大小都相同,因此我们需要调整它们的大小
clip_w, clip_h = clip.size
if clip_w != video_width or clip_h != video_height:
clip_ratio = clip.w / clip.h
video_ratio = video_width / video_height
if clip_ratio == video_ratio:
# 等比例缩放
clip = clip.resize((video_width, video_height))
else: else:
clip = clip.set_audio(audio_clip).without_audio() # 等比缩放视频
# 检查剪辑是否比剩余音频长 if clip_ratio > video_ratio:
if (audio_duration - video_duration) < clip.duration: # 按照目标宽度等比缩放
clip = clip.subclip(0, (audio_duration - video_duration)) scale_factor = video_width / clip_w
# 仅当计算出的剪辑长度 req_dur 短于实际剪辑时,才缩短剪辑以防止静止图像
elif req_dur < clip.duration:
clip = clip.subclip(0, req_dur)
clip = clip.set_fps(30)
# 并非所有视频的大小都相同,因此我们需要调整它们的大小
clip_w, clip_h = clip.size
if clip_w != video_width or clip_h != video_height:
clip_ratio = clip.w / clip.h
video_ratio = video_width / video_height
if clip_ratio == video_ratio:
# 等比例缩放
clip = clip.resize((video_width, video_height))
else: else:
# 等比缩放视频 # 按照目标高度等比缩放
if clip_ratio > video_ratio: scale_factor = video_height / clip_h
# 按照目标宽度等比缩放
scale_factor = video_width / clip_w
else:
# 按照目标高度等比缩放
scale_factor = video_height / clip_h
new_width = int(clip_w * scale_factor) new_width = int(clip_w * scale_factor)
new_height = int(clip_h * scale_factor) new_height = int(clip_h * scale_factor)
clip_resized = clip.resize(newsize=(new_width, new_height)) clip_resized = clip.resize(newsize=(new_width, new_height))
background = ColorClip(size=(video_width, video_height), color=(0, 0, 0)) background = ColorClip(size=(video_width, video_height), color=(0, 0, 0))
clip = CompositeVideoClip([ clip = CompositeVideoClip([
background.set_duration(clip.duration), background.set_duration(clip.duration),
clip_resized.set_position("center") clip_resized.set_position("center")
]) ])
logger.info(f"将视频 {video_path} 大小调整为 {video_width} x {video_height}, 剪辑尺寸: {clip_w} x {clip_h}") logger.info(f"将视频 {video_path} 大小调整为 {video_width} x {video_height}, 剪辑尺寸: {clip_w} x {clip_h}")
# TODO: 片段时长过长时,需要缩短,但暂时没有好的解决方案 clips.append(clip)
# if clip.duration > 5: video_duration += clip.duration
# ctime = utils.reduce_video_time(txt=video_script)
# if clip.duration > (2 * ctime):
# clip = clip.subclip(ctime, 2*ctime)
# else:
# clip = clip.subclip(0, ctime)
# logger.info(f"视频 {video_path} 片段时长较长,将剪辑时长缩短至 {ctime} 秒")
clips.append(clip)
video_duration += clip.duration
video_clip = concatenate_videoclips(clips) video_clip = concatenate_videoclips(clips)
video_clip = video_clip.set_fps(30) video_clip = video_clip.set_fps(30)
@ -457,68 +573,78 @@ def combine_clip_videos(combined_video_path: str,
if __name__ == "__main__": if __name__ == "__main__":
from app.utils import utils combined_video_path = "../../storage/tasks/12312312/com123.mp4"
suffix = "*.mp4" video_paths = ['../../storage/cache_videos/vid-00_00-00_03.mp4',
song_dir = utils.video_dir() '../../storage/cache_videos/vid-00_03-00_07.mp4',
files = glob.glob(os.path.join(song_dir, suffix)) '../../storage/cache_videos/vid-00_12-00_17.mp4',
'../../storage/cache_videos/vid-00_26-00_31.mp4']
video_ost_list = [False, True, False, True]
list_script = [
{
"picture": "夜晚,一个小孩在树林里奔跑,后面有人拿着火把在追赶",
"timestamp": "00:00-00:03",
"narration": "夜黑风高的树林,一个小孩在拼命奔跑,后面的人穷追不舍!",
"OST": False
},
{
"picture": "追赶的人命令抓住小孩",
"timestamp": "00:03-00:07",
"narration": "原声播放1",
"OST": True
},
{
"picture": "小孩躲在草丛里,黑衣人用脚踢了踢他",
"timestamp": "00:12-00:17",
"narration": "小孩脱下外套,跑进树林, 一路奔跑,直到第二天清晨",
"OST": False
},
{
"picture": "小孩跑到车前,慌慌张张地对女人说有人要杀他",
"timestamp": "00:26-00:31",
"narration": "原声播放2",
"OST": True
}
]
# combine_clip_videos(combined_video_path=combined_video_path, video_paths=video_paths, video_ost_list=video_ost_list, list_script=list_script)
print(files) cfg = VideoClipParams()
cfg.video_aspect = VideoAspect.portrait
cfg.font_name = "STHeitiMedium.ttc"
cfg.font_size = 60
cfg.stroke_color = "#000000"
cfg.stroke_width = 1.5
cfg.text_fore_color = "#FFFFFF"
cfg.text_background_color = "transparent"
cfg.bgm_type = "random"
cfg.bgm_file = ""
cfg.bgm_volume = 1.0
cfg.subtitle_enabled = True
cfg.subtitle_position = "bottom"
cfg.n_threads = 2
cfg.paragraph_number = 1
# m = MaterialInfo() cfg.voice_volume = 1.0
# m.url = "/Users/harry/Downloads/IMG_2915.JPG"
# m.provider = "local"
# materials = preprocess_video([m], clip_duration=4)
# print(materials)
# txt_en = "Here's your guide to travel hacks for budget-friendly adventures"
# txt_zh = "测试长字段这是您的旅行技巧指南帮助您进行预算友好的冒险"
# font = utils.resource_dir() + "/fonts/STHeitiMedium.ttc"
# for txt in [txt_en, txt_zh]:
# t, h = wrap_text(text=txt, max_width=1000, font=font, fontsize=60)
# print(t)
#
# task_id = "aa563149-a7ea-49c2-b39f-8c32cc225baf"
# task_dir = utils.task_dir(task_id)
# video_file = f"{task_dir}/combined-1.mp4"
# audio_file = f"{task_dir}/audio.mp3"
# subtitle_file = f"{task_dir}/subtitle.srt"
# output_file = f"{task_dir}/final.mp4"
#
# # video_paths = []
# # for file in os.listdir(utils.storage_dir("test")):
# # if file.endswith(".mp4"):
# # video_paths.append(os.path.join(utils.storage_dir("test"), file))
# #
# # combine_videos(combined_video_path=video_file,
# # audio_file=audio_file,
# # video_paths=video_paths,
# # video_aspect=VideoAspect.portrait,
# # video_concat_mode=VideoConcatMode.random,
# # max_clip_duration=5,
# # threads=2)
#
# cfg = VideoParams()
# cfg.video_aspect = VideoAspect.portrait
# cfg.font_name = "STHeitiMedium.ttc"
# cfg.font_size = 60
# cfg.stroke_color = "#000000"
# cfg.stroke_width = 1.5
# cfg.text_fore_color = "#FFFFFF"
# cfg.text_background_color = "transparent"
# cfg.bgm_type = "random"
# cfg.bgm_file = ""
# cfg.bgm_volume = 1.0
# cfg.subtitle_enabled = True
# cfg.subtitle_position = "bottom"
# cfg.n_threads = 2
# cfg.paragraph_number = 1
#
# cfg.voice_volume = 1.0
#
# generate_video(video_path=video_file, # generate_video(video_path=video_file,
# audio_path=audio_file, # audio_path=audio_file,
# subtitle_path=subtitle_file, # subtitle_path=subtitle_file,
# output_file=output_file, # output_file=output_file,
# params=cfg # params=cfg
# ) # )
video_path = "../../storage/tasks/12312312/com123.mp4"
audio_paths = ['../../storage/tasks/12312312/audio_00-00-00-03.mp3',
'../../storage/tasks/12312312/audio_00-12-00-17.mp3']
subtitle_path = "../../storage/tasks/12312312/subtitle_multiple.srt"
output_file = "../../storage/tasks/12312312/out123.mp4"
generate_video_v2(video_path=video_path,
audio_paths=audio_paths,
subtitle_path=subtitle_path,
output_file=output_file,
params=cfg
)

View File

@ -1354,7 +1354,7 @@ if __name__ == "__main__":
voice_name = parse_voice_name(voice_name) voice_name = parse_voice_name(voice_name)
print(voice_name) print(voice_name)
with open("../../resource/scripts/2024-0913-040147.json", 'r', encoding='utf-8') as f: with open("../../resource/scripts/test.json", 'r', encoding='utf-8') as f:
data = json.load(f) data = json.load(f)
audio_files, sub_maker_list = tts_multiple(task_id="12312312", list_script=data, voice_name=voice_name, voice_rate=1) audio_files, sub_maker_list = tts_multiple(task_id="12312312", list_script=data, voice_name=voice_name, voice_rate=1)

View File

@ -7,6 +7,7 @@ from loguru import logger
import json import json
from uuid import uuid4 from uuid import uuid4
import urllib3 import urllib3
from datetime import datetime
from app.models import const from app.models import const
@ -313,82 +314,15 @@ def seconds_to_time(seconds: float) -> str:
return f"{int(h):02d}:{int(m):02d}:{s:06.3f}" return f"{int(h):02d}:{int(m):02d}:{s:06.3f}"
def load_locales(i18n_dir): def calculate_total_duration(scenes):
_locales = {} total_seconds = 0
for root, dirs, files in os.walk(i18n_dir):
for file in files: for scene in scenes:
if file.endswith(".json"): start, end = scene['timestamp'].split('-')
lang = file.split(".")[0] start_time = datetime.strptime(start, '%M:%S')
with open(os.path.join(root, file), "r", encoding="utf-8") as f: end_time = datetime.strptime(end, '%M:%S')
_locales[lang] = json.loads(f.read())
return _locales duration = end_time - start_time
total_seconds += duration.total_seconds()
def parse_extension(filename): return total_seconds
return os.path.splitext(filename)[1].strip().lower().replace(".", "")
def script_dir(sub_dir: str = ""):
d = resource_dir(f"scripts")
if sub_dir:
d = os.path.join(d, sub_dir)
if not os.path.exists(d):
os.makedirs(d)
return d
def video_dir(sub_dir: str = ""):
d = resource_dir(f"videos")
if sub_dir:
d = os.path.join(d, sub_dir)
if not os.path.exists(d):
os.makedirs(d)
return d
def split_timestamp(timestamp):
"""
拆分时间戳
"""
start, end = timestamp.split('-')
start_hour, start_minute = map(int, start.split(':'))
end_hour, end_minute = map(int, end.split(':'))
start_time = '00:{:02d}:{:02d}'.format(start_hour, start_minute)
end_time = '00:{:02d}:{:02d}'.format(end_hour, end_minute)
return start_time, end_time
def reduce_video_time(txt: str, duration: float = 0.21531):
"""
按照字数缩减视频时长一个字耗时约 0.21531 s,
Returns:
"""
# 返回结果四舍五入为整数
duration = len(txt) * duration
return int(duration)
def get_current_country():
"""
判断当前网络IP地址所在的国家
"""
try:
# 使用ipapi.co的免费API获取IP地址信息
response = requests.get('https://ipapi.co/json/')
data = response.json()
# 获取国家名称
country = data.get('country_name')
if country:
logger.debug(f"当前网络IP地址位于{country}")
return country
else:
logger.debug("无法确定当前网络IP地址所在的国家")
return None
except requests.RequestException:
logger.error("获取IP地址信息时发生错误请检查网络连接")
return None