diff --git a/README.md b/README.md index 338f10cb2..9bf415336 100644 --- a/README.md +++ b/README.md @@ -177,6 +177,8 @@ the `connect` event. Typically a `net.Socket`. offline * `reconnectPeriod`: `1000` milliseconds, interval between two reconnections + * `connectTimeout`: `30 * 1000` milliseconds, time to wait before a + CONNACK is received * `incomingStore`: a [Store](#store) for the incoming packets * `outgoingStore`: a [Store](#store) for the outgoing packets * `will`: a message that will sent by the broker automatically when @@ -209,6 +211,12 @@ version 1.3 and 1.4 works fine without those. Emitted on successful (re)connection (i.e. connack rc=0). +#### Event `'reconnect'` + +`function() {}` + +Emitted when a reconnect starts. + #### Event `'close'` `function() {}` diff --git a/lib/client.js b/lib/client.js index 26ba76afc..8659d0504 100644 --- a/lib/client.js +++ b/lib/client.js @@ -18,6 +18,7 @@ var events = require('events'), protocolId: 'MQTT', protocolVersion: 4, reconnectPeriod: 1000, + connectTimeout: 30 * 1000, clean: true }; @@ -97,6 +98,8 @@ function MqttClient (streamBuilder, options) { this.queue = []; // Are we intentionally disconnecting? this.disconnecting = false; + // connack timer + this.connackTimer = null; // Reconnect timer this.reconnectTimer = null; // MessageIDs starting with 1 @@ -220,8 +223,8 @@ MqttClient.prototype._setupStream = function () { // Echo connection errors parser.on('error', this.emit.bind(this, 'error')); - outStore = this.outgoingStore - .createStream(); + outStore = this.outgoingStore.createStream(); + // Control of stored messages outStore.once('readable', function () { function storeDeliver () { @@ -245,6 +248,9 @@ MqttClient.prototype._setupStream = function () { }) .on('error', this.emit.bind(this, 'error')); + this.connackTimer = setTimeout(function () { + that._cleanUp(true); + }, this.options.connectTimeout); }; MqttClient.prototype._handlePacket = function (packet, done) { @@ -650,6 +656,8 @@ MqttClient.prototype._handleConnack = function (packet) { 'Not authorized' ]; + clearTimeout(this.connackTimer); + if (0 === rc) { this.emit('connect'); } else if (0 < rc) { diff --git a/package.json b/package.json index 02b55b255..7cf38f52d 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "mqtt", "description": "A library for the MQTT protocol", - "version": "1.1.5", + "version": "1.2.0", "contributors": [ "Adam Rudd ", "Matteo Collina (https://github.com/mcollina)" diff --git a/test/client.js b/test/client.js index ac64b3df7..29f19477f 100644 --- a/test/client.js +++ b/test/client.js @@ -13,6 +13,8 @@ var mqtt = require('..'), // works in node v0.8 process.nextTick(callback); }, + net = require('net'), + eos = require('end-of-stream'), port = 9876, server; @@ -168,5 +170,34 @@ describe('MqttClient', function () { }); }); }); + + it('should reconnect if a connack is not received in an interval', function (done) { + this.timeout(2000); + + var server2 = net.createServer().listen(port + 43); + + server2.on('connection', function (c) { + eos(c, function () { + server2.close(); + }); + }); + + server2.on('listening', function () { + + var client = mqtt.connect({ servers: [ + { port: port + 43, host: 'localhost' }, + { port: port, host: 'localhost' } + ], connectTimeout: 500 }); + + server.once('client', function (serverClient) { + serverClient.disconnect(); + done(); + }); + + client.once('connect', function () { + client.stream.destroy(); + }); + }); + }); }); });