rqueue

A node.js implementation of RQueue, includes Worker and Queue.

npm install rqueue
11 downloads in the last month

node-rqueue

node-rqueue is a simple Redis based work queue for communicating between multiple platforms.

Usage

var rq      = require('rqueue')
var assert  = require('assert').ok
var q       = q.createQueue(
  { host    : '127.0.0.1'
  , port    : 6379
  , auth    : 'redis-password'
  , name    : 'superq'
  , prefix  : 'local:'
  , timeout : 5000
  }
)

// Start the worker
q.listen()
console.log('Worker', q.id, 'started')

// Listen for jobs. Emits rq.Job objects
q.on('data', function (job) {
  console.log('Processing job', job.id)
  job.done() // Move to `completed` status
  // or
  // job.retry() // Re-queue job
  // job.done(error) // Move job to `failed` status
  // job.retry(error)

  // Emit custom events on pubsub
  job.emit('progress', 100)

  // Listen for events with a redis client:
  // redis.psubscribe('local:rq:superq:progress:*')
  // redis.on('pmessage:local:rq:superq:progress:*', function (key) {
  //   // Key is `local:rq:superq:progress:*job.id*`
  //   var job_id = key.split(':').pop()
  //   q.getJob(id, gotJob)
  // })
  //
  // rqueue emits a lot of events for you. The job id is always the message.
  // A somewhat exhaustive list:
  // (Prefix all these with: q.prefix + ':rq:' + q.name + ':'. For the above
  //  queue it would be `local:rq:superq:`)
  //
  // * Everytime a job changes (job.forward) status, a event named after the
  //   new status is emitted. E.g.
  //
  //   * `queued`, `failed`, `complete`, `timeout`, `running`
  //
  // * Everytime a job is saved, a `save` event is emitted.
  //   If it is a new job, `new` is emitted as well.
  //
  // * Everytime a job is removed, a `remove` event is emitted.

  // IMPORTANT if you want the worker registration / deregistration to work
  // as expected.
  q.end()

  // If a worker has failed or crashed, the id will be in
  // `local:rq:superq:workers` and its unfinished running jobs
  // will be in `local:rq:superq:running:*workerid*`
  // You can migrate the unfinished worker by:
  //
  // q.migrate('workerid', callback)
})

// Quickly create a job and push it to 'queued'
q.write({ my : 'data' })
// Callback can be used as second argument.

// Create a job and mess with individual options.
var job = q.job({ my : 'data' })
assert(null === job.status)
assert(0 === job.retries)

// Timeouts
assert(5000 === job.timeout) // Set from default timeout in queue options
job.timeout = 10000 // 10 seconds

// Priority. Higher is more important. Default is 1
assert(1 === job.priority)
job.priority = 9001 // Over 9000

// Add some messages
job.message('Custom info message')
job.error(new Error('fail'))

// Save the job and push it to the 'queued' status
job.forward('queued', function (error) {
  if (error) throw error

  // Get a job from an id
  q.findJob(job.id, function (error, job) {})
})
assert('queued' === job.status)

// Or save it for forwarding later
job.save(function (error) {})

Implement in another language

TODO

npm loves you