mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-26 11:48:10 +00:00
- Fix spawn() zombie cell: clean up registry on start() failure - Fix shutdown(): cancel + await tasks that exceed graceful timeout - Fix _shutdown(): await mailbox.close() to release backend resources - Fix escalate directive: stop failing child before propagating to grandparent - Fix RedisMailbox.put(): wrap Redis errors in try/except, return False on failure - Fix retry.py: replace assert with proper raise for last_exc - Add put_batch() to Mailbox abstraction for single-roundtrip bulk enqueue - Add RedisMailbox.put_batch() with atomic Lua script for bounded queues - Add MailboxFullError exception type for semantic backpressure handling - Add redis>=7.4.0 dependency with public PyPI sources in uv.lock Tests added (31 total, up from 27): - test_middleware_on_restart_hook: verifies middleware.on_restart() on supervision restart - test_ask_propagates_actor_exception: ask() re-raises original exception type - test_ask_propagates_exception_while_supervised: exception propagates; root actor survives - test_ask_timeout_late_reply_no_exception: late reply after timeout is silent no-op - test_actor_backpressure.py: MailboxFullError + dead letter on full mailbox - test_actor_retry.py: ask_with_retry with exponential backoff - test_mailbox_redis.py: RedisMailbox put/get/batch/close - bench_actor_redis.py: RedisMailbox throughput benchmarks
63 lines
1.7 KiB
Python
63 lines
1.7 KiB
Python
import asyncio
|
|
|
|
import pytest
|
|
|
|
from deerflow.actor import Actor, ActorSystem, IdempotentActorMixin, RetryEnvelope, ask_with_retry
|
|
|
|
|
|
class FlakyIdempotentActor(IdempotentActorMixin, Actor):
|
|
async def on_started(self):
|
|
self.calls = 0
|
|
|
|
async def on_receive(self, message):
|
|
return await self.handle_idempotent(message, self._handle)
|
|
|
|
async def _handle(self, payload):
|
|
self.calls += 1
|
|
if payload == 'flaky' and self.calls == 1:
|
|
await asyncio.sleep(0.02)
|
|
return 'late'
|
|
return f"ok:{payload}"
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_ask_with_retry_timeout_raises():
|
|
system = ActorSystem('retry')
|
|
ref = await system.spawn(FlakyIdempotentActor, 'a')
|
|
|
|
with pytest.raises(asyncio.TimeoutError):
|
|
await ask_with_retry(
|
|
ref,
|
|
'flaky',
|
|
timeout=0.005,
|
|
max_attempts=3,
|
|
base_backoff_s=0.001,
|
|
max_backoff_s=0.005,
|
|
jitter_ratio=0.0,
|
|
idempotency_key='k1',
|
|
)
|
|
|
|
# This helper retries timeout, but if each attempt times out it should raise.
|
|
assert ref.is_alive
|
|
await system.shutdown()
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_idempotent_envelope_returns_cached_result():
|
|
system = ActorSystem('retry')
|
|
ref = await system.spawn(FlakyIdempotentActor, 'a')
|
|
|
|
m1 = RetryEnvelope.wrap('x', idempotency_key='same-key')
|
|
m2 = RetryEnvelope.wrap('x', idempotency_key='same-key', attempt=2, max_attempts=3)
|
|
|
|
r1 = await ref.ask(m1, timeout=1.0)
|
|
r2 = await ref.ask(m2, timeout=1.0)
|
|
|
|
assert r1 == 'ok:x'
|
|
assert r2 == 'ok:x'
|
|
# handler should run once due to idempotency cache
|
|
actor = ref._cell.actor
|
|
assert actor.calls == 1
|
|
|
|
await system.shutdown()
|