nsqueue
Simple, well-tested node client for nsqd. nsqlookupd smarts not included (yet).
install
$ npm install nsqueue
quick start
- Download nsq and run it
- If you're using mac run
make download-osx
- If you're using linux run
make download-linux
- After download you can test it by running
make test
example
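var nsqueue = require('nsqueue')

// NOTE: the 'greetings' topic and the first three payloads are placeholders,
// and nsqueue.Client assumes the constructor from the api section is
// exported under that name.
var client = new nsqueue.Client({
  host: 'localhost',
  port: 4150
})

//connect to the nsqd server process over TCP
client.connect(function(err) {
  if (err) throw err

  //publish buffers
  client.publish('greetings', Buffer.from('Hi Mom!', 'utf8'))

  //publish strings (utf8 assumed)
  client.publish('greetings', 'Hi Dad!')

  //publish json
  client.publish('greetings', { say: 'hi', to: 'mom' })

  //publish multiple messages in a batch
  var messages = ['Hi again, Dad!', { say: 'hi', to: 'dad' }]
  client.publishAll('greetings', messages)
})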
api
new Client(options)
options: object
options should be an object with port & host properties. No default values are assumed if they are not supplied.
client.connect([callback])
Connects the client to the host & port pair supplied in the constructor.
optional callback(error: Error). Called with the error if a connection error was encountered. If no callback is supplied and an error is encountered during connection, the client will emit the error as an error event.
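The two error paths described above can be sketched without a running nsqd. The fakeClient factory below is a hypothetical stand-in, not part of nsqueue; it only mimics the documented contract of connect (callback if supplied, otherwise an error event).

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical stand-in for an nsqueue client. It mimics only the
// documented behaviour of connect(): report the error to the callback
// if one is supplied, otherwise emit it as an 'error' event.
function fakeClient(connectError) {
  var client = new EventEmitter();
  client.connect = function (callback) {
    if (callback) return callback(connectError);
    if (connectError) client.emit('error', connectError);
  };
  return client;
}

// Style 1: handle the connection error in the callback.
var viaCallback = null;
fakeClient(new Error('connection refused')).connect(function (error) {
  viaCallback = error;
});

// Style 2: no callback, so the error arrives as an 'error' event.
var viaEvent = null;
var client = fakeClient(new Error('connection refused'));
client.on('error', function (error) {
  viaEvent = error;
});
client.connect();
```

With a real client, attach the error listener before calling connect; an EventEmitter with no error listener throws when an error event is emitted.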
client.publish(topic, data, [callback])
topic: string. Must be a string of valid nsqd topic characters.
data: buffer, string, or object. The payload of the message.
- if string, assumed to be utf8 encoded
- if object, passed to JSON.stringify before publishing
optional callback(error: Error). Called when the message has been received by the nsqd server. Passed an Error object if there was a problem publishing the message.
If no callback is supplied, any publish error will be emitted as an error event.
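As a rough illustration of the payload rules above, the helper below converts each accepted type to a Buffer. toPayload is a hypothetical name used only for this sketch; it is not claimed to be how nsqueue encodes payloads internally.

```javascript
// Hypothetical helper, not part of nsqueue: mirrors the documented
// payload rules for client.publish.
function toPayload(data) {
  // Buffers are sent as-is.
  if (Buffer.isBuffer(data)) return data;
  // Strings are assumed to be utf8 encoded.
  if (typeof data === 'string') return Buffer.from(data, 'utf8');
  // Anything else is passed to JSON.stringify before publishing.
  return Buffer.from(JSON.stringify(data), 'utf8');
}

var raw = Buffer.from([1, 2, 3]);
var fromBuffer = toPayload(raw);
var fromString = toPayload('Hi Mom!');
var fromObject = toPayload({ say: 'hi', to: 'dad' });
```

On the consuming side, message.data holds these bytes, so a string payload is recovered with toString('utf8') and an object payload with JSON.parse.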
client.publishAll(topic, datums, [callback])
The same as client.publish but takes an array of data payloads as the second argument and publishes them as a batch. Batch publishing is part of the nsqd protocol and is generally more efficient when publishing many messages at once, though error handling is harder: in the event of an error you only know that m of n messages failed, not which ones. Each item in the datums array is treated as its own message by nsqd.
topic: string. Must be a string of valid nsqd topic characters.
datums: Array of buffers, strings, and/or objects. The payloads of the messages.
optional callback(error: Error). Called when the messages have all been received by the nsqd server. Passed an Error object if there was a problem publishing any of the messages.
If no callback is supplied, any publish error will be emitted as an error event.
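Because a failed batch does not say which messages failed, one possible approach is to publish in smaller batches so a failure points at fewer candidates. The sketch below is an assumption-laden illustration: chunk and publishInBatches are hypothetical helpers, the 'greetings' topic is a placeholder, and the client is a stand-in that fails any batch containing the string 'bad'.

```javascript
// Hypothetical helper: split payloads into batches of at most `size` items.
function chunk(items, size) {
  var batches = [];
  for (var i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Hypothetical helper: publish each batch with publishAll and hand the
// batches that reported an error to `done`.
function publishInBatches(client, topic, items, size, done) {
  var batches = chunk(items, size);
  var failed = [];
  var pending = batches.length;
  if (pending === 0) return done(failed);
  batches.forEach(function (batch) {
    client.publishAll(topic, batch, function (error) {
      if (error) failed.push(batch);
      if (--pending === 0) done(failed);
    });
  });
}

// Stand-in client for this sketch: a batch fails if it contains 'bad'.
var client = {
  publishAll: function (topic, datums, callback) {
    var failed = datums.indexOf('bad') !== -1;
    callback(failed ? new Error('batch failed') : null);
  }
};

var failedBatches = null;
publishInBatches(client, 'greetings', ['a', 'b', 'bad', 'd', 'e'], 2, function (failed) {
  failedBatches = failed;
});
```

Smaller batches give up some of the efficiency of batch publishing, and republishing a failed batch may duplicate messages that were in fact accepted, so this is a trade-off rather than a fix.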
client.subscribe(topic, channel, [callback])
topic: string. The topic this client should subscribe to.
channel: string. The channel this client should subscribe to.
optional callback(error: Error). Called when the client has successfully subscribed to the topic/channel pair. Passed an Error object if there was a problem subscribing to the topic/channel pair.
If no callback is supplied, any error during subscribing will be emitted as an error event.
client.on('message', callback(message))
Adds an event listener which is called every time a message is received on this client. Messages will only be received on the topic/channel pair the client is subscribed to. Note: the client does no internal buffering of incoming messages. Once the client is subscribed to a topic/channel pair, message events will start 'flowing' in immediately after the subscribe callback is called. You can add a message event listener before even calling subscribe.
If no callback is supplied any publish error will be emitted as an error event.
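Since nothing is buffered, a safe ordering is to attach the message listener before subscribing. The sketch below shows that ordering against a hypothetical stand-in client that delivers one message right after the subscribe callback runs; the topic and channel names are placeholders.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical stand-in for a connected client. It delivers a single
// message immediately after the subscribe callback has been called.
var client = new EventEmitter();
client.subscribe = function (topic, channel, callback) {
  if (callback) callback(null);
  client.emit('message', { data: Buffer.from('Hi Mom!', 'utf8') });
};

var received = [];

// Attach the listener first: the client does not buffer messages, so a
// listener added later would miss anything already delivered.
client.on('message', function (message) {
  received.push(message.data.toString('utf8'));
});

client.subscribe('greetings', 'readme-example', function (error) {
  if (error) throw error;
});
```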
client.end([callback])
Disconnects the client.
optional callback(error: Error). Called when the client has disconnected cleanly from the nsqd server process and the socket is closed.
client.concurrency: int
Default value: 1
The maximum number of in-flight messages the nsqd server will deliver to this client at one time. Setting this to 5, for example, will allow 5 in-flight messages to be sent to this client. As each message is finished or requeued, the server will send more messages down to the client until it again has 5 in flight at a time. This value can be changed at any time and will take effect as soon as the next in-flight message is either requeued or finished.
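The in-flight window can be pictured with a small simulation. This is only a model of the behaviour described above, not nsqd or nsqueue code; simulate is a hypothetical function, and with a real client the limit would be set with client.concurrency = 5.

```javascript
// Hypothetical simulation: deliver queued items while fewer than
// `concurrency` are in flight, topping up as each one is finished.
function simulate(queue, concurrency) {
  var pending = queue.slice();
  var delivered = [];
  var inFlight = 0;
  var maxInFlight = 0;

  // Server side: send messages until the window is full.
  function deliver() {
    while (inFlight < concurrency && pending.length > 0) {
      inFlight++;
      maxInFlight = Math.max(maxInFlight, inFlight);
      delivered.push(pending.shift());
    }
  }

  // Consumer side: finishing (or requeuing) a message frees one slot.
  function finishOne() {
    inFlight--;
    deliver();
  }

  deliver();
  while (inFlight > 0) finishOne();
  return { delivered: delivered, maxInFlight: maxInFlight };
}

var result = simulate([1, 2, 3, 4, 5, 6, 7], 5);
```

In this run all seven items are delivered and the number in flight never exceeds 5.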
message
Message objects are not created by you directly. They are emitted from clients with an active subscription on a topic/channel pair through the message event.
message.data: Buffer
The raw binary data of the message. Call message.data.toString('utf8') for a string representation.
note: because the nsqd protocol does not allow empty messages, this will never be null.
message.json(): Object
A helper which calls JSON.parse(message.data.toString('utf8')) and returns the result, because it is so common to send & receive JSON messages.
note: calling this on a message whose contents are not valid JSON will throw a JSON parsing exception.
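Because message.json() can throw, a consumer that may see non-JSON payloads will probably want to guard the call. A minimal sketch follows; tryJson and fakeMessage are hypothetical names, and the fake message is built from the definition of message.json() given above rather than from nsqueue itself.

```javascript
// Hypothetical helper: returns the parsed body, or null when the
// message contents are not valid JSON.
function tryJson(message) {
  try {
    return message.json();
  } catch (error) {
    return null;
  }
}

// Stand-in message following the documented definition of json().
function fakeMessage(text) {
  var data = Buffer.from(text, 'utf8');
  return {
    data: data,
    json: function () {
      return JSON.parse(data.toString('utf8'));
    }
  };
}

var parsed = tryJson(fakeMessage('{"say":"hi","to":"dad"}'));
var unparsable = tryJson(fakeMessage('Hi Mom!'));
```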
message.finish() : bool
Call this when you're done processing the message. Tells the nsqd server process you have successfully finished processing this message. The server will remove this message from the queue and not send it out to any more clients.
Returns true if the response was sent to the nsqd server. If the message has already been responded to (message.inFlight is no longer true), this is a no-op and returns false.
If there is an error finishing this message, the client will emit an error event.
note: currently the binary protocol does not communicate anything back in the event of a successful FIN message. There's no way to have a callback for message.finish() at this time; it's fire and forget.
message.requeue(timeoutInMilliseconds: int)
timeoutInMilliseconds: int. Millisecond timeout the nsqd server will wait before attempting to deliver the message again.
Signals the nsqd server to requeue the message and deliver it again. You usually call this if the message consumer has failed to process the message appropriately.
Returns true if the response was sent to the nsqd server. If the message has already been responded to (message.inFlight is no longer true), this is a no-op and returns false.
If there is a problem requeuing this message, the client will emit an error event.
note: currently the binary protocol does not communicate anything back in the event of a successful REQ message. There's no way to have a callback for message.requeue(1000) at this time; it's fire and forget.
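A common consumer pattern is to finish a message when processing succeeds and requeue it when processing throws. The sketch below illustrates that pattern under stated assumptions: handle is a hypothetical wrapper, and fakeMessage is a stand-in that follows the documented return values of finish() and requeue() so the example runs without nsqd.

```javascript
// Stand-in message following the documented rules: the first response
// is sent and returns true, any later response is a no-op returning false.
function fakeMessage(text) {
  var message = {
    data: Buffer.from(text, 'utf8'),
    inFlight: true,
    sent: [],
    finish: function () {
      return respond('FIN');
    },
    requeue: function (timeoutInMilliseconds) {
      return respond('REQ ' + timeoutInMilliseconds);
    }
  };
  function respond(command) {
    if (!message.inFlight) return false;
    message.inFlight = false;
    message.sent.push(command);
    return true;
  }
  return message;
}

// Hypothetical wrapper: finish on success, requeue after a delay on failure.
function handle(message, work, retryDelayMs) {
  try {
    work(message);
  } catch (error) {
    return message.requeue(retryDelayMs) ? 'requeued' : 'ignored';
  }
  return message.finish() ? 'finished' : 'ignored';
}

var ok = fakeMessage('Hi Mom!');
var bad = fakeMessage('oops');
var okResult = handle(ok, function () {}, 1000);
var badResult = handle(bad, function () {
  throw new Error('cannot process');
}, 1000);
```

With a real client, handle would be called from the message event listener, and any failure to send FIN or REQ would surface as an error event on the client rather than as a return value.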
message.touch()
Signal the nsqd server you want more time to process this message.
note: currently the binary protocol does not communicate anything back in the event of a successful TOUCH message. There's no way to have a callback for message.touch() at this time; it's fire and forget.
message.inFlight: bool
Initially set to true. This will be set to false after calling message.finish() or message.requeue().
note: it is currently considered an error for a client to respond to a message more than once, so calling message.finish() or message.requeue() more than once on a message will only send the FIN or REQ packet to the nsqd server once for each message. If you absolutely must send FIN or REQ twice (which will usually cause the nsqueue client to emit an error) then you have to manually set message.inFlight = true before calling message.finish() or message.requeue() again.
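The respond-once rule can be modelled in a few lines. The object below is a hypothetical stand-in for a received message, not nsqueue's implementation; its packets counter represents what would be sent to nsqd.

```javascript
// Stand-in for a received message, modelling only the documented
// inFlight behaviour.
var message = {
  inFlight: true,
  packets: 0,
  finish: function () {
    if (!this.inFlight) return false; // already responded: no-op
    this.inFlight = false;
    this.packets++;
    return true;
  }
};

var first = message.finish();  // responds and clears inFlight
var second = message.finish(); // no-op because inFlight is already false
```

Checking message.inFlight before responding is therefore optional in ordinary handlers; the second call is simply ignored.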
testing
Most of the tests are functional style tests. They assume a running instance of nsqd on localhost:4150. Once you have nsqd reachable at localhost:4150, run the tests by typing mocha at the project root.
contributions
I love contributions! Fork & send pull requests please! After a few pull requests I can add you as a contributor with push & pull access if you're interested. If you find any problems or want to undertake more advanced/crazy refactorings, please feel free to open an issue and we can discuss.
LICENSE
Copyright (c) 2014 Brian Carlson (brian.m.carlson@gmail.com)
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.