mkue
A MongoDB-backed job queueing mechanism.
- Concurrency handling
- Throttling inputs
- Persistence of all input/output
- FIFO
- Exits the process gracefully
Example
Dispatcher:
var Queue = ; var queue = ;// set a dbqueuecollection = db;// make sure indexes are setqueue; queue
Worker:
var Queue = ; var queue = ;// set a dbqueuecollection = db;// make sure indexes are setqueue; // define a namespaced functionqueue; // set the concurrencyqueue; // start listeningqueue;
API
var queue = new Queue([options])
The options are:
concurrency <1>
- number of jobs to be processed in parallel in this processdelay <1000>
- delay to query the next batch of jobs on draincollection
- the MongoDB collection for this queue
queue.collection =
You are required to set the collection for this worker queue manually.
queue.concurrency(count )
Set the maximum number of concurrent, local jobs.
queue.delay(ms | )
Set the delay after draining the queue to start looking for jobs again.
queue.ensureIndexes().then( => )
Set the indexes for queues and currently processing jobs. Assumes that the queue is always short.
queue.processing().then( count => )
Get the current number of jobs being processed.
queue.queued().then( count => )
Get the current number of jobs in the queue.
queue.queue([ms | ])
Waits ms
to start a new job.
queue.dispatch([name ], fn ).then( job => )
Add a job to the queue.
queue.get([name ], options ).then( job => )
Get the latest job with name
and options
.
May or may not be completed yet.
queue.getById().then( job => )
Get a job by its ID.
queue.poll([name ], options , [ms | ]).then( job => )
Poll the latest job at interval ms
with name
and options
until it's complete.
queue.define([name ], fn )
Define a function.
name
defaults to 'default'
if not set.
fn
's API should be:
You only need to define this on a worker process.
queue.run()
Start running a new job. Call this on a worker process.
queue.close()
Stop creating new jobs.