common-mq

0.4.0 • Public • Published

common-mq

common-mq is an abstracted message queue client package for Node.js with a clean, common API for working with various message queue providers.

Build Status Build status Coverage Status

Provider Support

common-mq supports the following message queue providers.

If you want to see more queue providers supported, add a comment to the queue provider tracking issue. Pull requests are always welcome, also!

Installation

npm install common-mq

Usage Example

var mq = require('common-mq');
 
var queue = mq.connect('sqs://todos', {
 
  // delete the message after emitting 'message' event
  deleteAfterReceive: true,
 
  // SQS queue specific attributes
  attributes: {
    VisibilityTimeout: '60',
    ReceiveMessageWaitTimeSeconds: '20'
  },
 
  awsConfig: { region: 'us-east-1' }
 
});
 
queue.on('ready', function() {
  setTimeout(function() { queue.publish({ task: 'take out trash' }); }, 1000);
  setTimeout(function() { queue.publish({ task: 'wash dishes' }); }, 5000);
  setTimeout(function() { queue.publish({ task: 'sweep floor' }); }, 6000);
});
 
queue.on('error', function(err) {
  console.log(err);
});
 
queue.on('message', function(message) {
  console.log(message);
});

Other examples can be found in the examples/ directory. Clone the repository, then run with:

npm run example
npm run example:zmq
npm run example:sqs # Edit examples/sqs.simple.js first to include your AWS config 
npm run example:amqp # Assumes AMQP server (like RabbitMQ) is running on localhost port 5672 

Documentation

Connecting to a Queue

.connect(url [, options])

Connects to a queue using a URL scheme (i.e, provider://hostname/queue). The optional options argument passes provider specific configuration (see Providers section below).

Returns a reference to a queue.

// With simple url scheme
var queue = require('common-mq').connect('zmq://localhost:5555/todos');
 
// With additional options
var queue = require('common-mq').connect('sqs://todos', {
  deleteAfterReceive: true,
  attributes: { VisibilityTimeout: '120' }
});

.connect(options)

Connects to a queue using a detailed config. See Providers section below for required and optional properties for each provider.

Returns a reference to a queue.

var queue = require('common-mq').connect({
  provider: 'amqp',
  hostname: 'localhost',
  port: 5672,
  queueName: 'todos',
  exchangeName: 'MyTodoExchange',
  ssl: { enabled: true }
});

Class: Queue

Event: 'ready'

Emitted when the queue is ready to publish or start receiving messages.

queue.on('ready', function() {
  console.log('Queue is ready to start publishing or receiving messages.');
});

Event: 'message'

Attaching the first listener to the 'message' event will automatically subscribe to the queue. When a message is received from the queue, the 'message' event is emitted to all listeners.

Arguments:

  • message string|object|Buffer
  • messageId string
queue.on('message', function(message) {
  console.log('Message received!', message);
});

Event: 'error'

Emitted if an error occurs while communicating with the queue or receiving messages.

Arguments:

  • error Error
queue.on('error', function(error) {
  console.log('Oops!', error.toString());
});

.publish(message [, options])

Publishes a message to the queue. message can be a JavaScript object, a string, or a Buffer.

The optional options argument passes provider-specific options to the related publishing functionality of the queue provider used.

queue.publish('This is a string message');
queue.publish({ foo: 'bar', baz: 3 });
queue.publish(new Buffer([1, 2, 3, 4, 5, 6, 7, 8]));
 
// Example with SQS publish options
queue.publish('My message', { 'MessageDeduplicationId': 'abcdef123456' });

.ack(messageId)

If the queue does not support automatically removing messages (once received by a subscriber), or this option is turned off in the options, you will need to call the the .ack() method once you have successfully received the message. The underlying queue provider implementation will handle deleting or acknowledging the message, depending how that particular queue provider handles this concept. Not all providers (e.g. ZeroMQ) support this and this will behave as a noop.

The messageId will be the same as passed by the 'message' event callback.

queue.on('message', function(message, messageId) {
  console.log('Message received!', message);
  queue.ack(messageId);
});

.close()

Some queues need to be explicitly closed to clean up the connections. Use this method to disconnect from the queue. If not needed by the queue provider, it will behave as a noop.

queue.close();

Providers

AMQP

Connection URL Syntax: amqp://<hostname>:<port>/<queueName>

Required options:

  • provider string (when not using connection URL)
  • hostname string (when not using connection URL)
  • port number (when not using connection URL)
  • queueName string (when not using connection URL)
  • exchangeName string

The options object is passed on as options to the amqp-node createConnection method. Any of the connection options can be included in the common-mq options object.

Amazon SQS

Connection URL Syntax: sqs://<queueName>

Required options:

  • provider string (when not using connection URL)
  • queueName string (when not using connection URL)
  • awsConfig object|string AWS config options to pass to the update method, otherwise a path to AWS config file to pass to the loadFromPath method - see AWS Developer Guide for more information

Other options:

  • attributes object (default: not set) Corresponds to the Attributes object on the SQS createQeuue method params
  • deleteAfterReceive boolean (default: false) When true, deletes the message immediately after receiving
  • maxReceiveCount number (default: 1) Max number of messages to receive at once
  • visibilityTimeout number (default: set per queue attributes) Number of seconds to keep messages hidden after receive
  • waitTimeSeconds number (default: set per queue attributes) Number of seconds to wait for new messages before ending each poll request

ZeroMQ

Connection URL Syntax: zmq://<hostname>:<port>/<queueName>

Required options:

  • provider string (when not using connection URL)
  • hostname string (when not using connection URL)
  • port number (when not using connection URL)
  • queueName string (when not using connection URL)

License

MIT

Readme

Keywords

none

Package Sidebar

Install

npm i common-mq

Weekly Downloads

3

Version

0.4.0

License

MIT

Last publish

Collaborators

  • lloydcotten