ice-stream

An expressive streaming utility for node, inspired by Lo-Dash, Async, and event-stream. Ice-stream goal is to allow for complex processing of streaming data.

npm install ice-stream
2 downloads in the last week
8 downloads in the last month

Ice-Stream

An expressive streaming utility for node, inspired by Lo-Dash, Async, and event-stream.

Ice-Stream aims to make stream processing as easy as the ubiquitous mentioned above make processing arrays and objects.

About Streams

Stream processing is basically pumping data through a number of operations, piece by piece. Using streams is especially useful when:

  • There is more data than available memory
  • The data source is slow, e.g. over a network, user input
  • Some of the data can be processed without all of the data

In some cases it is useful to think about and operate on a stream as a continual flow of data, and sometimes it is better to think about it as a segmented, chunk by chunk flow. Ice-Stream's methods do both, depending on the operation.

Examples

First, to include Ice-Stream

var is = require('ice-stream');

Using the static methods results in a typical Node Stream

// Stream from a file, through a lowercaser, to an output file
is.toLower( fs.createReadStream('file.txt') ).pipe( fs.createWriteStream('file-low.txt') );

Passing a Stream to the constructor generates a wrapped stream, which can be chained

// Parse out unique keywords from the file and output them to stdout
is( fs.createReadStream('file.txt') ).split(' ').toLower().unique().without('ruby', 'python').join('\n').out();

Constructor(mixed)

The Ice-Stream variable can be used as a namespace to access the stream methods, as a function to wrap basic Node streams, and as a constructor to create streams for data.

Examples

// Wrap a basic Stream
var wstream1 = is( fs.createReadStream('file.txt') );

// Create a text stream
var wstream2 = is('stream this data');

// The above result is wrapped so we can immediately chain it
wstream2.split(' ').join('\n').out();

// Create a stream from an array
is(['stream', 'this', 'data']).join('\n').out();

// Create a non-text stream from an array
is([1, 4, 6, 2, 91]).map(function(num) {
  return num*2;
}).join('\n').out();

Methods


exec(cmd)

Spawn an external process process. Input is passed to stdin of the new process, and output comes from stdout. Any data that is received from stderr is emitted as an error.

Arguments

  • cmd - The command to run

split([separator])

Chunks the data based on the delimiter. Concatenates and buffers the input until the delimiter is found, at which point the buffered data is emitted. The delimiters are removed and not emitted. When the input stream closes, the final data chunk is emitted. Note that this method converts input to strings.

Arguments

  • separator - String specifying where to split the input stream. Defaults to \n.

join([separator])

Injects data in between chunks of the input stream. Note that a split() followed by a join() will produce the same overall stream, but the chunking will be different.

Arguments

  • separator - The extra data to emit between chunks

toLower()

Converts the input to lower case.


toUpper()

Converts the input to upper case.


map(iterator)

Maps each stream chunk using a synchronous callback function.

Arguments

  • iterator(chunk) - A synchronous function which returns the new chunk.

mapAsync(iterator)

Maps the stream chunks using an async callback. Note that iterator will be called in parallel as chunks are received, and the output order is determined by when the callbacks finish, not the input order.

Arguments

  • iterator(chunk, callback) - The user-defined function which performs the mapping. The first callback parameter is an optional error, with the second parameter being the mapped value.

mapAsyncSeries(iterator)

Same as above, except the chunks are guaranteed to remain in order when emitted. Note that the iterator will still be called in parallel as chunks are received, but the results are buffered to ensure proper emission order.

Arguments

  • iterator(chunk, callback) - Same as above.

filter(iterator)

Sends each chunk to a user-defined iterator which determines whether or not to send the chunk on.

Arguments

  • iterator(chunk) - A synchronous function which returns true to keep the chunk

filterAsync(iterator)

Send each chunk to a user-defined asynchronous function. Note that iterator will be called in parallel as chunks are received, and the output order is determined by when the callbacks finish, not the input order.

Arguments

  • iterator(chunk, callback) - The user-defined function which performs the filtering. The first callback parameter is a boolean. There is no err callback parameter.

filterAsyncSeries(callback)

Same as above, but the chunks are guaranteed to remain in order

Arguments

  • iterator(chunk, callback) - Same as above.

unique()

Stores a hash of processed chunks, and discards already seen chunks. This works with string or object streams, but objects will be hashed based on their toString() result.


without(chunk1[, chunk2, chunk3...])

Discard the specified chunks using strict equality. This works on string or object streams.


out()

Simply pipes the stream to stdout.


npm loves you