diff --git a/backend/packages/harness/deerflow/models/openai_codex_provider.py b/backend/packages/harness/deerflow/models/openai_codex_provider.py
index 5fe2323f2..e41d8c879 100644
--- a/backend/packages/harness/deerflow/models/openai_codex_provider.py
+++ b/backend/packages/harness/deerflow/models/openai_codex_provider.py
@@ -216,18 +216,48 @@ class CodexChatModel(BaseChatModel):
     def _stream_response(self, headers: dict, payload: dict) -> dict:
         """Stream SSE from Codex API and collect the final response."""
         completed_response = None
+        streamed_output_items: dict[int, dict[str, Any]] = {}
 
         with httpx.Client(timeout=300) as client:
             with client.stream("POST", f"{CODEX_BASE_URL}/responses", headers=headers, json=payload) as resp:
                 resp.raise_for_status()
                 for line in resp.iter_lines():
                     data = self._parse_sse_data_line(line)
-                    if data and data.get("type") == "response.completed":
+                    if not data:
+                        continue
+
+                    event_type = data.get("type")
+                    if event_type == "response.output_item.done":
+                        output_index = data.get("output_index")
+                        output_item = data.get("item")
+                        if isinstance(output_index, int) and isinstance(output_item, dict):
+                            streamed_output_items[output_index] = output_item
+                    elif event_type == "response.completed":
                         completed_response = data["response"]
 
         if not completed_response:
             raise RuntimeError("Codex API stream ended without response.completed event")
 
+        # ChatGPT Codex can emit the final assistant content only in stream events.
+        # When response.completed arrives, response.output may still be empty.
+        if streamed_output_items:
+            merged_output = []
+            response_output = completed_response.get("output")
+            if isinstance(response_output, list):
+                merged_output = list(response_output)
+
+            max_index = max(max(streamed_output_items), len(merged_output) - 1)
+            if max_index >= 0 and len(merged_output) <= max_index:
+                merged_output.extend([None] * (max_index + 1 - len(merged_output)))
+
+            for output_index, output_item in streamed_output_items.items():
+                existing_item = merged_output[output_index]
+                if not isinstance(existing_item, dict):
+                    merged_output[output_index] = output_item
+
+            completed_response = dict(completed_response)
+            completed_response["output"] = [item for item in merged_output if isinstance(item, dict)]
+
         return completed_response
 
     @staticmethod
diff --git a/backend/tests/test_cli_auth_providers.py b/backend/tests/test_cli_auth_providers.py
index cb3de5a0c..00df4b726 100644
--- a/backend/tests/test_cli_auth_providers.py
+++ b/backend/tests/test_cli_auth_providers.py
@@ -5,6 +5,7 @@ import json
 
 import pytest
 from langchain_core.messages import HumanMessage, SystemMessage
+from deerflow.models import openai_codex_provider as codex_provider_module
 from deerflow.models.claude_provider import ClaudeChatModel
 from deerflow.models.credential_loader import CodexCliCredential
 from deerflow.models.openai_codex_provider import CodexChatModel
@@ -147,3 +148,124 @@ def test_codex_provider_parses_valid_tool_arguments(monkeypatch):
     )
 
     assert result.generations[0].message.tool_calls == [{"name": "bash", "args": {"cmd": "pwd"}, "id": "tc-1", "type": "tool_call"}]
+
+
+class _FakeResponseStream:
+    def __init__(self, lines: list[str]):
+        self._lines = lines
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc, tb):
+        return False
+
+    def raise_for_status(self):
+        return None
+
+    def iter_lines(self):
+        yield from self._lines
+
+
+class _FakeHttpxClient:
+    def __init__(self, lines: list[str], *_args, **_kwargs):
+        self._lines = lines
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc, tb):
+        return False
+
+    def stream(self, *_args, **_kwargs):
+        return _FakeResponseStream(self._lines)
+
+
+def test_codex_provider_merges_streamed_output_items_when_completed_output_is_empty(monkeypatch):
+    monkeypatch.setattr(
+        CodexChatModel,
+        "_load_codex_auth",
+        lambda self: CodexCliCredential(access_token="token", account_id="acct"),
+    )
+
+    lines = [
+        'data: {"type":"response.output_item.done","output_index":0,"item":{"type":"message","content":[{"type":"output_text","text":"Hello from stream"}]}}',
+        'data: {"type":"response.completed","response":{"model":"gpt-5.4","output":[],"usage":{"input_tokens":1,"output_tokens":2,"total_tokens":3}}}',
+    ]
+
+    monkeypatch.setattr(
+        codex_provider_module.httpx,
+        "Client",
+        lambda *args, **kwargs: _FakeHttpxClient(lines, *args, **kwargs),
+    )
+
+    model = CodexChatModel()
+    response = model._stream_response(headers={}, payload={})
+    parsed = model._parse_response(response)
+
+    assert response["output"] == [
+        {
+            "type": "message",
+            "content": [{"type": "output_text", "text": "Hello from stream"}],
+        }
+    ]
+    assert parsed.generations[0].message.content == "Hello from stream"
+
+
+def test_codex_provider_orders_streamed_output_items_by_output_index(monkeypatch):
+    monkeypatch.setattr(
+        CodexChatModel,
+        "_load_codex_auth",
+        lambda self: CodexCliCredential(access_token="token", account_id="acct"),
+    )
+
+    lines = [
+        'data: {"type":"response.output_item.done","output_index":1,"item":{"type":"message","content":[{"type":"output_text","text":"Second"}]}}',
+        'data: {"type":"response.output_item.done","output_index":0,"item":{"type":"message","content":[{"type":"output_text","text":"First"}]}}',
+        'data: {"type":"response.completed","response":{"model":"gpt-5.4","output":[],"usage":{}}}',
+    ]
+
+    monkeypatch.setattr(
+        codex_provider_module.httpx,
+        "Client",
+        lambda *args, **kwargs: _FakeHttpxClient(lines, *args, **kwargs),
+    )
+
+    model = CodexChatModel()
+    response = model._stream_response(headers={}, payload={})
+
+    assert [item["content"][0]["text"] for item in response["output"]] == [
+        "First",
+        "Second",
+    ]
+
+
+def test_codex_provider_preserves_completed_output_when_stream_only_has_placeholder(monkeypatch):
+    monkeypatch.setattr(
+        CodexChatModel,
+        "_load_codex_auth",
+        lambda self: CodexCliCredential(access_token="token", account_id="acct"),
+    )
+
+    lines = [
+        'data: {"type":"response.output_item.added","output_index":0,"item":{"type":"message","status":"in_progress","content":[]}}',
+        'data: {"type":"response.completed","response":{"model":"gpt-5.4","output":[{"type":"message","content":[{"type":"output_text","text":"Final from completed"}]}],"usage":{}}}',
+    ]
+
+    monkeypatch.setattr(
+        codex_provider_module.httpx,
+        "Client",
+        lambda *args, **kwargs: _FakeHttpxClient(lines, *args, **kwargs),
+    )
+
+    model = CodexChatModel()
+    response = model._stream_response(headers={}, payload={})
+    parsed = model._parse_response(response)
+
+    assert response["output"] == [
+        {
+            "type": "message",
+            "content": [{"type": "output_text", "text": "Final from completed"}],
+        }
+    ]
+    assert parsed.generations[0].message.content == "Final from completed"