fix(provider): preserve streamed Codex output when response.completed.output is empty (#1928)

* fix: preserve streamed Codex output items

* fix: prefer completed Codex output over streamed placeholders
Async23 2026-04-07 18:21:22 +08:00 committed by GitHub
parent c3170f22da
commit 0948c7a4e1
2 changed files with 153 additions and 1 deletion
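
For context, a minimal self-contained sketch of the merge rule this commit adds to `_stream_response`. The helper name `merge_streamed_output` is hypothetical and exists only for illustration; the event shapes mirror the SSE payloads used in the tests below. A streamed item only fills a slot that the completed response left empty, so completed output still wins over streamed placeholders.

# Illustrative sketch only: merge_streamed_output is a hypothetical helper that
# mirrors the logic added to CodexChatModel._stream_response in this commit.
from typing import Any


def merge_streamed_output(
    completed_response: dict[str, Any],
    streamed_output_items: dict[int, dict[str, Any]],
) -> dict[str, Any]:
    """Fill empty slots in the completed output from streamed output items."""
    if not streamed_output_items:
        return completed_response
    merged_output: list[Any] = []
    response_output = completed_response.get("output")
    if isinstance(response_output, list):
        merged_output = list(response_output)
    # Grow the list so every streamed output_index has a slot.
    max_index = max(max(streamed_output_items), len(merged_output) - 1)
    if len(merged_output) <= max_index:
        merged_output.extend([None] * (max_index + 1 - len(merged_output)))
    # A streamed item only replaces a slot the completed response left empty.
    for output_index, output_item in streamed_output_items.items():
        if not isinstance(merged_output[output_index], dict):
            merged_output[output_index] = output_item
    merged = dict(completed_response)
    merged["output"] = [item for item in merged_output if isinstance(item, dict)]
    return merged


# response.completed arrived with an empty output list, but the assistant
# message was already streamed via a response.output_item.done event.
completed = {"model": "gpt-5.4", "output": [], "usage": {}}
streamed = {0: {"type": "message", "content": [{"type": "output_text", "text": "Hello from stream"}]}}
assert merge_streamed_output(completed, streamed)["output"] == [streamed[0]]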


@@ -216,18 +216,48 @@ class CodexChatModel(BaseChatModel):
     def _stream_response(self, headers: dict, payload: dict) -> dict:
         """Stream SSE from Codex API and collect the final response."""
         completed_response = None
+        streamed_output_items: dict[int, dict[str, Any]] = {}
         with httpx.Client(timeout=300) as client:
             with client.stream("POST", f"{CODEX_BASE_URL}/responses", headers=headers, json=payload) as resp:
                 resp.raise_for_status()
                 for line in resp.iter_lines():
                     data = self._parse_sse_data_line(line)
-                    if data and data.get("type") == "response.completed":
+                    if not data:
+                        continue
+                    event_type = data.get("type")
+                    if event_type == "response.output_item.done":
+                        output_index = data.get("output_index")
+                        output_item = data.get("item")
+                        if isinstance(output_index, int) and isinstance(output_item, dict):
+                            streamed_output_items[output_index] = output_item
+                    elif event_type == "response.completed":
                         completed_response = data["response"]
         if not completed_response:
             raise RuntimeError("Codex API stream ended without response.completed event")
+        # ChatGPT Codex can emit the final assistant content only in stream events.
+        # When response.completed arrives, response.output may still be empty.
+        if streamed_output_items:
+            merged_output = []
+            response_output = completed_response.get("output")
+            if isinstance(response_output, list):
+                merged_output = list(response_output)
+            max_index = max(max(streamed_output_items), len(merged_output) - 1)
+            if max_index >= 0 and len(merged_output) <= max_index:
+                merged_output.extend([None] * (max_index + 1 - len(merged_output)))
+            for output_index, output_item in streamed_output_items.items():
+                existing_item = merged_output[output_index]
+                if not isinstance(existing_item, dict):
+                    merged_output[output_index] = output_item
+            completed_response = dict(completed_response)
+            completed_response["output"] = [item for item in merged_output if isinstance(item, dict)]
         return completed_response
 
     @staticmethod


@@ -5,6 +5,7 @@ import json
 import pytest
 from langchain_core.messages import HumanMessage, SystemMessage
 
+from deerflow.models import openai_codex_provider as codex_provider_module
 from deerflow.models.claude_provider import ClaudeChatModel
 from deerflow.models.credential_loader import CodexCliCredential
 from deerflow.models.openai_codex_provider import CodexChatModel
@@ -147,3 +148,124 @@ def test_codex_provider_parses_valid_tool_arguments(monkeypatch):
     )
     assert result.generations[0].message.tool_calls == [{"name": "bash", "args": {"cmd": "pwd"}, "id": "tc-1", "type": "tool_call"}]
+
+
+class _FakeResponseStream:
+    def __init__(self, lines: list[str]):
+        self._lines = lines
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc, tb):
+        return False
+
+    def raise_for_status(self):
+        return None
+
+    def iter_lines(self):
+        yield from self._lines
+
+
+class _FakeHttpxClient:
+    def __init__(self, lines: list[str], *_args, **_kwargs):
+        self._lines = lines
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc, tb):
+        return False
+
+    def stream(self, *_args, **_kwargs):
+        return _FakeResponseStream(self._lines)
+
+
+def test_codex_provider_merges_streamed_output_items_when_completed_output_is_empty(monkeypatch):
+    monkeypatch.setattr(
+        CodexChatModel,
+        "_load_codex_auth",
+        lambda self: CodexCliCredential(access_token="token", account_id="acct"),
+    )
+    lines = [
+        'data: {"type":"response.output_item.done","output_index":0,"item":{"type":"message","content":[{"type":"output_text","text":"Hello from stream"}]}}',
+        'data: {"type":"response.completed","response":{"model":"gpt-5.4","output":[],"usage":{"input_tokens":1,"output_tokens":2,"total_tokens":3}}}',
+    ]
+    monkeypatch.setattr(
+        codex_provider_module.httpx,
+        "Client",
+        lambda *args, **kwargs: _FakeHttpxClient(lines, *args, **kwargs),
+    )
+
+    model = CodexChatModel()
+    response = model._stream_response(headers={}, payload={})
+    parsed = model._parse_response(response)
+
+    assert response["output"] == [
+        {
+            "type": "message",
+            "content": [{"type": "output_text", "text": "Hello from stream"}],
+        }
+    ]
+    assert parsed.generations[0].message.content == "Hello from stream"
+
+
+def test_codex_provider_orders_streamed_output_items_by_output_index(monkeypatch):
+    monkeypatch.setattr(
+        CodexChatModel,
+        "_load_codex_auth",
+        lambda self: CodexCliCredential(access_token="token", account_id="acct"),
+    )
+    lines = [
+        'data: {"type":"response.output_item.done","output_index":1,"item":{"type":"message","content":[{"type":"output_text","text":"Second"}]}}',
+        'data: {"type":"response.output_item.done","output_index":0,"item":{"type":"message","content":[{"type":"output_text","text":"First"}]}}',
+        'data: {"type":"response.completed","response":{"model":"gpt-5.4","output":[],"usage":{}}}',
+    ]
+    monkeypatch.setattr(
+        codex_provider_module.httpx,
+        "Client",
+        lambda *args, **kwargs: _FakeHttpxClient(lines, *args, **kwargs),
+    )
+
+    model = CodexChatModel()
+    response = model._stream_response(headers={}, payload={})
+
+    assert [item["content"][0]["text"] for item in response["output"]] == [
+        "First",
+        "Second",
+    ]
+
+
+def test_codex_provider_preserves_completed_output_when_stream_only_has_placeholder(monkeypatch):
+    monkeypatch.setattr(
+        CodexChatModel,
+        "_load_codex_auth",
+        lambda self: CodexCliCredential(access_token="token", account_id="acct"),
+    )
+    lines = [
+        'data: {"type":"response.output_item.added","output_index":0,"item":{"type":"message","status":"in_progress","content":[]}}',
+        'data: {"type":"response.completed","response":{"model":"gpt-5.4","output":[{"type":"message","content":[{"type":"output_text","text":"Final from completed"}]}],"usage":{}}}',
+    ]
+    monkeypatch.setattr(
+        codex_provider_module.httpx,
+        "Client",
+        lambda *args, **kwargs: _FakeHttpxClient(lines, *args, **kwargs),
+    )
+
+    model = CodexChatModel()
+    response = model._stream_response(headers={}, payload={})
+    parsed = model._parse_response(response)
+
+    assert response["output"] == [
+        {
+            "type": "message",
+            "content": [{"type": "output_text", "text": "Final from completed"}],
+        }
+    ]
+    assert parsed.generations[0].message.content == "Final from completed"