How to Build an Event-Driven Data Pipeline With Python and Message Queues

Most data pipelines start as scheduled jobs. A cron task runs every 15 minutes, pulls new records from an API, transforms them, and writes to a database. It works fine until the source starts producing data faster than the schedule allows, or the pipeline needs to respond to something the moment it happens rather than eventually. Scheduled jobs also run whether or not there's new data to process, burning compute on empty cycles and adding latency that's baked into the architecture from day one.

Event-driven pipelines solve this by reversing the direction: instead of a scheduler asking "is there new data?", a message broker notifies the pipeline when data arrives. The pipeline wakes up, processes, and goes back to waiting. Latency drops. The system is responsive to actual load rather than a fixed schedule. Resource usage aligns with work that actually needs doing.

This guide covers the architecture, the key components, and how to implement a basic event-driven pipeline in Python with a message queue.

What Event-Driven Means in Practice

The distinction is simple but consequential.

Scheduled pipeline: a timer fires, the pipeline asks the source "do you have new data?", the source either returns data or nothing, the pipeline processes whatever arrived, then the timer fires again.

Event-driven pipeline: the source generates a message when new data exists. The message goes into a queue. The pipeline is always listening; when a message arrives, it processes it. There's no polling, no empty cycles, and no fixed wait interval between data arriving and processing starting.

The practical difference:

  • Scheduled pipelines add an average latency of about half the interval, with a worst case of nearly the full interval. On a 10-minute schedule, data that arrives one second after a run starts waits almost 10 minutes for the next one.
  • Event-driven pipelines process data as soon as it arrives. Latency is bounded by network transit and processing time, not a schedule.
  • Scheduled pipelines run on every tick even when there's nothing to do, paying startup and polling costs each time. Event-driven consumers sit idle, blocked on the queue, until there's work.
  • Event-driven pipelines are naturally concurrent -- you can add consumers without changing the pipeline architecture.
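
To put numbers on the latency point, here is a small sketch that computes how long a record waits for the next scheduled run. The function name wait_for_next_poll and the 600-second interval are illustrative assumptions, and the model assumes runs fire exactly on schedule.

```python
def wait_for_next_poll(arrival_s: float, interval_s: float) -> float:
    """Seconds a record waits until the next scheduled run picks it up.

    Assumes runs fire at t = 0, interval_s, 2 * interval_s, ... and that a
    record arriving exactly on a tick is picked up by that run.
    """
    remainder = arrival_s % interval_s
    return 0.0 if remainder == 0 else interval_s - remainder


interval = 600.0  # a 10-minute schedule, in seconds

# A record that arrives one second after a run waits almost the full interval.
worst_case = wait_for_next_poll(601.0, interval)

# Averaged over arrivals spread evenly across one interval (one per second),
# the wait comes out at about half the interval.
arrivals = [float(s) for s in range(600, 1200)]
average = sum(wait_for_next_poll(a, interval) for a in arrivals) / len(arrivals)

print(worst_case)  # 599.0
print(average)     # 299.5
```

An event-driven consumer has no equivalent term: its wait is network transit plus whatever is already ahead of the message in the queue.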

Not every pipeline benefits from event-driven design. Batch jobs that aggregate a full day's data, reports that need to run on a fixed schedule, and reconciliation jobs that require a complete dataset are all better as scheduled tasks. Event-driven design is the right choice when you need low latency, when data arrives unpredictably, or when the source can push notifications.

Core Components

An event-driven pipeline has four pieces:

Message broker -- the intermediary that receives messages from producers and delivers them to consumers. How much persistence and delivery assurance you get depends on the broker and how it's configured. Common choices for Python pipelines: Redis (simple, in-memory, fast; lists and Streams hold messages until they're read, while Pub/Sub drops anything published when no subscriber is connected), RabbitMQ (durable, feature-rich, AMQP protocol), Apache Kafka (high-throughput, distributed, log-based).

Producer -- the component that sends messages when events occur. This might be a webhook receiver, a database change listener, a file watcher, or any other code that detects the triggering condition and publishes to the queue.

Message queue -- the buffer between producers and consumers. When consumers are slow or temporarily down, messages accumulate in the queue rather than being lost. The queue decouples the producer from the consumer -- the producer doesn't wait for the consumer to be ready.

Consumer -- the component that processes messages. Multiple consumers can read from the same queue for parallel processing. Consumers acknowledge successful processing; unacknowledged messages can be redelivered.
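
The four roles can be seen in miniature without any broker at all. The sketch below uses Python's standard-library queue.Queue as an in-process stand-in for the broker and queue; it is only an illustration of the roles, since a real broker adds persistence and cross-process delivery. The event names and the None sentinel are assumptions made for this demo.

```python
import queue
import threading

events = queue.Queue()  # in-process stand-in for the broker's queue
processed = []


def producer() -> None:
    # Publishes one message per event and returns without waiting
    # for the consumer to handle it.
    for i in range(3):
        events.put({"event_type": "record_created", "payload": {"id": i}})
    events.put(None)  # sentinel: tells this demo consumer to stop


def consumer() -> None:
    while True:
        message = events.get()  # blocks until a message is available
        if message is None:
            events.task_done()
            break
        processed.append(message)
        events.task_done()  # acknowledge: this message is finished


worker = threading.Thread(target=consumer)
worker.start()
producer()
events.join()  # wait until every message has been acknowledged
worker.join()

print([m["payload"]["id"] for m in processed])  # [0, 1, 2]
```

The producer finishes publishing regardless of how fast the consumer runs, which is the decoupling the queue provides.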

Building a Basic Pipeline With Redis and Python

Redis is a practical starting point for most Python event-driven pipelines because it's already in many stacks, simple to reason about, and fast for single-server workloads. For higher durability and more complex routing, RabbitMQ or Kafka are the next steps up.

Producer

The producer publishes a message to a Redis list when an event occurs:

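import redis
import json
from datetime import datetime, timezone

r = redis.Redis(host='localhost', port=6379, db=0)

def publish_event(event_type: str, payload: dict):
    # Serialize the event as JSON so any consumer can parse it
    message = json.dumps({
        "event_type": event_type,
        "payload": payload,
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated as of Python 3.12)
        "timestamp": datetime.now(timezone.utc).isoformat()
    })
    # Append to the right end of the "data_events" list
    r.rpush("data_events", message)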

This pushes to the right end of the list. The consumer reads from the left, giving FIFO ordering.

Consumer

The consumer blocks waiting for messages using blpop, which is preferable to a polling loop because it doesn't burn CPU in the wait:

import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

def process_event(message: dict):
    # Your processing logic here
    event_type = message.get("event_type")
    payload = message.get("payload")
    print(f"Processing {event_type}: {payload}")

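def run_consumer():
    while True:
        # Block for up to 30 seconds waiting for the next message
        result = r.blpop("data_events", timeout=30)
        if result:
            # blpop returns a (list_name, value) tuple
            _, raw_message = result
            message = json.loads(raw_message)
            process_event(message)

if __name__ == "__main__":
    run_consumer()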

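blpop blocks until a message arrives or the timeout elapses, in which case it returns None and the loop waits again. The 30-second timeout hands control back to the loop at regular intervals, which is a natural place for a shutdown check or a heartbeat log. It does not by itself detect a dropped connection; that depends on the client's socket timeout settings and on handling connection errors around the call.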
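
One gap in the loop above: blpop removes the message from the list before processing starts, so a consumer that crashes mid-processing loses it. A common mitigation is the "reliable queue" pattern, where the message is moved atomically to a second list and removed only after the handler succeeds. The sketch below assumes client is a redis.Redis instance talking to Redis 6.2 or later (for BLMOVE); the function name consume_one and the list name data_events:processing are hypothetical.

```python
import json


def consume_one(client, handler, source="data_events",
                processing="data_events:processing", timeout=30):
    """Process at most one message; return True if one was handled.

    `client` is assumed to be a redis.Redis instance (BLMOVE needs Redis 6.2+).
    """
    # Atomically pop from the left of `source` and push onto `processing`,
    # so the message still exists somewhere if this process dies mid-handler.
    raw = client.blmove(source, processing, timeout, src="LEFT", dest="RIGHT")
    if raw is None:
        return False  # timed out with nothing to do
    handler(json.loads(raw))
    # Reached only if the handler did not raise: drop the in-flight copy.
    client.lrem(processing, 1, raw)
    return True
```

If the handler raises, the message stays in the processing list. A separate recovery step, not shown here, would need to inspect that list and requeue or dead-letter anything that has been sitting there too long.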

Using Celery for Production Workloads

For workloads that need retries, monitoring, task routing, and worker management, Celery is the standard Python task queue. It runs on top of Redis or RabbitMQ and handles the producer-consumer pattern with additional features for durability and observability.

A Celery task is a Python function decorated with @app.task, where app is the Celery application instance:

from celery import Celery

app = Celery('pipeline', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def process_data_event(self, event_type: str, payload: dict):
    try:
        # processing logic
        pass
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

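The max_retries and countdown parameters handle automatic retry with a fixed delay. Once retries are exhausted, the task is marked as failed. Celery has no built-in dead-letter queue, so failed tasks need to be routed to one explicitly, for example with a RabbitMQ dead-letter exchange or a failure handler that publishes to a separate queue.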

Handling Failures and Retries

Failures in event-driven pipelines fall into two categories:

Transient failures -- network errors, temporary database unavailability, rate limit responses from external APIs. These are usually safe to retry after a short wait. Exponential backoff (retry after 1 second, then 2, then 4, etc.) prevents retry storms when a downstream service is degraded.

Permanent failures -- malformed data, schema mismatches, logic errors in the processing code. Retrying these will always fail. They need to go to a dead-letter queue (DLQ) for inspection and manual handling rather than blocking the queue indefinitely.

A well-designed event-driven pipeline handles both:

  1. Catch transient errors and retry with backoff (max 3-5 times)
  2. After max retries, route to DLQ with error context attached
  3. Monitor DLQ depth as a health metric
  4. Alert when DLQ accumulates beyond a threshold
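
The first two steps can be sketched in plain Python. Everything named here is an assumption for illustration: PermanentError is a hypothetical marker exception, send_to_dlq is any callable that stores the failed message (for example, a function that pushes to a separate Redis list), and treating every other exception as transient is a simplification a real pipeline would narrow.

```python
import time


class PermanentError(Exception):
    """Hypothetical marker for failures that retrying cannot fix."""


def handle_with_retry(message, handler, send_to_dlq,
                      max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Run handler(message); retry transient errors, dead-letter the rest."""
    for attempt in range(max_retries + 1):
        try:
            handler(message)
            return True
        except PermanentError as exc:
            # Malformed data or a logic error: go straight to the DLQ.
            send_to_dlq({"message": message, "error": repr(exc),
                         "attempts": attempt + 1})
            return False
        except Exception as exc:
            # Assumed transient. Give up once the retry budget is spent.
            if attempt == max_retries:
                send_to_dlq({"message": message, "error": repr(exc),
                             "attempts": attempt + 1})
                return False
            sleep(base_delay * 2 ** attempt)  # waits 1s, 2s, 4s, ...
    return False


dead_letters = []


def bad_handler(message):
    raise PermanentError("missing required field 'id'")


handle_with_retry({"payload": {}}, bad_handler, dead_letters.append)
print(dead_letters[0]["attempts"])  # 1
```

Passing sleep as a parameter keeps the function testable without real delays. Adding random jitter to the delay is a common refinement when many consumers retry against the same degraded service.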

Both Redis (via a separate list) and RabbitMQ (via dead-letter exchanges) support this pattern. Celery's task_reject_on_worker_lost and task_acks_late settings provide finer control over when tasks are requeued vs. rejected.

"Event-driven pipelines expose a failure mode that scheduled pipelines hide: silent accumulation of unprocessed messages. When a consumer goes down on a scheduled job, the next run catches up. When a consumer goes down on an event-driven pipeline, messages queue up. Monitoring queue depth and consumer lag is table stakes, not optional observability." - Dennis Traina, 137Foundry

Scaling the Consumer Layer

One advantage of the producer-consumer architecture is horizontal scaling: running multiple consumers against the same queue distributes the load without changing the producer code.

With Redis, multiple consumers each call blpop on the same list. Redis executes each pop atomically, so a given message is handed to only one consumer and no application-level locking is needed.

With RabbitMQ and Kafka, the same principle applies but with more sophisticated routing options. Apache Kafka specifically is designed for high-throughput consumer groups: a topic is split into partitions, each partition is read by one consumer in the group at a time, and adding consumers (up to the number of partitions) scales throughput close to linearly.
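
To illustrate how partitions bound consumer scaling in Kafka, here is a simplified model of partitioning and group assignment. Within a consumer group, each partition is read by one consumer at a time. This is not Kafka's actual code: the CRC32 hash is a stand-in for Kafka's default partitioner, the round-robin assignment is one of several strategies, and the worker names are made up.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash; Kafka's default partitioner uses a different function.
    # The point is only that the same key always maps to the same partition.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_partitions(num_partitions: int, consumers: list) -> dict:
    """Round-robin sketch: every partition gets exactly one owner in the group."""
    assignment = {name: [] for name in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


print(assign_partitions(6, ["worker-a", "worker-b", "worker-c"]))
# {'worker-a': [0, 3], 'worker-b': [1, 4], 'worker-c': [2, 5]}

print(assign_partitions(2, ["worker-a", "worker-b", "worker-c"]))
# {'worker-a': [0], 'worker-b': [1], 'worker-c': []}
```

The second call shows the ceiling: with two partitions, a third consumer in the group has nothing to read, so the partition count sets the maximum useful parallelism.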

Key metrics to monitor as consumer count grows:

  • Queue depth (messages waiting)
  • Consumer lag (time between publish and process)
  • Consumer processing rate per worker
  • Error rate and DLQ depth
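
For the Redis list setup described earlier, the first two metrics could be collected roughly as follows. This is a sketch under assumptions: client is taken to be a redis.Redis instance, each message is JSON with an ISO 8601 "timestamp" field set at publish time, and the function names are hypothetical.

```python
import json
from datetime import datetime, timezone


def queue_depth(client, queue_name="data_events"):
    """Messages waiting in the list; `client` is assumed to be redis.Redis."""
    return client.llen(queue_name)


def lag_seconds(raw_message, now=None):
    """Seconds between the message's publish timestamp and `now`."""
    published = datetime.fromisoformat(json.loads(raw_message)["timestamp"])
    if published.tzinfo is None:
        # Naive timestamps are assumed to be UTC.
        published = published.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return (now - published).total_seconds()


sample = json.dumps({"event_type": "demo", "payload": {},
                     "timestamp": "2024-01-01T12:00:00+00:00"})
fixed_now = datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc)
print(lag_seconds(sample, now=fixed_now))  # 5.0
```

Calling lag_seconds in the consumer just before processing each message, and exporting the result alongside queue_depth, gives the two numbers most likely to show a consumer falling behind.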

Python's multiprocessing module or separate containers running the same consumer code are both common approaches for adding consumers without a full orchestration layer.

When to Use Each Approach

Requirement                      | Scheduled pipeline | Event-driven pipeline
---------------------------------|--------------------|----------------------
Low latency (seconds)            | No                 | Yes
Predictable resource usage       | Yes                | Variable
Simple to operate                | Yes                | More complex
Handles bursty traffic           | No                 | Yes
Requires a complete dataset      | Yes                | No
Works with webhooks/push sources | Poor fit           | Natural fit

Event-driven pipelines add operational complexity: you need to run and monitor the message broker, manage consumer scaling, handle DLQs, and alert on queue depth. For pipelines that genuinely need low latency or natural push-based sources, the operational overhead is worthwhile. For pipelines that run nightly and transform a full dataset, scheduled jobs are simpler and equally effective.

Next Steps

A basic event-driven pipeline running locally can be extended in several directions:

  • Add a monitoring layer (Prometheus + Grafana, or Datadog) that tracks queue depth, consumer lag, and error rates
  • Introduce message schemas (using Pydantic or a schema registry) to catch malformed payloads before they reach the consumer
  • Move from local Redis to a managed broker for production (Redis Cloud, Amazon MQ, Confluent Cloud)
  • Implement idempotent consumers so that redelivered messages don't cause duplicate processing
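
As one example of the last item, an idempotent consumer can claim each message ID before processing it. The sketch below assumes client is a redis.Redis instance and that the producer adds a unique message_id to every message, which the producer shown earlier does not do; process_once and the processed: key prefix are hypothetical names.

```python
def process_once(client, message_id, handler, message, ttl_seconds=86400):
    """Run handler(message) only if message_id has not been claimed before.

    `client` is assumed to be a redis.Redis instance; `message_id` is a
    hypothetical unique ID that the producer would need to attach.
    """
    key = f"processed:{message_id}"
    # SET with nx=True succeeds only when the key does not exist yet, so a
    # redelivered message finds the key already present and is skipped.
    claimed = client.set(key, "1", nx=True, ex=ttl_seconds)
    if not claimed:
        return False
    try:
        handler(message)
    except Exception:
        client.delete(key)  # release the claim so a later retry can run
        raise
    return True
```

The TTL bounds how long IDs are remembered, so it should be longer than the longest plausible redelivery delay. This guards against duplicate deliveries, not against a handler whose side effects partially complete before it fails.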

The 137Foundry data integration service handles event-driven pipeline design for teams that need production-ready implementations without building the infrastructure from scratch. The AI automation practice covers the cases where event-driven data flows power downstream automation rather than pure storage.

The core architecture -- producer, broker, consumer, DLQ -- applies regardless of the specific tools you choose. Getting that structure right before layering in complexity is the difference between a pipeline that runs for years and one that becomes a support burden within months.
