node-resque

an opinionated implementation of resque in node

npm install node-resque
48 downloads in the last day
205 downloads in the last week
1 174 downloads in the last month

node-resque

Delayed tasks in Node.js. A very opinionated API that is compatible with resque and resque-scheduler.

Nodei stats

Build Status

Usage

I learn best by examples:

/////////////////////////
// REQUIRE THE PACKAGE //
/////////////////////////

var NR = require("node-resque");

///////////////////////////
// SET UP THE CONNECTION //
///////////////////////////

// Redis connection settings; the worker, scheduler and queue examples in this
// document all receive this object as their `connection` option.
var connectionDetails = {
  host: "127.0.0.1",
  password: "",
  port: 6379,
  database: 0
};

//////////////////////////////
// DEFINE YOUR WORKER TASKS //
//////////////////////////////

// Job table keyed by job name. Every entry exposes a `perform` function that
// takes two operands plus a node-style callback and reports (error, result).
var jobs = {
  add: {
    perform: function (a, b, callback) {
      callback(null, a + b);
    }
  },
  subtract: {
    perform: function (a, b, callback) {
      callback(null, a - b);
    }
  }
};

////////////////////
// START A WORKER //
////////////////////

// Create a worker for the 'math' queue. The function passed as the third
// argument is handed to the constructor as its callback
// (presumably invoked once the worker is ready — confirm against the library).
var worker = new NR.worker(
  {
    connection: connectionDetails,
    queues: ['math']
  },
  jobs,
  function () {
    // optional: clean up after previous workers that were not shut down properly
    worker.workerCleanup();
    worker.start();
  }
);

///////////////////////
// START A SCHEDULER //
///////////////////////

// Create a scheduler on the same connection and start it from the
// constructor callback.
var scheduler = new NR.scheduler(
  { connection: connectionDetails },
  function () {
    scheduler.start();
  }
);

/////////////////////////
// REGISTER FOR EVENTS //
/////////////////////////

// Log each event the worker emits, one handler per event name.
worker.on('start', function () {
  console.log("worker started");
});

worker.on('end', function () {
  console.log("worker ended");
});

worker.on('cleaning_worker', function (worker, pid) {
  console.log("cleaning old worker " + worker);
});

worker.on('poll', function (queue) {
  console.log("worker polling " + queue);
});

worker.on('job', function (queue, job) {
  console.log("working job " + queue + " " + JSON.stringify(job));
});

worker.on('reEnqueue', function (queue, job, plugin) {
  console.log("reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job));
});

worker.on('success', function (queue, job, result) {
  console.log("job success " + queue + " " + JSON.stringify(job) + " >> " + result);
});

worker.on('error', function (queue, job, error) {
  console.log("job failed " + queue + " " + JSON.stringify(job) + " >> " + error);
});

worker.on('pause', function () {
  console.log("worker paused");
});

// Log each event the scheduler emits, one handler per event name.
scheduler.on('start', function () {
  console.log("scheduler started");
});

scheduler.on('end', function () {
  console.log("scheduler ended");
});

scheduler.on('error', function (error) {
  console.log("scheduler error >> " + error);
});

scheduler.on('poll', function () {
  console.log("scheduler polling");
});

scheduler.on('working_timestamp', function (timestamp) {
  console.log("scheduler working timestamp " + timestamp);
});

scheduler.on('transferred_job', function (timestamp, job) {
  console.log("scheduler enquing job " + timestamp + " >> " + JSON.stringify(job));
});

////////////////////////
// CONNECT TO A QUEUE //
////////////////////////

// Connect to the queue and, from the constructor callback, enqueue two "add"
// jobs plus one "subtract" job via enqueueIn with a first argument of 3000
// (presumably a delay in milliseconds — confirm against the library).
var queue = new NR.queue(
  { connection: connectionDetails },
  jobs,
  function () {
    queue.enqueue('math', "add", [1, 2]);
    queue.enqueue('math', "add", [2, 3]);
    queue.enqueueIn(3000, 'math', "subtract", [2, 1]);
  }
);

Configuration Options:

new queue requires only the "queue" variable to be set. You can also pass the jobs hash to it.

new worker has some additional options:

// Additional options accepted by `new worker`, with the values listed in this document.
// NOTE(review): illustrative fragment — `os` is presumably Node's built-in
// `os` module and would need to be required before this could run; confirm.
options = {
  looping: true,
  timeout: 5000, // presumably milliseconds — confirm
  queues:  "*", // presumably matches every queue — confirm
  name:    os.hostname() + ":" + process.pid // yields "hostname:pid"
}

The configuration hash passed to new worker, new scheduler or new queue can also take a connection option.

// Full form of the `connection` option, including the `package` and
// `namespace` keys that the shorter example omits.
var connectionDetails = {
  package: "redis",
  host: "127.0.0.1",
  password: "",
  port: 6379,
  database: 0,
  namespace: "resque"
};

// Worker created with the explicit connection details; here `queues` is given
// as a single queue name rather than an array.
var worker = new NR.worker(
  { connection: connectionDetails, queues: 'math' },
  jobs,
  function () {
    worker.start();
  }
);

You can also pass a redis client directly.

// Assumes `redisClient` is a redis client you have already initialized.
// It is handed over under the `redis` key of the connection details.
var connectionDetails = {
  redis: redisClient
};

var worker = new NR.worker(
  { connection: connectionDetails, queues: 'math' },
  jobs,
  function () {
    worker.start();
  }
);

Notes

  • Be sure to call worker.end() before shutting down your application if you want to properly clear your worker status from resque
  • When ending your application, be sure to allow your workers time to finish what they are working on
  • worker.workerCleanup() only works for *nix operating systems (osx, unix, solaris, etc)
  • If you are using any plugins which affect before_enqueue or after_enqueue, be sure to pass the jobs argument to the new Queue constructor
  • If you plan to run more than one worker per nodejs process, be sure to name them something distinct. Names must follow the pattern hostname:pid+unique_id. For example:
// Build a distinct worker name of the form hostname:pid+unique_id.
// `process.pid` is a numeric property, not a function; calling it throws a TypeError.
var name = os.hostname() + ":" + process.pid + "+" + counter;
// Pass the distinct name through the worker's `name` option.
var worker = new NR.worker(
  { connection: connectionDetails, queues: 'math', name: name },
  jobs
);

Queue Management

Additional methods provided on the queue object:

  • queue.prototype.queues = function(callback)
    • callback(error, array_of_queues)
  • queue.prototype.length = function(q, callback)
    • callback(error, number_of_elements_in_queue)
  • queue.prototype.del = function(q, func, args, count, callback)
    • callback(error, number_of_items_deleted)
  • queue.prototype.delDelayed = function(q, func, args, callback)
    • callback(error, timestamps_the_job_was_removed_from)
  • queue.prototype.scheduledAt = function(q, func, args, callback)
    • callback(error, timestamps_the_job_is_scheduled_for)

Plugins

Just like ruby's resque, you can write worker plugins. They look like this. The 4 hooks you have are before_enqueue, after_enqueue, before_perform, and after_perform.


/**
 * Example plugin. The constructor only records what it is given so the hook
 * methods below can reach it later.
 *
 * @param worker  stored as `this.worker`
 * @param func    stored as `this.func`
 * @param queue   stored as `this.queue`
 * @param job     stored as `this.job`
 * @param args    stored as `this.args`
 * @param options stored as `this.options`
 */
var myPlugin = function(worker, func, queue, job, args, options){
  this.name = 'myPlugin';
  this.worker = worker;
  this.queue = queue;
  this.func = func;
  this.job = job;
  this.args = args;
  this.options = options;
};

// ---------------------------------------------------------------------------
// Hook methods. Each one receives a node-style callback and, in this example,
// always answers (null, true).
// ---------------------------------------------------------------------------

// before_enqueue hook
myPlugin.prototype.before_enqueue = function(callback){
  callback(null, true);
};

// after_enqueue hook
myPlugin.prototype.after_enqueue = function(callback){
  callback(null, true);
};

// before_perform hook
myPlugin.prototype.before_perform = function(callback){
  callback(null, true);
};

// after_perform hook
myPlugin.prototype.after_perform = function(callback){
  callback(null, true);
};

And then your plugin can be invoked within a job like this:

// The "add" job with a plugin attached: the plugin is listed under `plugins`
// and its settings live under the matching `pluginOptions` key.
var jobs = {
  add: {
    plugins: ['myPlugin'],
    pluginOptions: {
      myPlugin: { thing: 'stuff' }
    },
    perform: function (a, b, callback) {
      callback(null, a + b);
    }
  }
};

notes

  • All plugin hooks call back with (error, toRun). If toRun = false in before_enqueue, the job being enqueued will be thrown away, and if toRun = false in before_perform, the job will be re-enqueued and not run at this time. However, it doesn't really matter what toRun returns on the after hooks.

  • There are a few included plugins, all in the lib/plugins/* directory. You can write your own and include it like this:

// Same "add" job, but the plugin is supplied as a required module instead of
// being referenced by name.
var jobs = {
  add: {
    plugins: [require('myplugin')],
    pluginOptions: {
      myPlugin: { thing: 'stuff' }
    },
    perform: function (a, b, callback) {
      callback(null, a + b);
    }
  }
};

Acknowledgments

Most of this code was inspired by / stolen from coffee-resque and coffee-resque-scheduler. Thanks!

npm loves you