Efficient background task processing is essential for modern web applications. By leveraging PostgreSQL as a queue backend and the pg-boss library, developers can implement a reliable job queue system directly within their Node.js applications. This approach centralizes queue management in the database, offering durability and simplicity without the need for a separate queuing service.
Overview of pg-boss
pg-boss is a Node.js library designed to provide background job processing by utilizing PostgreSQL as its storage engine. It offers a straightforward API, making it easy to integrate into existing Node.js projects. With features like retry logic, dead letter queues, and custom schemas, pg-boss is well-suited for applications that require reliable asynchronous task execution.
Setting Up the Environment
To begin, pg-boss and its dependencies must be installed in the Node.js project. The following commands add the necessary packages:
```bash
npm install pg pg-boss
npm install -D @types/pg
```
A PostgreSQL database instance is required. Assuming the project includes a Docker Compose file that defines a PostgreSQL service, a local instance can be started with:
```bash
docker compose up -d
```
This ensures the database is ready for queue operations.
Establishing the Database Connection
A dedicated file, such as `src/pgBoss.ts`, can be used to configure and initialize the pg-boss instance. The connection parameters are typically sourced from environment variables for flexibility and security.
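```typescript
import PgBoss from 'pg-boss';

// Connection settings come from environment variables.
const pgBossInstance = new PgBoss({
  host: process.env.POSTGRES_HOST,
  port: Number(process.env.POSTGRES_PORT),
  user: process.env.POSTGRES_USER,
  password: process.env.POSTGRES_PASSWORD,
  database: process.env.POSTGRES_DB,
});

// Log errors emitted by pg-boss instead of leaving them unhandled.
pgBossInstance.on('error', console.error);

// Top-level await: this file must be loaded as an ES module.
await pgBossInstance.start();

export default pgBossInstance;
```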
This setup ensures that pg-boss is connected and ready to manage queues.
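Because the connection settings above are read directly from `process.env`, a missing variable only shows up when pg-boss tries to connect, and `Number(undefined)` silently evaluates to `NaN` for the port. One way to fail earlier is to validate the variables before constructing the instance. The following is a minimal sketch under that assumption; `loadPgConfig`, `requireVar`, and `PgConnectionConfig` are hypothetical names and are not part of pg-boss.

```typescript
// Hypothetical helper (not part of pg-boss): validates the environment
// variables used above and returns a plain connection options object.
interface PgConnectionConfig {
  host: string;
  port: number;
  user: string;
  password: string;
  database: string;
}

type Env = Record<string, string | undefined>;

// Returns the variable's value or throws if it is unset or empty.
function requireVar(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

function loadPgConfig(env: Env): PgConnectionConfig {
  const port = Number(requireVar(env, "POSTGRES_PORT"));
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error("POSTGRES_PORT must be an integer between 1 and 65535");
  }
  return {
    host: requireVar(env, "POSTGRES_HOST"),
    port,
    user: requireVar(env, "POSTGRES_USER"),
    password: requireVar(env, "POSTGRES_PASSWORD"),
    database: requireVar(env, "POSTGRES_DB"),
  };
}
```

With such a helper in place, the constructor call could become `new PgBoss(loadPgConfig(process.env))`, so a misconfigured environment stops the process with a readable message before any connection attempt.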
Creating and Managing Queues
To define a queue, a constant can be declared in a shared file, such as `src/common.ts`:
```typescript
export const QUEUE_NAME = 'user-creation-queue';
```
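The later snippets also import a `UserCreatedTask` type from this shared file, but its definition is not shown. As an illustration only, it might look like the sketch below; the `userId` and `email` fields and the `isUserCreatedTask` guard are assumptions made for this example, not the original definition.

```typescript
// Hypothetical payload shape; the field names are illustrative assumptions.
export interface UserCreatedTask {
  userId: string;
  email: string;
}

// Runtime guard: job data is stored as JSON, so a worker cannot rely on
// compile-time types alone when reading a payload back.
export function isUserCreatedTask(value: unknown): value is UserCreatedTask {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.userId === "string" &&
    typeof candidate.email === "string"
  );
}
```

A guard like this is optional, but it gives the worker a cheap way to reject malformed payloads before running any business logic.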
The queue is then created using the `createQueue` method, specifying options like retry limits:
```typescript
import pgBossInstance from "./pgBoss";
import { QUEUE_NAME, type UserCreatedTask } from "./common";

// Create the queue once, when this module is first loaded.
await pgBossInstance.createQueue(QUEUE_NAME, {
  name: QUEUE_NAME,
  retryLimit: 2
});

// Resolves with the id of the newly created job.
export function enqueueJob(job: UserCreatedTask) {
  return pgBossInstance.send(QUEUE_NAME, job);
}
```
With this configuration, a failing job is retried up to two times (three attempts in total) before pg-boss marks it as failed.
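To make the retry arithmetic concrete, the toy model below counts attempts for a given retry limit. It is an in-memory illustration of the counting only, not pg-boss code, and `runWithRetryLimit` is a hypothetical name.

```typescript
type Outcome = { state: "completed" | "failed"; attempts: number };

// Runs `handler` once, then retries it up to `retryLimit` more times.
function runWithRetryLimit(
  handler: (attempt: number) => void,
  retryLimit: number
): Outcome {
  let attempts = 0;
  while (attempts <= retryLimit) {
    attempts++;
    try {
      handler(attempts);
      return { state: "completed", attempts };
    } catch {
      // Ignore the error and fall through to the next attempt, if any.
    }
  }
  return { state: "failed", attempts };
}

// A handler that always throws uses the initial run plus both retries.
const alwaysFails = runWithRetryLimit(() => {
  throw new Error("boom");
}, 2);
console.log(alwaysFails); // { state: 'failed', attempts: 3 }

// A handler that succeeds on its second attempt stops early.
const secondTry = runWithRetryLimit((attempt) => {
  if (attempt < 2) throw new Error("transient");
}, 2);
console.log(secondTry); // { state: 'completed', attempts: 2 }
```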
Enqueuing Jobs
Jobs can be added to the queue from the main application logic. For example, in `src/index.ts`, the enqueue function is called, and the result is logged:
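```typescript
// 'logger' and 'queue' are assumed to resolve to the project's own logger
// and to the queue module shown earlier.
import logger from 'logger';
import { enqueueJob } from 'queue';

// `task` is a UserCreatedTask built elsewhere in the application.
const idTask = await enqueueJob(task);
logger.info(task, `Task with id ${idTask} has been pushed`);
```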
This process allows the application to offload tasks for background processing efficiently.
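The `send` call is typed as resolving to either a job id or `null`, so the log line above could report an id of `null` when no job was created. A small wrapper can turn that case into an explicit error. This is a sketch under stated assumptions: `JobSender` and `enqueueOrThrow` are hypothetical names, and the interface describes only the single call used here, which a pg-boss instance is assumed to satisfy.

```typescript
// Minimal structural type for the one method this sketch needs.
interface JobSender {
  send(queue: string, data: object): Promise<string | null>;
}

// Resolves with the job id, or rejects if the queue returned no id.
async function enqueueOrThrow(
  sender: JobSender,
  queue: string,
  data: object
): Promise<string> {
  const id = await sender.send(queue, data);
  if (id === null) {
    throw new Error(`Job was not accepted by queue "${queue}"`);
  }
  return id;
}
```

Taking the sender as a parameter keeps the function easy to exercise with an in-memory stand-in, without a running database.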
Processing Jobs with Workers
A worker script, such as `src/worker.ts`, is responsible for consuming and processing jobs from the queue. The worker listens for new jobs and handles them according to the business logic:
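```typescript
import { setTimeout } from "timers/promises";
import { QUEUE_NAME, type UserCreatedTask } from "./common";
import pgBossInstance from "./pgBoss";

// Not shown in the original snippet; assumed here to be simple in-memory
// bookkeeping of attempts per job, kept only for the demonstration.
type HandledState = { retries: number; result: 'created' | 'success' | 'error' };
const messagesHandled: Record<string, HandledState> = {};
const printMessagesHandled = () => console.table(messagesHandled);

await pgBossInstance.work<UserCreatedTask>(QUEUE_NAME, async ([job]) => {
  // The first delivery creates the entry; each redelivery counts as a retry.
  if (!messagesHandled[job.id]) {
    messagesHandled[job.id] = { retries: 0, result: 'created' };
  } else {
    messagesHandled[job.id].retries++;
  }

  // Simulate work: about 40% of attempts succeed, the rest fail.
  const resultType = Math.random() > 0.6;
  const fakeImplementation = resultType ? 'success' : 'error';
  const timeout = fakeImplementation === 'success' ? 2000 : 1000;
  await setTimeout(timeout);

  if (fakeImplementation === 'error') {
    messagesHandled[job.id].result = 'error';
    printMessagesHandled();
    // Throwing fails this attempt so that pg-boss can retry the job.
    throw new Error(`User created task got error with id: ${job.id}`);
  } else {
    messagesHandled[job.id].result = 'success';
    printMessagesHandled();
  }
});
```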
This example simulates job processing, randomly determining success or failure and handling retries as configured.
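The `Math.random() > 0.6` check means that roughly 40% of attempts succeed. Because the outcome is random, runs are not reproducible. One option is to pull the decision into a small function with an injectable random source, as in this sketch; `simulateOutcome` and `RandomSource` are hypothetical names introduced for illustration.

```typescript
type RandomSource = () => number;

// Same threshold as the worker: values above 0.6 count as success.
function simulateOutcome(
  random: RandomSource = Math.random
): "success" | "error" {
  return random() > 0.6 ? "success" : "error";
}

// With a fixed source, the outcome is deterministic.
console.log(simulateOutcome(() => 0.9)); // success
console.log(simulateOutcome(() => 0.3)); // error
```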
Testing the Queue System
Running the application demonstrates the queue in action. Jobs are enqueued, processed, and their outcomes are logged, including the number of retries for each job. This provides clear visibility into the queue’s operation and reliability.
Conclusion
Integrating pg-boss with PostgreSQL offers a streamlined solution for background job processing in Node.js applications. The setup is straightforward, the API is intuitive, and the system is robust enough for many use cases. However, developers should monitor database load to ensure optimal performance, especially as the number of queued tasks grows. For scenarios requiring higher throughput or more advanced features, exploring alternative queue systems may be beneficial.