
3 posts tagged with "pgmq"


Adam Hendel · 6 min read


Your app needs a message queue. Simple enough—until you try to do it, anyway.

Go set it up on Kafka? Sure...but now you have a Kafka cluster to manage. Redis could work, but now you're just managing Redis instances instead. SQS? That means you have to reconfigure your application to talk to AWS, and you also get an extra external bill as icing on the cake. Let's build it on Postgres! However, if you follow most blogs and guides, you'll probably end up building an agent, watcher process, or background worker to make sure the queue stays healthy. It's better, but if the background worker fails, you could end up with bigger problems.

Fortunately, there's a better way.

By designing with a visibility timeout, we remove the need for external processes to manage the queue. PGMQ is a Postgres extension built around exactly this kind of self-regulating queue. Today we're going to combine PGMQ with a pair of core Postgres features—FOR UPDATE and SKIP LOCKED—to cover all the needs of our message queue. FOR UPDATE ensures that only a single consumer receives a given message from the queue. And SKIP LOCKED is required if you want to have multiple consumers on the same queue; without it, each consumer would wait for the others to release their locks. Before we put all the pieces together, let's do a quick refresher on how each of these works.

FOR UPDATE: Ensuring Exclusive Access

Imagine a crowded store with a single cashier. As customers approach the counter, the cashier must serve them one at a time, ensuring each transaction is complete before moving on to the next. Similarly, when working with a queue, it's essential to ensure that only one process or transaction works with a particular row of data at a time. This is where FOR UPDATE comes into play.

The FOR UPDATE clause is used to lock selected rows, preventing other transactions from modifying or locking them until the current transaction ends. This ensures that once a task (or row in our queue) is picked up, it isn't grabbed by another worker or process simultaneously. It guarantees exclusive access, much like how our lone cashier attends to a single customer at a time.
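
To make this concrete, here is a minimal sketch of FOR UPDATE over a hypothetical tasks table (the table and columns are illustrative, not part of PGMQ):

-- Worker A claims one row; the lock is held until COMMIT or ROLLBACK
BEGIN;
SELECT id, payload
FROM tasks
WHERE processed = false
ORDER BY id
LIMIT 1
FOR UPDATE;
-- ...do the work, then mark the row done...
UPDATE tasks SET processed = true WHERE id = <claimed_id>;
COMMIT;

While Worker A holds the lock, any other transaction that runs the same SELECT ... FOR UPDATE will block on that row until Worker A's transaction ends.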

SKIP LOCKED: Keeping the Line Moving

Back to our store analogy, if a customer isn't ready to check out and holds up the line, it can cause unnecessary delays for the other customers. What if, instead, those who aren't ready simply step aside, allowing others to continue? That would undoubtedly speed up the checkout process. This is the concept behind SKIP LOCKED.

When combined with FOR UPDATE, the SKIP LOCKED clause ensures that if a row is locked by another transaction, it gets skipped over, and the next available row is selected. This way, other workers or processes don't get stuck waiting for a locked row to be released; they simply move on to the next available task.

By using these two clauses together, you can ensure that tasks in your queue are processed smoothly and efficiently, with each task getting picked up by a single worker and other workers moving seamlessly to the next available task.
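
Putting the two together, a hand-rolled dequeue over that same hypothetical tasks table might look like this (just the bare pattern, not PGMQ's actual query):

-- Claim the next unprocessed row; rows locked by other workers are skipped
BEGIN;
SELECT id, payload
FROM tasks
WHERE processed = false
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED;
-- ...process it, mark it done, then release the lock by committing...
UPDATE tasks SET processed = true WHERE id = <claimed_id>;
COMMIT;

Each worker running this concurrently gets a different row, and nobody waits on anybody else's lock.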

However, how can we take this one step further? Many queue implementations have us use FOR UPDATE to mark a message as "in progress", which is great. But that typically requires a service external to Postgres which monitors the queue and checks for messages that have been "in progress" for too long.

Self-Regulating Queues in PostgreSQL

If you're using FOR UPDATE SKIP LOCKED and marking messages "in progress", you most likely need a process to watch your queue and check for messages that have been processing for too long. Rather than running a background worker or external process, PGMQ implements a Visibility Timeout (VT). A VT is a designated period during which a message, once read from the queue, is invisible to other consumers or workers. This ensures that once a worker picks up a task, other workers won't attempt to process the same task for the duration of the timeout. If the original worker fails to complete the task within that window, the task becomes visible again simply because the clock has moved past the VT, and it is ready for reprocessing by the same or a different worker. This is how PGMQ provides an at-least-once delivery guarantee without any extra moving parts.

In essence, the visibility timeout provides a grace period for tasks to be processed. It becomes a buffer against potential failures, ensuring that if a task isn’t completed due to any unforeseen reasons, it doesn’t get lost but rather re-enters the queue. Without something like the VT, queue systems will need to run a process to watch the queue. If that watcher crashes, or loses connection, then messages will stay unavailable until the watcher is recovered.
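
Roughly speaking, the VT is just a timestamp column (vt) on each message row, and "visible" means that timestamp is in the past. A simplified sketch of the idea, assuming a queue named my_queue stored in PGMQ's pgmq.q_my_queue table (the real read query is more involved):

-- A message is "visible" to consumers only while its vt is in the past
SELECT msg_id, message
FROM pgmq.q_my_queue
WHERE vt <= clock_timestamp();

-- Reading a message pushes its vt into the future, hiding it from other
-- consumers; pgmq.read does this for you in a single statement

No watcher is needed: once the VT passes, the message is visible again purely because time elapsed.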

Queue regulation isn’t just for error modes

A common use case is a consumer that needs to process a long-running, I/O-bound task. Let's say there is a message with a task to create some infrastructure in your favorite cloud provider, e.g. create an EC2 instance if it doesn't already exist. That could take minutes to start up. Your consumer can submit the provisioning request, and then instead of waiting for the instance to come up, it can set the VT on that message to 60 seconds from now and move on to read the next message. In 60 seconds, the message becomes visible again and can be picked up. That can look something like the following pseudo-code:

# read a message, make it invisible for 30 seconds
select pgmq.read('task_queue', 30, 1)

# ...check if the EC2 instance already exists
# ...request to create the EC2 instance if it does not

# if the instance is not ready yet, set the message VT to 60 seconds from now
select pgmq.set_vt('task_queue', <msg_id>, 60)

# consumer archives or deletes the message when it's finished with the job
select pgmq.archive('task_queue', <msg_id>)

With PGMQ's design, messages do not leave the queue until they are explicitly archived or deleted, so you could think of setting the VT directly as "returning the message to the queue", even though it technically never left the queue; rather, you are returning it to a state where it can be read by other consumers.
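
One way to do exactly that is to set the VT offset to 0 seconds: the message's visibility timestamp becomes "now", so the next pgmq.read can pick it up immediately. A small sketch reusing the task_queue and <msg_id> from the pseudo-code above:

# make the message visible to other consumers again, right away
select pgmq.set_vt('task_queue', <msg_id>, 0)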

Choose the simplest queue architecture with PGMQ

Use FOR UPDATE to ensure messages are only read by one consumer at a time. Use SKIP LOCKED so that you can have multiple workers processing messages concurrently. Finally, implement a visibility timeout so that you no longer need to rely on an external process to handle messages that have failed to process. Install the open-source PGMQ extension for Postgres or try it on Tembo Cloud today and immediately benefit from all of these design decisions.

Binidxaba · 12 min read

In my previous submission to this space, I described my experience with pgmq while using the Python library. In this post, I'll share what I found after inspecting the code.

So, first, I'll describe the general structure of the project. Then, I'll explain what happens when we install the pgmq extension. Finally, I'll describe how some of its functions work.

In this post, I'll be using version v0.25.0, which you can find here.

Project structure

After cloning the appropriate tag, we can see that the repository contains the following files:

$ ls -1
Cargo.lock
Cargo.toml
CONTRIBUTING.md
core
Dockerfile.build
examples
images
LICENSE
Makefile
pgmq.control
pgmq-rs
README.md
sql
src
tembo-pgmq-python
tests

The project uses pgrx. From pgrx's README, we know that the relevant files for the extension are Cargo.toml, pgmq.control and the src and sql directories:

$ tree sql src
sql
├── pgmq--0.10.2--0.11.1.sql
├── pgmq--0.11.1--0.11.2.sql
...
├── pgmq--0.8.0--0.8.1.sql
├── pgmq--0.8.1--0.9.0.sql
└── pgmq--0.9.0--0.10.2.sql
src
├── api.rs
├── errors.rs
├── lib.rs
├── metrics.rs
├── partition.rs
├── sql_src.sql
└── util.rs

0 directories, 7 files

Installing the pgmq extension

note

This section assumes that you have successfully installed the prerequisites described in CONTRIBUTING.md.

To build the pgmq extension, we can do the following:

cargo build

Alternatively, to build and install the pgmq extension, we can do:

cargo pgrx install

In either case, we can see a shared library pgmq.so being created. The installation process also places the shared library in the lib directory of the Postgres installation, and the sql files and the control file in the extension directory. In my case:

$ ls -1 /opt/postgres/share/extension/pgmq*
/opt/postgres/share/extension/pgmq--0.10.2--0.11.1.sql
...
/opt/postgres/share/extension/pgmq--0.9.0--0.10.2.sql
/opt/postgres/share/extension/pgmq.control

$ ls -1 /opt/postgres/lib/pgmq*
/opt/postgres/lib/pgmq.so

To test the extension, we can do:

cargo pgrx run

and it'll start a psql prompt. In the prompt, we can execute the create extension statement to start using pgmq:

-- List installed extensions
\dx

-- Enable pgmq
create extension pgmq;

-- List installed extensions again
\dx

The output will look something like:

pgmq=# \dx
List of installed extensions
Name | Version | Schema | Description
---------+---------+------------+------------------------------
plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
(1 row)

pgmq=# create extension pgmq;
CREATE EXTENSION

pgmq=# \dx
List of installed extensions
Name | Version | Schema | Description
---------+---------+------------+---------------------------------------------------------------------
pgmq | 0.25.0 | public | A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.
plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
(2 rows)

We can also list the available functions:

-- List available functions under pgmq schema
\df pgmq.*
pgmq=# \df pgmq.*
List of functions
Schema | Name | Result data type | Argument data types | Type
--------+------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+------
pgmq | archive | boolean | queue_name text, msg_id bigint | func
pgmq | archive | TABLE(archive boolean) | queue_name text, msg_ids bigint[] | func
pgmq | create | void | queue_name text | func
pgmq | create_non_partitioned | void | queue_name text | func
pgmq | create_partitioned | void | queue_name text, partition_interval text DEFAULT '10000'::text, retention_interval text DEFAULT '100000'::text | func
pgmq | delete | boolean | queue_name text, msg_id bigint | func
pgmq | delete | TABLE(delete boolean) | queue_name text, msg_ids bigint[] | func
pgmq | drop_queue | boolean | queue_name text, partitioned boolean DEFAULT false | func
pgmq | list_queues | TABLE(queue_name text, created_at timestamp with time zone) | | func
pgmq | metrics | TABLE(queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone) | queue_name text | func
pgmq | metrics_all | TABLE(queue_name text, queue_length bigint, newest_msg_age_sec integer, oldest_msg_age_sec integer, total_messages bigint, scrape_time timestamp with time zone) | | func
pgmq | pop | TABLE(msg_id bigint, read_ct integer, enqueued_at timestamp with time zone, vt timestamp with time zone, message jsonb) | queue_name text | func
pgmq | purge_queue | bigint | queue_name text | func
pgmq | read | TABLE(msg_id bigint, read_ct integer, enqueued_at timestamp with time zone, vt timestamp with time zone, message jsonb) | queue_name text, vt integer, "limit" integer | func
pgmq | read_with_poll | TABLE(msg_id bigint, read_ct integer, enqueued_at timestamp with time zone, vt timestamp with time zone, message jsonb) | queue_name text, vt integer, "limit" integer, poll_timeout_s integer DEFAULT 5, poll_interval_ms integer DEFAULT 250 | func
pgmq | send | bigint | queue_name text, message jsonb, delay integer DEFAULT 0 | func
pgmq | send_batch | TABLE(msg_id bigint) | queue_name text, messages jsonb[], delay integer DEFAULT 0 | func
pgmq | set_vt | TABLE(msg_id bigint, read_ct integer, enqueued_at timestamp with time zone, vt timestamp with time zone, message jsonb) | queue_name text, msg_id bigint, vt_offset integer | func
(18 rows)

With this, we can now explore the extension from the inside. And, if needed, recompile and reinstall the extension to play with it.

The internals

We know that when an extension is created with pgrx, it generates a lib.rs file. Let us explore it.

One of the first things we can see is that the other five files in the src/ directory are included:

pub mod api;
pub mod errors;
pub mod metrics;
pub mod partition;
pub mod util;

After reviewing these files a little bit, we can notice that there's also some relevant code in another module, the one in core/. For example, in src/partition.rs:

use pgmq_core::{
    errors::PgmqError,
    query::{
        assign_archive, assign_queue, create_archive, create_archive_index, create_index,
        create_meta, grant_pgmon_meta, grant_pgmon_queue, grant_pgmon_queue_seq, insert_meta,
    },
    types::{PGMQ_SCHEMA, QUEUE_PREFIX},
    util::CheckedName,
};

So, at this point we know that we can find the source code in two places: src/ and core/.

If we continue exploring lib.rs, we can see that a sql file (sql_src.sql) is executed when the extension is enabled:

CREATE TABLE pgmq.meta (
    queue_name VARCHAR UNIQUE NOT NULL,
    is_partitioned BOOLEAN NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
);
...

We can actually see that table with psql:

-- List tables in the pgmq schema
\dt pgmq.*

-- List contents of pgmq.meta
select * from pgmq.meta;
pgmq-# \dt pgmq.*
List of relations
Schema | Name | Type | Owner
--------+------+-------+----------
public | meta | table | binidxaba
(1 row)

pgmq=# select * from pgmq.meta;
queue_name | created_at
------------+------------
(0 rows)

The following diagram shows what the pgmq schema looks like right after CREATE EXTENSION is executed:

[diagram: after-create-extension]

From this point, we can suspect that every time we create a queue, a new row is inserted into this table.

Let us see what pgmq.create() does...

pgmq.create()

Most of the functions provided by pgmq are defined in src/api.rs. In that file, we can find the function pgmq_create(queue_name: &str), and if we chase the call sequence, we can discover that the interesting function is init_queue(name: &str) in core/src/query.rs:

pub fn init_queue(name: &str) -> Result<Vec<String>, PgmqError> {
    let name = CheckedName::new(name)?;
    Ok(vec![
        create_queue(name)?,
        assign_queue(name)?,
        create_index(name)?,
        create_archive(name)?,
        assign_archive(name)?,
        create_archive_index(name)?,
        insert_meta(name)?,
        grant_pgmon_queue(name)?,
    ])
}

This function generates several sql statements that are later executed in pgmq_create_non_partitioned using an Spi client.

I'll skip the details, but the sql statements basically do:

  1. Create a table pgmq.q_<queue_name>.
  2. Assign the table to the pgmq extension.
  3. Create an index on the pgmq.q_<queue_name> table.
  4. Create a table pgmq.a_<queue_name>.
  5. Assign the table to the pgmq extension.
  6. Create an index on the pgmq.a_<queue_name> table.
  7. Insert a row into the pgmq.meta table.
  8. Grant privileges to pg_monitor.

We can see the effects of this in psql using the following lines:

-- Create a queue
select pgmq.create('my_queue');

-- List tables
\dt pgmq.*

-- List indexes
\di pgmq.*

-- See the contents of pgmq.meta
select * from pgmq.meta;

The output will show something like:

pgmq=# select pgmq.create('my_queue');
create
--------

(1 row)

pgmq=# \dt pgmq.*;
List of relations
Schema | Name | Type | Owner
--------+------------+-------+-----------
pgmq | a_my_queue | table | binidxaba
pgmq | meta | table | binidxaba
pgmq | q_my_queue | table | binidxaba
(3 rows)

pgmq=# \di pgmq.*
List of relations
Schema | Name | Type | Owner | Table
--------+--------------------------+-------+-----------+------------
pgmq | a_my_queue_pkey | index | binidxaba | a_my_queue
pgmq | archived_at_idx_my_queue | index | binidxaba | a_my_queue
pgmq | meta_queue_name_key | index | binidxaba | meta
pgmq | q_my_queue_pkey | index | binidxaba | q_my_queue
pgmq | q_my_queue_vt_idx | index | binidxaba | q_my_queue
(5 rows)

pgmq=# select * from pgmq.meta;
queue_name | is_partitioned | created_at
------------+----------------+-------------------------------
my_queue | f | 2023-09-18 23:35:38.163096-06
(1 row)

The following diagram shows what the pgmq schema looks like at this point:

[diagram: pgmq schema after creating my_queue]

For the queue my_queue, we can see the underlying table and the corresponding archive table. Each table has an index associated with the primary key. The pgmq.q_my_queue table also has an index on the vt column, and pgmq.a_my_queue has an index on the archived_at column.
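
Putting those pieces together, the two tables look roughly like the sketch below. This is reconstructed from the columns returned by pgmq.read and the indexes listed above, not copied from core/src/query.rs, so treat the exact types and defaults as an approximation:

-- queue table: one row per live message
CREATE TABLE pgmq.q_my_queue (
    msg_id      BIGSERIAL PRIMARY KEY,     -- auto-incrementing id (exact mechanism may differ)
    read_ct     INT DEFAULT 0 NOT NULL,    -- how many times the message has been read
    enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
    vt          TIMESTAMP WITH TIME ZONE NOT NULL,  -- visibility timeout
    message     JSONB
);
CREATE INDEX q_my_queue_vt_idx ON pgmq.q_my_queue (vt);

-- archive table: one row per archived message
CREATE TABLE pgmq.a_my_queue (
    msg_id      BIGINT PRIMARY KEY,
    read_ct     INT DEFAULT 0 NOT NULL,
    enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
    archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
    vt          TIMESTAMP WITH TIME ZONE NOT NULL,
    message     JSONB
);
CREATE INDEX archived_at_idx_my_queue ON pgmq.a_my_queue (archived_at);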

We can suspect that the pgmq.q_my_queue table is used in the send and read operations. Let us look at those two functions.

pgmq.send()

We can explore the send operation in a similar way. The relevant SQL is straightforward. It just inserts a new row in the underlying table:

INSERT INTO {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (vt, message)
VALUES {values}
RETURNING msg_id;
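
In psql, the corresponding call is pgmq.send, whose signature (with the optional delay) appears in the \df listing earlier; the JSON payload here is just illustrative:

-- send a message; returns the new msg_id
select pgmq.send('my_queue', '{"task": "send_email"}');

-- send a message that only becomes visible after a 10 second delay
select pgmq.send('my_queue', '{"task": "send_email"}', 10);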


note

At this point, we can see the following pattern in the pgmq project:

  • the exposed SQL functions are defined in src/api.rs, and
  • the underlying SQL statements are defined in core/src/query.rs

pgmq.read()

So, let's see. If I were the one programming pgmq.read(), I would perhaps do something like "get the first {limit} rows from the queue table whose {vt} has already expired, and for those rows, also update the visibility timeout to now() + {vt}." Naively, maybe something like:

update pgmq.q_my_queue
SET
    vt = clock_timestamp() + interval '10 seconds',
    read_ct = read_ct + 1
WHERE
    msg_id in (
        select msg_id
        from pgmq.q_my_queue
        where vt <= clock_timestamp()
        ORDER BY msg_id ASC
        LIMIT 1
    );

In reality, pgmq.read is more interesting than that 😅. It performs the following DML:

WITH cte AS
(
    SELECT msg_id
    FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
    WHERE vt <= clock_timestamp()
    ORDER BY msg_id ASC
    LIMIT {limit}
    FOR UPDATE SKIP LOCKED
)
UPDATE {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} t
SET
    vt = clock_timestamp() + interval '{vt} seconds',
    read_ct = read_ct + 1
FROM cte
WHERE t.msg_id = cte.msg_id
RETURNING *;


Firstly, in pgmq's version, there is a CTE (Common Table Expression) to obtain the first {limit} message IDs whose vt has expired. (It would be interesting to discuss why pgmq developers used a CTE, but we can explore that in another post.)

There are two crucial things to notice in the CTE. One is the order by clause that ensures the FIFO ordering. The other one is the FOR UPDATE SKIP LOCKED clause, claiming the rows no one else has claimed. This part is essential because it ensures correctness in the case of concurrent pgmq.read() operations.

The next step in the DML is to update the corresponding rows with a new vt value by adding the supplied {vt} to the current timestamp. Additionally, the read_ct value is incremented by 1. What is the use of this counter? In general, we can suspect that there is a problem processing a given message if it has a high read_ct value because users usually archive the message after successfully processing it. So, ideally, a message is only read once.
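
In psql, a read looks like this (signature from the \df listing earlier: queue name, vt in seconds, and a limit):

-- read up to 5 messages and hide each of them for 30 seconds
select * from pgmq.read('my_queue', 30, 5);

-- any message that is never archived or deleted becomes readable again after
-- its vt expires, with read_ct incremented on every read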

pgmq.archive()

The next stage in the lifecycle of a message is archiving it. For that, pgmq uses the following insert statement:

WITH archived AS (
    DELETE FROM {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name}
    WHERE msg_id = ANY($1)
    RETURNING msg_id, vt, read_ct, enqueued_at, message
)
INSERT INTO {PGMQ_SCHEMA}.{ARCHIVE_PREFIX}_{name} (msg_id, vt, read_ct, enqueued_at, message)
SELECT msg_id, vt, read_ct, enqueued_at, message
FROM archived
RETURNING msg_id;

Essentially, it deletes the message with the provided msg_id from the queue table, and then the message is placed in the corresponding archive table.


One interesting thing to notice is that pgmq.archive() can be used to archive a batch of messages too:

select pgmq.archive('my_queue', ARRAY[3, 4, 5]);
pgmq=# select pgmq.archive('my_queue', ARRAY[3, 4, 5]);
pgmq_archive
--------------
t
t
t
(3 rows)

That is achieved in pgrx by declaring two functions that share the same name in the pg_extern attribute macro, as follows:

#[pg_extern(name = "archive")]
fn pgmq_archive(queue_name: &str, msg_id: i64) -> Result<Option<bool>, PgmqExtError> {
    //...
}

#[pg_extern(name = "archive")]
fn pgmq_archive_batch(
    queue_name: &str,
    msg_ids: Vec<i64>,
) -> Result<TableIterator<'static, (name!(archive, bool),)>, PgmqExtError> {
    //...
}

pgmq.drop_queue()

Finally, let's talk about pgmq.drop_queue(). It essentially executes multiple statements:

  1. Unassign the pgmq.q_<queue_name> table from the extension.
  2. Unassign the pgmq.a_<queue_name> table from the extension.
  3. Drop the table pgmq.q_<queue_name>.
  4. Drop the table pgmq.a_<queue_name>.
  5. Delete the corresponding row from the pgmq.meta table.
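
In psql, that whole teardown is a single call:

-- drop the queue, its archive table, and its row in pgmq.meta
select pgmq.drop_queue('my_queue');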

Nothing surprising in this one, and with it, we conclude our tour.

Conclusion

In this post, we explored how the pgrx tool is used to generate the pgmq extension. We explored how the metadata objects are created and how they are used in the basic send, read and archive operations. At least from an explorer perspective, the internals of the extension are currently easy to read and understand.

I invite everyone to explore how the other pgmq functions work. You can explore the code at https://github.com/tembo-io/pgmq. And you can learn more about pgrx at: https://github.com/pgcentralfoundation/pgrx.

Binidxaba · 5 min read

In my recent search for something interesting to do with Rust, I discovered that people write postgres extensions using pgrx.

I found that very cool, and while looking for some real-world examples to study and dissect, I came across pgmq: "A lightweight message queue. Like AWS SQS and RSMQ but on Postgres." So, I decided to give it a shot so that perhaps in the future, I can contribute to the project ;)

When reviewing the repository, I noticed a Python client to interact with pgmq and began to play with it.

Let me quickly describe how easy it was for me to use it.

Setting up the environment

The first step was to start Postgres with a docker container. You can check the README for detailed instructions, but in summary, just run:

docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pgmq-pg:latest

A quick test to make sure that Postgres is running:

psql postgres://postgres:postgres@0.0.0.0:5432/postgres

After that, I simply installed the pgmq Python client in my virtual environment:

pip install tembo-pgmq-python

That's all the setup that I needed.

The PGMQueue class

To use the library, we need to instantiate a PGMQueue object and from that object we can call the methods described in the following table:

SQL function | PGMQueue method | Description
--- | --- | ---
pgmq_create(queue) | create_queue(self, queue: str) | Creates a new queue with the name queue.
pgmq_send(queue, message) | send(self, queue: str, message: dict, delay: Optional[int] = None) | Appends a message to the queue.
pgmq_read(queue, vt, num_messages) | read(self, queue: str, vt: Optional[int] = None) | Reads num_messages from queue and sets the visibility timeout to vt.
pgmq_archive(queue, msg_id) | archive(self, queue: str, msg_id: int) | Archives the message with msg_id.
pgmq_pop(queue) | pop(self, queue: str) | Pop the next message in the queue.
pgmq_delete(queue, msg_id) | delete(self, queue: str, msg_id: int) | Deletes the message with msg_id from the queue.
pgmq_drop_queue(queue) | Not available yet | Drops the queue.

Next, let me show you how to implement a simple producer/consumer setup using the methods above.

Implementing a Producer

In summary, the required steps are:

  1. Import the Message and PGMQueue classes.
  2. Instantiate a PGMQueue object.
  3. Create a queue.
  4. Send N messages via the queue in a loop.

from tembo_pgmq_python import Message, PGMQueue

queue = PGMQueue(**connection_info)
queue.create_queue(test_queue)
...

for x in range(num_messages):
    ...
    msg_id = queue.send(test_queue, test_message)
    ...

The PGMQueue constructor is the one that receives the connection information. For the Postgres instance initiated by the docker container, the connection details are:

queue = PGMQueue(host="localhost",
                 port=5432,
                 username="postgres",
                 password="postgres",
                 database="postgres")

Implementing a Consumer

In short, the code should basically do two things: import the Message and PGMQueue classes, and consume the messages from the queue in a loop.

from tembo_pgmq_python import Message, PGMQueue

...
queue = PGMQueue(**connection_info)
...

while True:
    ...
    message: Message = queue.pop(queue_name)
    ...

Harnessing Producer and Consumer

For simplicity, I used a short shell script to initiate my experiment:

#!/bin/bash

# Spawn one producer
python3 producer.py > /tmp/producer.out &
sleep 2

# Spawn 5 consumers
for i in $(seq 1 5)
do
    python3 consumer.py > /tmp/consumer_${i}.out &
done

# Wait for everyone to finish
wait

The script basically starts 1 producer and 5 consumers in the background. The output is saved in the /tmp directory.

And that was it...

From this point, you can explore the other available methods.

Some final words...

It was a pleasant surprise how easy it was to create this example: only a couple of shell commands and a couple of short Python scripts. The PGMQueue methods were very intuitive and straightforward. Personally, my next step is to understand how it works internally. But that's a topic for the future :)

I invite everyone to explore this project at: https://github.com/tembo-io/pgmq. Give it a star and also check out the other available clients for Rust and Go.

Appendix

Here is the complete code if you want to give it a try (or see it in this repository):

"""
This is the Producer's code
"""

import random
import string
from tembo_pgmq_python import Message, PGMQueue


if __name__ == '__main__':
host = "localhost"
port = 5432
username = "postgres"
password = "postgres"
database = "postgres"

num_messages = 100000
test_queue = "bench_queue_sample"

queue = PGMQueue(host=host,
port=port,
username=username,
password=password,
database=database)

try:
#queue.drop_queue(test_queue)
queue.create_queue(test_queue)

for x in range(num_messages):

payload = ''.join(random.choices(string.ascii_uppercase + string.digits, k = 10))
msg = {"payload": payload}
msg_id = queue.send(test_queue, msg)

if (x+1) % 1000 == 0:
print("Sent {} messages".format(x + 1))

except Exception as e:
print(f"{e}")
"""
This is the Consumer's code
"""

import random
import time
from tembo_pgmq_python import Message, PGMQueue


if __name__ == '__main__':
host = "localhost"
port = 5432
username = "postgres"
password = "postgres"
database = "postgres"

no_message_timeout = 0
test_queue = "bench_queue_sample"

queue = PGMQueue(host=host, port=port, username=username, password=password, database=database)

while no_message_timeout < 5:
try:
message: Message = queue.pop(test_queue) # type: ignore
print("Consumed message: {}".format(message.message["payload"]))
no_message_timeout = 0

except IndexError:
no_message_timeout += 1
print("No more messages for {no_message_timeout} consecutive reads")
time.sleep(0.500)