Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ Currently, it supports the following transports:
<p>
👉 <strong>Memory:</strong> A simple in-memory transport for testing purposes.<br />
👉 <strong>Redis:</strong> A Redis transport for production usage.<br />
👉 <strong>Mqtt:</strong> A Mqtt transport for production usage.
👉 <strong>Mqtt:</strong> A Mqtt transport for production usage.<br />
👉 <strong>Postgres:</strong> A PostgreSQL transport using NOTIFY/LISTEN for production usage.
</p>

## Table of Contents
Expand Down Expand Up @@ -49,6 +50,7 @@ The module exposes a manager that can be used to register buses.
import { BusManager } from '@boringnode/bus'
import { redis } from '@boringnode/bus/transports/redis'
import { mqtt } from '@boringnode/bus/transports/mqtt'
import { postgres } from '@boringnode/bus/transports/postgres'
import { memory } from '@boringnode/bus/transports/memory'

const manager = new BusManager({
Expand All @@ -69,7 +71,16 @@ const manager = new BusManager({
port: 1883,
}),
},
}
postgres: {
transport: postgres({
host: 'localhost',
port: 5432,
database: 'mydb',
user: 'postgres',
password: 'password',
}),
},
},
})
```

Expand All @@ -88,6 +99,7 @@ By default, the bus will use the `default` transport. You can specify different
```typescript
manager.use('redis').publish('channel', 'Hello world')
manager.use('mqtt').publish('channel', 'Hello world')
manager.use('postgres').publish('channel', 'Hello world')
```

### Without the manager
Expand All @@ -105,8 +117,8 @@ const transport = new RedisTransport({

const bus = new Bus(transport, {
retryQueue: {
retryInterval: '100ms'
}
retryInterval: '100ms',
},
})
```

Expand All @@ -126,10 +138,10 @@ const manager = new BusManager({
port: 6379,
}),
retryQueue: {
retryInterval: '100ms'
}
retryInterval: '100ms',
},
},
}
},
})

manager.use('redis').publish('channel', 'Hello World')
Expand All @@ -143,13 +155,13 @@ You have multiple options to configure the retry queue.
export interface RetryQueueOptions {
// Enable the retry queue (default: true)
enabled?: boolean

// Defines if we allow duplicates messages in the retry queue (default: true)
removeDuplicates?: boolean

// The maximum size of the retry queue (default: null)
maxSize?: number | null

// The interval between each retry (default: false)
retryInterval?: Duration | false
}
Expand All @@ -169,7 +181,7 @@ const buggyTransport = new ChaosTransport(new MemoryTransport())
const bus = new Bus(buggyTransport)

/**
* Now, every time you will try to publish a message, the transport
* Now, every time you will try to publish a message, the transport
* will throw an error.
*/
buggyTransport.alwaysThrow()
Expand Down
9 changes: 8 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@
"@japa/runner": "^5.0.0",
"@swc/core": "^1.15.8",
"@testcontainers/hivemq": "^11.11.0",
"@testcontainers/postgresql": "^11.11.0",
"@testcontainers/redis": "^11.11.0",
"@types/node": "^20.17.19",
"@types/object-hash": "^3.0.6",
"@types/pg": "^8.11.10",
"c8": "^10.1.3",
"del-cli": "^7.0.0",
"eslint": "^9.39.2",
"ioredis": "^5.9.0",
"mqtt": "^5.14.1",
"pg": "^8.18.0",
"prettier": "^3.7.4",
"release-it": "^19.2.3",
"testcontainers": "^11.11.0",
Expand All @@ -58,11 +61,15 @@
"object-hash": "^3.0.0"
},
"peerDependencies": {
"ioredis": "^5.0.0"
"ioredis": "^5.0.0",
"pg": "^8.0.0"
},
"peerDependenciesMeta": {
"ioredis": {
"optional": true
},
"pg": {
"optional": true
}
},
"author": "Romain Lanz <romain.lanz@pm.me>",
Expand Down
230 changes: 230 additions & 0 deletions src/transports/postgres.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/**
* @boringnode/bus
*
* @license MIT
* @copyright BoringNode
*/

import { Client } from 'pg'
import { assert } from '@poppinss/utils/assert'

import debug from '../debug.js'
import { JsonEncoder } from '../encoders/json_encoder.js'
import type {
Transport,
TransportEncoder,
TransportMessage,
Serializable,
SubscribeHandler,
PostgresTransportConfig,
} from '../types/main.js'

export function postgres(config: PostgresTransportConfig, encoder?: TransportEncoder) {
return () => new PostgresTransport(config, encoder)
}

export class PostgresTransport implements Transport {
readonly #publisher: Client
#subscriber: Client
readonly #encoder: TransportEncoder
readonly #channelHandlers: Map<string, SubscribeHandler<any>> = new Map()
#publisherConnected: boolean = false
#subscriberConnected: boolean = false
#gracefulDisconnect: boolean = false
#config: PostgresTransportConfig
#reconnectCallback: (() => void) | undefined

#id: string | undefined

constructor(config: PostgresTransportConfig, encoder?: TransportEncoder)
constructor(config: string, encoder?: TransportEncoder)
constructor(options: PostgresTransportConfig | string, encoder?: TransportEncoder) {
this.#encoder = encoder ?? new JsonEncoder()

if (typeof options === 'string') {
this.#config = { connectionString: options }
} else {
this.#config = options
}

this.#publisher = new Client(this.#config)
this.#subscriber = new Client(this.#config)
}

setId(id: string): Transport {
this.#id = id

return this
}

async #ensureConnected(): Promise<void> {
if (!this.#publisherConnected) {
await this.#publisher.connect()
this.#publisherConnected = true
}
if (!this.#subscriberConnected) {
await this.#subscriber.connect()
this.#subscriberConnected = true
}
}

async disconnect(): Promise<void> {
this.#gracefulDisconnect = true
this.#publisherConnected = false
this.#subscriberConnected = false

const promises: Promise<void>[] = []

try {
promises.push(this.#publisher.end())
} catch (err) {
// Ignore errors during disconnect
}

try {
promises.push(this.#subscriber.end())
} catch (err) {
// Ignore errors during disconnect
}

await Promise.allSettled(promises)
}

async publish(channel: string, message: Serializable): Promise<void> {
assert(this.#id, 'You must set an id before publishing a message')

await this.#ensureConnected()

const encoded = this.#encoder.encode({ payload: message, busId: this.#id })
const payloadString = typeof encoded === 'string' ? encoded : encoded.toString('base64')

// Use pg's built-in escaping methods to safely escape the identifiers and literals
const escapedChannel = this.#publisher.escapeIdentifier(channel)
const escapedPayload = this.#publisher.escapeLiteral(payloadString)

// Use NOTIFY to send the message
await this.#publisher.query(`NOTIFY ${escapedChannel}, ${escapedPayload}`)
}

async subscribe<T extends Serializable>(
channel: string,
handler: SubscribeHandler<T>
): Promise<void> {
await this.#ensureConnected()

// Store the handler for this channel
this.#channelHandlers.set(channel, handler)

this.#ensureNotificationListener()

// Subscribe to the channel using LISTEN
const escapedChannel = this.#subscriber.escapeIdentifier(channel)
await this.#subscriber.query(`LISTEN ${escapedChannel}`)
}

#ensureNotificationListener() {
// Set up the notification listener if not already set
if (this.#subscriber.listenerCount('notification') > 0) {
return
}

this.#subscriber.on('notification', (msg) => {
if (msg.channel) {
const channelHandler = this.#channelHandlers.get(msg.channel)
if (channelHandler && msg.payload) {
debug('received message for channel "%s"', msg.channel)

try {
const data = this.#encoder.decode<TransportMessage<any>>(msg.payload)

/**
* Ignore messages published by this bus instance
*/
if (data.busId === this.#id) {
debug('ignoring message published by the same bus instance')
return
}

channelHandler(data.payload)
} catch (error) {
debug('error decoding message: %o', error)
}
}
}
})
}

onReconnect(callback: () => void): void {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll also want to track that you are in fact actually reconnecting, and in that case, block ensureConnected on that reconnection promise, otherwise it's possible that a temporary disconnect can trigger a double connection.

this.#reconnectCallback = callback
this.#setupReconnectionListener()
}

#setupReconnectionListener() {
this.#subscriber.on('error', (err) => {
debug('subscriber error: %o', err)
})

this.#subscriber.on('end', () => {
debug('subscriber connection ended')
this.#subscriberConnected = false

if (this.#gracefulDisconnect) {
return
}

this.#attemptReconnection()
})
}

#attemptReconnection(attempt = 0) {
const baseDelay = 1000
const maxDelay = 60000
// Exponential backoff with jitter
const delay = Math.min(baseDelay * Math.pow(2, attempt), maxDelay) + Math.random() * 1000

debug('attempting reconnection in %d ms (attempt %d)', delay, attempt)

setTimeout(() => {
if (this.#gracefulDisconnect) return

const newClient = new Client(this.#config)

newClient
.connect()
.then(() => {
Comment on lines 167 to 194

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll probably want to use some form of exponential backoff with randomised jitter to ensure safe reconnection during outages.

You probably also want to check here if you should reconnect, e.g., disconnect may emit an end event (I can't recall if it does)

this.#subscriber = newClient
this.#subscriberConnected = true
debug('reconnected to postgres')

this.#ensureNotificationListener()
this.#setupReconnectionListener()

if (this.#reconnectCallback) {
this.#reconnectCallback()
}

// Re-subscribe to all channels
const channels = Array.from(this.#channelHandlers.keys())
if (channels.length > 0) {
const query = channels
.map((channel) => `LISTEN ${this.#subscriber.escapeIdentifier(channel)}`)
.join('; ')

this.#subscriber.query(query).catch((err) => {
debug('error re-subscribing to channels: %o', err)
})
}
})
.catch((err) => {
debug('error reconnecting: %o', err)
this.#attemptReconnection(attempt + 1)
})
}, delay)
}

async unsubscribe(channel: string): Promise<void> {
this.#channelHandlers.delete(channel)
const escapedChannel = this.#subscriber.escapeIdentifier(channel)
await this.#subscriber.query(`UNLISTEN ${escapedChannel}`)
}
}
10 changes: 10 additions & 0 deletions src/types/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

import type { RedisOptions } from 'ioredis'
import type { IClientOptions } from 'mqtt'
import type { ClientConfig } from 'pg'

export type { Redis, Cluster } from 'ioredis'
export type { Client } from 'pg'
export type TransportFactory = () => Transport

/**
Expand Down Expand Up @@ -66,6 +68,14 @@ export interface MqttTransportConfig {
options?: IClientOptions
}

export interface PostgresTransportConfig extends ClientConfig {
/**
* Connection string for PostgreSQL. If provided, it will be used instead
* of the individual connection properties.
*/
connectionString?: string
}

export interface Transport {
setId: (id: string) => Transport
onReconnect: (callback: () => void) => void
Expand Down
Loading