EventFlow Case Study | Clifford Opoku-Sarkodie - Backend Engineer

EventFlow

Event-Driven Pipeline with Real-Time Observability

2024 | Python, FastAPI, Redis Streams | Production Ready

Project Overview

The Problem

Critical system events were vanishing into the void—no reliable delivery, no retry on failure, and zero observability into what was happening under the hood.

The Solution

An event-driven pipeline with Redis Streams for at-least-once delivery, circuit breakers for resilience, dead-letter queues for failed events, and Prometheus/Grafana for real-time observability.

Key Metrics

  • 🚀 99.99% event delivery rate
  • ⚡ Sub-100ms end-to-end latency (p95)
  • 📊 Real-time dashboards for all event flows

System Architecture

High-level overview of the event pipeline and observability stack.

flowchart TB
    subgraph Producers["Event Producers"]
        API[API Services]
        WORKER[Background Workers]
        WEBHOOK[Webhook Handlers]
    end
    subgraph EventBus["Event Bus - Redis Streams"]
        STREAM[Event Stream]
        CONSUMER_GROUP[Consumer Groups]
    end
    subgraph Consumers["Event Consumers"]
        HANDLER1[Email Handler]
        HANDLER2[Notification Handler]
        HANDLER3[Analytics Handler]
        HANDLER4[Webhook Dispatcher]
    end
    subgraph Resilience["Resilience Layer"]
        CIRCUIT[Circuit Breaker]
        RETRY[Retry Manager]
        DLQ[Dead Letter Queue]
    end
    subgraph Observability["Observability Stack"]
        PROMETHEUS[Prometheus]
        GRAFANA[Grafana Dashboards]
        ALERTS[Alert Manager]
    end
    API --> STREAM
    WORKER --> STREAM
    WEBHOOK --> STREAM
    STREAM --> CONSUMER_GROUP
    CONSUMER_GROUP --> HANDLER1
    CONSUMER_GROUP --> HANDLER2
    CONSUMER_GROUP --> HANDLER3
    CONSUMER_GROUP --> HANDLER4
    HANDLER1 --> CIRCUIT
    HANDLER2 --> CIRCUIT
    HANDLER3 --> CIRCUIT
    HANDLER4 --> CIRCUIT
    CIRCUIT --> RETRY
    RETRY --> DLQ
    STREAM --> PROMETHEUS
    CIRCUIT --> PROMETHEUS
    DLQ --> PROMETHEUS
    PROMETHEUS --> GRAFANA
    PROMETHEUS --> ALERTS

Data Model

Event storage and subscription management structure.

erDiagram
    EVENTS ||--o{ EVENT_DELIVERIES : has
    SUBSCRIPTIONS ||--o{ EVENT_DELIVERIES : triggers
    EVENTS ||--o| DEAD_LETTER_QUEUE : fails_to
    EVENTS {
        uuid id PK
        string event_type
        string source
        json payload
        string correlation_id
        timestamp created_at
        timestamp processed_at
    }
    SUBSCRIPTIONS {
        uuid id PK
        string event_type
        string handler_name
        string endpoint_url
        json config
        boolean is_active
        timestamp created_at
    }
    EVENT_DELIVERIES {
        uuid id PK
        uuid event_id FK
        uuid subscription_id FK
        enum status "pending|processing|delivered|failed"
        integer attempt_count
        json last_error
        timestamp next_retry_at
        timestamp delivered_at
    }
    DEAD_LETTER_QUEUE {
        uuid id PK
        uuid event_id FK
        string handler_name
        json error_details
        integer total_attempts
        timestamp failed_at
        timestamp reprocessed_at
    }
    CIRCUIT_STATES {
        string handler_name PK
        enum state "closed|open|half_open"
        integer failure_count
        timestamp last_failure
        timestamp opened_at
    }
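In code, the EVENTS entity maps naturally onto a dataclass. The sketch below is illustrative (the class and `to_stream_fields` helper are not part of the production codebase); it shows how a structured event is flattened into the string map Redis Streams expects:

```python
import json
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Optional

@dataclass
class Event:
    """Mirrors the EVENTS table in the ER diagram."""
    event_type: str
    source: str
    payload: dict
    correlation_id: str
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    processed_at: Optional[str] = None

    def to_stream_fields(self) -> dict:
        """Flatten to the flat string map Redis Streams store per entry
        (XADD); nested JSON payloads are serialized to a string."""
        d = asdict(self)
        d["payload"] = json.dumps(d["payload"])
        return {k: v for k, v in d.items() if v is not None}
```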

Engineering Challenges

01

At-Least-Once Delivery

Problem

Fire-and-forget event publishing meant events were lost during crashes, deployments, or network partitions.

Solution

Used Redis Streams with consumer groups. Events are acknowledged only after successful processing. Pending entries are claimed by other consumers if the original times out.

# Consumer group with acknowledgment
async def consume_events(group: str, consumer: str):
    while True:
        # Claim pending events first (crash recovery): entries another
        # consumer read but never acknowledged before the idle timeout.
        # XAUTOCLAIM returns (next_start_id, claimed_messages, ...).
        _, pending, *_ = await redis.xautoclaim(
            STREAM_KEY, group, consumer,
            min_idle_time=CLAIM_TIMEOUT_MS,
            count=10
        )
        
        for event_id, event_data in pending:
            await process_with_ack(group, event_id, event_data)
        
        # Then read new events ('>' = entries never delivered to this group)
        response = await redis.xreadgroup(
            groupname=group,
            consumername=consumer,
            streams={STREAM_KEY: '>'},
            count=10,
            block=5000
        )
        
        # XREADGROUP returns [(stream_name, [(id, fields), ...]), ...]
        for _stream, messages in response:
            for event_id, event_data in messages:
                await process_with_ack(group, event_id, event_data)

async def process_with_ack(group: str, event_id: str, data: dict):
    try:
        await handle_event(data)
        await redis.xack(STREAM_KEY, group, event_id)
    except Exception as e:
        # Unacked entry stays pending and will be reclaimed by
        # another consumer after the idle timeout
        logger.error(f"Event {event_id} failed: {e}")
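The publishing side is the simpler half of the contract. A sketch, assuming a redis-py-style async client (`STREAM_KEY` and `MAXLEN` carry over as assumptions; approximate MAXLEN trimming keeps the stream bounded without blocking writers):

```python
import json
import uuid

STREAM_KEY = "eventflow:events"  # assumed stream name
MAXLEN = 1_000_000               # bound the stream; oldest entries are trimmed

def build_entry(event_type: str, source: str, payload: dict) -> dict:
    """Build the flat field map handed to XADD. Streams store
    string-to-string maps, so the payload is JSON-encoded."""
    return {
        "id": str(uuid.uuid4()),
        "event_type": event_type,
        "source": source,
        "payload": json.dumps(payload),
    }

async def publish(redis, event_type: str, source: str, payload: dict) -> str:
    """Append the event; '~' (approximate) trimming is much cheaper
    than exact trimming and is fine for a retention bound."""
    entry = build_entry(event_type, source, payload)
    return await redis.xadd(STREAM_KEY, entry, maxlen=MAXLEN, approximate=True)
```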
02

Circuit Breaker Pattern

Problem

When downstream services failed, the event queue backed up. Continuous retries overwhelmed recovering services.

Solution

Implemented circuit breakers per handler. After N failures, the circuit opens and events are short-circuited to the DLQ. Half-open state tests recovery before full resumption.

# Circuit breaker implementation
class CircuitBreaker:
    def __init__(self, handler: str, threshold: int = 5, timeout: int = 60):
        self.handler = handler
        self.threshold = threshold
        self.timeout = timeout
    
    async def call(self, func, *args, **kwargs):
        state = await self.get_state()
        
        if state == CircuitState.OPEN:
            if await self.should_attempt_reset():
                await self.set_state(CircuitState.HALF_OPEN)
            else:
                raise CircuitOpenError(f"{self.handler} circuit is open")
        
        try:
            result = await func(*args, **kwargs)
            await self.record_success()
            return result
        except Exception:
            await self.record_failure()
            
            # A single failure while half-open reopens the circuit
            # immediately; otherwise open once the threshold is hit
            if state == CircuitState.HALF_OPEN or \
                    await self.get_failure_count() >= self.threshold:
                await self.set_state(CircuitState.OPEN)
                metrics.circuit_opened.labels(handler=self.handler).inc()
            
            raise

    async def record_success(self):
        await redis.hset(f"circuit:{self.handler}", "failures", 0)
        await self.set_state(CircuitState.CLOSED)
03

Real-Time Observability

Problem

Debugging event flow issues required digging through logs. No visibility into queue depth, processing latency, or failure rates.

Solution

Instrumented everything with Prometheus metrics. Grafana dashboards show real-time event throughput, consumer lag, circuit states, and DLQ depth. Alerts fire on anomalies.

# Prometheus metrics instrumentation
from prometheus_client import Counter, Histogram, Gauge

events_published = Counter(
    'eventflow_events_published_total',
    'Total events published',
    ['event_type', 'source']
)

events_processed = Counter(
    'eventflow_events_processed_total',
    'Total events processed',
    ['event_type', 'handler', 'status']
)

processing_latency = Histogram(
    'eventflow_processing_seconds',
    'Event processing latency',
    ['event_type', 'handler'],
    buckets=[.01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10]
)

consumer_lag = Gauge(
    'eventflow_consumer_lag',
    'Number of pending events per consumer group',
    ['consumer_group']
)

dlq_depth = Gauge(
    'eventflow_dlq_depth',
    'Number of events in dead letter queue',
    ['handler']
)

# Usage in handler
async def handle_event(event: dict, handler: str):
    with processing_latency.labels(
        event_type=event['type'],
        handler=handler
    ).time():
        try:
            await process(event)
        except Exception:
            events_processed.labels(
                event_type=event['type'],
                handler=handler,
                status='failure'
            ).inc()
            raise
        events_processed.labels(
            event_type=event['type'],
            handler=handler,
            status='success'
        ).inc()
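The consumer_lag gauge needs a feeder: something has to poll Redis and push the number into Prometheus. A sketch, assuming a redis-py-style async client whose XPENDING summary returns a dict with a 'pending' count:

```python
import asyncio

async def poll_lag_once(redis, gauge, stream_key: str, groups):
    """Read the XPENDING summary for each consumer group and export
    the pending-entry count as the consumer_lag gauge."""
    for group in groups:
        summary = await redis.xpending(stream_key, group)
        # redis-py summary shape: {'pending': n, 'min': ..., 'max': ..., 'consumers': [...]}
        gauge.labels(consumer_group=group).set(summary["pending"])

async def lag_updater(redis, gauge, stream_key: str, groups, interval: float = 15.0):
    """Background task that keeps the gauge fresh for scrapes."""
    while True:
        await poll_lag_once(redis, gauge, stream_key, groups)
        await asyncio.sleep(interval)
```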

Key Takeaways

🔄

Idempotency is Essential

At-least-once delivery means duplicates happen. Every handler must be idempotent—processing the same event twice should have no additional effect.

🛡️

Fail Fast, Recover Gracefully

Circuit breakers prevent cascade failures. When services fail, stop hammering them and give them time to recover.

📊

Observability is Not Optional

You can't fix what you can't see. Metrics, tracing, and alerting must be built in from day one—not bolted on later.

📬

Dead Letter Queues Save Lives

Failed events shouldn't disappear. DLQs preserve them for investigation, manual intervention, and eventual reprocessing.

Interested in the Technical Details?

Check out the full source code or reach out to discuss the architecture.