Four other Oysters followed them,
And yet another four;
And thick and fast they came at last,
And more, and more, and more…
Queues are an important and often overlooked tool in the developer’s toolbox. At Panda Strike, we use queues in a variety of ways to improve the performance and reliability of the applications we build.
Two common scenarios for queues:
In mobile applications, queueing network requests and responses ensures our app never blocks or ends up in an undefined state due to network issues.
In Web APIs, enqueuing requests helps us achieve high levels of concurrency and ensure that we don’t drop or lose requests, even if a component crashes.
There are various kinds of queues: FIFO queues, LIFO queues (also known as stacks), priority queues, durable queues, and so on. Let’s start with a simple FIFO (first-in, first-out) queue.
tasks = do (data=[]) ->
  enqueue: (task) -> data.push task
  dequeue: -> data.shift()
Here’s a simple example of using a queue:
tasks.enqueue -> console.log "hello, world"
tasks.enqueue -> console.log "goodbye, world"
do tasks.dequeue() # prints hello, world
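For comparison, a LIFO stack differs only in which end of the array we take from. Here’s a minimal sketch:

stack = do (data=[]) ->
  push: (task) -> data.push task
  pop: -> data.pop()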
Pretty simple, isn’t it? This is one reason to get into the habit of thinking in terms of queues: they don’t add much complexity to your code. You might as well just incorporate queues into your design from the start.
Of course, you may just as well be thinking: how does this help us? All we accomplished in the example above was to come up with a complicated way to call a function. What isn’t as obvious is that we’ve separated the what from the when and even the where.
To demonstrate this idea, consider a simple HTTP server that decides on an action to be performed—the what—and enqueues it.
(require "http").createServer ({method, url}, response) ->
switch method
when "GET"
switch url
when "/hello"
tasks.enqueue {response, action: "hello"}
when "/goodbye"
tasks.enqueue {response, action: "goodbye"}
.listen 8080, ->
console.log "Listening on port 8080"
Here are the actions that we support. When these functions run, we’re performing the action—this is the when.
actions =
  hello: -> "Hello, World"
  goodbye: -> "Goodbye, World"
We also need something to get things out of the queue. (In real life, we’d avoid busy-polling an empty queue; more on that in a moment.)
do process = ->
  task = tasks.dequeue()
  if task?
    result = actions[task.action]()
    task.response.end result
  setImmediate process
Obviously, this is a very simple-minded request processor. But we’ve separated figuring out what we need to do to fulfill a request from when we’re going to do it. Instead of just dropping whatever we were working on to deal with the latest incoming request, we can simply take note of the request and resume what we were doing.
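One way to avoid busy-polling, sketched here purely as an aside, is to have enqueue schedule the draining itself, so the processor only runs when there is work:

# a sketch only: enqueue schedules the drain, so we never
# poll an empty queue
tasks = do (data=[]) ->
  drain = ->
    while (task = data.shift())?
      result = actions[task.action]()
      task.response.end result
  enqueue: (task) ->
    data.push task
    setImmediate drain
  dequeue: -> data.shift()

For the rest of this post, though, we’ll stick with the simple polling version.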
But we can also change the where: we can offload the request processing to an entirely different machine. Here’s a simple worker that accepts tasks over a socket.
lines = require "byline"
(require "net").createServer (socket) ->
  lines(socket).on "data", (action) ->
    tasks.enqueue {socket, action}
.listen 8181, ->
  console.log "Listening on port 8181"
We process the socket’s stream a line at a time, using the byline NPM module. Each line is a task, and we enqueue it accordingly.
We can now move our actions object and our process function over to the worker process. We’ll make one small change to the process function, to reference a socket property instead of the response property.
do process = ->
  task = tasks.dequeue()
  if task?
    result = actions[task.action]()
    task.socket.end result
  setImmediate process
If we run that and telnet to port 8181, we can enqueue tasks and get responses.
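A session might look something like this (output is illustrative and will vary by telnet client):

$ telnet localhost 8181
Connected to localhost.
hello
Hello, World
Connection closed by foreign host.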
All that remains is to provide a client proxy with a queuing interface so that our HTTP server can send tasks to the worker. (Again, in real life, we’d probably keep the socket open, or use a message-queuing library.)
tasks = do (data=[]) ->
  enqueue: (task) ->
    socket = (require "net").connect port: 8181, ->
      socket.write "#{task.action}\n"
      socket.pipe task.response
We swap this in for the original queue (which now resides in our worker), and we’ve offloaded our request processing into another process entirely.
And we haven’t changed our HTTP request handling code at all. That is, we’ve changed the when and the where without affecting the what.
Here’s the worker code.
The Worker
# This is the _when_: we actually execute the desired
# code here...
tasks = do (data=[]) ->
  enqueue: (message) -> data.push message
  dequeue: -> data.shift()

actions =
  hello: -> "Hello, World"
  goodbye: -> "Goodbye, World"

do process = ->
  task = tasks.dequeue()
  if task?
    result = actions[task.action]()
    task.socket.end result
  setImmediate process

lines = require "byline"

# This is the _where_: we're running a server that
# encapsulates the code, allowing us to run it
# in a different process or on a different machine
(require "net").createServer (socket) ->
  lines(socket).on "data", (action) ->
    tasks.enqueue {socket, action}
.listen 8181, ->
  console.log "Listening on port 8181"
And here’s our HTTP server.
The HTTP Server
# The client proxy, with a convenient queueing interface
tasks = do (data=[]) ->
  enqueue: (task) ->
    socket = (require "net").connect port: 8181, ->
      socket.write "#{task.action}\n"
      socket.pipe task.response

# Here's the _what_, where we describe the request and
# queue it up. This code doesn't change regardless of
# whether we implement these actions locally or remotely
(require "http").createServer ({method, url}, response) ->
  switch method
    when "GET"
      switch url
        when "/hello"
          tasks.enqueue {response, action: "hello"}
        when "/goodbye"
          tasks.enqueue {response, action: "goodbye"}
.listen 8080, ->
  console.log "Listening on port 8080"
Again, once we introduced queuing, we no longer needed to change the request handling code, even when we moved the implementation into a different process, or onto a different machine. Just as importantly, our request handling code isn’t much more complex than it would have been without using queues.
Obviously, this is a naive implementation of a queue-based worker. In real life, we’d want a more sophisticated queueing implementation, probably built on queuing technologies like ZeroMQ or RabbitMQ. We often use Redis, which isn’t a message queue but can be used to implement one. The main point here is that we could adopt these technologies without appreciably changing our request handling. We could add everything from retry logic to durable messages and still use the same queuing interface.
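To make that concrete, here’s a sketch, and only a sketch, of a Redis-backed version of our queue, assuming the redis NPM module and a Redis server running locally. The interface stays nearly the same; dequeue becomes asynchronous because the data now lives in another process:

client = (require "redis").createClient()

tasks =
  # tasks must now be serializable, which is another reason to
  # enqueue descriptions of work rather than functions
  enqueue: (task) -> client.lpush "tasks", JSON.stringify task
  dequeue: (callback) ->
    client.rpop "tasks", (error, json) ->
      task = if json? then JSON.parse json else null
      callback task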
This is a great strategy for making your applications more performant, reliable, and elastic.
Have a bottleneck in your API? Move the associated code into a worker that runs on dedicated machines. Heck, you can even rewrite it in C or Java, because what you’re enqueuing is merely a (serializable) description of the task you want performed—the what. The when and the where don’t have to care about the implementation used to create it.
Dropping requests? Use a durable queue, one that relies on persistent data storage. Or use a local, client-side queue so that you can easily retry requests if they don’t appear to have gotten through. Again, once you have a queuing interface in place, it’s easy to add features like this without changing your core application logic.
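For example, here’s a sketch of client-side retries layered onto our proxy from earlier, assuming the tasks are safe to retry; the three-attempt limit is an arbitrary choice for illustration:

tasks =
  enqueue: (task, attempt = 0) ->
    socket = (require "net").connect port: 8181, ->
      socket.write "#{task.action}\n"
      socket.pipe task.response
    socket.on "error", ->
      # hypothetical policy: retry a few times before giving up
      tasks.enqueue task, attempt + 1 if attempt < 3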
Of course, queues aren’t a cure-all, and queuing can introduce problems of its own. Probably the worst of these is simply running out of memory to put more tasks in a queue, which can happen when things are placed into the queue faster than they’re taken out. (Ironically, the solution to this is usually more queuing.) But getting into the habit of using queueing interfaces is never a bad thing in itself: queues are, in principle, very simple things, so they cost you very little and buy you a lot of flexibility.
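One simple safeguard is to bound the queue and refuse new work once it fills up. A sketch, with an arbitrary limit:

tasks = do (data=[], limit=10000) ->
  enqueue: (task) ->
    # refuse new work rather than exhaust memory
    throw new Error "queue full" if data.length >= limit
    data.push task
  dequeue: -> data.shift()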
If it’s distributed, or could become so, code to a queue-based interface.