MongoMQ2 is a lightweight Node.js library that turns MongoDB collections into general-purpose message queues or event logs, without additional deployments or infrastructure.
At the expense of throughput compared to specialized message queues and brokers like SQS, SNS, RabbitMQ, or Kafka, you get:

- durable message/event logs in plain MongoDB collections,
- real-time delivery of new messages to subscribers,
- acknowledged, at-least-once delivery to isolated consumer groups,
- retries and dead letter handling, and
- batched publishing.

MongoMQ2 can be an effective and flexible building block for message- and event-driven architectures, especially if you're already on MongoDB and don't want to introduce additional system components to deploy and operate.
```sh
npm install mongomq2 mongodb
```
```ts
import { MongoClient, ObjectId } from 'mongodb';
import { MessageQueue } from 'mongomq2';

const mongoClient = new MongoClient('mongodb://localhost:27017');

type MyMessage = InputMessage | OutputMessage;

interface InputMessage {
  _id?: ObjectId;
  type: 'input';
  data: string;
}

interface OutputMessage {
  _id?: ObjectId;
  type: 'output';
  result: string;
}

// create MessageQueue
const messageCollection = mongoClient.db().collection<MyMessage>('messages');
const messageQueue = new MessageQueue(messageCollection);

// Consume "input" messages (including past ones),
// publish one "output" message per "input" message
messageQueue.consume<InputMessage>(
  // consumer callback to be executed at least once per message
  async (message) => {
    console.log(`Processing ${message.data}...`);
    await messageQueue.publish({ type: 'output', result: message.data + '!' });
  },
  {
    group: 'handleInput', // group identifier, unique per consumer callback
    filter: { type: 'input' }, // only consume messages of type "input"
  },
);

// Subscribe to (future) "output" messages
messageQueue.subscribe<OutputMessage>(
  (message) => {
    console.log(`Processing done: ${message.result}`);
  },
  { filter: { type: 'output' } },
);

// Publish some messages
await messageQueue.publish({ type: 'input', data: 'hello' });
await messageQueue.publish({ type: 'input', data: 'world' });

// > Processing xxx... (processed exactly once)
// > Processing done: xxx! (per active subscriber)
```
(See the API documentation for a detailed reference of all configuration options and functionality.)
```ts
const messageCollection = mongoClient.db().collection<MyMessage>('messages');
const messageQueue = new MessageQueue(messageCollection);

await messageQueue.publish({ type: 'input', data: 'hello' });
```
Useful for reliably publishing individual messages as they occur.

`publish` can be used inside transactions by passing a `session` (same as the MongoDB driver's `insertOne`).
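For example, a sketch of transactional publishing; the `orders` collection is made up here, and the `{ session }` option is assumed to mirror the driver's `insertOne`:

```ts
const orders = mongoClient.db().collection('orders');

await mongoClient.withSession(async (session) =>
  session.withTransaction(async (session) => {
    // Either both writes commit, or neither does
    await orders.insertOne({ item: 'book', quantity: 1 }, { session });
    await messageQueue.publish({ type: 'input', data: 'order placed' }, { session });
  }),
);
```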
```ts
messageQueue.publishBatched({ type: 'input', data: 'hello' });
```

Messages are buffered and inserted in batches:

- `bestEffort: true` (majority write concern, retries)
- `bestEffort: false` (no write concern, no retries)

Useful for high-frequency messages where batching amortizes the write overhead.
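A hedged sketch of high-frequency, fire-and-forget publishing; whether `bestEffort` is passed per call (as shown) or configured elsewhere is an assumption:

```ts
// Buffer many low-value messages; MongoMQ2 inserts them in batches.
for (const data of ['a', 'b', 'c']) {
  // bestEffort: false → no write concern, no retries (hypothetical per-call option)
  messageQueue.publishBatched({ type: 'input', data }, { bestEffort: false });
}
```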
```ts
messageQueue.consume(
  (message) => {
    // handle message
  },
  {
    // consumer group identifier, defaults to collection name
    group: 'myConsumerGroup',
    filter: {
      // optional filter
    },
  },
);
```
```ts
messageQueue.on('deadLetter', (err, message, group) => {
  // handle dead letter, i.e. message that failed repeatedly and exhausted maxRetries
});
```
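For instance, a dead letter handler might persist failed messages for later inspection; the `deadLetters` collection is made up for illustration:

```ts
const deadLetters = mongoClient.db().collection('deadLetters');

messageQueue.on('deadLetter', async (err, message, group) => {
  // Hypothetical handling: store the failed message and error for later review
  await deadLetters.insertOne({
    message,
    group,
    error: String(err),
    failedAt: new Date(),
  });
});
```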
- Within a `group`, each matching message is consumed by at most one consumer.
- Messages are consumed at least once per `group`.
- Keep the `group` property stable per consumer callback; otherwise, messages may be consumed again (once per new `group`).

Useful for job queues and other at-least-once processing of messages.
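As an illustration (group names and handler bodies are placeholders), two groups process the same messages independently, while callbacks sharing a group split the work:

```ts
// Each matching message is handled at least once by the "sendEmails" group...
messageQueue.consume(async (message) => { /* send email */ }, { group: 'sendEmails' });

// ...and, independently, at least once by the "updateStats" group.
messageQueue.consume(async (message) => { /* update statistics */ }, { group: 'updateStats' });
```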
```ts
messageQueue.subscribe(
  (message) => {
    // handle message
  },
  {
    filter: {
      // optional local filter applied in memory
    },
  },
);
```
Each `MessageQueue` instance creates one MongoDB change stream. To limit the number of change streams, prefer a single `MessageQueue` instance with multiple `.subscribe(...)` calls with local filters over multiple instances.

Useful for real-time, fan-out delivery of messages to all active subscribers.
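A sketch of this pattern, reusing the message types from the quick start:

```ts
// One MessageQueue instance → one change stream shared by both subscriptions
messageQueue.subscribe<InputMessage>(
  (message) => console.log('saw input:', message.data),
  { filter: { type: 'input' } }, // local filter, applied in memory
);

messageQueue.subscribe<OutputMessage>(
  (message) => console.log('saw output:', message.result),
  { filter: { type: 'output' } },
);
```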
- MongoMQ2 clients are `EventEmitter`s.
- Use `.on('error', (err, message?, group?) => /* report error */)` to monitor errors.
- `.close()` MongoMQ2 clients on shutdown (before closing the `MongoClient`).
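For example, a shutdown hook along these lines (whether `.close()` returns a promise is an assumption):

```ts
process.on('SIGTERM', async () => {
  await messageQueue.close(); // close MongoMQ2 clients first (assumed async)
  await mongoClient.close(); // then close the underlying MongoClient
  process.exit(0);
});
```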
For local development and tests, a MongoDB instance can be started via the included `docker-compose.yml`.

MongoMQ2 is economical in its use of MongoDB:

- It relies only on the `_id` index, which always exists (no other indexes are required).
- It adds only one `_c` field per message document (no other metadata is generated).

For common workloads (message size ~1 KB, produced and consumed in the same time frame), MongoMQ2 should be able to handle hundreds of messages per second in most environments; plenty for a variety of use cases.
As discussed earlier, MongoMQ2 trades throughput for operational simplicity. Your mileage may vary; generally, MongoMQ2 is bound by the performance and latency of the underlying MongoDB.
Publishing/producing messages in MongoMQ2 is bound by insertion time on the message collection. Insertion time depends on message size and the number of indexes on the message collection. As stated above, the simplest use cases only need the `_id` index.
Consumers are bound by MongoDB `findOneAndUpdate` performance, which will usually perform an index scan (`IXSCAN`) on the `_id` index. This scan is mainly bound by the number of messages currently being consumed, as consumers are able to seek efficiently based on `_id` via time-based ordering. Additionally, `findOneAndUpdate` performs some locking internally, which may degrade for large numbers of concurrent producers/consumers.
See `test/benchmarks` for a benchmark suite (as of yet severely lacking; PRs welcome!).