amqp-coffee

AMQP driver for node

npm install amqp-coffee
2 downloads in the last day
10 downloads in the last week
18 downloads in the last month

amqp-coffee

Build Status Node.JS AMQP 0.9.1 Client

Sample

AMQP = require('amqp-coffee') # path to this

testData = "the data to be published..  I am a string but could be anything"

amqpConnection = new AMQP {host:'localhost'}, (e, r)->
  if e?
    console.error "Error", e

  amqpConnection.queue {queue: "queueName"}, (e,q)->
    q.declare ()->
      q.bind "amq.direct", "queueName", ()->

      amqpConnection.publish "amq.direct", "queueName", testData, {confirm: true}, (err, res)->
        console.log "Message published"

      consumer = amqpConnection.consume "queueName", {prefetchCount: 2}, (message)->
        console.log message.data.toString()
        message.ack()

      , (e,r)->
        console.log "Consumer setup"
        amqpConnection.publish "amqp.direct", "queueName", "message contents", {deliveryMode:2, confirm:true}, (e, r)->
          if !e? then console.log "Message Sent"

Methods

new amqp-coffee([connectionOptions],[callback])

Creates a new amqp Connection. The connection is returned directly and in the callback. The connection extends EventEmitter

The connectionOptions argument should be an object which specifies:

  • host: a string of the hostname OR an array of hostname strings OR an array of hostname objects {host, port}
  • port: a integer of the port to connect to. Not used if host is an object.
  • login: "guest"
  • password: "guest"
  • vhost: '/'
  • port: 5672
  • heartbeat: 10000 # in ms
  • reconnect: true
  • reconnectDelayTime: 1000 # in ms
  • hostRandom: false

Host Examples

host: 'localhost'
host: {host: 'localhost', port: 15672}
host: ['localhost','yourhost']
host: [{host: 'localhost', port:15672}, {host: 'localhost', port:15673}]

Sample Connection

amqp-coffee = require('amqp-coffee')

amqp = new amqp-coffee {host: 'localhost'}, (error, amqpConnection)->
   assert(amqp == amqpConnection)

Event: 'ready'

Emitted when the connection is open successfully. This will be called after each successful reconnect.

Event: 'close'

Emitted when a open connection leaves the ready state and is closed.

Event: 'error'

Very rare, only emitted when there's a server version mismatch

connection.queue([queueOptions],[callback])

When creating a queue class using connection.queue, you can specify options that will be used in all the child methods.

The queueOptions argument should be an object which specifies:

  • queue: a string repensenting the queue name
  • autoDelete: default: true
  • noWait: default: false
  • exclusive: default: false. The queue can only be used by the current connection.
  • durable: default: false
  • passive: default: false. The queue creation will not fail if the queue already exists.

Both queues and exchanges use "temporary" channels, which are channels amqp-coffee manages specifically for declaring, binding, unbinding, and deleting queues and exchanges. After 2 seconds of inactivity these channels are closed, and reopened on demand.

queue.declare([queueOptions],[callback])

Will take a new set of queueOptions, or use the default. Issues a queueDeclare and waits on queueDeclareOk if a callback is specified.

amqp = new AMQP, ()->
  amqp.queue({queue:'queueToCreate'}, (err, Queue)->
    Queue.declare (err, res)->
      # the queue is now declared

queue.delete([queueDeleteOptions],[callback])

The queueDeleteOptions argument should be an object which specifies:

  • queue: name of the queue
  • ifUnused: default: false
  • ifEmpty: default: true
  • noWait: default: false

queue.bind(exchange, routingkey, [queueName], [callback])

Sets up bindings from an already existing exchange to an already existing queue

queue.unbind(exchange, routingKey, [queueName], [callback])

Tears down an already existing binding

queue.messageCount(queueOptions, callback)

Rabbitmq specific, re-declares the queue and returns the messageCount from the response

queue.consumerCount(queueOptions, callback)

Rabbitmq specific, re-declares the queue and returns the consumerCount from the response

connection.exchange([exchangeArgs],[callback])

When creating an exchange class using connection.exchange, you can specify options that will be used in all the child methods.

The exchangeArgs argument should be an object which specifies:

  • exchange: a string representing the exchange name
  • type: "direct"
  • passive: false
  • durable: false
  • noWait: false
  • autoDelete: true
  • internal: false

Both queues and exchanges use "temporary" channels, which are channels amqp-coffee manages specifically for declaring, binding, unbinding, and deleting queues and exchanges. After 2 seconds of inactivity these channels are closed, and reopened on demand.

exchange.declare([exchangeArgs],[callback])

exchange.delete([exchangeDeleteOptions], [callback])

The exchangeDeleteOptions argument should be an object which specifies:

  • exchange: the name of the exchange
  • ifUnused: false
  • noWait: false

connection.publish(exchange, routingKey, data, [publishOptions], [callback])

amqp-coffee manages publisher channels and sets them up on the first publish. A separate channel is used for confirm channels, compared to non confirm channels. So you should have a maximum of 2 channels publishing for a single connection.

  • exchange: string of the exchange to publish to
  • routingKey: string to use to route the message
  • data: any type of data, if it is an object it will be converted into json automatically and unconverted on consume. Strings are converted into buffers.
  • publishOptions: All parameters are passed through as arguments to the publisher.
    • confirm: false
    • mandatory: false
    • immediate: false
    • contentType: 'application/octet-stream'

connection.consume(queueName, options, messageListener, [callback])

consumers use their own channels and are re-subscribed to on reconnect. Returns a consumer object.

  • queueName: string of the queue to subscribe to
  • options:
    • noLocal: false
    • noAck: true
    • exclusive: false
    • noWait: false
    • prefetchCount : integer. If specified the consumer will enter qos mode and you will have to ack messages. If specified noAck will be set to false
  • messageListener: a function (message)
  • callback: a function that is called once the consume is setup

messageListener is a function that gets a message object which has the following attributes:

  • data: the data in its parsed form, eg a parsed json object, or a buffer representing a string
  • raw: the raw buffer that was returned
  • ack : only used when prefetchCount is specified
  • reject : only used when prefetchCount is specified
  • retry : only used when prefetchCount is specified
listener = (message)->
  # we will only get 1 message at a time because prefetchCount is set to 1
  console.log "Message Data", message.data.toString()
  message.ack()

amqp = new AMQP ()->
  amqp.queue {queue: 'testing'}, (e, queue)->
    queue.declare ()->
      queue.bind 'amq.direct', 'testing', ()->
        amqp.publish 'amq.direct', 'testing', 'here is one message 1'
        amqp.publish 'amq.direct', 'testing', 'here is one message 2'

      amqp.consume 'testing', {prefetchCount: 1}, listener, ()->
        console.log "Consumer Ready"

consume.setQos(prefetchCount, [callback])

Will update the prefetch count of an already existing consumer; can be used to dynamically tune a consumer.

consumer.cancel([callback])

sends basicCancel and waits on basicCancelOk

consumer.pause([callback])

consumer.cancel

consumer.resume([callback])

consumer.consume, sets up the consumer with a new consumer tag

consumer.flow(active, [callback])

An alias for consumer.pause (active == false) and consome.resume (active == true)

connection.close()

More documentation to come. The tests are a good place to reference.

Differences between amqp-coffee and node-amqp

First of all this was heavily inspired by https://github.com/postwait/node-amqp

Changes from node-amqp

  • the ability to share channels intelligently. ( if you are declaring multiple queues and exchanges there is no need to use multiple channels )
  • auto channel closing for transient channels ( channels used for declaring and binding if they are inactive )
  • consumer reconnecting
  • fixed out-of-order channel operations by ensuring things are writing in order and not overwriting buffers that may not have been pushed to the network.
  • switch away from event emitters for consumer acks
  • everything that can be async is async
  • native bson support for messages with contentType application/bson
  • ability to delete, bind, and unbind a queue without having to know everything about the queue like auto delete etc...
  • can get the message and consumer count of a queue
  • can turn flow control on and off for a consumer (pause, resume) receiving messages.
  • rabbitmq master queue connection preference. When you connect to an array of hosts that have queues that are highly available (HA) it can talk to the rabbit api and make sure it talks to the master node for that queue. You can get way better performance with consumer acks.
npm loves you