feat: 添加LLM驱动的字幕翻译功能

新增配置项以配置字幕翻译批次大小和最大并发数
补充中、英文国际化文案支持翻译相关界面
实现核心字幕翻译服务,支持批量处理与并发执行
添加WebUI界面用于触发和监控字幕翻译任务
新增完整单元测试覆盖翻译功能全流程
This commit is contained in:
viccy 2026-06-11 10:09:58 +08:00
parent b9f07a6a10
commit 9f28fcfa98
6 changed files with 668 additions and 16 deletions

View File

@ -0,0 +1,370 @@
"""LLM-powered SRT subtitle translation."""
from __future__ import annotations
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from math import ceil
from typing import Any, Callable
from loguru import logger
from app.config import config
from app.services.llm.migration_adapter import _run_async_safely
from app.services.llm.unified_service import UnifiedLLMService
from app.services.subtitle_corrector import (
_ensure_llm_providers_registered,
_extract_json_text,
parse_srt_blocks,
)
from app.services.subtitle_text import read_subtitle_text
from app.utils import utils
class SubtitleTranslationError(RuntimeError):
    """Signal that a subtitle translation run could not yield a valid SRT."""
# Fallback number of subtitle entries sent to the LLM per request, used when
# no batch size is passed in or configured.
DEFAULT_BATCH_SIZE = 20
# Fallback number of batches translated concurrently.
DEFAULT_MAX_WORKERS = 3
# Total LLM attempts per batch: the initial request plus repair retries.
DEFAULT_MAX_REPAIR_ATTEMPTS = 3
# Progress hook signature: (completed_entries, total_entries, message).
TranslationProgressCallback = Callable[[int, int, str], None]
def _get_positive_int(value, default: int, *, minimum: int = 1, maximum: int | None = None) -> int:
    """Coerce ``value`` to an int and clamp it to ``[minimum, maximum]``.

    Args:
        value: Anything ``int()`` may accept (number, numeric string, ...).
        default: Used in place of ``value`` when it cannot be converted.
            The default is clamped as well.
        minimum: Lower bound applied to the result.
        maximum: Optional upper bound applied to the result.

    Returns:
        The clamped integer.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")) -- TOML can spell a setting as
        # ``inf``, which must fall back to the default instead of crashing.
        parsed = default
    parsed = max(minimum, parsed)
    if maximum is not None:
        parsed = min(maximum, parsed)
    return parsed
def _resolve_batch_size(batch_size: int | None = None) -> int:
    """Return the per-request batch size, clamped to 1..200.

    An explicit ``batch_size`` takes precedence; when it is ``None`` the
    ``subtitle_translate_batch_size`` app setting is read instead.
    """
    requested = (
        config.app.get("subtitle_translate_batch_size", DEFAULT_BATCH_SIZE)
        if batch_size is None
        else batch_size
    )
    return _get_positive_int(requested, DEFAULT_BATCH_SIZE, minimum=1, maximum=200)
def _resolve_max_workers(max_workers: int | None = None) -> int:
    """Return the number of concurrent batches, clamped to 1..8.

    An explicit ``max_workers`` takes precedence; when it is ``None`` the
    ``subtitle_translate_max_workers`` app setting is read instead.
    """
    requested = (
        config.app.get("subtitle_translate_max_workers", DEFAULT_MAX_WORKERS)
        if max_workers is None
        else max_workers
    )
    return _get_positive_int(requested, DEFAULT_MAX_WORKERS, minimum=1, maximum=8)
def _split_blocks(blocks, batch_size: int):
    """Slice ``blocks`` into consecutive batches of at most ``batch_size`` items."""
    batches = []
    for start in range(0, len(blocks), batch_size):
        batches.append(blocks[start:start + batch_size])
    return batches
def _build_translation_prompt(blocks, target_language: str) -> str:
    """Build the user prompt asking the LLM to translate one batch.

    Args:
        blocks: Subtitle blocks exposing ``order`` and ``text`` attributes.
        target_language: Language name interpolated into the instructions.

    Returns:
        The instruction text followed by a JSON object that maps each block's
        ``order`` (as a string key) to its source text.
    """
    payload = {str(block.order): block.text for block in blocks}
    # The final header line ends with ":" so that everything after
    # "待翻译字幕条目:" is exactly the JSON payload; the unit tests split the
    # prompt on that marker to recover the entries.
    return f"""
请将以下 SRT 字幕文本翻译为{target_language}
翻译要求
1. 结合全部字幕内容理解语境输出自然准确适合字幕阅读的{target_language}
2. 只翻译字幕文本不要修改时间轴序号条目数量或条目顺序
3. 保留必要的说话人标记专有名词品牌名代码数字和换行除非目标语言中有约定译名
4. 不要添加解释注释剧情信息或 Markdown
5. 空字幕文本保持为空字符串
只输出严格 JSON 对象不要输出 Markdown 或解释文字必须保留所有输入 key格式必须为
{{"1":"翻译后的字幕文本","2":"翻译后的字幕文本"}}
待翻译字幕条目:
{json.dumps(payload, ensure_ascii=False, indent=2)}
""".strip()
def _parse_translations(raw_output: str, expected_ids: set[int]) -> dict[int, str]:
    """Turn the raw LLM reply into an ``{entry id: translated text}`` mapping.

    Three JSON shapes are accepted: ``{"items": [...]}``, a bare list of
    ``{"id": ..., "text": ...}`` objects, or a flat ``{"id": "text"}`` object.
    Entries that are not objects, have a non-integer id, or carry an id
    outside ``expected_ids`` are ignored.

    Raises:
        SubtitleTranslationError: If the reply is not valid JSON, has an
            unsupported shape, or lacks any of the expected ids.
    """
    json_text = _extract_json_text(raw_output)
    try:
        data: Any = json.loads(json_text)
    except json.JSONDecodeError as exc:
        raise SubtitleTranslationError("LLM 未返回有效 JSON 字幕翻译结果") from exc

    if isinstance(data, list):
        items = data
    elif isinstance(data, dict):
        if "items" in data:
            items = data["items"]
        else:
            # Flat {"1": "text"} form: normalise to the list-of-objects shape.
            items = [{"id": key, "text": value} for key, value in data.items()]
    else:
        raise SubtitleTranslationError("LLM 字幕翻译结果格式无效")
    if not isinstance(items, list):
        raise SubtitleTranslationError("LLM 字幕翻译结果缺少 items 列表")

    translations: dict[int, str] = {}
    for entry in items:
        if not isinstance(entry, dict):
            continue
        try:
            entry_id = int(entry.get("id"))
        except (TypeError, ValueError):
            continue
        if entry_id not in expected_ids:
            continue
        translations[entry_id] = str(entry.get("text") or "").strip()

    missing_ids = sorted(expected_ids.difference(translations))
    if missing_ids:
        raise SubtitleTranslationError(f"LLM 字幕翻译结果缺少字幕条目: {missing_ids[:10]}")
    return translations
def _build_repair_prompt(
    *,
    blocks,
    target_language: str,
    previous_output: str,
    error_message: str,
) -> str:
    """Build the follow-up prompt sent after a reply failed validation.

    The prompt repeats the target language, the validation error, the source
    entries (as a JSON object keyed by ``order``) and the rejected reply.
    """
    source_entries = {str(block.order): block.text for block in blocks}
    source_json = json.dumps(source_entries, ensure_ascii=False, indent=2)
    prompt = f"""
你上一轮返回的字幕翻译 JSON 无法通过校验请修复后重新输出
目标语言{target_language}
校验错误
{error_message}
原始字幕条目
{source_json}
上一轮输出
{previous_output}
请只输出严格 JSON 对象必须包含并且只包含原始字幕条目的所有 key
"""
    return prompt.strip()
def _translate_chunk(
    *,
    chunk,
    chunk_index: int,
    total_chunks: int,
    target_language: str,
    provider: str,
    api_key: str,
    base_url: str,
    temperature: float,
    max_repair_attempts: int,
) -> dict[int, str]:
    """Translate one batch of subtitle blocks, retrying on invalid replies.

    The first attempt sends the translation prompt. Each later attempt sends
    a repair prompt containing the previous reply and its validation error.

    Args:
        chunk: Non-empty sequence of blocks exposing ``order`` and ``text``.
        chunk_index: 1-based position of this batch, used in log messages.
        total_chunks: Number of batches, used in log messages.
        target_language: Language to translate into.
        provider, api_key, base_url, temperature: Forwarded to the LLM call.
        max_repair_attempts: Total number of LLM attempts for this batch.

    Returns:
        Mapping of block ``order`` to translated text for every block.

    Raises:
        SubtitleTranslationError: If the last attempt still fails validation,
            or if no attempt was made at all.
    """
    first_order = chunk[0].order
    last_order = chunk[-1].order
    wanted_ids = {block.order for block in chunk}
    logger.info(
        f"字幕翻译批次 {chunk_index}/{total_chunks} 开始: "
        f"条目 {first_order}-{last_order}, 共 {len(chunk)}"
    )

    # The system prompt does not depend on the attempt, so build it once.
    system_prompt = (
        f"你是一位专业字幕翻译员,擅长在严格保留 JSON key 一一对应的前提下,"
        f"将字幕准确翻译为{target_language}"
    )
    request_prompt = _build_translation_prompt(chunk, target_language)
    previous_reply = ""
    failure_reason = ""

    for attempt in range(1, max_repair_attempts + 1):
        if attempt > 1:
            logger.warning(
                f"字幕翻译批次 {chunk_index}/{total_chunks}{attempt} 次修复: {failure_reason}"
            )
            request_prompt = _build_repair_prompt(
                blocks=chunk,
                target_language=target_language,
                previous_output=previous_reply,
                error_message=failure_reason,
            )

        reply = _run_async_safely(
            UnifiedLLMService.generate_text,
            prompt=request_prompt,
            system_prompt=system_prompt,
            provider=provider,
            temperature=temperature,
            response_format="json",
            api_key=api_key,
            api_base=base_url,
        )
        previous_reply = str(reply or "")

        try:
            result = _parse_translations(previous_reply, wanted_ids)
        except SubtitleTranslationError as exc:
            failure_reason = str(exc)
            if attempt < max_repair_attempts:
                continue
            # Out of attempts: log and let the validation error propagate.
            logger.error(
                f"字幕翻译批次 {chunk_index}/{total_chunks} 失败: "
                f"条目 {first_order}-{last_order}, {failure_reason}"
            )
            raise

        logger.info(
            f"字幕翻译批次 {chunk_index}/{total_chunks} 完成: "
            f"条目 {first_order}-{last_order}"
        )
        return result

    # Only reachable when the loop body never ran (max_repair_attempts < 1).
    raise SubtitleTranslationError(
        f"字幕翻译批次 {chunk_index}/{total_chunks} 未生成有效结果: 条目 {first_order}-{last_order}"
    )
def _call_progress_callback(
    progress_callback: TranslationProgressCallback | None,
    completed: int,
    total: int,
    message: str,
) -> None:
    """Invoke the optional progress hook without letting it break translation.

    Nothing happens when no callback is given. Any exception raised by the
    callback is logged at debug level and otherwise ignored.
    """
    if progress_callback:
        try:
            progress_callback(completed, total, message)
        except Exception as exc:
            logger.debug(f"字幕翻译进度回调失败: {exc}")
def _render_translated_srt(blocks, translations: dict[int, str]) -> str:
    """Rebuild SRT text from the original blocks and their translations.

    Args:
        blocks: Parsed blocks exposing ``order``, ``index_line`` and
            ``time_line``; index and timing lines are written unchanged.
        translations: Translated text keyed by block ``order``. A missing
            entry renders as an empty cue.

    Returns:
        SRT text with cues separated by one blank line and a single trailing
        newline.
    """
    rendered_blocks = []
    for block in blocks:
        translated_text = str(translations.get(block.order, ""))
        # A blank line terminates an SRT cue, so blank lines inside a
        # translation would split the cue and corrupt the file. Drop them and
        # normalise line endings while we are here.
        text_lines = [
            line
            for line in translated_text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
            if line.strip()
        ]
        if not text_lines:
            # Keep an (empty) text line so empty cues render as before.
            text_lines = [""]
        cue_text = "\n".join(text_lines)
        rendered_blocks.append(f"{block.index_line}\n{block.time_line}\n{cue_text}")
    return "\n\n".join(rendered_blocks).rstrip() + "\n"
def translate_srt_content(
    srt_content: str,
    *,
    target_language: str = "中文",
    provider: str = "",
    api_key: str = "",
    base_url: str = "",
    temperature: float = 0.2,
    batch_size: int | None = None,
    max_workers: int | None = None,
    progress_callback: TranslationProgressCallback | None = None,
) -> str:
    """Translate SRT text with the LLM, batch by batch.

    The subtitle blocks are split into batches. A single batch is translated
    in the calling thread; several batches run on a thread pool. Index and
    timing lines are kept and only the cue text is replaced.

    Args:
        srt_content: Source SRT text.
        target_language: Language to translate into; blank falls back to "中文".
        provider, api_key, base_url, temperature: Forwarded to the LLM call.
        batch_size: Entries per request; ``None`` reads the configured value.
        max_workers: Concurrent batches; ``None`` reads the configured value.
        progress_callback: Optional hook called with
            ``(completed_entries, total_entries, message)``.

    Returns:
        The translated SRT text.

    Raises:
        SubtitleTranslationError: If there is nothing to translate, a batch
            cannot be translated, or the merged result misses entries.
    """
    target_language = str(target_language or "").strip() or "中文"
    blocks = parse_srt_blocks(srt_content)
    if not blocks:
        # Without this guard an empty list reaches ThreadPoolExecutor with
        # max_workers=0, which fails with an unrelated ValueError.
        raise SubtitleTranslationError("字幕内容为空或无法解析,无法翻译")
    _ensure_llm_providers_registered()

    resolved_batch_size = _resolve_batch_size(batch_size)
    chunks = _split_blocks(blocks, resolved_batch_size)
    resolved_max_workers = min(_resolve_max_workers(max_workers), len(chunks))
    total_chunks = len(chunks)
    total_blocks = len(blocks)
    logger.info(
        f"开始批量翻译字幕: 共 {total_blocks} 条, {total_chunks} 批, "
        f"每批最多 {resolved_batch_size} 条, 并发 {resolved_max_workers}, 目标语言: {target_language}"
    )

    # Arguments shared by every batch, whichever branch runs below.
    chunk_kwargs = {
        "total_chunks": total_chunks,
        "target_language": target_language,
        "provider": provider,
        "api_key": api_key,
        "base_url": base_url,
        "temperature": temperature,
        "max_repair_attempts": DEFAULT_MAX_REPAIR_ATTEMPTS,
    }

    translations: dict[int, str] = {}
    completed_blocks = 0
    _call_progress_callback(
        progress_callback,
        0,
        total_blocks,
        f"开始翻译字幕,共 {total_blocks} 条,{total_chunks}",
    )

    if total_chunks == 1:
        # One batch: no thread pool needed.
        translations.update(_translate_chunk(chunk=chunks[0], chunk_index=1, **chunk_kwargs))
        completed_blocks = total_blocks
        _call_progress_callback(progress_callback, completed_blocks, total_blocks, "字幕翻译完成")
    else:
        with ThreadPoolExecutor(max_workers=resolved_max_workers) as executor:
            future_to_meta = {
                executor.submit(
                    _translate_chunk,
                    chunk=chunk,
                    chunk_index=index,
                    **chunk_kwargs,
                ): (index, chunk)
                for index, chunk in enumerate(chunks, start=1)
            }
            try:
                for future in as_completed(future_to_meta):
                    chunk_index, chunk = future_to_meta[future]
                    translations.update(future.result())
                    completed_blocks += len(chunk)
                    message = (
                        f"字幕翻译进度: {completed_blocks}/{total_blocks}"
                        f"({ceil(completed_blocks * 100 / total_blocks)}%), "
                        f"完成批次 {chunk_index}/{total_chunks}"
                    )
                    logger.info(message)
                    _call_progress_callback(progress_callback, completed_blocks, total_blocks, message)
            except BaseException:
                # The whole translation is being abandoned: cancel batches
                # that have not started so no further LLM calls are made for
                # results that would be thrown away. Running batches cannot
                # be cancelled and are simply awaited by the executor.
                for pending in future_to_meta:
                    pending.cancel()
                raise

    missing_ids = sorted({block.order for block in blocks} - set(translations))
    if missing_ids:
        raise SubtitleTranslationError(f"字幕翻译结果缺少字幕条目: {missing_ids[:10]}")

    translated_srt = _render_translated_srt(blocks, translations)
    logger.info(f"字幕翻译完成,共 {total_blocks}")
    return translated_srt
def write_srt_file(srt_content: str, subtitle_file: str = "") -> str:
    """Write ``srt_content`` to disk as UTF-8 and return the path used.

    When ``subtitle_file`` is empty the file is written as
    ``subtitle_translated.srt`` inside ``utils.subtitle_dir()``. Missing
    parent directories are created.
    """
    target = subtitle_file or os.path.join(utils.subtitle_dir(), "subtitle_translated.srt")
    folder = os.path.dirname(target)
    if folder:
        os.makedirs(folder, exist_ok=True)
    with open(target, "w", encoding="utf-8") as handle:
        handle.write(srt_content)
    return target
def translate_subtitle_file(
    subtitle_file: str,
    output_file: str = "",
    *,
    target_language: str = "中文",
    provider: str = "",
    api_key: str = "",
    base_url: str = "",
    temperature: float = 0.2,
    batch_size: int | None = None,
    max_workers: int | None = None,
    progress_callback: TranslationProgressCallback | None = None,
) -> str:
    """Translate a subtitle file and write the result as an SRT file.

    Args:
        subtitle_file: Path of the subtitle file to translate.
        output_file: Destination path; empty lets ``write_srt_file`` choose.
        target_language, provider, api_key, base_url, temperature,
        batch_size, max_workers, progress_callback: Forwarded unchanged to
            ``translate_srt_content``.

    Returns:
        The path of the written file.

    Raises:
        SubtitleTranslationError: If ``subtitle_file`` is empty or not a file.
    """
    source_exists = bool(subtitle_file) and os.path.isfile(subtitle_file)
    if not source_exists:
        raise SubtitleTranslationError(f"字幕文件不存在: {subtitle_file}")

    source_text = read_subtitle_text(subtitle_file).text
    translated_srt = translate_srt_content(
        source_text,
        target_language=target_language,
        provider=provider,
        api_key=api_key,
        base_url=base_url,
        temperature=temperature,
        batch_size=batch_size,
        max_workers=max_workers,
        progress_callback=progress_callback,
    )
    return write_srt_file(translated_srt, output_file)

View File

@ -0,0 +1,157 @@
import json
import tempfile
import unittest
from pathlib import Path
from unittest import mock
from app.services import subtitle_translator as translator
# Two-cue fixture. Cues are separated by a blank line, as the SRT format
# requires.
SAMPLE_SRT = """1
00:00:01,000 --> 00:00:03,000
Hello, everyone.

2
00:00:04,000 --> 00:00:06,000
We are going to Beijing.
"""

# Five-cue fixture used to exercise batching (batch_size=2 gives 3 batches).
BATCH_SAMPLE_SRT = """1
00:00:01,000 --> 00:00:02,000
Line one.

2
00:00:02,000 --> 00:00:03,000
Line two.

3
00:00:03,000 --> 00:00:04,000
Line three.

4
00:00:04,000 --> 00:00:05,000
Line four.

5
00:00:05,000 --> 00:00:06,000
Line five.
"""
class SubtitleTranslatorTests(unittest.TestCase):
    """Exercise the subtitle translator with the LLM call mocked out."""

    @staticmethod
    def _patch_provider_registration():
        """Patch out LLM provider registration."""
        return mock.patch("app.services.subtitle_translator._ensure_llm_providers_registered")

    @staticmethod
    def _patch_llm(**patch_kwargs):
        """Patch the function the translator uses to run the LLM request."""
        return mock.patch("app.services.subtitle_translator._run_async_safely", **patch_kwargs)

    def test_translate_srt_content_preserves_timecodes_and_rebuilds_text(self):
        reply = json.dumps(
            {
                "items": [
                    {"id": 1, "text": "大家好。"},
                    {"id": 2, "text": "我们要去北京。"},
                ]
            },
            ensure_ascii=False,
        )

        with self._patch_provider_registration(), self._patch_llm(return_value=reply) as run_llm:
            translated = translator.translate_srt_content(
                SAMPLE_SRT,
                target_language="中文",
                provider="openai",
                api_key="sk-test",
                base_url="https://llm.example/v1",
            )

        for expected in ("00:00:01,000 --> 00:00:03,000", "大家好。", "我们要去北京。"):
            self.assertIn(expected, translated)
        self.assertNotIn("Hello, everyone.", translated)

        # The credentials and JSON mode must reach the LLM call unchanged.
        call_kwargs = run_llm.call_args.kwargs
        self.assertEqual("openai", call_kwargs["provider"])
        self.assertEqual("sk-test", call_kwargs["api_key"])
        self.assertEqual("https://llm.example/v1", call_kwargs["api_base"])
        self.assertEqual("json", call_kwargs["response_format"])
        self.assertIn("专业字幕翻译员", call_kwargs["system_prompt"])
        self.assertIn("翻译为中文", call_kwargs["prompt"])

    def test_translate_srt_content_rejects_missing_items(self):
        # Only one of the two cues comes back, on every attempt.
        reply = json.dumps({"items": [{"id": 1, "text": "大家好。"}]}, ensure_ascii=False)

        with self._patch_provider_registration(), self._patch_llm(return_value=reply):
            with self.assertRaises(translator.SubtitleTranslationError):
                translator.translate_srt_content(SAMPLE_SRT, provider="openai")

    def test_translate_subtitle_file_writes_translated_srt(self):
        reply = json.dumps(
            {
                "items": [
                    {"id": 1, "text": "大家好。"},
                    {"id": 2, "text": "我们要去北京。"},
                ]
            },
            ensure_ascii=False,
        )

        with tempfile.TemporaryDirectory() as tmp_dir:
            workdir = Path(tmp_dir)
            input_file = workdir / "input.srt"
            output_file = workdir / "output.srt"
            input_file.write_text(SAMPLE_SRT, encoding="utf-8")

            with self._patch_provider_registration(), self._patch_llm(return_value=reply):
                result_path = translator.translate_subtitle_file(
                    str(input_file),
                    str(output_file),
                    target_language="中文",
                    provider="openai",
                )

            self.assertEqual(str(output_file), result_path)
            self.assertIn("大家好。", output_file.read_text(encoding="utf-8"))

    def test_translate_srt_content_batches_requests_and_reports_progress(self):
        progress_events = []

        def record_progress(completed, total, message):
            progress_events.append((completed, total, message))

        def fake_run_llm(*args, **kwargs):
            # Echo every key from the prompt's JSON payload with a fake text.
            payload_text = kwargs["prompt"].rsplit("待翻译字幕条目:", 1)[1].strip()
            payload = json.loads(payload_text)
            translated = {key: f"译文{key}" for key in payload}
            return json.dumps(translated, ensure_ascii=False)

        with self._patch_provider_registration(), self._patch_llm(side_effect=fake_run_llm) as run_llm:
            translated = translator.translate_srt_content(
                BATCH_SAMPLE_SRT,
                target_language="中文",
                provider="openai",
                batch_size=2,
                max_workers=1,
                progress_callback=record_progress,
            )

        # Five cues in batches of two: three LLM requests.
        self.assertEqual(3, run_llm.call_count)
        self.assertIn("译文1", translated)
        self.assertIn("译文5", translated)
        self.assertEqual((0, 5), progress_events[0][:2])
        self.assertEqual((5, 5), progress_events[-1][:2])
        self.assertTrue(any("完成批次" in event[2] for event in progress_events))
if __name__ == "__main__":
    # Run the suite when this file is executed directly.
    unittest.main()

View File

@ -5,6 +5,8 @@
llm_vision_timeout = 120 # 视觉模型基础超时时间
llm_text_timeout = 180 # 文本模型基础超时时间(解说文案生成等复杂任务需要更长时间)
llm_max_retries = 3 # API 重试次数
subtitle_translate_batch_size = 20 # 字幕翻译每批处理的字幕条数(有效范围 1-200,超出范围时自动截断)
subtitle_translate_max_workers = 3 # 字幕翻译最大并发批次数(有效范围 1-8,超出范围时自动截断)
##########################################
# 🚀 LLM 配置 - 使用 OpenAI 兼容统一接口

View File

@ -205,6 +205,14 @@ def _format_file_list_for_display(paths, max_items=3):
return f"{visible_names} +{len(file_names) - max_items}"
def _safe_filename_fragment(value, fallback="translated", max_length=64):
    """Turn free text into a fragment that is safe inside a file name.

    Every character that is not alphanumeric, ``-`` or ``_`` becomes ``_``.
    Leading and trailing underscores are removed.

    Args:
        value: Text to sanitise; ``None`` and other falsy values count as empty.
        fallback: Returned when nothing usable remains.
        max_length: Maximum number of characters kept. The value comes from a
            free-text input, so without a cap a long entry could make the
            resulting file name exceed filesystem limits.

    Returns:
        The sanitised fragment, or ``fallback`` when it would be empty.
    """
    sanitized = "".join(
        char if char.isalnum() or char in {"-", "_"} else "_"
        for char in str(value or "").strip()
    ).strip("_")
    # Strip again after truncating so the cut cannot leave a dangling "_".
    fragment = sanitized[:max_length].strip("_")
    return fragment or fallback
def _read_subtitle_file(path):
try:
return read_subtitle_text(path).text
@ -1211,23 +1219,41 @@ def render_fun_asr_transcription(tr):
# 上传字幕面板会在本轮渲染中更新 session_state这里重新读取一次保证按钮状态同步。
subtitle_paths = _selected_subtitle_paths()
can_transcribe = backend != "upload" and bool(media_paths)
can_correct_subtitles = bool(subtitle_paths)
can_manage_subtitles = bool(subtitle_paths)
saved_target_language = str(config.ui.get("subtitle_translate_target_language", "中文") or "中文")
with subtitle_cols[1]:
action_cols = st.columns(2)
with action_cols[0]:
transcribe_clicked = st.button(
tr("Transcribe subtitles"),
key="fun_asr_transcribe",
disabled=not can_transcribe,
use_container_width=True,
)
with action_cols[1]:
correct_clicked = st.button(
tr("Calibrate subtitles"),
key="subtitle_correct",
disabled=not can_correct_subtitles,
use_container_width=True,
)
transcribe_clicked = st.button(
tr("Transcribe subtitles"),
key="fun_asr_transcribe",
disabled=not can_transcribe,
use_container_width=True,
)
subtitle_action_cols = st.columns(3, vertical_alignment="bottom")
with subtitle_action_cols[0]:
target_language = st.text_input(
tr("Subtitle target language"),
value=saved_target_language,
key="subtitle_translate_target_language",
placeholder=tr("Subtitle target language placeholder"),
)
with subtitle_action_cols[1]:
translate_clicked = st.button(
tr("Translate subtitles"),
key="subtitle_translate",
disabled=not can_manage_subtitles,
use_container_width=True,
)
with subtitle_action_cols[2]:
correct_clicked = st.button(
tr("Calibrate subtitles"),
key="subtitle_correct",
disabled=not can_manage_subtitles,
use_container_width=True,
)
target_language = str(target_language or "").strip() or "中文"
if correct_clicked:
from app.services import subtitle_corrector
@ -1278,6 +1304,87 @@ def render_fun_asr_transcription(tr):
st.error(f"{tr('Subtitle calibration failed')}: {str(e)}")
return
if translate_clicked:
from app.services import subtitle_translator
text_provider = config.app.get('text_llm_provider', 'openai').lower()
text_api_key = config.app.get(f'text_{text_provider}_api_key')
text_base_url = config.app.get(f'text_{text_provider}_base_url')
translated_paths = []
try:
config.ui["subtitle_translate_target_language"] = target_language
config.save_config()
spinner_text = tr("Translating subtitles...").format(language=target_language)
with st.spinner(spinner_text):
progress_bar = st.progress(0)
progress_caption = st.empty()
target_suffix = _safe_filename_fragment(target_language)
for index, subtitle_path in enumerate(subtitle_paths, start=1):
subtitle_name = (
f"{os.path.splitext(os.path.basename(subtitle_path))[0]}"
f"_translated_{target_suffix}.srt"
)
output_path = _unique_file_path(utils.subtitle_dir(), subtitle_name)
subtitle_file_label = os.path.basename(subtitle_path)
def update_translation_progress(
completed,
total,
message,
file_index=index,
file_label=subtitle_file_label,
):
total = max(int(total or 0), 1)
completed = max(0, min(int(completed or 0), total))
file_progress = completed / total
overall_progress = ((file_index - 1) + file_progress) / max(len(subtitle_paths), 1)
progress_bar.progress(min(overall_progress, 1.0))
progress_caption.caption(
tr("Subtitle translation progress").format(
file=file_label,
completed=completed,
total=total,
message=message,
)
)
translated_path = subtitle_translator.translate_subtitle_file(
subtitle_file=subtitle_path,
output_file=output_path,
target_language=target_language,
provider=text_provider,
api_key=text_api_key,
base_url=text_base_url,
progress_callback=update_translation_progress,
)
translated_paths.append(translated_path)
progress_bar.progress(index / len(subtitle_paths))
progress_caption.empty()
progress_bar.empty()
_set_subtitle_state(translated_paths)
success_placeholder = st.empty()
if len(translated_paths) == 1:
success_placeholder.success(
tr("Subtitle translation succeeded").format(file=os.path.basename(translated_paths[0]))
)
else:
success_placeholder.success(
tr("Subtitle translation succeeded for multiple files").format(
count=len(translated_paths),
files=_format_file_list_for_display(translated_paths),
)
)
time.sleep(3)
success_placeholder.empty()
except Exception as e:
logger.error(f"字幕翻译失败: {traceback.format_exc()}")
st.error(f"{tr('Subtitle translation failed')}: {str(e)}")
return
if not transcribe_clicked:
return

View File

@ -501,6 +501,9 @@
"Selected video files do not exist": "These selected video files do not exist. Please select or upload them again: {files}",
"Transcribe subtitles": "Transcribe Subtitles",
"Calibrate subtitles": "Calibrate Subtitles",
"Translate subtitles": "Translate Subtitles",
"Subtitle target language": "Target language",
"Subtitle target language placeholder": "Chinese",
"Please enter Ali Bailian API Key": "Please enter the Ali Bailian API Key first",
"Please enter local FunASR-Pack API URL": "Please enter the local FunASR-Pack API URL first",
"Please enter local FireRedASR API URL": "Please enter the local ASR API URL first",
@ -515,6 +518,11 @@
"Subtitle calibration succeeded": "Subtitle calibration succeeded: {file}",
"Subtitle calibration succeeded for multiple files": "Subtitle calibration succeeded for {count} files: {files}",
"Subtitle calibration failed": "Subtitle calibration failed",
"Translating subtitles...": "Translating subtitles to {language} with the LLM, please wait...",
"Subtitle translation succeeded": "Subtitle translation succeeded: {file}",
"Subtitle translation succeeded for multiple files": "Subtitle translation succeeded for {count} files: {files}",
"Subtitle translation failed": "Subtitle translation failed",
"Subtitle translation progress": "{file}: {completed}/{total} items · {message}",
"Transcribed subtitles storage hint": "Previously transcribed subtitles are saved in {path}; drag a file from that folder to upload",
"Tavily Search Settings": "Tavily Web Search",
"Tavily API Key": "Tavily API Key",

View File

@ -440,6 +440,9 @@
"Selected video files do not exist": "以下视频文件不存在,请重新选择或上传: {files}",
"Transcribe subtitles": "转录字幕",
"Calibrate subtitles": "校准字幕",
"Translate subtitles": "翻译字幕",
"Subtitle target language": "目标语言",
"Subtitle target language placeholder": "中文",
"Please enter Ali Bailian API Key": "请先输入阿里百炼 API Key",
"Please enter local FunASR-Pack API URL": "请先输入本地 FunASR-Pack API 地址",
"Please enter local FireRedASR API URL": "请先输入本地ASR API 地址",
@ -454,6 +457,11 @@
"Subtitle calibration succeeded": "字幕校准成功: {file}",
"Subtitle calibration succeeded for multiple files": "字幕校准成功,共 {count} 个文件: {files}",
"Subtitle calibration failed": "字幕校准失败",
"Translating subtitles...": "正在使用大模型翻译字幕为 {language},请稍候...",
"Subtitle translation succeeded": "字幕翻译成功: {file}",
"Subtitle translation succeeded for multiple files": "字幕翻译成功,共 {count} 个文件: {files}",
"Subtitle translation failed": "字幕翻译失败",
"Subtitle translation progress": "{file}: {completed}/{total} 条 · {message}",
"Transcribed subtitles storage hint": "之前转录生成的字幕保存在 {path},可从该目录拖入上传",
"Tavily Search Settings": "Tavily 联网搜索",
"Tavily API Key": "Tavily API Key",