NarratoAI/app/services/llm/migration_adapter.py

338 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
迁移适配器
为现有代码提供向后兼容的接口方便逐步迁移到新的LLM服务架构
"""
import asyncio
import json
from typing import List, Dict, Any, Optional, Union
from pathlib import Path
import PIL.Image
from loguru import logger
from .unified_service import UnifiedLLMService
from .exceptions import LLMServiceError
# 导入新的提示词管理系统
from app.services.prompts import PromptManager
# 提供商注册由 webui.py:main() 显式调用(见 LLM 提供商注册机制重构)
# 这样更可靠,错误也更容易调试
def _run_async_safely(coro_func, *args, **kwargs):
"""
安全地运行异步协程,处理各种事件循环情况
Args:
coro_func: 协程函数(不是协程对象)
*args: 协程函数的位置参数
**kwargs: 协程函数的关键字参数
Returns:
协程的执行结果
"""
def run_in_new_loop():
"""在新的事件循环中运行协程"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro_func(*args, **kwargs))
finally:
loop.close()
asyncio.set_event_loop(None)
try:
# 尝试获取当前事件循环
try:
loop = asyncio.get_running_loop()
# 如果有运行中的事件循环,使用线程池执行
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(run_in_new_loop)
return future.result()
except RuntimeError:
# 没有运行中的事件循环,直接运行
return run_in_new_loop()
except Exception as e:
logger.error(f"异步执行失败: {str(e)}")
raise LLMServiceError(f"异步执行失败: {str(e)}")
class LegacyLLMAdapter:
    """Backward-compatible adapter exposing the legacy LLM interface."""

    @staticmethod
    def _fallback_json(picture: str, narration: str) -> str:
        """Build the minimal single-item JSON payload returned on failure."""
        payload = {
            "items": [
                {
                    "_id": 1,
                    "timestamp": "00:00:00-00:00:10",
                    "picture": picture,
                    "narration": narration
                }
            ]
        }
        return json.dumps(payload, ensure_ascii=False)

    @staticmethod
    def create_vision_analyzer(provider: str, api_key: str, model: str, base_url: str = None):
        """
        Create a vision analyzer instance (legacy-compatible entry point).

        Args:
            provider: Provider name.
            api_key: API key.
            model: Model name.
            base_url: Optional API base URL.

        Returns:
            A VisionAnalyzerAdapter wrapping the unified service.
        """
        return VisionAnalyzerAdapter(provider, api_key, model, base_url)

    @staticmethod
    def generate_narration(markdown_content: str, api_key: str, base_url: str, model: str) -> str:
        """
        Generate narration copy (legacy-compatible entry point).

        Args:
            markdown_content: Markdown-formatted video-frame analysis content.
            api_key: Kept for signature compatibility; credentials are
                resolved by the unified service, not from this argument.
            base_url: Kept for signature compatibility (see ``api_key``).
            model: Kept for signature compatibility (see ``api_key``).

        Returns:
            A JSON string containing the generated narration; on any failure
            a minimal fallback payload (still valid JSON) is returned instead
            of raising.
        """
        try:
            # Build the prompt through the centralized prompt-management system.
            narration_prompt = PromptManager.get_prompt(
                category="documentary",
                name="narration_generation",
                parameters={
                    "video_frame_description": markdown_content
                }
            )
            raw_output = _run_async_safely(
                UnifiedLLMService.generate_text,
                prompt=narration_prompt,
                system_prompt="你是一名专业的短视频解说文案撰写专家。",
                temperature=1.5,
                response_format="json"
            )
            # Lenient parser that repairs common JSON mistakes in LLM output.
            from webui.tools.generate_short_summary import parse_and_fix_json
            parsed = parse_and_fix_json(raw_output)
            if parsed:
                # Re-serialize so callers always receive a JSON string.
                return json.dumps(parsed, ensure_ascii=False)
            logger.error("无法解析LLM返回的JSON数据")
            return LegacyLLMAdapter._fallback_json(
                "解析失败请检查LLM输出",
                "解说文案生成失败,请重试"
            )
        except Exception as e:
            logger.error(f"生成解说文案失败: {str(e)}")
            return LegacyLLMAdapter._fallback_json(
                "生成失败",
                f"解说文案生成失败: {str(e)}"
            )
class VisionAnalyzerAdapter:
    """Adapter giving the unified image-analysis service the legacy interface."""

    def __init__(self, provider: str, api_key: str, model: str, base_url: str = None):
        # All four values are stored for interface compatibility; the unified
        # service resolves credentials itself, so only `provider` (and `model`
        # for result labelling) influence behavior here.
        self.provider = provider
        self.api_key = api_key
        self.model = model
        self.base_url = base_url

    async def analyze_images(self,
                             images: List[Union[str, Path, PIL.Image.Image]],
                             prompt: str,
                             batch_size: int = 10) -> List[Dict[str, Any]]:
        """
        Analyze images, returning results in the legacy batch-dict format.

        Args:
            images: Images given as path strings, Path objects, or PIL images.
            prompt: The analysis prompt.
            batch_size: Number of images per batch.

        Returns:
            One dict per batch with keys ``batch_index``, ``images_processed``,
            ``response`` and ``model_used`` — the shape legacy callers expect.
        """
        try:
            raw_responses = await UnifiedLLMService.analyze_images(
                images=images,
                prompt=prompt,
                provider=self.provider,
                batch_size=batch_size
            )
            # The unified service yields one string per batch; wrap each one
            # in the legacy dict format, computing how many images the batch
            # actually covered (the last batch may be partial).
            legacy_batches = []
            total = len(images)
            for batch_index, response in enumerate(raw_responses):
                first = batch_index * batch_size
                last = min(first + batch_size, total)
                legacy_batches.append({
                    'batch_index': batch_index,
                    'images_processed': last - first,
                    'response': response,
                    'model_used': self.model
                })
            logger.info(f"图片分析完成,共处理 {len(images)} 张图片,生成 {len(legacy_batches)} 个批次结果")
            return legacy_batches
        except Exception as e:
            logger.error(f"图片分析失败: {str(e)}")
            raise
class SubtitleAnalyzerAdapter:
    """Adapter giving the unified subtitle-analysis service the legacy interface."""

    def __init__(self, api_key: str, model: str, base_url: str, provider: str = None):
        # Credentials are stored for interface compatibility only; `provider`
        # falls back to "openai" when not supplied.
        self.api_key = api_key
        self.model = model
        self.base_url = base_url
        self.provider = provider or "openai"

    def _run_async_safely(self, coro_func, *args, **kwargs):
        """Delegate to the module-level helper that bridges sync and async code."""
        return _run_async_safely(coro_func, *args, **kwargs)

    def _clean_json_output(self, output: str) -> str:
        """Strip markdown code-fence markers (```json ... ```) from raw LLM output."""
        import re
        # First remove whole fence lines anywhere in the text...
        for pattern in (r'^```json\s*', r'^```\s*$', r'^```.*$'):
            output = re.sub(pattern, '', output, flags=re.MULTILINE)
        # ...then any fence fragments glued to the very start or end.
        for pattern in (r'^```', r'```$'):
            output = re.sub(pattern, '', output)
        # Finally drop surrounding whitespace.
        return output.strip()

    def analyze_subtitle(self, subtitle_content: str) -> Dict[str, Any]:
        """
        Analyze subtitle content (legacy-compatible entry point).

        Args:
            subtitle_content: Raw subtitle text.

        Returns:
            On success: ``{"status": "success", "analysis", "model", "temperature"}``.
            On failure: ``{"status": "error", "message", "temperature"}`` —
            errors are reported in the dict rather than raised.
        """
        try:
            analysis = self._run_async_safely(
                UnifiedLLMService.analyze_subtitle,
                subtitle_content=subtitle_content,
                provider=self.provider,
                temperature=1.0
            )
        except Exception as e:
            logger.error(f"字幕分析失败: {str(e)}")
            return {
                "status": "error",
                "message": str(e),
                "temperature": 1.0
            }
        return {
            "status": "success",
            "analysis": analysis,
            "model": self.model,
            "temperature": 1.0
        }

    def generate_narration_script(self, short_name: str, plot_analysis: str, subtitle_content: str = "", temperature: float = 0.7) -> Dict[str, Any]:
        """
        Generate a narration script (legacy-compatible entry point).

        Args:
            short_name: Name of the short drama.
            plot_analysis: Plot-analysis text to base the script on.
            subtitle_content: Original subtitles, used for accurate timestamps.
            temperature: Sampling temperature for generation.

        Returns:
            On success: ``{"status": "success", "narration_script", "model",
            "temperature"}`` where ``narration_script`` is a JSON string
            containing an ``items`` array (callers parse it themselves).
            On failure: ``{"status": "error", "message", "temperature"}``.
        """
        try:
            script_prompt = PromptManager.get_prompt(
                category="short_drama_narration",
                name="script_generation",
                parameters={
                    "drama_name": short_name,
                    "plot_analysis": plot_analysis,
                    "subtitle_content": subtitle_content
                }
            )
            raw_script = self._run_async_safely(
                UnifiedLLMService.generate_text,
                prompt=script_prompt,
                system_prompt="你是一位专业的短视频解说脚本撰写专家。",
                provider=self.provider,
                temperature=temperature,
                response_format="json"
            )
            # Callers expect the JSON string itself, so only strip markdown
            # fences here — no parsing is done on their behalf.
            return {
                "status": "success",
                "narration_script": self._clean_json_output(raw_script),
                "model": self.model,
                "temperature": temperature
            }
        except Exception as e:
            logger.error(f"解说文案生成失败: {str(e)}")
            return {
                "status": "error",
                "message": str(e),
                "temperature": temperature
            }
# Module-level helpers kept for backward compatibility with older call sites.
def create_vision_analyzer(provider: str, api_key: str, model: str, base_url: str = None):
    """Module-level convenience wrapper around LegacyLLMAdapter.create_vision_analyzer."""
    return LegacyLLMAdapter.create_vision_analyzer(
        provider=provider, api_key=api_key, model=model, base_url=base_url
    )
def generate_narration(markdown_content: str, api_key: str, base_url: str, model: str) -> str:
    """Module-level convenience wrapper around LegacyLLMAdapter.generate_narration."""
    return LegacyLLMAdapter.generate_narration(
        markdown_content=markdown_content, api_key=api_key, base_url=base_url, model=model
    )