Building simplicidade.org: notes, projects, and ocasional rants

Messaging

A thread showed up on Twitter about messaging software options. My story is not 140-charaters material, so here we are.

If you do insist on a tldr; version: use NSQ if you need something that works out of the box, use Redis if you want to build your own.

At work, we use a lot of message queues to plug different components together. They are a great tool to have on your belt but the landscape is filled with solutions so picking your own solution can be a daunting task.

I’ve also gone through several solutions over the past 19 years working on ISP and Web-related businesses, I have scars to prove this, so what you’ll read below is the summary, obviously biased, of my experiences.

I divided this into four sections:

  • Defining the problem;
  • Solutions out there;
  • What do I use?;
  • Tips and tricks I collected over the years.

Defining the problem

To help us understand one another, I’ll give you the mental model I use to think about this problem.

There are three types of actors involved:

First you have producers. You have two main types: some generate messages that represent work tasks that someone must perform, others just emit messages that represent significant events that might be worth reacting to on another part of your system. A producer can do both during its lifetime. The big difference is that the work tasks usually have higher requirements of persistency. You should not loose a message that triggers a email to a client on a successful order for example.

Then you have consumers. They collect the messages from producers and do “stuff” with them. You imagination is the limit on “stuff” here. You can perform the task you got, collect statistics, log events, you name it. Consumers are usually producers too, generating further messages to other consumers in line.

For CS geeks, producers and consumers usually form a acyclic directed graph.

And then you have you queue software, the message broker. I usually like to divide them into three main categories:

  • centralised: you have one central message broker that producers and consumers talk to. Given it’s SPOF nature, some provide you with clustering solutions;
  • distributed: you have message brokers everywhere, producers can connect to any of them, consumers may need to connect to all of them or only to some if the broker provides forwarding services (typical tradeoff, where do you put the complexity, on the broker or the client). The solutions here vary a lot on how producers and consumers discover message brokers or each other for a direct connection, and what kind of consistent view of the network is provided. Assume there is some warm up period during which messages could be lost;
  • embedded: the queue software runs on your producers/consumers process space and connect to each other directly or via tiny dumb message routers. They tend to produce distributed topologies, but you get to solve you problem at a lower level in the stack.

The message broker is there to provide you one main service: decoupling. Two dimensions of decoupling are important to have:

  • location: producers should not know or need to know who the consumers are, or how many there are, or where they are, and you should be able to add/drop consumers at any point in time;
  • time: something produced right now should be available to a consumer that connects at a latter point in time.

You also should look at what type of message routing options and fan-out strategies are provided. The two main ones are pub/sub and round robin respectively. Check what kind of options are available to use them together inside the broker, usually a powerful combination.

Pub/sub will broadcast messages to all consumers, some solutions will provide a routing only on topic, others hierarchical topics (you can subscribe to subtrees), and some even complex filtering solutions. Round robin will distribute incoming messages over all of the consumers, each message getting to only one consumer. One thing to keep an eye out is how good the broker is at allowing you to cluster multiple consumer connections as a single one, for load balancing purposes.

Then you need to consider message delivery guarantees, a key phrase to look for when you are shopping for this systems. Personally I tend to look for “at least once” semantics, as I believe them to be the better trade-off. “Only once” is usually too resource intensive, and all the others are too unreliable for my use cases. Removing duplicates on the consumer side is easy enough with either a memory cache with decent expire policies or if you store the message ID on the destination system as a unique key.

Finally you should think how the system deals with the two main problems that you’ll see:

  • slow consumers: how can you avoid or deal with a message broker holding hundreds of thousands of messages;
  • dead consumers: what happens to the messages the consumer received just before his life was over.

Fun stuff, and I even had to mention byzantine failures yet!

Solutions out there

There are lots. I believe that we have very good generic tools now, providing you the required building blocks for this type of systems, and this caused an cambrian explosion. It seems almost all startups out there decided to roll their own, and publish it as the best thing since sliced bread. Guilty as charged…

I have a borderline fetish interest on this stuff, so I always look at any solution that crosses my radar. Below I’ll list the main candidates I think you should look into.

First for rolling your own, you have two generic tools: ZeroMQ and Redis.

On the mostly built, just use it section, you have AMQP-based solutions, SAPO Broker, Gearman and NSQ.

ZeroMQ

ZeroMQ is fast, very fast, but also very very low-level. It is an embedded system, but also includes a small message router - 0MQ Api”) that you can spin up to provide you location decoupling, and some basic translation services, from Queue to Broadcast for example. You can build basically any topology, but you do it by hand, using the building blocks. It’s basically for custom-made solutions.

You have pub/sub and you can also load balance messages between different workers.

Previous version had disk-based storage for overflow messages, but this was deprecated and removed in more recent versions.

There some higher level systems that use ZeroMQ as the backbone, but none ever caught my eye as interesting enough.

Basically, a fast low-level embedded library where the sky is the limit on what you can do as long as you know you get to keep this baby all for yourself.

Redis

Redis is a memory-only system (it had a complex VM system before, but it was dropped; there is a unofficial VM branch, I wouldn’t trust it), so all your queued messages will be in memory. You need to understand that you can fill up your entire memory and crash or stop accepting commands. It has protections agains this, look into them.

So, either you have (as in “you build one) a serious flow control system to pause producers, or you need an escape valve, a special “consumer” that goes into big queues, dumps messages to disk and puts them back later, or just drop them and let your producer or bean counter system inject them again later.

Publish and subscribe works, even in the future cluster version, but consumers need to be subscribed before the message is published. There is no queueing for subscribers, none.

To work around this, most systems use the BLPOPRPUSH command which reliably takes a message from a inbox queue and inserts into your own private queue for processing. As long as you have a system to look at those private queues and inject messages from the private queues back into the inbox, you can build a pretty fast and reliable queue with this.

A solid centralised system, that you can use to build whatever semantics you need.

AMQP

AMQP-based systems are centralised, with a very flexible routing solution, that allows you to build almost any topology you need.

I started a pure-perl implementation of the client, that would take the XML protocol specification (the best part of AMQP actually) and generate the Perl code for it, but while doing that I was always thinking “do you really need all this protocol complexity?”

I moved to simpler things. I really didn’t like the centralised aspects of it, and most clustering solutions look to me as a fix for a design decision, not a good symptom.

Still, it has it’s place if you need a central system for environments with strong requirements of auditing for compliance purposes.

SAPO Broker

I used this a lot when I worked at SAPO. It is a distributed system, very fast, with a rich set of semantics, and clients in multiple languages.

It includes a forwarding service, so you can subscribe to topics locally and the network reacts and starts forwarding relevant messages to your node.

The node can become the bottleneck on big topics. I haven’t looked at it recently, but you cannot setup your own node to receive only a subset of the entire traffic, so that you could distribute a single topic set of consumers over a set of nodes.

Again, I haven’t looked at it recently, it might already have a solution for this, but I do know the author and the code is solid. In heavy use at SAPO for quite some time.

Look into it.

Gearman

One of Brad Fitzpatrick (of memcached, perlbal, Open ID fame…) pearls.

A distributed system focused on work tasks, doesn’t support pub/sub at all.

Pretty good semantics to guarantee you don’t loose messages, although persistence was only added later.

Also includes a feedback system to report work status back to the client, including progress messages, something you don’t see often.

It just works.

NSQ

Built from scratch in Go by the Bitly people, processing huge amounts of messages there.

Very easy to install, has three main components: the message broker nsqd, the topology/discovery service nsqlookupd and a decent admin interface with nsqadmin.

Each nsqd works standalone, reports topology to a set of nsqlookupd servers. Logic is on the client side that aggregates information received from all the discovery services to form a coherent picture of the network, and finds out to which brokers he needs to connect to to collect all messages on the topic he is interested on.

Allows you to stream messages to clients, keeping a per-client configuration number of in-flight unacknowledged messages, works wonderfully well for asynchronous consumers, or for consumers that need to batch messages and process them in a single operation.

My recommendation at the moment.

What do I use?

I gone through several solutions: AMQP, Gearman, ZeroMQ, Redis and now NSQ.

AMQP

I dropped AMQP altogether. The centralised nature doesn’t provide me enough benefits to compensate for a complex protocol. It is very flexible, but I find it a heavy complex solution.

Having said that, if you want/need to use AMQP, use RabbitMQ.

ZeroMQ

Some of the AMQP authors agreed with my complexity assessment, they dropped AMQP and created ZeroMQ (and one of them is even doing nanomsg, an even simpler system).

I still keep ZeroMQ at hand for very specialised solutions, with very stable topologies, where speed is the number one concern. I don’t expect any kind of reliability of systems build from this, so I usually wrap them into a end-to-end reliability blanket like a opportunistic transactional queuing system (see [Tips and tricks][] section).

For speed, you cannot beat ZeroMQ. nanomsg seems saner but I don’t think is ready yet.

Redis

I absolutely adore Redis… It is my favourite swiss army knife. You can do just about anything with it, and it has all the tools you need to write you own queuing system. At least, that’s what 99% of all startups think, because queuing systems using Redis is probably the major software category on Github at the moment.

It was my go to system for quite some time, and I still use it if I need something quickly, very simple to get it right, but it reacts badly to slow consumers, you have to be very careful with that.

You can use use several out-of-the box solutions. Resque is one of the most popular, and it is available on most languages, but logic is on the consumer side, so each version might not be compatible with the others.

My favorite is qless, a Resque inspired solution, but that uses server side Lua for all the logic, which means that all languages can share the exact same logic (see the qless-core repository).

Gearman

This is rock solid stuff, Etsy processes a huge amount of tasks with it (I believe I saw the number 6000 per second over 5 servers on the mailing list).

The discovery part is the non-existing bit. If you add a new broker, you need to tell all your producers and consumers.

If you want to use this, look at Consul for the discovery part.

NQS

My current favourite, and the one I use now. It gets so many things right:

  • dead easy installation: you can just use the provided binaries, but there is no GPG signature on those at the moment, something that I hope to be fixed soon;
  • decent “at least one” semantics;
  • decent buffer to disk semantics, although we try to avoid this situations with producer flow control (see tips and tricks section);
  • the pubsub + channels with round-robin is my favourite topology;
  • the nsqlookupd solves the discovery problem nicely: we use three of them, they share the same AWS micro instance that hosts the Consul cluster.

We run a central cluster of 2 servers for general use, but we also keep one nsqd per instance. We use it for system integration, monitoring, and logging (we have a couple of consumers that take batches of messages and publish to Loggly).

We also have a couple of consumer gateways between NQS and nginx-push-stream-module, allows some messages to reach client browsers in real-time.

Still, I would like to see:

  • nsqlookupd should publish topology changes over a nsq topic: this would allow consumers to react faster to new producers;
  • nsqd to nsqlookupd protocol should be over a public channel, that other systems could leverage for cluster monitoring and control.

I’m still learning all the details of NSQ and getting into the community, so maybe some of this is already there and I just haven’t found it.

For Perl mongers out there, I’m writing a NSQ client using AnyEvent, basically extracting the internal working code into a public distribution. My goal is to release it on August 14th, CPAN day, but I can only work on it during weekends.

Our code deals keeping track of the topology using nsqlookupd and connecting to multiple nsqd, although the in-flight message management might still need some TLC. Currently, on my CPU laptop, I can do a bit more than 1M message per minute (no processing of course), but the benchmark is clearly CPU bound (sum of broker, producer and consumer give 180% CPU usage on my dual core CPU… well, I was listening to iTunes and browsing the web at the moment…), a faster CPU would give you better numbers.

Tips and tricks

Opportunistic transactional queuing

My motto is “trust but verify”. I usually build the queuing solution assuming it will loose messages, but I keep a checks and balances system on the side that will re-inject a message if they doesn’t show up on the queue system.

The main ingredient I use is something I call oportunistic transactional queueing. This derives from an observation: the messages that need the highest reliability usually happen at the end of a database transaction.

The common scenario is a client that does something over the web or API that changes the state on your system and you need to perform further work on it, preferably asynchronous. If you have a work log, a place to keep stuff that you need to do later, on the same database, then you can insert a row there in the same transaction that updated the user state. The expensive part is the transaction commit, so adding a new row is marginal. At the same time you send the message to your queueing system.

You also need a bean counter software. It will scan those work log tables from time to time (once a minute is fine, unless you have a lot of them), and make sure they are either done (and in this case you can archive them), or lost (and in this case re-inject them).

This will scale with your application because you can shard this work log. You can have as many work logs as you want.

As long as you don’t scan this tables often and as long as you archive them aggressively, you should avoid most of the problems usually associated with “database as a queue” problem. The main thing you should remember is the work log is just to check the reliability of your queue, not as your queue.

Topology discovery messages

Mapping the topology and monitoring it can be a hard problem to solve with this systems, specially if they are distributed.

Engineer your consumers and producers to support discovery messages. When a discovery message is received by a consumer, he needs to do two things:

  • report that you got it: the message should include a URL, consumers post a small JSON blob saying who he is, where he is, where did he receive the message from, which topics is he subscribed to, which topics does he publish to, and optional statistic information (total messages processed, uptime, messages processed since last tracer, time since last discovery message, whatever else you fancy);
  • publish the message to all topics that you would normally publish to.

This simple system allows you to collect those reports somewhere (I use Redis myself), and show a unified view of your network (with pretty load graphs even) on a browser.

End-to-end message tracing

Include a UUID on the message you generate, and make sure this UUID is passed along, from publisher to consumer.

By searching your logs for this UUID you can see all the consumers the message travelled through and even show it as a timeline, with parallel tasks in separate lines.

I usually publish a log message on a separate topic, include the consumer identification, the UUID of the task, and the time it took to process, and then have a Redis-based collector that keeps track of the last couple of hours of messages, to see how much they took end-to-end, and where are they spending their time.

I can quickly spot hot spots in the network with this system.

Have at least three priorities

No matter the technology I use, I try very hard to have three levels of priority:

  • urgent: this message should take precedence over all others;
  • normal: basic stuff, normal traffic;
  • bulk: lowest priority, consume this if you have nothing better to do. It can introduce delays on bulk message processing, but you can detect that, and spin up a new consumer (even a dedicated consumer just for bulk).

Sometimes I split normal into “normal normal” and “normal but generated by clients”, but unless you have other problems with some slow components on your system that need this type of split to give priority of your clients messages, I don’t bother.

On NSQ and Redis this is usually implemented by using the topic name and a prefix (_bulk, _urgent prefixes for the two non-normal queues.)

Producers flow control

With slow consumers, you have to be careful not to flood your broker network with messages that they will have to spend time writing to disk and reading back. I find it easier to pause producers.

This of course assume that you can do that. On my uses cases, I’m fortunate enough to have producers that can limit their output rate. If you deal with end-user traffic, you might get spikes you can’t control. Hopefully you’ll be able to launch more instances to cope with them.

I do this in several ways, depending if I’m using Redis or NSQ. Some tricks you can try:

  • on very simple situations where you are using a Redis list a small queue between two systems, when you push into the queue, Redis returns the number of messages in there. If this number is larger than you high-water threshold, just pause, check from time to time until it reaches the low-water threshold and resume;
  • on NSQ you can periodically query the broker you are publishing to and find out the size of the outgoing channels backlog. Use that information to manage your output rate. Works well, supports multiple consumers per topic, but it’s only hop-by-bop;
  • include on your messages a “pause” URL: if a consumer is felling overwhelmed, you can use the URL to pause and resume the producer. Works throughout your network, no matter how many levels it has;
  • if you use the topology discovery messages above, the system that collects them can periodically generate traffic report messages on a separate channel with the recommended rate of input on each topic, by analysing each flow report he gets. Producers subscribe to this report channel and use it as a guideline. This is a complex system to get right, but when it does, it is a beauty to see.

I’m sure there are other ways, but these four so far allowed me to solve this problem whenever it showed up.

I love the last one, and I’m currently toying with a system like this, I’ll write more about it later, when I have enough experience to know if it is working or not.

The third is a good compromise. It is very nice to have a callback into the producer to tell him to shut up for a bit, and works with any system.

Final words

Enough words already. Go and start having fun with this stuff. Send me links about your work, love to read about real world problems solved with this systems.