
3 posts tagged with "rust"


· 6 min read
Adam Hendel


So you have a database, and that database does something. (Probably several somethings, if we're honest). However, today, you need it to do something else.

Simple enough...you just create a tool to give it that new functionality. Job done.

Except it isn't. Because the requests for new "things" never stop. They get complicated. They slow things down. They conflict with one another. Sometimes they even screw up your database along the way. And if we're honest, you don't really want to be constantly building new tools for every new need anyway. Before you know it, all of these "things" you're asking the database to do are starting to get in the way of its core performance.

The good news is that Postgres has a rich ecosystem of tools and services built by the community. Many of these run inside the database as Postgres extensions, while others run outside it as external services. Some of the best-known examples are PostgREST, an out-of-the-box REST API for Postgres, and PgBouncer, a production-ready connection pooler. A scalable way to run these pieces of software with the Tembo operator is to use a new feature called Application Services, which runs these applications in containers next to Postgres.

Understanding the Essence of the Tembo Operator

We developed the Tembo Operator to add new capabilities for developers and enterprises. We believe it stands out from other approaches in many ways; in particular, it lets users deploy applications in containers right alongside their PostgreSQL instances, giving them an efficient, very low-latency connection to Postgres.

The Advantage of Running Applications in Separate Containers

PostgreSQL is a very powerful piece of software. When running it in Kubernetes, a useful pattern is to run supporting applications in separate containers in the same namespace. Running these applications in separate containers isolates their requirements, such as resource allocations: each container can have dedicated CPU and memory, so there is no competition for the resources reserved for PostgreSQL. This separation lets the Postgres database and the other applications function and scale efficiently.

Spotlight on PostgREST: A Prime Example

PostgREST is a perfect example of an application that fits this pattern. PostgREST is a standalone web server that turns your database directly into a RESTful API. The immediate advantage? Developers can use the auto-generated API to build robust applications without hand-writing any API code. By simplifying the process and reducing the need for middleware, PostgREST has become a popular tool in the Postgres ecosystem.
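To give a feel for what the generated API looks like from a client's perspective, here is a minimal sketch of querying it from Rust with reqwest (assuming the blocking and json features, plus serde_json). The people table and the age filter are hypothetical; the route-per-table layout and the age=gte.18 operator syntax are standard PostgREST conventions.

use serde_json::Value;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // PostgREST exposes each table as a route; filters go in the query string.
    let rows: Value =
        reqwest::blocking::get("http://localhost:3000/people?age=gte.18")?.json()?;
    println!("{rows}");
    Ok(())
}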

However, let’s remember that the main advantage of this method is not just resource allocation. It's about ensuring the optimal performance of Postgres without bogging it down with additional tasks.

Let’s look at an example of a spec that would run the workload that we just described. This will look familiar if you’ve worked with the Kubernetes Pod spec.

apiVersion: coredb.io/v1alpha1
kind: CoreDB
metadata:
  name: my-postgres-deployment
spec:
  image: "quay.io/tembo/standard-cnpg:15.3.0-1-1096aeb"
  appServices:
    - name: postgrest
      image: postgrest/postgrest:v10.0.0
      routing:
        - port: 3000
          ingressPath: /
      env:
        - name: PGRST_DB_URI
          valueFromPlatform: ReadWriteConnection
        - name: PGRST_DB_SCHEMA
          value: public
        - name: PGRST_DB_ANON_ROLE
          value: postgres

The Tembo operator always starts Postgres with default configurations, so let’s focus on the appServices section of the spec, which tells us what application container we’ll run and how.

We can run any number of applications in containers near the Postgres instance. Only the name and image parameters are required, but you can configure commands, arguments, environment variables, CPU and memory, and readiness and liveness probes for the application.

If you need network communication, you can also configure the ingress by specifying a port and a path. In our example, PostgREST runs on port 3000 and expects traffic routed to the root path. appServices also supports various middleware configurations, but we will cover those in a future blog.

Environment variables also have some advanced configuration. The Tembo operator creates a few roles in Postgres and tracks their credentials in a Kubernetes secret. If we want to pull those credentials into an application, we can do so using the valueFromPlatform option. The service currently supports pulling in credentials for ReadWriteConnection and ReadOnlyConnection, but we’ll be building out more role assignments soon.
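On the application side, nothing special is required to consume these credentials; the operator injects them as ordinary environment variables. A minimal sketch, using the PGRST_DB_URI variable from the example above:

fn main() {
    // PGRST_DB_URI is injected by the operator via valueFromPlatform.
    let db_uri = std::env::var("PGRST_DB_URI")
        .expect("PGRST_DB_URI should be set in the appService container");
    println!("connecting to {db_uri}");
}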

Arbitrary container deployments

The Tembo operator is not limited to PostgREST; it can run nearly any containerized application. For example, run your Rails application or your FastAPI web server. Specify the image and other configuration as necessary, and the Tembo operator will provision the resources.

apiVersion: coredb.io/v1alpha1
kind: CoreDB
metadata:
  name: my-postgres-deployment
spec:
  appServices:
    - name: my-app
      image: quay.io/myImage:latest
      routing:
        - port: 3000
          ingressPath: /
      command: [ "python", "-m", "myapp" ]
      env:
        - name: MY_ENV
          value: my_value

Kubernetes Footprint

Let’s take a look at what actually gets created when you specify an appService; we’ll use the PostgREST example as an illustration.

Every application service gets its own Kubernetes Deployment. If the appService has any ingress requirements, a Kubernetes Service and an ingress resource (Tembo currently uses Traefik) are created for that appService. Middleware resources (also Traefik) are created if the appService has any middleware configuration specified. Here’s an image of the pods you’d likely see in the namespace you create for Postgres.

[Image: pods in the Postgres namespace, including the Postgres pod and the appService pod]

More to follow

The Tembo operator is under continuous development and runs the entire Postgres stack, not just the core Postgres engine. That includes auxiliary container services that run outside of Postgres, augmenting the Postgres experience with complex applications while isolating their workloads from your database. The Tembo operator makes it simple to get these services up and running.

And if you want to try out the full power of Postgres without being concerned with how it will run, try out Tembo Cloud. Drop into our Slack channel to ask questions and get help from the Tembo team and other community members.

· 8 min read
Ian Stanton

At Tembo, we’ve been developing an open-source Kubernetes Operator for Postgres. We use this operator to power our managed Postgres platform, Tembo Cloud. We’re excited to share our progress, experience, and vision for this project. This post aims to assist anyone interested in utilizing Kubernetes operators for Postgres or writing Kubernetes operators using Rust.

What is a Kubernetes Operator?

Kubernetes was designed with automation in mind, and operators let users extend native Kubernetes behavior and principles to manage custom resources and components.

With a Kubernetes operator, users can write code that defines how their application should be deployed and managed on Kubernetes. This code is then packaged into a container image and deployed to Kubernetes. The operator then watches for changes to the custom resource and takes action to reconcile the state of the application’s components with the desired state of the custom resource.

In short, using a Kubernetes operator is the most effective way to run applications on Kubernetes in 2023.

You can read more about Kubernetes operators in this CNCF blog post, which is also the source of the image below.

[Image: k8s-operator.webp]

Image credit: CNCF blog

Kubernetes Operators and the Rise of Rust

Because Kubernetes itself is written in Go, the majority of Kubernetes operators available today are also written in Go. The kubebuilder project simplifies the process of building Kubernetes operators in Go and is widely considered the de facto standard for doing so.

With the increasing popularity of Rust, it was only a matter of time before someone developed a framework for building Kubernetes operators in Rust. The kube-rs project allows developers to build Rust-based Kubernetes operators in a similar manner to the kubebuilder project. This project excited us for a few reasons:

  1. We were interested in learning Rust.
  2. We wanted to explore whether Rust could be a viable alternative to Go for writing Kubernetes operators.
  3. We were inspired by the success of companies like Stackable, who have developed numerous Kubernetes operators in Rust.

This excitement led us to the decision to write our Kubernetes operator in Rust.
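To make the reconcile pattern concrete, here is a heavily simplified, hypothetical sketch of a kube-rs controller for a CoreDB-like resource. This is not the actual Tembo Operator code: the spec fields, error type, and requeue intervals are illustrative, and it assumes the kube (derive and runtime features), schemars, serde, thiserror, futures, and tokio crates.

use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use kube::{
    api::Api,
    runtime::controller::{Action, Controller},
    Client, CustomResource,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

// Hypothetical spec; the real CoreDB resource has many more fields.
#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[kube(group = "coredb.io", version = "v1alpha1", kind = "CoreDB", namespaced)]
pub struct CoreDBSpec {
    pub image: String,
}

#[derive(Debug, thiserror::Error)]
#[error("reconcile failed")]
struct Error;

// Called whenever a watched CoreDB object changes; this is where the
// operator drives the cluster toward the desired state.
async fn reconcile(obj: Arc<CoreDB>, _ctx: Arc<Client>) -> Result<Action, Error> {
    println!("reconciling {:?}", obj.metadata.name);
    // ...create or update Deployments, Services, etc. here...
    Ok(Action::requeue(Duration::from_secs(300)))
}

// Decides what to do when reconciliation fails: here, retry after a minute.
fn error_policy(_obj: Arc<CoreDB>, _err: &Error, _ctx: Arc<Client>) -> Action {
    Action::requeue(Duration::from_secs(60))
}

#[tokio::main]
async fn main() -> Result<(), kube::Error> {
    let client = Client::try_default().await?;
    let coredbs: Api<CoreDB> = Api::all(client.clone());
    Controller::new(coredbs, Default::default())
        .run(reconcile, error_policy, Arc::new(client))
        .for_each(|res| async move {
            if let Err(e) = res {
                eprintln!("reconcile error: {e:?}");
            }
        })
        .await;
    Ok(())
}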

Building the Tembo Operator

Tembo Cloud distinguishes itself from other managed Postgres offerings in several ways, one of which is the ability to install and enable Postgres extensions on the fly. This experience is in part powered by Trunk, a Postgres extension registry and companion CLI that provide a simplified extension management experience.

It also introduces the concept of Stacks: pre-built, use-case-specific Postgres deployments that are optimized and tuned to serve a particular workload.

Roll Your Own

In order to build these unique capabilities, we knew we’d need to harness the power and flexibility of a Kubernetes operator in our own way. Although there are several Kubernetes operators for Postgres available, none of them offer the same unique Postgres extension management experience or the concept of Stacks.

Initially, we attempted to build our own operator from scratch. We had successfully built the extension management piece, but soon realized that we were duplicating existing efforts. We had a comprehensive list of baseline features to develop, which included:

  • Backup
  • Recovery
  • Connection Pooling
  • Failover
  • Upgrades

CNPG to the Rescue

Enter CloudNativePG (CNPG). CNPG is a Kubernetes operator for Postgres created by the folks at EDB. We found it to be the most compelling of the many Kubernetes operators for Postgres out there. It provided many of the features we needed, including backup, recovery, connection pooling, failover, and upgrades. However, we still needed the ability to install and enable any Postgres extensions on the fly and define Stacks.

This is where the Tembo Operator comes in. We built the Tembo Operator in a way that utilizes CNPG, which enables us to offer a distinctive management experience for Postgres extensions and Stacks while utilizing a reliable and stable Postgres solution.

Using the Tembo Operator

Let’s take a look at what a custom resource spec looks like for the Tembo Operator. Here’s an example for our Machine Learning Stack. We can see this sample spec makes use of our Machine Learning Stack and includes a handful of extensions. Keep in mind, these extensions are installed at runtime with Trunk and are not built into the container image.

apiVersion: coredb.io/v1alpha1
kind: CoreDB
metadata:
  name: sample-machine-learning
spec:
  image: "quay.io/tembo/ml-cnpg:15.3.0-1-a3e532d"
  stop: false
  stack:
    name: MachineLearning
    postgres_config:
      - name: pg_stat_statements.track
        value: all
      - name: cron.host
        value: /controller/run
      - name: track_io_timing
        value: 'on'
      - name: shared_preload_libraries
        value: vectorize,pg_stat_statements,pgml,pg_cron,pg_later
  trunk_installs:
    - name: pgvector
      version: 0.5.0
    - name: pgml
      version: 2.7.1
    - name: pg_embedding
      version: 0.1.0
    - name: pg_cron
      version: 1.5.2
    - name: pgmq
      version: 0.14.2
    - name: vectorize
      version: 0.0.2
    - name: pg_later
      version: 0.0.8
  extensions:
    # trunk project pgvector
    - name: vector
      locations:
        - database: postgres
          enabled: true
          version: 0.5.0
    # trunk project postgresml
    - name: pgml
      locations:
        - database: postgres
          enabled: true
          version: 2.7.1
    # trunk project pg_embedding
    - name: embedding
      locations:
        - database: postgres
          enabled: false
          version: 0.1.0
    - name: pg_cron
      description: pg_cron
      locations:
        - database: postgres
          enabled: true
          version: 1.5.2
    - name: pgmq
      description: pgmq
      locations:
        - database: postgres
          enabled: true
          version: 0.14.2
    - name: vectorize
      description: simple vector search
      locations:
        - database: postgres
          enabled: true
          version: 0.0.2
    - name: pg_later
      description: async query execution
      locations:
        - database: postgres
          enabled: true
          version: 0.0.8
  runtime_config:
    - name: shared_buffers
      value: "1024MB"
    - name: max_connections
      value: "431"
    - name: work_mem
      value: "5MB"
    - name: bgwriter_delay
      value: "200ms"
    - name: effective_cache_size
      value: "2867MB"
    - name: maintenance_work_mem
      value: "204MB"
    - name: max_wal_size
      value: "10GB"

To create our Postgres instance, we run the following command:

❯ kubectl apply -f yaml/sample-machine-learning.yaml
coredb.coredb.io/sample-machine-learning created
❯ kubectl get po
NAME                                               READY   STATUS    RESTARTS   AGE
sample-machine-learning-1                          1/1     Running   0          19s
sample-machine-learning-metrics-5fbcf9b676-hkxtk   1/1     Running   0          31s

Once we’ve connected to the Postgres instance, we can run \dx to confirm the extensions were installed and enabled as expected:

export PGPASSWORD=$(kubectl get secrets/sample-machine-learning-connection --template={{.data.password}} | base64 -d)
❯ psql postgres://postgres:$PGPASSWORD@sample-machine-learning.localhost:5432
psql (16.0 (Ubuntu 16.0-1.pgdg22.04+1), server 15.3)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, compression: off)
Type "help" for help.

postgres=# \dx
                                        List of installed extensions
        Name        | Version |   Schema   |                               Description
--------------------+---------+------------+-------------------------------------------------------------------------
 pg_cron            | 1.5     | pg_catalog | Job scheduler for PostgreSQL
 pg_later           | 0.0.8   | pglater    | pg_later: Run queries now and get results later
 pg_stat_statements | 1.10    | public     | track planning and execution statistics of all SQL statements executed
 pgmq               | 0.14.2  | public     | A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.
 plpgsql            | 1.0     | pg_catalog | PL/pgSQL procedural language
 vector             | 0.5.0   | public     | vector data type and ivfflat access method
 vectorize          | 0.0.2   | vectorize  | The simplest way to do vector search on Postgres

Let’s install a new extension by adding the following to our sample spec:

...
trunk_installs:
  - name: pg_bm25
    version: 0.4.0
...
extensions:
  - name: pg_bm25
    locations:
      - database: postgres
        enabled: true
        version: 0.4.0

After applying the updated spec and connecting to Postgres, we can see the new extension pg_bm25 is installed and enabled as expected:

❯ psql postgres://postgres:$PGPASSWORD@sample-machine-learning.localhost:5432
psql (16.0 (Ubuntu 16.0-1.pgdg22.04+1), server 15.3)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, compression: off)
Type "help" for help.

postgres=# \dx
                                        List of installed extensions
        Name        | Version |   Schema   |                               Description
--------------------+---------+------------+-------------------------------------------------------------------------
 pg_bm25            | 0.0.0   | paradedb   | pg_bm25: PostgreSQL-native, full text search using BM25
 pg_cron            | 1.5     | pg_catalog | Job scheduler for PostgreSQL
 pg_later           | 0.0.8   | pglater    | pg_later: Run queries now and get results later
 pg_stat_statements | 1.10    | public     | track planning and execution statistics of all SQL statements executed
 pgmq               | 0.14.2  | public     | A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.
 plpgsql            | 1.0     | pg_catalog | PL/pgSQL procedural language
 vector             | 0.5.0   | public     | vector data type and ivfflat access method
 vectorize          | 0.0.2   | vectorize  | The simplest way to do vector search on Postgres

Up Next

We’re currently working on exciting new features that enable the deployment of custom applications alongside Postgres. These features include a REST API, GraphQL, and more. Stay tuned for future updates!

For more information on running the Tembo Operator, check out our docs.

If you're interested in contributing to the project, check out our GitHub repo.

And if you want to try out the full power of Postgres and fully delegate extension management to us, try out Tembo Cloud.

· 12 min read
Binidxaba

In my previous submission to this space, I described my experience with pgmq while using the Python library. In this post, I'll share what I found after inspecting the code.

So, first, I'll describe the general structure of the project. Then, I'll explain what happens when we install the pgmq extension. Finally, I'll describe how some of its functions work.

In this post, I'll be using version v0.25.0, which you can find here.

Project structure

After cloning the appropriate tag, we can see that the repository contains the following files:

$ ls -1
Cargo.lock
Cargo.toml
CONTRIBUTING.md
core
Dockerfile.build
examples
images
LICENSE
Makefile
pgmq.control
pgmq-rs
README.md
sql
src
tembo-pgmq-python
tests

The project uses pgrx. From pgrx's README, we know that the relevant files for the extension are Cargo.toml, pgmq.control and the src and sql directories:

$ tree sql src
sql
├── pgmq--0.10.2--0.11.1.sql
├── pgmq--0.11.1--0.11.2.sql
...
├── pgmq--0.8.0--0.8.1.sql
├── pgmq--0.8.1--0.9.0.sql
└── pgmq--0.9.0--0.10.2.sql
src
├── api.rs
├── errors.rs
├── lib.rs
├── metrics.rs
├── partition.rs
├── sql_src.sql
└── util.rs

0 directories, 7 files

Installing the pgmq extension

note

This section assumes that you have successfully installed the prerequisites as described in CONTRIBUTING.md.

To build the pgmq extension, we can do the following:

cargo build

Alternatively, to build and install the pgmq extension, we can do:

cargo pgrx install

In either case, we can see a shared library pgmq.so being created. The installation process also places the shared library in the lib directory of the Postgres installation, and the sql files and the control file in the extensions directory. In my case:

$ ls -1 /opt/postgres/share/extension/pgmq*
/opt/postgres/share/extension/pgmq--0.10.2--0.11.1.sql
...
/opt/postgres/share/extension/pgmq--0.9.0--0.10.2.sql
/opt/postgres/share/extension/pgmq.control

$ ls -1 /opt/postgres/lib/pgmq*
/opt/postgres/lib/pgmq.so

To test the extension, we can do:

cargo pgrx run

and it'll start a psql prompt. In the prompt, we can execute the create extension statement to start using pgmq:

-- List installed extensions
\dx

-- Enable pgmq
create extension pgmq;

-- List installed extensions again
\dx

The output will look something like:

pgmq=# \dx
                 List of installed extensions
  Name   | Version |   Schema   |         Description
---------+---------+------------+------------------------------
 plpgsql | 1.0     | pg_catalog | PL/pgSQL procedural language
(1 row)

pgmq=# create extension pgmq;
CREATE EXTENSION

pgmq=# \dx
                                 List of installed extensions
  Name   | Version |   Schema   |                             Description
---------+---------+------------+----------------------------------------------------------------------
 pgmq    | 0.25.0  | public     | A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.
 plpgsql | 1.0     | pg_catalog | PL/pgSQL procedural language
(2 rows)

We can also list the available functions:

-- List available functions under pgmq schema
\df pgmq.*
pgmq=# \df pgmq.*
List of functions
Schema | Name | Result data type | Argument data types | Type
--------+------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+------
pgmq | archive | boolean | queue_name text, msg_id bigint | func
pgmq | archive | TABLE(archive boolean) | queue_name text, msg_ids bigint[] | func
pgmq | create | void | queue_name text | func
pgmq | create_non_partitioned | void | queue_name text | func
pgmq | create_partitioned | void | queue_name text, partition_interval text DEFAULT '10000'::text, retention_interval text DEFAULT '100000'::text | func
pgmq | delete | boolean | queue_name text, msg_id bigint | func
pgmq | delete | TABLE(delete boolean) | queue_name text, msg_ids bigint[] | func
pgmq | drop_queue | boolean | queue_name text, partitioned boolean DEFAULT false | func
pgmq | list_queues | TABLE(queue_name text, created_at timestamp with time zone) | | func
pgmq | metrics | TABLE(queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone) | queue_name text | func
pgmq | metrics_all | TABLE(queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone) | | func
pgmq | pop | TABLE(msg_id bigint, read_ct integer, enqueued_at timestamp with time zone, vt timestamp with time zone, message jsonb) | queue_name text | func
pgmq | purge_queue | bigint | queue_name text | func
pgmq | read | TABLE(msg_id bigint, read_ct integer, enqueued_at timestamp with time zone, vt timestamp with time zone, message jsonb) | queue_name text, vt integer, "limit" integer | func
pgmq | read_with_poll | TABLE(msg_id bigint, read_ct integer, enqueued_at timestamp with time zone, vt timestamp with time zone, message jsonb) | queue_name text, vt integer, "limit" integer, poll_timeout_s integer DEFAULT 5, poll_interval_ms integer DEFAULT 250 | func
pgmq | send | bigint | queue_name text, message jsonb, delay integer DEFAULT 0 | func
pgmq | send_batch | TABLE(msg_id bigint) | queue_name text, messages jsonb[], delay integer DEFAULT 0 | func
pgmq | set_vt | TABLE(msg_id bigint, read_ct integer, enqueued_at timestamp with time zone, vt timestamp with time zone, message jsonb) | queue_name text, msg_id bigint, vt_offset integer | func
(18 rows)

With this, we can now explore the extension from the inside. And, if needed, recompile and reinstall the extension to play with it.

The internals

We know that when an extension is created with pgrx, it generates a lib.rs file. Let us explore it.

One of the first things we can see is that the other five files in the src/ directory are included:

pub mod api;
pub mod errors;
pub mod metrics;
pub mod partition;
pub mod util;

After reviewing these files a little, we notice that there's also some relevant code in another module, the one in core/. For example, in src/partition.rs:

use pgmq_core::{
    errors::PgmqError,
    query::{
        assign_archive, assign_queue, create_archive, create_archive_index, create_index,
        create_meta, grant_pgmon_meta, grant_pgmon_queue, grant_pgmon_queue_seq, insert_meta,
    },
    types::{PGMQ_SCHEMA, QUEUE_PREFIX},
    util::CheckedName,
};

So, at this point we know that we can find the source code in two places: src/ and core/.

If we continue exploring lib.rs, we can see that a sql file (sql_src.sql) is executed when the extension is enabled:

CREATE TABLE pgmq.meta (
    queue_name VARCHAR UNIQUE NOT NULL,
    is_partitioned BOOLEAN NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
);
...
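pgrx provides macros for shipping bootstrap SQL like this with an extension. As a sketch of how a lib.rs can include such a file (the exact invocation in pgmq's lib.rs may differ):

// Include sql_src.sql in the generated extension SQL so that it runs
// when the extension is created. (Illustrative arguments.)
pgrx::extension_sql_file!("./sql_src.sql", name = "bootstrap");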

We can actually see that table with psql:

-- List tables in the pgmq schema
\dt pgmq.*

-- List contents of pgmq.meta
select * from pgmq.meta;
pgmq=# \dt pgmq.*
        List of relations
 Schema | Name | Type  |   Owner
--------+------+-------+-----------
 public | meta | table | binidxaba
(1 row)

pgmq=# select * from pgmq.meta;
 queue_name | created_at
------------+------------
(0 rows)

The following diagram shows what the pgmq schema looks like right after CREATE EXTENSION is executed:

[Diagram: the pgmq schema right after CREATE EXTENSION, containing only the pgmq.meta table]

From this point, we can suspect that every time we create a queue, a new row is inserted into this table.

Let us see what pgmq.create() does...

pgmq.create()

Most of the functions provided by pgmq are defined in src/api.rs. In that file, we can find the function pgmq_create(queue_name: &str), and if we chase the call sequence, we can discover that the interesting function is init_queue(name: &str) in core/src/query.rs:

pub fn init_queue(name: &str) -> Result<Vec<String>, PgmqError> {
    let name = CheckedName::new(name)?;
    Ok(vec![
        create_queue(name)?,
        assign_queue(name)?,
        create_index(name)?,
        create_archive(name)?,
        assign_archive(name)?,
        create_archive_index(name)?,
        insert_meta(name)?,
        grant_pgmon_queue(name)?,
    ])
}

This function generates several SQL statements that are later executed in pgmq_create_non_partitioned using an Spi client.
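Conceptually, the execution side looks something like the following sketch. This is not the literal pgmq code; it just illustrates running the generated statements through pgrx's Spi interface:

use pgrx::prelude::*;
use pgmq_core::{errors::PgmqError, query::init_queue};

fn run_init_queue(name: &str) -> Result<(), PgmqError> {
    // init_queue returns the DDL statements as plain strings...
    for stmt in init_queue(name)? {
        // ...and each one is executed through SPI in the current transaction.
        Spi::run(&stmt).expect("SQL statement failed");
    }
    Ok(())
}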

I'll skip the details, but the sql statements basically do:

  1. Create a table pgmq.q_<queue_name>.
  2. Assign the table to the pgmq extension.
  3. Create an index on the pgmq.q_<queue_name> table.
  4. Create a table pgmq.a_<queue_name>.
  5. Assign the table to the pgmq extension.
  6. Create an index on the pgmq.a_<queue_name> table.
  7. Insert a row into the pgmq.meta table.
  8. Grant privileges to pg_monitor.

We can see the effects of this in psql using the following lines:

-- Create a queue
select pgmq.create('my_queue');

-- List tables
\dt pgmq.*

-- List indexes
\di pgmq.*

-- See the contents of pgmq.meta
select * from pgmq.meta;

The output will show something like:

pgmq=# select pgmq.create('my_queue');
 create
--------

(1 row)

pgmq=# \dt pgmq.*;
           List of relations
 Schema |    Name    | Type  |   Owner
--------+------------+-------+-----------
 pgmq   | a_my_queue | table | binidxaba
 pgmq   | meta       | table | binidxaba
 pgmq   | q_my_queue | table | binidxaba
(3 rows)

pgmq=# \di pgmq.*
                        List of relations
 Schema |           Name           | Type  |   Owner   |   Table
--------+--------------------------+-------+-----------+------------
 pgmq   | a_my_queue_pkey          | index | binidxaba | a_my_queue
 pgmq   | archived_at_idx_my_queue | index | binidxaba | a_my_queue
 pgmq   | meta_queue_name_key      | index | binidxaba | meta
 pgmq   | q_my_queue_pkey          | index | binidxaba | q_my_queue
 pgmq   | q_my_queue_vt_idx        | index | binidxaba | q_my_queue
(5 rows)

pgmq=# select * from pgmq.meta;
 queue_name | is_partitioned |          created_at
------------+----------------+-------------------------------
 my_queue   | f              | 2023-09-18 23:35:38.163096-06
(1 row)

The following diagram shows what the pgmq schema looks like at this point:

[Diagram: the complete pgmq schema after creating my_queue, with pgmq.meta plus the q_my_queue and a_my_queue tables and their indexes]

For the queue my_queue, we can see the underlying table and the corresponding archive table. Each table has an index associated with the primary key. The pgmq.q_my_queue table also has an index on the vt column, and pgmq.a_my_queue has an index on the archived_at column.

We can suspect that the pgmq.q_my_queue table is used in the send and read operations. Let us look at those two functions.

pgmq.send()

We can explore the send operation in a similar way. The relevant SQL is straightforward; it just inserts a new row into the underlying table:

INSERT INTO {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (vt, message)
VALUES {values}
RETURNING msg_id;

[Diagram: pgmq.send() inserting a message into the queue table]

note

At this point, we can see the following pattern in the pgmq project:

  • the exposed SQL functions are defined in src/api.rs, and
  • the underlying SQL statements are defined in core/src/query.rs

pgmq.read()

So, let's see. If I were the one programming pgmq.read(), I would perhaps do something like "get the first {limit} rows from the queue table whose {vt} has already expired, and for those rows, also update the visibility timeout to now() + {vt}." Naively, maybe something like:

UPDATE pgmq.q_my_queue
SET
    vt = clock_timestamp() + interval '10 seconds',
    read_ct = read_ct + 1
WHERE
    msg_id IN (
        SELECT msg_id
        FROM pgmq.q_my_queue
        WHERE vt <= clock_timestamp()
        ORDER BY msg_id ASC
        LIMIT 1
    );

In reality, pgmq.read is more interesting than that 😅. It performs the following DML:

WITH cte AS
(
    SELECT msg_id
    FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
    WHERE vt <= clock_timestamp()
    ORDER BY msg_id ASC
    LIMIT {limit}
    FOR UPDATE SKIP LOCKED
)
UPDATE {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} t
SET
    vt = clock_timestamp() + interval '{vt} seconds',
    read_ct = read_ct + 1
FROM cte
WHERE t.msg_id = cte.msg_id
RETURNING *;

[Diagram: pgmq.read() claiming messages and updating their visibility timeout]

Firstly, in pgmq's version, there is a CTE (Common Table Expression) to obtain the first {limit} message IDs whose vt has expired. (It would be interesting to discuss why pgmq developers used a CTE, but we can explore that in another post.)

There are two crucial things to notice in the CTE. One is the ORDER BY clause, which ensures FIFO ordering. The other is the FOR UPDATE SKIP LOCKED clause, which claims the rows that no other transaction has already claimed. This part is essential because it ensures correctness in the case of concurrent pgmq.read() operations.

The next step in the DML is to update the corresponding rows with a new vt value by adding the supplied {vt} to the current timestamp. Additionally, the read_ct value is incremented by 1. What is the use of this counter? In general, we can suspect that there is a problem processing a given message if it has a high read_ct value because users usually archive the message after successfully processing it. So, ideally, a message is only read once.

pgmq.archive()

The next stage in the lifecycle of a message is archiving it. For that, pgmq uses the following statement:

WITH archived AS (
    DELETE FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
    WHERE msg_id = ANY($1)
    RETURNING msg_id, vt, read_ct, enqueued_at, message
)
INSERT INTO {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name} (msg_id, vt, read_ct, enqueued_at, message)
SELECT msg_id, vt, read_ct, enqueued_at, message
FROM archived
RETURNING msg_id;

Essentially, it deletes the message with the provided msg_id from the queue table, and then the message is placed in the corresponding archive table.

[Diagram: pgmq.archive() moving messages from the queue table to the archive table]

One interesting thing to notice is that pgmq.archive() can be used to archive a batch of messages too:

select pgmq.archive('my_queue', ARRAY[3, 4, 5]);
pgmq=# select pgmq.archive('my_queue', ARRAY[3, 4, 5]);
 pgmq_archive
--------------
 t
 t
 t
(3 rows)

That is achieved in pgrx by declaring two functions with the same name in the pg_extern attribute macro, as follows:

#[pg_extern(name = "archive")]
fn pgmq_archive(queue_name: &str, msg_id: i64) -> Result<Option<bool>, PgmqExtError> {
    //...
}

#[pg_extern(name = "archive")]
fn pgmq_archive_batch(
    queue_name: &str,
    msg_ids: Vec<i64>,
) -> Result<TableIterator<'static, (name!(archive, bool),)>, PgmqExtError> {
    //...
}

pgmq.drop_queue()

Finally, let's talk about pgmq.drop_queue(). It essentially executes multiple statements:

  1. Unassign the pgmq.q_<queue_name> table from the extension.
  2. Unassign the pgmq.a_<queue_name> table from the extension.
  3. Drop the table pgmq.q_<queue_name>.
  4. Drop the table pgmq.a_<queue_name>.
  5. Delete the corresponding row from the pgmq.meta table.

Nothing surprising in this one, and with it, we conclude our tour.

Conclusion

In this post, we explored how the pgrx tool is used to build the pgmq extension. We explored how the metadata objects are created and how they are used in the basic send, read, and archive operations. At least from an explorer's perspective, the internals of the extension are currently easy to read and understand.

I invite everyone to explore how the other pgmq functions work. You can explore the code at https://github.com/tembo-io/pgmq, and you can learn more about pgrx at https://github.com/pgcentralfoundation/pgrx.