# batch-writer
This module provides a write stream that uses a maximum number of parallel write operations and a configurable batch size to keep the stream moving while preventing Node from firing too many writes at a time. When the maximum number of operations has been reached, the stream is paused until one of the operations completes. This write stream implementation can optionally be configured in object mode.
## Instantiation
`(batchSize, parallelOperations, writeOperation, streamOptions)`
- `batchSize`: Maximum size of a batch of chunks, or objects, before starting a new write operation.
- `parallelOperations`: Maximum number of write operations allowed to run in parallel before pausing the stream (e.g. the number of database connections).
- `writeOperation`: The write operation for each batch of data. This must be a function with one parameter, `data`, wrapped in a Promise to signal when the write operation has completed or failed. Errors should be passed through `Promise.reject(err)`, which will then be emitted on the stream. On success, `resolve()` should be called to allow another operation to begin if available. Any data returned in the `resolve` will be ignored, so no further processing can be done after the write operation for that batch of data.

  ```javascript
  // The array of data is available in the one (only) parameter
  const writeOperation = async (data) => {
    // Perform write operation within a promise,
    // resolve() on success and reject(err) to emit the error in the stream
  };
  ```

- `streamOptions` (optional): The stream options used to configure the base write stream implementation. By default, this will be set to `{}`. A common use of these options would be to set the stream to object mode.
## Example Implementation
```javascript
const mysql = require('mysql');
const BatchWriter = require('batch-writer');

// Batch 1000 data objects per insert
const BATCH_SIZE = 1000;
// Limit insert operations to 5 at a time
const PARALLEL_OPS = 5;
// Turn on object mode (optional configuration)
const streamOptions = { objectMode: true };

// Create enough connections for the maximum number of simultaneous write operations
const pool = mysql.createPool({ connectionLimit: PARALLEL_OPS, /* host, user, ... */ });

// Set up a write operation with one parameter for the array of data
const writeOp = async (data) => {
  const insertQuery = /* Format data into insert statement */;
  await new Promise((resolve, reject) => {
    pool.query(insertQuery, (err) => (err ? reject(err) : resolve()));
  });
};

const dbWriter = new BatchWriter(BATCH_SIZE, PARALLEL_OPS, writeOp, streamOptions);

const readStream = /* Create read stream to obtain data to insert */;

// Pipe the read stream through the write stream to batch up the inserts
// into parallel operations
readStream.pipe(dbWriter);
```