pull fn stream
Model an async function call with a pull stream. This is a transform stream that takes arrays and emits streams. Each returned stream emits a start event and a response event.
install
$ npm install pull-fn-stream
example
var S = require('pull-stream')var flatMerge = require('pull-flat-merge')var test = require('tape')var toStream = require('../') var fns = { a: function (cb) { setTimeout(function () { cb(null, 'a') }, 50) }, b: function (cb) { setTimeout(function () { cb(null, 'b') }, 100) }, error: function (cb) { setTimeout(function () { cb(new Error('test error')) }, 0) }} function asyncOk (arg, cb) { process.nextTick(function () { cb(null, arg) })} function asyncErr (msg, cb) { process.nextTick(function () { cb(new Error(msg)) })} test('call async function', function (t) { t.plan(2) var stream = toStream(asyncOk, 'key') var expected = [ { type: 'start', op: 'key' }, { type: 'start', op: 'key' }, { type: 'key', resp: 'arg' }, { type: 'key', resp: 'arg2' } ] S( S.values(['arg', 'arg2']), stream(), flatMerge(), S.collect(function (err, res) { t.error(err, 'error') t.deepEqual(res, expected, 'should emit events') }) )}) test('async error', function (t) { t.plan(1) var stream = toStream(asyncErr, 'key') S( S.once('test msg'), stream(), flatMerge(), S.collect(function (err, res) { t.equal(err.message, 'test msg', 'should end with error') }) )}) test('from object', function (t) { t.plan(2) var expected = [ { type: 'start', op: 'b' }, { type: 'start', op: 'a' }, { type: 'start', op: 'b' }, { type: 'a', resp: 'a' }, { type: 'b', resp: 'b' }, { type: 'b', resp: 'b' } ] S( S.values(['b', 'a', 'b']), toStream.fromObject(fns)(), flatMerge(), S.collect(function (err, evs) { t.error(err, 'error') t.deepEqual(evs, expected, 'should emit events in order') }) )}) test('pass arguments', function (t) { var fns = { a: function (a,b,c,cb) { setTimeout(function () { cb(null, [a,b,c]) }, 50) }, b: function (d,cb) { setTimeout(function () { cb(null, d) }, 100) } } var expected = [ { type: 'start', op: 'b' }, { type: 'start', op: 'a' }, { type: 'a', resp: [1,2,3] }, { type: 'b', resp: 4 } ] t.plan(1) S( S.values([ ['b',4], ['a',1,2,3] ]), toStream.fromObject(fns)(), flatMerge(), S.collect(function (err, evs) { t.deepEqual(evs, expected, 'should pass args in array') }) )}) test('error from object', function (t) { t.plan(1) S( S.values(['error']), toStream.fromObject(fns)(), flatMerge(), S.collect(function (err, res) { t.equal(err.message, 'test error', 'should pass error in stream') }) )})