Event-Driven Pipeline with Real-Time Observability
Critical system events were vanishing into the void—no reliable delivery, no retry on failure, and zero observability into what was happening under the hood.
The fix: an event-driven pipeline built on Redis Streams for at-least-once delivery, circuit breakers for resilience, dead-letter queues for failed events, and Prometheus/Grafana for real-time observability.
High-level overview of the event pipeline and observability stack.
Event storage and subscription management structure.
Fire-and-forget event publishing meant events were lost during crashes, deployments, or network partitions.
Used Redis Streams with consumer groups. Events are acknowledged only after successful processing. Pending entries are claimed by other consumers if the original times out.
```python
# Consumer group with acknowledgment
async def consume_events(group: str, consumer: str):
    while True:
        # Claim entries left pending by crashed or stalled consumers first
        # (XAUTOCLAIM replies with (next_start_id, messages, deleted_ids))
        _next_id, pending, _deleted = await redis.xautoclaim(
            STREAM_KEY, group, consumer,
            min_idle_time=CLAIM_TIMEOUT_MS,
            count=10
        )
        for event_id, event_data in pending:
            await process_with_ack(group, event_id, event_data)
        # Then read new events (XREADGROUP replies per stream)
        streams = await redis.xreadgroup(
            groupname=group,
            consumername=consumer,
            streams={STREAM_KEY: '>'},
            count=10,
            block=5000
        )
        for _stream, messages in streams:
            for event_id, event_data in messages:
                await process_with_ack(group, event_id, event_data)

async def process_with_ack(group: str, event_id: str, data: dict):
    try:
        await handle_event(data)
        await redis.xack(STREAM_KEY, group, event_id)
    except Exception as e:
        # Unacked entries are reclaimed by another consumer after the idle timeout
        logger.error(f"Event {event_id} failed: {e}")
```
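On the publishing side, XADD makes delivery durable the moment Redis accepts the entry, replacing fire-and-forget. A minimal sketch of the publisher counterpart — the stream name, field layout, and `maxlen` cap are assumptions, not the project's actual values:

```python
import json

STREAM_KEY = "eventflow:events"  # assumed stream name

def encode_event(event_type: str, payload: dict, source: str) -> dict:
    # Stream entries are flat string field/value pairs, so nest the payload as JSON
    return {"type": event_type, "source": source, "payload": json.dumps(payload)}

async def publish_event(redis, event_type: str, payload: dict, source: str = "api") -> str:
    # XADD returns the generated entry id once Redis has accepted the entry;
    # maxlen with approximate trimming caps unbounded stream growth
    return await redis.xadd(
        STREAM_KEY,
        encode_event(event_type, payload, source),
        maxlen=100_000,
        approximate=True,
    )
```

Approximate trimming (`~` in the protocol) lets Redis trim lazily at macro-node boundaries, which is much cheaper than exact trimming.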
When downstream services failed, the event queue backed up. Continuous retries overwhelmed recovering services.
Implemented circuit breakers per handler. After N failures, the circuit opens and events are short-circuited to the DLQ. Half-open state tests recovery before full resumption.
```python
# Circuit breaker implementation
from enum import Enum

class CircuitState(str, Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, handler: str, threshold: int = 5, timeout: int = 60):
        self.handler = handler
        self.threshold = threshold
        self.timeout = timeout

    async def call(self, func, *args, **kwargs):
        state = await self.get_state()
        if state == CircuitState.OPEN:
            if await self.should_attempt_reset():
                await self.set_state(CircuitState.HALF_OPEN)
            else:
                raise CircuitOpenError(f"{self.handler} circuit is open")
        try:
            result = await func(*args, **kwargs)
            await self.record_success()
            return result
        except Exception:
            await self.record_failure()
            if await self.get_failure_count() >= self.threshold:
                await self.set_state(CircuitState.OPEN)
                metrics.circuit_opened.labels(handler=self.handler).inc()
            raise

    async def record_success(self):
        await redis.hset(f"circuit:{self.handler}", "failures", 0)
        await self.set_state(CircuitState.CLOSED)

    async def record_failure(self):
        # State accessors (get_state, set_state, get_failure_count,
        # should_attempt_reset) read/write the same Redis hash
        await redis.hincrby(f"circuit:{self.handler}", "failures", 1)
```
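When the breaker raises, the event still has to land somewhere. A sketch of the short-circuit path to the DLQ — the DLQ stream name and entry fields are assumptions, and `CircuitOpenError` is redefined here only to keep the snippet self-contained (it is the exception the breaker above raises):

```python
import json

DLQ_KEY = "eventflow:dlq"  # assumed dead-letter stream name

class CircuitOpenError(Exception):
    """Raised by CircuitBreaker.call when the circuit is open."""

def dlq_entry(handler: str, event_id: str, data: dict, error: str) -> dict:
    # Preserve enough context to investigate and replay the event later
    return {
        "handler": handler,
        "event_id": event_id,
        "error": error,
        "payload": json.dumps(data),
    }

async def dispatch(redis, breaker, handler_func, event_id: str, data: dict):
    try:
        await breaker.call(handler_func, data)
    except CircuitOpenError as e:
        # Short-circuit: park the event instead of hammering a failing service
        await redis.xadd(DLQ_KEY, dlq_entry(breaker.handler, event_id, data, str(e)))
```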
Debugging event flow issues required digging through logs. No visibility into queue depth, processing latency, or failure rates.
Instrumented everything with Prometheus metrics. Grafana dashboards show real-time event throughput, consumer lag, circuit states, and DLQ depth. Alerts fire on anomalies.
```python
# Prometheus metrics instrumentation
from prometheus_client import Counter, Histogram, Gauge

events_published = Counter(
    'eventflow_events_published_total',
    'Total events published',
    ['event_type', 'source']
)
events_processed = Counter(
    'eventflow_events_processed_total',
    'Total events processed',
    ['event_type', 'handler', 'status']
)
processing_latency = Histogram(
    'eventflow_processing_seconds',
    'Event processing latency',
    ['event_type', 'handler'],
    buckets=[.01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10]
)
consumer_lag = Gauge(
    'eventflow_consumer_lag',
    'Number of pending events per consumer group',
    ['consumer_group']
)
dlq_depth = Gauge(
    'eventflow_dlq_depth',
    'Number of events in dead letter queue',
    ['handler']
)

# Usage in handler
async def handle_event(event: dict, handler: str):
    with processing_latency.labels(
        event_type=event['type'],
        handler=handler
    ).time():
        result = await process(event)
    events_processed.labels(
        event_type=event['type'],
        handler=handler,
        status='success'
    ).inc()
```
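Counters and histograms increment inline, but consumer lag has to be sampled: XPENDING in its summary form reports the total unacked entries for a consumer group. A sketch of a background sampler feeding the `consumer_lag` gauge — the stream name and poll interval are assumptions:

```python
import asyncio

STREAM_KEY = "eventflow:events"  # assumed stream name

async def sample_lag(redis, group: str) -> int:
    # XPENDING summary: total pending entries, min/max ids, per-consumer counts
    summary = await redis.xpending(STREAM_KEY, group)
    return summary["pending"]

async def update_lag_metrics(redis, groups, interval: float = 10.0):
    # Periodically export sampled lag through the consumer_lag gauge defined above
    while True:
        for group in groups:
            consumer_lag.labels(consumer_group=group).set(await sample_lag(redis, group))
        await asyncio.sleep(interval)
```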
At-least-once delivery means duplicates happen. Every handler must be idempotent—processing the same event twice should have no additional effect.
Circuit breakers prevent cascade failures. When services fail, stop hammering them and give them time to recover.
You can't fix what you can't see. Metrics, tracing, and alerting must be built in from day one—not bolted on later.
Failed events shouldn't disappear. DLQs preserve them for investigation, manual intervention, and eventual reprocessing.
Check out the full source code or reach out to discuss the architecture.