剪辑逻辑进度70%;

待优化点:
1. 生成字幕逻辑优化
2. 优化脚本-解说质量
3. 修复字幕bug
This commit is contained in:
linyq 2024-09-29 00:02:40 +08:00
parent 7b3014ad42
commit 02589c8355
7 changed files with 154 additions and 205 deletions

View File

@ -34,19 +34,6 @@ def merge_audio_files(task_id: str, audio_file_paths: List[str], total_duration:
# 创建一个总时长为total_duration的空白音频
blank_audio = AudioSegment.silent(duration=total_duration * 1000) # pydub使用毫秒
# 创建SubMaker对象
sub_maker = edge_tts.SubMaker()
# 解析JSON格式的video_script
script_data = video_script
for segment in script_data:
start_time, end_time = parse_timestamp(segment['new_timestamp'])
duration = (end_time - start_time) * 1000 # 转换为毫秒
if not segment['OST']:
# 如果不是原声则添加narration作为字幕
sub_maker.create_sub((start_time * 1000, duration), segment['narration'])
for audio_path in audio_file_paths:
if not os.path.exists(audio_path):
@ -82,12 +69,12 @@ def merge_audio_files(task_id: str, audio_file_paths: List[str], total_duration:
logger.error(f"导出音频失败:{str(e)}")
return None, None
return output_file, sub_maker
return output_file
def parse_timestamp(timestamp: str):
    """Convert a single "MM:SS" timestamp string to total seconds.

    NOTE(review): despite the name, this no longer parses a
    "start-end" range (the old split on '-' is gone); passing a
    string that contains '-' will raise ValueError inside
    time_to_seconds. Confirm all callers now pass a single
    timestamp, not a range.
    """
    return time_to_seconds(timestamp)
def extract_timestamp(filename):
"""从文件名中提取开始和结束时间戳"""
@ -95,30 +82,31 @@ def extract_timestamp(filename):
times = time_part.split('-')
# 将时间戳转换为秒
start_seconds = time_to_seconds(times[0], times[1])
end_seconds = time_to_seconds(times[2], times[3])
start_seconds = time_to_seconds(times[0])
end_seconds = time_to_seconds(times[1])
return start_seconds, end_seconds
def time_to_seconds(times):
    """Convert a clock string to a total number of seconds.

    Accepts "MM:SS" (e.g. "00:06" -> 6) and, for consistency with the
    script-checking helpers elsewhere in the project, "HH:MM:SS"
    (e.g. "01:02:03" -> 3723).

    Args:
        times: ':'-separated integer clock string.

    Returns:
        int: total seconds.

    Raises:
        ValueError: if a field is not an integer or the field count
            is neither 2 nor 3.
    """
    parts = [int(part) for part in times.split(':')]
    if len(parts) == 2:
        minutes, seconds = parts
        return minutes * 60 + seconds
    if len(parts) == 3:
        hours, minutes, seconds = parts
        return hours * 3600 + minutes * 60 + seconds
    raise ValueError(f"无法解析时间字符串: {times}")
if __name__ == "__main__":
# 示例用法
audio_files =[
"/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_00-06-00-24.mp3",
"/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_00-32-00-38.mp3",
"/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_00-43-00-52.mp3",
"/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_00-52-01-09.mp3",
"/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_01-13-01-15.mp3",
"/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_00:06-00:24.mp3",
"/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_00:32-00:38.mp3",
"/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_00:43-00:52.mp3",
"/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_00:52-01:09.mp3",
"/Users/apple/Desktop/home/NarratoAI/storage/tasks/test456/audio_01:13-01:15.mp3",
]
total_duration = 38
video_script_path = "/Users/apple/Desktop/home/NarratoAI/resource/scripts/test003.json"
with open(video_script_path, "r", encoding="utf-8") as f:
video_script = json.load(f)
output_file, sub_maker = merge_audio_files("test456", audio_files, total_duration, video_script)
print(output_file, sub_maker)
output_file = merge_audio_files("test456", audio_files, total_duration, video_script)
print(output_file)

View File

@ -792,14 +792,67 @@ def screen_matching(huamian: str, wenan: str, llm_provider: str):
Return: list[script]
- picture: 字段表示当前画面描述与转录脚本保持一致
- timestamp: 字段表示某一段文案对应的画面的时间戳不必和转录脚本的时间戳一致应该充分考虑文案内容匹配出与其描述最匹配的时间戳
- 请注意请严格的执行已经出现的画面不能重复出现即生成的脚本中 timestamp 不能有重叠的部分
- narration: 字段表示需要解说文案每段解说文案尽量不要超过30字
- OST: 字段表示是否开启原声即当 OST 字段为 true narration 字段为空字符串 OST false narration 字段为对应的解说文案
- 注意在画面匹配的过程中需要适当的加入原声播放使得解说和画面更加匹配请按照 1:1 的比例生成原声和解说的脚本内容
- 注意在时间戳匹配上一定不能原样照搬转录脚本应当适当的合并或者删减一些片段
- 注意第一个画面一定是原声播放并且时长不少于 20 s为了吸引观众第一段一定是整个转录脚本中最精彩的片段
- 注意匹配的画面不能重复出现即生成的脚本中 timestamp 不能重复
- 请以严格的 JSON 格式返回数据不要包含任何注释标记或其他字符数据应符合 JSON 语法可以被 json.loads() 函数直接解析 不要添加 ```json 或其他标记
""" % (huamian, wenan)
prompt = """
你是一位拥有10年丰富经验的影视解说创作专家你的任务是根据提供的视频转录脚本和解说文案创作一个引人入胜的解说脚本请按照以下要求完成任务
1. 输入数据
- 视频转录脚本包含时间戳画面描述和人物台词
- 解说文案需要你进行匹配和编排的内容
- 视频转录脚本和文案 XML 标记<PICTURE></PICTURE> <COPYWRITER></COPYWRITER>分隔如下所示
视频转录脚本
<PICTURE>
%s
</PICTURE>
文案
<COPYWRITER>
%s
</COPYWRITER>
2. 输出要求
- 格式严格的JSON格式可直接被json.loads()解析
- 结构list[script]其中script为字典类型
- script字段
{
"picture": "画面描述",
"timestamp": "时间戳",
"narration": "解说文案",
"OST": true/false
}
3. 匹配规则
a) 时间戳匹配
- 根据文案内容选择最合适的画面时间段
- 避免时间重叠确保画面不重复出现
- 适当合并或删减片段不要完全照搬转录脚本
b) 画面描述与转录脚本保持一致
c) 解说文案
- 当OST为true时narration为空字符串
- 当OST为false时narration为解说文案但是要确保文案字数不要超过 30若文案较长则添加到下一个片段
d) OST原声
- 按1:1比例穿插原声和解说片段
- 第一个片段必须是原声时长不少于20秒
- 选择整个视频中最精彩的片段作为开场
4. 创作重点
- 确保解说与画面高度匹配
- 巧妙安排原声和解说的交替提升观众体验
- 创造一个引人入胜节奏紧凑的解说脚本
5. 注意事项
- 严格遵守JSON格式不包含任何注释或额外标记
- 充分利用你的专业经验创作出高质量吸引人的解说内容
请基于以上要求将提供的视频转录脚本和解说文案整合成一个专业吸引人的解说脚本你的创作将直接影响观众的观看体验请发挥你的专业素养创作出最佳效果
""" % (huamian, wenan)
try:
response = _generate_response(prompt, llm_provider)
logger.success("匹配成功")
@ -830,5 +883,3 @@ if __name__ == "__main__":
res = clean_model_output(res)
aaa = json.loads(res)
print(json.dumps(aaa, indent=2, ensure_ascii=False))
# response = _generate_response("你好,介绍一下你自己")
# print(response)

View File

@ -355,7 +355,8 @@ def start_subclip(task_id: str, params: VideoClipParams, subclip_path_videos):
logger.debug(f"解说时间戳列表: \n{time_list}")
# 获取视频总时长(单位 s)
total_duration = list_script[-1]['new_timestamp']
total_duration = int(total_duration.split("-")[1].split(":")[0]) * 60 + int(total_duration.split("-")[1].split(":")[1])
total_duration = int(total_duration.split("-")[1].split(":")[0]) * 60 + int(
total_duration.split("-")[1].split(":")[1])
except Exception as e:
logger.error(f"无法读取视频json脚本请检查配置是否正确。{e}")
raise ValueError("无法读取视频json脚本请检查配置是否正确")
@ -375,11 +376,9 @@ def start_subclip(task_id: str, params: VideoClipParams, subclip_path_videos):
logger.error(
"音频文件为空可能是网络不可用。如果您在中国请使用VPN。或者手动选择 zh-CN-Yunjian-男性 音频")
return
logger.info("合并音频")
audio_file, sub_maker = audio_merger.merge_audio_files(task_id, audio_files, total_duration, list_script)
logger.info(f"合并音频:\n\n {audio_files}")
audio_file = audio_merger.merge_audio_files(task_id, audio_files, total_duration, list_script)
# audio_duration = voice.get_audio_duration(sub_maker)
# audio_duration = math.ceil(audio_duration)
sm.state.update_task(task_id, state=const.TASK_STATE_PROCESSING, progress=30)
subtitle_path = ""
@ -389,7 +388,7 @@ def start_subclip(task_id: str, params: VideoClipParams, subclip_path_videos):
logger.info(f"\n\n## 3. 生成字幕、提供程序是: {subtitle_provider}")
subtitle_fallback = False
if subtitle_provider == "edge":
voice.create_subtitle(text=video_script, sub_maker=sub_maker, subtitle_file=subtitle_path)
voice.create_subtitle(text=video_script, sub_maker="sub_maker", subtitle_file=subtitle_path)
# voice.create_subtitle(
# text=video_script,
# sub_maker_list=sub_maker_list,
@ -415,10 +414,6 @@ def start_subclip(task_id: str, params: VideoClipParams, subclip_path_videos):
logger.info("\n\n## 4. 裁剪视频")
subclip_videos = [x for x in subclip_path_videos.values()]
# subclip_videos = material.clip_videos(task_id=task_id,
# timestamp_terms=time_list,
# origin_video=params.video_origin_path
# )
logger.debug(f"\n\n## 裁剪后的视频文件列表: \n{subclip_videos}")
if not subclip_videos:
@ -433,17 +428,10 @@ def start_subclip(task_id: str, params: VideoClipParams, subclip_path_videos):
combined_video_paths = []
_progress = 50
# for i in range(params.video_count):
index = 1
combined_video_path = path.join(utils.task_dir(task_id), f"combined.mp4")
logger.info(f"\n\n## 5. 合并视频: => {combined_video_path}")
print("111", subclip_videos)
print("222", video_ost)
print("333", len(subclip_videos))
print("444", len(video_ost))
# for video_path, video_ost in zip(subclip_videos, video_ost):
# print(video_path)
# print(video_ost)
video.combine_clip_videos(
combined_video_path=combined_video_path,
video_paths=subclip_videos,
@ -502,18 +490,18 @@ if __name__ == "__main__":
# start_subclip(task_id, params, subclip_path_videos=subclip_path_videos)
task_id = "test456"
subclip_path_videos = {'00:00-00:06': './storage/cache_videos/vid-00_00-00_06.mp4',
'00:06-00:24': './storage/cache_videos/vid-00_06-00_24.mp4',
'01:28-01:36': './storage/cache_videos/vid-01_28-01_36.mp4',
'00:41-00:47': './storage/cache_videos/vid-00_41-00_47.mp4',
'01:58-02:03': './storage/cache_videos/vid-01_58-02_03.mp4',
'02:03-02:12': './storage/cache_videos/vid-02_03-02_12.mp4',
'02:40-02:57': './storage/cache_videos/vid-02_40-02_57.mp4',
subclip_path_videos = {'01:10-01:17': './storage/cache_videos/vid-01_10-01_17.mp4',
'01:58-02:04': './storage/cache_videos/vid-01_58-02_04.mp4',
'02:25-02:31': './storage/cache_videos/vid-02_25-02_31.mp4',
'01:28-01:33': './storage/cache_videos/vid-01_28-01_33.mp4',
'03:14-03:18': './storage/cache_videos/vid-03_14-03_18.mp4',
'03:18-03:20': './storage/cache_videos/vid-03_18-03_20.mp4'}
'00:24-00:28': './storage/cache_videos/vid-00_24-00_28.mp4',
'03:02-03:08': './storage/cache_videos/vid-03_02-03_08.mp4',
'00:41-00:44': './storage/cache_videos/vid-00_41-00_44.mp4',
'02:12-02:25': './storage/cache_videos/vid-02_12-02_25.mp4'}
params = VideoClipParams(
video_clip_json_path="/Users/apple/Desktop/home/NarratoAI/resource/scripts/test003.json",
video_clip_json_path="/Users/apple/Desktop/home/NarratoAI/resource/scripts/test004.json",
video_origin_path="/Users/apple/Desktop/home/NarratoAI/resource/videos/1.mp4",
)
start_subclip(task_id, params, subclip_path_videos=subclip_path_videos)

View File

@ -1410,7 +1410,8 @@ def tts_multiple(task_id: str, list_script: list, voice_name: str, voice_rate: f
for item in list_script:
if not item['OST']:
timestamp = item['new_timestamp'].replace(':', '-')
# timestamp = item['new_timestamp'].replace(':', '@')
timestamp = item['new_timestamp']
audio_file = os.path.join(output_dir, f"audio_{timestamp}.mp3")
# 检查文件是否已存在,如存在且不强制重新生成,则跳过

View File

@ -1,37 +1,46 @@
import json
from loguru import logger
import os
from datetime import datetime, timedelta
import re
from datetime import timedelta
def time_to_seconds(time_str):
    """Convert "MM:SS" or "HH:MM:SS" to total seconds as a float.

    The old strptime("%M:%S") implementation was left above the new
    logic in the source, returning early and making the 2/3-field
    handling unreachable; the dead lines are removed here.

    Args:
        time_str: ':'-separated integer clock string.

    Returns:
        float: total seconds (via timedelta.total_seconds()).

    Raises:
        ValueError: if the string has neither 2 nor 3 integer fields.
    """
    parts = list(map(int, time_str.split(':')))
    if len(parts) == 2:
        return timedelta(minutes=parts[0], seconds=parts[1]).total_seconds()
    if len(parts) == 3:
        return timedelta(hours=parts[0], minutes=parts[1], seconds=parts[2]).total_seconds()
    raise ValueError(f"无法解析时间字符串: {time_str}")
def seconds_to_time_str(seconds):
    """Format a second count as "MM:SS", or "HH:MM:SS" when >= 1 hour.

    The old two-field implementation was left above the new logic in
    the source, returning early and making the hour handling
    unreachable; the dead lines are removed here.

    Args:
        seconds: non-negative duration in seconds (floats truncated).

    Returns:
        str: zero-padded "MM:SS" or "HH:MM:SS".
    """
    hours, remainder = divmod(int(seconds), 3600)
    minutes, secs = divmod(remainder, 60)
    if hours > 0:
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"
def adjust_timestamp(start_time, duration):
    """Build a "start-end" range string whose end is start_time plus duration seconds."""
    end_seconds = time_to_seconds(start_time) + duration
    end_str = seconds_to_time_str(end_seconds)
    return f"{start_time}-{end_str}"
def check_script(file_path, total_duration):
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
def estimate_audio_duration(text):
    """Rough TTS duration estimate: assume 0.2 seconds of audio per character."""
    seconds_per_char = 0.2
    return seconds_per_char * len(text)
def check_script(data, total_duration):
errors = []
ost_narrations = set()
last_end_time = 0
time_ranges = []
logger.info(f"开始检查文件: {file_path}")
logger.info("开始检查脚本")
logger.info(f"视频总时长: {total_duration:.2f}")
logger.info("=" * 50)
for i, item in enumerate(data, 1):
logger.info(f"\n检查第 {i} 项:")
# 检查所有必需字段是否存在
required_fields = ['picture', 'timestamp', 'narration', 'OST', 'new_timestamp']
# 检查所有必需字段
required_fields = ['picture', 'timestamp', 'narration', 'OST']
for field in required_fields:
if field not in item:
errors.append(f"{i} 项缺少 {field} 字段")
@ -39,160 +48,68 @@ def check_script(file_path, total_duration):
else:
logger.info(f" - {field}: {item[field]}")
# 检查 OST 为 false 的情况
# 检查 OST 相关规则
if item.get('OST') == False:
if not item.get('narration'):
errors.append(f"{i} 项 OST 为 false但 narration 为空")
logger.info(" - 错误: OST 为 false但 narration 为空")
elif len(item['narration']) > 30:
errors.append(f"{i} 项 OST 为 false但 narration 超过 30 字")
logger.info(f" - 错误: OST 为 false但 narration 超过 30 字 (当前: {len(item['narration'])} 字)")
elif len(item['narration']) > 60:
errors.append(f"{i} 项 OST 为 false但 narration 超过 60 字")
logger.info(f" - 错误: OST 为 false但 narration 超过 60 字 (当前: {len(item['narration'])} 字)")
else:
logger.info(" - OST 为 falsenarration 检查通过")
# 检查 OST 为 true 的情况
if item.get('OST') == True:
if not item.get('narration').startswith('原声播放_'):
errors.append(f"{i} 项 OST 为 true但 narration 不是 '原声播放_xxx' 格式")
logger.info(" - 错误: OST 为 true但 narration 不是 '原声播放_xxx' 格式")
elif item['narration'] in ost_narrations:
errors.append(f"{i} 项 OST 为 true但 narration '{item['narration']}' 不是唯一值")
logger.info(f" - 错误: OST 为 true但 narration '{item['narration']}' 不是唯一值")
elif item.get('OST') == True:
if "原声播放_" not in item.get('narration'):
errors.append(f"{i} 项 OST 为 true但 narration 不为空")
logger.info(" - 错误: OST 为 true但 narration 不为空")
else:
logger.info(" - OST 为 truenarration 检查通过")
ost_narrations.add(item['narration'])
# 检查 timestamp 是否重叠
# 检查 timestamp
if 'timestamp' in item:
start, end = map(time_to_seconds, item['timestamp'].split('-'))
if start < last_end_time:
errors.append(f"{i} 项 timestamp '{item['timestamp']}'前一项重叠")
logger.info(f" - 错误: timestamp '{item['timestamp']}'前一项重叠")
if any((start < existing_end and end > existing_start) for existing_start, existing_end in time_ranges):
errors.append(f"{i} 项 timestamp '{item['timestamp']}'其他时间段重叠")
logger.info(f" - 错误: timestamp '{item['timestamp']}'其他时间段重叠")
else:
logger.info(f" - timestamp '{item['timestamp']}' 检查通过")
last_end_time = end
time_ranges.append((start, end))
# 检查 timestamp 是否超过总时长
if end > total_duration:
errors.append(f"{i} 项 timestamp '{item['timestamp']}' 超过总时长 {total_duration:.2f}")
logger.info(f" - 错误: timestamp '{item['timestamp']}' 超过总时长 {total_duration:.2f}")
else:
logger.info(f" - timestamp 在总时长范围内")
# if end > total_duration:
# errors.append(f"第 {i} 项 timestamp '{item['timestamp']}' 超过总时长 {total_duration:.2f} 秒")
# logger.info(f" - 错误: timestamp '{item['timestamp']}' 超过总时长 {total_duration:.2f} 秒")
# else:
# logger.info(f" - timestamp 在总时长范围内")
# 检查 new_timestamp 是否连续
logger.info("\n检查 new_timestamp 连续性:")
last_end_time = 0
for i, item in enumerate(data, 1):
if 'new_timestamp' in item:
start, end = map(time_to_seconds, item['new_timestamp'].split('-'))
if start != last_end_time:
errors.append(f"{i} 项 new_timestamp '{item['new_timestamp']}' 与前一项不连续")
logger.info(f" - 错误: 第 {i} 项 new_timestamp '{item['new_timestamp']}' 与前一项不连续")
else:
logger.info(f" - 第 {i} 项 new_timestamp '{item['new_timestamp']}' 连续性检查通过")
last_end_time = end
# 处理 narration 字段
if item.get('OST') == False and item.get('narration'):
estimated_duration = estimate_audio_duration(item['narration'])
start_time = item['timestamp'].split('-')[0]
item['timestamp'] = adjust_timestamp(start_time, estimated_duration)
logger.info(f" - 已调整 timestamp 为 {item['timestamp']} (估算音频时长: {estimated_duration:.2f} 秒)")
if errors:
logger.info("检查结果:不通过")
logger.info("发现以下错误:")
for error in errors:
logger.info(f"- {error}")
fix_script(file_path, data, errors)
else:
logger.info("检查结果:通过")
logger.info("所有项目均符合规则要求。")
def fix_script(file_path, data, errors):
    """Auto-repair script items with over-long narrations, then save.

    Items with OST == False whose narration exceeds 30 characters are
    split via split_narration; empty narrations and all other reported
    errors are only logged for manual follow-up. The repaired list is
    written to "<stem>_revise<ext>" next to the original file.
    """
    logger.info("\n开始修复脚本...")
    repaired = []
    for idx, entry in enumerate(data, 1):
        needs_fix = entry['OST'] == False and (not entry['narration'] or len(entry['narration']) > 30)
        if not needs_fix:
            repaired.append(entry)
            continue
        if not entry['narration']:
            # An empty narration cannot be auto-filled.
            logger.info(f"第 {idx} 项 narration 为空,需要人工参与修复。")
            repaired.append(entry)
        else:
            logger.info(f"修复第 {idx} 项 narration 超过 30 字的问题...")
            repaired.extend(split_narration(entry))
    # Surface everything we did not auto-fix.
    for err in errors:
        if not err.startswith("第") or "OST 为 false" not in err:
            logger.info(f"需要人工参与修复: {err}")
    # Derive the output path from the input path.
    stem, ext = os.path.splitext(file_path)
    revised_path = f"{stem}_revise{ext}"
    with open(revised_path, 'w', encoding='utf-8') as f:
        json.dump(repaired, f, ensure_ascii=False, indent=4)
    logger.info(f"\n脚本修复完成,已保存到新文件: {revised_path}")
def split_narration(item):
    """Split one over-long narration item into several shorter ones.

    The narration text is divided by smart_split, and both the
    'timestamp' and 'new_timestamp' ranges are sliced into equal
    sub-ranges, one per text chunk.
    """
    pieces = smart_split(item['narration'])
    count = len(pieces)

    ts_start, ts_end = map(time_to_seconds, item['timestamp'].split('-'))
    nts_start, nts_end = map(time_to_seconds, item['new_timestamp'].split('-'))
    ts_step = (ts_end - ts_start) / count
    nts_step = (nts_end - nts_start) / count

    result = []
    for idx, piece in enumerate(pieces):
        clone = item.copy()
        clone['narration'] = piece
        a = ts_start + idx * ts_step
        clone['timestamp'] = f"{seconds_to_time_str(a)}-{seconds_to_time_str(a + ts_step)}"
        b = nts_start + idx * nts_step
        clone['new_timestamp'] = f"{seconds_to_time_str(b)}-{seconds_to_time_str(b + nts_step)}"
        result.append(clone)
    return result
def smart_split(text, target_length=30):
    """Split text into chunks of at most target_length characters.

    Prefers breaking at Chinese/ASCII punctuation boundaries; any
    chunk still longer than target_length is hard-wrapped by raw
    character count.
    """
    # Each piece is a run of non-punctuation text plus (optionally)
    # the single punctuation mark that terminates it.
    pieces = re.findall(r'[^,。!?,!?]+[,。!?,!?]?', text)

    merged = []
    buffer = ""
    for piece in pieces:
        if len(buffer) + len(piece) <= target_length:
            buffer += piece
            continue
        if buffer:
            merged.append(buffer.strip())
        buffer = piece
    if buffer:
        merged.append(buffer.strip())

    # Hard-wrap anything that still exceeds the target length.
    chunks = []
    for candidate in merged:
        if len(candidate) <= target_length:
            chunks.append(candidate)
        else:
            chunks.extend(candidate[pos:pos + target_length]
                          for pos in range(0, len(candidate), target_length))
    return chunks
return errors, data
if __name__ == "__main__":
    # Manual smoke test for the script checker / timestamp utilities.
    # Older script path kept from a previous revision; immediately
    # overwritten by the test004 path below.
    file_path = "/Users/apple/Desktop/home/NarratoAI/resource/scripts/2024-0923-085036.json"
    file_path = "/Users/apple/Desktop/home/NarratoAI/resource/scripts/test004.json"
    with open(file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    total_duration = 280
    # NOTE(review): check_script's signature now takes the loaded data,
    # yet this live call still passes file_path while the corrected call
    # sits commented out below — confirm which is intended.
    check_script(file_path, total_duration)
    # check_script(data, total_duration)
    from app.utils.utils import add_new_timestamps
    res = add_new_timestamps(data)
    print(json.dumps(res, indent=4, ensure_ascii=False))

View File

@ -10,6 +10,7 @@ import urllib3
from datetime import datetime, timedelta
from app.models import const
from app.utils import check_script
urllib3.disable_warnings()
@ -340,6 +341,9 @@ def add_new_timestamps(scenes):
current_time = timedelta()
updated_scenes = []
# 保存脚本前先检查脚本是否正确
check_script.check_script(scenes, calculate_total_duration(scenes))
for scene in scenes:
new_scene = scene.copy() # 创建场景的副本,以保留原始数据
start, end = new_scene['timestamp'].split('-')

View File

@ -1,7 +1,7 @@
#!/bin/bash
# 从环境变量中加载VPN代理的配置URL
vpn_proxy_url="$VPN_PROXY_URL"
vpn_proxy_url="http://127.0.0.1:7890"
# 检查是否成功加载
if [ -z "$vpn_proxy_url" ]; then
echo "VPN代理配置URL未设置请检查环境变量VPN_PROXY_URL"