The example from the real-time monitoring documentation tells us most of what we need to know if we want to make our own receiver:
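    from celery import Celery

    def my_monitor(app):
        state = app.events.State()

        def announce_failed_tasks(event):
            state.event(event)
            # task name is sent only with -received event, and state
            # will keep track of this for us.
            task = state.tasks.get(event["uuid"])
            print("TASK FAILED: %s[%s] %s" % (task.name, task.uuid, task.info()))

        with app.connection() as connection:
            recv = app.events.Receiver(
                connection,
                handlers={"task-failed": announce_failed_tasks, "*": state.event},
            )
            recv.capture(limit=None, timeout=None, wakeup=True)

    if __name__ == "__main__":
        app = Celery(broker="amqp://guest@localhost//")
        my_monitor(app)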
app.events.Receiver here is a wrapper around the
EventReceiver class which passes in the associated app instance.
If our application relies on strong guarantees about message delivery, we would like to run multiple instances of the receiver in case one or more of them fails. We then hope that the remaining instances can pick up the slack until the other instances come back online.
However, if we run multiple instances of the example above we’ll see that every receiver processes all messages! We could try to handle this duplication within our application, but there’s a better way. It’s wonderfully simple.
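    recv = app.events.Receiver(
        connection,
        handlers={"task-failed": announce_failed_tasks, "*": state.event},
        node_id="shared",  # illustrative value; every receiver must pass the same string
    )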
Yeah! With the addition of the
node_id argument each receiver will consume
messages from the events queue in a round-robin fashion. We can spin up multiple
receiver instances without fear of processing the same message twice.
That’s pretty much the crux of this post! With all credit and thanks to m’colleague Neil Wang for uncovering this beautifully minimal solution to a niggling problem we had at work.
You can check out the accompanying
repository for a complete example Celery
application which demonstrates the effect of setting
node_id with multiple
event receiver instances.
To understand why setting this argument addresses our needs we need to know a little bit about how the broker works.
Brokers, exchanges, queues, and consumers
We’ll focus on AMQP brokers here, using the language from RabbitMQ’s AMQP-0-9-1 documentation. In principle Celery should support the behaviour above with any broker, but some such as Redis may not provide the same guarantees as dedicated AMQP brokers. Check your broker’s documentation to be sure!
When an EventReceiver starts it registers itself with the broker as a
consumer of a specific queue. Celery worker instances and the tasks they
execute push status updates, ‘events’, to an exchange that queue is bound
to. The information flow goes worker → exchange → queue → consumer.
Upon popping an event message off the queue a receiver will dispatch it to a
Python function based on the event’s type (e.g.
task-failed). The receiver’s
handlers dictionary defines the mapping between
event type and Python function handler.
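As a rough sketch of that dispatch step, here is a plain-Python stand-in for the lookup. It is not Celery's own code: the dispatch function is a hypothetical helper, and the '*' catch-all key is an assumption borrowed from the Celery monitoring example.

```python
def dispatch(handlers, event):
    """Call the handler registered for the event's type, else the '*' fallback."""
    handler = handlers.get(event["type"]) or handlers.get("*")
    if handler is not None:
        return handler(event)
    return None  # no handler registered: the event is ignored


seen = []
handlers = {
    "task-failed": lambda event: seen.append(("failed", event["uuid"])),
    "*": lambda event: seen.append(("other", event["type"])),
}

dispatch(handlers, {"type": "task-failed", "uuid": "abc"})
dispatch(handlers, {"type": "worker-heartbeat"})
print(seen)
```

The first event goes to the task-failed handler, the second falls through to the catch-all.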
When we instantiate multiple
EventReceiver objects with the default settings,
each receiver creates its own uniquely named queue, bound to a common
exchange. As the exchange type is ‘topic’ and Celery creates each
queue with the catch-all routing key ‘#’, each
receiver will process all messages. The topic exchange ‘multicasts’ each message
to all receiver queues.
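To make the multicast concrete, here is a toy in-memory model of a topic exchange. It is only a sketch of AMQP topic matching, not RabbitMQ's implementation. TopicExchange and matches are hypothetical names, and the queue names (a celeryev prefix plus a random ID) and dotted routing keys such as task.failed are assumptions about what Celery generates.

```python
import uuid
from collections import deque


def matches(pattern, routing_key):
    """AMQP-style topic match: '#' is zero or more words, '*' is exactly one."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), routing_key.split("."))


class TopicExchange:
    def __init__(self):
        self.bindings = []  # (binding pattern, queue) pairs

    def bind(self, queue, routing_key):
        self.bindings.append((routing_key, queue))

    def publish(self, routing_key, message):
        # Every queue whose binding matches gets its own copy.
        for pattern, queue in self.bindings:
            if matches(pattern, routing_key):
                queue.append(message)


exchange = TopicExchange()
# Two receivers with default settings: two uniquely named queues.
queues = {f"celeryev.{uuid.uuid4()}": deque() for _ in range(2)}
for queue in queues.values():
    exchange.bind(queue, "#")

events = ["task.received", "task.started", "task.failed"]
for routing_key in events:
    exchange.publish(routing_key, routing_key)

for name, queue in queues.items():
    print(name, list(queue))
```

Both queues end up holding all three events, which is the duplication we saw with multiple default receivers.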
To have each message be processed exclusively by a single receiver, we just need to have all receivers listen to the same queue.
Passing the same node_id argument to every
EventReceiver constructor results in
all receivers creating a queue with the same name. Upon binding
this queue to the
celeryev exchange the broker will de-duplicate the queues by
name, resulting in a single queue being bound. The broker will then take care of
distributing events to consumers of the queue in a round-robin fashion.
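The same idea as a toy model: declaring a queue under an existing name hands back the existing queue, and the broker then alternates between its consumers. Broker, deliver_round_robin, the queue name celeryev.shared and the receiver names are all hypothetical; a real broker's distribution also depends on things like prefetch settings and which consumers are ready.

```python
from collections import deque
from itertools import cycle


class Broker:
    def __init__(self):
        self.queues = {}

    def declare_queue(self, name):
        # Declaring an existing name returns the existing queue.
        return self.queues.setdefault(name, deque())


def deliver_round_robin(queue, consumers):
    """Hand each queued message to exactly one consumer, taking turns."""
    received = {name: [] for name in consumers}
    turn = cycle(consumers)
    while queue:
        received[next(turn)].append(queue.popleft())
    return received


broker = Broker()
first = broker.declare_queue("celeryev.shared")   # declared by receiver A
second = broker.declare_queue("celeryev.shared")  # declared by receiver B
first.extend(f"event-{i}" for i in range(1, 5))

received = deliver_round_robin(first, ["receiver-a", "receiver-b"])
print(received)
```

Each event lands with exactly one receiver, and the broker only ever holds one queue.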
With an understanding of AMQP exchanges, queues, and consumers, and an understanding of how Celery’s events system uses these components, our solution might now seem pretty obvious! It can take time to understand a system, and often it doesn’t feel necessary to do so when you want to get up and running quickly. Taking the time can yield many benefits.
By default the messages sent to the events exchange have a time-to-live of 5 seconds. This stops the event queues from filling up indefinitely, but introduces the possibility of event messages being missed (i.e. if no receiver pops a given message off the queue before the TTL).
Increasing the number of receiver instances and reducing the runtime of your receiver’s event processing logic will reduce the chances of messages being dropped before they can be processed. Increasing the TTL will do the same, although if you’re unable to process events within 5 seconds it might be worth reconsidering whether real-time monitoring is necessary for your system.
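To get a feel for the trade-off, here is a deliberately crude simulation. It assumes a single FIFO queue, identical receivers, and a fixed handler runtime, none of which will hold exactly in practice; count_dropped and the numbers fed to it are made up for illustration.

```python
import heapq


def count_dropped(arrivals_ms, n_receivers, processing_ms, ttl_ms):
    """Count messages that wait in the queue longer than the TTL.

    Toy model: one FIFO queue, identical receivers, fixed handler runtime.
    """
    free_at = [0] * n_receivers  # time at which each receiver is next idle
    heapq.heapify(free_at)
    dropped = 0
    for arrived in arrivals_ms:
        start = max(arrived, free_at[0])  # earliest any receiver could pick it up
        if start - arrived > ttl_ms:
            dropped += 1  # expired before a receiver became free
            continue
        heapq.heapreplace(free_at, start + processing_ms)
    return dropped


# 100 events, one every 100 ms; each takes 1 s to handle; TTL is 5 s.
arrivals = list(range(0, 10_000, 100))
for n in (1, 5, 10):
    print(n, "receiver(s):", count_dropped(arrivals, n, 1_000, 5_000), "dropped")
```

In this model a single receiver falls behind and starts losing events, while ten receivers keep pace with the arrival rate and lose none.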
What happens if a receiver goes offline while it is processing an event message? It is lost forever. As far as I can tell Celery never acknowledges the messages it pulls from the event queue and so the broker will drop them after the TTL. From the broker’s point of view the first receiver has consumed the message, and so it will not try to distribute it to another receiver.
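A small sketch of that failure mode, assuming (as described above) that the queue hands each message over exactly once and never redelivers it. The drain and flaky_handler helpers are hypothetical, and the RuntimeError merely stands in for a receiver process dying mid-handler.

```python
from collections import deque


def drain(queue, handler):
    """Deliver each message once; a message is gone as soon as it is popped."""
    processed, lost = [], []
    while queue:
        message = queue.popleft()  # from the broker's side this is now consumed
        try:
            handler(message)
        except RuntimeError:       # stand-in for the receiver going offline
            lost.append(message)   # nothing puts it back on the queue
            continue
        processed.append(message)
    return processed, lost


def flaky_handler(message):
    if message == "event-2":
        raise RuntimeError("receiver went offline")


processed, lost = drain(deque(["event-1", "event-2", "event-3"]), flaky_handler)
print("processed:", processed)
print("lost:", lost)
```

The second event is never seen by another consumer, because nothing ever returns it to the queue.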
So, even though our solution helps protect us against a single point of failure, it is not bulletproof. If you absolutely, positively must know about every single event then you should consider some further measures:
- Make the events receiver as simple and robust as possible to minimise the potential for processing failures, e.g. push event payloads verbatim into a database.
- Figure out if you can detect when a message may have been dropped (e.g. your task state DB has an entry stuck in ‘running’ for much longer than is normal) and then try to recover the lost information based on the current state of the system.
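Both suggestions can be sketched with nothing more than SQLite. This is one possible shape rather than a recommendation: the table layout and the open_store, store_event and find_stuck_tasks helpers are all hypothetical, and it assumes events are dictionaries with a type key and, for task events, a uuid key.

```python
import json
import sqlite3
import time

TERMINAL_TYPES = ("task-succeeded", "task-failed", "task-revoked", "task-rejected")


def open_store(path=":memory:"):
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS events "
        "(uuid TEXT, type TEXT, received_at REAL, payload TEXT)"
    )
    return conn


def store_event(conn, event, now=None):
    """Handler body: write the payload verbatim and do nothing else."""
    now = time.time() if now is None else now
    conn.execute(
        "INSERT INTO events VALUES (?, ?, ?, ?)",
        (event.get("uuid"), event["type"], now, json.dumps(event)),
    )
    conn.commit()


def find_stuck_tasks(conn, max_runtime, now=None):
    """UUIDs that started more than max_runtime seconds ago and never finished."""
    now = time.time() if now is None else now
    rows = conn.execute(
        "SELECT uuid FROM events WHERE type = 'task-started' AND received_at < ? "
        "AND uuid NOT IN (SELECT uuid FROM events WHERE type IN (?, ?, ?, ?)) "
        "ORDER BY uuid",
        (now - max_runtime, *TERMINAL_TYPES),
    )
    return [row[0] for row in rows]


conn = open_store()
store_event(conn, {"type": "task-started", "uuid": "a"}, now=0)
store_event(conn, {"type": "task-started", "uuid": "b"}, now=0)
store_event(conn, {"type": "task-succeeded", "uuid": "b"}, now=10)
print(find_stuck_tasks(conn, max_runtime=60, now=100))
```

Task a has a start event but no terminal event, so it shows up as a candidate for a dropped message. What to do about it (query the result backend, re-run the task, alert someone) depends on your system.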
As ever, these distributed systems offer plenty of opportunity for generating exciting edge cases!