feat(video): 提升关键帧时间戳精度到毫秒

- 将关键帧时间戳精确到毫秒,格式为 HHMMSSmmm
- 优化场景检测算法,增加帧数检查和未检测到边界时的处理
-调整帧差计算方式,使用浮点数提高精度
- 修改文件名格式匹配规则,支持毫秒级时间戳
This commit is contained in:
linyqh 2024-12-03 21:19:49 +08:00
parent 6118e63f20
commit f6ba1824e9
2 changed files with 57 additions and 39 deletions

View File

@ -51,21 +51,34 @@ class VideoProcessor:
def detect_shot_boundaries(self, frames: List[np.ndarray], threshold: int = 30) -> List[int]:
"""
使用帧差法检测镜头边界
Args:
frames: 视频帧列表
threshold: 差异阈值
threshold: 差异阈值默认值调低为30
Returns:
List[int]: 镜头边界帧的索引列表
"""
shot_boundaries = []
if len(frames) < 2: # 添加帧数检查
logger.warning("视频帧数过少,无法检测场景边界")
return [len(frames) - 1] # 返回最后一帧作为边界
for i in range(1, len(frames)):
prev_frame = cv2.cvtColor(frames[i - 1], cv2.COLOR_BGR2GRAY)
curr_frame = cv2.cvtColor(frames[i], cv2.COLOR_BGR2GRAY)
diff = np.mean(np.abs(curr_frame.astype(int) - prev_frame.astype(int)))
# 计算帧差
diff = np.mean(np.abs(curr_frame.astype(float) - prev_frame.astype(float)))
if diff > threshold:
shot_boundaries.append(i)
# 如果没有检测到任何边界,至少返回最后一帧
if not shot_boundaries:
logger.warning("未检测到场景边界,将视频作为单个场景处理")
shot_boundaries.append(len(frames) - 1)
return shot_boundaries
def extract_keyframes(self, frames: List[np.ndarray], shot_boundaries: List[int]) -> Tuple[
@ -113,12 +126,7 @@ class VideoProcessor:
output_dir: str, desc: str = "保存关键帧") -> None:
"""
保存关键帧到指定目录文件名格式为keyframe_帧序号_时间戳.jpg
Args:
keyframes: 关键帧列表
keyframe_indices: 关键帧索引列表
output_dir: 输出目录
desc: 进度条描述
时间戳精确到毫秒格式为HHMMSSmmm
"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
@ -126,11 +134,13 @@ class VideoProcessor:
for keyframe, frame_idx in tqdm(zip(keyframes, keyframe_indices),
total=len(keyframes),
desc=desc):
# 计算精确到毫秒的时间戳
timestamp = frame_idx / self.fps
hours = int(timestamp // 3600)
minutes = int((timestamp % 3600) // 60)
seconds = int(timestamp % 60)
time_str = f"{hours:02d}{minutes:02d}{seconds:02d}"
milliseconds = int((timestamp % 1) * 1000) # 计算毫秒部分
time_str = f"{hours:02d}{minutes:02d}{seconds:02d}{milliseconds:03d}"
output_path = os.path.join(output_dir,
f'keyframe_{frame_idx:06d}_{time_str}.jpg')
@ -138,11 +148,7 @@ class VideoProcessor:
def extract_frames_by_numbers(self, frame_numbers: List[int], output_folder: str) -> None:
"""
根据指定的帧号提取帧如果多个帧在同一秒内只保留一个
Args:
frame_numbers: 要提取的帧号列表
output_folder: 输出文件夹路径
根据指定的帧号提取帧如果多个帧在同一毫秒内只保留一个
"""
if not frame_numbers:
raise ValueError("未提供帧号列表")
@ -153,29 +159,31 @@ class VideoProcessor:
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# 用于记录已处理的时间戳(秒)
processed_seconds = set()
# 用于记录已处理的时间戳(秒)
processed_timestamps = set()
for frame_number in tqdm(frame_numbers, desc="提取高清帧"):
# 计算时间戳(秒)
timestamp_seconds = int(frame_number / self.fps)
# 计算精确到毫秒的时间戳
timestamp = frame_number / self.fps
timestamp_ms = int(timestamp * 1000) # 转换为毫秒
# 如果这一秒已经处理过,跳过
if timestamp_seconds in processed_seconds:
# 如果这一秒已经处理过,跳过
if timestamp_ms in processed_timestamps:
continue
self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
ret, frame = self.cap.read()
if ret:
# 记录这一秒已经处理
processed_seconds.add(timestamp_seconds)
# 记录这一秒已经处理
processed_timestamps.add(timestamp_ms)
# 计算时间戳字符串
hours = int(timestamp_seconds // 3600)
minutes = int((timestamp_seconds % 3600) // 60)
seconds = int(timestamp_seconds % 60)
time_str = f"{hours:02d}{minutes:02d}{seconds:02d}"
hours = int(timestamp // 3600)
minutes = int((timestamp % 3600) // 60)
seconds = int(timestamp % 60)
milliseconds = int((timestamp % 1) * 1000) # 计算毫秒部分
time_str = f"{hours:02d}{minutes:02d}{seconds:02d}{milliseconds:03d}"
output_path = os.path.join(output_folder,
f"keyframe_{frame_number:06d}_{time_str}.jpg")
@ -183,27 +191,34 @@ class VideoProcessor:
else:
logger.info(f"无法读取帧 {frame_number}")
logger.info(f"共提取了 {len(processed_seconds)} 个不同时间戳的帧")
logger.info(f"共提取了 {len(processed_timestamps)} 个不同时间戳的帧")
@staticmethod
def extract_numbers_from_folder(folder_path: str) -> List[int]:
"""
从文件夹中提取帧号
Args:
folder_path: 关键帧文件夹路径
Returns:
List[int]: 排序后的帧号列表
"""
files = [f for f in os.listdir(folder_path) if f.endswith('.jpg')]
# 更新正则表达式以匹配新的文件名格式keyframe_000123_010534.jpg
pattern = re.compile(r'keyframe_(\d+)_\d+\.jpg$')
# 更新正则表达式以匹配新的文件名格式keyframe_000123_010534123.jpg
pattern = re.compile(r'keyframe_(\d+)_\d{9}\.jpg$')
numbers = []
for f in files:
match = pattern.search(f)
if match:
numbers.append(int(match.group(1)))
else:
logger.warning(f"文件名格式不匹配: {f}")
if not numbers:
logger.error(f"在目录 {folder_path} 中未找到有效的关键帧文件")
return sorted(numbers)
def process_video(self, output_dir: str, skip_seconds: float = 0, threshold: int = 30) -> None:
@ -212,7 +227,7 @@ class VideoProcessor:
Args:
output_dir: 输出目录
skip_seconds: 跳过视<EFBFBD><EFBFBD><EFBFBD>开头的秒数
skip_seconds: 跳过视开头的秒数
"""
skip_frames = int(skip_seconds * self.fps)
@ -240,11 +255,14 @@ class VideoProcessor:
def process_video_pipeline(self,
output_dir: str,
skip_seconds: float = 0,
threshold: int = 30,
threshold: int = 20, # 降低默认阈值
compressed_width: int = 320,
keep_temp: bool = False) -> None:
"""
执行完整的视频处理流程压缩提取关键帧导出高清帧
执行完整的视频处理流程
Args:
threshold: 降低默认阈值为20使场景检测更敏感
"""
os.makedirs(output_dir, exist_ok=True)
temp_dir = os.path.join(output_dir, 'temp')
@ -358,7 +376,7 @@ if __name__ == "__main__":
import time
start_time = time.time()
processor = VideoProcessor("best.mp4")
processor.process_video_pipeline(output_dir="output4")
processor = VideoProcessor("E:\\projects\\NarratoAI\\resource\\videos\\test.mp4")
processor.process_video_pipeline(output_dir="output")
end_time = time.time()
print(f"处理完成!总耗时: {end_time - start_time:.2f}")

View File

@ -438,7 +438,7 @@ def generate_script(tr, params):
if 'error' in result:
logger.warning(f"批次 {result['batch_index']} 处理出现警告: {result['error']}")
continue
# 获取当前批次的文件列表
# 获取当前批次的文件列表 keyframe_001136_000045.jpg 将 000045 精度提升到 毫秒
batch_files = get_batch_files(keyframe_files, result, vision_batch_size)
logger.debug(f"批次 {result['batch_index']} 处理完成,共 {len(batch_files)} 张图片")
logger.debug(batch_files)