# The Simplest Queue?

My student Jianjun is proving things about ARM executables that handle interrupts. It’s very difficult work, so when I asked him to write up a “simple” case study where an interrupt and the main context communicate through a ring buffer, I thought it would be helpful if I handed him the simplest possible queue that is at all realistic. Since queues are already pretty simple, there seemed to be only two tricks worth playing: force queue size to be a power of two, and waste one slot to simplify full-queue detection. Here’s my code:

```#include "q.h"

#if ((QSIZE)<2)||(!((((QSIZE)|((QSIZE)-1))+1)/2==(QSIZE)))
#error QSIZE must be >1 and also a power of 2
#endif

#define MASK ((QSIZE)-1)

static int q[QSIZE];
static int head, tail;

static int
inc (int x)
{
return (x + 1) & MASK;
}

int
full (void)
{
return inc (head) == tail;
}

int
mt (void)
{
return head == tail;
}

int
enq (int item)
{
if (full ())
return 0;
q[head] = item;
head = inc (head);
return 1;
}

int
deq (int *loc)
{
if (mt ())
return 0;
*loc = q[tail];
tail = inc (tail);
return 1;
}

int
qents (void)
{
int s = head - tail;
if (s < 0)
s += (QSIZE);
return s;
}
```

The header file simply defines QSIZE and gives prototypes for the functions. Did I miss any good tricks? The code coming out of compilers I have sitting around isn’t quite as clean as I’d have hoped. There’s some redundancy in the code (calling inc() twice in the enqueue and dequeue functions) but eliminating it should be a simple matter for the compiler.

## 14 replies on “The Simplest Queue?”

1. MSN says:

http://fgiesen.wordpress.com/2010/12/14/ring-buffers-and-queues/

The virtual stream implementation allows you to use all elements and still get a valid count when the queue is full at the cost of deferring the wraparound to element dereferencing.

It also uses unsigned indices to represent head and tail so it should work even when they overflow.

2. MSN, thanks — I like that implementation a little better.

3. Very nice! The inc() function makes it very neat. Looks hard to reduce it further, except possibly for using the ternary conditional for the return statement in qents() instead of the if() followed by return. (Not sure if it will have any effect on the resulting code, but at least the C code would have fewer lines.)

Also, there is another, slightly more minimalistic, trick to test for power of two:

#if (QSIZE & (QSIZE – 1)) != 0
#error QSIZE must be a power of two
#endif

Speaking of minimalistic programs, some time ago a bunch of us went crazy writing code that fit into 140 bytes so it could be posted in a single twitter message. We wrote linked list libraries, CRC16, RC4 stream cipher, unix grep, unix cat, a unix shell, an IP stack, a CPU emulator, a Fibonacci for the emulated CPU, etc. Crazy. Many of them were saved here: http://freaklabs.org/index.php/Blog/Misc/Twitter-twitcode-Archive.html

4. That’s a great link Adam!

I’m not sure why I didn’t think of the better power-of-2 trick since it’s the same idea used to create the mask…

5. Eddie Kohler says:

What if you made the queue hold things other than ints? I bet the compiler isn’t optimizing the two inc()s in deq() because loc might alias head.

On a quick look, it looks like an algorithm with “unsigned head; int size;” generates slightly longer code for deq() than “unsigned head, tail;”, but shorter code for everything else. Agree with RTM about “unsigned head, tail;” and only doing “% QSIZE” on queue access.

XOXO

6. Eddie I tried a few different element types like void * and a struct, and it didn’t seem to improve code generation. Need to play with this more…

7. Pedro Pedruzzi says:

Nice lock-free code. But I think it lacks concurrency correctness.

This is a single-writer single-reader fifo. It may look easy because each thread owns one of the indexes. However care must be taken to enforce the order of some memory writes at least in enq() and deq() (as far as the other thread is concerned).

The C compiler does not know about concurrency and may (read will) reorder your variable assignments at will, opening a time window for a race condition. The problem is deeper in a multi-processing environment since the hardware itself may reorder memory operations when serializing data from a CPU cache to the main memory.

So there is need for memory barriers to enforce order constraint on memory operations issued before and after it. It has meaning both to the compiler and the hardware (if multi-processed).

Unfortunately there is no way to do this in the standard C language AFAIK.

I you are on a single CPU, you can make your variables volatile (argh!). But it is overkill and not the correct way to fix it IMO. The compiler will not generate good code because you told it that it can’t optimize accesses to those variables.

But let’s assume we have a proper memory barrier function or macro mb(). I would suggest:

int
enq (int item)
{
if (full ())
return 0;
q[head] = item;
mb ();
head = inc (head);
return 1;
}

int
deq (int *loc)
{
if (mt ())
return 0;
*loc = q[tail];
mb ();
tail = inc (tail);
return 1;
}

I am not sure if there is need for any other memory barrier.

8. Hi Pedro, thanks for the comments. You’re right, this was definitely not intended to be a concurrent queue.

9. Also, you seem to be assuming there’s just one producer and just one consumer — quite a lot more care is going to have to be taken to make it work in the general case. The barriers are almost certainly a mistake too, but I’d expect CMPXCHG would do the trick.

10. Anonymous Cowherd says:

@Pedro #7, @regehr #8: I assume this is a newbie question, but, can you define for the peanut gallery what you mean by “concurrency correctness” and “concurrent queue”?

I had assumed that the whole point of the original post was to come up with short code that was also interrupt-safe — i.e., if an interrupt fires at any point inside deq(), and the interrupt code calls enq(), stuff should still work. (Until the queue fills up completely, of course.)

Part of Pedro’s comment seemed to be that this code would not be safe if there could be multiple threads calling enq() at the same time. Or was he saying that the basic enq-from-within-deq scenario I outlined above wouldn’t work? And if he was saying that, then what exactly *was* the point of the original post? (Was the post’s mention of “interrupts” a red herring?)

11. Hi AC, my queue code doesn’t account for concurrency, but is hopefully correct in a sequential context. In a concurrent context it needs to be protected, probably simply by ensuring that interrupts are disabled whenever a queue function is called. A concurrent queue would have the locking functionality built in.

Pedro is addressing a more interesting issue — making the queue safe without any locks. As he points out, C doesn’t make this easy. His approach, of using barriers and volatiles, is probably the hard way. Using a compare-and-swap primitive (like the x86 CMPXCHG I mentioned) would be relatively simple. Maybe I should do a post about that. Hopefully this answers your questions.

12. AC, to give a bit more detail, here’s the code I handed my grad student to work with:

```int
main (void)
{
setup ();
while (1)
{
if (n_items > 0)
{
disable_irq ();
// required because queue variables are not volatile
__asm__ __volatile__ ("":::"memory");
int item;
int res = deq (&item);
if (res != 1)
die ();
n_items--;
// required because queue variables are not volatile
__asm__ __volatile__ ("":::"memory");
enable_irq ();
process (item);
}
}
}
```

As you can see, it synchronizes access to the queue. The interrupt handler (not shown) was assumed to execute with interrupts disabled.

13. Pedro Pedruzzi says:

@Pedro, @regehr: As you have said, things gets hard when you assume a general access concurrent queue (ie. have an arbitrary number of concurrent readers and writers). I am not sure but I think in this case you will have to use a locking mechanism (and that will brings some contention and overhead).

However, it is completely possible to implement a lock-free queue for the single-reader single-writer case (eg.: “interrupt and the main context communicate through a ring buffer”). And that could be done simply adding some memory barriers to this code.

If I understood professor Regehr right, this lock-free synchronization could also be done (I am not really sure how) with compare-and-swap primitive.

And yes, I was saying that the basic enq-from-within-deq scenario wouldn’t work. The original post is a very interesting piece of code that outlines a simples queue. But to be correct at the scenario first mentioned it need the barriers.

14. Hi Pedro, the processor being used by my student doesn’t support memory barriers since it has only one core. Rather, the IRQ runs with interrupts disabled (the hardware takes care of this) and then main() must, as I showed, disable interrupts during the critical section.

Comments are closed.