deer-flow/backend/tests/bench_actor.py
greatmengqi 228a2a66e3 fix(actor): harden lifecycle, supervision, Redis mailbox, and add comprehensive tests
- Fix spawn() zombie cell: clean up registry on start() failure
- Fix shutdown(): cancel + await tasks that exceed graceful timeout
- Fix _shutdown(): await mailbox.close() to release backend resources
- Fix escalate directive: stop failing child before propagating to grandparent
- Fix RedisMailbox.put(): wrap Redis errors in try/except, return False on failure
- Fix retry.py: replace assert with proper raise for last_exc
- Add put_batch() to Mailbox abstraction for single-roundtrip bulk enqueue
- Add RedisMailbox.put_batch() with atomic Lua script for bounded queues
- Add MailboxFullError exception type for semantic backpressure handling
- Add redis>=7.4.0 dependency with public PyPI sources in uv.lock

Tests added (31 total, up from 27):
- test_middleware_on_restart_hook: verifies middleware.on_restart() on supervision restart
- test_ask_propagates_actor_exception: ask() re-raises original exception type
- test_ask_propagates_exception_while_supervised: exception propagates; root actor survives
- test_ask_timeout_late_reply_no_exception: late reply after timeout is silent no-op
- test_actor_backpressure.py: MailboxFullError + dead letter on full mailbox
- test_actor_retry.py: ask_with_retry with exponential backoff
- test_mailbox_redis.py: RedisMailbox put/get/batch/close
- bench_actor_redis.py: RedisMailbox throughput benchmarks
2026-03-31 10:09:05 +08:00

269 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Actor framework benchmarks — throughput, latency, concurrency."""
import asyncio
import time
from deerflow.actor import Actor, ActorSystem, Middleware
class NoopActor(Actor):
    """Echo actor: replies with whatever message it received.

    Serves as the zero-work baseline for throughput/latency benchmarks.
    """

    async def on_receive(self, message):
        return message
class CounterActor(Actor):
    """Stateful actor: "inc" bumps an internal counter, everything else reads it.

    Every message — "inc", "get", or anything unknown — is answered with the
    current count, so a single final ask doubles as a drain barrier.
    """

    async def on_started(self):
        # Counter is (re)initialized on every actor start.
        self.count = 0

    async def on_receive(self, message):
        if message == "inc":
            self.count += 1
        # "inc" replies with the post-increment value; all other messages
        # simply report the current value — identical to a plain read.
        return self.count
class ChainActor(Actor):
    """Relay actor: forwards each message to ``next_ref`` and returns the reply."""

    # Linked externally after spawn; None marks the tail of the chain.
    next_ref = None

    async def on_receive(self, message):
        if self.next_ref is None:
            # Tail of the chain: bounce the message straight back.
            return message
        return await self.next_ref.ask(message)
class ComputeActor(Actor):
    """CPU-bound actor: computes fib(message) on the system's thread pool."""

    async def on_receive(self, message):
        # Iterative Fibonacci keeps the thread-pool task allocation-light.
        def fib(steps):
            prev, cur = 0, 1
            for _ in range(steps):
                prev, cur = cur, prev + cur
            return prev

        # Offload to the executor so the event loop stays responsive.
        return await self.context.run_in_executor(fib, message)
class CountMiddleware(Middleware):
    """Pass-through middleware that counts every message it sees.

    Used by the overhead benchmark: it adds one attribute increment and one
    extra await per message, the minimal possible pipeline cost.
    """

    def __init__(self):
        self.count = 0

    async def on_receive(self, ctx, message, next_fn):
        self.count += 1
        return await next_fn(ctx, message)
def fmt(n):
    """Render a count compactly: millions as 'X.XM', thousands as 'XK'.

    Values below 1 000 are returned verbatim as a decimal string.
    """
    if n < 1_000:
        return str(n)
    if n < 1_000_000:
        return f"{n/1_000:.0f}K"
    return f"{n/1_000_000:.1f}M"
async def bench_tell_throughput(n=100_000):
    """Measure fire-and-forget (tell) throughput for a single actor.

    The trailing ask acts as a drain barrier: it only completes after every
    prior "inc" has been processed, so elapsed covers enqueue *and* handling.
    """
    system = ActorSystem("bench")
    counter = await system.spawn(CounterActor, "counter", mailbox_size=n + 10)
    t0 = time.perf_counter()
    for _ in range(n):
        await counter.tell("inc")
    processed = await counter.ask("get", timeout=30.0)
    if processed != n:
        print(f" warning: expected {n} processed, got {processed}")
    elapsed = time.perf_counter() - t0
    await system.shutdown()
    print(f" tell throughput: {fmt(n)} msgs in {elapsed:.2f}s = {fmt(int(n / elapsed))}/s")
async def bench_ask_throughput(n=50_000):
    """Measure request/response (ask) round-trips per second on an echo actor."""
    system = ActorSystem("bench")
    echo = await system.spawn(NoopActor, "echo")
    t0 = time.perf_counter()
    for _ in range(n):
        await echo.ask("ping")
    elapsed = time.perf_counter() - t0
    await system.shutdown()
    print(f" ask throughput: {fmt(n)} msgs in {elapsed:.2f}s = {fmt(int(n / elapsed))}/s")
async def bench_ask_latency(n=10_000):
    """Measure per-ask round-trip latency; reports p50/p99/p99.9 in microseconds."""
    system = ActorSystem("bench")
    echo = await system.spawn(NoopActor, "echo")
    # Warm up the actor loop and event loop before sampling.
    for _ in range(100):
        await echo.ask("warmup")
    samples = []
    for _ in range(n):
        t0 = time.perf_counter()
        await echo.ask("ping")
        samples.append((time.perf_counter() - t0) * 1_000_000)  # microseconds
    await system.shutdown()
    samples.sort()
    total = len(samples)
    # Nearest-rank percentiles over the sorted samples.
    p50 = samples[total // 2]
    p99 = samples[int(total * 0.99)]
    p999 = samples[int(total * 0.999)]
    print(f" ask latency: p50={p50:.0f}µs p99={p99:.0f}µs p99.9={p999:.0f}µs")
async def bench_concurrent_actors(num_actors=1000, msgs_per_actor=100):
    """Measure aggregate tell throughput across many actors driven concurrently.

    Each actor gets its own driver coroutine; drivers yield periodically so
    the actor mailboxes drain instead of filling up. Reports any message loss.
    """
    system = ActorSystem("bench")
    refs = [
        await system.spawn(CounterActor, f"a{i}", mailbox_size=msgs_per_actor + 10)
        for i in range(num_actors)
    ]
    t0 = time.perf_counter()

    async def drive(ref, count):
        for j in range(count):
            await ref.tell("inc")
            # Yield control every 50 msgs so actor loops can drain.
            if j % 50 == 49:
                await asyncio.sleep(0)
        return await ref.ask("get", timeout=30.0)

    counts = await asyncio.gather(*(drive(r, msgs_per_actor) for r in refs))
    elapsed = time.perf_counter() - t0
    total = num_actors * msgs_per_actor
    loss = total - sum(counts)
    rate = total / elapsed
    print(f" {num_actors} actors × {msgs_per_actor} msgs: {fmt(total)} in {elapsed:.2f}s = {fmt(int(rate))}/s (loss: {loss})")
    await system.shutdown()
async def bench_actor_chain(depth=100):
    """Time one ask traversing a linear chain of actors to get per-hop cost."""
    system = ActorSystem("bench")
    refs = [await system.spawn(ChainActor, f"c{i}") for i in range(depth)]
    # Wire each actor to its successor (reaches into the cell directly
    # because next_ref can only be set after spawn).
    for prev, nxt in zip(refs, refs[1:]):
        prev._cell.actor.next_ref = nxt
    t0 = time.perf_counter()
    reply = await refs[0].ask("ping", timeout=30.0)
    elapsed = time.perf_counter() - t0
    assert reply == "ping"
    per_hop = elapsed / depth * 1_000_000  # µs
    print(f" chain {depth} hops: {elapsed*1000:.1f}ms total, {per_hop:.0f}µs/hop")
    await system.shutdown()
async def bench_middleware_overhead(n=50_000):
    """Compare ask throughput with and without a single counting middleware.

    Two isolated systems host identical echo actors; the relative slowdown
    of the middleware-equipped one is reported as a percentage.
    """
    mw = CountMiddleware()
    system_plain = ActorSystem("plain")
    ref_plain = await system_plain.spawn(NoopActor, "echo")
    system_mw = ActorSystem("mw")
    ref_mw = await system_mw.spawn(NoopActor, "echo", middlewares=[mw])

    async def timed_asks(ref):
        t0 = time.perf_counter()
        for _ in range(n):
            await ref.ask("p")
        return time.perf_counter() - t0

    plain_elapsed = await timed_asks(ref_plain)
    mw_elapsed = await timed_asks(ref_mw)
    overhead = ((mw_elapsed - plain_elapsed) / plain_elapsed) * 100
    print(f" middleware overhead: {overhead:+.1f}% ({fmt(n)} ask calls, 1 middleware)")
    await system_plain.shutdown()
    await system_mw.shutdown()
async def bench_executor_parallel(num_tasks=16):
    """Measure how well CPU-bound asks fan out across the thread pool.

    Spawns more compute actors than executor workers (16 vs 8) so the pool
    is saturated; all asks are issued at once via gather.
    """
    system = ActorSystem("bench", executor_workers=8)
    workers = [await system.spawn(ComputeActor, f"cpu{i}") for i in range(num_tasks)]
    t0 = time.perf_counter()
    await asyncio.gather(*(w.ask(10_000, timeout=30.0) for w in workers))
    elapsed = time.perf_counter() - t0
    print(f" executor parallel: {num_tasks} fib(10K) in {elapsed*1000:.0f}ms ({num_tasks/elapsed:.0f} tasks/s)")
    await system.shutdown()
async def bench_spawn_teardown(n=5000):
    """Time mass actor creation and full-system shutdown as separate phases."""
    system = ActorSystem("bench")
    t0 = time.perf_counter()
    refs = [await system.spawn(NoopActor, f"a{i}") for i in range(n)]
    spawn_elapsed = time.perf_counter() - t0
    t0 = time.perf_counter()
    await system.shutdown()
    shutdown_elapsed = time.perf_counter() - t0
    print(f" spawn {n}: {spawn_elapsed*1000:.0f}ms ({n/spawn_elapsed:.0f}/s)")
    print(f" shutdown {n}: {shutdown_elapsed*1000:.0f}ms")
async def main():
    """Run every benchmark group in order and print a banner-framed report."""
    banner = "=" * 60
    print(banner)
    print(" Actor Framework Benchmarks")
    print(banner)
    # Each section: a heading followed by its benchmark coroutines, run in order.
    sections = [
        ("[Throughput]", (bench_tell_throughput, bench_ask_throughput)),
        ("[Latency]", (bench_ask_latency, bench_actor_chain)),
        ("[Concurrency]", (bench_concurrent_actors, bench_executor_parallel)),
        ("[Overhead]", (bench_middleware_overhead,)),
        ("[Lifecycle]", (bench_spawn_teardown,)),
    ]
    for heading, benches in sections:
        print()
        print(heading)
        for bench in benches:
            await bench()
    print()
    print(banner)
    print(" Done")
    print(banner)


if __name__ == "__main__":
    asyncio.run(main())