mirror of
https://github.com/linyqh/NarratoAI.git
synced 2025-12-13 04:02:49 +00:00
feat(utils): 优化时间戳处理并支持毫秒级精度
- 重构了时间戳转换函数,支持 HH:MM:SS,mmm 格式 - 优化了时间戳计算逻辑,提高了精度- 更新了相关服务和工具类中的时间戳处理方法
This commit is contained in:
parent
86d398d8fd
commit
73729dcb7b
@ -35,7 +35,7 @@ class ScriptGenerator:
|
||||
video_theme: 视频主题
|
||||
custom_prompt: 自定义提示词
|
||||
skip_seconds: 跳过开始的秒数
|
||||
threshold: 差异阈值
|
||||
threshold: 差异<EFBFBD><EFBFBD><EFBFBD>值
|
||||
vision_batch_size: 视觉处理批次大小
|
||||
vision_llm_provider: 视觉模型提供商
|
||||
progress_callback: 进度回调函数
|
||||
@ -177,7 +177,7 @@ class ScriptGenerator:
|
||||
batch_files = self._get_batch_files(keyframe_files, result, vision_batch_size)
|
||||
first_timestamp, last_timestamp, _ = self._get_batch_timestamps(batch_files, prev_batch_files)
|
||||
|
||||
# 添加带时间戳的分析结果
|
||||
# 添加带时间戳的分<EFBFBD><EFBFBD>结果
|
||||
frame_analysis += f"\n=== {first_timestamp}-{last_timestamp} ===\n"
|
||||
frame_analysis += result['response']
|
||||
frame_analysis += "\n"
|
||||
@ -214,7 +214,7 @@ class ScriptGenerator:
|
||||
|
||||
progress_callback(90, "正在生成文案...")
|
||||
|
||||
# 获取文本生成配置
|
||||
# 获取文本生<EFBFBD><EFBFBD>配置
|
||||
text_provider = config.app.get('text_llm_provider', 'gemini').lower()
|
||||
text_api_key = config.app.get(f'text_{text_provider}_api_key')
|
||||
text_model = config.app.get(f'text_{text_provider}_model_name')
|
||||
@ -286,7 +286,7 @@ class ScriptGenerator:
|
||||
task_data = response.json()
|
||||
task_id = task_data["data"].get('task_id')
|
||||
if not task_id:
|
||||
raise Exception(f"无效的API响应: {response.text}")
|
||||
raise Exception(f"无效的API<EFBFBD><EFBFBD>应: {response.text}")
|
||||
|
||||
progress_callback(50, "正在等待分析结果...")
|
||||
retry_count = 0
|
||||
@ -342,10 +342,10 @@ class ScriptGenerator:
|
||||
batch_files: List[str],
|
||||
prev_batch_files: List[str] = None
|
||||
) -> tuple[str, str, str]:
|
||||
"""获取一批文件的时间戳范围"""
|
||||
"""获取一批文件的时间戳范围,支持毫秒级精度"""
|
||||
if not batch_files:
|
||||
logger.warning("Empty batch files")
|
||||
return "00:00", "00:00", "00:00-00:00"
|
||||
return "00:00:00,000", "00:00:00,000", "00:00:00,000-00:00:00,000"
|
||||
|
||||
if len(batch_files) == 1 and prev_batch_files and len(prev_batch_files) > 0:
|
||||
first_frame = os.path.basename(prev_batch_files[-1])
|
||||
@ -358,18 +358,45 @@ class ScriptGenerator:
|
||||
last_time = last_frame.split('_')[2].replace('.jpg', '')
|
||||
|
||||
def format_timestamp(time_str: str) -> str:
|
||||
if len(time_str) < 4:
|
||||
logger.warning(f"Invalid timestamp format: {time_str}")
|
||||
return "00:00"
|
||||
"""将时间字符串转换为 HH:MM:SS,mmm 格式"""
|
||||
try:
|
||||
if len(time_str) < 4:
|
||||
logger.warning(f"Invalid timestamp format: {time_str}")
|
||||
return "00:00:00,000"
|
||||
|
||||
minutes = int(time_str[-4:-2])
|
||||
seconds = int(time_str[-2:])
|
||||
|
||||
if seconds >= 60:
|
||||
minutes += seconds // 60
|
||||
seconds = seconds % 60
|
||||
# 处理毫秒部分
|
||||
if ',' in time_str:
|
||||
time_part, ms_part = time_str.split(',')
|
||||
ms = int(ms_part)
|
||||
else:
|
||||
time_part = time_str
|
||||
ms = 0
|
||||
|
||||
return f"{minutes:02d}:{seconds:02d}"
|
||||
# 处理时分秒
|
||||
parts = time_part.split(':')
|
||||
if len(parts) == 3: # HH:MM:SS
|
||||
h, m, s = map(int, parts)
|
||||
elif len(parts) == 2: # MM:SS
|
||||
h = 0
|
||||
m, s = map(int, parts)
|
||||
else: # SS
|
||||
h = 0
|
||||
m = 0
|
||||
s = int(parts[0])
|
||||
|
||||
# 处理进位
|
||||
if s >= 60:
|
||||
m += s // 60
|
||||
s = s % 60
|
||||
if m >= 60:
|
||||
h += m // 60
|
||||
m = m % 60
|
||||
|
||||
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"时间戳格式转换错误 {time_str}: {str(e)}")
|
||||
return "00:00:00,000"
|
||||
|
||||
first_timestamp = format_timestamp(first_time)
|
||||
last_timestamp = format_timestamp(last_time)
|
||||
|
||||
@ -406,22 +406,47 @@ class ScriptProcessor:
|
||||
def _save_results(self, frame_content_list: List[Dict]):
|
||||
"""保存处理结果,并添加新的时间戳"""
|
||||
try:
|
||||
# 转换秒数为 MM:SS 格式
|
||||
def seconds_to_time(seconds):
|
||||
minutes = seconds // 60
|
||||
remaining_seconds = seconds % 60
|
||||
return f"{minutes:02d}:{remaining_seconds:02d}"
|
||||
def format_timestamp(seconds: float) -> str:
|
||||
"""将秒数转换为 HH:MM:SS,mmm 格式"""
|
||||
hours = int(seconds // 3600)
|
||||
minutes = int((seconds % 3600) // 60)
|
||||
seconds_remainder = seconds % 60
|
||||
whole_seconds = int(seconds_remainder)
|
||||
milliseconds = int((seconds_remainder - whole_seconds) * 1000)
|
||||
|
||||
return f"{hours:02d}:{minutes:02d}:{whole_seconds:02d},{milliseconds:03d}"
|
||||
|
||||
# 计算新的时间戳
|
||||
current_time = 0 # 当前时间点(秒)
|
||||
current_time = 0.0 # 当前时间点(秒,包含毫秒)
|
||||
|
||||
for frame in frame_content_list:
|
||||
# 获取原始时间戳的持续时间
|
||||
start_str, end_str = frame['timestamp'].split('-')
|
||||
|
||||
def time_to_seconds(time_str):
|
||||
minutes, seconds = map(int, time_str.split(':'))
|
||||
return minutes * 60 + seconds
|
||||
def time_to_seconds(time_str: str) -> float:
|
||||
"""将时间字符串转换为秒数(包含毫秒)"""
|
||||
try:
|
||||
if ',' in time_str:
|
||||
time_part, ms_part = time_str.split(',')
|
||||
ms = float(ms_part) / 1000
|
||||
else:
|
||||
time_part = time_str
|
||||
ms = 0
|
||||
|
||||
parts = time_part.split(':')
|
||||
if len(parts) == 3: # HH:MM:SS
|
||||
h, m, s = map(float, parts)
|
||||
seconds = h * 3600 + m * 60 + s
|
||||
elif len(parts) == 2: # MM:SS
|
||||
m, s = map(float, parts)
|
||||
seconds = m * 60 + s
|
||||
else: # SS
|
||||
seconds = float(parts[0])
|
||||
|
||||
return seconds + ms
|
||||
except Exception as e:
|
||||
logger.error(f"时间格式转换错误 {time_str}: {str(e)}")
|
||||
return 0.0
|
||||
|
||||
# 计算当前片段的持续时间
|
||||
start_seconds = time_to_seconds(start_str)
|
||||
@ -429,8 +454,8 @@ class ScriptProcessor:
|
||||
duration = end_seconds - start_seconds
|
||||
|
||||
# 设置新的时间戳
|
||||
new_start = seconds_to_time(current_time)
|
||||
new_end = seconds_to_time(current_time + duration)
|
||||
new_start = format_timestamp(current_time)
|
||||
new_end = format_timestamp(current_time + duration)
|
||||
frame['new_timestamp'] = f"{new_start}-{new_end}"
|
||||
|
||||
# 更新当前时间点
|
||||
@ -443,7 +468,7 @@ class ScriptProcessor:
|
||||
with open(file_name, 'w', encoding='utf-8') as file:
|
||||
json.dump(frame_content_list, file, ensure_ascii=False, indent=4)
|
||||
|
||||
logger.info(f"保存脚本成功,总时长: {seconds_to_time(current_time)}")
|
||||
logger.info(f"保存脚本成功,总时长: {format_timestamp(current_time)}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"保存结果时发生错误: {str(e)}\n{traceback.format_exc()}")
|
||||
|
||||
@ -10,6 +10,7 @@ from google.api_core import exceptions
|
||||
import google.generativeai as genai
|
||||
import PIL.Image
|
||||
import traceback
|
||||
from app.utils import utils
|
||||
|
||||
|
||||
class VisionAnalyzer:
|
||||
@ -146,14 +147,34 @@ class VisionAnalyzer:
|
||||
response_text = result['response']
|
||||
image_paths = result['image_paths']
|
||||
|
||||
img_name_start = Path(image_paths[0]).stem.split('_')[-1]
|
||||
img_name_end = Path(image_paths[-1]).stem.split('_')[-1]
|
||||
txt_path = os.path.join(output_dir, f"frame_{img_name_start}_{img_name_end}.txt")
|
||||
# 从文件名中提取时间戳并转换为标准格式
|
||||
def format_timestamp(img_path):
|
||||
# 从文件名中提取时间部分
|
||||
timestamp = Path(img_path).stem.split('_')[-1]
|
||||
try:
|
||||
# 将时间转换为秒
|
||||
seconds = utils.time_to_seconds(timestamp.replace('_', ':'))
|
||||
# 转换为 HH:MM:SS,mmm 格式
|
||||
hours = int(seconds // 3600)
|
||||
minutes = int((seconds % 3600) // 60)
|
||||
seconds_remainder = seconds % 60
|
||||
whole_seconds = int(seconds_remainder)
|
||||
milliseconds = int((seconds_remainder - whole_seconds) * 1000)
|
||||
|
||||
return f"{hours:02d}:{minutes:02d}:{whole_seconds:02d},{milliseconds:03d}"
|
||||
except Exception as e:
|
||||
logger.error(f"时间戳格式转换错误: {timestamp}, {str(e)}")
|
||||
return timestamp
|
||||
|
||||
start_timestamp = format_timestamp(image_paths[0])
|
||||
end_timestamp = format_timestamp(image_paths[-1])
|
||||
|
||||
txt_path = os.path.join(output_dir, f"frame_{start_timestamp}_{end_timestamp}.txt")
|
||||
|
||||
# 保存结果到txt文件
|
||||
with open(txt_path, 'w', encoding='utf-8') as f:
|
||||
f.write(response_text.strip())
|
||||
print(f"已保存分析结果到: {txt_path}")
|
||||
logger.info(f"已保存分析结果到: {txt_path}")
|
||||
|
||||
def load_images(self, image_paths: List[str]) -> List[PIL.Image.Image]:
|
||||
"""
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user