stepify

Executing nodejs asynchronous tasks by steps chain

npm install stepify
1 downloads in the last week
24 downloads in the last month

stepify

Build Status NPM version

stepify是一个简单易用的Node.js异步流程控制库,提供一种比较灵活的方式完成Node.js(多)任务。

目标是将复杂的任务进行拆分成多步完成,使得每一步的执行过程更加透明,化繁为简。

stepify侧重流程控制,甚至可以和其他异步类库配合使用。

stepify特点

  • 最基本的API的就3个:step()done()run(),简单容易理解。

  • 精细的粒度划分(同时支持单/多任务),执行顺序可定制化。

  • 每一个异步操作都经过特殊的封装,内部只需要关心这个异步的执行过程。

  • 链式(chain)调用,代码逻辑看起来比较清晰。

  • 灵活的回调函数定制和参数传递。

  • 统一处理单个异步操作的异常,也可根据需要单独处理某个任务的异常。

最简单的用法

简单实现基于oauth2授权获取用户基本资料的例子:

// Authorizing based on oauth2 workflow
Stepify()
    .step('getCode', function(appId, rUri) {
        var root = this;
        request.get('[authorize_uri]', function(err, res, body) {
            root.done(err, JSON.parse(body).code);
        });
    }, [appId], [redirectUri])
    .step('getToken', function(code) {
        var root = this;
        request.post('[token_uri]', function(err, res, body) {
            root.done(err, JSON.parse(body).access_token);
        });
    })
    .step('getInfo', function(token) {
        request.get('[info_uri]?token=' + token, function(err, res, body) {
            // got user info, pass it to client via http response
        });
    })
    .run();

多个step共用一个handle、静态参数、动态参数传递的例子:

Stepify()
    .step('read', __filename)
    .step(function(buf) {
        // buf is the buffer content of __filename
        var root = this;
        var writed = 'test.js';

        // do more stuff with buf
        // this demo just replace all spaces simply
        buf = buf.toString().replace(/\s+/g, '');
        fs.writeFile(writed, buf, function(err) {
            // writed is the name of target file,
            // it will be passed into next step as the first argument
            root.done(err, writed);
        });
    })
    .step('read')
    // `read` here is a common handle stored in workflow
    .read(function(p, encoding) {
        fs.readFile(p, encoding || null, this.done.bind(this));
    })
    .run();

这里多了一个read()方法,但read方法并不是stepify内置的方法。实际上,您可以任意“扩展”stepify链!它的奥妙在于step()方法的参数,详细请看step调用说明

可以看到,一个复杂的异步操作,通过stepify定制,每一步都是那么清晰可读!

安装

$ npm install stepify

运行测试

$ npm install
$ mocha

灵活使用

var Stepify = require('stepify');
var workflow1 = Stepify().step(fn).step(fn)...run();
// or
var workflow2 = new Stepify().step(fn).step(fn)...run();
// or
var workflow3 = Stepify().step(fn).step(fn);
// do some stuff ...
workflow3.run();
// or
var workflow4 = Stepify().task('foo').step(fn).step(fn).task('bar').step(fn).step(fn);
// do some stuff ...
workflow4.run(['foo', 'bar']);
var workflow5 = Stepify().step(fn).step(fn);
workflow5.debug = true;
workflow5.error = function(err) {};
workflow5.result = function(result) {};
...
workflow5.run();
// more ...

注:文档几乎所有的例子都是采用链式调用,但是拆开执行也是没有问题的。

原理

概念:

  • task:完成一件复杂的事情,可以把它拆分成一系列任务,这些个任务有可能它的执行需要依赖上一个任务的完成结果,它执行的同时也有可能可以和其他一些任务并行,串行并行相结合,这其实跟真实世界是很吻合的。

  • step:每一个task里边可再细分,可以理解成“一步一步完成一个任务(Finish a task step by step)”,正所谓“一步一个脚印”是也。

stepify内部实际上有两个主要的类,一个是Stepify,一个是Step。

Stepify()的调用会返回一个Stepify实例,在这里称之为workflow,用于调度所有task的执行。

step()的调用会创建一个Step实例,用于完成具体的异步操作(当然也可以是同步操作,不过意义不大),step之间使用简单的api(done方法和next方法)传递。

API 文档

Stepify类:

调用Stepify即可创建一个workflow。

Step类:

Step类只在Stepify实例调用step方法时创建,不必显式调用。


debug()

描述:开启debug模式,打印一些log,方便开发。

调用:debug(flag)

参数:

  • {Boolean} flag 默认是false

例子:

var work = Stepify().debug(true);
// or
var work = Stepify();
work.debug = true;

task()

描述:显式创建一个task,task()的调用是可选的。在新定制一个task时,如果没有显式调用task(),则这个task的第一个step()内部会先生成一个task,后续的step都是挂在这个task上面,每一个task内部会维持自己的step队列。多个task使用pend方法分割。

调用:task([taskName])

参数:

  • {String} taskName 可选参数,默认是_UNAMED_TASK_[index]。为这个task分配一个名字,如果有多个task实例并且执行顺序需要(使用run()方法)自定义,则设置下taskName方便一点。

例子:

var myWork1 = Stepify().task('foo').step('step1').step('step2').run();
// equal to
var myWork1 = Stepify().step('step1').step('step2').run();
// multiply tasks
var myWork2 = Stepify()
    .task('foo')
        .step(fn)
        .step(fn)
    .task('bar')
        .step(fn)
        .step(fn)
    .task('baz')
        .step(fn)
        .step(fn)
    .run();

step()

描述:定义当前task的一个异步操作,每一次step()调用都会实例化一个Step推入task的step队列。这个方法是整个lib的核心所在。

调用:step(stepName, stepHandle, *args)

参数:

  • {String} stepName 可选参数,但在不传stepHandle时是必传参数。为这个step分配一个名称。当stepHandle没有传入时,会在Stepify原型上扩展一个以stepName命名的方法,而它具体的实现则在调用stepName方法时决定,这个方法详情请看stepName说明

  • {Function} stepHandle 可选参数,但在stepName不传时是必传参数。在里边具体定义一个异步操作的过程。stepHandle的执行分两步,先查找这个step所属的task上有没有stepHandle,找不到则查找Stepify实例上有没有stepHandle,再没有就抛异常。

  • {Mix} *args 可选参数,表示这个step的已知参数(即静态参数),在stepHandle执行的时候会把静态参与动态参数(通过done或者next传入)合并作为stepHandle的最终参数。

例子:

  • 参数传递
Stepify()
    .step(function() {
        var root = this;
        setTimeout(function() {
            // 这里done的第二个参数(100)即成为下一个stepHandle的动态参数
            root.done(null, 100);
        }, 100);
    })
    .step(function(start, n) {
        // start === 50
        // n === 100
        var root = this;
        setTimeout(function() {
            root.done();
        }, start + n);
    }, 50)
    .run();
  • 扩展原型链
Stepify()
    .step('sleep')
    // more step ...
    .step('sleep', 50)
    .sleep(function(start, n) {
        var args = [].slice.call(arguments, 0);
        var root = this;

        n = args.length ? args.reduce(function(mem, arg) {return mem + arg;}) : 100;
        setTimeout(function() {
            root.done(null, n);
        }, n);
    })
    .run();

pend()

描述:结束一个task的定义,会影响扩展到Stepify原型链上的stepName方法的执行。

调用:pend()

参数: 无参数。

例子:见stepName部分

stepName()

描述:这是一个虚拟方法,它是通过动态扩展Stepify类原型链实现的,具体调用的名称由step方法的stepName参数决定。扩展原型链的stepName的必要条件是step方法传了stepName(stepName需要是一个可以通过.访问属性的js变量)但是stepHandle没有传,且stepName在原型链上没定义过,当workflow执行结束之后会删除已经扩展到原型链上的所有方法。当调用实例上的stepName方法时,会检测此时有没有在定义的task(使用pend方法结束一个task的定义),如果有则把传入的handle挂到到这个task的handles池里,没有则挂到Stepify的handles池。

调用:stepName(stepHandle)

参数:

  • {Function} stepHandle 必传参数,定义stepName对应的stepHandle,可以在多个task之间共享。

例子:

  • pend()的影响
Stepify()
    .step('mkdir', './foo')
    // 这样定义,在执行到sleep时会抛异常,
    // 因为这个task上面没定义过sleep的具体操作
    .step('sleep', 100)
    .pend()
    .step('sleep', 200)
    .step('mkdir', './bar')
    .sleep(function(n) {
        var root = this;
        setTimeout(function() {
            root.done();
        }, n);
    })
    // 这个pend的调用,使得mkdir方法传入的handle挂在了Stepify handles池中,
    // 所以第一个task调用mkdir方法不会抛异常
    .pend()
    .mkdir(function(p) {
        fs.mkdir(p, this.done.bind(this));
    })
    .run();
  • stepHandle的查找
Stepify()
    .step('mkdir', './foo')
    // 定义当前task上的mkdirHandle,这里其实直接.step('mkdir', fn)更清晰
    .mkdir(function(p) {
        fs.mkdir(p, 0755, this.done.bind(this));
    })
    .step('sleep', 100)
    .pend()
    // 这个task上没定义mkdirHandle,会往Stepify类的handles池去找
    .step('mkdir', './bar')
    .step('sleep', 200)
    .pend()
    .sleep(function(n) {
        var root = this;
        setTimeout(function() {
            root.done();
        }, n);
    })
    .mkdir(function(p) {
        fs.mkdir(p, this.done.bind(this));
    })
    .run();

error()

描述:定制task的异常处理函数。

调用:error(errorHandle)

参数:

  • {Function} errorHandle 必传参数 默认会直接抛出异常并中断当前task的执行。每一个task都可以定制自己的errorHandle,亦可为所有task定制errorHandle。每个step执行如果出错会直接进入这个errorHandle,后面是否继续执行取决于errorHandle内部定义。errorHandle第一个参数便是具体异常信息。

注意:errorHandle的执行环境是发生异常所在的那个step,也就是说Step类定义的所有方法在errorHandle内部均可用,您可以在异常时决定是否继续执行下一步,或者使用this.taskNamethis.name分别访问所属task的名称和step的名称,进而得到更详细的异常信息。

例子:

Stepify()
    .step(fn)
    .step(fn)
    // 这个task的异常会走到这里
    .error(function(err) {
        console.error('Error occurs when running task %s\'s %s step!', this.taskName, this.name);
        if(err.message.match(/can_ignore/)) {
            // 继续执行下一步
            this.next();
        } else {
            throw err;
        }
    })
    .pend()
    .step(fn)
    .step(fn)
    .pend()
    // 所有没显式定义errorHandle的所有task异常都会走到这里
    .error(function(err) {
        console.error(err.stack);
        res.send(500, 'Server error!');
    })
    .run();

result()

描述:所有task执行完之后,输出结果。在Stepify内部,会保存一份结果数组,通过step的fulfill方法可以将结果push到这个数组里,result执行的时候将这个数组传入finishHandle。

调用:result(finishHandle)

参数:

  • {Function} finishHandle,result本身是可选调用的,如果调用了result,则finishHandle是必传参数。

例子:

Stepify()
    .step(function() {
        var root = this;
        setTimeout(function() {
            root.fulfill(100);
            root.done(null);
        }, 100);
    })
    .step(function() {
        var root = this;
        fs.readFile(__filename, function(err, buf) {
            if(err) return root.done(err);
            root.fulfill(buf.toString());
            root.done();
        });
    })
    .result(function(r) {
        console.log(r); // [100, fs.readFileSync(__filename).toString()]
    })
    .run();

run()

描述:开始执行所定制的整个workflow。这里比较灵活,执行顺序可自行定制,甚至可以定义一个workflow,分多种模式执行。

调用:run(*args)

参数:

  • {Mix} 可选参数,类型可以是字符串(taskName)、数字(task定义的顺序,从0开始)、数组(指定哪些tasks可以并行),也可以混合起来使用(数组不支持嵌套)。默认是按照定义的顺序串行执行所有tasks。

例子:

function createTask() {
    return Stepify()
        .task('task1')
            .step(function() {
                c1++;
                fs.readdir(__dirname, this.wrap());
            })
            .step('sleep')
            .step('exec', 'cat', __filename)
        .task('task2')
            .step('sleep')
            .step(function() {
                c2++;
                var root = this;
                setTimeout(function() {
                    root.done(null);
                }, 1500);
            })
            .step('exec', 'ls', '-l')
        .task('task3')
            .step('readFile', __filename)
            .step('timer', function() {
                c3++;
                var root = this;
                setTimeout(function() {
                    root.done();
                }, 1000);
            })
            .step('sleep')
            .readFile(function(p) {
                fs.readFile(p, this.done.bind(this));
            })
        .task('task4')
            .step('sleep')
            .step(function(p) {
                c4++;
                fs.readFile(p, this.wrap());
            }, __filename)
        .pend()
        .sleep(function() {
            console.log('Task %s sleep.', this.taskName);
            var root = this;
            setTimeout(function() {
                root.done(null);
            }, 2000);
        })
        .exec(function(cmd, args) {
            cmd = [].slice.call(arguments, 0);
            var root = this;
            exec(cmd.join(' '), this.wrap());
        });
};

var modes = {
    'Default(serial)': [], // 10621 ms.
    'Customized-serial': ['task1', 'task3', 'task4', 'task2'], // 10624 ms.
    'Serial-mix-parallel-1': ['task1', ['task3', 'task4'], 'task2'], // 8622 ms.
    'Serial-mix-parallel-2': [['task1', 'task3', 'task4'], 'task2'], // 6570 ms.
    'Serial-mix-parallel-3': [['task1', 'task3'], ['task4', 'task2']], // 6576 ms.
    'All-parallel': [['task1', 'task3', 'task4', 'task2']], // 3552 ms.
    'Part-of': ['task2', 'task4'] // 5526 ms.
};

var test = function() {
    var t = Date.now();
    var task;

    Object.keys(modes).forEach(function(mode) {
        task = createTask();

        task.result = function() {
            console.log(mode + ' mode finished and took %d ms.', Date.now() - t);
        };

        task.run.apply(task, modes[mode]);
    });

    setTimeout(function() {
        log(c1, c2, c3 ,c4); // [6, 7, 6, 7]
    }, 15000);
};

test();

done()

描述:标识完成了一个异步操作(step)。

调用:done([err, callback, *args])

参数:

  • {String|Error|null} err 错误描述或Error对象实例。参数遵循Node.js的回调约定,可以不传参数,如果需要传递参数,则第一个参数必须是error对象。

  • {Function} callback 可选参数 自定义回调函数,默认是next,即执行下一步。

  • {Mix} *args 这个参数是传递给callback的参数,也就是作为下一步的动态参数。一般来说是将这一步的执行结果传递给下一步。

例子:

Stepify()
    .step(function() {
        var root = this;
        setTimeout(function() {
            root.done();
        }, 200);
    })
    .step(function() {
        var root = this;
        exec('curl "https://github.com/"', function(err, res) {
            // end this task in error occured
            if(err) root.end();
            else root.done(null, res);
        });
    })
    .step(function(res) {
        var root = this;
        setTimeout(function() {
            // do some stuff with res ...
            console.log(res);
            root.done();
        }, 100);
    })
    .run();

wrap()

描述:其实就是this.done.bind(this)的简写,包装done函数保证它的执行环境是当前step。比如原生的fs.readFile()的callback的执行环境被设置为nullfs.js#L91

调用:wrap()

参数:无

例子:

Stepify()
    .step(function() {
        fs.readdir(__dirname, this.done.bind(this));
    })
    .step(function() {
        fs.readFile(__filename, this.wrap());
    })
    .run();

fulfill()

描述:把step执行的结果推入结果队列,最终传入finishHandle。最终结果数组的元素顺序在传入给finishHandle时不做任何修改。

调用:fulfill(*args)

参数:

  • {Mix} 可选参数 可以是一个或者多个参数,会一一push到结果队列。

例子:

// Assuming retrieving user info
Stepify()
    .step(function() {
        var root = this;
        db.getBasic(function(err, basic) {
            root.fulfill(basic || null);
            root.done(err, basic.id);
        });
    })
    .step(function(id) {
        var root = this;
        db.getDetail(id, function(err, detail) {
            root.fulfill(detail || null);
            root.done(err);
        });
    })
    .error(function(err) {
        console.error(err);
        res.send(500, 'Get user info error.');
    })
    .result(function(r) {
        res.render('user', {basic: r[0], detail: r[1]});
    })
    .run();

vars()

描述:暂存临时变量,在整个workflow的运行期可用。如果不想在workflow之外使用var申明别的变量,可以考虑用vars()。

调用:vars(key[, value])

参数:

  • {String} key 变量名。访问临时变量。

  • {Mix} value 变量值。如果只传入key则是访问变量,如果传入两个值则是写入变量并返回这个value。

例子:

Stepify()
    .step(function() {
        this.vars('foo', 'bar');
        // todo
    })
    .pend()
    .step(function() {
        // todo
        console.log(this.vars('foo')); // bar
    })
    .run();

parallel()

描述:简单的并发支持。这里还可以考虑引用其他模块(如:async)完成并行任务。

调用:parallel(arr[, iterator, *args, callback])

参数:

  • {Array} arr 必传参数。需要并行执行的一个数组,对于数组元素只有一个要求,就是如果有函数则所有元素都必须是一个函数。

  • {Function} iterator 如果arr参数是一个函数数组,这个参数是不用传的,否则是必传参数,它迭代运行arr的每一个元素。iterator的第一个参数是arr中的某一个元素,第二个参数是回调函数(callback),当异步执行完之后需要调用callback(err, data)

  • {Mix} *args 传递给iterator的参数,在迭代器执行的时候,arr数组的每一个元素作为iterator的第一个参数,*args则作为剩下的传入。

  • {Function} callback 可选参数(约定当最后一个参数是函数时认为它是回调函数) 默认是next。这个并行任务的执行结果会作为一个数组按arr中定义的顺序传入callback,如果执行遇到错误,则直接交给errHandle处理。

例子:

传入一个非函数数组(parallel(arr, iterator[, *arg, callback]))

Stepify()
    .step(function() {
        fs.readdir(path.resolve('./test'), this.wrap());
    })
    .step(function(list) {
        list = list.filter(function(p) {return path.extname(p).match('js');});
        list.forEach(function(file, i) {list[i] = path.resolve('./test', file);});
        // 注释部分就相当于默认的this.next
        this.parallel(list, fs.readFile, {encoding: 'utf8'}/*, function(bufArr) {this.next(bufArr);}*/);
    })
    .step(function(bufArr) {
        // fs.writeFile('./combiled.js', Buffer.concat(bufArr), this.done.bind(this));
        // or
        this.parallel(bufArr, fs.writeFile.bind(this, './combiled.js'));
    })
    .run();

传入函数数组(parallel(fnArr[]))

Stepify()
    .step(function() {
        this.parallel([
            function(callback) {
                fs.readFile(__filename, callback);
            },
            function(callback) {
                setTimeout(function() {
                    callback(null, 'some string...');
                }, 500);
            }
        ]);
    })
    .step(function(list) {
        console.log(list); // [fs.readFileSync(__filename), 'some string...']
        // todo...
    })
    .run();

下面是一个应用到某项目里的例子:

...
.step('fetch-images', function() {
    var root = this;
    var localQ = [];
    var remoteQ = [];

    // do dome stuff to get localQ and remoteQ

    this.parallel([
        function(callback) {
            root.parallel(localQ, function(frameData, cb) {
                // ...
                db.getContentType(frameData.fileName, function(type) {
                    var imgPath = frameData.fileName + '.' + type;
                    // ...
                    db.fetchByFileName(imgPath).pipe(fs.createWriteStream(targetUrl));
                    cb(null);
                });
            }, function(r) {callback(null, r);});
        },
        function(callback) {
            root.parallel(remoteQ, function(frameData, cb) {
                var prop = frames[frameData['frame']].children[frameData['elem']]['property'];
                // ...
                request({url: prop.src}, function(err, res, body) {
                    // ...
                    cb(null);
                }).pipe(fs.createWriteStream(targetUrl));
            }, function(r) {callback(null, r);});
        },
    ]);
})
...

jump()

描述:在step之间跳转。这样会打乱step的执行顺序,谨慎使用jump,以免导致死循环

调用:jump(index|stepName)

参数:

  • {Number} index 要跳转的step索引。在step创建的时候会自建一个索引属性,使用this._index可以访问它。

  • {String} stepName step创建时传入的名称。

例子:

Stepify()
    .step('a', fn)
    .step('b', fn)
    .step(function() {
        if(!this.vars('flag')) {
            this.jump('a');
            this.vars('flag', 1)
        } else {
            this.next();
        }

        // 其他的异步操作
    })
    .step('c', fn)
    .run();

next()

描述:显式调用下一个step,并将数据传给下一step(即下一个step的动态参数)。其实等同于done(null, *args)。

调用:next([*args])

参数:

  • {Mix} *args 可选参数 类型不限,数量不限。

例子:

Stepify()
    .step(function() {
        // do some stuff ...
        this.next('foo', 'bar');
    })
    .step(function(a, b, c) {
        a.should.equal('test');
        b.should.equal('foo');
        c.should.equal('bar');
    }, 'test')
    .run();

end()

描述:终止当前task的执行。如果遇到异常并传递给end,则直接交给errorHandle,和done一样。不传或者传null则跳出所在task执行下一个task,没有则走到result,没有定义result则退出进程。

调用:end(err)

参数:

  • {Error|null} err 可选参数, 默认null。

例子:

Stepify()
    .step(fn)
    .step(function() {
        if(Math.random() > 0.5) {
            this.end();
        } else {
            // todo ...
        }
    })
    .step(fn)
    .run();

最后,欢迎fork或者提交bug

License

MIT http://rem.mit-license.org

npm loves you