Skip to main content

One post tagged with "queues"

View All Tags

· 5 min read
Adam Hendel

We’ve released PGMQ, a packaged extension for message queues on Postgres.

People have been implementing queues on Postgres in many different ways and we’re excited about combining lessons learned from those projects into a simple, feature-rich extension.

Some exciting features of the project include:

  • Guaranteed exactly-once delivery of messages within a visibility timeout
  • Optional archival of messages retention for replayability and retention
  • Familiar SQL interface
  • Single and Batch read of messages
  • Client SDKs in both Rust and Python for an ORM-like feel

The need for message queues

Message queues are a very common architectural feature to manage operational pipelines, particularly within the context of asynchronous tasks and distributed systems. There are products in the market that support message queues (Kafka, RSMQ, RabbitMQ, SQS); however, when adopting one of these technologies, you increase your cognitive load, required skills and production support overhead.

Building your queues on Postgres

As a Postgres startup, we had the same issue, and we decided to build our own Message Queue based on Postgres. We are not the first: others have implemented queues on Postgres, and many have written about it including Dagster, CrunchyData, and PostgresFM dedicated an entire podcast episode to them.

At Tembo, we needed a job queue to manage tasks between our control-plane and data-plane in our managed cloud offering. Our control-plane publishes tasks like create postgres cluster, and update postgres cluster.

To keep our architecture simple and reduce technology sprawl, we built a Postgres extension so that we could run queues for our cloud and more easily share the implementation with the community.

Queues Implemented with best practices

PGMQ was implemented on Postgres and follows industry best practices. One of the most important practices is the use of Postgres’s SKIP LOCKED, which is similar to NOWAIT in other databases. SKIP LOCKED helps ensure that consumers don't hang and FOR UPDATE ensures messages are not duplicated on read. PGMQ also supports partitioning, which is particularly beneficial for large queues and can be used to efficiently archive / expire old messages.

PGMQ also provides exactly once delivery semantics within a visibility timeout. Similar to Amazon’s SQS and RSMQ, PGMQ consumers set the period of time during which Postgres will prevent all consumers from receiving and processing a message. This is done by the consumer on read, and once the visibility timeout expires the message becomes available for consumption once again. That way, if a consumer crashes, there is no data loss. This effectively means at-least-once delivery semantics once the first visibility timeout has expired.


Using PGMQ

To get started, check out our project’s README for a guide on installing the extension.

You can create a new queue by simply calling

SELECT pgmq_create('my_queue');

Then, pgmq_send() a couple messages to the queue. The message id is returned from the send() function.

SELECT * from pgmq_send('my_queue', '{"foo": "bar1"}');
SELECT * from pgmq_send('my_queue', '{"foo": "bar2"}');
(1 row)

(1 row)

Read 2 messages from the queue. Make them invisible for 30 seconds. If the messages are not deleted or archived within 30 seconds, they will become visible again and can be read by another consumer.

SELECT * from pgmq_read('my_queue', 30, 2);
 msg_id | read_ct |              vt               |          enqueued_at          |    message
1 | 1 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar1"}
2 | 1 | 2023-02-07 04:56:00.650342-06 | 2023-02-07 04:54:51.530818-06 | {"foo":"bar2"}

If the queue is empty, or if all messages are currently invisible, no rows will be returned.

SELECT * from pgmq_read('my_queue', 30, 1);
 msg_id | read_ct | vt | enqueued_at | message

Archiving removes the message from the queue and inserts it to the queue’s archive table. This provides you with an opt-in retention mechanism for messages, and is an excellent way to debug applications.

Archive the message with id 2.

SELECT * from pgmq_archive('my_queue', 2);

Then inspect the message on the archive table.

SELECT * from pgmq_my_queue_archive;
 msg_id | read_ct |         enqueued_at          |          deleted_at           |              vt               |     message     
2 | 1 | 2023-04-25 00:55:40.68417-05 | 2023-04-25 00:56:35.937594-05 | 2023-04-25 00:56:20.532012-05 | {"foo": "bar2"}```

Alternatively, you can delete a message forever.

SELECT * from pgmq_send('my_queue', '{"foo": "bar3"}');
(1 row)
SELECT pgmq_delete('my_queue', 3);

Getting involved

Give us a star and try out PGMQ by cloning the repo and following the example in the README. Please use Github issues if you run into any issues or have any feedback.

We’ve also built client side libraries in Rust and Python, which will give you an ORM-like experience.

You can also try PGMQ on Tembo Cloud as part of our Message Queue Stack. Tembo Cloud’s Message Queue Stack is powered by PGMQ, but also ships with Postgres configurations optimized for message queue workloads. We’re also working on adding metrics and data visualizations specific to message queues.

Interested in learning more?

Stay tuned for our upcoming post pg_later, an extension we built on top of PGMQ as well as benchmarks comparing PGMQ to SQS and Redis.