Files
yunkong2.sql/main.js
2018-08-08 21:31:17 +08:00

2071 lines
82 KiB
JavaScript

/* jshint -W097 */// jshint strict:false
/*jslint node: true */
'use strict';
var utils = require(__dirname + '/lib/utils'); // Get common adapter utils
var SQL = require('sql-client');
var commons = require(__dirname + '/lib/aggregate');
var SQLFuncs = null;
var fs = require('fs');
// Supported database types. `name` is the key of the client class inside the `sql-client`
// package (the pool class is looked up as name + 'Pool'); `multiRequests` is copied into the
// module-level `multiRequests` flag by main().
var clients = {
postgresql: {name: 'PostgreSQLClient', multiRequests: true},
mysql: {name: 'MySQLClient', multiRequests: true},
sqlite: {name: 'SQLite3Client', multiRequests: false},
mssql: {name: 'MSSQLClient', multiRequests: true}
};
// JavaScript `typeof` result --> numeric type code.
// NOTE(review): the codes presumably index into dbNames / storageTypes below - confirm against the users further down the file.
var types = {
'number': 0,
'string': 1,
'boolean': 2,
'object': 1
};
// Table names, in the order of the type codes above (presumably one table per value type).
var dbNames = [
'ts_number',
'ts_string',
'ts_bool'
];
// Storage type labels, in the same order as dbNames.
var storageTypes = [
'Number',
'String',
'Boolean'
];
// Connection pool; created in connect(), closed and reset to null in oneScript()/finish().
var clientPool;
// Logged datapoints keyed by (alias-resolved) id: custom settings plus runtime fields
// such as realId, state, skipped, timeout, relogTimeout, lastLogTime.
var sqlDPs = {};
// NOTE(review): not used in the visible part of the file - presumably a cache of "from" ids.
var from = {};
// true once the adapter has subscribed to '*' instead of to individual states.
var subscribeAll = false;
// Queue of pending operations; used by query() when parallel requests are not allowed.
var tasks = [];
// NOTE(review): not used in the visible part of the file.
var tasksReadType = [];
// Whether the selected database may process several requests in parallel (set in main()).
var multiRequests = true;
// Queue of {id, now} entries filled by writeNulls() and consumed by processStartValues().
var tasksStart = [];
// Shutdown state: false = running, Array = shutdown in progress (callbacks to call), true = done.
var finished = false;
// Last value written to the 'info.connection' state (null = not yet written).
var connected = null;
// NOTE(review): not used in the visible part of the file.
var isFromRunning = {};
// Real object id --> alias id under which the datapoint is logged.
var aliasMap = {};
var adapter = utils.Adapter('sql');
// Object change handler.
// If logging for this adapter instance is enabled in obj.common.custom (or the legacy
// obj.common.history), the datapoint is (re)registered in sqlDPs and its settings are normalized.
// Otherwise a previously registered datapoint is flushed and its settings are removed.
adapter.on('objectChange', function (id, obj) {
var tmpState;
var now = new Date().getTime();
// id under which this object was registered so far (its alias if one was known)
var formerAliasId = aliasMap[id] ? aliasMap[id] : id;
if (obj && obj.common &&
(
// todo remove history sometime (2016.08) - Do not forget object selector in io-package.json
(obj.common.history && obj.common.history[adapter.namespace] && obj.common.history[adapter.namespace].enabled) ||
(obj.common.custom && obj.common.custom[adapter.namespace] && obj.common.custom[adapter.namespace].enabled)
)
) {
// ---- logging enabled ----
var realId = id;
var checkForRemove = true;
if (obj.common.custom && obj.common.custom[adapter.namespace] && obj.common.custom[adapter.namespace].aliasId) {
if (obj.common.custom[adapter.namespace].aliasId !== id) {
// from here on `id` is the alias; `realId` keeps the id of the changed object
aliasMap[id] = obj.common.custom[adapter.namespace].aliasId;
adapter.log.debug('Registered Alias: ' + id + ' --> ' + aliasMap[id]);
id = aliasMap[id];
checkForRemove = false;
}
else {
adapter.log.warn('Ignoring Alias-ID because identical to ID for ' + id);
obj.common.custom[adapter.namespace].aliasId = '';
}
}
// no (valid) alias configured any more: drop a previously registered one
if (checkForRemove && aliasMap[id]) {
adapter.log.debug('Removed Alias: ' + id + ' !-> ' + aliasMap[id]);
delete aliasMap[id];
}
// a datapoint that was not logged before is being added while only individual states are
// subscribed: switch to one subscription for all states
if (!(sqlDPs[formerAliasId] && sqlDPs[formerAliasId][adapter.namespace]) && !subscribeAll) {
// un-subscribe
for (var _id in sqlDPs) {
if (sqlDPs.hasOwnProperty(sqlDPs[_id].realId)) {
adapter.unsubscribeForeignStates(sqlDPs[_id].realId);
}
}
subscribeAll = true;
adapter.subscribeForeignStates('*');
}
// true when `id` was not logged by this instance before this change
var writeNull = !(sqlDPs[id] && sqlDPs[id][adapter.namespace]);
if (sqlDPs[formerAliasId] && sqlDPs[formerAliasId].relogTimeout) {
clearTimeout(sqlDPs[formerAliasId].relogTimeout);
}
// the entry is replaced below by the object's settings, so remember index/dbtype of the old entry
var storedIndex = null;
var storedType = null;
if (sqlDPs[id] && sqlDPs[id].index !== undefined) storedIndex = sqlDPs[id].index;
if (sqlDPs[id] && sqlDPs[id].dbtype !== undefined) storedType = sqlDPs[id].dbtype;
else if (sqlDPs[formerAliasId] && sqlDPs[formerAliasId].dbtype !== undefined) storedType = sqlDPs[formerAliasId].dbtype;
// todo remove history sometime (2016.08)
sqlDPs[id] = obj.common.custom || obj.common.history;
if (storedIndex !== null) sqlDPs[id].index = storedIndex;
if (storedType !== null) sqlDPs[id].dbtype = storedType;
if (sqlDPs[id].index === undefined) {
// index not known yet: getId() (defined elsewhere in this file) is given reInit as its callback
getId(id, sqlDPs[id].dbtype, reInit);
}
else {
reInit();
}
// Normalizes the settings of sqlDPs[id]: empty values fall back to the adapter configuration,
// everything else is converted to numbers/booleans.
function reInit() {
adapter.log.debug('remembered Index/Type ' + sqlDPs[id].index + ' / ' + sqlDPs[id].dbtype);
sqlDPs[id].realId = realId;
if (sqlDPs[id][adapter.namespace].retention !== undefined && sqlDPs[id][adapter.namespace].retention !== null && sqlDPs[id][adapter.namespace].retention !== '') {
sqlDPs[id][adapter.namespace].retention = parseInt(sqlDPs[id][adapter.namespace].retention, 10) || 0;
} else {
sqlDPs[id][adapter.namespace].retention = adapter.config.retention;
}
if (sqlDPs[id][adapter.namespace].debounce !== undefined && sqlDPs[id][adapter.namespace].debounce !== null && sqlDPs[id][adapter.namespace].debounce !== '') {
sqlDPs[id][adapter.namespace].debounce = parseInt(sqlDPs[id][adapter.namespace].debounce, 10) || 0;
} else {
sqlDPs[id][adapter.namespace].debounce = adapter.config.debounce;
}
sqlDPs[id][adapter.namespace].changesOnly = sqlDPs[id][adapter.namespace].changesOnly === 'true' || sqlDPs[id][adapter.namespace].changesOnly === true;
if (sqlDPs[id][adapter.namespace].changesRelogInterval !== undefined && sqlDPs[id][adapter.namespace].changesRelogInterval !== null && sqlDPs[id][adapter.namespace].changesRelogInterval !== '') {
sqlDPs[id][adapter.namespace].changesRelogInterval = parseInt(sqlDPs[id][adapter.namespace].changesRelogInterval, 10) || 0;
} else {
sqlDPs[id][adapter.namespace].changesRelogInterval = adapter.config.changesRelogInterval;
}
// first relog fires after a random delay between 50% and 100% of the interval
// (the interval is multiplied by 1000 to get milliseconds in pushHistory)
if (sqlDPs[id][adapter.namespace].changesRelogInterval > 0) {
sqlDPs[id].relogTimeout = setTimeout(reLogHelper, (sqlDPs[id][adapter.namespace].changesRelogInterval * 500 * Math.random()) + sqlDPs[id][adapter.namespace].changesRelogInterval * 500, id);
}
// a decimal comma is accepted in the configured minimal delta
if (sqlDPs[id][adapter.namespace].changesMinDelta !== undefined && sqlDPs[id][adapter.namespace].changesMinDelta !== null && sqlDPs[id][adapter.namespace].changesMinDelta !== '') {
sqlDPs[id][adapter.namespace].changesMinDelta = parseFloat(sqlDPs[id][adapter.namespace].changesMinDelta.toString().replace(/,/g, '.')) || 0;
} else {
sqlDPs[id][adapter.namespace].changesMinDelta = adapter.config.changesMinDelta;
}
if (!sqlDPs[id][adapter.namespace].storageType) sqlDPs[id][adapter.namespace].storageType = false;
// add one day if retention is too small
if (sqlDPs[id][adapter.namespace].retention && sqlDPs[id][adapter.namespace].retention <= 604800) {
sqlDPs[id][adapter.namespace].retention += 86400;
}
// newly enabled datapoint: queue a null start marker
if (writeNull && adapter.config.writeNulls) {
writeNulls(id);
}
adapter.log.info('enabled logging of ' + id + ', Alias=' + (id !== realId));
}
}
else {
// ---- logging disabled or object deleted ----
if (aliasMap[id]) {
adapter.log.debug('Removed Alias: ' + id + ' !-> ' + aliasMap[id]);
delete aliasMap[id];
}
id = formerAliasId;
if (sqlDPs[id]) {
adapter.log.info('disabled logging of ' + id);
if (sqlDPs[id].relogTimeout) clearTimeout(sqlDPs[id].relogTimeout);
if (sqlDPs[id].timeout) clearTimeout(sqlDPs[id].timeout);
// work on a shallow copy so that the cached state object itself is not modified below
if (Object.assign) {
tmpState = Object.assign({}, sqlDPs[id].state);
}
else {
tmpState = JSON.parse(JSON.stringify(sqlDPs[id].state));
}
var state = sqlDPs[id].state ? tmpState : null;
// a value that was held back by the changes-only filter is written first
if (sqlDPs[id].skipped) {
pushValueIntoDB(id, sqlDPs[id].skipped);
sqlDPs[id].skipped = null;
}
var nullValue = {val: null, ts: now, lc: now, q: 0x40, from: 'system.adapter.' + adapter.namespace};
if (sqlDPs[id][adapter.namespace] && adapter.config.writeNulls) {
if (sqlDPs[id][adapter.namespace].changesOnly && state && state.val !== null) {
// changes-only: repeat the last value with the current timestamp, then write the null marker
(function (_id, _state, _nullValue) {
_state.ts = now;
_state.from = 'system.adapter.' + adapter.namespace;
// nullValue and _nullValue are the same object: the marker is stamped 4 ms after the value
nullValue.ts += 4;
nullValue.lc += 4; // because of MS SQL
adapter.log.debug('Write 1/2 "' + _state.val + '" _id: ' + _id);
pushValueIntoDB(_id, _state, function () {
// terminate values with null to indicate adapter stop. timestamp + 1#
adapter.log.debug('Write 2/2 "null" _id: ' + _id);
pushValueIntoDB(_id, _nullValue, function() {
delete sqlDPs[id][adapter.namespace];
});
});
})(id, state, nullValue);
}
else {
// terminate values with null to indicate adapter stop. timestamp + 1
adapter.log.debug('Write 0 NULL _id: ' + id);
pushValueIntoDB(id, nullValue, function() {
delete sqlDPs[id][adapter.namespace];
});
}
}
else {
delete sqlDPs[id][adapter.namespace];
}
}
}
});
// State changes are logged under the alias id when one is registered for the changed state.
adapter.on('stateChange', function onStateChange(id, state) {
    var targetId = aliasMap[id] || id;
    pushHistory(targetId, state);
});

// Adapter is being unloaded: flush pending values, then report completion through the callback.
adapter.on('unload', function onUnload(callback) {
    finish(callback);
});

// Configuration is available: start the adapter.
adapter.on('ready', function onReady() {
    main();
});

// Messages sent to this instance are handled by processMessage().
adapter.on('message', function onMessage(msg) {
    processMessage(msg);
});

// close connection to DB when the process is asked to terminate
['SIGINT', 'SIGTERM'].forEach(function (signal) {
    process.on(signal, function onSignal() {
        finish();
    });
});
/**
 * Remembers the connection status and publishes it in the 'info.connection' state.
 * Nothing is written when the status did not change.
 *
 * @param {boolean} isConnected new connection status
 */
function setConnected(isConnected) {
    if (connected === isConnected) {
        return;
    }
    connected = isConnected;
    adapter.setState('info.connection', connected, true);
}
// State of the PostgreSQL bootstrap connection used by connect():
// false = not done yet, client object = bootstrap in progress, true = database was created/exists.
var _client = false;
// Opens the connection pool (if it does not exist yet), then runs the init scripts and starts
// the processing of start values. The function re-schedules itself: after a failure it is
// called again in 30 seconds, after a successful intermediate step almost immediately.
function connect() {
if (!clientPool) {
setConnected(false);
var params = {
server: adapter.config.host, // needed for MSSQL
host: adapter.config.host, // needed for PostgreSQL, MySQL
user: adapter.config.user,
password: adapter.config.password,
max_idle: (adapter.config.dbtype === 'sqlite') ? 1 : 2
};
if (adapter.config.port) {
params.port = adapter.config.port;
}
if (adapter.config.encrypt) {
params.options = {
encrypt: true // Use this if you're on Windows Azure
};
}
// NOTE(review): 'postgres' is not a key of `clients` ('postgresql' is) - this branch looks unreachable for a valid configuration
if (adapter.config.dbtype === 'postgres') {
params.database = 'postgres';
}
if (adapter.config.dbtype === 'sqlite') {
// for SQLite the connection parameter is just the path of the database file
params = getSqlLiteDir(adapter.config.fileName);
}
else
// special solution for postgres. Connect first to Db "postgres", create new DB "yunkong2" and then connect to "yunkong2" DB.
if (_client !== true && adapter.config.dbtype === 'postgresql') {
if (adapter.config.dbtype === 'postgresql') {
params.database = 'postgres';
}
if (!adapter.config.dbtype) {
adapter.log.error('DB Type is not defined!');
return;
}
if (!clients[adapter.config.dbtype] || !clients[adapter.config.dbtype].name) {
adapter.log.error('Unknown type "' + adapter.config.dbtype + '"');
return;
}
if (!SQL[clients[adapter.config.dbtype].name]) {
adapter.log.error('SQL package "' + clients[adapter.config.dbtype].name + '" is not installed.');
return;
}
// connect first to DB postgres and create yunkong2 DB
_client = new SQL[clients[adapter.config.dbtype].name](params);
return _client.connect(function (err) {
if (err) {
adapter.log.error(err);
setTimeout(function () {
connect();
}, 30000);
return;
}
_client.execute('CREATE DATABASE ' + adapter.config.dbname + ';', function (err /* , rows, fields */) {
_client.disconnect();
// 42P04 is tolerated: the database already exists
if (err && err.code !== '42P04') { // if error not about yet exists
_client = false;
adapter.log.error(err);
setTimeout(function () {
connect();
}, 30000);
} else {
// bootstrap finished: the next call of connect() opens the pool on the real database
_client = true;
setTimeout(function () {
connect();
}, 100);
}
});
});
}
if (adapter.config.dbtype === 'postgresql') {
params.database = adapter.config.dbname;
}
try {
if (!clients[adapter.config.dbtype].name) {
adapter.log.error('Unknown SQL type selected: "' + adapter.config.dbtype + '"');
} else if (!SQL[clients[adapter.config.dbtype].name + 'Pool']) {
adapter.log.error('Selected SQL DB was not installed properly: "' + adapter.config.dbtype + '". SQLite requires build tools on system. See README.md');
} else {
clientPool = new SQL[clients[adapter.config.dbtype].name + 'Pool'](params);
// connect() is called again once the pool is open; clientPool is set then, so that call
// continues with the init scripts below
return clientPool.open(function (err) {
if (err) {
adapter.log.error(err);
setTimeout(function () {
connect();
}, 30000);
} else {
setTimeout(function () {
connect();
}, 0);
}
});
}
} catch (ex) {
if (ex.toString() === 'TypeError: undefined is not a function') {
adapter.log.error('Node.js DB driver for "' + adapter.config.dbtype + '" could not be installed.');
} else {
adapter.log.error(ex.toString());
}
setConnected(false);
return setTimeout(function () {
connect();
}, 30000);
}
}
// run the initialization scripts provided by the DB specific module (SQLFuncs is loaded in main())
allScripts(SQLFuncs.init(adapter.config.dbname), function (err) {
if (err) {
//adapter.log.error(err);
return setTimeout(function () {
connect();
}, 30000);
} else {
adapter.log.info('Connected to ' + adapter.config.dbtype);
setConnected(true);
// read all DB IDs and all FROM ids
if (!multiRequests) {
getAllIds(function () {
getAllFroms();
processStartValues();
});
} else {
getAllIds(function () {
processStartValues();
});
}
}
});
}
// Find sqlite data directory
function getSqlLiteDir(fileName) {
fileName = fileName || 'sqlite.db';
fileName = fileName.replace(/\\/g, '/');
if (fileName[0] === '/' || fileName.match(/^\w:\//)) {
return fileName;
}
else {
// normally /opt/yunkong2/node_modules/yunkong2.js-controller
// but can be /example/yunkong2.js-controller
var tools = require(utils.controllerDir + '/lib/tools');
var config = tools.getConfigFileName().replace(/\\/g, '/');
var parts = config.split('/');
parts.pop();
config = parts.join('/') + '/sqlite';
// create sqlite directory
if (!fs.existsSync(config)) {
fs.mkdirSync(config);
}
return config + '/' + fileName;
}
}
/**
 * Handles the "test" message: tries to connect to the database described by
 * msg.message.config, executes a trivial query and answers with {error: null} on success
 * or {error: <text>} on failure. If no result is available after 5 seconds the answer is
 * {error: 'connect timeout'}.
 *
 * Exactly one answer is sent per message: a result that arrives after the timeout (or after
 * any other answer) is dropped instead of being sent as a second response.
 *
 * @param {object} msg message with `from`, `command`, `callback` and `message.config`
 */
function testConnection(msg) {
    var config = msg.message.config;
    config.port = parseInt(config.port, 10) || 0;

    var params = {
        server: config.host,
        host: config.host,
        user: config.user,
        password: config.password
    };
    if (config.port) {
        params.port = config.port;
    }

    // make the bundled client implementations available if sql-client does not provide them
    var extraClients = null;
    if (config.dbtype === 'postgresql' && !SQL.PostgreSQLClient) {
        extraClients = require(__dirname + '/lib/postgresql-client');
    } else if (config.dbtype === 'mssql' && !SQL.MSSQLClient) {
        extraClients = require(__dirname + '/lib/mssql-client');
    }
    if (extraClients) {
        for (var attr in extraClients) {
            if (extraClients.hasOwnProperty(attr) && !SQL[attr]) {
                SQL[attr] = extraClients[attr];
            }
        }
    }

    if (config.dbtype === 'postgresql') {
        params.database = 'postgres';
    } else if (config.dbtype === 'sqlite') {
        // for SQLite the connection parameter is just the path of the database file
        params = getSqlLiteDir(config.fileName);
    }

    var timeout = null;
    var answered = false;

    // sends the answer once; later calls are ignored
    function reply(error) {
        if (answered) {
            return;
        }
        answered = true;
        if (timeout) {
            clearTimeout(timeout);
            timeout = null;
        }
        adapter.sendTo(msg.from, msg.command, {error: error}, msg.callback);
    }

    try {
        var client = new SQL[clients[config.dbtype].name](params);
        timeout = setTimeout(function () {
            timeout = null;
            reply('connect timeout');
        }, 5000);

        client.connect(function (err) {
            if (err) {
                return reply(err.toString());
            }
            client.execute("SELECT 2 + 3 AS x", function (err /* , rows, fields */) {
                client.disconnect();
                reply(err ? err.toString() : null);
            });
        });
    } catch (ex) {
        if (ex.toString() === 'TypeError: undefined is not a function') {
            reply('Node.js DB driver could not be installed.');
        } else {
            reply(ex.toString());
        }
    }
}
/**
 * Handles the "destroy" message: runs the destroy scripts of the DB specific module and
 * answers with {error: null} or {error: <text>}. After a successful run the adapter object
 * is written back unchanged two seconds later, which restarts the adapter.
 *
 * @param {object} msg message with `from`, `command` and `callback`
 */
function destroyDB(msg) {
    function respond(error) {
        return adapter.sendTo(msg.from, msg.command, {error: error}, msg.callback);
    }

    // restart adapter
    function restartAdapter() {
        adapter.getForeignObject('system.adapter.' + adapter.namespace, function (err, obj) {
            if (err) {
                adapter.log.error('Cannot read object "system.adapter.' + adapter.namespace + '": ' + err);
                adapter.stop();
                return;
            }
            adapter.setForeignObject(obj._id, obj);
        });
    }

    try {
        allScripts(SQLFuncs.destroy(adapter.config.dbname), function (err) {
            if (err) {
                adapter.log.error(err);
                respond(err.toString());
                return;
            }
            respond(null);
            setTimeout(restartAdapter, 2000);
        });
    } catch (ex) {
        return respond(ex.toString());
    }
}
/**
 * Executes the SQL text in msg.message on a pooled client and sends the outcome back to the
 * sender: {error, result} after the query was executed, {error} when no client could be
 * borrowed or an exception occurred.
 *
 * @param {object} msg message with `from`, `command`, `callback` and the SQL text in `message`
 * @param {function} [callback] called without arguments after the answer was sent
 */
function _userQuery(msg, callback) {
    function answer(payload) {
        adapter.sendTo(msg.from, msg.command, payload, msg.callback);
        if (callback) callback();
    }

    try {
        adapter.log.debug(msg.message);
        clientPool.borrow(function (borrowErr, client) {
            if (borrowErr) {
                answer({error: borrowErr.toString()});
                return;
            }
            client.execute(msg.message, function (execErr, rows /* , fields */) {
                // some clients wrap the records into a "rows" property
                var result = (rows && rows.rows) ? rows.rows : rows;
                clientPool.return(client);
                answer({error: execErr ? execErr.toString() : null, result: result});
            });
        });
    } catch (err) {
        answer({error: err.toString()});
    }
}
// execute custom query
/**
 * Handles the "query" message (custom SQL query).
 * When parallel requests are allowed the query is executed immediately, otherwise it is
 * queued in `tasks`; the queue is limited and further requests are rejected with an error.
 *
 * @param {object} msg message with `from`, `command`, `callback` and the SQL text in `message`
 */
function query(msg) {
    if (multiRequests) {
        _userQuery(msg);
        return;
    }

    if (tasks.length > 100) {
        adapter.log.error('Cannot queue new requests, because more than 100');
        adapter.sendTo(msg.from, msg.command, {error: 'Cannot queue new requests, because more than 100'}, msg.callback);
        return;
    }

    tasks.push({operation: 'userQuery', msg: msg});
    // the queue was empty before: start processing
    if (tasks.length === 1) {
        processTasks();
    }
}
// one script
/**
 * Executes one SQL script on a pooled client.
 * Errors which only say that the database or table already exists are treated as success.
 * If no client can be borrowed the pool is closed and reset, so that the next connect()
 * creates a new one.
 *
 * @param {string} script SQL text
 * @param {function} [cb] called with the remaining error (null/undefined on success)
 */
function oneScript(script, cb) {
    try {
        clientPool.borrow(function (borrowErr, client) {
            if (borrowErr || !client) {
                clientPool.close();
                clientPool = null;
                adapter.log.error(borrowErr);
                if (cb) cb(borrowErr);
                return;
            }

            adapter.log.debug(script);
            client.execute(script, function (execErr /* , rows, fields */) {
                adapter.log.debug('Response: ' + JSON.stringify(execErr));
                var remaining = execErr;

                if (execErr) {
                    var alreadyExists =
                        // Database 'yunkong2' already exists. Choose a different database name.
                        execErr.number === 1801 ||
                        // There is already an object named 'sources' in the database.
                        execErr.number === 2714 ||
                        (execErr.message && execErr.message.match(/^SQLITE_ERROR: table [\w_]+ already exists$/)) ||
                        // if database exists or table exists
                        execErr.errno == 1007 || execErr.errno == 1050 ||
                        execErr.code === '42P04';

                    if (alreadyExists) {
                        // do nothing
                        remaining = null;
                    } else {
                        var createdTable = (execErr.code === '42P07') ? script.match(/CREATE\s+TABLE\s+(\w*)\s+\(/) : null;
                        if (createdTable) {
                            adapter.log.debug('OK. Table "' + createdTable[1] + '" yet exists');
                            remaining = null;
                        } else {
                            adapter.log.error(script);
                            adapter.log.error(execErr);
                        }
                    }
                }

                if (cb) cb(remaining);
                clientPool.return(client);
            });
        });
    } catch (ex) {
        adapter.log.error(ex);
        if (cb) cb(ex);
    }
}
// all scripts
/**
 * Executes the given scripts one after another, starting at `index`.
 * Execution stops at the first script that reports an error.
 *
 * @param {string[]} scripts SQL scripts
 * @param {number|function} [index] start position; may be omitted (the callback then moves to this place)
 * @param {function} [cb] called with the error of the failed script or without arguments when all are done
 */
function allScripts(scripts, index, cb) {
    if (typeof index === 'function') {
        cb = index;
        index = 0;
    }
    var position = index || 0;

    var nothingLeft = !scripts || !(position < scripts.length);
    if (nothingLeft) {
        if (cb) cb();
        return;
    }

    oneScript(scripts[position], function (err) {
        if (!err) {
            allScripts(scripts, position + 1, cb);
            return;
        }
        if (cb) cb(err);
    });
}
/**
 * Shuts the logging down: unsubscribes from all states, writes the values that are still
 * pending (skipped values and, if configured, the terminating null values) for every
 * datapoint, closes the connection pool and finally calls the callbacks.
 *
 * The module-level `finished` flag tracks the progress: false = running, Array = shutdown
 * in progress (list of callbacks to call), true = shutdown completed. Calling finish()
 * again while a shutdown is running only registers the callback; after completion the
 * callback is called immediately.
 *
 * Datapoints are processed in batches of 50 with one second between the batches. The pool
 * is closed and the callbacks are called only after every datapoint was processed and every
 * started write has reported back, also when there was nothing to write at all.
 *
 * @param {function} [callback] called without arguments when the shutdown is completed
 */
function finish(callback) {
    // outstanding work items: one per scheduled datapoint plus one per pending chain of DB writes
    var pending = 0;
    var now;

    // One work item is done. After the last one the pool is closed and the callbacks are called.
    function done() {
        if (--pending > 0) {
            return;
        }
        if (clientPool) {
            clientPool.close();
            clientPool = null;
        }
        if (typeof finished === 'object') {
            setTimeout(function (cbs) {
                for (var f = 0; f < cbs.length; f++) {
                    if (typeof cbs[f] === 'function') cbs[f]();
                }
            }, 500, finished);
            finished = true;
        }
    }

    // Stops the timers of one datapoint and writes its pending values.
    function finishId(id) {
        var dp = sqlDPs[id];
        if (!dp) {
            // datapoint disappeared while its batch was waiting
            done();
            return;
        }
        if (dp.relogTimeout) {
            clearTimeout(dp.relogTimeout);
            dp.relogTimeout = null;
        }
        if (dp.timeout) {
            clearTimeout(dp.timeout);
            dp.timeout = null;
        }

        // work on a copy so that the cached state object is not modified
        var state = null;
        if (dp.state) {
            state = Object.assign ? Object.assign({}, dp.state) : JSON.parse(JSON.stringify(dp.state));
        }

        // a value that was held back by the changes-only filter is written first
        if (dp.skipped) {
            pending++;
            pushValueIntoDB(id, dp.skipped, done);
            dp.skipped = null;
        }

        var nullValue = {val: null, ts: now, lc: now, q: 0x40, from: 'system.adapter.' + adapter.namespace};
        if (dp[adapter.namespace] && adapter.config.writeNulls) {
            pending++;
            if (dp[adapter.namespace].changesOnly && state && state.val !== null) {
                // changes-only: repeat the last value with the current timestamp, then terminate with null
                state.ts = now;
                state.from = 'system.adapter.' + adapter.namespace;
                nullValue.ts += 4;
                nullValue.lc += 4; // because of MS SQL
                adapter.log.debug('Write 1/2 "' + state.val + '" _id: ' + id);
                pushValueIntoDB(id, state, function () {
                    // terminate values with null to indicate adapter stop. timestamp + 1#
                    adapter.log.debug('Write 2/2 "null" _id: ' + id);
                    pushValueIntoDB(id, nullValue, done);
                });
            } else {
                // terminate values with null to indicate adapter stop. timestamp + 1
                adapter.log.debug('Write 0 NULL _id: ' + id);
                pushValueIntoDB(id, nullValue, done);
            }
        }

        // the datapoint itself is processed
        done();
    }

    adapter.unsubscribeForeignStates('*');

    if (finished) {
        if (callback) {
            if (finished === true) {
                callback();
            } else {
                finished.push(callback);
            }
        }
        return;
    }

    finished = callback ? [callback] : [];
    now = new Date().getTime();

    var ids = Object.keys(sqlDPs);
    if (!ids.length) {
        // nothing to write: complete immediately
        if (clientPool) {
            clientPool.close();
            clientPool = null;
        }
        finished = true;
        if (callback) callback();
        return;
    }

    // every datapoint counts as one work item until finishId() has processed it, so the
    // shutdown cannot complete while later batches are still waiting
    pending = ids.length;
    var delay = 0;
    for (var i = 0; i < ids.length; i++) {
        if ((i + 1) % 50 === 0) delay += 1000;
        setTimeout(finishId, delay, ids[i]);
    }
}
/**
 * Dispatches a message sent to this adapter instance to the handler of its command.
 * Unknown commands are ignored.
 *
 * @param {object} msg message; `msg.command` selects the handler
 */
function processMessage(msg) {
    switch (msg.command) {
        case 'getHistory':
            getHistory(msg);
            break;

        case 'test':
            testConnection(msg);
            break;

        case 'destroy':
            destroyDB(msg);
            break;

        case 'query':
            query(msg);
            break;

        case 'storeState':
            storeState(msg);
            break;

        case 'getDpOverview':
            getDpOverview(msg);
            break;

        case 'enableHistory':
            enableHistory(msg);
            break;

        case 'disableHistory':
            disableHistory(msg);
            break;

        case 'getEnabledDPs':
            getEnabledDPs(msg);
            break;

        case 'stopInstance':
            // flush everything, confirm to the sender and terminate the process shortly after
            finish(function () {
                if (!msg.callback) {
                    return;
                }
                adapter.sendTo(msg.from, msg.command, 'stopped', msg.callback);
                setTimeout(function () {
                    process.exit(0);
                }, 200);
            });
            break;

        default:
            break;
    }
}
/**
 * Makes sure that the design object '_design/custom' exists and that its "state" view
 * covers both `common.custom` and the legacy `common.history`; otherwise the object is
 * (re)written.
 *
 * @param {function} [callback] called with the error of the last object operation
 */
function fixSelector(callback) {
    var designId = '_design/custom';

    function report(err) {
        if (callback) callback(err);
    }

    // fix _design/custom object
    adapter.getForeignObject(designId, function (err, obj) {
        var upToDate = obj &&
            obj.views.state.map.indexOf('common.history') !== -1 &&
            obj.views.state.map.indexOf('common.custom') !== -1;

        if (upToDate) {
            report(err);
            return;
        }

        var design = {
            _id: designId,
            language: 'javascript',
            views: {
                state: {
                    map: "function(doc) { if (doc.type==='state' && (doc.common.custom || doc.common.history)) emit(doc._id, doc.common.custom || doc.common.history) }"
                }
            }
        };
        adapter.setForeignObject(designId, design, function (writeErr) {
            report(writeErr);
        });
    });
}
/**
 * Processes the next entry of the `tasksStart` queue and schedules itself for the following
 * one. For every entry a null value (q = 0x40) is logged to mark the start of logging; for
 * changes-only datapoints the current value of the state is logged right after it with the
 * same source ('system.adapter.<namespace>').
 *
 * Entries whose datapoint was removed or whose logging was disabled while they were waiting
 * in the queue are skipped.
 */
function processStartValues() {
    if (!tasksStart || !tasksStart.length) {
        return;
    }

    var task = tasksStart.shift();
    var dp = sqlDPs[task.id];
    var settings = dp && dp[adapter.namespace];

    if (!settings) {
        // nothing to log for this id any more: continue with the next entry
        setTimeout(processStartValues, 0);
        return;
    }

    if (settings.changesOnly) {
        adapter.getForeignState(dp.realId, function (err, state) {
            var now = task.now || new Date().getTime();
            pushHistory(task.id, {
                val: null,
                ts: state ? now - 4 : now, // 4 is because of MS SQL
                lc: state ? now - 4 : now, // 4 is because of MS SQL
                ack: true,
                q: 0x40,
                from: 'system.adapter.' + adapter.namespace
            });
            if (state) {
                state.ts = now;
                state.lc = now;
                state.from = 'system.adapter.' + adapter.namespace;
                pushHistory(task.id, state);
            }
            setTimeout(processStartValues, 0);
        });
    } else {
        var stamp = task.now || new Date().getTime();
        pushHistory(task.id, {
            val: null,
            ts: stamp,
            lc: stamp,
            ack: true,
            q: 0x40,
            from: 'system.adapter.' + adapter.namespace
        });
        setTimeout(processStartValues, 0);
    }

    // (re)start the relog timer with a random delay between 50% and 100% of the interval
    if (settings.changesRelogInterval > 0) {
        if (dp.relogTimeout) clearTimeout(dp.relogTimeout);
        dp.relogTimeout = setTimeout(reLogHelper, (settings.changesRelogInterval * 500 * Math.random()) + settings.changesRelogInterval * 500, task.id);
    }
}
/**
 * Queues the logging of a start marker (see processStartValues) for one datapoint or, when
 * no id is given, for all datapoints that have settings for this adapter instance.
 * Processing is started when the queue was empty before and the database is connected.
 *
 * @param {string} [id] datapoint id; all logged datapoints when omitted
 * @param {number} [now] timestamp of the marker in ms, defaults to the current time
 */
function writeNulls(id, now) {
    if (id) {
        tasksStart.push({id: id, now: now || new Date().getTime()});
        if (tasksStart.length === 1 && connected) {
            processStartValues();
        }
        return;
    }

    // all datapoints get the same timestamp
    var stamp = new Date().getTime();
    Object.keys(sqlDPs).forEach(function (key) {
        if (sqlDPs[key] && sqlDPs[key][adapter.namespace]) {
            writeNulls(key, stamp);
        }
    });
}
function main() {
setConnected(false);
adapter.config.dbname = adapter.config.dbname || 'yunkong2';
if (adapter.config.writeNulls === undefined) adapter.config.writeNulls = true;
adapter.config.retention = parseInt(adapter.config.retention, 10) || 0;
adapter.config.debounce = parseInt(adapter.config.debounce, 10) || 0;
adapter.config.requestInterval = (adapter.config.requestInterval === undefined || adapter.config.requestInterval === null || adapter.config.requestInterval === '') ? 0 : parseInt(adapter.config.requestInterval, 10) || 0;
if (adapter.config.changesRelogInterval !== null && adapter.config.changesRelogInterval !== undefined) {
adapter.config.changesRelogInterval = parseInt(adapter.config.changesRelogInterval, 10);
} else {
adapter.config.changesRelogInterval = 0;
}
if (!clients[adapter.config.dbtype]) {
adapter.log.error('Unknown DB type: ' + adapter.config.dbtype);
adapter.stop();
}
if (adapter.config.multiRequests !== undefined && adapter.config.dbtype !== 'SQLite3Client' && adapter.config.dbtype !== 'sqlite') {
clients[adapter.config.dbtype].multiRequests = adapter.config.multiRequests;
}
if (adapter.config.changesMinDelta !== null && adapter.config.changesMinDelta !== undefined) {
adapter.config.changesMinDelta = parseFloat(adapter.config.changesMinDelta.toString().replace(/,/g, '.'));
} else {
adapter.config.changesMinDelta = 0;
}
multiRequests = clients[adapter.config.dbtype].multiRequests;
if (!multiRequests) adapter.config.writeNulls = false;
adapter.config.port = parseInt(adapter.config.port, 10) || 0;
if (adapter.config.round !== null && adapter.config.round !== undefined) {
adapter.config.round = Math.pow(10, parseInt(adapter.config.round, 10));
} else {
adapter.config.round = null;
}
if (adapter.config.dbtype === 'postgresql' && !SQL.PostgreSQLClient) {
var postgres = require(__dirname + '/lib/postgresql-client');
for (var attr in postgres) {
if (postgres.hasOwnProperty(attr) && !SQL[attr]) {
SQL[attr] = postgres[attr];
}
}
} else
if (adapter.config.dbtype === 'mssql' && !SQL.MSSQLClient) {
var mssql = require(__dirname + '/lib/mssql-client');
for (var attr_ in mssql) {
if (mssql.hasOwnProperty(attr_) && !SQL[attr_]) {
SQL[attr_] = mssql[attr_];
}
}
}
SQLFuncs = require(__dirname + '/lib/' + adapter.config.dbtype);
fixSelector(function () {
// read all custom settings
adapter.objects.getObjectView('custom', 'state', {}, function (err, doc) {
var count = 0;
if (doc && doc.rows) {
for (var i = 0, l = doc.rows.length; i < l; i++) {
if (doc.rows[i].value) {
var id = doc.rows[i].id;
var realId = id;
if (doc.rows[i].value[adapter.namespace] && doc.rows[i].value[adapter.namespace].aliasId) {
aliasMap[id] = doc.rows[i].value[adapter.namespace].aliasId;
adapter.log.debug('Found Alias: ' + id + ' --> ' + aliasMap[id]);
id = aliasMap[id];
}
sqlDPs[id] = doc.rows[i].value;
if (!sqlDPs[id][adapter.namespace]) {
delete sqlDPs[id];
} else {
count++;
adapter.log.info('enabled logging of ' + id + ', Alias=' + (id !== realId) + ', ' + count + ' points now activated');
if (sqlDPs[id][adapter.namespace].retention !== undefined && sqlDPs[id][adapter.namespace].retention !== null && sqlDPs[id][adapter.namespace].retention !== '') {
sqlDPs[id][adapter.namespace].retention = parseInt(sqlDPs[id][adapter.namespace].retention, 10) || 0;
} else {
sqlDPs[id][adapter.namespace].retention = adapter.config.retention;
}
if (sqlDPs[id][adapter.namespace].debounce !== undefined && sqlDPs[id][adapter.namespace].debounce !== null && sqlDPs[id][adapter.namespace].debounce !== '') {
sqlDPs[id][adapter.namespace].debounce = parseInt(sqlDPs[id][adapter.namespace].debounce, 10) || 0;
} else {
sqlDPs[id][adapter.namespace].debounce = adapter.config.debounce;
}
sqlDPs[id][adapter.namespace].changesOnly = sqlDPs[id][adapter.namespace].changesOnly === 'true' || sqlDPs[id][adapter.namespace].changesOnly === true;
if (sqlDPs[id][adapter.namespace].changesRelogInterval !== undefined && sqlDPs[id][adapter.namespace].changesRelogInterval !== null && sqlDPs[id][adapter.namespace].changesRelogInterval !== '') {
sqlDPs[id][adapter.namespace].changesRelogInterval = parseInt(sqlDPs[id][adapter.namespace].changesRelogInterval, 10) || 0;
} else {
sqlDPs[id][adapter.namespace].changesRelogInterval = adapter.config.changesRelogInterval;
}
if (sqlDPs[id][adapter.namespace].changesRelogInterval > 0) {
sqlDPs[id].relogTimeout = setTimeout(reLogHelper, (sqlDPs[id][adapter.namespace].changesRelogInterval * 500 * Math.random()) + sqlDPs[id][adapter.namespace].changesRelogInterval * 500, id);
}
if (sqlDPs[id][adapter.namespace].changesMinDelta !== undefined && sqlDPs[id][adapter.namespace].changesMinDelta !== null && sqlDPs[id][adapter.namespace].changesMinDelta !== '') {
sqlDPs[id][adapter.namespace].changesMinDelta = parseFloat(sqlDPs[id][adapter.namespace].changesMinDelta) || 0;
} else {
sqlDPs[id][adapter.namespace].changesMinDelta = adapter.config.changesMinDelta;
}
if (!sqlDPs[id][adapter.namespace].storageType) sqlDPs[id][adapter.namespace].storageType = false;
// add one day if retention is too small
if (sqlDPs[id][adapter.namespace].retention && sqlDPs[id][adapter.namespace].retention <= 604800) {
sqlDPs[id][adapter.namespace].retention += 86400;
}
if (sqlDPs[id][adapter.namespace] && sqlDPs[id][adapter.namespace].changesRelogInterval > 0) {
if (sqlDPs[id].relogTimeout) clearTimeout(sqlDPs[id].relogTimeout);
sqlDPs[id].relogTimeout = setTimeout(reLogHelper, (sqlDPs[id][adapter.namespace].changesRelogInterval * 500 * Math.random()) + sqlDPs[id][adapter.namespace].changesRelogInterval * 500, id);
}
sqlDPs[id].realId = realId;
}
}
}
}
if (adapter.config.writeNulls) writeNulls();
if (count < 20) {
for (var _id in sqlDPs) {
if (sqlDPs.hasOwnProperty(_id)) {
adapter.subscribeForeignStates(sqlDPs[_id].realId);
}
}
} else {
subscribeAll = true;
adapter.subscribeForeignStates('*');
}
});
});
adapter.subscribeForeignObjects('*');
if (adapter.config.dbtype === 'sqlite' || adapter.config.host) {
connect();
}
}
// Decides whether a new state of a logged datapoint is stored.
// id         - key of the datapoint in sqlDPs (alias already resolved by the caller)
// state      - new state object; ignored when falsy
// timerRelog - true when called by the relog timer (reLogHelper); the changes-only filters
//              are skipped then and the value is stored with the current time
// Values rejected by a filter are remembered in sqlDPs[id].skipped. Accepted values are
// passed to pushHelper(), either immediately or after the debounce time.
function pushHistory(id, state, timerRelog) {
if (timerRelog === undefined) timerRelog = false;
// Push into DB
if (sqlDPs[id]) {
var settings = sqlDPs[id][adapter.namespace];
if (!settings || !state) return;
adapter.log.debug('new value received for ' + id + ', new-value=' + state.val + ', ts=' + state.ts + ', relog=' + timerRelog);
// numeric strings are converted to numbers unless the datapoint is stored as String
if (state.val !== null && typeof state.val === 'string' && settings.storageType !== 'String') {
var f = parseFloat(state.val);
if (f == state.val) {
state.val = f;
}
}
// changes-only filter; needs a previously stored state to compare with
if (sqlDPs[id].state && settings.changesOnly && !timerRelog) {
if (settings.changesRelogInterval === 0) {
// ts differs from lc: the state was updated without a change of the value
if (state.ts !== state.lc) {
sqlDPs[id].skipped = state; // remember new timestamp
adapter.log.debug('value not changed ' + id + ', last-value=' + sqlDPs[id].state.val + ', new-value=' + state.val + ', ts=' + state.ts);
return;
}
}
else if (sqlDPs[id].lastLogTime) {
// unchanged value is skipped only while the relog interval (in seconds) has not elapsed
if ((state.ts !== state.lc) && (Math.abs(sqlDPs[id].lastLogTime - state.ts) < settings.changesRelogInterval * 1000)) {
sqlDPs[id].skipped = state; // remember new timestamp
adapter.log.debug('value not changed relog' + id + ', last-value=' + sqlDPs[id].state.val + ', new-value=' + state.val + ', ts=' + state.ts);
return;
}
if (state.ts !== state.lc) {
adapter.log.debug('value-changed-relog ' + id + ', value=' + state.val + ', lastLogTime=' + sqlDPs[id].lastLogTime + ', ts=' + state.ts);
}
}
// numeric changes smaller than the configured minimal delta are skipped as well
if (sqlDPs[id].state.val !== null && (settings.changesMinDelta !== 0) && (typeof state.val === 'number') && (Math.abs(sqlDPs[id].state.val - state.val) < settings.changesMinDelta)) {
adapter.log.debug('Min-Delta not reached ' + id + ', last-value=' + sqlDPs[id].state.val + ', new-value=' + state.val + ', ts=' + state.ts);
sqlDPs[id].skipped = state; // remember new timestamp
return;
}
else if (typeof state.val === 'number') {
adapter.log.debug('Min-Delta reached ' + id + ', last-value=' + sqlDPs[id].state.val + ', new-value=' + state.val + ', ts=' + state.ts);
}
else {
adapter.log.debug('Min-Delta ignored because no number ' + id + ', last-value=' + sqlDPs[id].state.val + ', new-value=' + state.val + ', ts=' + state.ts);
}
}
// the value will be stored: restart the relog timer with the full interval
if (sqlDPs[id].relogTimeout) {
clearTimeout(sqlDPs[id].relogTimeout);
sqlDPs[id].relogTimeout = null;
}
if (settings.changesRelogInterval > 0) {
sqlDPs[id].relogTimeout = setTimeout(reLogHelper, settings.changesRelogInterval * 1000, id);
}
var ignoreDebonce = false;
if (timerRelog) {
state.ts = new Date().getTime();
adapter.log.debug('timed-relog ' + id + ', value=' + state.val + ', lastLogTime=' + sqlDPs[id].lastLogTime + ', ts=' + state.ts);
ignoreDebonce = true;
} else {
// store the last skipped value first, so that it precedes the new value
if (settings.changesOnly && sqlDPs[id].skipped) {
sqlDPs[id].state = sqlDPs[id].skipped;
pushHelper(id);
}
// transitions from or to null are stored without debounce
if (sqlDPs[id].state && ((sqlDPs[id].state.val === null && state.val !== null) || (sqlDPs[id].state.val !== null && state.val === null))) {
ignoreDebonce = true;
}
else if (!sqlDPs[id].state && state.val === null) {
ignoreDebonce = true;
}
// only store state if really changed
sqlDPs[id].state = state;
}
sqlDPs[id].lastLogTime = state.ts;
sqlDPs[id].skipped = null;
if (settings.debounce && !ignoreDebonce) {
// Discard changes in debounce time to store last stable value
if (sqlDPs[id].timeout) clearTimeout(sqlDPs[id].timeout);
sqlDPs[id].timeout = setTimeout(pushHelper, settings.debounce, id);
} else {
pushHelper(id);
}
}
}
/**
 * Timer callback (scheduled via setTimeout with the datapoint id) that logs a
 * datapoint again once its relog interval has elapsed.
 *
 * If a value was skipped since the last stored one, that skipped value is
 * written with this adapter instance as its source. Otherwise the current
 * state is read and written.
 *
 * @param {string} _id key of the datapoint in sqlDPs
 */
function reLogHelper(_id) {
    if (!sqlDPs[_id]) {
        adapter.log.info('non-existing id ' + _id);
        return;
    }
    // The timer that led to this call has fired, so drop its handle.
    sqlDPs[_id].relogTimeout = null;

    if (sqlDPs[_id].skipped) {
        sqlDPs[_id].state = sqlDPs[_id].skipped;
        sqlDPs[_id].state.from = 'system.adapter.' + adapter.namespace;
        sqlDPs[_id].skipped = null;
        pushHistory(_id, sqlDPs[_id].state, true);
        return;
    }

    adapter.getForeignState(sqlDPs[_id].realId, function (err, state) {
        if (err) {
            adapter.log.info('init timed Relog: can not get State for ' + _id + ' : ' + err);
        }
        else if (!state) {
            adapter.log.info('init timed Relog: disable relog because state not set so far for ' + _id + ': ' + JSON.stringify(state));
        }
        else if (!sqlDPs[_id]) {
            // The datapoint can be removed while the state is being read;
            // without this check the assignment below would throw.
            adapter.log.info('non-existing id ' + _id);
        }
        else {
            adapter.log.debug('init timed Relog: getState ' + _id + ': Value=' + state.val + ', ack=' + state.ack + ', ts=' + state.ts + ', lc=' + state.lc);
            sqlDPs[_id].state = state;
            pushHistory(_id, sqlDPs[_id].state, true);
        }
    });
}
/**
 * Converts the buffered state of a datapoint to its configured storage type
 * and passes it on to pushValueIntoDB.
 *
 * Nothing happens when the datapoint, its buffered state or its settings for
 * this adapter instance are missing. A value that cannot be stored as a
 * number although storage type "Number" is configured is dropped.
 *
 * @param {string} _id key of the datapoint in sqlDPs
 */
function pushHelper(_id) {
    var dp = sqlDPs[_id];
    if (!dp || !dp.state) {
        return;
    }
    var _settings = dp[adapter.namespace];
    // settings are gone if logging was disabled in the meantime
    if (!_settings) {
        return;
    }
    dp.timeout = null;

    var state = dp.state;
    if (state.val === null) {
        adapter.log.debug('Datatype ' + _id + ': Currently: null');
    } else {
        if (typeof state.val === 'object') {
            state.val = JSON.stringify(state.val);
        }
        adapter.log.debug('Datatype ' + _id + ': Currently: ' + typeof state.val + ', StorageType: ' + _settings.storageType);

        // Strings that look like numbers or booleans are converted unless
        // the value is explicitly meant to be stored as a string.
        if (typeof state.val === 'string' && _settings.storageType !== 'String') {
            adapter.log.debug('Do Automatic Datatype conversion for ' + _id);
            var asNumber = parseFloat(state.val);
            // loose comparison on purpose: number against its string form
            if (asNumber == state.val) {
                state.val = asNumber;
            } else if (state.val === 'true') {
                state.val = true;
            } else if (state.val === 'false') {
                state.val = false;
            }
        }

        switch (_settings.storageType) {
            case 'String':
                if (typeof state.val !== 'string') {
                    state.val = state.val.toString();
                }
                break;
            case 'Number':
                if (typeof state.val !== 'number') {
                    if (typeof state.val !== 'boolean') {
                        adapter.log.info('Do not store value "' + state.val + '" for ' + _id + ' because no number');
                        return;
                    }
                    state.val = state.val ? 1 : 0;
                }
                break;
            case 'Boolean':
                if (typeof state.val !== 'boolean') {
                    state.val = !!state.val;
                }
                break;
        }
    }
    pushValueIntoDB(_id, dp.state);
}
/**
 * Reads all datapoint rows returned by the SQLFuncs.getIdSelect query and
 * caches their database id (as `index`) and, when not NULL, their stored
 * type (as `dbtype`) in sqlDPs.
 *
 * @param {function(*=)} [cb] called with an error, or without arguments on
 *        success, including when the table is empty
 */
function getAllIds(cb) {
    var query = SQLFuncs.getIdSelect(adapter.config.dbname);
    adapter.log.debug(query);

    if (!clientPool) {
        if (cb) cb('No connection');
        return;
    }

    clientPool.borrow(function (err, client) {
        if (err) {
            if (cb) cb(err);
            return;
        }
        client.execute(query, function (err, rows /* , fields */) {
            // The client is not needed any more, whatever the outcome.
            clientPool.return(client);

            if (err) {
                adapter.log.error('Cannot select ' + query + ': ' + err);
                if (cb) cb(err);
                return;
            }
            if (rows && rows.rows) rows = rows.rows;
            rows = rows || [];

            for (var r = 0; r < rows.length; r++) {
                var id = rows[r].name;
                sqlDPs[id] = sqlDPs[id] || {};
                sqlDPs[id].index = rows[r].id;
                if (rows[r].type !== null) sqlDPs[id].dbtype = rows[r].type;
            }
            // An empty table is a valid result, so the caller is always notified.
            if (cb) cb();
        });
    });
}
/**
 * Reads all rows returned by the SQLFuncs.getFromSelect query and caches
 * them in the `from` map (source name -> database id).
 *
 * @param {function(*=)} [cb] called with an error, or without arguments on
 *        success, including when the table is empty
 */
function getAllFroms(cb) {
    var query = SQLFuncs.getFromSelect(adapter.config.dbname);
    adapter.log.debug(query);

    if (!clientPool) {
        if (cb) cb('No connection');
        return;
    }

    clientPool.borrow(function (err, client) {
        if (err) {
            if (cb) cb(err);
            return;
        }
        client.execute(query, function (err, rows /* , fields */) {
            // The client is not needed any more, whatever the outcome.
            clientPool.return(client);

            if (err) {
                adapter.log.error('Cannot select ' + query + ': ' + err);
                if (cb) cb(err);
                return;
            }
            if (rows && rows.rows) rows = rows.rows;
            rows = rows || [];

            for (var r = 0; r < rows.length; r++) {
                from[rows[r].name] = rows[r].id;
            }
            // An empty table is a valid result, so the caller is always notified.
            if (cb) cb();
        });
    });
}
/**
 * Executes a retention (cleanup) statement on a client borrowed from the pool.
 * Errors are only logged; cb is invoked without arguments in every case.
 *
 * @param {string} query SQL statement to run
 * @param {function()} [cb] completion callback
 */
function _checkRetention(query, cb) {
    var notify = function () {
        if (cb) cb();
    };

    adapter.log.debug(query);
    clientPool.borrow(function (borrowErr, client) {
        if (borrowErr) {
            adapter.log.error(borrowErr);
            notify();
            return;
        }
        client.execute(query, function (execErr /* , rows, fields */) {
            if (execErr) {
                adapter.log.error('Cannot delete ' + query + ': ' + execErr);
            }
            clientPool.return(client);
            notify();
        });
    });
}
/**
 * Triggers the retention cleanup for a datapoint, at most once every six
 * hours per datapoint. Does nothing if the datapoint has no retention
 * configured for this adapter instance.
 *
 * The statement is queued as a 'delete' task when requests must be
 * serialized (multiRequests is false), otherwise it is executed directly.
 *
 * @param {string} id key of the datapoint in sqlDPs
 */
function checkRetention(id) {
    var dp = sqlDPs[id];
    var settings = dp && dp[adapter.namespace];
    if (!settings || !settings.retention) {
        return;
    }

    var now = new Date().getTime();
    var isDue = !dp.lastCheck || now - dp.lastCheck >= 21600000/* 6 hours */;
    if (!isDue) {
        return;
    }
    dp.lastCheck = now;

    var query = SQLFuncs.retention(adapter.config.dbname, dp.index, dbNames[dp.type], settings.retention);
    if (multiRequests) {
        _checkRetention(query);
        return;
    }

    if (tasks.length > 100) {
        adapter.log.error('Cannot queue new requests, because more than 100');
        return;
    }
    tasks.push({operation: 'delete', query: query});
    if (tasks.length === 1) {
        processTasks();
    }
}
/**
 * Executes an insert statement on a client borrowed from the pool and then
 * triggers the retention check for the datapoint. Errors are only logged;
 * cb is invoked without arguments in every case.
 *
 * @param {string} query SQL insert statement
 * @param {string} id key of the datapoint in sqlDPs, used for the retention check
 * @param {function()} [cb] completion callback
 */
function _insertValueIntoDB(query, id, cb) {
    var notify = function () {
        if (cb) cb();
    };

    adapter.log.debug(query);
    clientPool.borrow(function (borrowErr, client) {
        if (borrowErr) {
            adapter.log.error(borrowErr);
            notify();
            return;
        }
        client.execute(query, function (execErr /* , rows, fields */) {
            if (execErr) {
                adapter.log.error('Cannot insert ' + query + ': ' + execErr);
            }
            clientPool.return(client);
            checkRetention(id);
            notify();
        });
    });
}
/**
 * Takes the next task from tasksReadType and determines the storage type of
 * its datapoint, then passes the task to processVerifyTypes.
 *
 * The type is taken from, in this order:
 *   1. the configured storageType of the datapoint,
 *   2. the type already stored in the database (dbtype),
 *   3. common.type of the object, if it maps to a known type,
 *   4. the JavaScript type of the current state value,
 *   5. string, as the last resort.
 *
 * Each task is passed to processVerifyTypes exactly once. A task whose
 * datapoint has been removed in the meantime is dropped and the next queued
 * task is scheduled.
 */
function processReadTypes() {
    if (!tasksReadType || !tasksReadType.length) {
        return;
    }
    var task = tasksReadType.shift();

    // Returns the current datapoint entry; if it vanished, drops the task
    // and schedules the next one so the queue does not stall.
    function lookup() {
        var entry = sqlDPs[task.id];
        if (!entry) {
            adapter.log.debug('Type detection skipped for ' + task.id + ' because datapoint is not registered anymore');
            setTimeout(processReadTypes, 50);
        }
        return entry;
    }

    // Stores the detected type name in the datapoint settings, if there are any.
    function rememberStorageType(entry) {
        if (entry[adapter.namespace]) {
            entry[adapter.namespace].storageType = storageTypes[entry.type];
        }
    }

    var dp = lookup();
    if (!dp) {
        return;
    }

    var configured = dp[adapter.namespace] && dp[adapter.namespace].storageType;
    adapter.log.debug('Type set in Def for ' + task.id + ': ' + configured);

    if (configured) {
        dp.type = types[configured.toLowerCase()];
        adapter.log.debug('Type (from Def) for ' + task.id + ': ' + dp.type);
        processVerifyTypes(task);
        return;
    }

    if (dp.dbtype !== undefined) {
        dp.type = dp.dbtype;
        rememberStorageType(dp);
        adapter.log.debug('Type (from DB-Type) for ' + task.id + ': ' + dp.type);
        processVerifyTypes(task);
        return;
    }

    adapter.getForeignObject(dp.realId, function (err, obj) {
        if (err) {
            adapter.log.warn('Error while get Object for Def: ' + err);
        }
        var current = lookup();
        if (!current) {
            return;
        }

        var objType = (obj && obj.common && obj.common.type) ? String(obj.common.type).toLowerCase() : undefined;

        // Only accept the object type if it maps to a storage type; otherwise
        // fall through to the state value (and do not verify the task twice).
        if (objType !== undefined && types[objType] !== undefined) {
            adapter.log.debug(objType + ' / ' + types[objType] + ' / ' + JSON.stringify(obj.common));
            current.type = types[objType];
            rememberStorageType(current);
            adapter.log.debug('Type (from Obj) for ' + task.id + ': ' + current.type);
            processVerifyTypes(task);
            return;
        }

        adapter.getForeignState(current.realId, function (err, state) {
            var live = lookup();
            if (!live) {
                return;
            }
            if (err) {
                adapter.log.warn('Store data for ' + task.id + ' as string because no other valid type found (' + objType + ' and no state)');
                live.type = 1; // string
            }
            else if (state && state.val !== null && state.val !== undefined && types[typeof state.val] !== undefined) {
                live.type = types[typeof state.val];
                rememberStorageType(live);
            }
            else {
                adapter.log.warn('Store data for ' + task.id + ' as string because no other valid type found (' + (state ? (typeof state.val) : 'state not existing') + ')');
                live.type = 1; // string
            }
            adapter.log.debug('Type (from State) for ' + task.id + ': ' + live.type);
            processVerifyTypes(task);
        });
    });
}
/**
 * Makes sure the type stored in the database for a datapoint matches the
 * detected type, then writes the task's value and schedules the next type
 * detection.
 *
 * If the datapoint already has a database index and its detected type
 * differs from dbtype, the type is updated in the database first. dbtype is
 * set before the update is attempted, so the recursive call that follows
 * (also after a failed attempt) goes straight to writing the value.
 *
 * @param {{id: string, state: object}} task queued entry from tasksReadType
 */
function processVerifyTypes(task) {
    var dp = sqlDPs[task.id];
    var mustPinType = dp.index !== undefined && dp.type !== undefined && dp.type !== dp.dbtype;

    if (!mustPinType) {
        pushValueIntoDB(task.id, task.state);
        setTimeout(processReadTypes, 50);
        return;
    }

    dp.dbtype = dp.type;
    var query = SQLFuncs.getIdUpdate(adapter.config.dbname, dp.index, dp.type);
    adapter.log.debug(query);

    clientPool.borrow(function (borrowErr, client) {
        if (borrowErr) {
            processVerifyTypes(task);
            return;
        }
        client.execute(query, function (execErr /* , rows, fields */) {
            if (execErr) {
                adapter.log.error('error updating history config for ' + task.id + ' to pin datatype: ' + query + ': ' + execErr);
            }
            else {
                adapter.log.info('changed history configuration to pin detected datatype for ' + task.id);
            }
            clientPool.return(client);
            processVerifyTypes(task);
        });
    });
}
/**
 * Returns a copy of a state so that later changes to the caller's object do
 * not affect a value that is waiting in a queue.
 */
function _copyQueuedState(state) {
    if (Object.assign) {
        return Object.assign({}, state);
    }
    return JSON.parse(JSON.stringify(state));
}

/**
 * Writes one state of a datapoint into the database.
 *
 * Before the insert can be built, three things must be known:
 *   - the storage type of the datapoint; if unknown, the value is queued in
 *     tasksReadType for type detection,
 *   - the database index of the datapoint; if unknown, it is requested via
 *     getId and all values arriving meanwhile are queued,
 *   - the database id of state.from; if unknown, it is requested via getFrom
 *     and all values arriving meanwhile are queued.
 *
 * The passed state is modified: ts is normalized to milliseconds (and
 * incremented by one if equal to the previously stored ts), object values
 * are converted to JSON.
 *
 * NOTE: cb is not stored in the type-detection queue, so it is not invoked
 * for a value that has to wait for type detection.
 *
 * @param {string} id key of the datapoint in sqlDPs
 * @param {object} state state to store (val, ts, optionally ms and from)
 * @param {function(string=)} [cb] called with an error text on failure
 */
function pushValueIntoDB(id, state, cb) {
    if (!sqlDPs[id]) return;

    if (!clientPool) {
        adapter.log.warn('No connection to SQL-DB');
        if (cb) cb('No connection to SQL-DB');
        return;
    }

    if (sqlDPs[id].type === undefined) {
        // type must be detected first
        tasksReadType.push({id: id, state: state});
        if (tasksReadType.length === 1) {
            processReadTypes();
        }
        return;
    }
    var type = sqlDPs[id].type;

    // get database index of the datapoint
    if (sqlDPs[id].index === undefined) {
        var waitingForId = sqlDPs[id].isRunning = sqlDPs[id].isRunning || [];
        waitingForId.push({id: id, state: _copyQueuedState(state), cb: cb});

        // only the first queued value starts the lookup
        if (waitingForId.length === 1) {
            // read or create in DB
            getId(id, type, function (err, _id) {
                var i;
                if (err) {
                    adapter.log.warn('Cannot get index of "' + _id + '": ' + err);
                    for (i = 0; i < waitingForId.length; i++) {
                        if (waitingForId[i].cb) waitingForId[i].cb('Cannot get index of "' + waitingForId[i].id + '": ' + err);
                    }
                } else {
                    for (i = 0; i < waitingForId.length; i++) {
                        pushValueIntoDB(waitingForId[i].id, waitingForId[i].state, waitingForId[i].cb);
                    }
                }
                // The datapoint may have been removed (or re-created) while the
                // lookup was running; only reset the queue that was handled here.
                if (sqlDPs[_id] && sqlDPs[_id].isRunning === waitingForId) {
                    sqlDPs[_id].isRunning = null;
                }
            });
        }
        return;
    }

    // get database id of the source
    if (state.from && !from[state.from]) {
        var waitingForFrom = isFromRunning[state.from] = isFromRunning[state.from] || [];
        waitingForFrom.push({id: id, state: _copyQueuedState(state), cb: cb});

        // only the first queued value starts the lookup
        if (waitingForFrom.length === 1) {
            // read or create in DB
            getFrom(state.from, function (err, _from) {
                var i;
                if (err) {
                    adapter.log.warn('Cannot get "from" for "' + _from + '": ' + err);
                    for (i = 0; i < waitingForFrom.length; i++) {
                        if (waitingForFrom[i].cb) waitingForFrom[i].cb('Cannot get "from" for "' + _from + '": ' + err);
                    }
                } else {
                    for (i = 0; i < waitingForFrom.length; i++) {
                        pushValueIntoDB(waitingForFrom[i].id, waitingForFrom[i].state, waitingForFrom[i].cb);
                    }
                }
                isFromRunning[_from] = null;
            });
        }
        return;
    }

    // if greater than 2000.01.01 00:00:00 the value is already in milliseconds
    if (state.ts > 946681200000) {
        state.ts = parseInt(state.ts, 10);
    } else {
        state.ts = parseInt(state.ts, 10) * 1000 + (parseInt(state.ms, 10) || 0);
    }

    try {
        if (state.val !== null && typeof state.val === 'object') {
            state.val = JSON.stringify(state.val);
        }
    } catch (err) {
        adapter.log.error('Cannot convert the object value "' + id + '"');
        if (cb) cb('Cannot convert the object value "' + id + '"');
        return;
    }

    // increase timestamp if last is the same
    if (sqlDPs[id].ts && state.ts === sqlDPs[id].ts) {
        state.ts++;
    }
    // remember last timestamp
    sqlDPs[id].ts = state.ts;

    var query = SQLFuncs.insert(adapter.config.dbname, sqlDPs[id].index, state, from[state.from] || 0, dbNames[type]);

    if (!multiRequests) {
        if (tasks.length > 100) {
            adapter.log.error('Cannot queue new requests, because more than 100');
            if (cb) cb('Cannot queue new requests, because more than 100');
            return;
        }
        tasks.push({operation: 'insert', query: query, id: id, callback: cb});
        if (tasks.length === 1) {
            processTasks();
        }
    } else {
        _insertValueIntoDB(query, id, cb);
    }
}
// true while a queued task is being executed
var lockTasks = false;

/**
 * Executes the first task in the `tasks` queue. Used when the database
 * client cannot handle several requests at once.
 *
 * Only one task runs at a time. When it has finished, its callback is
 * invoked, the task is removed from the queue and, if more tasks are
 * waiting, the next run is scheduled after adapter.config.requestInterval.
 *
 * Supported operations: 'insert', 'select', 'userQuery', 'delete'. Only the
 * callback of a 'select' task receives (err, rows); all other callbacks are
 * invoked without arguments.
 */
function processTasks() {
    if (lockTasks) {
        adapter.log.debug('Tries to execute task, but last one not finished!');
        return;
    }
    // Do not take the lock when there is nothing to do, otherwise nobody
    // would ever release it again.
    if (!tasks.length) {
        return;
    }
    lockTasks = true;
    var task = tasks[0];

    // Completes the current task: notify, dequeue, unlock, schedule the next one.
    function finish(forwardResult, err, rows) {
        try {
            if (task.callback) {
                if (forwardResult) {
                    task.callback(err, rows);
                } else {
                    task.callback();
                }
            }
        } finally {
            // Even if the callback throws, the queue must keep running.
            tasks.shift();
            lockTasks = false;
            if (tasks.length) setTimeout(processTasks, adapter.config.requestInterval);
        }
    }

    switch (task.operation) {
        case 'insert':
            _insertValueIntoDB(task.query, task.id, function () {
                finish(false);
            });
            break;

        case 'select':
            _getDataFromDB(task.query, task.options, function (err, rows) {
                finish(true, err, rows);
            });
            break;

        case 'userQuery':
            _userQuery(task.msg, function () {
                finish(false);
            });
            break;

        case 'delete':
            _checkRetention(task.query, function () {
                finish(false);
            });
            break;

        default:
            adapter.log.error('unknown task: ' + task.operation);
            finish(false);
    }
}
// Maybe it is required to cache all the data in memory
/**
 * Determines the database index of a datapoint and stores it in
 * sqlDPs[id].index. If the datapoint has no row yet and `type` is not null,
 * the row is created first.
 *
 * A type found in the database is copied to sqlDPs[id].type; a NULL type in
 * the database leaves an already known type untouched.
 *
 * @param {string} id key of the datapoint in sqlDPs
 * @param {number|null} type storage type used when the row has to be
 *        created; null means "do not create"
 * @param {function(*, string)} [cb] called with (error, id); error is null
 *        on success
 */
function getId(id, type, cb) {
    var query = SQLFuncs.getIdSelect(adapter.config.dbname, id);
    adapter.log.debug(query);

    if (!clientPool) {
        if (cb) cb('No connection', id);
        return;
    }

    clientPool.borrow(function (err, client) {
        if (err) {
            if (cb) cb(err, id);
            return;
        }

        // Gives the client back and reports the outcome exactly once.
        function finish(error) {
            clientPool.return(client);
            if (cb) cb(error || null, id);
        }

        function fail(action, error) {
            adapter.log.error('Cannot ' + action + ' ' + query + ': ' + error);
            finish(error);
        }

        function unwrap(rows) {
            return (rows && rows.rows) ? rows.rows : rows;
        }

        // Takes over index and type from the first row of a result.
        function adopt(rows) {
            var row = rows && rows[0];
            if (!row) {
                finish('id not found');
                return;
            }
            if (!sqlDPs[id]) {
                // removed while the query was running
                finish('datapoint "' + id + '" is not registered anymore');
                return;
            }
            sqlDPs[id].index = row.id;
            if (row.type !== null && row.type !== undefined) {
                sqlDPs[id].type = row.type;
            }
            finish();
        }

        client.execute(query, function (err, rows /* , fields */) {
            if (err) {
                fail('select', err);
                return;
            }
            rows = unwrap(rows);
            if (rows && rows.length) {
                adopt(rows);
                return;
            }
            if (type === null) {
                finish('id not found');
                return;
            }

            // insert
            query = SQLFuncs.getIdInsert(adapter.config.dbname, id, type);
            adapter.log.debug(query);
            client.execute(query, function (err /* , rows, fields */) {
                if (err) {
                    fail('insert', err);
                    return;
                }
                // read back the row to learn the generated index
                query = SQLFuncs.getIdSelect(adapter.config.dbname, id);
                adapter.log.debug(query);
                client.execute(query, function (err, rows /* , fields */) {
                    if (err) {
                        fail('select', err);
                        return;
                    }
                    adopt(unwrap(rows));
                });
            });
        });
    });
}
// Maybe it is required to cache all the data in memory
/**
 * Determines the database id of a source ("from") and stores it in the
 * `from` map. If the source has no row yet, the row is created first.
 *
 * @param {string} _from name of the source
 * @param {function(*, string)} [cb] called with (error, _from); error is
 *        null on success
 */
function getFrom(_from, cb) {
    var query = SQLFuncs.getFromSelect(adapter.config.dbname, _from);
    adapter.log.debug(query);

    if (!clientPool) {
        if (cb) cb('No connection', _from);
        return;
    }

    clientPool.borrow(function (err, client) {
        if (err) {
            if (cb) cb(err, _from);
            return;
        }

        // Gives the client back and reports the outcome exactly once.
        function finish(error) {
            clientPool.return(client);
            if (cb) cb(error || null, _from);
        }

        function fail(action, error) {
            adapter.log.error('Cannot ' + action + ' ' + query + ': ' + error);
            finish(error);
        }

        function unwrap(rows) {
            return (rows && rows.rows) ? rows.rows : rows;
        }

        // Takes over the id from the first row of a result.
        function adopt(rows) {
            var row = rows && rows[0];
            if (!row) {
                finish('Cannot read id of source "' + _from + '"');
                return;
            }
            from[_from] = row.id;
            finish();
        }

        client.execute(query, function (err, rows /* , fields */) {
            if (err) {
                fail('select', err);
                return;
            }
            rows = unwrap(rows);
            if (rows && rows.length) {
                adopt(rows);
                return;
            }

            // insert
            query = SQLFuncs.getFromInsert(adapter.config.dbname, _from);
            adapter.log.debug(query);
            client.execute(query, function (err /* , rows, fields */) {
                if (err) {
                    fail('insert', err);
                    return;
                }
                // read back the row to learn the generated id
                query = SQLFuncs.getFromSelect(adapter.config.dbname, _from);
                adapter.log.debug(query);
                client.execute(query, function (err, rows /* , fields */) {
                    if (err) {
                        fail('select', err);
                        return;
                    }
                    adopt(unwrap(rows));
                });
            });
        });
    });
}
/**
 * Comparator for Array.prototype.sort that orders entries by ascending `ts`.
 *
 * @param {{ts: *}} a
 * @param {{ts: *}} b
 * @returns {number} -1, 1 or 0
 */
function sortByTs(a, b) {
    if (a.ts < b.ts) {
        return -1;
    }
    if (a.ts > b.ts) {
        return 1;
    }
    return 0;
}
/**
 * Executes a history query and post-processes the returned rows:
 *   - ts is converted to a number in milliseconds,
 *   - rows are sorted by ascending ts if only `count` (no `start`) was
 *     requested, because that query returns them in descending order,
 *   - a `date` field is added when the adapter log level is 'debug',
 *   - ack is converted to a boolean if options.ack is set,
 *   - numeric values are rounded according to adapter.config.round,
 *   - values of boolean datapoints are converted to booleans.
 *
 * @param {string} query SQL statement to run
 * @param {object} options request options; `index` is the key of the
 *        datapoint in sqlDPs (may be missing)
 * @param {function(*, Array=)} [callback] called with (err, rows)
 */
function _getDataFromDB(query, options, callback) {
    adapter.log.debug(query);
    clientPool.borrow(function (err, client) {
        if (err) {
            if (callback) callback(err);
            return;
        }
        client.execute(query, function (err, rows /* , fields */) {
            // Give the client back before touching the rows, so that it cannot
            // leak if the processing below fails.
            clientPool.return(client);

            if (rows && rows.rows) rows = rows.rows;

            if (rows) {
                var c;
                // Normalize the timestamps first so that sorting compares numbers
                // in one unit.
                for (c = 0; c < rows.length; c++) {
                    if (typeof rows[c].ts === 'string') {
                        rows[c].ts = parseInt(rows[c].ts, 10);
                    }
                    // if less than 2000.01.01 00:00:00 the value is in seconds
                    if (rows[c].ts < 946681200000) {
                        rows[c].ts *= 1000;
                    }
                }

                // because descending
                if (!err && !options.start && options.count) {
                    rows.sort(sortByTs);
                }

                var dp = sqlDPs[options.index];
                var isBoolean = !!dp && dp.type === 2;
                var addDate = !!adapter.common && adapter.common.loglevel === 'debug';
                // decided by the first non-null value and then applied to all rows
                var isNumber = null;

                for (c = 0; c < rows.length; c++) {
                    if (isNumber === null && rows[c].val !== null) {
                        isNumber = (parseFloat(rows[c].val) == rows[c].val);
                    }
                    if (addDate) {
                        rows[c].date = new Date(parseInt(rows[c].ts, 10));
                    }
                    if (options.ack) {
                        rows[c].ack = !!rows[c].ack;
                    }
                    if (isNumber && adapter.config.round && rows[c].val !== null) {
                        rows[c].val = Math.round(rows[c].val * adapter.config.round) / adapter.config.round;
                    }
                    if (isBoolean) {
                        rows[c].val = !!rows[c].val;
                    }
                }
            }

            if (callback) callback(err, rows);
        });
    });
}
/**
 * Builds the history query for one value table and runs it, either directly
 * or, when requests must be serialized (multiRequests is false), through the
 * task queue as a 'select' task.
 *
 * @param {string} db name of the value table (one of dbNames)
 * @param {object} options request options passed on to the query builder
 * @param {function(*, Array=)} [callback] called with (err, rows)
 */
function getDataFromDB(db, options, callback) {
    var query = SQLFuncs.getHistory(adapter.config.dbname, db, options);
    adapter.log.debug(query);

    if (multiRequests) {
        _getDataFromDB(query, options, callback);
        return;
    }

    if (tasks.length > 100) {
        var message = 'Cannot queue new requests, because more than 100';
        adapter.log.error(message);
        if (callback) {
            callback(message);
        }
        return;
    }

    tasks.push({operation: 'select', query: query, options: options, callback: callback});
    if (tasks.length === 1) {
        processTasks();
    }
}
/**
 * Handles a "getHistory" message: reads the stored values of a datapoint and
 * answers via commons.sendResponse.
 *
 * The request is answered with an empty list if the datapoint is unknown or
 * its database index cannot be determined, and with a hint text if its
 * storage type is not known yet. A message without `message` or
 * `message.options` is treated like a request without any options.
 *
 * @param {object} msg message with `message.id` and optional `message.options`
 */
function getHistory(msg) {
    var startTime = new Date().getTime();
    var message = msg.message || {};
    var requested = message.options || {};

    var options = {
        id:         message.id === '*' ? null : message.id,
        start:      requested.start,
        end:        requested.end || ((new Date()).getTime() + 5000000),
        step:       parseInt(requested.step, 10) || null,
        count:      parseInt(requested.count, 10) || 500,
        ignoreNull: requested.ignoreNull,
        aggregate:  requested.aggregate || 'average', // One of: max, min, average, total
        limit:      requested.limit || adapter.config.limit || 2000,
        from:       requested.from || false,
        q:          requested.q || false,
        ack:        requested.ack || false,
        ms:         requested.ms || false,
        addId:      requested.addId || false,
        sessionId:  requested.sessionId
    };

    if (options.id && aliasMap[options.id]) {
        options.id = aliasMap[options.id];
    }

    if (options.ignoreNull === 'true')  options.ignoreNull = true;  // include nulls and replace them with last value
    if (options.ignoreNull === 'false') options.ignoreNull = false; // include nulls
    if (options.ignoreNull === '0')     options.ignoreNull = 0;     // include nulls and replace them with 0
    if (options.ignoreNull !== true && options.ignoreNull !== false && options.ignoreNull !== 0) options.ignoreNull = false;

    // NOTE(review): for id '*' options.id is null here, so unless a datapoint
    // is literally registered under that key the request ends in this branch
    // and the "all IDs" code at the end is not reached - confirm intent.
    if (!sqlDPs[options.id]) {
        commons.sendResponse(adapter, msg, options, [], startTime);
        return;
    }

    if (options.start > options.end) {
        var _end = options.end;
        options.end = options.start;
        options.start = _end;
    }

    if (!options.start && !options.count) {
        options.start = (new Date()).getTime() - 5030000; // 5030000 ms, i.e. roughly the last 84 minutes
    }

    if (sqlDPs[options.id].type === undefined && sqlDPs[options.id].dbtype !== undefined) {
        if (sqlDPs[options.id][adapter.namespace] && sqlDPs[options.id][adapter.namespace].storageType) {
            // only trust the database type if it matches the configured storage type
            if (storageTypes.indexOf(sqlDPs[options.id][adapter.namespace].storageType) === sqlDPs[options.id].dbtype) {
                adapter.log.debug('For getHistory for id ' + options.id + ': Type empty, use storageType dbtype ' + sqlDPs[options.id].dbtype);
                sqlDPs[options.id].type = sqlDPs[options.id].dbtype;
            }
        }
        else {
            adapter.log.debug('For getHistory for id ' + options.id + ': Type empty, use dbtype ' + sqlDPs[options.id].dbtype);
            sqlDPs[options.id].type = sqlDPs[options.id].dbtype;
        }
    }

    if (options.id && sqlDPs[options.id].index === undefined) {
        // read the index from DB (type null: do not create a row) and start over
        return getId(options.id, null, function (err) {
            if (err) {
                adapter.log.warn('Cannot get index of "' + options.id + '": ' + err);
                commons.sendResponse(adapter, msg, options, [], startTime);
            } else {
                getHistory(msg);
            }
        });
    }

    if (options.id && sqlDPs[options.id].type === undefined) {
        adapter.log.warn('For getHistory for id ' + options.id + ': Type empty. Need to write data first. Index = ' + sqlDPs[options.id].index);
        commons.sendResponse(adapter, msg, options, 'Please wait till next data record is logged and reload.', startTime);
        return;
    }

    var type = sqlDPs[options.id].type;
    if (options.id) {
        // from here on options.id is the database index and options.index the datapoint key
        options.index = options.id;
        options.id = sqlDPs[options.id].index;
    }

    // if specific id requested
    if (options.id || options.id === 0) {
        getDataFromDB(dbNames[type], options, function (err, data) {
            commons.sendResponse(adapter, msg, options, (err ? err.toString() : null) || data, startTime);
        });
    } else {
        // if all IDs requested
        var rows = [];
        var count = 0;
        for (var db = 0; db < dbNames.length; db++) {
            count++;
            getDataFromDB(dbNames[db], options, function (err, data) {
                if (data) rows = rows.concat(data);
                if (!--count) {
                    rows.sort(sortByTs);
                    commons.sendResponse(adapter, msg, options, rows, startTime);
                }
            });
        }
    }
}
/**
 * Handles a "storeState" message and writes the contained value(s) into the
 * database. Alias ids are resolved via aliasMap.
 *
 * Accepted forms of msg.message:
 *   - [{id, state}, ...]   several datapoints, one state each
 *   - {id, state: [...]}   one datapoint, several states
 *   - {id, state}          one datapoint, one state
 *
 * The sender gets {success: true, connected: <bool>} or, for an invalid
 * message, {error: ...}.
 *
 * @param {object} msg received message
 */
function storeState(msg) {
    var message = msg.message;
    var isList = Array.isArray(message);

    // An array has no id/state of its own, so only the object form is checked here.
    if (!message || (!isList && (!message.id || !message.state))) {
        adapter.log.error('storeState called with invalid data');
        adapter.sendTo(msg.from, msg.command, {
            error:  'Invalid call: ' + JSON.stringify(msg)
        }, msg.callback);
        return;
    }

    function resolveAlias(id) {
        return aliasMap[id] ? aliasMap[id] : id;
    }

    if (isList) {
        for (var i = 0; i < message.length; i++) {
            var entry = message[i];
            if (!entry || !entry.id || !entry.state) {
                adapter.log.warn('storeState: ignoring invalid entry at index ' + i);
                continue;
            }
            pushValueIntoDB(resolveAlias(entry.id), entry.state);
        }
    } else if (Array.isArray(message.state)) {
        var id = resolveAlias(message.id);
        for (var j = 0; j < message.state.length; j++) {
            pushValueIntoDB(id, message.state[j]);
        }
    } else {
        pushValueIntoDB(resolveAlias(message.id), message.state);
    }

    adapter.sendTo(msg.from, msg.command, {
        success:    true,
        connected:  !!clientPool
    }, msg.callback);
}
/**
 * Handles a "getDpOverview" message: collects all datapoints known to the
 * database, grouped by their stored type, and passes the borrowed client on
 * to getFirstTsForIds, which adds timestamps, sends the answer and returns
 * the client to the pool.
 *
 * On failure the sender gets {error: ...}.
 *
 * @param {object} msg received message (from, command, callback are used)
 */
function getDpOverview(msg) {
    var result = {};
    var query = SQLFuncs.getIdSelect(adapter.config.dbname);
    adapter.log.info(query);

    if (!clientPool) {
        adapter.sendTo(msg.from, msg.command, {
            error:  'No connection to SQL-DB'
        }, msg.callback);
        return;
    }

    clientPool.borrow(function (err, client) {
        if (err) {
            adapter.sendTo(msg.from, msg.command, {
                error:  'Cannot select ' + query + ': ' + err
            }, msg.callback);
            return;
        }
        client.execute(query, function (err, rows /* , fields */) {
            if (rows && rows.rows) rows = rows.rows;
            if (err) {
                adapter.log.error('Cannot select ' + query + ': ' + err);
                adapter.sendTo(msg.from, msg.command, {
                    error:  'Cannot select ' + query + ': ' + err
                }, msg.callback);
                clientPool.return(client);
                return;
            }
            rows = rows || [];
            adapter.log.info('Query result ' + JSON.stringify(rows));

            for (var r = 0; r < rows.length; r++) {
                var row = rows[r];
                if (!result[row.type]) result[row.type] = {};
                var entry = {name: row.name};
                switch (dbNames[row.type]) {
                    case 'ts_number':
                        entry.type = 'number';
                        break;

                    case 'ts_string':
                        entry.type = 'string';
                        break;

                    case 'ts_bool':
                        entry.type = 'boolean';
                        break;
                }
                result[row.type][row.id] = entry;
            }

            adapter.log.info('inited result: ' + JSON.stringify(result));
            // Continue even without rows: the next step answers the request and
            // gives the client back to the pool.
            getFirstTsForIds(client, 0, result, msg);
        });
    });
}
/**
 * Second step of the datapoint overview: adds the timestamps returned by the
 * SQLFuncs.getFirstTs query to the collected datapoints, one value table
 * after the other, then consolidates the data by datapoint name, sends the
 * answer and returns the client to the pool.
 *
 * The answer is {success: true, result: {<name>: {type, ts}}}. If a name
 * occurs in more than one value table, its type is reported as 'undefined'
 * and the smallest ts is used.
 *
 * @param {object} dbClient borrowed database client
 * @param {number} typeId index into dbNames of the table to process next
 * @param {object} resultData datapoints grouped by type id and database id
 * @param {object} msg received message (from, command, callback are used)
 */
function getFirstTsForIds(dbClient, typeId, resultData, msg) {
    if (typeId < dbNames.length) {
        if (!resultData[typeId]) {
            // no datapoints of this type
            getFirstTsForIds(dbClient, typeId + 1, resultData, msg);
        } else {
            var query = SQLFuncs.getFirstTs(adapter.config.dbname, dbNames[typeId]);
            adapter.log.info(query);
            dbClient.execute(query, function (err, rows /* , fields */) {
                if (rows && rows.rows) rows = rows.rows;
                if (err) {
                    adapter.log.error('Cannot select ' + query + ': ' + err);
                    adapter.sendTo(msg.from, msg.command, {
                        error:  'Cannot select ' + query + ': ' + err
                    }, msg.callback);
                    clientPool.return(dbClient);
                    return;
                }
                adapter.log.info('Query result ' + JSON.stringify(rows));
                if (rows && rows.length) {
                    for (var r = 0; r < rows.length; r++) {
                        if (resultData[typeId][rows[r].id]) {
                            resultData[typeId][rows[r].id].ts = rows[r].ts;
                        }
                    }
                }
                adapter.log.info('enhanced result (' + typeId + '): ' + JSON.stringify(resultData));
                // NOTE(review): the next table is queried 5 seconds later while
                // the client stays borrowed - presumably to limit database load;
                // confirm that this pause is intended.
                setTimeout(getFirstTsForIds, 5000, dbClient, typeId + 1, resultData, msg);
            });
        }
    } else {
        clientPool.return(dbClient);

        adapter.log.info('consolidate data ...');
        var result = {};
        for (var ti = 0; ti < dbNames.length; ti++ ) {
            if (resultData[ti]) {
                for (var index in resultData[ti]) {
                    if (!resultData[ti].hasOwnProperty(index)) continue;

                    var id = resultData[ti][index].name;
                    if (!result[id]) {
                        result[id] = {};
                        result[id].type = resultData[ti][index].type;
                        result[id].ts = resultData[ti][index].ts;
                    } else {
                        // same name stored in several value tables
                        result[id].type = 'undefined';
                        if (resultData[ti][index].ts < result[id].ts) {
                            result[id].ts = resultData[ti][index].ts;
                        }
                    }
                }
            }
        }
        adapter.log.info('Result: ' + JSON.stringify(result));
        adapter.sendTo(msg.from, msg.command, {
            success: true,
            // misspelled key kept for receivers that still check it
            succes: true,
            result: result
        }, msg.callback);
    }
}
/**
 * Handles an "enableHistory" message: enables logging for a datapoint by
 * extending its object with custom settings for this adapter instance.
 * The settings are taken from msg.message.options (if given) and get
 * `enabled: true` added.
 *
 * The sender gets {success: true} or {error: ...}.
 *
 * @param {object} msg received message with message.id and optional message.options
 */
function enableHistory(msg) {
    var request = msg.message;
    if (!request || !request.id) {
        adapter.log.error('enableHistory called with invalid data');
        adapter.sendTo(msg.from, msg.command, {error: 'Invalid call'}, msg.callback);
        return;
    }

    var custom = {};
    custom[adapter.namespace] = request.options ? request.options : {};
    custom[adapter.namespace].enabled = true;
    var obj = {common: {custom: custom}};

    adapter.extendForeignObject(request.id, obj, function (err) {
        if (!err) {
            adapter.log.info(JSON.stringify(obj));
            adapter.sendTo(msg.from, msg.command, {success: true}, msg.callback);
            return;
        }
        adapter.log.error('enableHistory: ' + err);
        adapter.sendTo(msg.from, msg.command, {error: err}, msg.callback);
    });
}
/**
 * Handles a "disableHistory" message: disables logging for a datapoint by
 * extending its object with `enabled: false` in the custom settings of this
 * adapter instance.
 *
 * The sender gets {success: true} or {error: ...}.
 *
 * @param {object} msg received message with message.id
 */
function disableHistory(msg) {
    var request = msg.message;
    if (!request || !request.id) {
        adapter.log.error('disableHistory called with invalid data');
        adapter.sendTo(msg.from, msg.command, {error: 'Invalid call'}, msg.callback);
        return;
    }

    var custom = {};
    custom[adapter.namespace] = {enabled: false};
    var obj = {common: {custom: custom}};

    adapter.extendForeignObject(request.id, obj, function (err) {
        if (!err) {
            adapter.log.info(JSON.stringify(obj));
            adapter.sendTo(msg.from, msg.command, {success: true}, msg.callback);
            return;
        }
        adapter.log.error('disableHistory: ' + err);
        adapter.sendTo(msg.from, msg.command, {error: err}, msg.callback);
    });
}
/**
 * Handles a "getEnabledDPs" message: answers with the settings of all
 * datapoints that have settings for this adapter instance, keyed by the
 * realId of the datapoint.
 *
 * @param {object} msg received message (from, command, callback are used)
 */
function getEnabledDPs(msg) {
    var data = {};
    Object.keys(sqlDPs).forEach(function (id) {
        var dp = sqlDPs[id];
        if (dp && dp[adapter.namespace]) {
            data[dp.realId] = dp[adapter.namespace];
        }
    });

    adapter.sendTo(msg.from, msg.command, data, msg.callback);
}
// Last line of defence: an exception that nobody caught is written to the
// adapter log as a warning.
process.on('uncaughtException', function onUncaughtException(err) {
    adapter.log.warn('Exception: ' + err);
});