paced-work-stream

1.0.3 • Public • Published

PacedWorkStream

NPM Version Node Build Status Coverage Status dependencies Status

Node.js transform stream working at constant pace and concurrent for object mode

Features

  • Work time at once can be specified. (workMS option)
  • Concurrent workers can be specified. (concurrency option)
  • Fires done event after when all workers have finished asynchrous -processes
  • Counting tag system to call this.countTag(<tag>) in _workPromise, you can get summarized results tagCounts grouped by tag.
  • Node.js 6.10 or later

Targets

  • API client that needs to handle the rate-limit
  • DB client that needs to handle the read/write capacity units like AWS DynamoDB

Install

$ npm install -save paced-work-stream

How to Use

Create a PacedWorkStream

new PacedWorkStream(options, workPromise)

  • options <Object>
    • concurrency is the number of concurrent processes.
    • workMS is milliseconds of work time at once that contains process-time and wait-time.
    • delay is enable to start concurrent process in order delay for a time that divided workMS by concurrency, default is false. workPromise must return functions wrap each promise. Refer to the following figure for detailed operation pattern.
    • highWaterMark is maximum object buffer size. If you use flow mode, you should set it at least concurrency.
  • workPromise is function(item): must return a Promise processing the item or a Function that returns a Promise.

Delay Figure

Create subclass that extends PacedWorkStream

  • super(options) must be called in the constructor.
  • _workPromise method must be overrided and return a Promise processing the item or a Function that returns a Promise.
class MyWorkStream extends PacedWorkStream {
  constructor(options) {
    super(options);
  }
  _workPromise(item) {
    return () => {
      this.countTag(item.tag);
      return Promise.resolve(item.value);
    };
  }
}

Examples

const es = require('event-stream');
const devnull = require('dev-null');
const PacedWorkStream = require('paced-work-stream');
 
const pwStream = new PacedWorkStream({
    concurrency: 2,
    workMS: 1000,
    highWaterMark: 5
  }, function(item) {
    console.log(new Date().toISOString(), 'Begin', item);
 
    return new Promise((resolve, reject) => {
        setTimeout(() => {
          this.countTag('workDone');
          console.log(new Date().toISOString(), 'End', item);
          resolve();
        }, 600); // workMS contains the time.
      })
  })
  .on('done', function() {
    console.log(this.tagCounts);
  }).on('error', (err) => {
    console.error(err);
  });
 
const reader = es.readArray([11, 12, 21, 22, 31])
reader.pipe(pwStream).pipe(devnull({ objectMode: true }));
  • Pay attention to handling done event to get last tagCounts because workers haven't processed items on finish event.
  • If stream need not output, the stream must pipe dev-null.

Console output

$ node example.js
2016-09-11T03:17:50.000Z Begin 11
2016-09-11T03:17:50.003Z Begin 12
2016-09-11T03:17:50.605Z End 11
2016-09-11T03:17:50.605Z End 12
2016-09-11T03:17:51.009Z Begin 21
2016-09-11T03:17:51.009Z Begin 22
2016-09-11T03:17:51.606Z End 21
2016-09-11T03:17:51.606Z End 22
2016-09-11T03:17:52.004Z Begin 31
2016-09-11T03:17:52.607Z End 31
{ workDone: 5 }

Using with Promised Lifestream

Promised Lifestream is useful for stream pipeline. The following example gets the same result as above.

'use strict';
 
const es = require('event-stream');
const PromisedLife = require('promised-lifestream');
 
const PacedWorkStream = require('paced-work-stream');
 
const pacedWorker = new PacedWorkStream({
    concurrency: 2,
    workMS: 1000,
    highWaterMark: 5
  }, function(item) {
    console.log(new Date().toISOString(), 'Begin', item);
 
    return new Promise((resolve, reject) => {
        setTimeout(() => {
          this.countTag('workDone');
          console.log(new Date().toISOString(), 'End', item);
          resolve();
        }, 600); // workMS contains the time.
      })
  })
 
PromisedLife([
  es.readArray([11, 12, 21, 22, 31]),
  pacedWorker
])
.then(() => {
  console.log(pacedWorker.tagCounts);
})
.catch(err => {
  console.error(err);
});

License

MIT

Package Sidebar

Install

npm i paced-work-stream

Weekly Downloads

0

Version

1.0.3

License

MIT

Unpacked Size

12.1 kB

Total Files

5

Last publish

Collaborators

  • tilfin