In generate_script_docu.py, fix the timestamp parsing logic to support both the new and the legacy format, and make sure analysis results are saved as JSON to the designated directory. Remove redundant log output and tidy up the code structure.

This commit is contained in:
linyq 2025-05-08 15:13:13 +08:00
parent 017f398cb1
commit e1b694824b
2 changed files with 146 additions and 10 deletions

View File

@@ -0,0 +1,98 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project: NarratoAI
@File   : 生成介绍文案
@Author : 小林同学
@Date   : 2025/5/8 11:33 AM
'''
import json
import os
import traceback


def parse_frame_analysis_to_markdown(json_file_path):
    """
    Parse a video-frame-analysis JSON file and convert it to Markdown.
    :param json_file_path: path to the JSON file
    :return: Markdown-formatted string
    """
    # Check that the file exists
    if not os.path.exists(json_file_path):
        return f"错误: 文件 {json_file_path} 不存在"

    try:
        # Read the JSON file
        with open(json_file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)

        # Initialize the Markdown string
        markdown = ""

        # Fetch the overall summaries and the per-frame observations
        summaries = data.get('overall_activity_summaries', [])
        frame_observations = data.get('frame_observations', [])

        # Group the frame observations by batch
        batch_frames = {}
        for frame in frame_observations:
            batch_index = frame.get('batch_index')
            if batch_index not in batch_frames:
                batch_frames[batch_index] = []
            batch_frames[batch_index].append(frame)

        # Generate the Markdown content
        for i, summary in enumerate(summaries, 1):
            batch_index = summary.get('batch_index')
            time_range = summary.get('time_range', '')
            batch_summary = summary.get('summary', '')

            # Wrap overly long summary text at 80 characters to keep the layout
            # aligned (j avoids shadowing the enumerate counter i)
            batch_summary_lines = [batch_summary[j:j + 80] for j in range(0, len(batch_summary), 80)]

            markdown += f"## 片段 {i}\n"
            markdown += f"- 时间范围:{time_range}\n"

            # Add the segment description, handling long text
            markdown += f"- 片段描述:{batch_summary_lines[0]}\n" if batch_summary_lines else "- 片段描述:\n"
            for line in batch_summary_lines[1:]:
                markdown += f"  {line}\n"

            markdown += "- 详细描述:\n"

            # Append the frame observations belonging to this batch
            frames = batch_frames.get(batch_index, [])
            for frame in frames:
                timestamp = frame.get('timestamp', '')
                observation = frame.get('observation', '')
                # Wrap long observation text and guard against empty observations
                observation_lines = [observation[j:j + 80] for j in range(0, len(observation), 80)] if observation else [""]
                markdown += f"  - {timestamp}: {observation_lines[0]}\n"
                for line in observation_lines[1:]:
                    markdown += f"    {line}\n"

            markdown += "\n"

        return markdown
    except Exception:
        return f"处理JSON文件时出错: {traceback.format_exc()}"


if __name__ == '__main__':
    video_frame_description_path = "/Users/apple/Desktop/home/NarratoAI/storage/temp/analysis/frame_analysis_20250508_1139.json"
    # Test against the newer JSON file
    test_file_path = "/Users/apple/Desktop/home/NarratoAI/storage/temp/analysis/frame_analysis_20250508_1458.json"
    markdown_output = parse_frame_analysis_to_markdown(test_file_path)
    print(markdown_output)

    # Also write the output to a file so the formatting can be inspected
    output_file = "/Users/apple/Desktop/home/NarratoAI/storage/temp/narration_script.md"
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(markdown_output)
    print(f"\n已将Markdown输出保存到: {output_file}")

View File

@@ -177,8 +177,15 @@ def generate_script_docu(params):
     overall_activity_summaries = []  # merged overall summaries from all batches
     prev_batch_files = None
     frame_counter = 1  # frame counter used to assign consecutive numbers to all frames
-    logger.debug(json.dumps(results, indent=4, ensure_ascii=False))
+    # logger.debug(json.dumps(results, indent=4, ensure_ascii=False))
+    # Make sure the analysis directory exists
+    analysis_dir = os.path.join(utils.storage_dir(), "temp", "analysis")
+    os.makedirs(analysis_dir, exist_ok=True)
+    origin_res = os.path.join(analysis_dir, "frame_analysis.json")
+    with open(origin_res, 'w', encoding='utf-8') as f:
+        json.dump(results, f, ensure_ascii=False, indent=2)
+
     # start processing
     for result in results:
         if 'error' in result:
             logger.warning(f"批次 {result['batch_index']} 处理出现警告: {result['error']}")
@@ -222,8 +229,23 @@ def generate_script_docu(params):
             if len(timestamp_parts) >= 3:
                 timestamp_str = timestamp_parts[-1].split('.')[0]
                 try:
-                    timestamp_seconds = int(timestamp_str) / 1000  # convert to seconds
-                    formatted_time = utils.format_time(timestamp_seconds)  # format the timestamp
+                    # Fixed timestamp parsing logic:
+                    # a value such as 000100000 means 00:01:00,000, i.e. one minute,
+                    # so parse it by digit position: the first two digits are hours,
+                    # the middle two are minutes, then seconds and milliseconds.
+                    if len(timestamp_str) >= 9:  # make sure the format is as expected
+                        hours = int(timestamp_str[0:2])
+                        minutes = int(timestamp_str[2:4])
+                        seconds = int(timestamp_str[4:6])
+                        milliseconds = int(timestamp_str[6:9])
+                        # compute the total number of seconds
+                        timestamp_seconds = hours * 3600 + minutes * 60 + seconds + milliseconds / 1000
+                        formatted_time = utils.format_time(timestamp_seconds)  # format the timestamp
+                    else:
+                        # fall back to the legacy parsing behaviour
+                        timestamp_seconds = int(timestamp_str) / 1000  # convert to seconds
+                        formatted_time = utils.format_time(timestamp_seconds)  # format the timestamp
                 except ValueError:
                     logger.warning(f"无法解析时间戳: {timestamp_str}")
                     timestamp_seconds = 0
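The same positional HH/MM/SS/mmm parsing is repeated below for the batch duration, so it could be hoisted into a shared helper. A minimal sketch, assuming a hypothetical helper name parse_hhmmssmmm that is not part of this commit:

# Hypothetical helper (not in this commit) consolidating the positional
# timestamp parsing introduced above: HHMMSSmmm, with the legacy plain
# millisecond count as a fallback.
def parse_hhmmssmmm(timestamp_str: str) -> float:
    if len(timestamp_str) >= 9:
        hours = int(timestamp_str[0:2])
        minutes = int(timestamp_str[2:4])
        seconds = int(timestamp_str[4:6])
        milliseconds = int(timestamp_str[6:9])
        return hours * 3600 + minutes * 60 + seconds + milliseconds / 1000
    return int(timestamp_str) / 1000  # legacy format: plain milliseconds

assert parse_hhmmssmmm("000100000") == 60.0  # 00:01:00,000 -> one minute
assert parse_hhmmssmmm("5000") == 5.0        # legacy: 5000 ms -> 5 s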
@@ -237,6 +259,7 @@ def generate_script_docu(params):
                 obs["frame_path"] = file_path
                 obs["timestamp"] = formatted_time
                 obs["timestamp_seconds"] = timestamp_seconds
+                obs["batch_index"] = result['batch_index']
                 # replace the original frame_number with the globally increasing frame counter
                 if "frame_number" in obs:
@@ -255,9 +278,28 @@ def generate_script_docu(params):
             # convert to milliseconds and compute the duration in seconds
             try:
-                first_time_ms = int(first_time_str)
-                last_time_ms = int(last_time_str)
-                batch_duration = (last_time_ms - first_time_ms) / 1000
+                # Fixed parsing logic: parse the timestamps the same way as above
+                if len(first_time_str) >= 9 and len(last_time_str) >= 9:
+                    # parse the first timestamp
+                    first_hours = int(first_time_str[0:2])
+                    first_minutes = int(first_time_str[2:4])
+                    first_seconds = int(first_time_str[4:6])
+                    first_ms = int(first_time_str[6:9])
+                    first_time_seconds = first_hours * 3600 + first_minutes * 60 + first_seconds + first_ms / 1000
+
+                    # parse the second timestamp
+                    last_hours = int(last_time_str[0:2])
+                    last_minutes = int(last_time_str[2:4])
+                    last_seconds = int(last_time_str[4:6])
+                    last_ms = int(last_time_str[6:9])
+                    last_time_seconds = last_hours * 3600 + last_minutes * 60 + last_seconds + last_ms / 1000
+
+                    batch_duration = last_time_seconds - first_time_seconds
+                else:
+                    # fall back to the legacy parsing behaviour
+                    first_time_ms = int(first_time_str)
+                    last_time_ms = int(last_time_str)
+                    batch_duration = (last_time_ms - first_time_ms) / 1000
             except ValueError:
                 # use utils.time_to_seconds to handle the formatted timestamps
                 first_time_seconds = utils.time_to_seconds(first_time_str.replace('_', ':').replace('-', ','))
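Under the same assumption, the duplicated duration arithmetic above would collapse to a difference of two helper calls:

# Sketch using the hypothetical parse_hhmmssmmm helper from above; both
# branches (new 9-digit format and legacy milliseconds) are covered by it.
batch_duration = parse_hhmmssmmm(last_time_str) - parse_hhmmssmmm(first_time_str)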
@@ -290,10 +332,6 @@ def generate_script_docu(params):
     now = datetime.now()
     timestamp_str = now.strftime("%Y%m%d_%H%M")
-    # make sure the analysis directory exists
-    analysis_dir = os.path.join(utils.storage_dir(), "temp", "analysis")
-    os.makedirs(analysis_dir, exist_ok=True)
-
     # save the complete analysis result as JSON
     analysis_filename = f"frame_analysis_{timestamp_str}.json"
     analysis_json_path = os.path.join(analysis_dir, analysis_filename)
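The timestamped JSON written here appears to be the same kind of file the new script at the top of this commit reads (its test path, frame_analysis_20250508_1458.json, follows this naming scheme), so a quick end-to-end check could be:

# Sketch: feed the analysis JSON saved above to the parser from the new
# script in this commit and preview the generated narration Markdown.
markdown_output = parse_frame_analysis_to_markdown(analysis_json_path)
print(markdown_output)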