Published on January 14, 2025

Set Up a Message Queue with Postgres, PGMQ, and Docker

Imagine you’re building a system that processes orders for a restaurant. You need a reliable system to manage these orders efficiently, from the moment a customer places an order to when it’s processed and delivered to their table.

When building a system like this, the first tool that usually comes to mind is a message queue, and you may reach for something like SQS, Kafka, RabbitMQ, or even Redis.

However, Postgres itself is often overlooked as a message queue. It provides features such as the SKIP LOCKED option of SELECT ... FOR UPDATE, which lets several consumers pull rows from the same table without blocking each other, and fortunately, there are many extensions and libraries that make building a queue even easier.
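To make the SKIP LOCKED idea concrete, here is a minimal sketch of a hand-rolled dequeue in Python. This is not PGMQ's own code: the jobs table and the dequeue_one function are hypothetical names I made up for illustration, and cur is assumed to be any DB-API 2.0 cursor (for example one from psycopg).

```python
# Hypothetical table for illustration (not part of PGMQ):
#   CREATE TABLE jobs (id bigserial PRIMARY KEY, payload jsonb NOT NULL);

DEQUEUE_SQL = """
DELETE FROM jobs
WHERE id = (
    SELECT id
    FROM jobs
    ORDER BY id
    LIMIT 1
    FOR UPDATE SKIP LOCKED  -- rows locked by other workers are skipped, not waited on
)
RETURNING id, payload;
"""


def dequeue_one(cur):
    """Claim and remove the oldest unlocked job.

    Returns the (id, payload) row, or None when no unlocked job is available.
    Run this inside a transaction: the delete only takes effect on commit,
    and a rollback puts the job back.
    """
    cur.execute(DEQUEUE_SQL)
    return cur.fetchone()
```

Two workers running this at the same time each get a different row instead of queuing up behind the same lock, which is the property a queue needs.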

In this article, I’m going to walk you through setting up PGMQ with Postgres and Docker, and guide you through a simple example of how to use it to queue and process messages.

Building the Docker Image

First, we need to build a Docker image that includes Postgres and the pgmq extension. We’ll use the official PostgreSQL 16.4 image as our base. This image will serve as the foundation for our restaurant’s order management system.

To start, we need to install some build dependencies: essential build tools, git, and the PostgreSQL server development package. We’ll update the package list and install them with the following command:

apt-get update && apt-get install -y build-essential git postgresql-server-dev-16

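Next, we’ll clone the pgmq repository from GitHub, navigate to the extension directory, and build and install the extension. The install-pg-partman target also installs pg_partman, which PGMQ relies on for partitioned queues. We’ll use these commands: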

git clone https://github.com/tembo-io/pgmq.git /pgmq
cd /pgmq/pgmq-extension
make
make install install-pg-partman

After the installation, we’ll clean up by removing the build dependencies and clearing the apt cache to keep our image lightweight:

apt-get remove -y build-essential git postgresql-server-dev-16
apt-get autoremove -y
rm -rf /var/lib/apt/lists/*

With these steps collected in a Dockerfile (shown in full below), we can build our Docker image using the following command:

docker build -t postgres-pgmq .

This command tells Docker to build an image tagged ‘postgres-pgmq’, using the Dockerfile in the current directory (the trailing dot is the build context).

Here’s the full Dockerfile we’ll use to build our image:

# Use the official PostgreSQL 16.4 image as the base
FROM postgres:16.4

# Install build dependencies, build pgmq, and clean up in a single layer.
# Files removed in a later RUN would still take up space in the earlier layers.
RUN apt-get update && apt-get install -y \
        build-essential \
        git \
        postgresql-server-dev-16 \
    && git clone https://github.com/tembo-io/pgmq.git /pgmq \
    && cd /pgmq/pgmq-extension \
    && make \
    && make install install-pg-partman \
    && cd / \
    && rm -rf /pgmq \
    && apt-get remove -y build-essential git postgresql-server-dev-16 \
    && apt-get autoremove -y \
    && rm -rf /var/lib/apt/lists/*

Running the Docker Container

Once we have our image built, we can run a Docker container based on it. We’ll use the following command to start a new container:

docker run -d --name database -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres-pgmq

This command starts a new container in detached mode, names it ‘database’, sets the Postgres password to ‘postgres’, maps port 5432 on the host to port 5432 in the container, and uses our ‘postgres-pgmq’ image.

Note: Please make sure to replace the password if you’re using this anywhere with real data.
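If you plan to talk to this container from application code, you will need a connection string. Here is a small sketch in Python that builds one from the settings used in the docker run command. The build_dsn helper is a name I made up; the postgres user and database are the defaults the official image falls back to when POSTGRES_USER and POSTGRES_DB are not set.

```python
from urllib.parse import quote


def build_dsn(password="postgres", host="localhost", port=5432,
              user="postgres", dbname="postgres"):
    """Build a postgresql:// connection URI for the container.

    The user and password are percent-encoded so that characters such as
    '@' or '/' in a real password do not break the URI.
    """
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{dbname}"
    )


print(build_dsn())
```

The resulting URI can be passed to most Postgres clients, psql included.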

Setting Up the Orders Queue

Now that our Postgres instance with pgmq is running, we can set up an orders queue for our restaurant. We’ll go through this process step by step, simulating how orders are placed, processed, and archived.

Installing the pgmq Extension

First, we need to enable the pgmq extension in our Postgres database. After connecting to the database (for example with docker exec -it database psql -U postgres), we can do this with the following SQL command:

CREATE EXTENSION pgmq;

This command creates the pgmq extension, which provides the functions we’ll use to manage our message queue.

Creating the Orders Queue

Next, we’ll create our ‘orders’ queue using the following SQL command:

SELECT pgmq.create('orders');

This command creates a new queue named ‘orders’ in our database. The pgmq.create function allows us to easily set up a queue without any additional configuration.
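If you would rather run this setup from application code than from a SQL prompt, the two statements can be wrapped in a small helper. This is only a sketch: ensure_queue is a hypothetical name, cur is assumed to be a DB-API 2.0 cursor, and the %s placeholder style is the one psycopg uses (other drivers may differ).

```python
def ensure_queue(cur, queue_name="orders"):
    """Enable pgmq (if it is not enabled yet) and create the given queue.

    The caller owns the transaction and is expected to commit afterwards.
    """
    # IF NOT EXISTS makes it safe to run this on every application start.
    cur.execute("CREATE EXTENSION IF NOT EXISTS pgmq;")
    # The queue name is passed as a parameter rather than formatted into the SQL.
    cur.execute("SELECT pgmq.create(%s);", (queue_name,))
```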

Sending Messages to the Queue

Now, let’s simulate placing new orders by sending messages to our queue. We’ll send three different orders using the pgmq.send function, which allows us to send a single message to a queue.

For the first order, let’s say a customer orders a pizza. We’ll use the following SQL command:

SELECT pgmq.send(
    queue_name => 'orders',
    msg => '{"user_id": 123, "product_id": 456, "quantity": 2, "status": "pending"}'
);

This command sends a message to the ‘orders’ queue with details about the first order. The msg parameter is a JSON object containing the order information, including the user ID, product ID, quantity, and status.

For the second order, a customer orders a salad. We’ll use a similar command:

SELECT pgmq.send(
    queue_name => 'orders',
    msg => '{"user_id": 789, "product_id": 101, "quantity": 1, "status": "pending"}'
);

This sends another message to the ‘orders’ queue with details about the second order.

For the third order, a customer orders a large meal, but we want to delay processing it for 10 seconds to check for any special dietary requirements. We’ll use the delay parameter of the pgmq.send function:

SELECT pgmq.send(
    queue_name => 'orders',
    msg => '{"user_id": 123, "product_id": 789, "quantity": 5, "status": "pending"}',
    delay => 10
);

This command sends a message to the ‘orders’ queue with a 10-second delay before it becomes visible. The delay parameter can also accept a timestamp if you want to specify an exact time for the message to become visible.
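In an application, the order would normally start life as a dictionary rather than a hand-written JSON string. The sketch below shows one way to send it from Python. The send_order function is a hypothetical helper, cur is assumed to be a DB-API 2.0 cursor, and the %s placeholders follow psycopg's style. The explicit casts are there so Postgres does not have to guess the parameter types.

```python
import json

SEND_SQL = (
    "SELECT pgmq.send("
    "queue_name => %s, msg => %s::jsonb, delay => %s::integer);"
)


def send_order(cur, order, queue_name="orders", delay_seconds=0):
    """Serialize `order` to JSON, enqueue it, and return the new message ID.

    `delay_seconds` is the number of seconds before the message becomes
    visible to readers (0 means immediately).
    """
    cur.execute(SEND_SQL, (queue_name, json.dumps(order), delay_seconds))
    # pgmq.send returns the ID of the message it created.
    return cur.fetchone()[0]
```

For the delayed order above, the call would look like send_order(cur, {"user_id": 123, "product_id": 789, "quantity": 5, "status": "pending"}, delay_seconds=10), followed by a commit.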

Reading and Processing Messages

Now, let’s simulate a kitchen worker reading and processing orders from the queue. We’ll use the pgmq.read function to read messages. This function allows us to read one or more messages from a queue and set a visibility timeout (VT) to make them invisible to other consumers for a specified time.

We’ll read two messages at a time, making them invisible for 60 seconds:

SELECT * FROM pgmq.read(
    queue_name => 'orders',
    vt => 60,
    qty => 2
);

This command reads up to two messages from the ‘orders’ queue, making them invisible to other readers for 60 seconds. The pgmq.read function returns a set of pgmq.message_record rows, which include the message ID (msg_id), read count (read_ct), enqueue time (enqueued_at), the time at which the message becomes visible again (vt), and the message payload (message).

After reading the messages, the kitchen worker would typically process them. For example, they might prepare the pizza and salad, update the order status in a database, or generate a receipt. Let’s assume we’ve processed the first two orders.
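Here is a sketch of what the reading side could look like in a Python worker. The Order class and read_orders function are hypothetical names, cur is assumed to be a DB-API 2.0 cursor with psycopg-style %s placeholders, and I am assuming the driver hands back the jsonb payload as a Python dict (psycopg does this by default).

```python
from dataclasses import dataclass

READ_SQL = """
SELECT msg_id, read_ct, message
FROM pgmq.read(queue_name => %s, vt => %s::integer, qty => %s::integer);
"""


@dataclass
class Order:
    msg_id: int    # needed later to archive or delete the message
    read_ct: int   # how many times the message has been read so far
    payload: dict  # the JSON body that was sent


def read_orders(cur, queue_name="orders", vt=60, qty=2):
    """Read up to `qty` visible messages and hide them for `vt` seconds."""
    cur.execute(READ_SQL, (queue_name, vt, qty))
    return [
        Order(msg_id, read_ct, message)
        for msg_id, read_ct, message in cur.fetchall()
    ]
```

A read_ct greater than 1 tells you an earlier attempt at this message did not finish, which is a handy signal for spotting orders that keep failing.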

Archiving Processed Orders

Once the orders are processed, we can archive them instead of deleting them. This allows for long-term retention and the ability to replay messages if needed. We’ll use the pgmq.archive function, which removes a message from the queue and inserts it into the queue’s archive table.

We’ll archive the first two orders, using the msg_id values returned by pgmq.read (1 and 2 on a freshly created queue):

SELECT pgmq.archive(
    queue_name => 'orders',
    msg_id => 1
);

SELECT pgmq.archive(
    queue_name => 'orders',
    msg_id => 2
);

These commands archive the messages with IDs 1 and 2 from the ‘orders’ queue. The pgmq.archive function returns a boolean value indicating whether the operation was successful.
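In a real worker you would not hard-code message IDs. A common pattern is to archive a message only after it has been handled successfully, and to leave failed messages alone so they become visible again once the visibility timeout expires. The sketch below assumes a DB-API 2.0 cursor with psycopg-style %s placeholders; process_batch and handle are hypothetical names.

```python
READ_SQL = """
SELECT msg_id, message
FROM pgmq.read(queue_name => %s, vt => %s::integer, qty => %s::integer);
"""
ARCHIVE_SQL = "SELECT pgmq.archive(queue_name => %s, msg_id => %s::bigint);"


def process_batch(cur, handle, queue_name="orders", vt=60, qty=2):
    """Read a batch, run `handle` on each payload, archive the successes.

    Returns the IDs of the messages that were archived. The caller is
    expected to commit afterwards.
    """
    cur.execute(READ_SQL, (queue_name, vt, qty))
    rows = cur.fetchall()
    archived = []
    for msg_id, message in rows:
        try:
            handle(message)
        except Exception:
            # Not archived: the message reappears for readers after `vt` seconds.
            continue
        cur.execute(ARCHIVE_SQL, (queue_name, msg_id))
        if cur.fetchone()[0]:  # pgmq.archive returns a boolean
            archived.append(msg_id)
    return archived
```

The visibility timeout should be comfortably longer than the time handle needs, otherwise another worker may pick up the same order while the first one is still cooking.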

Checking the Archive Table

We can check the archive table to see the archived messages:

SELECT * FROM pgmq.a_orders;

This command shows us all the messages that have been archived from the ‘orders’ queue. The archive table name is automatically generated by pgmq and follows the format a_<queue_name>.

Processing the Delayed Order

After waiting for the delay period to pass, we can check if the delayed message is now available:

SELECT * FROM pgmq.read(
    queue_name => 'orders',
    vt => 60,
    qty => 1
);

This command reads one message from the ‘orders’ queue, which should now return the delayed order. If it returns no rows, the delay has not elapsed yet.
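Since pgmq.read returns nothing while a message is still delayed, a worker needs to retry rather than give up after one empty read. Here is a minimal, driver-agnostic polling sketch in Python. The wait_for_messages function and its parameters are hypothetical; read_once stands for any callable that performs a single read and returns a list of messages.

```python
import time


def wait_for_messages(read_once, timeout=15.0, interval=1.0,
                      sleep=time.sleep, clock=time.monotonic):
    """Call `read_once` until it returns messages or `timeout` seconds pass.

    Returns the messages from the first non-empty read, or an empty list
    if the deadline is reached. `sleep` and `clock` can be replaced in tests.
    """
    deadline = clock() + timeout
    while True:
        messages = read_once()
        if messages:
            return messages
        if clock() >= deadline:
            return []
        sleep(interval)
```

With a 10-second delay on the third order, a 15-second timeout and a 1-second interval leave enough room for the message to show up.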

Once we’ve processed the delayed order, we can archive it as well:

SELECT pgmq.archive(
    queue_name => 'orders',
    msg_id => 3
);

This command archives the message with ID 3 from the ‘orders’ queue.

We can check the archive table again to confirm that all three orders have been archived:

SELECT * FROM pgmq.a_orders;

Cleaning Up

Finally, if we want to clean up after our demo, we can drop the ‘orders’ queue:

SELECT pgmq.drop_queue('orders');

This command removes the ‘orders’ queue from our database. The pgmq.drop_queue function also deletes the associated archive table.

Going All In On Postgres

I’m personally a huge fan of using Postgres for as many things as possible. I have written a separate post on using Postgres for RAG applications that you can check out if you’re interested in learning more about this.

Final Thoughts

As you can see, it’s very simple to set up a message queue, similar to something like SQS, directly in Postgres with PGMQ. This is often a nice alternative to adding a separate service like SQS or Kafka to your stack: it keeps things simple and saves you from managing another external service.

If you’re interested in learning more about PGMQ, you can check out the PGMQ GitHub repository for more information.