node-queue-pool

manage concurrent actions/jobs, manage limits resource

npm install node-queue-pool
4 downloads in the last week
10 downloads in the last month

由来

应该说 javascript 的核心就是异步处理,每一个执行的动作只要是涉及到I/O的,几乎都是异步处理。因此当我们做任何异步操作时,如通过网络发送各类请求,都需要一些通用的异步处理机制。包括限制并发数,限制速率,超时检测和处理,操作日志和统计,结果收集等等。鉴于这些需求具有通用性,因此我设计 super-queue 来一揽子的支持这些需求。super-queue 对异步的 javascript 具有基础性,应该成为其他各种各样的 javascript 库和模块的基础模块。

Comparison to other Modules

list

unique features

  • priority control, urgent action call
  • suspend / resume control
  • timeout control
  • sync execution that return will do callback(null,rval), exception will do callback(err), and not count for statistics
  • statistics
  • versatile control methods, by mere concurrency or by mere execute rate, by active or passive resource available
  • can pass a container to mount result to the result tree
  • can pass fn only, if want pass data, use wrapper is ok, simple uniform API
  • do not known which data is passed to Queue, more secure
  • can pass desc for logging for every action
  • serializable and can save to and recover from disk
  • know when all is done, or when maxError is reached, and can just quit ahead when the goal is hit(used to support all async collection APIs)
  • by batch. when state reach queue, do pause, when drain, collect them to do a batch job

名言

one async call can use classic callback to drive to next code, all cases there is more than one async call concurrently will utilize ActionQueue. so async code have only two pattens. use ActionQueue to synthesize concurrent async calls to a classic callback based async function

不支持的特性

  • waterfall/step 模式不支持,因为只要用简单的chain回调即可,都是受控串行执行,和 queue 完全没有关系
  • 如果依赖2个以上的异步结果,就不能用 chain 模式了,应该使用 queue(async.map,async.cache) 来获得结束事件,并得到结果数组或结果对象 可以说,只要有并行存在都可以使用 queue.push.end, async.map, async.cache 来获得结束事件和结果参数

  • 如果不支持 queue, 而是进入队列都执行,那么 queue 的意义就在于触发 close(err,result) 事件, 这样一个 queue wrapper 就成为了一个标准的 NodeJS 异步函数, 其最后一个参数为结束回调 cb(err,result)

  • q.result 可以自行定义,每步活动都可以进行修改利用。但是对 ActionQueue 完全透明,ActionQueue 不会自动处理和记录 q.result,因为它只负责任务管理本身,而不触及任务的结果。这和 ActionQueue 不问入参,不问结果的哲理相符。

  • cache 入参和结果的关系完全在 cache 模块完成

    return cache(key) || function() { do real sync or async work, and finally return/throw/callback }

    and since ActionQueue do not know act parameters, so it can not support cache for lack of key

应用场景

  • 利用并发限制特性,结合 superagent,jsdom 进行网站扫描,压力测试、数据抓取、动态网站转换、移动办公、模拟点击等等
  • 利用关闭事件触发机制,支持 async, 并发异步转换为标准 callback based 异步函数的机制

复杂的特性 ——------

  • 可以同时支持并发控制和速率控制,先到先控制
  • 可以支持自由、主动资源、被动资源的任选一个,即某个请求必须先得到空闲资源后才能执行,资源可以是 slot 号,或者是具体的资源。

// a simple wrapper that can enqueue different parameters function wrapper1(p1,p2,p3,...) { pq.enqueue(1,'desc',(function(rc,rcb) { // some content variables // for p1,p2,p3 // do something async rcb(returnValue); })); }

// a more complex wrapper that can cache anync operation result and reuse it function wrapper2(p1,p2,p3,...) { var cacheValue = cache[[p1,p2,p3]]; if(cacheValue&&cacheValue.isFresh()) return cacheValue; pq.enqueue(1,'desc',(function(rc,rcb) { // some content variables // for p1,p2,p3 // do something async // use result as a cache cache[[p1,p2,p3]] = new cacheItem(returnValue); rcb(returnValue); })); }

wrapper1(1,2,3); wrapper2(4,5,6);

fs.stat('file1'); fs.stat('file2'); fs.stat('file3'); fs.stat('file4');

pq.enqueue('file1',fs.stat,1,desc); pq.enqueue('file2',fs.stat,1,desc); pq.enqueue('file3',fs.stat,1,desc); AQ.enqueue('file4',fs.stat,1,desc); 可以控制 fs.stat 的并发度 pq.results 为每次执行的结果,保存为数组。 还可以同时保存为 cache.

和 async 比较

ActionQueue .setMaxErrors() .setTimeout() .on('end',function(errors,results) { ... });

or more simple

new ActionQueue()

.forEach(fs.write) // 只确保完成,不报错即可
.filter(fs.exists) // 记录返回 true 的入参
.reject(fs.exists) // 记录返回假的入参
.map(fs.stat) // 在确保不报错完成的基础上,记录结果
.concat(fs.readdir) // 将结果数组合并
.sortBy() // 按照结果的顺序排序入参
.cache(fs.stat) // 在确保不报错完成的基础上,记录参数到结果的 mapping 关系
.detect(fs.exists) // 得到第一个存在的文件
.some(fs.exists) // 如果存在则返回 true, 其实和 detect 一样
.every(fs.exists) // 如果都返回 true, 如果一项为假,立即返回 false;否则最终返回 true

.enqueue(files)
.onFin(function(errors,results) {...})
.enqueue(data,{name:result key, priority:, desc:...});
.end(); // 标明不再有新的任务进入,否则相关 resultHandler 不能认为完成
;

data 只能有一个,是一个 json object

think list

  1. 基于 slot,真的重要吗;直接一个 busyList,一个 freeList 不行吗;这样释放 busyList 的资源时,需要找准,只能通过扫描
  2. 不对资源实例做任何读写,完全透明
  3. 可以控制 free slot 按照什么顺序,是尽量复用少数资源,还是尽量分摊资源
  4. 资源数量是固定的,如 oracle 反向连接数,如扫描网页的数量等等
  5. 资源可以是虚的,也可仅仅简单的控制并发数
  6. 设计设定激发资源占满和快要占满的事件,用于动态扩展资源
  7. 设计设定激发资源占用率低的事件,用于动态删除资源
  8. 可以设定资源超时时长,如果长时间不释放资源,升起事件,参数为资源
  9. 资源可以是一个 action 行动
  10. 性能优化,可以频繁的添加删除资源,频繁的占用释放资源,性能不受影响

如果异步动作间没有关系,那么他们就一起执行好了,最多收到并发控制。 如果异步动作间存在关系,那么: 1、act1 在执行 执行完后,调用后续步骤的 act2(err,result) act3(err,result) 好了,这个属于上一步知道下一步是什么 对于 module-step 来讲,下一步就是动作数组的下一项,上一步无需知道下一步的名字,下一步不也无需名字 对于 module-fnqueue 来讲,下一步就是形参中依赖本动作的其他动作,这要求每步都要有名字用于说明依赖关系 module-fnqueue 支持并发等更为复杂的模型,入参技能接受被依赖过程的结果,又能通过 this 来产生共享的结果, 当然也可以通过外部变量产生共享结果。 module-fnqueue 可以描述复杂的依赖关系,更为通用

if (检查文件是否存在).then(function(){ if 存在

else 不存在 动态重建一个 }

如果每个异步过程就是一个已有的标准异步过程,如 afunc(p1,p2,..,cb(err,result))

那么 ActionQueue 就可以 push 它,aq.push(afunc,p1,p2,p3, rhandle); 这样就会调用 afunc(p1,p2,p3,cb(err,result)) 并将 result 带入 rhandle(input,result,idx) 如果遇到 cb(err) 则不调用 rhandle 因为其实每步 act 其中其实就是一个已有过程,可能在 nodeJS 中已经定义号,现在只要通过 rhandle 接受结果继续执行即可。 同时可以支持 cache 了。

Design Features

  • 并发基于 slots
  • 每个 slot 记录正在执行操作的开始执行时间,因此可以为每个正在执行的动作产生超时
  • 每个 slot 也记录正在执行操作的简洁,因此可以在超时时输出带有简介的日志
  • 有了超时后,可以防止哪些执行出去就没有反应的动作一直占用一个并发数,最终可能导致完全停止

  • 队列优先级别机制

  • 任务对活动队列完全透明,不读写任何任务相关数据

  • 活动也完全不了解活动队列

API documentation

var ActionQueue = new require('node-queue-pool').ActionQueue , aq = new ActionQueue(maxConcurrency, timeout in milliseconds, maxPriority) ;

create a action queue

add a action to queue

aq.queue(act)

When act is about to execute, pass fin callback as the only one parameter, act call fin() when the action is finished

if call fin with error object, ActionQueue will count the execution as failure.

According to the logging setting, ActionQueue will log the error.

comparison between direct execution and throttled queued execution

function someThingToDo(p1,p2,p3,...) {
    ...
}

direct execution

someThingToDo();

throttled exection

change someThingToDo to

function someThingToDo(p1,p2,p3,...) {
    aq.queue(function actual(fin) {
        ...
        fin();
    }
}

wrap all code in someThingToDo with a closure, and pass it in to aq.queue al all work to do!

This way, all former code calling someThingToDo is not forced to change, but someThingToDo is upgraded to throttled version.

future plan

  • allow pause/resume, this can void problems with restart
  • allow dynamic tune concurrency, if it find
  • trace state(empty,normal,throttled,overwhelm)
  • emit event(drain,active,relieved,throttle,overload)
  • logging start,finish,failure of actions
  • efficient schedule a action to execute at the specified time.

congestion resolve

如果大量请求要求执行,而并发数不够或者资源池中没有可用的资源,那么请求就要排队。 一个任务要是排队时间太长,可以允许它取消执行,

资源等待超时

方法就是在全局设定排队超时门限,或是在每个等待任务上设置排队超时门限, 然后其后台 setTimeout 设定按照最近排队超时时间定时,查看排队有无执行完, 若没执行完,则执行活动,并带入 error 参数。 若执行完,则要查找最近需完成的任务的时间,继续 setTimeout 重复上述步骤, 查找算法为全部队列扫描法(如果允许为每个任务分别设定不同的超时要求的化) 如果要为每个任务单独设定超时,那么就要按照每个任务单独设置 timer.

任务执行超时

任务执行时间过长后,触发超时事件,这时可以尽快释放资源。

RC 需要有一个 wrapper

这样,就可以设置 busy rc 属性,包括

  • startTime
  • timeoutTime
  • event:timeout
  • rc.release() 用于释放资源,还原到资源池中
  • rc.setTimeout(2000, clearCB(rc));

或者不设 handle ,而是设置一个 slot 同步的 handle

pool.priority(3).timeout(3).desc('a action').require(then(rc,fin){});

pool.require( { priority:2, timeout:3000, desc:'xxx'}, then(rc,fin));

pool.require(priority,then(rc,fin) { ... fin(); }

pool.setExecTimeout(2000,function(slot) {

});

pool&queue interface

pq.enqueue(options, then(rc,fin))

  1. ActionQueue rc = slot id
  2. any pool rc = rc

then(fin) then(rc,fin) then(fin,key,idx)

如果总是有上一步驱动下一步那就简单了,在上一步的结束部分调用下一步好了

pq.usePriorityQueue

pq.setExecTimeout

设置统一的超时执行时间,不设则默认不超时,可能造成资源不能回收,影响并发度

pq.setWaitTimeout

设置统一的等待时间,不设则默认不超时,可能造成持有非常长的队列,其中存在过期执行无效的内容

pq.suspend()

暂停队列执行,已执行的都继续执行,排队等待的任务不再调度。全部已执行都完毕后触发 suspended 事件。

pq.resume()

继续运行,将登录队列中的任务调度执行。

support case

纯粹控制并发度

比如说对于 web spider 软件来说,比如说 noradle 平台下企号通网站的压力测试,有比如说沃阅读模拟阅读支持考核的软件。

控制资源池

如各种客户端

控制被动资源池

如 Noradle 的 oracle 反向连接

控制速率

这个不是控制并发度,而是控制单位时间内发送的请求量 如 20/s

自适应控制速率

如错误太多或超时太多自动降低速率

counter threshold

针对某个

npm loves you