In my previous submission to this space, I described my experience with pgmq while using the Python library. In this post, I’ll share what I found after inspecting the code.
So, first, I’ll describe the general structure of the project. Then, I’ll explain what happens when we install the pgmq extension. Finally, I’ll describe how some of its functions work.
In this post, I’ll be using version v0.25.0, which you can find in the pgmq repository at https://github.com/tembo-io/pgmq.
Project structure
After cloning the appropriate tag, we can see that the repository contains the following files:
$ ls -1
Cargo.lock
Cargo.toml
CONTRIBUTING.md
core
Dockerfile.build
examples
images
LICENSE
Makefile
pgmq.control
pgmq-rs
README.md
sql
src
tembo-pgmq-python
tests
The project uses pgrx. From pgrx’s README, we know that the relevant files for the extension are Cargo.toml, pgmq.control, and the src and sql directories:
$ tree sql src
sql
├── pgmq--0.10.2--0.11.1.sql
├── pgmq--0.11.1--0.11.2.sql
...
├── pgmq--0.8.0--0.8.1.sql
├── pgmq--0.8.1--0.9.0.sql
└── pgmq--0.9.0--0.10.2.sql
src
├── api.rs
├── errors.rs
├── lib.rs
├── metrics.rs
├── partition.rs
├── sql_src.sql
└── util.rs
0 directories, 7 files
Installing the pgmq extension
This section assumes that you have successfully installed the prerequisites as described in CONTRIBUTING.md.
To build the pgmq extension, we can do the following:
cargo build
Alternatively, to build and install the pgmq extension, we can do:
cargo pgrx install
In either case, we can see a shared library pgmq.so
being created. The installation process also places the shared library in the lib
directory of the postgres installation; and the sql files and the control file in the extensions
directory. In my case:
$ ls -1 /opt/postgres/share/extension/pgmq*
/opt/postgres/share/extension/pgmq--0.10.2--0.11.1.sql
...
/opt/postgres/share/extension/pgmq--0.9.0--0.10.2.sql
/opt/postgres/share/extension/pgmq.control
$ ls -1 /opt/postgres/lib/pgmq*
/opt/postgres/lib/pgmq.so
To test the extension, we can do:
cargo pgrx run
and it’ll start a psql prompt. In the prompt, we can execute the create extension statement to start using pgmq:
-- List installed extensions
\dx
-- Enable pgmq
create extension pgmq;
-- List installed extensions again
\dx
The output will look something like:
pgmq=# \dx
List of installed extensions
Name | Version | Schema | Description
---------+---------+------------+------------------------------
plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
(1 row)
pgmq=# create extension pgmq;
CREATE EXTENSION
pgmq=# \dx
List of installed extensions
Name | Version | Schema | Description
---------+---------+------------+---------------------------------------------------------------------
pgmq | 0.25.0 | public | A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.
plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
(2 rows)
We can also list the available functions:
-- List available functions under pgmq schema
\df pgmq.*
pgmq=# \df pgmq.*
List of functions
Schema | Name | Result data type | Argument data types | Type
--------+------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+------
pgmq | archive | boolean | queue_name text, msg_id bigint | func
pgmq | archive | TABLE(archive boolean) | queue_name text, msg_ids bigint[] | func
pgmq | create | void | queue_name text | func
pgmq | create_non_partitioned | void | queue_name text | func
pgmq | create_partitioned | void | queue_name text, partition_interval text DEFAULT '10000'::text, retention_interval text DEFAULT '100000'::text | func
pgmq | delete | boolean | queue_name text, msg_id bigint | func
pgmq | delete | TABLE(delete boolean) | queue_name text, msg_ids bigint[] | func
pgmq | drop_queue | boolean | queue_name text, partitioned boolean DEFAULT false | func
pgmq | list_queues | TABLE(queue_name text, created_at timestamp with time zone) | | func
pgmq | metrics | TABLE(queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone) | queue_name text | func
pgmq | metrics_all | TABLE(queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone) | | func
pgmq | pop | TABLE(msg_id bigint, read_ct integer, enqueued_at timestamp with time zone, vt timestamp with time zone, message jsonb) | queue_name text | func
pgmq | purge_queue | bigint | queue_name text | func
pgmq | read | TABLE(msg_id bigint, read_ct integer, enqueued_at timestamp with time zone, vt timestamp with time zone, message jsonb) | queue_name text, vt integer, "limit" integer | func
pgmq | read_with_poll | TABLE(msg_id bigint, read_ct integer, enqueued_at timestamp with time zone, vt timestamp with time zone, message jsonb) | queue_name text, vt integer, "limit" integer, poll_timeout_s integer DEFAULT 5, poll_interval_ms integer DEFAULT 250 | func
pgmq | send | bigint | queue_name text, message jsonb, delay integer DEFAULT 0 | func
pgmq | send_batch | TABLE(msg_id bigint) | queue_name text, messages jsonb[], delay integer DEFAULT 0 | func
pgmq | set_vt | TABLE(msg_id bigint, read_ct integer, enqueued_at timestamp with time zone, vt timestamp with time zone, message jsonb) | queue_name text, msg_id bigint, vt_offset integer | func
(18 rows)
With this, we can now explore the extension from the inside and, if needed, recompile and reinstall it to play with it.
The internals
We know that when an extension is created with pgrx, it generates a lib.rs file. Let us explore it.
One of the first things we can see is that the other five files in the src/ directory are included:
pub mod api;
pub mod errors;
pub mod metrics;
pub mod partition;
pub mod util;
After reviewing these files a little bit, we can notice that there’s also some relevant code in another module, the one in core/. For example, in src/partition.rs:
use pgmq_core::{
errors::PgmqError,
query::{
assign_archive, assign_queue, create_archive, create_archive_index, create_index,
create_meta, grant_pgmon_meta, grant_pgmon_queue, grant_pgmon_queue_seq, insert_meta,
},
types::{PGMQ_SCHEMA, QUEUE_PREFIX},
util::CheckedName,
};
So, at this point we know that we can find the source code in two places: src/ and core/.
If we continue exploring lib.rs, we can see that a SQL file (sql_src.sql) is executed when the extension is enabled:
CREATE TABLE pgmq.meta (
queue_name VARCHAR UNIQUE NOT NULL,
is_partitioned BOOLEAN NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
);
...
We can actually see that table with psql:
-- List tables in the pgmq schema
\dt pgmq.*
-- List contents of pgmq.meta
select * from pgmq.meta;
pgmq=# \dt pgmq.*
List of relations
Schema | Name | Type | Owner
--------+------+-------+----------
pgmq   | meta | table | binidxaba
(1 row)
pgmq=# select * from pgmq.meta;
queue_name | is_partitioned | created_at
------------+----------------+------------
(0 rows)
The following diagram shows what the pgmq schema looks like right after CREATE EXTENSION is executed:
From this point, we can suspect that every time we create a queue, a new row is inserted into this table.
Let us see what pgmq.create() does…
pgmq.create()
Most of the functions provided by pgmq are defined in src/api.rs. In that file, we can find the function pgmq_create(queue_name: &str), and if we chase the call sequence, we can discover that the interesting function is init_queue(name: &str) in core/src/query.rs:
pub fn init_queue(name: &str) -> Result<Vec<String>, PgmqError> {
let name = CheckedName::new(name)?;
Ok(vec![
create_queue(name)?,
assign_queue(name)?,
create_index(name)?,
create_archive(name)?,
assign_archive(name)?,
create_archive_index(name)?,
insert_meta(name)?,
grant_pgmon_queue(name)?,
])
}
This function generates several SQL statements that are later executed in pgmq_create_non_partitioned using an Spi client.
I’ll skip the details, but the SQL statements basically do the following (a rough sketch of the generated DDL appears after the list):
- Create a table pgmq.q_<queue_name>.
- Assign the table to the pgmq extension.
- Create an index on the pgmq.q_<queue_name> table.
- Create a table pgmq.a_<queue_name>.
- Assign the table to the pgmq extension.
- Create an index on the pgmq.a_<queue_name> table.
- Insert a row into the pgmq.meta table.
- Grant privileges to pg_monitor.
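To make that concrete, here is a rough, hand-written sketch of what those statements could look like for a queue named my_queue. The exact DDL lives in core/src/query.rs; the column lists below are inferred from the return type of pgmq.read and from the indexes we’ll see shortly, so treat this as an approximation rather than the extension’s literal output:
-- Approximate sketch of the DDL produced by init_queue('my_queue').
-- Column lists are inferred, not copied from core/src/query.rs.
CREATE TABLE pgmq.q_my_queue (
    msg_id BIGSERIAL PRIMARY KEY,
    read_ct INT DEFAULT 0 NOT NULL,
    enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
    vt TIMESTAMP WITH TIME ZONE NOT NULL,
    message JSONB
);
-- "Assigning" a table means making it a member of the extension.
ALTER EXTENSION pgmq ADD TABLE pgmq.q_my_queue;
CREATE INDEX q_my_queue_vt_idx ON pgmq.q_my_queue (vt);

CREATE TABLE pgmq.a_my_queue (
    msg_id BIGINT PRIMARY KEY,
    read_ct INT DEFAULT 0 NOT NULL,
    enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
    archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
    vt TIMESTAMP WITH TIME ZONE NOT NULL,
    message JSONB
);
ALTER EXTENSION pgmq ADD TABLE pgmq.a_my_queue;
CREATE INDEX archived_at_idx_my_queue ON pgmq.a_my_queue (archived_at);

INSERT INTO pgmq.meta (queue_name, is_partitioned) VALUES ('my_queue', false);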
We can see the effects of this in psql using the following lines:
-- Create a queue
select pgmq.create('my_queue');
-- List tables
\dt pgmq.*
-- List indexes
\di pgmq.*
-- See the contents of pgmq.meta
select * from pgmq.meta;
The output will show something like:
pgmq=# select pgmq.create('my_queue');
create
--------
(1 row)
pgmq=# \dt pgmq.*;
List of relations
Schema | Name | Type | Owner
--------+------------+-------+-----------
pgmq | a_my_queue | table | binidxaba
pgmq | meta | table | binidxaba
pgmq | q_my_queue | table | binidxaba
(3 rows)
pgmq=# \di pgmq.*
List of relations
Schema | Name | Type | Owner | Table
--------+--------------------------+-------+-----------+------------
pgmq | a_my_queue_pkey | index | binidxaba | a_my_queue
pgmq | archived_at_idx_my_queue | index | binidxaba | a_my_queue
pgmq | meta_queue_name_key | index | binidxaba | meta
pgmq | q_my_queue_pkey | index | binidxaba | q_my_queue
pgmq | q_my_queue_vt_idx | index | binidxaba | q_my_queue
(5 rows)
pgmq=# select * from pgmq.meta;
queue_name | is_partitioned | created_at
------------+----------------+-------------------------------
my_queue | f | 2023-09-18 23:35:38.163096-06
(1 row)
The following diagram shows what the pgmq schema looks like at this point:
For the queue my_queue, we can see the underlying table and the corresponding archive table. Each table has an index associated with the primary key. The pgmq.q_my_queue table also has an index on the vt column, and pgmq.a_my_queue has an index on the archived_at column.
We can suspect that the pgmq.q_my_queue table is used in the send and read operations. Let us look at those two functions.
pgmq.send()
We can explore the send operation in a similar way. The relevant SQL is straightforward: it just inserts a new row into the underlying table:
INSERT INTO {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (vt, message)
VALUES {values}
RETURNING msg_id;
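From psql, sending messages then looks like this. The signatures come from the \df output we saw earlier; the queue name and JSON payloads are just illustrative:
-- Send a single message; pgmq.send returns the new msg_id.
select pgmq.send('my_queue', '{"task": "welcome_email", "user_id": 42}');

-- Send several messages at once; pgmq.send_batch returns one msg_id per message.
select pgmq.send_batch('my_queue', ARRAY['{"n": 1}', '{"n": 2}']::jsonb[]);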
At this point, we can see the following pattern in the pgmq project:
- the exposed SQL functions are defined in src/api.rs, and
- the underlying SQL statements are defined in core/src/query.rs.
pgmq.read()
So, let’s see. If I were the one programming pgmq.read(), I would perhaps do something like “get the first {limit} rows from the queue table whose {vt} has already expired, and for those rows, also update the visibility timeout to now() + {vt}.” Naively, maybe something like:
UPDATE pgmq.q_my_queue
SET
    vt = clock_timestamp() + interval '10 seconds',
    read_ct = read_ct + 1
WHERE
    msg_id IN (SELECT msg_id FROM pgmq.q_my_queue
               WHERE vt <= clock_timestamp()
               ORDER BY msg_id ASC
               LIMIT 1);
In reality, pgmq.read is more interesting than that 😅. It performs the following DML:
WITH cte AS
(
SELECT msg_id
FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
WHERE vt <= clock_timestamp()
ORDER BY msg_id ASC
LIMIT {limit}
FOR UPDATE SKIP LOCKED
)
UPDATE {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} t
SET
vt = clock_timestamp() + interval '{vt} seconds',
read_ct = read_ct + 1
FROM cte
WHERE t.msg_id=cte.msg_id
RETURNING *;
Firstly, in pgmq’s version, there is a CTE (Common Table Expression) to obtain the first {limit} message IDs whose vt has expired. (It would be interesting to discuss why the pgmq developers used a CTE, but we can explore that in another post.)
There are two crucial things to notice in the CTE. One is the ORDER BY clause, which ensures FIFO ordering. The other is the FOR UPDATE SKIP LOCKED clause, which claims the rows that no one else has claimed. This part is essential because it ensures correctness in the case of concurrent pgmq.read() operations.
The next step in the DML is to update the corresponding rows with a new vt value by adding the supplied {vt} to the current timestamp. Additionally, the read_ct value is incremented by 1. What is the use of this counter? In general, a high read_ct value suggests that there is a problem processing that message, because users usually archive a message after successfully processing it. So, ideally, a message is only read once.
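We can observe this behavior from psql. Here, the visibility timeout of 10 seconds and the limit of 1 are just example values for the vt and "limit" parameters:
-- Read one message, making it invisible to other readers for 10 seconds.
select * from pgmq.read('my_queue', 10, 1);

-- Calling read again within those 10 seconds returns no rows,
-- because the message's vt is now in the future.
select * from pgmq.read('my_queue', 10, 1);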
pgmq.archive()
The next stage in the lifecycle of a message is archiving it. For that, pgmq uses the following insert statement:
WITH archived AS (
DELETE FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
WHERE msg_id = ANY($1)
RETURNING msg_id, vt, read_ct, enqueued_at, message
)
INSERT INTO {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name} (msg_id, vt, read_ct, enqueued_at, message)
SELECT msg_id, vt, read_ct, enqueued_at, message
FROM archived
RETURNING msg_id;
Essentially, it deletes the messages with the provided msg_ids from the queue table and places them in the corresponding archive table.
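For a single message, the call looks like this (msg_id 1 is just an example):
-- Archive one message; returns true if the message existed and was moved.
select pgmq.archive('my_queue', 1);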
One interesting thing to notice is that pgmq.archive() can be used to archive a batch of messages too:
select pgmq.archive('my_queue', ARRAY[3, 4, 5]);
pgmq=# select pgmq.archive('my_queue', ARRAY[3, 4, 5]);
pgmq_archive
--------------
t
t
t
(3 rows)
That is achieved in pgrx by declaring two functions with the same name in the pg_extern attribute macro, as follows:
#[pg_extern(name = "archive")]
fn pgmq_archive(queue_name: &str, msg_id: i64) -> Result<Option<bool>, PgmqExtError> {
//...
}
#[pg_extern(name = "archive")]
fn pgmq_archive_batch(
queue_name: &str,
msg_ids: Vec<i64>,
) -> Result<TableIterator<'static, (name!(archive, bool),)>, PgmqExtError> {
//...
}
pgmq.drop_queue()
Finally, let’s talk about pgmq.drop_queue(). It essentially executes multiple statements:
- Unassign the pgmq.q_<queue_name> table from the extension.
- Unassign the pgmq.a_<queue_name> table from the extension.
- Drop the table pgmq.q_<queue_name>.
- Drop the table pgmq.a_<queue_name>.
- Delete the corresponding row from the pgmq.meta table.
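Dropping a queue from psql is then a single call:
-- Drop the queue, its archive table, and its row in pgmq.meta.
select pgmq.drop_queue('my_queue');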
Nothing surprising in this one, and with it, we conclude our tour.
Conclusion
In this post, we explored how the pgrx tool is used to generate the pgmq extension. We explored how the metadata objects are created and how they are used in the basic send, read, and archive operations. At least from an explorer’s perspective, the internals of the extension are currently easy to read and understand.
I invite everyone to explore how the other pgmq functions work. You can find the code at https://github.com/tembo-io/pgmq, and you can learn more about pgrx at https://github.com/pgcentralfoundation/pgrx.