How to Build a “Good Enough” Message Queue in C: A “Non-Production Ready” Guide

Leonardo
10 min read · May 30, 2024

So, you’ve decided to try your hand at creating your own message queue, because why use established solutions like Kafka or RabbitMQ when you can craft your very own wheel from scratch, right? This guide will walk you through building a basic message queueing system in C, which, by the way, will surely not compete with any enterprise solutions but will definitely score you some points in geek cred.

The Basics: Queue, Producer, and Consumer

First things first, let’s talk about our glorious FIFO (First In, First Out) queue — because who doesn’t love standing in line? This queue will handle strings because, obviously, handling complex data types is overrated.

The Queue Data Structure

Our queue is a shining example of simplicity. Here are the Node and Queue structures that store messages:

typedef struct Node {
    Message message;
    struct Node *next;
} Node;

typedef struct {
    Node *front;   // Look at this, the front of the line!
    Node *rear;    // And the rear, no puns intended.
    int count;     // Counting, because why not?
    unsigned int capacity;
    pthread_mutex_t lock;
    sem_t semEmpty;
    sem_t semFull;
} Queue;
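The Message type itself never gets shown, but judging from the (Message){NULL, 0} that dequeue returns and the free(msg.data) in the consumer, a minimal sketch looks something like this (field names are my guess, not the repo's word of God):

typedef struct {
    char *data;     // heap-allocated payload, freed by whoever consumes it
    size_t length;  // payload length, mostly here to feel professional
} Message;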

Initializing this masterpiece involves setting up mutexes and semaphores, because concurrency is “fun”:

void initQueue(Queue *q, unsigned int capacity) {
    q->front = q->rear = NULL;
    q->count = 0;
    q->capacity = capacity;
    pthread_mutex_init(&q->lock, NULL);
    sem_init(&q->semEmpty, 0, q->capacity);
    sem_init(&q->semFull, 0, 0);
}
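For symmetry, and to keep up the illusion of responsibility, you would also want a teardown counterpart. A minimal sketch, assuming nobody is still blocked on the queue when you call it:

void destroyQueue(Queue *q) {
    // Only call this once every producer and consumer is done with q.
    pthread_mutex_destroy(&q->lock);
    sem_destroy(&q->semEmpty);
    sem_destroy(&q->semFull);
    // Freeing any leftover nodes is left as an exercise in guilt.
}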

Enqueue and Dequeue: The Heart of Drama

Enqueuing is straightforward — just wait for an empty slot, lock the mutex, and add your data:

void enqueue(Queue *q, Message msg) {
    sem_wait(&q->semEmpty);
    pthread_mutex_lock(&q->lock);
    // Allocation and linking magic happens here
    pthread_mutex_unlock(&q->lock);
    sem_post(&q->semFull);
}
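In case you are wondering what the "magic" comment is hiding, a plausible filling-in looks like this; a sketch of the usual linked-list bookkeeping, not necessarily the exact code in the repo:

void enqueue(Queue *q, Message msg) {
    sem_wait(&q->semEmpty);            // wait for a free slot
    pthread_mutex_lock(&q->lock);
    Node *node = malloc(sizeof(Node));
    if (node == NULL) {                // the "errors are for other people" school skips this
        pthread_mutex_unlock(&q->lock);
        sem_post(&q->semEmpty);
        return;
    }
    node->message = msg;
    node->next = NULL;
    if (q->rear == NULL)
        q->front = q->rear = node;     // first message: front and rear are the same
    else {
        q->rear->next = node;
        q->rear = node;
    }
    q->count++;
    pthread_mutex_unlock(&q->lock);
    sem_post(&q->semFull);             // announce that there is something to consume
}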

Dequeueing is where the real excitement happens. It’s like a surprise party — you never know what you’re going to get (or if you’ll get anything at all):

Message dequeue(Queue *q) {
    sem_wait(&q->semFull);
    pthread_mutex_lock(&q->lock);
    if (q->front == NULL) {
        pthread_mutex_unlock(&q->lock);
        sem_post(&q->semEmpty); // Just pretending something was there
        fprintf(stderr, "Queue unexpectedly empty.\n");
        return (Message){NULL, 0};
    }
    Message msg = q->front->message;
    // Remove the node and adjust links
    pthread_mutex_unlock(&q->lock);
    sem_post(&q->semEmpty);
    return msg;
}

The Producer and Consumer Saga

Moving on to our producer and consumer — because why have one function when you can have two that can potentially deadlock each other?

Producer: The Optimist

The producer believes it can add infinite messages:

void *producer(void *param) {
    EnqueueRequest *req = (EnqueueRequest *)param;
    enqueue(req->queue, req->msg); // Push, and hope for the best
    free(req);
    return NULL;
}
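EnqueueRequest is another type the article never shows. Given how the producer dereferences it, a layout this simple would fit (again, my guess):

typedef struct {
    Queue *queue;  // where the message should go
    Message msg;   // what we are so optimistic about delivering
} EnqueueRequest;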

Consumer: The Pessimist

The consumer, on the other hand, is always expecting the queue to be empty:

void *consumer(void *param) {
    DequeueRequest *req = (DequeueRequest *)param;
    Message msg = dequeue(req->queue); // Take, and be surprised when it works
    free(msg.data);
    free(req);
    return NULL;
}
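DequeueRequest is presumably even simpler, since the consumer only needs to know which queue to pester. A sketch of the struct, plus a hypothetical way to hand both jobs straight to pthreads (the actual project routes them through the thread pools described next):

typedef struct {
    Queue *queue;  // the queue we fully expect to be empty, out of spite
} DequeueRequest;

// Hypothetical usage, assuming a Queue q that has already been initialized:
EnqueueRequest *ereq = malloc(sizeof(EnqueueRequest));
ereq->queue = &q;
ereq->msg.data = strdup("hello"); // strdup from <string.h>
ereq->msg.length = 6;             // 5 characters plus the terminating NUL

DequeueRequest *dreq = malloc(sizeof(DequeueRequest));
dreq->queue = &q;

pthread_t p, c;
pthread_create(&p, NULL, producer, ereq);
pthread_create(&c, NULL, consumer, dreq);
pthread_join(p, NULL);
pthread_join(c, NULL);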

Unpacking the Glorious Mysteries of a Thread Pool

Continuing our educational misadventure into what we optimistically call efficient multi-threading, let’s break down our thread pool implementation. This is where we act like managing a bunch of threads is no big deal — because, let’s face it, we think we’re better than just spawning new threads willy-nilly.

The Thread Pool Setup: More Complex Than Your Average Coffee Order

A thread pool, dear readers, is basically a collection of threads twiddling their thumbs, waiting for tasks. It’s akin to having a group of eager interns but with less coffee fetching and more deadlock fetching.

Here’s the structural marvel we’re dealing with:

typedef struct {
    pthread_t *threads;
    int num_threads;
    queue_t queue;
    int shutdown;
    int active_tasks;
    pthread_mutex_t lock;
    pthread_cond_t all_tasks_done;
    pthread_cond_t tasks_available;
} thread_pool_t;
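Two of those fields lean on types defined elsewhere. Based on how the worker and cleanup code use them (task.func(task.arg), queue.count, free(pool->queue.tasks)), a minimal sketch would be something like this, with field names inferred rather than authoritative:

typedef struct {
    void *(*func)(void *); // same shape as a pthread start routine, so producer/consumer slot right in
    void *arg;             // whatever the work needs to feel useful
} task_t;

typedef struct {
    task_t *tasks;   // fixed-size buffer of pending tasks, freed during cleanup
    int capacity;
    int count;       // how many tasks are currently waiting
    int head;        // next task to pop
    int tail;        // next free slot
} queue_t;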

Our thread_pool_t is our attempt to cram everything needed to pretend we can manage threads without them tripping over each other at the first sign of concurrency.

Initialization: A Parade of Optimism

The setup involves a bunch of hopeful allocations that we trust to always succeed (because errors are for other people):

thread_pool_t *thread_pool_create(int num_threads, int queue_size) {
    thread_pool_t *pool = (thread_pool_t *)malloc(sizeof(thread_pool_t));
    pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * num_threads);
    pool->queue = *queue_init(queue_size);
    pool->shutdown = 0;
    pool->active_tasks = 0;
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->tasks_available, NULL);
    pthread_cond_init(&pool->all_tasks_done, NULL);
    for (int i = 0; i < num_threads; i++) {
        pthread_create(&pool->threads[i], NULL, thread_pool_worker, (void *)pool);
    }
    return pool;
}

We ignore potential failures because who needs error handling when you live on the edge?

The Workers: Glorified Task Runners

Each worker runs a never-ending loop of task fetching and execution, like a hamster on a wheel but less adorable:

void *thread_pool_worker(void *arg) {
    thread_pool_t *pool = (thread_pool_t *)arg;
    while (1) {
        pthread_mutex_lock(&pool->lock);
        while (!pool->shutdown && pool->queue.count == 0) {
            pthread_cond_wait(&pool->tasks_available, &pool->lock);
        }
        if (pool->shutdown && pool->queue.count == 0) {
            pthread_mutex_unlock(&pool->lock);
            break; // Finally, freedom!
        }
        task_t task = queue_pop(&pool->queue);
        pthread_mutex_unlock(&pool->lock);
        task.func(task.arg); // Oh look, actual work!
        pthread_mutex_lock(&pool->lock);
        pool->active_tasks--;
        if (pool->active_tasks == 0) {
            pthread_cond_signal(&pool->all_tasks_done);
        }
        pthread_mutex_unlock(&pool->lock);
    }
    return NULL;
}

This function is the heart of our thread pool, where tasks are executed with the efficiency of a bureaucratic office.
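One thing this walkthrough never shows is how tasks get into the pool in the first place: somewhere, a submit function has to push onto the queue, bump active_tasks, and signal tasks_available, or the workers will nap forever. A hedged sketch of what that might look like, assuming a queue_push counterpart to queue_pop exists:

void thread_pool_add(thread_pool_t *pool, void *(*func)(void *), void *arg) {
    pthread_mutex_lock(&pool->lock);
    task_t task = { func, arg };
    queue_push(&pool->queue, task);               // assumed counterpart of queue_pop
    pool->active_tasks++;                         // matched by the -- in the worker
    pthread_cond_signal(&pool->tasks_available);  // wake one napping worker
    pthread_mutex_unlock(&pool->lock);
}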

Cleaning Up: The End of a Mediocre Show

When it’s time to close the thread pool and bid adieu:

void thread_pool_cleanup(thread_pool_t *pool) {
    pthread_mutex_lock(&pool->lock);
    pool->shutdown = 1;
    pthread_cond_broadcast(&pool->tasks_available); // Wake up, folks, party's over!
    pthread_mutex_unlock(&pool->lock);
    for (int i = 0; i < pool->num_threads; i++) {
        pthread_join(pool->threads[i], NULL); // Let's wait for everyone to finish up
    }
    // Free everything because we're "responsible"
    free(pool->threads);
    free(pool->queue.tasks);
    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->all_tasks_done);
    pthread_cond_destroy(&pool->tasks_available);
    free(pool);
}

In this emotionally charged finale, we clean up all resources, showing that we can indeed pretend to be responsible developers.
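You may also have noticed that all_tasks_done gets signaled in the worker but never waited on in the code shown. Presumably there is a companion helper that blocks until the pool drains, along these lines (a sketch, not the repo's exact function):

void thread_pool_wait(thread_pool_t *pool) {
    pthread_mutex_lock(&pool->lock);
    while (pool->active_tasks > 0) {
        pthread_cond_wait(&pool->all_tasks_done, &pool->lock);
    }
    pthread_mutex_unlock(&pool->lock);
}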

The main Function: Where Chaos Meets Order (Sort Of)

Ah, the main function — where all the magic happens or, depending on your viewpoint, where all hope for clarity and simplicity goes to die. This is where we set everything up, handle incoming connections, and pretend we’re writing highly scalable server code. Let’s walk through this funhouse of coding decisions, shall we?

Setup: Playing House with Sockets and Threads

First, we check if the user has graciously provided a port number because apparently, expecting users to read documentation is too much to ask:

if (argc < 2)
{
    fprintf(stderr, "error: please provide port number as arg\n");
    exit(EXIT_FAILURE);
}

Next, we initialize our main attractions — the queue and not one, but two thread pools. Because, why manage a few threads when you can juggle twice as many?

Queue q;
initQueue(&q, MAX_CAPACITY);
thread_pool_t *producer_pool = thread_pool_create(4, 4);
thread_pool_t *consumer_pool = thread_pool_create(4, 4);

The Socket Circus: Now Showing!

Creating a socket is much like opening Pandora’s box, but less exciting:

int server_fd;
struct sockaddr_in address;
int addrlen = sizeof(address);
debug(__FILE__, __LINE__, "Creating socket …");
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
    perror("socket failed");
    exit(EXIT_FAILURE);
}

Binding and listening on this socket is as straightforward as explaining quantum physics to a toddler:

address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(atoi(argv[1]));
debug(__FILE__, __LINE__, "Binding socket …");
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0)
{
    perror("bind failed");
    exit(EXIT_FAILURE);
}
debug(__FILE__, __LINE__, "Listening …");
if (listen(server_fd, 8) < 0)
{
    perror("listen");
    exit(EXIT_FAILURE);
}

Accepting Connections: Where Dreams Come to Stall

In an eternal while loop, our server bravely accepts connections, because refusing them would be rude, right?

while (1)
{
    int client_fd;
    if ((client_fd = accept(server_fd, (struct sockaddr *)&address, (socklen_t *)&addrlen)) < 0)
    {
        perror("accept");
        continue; // Let's just ignore that failure and try again!
    }
    debug(__FILE__, __LINE__, "Connection accepted");
    // Per-client command handling goes here (covered below)
}

Processing Commands: A Tale of Misguided Expectations

Inside this loop, we handle the timeless classics: PUSH, PULL, and SHOW commands. Each command is treated with the care of a sleep-deprived parent handling a midnight tantrum:

char buffer[1024] = {0};
ssize_t read_return = read(client_fd, buffer, sizeof(buffer) - 1);
if (read_return <= 0)
{
    if (read_return == 0)
    {
        debug(__FILE__, __LINE__, "Client disconnected");
    }
    else
    {
        perror("read");
    }
    break; // Oh look, it's time to give up on this client.
}
clean_string(buffer, '\n');
debug(__FILE__, __LINE__, "Message received: %s", buffer);

Each condition is handled with an air of inevitability. Failed to allocate memory? Let’s just skip this iteration because attempting to fix things is overrated:

if (strncmp_s(buffer, PUSH, strlen(PUSH)) == 0)
{
    // Oh look, an attempt to actually do something useful.
}
else if (strncmp_s(buffer, PULL, strlen(PULL)) == 0)
{
    // More hopeful fumbling in the dark.
}
else if (strncmp_s(buffer, SHOW, strlen(SHOW)) == 0)
{
    showQueue(&q); // At least we can show something for our efforts.
}
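For flavour, here is roughly what might live inside the PUSH branch, hand-waving freely: strip the command prefix, copy the payload, and toss an EnqueueRequest at the producer pool. This assumes a "PUSH <message>" wire format, the EnqueueRequest and Message layouts guessed at earlier, and a submit helper like the thread_pool_add sketched above; the repo's actual branch may look different.

// Inside the PUSH branch (sketch):
const char *payload = buffer + strlen(PUSH) + 1; // skip the command and one separating space
EnqueueRequest *req = malloc(sizeof(EnqueueRequest));
if (req == NULL)
    continue; // the promised "skip this iteration" strategy
req->queue = &q;
req->msg.data = strdup(payload);
req->msg.length = strlen(payload) + 1;
thread_pool_add(producer_pool, producer, req); // hypothetical submit helper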

And there you have it — a main function that tries to hold everything together while subtly hinting at impending doom. It’s like hosting a dinner party where you’re not sure if the oven works, but you’ve promised everyone a gourmet meal. Here’s to hoping it doesn’t burn down the house!

Client-Side Code: Where Optimism Goes to Get a Dose of Reality

Oh, the client-side code — because every server needs a client, or at least that’s what relationship advice tells us. Let’s dive into the client code that promises to connect to our “super stable” server. We’ll marvel at its simplistic ambition and its quaint belief that the network is always reliable.

Getting Started: Is This Thing On?

First, we make sure the user has the basic ability to read and follow instructions, which, as experience dictates, is a lot to ask:

if (argc < 3)
{
    printf("Error - Usage: %s <IP address> <port>\n", argv[0]);
    exit(EXIT_FAILURE);
}

Here, we gently remind them how to correctly start a program because apparently “Error — Usage” is much friendlier than “Read the manual, you walnut.”

Socket Creation: The Birth of Hope

Creating a socket is akin to planting a tree; you have high hopes for its future, but it’s really up in the air:

server_socket = socket(AF_INET, SOCK_STREAM, 0);
check_socket(server_socket);

We invoke check_socket, a function that’s basically the digital equivalent of kicking the tires — because if something’s wrong, it’s definitely going to tell us all about it in a way that’s both helpful and not at all condescending.

Establishing Connections: A Leap of Faith

Connecting to the server involves a series of assumptions about network reliability, server uptime, and our own coding competence:

struct sockaddr_in server_address;
server_address.sin_family = AF_INET;
server_address.sin_port = htons(port);
server_address.sin_addr.s_addr = inet_addr(ip);
check_connection(server_socket, server_address);

check_connection is our way of saying, “Please work, because if you don’t, I really don’t want to debug you on a Friday night.”
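If you are curious what check_connection probably boils down to, it is presumably a thin wrapper around connect() that complains loudly and bails on failure; a minimal sketch:

void check_connection(int sockfd, struct sockaddr_in address) {
    if (connect(sockfd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("connect failed"); // so much for Friday night
        close(sockfd);
        exit(EXIT_FAILURE);
    }
}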

Handling Communication: Talking Into the Void

Sending messages is straightforward — if by straightforward, you mean blindly throwing data into the ether and hoping for the best:

send_message(server_socket, message);
printf("message: %s", message);
if (strncmp(message, "pull", 4) == 0)
    recv_message(server_socket);

Here, send_message is a function that optimistically sends whatever you type into the console, because user input is always accurate, and typos are a myth.
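send_message itself is likely little more than a send() call wearing a trench coat. A sketch of the general idea (the repo's helper may be fancier):

void send_message(int sockfd, const char *message) {
    ssize_t sent = send(sockfd, message, strlen(message), 0);
    if (sent < 0)
        perror("send"); // the ether said no
    // Partial sends? Never heard of them.
}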

Receiving Messages: The Art of Listening to Crickets

Our recv_message function is a study in patience and lowered expectations. It waits for a response with the excitement of watching paint dry:

recv_message(server_socket);

If something happens, great! If nothing happens, well, that’s what timeouts are for — because who doesn’t love a good timeout?
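For completeness, a sketch of what recv_message might look like, in the same spirit of lowered expectations: block on recv(), print whatever shows up, shrug otherwise:

void recv_message(int sockfd) {
    char buffer[1024] = {0};
    ssize_t received = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
    if (received > 0)
        printf("received: %s\n", buffer);
    else if (received == 0)
        printf("server hung up, how rude\n");
    else
        perror("recv"); // crickets, with extra errno
}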

Graceful Exits: Because Ctrl+C Is Too Mainstream

Finally, we handle the inevitable — interrupt signals. Because clearly, everyone knows how to gracefully shut down a program using Ctrl+C:

signal(SIGINT, handle_signal);

handle_signal is our last-ditch effort to close the socket politely, because manners matter, even in the bitter end.
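A sketch of such a handler, assuming server_socket lives at file scope so the handler can reach it:

void handle_signal(int sig) {
    (void)sig;             // we only registered SIGINT anyway
    close(server_socket);  // hang up politely
    _exit(EXIT_SUCCESS);   // async-signal-safe exit, because we're classy like that
}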

Final Remarks: Wrapping Up Our Quaint Little Message Queue

Well, there you have it, the DMQS — Dumb Message Queueing System, an exercise in “just good enough” engineering and perhaps a touch of masochism. Let’s take a moment to reflect on what we’ve accomplished with a pat on the back, or maybe just a shrug.

What We’ve Built

DMQS is not just a system; it’s a testament to the sheer stubbornness of programmers who refuse to use off-the-shelf solutions. Through blood, sweat, and a remarkable number of print statements, we’ve cobbled together a working message queue that can indeed add and remove messages. It also peeks, because sometimes, you just want to look.

Why It Matters (Or Doesn’t)

Let’s be honest — no one’s going to replace Kafka or RabbitMQ with DMQS, and that’s okay. This project was never about disrupting the industry; it was about getting our hands dirty and maybe learning a thing or two about threads, sockets, and the endless void of debugging multi-threaded applications.

Lessons Learned

If there’s anything to take away from this project, it’s that:

1. Concurrency is hard: But you already knew that, didn’t you? Still, there’s nothing quite like a segmentation fault or a deadlocked system to really drive that point home.
2. Documentation is king: And sarcasm is its queen. If you didn’t document it, did you even code it? (Yes, but no one else needs to know that.)
3. The basics are crucial: Sure, implementing an entire message queue system to understand the basics of socket programming and thread synchronization might seem like overkill, and… that’s because it is.

Moving Forward

If you feel particularly adventurous, you could extend DMQS to handle more complex scenarios, introduce better error handling, or even add some actual security measures. Or, you could take all this newfound knowledge and apply it to systems that weren’t born out of a caffeine-induced coding spree.

In Conclusion

Thank you for following along with this quirky guide to building DMQS. Whether you came here to learn, procrastinate, or see if it was possible to build a message queue in C without losing your sanity (jury’s still out on that one), I hope you found what you were looking for. Remember, in the end, every line of code is a step towards mastery, or at least towards becoming very good at Googling error messages.

And with that, happy coding, or as we like to say, “may your memory leaks be few and your user inputs never evil.”

Github repo: https://github.com/araujo88/dumb-message-queuing-system
