@openmessage/qstream

0.1.0 • Public • Published

QStream

Topic-Based Messaging Queue on top of Redis Streams

Example

Producer

const streams = require('@openmessage/qstream')();

streams.publish('your-topic', data);

see more at examples

Consumer

const streams = require('@openmessage/qstream')();

const group = await streams.group('your-topic', 'group/queue name');

group.consume(async (data) => {
    console.log({ data });
    return true;
});

see more at examples

Usage

Connection

const QStream = require('@openmessage/qstream');
const qstream = QStream(redisUrl);

redisUrl: Valid Redis URL format

Publish/Produce/Emit

qstream.publish('your-topic', data);

data: can be any valid javascript object, primitive values not supported

With extra args, like maxLen, that will cap the stream to the specified length:

qstream.publish('your-topic', data, 10);

Or approximated maxLen:

qstream.publish('your-topic', data, '~10');

By default streams will be capped to aprox 10000 (MAXLEN ~ 10000). If you don't want your stream to be capped, you have to explicitly set the last arg of publish to null.

Consumer Group

const group = await streams.group('your-topic', 'consumer-group/queue-name');

Consumers in the same consumer group will load balance jobs among them

Subscrie/Consume/Listen

group.consume(async (data) => {
    console.log({ data });
    return true;
});

The function passed to the consume method can be a promise

group.consume(console.log, 10);

as a second parameter to the consume function it receives the number of concurrent jobs, defaults to 1

Debug

This lib uses debug to debug the processing

DEBUG=qstream:* npm start

Roadmap

  • [x] Add proper logging debug?
  • [ ] Add linting
  • [ ] Add Tests
  • [ ] Add CI / CD
  • [ ] Handle unacked messages (CLAIM, PENDING)
  • [ ] Add pub/sub case (fanout)
  • [ ] Add timeline case
  • [ ] Add documentation for history rebuild
  • [ ] Improve docs

Package Sidebar

Install

npm i @openmessage/qstream

Weekly Downloads

17

Version

0.1.0

License

MIT

Unpacked Size

12.7 kB

Total Files

12

Last publish

Collaborators

  • 5h1ru
  • agares
  • jakelaz
  • kuryaki