1144 lines
48 KiB
JavaScript
1144 lines
48 KiB
JavaScript
'use strict';
|
||
|
||
const mqtt = require('mqtt-connection');
|
||
const state2string = require(__dirname + '/common').state2string;
|
||
const convertTopic2id = require(__dirname + '/common').convertTopic2id;
|
||
const convertID2topic = require(__dirname + '/common').convertID2topic;
|
||
const messageboxRegex = new RegExp('\\.messagebox$');
|
||
|
||
// todo delete from persistentSessions the sessions and messages after some time
|
||
|
||
function MQTTServer(adapter, states) {
|
||
if (!(this instanceof MQTTServer)) return new MQTTServer(adapter, states);
|
||
|
||
const namespaceRegEx = new RegExp('^' + adapter.namespace.replace('.', '\\.') + '\\.');
|
||
|
||
let net;
|
||
let http;
|
||
let ws;
|
||
let wsStream;
|
||
let server = null;
|
||
let serverWs = null;
|
||
let serverForWs = null;
|
||
let clients = {};
|
||
let topic2id = {};
|
||
let id2topic = {};
|
||
let messageId = 1;
|
||
let persistentSessions = {};
|
||
let resending = false;
|
||
let resendTimer = null;
|
||
|
||
adapter.config.sendOnStartInterval = parseInt(adapter.config.sendOnStartInterval, 10) || 2000;
|
||
adapter.config.sendInterval = parseInt(adapter.config.sendInterval, 10) || 0;
|
||
|
||
/**
 * Stops the server(s) owned by this instance and drops the in-memory session data.
 *
 * The resend timer is cancelled, all persistent sessions are forgotten and
 * `server` / `serverForWs` are closed if they exist.
 *
 * @param {function} [cb] invoked exactly once: as soon as every close() call
 *        has reported completion, or after 2 seconds if that takes longer.
 */
this.destroy = (cb) => {
    if (resendTimer) {
        clearInterval(resendTimer);
        resendTimer = null;
    }
    persistentSessions = {};

    let pending = 0;
    let timeout = null;

    // Calls the callback at most once and cancels the 2 second watchdog.
    const finish = () => {
        if (timeout) {
            clearTimeout(timeout);
            timeout = null;
        }
        if (cb) {
            const done = cb;
            cb = null;
            done();
        }
    };

    // Closes one server and counts it as an outstanding task until close() reports back.
    const closeServer = (instance, text) => {
        pending++;
        // to release all resources
        instance.close(() => {
            adapter.log.debug(text);
            if (--pending === 0) {
                finish();
            }
        });
    };

    if (cb) {
        // Watchdog: do not let the caller wait forever for close() to complete.
        timeout = setTimeout(() => {
            timeout = null;
            finish();
        }, 2000);
    }

    if (server) {
        closeServer(server, 'all gone!');
        server = null;
    }

    if (serverForWs) {
        closeServer(serverForWs, 'all ws gone!');
        serverForWs = null;
    }

    // Nothing had to be closed => report immediately.
    if (pending === 0) {
        finish();
    }
};
|
||
|
||
/**
 * Distributes a state change to the MQTT clients.
 *
 * Connected clients get the state through sendState2Client(). For every
 * persistent session without a live connection the message is built with
 * getMqttMessage() and appended to the session's message queue.
 * Nothing is done while no server is running.
 *
 * @param {string} id    ID of the changed state
 * @param {object} state new state object (logged as JSON)
 */
this.onStateChange = (id, state) => {
    adapter.log.debug('onStateChange ' + id + ': ' + JSON.stringify(state));
    if (!server) {
        return;
    }

    setImmediate(() => {
        Object.keys(clients).forEach(k =>
            sendState2Client(clients[k], id, state, adapter.config.defaultQoS, true));

        // sendState2Client() never publishes messagebox states; do not queue them for offline sessions either.
        if (messageboxRegex.test(id)) {
            return;
        }

        Object.keys(persistentSessions).forEach(clientId => {
            if (clients[clientId]) {
                return; // client is online and was served above
            }
            const session = persistentSessions[clientId];
            getMqttMessage(session, id, state, adapter.config.defaultQoS, true, (err, message) => {
                // The topic lookup can be asynchronous: the session may have been
                // removed or replaced in the meantime, so queue only if it is still the same one.
                if (message && persistentSessions[clientId] === session) {
                    session.messages.push(message);
                }
            });
        });
    });
};
|
||
|
||
/**
 * Writes the comma separated list of the currently registered client IDs
 * into the state "info.connection" (acknowledged).
 */
function updateClients() {
    const text = clients ? Object.keys(clients).join(',') : '';

    adapter.setState('info.connection', {val: text, ack: true});
}
|
||
|
||
/**
 * Builds the publish message for a state if the client is subscribed to it.
 *
 * If the topic of the state is not known yet, the object is read with
 * adapter.getForeignObject(), the topic is cached in id2topic and the
 * function calls itself again.
 *
 * @param {object}   client   client or persistent session; `_subsID` (exact subscriptions and
 *                            cached pattern matches) and `_subs` (pattern subscriptions) are read
 * @param {string}   id       state ID
 * @param {object}   [state]  state object; `val` becomes the payload, `ack` selects the "/set" topic
 * @param {number}   [qos]    QoS used when the subscription does not define one
 * @param {boolean}  [retain] retain flag of the message
 * @param {function} cb       cb(error) with an error text, or cb(null, message, client);
 *                            message is undefined when the client is not subscribed
 */
function getMqttMessage(client, id, state, qos, retain, cb) {
    if (typeof qos === 'function') {
        cb = qos;
        qos = undefined;
    }
    if (typeof retain === 'function') {
        cb = retain;
        retain = undefined;
    }

    if (!id2topic[id]) {
        return adapter.getForeignObject(id, (err, obj) => {
            if (err) {
                return cb(`Client [${client.id}] Cannot resolve topic name for ID: ${id} (err: ${err})`);
            }
            if (!obj) {
                return cb(`Client [${client.id}] Cannot resolve topic name for ID: ${id} (object not found)`);
            }
            if (!obj._id) {
                return cb(`Client [${client.id}] Cannot resolve topic name for ID: ${id} (object has no id): ${JSON.stringify(obj)}`);
            }

            id2topic[obj._id] = obj.native && obj.native.topic ?
                obj.native.topic :
                convertID2topic(obj._id, null, adapter.config.prefix, adapter.namespace);

            // Without this guard an empty topic would trigger the same lookup again and again.
            if (!id2topic[obj._id]) {
                return cb(`Client [${client.id}] Cannot resolve topic name for ID: ${id} (empty topic)`);
            }

            getMqttMessage(client, obj._id, state, qos, retain, cb);
        });
    }

    // Exact subscription (or cached pattern match) first, then the patterns.
    let subscription = client._subsID && client._subsID[id];
    if (!subscription && client._subs) {
        subscription = checkPattern(client._subs, id);
        if (subscription) {
            // Cache the match so the patterns need not be tested again for this ID.
            client._subsID = client._subsID || {};
            client._subsID[id] = subscription;
        }
    }

    let message;
    if (subscription) {
        // Not acknowledged states are commands and go to "<topic>/set" if extraSet is enabled.
        const isCommand = adapter.config.extraSet && state && !state.ack;

        message = {
            topic:     isCommand ? id2topic[id] + '/set' : id2topic[id],
            payload:   state ? state2string(state.val) : null,
            qos:       subscription.qos === undefined ? qos : subscription.qos,
            retain:    retain,
            messageId: messageId,
            ts:        Date.now(),
            count:     0,
            cmd:       'publish'
        };

        // MQTT packet identifiers are non-zero 16 bit values: wrap around inside 1..65535.
        messageId = messageId >= 0xFFFF ? 1 : messageId + 1;
    }

    cb(null, message, client);
}
|
||
|
||
/**
 * Publishes a state to one client if the client is subscribed to it.
 *
 * States whose ID ends with ".messagebox" are never published. Messages with
 * QoS > 0 are additionally stored in `client._messages`.
 *
 * @param {object}   client   connected client
 * @param {string}   id       state ID
 * @param {object}   [state]  state object
 * @param {number}   [qos]    QoS used when the subscription defines none
 * @param {boolean}  [retain] retain flag
 * @param {function} [cb]     cb(id), called in every case once the state has been handled,
 *                            so that callers which chain on it always continue
 */
function sendState2Client(client, id, state, qos, retain, cb) {
    if (messageboxRegex.test(id)) {
        // Skipped, but the caller must still be told so that its chain does not stall.
        cb && cb(id);
        return;
    }

    getMqttMessage(client, id, state, qos, retain, (err, message, client) => {
        if (message) {
            if (adapter.config.debug) {
                adapter.log.debug(`Client [${client.id}] send to this client "${message.topic}": ${(message.payload !== null ? message.payload : 'deleted')}`);
            }
            client.publish(message);

            // Keep the message until the client acknowledges it.
            if (message.qos > 0) {
                client._messages.push(message);
            }
        }
        cb && cb(id);
    });
}
|
||
|
||
/**
 * Sends the states of a list of IDs to a client, one after another.
 *
 * The first ID is removed from the list and sent with QoS 0 and retain = true;
 * the rest is scheduled `adapter.config.sendInterval` ms after the send finished.
 * The chain ends when the list is empty or when the client is no longer the
 * one registered under its ID (closed or replaced by a newer connection).
 *
 * @param {object}   client receiving client
 * @param {string[]} list   state IDs still to send; consumed by this function
 */
function sendStates2Client(client, list) {
    if (!list || !list.length) {
        return;
    }
    // Do not keep timers alive and publish into a connection that is gone.
    if (clients[client.id] !== client) {
        return;
    }

    const id = list.shift();
    sendState2Client(client, id, states[id], 0, true, () => {
        setTimeout(() => sendStates2Client(client, list), adapter.config.sendInterval);
    });
}
|
||
|
||
/**
 * Re-sends the queued messages of a client, one per step.
 *
 * For the message at position `i` the timestamp is renewed and the resend
 * counter is increased; it is published if its `cmd` is "publish". The next
 * position is scheduled with `adapter.config.sendInterval` (or setImmediate
 * when that is 0). The chain ends at the end of the queue or when the client
 * is no longer the one registered under its ID.
 *
 * @param {object}   client   receiving client
 * @param {object[]} messages message queue
 * @param {number}   [i=0]    position to process
 */
function resendMessages2Client(client, messages, i) {
    i = i || 0;
    if (!messages || i >= messages.length) {
        return;
    }
    // Stop when the connection was closed or replaced in the meantime.
    if (clients[client.id] !== client) {
        return;
    }

    try {
        const message = messages[i];
        message.ts = Date.now();
        message.count++;
        adapter.log.debug(`Client [${client.id}] Resend messages on connect: ${message.topic} = ${message.payload}`);
        if (message.cmd === 'publish') {
            client.publish(message);
        }
    } catch (e) {
        adapter.log.warn(`Client [${client.id}] Cannot resend message: ${e}`);
    }

    if (adapter.config.sendInterval) {
        setTimeout(() => resendMessages2Client(client, messages, i + 1), adapter.config.sendInterval);
    } else {
        setImmediate(() => resendMessages2Client(client, messages, i + 1));
    }
}
|
||
|
||
/*
|
||
4.7.1.2 Multi-level wildcard
|
||
|
||
The number sign (‘#’ U+0023) is a wildcard character that matches any number of levels within a topic. The multi-level wildcard represents the parent and any number of child levels. The multi-level wildcard character MUST be specified either on its own or following a topic level separator. In either case it MUST be the last character specified in the Topic Filter [MQTT-4.7.1-2].
|
||
|
||
Non normative comment
|
||
For example, if a Client subscribes to “sport/tennis/player1/#”, it would receive messages published using these topic names:
|
||
· “sport/tennis/player1”
|
||
· “sport/tennis/player1/ranking”
|
||
· “sport/tennis/player1/score/wimbledon”
|
||
|
||
Non normative comment
|
||
· “sport/#” also matches the singular “sport”, since # includes the parent level.
|
||
· “#” is valid and will receive every Application Message
|
||
· “sport/tennis/#” is valid
|
||
· “sport/tennis#” is not valid
|
||
· “sport/tennis/#/ranking” is not valid
|
||
|
||
*/
|
||
/**
 * Looks for the first pattern subscription whose regular expression matches an ID.
 *
 * @param {object} patterns map of subscriptions; every entry has a `regex` property
 * @param {string} id       state ID to test
 * @returns {object|null} the matching entry of `patterns` or null if none matches
 */
function checkPattern(patterns, id) {
    if (!patterns) {
        return null;
    }

    const hit = Object.keys(patterns).find(key => patterns[key].regex.test(id));

    return hit === undefined ? null : patterns[hit];
}
|
||
|
||
/**
 * Stores a received value as state and, if configured, forwards it to the clients.
 *
 * - Objects of the own namespace without `native.topic` get the topic written into them.
 * - The topic <-> ID mapping is updated; `topic2id[topic]` must already exist.
 * - A defined message is written with adapter.setForeignState() and mirrored in `states`
 *   when the write calls back; an undefined message only sets the cached state to null.
 * - With `adapter.config.onchange` and a running server the value is sent to all clients
 *   (the issuing client is skipped unless `extraSet` is enabled).
 *
 * @param {string}   id           state ID
 * @param {string}   topic        MQTT topic
 * @param {*}        message      value, or a state object, or undefined
 * @param {number}   qos          QoS of the received packet
 * @param {boolean}  retain       retain flag of the received packet
 * @param {boolean}  isAck        ack flag for plain values
 * @param {object}   [obj]        object of the state, if the caller has it
 * @param {object}   [ignoreClient] client that issued the message
 * @param {function} [cb]         called once, without arguments, at the end of this function
 */
function processTopic(id, topic, message, qos, retain, isAck, obj, ignoreClient, cb) {
    // expand old version of objects
    if (obj && namespaceRegEx.test(id) && (!obj.native || !obj.native.topic)) {
        obj.native = obj.native || {};
        obj.native.topic = topic;
        adapter.setForeignObject(id, obj);
    }
    // this is topic from other adapter
    topic2id[topic].id = id;
    id2topic[id] = topic;

    if (adapter.config.debug) adapter.log.debug('Server received "' + topic + '" (' + typeof message + '): ' + message);

    if (message !== undefined) {
        // Built once and never changed afterwards: the write callback runs later and
        // must store exactly what was written.
        const stored = typeof message === 'object' ? message : {val: message, ack: isAck};
        adapter.setForeignState(id, stored, (err, stateId) => states[stateId || id] = stored);
    } else {
        states[id] = {val: null, ack: isAck};
    }

    // send message to all other clients
    if (adapter.config.onchange && server && message !== undefined) {
        const broadcast = typeof message === 'object' ? message : {val: message};

        setImmediate(() => {
            Object.keys(clients).forEach(k => {
                // if get and set have different topic names, send state to issuing client too.
                if (clients[k] === ignoreClient && !adapter.config.extraSet) {
                    return;
                }
                // cb is deliberately not handed over: it is reported exactly once below.
                sendState2Client(clients[k], id, broadcast, qos, retain);
            });
        });
    }
    // ELSE
    // this will be done indirect. Message will be sent to js-controller and if adapter is subscribed, it gets this message over stateChange

    if (cb) cb();
}
|
||
|
||
/**
 * Finds the object that belongs to a topic and creates it if it does not exist.
 *
 * The ID is looked up with adapter.getObject() and then with
 * adapter.getForeignObject(). If both find nothing, a state object
 * "<namespace>.<id>" is created. `topic2id[topic].id` is set to the final ID.
 *
 * @param {string}   id         state ID derived from the topic
 * @param {string}   topic      MQTT topic
 * @param {function} [callback] callback(err, id, obj); err is only set when creating the object failed
 */
function checkObject(id, topic, callback) {
    topic2id[topic] = topic2id[topic] || {id: null};

    // Remembers the ID of an existing object and reports it.
    const reportExisting = obj => {
        topic2id[topic].id = obj._id;
        callback && callback(null, obj._id, obj);
    };

    adapter.getObject(id, (err, obj) => {
        if (obj) {
            return reportExisting(obj);
        }

        adapter.getForeignObject(id, (err, obj) => {
            if (obj) {
                return reportExisting(obj);
            }

            id = adapter.namespace + '.' + id;

            // A message that arrived while the lookup was running determines the type.
            const pending = topic2id[topic].message;
            let type = pending !== undefined ? typeof pending : 'string';
            if (type === 'object' && pending !== null && pending.val !== undefined) {
                type = typeof pending.val;
            }

            // create state
            obj = {
                common: {
                    name:  topic,
                    write: true,
                    read:  true,
                    role:  'variable',
                    desc:  'mqtt server variable',
                    type:  type
                },
                native: {
                    topic: topic
                },
                type: 'state'
            };

            adapter.log.debug('Create object for topic: ' + topic + '[ID: ' + id + ']');
            adapter.setForeignObject(id, obj, err => {
                topic2id[topic].id = id;
                err && adapter.log.error('setForeignObject: ' + err);
                obj._id = id;
                callback && callback(err, id, obj);
            });
        });
    });
}
|
||
|
||
/*4.7.1.3 Single level wildcard
|
||
|
||
The plus sign (‘+’ U+002B) is a wildcard character that matches only one topic level.
|
||
|
||
The single-level wildcard can be used at any level in the Topic Filter, including first and last levels. Where it is used it MUST occupy an entire level of the filter [MQTT-4.7.1-3]. It can be used at more than one level in the Topic Filter and can be used in conjunction with the multilevel wildcard.
|
||
|
||
Non normative comment
|
||
For example, “sport/tennis/+” matches “sport/tennis/player1” and “sport/tennis/player2”, but not “sport/tennis/player1/ranking”. Also, because the single-level wildcard matches only a single level, “sport/+” does not match “sport” but it does match “sport/”.
|
||
|
||
Non normative comment
|
||
· “+” is valid
|
||
· “+/tennis/#” is valid
|
||
· “sport+” is not valid
|
||
· “sport/+/player1” is valid
|
||
· “/finance” matches “+/+” and “/+”, but not “+”
|
||
*/
|
||
/**
 * Converts a subscription pattern into the source text of a regular expression
 * that is tested against state IDs.
 *
 * The pattern is first passed through convertTopic2id(). Then:
 * - "#" and "*" match anything (".*"),
 * - "+" matches one level, i.e. any run of characters without a dot ("[^.]*"),
 * - every other character is matched literally (regex metacharacters are escaped,
 *   because the pattern comes from the client),
 * - the expression is anchored at the start if the pattern starts with "+" or
 *   only ends with a multi-level wildcard, and anchored at the end if it ends
 *   with "+" or only starts with a multi-level wildcard.
 *
 * @param {string} pattern subscription pattern
 * @returns {string} regular expression source
 */
function pattern2RegEx(pattern) {
    pattern = convertTopic2id(pattern, true, adapter.config.prefix, adapter.namespace);
    pattern = pattern.replace(/#/g, '*');

    if (pattern === '*') {
        return '.*';
    }

    const first = pattern[0];
    const last  = pattern[pattern.length - 1];

    const anchorStart = first === '+' || (first !== '*' && last === '*');
    const anchorEnd   = last === '+'  || (first === '*' && last !== '*');

    // Translate in a single pass, so that text inserted for one wildcard can
    // never be rewritten by the replacement of another one.
    const body = pattern.replace(/[\\^$.|?()[\]{}*+]/g, ch => {
        if (ch === '*') {
            return '.*';
        }
        if (ch === '+') {
            return '[^.]*';
        }
        return '\\' + ch;
    });

    return (anchorStart ? '^' : '') + body + (anchorEnd ? '$' : '');
}
|
||
|
||
/**
 * Handles a message published by a client (or the stored will of a client).
 *
 * Steps:
 * 1. With `extraSet`, a topic ending in "/set" is a command: the suffix is removed
 *    (also in `packet.topic`) and the value is stored as not acknowledged.
 * 2. The state ID is taken from `topic2id` or derived with convertTopic2id().
 * 3. The payload is converted: text -> number/boolean where it is one, a text like
 *    "101,124,44" -> characters with these codes, JSON text with a `val` property -> state object.
 * 4. Unknown topics are created through checkObject(); messages for a topic whose object
 *    is still being looked up are only remembered; otherwise the value is processed.
 *    Messages with QoS > 0 are additionally queued for disconnected persistent
 *    sessions that are subscribed to the state.
 *
 * @param {object}   packet  publish packet: topic, payload, qos, retain, messageId
 * @param {object}   client  issuing client
 * @param {function} [cb]    called without arguments when the packet has been handled
 */
function receivedTopic(packet, client, cb) {
    // Reports completion at most once.
    const done = () => {
        if (cb) {
            const fn = cb;
            cb = null;
            fn();
        }
    };

    let isAck = true;
    let topic = packet.topic;
    let message = packet.payload;
    const qos = packet.qos;
    const retain = packet.retain;
    const now = Date.now();

    if (adapter.config.extraSet && packet.topic.match(/\/set$/)) {
        isAck = false;
        packet.topic = packet.topic.substring(0, packet.topic.length - 4);
        topic = packet.topic;
    }

    const id = (topic2id[topic] && topic2id[topic].id) ||
        convertTopic2id(topic, false, adapter.config.prefix, adapter.namespace);

    if (!id) {
        adapter.log.error(`Client [${client.id}] Invalid topic name: ${JSON.stringify(topic)}`);
        done();
        return;
    }

    // A Buffer that went through JSON.stringify()/JSON.parse() (the will packet is copied
    // that way on connect) has the form {type: 'Buffer', data: [...]}: restore it,
    // otherwise toString() below would produce "[object Object]".
    if (message && typeof message === 'object' && message.type === 'Buffer' && Array.isArray(message.data)) {
        message = Buffer.from(message.data);
    }
    const rawPayload = message;

    //adapter.log.info('Type: ' + typeof message);
    let type = typeof message;

    if (type !== 'string' && type !== 'number' && type !== 'boolean') {
        message = message ? message.toString('utf8') : 'null';
        type = 'string';
    }

    // What is forwarded to other clients must be a string or a Buffer, never a converted value.
    const wirePayload = typeof rawPayload === 'string' || Buffer.isBuffer(rawPayload) ? rawPayload : String(message);

    // try to convert 101,124,444,... To utf8 string
    if (type === 'string' && message.match(/^(\d)+,\s?(\d)+,\s?(\d)+/)) {
        const parts = message.split(',');
        try {
            let str = '';
            for (let p = 0; p < parts.length; p++) {
                str += String.fromCharCode(parseInt(parts[p].trim(), 10));
            }
            message = str;
        } catch (e) {
            // cannot convert and ignore it
        }
    }

    // If state is unknown => create mqtt.X.topic
    if ((adapter.namespace + '.' + id).length > adapter.config.maxTopicLength) {
        adapter.log.warn(`Client [${client.id}] Topic name is too long: ${id.substring(0, 100)}...`);
        done();
        return;
    }

    if (type === 'string') {
        // Try to convert value
        let _val = message.replace(',', '.').replace(/^\+/, '');

        // +23.560 => 23.56, -23.000 => -23
        if (_val.indexOf('.') !== -1) {
            let i = _val.length - 1;
            while (_val[i] === '0' || _val[i] === '.') {
                i--;
                if (_val[i + 1] === '.') break;
            }
            if (_val[i + 1] === '0' || _val[i + 1] === '.') {
                _val = _val.substring(0, i + 1);
            }
        }
        const f = parseFloat(_val);

        // Only accept the number if it represents the whole text.
        if (f.toString() === _val) message = f;
        if (message === 'true') message = true;
        if (message === 'false') message = false;
    }

    if (type === 'string' && message[0] === '{') {
        try {
            const _message = JSON.parse(message);
            // Fast solution: everything with a "val" property is taken as state object
            if (_message.val !== undefined) {
                message = _message;
            }
        } catch (e) {
            adapter.log.error(`Client [${client.id}] Cannot parse ${message}`);
        }
    }

    if (!topic2id[topic]) {
        checkObject(id, topic, (err, id, obj) => {
            processTopic(id, topic, message, qos, retain, isAck, obj, client);
            done();
        });
    } else if (topic2id[topic].id === null) {
        topic2id[topic].message = message;
        // still looking for id
        if (adapter.config.debug) {
            adapter.log.debug(`Client [${client.id}] Server received (but in process) "${topic}" (${typeof message}): ${message}`);
        }
        done();
    } else {
        if (topic2id[topic].message !== undefined) {
            delete topic2id[topic].message;
        }

        if (qos) {
            const targetId = topic2id[topic].id;

            Object.keys(persistentSessions).forEach(clientId => {
                const session = persistentSessions[clientId];
                if (clientId === client.id || session.connected) {
                    return;
                }
                // collect this message only if the client is subscribed
                const subscribed = (session._subsID && session._subsID[targetId]) ||
                    (session._subs && checkPattern(session._subs, targetId));
                if (!subscribed) {
                    return;
                }
                session.messages.push({topic, qos, retain, messageId: packet.messageId, ts: now, payload: wirePayload, count: 0, cmd: 'publish'});
            });
        }

        processTopic(topic2id[topic].id, topic, message, qos, retain, isAck, null, client, cb);
    }
}
|
||
|
||
/**
 * Cleans up after a client connection ended and destroys the connection.
 *
 * - Pending "send on start" / "resend on start" timers of the client are cancelled.
 * - If the client is the one registered under its ID, it is unregistered,
 *   "info.connection" is updated and its will (if any) is published before the
 *   connection is destroyed. The will is not published when the reason is
 *   'disconnected', i.e. the client sent DISCONNECT.
 * - A connection that is not (or no longer) the registered one is just destroyed.
 *
 * @param {object} client  client connection; nothing happens if it is falsy
 * @param {*}      reason  reason for closing, only used for the log
 */
function clientClose(client, reason) {
    if (!client) return;

    const registered = clients[client.id];
    const isCurrent  = !!registered && client.__secret === registered.__secret;

    // An outdated connection must not mark the session of its successor as disconnected.
    if (persistentSessions[client.id] && (!registered || isCurrent)) {
        persistentSessions[client.id].connected = false;
    }

    if (client._sendOnStart) {
        clearTimeout(client._sendOnStart);
        client._sendOnStart = null;
    }
    if (client._resendonStart) {
        clearTimeout(client._resendonStart);
        client._resendonStart = null;
    }

    try {
        if (!isCurrent) {
            client.destroy();
            return;
        }

        adapter.log.info(`Client [${client.id}] connection closed: ${reason}`);
        delete clients[client.id];
        updateClients();

        // The will is only for connections that ended without DISCONNECT.
        if (client._will && reason !== 'disconnected') {
            receivedTopic(client._will, client, () => client.destroy());
        } else {
            client.destroy();
        }
    } catch (e) {
        adapter.log.warn(`Client [${client.id}] Cannot close client: ${e}`);
    }
}
|
||
|
||
function startServer(config, socket, server, port, bind, ssl, ws) {
|
||
socket.on('connection', stream => {
|
||
let client;
|
||
if (ws) {
|
||
client = mqtt(wsStream(stream));
|
||
} else {
|
||
client = mqtt(stream);
|
||
}
|
||
|
||
// Store unique connection identifier
|
||
client.__secret = Date.now() + '_' + Math.round(Math.random() * 10000);
|
||
|
||
client.on('connect', options => {
|
||
// set client id
|
||
client.id = options.clientId;
|
||
client.cleanSession = options.cleanSession;
|
||
|
||
// get possible old client
|
||
let oldClient = clients[client.id];
|
||
|
||
if (config.user) {
|
||
if (config.user !== options.username ||
|
||
config.pass !== (options.password || '').toString()) {
|
||
adapter.log.warn(`Client [${client.id}] has invalid password(${options.password}) or username(${options.username})`);
|
||
client.connack({returnCode: 4});
|
||
if (oldClient) {
|
||
// delete existing client
|
||
delete clients[client.id];
|
||
updateClients();
|
||
oldClient.destroy();
|
||
}
|
||
client.destroy();
|
||
return;
|
||
}
|
||
}
|
||
|
||
if (oldClient) {
|
||
adapter.log.info(`Client [${client.id}] reconnected. Old secret ${clients[client.id].__secret}. New secret ${client.__secret}`);
|
||
// need to destroy the old client
|
||
|
||
if (client.__secret !== clients[client.id].__secret) {
|
||
// it is another socket!!
|
||
|
||
// It was following situation:
|
||
// - old connection was active
|
||
// - new connection is on the same TCP
|
||
// Just forget him
|
||
// oldClient.destroy();
|
||
}
|
||
} else {
|
||
adapter.log.info(`Client [${client.id}] connected with secret ${client.__secret}`);
|
||
}
|
||
|
||
let sessionPresent = false;
|
||
|
||
if (!client.cleanSession && adapter.config.storeClientsTime !== 0) {
|
||
if (persistentSessions[client.id]) {
|
||
sessionPresent = true;
|
||
persistentSessions[client.id].lastSeen = Date.now();
|
||
} else {
|
||
persistentSessions[client.id] = {
|
||
_subsID: {},
|
||
_subs: {},
|
||
messages: [],
|
||
lastSeen: Date.now()
|
||
};
|
||
}
|
||
client._messages = persistentSessions[client.id].messages;
|
||
persistentSessions[client.id].connected = true;
|
||
} else if (client.cleanSession && persistentSessions[client.id]) {
|
||
delete persistentSessions[client.id];
|
||
}
|
||
client._messages = client._messages || [];
|
||
|
||
client.connack({returnCode: 0, sessionPresent});
|
||
clients[client.id] = client;
|
||
updateClients();
|
||
|
||
if (options.will) { // the client's will message options. object that supports the following properties:
|
||
// topic: the will topic. string
|
||
// payload: the will payload. string
|
||
// qos: will qos level. number
|
||
// retain: will retain flag. boolean
|
||
client._will = JSON.parse(JSON.stringify(options.will));
|
||
let id;
|
||
if (topic2id[client._will.topic]) {
|
||
id = topic2id[client._will.topic].id || convertTopic2id(client._will.topic, false, config.prefix, adapter.namespace);
|
||
} else {
|
||
id = convertTopic2id(client._will.topic, false, config.prefix, adapter.namespace);
|
||
}
|
||
checkObject(id, client._will.topic);
|
||
}
|
||
|
||
// Send all subscribed variables to client
|
||
if (config.publishAllOnStart) {
|
||
// Give to client 2 seconds to send subscribe
|
||
client._sendOnStart = setTimeout(() => {
|
||
client._sendOnStart = null;
|
||
let list = [];
|
||
// If client still connected
|
||
for (let id in states) {
|
||
if (states.hasOwnProperty(id)) {
|
||
list.push(id);
|
||
}
|
||
}
|
||
sendStates2Client(client, list);
|
||
}, adapter.config.sendOnStartInterval);
|
||
}
|
||
|
||
if (persistentSessions[client.id]) {
|
||
client._subsID = persistentSessions[client.id]._subsID;
|
||
client._subs = persistentSessions[client.id]._subs;
|
||
if (persistentSessions[client.id].messages.length) {
|
||
// give to the client a little bit time
|
||
client._resendonStart = setTimeout(clientId => {
|
||
client._resendonStart = null;
|
||
resendMessages2Client(client, persistentSessions[clientId].messages);
|
||
}, 100, client.id);
|
||
}
|
||
}
|
||
});
|
||
|
||
client.on('publish', packet => {
|
||
if (clients[client.id] && client.__secret !== clients[client.id].__secret) {
|
||
return adapter.log.debug(`Old client ${client.id} with secret ${client.__secret} sends publish. Ignore! Actual secret is ${clients[client.id].__secret}`);
|
||
}
|
||
|
||
if (persistentSessions[client.id]) {
|
||
persistentSessions[client.id].lastSeen = Date.now();
|
||
}
|
||
|
||
if (packet.qos === 1) {
|
||
// send PUBACK to client
|
||
client.puback({
|
||
messageId: packet.messageId
|
||
});
|
||
} else if (packet.qos === 2) {
|
||
const pack = client._messages.find(e => {
|
||
return e.messageId === packet.messageId;
|
||
});
|
||
if (pack) {
|
||
// duplicate message => ignore
|
||
adapter.log.warn(`Client [${client.id}] Ignored duplicate message with ID: ${packet.messageId}`);
|
||
return;
|
||
} else {
|
||
packet.ts = Date.now();
|
||
packet.cmd = 'pubrel';
|
||
packet.count = 0;
|
||
client._messages.push(packet);
|
||
|
||
client.pubrec({
|
||
messageId: packet.messageId
|
||
});
|
||
return;
|
||
}
|
||
}
|
||
|
||
receivedTopic(packet, client);
|
||
});
|
||
|
||
// response for QoS2
|
||
// PUBREC: the client confirms that it received a QoS 2 message sent by this server.
// The server answers with PUBREL; the message stays in the queue until PUBCOMP arrives.
client.on('pubrec', packet => {
    if (clients[client.id] && client.__secret !== clients[client.id].__secret) {
        return adapter.log.debug(`Old client ${client.id} with secret ${client.__secret} sends pubrec. Ignore! Actual secret is ${clients[client.id].__secret}`);
    }

    if (persistentSessions[client.id]) {
        persistentSessions[client.id].lastSeen = Date.now();
    }

    const known = (client._messages || []).some(e => e.messageId === packet.messageId);
    if (!known) {
        adapter.log.warn(`Client [${client.id}] Received pubrec on ${client.id} for unknown messageId ${packet.messageId}`);
    }

    // PUBREL is sent in every case, as before, so that the client can finish its handshake.
    client.pubrel({
        messageId: packet.messageId
    });
});
|
||
|
||
// response for QoS2
|
||
client.on('pubcomp', packet => {
|
||
if (clients[client.id] && client.__secret !== clients[client.id].__secret) {
|
||
return adapter.log.debug(`Old client ${client.id} with secret ${client.__secret} sends pubcomp. Ignore! Actual secret is ${clients[client.id].__secret}`);
|
||
}
|
||
|
||
if (persistentSessions[client.id]) {
|
||
persistentSessions[client.id].lastSeen = Date.now();
|
||
}
|
||
|
||
let pos = null;
|
||
// remove this message from queue
|
||
client._messages.forEach((e, i) => {
|
||
if (e.messageId === packet.messageId) {
|
||
pos = i;
|
||
return false;
|
||
}
|
||
});
|
||
if (pos !== null) {
|
||
client._messages.splice(pos, 1);
|
||
} else {
|
||
adapter.log.warn(`Client [${client.id}] Received pubcomp for unknown message ID: ${packet.messageId}`);
|
||
}
|
||
});
|
||
|
||
// response for QoS2
|
||
// PUBREL: the client releases a QoS 2 message that it published earlier. The publish
// handler parked that packet in client._messages with cmd = 'pubrel'; now it is
// taken out of the queue and processed.
client.on('pubrel', packet => {
    if (clients[client.id] && client.__secret !== clients[client.id].__secret) {
        return adapter.log.debug(`Old client ${client.id} with secret ${client.__secret} sends pubrel. Ignore! Actual secret is ${clients[client.id].__secret}`);
    }

    if (persistentSessions[client.id]) {
        persistentSessions[client.id].lastSeen = Date.now();
    }

    const queue = client._messages || [];
    // Only packets waiting for their release qualify: outgoing messages are kept in the
    // same queue and may carry the same message ID.
    const pos = queue.findIndex(e => e.cmd === 'pubrel' && e.messageId === packet.messageId);

    // Answer in every case, as before, so that the client can finish its handshake.
    client.pubcomp({
        messageId: packet.messageId
    });

    if (pos !== -1) {
        // Remove the packet, otherwise the queue grows and a later message that
        // re-uses the ID would be rejected as duplicate.
        const frame = queue.splice(pos, 1)[0];
        receivedTopic(frame, client);
    } else {
        adapter.log.warn(`Client [${client.id}] Received pubrel on ${client.id} for unknown messageId ${packet.messageId}`);
    }
});
|
||
|
||
// response for QoS1
|
||
client.on('puback', packet => {
|
||
if (clients[client.id] && client.__secret !== clients[client.id].__secret) {
|
||
return adapter.log.debug(`Old client ${client.id} with secret ${client.__secret} sends puback. Ignore! Actual secret is ${clients[client.id].__secret}`);
|
||
}
|
||
|
||
if (persistentSessions[client.id]) {
|
||
persistentSessions[client.id].lastSeen = Date.now();
|
||
}
|
||
|
||
// remove this message from queue
|
||
let pos = null;
|
||
// remove this message from queue
|
||
client._messages.forEach((e, i) => {
|
||
if (e.messageId === packet.messageId) {
|
||
pos = i;
|
||
return false;
|
||
}
|
||
});
|
||
if (pos !== null) {
|
||
adapter.log.debug(`Client [${client.id}] Received puback for ${client.id} message ID: ${packet.messageId}`);
|
||
client._messages.splice(pos, 1);
|
||
} else {
|
||
adapter.log.warn(`Client [${client.id}] Received puback for unknown message ID: ${packet.messageId}`);
|
||
}
|
||
});
|
||
|
||
client.on('subscribe', packet => {
|
||
if (clients[client.id] && client.__secret !== clients[client.id].__secret) {
|
||
return adapter.log.debug(`Old client ${client.id} with secret ${client.__secret} sends subscribe. Ignore! Actual secret is ${clients[client.id].__secret}`);
|
||
}
|
||
|
||
if (persistentSessions[client.id]) {
|
||
persistentSessions[client.id].lastSeen = Date.now();
|
||
}
|
||
|
||
const granted = [];
|
||
if (!client._subsID) client._subsID = {};
|
||
if (!client._subs) client._subs = {};
|
||
|
||
for (let i = 0; i < packet.subscriptions.length; i++) {
|
||
granted.push(packet.subscriptions[i].qos);
|
||
|
||
const topic = packet.subscriptions[i].topic;
|
||
let id;
|
||
|
||
if (topic2id[topic]) {
|
||
id = topic2id[topic].id || convertTopic2id(topic, false, config.prefix, adapter.namespace);
|
||
} else {
|
||
id = convertTopic2id(topic, false, config.prefix, adapter.namespace);
|
||
}
|
||
|
||
if (!id) {
|
||
adapter.log.error(`Client [${client.id}] Invalid topic: ${topic}`);
|
||
continue;
|
||
}
|
||
|
||
// if pattern without wildcards
|
||
if (id.indexOf('*') === -1 && id.indexOf('#') === -1 && id.indexOf('+') === -1) {
|
||
// If state is unknown => create mqtt.X.topic
|
||
if (!topic2id[topic]) {
|
||
checkObject(id, topic, (err, id) => {
|
||
adapter.log.info(`Client [${client.id}] subscribes on topic "${topic}"`);
|
||
client._subsID[id] = {id, qos: packet.subscriptions[i].qos};
|
||
});
|
||
} else {
|
||
client._subsID[topic2id[topic].id] = {id: topic2id[topic].id, qos: packet.subscriptions[i].qos};
|
||
adapter.log.info(`Client [${client.id}] subscribes on "${topic2id[topic].id}"`);
|
||
if (adapter.config.publishOnSubscribe) {
|
||
adapter.log.info(`Client [${client.id}] publishOnSubscribe`);
|
||
sendState2Client(client, topic2id[topic].id, states[topic2id[topic].id]);
|
||
}
|
||
}
|
||
} else {
|
||
let pattern = topic;
|
||
// remove prefix
|
||
if (pattern.startsWith(adapter.config.prefix)) {
|
||
pattern = pattern.substring(adapter.config.prefix.length);
|
||
}
|
||
pattern = pattern.replace(/\//g, '.');
|
||
if (pattern[0] === '.') pattern = pattern.substring(1);
|
||
|
||
// add simple pattern
|
||
let regText = pattern2RegEx(pattern);
|
||
client._subs[topic] = {
|
||
regex: new RegExp(regText),
|
||
qos: packet.subscriptions[i].qos,
|
||
pattern: pattern
|
||
};
|
||
adapter.log.info(`Client [${client.id}] subscribes on "${topic}" with regex /${regText}/`);
|
||
|
||
// add simple mqtt.0.pattern
|
||
pattern = adapter.namespace + '/' + pattern;
|
||
regText = pattern2RegEx(pattern);
|
||
client._subs[adapter.namespace + '/' + topic] = {
|
||
regex: new RegExp(regText),
|
||
qos: packet.subscriptions[i].qos,
|
||
pattern: pattern
|
||
};
|
||
adapter.log.info(`Client [${client.id}] subscribes on "${topic}" with regex /${regText}/`);
|
||
|
||
if (adapter.config.publishOnSubscribe) {
|
||
adapter.log.info(`Client [${client.id}] publishOnSubscribe send all known states`);
|
||
for (const savedId in states) {
|
||
if (states.hasOwnProperty(savedId) && checkPattern(client._subs, savedId)) {
|
||
sendState2Client(client, savedId, states[savedId]);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
client.suback({granted: granted, messageId: packet.messageId});
|
||
});
|
||
|
||
// UNSUBSCRIBE: removes exact subscriptions from client._subsID and pattern
// subscriptions (both the plain and the namespace-prefixed entry) from client._subs,
// then confirms with UNSUBACK.
client.on('unsubscribe', packet => {
    if (clients[client.id] && client.__secret !== clients[client.id].__secret) {
        return adapter.log.debug(`Old client ${client.id} with secret ${client.__secret} sends unsubscribe. Ignore! Actual secret is ${clients[client.id].__secret}`);
    }

    if (persistentSessions[client.id]) {
        persistentSessions[client.id].lastSeen = Date.now();
    }

    // A client may unsubscribe without ever having subscribed.
    if (!client._subsID) client._subsID = {};
    if (!client._subs) client._subs = {};

    for (let i = 0; i < packet.unsubscriptions.length; i++) {
        const topic = packet.unsubscriptions[i];
        const id = (topic2id[topic] && topic2id[topic].id) ||
            convertTopic2id(topic, false, config.prefix, adapter.namespace);

        if (!id) {
            adapter.log.error(`Client [${client.id}] unsubscribes from invalid topic: ${topic}`);
            continue;
        }

        // if pattern without wildcards
        if (id.indexOf('*') === -1 && id.indexOf('#') === -1 && id.indexOf('+') === -1) {
            // If state is known
            if (topic2id[topic]) {
                const _id = topic2id[topic].id;
                if (client._subsID[_id]) {
                    delete client._subsID[_id];
                    adapter.log.info(`Client [${client.id}] unsubscribes on "${_id}"`);
                } else {
                    adapter.log.info(`Client [${client.id}] unsubscribes on unknown "${_id}"`);
                }
            } else {
                adapter.log.info(`Client [${client.id}] unsubscribes on unknown "${id}"`);
            }
        } else if (client._subs[topic]) {
            const removed = [client._subs[topic]];

            adapter.log.info(`Client [${client.id}] unsubscribes on "${topic}"`);
            delete client._subs[topic];

            const namespaced = adapter.namespace + '/' + topic;
            if (client._subs[namespaced]) {// the additional mqtt.0.pattern entry
                removed.push(client._subs[namespaced]);
                delete client._subs[namespaced];
                adapter.log.info(`Client [${client.id}] unsubscribes on "${adapter.namespace}/${topic}"`);
            }

            // Matches of a pattern are cached in _subsID under the state ID: drop the ones
            // that belong to the removed patterns, otherwise the client keeps receiving them.
            Object.keys(client._subsID).forEach(cachedId => {
                if (removed.includes(client._subsID[cachedId])) {
                    delete client._subsID[cachedId];
                }
            });
        } else {
            adapter.log.warn(`Client [${client.id}] unsubscribes on unknwon "${topic}"`);
        }
    }
    client.unsuback({messageId: packet.messageId});
});
|
||
|
||
// Keep-alive: answer PINGREQ with PINGRESP and refresh the session's "last seen" timestamp.
client.on('pingreq', (/*packet*/) => {
    const registered = clients[client.id];

    // A different connection is registered under this client ID (its secret differs): stay silent.
    if (registered && registered.__secret !== client.__secret) {
        adapter.log.debug(`Old client ${client.id} with secret ${client.__secret} sends pingreq. Ignore! Actual secret is ${clients[client.id].__secret}`);
        return;
    }

    const session = persistentSessions[client.id];
    if (session) {
        session.lastSeen = Date.now();
    }

    adapter.log.debug(`Client [${client.id}] pingreq`);
    client.pingresp();
});
|
||
|
||
// Connection teardown: every way a connection can end is routed to clientClose
// together with a reason (a text, or the error object itself).
client.on('close', hadError => {
    const reason = hadError ? 'closed because of error' : 'closed';
    clientClose(client, reason);
});

client.on('error', error => {
    clientClose(client, error);
});

client.on('disconnect', () => {
    clientClose(client, 'disconnected');
});
|
||
|
||
});
|
||
// Start listening on whichever of `server` / `socket` is set (`server` is preferred)
// and log the effective mode once the port is open.
(server || socket).listen(port, bind, () => {
    // Each optional part carries its own leading blank/dash, so the message reads cleanly
    // in every combination, e.g. "Starting MQTT-WebSocket (Secure) authenticated server on port 8883".
    const transport = ws ? '-WebSocket' : '';
    const security = ssl ? ' (Secure)' : '';
    const auth = config.user ? ' authenticated' : '';
    adapter.log.info(`Starting MQTT${transport}${security}${auth} server on port ${port}`);
});
|
||
}
|
||
|
||
/**
 * Re-deliver unacknowledged messages and expire stale persistent sessions.
 *
 * Walks the pending `_messages` of every connected client (newest first).
 * A message whose last transmission is at least `retransmitInterval` ms old is
 * either dropped (retry budget `retransmitCount` exceeded) or sent again.
 * Only ONE message is re-sent per invocation: after a resend the function
 * re-schedules itself (after `sendInterval` ms, or on the next tick) and
 * returns, leaving `resending` set so the periodic timer does not start a
 * second pass in parallel. Once a full pass finds nothing left to resend,
 * persistent sessions not seen for `storeClientsTime` minutes are deleted
 * (unless that setting is -1) and `resending` is cleared.
 */
function checkResends() {
    const now = Date.now();
    resending = true;

    for (const clientId of Object.keys(clients)) {
        const client = clients[clientId];
        if (!client || !client._messages) {
            continue;
        }

        const pending = client._messages;
        // Iterate backwards so that splice() does not shift entries still to be visited.
        for (let m = pending.length - 1; m >= 0; m--) {
            const message = pending[m];
            const isDue = now - message.ts >= adapter.config.retransmitInterval;
            if (!isDue) {
                continue;
            }

            if (message.count > adapter.config.retransmitCount) {
                adapter.log.warn(`Client [${clientId}] Message ${message.messageId} deleted after ${message.count} retries`);
                pending.splice(m, 1);
                continue;
            }

            // resend this message
            message.count++;
            message.ts = now;
            try {
                adapter.log.debug(`Client [${clientId}] Resend message topic: ${message.topic}, payload: ${message.payload}`);
                if (message.cmd === 'publish') {
                    // MQTT requires the DUP flag on every re-delivered PUBLISH;
                    // it must stay 0 for QoS 0, hence the guard.
                    if (message.qos > 0) {
                        message.dup = true;
                    }
                    client.publish(message);
                }
            } catch (e) {
                adapter.log.warn(`Client [${clientId}] Cannot publish message: ${e}`);
            }

            // Throttle: continue with the next due message in a later invocation.
            if (adapter.config.sendInterval) {
                setTimeout(checkResends, adapter.config.sendInterval);
            } else {
                setImmediate(checkResends);
            }
            return;
        }
    }

    // delete old sessions
    if (adapter.config.storeClientsTime !== -1) {
        const maxAgeMs = adapter.config.storeClientsTime * 60000;
        for (const id of Object.keys(persistentSessions)) {
            if (now - persistentSessions[id].lastSeen > maxAgeMs) {
                delete persistentSessions[id];
            }
        }
    }

    resending = false;
}
|
||
|
||
/*
 * Bootstrap, executed once when the server object is created:
 *  1. ensure the `info.connection` state object exists (string typed) and publish the client list,
 *  2. normalise the numeric settings in `config` (defaults: port 1883, retransmitInterval 2000 ms,
 *     retransmitCount 10, storeClientsTime 1440 min, defaultQoS 0),
 *  3. start the plain/TLS socket server on `config.port` and, if `config.webSocket` is set,
 *     a WebSocket server on `config.port + 1`,
 *  4. start the periodic retransmission check.
 *
 * @param {object} config adapter configuration; normalised in place
 */
(function _constructor(config) {
    // create connected object and state
    adapter.getObject('info.connection', (err, obj) => {
        if (obj && obj.common && obj.common.type === 'string') {
            updateClients();
            return;
        }

        // Missing object, or one with a non-string type: (re)create it as a string state.
        obj = {
            _id: 'info.connection',
            type: 'state',
            common: {
                role: 'info.clients',
                name: 'List of connected clients',
                type: 'string',
                read: true,
                write: false,
                // The default has to match the declared type (was the boolean `false`).
                def: ''
            },
            native: {}
        };

        adapter.setObject('info.connection', obj, () => updateClients());
    });

    // Settings may be stored as strings; parse them so that later arithmetic and timers get numbers.
    config.port = parseInt(config.port, 10) || 1883;
    config.retransmitInterval = parseInt(config.retransmitInterval, 10) || 2000;
    config.retransmitCount = parseInt(config.retransmitCount, 10) || 10;
    if (config.storeClientsTime === undefined) {
        config.storeClientsTime = 1440;
    } else {
        config.storeClientsTime = parseInt(config.storeClientsTime, 10) || 0;
    }

    config.defaultQoS = parseInt(config.defaultQoS, 10) || 0;

    // Load the transport modules lazily: TLS/HTTPS when SSL is enabled, plain TCP/HTTP otherwise.
    net = net || require(config.ssl ? 'tls' : 'net');
    if (config.webSocket) {
        http = http || require(config.ssl ? 'https' : 'http');
    }

    server = new net.Server(config.certificates);
    startServer(config, server, null, config.port, config.bind, config.ssl, false);

    if (config.webSocket) {
        // `http` has already been chosen above to match the SSL setting.
        ws = ws || require('ws');
        wsStream = wsStream || require('websocket-stream');
        serverForWs = http.createServer(config.certificates);
        serverWs = new ws.Server({server: serverForWs});

        // The WebSocket endpoint listens on the port right after the MQTT port.
        startServer(config, serverWs, serverForWs, config.port + 1, config.bind, config.ssl, true);
    }

    // Periodic retransmission check; skipped while a previous pass is still in progress.
    resendTimer = setInterval(() => {
        if (!resending) {
            checkResends();
        }
    }, adapter.config.retransmitInterval || 2000);

})(adapter.config);
|
||
|
||
return this;
|
||
}
|
||
|
||
// Public API of this module: the MQTTServer constructor (it may be called with or without `new`).
module.exports = MQTTServer;
|