A thread showed up on Twitter about messaging software options. My story is
not 140-characters material, so here we are.
If you do insist on a tl;dr version: use NSQ if you need something that
works out of the box, use Redis if you want
to build your own.
At work, we use a lot of message queues to plug different components
together. They are a great tool to have on your belt, but the
landscape is filled with solutions, so picking one can
be a daunting task.
I've also gone through several solutions over the past 19 years working
on ISP and Web-related businesses, and I have the scars to prove it, so
what you'll read below is a summary, obviously biased, of my experiences.
I divided this into four sections:
- Defining the problem;
- Solutions out there;
- What do I use?;
- Tips and tricks I collected over the years.
Defining the problem
To help us understand one another, I'll give you the mental model I use
to think about this problem.
There are three types of actors involved:
First you have producers. You have two main types: some generate
messages that represent work tasks that someone must perform, others
just emit messages that represent significant events that might be worth
reacting to on another part of your system. A producer can do both
during its lifetime. The big difference is that the work tasks usually
have higher requirements of persistence. You should not lose a message
that triggers an email to a client on a successful order, for example.
Then you have consumers. They collect the messages from producers and do
"stuff" with them. Your imagination is the limit on "stuff" here. You can
perform the task you got, collect statistics, log events, you name it.
Consumers are usually producers too, generating further messages to
other consumers in line.
For CS geeks, producers and consumers usually form a directed
acyclic graph.
And then you have your queue software, the message broker. I usually like
to divide them into three main categories:
- centralised: you have one central message broker that producers and
consumers talk to. Given its SPOF nature, some provide you with
clustering solutions;
- distributed: you have message brokers everywhere, producers can
connect to any of them, consumers may need to connect to all of them
or only to some if the broker provides forwarding services (typical
tradeoff, where do you put the complexity, on the broker or the
client). The solutions here vary a lot on how producers and consumers
discover message brokers or each other for a direct connection, and
what kind of consistent view of the network is provided. Assume there
is some warm up period during which messages could be lost;
- embedded: the queue software runs in your producer/consumer process
space and connects to the others directly or via tiny dumb message
routers. They tend to produce distributed topologies, but you get to
solve your problem at a lower level in the stack.
The message broker is there to provide you one main service: decoupling.
Two dimensions of decoupling are important to have:
- location: producers should not know or need to know who the consumers
are, or how many there are, or where they are, and you should be able
to add/drop consumers at any point in time;
- time: something produced right now should be available to a consumer
that connects at a later point in time.
You also should look at what type of message routing options and fan-out
strategies are provided. The two main ones are pub/sub and round robin
respectively. Check what kind of options are available to use them
together inside the broker, usually a powerful combination.
Pub/sub will broadcast messages to all consumers, some solutions will
provide a routing only on topic, others hierarchical topics (you can
subscribe to subtrees), and some even complex filtering solutions. Round
robin will distribute incoming messages over all of the consumers, each
message getting to only one consumer. One thing to keep an eye on is
how good the broker is at allowing you to cluster multiple consumer
connections as a single one, for load balancing purposes.
Then you need to consider message delivery guarantees, a key phrase to
look for when you are shopping for these systems. Personally I tend to
look for "at least once" semantics, as I believe them to be the better
trade-off. "Only once" is usually too resource intensive, and all the
others are too unreliable for my use cases. Removing duplicates on the
consumer side is easy enough with either a memory cache with decent
expire policies or if you store the message ID on the destination system
as a unique key.
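The memory-cache approach can be sketched in a few lines. This is a minimal illustration, not any particular library: the Deduper class, its TTL value, and the expiry strategy are all assumptions for the example.

```python
import time

class Deduper:
    """Consumer-side duplicate filter with a simple TTL-based expiry.

    A sketch of the "memory cache with decent expire policies" idea;
    a production version would bound memory more carefully or use Redis.
    """
    def __init__(self, ttl=3600):
        self.ttl = ttl
        self.seen = {}  # message_id -> timestamp when first seen

    def is_duplicate(self, message_id):
        now = time.time()
        # drop expired entries so the cache doesn't grow forever
        self.seen = {m: t for m, t in self.seen.items() if now - t < self.ttl}
        if message_id in self.seen:
            return True
        self.seen[message_id] = now
        return False
```

The consumer simply skips any message whose ID it has already seen within the TTL window.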
Finally you should think how the system deals with the two main problems
that you'll see:
- slow consumers: how can you avoid or deal with a message broker
holding hundreds of thousands of messages;
- dead consumers: what happens to the messages the consumer received
just before it died.
Fun stuff, and I haven't even mentioned Byzantine failures yet!
Solutions out there
There are lots. I believe that we have very good generic tools now,
providing you the required building blocks for this type of systems, and
this caused a Cambrian explosion. It seems almost all startups out
there decided to roll their own, and publish it as the best thing since
sliced bread. Guilty as charged...
I have a borderline fetish interest on this stuff, so I always look at
any solution that crosses my radar. Below I'll list the main candidates
I think you should look into.
First for rolling your own, you have two generic tools:
ZeroMQ and Redis.
On the mostly built, just use it section, you have AMQP-based solutions,
SAPO Broker, Gearman and NSQ.
ZeroMQ
ZeroMQ is fast, very fast, but also very very low-level. It is an
embedded system, but also includes a small message router that you can spin up to provide you
location decoupling, and some basic translation services, from Queue to
Broadcast for example. You can build basically any topology, but you do
it by hand, using the building blocks. It's basically for custom-made
solutions.
You have pub/sub and you can also load balance messages between
different workers.
Previous versions had disk-based storage for overflow messages, but this
was deprecated and removed in more recent versions.
There are some higher-level systems that use ZeroMQ as the backbone, but
none ever caught my eye as interesting enough.
Basically, a fast low-level embedded library where the sky is the limit
on what you can do as long as you know you get to keep this baby all
for yourself.
Redis
Redis is a memory-only system (it had a complex VM system before, but it
was dropped; there is an unofficial VM branch, I wouldn't trust it), so
all your queued messages will be in memory. You need to understand that
you can fill up your entire memory and crash or stop accepting commands.
It has protections against this, look into them.
So, either you have (as in "you build one") a serious flow control system
to pause producers, or you need an escape valve, a special "consumer"
that goes into big queues, dumps messages to disk and puts them back
later, or just drop them and let your producer or bean counter system
inject them again later.
Publish and subscribe works, even in the future cluster version, but
consumers need to be subscribed before the message is published. There
is no queueing for subscribers, none.
To work around this, most systems use the BRPOPLPUSH command which
reliably takes a message from an inbox queue and inserts it into your own
private queue for processing. As long as you have a system to look at
those private queues and inject messages from the private queues
back into the inbox, you can build a pretty fast and reliable queue
with this.
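The pattern can be sketched without a Redis server by simulating the two lists with in-memory deques. With real Redis, reserve() would be a single BRPOPLPUSH (or LMOVE on newer versions) and ack() an LREM; the function names here are illustrative.

```python
from collections import deque

# In-memory stand-ins for the two Redis lists.
inbox = deque()       # shared inbox queue
processing = deque()  # this consumer's private queue

def reserve():
    """Atomically move one message from the inbox to the private queue.

    In real Redis this is one BRPOPLPUSH call, so a crash between the
    pop and the push cannot lose the message.
    """
    if not inbox:
        return None
    msg = inbox.pop()            # the RPOP side
    processing.appendleft(msg)   # the LPUSH side
    return msg

def ack(msg):
    """Remove the message from the private queue once fully processed."""
    processing.remove(msg)       # LREM in real Redis

def recover():
    """Re-inject messages stuck in a dead consumer's private queue."""
    while processing:
        inbox.appendleft(processing.pop())
```

A watchdog runs recover() on private queues of consumers that stopped heartbeating, which gives you the "look at those private queues and inject messages back" part.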
A solid centralised system, that you can use to build whatever
semantics you need.
AMQP
AMQP-based systems are centralised, with a very flexible routing
solution, that allows you to build almost any topology you need.
I started a pure-perl implementation of the client, that would take the
XML protocol specification (the best part of AMQP actually) and generate
the Perl code for it, but while doing that I was always thinking "do you
really need all this protocol complexity?"
I moved to simpler things. I really didn't like the centralised aspects
of it, and most clustering solutions look to me as a fix for a design
decision, not a good symptom.
Still, it has its place if you need a central system for environments
with strong requirements of auditing for compliance purposes.
SAPO Broker
I used this a lot when I worked at SAPO. It is a distributed system,
very fast, with a rich set of semantics, and clients in multiple
languages.
It includes a forwarding service, so you can subscribe to topics
locally and the network reacts and starts forwarding relevant messages
to your node.
The node can become the bottleneck on big topics. I haven't looked at it
recently, but you cannot set up your own node to receive only a subset of
the entire traffic, so that you could distribute a single topic set of
consumers over a set of nodes.
Again, I haven't looked at it recently, it might already have a solution
for this, but I do know the author and the code is solid. In heavy use
at SAPO for quite some time.
Look into it.
Gearman
One of Brad Fitzpatrick's (of memcached, Perlbal, OpenID fame...) pearls.
A distributed system focused on work tasks, doesn't support
pub/sub at all.
Pretty good semantics to guarantee you don't lose messages, although
persistence was only added later.
Also includes a feedback system to report work status back to the
client, including progress messages, something you don't see often.
It just works.
NSQ
Built from scratch in Go by the Bitly people, processing huge amounts
of messages there.
Very easy to install, has three main components: the message broker
nsqd, the topology/discovery service nsqlookupd and a decent admin
interface with nsqadmin.
Each nsqd works standalone, reports topology to a set of nsqlookupd
servers. The logic lives on the client side, which aggregates
information received from all the discovery services to form a coherent
picture of the network, and finds out which brokers it needs to connect
to to collect all messages on the topics it is interested in.
Allows you to stream messages to clients, keeping a per-client
configurable number of in-flight unacknowledged messages. It works
wonderfully well for asynchronous consumers, or for consumers that need to
batch messages and process them in a single operation.
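The batching style of consumer can be sketched like this. This is not the real NSQ client API; the class and the handle_batch callback are illustrative names for the pattern of accumulating in-flight messages and processing them in one operation.

```python
class BatchingConsumer:
    """Accumulate messages up to batch_size, then process them together.

    Sketch of the batching pattern: with a broker that allows many
    in-flight unacknowledged messages, the consumer can do one bulk
    operation (e.g. a single database insert) and then ack the whole batch.
    """
    def __init__(self, batch_size, handle_batch):
        self.batch_size = batch_size
        self.handle_batch = handle_batch
        self.pending = []

    def on_message(self, msg):
        self.pending.append(msg)
        if len(self.pending) >= self.batch_size:
            batch, self.pending = self.pending, []
            self.handle_batch(batch)  # process and ack all at once
```

A real consumer would also flush on a timer so a partial batch doesn't sit unacknowledged forever.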
My recommendation at the moment.
What do I use?
I've gone through several solutions: AMQP, Gearman, ZeroMQ, Redis
and now NSQ.
AMQP
I dropped AMQP altogether. The centralised nature doesn't provide me
enough benefits to compensate for a complex protocol. It is very
flexible, but I find it a heavy complex solution.
Having said that, if you want/need to use AMQP, use
RabbitMQ.
ZeroMQ
Some of the AMQP authors agreed with my complexity assessment: they
dropped AMQP and created ZeroMQ (and one of them is even doing
nanomsg, an even simpler system).
I still keep ZeroMQ at hand for very specialised solutions, with very
stable topologies, where speed is the number one concern. I don't expect
any kind of reliability from systems built on this, so I usually wrap
them in an end-to-end reliability blanket like an opportunistic
transactional queuing system (see [Tips and tricks][] section).
For speed, you cannot beat ZeroMQ. nanomsg seems saner but I don't think
it is ready yet.
Redis
I absolutely adore Redis... It is my favourite swiss army knife. You can
do just about anything with it, and it has all the tools you need to
write your own queuing system. At least, that's what 99% of all startups
think, because Redis-based queuing systems are probably the biggest
software category on GitHub at the moment.
It was my go-to system for quite some time, and I still use it if I need
something quickly. It is very simple to get right, but it reacts badly
to slow consumers, so you have to be very careful with that.
You can use several out-of-the-box solutions.
Resque is
one of the most popular, and it is available in most languages, but the
logic is on the consumer side, so each version might not be compatible
with the others.
My favorite is qless, a Resque-inspired
solution that uses server-side Lua for all the logic,
which means that all languages can share the exact same logic (see the
qless-core repository).
Gearman
This is rock solid stuff, Etsy processes a huge amount of tasks with it
(I believe I saw the number 6000 per second over 5 servers on the
mailing list).
The discovery part is the missing bit. If you add a new broker, you
need to tell all your producers and consumers.
If you want to use this, look at Consul
for the discovery part.
NSQ
My current favourite, and the one I use now. It gets so many
things right:
- dead easy installation: you can just use the provided binaries, but
there is no GPG signature on those at the moment, something that I
hope to be fixed soon;
- decent "at least once" semantics;
- decent buffer-to-disk semantics, although we try to avoid these
situations with producer flow control (see tips and tricks section);
- the pubsub + channels with round-robin is my favourite topology;
- the nsqlookupd solves the discovery problem nicely: we use three
of them, they share the same AWS micro instance that hosts the
Consul cluster.
We run a central cluster of 2 servers for general use, but we also keep
one nsqd per instance. We use it for system integration, monitoring,
and logging (we have a couple of consumers that take batches of messages
and publish to Loggly).
We also have a couple of consumer gateways between NSQ and
nginx-push-stream-module, which allow some messages to reach client
browsers in real-time.
Still, I would like to see:
- nsqlookupd should publish topology changes over an NSQ topic: this
would allow consumers to react faster to new producers;
- the nsqd to nsqlookupd protocol should be over a public channel that
other systems could leverage for cluster monitoring and control.
I'm still learning all the details of NSQ and getting into the
community, so maybe some of this is already there and I just
haven't found it.
For Perl mongers out there, I'm writing an NSQ client using AnyEvent,
basically extracting the internal working code into a public
distribution. My goal is to release it on August 14th, CPAN day, but I
can only work on it during weekends.
Our code deals with keeping track of the topology using nsqlookupd and
connecting to multiple nsqd, although the in-flight message management
might still need some TLC. Currently, on my laptop, I can do a bit more
than 1M messages per minute (no processing of course), but the benchmark
is clearly CPU bound (broker, producer and consumer together use 180% of
my dual-core CPU... well, I was listening to iTunes and browsing the web
at the time...), so a faster CPU would give you better numbers.
Tips and tricks
Opportunistic transactional queuing
My motto is "trust but verify". I usually build the queuing solution
assuming it will lose messages, but I keep a checks-and-balances system
on the side that will re-inject a message if it doesn't show up on the
queue system.
The main ingredient I use is something I call opportunistic transactional
queueing. This derives from an observation: the messages that need the
highest reliability usually happen at the end of a database transaction.
The common scenario is a client that does something over the web or API
that changes the state on your system and you need to perform further
work on it, preferably asynchronous. If you have a work log, a place to
keep stuff that you need to do later, on the same database, then you can
insert a row there in the same transaction that updated the user state.
The expensive part is the transaction commit, so adding a new row is
marginal. At the same time you send the message to your queueing system.
You also need bean counter software. It will scan those work log
tables from time to time (once a minute is fine, unless you have a lot
of them), and make sure they are either done (and in this case you can
archive them), or lost (and in this case re-inject them).
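The two pieces together can be sketched with sqlite3. The table names, columns, and timing are all illustrative; the point is only that the work-log row commits in the same transaction as the state change, and that the bean counter later sweeps that table.

```python
import sqlite3
import time

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, state TEXT)")
db.execute("""CREATE TABLE work_log (
    id INTEGER PRIMARY KEY, task TEXT, created REAL, done INTEGER DEFAULT 0)""")

def place_order():
    # The state change and the work-log row commit atomically:
    # either both exist or neither does.
    with db:
        db.execute("INSERT INTO orders (state) VALUES ('paid')")
        db.execute("INSERT INTO work_log (task, created) VALUES (?, ?)",
                   ("send_confirmation_email", time.time()))
    # ...and only then publish the same task to the queue system (not shown).

def bean_counter(max_age=60):
    """Find tasks that never completed, so they can be re-injected."""
    cutoff = time.time() - max_age
    lost = db.execute("SELECT id, task FROM work_log "
                      "WHERE done = 0 AND created < ?", (cutoff,)).fetchall()
    for row_id, task in lost:
        pass  # re-publish `task` to the queue system here
    db.execute("DELETE FROM work_log WHERE done = 1")  # aggressive archiving
    return lost
```

Consumers mark rows as done when they finish; anything still pending after max_age is assumed lost and re-injected.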
This will scale with your application because you can shard this work
log. You can have as many work logs as you want.
As long as you don't scan these tables often and as long as you archive
them aggressively, you should avoid most of the problems usually
associated with using a database as a queue. The main thing to remember
is that the work log is there just to check the reliability of your
queue, not to serve as your queue.
Topology discovery messages
Mapping the topology and monitoring it can be a hard problem to solve
with these systems, especially if they are distributed.
Engineer your consumers and producers to support discovery messages.
When a discovery message is received by a consumer, it needs to do
two things:
- report that it got it: the message should include a URL; consumers
post a small JSON blob saying who they are, where they are, where they
received the message from, which topics they are subscribed to, which
topics they publish to, and optional statistics (total messages
processed, uptime, messages processed since last tracer, time since
last discovery message, whatever else you fancy);
- publish the message to all topics that you would normally publish to.
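The report half of this protocol is just a small JSON blob. The field names below are illustrative, this only shows the shape of what a consumer would POST back to the URL carried in the discovery message.

```python
import json
import time

START = time.time()  # process start, for the uptime field

def handle_discovery(msg, me):
    """Build the JSON report a consumer would POST to msg["report_url"].

    `me` describes this consumer; all keys here are example names, not
    any fixed schema.
    """
    report = {
        "consumer": me["name"],
        "host": me["host"],
        "received_from": msg["from"],
        "subscribed_to": me["subscriptions"],
        "publishes_to": me["publications"],
        "uptime": time.time() - START,
    }
    # after reporting, re-publish msg to every topic in me["publications"]
    return json.dumps(report)
```

Collecting these blobs for one discovery message gives you a snapshot of every edge in the graph.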
This simple system allows you to collect those reports somewhere (I use
Redis myself), and show a unified view of your network (with pretty load
graphs even) on a browser.
End-to-end message tracing
Include a UUID on the message you generate, and make sure this UUID is
passed along, from publisher to consumer.
By searching your logs for this UUID you can see all the consumers the
message travelled through and even show it as a timeline, with parallel
tasks in separate lines.
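The mechanics are tiny: mint the UUID once at the producer and copy it, untouched, into every trace record. A minimal sketch with illustrative message and log shapes:

```python
import time
import uuid

def new_task(payload):
    """Attach a tracing UUID when the message is first produced."""
    return {"uuid": str(uuid.uuid4()), "payload": payload}

def consume(msg, consumer_id, log):
    """Process a message and emit a trace record, keeping the UUID intact."""
    started = time.time()
    # ... real work happens here ...
    log.append({
        "uuid": msg["uuid"],       # the same UUID travels end-to-end
        "consumer": consumer_id,
        "elapsed": time.time() - started,
    })
    return msg                     # pass along unchanged to the next hop
```

Grouping the log by UUID and sorting by time reconstructs the message's path through the network.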
I usually publish a log message on a separate topic, include the
consumer identification, the UUID of the task, and the time it took to
process, and then have a Redis-based collector that keeps track of the
last couple of hours of messages, to see how much they took end-to-end,
and where are they spending their time.
I can quickly spot hot spots in the network with this system.
Have at least three priorities
No matter the technology I use, I try very hard to have three levels
of priority:
- urgent: this message should take precedence over all others;
- normal: basic stuff, normal traffic;
- bulk: lowest priority, consume this if you have nothing better to do.
It can introduce delays on bulk message processing, but you can
detect that, and spin up a new consumer (even a dedicated consumer
just for bulk).
Sometimes I split normal into "normal normal" and "normal but generated
by clients", but unless you have other problems with some slow
components in your system that need this type of split to give priority
to your clients' messages, I don't bother.
On NSQ and Redis this is usually implemented by using the topic name
plus a suffix (_bulk and _urgent for the two non-normal queues).
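The naming convention amounts to one small mapping function; the helper name and the exact suffix scheme are illustrative:

```python
def topic_for(base, priority="normal"):
    """Map a logical topic plus a priority level to a concrete topic name.

    Normal traffic keeps the bare name; bulk and urgent get a suffix,
    so consumers can subscribe to the levels in the order they prefer.
    """
    if priority == "normal":
        return base
    if priority in ("bulk", "urgent"):
        return f"{base}_{priority}"
    raise ValueError(f"unknown priority: {priority}")
```

A consumer then drains the _urgent topic first, the bare topic next, and touches _bulk only when the others are empty.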
Producers flow control
With slow consumers, you have to be careful not to flood your broker
network with messages that they will have to spend time writing to disk
and reading back. I find it easier to pause producers.
This of course assumes that you can do that. In my use cases, I'm
fortunate enough to have producers that can limit their output rate. If
you deal with end-user traffic, you might get spikes you can't control.
Hopefully you'll be able to launch more instances to cope with them.
I do this in several ways, depending if I'm using Redis or NSQ. Some
tricks you can try:
- in very simple situations where you are using a Redis list as a small
queue between two systems: when you push into the queue, Redis returns
the number of messages in there. If this number is larger than your
high-water threshold, just pause, then check from time to time until it
reaches the low-water threshold and resume;
- on NSQ you can periodically query the broker you are publishing to and
find out the size of the outgoing channels backlog. Use that
information to manage your output rate. Works well, supports multiple
consumers per topic, but it's only hop-by-hop;
- include in your messages a "pause" URL: if a consumer is feeling
overwhelmed, you can use the URL to pause and resume the producer.
Works throughout your network, no matter how many levels it has;
- if you use the topology discovery messages above, the system that
collects them can periodically generate traffic report messages on a
separate channel with the recommended rate of input on each topic, by
analysing each flow report it gets. Producers subscribe to this report
channel and use it as a guideline. This is a complex system to get
right, but when it does, it is a beauty to see.
I'm sure there are other ways, but these four so far allowed me to solve
this problem whenever it showed up.
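The first trick, pausing on the Redis list length, can be sketched as a small wrapper around the publish call. The thresholds and the two callables are assumptions for the example; with real Redis, push would be LPUSH (which returns the new list length) and queue_len would be LLEN.

```python
import time

HIGH_WATER = 10_000  # pause the producer above this backlog
LOW_WATER = 1_000    # resume once consumers drain below this

def publish_with_backpressure(push, queue_len, msg, poll=1.0):
    """Push a message, then block while the queue is over the high-water mark.

    `push(msg)` must return the queue length after the push (as LPUSH
    does); `queue_len()` re-checks the current length while we wait.
    """
    if push(msg) > HIGH_WATER:
        # producer pauses until consumers drain below the low-water mark
        while queue_len() > LOW_WATER:
            time.sleep(poll)
```

The hysteresis between the two thresholds stops the producer from flapping between paused and running on every message.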
I love the last one, and I'm currently toying with a system like this,
I'll write more about it later, when I have enough experience to know if
it is working or not.
The third is a good compromise. It is very nice to have a callback into
the producer to tell it to shut up for a bit, and it works with any
system.
Final words
Enough words already. Go and start having fun with this stuff. Send me
links about your work; I love to read about real-world problems solved
with these systems.