Building a real-time, thread-safe, resilient, and type-safe Queue with Postgres

Naveen Negi
13 min read · Apr 29, 2024

This blog will go into a (fairly) advanced implementation of “Postgres as a queue”. If PostgreSQL is already part of your tech stack, it’s convenient because you won’t need to add any new technology. It handles heavy loads well and, with thoughtful database modeling, can significantly improve system performance.

What we are going to cover in this blog

  1. Designing a table as a Queue
  2. Managing Concurrency
  3. Real-Time Job Processing
  4. Resilience and Failover strategy
  5. Enforcing Valid State Transitions for Jobs
  6. Managing “Stuck” Jobs with lease expiry
  7. Simplifying table design with an Updatable view

Table design

Consider a table designed to manage jobs, named archive_jobs. The structure of this table is straightforward and includes the following columns: id, description, and status.

+-------------+--------------+-----------------------+
| Column Name | Data Type    | Constraints           |
+-------------+--------------+-----------------------+
| id          | UUID         | Not Null, Primary Key |
| description | VARCHAR(255) | Not Null              |
| status      | VARCHAR(255) |                       |
+-------------+--------------+-----------------------+
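
A minimal DDL sketch matching this layout:

CREATE TABLE archive_jobs (
    id          UUID PRIMARY KEY,
    description VARCHAR(255) NOT NULL,
    status      VARCHAR(255)
);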

Handling concurrent access on the table with “SKIP LOCKED”

If multiple processes or threads, like several instances of a service running in Kubernetes pods, are reading from the table concurrently, you need to manage concurrency effectively. You can achieve this with the SQL command:

`SELECT * FROM archive_jobs WHERE status = 'active' FOR UPDATE SKIP LOCKED`

This command specifically targets rows that are in the 'active' state and does two key things:

  1. Skip Locked Rows: It skips over any rows that are already locked by other processes. This feature is particularly useful in high-concurrency environments as it prevents processes from waiting on each other, thereby reducing bottlenecks.
  2. Lock Rows: Once it finds rows that are not locked, it locks them. This lock persists for the duration of the transaction, ensuring that no other process can modify these rows until the transaction is completed.

It’s important to understand that while SELECT ... FOR UPDATE SKIP LOCKED effectively handles row-level locking, it doesn’t automatically communicate the job's state to other parts of your system. To address this, you should explicitly update the job's status within the same transaction where you acquire the lock. Here's how you can do it:

@Transactional
public List<Job> fetchJobs() {
    // Lock and select jobs
    List<Job> jobs = jdbcTemplate.query(
            "SELECT * FROM archive_jobs WHERE status = ? FOR UPDATE SKIP LOCKED LIMIT ?",
            (rs, rowNum) -> new Job(rs.getObject("id", UUID.class), rs.getString("description"), rs.getString("status")),
            "active", 5);

    // Get IDs from the fetched jobs
    List<UUID> jobIds = jobs.stream().map(Job::id).toList();

    // Update statuses if jobs are found
    if (!jobIds.isEmpty()) {
        jdbcTemplate.update(
                "UPDATE archive_jobs SET status = ? WHERE id = ANY(?)",
                "processing_active", jobIds.toArray(UUID[]::new));
    }

    return jobs;
}

This function does the following:

  • Selects and locks the rows: Ensures that no other transaction can select these rows for updating until the current transaction is complete.
  • Updates the status: Clearly signals to other processes that these jobs are now in a ‘processing’ state.
  • Commits the transaction: Releases the locks and finalizes the state change.

By coupling the select and update operations in one transaction, you ensure atomicity and maintain system integrity, preventing other processes from interfering with jobs that are already being processed.

If a job fails during processing, having a distinct “processing” state can help in identifying jobs that didn’t complete successfully. This can be crucial for troubleshooting issues such as why a job failed, especially if the system crashes or if there is a need to restart or roll back operations.

The above function starts a transaction with a `BEGIN` command, selects and locks rows whose status is ‘active’, changes their status to ‘processing_active’, and then commits the transaction, releasing the locks and finalizing the updates, as the database logs below show.

postgres-1  | 2024-04-23 17:39:20.004 GMT [232] LOG:  duration: 0.116 ms  bind S_2:
postgres-1 | 2024-04-23 17:39:41.885 GMT [232] LOG: duration: 0.203 ms bind S_3: BEGIN
postgres-1 | 2024-04-23 17:39:41.885 GMT [232] LOG: execute S_3: BEGIN
postgres-1 | 2024-04-23 17:39:41.885 GMT [232] LOG: duration: 0.114 ms
postgres-1 | 2024-04-23 17:39:41.888 GMT [232] LOG: duration: 2.471 ms bind S_4: SELECT * FROM archive_jobs WHERE status = $1 FOR UPDATE SKIP LOCKED LIMIT $2
postgres-1 | 2024-04-23 17:39:41.888 GMT [232] DETAIL: parameters: $1 = 'active', $2 = '5'
postgres-1 | 2024-04-23 17:39:41.888 GMT [232] LOG: execute S_4: SELECT * FROM archive_jobs WHERE status = $1 FOR UPDATE SKIP LOCKED LIMIT $2
postgres-1 | 2024-04-23 17:39:41.888 GMT [232] DETAIL: parameters: $1 = 'active', $2 = '5'
postgres-1 | 2024-04-23 17:39:41.891 GMT [232] LOG: duration: 2.760 ms
postgres-1 | 2024-04-23 17:40:29.613 GMT [232] LOG: duration: 1.616 ms parse <unnamed>: UPDATE archive_jobs SET status = $1 WHERE id = ANY($2)
postgres-1 | 2024-04-23 17:40:29.615 GMT [232] LOG: duration: 1.604 ms bind <unnamed>: UPDATE archive_jobs SET status = $1 WHERE id = ANY($2)
postgres-1 | 2024-04-23 17:40:29.615 GMT [232] DETAIL: parameters: $1 = 'processing_active', $2 = '{9556b65f-bea4-43b8-93d0-8ea05d86a6db}'
postgres-1 | 2024-04-23 17:40:29.615 GMT [232] LOG: execute <unnamed>: UPDATE archive_jobs SET status = $1 WHERE id = ANY($2)
postgres-1 | 2024-04-23 17:40:29.615 GMT [232] DETAIL: parameters: $1 = 'processing_active', $2 = '{9556b65f-bea4-43b8-93d0-8ea05d86a6db}'
postgres-1 | 2024-04-23 17:40:29.618 GMT [232] LOG: duration: 2.163 ms
postgres-1 | 2024-04-23 17:40:34.981 GMT [232] LOG: duration: 0.044 ms bind S_1: COMMIT
postgres-1 | 2024-04-23 17:40:34.981 GMT [232] LOG: execute S_1: COMMIT

Processing jobs in real time

The approach above leaned heavily on fetching rows from the database, i.e. polling. Now let’s switch gears and look at a different approach using some of PostgreSQL’s real-time capabilities: specifically, the NOTIFY and LISTEN functionality. Let’s dive into how this works:

Database Side: We start by defining a trigger function called notify_change(). This function springs into action on either an INSERT or an UPDATE operation. When activated, it utilizes pg_notify to send a notification to the jobs_notification channel. The payload includes the JSON representation of the newly inserted or updated row, providing all the details any listener might need to know.

create function notify_change() returns trigger
    language plpgsql
as
$$
BEGIN
    IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
        PERFORM pg_notify('jobs_notification', row_to_json(NEW)::text);
    END IF;
    RETURN NEW;
END;
$$;

alter function notify_change() owner to "user";
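
The function only fires once it is attached to the table with a trigger; that step isn’t shown in the original listing, so here is a sketch (the trigger name is an assumption):

-- Attach notify_change() to the jobs table (trigger name assumed)
create trigger archive_jobs_notify
    after insert or update
    on archive_jobs
    for each row
execute procedure notify_change();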

Service Side in Java: The first step involves establishing a connection to the PostgreSQL database. Once connected, the service sends a LISTEN jobs_notification command to tune into the channel set up in the database. This step is crucial as it’s how the service starts picking up updates.

@PostConstruct
public void init() {

    ObjectMapper om = new ObjectMapper();

    // Connect and listen to notifications
    connection.connect().thenAccept(conn -> {
        System.out.println("Connected to PostgreSQL database!");
        conn.sendQuery("LISTEN jobs_notification").thenAccept(queryResult -> {
            System.out.println("Listening on channel 'jobs_notification'");
        });

        // Handle notifications
        conn.registerNotifyListener(notification -> {
            System.out.println("**************************************************");
            System.out.println("Received notification on channel " + notification.getChannel() + ": " + notification.getPayload());
            System.out.println("**************************************************");
            try {
                var job = om.readValue(notification.getPayload(), Job.class);
                Job lockedJob = jobsRepository.fetchAndLockJobForProcessing(job.id());
                jobService.processJob(lockedJob);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
            return null;
        });
    }).exceptionally(throwable -> {
        System.err.println("Connection failed: " + throwable.getMessage());
        return null;
    });
}

One point to keep in mind is that these notifications are sent to all workers, which means you’ll have to manage concurrency again. My approach to addressing this is to extract only the id of the job from the notification, and then use the repository to retrieve the job row afresh with a SELECT ... FOR UPDATE SKIP LOCKED statement. This ensures that any operations on the job data are thread-safe and correctly synchronized.

Here’s how the fetchAndLockJobForProcessing method is implemented:

// Note: this must run inside a transaction; otherwise the row lock is
// released as soon as the SELECT statement completes.
public Job fetchAndLockJobForProcessing(UUID id) {
    try {
        List<Job> jobs = jdbcTemplate.query(
                "SELECT * FROM archive_jobs WHERE id = ? FOR UPDATE SKIP LOCKED",
                (rs, rowNum) -> new Job(rs.getObject("id", UUID.class), rs.getString("description"), rs.getString("status")),
                id);

        if (jobs.isEmpty()) {
            return null; // No job with this ID, or it is locked by another worker
        }
        return jobs.get(0); // Return the first job since ID should be unique
    } catch (Exception e) {
        System.err.println("Error fetching job by ID: " + e.getMessage());
        return null; // or handle more appropriately as per your error handling policy
    }
}

This code ensures that each worker processes jobs without interference from others by locking the job row at the database level, which prevents multiple workers from processing the same job simultaneously.

Making the table fast with a partial index

Tables containing job records can become quite large. Although disk space is relatively inexpensive nowadays, searching for jobs in a specific “state” can significantly impact performance. Fortunately, this issue can be effectively addressed using partial indexes.

Usually, indexes cover entire columns; with a partial index, however, you can also specify which rows are indexed. A partial index is useful for commonly used WHERE conditions that compare against constant values, like the status of a job.

Queries that filter on such a constant status are very common in queueing systems. Messages that were already processed are rarely needed, and when they are, they are usually accessed by a more specific criterion like the primary key.

The syntax for creating a partial index is surprisingly simple: a WHERE clause.

CREATE INDEX idx_archive_jobs_active ON archive_jobs(status)
WHERE status = 'active';

The index only contains the rows that satisfy the WHERE clause. It is therefore very small and, as a result, very fast.
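
To confirm the planner actually picks it up, you can inspect the query plan (output omitted here; note that on a near-empty test table Postgres may still prefer a sequential scan):

EXPLAIN
SELECT * FROM archive_jobs
WHERE status = 'active'
LIMIT 5;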

Making the service resilient with (one-time) polling

Event listeners are great for real-time updates, but there’s a potential issue if all the workers go down simultaneously. During such downtimes, any events triggered in the database would be lost. To mitigate this, we should also incorporate polling into our system. One approach is to limit polling to the moment a service instance restarts, letting it clear any backlog before resuming normal operations, as sketched below. Alternatively, we could keep polling periodically but space the intervals out generously, striking a balance between database load and data integrity.
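A minimal sketch of the restart-time variant, reusing the fetchJobs() method from earlier (the method name drainBacklog and the injected jobsRepository/jobService fields are illustrative, not from the original code):

@PostConstruct
public void drainBacklog() {
    // Each call locks up to 5 'active' jobs and flips them to processing;
    // keep going until the backlog accumulated during downtime is drained.
    List<Job> batch = jobsRepository.fetchJobs();
    while (!batch.isEmpty()) {
        batch.forEach(jobService::processJob);
        batch = jobsRepository.fetchJobs();
    }
}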

Taking it a step further: Enforcing Valid State Transitions

Our current design guidelines enable the creation of a robust queuing system with PostgreSQL. However, you can add even more safeguards to enhance system reliability. A common issue I’ve encountered involves state and state transitions; if not carefully designed, these can lead to significant challenges and require meticulous management. One of my preferred solutions is to make illegal state transitions impossible.

For instance, under the current system, any worker might mistakenly change the state from A to B — even if it’s not a valid transition. Imagine a scenario where, due to a bug or incorrect implementation, a job moves from “completed” back to “active” without any system checks to prevent this incorrect transition.

To address this, we can introduce a mechanism using a state transition table. Let’s create a new table called “job_transitions,” which will contain just two columns: prev_job_status and curr_job_status. Here's how the schema would look:

-- Create the job_transitions table with a composite primary key
CREATE TABLE job_transitions (
    prev_job_status VARCHAR(255) NOT NULL,
    curr_job_status VARCHAR(255) NOT NULL,
    PRIMARY KEY (prev_job_status, curr_job_status)
);

The primary key for the table is composed of both columns, ensuring that each combination of previous and current job statuses is unique within the table.

And then we can insert all valid transitions into this table.

-- Insert the valid state transitions into the job_transitions table
INSERT INTO job_transitions (prev_job_status, curr_job_status)
VALUES ('archive_pending', 'archive_pending'),
       ('archive_pending', 'archive_processing'),
       ('archive_processing', 'archive_completed'),
       ('archive_completed', 'job_ready_for_deletion');

However, for this to work, we now need to change our original jobs table. I will create a new table; let’s call it job_queue.

CREATE TABLE public.job_queue
(
    prev_job_status VARCHAR(255) NOT NULL DEFAULT 'archive_pending',
    curr_job_status VARCHAR(255) NOT NULL DEFAULT 'archive_pending',
    job_status_time TIMESTAMP    NOT NULL DEFAULT now(),
    id              UUID         NOT NULL UNIQUE,
    metadata        JSON         NOT NULL,
    lease_expire    TIMESTAMP,
    CONSTRAINT fk_job_queue_job_transitions
        FOREIGN KEY (prev_job_status, curr_job_status)
            REFERENCES public.job_transitions (prev_job_status, curr_job_status)
            ON DELETE CASCADE
);

In the job_queue table, where job instances and their current states are tracked, we implement a foreign key constraint (fk_job_queue_job_transitions) that links the prev_job_status and curr_job_status columns to the corresponding columns in the job_transitions table:

FOREIGN KEY (prev_job_status, curr_job_status) 
REFERENCES public.job_transitions(prev_job_status, curr_job_status)

This constraint serves several crucial roles:

  1. Validation of State Transitions: Every time a new job or a state change for an existing job is entered into the job_queue table, the database checks to ensure that the combination of prev_job_status and curr_job_status exists in the job_transitions table. If the combination doesn't exist, it means the transition isn't allowed based on the predefined rules, and the database will reject the insert or update operation.
  2. Preventing Illegal Changes: Since the job_queue is dependent on the job_transitions table for state changes, there's no way to bypass the rules defined in job_transitions. This enforced compliance prevents any application or user error from introducing states that could disrupt process flows.
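
As a quick illustration, assume a job that is currently ‘archive_completed’ (the id here is simply the one from the logs above, used for example purposes). Trying to push it back to ‘archive_pending’ fails, because job_transitions has no such row:

-- Rejected with a foreign key violation: ('archive_completed',
-- 'archive_pending') is not a row in job_transitions.
UPDATE job_queue
SET prev_job_status = 'archive_completed',
    curr_job_status = 'archive_pending'
WHERE id = '9556b65f-bea4-43b8-93d0-8ea05d86a6db';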

Dealing with Stuck Jobs

It’s a common challenge that some jobs may become stuck (or fail) while in the “processing” state. Typically, our system is designed to select jobs that are either “active” or “ready,” which means these stalled jobs remain indefinitely in a processing state. To address this issue, we can introduce a “lease_expire” column in our “job_queue” table. The concept behind this column is to set a timeout for jobs when their state is transitioned from “active” to “processing.”

When this state transition occurs, the lease_expire column is set to the current time plus 15 minutes; this window is the allotted time for a job to complete. If a job remains in the “processing” state after the lease period has expired, it can then be picked up by another process or handled according to other defined logic.

This functionality is implemented through a PostgreSQL trigger function. Because PostgreSQL only allows INSTEAD OF triggers on views (something we will take advantage of in the last section), this table-level version is written for a BEFORE UPDATE trigger and modifies the NEW row directly:

create function handle_job_update() returns trigger
    language plpgsql
as
$$
BEGIN
    -- Check if the current job status has changed
    IF NEW.curr_job_status IS DISTINCT FROM OLD.curr_job_status THEN
        -- Record the transition on the row itself
        NEW.prev_job_status := OLD.curr_job_status;
        -- Start the lease clock when a job enters processing
        IF NEW.curr_job_status = 'archive_processing' THEN
            NEW.lease_expire := NOW() + INTERVAL '15 MINUTES';
        END IF;
    END IF;
    RETURN NEW;
END;
$$;

alter function handle_job_update() owner to "user";

and attaching that function to a BEFORE UPDATE trigger on the table:

create trigger job_queue_update
    before update
    on job_queue
    for each row
execute procedure handle_job_update();

This approach not only helps in managing jobs that are stuck in processing but also ensures that no job is left behind, maintaining the efficiency and reliability of the job processing system.
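
How expired jobs are picked up again is left open above; as one sketch, a periodic poller could reclaim them with the same SKIP LOCKED pattern used earlier:

-- Treat 'processing' jobs whose lease has expired as available again
SELECT *
FROM job_queue
WHERE curr_job_status = 'archive_processing'
  AND lease_expire < NOW()
FOR UPDATE SKIP LOCKED;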

Simplifying table design with an Updatable view

Our “job_queue” table has evolved into quite a sophisticated setup, packed with several fields and constraints that are crucial for managing job processes effectively. From a business perspective, however, we primarily care about just a few aspects: the current status of the job and its associated metadata. Here’s how we’ve laid out the columns and constraints in our complex table:

+-----------------+--------------+---------------------+
| Column Name     | Data Type    | Default/Constraints |
+-----------------+--------------+---------------------+
| prev_job_status | VARCHAR(255) | 'archive_pending'   |
| curr_job_status | VARCHAR(255) | 'archive_pending'   |
| job_status_time | TIMESTAMP    | CURRENT_TIMESTAMP   |
| id              | UUID         | Not Null, Unique    |
| metadata        | JSON         | Not Null            |
| lease_expire    | TIMESTAMP    |                     |
+-----------------+--------------+---------------------+
| Constraints:                                         |
|   FOREIGN KEY (prev_job_status, curr_job_status)     |
|   REFERENCES job_transitions(prev_job_status,        |
|                              curr_job_status)        |
|   ON DELETE CASCADE                                  |
+------------------------------------------------------+

To streamline interactions and focus on the essential data, it might be beneficial to abstract away the underlying complexities using an updatable view. We can achieve this by creating a simplified view called job_queue_view, which will provide a cleaner and more focused interface for application interactions:

+-----------------+--------------+
| Column Name     | Data Type    |
+-----------------+--------------+
| id              | UUID         |
| curr_job_status | VARCHAR(255) |
| metadata        | JSON         |
+-----------------+--------------+
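
A simple single-table view is enough here, and its simplicity is exactly what keeps it updatable in PostgreSQL. A minimal sketch of the DDL (not shown in the original post):

CREATE VIEW job_queue_view AS
SELECT id, curr_job_status, metadata
FROM job_queue;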

This view simplifies the user’s perspective of the data while still maintaining the ability to interact with the essential details. However, to ensure that any updates made through this view are reflected in the original job_queue table, we move the trigger logic to the view: we drop the table-level trigger and redefine the function so that it applies the change to the underlying job_queue table from an INSTEAD OF UPDATE trigger on the view. The function and trigger would then look like this:

-- Drop the table-level trigger; all updates now flow through the view
drop trigger job_queue_update on job_queue;

create or replace function handle_job_update() returns trigger
    language plpgsql
as
$$
BEGIN
    -- Check if the current job status has changed
    IF NEW.curr_job_status IS DISTINCT FROM OLD.curr_job_status THEN
        -- Update the original table (note: view updates that do not change
        -- curr_job_status are ignored by this branch)
        UPDATE job_queue
        SET prev_job_status = OLD.curr_job_status,
            curr_job_status = NEW.curr_job_status,
            metadata        = NEW.metadata,
            lease_expire    = CASE
                                  WHEN NEW.curr_job_status = 'archive_processing'
                                      THEN NOW() + INTERVAL '15 MINUTES'
                                  ELSE lease_expire
                              END
        WHERE id = NEW.id;
    END IF;
    RETURN NEW;
END;
$$;

alter function handle_job_update() owner to "user";

create trigger job_queue_view
    instead of update
    on job_queue_view
    for each row
execute procedure handle_job_update();

By directing the updates through the view, we maintain a clean and simple interface for applications while preserving the integrity and functionality of our underlying data structure. This approach helps in managing complexity and ensures that our system is both efficient and user-friendly.

And we can update our code to use the view:

package io.knav.pgjobqueue.advancedscheduler.repositories;

import io.knav.pgjobqueue.advancedscheduler.entities.JobNg;
import io.knav.pgjobqueue.advancedscheduler.entities.Metadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.UUID;

@Service
public class AdvancedJobRepository {

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public AdvancedJobRepository(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public void addJob(JobNg job) {
        jdbcTemplate.update(
                "INSERT INTO job_queue_view (id, metadata, curr_job_status) VALUES (?, ?::json, ?)",
                job.id(), job.metadata().asJson(), "archive_pending");
    }

    // Must be called within a transaction for the row lock to outlive the query.
    public JobNg fetchAndLockJobForProcessing(UUID id) {
        try {
            List<JobNg> jobs = jdbcTemplate.query(
                    "SELECT * FROM job_queue_view WHERE id = ? FOR UPDATE SKIP LOCKED",
                    (rs, rowNum) -> {
                        UUID jobId = rs.getObject("id", UUID.class);
                        Metadata metadata = Metadata.fromJson(rs.getString("metadata"));
                        String jobStatus = rs.getString("curr_job_status");
                        return new JobNg(jobId, metadata, jobStatus);
                    },
                    id);

            return jobs.isEmpty() ? null : jobs.get(0);
        } catch (DataAccessException dae) {
            // Handle data access exceptions (such as SQL errors)
            System.err.println("Data Access Error fetching job by ID: " + dae.getMessage());
        } catch (Exception e) {
            // Handle other exceptions (such as JSON parsing errors)
            System.err.println("Error fetching job by ID: " + e.getMessage());
        }
        return null;
    }

    @Transactional
    public List<JobNg> fetchJobs() {
        // Lock and select jobs
        List<JobNg> jobs = jdbcTemplate.query(
                "SELECT id, metadata, curr_job_status FROM job_queue_view WHERE curr_job_status = ? FOR UPDATE SKIP LOCKED LIMIT ?",
                (rs, rowNum) -> {
                    UUID id = rs.getObject("id", UUID.class);
                    try {
                        String metadataJson = rs.getString("metadata");
                        var metadata = Metadata.fromJson(metadataJson);
                        String currentJobStatus = rs.getString("curr_job_status");
                        return new JobNg(id, metadata, currentJobStatus);
                    } catch (Exception e) {
                        // Log or handle the exception as needed
                        throw new RuntimeException("Error reading metadata for job: " + id, e);
                    }
                },
                "archive_pending", 5);

        // Get IDs from the fetched jobs
        List<UUID> jobIds = jobs.stream().map(JobNg::id).toList();

        // Update statuses if jobs are found
        if (!jobIds.isEmpty()) {
            jdbcTemplate.update(
                    "UPDATE job_queue_view SET curr_job_status = ? WHERE id = ANY(?)",
                    "archive_processing", jobIds.toArray(UUID[]::new));
        }

        return jobs;
    }

    public void updateJobStatus(UUID id, String status) {
        jdbcTemplate.update(
                "UPDATE job_queue_view SET curr_job_status = ? WHERE id = ?",
                status, id);
    }
}
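
As a quick usage sketch (the repository wiring and the metadata JSON literal are illustrative, not from the original repo), a job now moves through its lifecycle like this:

// Illustrative only: enqueue a job, let a worker claim a batch,
// then mark each claimed job as completed.
repository.addJob(new JobNg(UUID.randomUUID(),
        Metadata.fromJson("{\"source\": \"demo\"}"), "archive_pending"));

List<JobNg> batch = repository.fetchJobs(); // archive_pending -> archive_processing
for (JobNg job : batch) {
    // ... do the actual archiving work here ...
    repository.updateJobStatus(job.id(), "archive_completed");
}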

Here is the full implementation. Enjoy!

References:

  1. Thinking in Sets (Joe Celko).
  2. Part of the implementation is based on this Hacker News thread.
