ack
Stability: 1 - Experimental
Ack is a tracking mechanism, inspired by XOR tracking in Storm, for guaranteeing message processing. It can track many messages/events in ack chains and report whether all of them have been processed.
Installation
npm install ack
Tests
npm test
Overview
How it works
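Ack uses the XOR operation for all the magic. Here is a quick overview of the relevant properties of XOR (^):
- A ^ A = 0 (a value XORed with itself cancels out)
- A ^ 0 = A (XOR with 0 leaves a value unchanged)
- A ^ B ^ C = B ^ A ^ C (the order of operands does not matter)
Together, these mean that any value appearing an even number of times cancels out, for example A ^ D ^ B ^ A ^ B ^ D = 0.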
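To see these properties on the same kind of data Ack works with, here is a small Node.js sketch that XORs Buffers byte by byte. The helper names `xorBuffers` and `isZero` are hypothetical, written only for this illustration; they are not part of the ack API.

```javascript
// Hypothetical helper (not the ack API): XOR two equal-length Buffers byte by byte.
function xorBuffers(a, b) {
  if (a.length !== b.length) {
    throw new Error('Buffers must have equal length');
  }
  const out = Buffer.alloc(a.length);
  for (let i = 0; i < a.length; i++) {
    out[i] = a[i] ^ b[i];
  }
  return out;
}

// Hypothetical helper: true when every byte of the Buffer is 0.
function isZero(buf) {
  return buf.every((byte) => byte === 0);
}

const A = Buffer.from([0x29]);
const B = Buffer.from([0x25]);
const D = Buffer.from([0xa9]);
const zero = Buffer.alloc(1);

// A ^ A = 0
const selfInverse = isZero(xorBuffers(A, A));
// A ^ 0 = A
const identity = xorBuffers(A, zero).equals(A);
// A ^ B = B ^ A
const commutative = xorBuffers(A, B).equals(xorBuffers(B, A));
// A ^ D ^ B ^ A ^ B ^ D = 0, since every value appears twice
const chain = [A, D, B, A, B, D].reduce((acc, x) => xorBuffers(acc, x));

console.log(selfInverse, identity, commutative, isZero(chain)); // true true true true
```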
Ack chain
Let's say that you want to do a letter count, and you have a database containing text files that contain words.
database -> files -> words
Our approach will be event driven, so we will iterate through the database and emit a 'file' event for each file we encounter.
              +-> 'file'
             /
database ---> 'file'
             \
              +-> 'file'
Another part of our computation will accept those 'file' events and emit a 'word' event for each word encountered.
          +-> 'word'
         /
... file ---> 'word'
         \
          +-> 'word'
How do we track that a particular file has been fully processed, given that the processing of any one of the words in that file could fail at any point?
For every file, you can create a unique tag and a random xorStamp, and then initialize an ack chain. In the example below, I will use a simple bit string in place of xorStamp for illustration purposes. In real use, you want a random Buffer, perhaps generated via:
var xorStamp = require('crypto').randomBytes(8); // 8 random bytes (64 bits)
In our example, we generate a tag and a fileStamp (the xorStamp for this file) and add them to the acker.
WARNING: Pseudocode below.
var Ack = require('ack');
var ack = new Ack();

var tag = "unique-file-tag";
var fileStamp = '00101001'; // this is just for illustration (use a random Buffer)

ack.add(tag, fileStamp); // start the ack chain for this file
Next, each file is broken up into words. Here comes the tricky part. We are going to do a lot of things at once.
First, we already registered the start of file processing via ack.add(...). Now we will acknowledge finishing the processing of that file. To acknowledge, we send the fileStamp again (remember A ^ A = 0).
Second, at the same time, we will acknowledge starting the processing of each word. Let's say we have word1, word2 and word3. We generate a stamp for each word: word1Stamp, word2Stamp and word3Stamp. To acknowledge the start of processing, we send those word stamps to the acker.
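Now, remember that A ^ A ^ B ^ C = 0 ^ B ^ C = B ^ C. Because the order of XOR operands does not matter, we can XOR the finishing fileStamp and all three starting word stamps together and send the result to the acker as a single stamp: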
var fileStamp = '00101001'; // we mark completing file
var word1Stamp = '00100101'; // we mark start of computing word1
var word2Stamp = '10101001'; // we mark start of computing word2
var word3Stamp = '11101001'; // we mark start of computing word3

var xorOfAll = '01001100'; // fileStamp XOR word1Stamp XOR word2Stamp XOR word3Stamp
ack.stamp(tag, xorOfAll); // we stamp with just one stamp for all ops above
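As a quick sanity check of the arithmetic above, the sketch below treats the illustrative bit strings as 8-bit integers and XORs them with JavaScript's `^` operator. The helpers `bits` and `toBits` are hypothetical and exist only for this example.

```javascript
// Hypothetical helper: parse an illustrative bit string such as '00101001'.
function bits(s) {
  return parseInt(s, 2);
}

// Hypothetical helper: format a number back as an 8-character bit string.
function toBits(n) {
  return n.toString(2).padStart(8, '0');
}

const fileStamp = bits('00101001');
const word1Stamp = bits('00100101');
const word2Stamp = bits('10101001');
const word3Stamp = bits('11101001');

// One combined stamp: finish the file and start all three words at once.
const xorOfAll = fileStamp ^ word1Stamp ^ word2Stamp ^ word3Stamp;
console.log(toBits(xorOfAll)); // prints 01001100
```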
At this point, Ack has XORed its previous state with the newly stamped value:
var previousStamp = '00101001'; // original fileStamp
var inboundStamp = '01001100'; // xorOfAll from above

var currentState = '01100101'; // previousStamp XOR inboundStamp
So, we've managed to acknowledge multiple operations at once while still storing only the currentState. Notice that the fileStamp has cancelled out: currentState now equals word1Stamp ^ word2Stamp ^ word3Stamp.
Next, notice what happens as we successfully process each word.
var currentState = '01100101'; // currentState from above

var word1Stamp = '00100101'; // we mark finishing of word1
ack.stamp(tag, word1Stamp);
currentState = '01000000'; // currentState XOR word1Stamp

var word2Stamp = '10101001'; // we mark finishing of word2
ack.stamp(tag, word2Stamp);
currentState = '11101001'; // currentState XOR word2Stamp

var word3Stamp = '11101001'; // we mark finishing of word3
ack.stamp(tag, word3Stamp);
currentState = '00000000'; // currentState XOR word3Stamp
// emit 'acked' event, all words for file have been processed!
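The same bookkeeping can be replayed in a few lines, which also shows two consequences of the XOR properties: words may finish in any order, and a word that never finishes leaves its stamp in the state, so the chain never reaches 0 and no 'acked' event would be emitted. This is a hypothetical simulation using 8-bit integers in place of Buffers; `replay` is an illustrative name, not code from the ack package.

```javascript
// Replays a sequence of stamps against an initial stamp and returns every
// intermediate state as an 8-character bit string.
function replay(initial, stamps) {
  let state = initial;
  const states = [state];
  for (const s of stamps) {
    state ^= s; // the acker's only operation: XOR the stamp into the state
    states.push(state);
  }
  return states.map((n) => n.toString(2).padStart(8, '0'));
}

const fileStamp = 0b00101001;
const word1Stamp = 0b00100101;
const word2Stamp = 0b10101001;
const word3Stamp = 0b11101001;
const xorOfAll = fileStamp ^ word1Stamp ^ word2Stamp ^ word3Stamp;

// The order from the walkthrough above.
const inOrder = replay(fileStamp, [xorOfAll, word1Stamp, word2Stamp, word3Stamp]);
// Words finishing in a different order still cancel out.
const shuffled = replay(fileStamp, [xorOfAll, word3Stamp, word1Stamp, word2Stamp]);
// word2 never finishes, so its stamp remains in the state.
const stuck = replay(fileStamp, [xorOfAll, word1Stamp, word3Stamp]);

console.log(inOrder.join(' -> '));
// 00101001 -> 01100101 -> 01000000 -> 11101001 -> 00000000
console.log(shuffled[shuffled.length - 1]); // 00000000
console.log(stuck[stuck.length - 1]); // 10101001, i.e. word2Stamp
```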
That's it. The XOR math works out really well for tracking these types of computation where one event generates multiple child events. This can keep going further down the chain as long as we acknowledge completing our parent processing together with initiation of any child processing. Despite all that activity, the amount of information we store is always one state per entire ack chain.
The above example used binary-looking strings for illustrative purposes; the real implementation uses Buffers. Additionally, the stamps need to be sufficiently large and random to prevent erroneous 'acked' events, which would occur if outstanding stamps happened to XOR to all 0s by chance. The Storm implementation found a 64-bit random integer to be sufficient in practice.
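Here is a sketch of the same chain with realistic stamps: 8 random bytes (64 bits) from Node's `crypto.randomBytes`, matching the Storm sizing mentioned above. The `xor` helper is a hypothetical stand-in for the acker's internal state update; this example does not call the ack package.

```javascript
const crypto = require('crypto');

// Hypothetical helper: XOR two equal-length Buffers into a new Buffer.
function xor(a, b) {
  if (a.length !== b.length) {
    throw new Error('Buffers must have equal length');
  }
  const out = Buffer.alloc(a.length);
  for (let i = 0; i < a.length; i++) {
    out[i] = a[i] ^ b[i];
  }
  return out;
}

// Every stamp is 8 random bytes, i.e. 64 bits.
const newStamp = () => crypto.randomBytes(8);

const fileStamp = newStamp();
const wordStamps = [newStamp(), newStamp(), newStamp()];

// State after the file's chain is started with fileStamp.
let state = fileStamp;

// One stamp that finishes the file and starts all words:
// fileStamp ^ word1Stamp ^ word2Stamp ^ word3Stamp.
const combined = wordStamps.reduce((acc, w) => xor(acc, w), fileStamp);
state = xor(state, combined);

// Each word finishing stamps its own value again.
for (const w of wordStamps) {
  state = xor(state, w);
}

const acked = state.every((byte) => byte === 0);
console.log(acked); // true
```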
Documentation
Ack
Public API
- new Ack(options)
- ack.add(tag, xorStamp)
- ack.fail(tag)
- ack.stamp(tag, xorStamp)
- Event 'acked'
- Event 'failed'
Ack.eqv(first, second)
CAUTION: reserved for internal use
- first: Buffer First buffer to compare.
- second: Buffer Second buffer to compare.
- Return: Boolean true if equal, false otherwise.
Ack.xor(first, second, [zeroCallback])
CAUTION: reserved for internal use
- first: Buffer First buffer to XOR.
- second: Buffer Second buffer to XOR.
- zeroCallback: Function (Default: undefined) Optional callback to call if the result of the XOR is 0.
- Return: Buffer The result of first XOR second.
The lengths of the buffers must be equal.
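For reference, the documented contracts of these two internal helpers can be sketched as standalone functions. This is a hypothetical reimplementation written from the descriptions above, not the library's source; in particular, throwing on mismatched lengths is an assumption of this sketch.

```javascript
// Sketch of Ack.eqv: true when both Buffers hold the same bytes.
function eqv(first, second) {
  return first.equals(second);
}

// Sketch of Ack.xor: XOR two equal-length Buffers and call zeroCallback
// (if given) when every byte of the result is 0.
function xor(first, second, zeroCallback) {
  if (first.length !== second.length) {
    throw new Error('Buffers must have equal length');
  }
  const result = Buffer.alloc(first.length);
  let allZero = true;
  for (let i = 0; i < first.length; i++) {
    result[i] = first[i] ^ second[i];
    if (result[i] !== 0) allZero = false;
  }
  if (allZero && typeof zeroCallback === 'function') {
    zeroCallback();
  }
  return result;
}

let zeroSeen = false;
const a = Buffer.from('00ff', 'hex');
xor(a, a, () => { zeroSeen = true; }); // a ^ a is all 0s, so the callback runs
console.log(zeroSeen, eqv(xor(a, Buffer.alloc(2)), a)); // true true
```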
new Ack()
Creates a new Ack instance.
ack.add(tag, xorStamp)
- tag: String A unique identifier to track this ack chain.
- xorStamp: Buffer Initial stamp to start the ack chain for tag.
ack.fail(tag)
- tag: String A unique identifier of a previously added tag.
Removes the tag and associated xorStamp from the acker and emits the 'failed' event for the tag.
ack.stamp(tag, xorStamp)
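- tag: String A unique identifier of a previously added tag.
- xorStamp: Buffer Stamp to XOR into the current state of the ack chain for tag.
XORs xorStamp into the stored state for tag. When the result is all 0s, the ack chain succeeds and the 'acked' event is emitted.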
Event 'acked'
- tag: String A unique identifier of a previously added tag.
Emitted when the ack chain for a previously added tag succeeds. Success means that the cumulative XOR of the initial add() xorStamp and all subsequent stamp() xorStamps is all 0s.
On success, the tag and its associated state are removed from the acker, and the 'acked' event is emitted for the tag.
Event 'failed'
- tag: String A unique identifier of a previously added tag.
Emitted when the ack chain for a previously added tag fails, that is, when ack.fail(tag) is called for it.
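To tie the public API together, here is a minimal, hypothetical tracker built on Node's `EventEmitter` that mirrors the documented behaviour of `add`, `stamp`, `fail` and the 'acked'/'failed' events. The class name `MiniAck` and its handling of unknown tags (silently ignored) are assumptions for illustration; use the ack package itself in real code.

```javascript
const { EventEmitter } = require('events');
const crypto = require('crypto');

// Hypothetical tracker mirroring the documented API; not the ack package's source.
class MiniAck extends EventEmitter {
  constructor() {
    super();
    this.chains = new Map(); // tag -> current XOR state (one Buffer per chain)
  }

  add(tag, xorStamp) {
    this.chains.set(tag, Buffer.from(xorStamp)); // copy so callers cannot mutate state
  }

  stamp(tag, xorStamp) {
    const state = this.chains.get(tag);
    if (!state) return; // unknown or finished tag: ignored in this sketch
    if (state.length !== xorStamp.length) {
      throw new Error('Buffers must have equal length');
    }
    let allZero = true;
    for (let i = 0; i < state.length; i++) {
      state[i] ^= xorStamp[i]; // fold the new stamp into the stored state
      if (state[i] !== 0) allZero = false;
    }
    if (allZero) {
      this.chains.delete(tag);
      this.emit('acked', tag);
    }
  }

  fail(tag) {
    if (!this.chains.delete(tag)) return;
    this.emit('failed', tag);
  }
}

const events = [];
const acker = new MiniAck();
acker.on('acked', (tag) => events.push('acked:' + tag));
acker.on('failed', (tag) => events.push('failed:' + tag));

// file-1 has a single word: finish the file and start the word in one stamp.
const fileStamp = crypto.randomBytes(8);
const wordStamp = crypto.randomBytes(8);
const combined = Buffer.alloc(8);
for (let i = 0; i < 8; i++) combined[i] = fileStamp[i] ^ wordStamp[i];

acker.add('file-1', fileStamp);
acker.stamp('file-1', combined);
acker.stamp('file-1', wordStamp); // the word is done, so 'acked' fires

acker.add('file-2', crypto.randomBytes(8));
acker.fail('file-2'); // 'failed' fires

console.log(events.join(', ')); // acked:file-1, failed:file-2
```

Storing a single Buffer per tag is what keeps memory use at one state per ack chain, no matter how many child events are stamped.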
Sources
The implementation is based on:
- Flip Kromer's presentation at Austin Hadoop Users Group (AHUG) (presentation not yet published)
- Guaranteeing message processing (Apache Storm documentation)