fix(gateway): return ISO 8601 timestamps from threads endpoints (#2599)

* fix(gateway): return ISO 8601 timestamps from threads endpoints (#2594)

ThreadResponse documents created_at / updated_at as ISO timestamps,
matching the LangGraph Platform schema (langgraph_sdk.schema.Thread
exposes them as datetime, JSON-encoded as ISO 8601). The gateway
threads router was instead emitting str(time.time()) — unix-second
floats — breaking frontend new Date() parsing and producing a mixed
ISO/unix wire format that also corrupted the search sort order.

Centralize timestamp generation in deerflow.utils.time:
- now_iso()       — datetime.now(UTC).isoformat()
- coerce_iso(x)   — heals legacy unix-timestamp strings on read so the
                    store converges to ISO without a one-shot migration

threads.py: replace 6 time.time() call sites with now_iso(); wrap all
read paths and Phase-2 checkpoint metadata with coerce_iso(); _store_upsert
opportunistically heals legacy created_at on update; drop unused time import.

thread_runs.py: reuse now_iso() instead of a private duplicate _now_iso(),
preventing future drift between the two timestamp call sites.

Tests: 9 unit tests for the helper; 5 integration tests pinning the ISO
contract for create/get/patch/search and the legacy-healing path on the
internal store upsert. Full suite: 2144 passed, 15 skipped, 0 failed.

Closes #2594

* fix(gateway): coerce checkpoint metadata timestamps to ISO on read

After the merge with main, three additional read paths in ``threads.py``
were still emitting raw ``str(metadata.get("created_at", ""))`` —
``get_thread_state``, ``update_thread_state``, and ``get_thread_history``.

Same root cause as #2594: when the checkpoint metadata's ``created_at``
is a unix-second float (legacy data, or a checkpoint written by an older
Gateway version), ``str(float)`` produces ``"1777252410.411327"`` and the
frontend's ``new Date(...)`` returns ``Invalid Date``. The fix on the
``/threads/{id}`` GET path was already in place; these three sibling
endpoints needed the same treatment.

All four call sites now flow through ``coerce_iso``, so:
- legacy float metadata heals to ISO on the way out,
- ISO metadata passes through unchanged,
- ``datetime`` instances (which the new ``coerce_iso`` branch handles
  explicitly) emit with the ``T`` separator instead of falling through
  to the space-separated ``str(datetime)`` form.

Coverage added for the two endpoints not already pinned by the merge:
- ``test_get_thread_state_returns_iso_for_legacy_checkpoint_metadata``
- ``test_get_thread_history_returns_iso_for_legacy_checkpoint_metadata``

Both pre-seed a checkpoint whose metadata carries the literal float
from the issue body and assert the wire format is ISO.
This commit is contained in:
Xinmin Zeng 2026-05-02 15:16:16 +08:00 committed by GitHub
parent bb8b234d85
commit ca3332f8bf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 494 additions and 32 deletions

View File

@ -13,7 +13,6 @@ matching the LangGraph Platform wire format expected by the
from __future__ import annotations
import logging
import time
import uuid
from typing import Any
@ -27,6 +26,7 @@ from app.gateway.utils import sanitize_log_param
from deerflow.config.paths import Paths, get_paths
from deerflow.runtime import serialize_channel_values
from deerflow.runtime.user_context import get_effective_user_id
from deerflow.utils.time import coerce_iso, now_iso
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/threads", tags=["threads"])
@ -234,7 +234,7 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
checkpointer = get_checkpointer(request)
thread_store = get_thread_store(request)
thread_id = body.thread_id or str(uuid.uuid4())
now = time.time()
now = now_iso()
# ``body.metadata`` is already stripped of server-reserved keys by
# ``ThreadCreateRequest._strip_reserved`` — see the model definition.
@ -244,8 +244,8 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
return ThreadResponse(
thread_id=thread_id,
status=existing_record.get("status", "idle"),
created_at=str(existing_record.get("created_at", "")),
updated_at=str(existing_record.get("updated_at", "")),
created_at=coerce_iso(existing_record.get("created_at", "")),
updated_at=coerce_iso(existing_record.get("updated_at", "")),
metadata=existing_record.get("metadata", {}),
)
@ -280,8 +280,8 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
return ThreadResponse(
thread_id=thread_id,
status="idle",
created_at=str(now),
updated_at=str(now),
created_at=now,
updated_at=now,
metadata=body.metadata,
)
@ -306,8 +306,11 @@ async def search_threads(body: ThreadSearchRequest, request: Request) -> list[Th
ThreadResponse(
thread_id=r["thread_id"],
status=r.get("status", "idle"),
created_at=r.get("created_at", ""),
updated_at=r.get("updated_at", ""),
# ``coerce_iso`` heals legacy unix-second values that
# ``MemoryThreadMetaStore`` historically wrote with ``time.time()``;
# SQL-backed rows already arrive as ISO strings and pass through.
created_at=coerce_iso(r.get("created_at", "")),
updated_at=coerce_iso(r.get("updated_at", "")),
metadata=r.get("metadata", {}),
values={"title": r["display_name"]} if r.get("display_name") else {},
interrupts={},
@ -339,8 +342,8 @@ async def patch_thread(thread_id: str, body: ThreadPatchRequest, request: Reques
return ThreadResponse(
thread_id=thread_id,
status=record.get("status", "idle"),
created_at=str(record.get("created_at", "")),
updated_at=str(record.get("updated_at", "")),
created_at=coerce_iso(record.get("created_at", "")),
updated_at=coerce_iso(record.get("updated_at", "")),
metadata=record.get("metadata", {}),
)
@ -380,8 +383,8 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse:
record = {
"thread_id": thread_id,
"status": "idle",
"created_at": ckpt_meta.get("created_at", ""),
"updated_at": ckpt_meta.get("updated_at", ckpt_meta.get("created_at", "")),
"created_at": coerce_iso(ckpt_meta.get("created_at", "")),
"updated_at": coerce_iso(ckpt_meta.get("updated_at", ckpt_meta.get("created_at", ""))),
"metadata": {k: v for k, v in ckpt_meta.items() if k not in ("created_at", "updated_at", "step", "source", "writes", "parents")},
}
@ -395,8 +398,8 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse:
return ThreadResponse(
thread_id=thread_id,
status=status,
created_at=str(record.get("created_at", "")),
updated_at=str(record.get("updated_at", "")),
created_at=coerce_iso(record.get("created_at", "")),
updated_at=coerce_iso(record.get("updated_at", "")),
metadata=record.get("metadata", {}),
values=serialize_channel_values(channel_values),
)
@ -447,10 +450,10 @@ async def get_thread_state(thread_id: str, request: Request) -> ThreadStateRespo
values=values,
next=next_tasks,
metadata=metadata,
checkpoint={"id": checkpoint_id, "ts": str(metadata.get("created_at", ""))},
checkpoint={"id": checkpoint_id, "ts": coerce_iso(metadata.get("created_at", ""))},
checkpoint_id=checkpoint_id,
parent_checkpoint_id=parent_checkpoint_id,
created_at=str(metadata.get("created_at", "")),
created_at=coerce_iso(metadata.get("created_at", "")),
tasks=tasks,
)
@ -500,7 +503,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
channel_values.update(body.values)
checkpoint["channel_values"] = channel_values
metadata["updated_at"] = time.time()
metadata["updated_at"] = now_iso()
if body.as_node:
metadata["source"] = "update"
@ -541,7 +544,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
next=[],
metadata=metadata,
checkpoint_id=new_checkpoint_id,
created_at=str(metadata.get("created_at", "")),
created_at=coerce_iso(metadata.get("created_at", "")),
)
@ -608,7 +611,7 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
parent_checkpoint_id=parent_id,
metadata=user_meta,
values=values,
created_at=str(metadata.get("created_at", "")),
created_at=coerce_iso(metadata.get("created_at", "")),
next=next_tasks,
)
)

View File

@ -7,13 +7,13 @@ router for thread records.
from __future__ import annotations
import time
from typing import Any
from langgraph.store.base import BaseStore
from deerflow.persistence.thread_meta.base import ThreadMetaStore
from deerflow.runtime.user_context import AUTO, _AutoSentinel, resolve_user_id
from deerflow.utils.time import coerce_iso, now_iso
THREADS_NS: tuple[str, ...] = ("threads",)
@ -48,7 +48,7 @@ class MemoryThreadMetaStore(ThreadMetaStore):
metadata: dict | None = None,
) -> dict:
resolved_user_id = resolve_user_id(user_id, method_name="MemoryThreadMetaStore.create")
now = time.time()
now = now_iso()
record: dict[str, Any] = {
"thread_id": thread_id,
"assistant_id": assistant_id,
@ -106,7 +106,7 @@ class MemoryThreadMetaStore(ThreadMetaStore):
if record is None:
return
record["display_name"] = display_name
record["updated_at"] = time.time()
record["updated_at"] = now_iso()
await self._store.aput(THREADS_NS, thread_id, record)
async def update_status(self, thread_id: str, status: str, *, user_id: str | None | _AutoSentinel = AUTO) -> None:
@ -114,7 +114,7 @@ class MemoryThreadMetaStore(ThreadMetaStore):
if record is None:
return
record["status"] = status
record["updated_at"] = time.time()
record["updated_at"] = now_iso()
await self._store.aput(THREADS_NS, thread_id, record)
async def update_metadata(self, thread_id: str, metadata: dict, *, user_id: str | None | _AutoSentinel = AUTO) -> None:
@ -124,7 +124,7 @@ class MemoryThreadMetaStore(ThreadMetaStore):
merged = dict(record.get("metadata") or {})
merged.update(metadata)
record["metadata"] = merged
record["updated_at"] = time.time()
record["updated_at"] = now_iso()
await self._store.aput(THREADS_NS, thread_id, record)
async def delete(self, thread_id: str, *, user_id: str | None | _AutoSentinel = AUTO) -> None:
@ -144,6 +144,8 @@ class MemoryThreadMetaStore(ThreadMetaStore):
"display_name": val.get("display_name"),
"status": val.get("status", "idle"),
"metadata": val.get("metadata", {}),
"created_at": str(val.get("created_at", "")),
"updated_at": str(val.get("updated_at", "")),
# ``coerce_iso`` heals legacy unix-second values written by
# earlier Gateway versions that called ``str(time.time())``.
"created_at": coerce_iso(val.get("created_at", "")),
"updated_at": coerce_iso(val.get("updated_at", "")),
}

View File

@ -6,9 +6,10 @@ import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import TYPE_CHECKING
from deerflow.utils.time import now_iso as _now_iso
from .schemas import DisconnectMode, RunStatus
if TYPE_CHECKING:
@ -17,10 +18,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
def _now_iso() -> str:
return datetime.now(UTC).isoformat()
@dataclass
class RunRecord:
"""Mutable record for a single run."""

View File

@ -0,0 +1,75 @@
"""ISO 8601 timestamp helpers for the Gateway and embedded runtime.
DeerFlow stores and serializes thread/run timestamps as ISO 8601 UTC
strings to match the LangGraph Platform schema (see
``langgraph_sdk.schema.Thread``, where ``created_at`` / ``updated_at``
are ``datetime`` and JSON-encode to ISO 8601). All timestamp generation
should funnel through :func:`now_iso` so the wire format stays
consistent across endpoints, the embedded ``RunManager``, and the
checkpoint metadata written by the Gateway.
:func:`coerce_iso` provides a forward-compatible read path for legacy
records that historically stored ``str(time.time())`` floats.
"""
from __future__ import annotations
import re
from datetime import UTC, datetime
__all__ = ["coerce_iso", "now_iso"]
_UNIX_TIMESTAMP_PATTERN = re.compile(r"^\d{10}(?:\.\d+)?$")
"""Matches the unix-timestamp string shape historically written by
``str(time.time())`` (10-digit seconds with optional fractional part).
The 10-digit anchor avoids accidentally rewriting ISO years like
``"2026"`` and stays valid until the year 2286.
"""
def now_iso() -> str:
"""Return the current UTC time as an ISO 8601 string.
Example: ``"2026-04-27T03:19:46.511479+00:00"``.
"""
return datetime.now(UTC).isoformat()
def coerce_iso(value: object) -> str:
"""Best-effort coerce a stored timestamp to an ISO 8601 string.
Translates legacy unix-timestamp floats / strings written by older
DeerFlow versions into ISO without a one-shot migration. ISO strings
pass through unchanged; ``datetime`` instances are normalised to UTC
(tz-naive values are assumed to be UTC) and emitted via
``isoformat()`` so the wire format always uses the ``T`` separator;
empty values become ``""``; unrecognised values are stringified as a
last resort.
"""
if value is None or value == "":
return ""
if isinstance(value, bool):
# ``bool`` is a subclass of ``int`` — treat as garbage, not 0/1.
return str(value)
if isinstance(value, datetime):
# ``datetime`` must be handled before the ``int``/``float`` check;
# str(datetime) would produce ``"YYYY-MM-DD HH:MM:SS+00:00"``
# (space separator), which breaks strict ISO 8601 consumers.
if value.tzinfo is None:
value = value.replace(tzinfo=UTC)
else:
value = value.astimezone(UTC)
return value.isoformat()
if isinstance(value, (int, float)):
try:
return datetime.fromtimestamp(float(value), UTC).isoformat()
except (ValueError, OverflowError, OSError):
return str(value)
if isinstance(value, str):
if _UNIX_TIMESTAMP_PATTERN.match(value):
try:
return datetime.fromtimestamp(float(value), UTC).isoformat()
except (ValueError, OverflowError, OSError):
return value
return value
return str(value)

View File

@ -1,12 +1,66 @@
import re
from unittest.mock import patch
import pytest
from _router_auth_helpers import make_authed_test_app
from fastapi import HTTPException
from fastapi import FastAPI, HTTPException
from fastapi.testclient import TestClient
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.store.memory import InMemoryStore
from app.gateway.routers import threads
from deerflow.config.paths import Paths
from deerflow.persistence.thread_meta.memory import THREADS_NS, MemoryThreadMetaStore
_ISO_TIMESTAMP_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")
class _PermissiveThreadMetaStore(MemoryThreadMetaStore):
    """Memory store that skips user-id filtering for router tests.

    Owner isolation is exercised separately in
    ``test_memory_thread_meta_isolation.py``. Router tests need to drive
    the FastAPI surface end-to-end with a single fixed app user, but the
    stub auth middleware in ``_router_auth_helpers`` stamps a fresh UUID
    on every request, so the production filtering would reject every
    pre-seeded record. Bypass that filter so the test can focus on the
    timestamp wire format.
    """

    async def _get_owned_record(self, thread_id, user_id, method_name):  # type: ignore[override]
        # Ownership bypass: return the stored record regardless of owner.
        item = await self._store.aget(THREADS_NS, thread_id)
        return dict(item.value) if item is not None else None

    async def check_access(self, thread_id, user_id, *, require_existing=False):  # type: ignore[override]
        item = await self._store.aget(THREADS_NS, thread_id)
        if item is None:
            # Missing record: access is granted only when existence is optional.
            return not require_existing
        return True

    async def create(self, thread_id, *, assistant_id=None, user_id=None, display_name=None, metadata=None):  # type: ignore[override]
        # Force ``user_id=None`` so new records are owner-less and visible to all.
        return await super().create(thread_id, assistant_id=assistant_id, user_id=None, display_name=display_name, metadata=metadata)

    async def search(self, *, metadata=None, status=None, limit=100, offset=0, user_id=None):  # type: ignore[override]
        # Mirror ``create``: search without the owner filter.
        return await super().search(metadata=metadata, status=status, limit=limit, offset=offset, user_id=None)
def _build_thread_app() -> tuple[FastAPI, InMemoryStore, InMemorySaver]:
    """Build a stub-authed FastAPI app wired with an in-memory ThreadMetaStore.

    The thread_store on ``app.state`` is a permissive subclass of
    ``MemoryThreadMetaStore`` so tests can drive ``/api/threads``
    end-to-end and pre-seed legacy records via the underlying BaseStore.

    Returns ``(app, store, checkpointer)`` for direct seeding/inspection.
    """
    app = make_authed_test_app()
    store = InMemoryStore()
    checkpointer = InMemorySaver()
    # The threads router resolves these three objects off ``request.app.state``.
    app.state.store = store
    app.state.checkpointer = checkpointer
    app.state.thread_store = _PermissiveThreadMetaStore(store)
    app.include_router(threads.router)
    return app, store, checkpointer
def test_delete_thread_data_removes_thread_directory(tmp_path):
@ -136,3 +190,244 @@ def test_strip_reserved_metadata_empty_input():
def test_strip_reserved_metadata_strips_all_reserved_keys():
    # Server-reserved keys (e.g. ``user_id``) are dropped; user keys survive.
    out = threads._strip_reserved_metadata({"user_id": "x", "keep": "me"})
    assert out == {"keep": "me"}
# ---------------------------------------------------------------------------
# ISO 8601 timestamp contract (issue #2594)
# ---------------------------------------------------------------------------
#
# Threads endpoints document ``created_at`` / ``updated_at`` as ISO
# timestamps and that is the format LangGraph Platform uses
# (``langgraph_sdk.schema.Thread.created_at: datetime`` JSON-encodes to
# ISO 8601). The tests below pin that contract end-to-end and also
# exercise the ``coerce_iso`` healing path for legacy unix-timestamp
# records written by older Gateway versions.
def test_create_thread_returns_iso_timestamps() -> None:
    """POST /api/threads must emit ISO 8601 ``created_at``/``updated_at``."""
    app, _store, _checkpointer = _build_thread_app()
    with TestClient(app) as client:
        response = client.post("/api/threads", json={"metadata": {}})
    assert response.status_code == 200, response.text
    body = response.json()
    assert _ISO_TIMESTAMP_RE.match(body["created_at"]), body["created_at"]
    assert _ISO_TIMESTAMP_RE.match(body["updated_at"]), body["updated_at"]
    # A freshly created thread uses a single ``now`` value for both fields.
    assert body["created_at"] == body["updated_at"]
def test_get_thread_returns_iso_for_legacy_unix_record() -> None:
    """A thread record written by older versions stores ``time.time()``
    floats. ``get_thread`` must transparently surface them as ISO so the
    frontend's ``new Date(...)`` parser does not break.
    """
    app, store, checkpointer = _build_thread_app()
    legacy_thread_id = "legacy-thread"
    legacy_ts = "1777252410.411327"

    async def _seed() -> None:
        # Seed the legacy record directly in the BaseStore, bypassing
        # ``MemoryThreadMetaStore.create`` (which now writes ISO).
        await store.aput(
            THREADS_NS,
            legacy_thread_id,
            {
                "thread_id": legacy_thread_id,
                "status": "idle",
                "created_at": legacy_ts,
                "updated_at": legacy_ts,
                "metadata": {},
            },
        )
        from langgraph.checkpoint.base import empty_checkpoint

        # ``get_thread`` also consults the checkpointer, so a checkpoint
        # must exist for the thread to resolve.
        await checkpointer.aput(
            {"configurable": {"thread_id": legacy_thread_id, "checkpoint_ns": ""}},
            empty_checkpoint(),
            {"step": -1, "source": "input", "writes": None, "parents": {}},
            {},
        )

    import asyncio

    asyncio.run(_seed())
    with TestClient(app) as client:
        response = client.get(f"/api/threads/{legacy_thread_id}")
    assert response.status_code == 200, response.text
    body = response.json()
    assert _ISO_TIMESTAMP_RE.match(body["created_at"]), body["created_at"]
    assert _ISO_TIMESTAMP_RE.match(body["updated_at"]), body["updated_at"]
def test_patch_thread_returns_iso_and_advances_updated_at() -> None:
    """PATCH heals legacy timestamps and advances ``updated_at`` past them."""
    app, store, _checkpointer = _build_thread_app()
    thread_id = "patch-target"
    legacy_created = "1777000000.000000"
    legacy_updated = "1777000000.000000"

    async def _seed() -> None:
        # Pre-seed a legacy unix-second record directly in the BaseStore.
        await store.aput(
            THREADS_NS,
            thread_id,
            {
                "thread_id": thread_id,
                "status": "idle",
                "created_at": legacy_created,
                "updated_at": legacy_updated,
                "metadata": {"k": "v0"},
            },
        )

    import asyncio

    asyncio.run(_seed())
    with TestClient(app) as client:
        response = client.patch(f"/api/threads/{thread_id}", json={"metadata": {"k": "v1"}})
    assert response.status_code == 200, response.text
    body = response.json()
    assert _ISO_TIMESTAMP_RE.match(body["created_at"]), body["created_at"]
    assert _ISO_TIMESTAMP_RE.match(body["updated_at"]), body["updated_at"]
    # Patch issues a fresh ``updated_at`` via ``MemoryThreadMetaStore.update_metadata``,
    # so it must be > the migrated legacy ``created_at`` (both ISO strings
    # sort lexicographically by time when the format is consistent).
    assert body["updated_at"] > body["created_at"]
    assert body["metadata"] == {"k": "v1"}
def test_search_threads_normalizes_legacy_unix_seconds_to_iso() -> None:
    """``MemoryThreadMetaStore`` may hold legacy ``time.time()`` floats
    written by older Gateway versions. ``/search`` must surface them as
    ISO via ``coerce_iso`` so the frontend's ``new Date(...)`` parser
    does not break.
    """
    app, store, _checkpointer = _build_thread_app()

    async def _seed() -> None:
        # Legacy unix-second float (the literal value from issue #2594).
        await store.aput(
            THREADS_NS,
            "legacy",
            {
                "thread_id": "legacy",
                "status": "idle",
                "created_at": 1777000000.0,
                "updated_at": 1777000000.0,
                "metadata": {},
            },
        )
        # Modern ISO string, slightly later.
        await store.aput(
            THREADS_NS,
            "modern",
            {
                "thread_id": "modern",
                "status": "idle",
                "created_at": "2026-04-27T00:00:00+00:00",
                "updated_at": "2026-04-27T00:00:00+00:00",
                "metadata": {},
            },
        )

    import asyncio

    asyncio.run(_seed())
    with TestClient(app) as client:
        response = client.post("/api/threads/search", json={"limit": 10})
    assert response.status_code == 200, response.text
    items = response.json()
    # Both records come back, and every timestamp on the wire is ISO.
    assert {item["thread_id"] for item in items} == {"legacy", "modern"}
    for item in items:
        assert _ISO_TIMESTAMP_RE.match(item["created_at"]), item
        assert _ISO_TIMESTAMP_RE.match(item["updated_at"]), item
def test_memory_thread_meta_store_writes_iso_on_create() -> None:
    """``MemoryThreadMetaStore.create`` must emit ISO so newly created
    threads serialize correctly without depending on the router's
    ``coerce_iso`` heal path.
    """
    import asyncio

    store = InMemoryStore()
    repo = MemoryThreadMetaStore(store)

    async def _scenario() -> dict:
        await repo.create("fresh", user_id=None, metadata={"a": 1})
        # Inspect the raw stored record, not the router-serialized view.
        record = (await store.aget(THREADS_NS, "fresh")).value
        return record

    record = asyncio.run(_scenario())
    assert _ISO_TIMESTAMP_RE.match(record["created_at"]), record
    assert _ISO_TIMESTAMP_RE.match(record["updated_at"]), record
def test_get_thread_state_returns_iso_for_legacy_checkpoint_metadata() -> None:
    """Checkpoints written by older Gateway versions stored
    ``created_at`` as a unix-second float in their metadata. The
    ``/state`` endpoint must surface that value as ISO so the frontend's
    ``new Date(...)`` parser does not break — same root cause as the
    thread-record bug fixed in #2594, but on the checkpoint side.
    """
    app, _store, checkpointer = _build_thread_app()
    thread_id = "legacy-state"

    async def _seed() -> None:
        from langgraph.checkpoint.base import empty_checkpoint

        # Checkpoint metadata carries the literal float from the issue body.
        await checkpointer.aput(
            {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}},
            empty_checkpoint(),
            {"step": -1, "source": "input", "writes": None, "parents": {}, "created_at": 1777252410.411327},
            {},
        )

    import asyncio

    asyncio.run(_seed())
    with TestClient(app) as client:
        response = client.get(f"/api/threads/{thread_id}/state")
    assert response.status_code == 200, response.text
    body = response.json()
    # Both the top-level field and the nested checkpoint ``ts`` must be ISO.
    assert _ISO_TIMESTAMP_RE.match(body["created_at"]), body["created_at"]
    assert _ISO_TIMESTAMP_RE.match(body["checkpoint"]["ts"]), body["checkpoint"]
def test_get_thread_history_returns_iso_for_legacy_checkpoint_metadata() -> None:
    """``/history`` walks ``checkpointer.alist`` and emits one entry per
    checkpoint. Each entry's ``created_at`` must come out as ISO even if
    older checkpoints stored a unix-second float in their metadata.
    """
    app, _store, checkpointer = _build_thread_app()
    thread_id = "legacy-history"

    async def _seed() -> None:
        from langgraph.checkpoint.base import empty_checkpoint

        # Checkpoint metadata carries the literal float from the issue body.
        await checkpointer.aput(
            {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}},
            empty_checkpoint(),
            {"step": -1, "source": "input", "writes": None, "parents": {}, "created_at": 1777252410.411327},
            {},
        )

    import asyncio

    asyncio.run(_seed())
    with TestClient(app) as client:
        response = client.post(f"/api/threads/{thread_id}/history", json={"limit": 10})
    assert response.status_code == 200, response.text
    entries = response.json()
    assert entries, "expected at least one history entry"
    for entry in entries:
        assert _ISO_TIMESTAMP_RE.match(entry["created_at"]), entry

View File

@ -0,0 +1,90 @@
"""Tests for ``deerflow.utils.time``."""
from __future__ import annotations
import re
from datetime import UTC, datetime, timedelta, timezone
from deerflow.utils.time import coerce_iso, now_iso
_ISO_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")
def test_now_iso_is_utc_iso8601() -> None:
    """``now_iso`` emits a parseable, tz-aware ISO 8601 string in UTC."""
    value = now_iso()
    assert _ISO_RE.match(value), value
    parsed = datetime.fromisoformat(value)
    assert parsed.tzinfo is not None
    # Offset must be UTC specifically, not merely any aware timezone.
    assert parsed.tzinfo.utcoffset(parsed) == UTC.utcoffset(parsed)
def test_coerce_iso_passes_iso_through() -> None:
    """Already-ISO strings are returned byte-for-byte unchanged."""
    iso = "2026-04-27T01:13:30.411334+00:00"
    assert coerce_iso(iso) == iso
def test_coerce_iso_converts_unix_float_string() -> None:
    """Legacy ``str(time.time())`` values are healed to ISO 8601."""
    legacy = "1777252410.411327"
    out = coerce_iso(legacy)
    assert _ISO_RE.match(out), out
    # Round-trip: parsed timestamp matches the original epoch.
    parsed = datetime.fromisoformat(out)
    assert abs(parsed.timestamp() - 1777252410.411327) < 1e-3
def test_coerce_iso_converts_unix_int_string() -> None:
    """Whole-second legacy strings (no fractional part) also convert."""
    out = coerce_iso("1700000000")
    assert _ISO_RE.match(out), out
def test_coerce_iso_converts_numeric_types() -> None:
    """Raw ``float``/``int`` epochs convert, not just their string forms."""
    out_float = coerce_iso(1777252410.411327)
    out_int = coerce_iso(1700000000)
    assert _ISO_RE.match(out_float)
    assert _ISO_RE.match(out_int)
def test_coerce_iso_handles_empty_and_none() -> None:
    """Absent timestamps collapse to the empty string, never ``"None"``."""
    assert coerce_iso(None) == ""
    assert coerce_iso("") == ""
def test_coerce_iso_does_not_misinterpret_short_numeric() -> None:
    """Short numeric strings must not be mistaken for unix epochs."""
    # A 4-digit year should never be parsed as a unix timestamp; only
    # 10-digit unix-second strings match the legacy pattern.
    assert coerce_iso("2026") == "2026"
def test_coerce_iso_handles_unparseable_string() -> None:
    """Arbitrary non-timestamp strings pass through untouched."""
    assert coerce_iso("not-a-timestamp") == "not-a-timestamp"
def test_coerce_iso_rejects_bool() -> None:
    """Booleans are stringified, never interpreted as epoch seconds."""
    # ``bool`` is a subclass of ``int`` — must not be treated as epoch 0/1.
    assert coerce_iso(True) == "True"
    assert coerce_iso(False) == "False"
def test_coerce_iso_handles_tz_aware_datetime() -> None:
    """tz-aware ``datetime`` values serialize with the ``T`` separator."""
    # str(datetime) would emit a space separator; coerce_iso must use ``T``.
    dt = datetime(2026, 4, 27, 1, 13, 30, 411327, tzinfo=UTC)
    out = coerce_iso(dt)
    assert out == "2026-04-27T01:13:30.411327+00:00"
    assert "T" in out and " " not in out
def test_coerce_iso_handles_tz_naive_datetime_as_utc() -> None:
    """Naive ``datetime`` values are assumed to be UTC and tagged as such."""
    dt = datetime(2026, 4, 27, 1, 13, 30, 411327)
    out = coerce_iso(dt)
    assert out == "2026-04-27T01:13:30.411327+00:00"
    parsed = datetime.fromisoformat(out)
    assert parsed.tzinfo is not None
    assert parsed.utcoffset() == timedelta(0)
def test_coerce_iso_normalises_non_utc_datetime_to_utc() -> None:
    """Aware non-UTC ``datetime`` values are converted to UTC wall time."""
    # +08:00 wall-clock 09:13 == UTC 01:13.
    plus_eight = timezone(timedelta(hours=8))
    dt = datetime(2026, 4, 27, 9, 13, 30, 411327, tzinfo=plus_eight)
    out = coerce_iso(dt)
    assert out == "2026-04-27T01:13:30.411327+00:00"