/* jshint -W097 */// jshint strict:false /*jslint node: true */ 'use strict'; var utils = require(__dirname + '/lib/utils'); // Get common adapter utils var SQL = require('sql-client'); var commons = require(__dirname + '/lib/aggregate'); var SQLFuncs = null; var fs = require('fs'); var clients = { postgresql: {name: 'PostgreSQLClient', multiRequests: true}, mysql: {name: 'MySQLClient', multiRequests: true}, sqlite: {name: 'SQLite3Client', multiRequests: false}, mssql: {name: 'MSSQLClient', multiRequests: true} }; var types = { 'number': 0, 'string': 1, 'boolean': 2, 'object': 1 }; var dbNames = [ 'ts_number', 'ts_string', 'ts_bool' ]; var storageTypes = [ 'Number', 'String', 'Boolean' ]; var clientPool; var sqlDPs = {}; var from = {}; var subscribeAll = false; var tasks = []; var tasksReadType = []; var multiRequests = true; var tasksStart = []; var finished = false; var connected = null; var isFromRunning = {}; var aliasMap = {}; var adapter = utils.Adapter('sql'); adapter.on('objectChange', function (id, obj) { var tmpState; var now = new Date().getTime(); var formerAliasId = aliasMap[id] ? aliasMap[id] : id; if (obj && obj.common && ( // todo remove history sometime (2016.08) - Do not forget object selector in io-package.json (obj.common.history && obj.common.history[adapter.namespace] && obj.common.history[adapter.namespace].enabled) || (obj.common.custom && obj.common.custom[adapter.namespace] && obj.common.custom[adapter.namespace].enabled) ) ) { var realId = id; var checkForRemove = true; if (obj.common.custom && obj.common.custom[adapter.namespace] && obj.common.custom[adapter.namespace].aliasId) { if (obj.common.custom[adapter.namespace].aliasId !== id) { aliasMap[id] = obj.common.custom[adapter.namespace].aliasId; adapter.log.debug('Registered Alias: ' + id + ' --> ' + aliasMap[id]); id = aliasMap[id]; checkForRemove = false; } else { adapter.log.warn('Ignoring Alias-ID because identical to ID for ' + id); obj.common.custom[adapter.namespace].aliasId = ''; } } if (checkForRemove && aliasMap[id]) { adapter.log.debug('Removed Alias: ' + id + ' !-> ' + aliasMap[id]); delete aliasMap[id]; } if (!(sqlDPs[formerAliasId] && sqlDPs[formerAliasId][adapter.namespace]) && !subscribeAll) { // un-subscribe for (var _id in sqlDPs) { if (sqlDPs.hasOwnProperty(sqlDPs[_id].realId)) { adapter.unsubscribeForeignStates(sqlDPs[_id].realId); } } subscribeAll = true; adapter.subscribeForeignStates('*'); } var writeNull = !(sqlDPs[id] && sqlDPs[id][adapter.namespace]); if (sqlDPs[formerAliasId] && sqlDPs[formerAliasId].relogTimeout) { clearTimeout(sqlDPs[formerAliasId].relogTimeout); } var storedIndex = null; var storedType = null; if (sqlDPs[id] && sqlDPs[id].index !== undefined) storedIndex = sqlDPs[id].index; if (sqlDPs[id] && sqlDPs[id].dbtype !== undefined) storedType = sqlDPs[id].dbtype; else if (sqlDPs[formerAliasId] && sqlDPs[formerAliasId].dbtype !== undefined) storedType = sqlDPs[formerAliasId].dbtype; // todo remove history sometime (2016.08) sqlDPs[id] = obj.common.custom || obj.common.history; if (storedIndex !== null) sqlDPs[id].index = storedIndex; if (storedType !== null) sqlDPs[id].dbtype = storedType; if (sqlDPs[id].index === undefined) { getId(id, sqlDPs[id].dbtype, reInit); } else { reInit(); } function reInit() { adapter.log.debug('remembered Index/Type ' + sqlDPs[id].index + ' / ' + sqlDPs[id].dbtype); sqlDPs[id].realId = realId; if (sqlDPs[id][adapter.namespace].retention !== undefined && sqlDPs[id][adapter.namespace].retention !== null && sqlDPs[id][adapter.namespace].retention !== '') { sqlDPs[id][adapter.namespace].retention = parseInt(sqlDPs[id][adapter.namespace].retention, 10) || 0; } else { sqlDPs[id][adapter.namespace].retention = adapter.config.retention; } if (sqlDPs[id][adapter.namespace].debounce !== undefined && sqlDPs[id][adapter.namespace].debounce !== null && sqlDPs[id][adapter.namespace].debounce !== '') { sqlDPs[id][adapter.namespace].debounce = parseInt(sqlDPs[id][adapter.namespace].debounce, 10) || 0; } else { sqlDPs[id][adapter.namespace].debounce = adapter.config.debounce; } sqlDPs[id][adapter.namespace].changesOnly = sqlDPs[id][adapter.namespace].changesOnly === 'true' || sqlDPs[id][adapter.namespace].changesOnly === true; if (sqlDPs[id][adapter.namespace].changesRelogInterval !== undefined && sqlDPs[id][adapter.namespace].changesRelogInterval !== null && sqlDPs[id][adapter.namespace].changesRelogInterval !== '') { sqlDPs[id][adapter.namespace].changesRelogInterval = parseInt(sqlDPs[id][adapter.namespace].changesRelogInterval, 10) || 0; } else { sqlDPs[id][adapter.namespace].changesRelogInterval = adapter.config.changesRelogInterval; } if (sqlDPs[id][adapter.namespace].changesRelogInterval > 0) { sqlDPs[id].relogTimeout = setTimeout(reLogHelper, (sqlDPs[id][adapter.namespace].changesRelogInterval * 500 * Math.random()) + sqlDPs[id][adapter.namespace].changesRelogInterval * 500, id); } if (sqlDPs[id][adapter.namespace].changesMinDelta !== undefined && sqlDPs[id][adapter.namespace].changesMinDelta !== null && sqlDPs[id][adapter.namespace].changesMinDelta !== '') { sqlDPs[id][adapter.namespace].changesMinDelta = parseFloat(sqlDPs[id][adapter.namespace].changesMinDelta.toString().replace(/,/g, '.')) || 0; } else { sqlDPs[id][adapter.namespace].changesMinDelta = adapter.config.changesMinDelta; } if (!sqlDPs[id][adapter.namespace].storageType) sqlDPs[id][adapter.namespace].storageType = false; // add one day if retention is too small if (sqlDPs[id][adapter.namespace].retention && sqlDPs[id][adapter.namespace].retention <= 604800) { sqlDPs[id][adapter.namespace].retention += 86400; } if (writeNull && adapter.config.writeNulls) { writeNulls(id); } adapter.log.info('enabled logging of ' + id + ', Alias=' + (id !== realId)); } } else { if (aliasMap[id]) { adapter.log.debug('Removed Alias: ' + id + ' !-> ' + aliasMap[id]); delete aliasMap[id]; } id = formerAliasId; if (sqlDPs[id]) { adapter.log.info('disabled logging of ' + id); if (sqlDPs[id].relogTimeout) clearTimeout(sqlDPs[id].relogTimeout); if (sqlDPs[id].timeout) clearTimeout(sqlDPs[id].timeout); if (Object.assign) { tmpState = Object.assign({}, sqlDPs[id].state); } else { tmpState = JSON.parse(JSON.stringify(sqlDPs[id].state)); } var state = sqlDPs[id].state ? tmpState : null; if (sqlDPs[id].skipped) { pushValueIntoDB(id, sqlDPs[id].skipped); sqlDPs[id].skipped = null; } var nullValue = {val: null, ts: now, lc: now, q: 0x40, from: 'system.adapter.' + adapter.namespace}; if (sqlDPs[id][adapter.namespace] && adapter.config.writeNulls) { if (sqlDPs[id][adapter.namespace].changesOnly && state && state.val !== null) { (function (_id, _state, _nullValue) { _state.ts = now; _state.from = 'system.adapter.' + adapter.namespace; nullValue.ts += 4; nullValue.lc += 4; // because of MS SQL adapter.log.debug('Write 1/2 "' + _state.val + '" _id: ' + _id); pushValueIntoDB(_id, _state, function () { // terminate values with null to indicate adapter stop. timestamp + 1# adapter.log.debug('Write 2/2 "null" _id: ' + _id); pushValueIntoDB(_id, _nullValue, function() { delete sqlDPs[id][adapter.namespace]; }); }); })(id, state, nullValue); } else { // terminate values with null to indicate adapter stop. timestamp + 1 adapter.log.debug('Write 0 NULL _id: ' + id); pushValueIntoDB(id, nullValue, function() { delete sqlDPs[id][adapter.namespace]; }); } } else { delete sqlDPs[id][adapter.namespace]; } } } }); adapter.on('stateChange', function (id, state) { id = aliasMap[id] ? aliasMap[id] : id; pushHistory(id, state); }); adapter.on('unload', function (callback) { finish(callback); }); adapter.on('ready', function () { main(); }); adapter.on('message', function (msg) { processMessage(msg); }); process.on('SIGINT', function () { // close connection to DB finish(); }); process.on('SIGTERM', function () { // close connection to DB finish(); }); function setConnected(isConnected) { if (connected !== isConnected) { connected = isConnected; adapter.setState('info.connection', connected, true); } } var _client = false; function connect() { if (!clientPool) { setConnected(false); var params = { server: adapter.config.host, // needed for MSSQL host: adapter.config.host, // needed for PostgeSQL , MySQL user: adapter.config.user, password: adapter.config.password, max_idle: (adapter.config.dbtype === 'sqlite') ? 1 : 2 }; if (adapter.config.port) { params.port = adapter.config.port; } if (adapter.config.encrypt) { params.options = { encrypt: true // Use this if you're on Windows Azure }; } if (adapter.config.dbtype === 'postgres') { params.database = 'postgres'; } if (adapter.config.dbtype === 'sqlite') { params = getSqlLiteDir(adapter.config.fileName); } else // special solution for postgres. Connect first to Db "postgres", create new DB "yunkong2" and then connect to "yunkong2" DB. if (_client !== true && adapter.config.dbtype === 'postgresql') { if (adapter.config.dbtype === 'postgresql') { params.database = 'postgres'; } if (!adapter.config.dbtype) { adapter.log.error('DB Type is not defined!'); return; } if (!clients[adapter.config.dbtype] || !clients[adapter.config.dbtype].name) { adapter.log.error('Unknown type "' + adapter.config.dbtype + '"'); return; } if (!SQL[clients[adapter.config.dbtype].name]) { adapter.log.error('SQL package "' + clients[adapter.config.dbtype].name + '" is not installed.'); return; } // connect first to DB postgres and create yunkong2 DB _client = new SQL[clients[adapter.config.dbtype].name](params); return _client.connect(function (err) { if (err) { adapter.log.error(err); setTimeout(function () { connect(); }, 30000); return; } _client.execute('CREATE DATABASE ' + adapter.config.dbname + ';', function (err /* , rows, fields */) { _client.disconnect(); if (err && err.code !== '42P04') { // if error not about yet exists _client = false; adapter.log.error(err); setTimeout(function () { connect(); }, 30000); } else { _client = true; setTimeout(function () { connect(); }, 100); } }); }); } if (adapter.config.dbtype === 'postgresql') { params.database = adapter.config.dbname; } try { if (!clients[adapter.config.dbtype].name) { adapter.log.error('Unknown SQL type selected: "' + adapter.config.dbtype + '"'); } else if (!SQL[clients[adapter.config.dbtype].name + 'Pool']) { adapter.log.error('Selected SQL DB was not installed properly: "' + adapter.config.dbtype + '". SQLite requires build tools on system. See README.md'); } else { clientPool = new SQL[clients[adapter.config.dbtype].name + 'Pool'](params); return clientPool.open(function (err) { if (err) { adapter.log.error(err); setTimeout(function () { connect(); }, 30000); } else { setTimeout(function () { connect(); }, 0); } }); } } catch (ex) { if (ex.toString() === 'TypeError: undefined is not a function') { adapter.log.error('Node.js DB driver for "' + adapter.config.dbtype + '" could not be installed.'); } else { adapter.log.error(ex.toString()); } setConnected(false); return setTimeout(function () { connect(); }, 30000); } } allScripts(SQLFuncs.init(adapter.config.dbname), function (err) { if (err) { //adapter.log.error(err); return setTimeout(function () { connect(); }, 30000); } else { adapter.log.info('Connected to ' + adapter.config.dbtype); setConnected(true); // read all DB IDs and all FROM ids if (!multiRequests) { getAllIds(function () { getAllFroms(); processStartValues(); }); } else { getAllIds(function () { processStartValues(); }); } } }); } // Find sqlite data directory function getSqlLiteDir(fileName) { fileName = fileName || 'sqlite.db'; fileName = fileName.replace(/\\/g, '/'); if (fileName[0] === '/' || fileName.match(/^\w:\//)) { return fileName; } else { // normally /opt/yunkong2/node_modules/yunkong2.js-controller // but can be /example/yunkong2.js-controller var tools = require(utils.controllerDir + '/lib/tools'); var config = tools.getConfigFileName().replace(/\\/g, '/'); var parts = config.split('/'); parts.pop(); config = parts.join('/') + '/sqlite'; // create sqlite directory if (!fs.existsSync(config)) { fs.mkdirSync(config); } return config + '/' + fileName; } } function testConnection(msg) { msg.message.config.port = parseInt(msg.message.config.port, 10) || 0; var params = { server: msg.message.config.host, host: msg.message.config.host, user: msg.message.config.user, password: msg.message.config.password }; if (msg.message.config.port) { params.port = msg.message.config.port; } if (msg.message.config.dbtype === 'postgresql' && !SQL.PostgreSQLClient) { var postgres = require(__dirname + '/lib/postgresql-client'); for (var attr in postgres) { if (!SQL[attr]) SQL[attr] = postgres[attr]; } } else if (msg.message.config.dbtype === 'mssql' && !SQL.MSSQLClient) { var mssql = require(__dirname + '/lib/mssql-client'); for (var _attr in mssql) { if (!SQL[_attr]) SQL[_attr] = mssql[_attr]; } } if (msg.message.config.dbtype === 'postgresql') { params.database = 'postgres'; } else if (msg.message.config.dbtype === 'sqlite') { params = getSqlLiteDir(msg.message.config.fileName); } var timeout; try { var client = new SQL[clients[msg.message.config.dbtype].name](params); timeout = setTimeout(function () { timeout = null; adapter.sendTo(msg.from, msg.command, {error: 'connect timeout'}, msg.callback); }, 5000); client.connect(function (err) { if (err) { if (timeout) { clearTimeout(timeout); timeout = null; } return adapter.sendTo(msg.from, msg.command, {error: err.toString()}, msg.callback); } client.execute("SELECT 2 + 3 AS x", function (err /* , rows, fields */) { client.disconnect(); if (timeout) { clearTimeout(timeout); timeout = null; return adapter.sendTo(msg.from, msg.command, {error: err ? err.toString() : null}, msg.callback); } }); }); } catch (ex) { if (timeout) { clearTimeout(timeout); timeout = null; } if (ex.toString() === 'TypeError: undefined is not a function') { return adapter.sendTo(msg.from, msg.command, {error: 'Node.js DB driver could not be installed.'}, msg.callback); } else { return adapter.sendTo(msg.from, msg.command, {error: ex.toString()}, msg.callback); } } } function destroyDB(msg) { try { allScripts(SQLFuncs.destroy(adapter.config.dbname), function (err) { if (err) { adapter.log.error(err); adapter.sendTo(msg.from, msg.command, {error: err.toString()}, msg.callback); } else { adapter.sendTo(msg.from, msg.command, {error: null}, msg.callback); // restart adapter setTimeout(function () { adapter.getForeignObject('system.adapter.' + adapter.namespace, function (err, obj) { if (!err) { adapter.setForeignObject(obj._id, obj); } else { adapter.log.error('Cannot read object "system.adapter.' + adapter.namespace + '": ' + err); adapter.stop(); } }); }, 2000); } }); } catch (ex) { return adapter.sendTo(msg.from, msg.command, {error: ex.toString()}, msg.callback); } } function _userQuery(msg, callback) { try { adapter.log.debug(msg.message); clientPool.borrow(function (err, client) { if (err) { adapter.sendTo(msg.from, msg.command, {error: err.toString()}, msg.callback); if (callback) callback(); } else { client.execute(msg.message, function (err, rows /* , fields */) { if (rows && rows.rows) rows = rows.rows; clientPool.return(client); adapter.sendTo(msg.from, msg.command, {error: err ? err.toString() : null, result: rows}, msg.callback); if (callback) callback(); }); } }); } catch (err) { adapter.sendTo(msg.from, msg.command, {error: err.toString()}, msg.callback); if (callback) callback(); } } // execute custom query function query(msg) { if (!multiRequests) { if (tasks.length > 100) { adapter.log.error('Cannot queue new requests, because more than 100'); adapter.sendTo(msg.from, msg.command, {error: 'Cannot queue new requests, because more than 100'}, msg.callback); return; } tasks.push({operation: 'userQuery', msg: msg}); if (tasks.length === 1) processTasks(); } else { _userQuery(msg); } } // one script function oneScript(script, cb) { try { clientPool.borrow(function (err, client) { if (err || !client) { clientPool.close(); clientPool = null; adapter.log.error(err); if (cb) cb(err); return; } adapter.log.debug(script); client.execute(script, function(err /* , rows, fields */) { adapter.log.debug('Response: ' + JSON.stringify(err)); if (err) { // Database 'yunkong2' already exists. Choose a different database name. if (err.number === 1801 || //There is already an object named 'sources' in the database. err.number === 2714) { // do nothing err = null; } else if (err.message && err.message.match(/^SQLITE_ERROR: table [\w_]+ already exists$/)) { // do nothing err = null; } else if (err.errno == 1007 || err.errno == 1050) { // if database exists or table exists // do nothing err = null; } else if (err.code === '42P04') {// if database exists or table exists // do nothing err = null; } else if (err.code === '42P07') { var match = script.match(/CREATE\s+TABLE\s+(\w*)\s+\(/); if (match) { adapter.log.debug('OK. Table "' + match[1] + '" yet exists'); err = null; } else { adapter.log.error(script); adapter.log.error(err); } } else { adapter.log.error(script); adapter.log.error(err); } } if (cb) cb(err); clientPool.return(client); }); }); } catch(ex) { adapter.log.error(ex); if (cb) cb(ex); } } // all scripts function allScripts(scripts, index, cb) { if (typeof index === 'function') { cb = index; index = 0; } index = index || 0; if (scripts && index < scripts.length) { oneScript(scripts[index], function (err) { if (err) { if (cb) cb(err); } else { allScripts(scripts, index + 1, cb); } }); } else { if (cb) cb(); } } function finish(callback) { function finishId(id) { if (sqlDPs[id].relogTimeout) { clearTimeout(sqlDPs[id].relogTimeout); sqlDPs[id].relogTimeout = null; } if (sqlDPs[id].timeout) { clearTimeout(sqlDPs[id].timeout); sqlDPs[id].timeout = null; } var tmpState; if (Object.assign) { tmpState = Object.assign({}, sqlDPs[id].state); } else { tmpState = JSON.parse(JSON.stringify(sqlDPs[id].state)); } var state = sqlDPs[id].state ? tmpState : null; if (sqlDPs[id].skipped) { count++; pushValueIntoDB(id, sqlDPs[id].skipped, function () { if (!--count) { if (clientPool) { clientPool.close(); clientPool = null; } if (typeof finished === 'object') { setTimeout(function (cb) { for (var f = 0; f < cb.length; f++) { cb[f](); } }, 500, finished); finished = true; } } }); sqlDPs[id].skipped = null; } var nullValue = {val: null, ts: now, lc: now, q: 0x40, from: 'system.adapter.' + adapter.namespace}; if (sqlDPs[id][adapter.namespace] && adapter.config.writeNulls) { if (sqlDPs[id][adapter.namespace].changesOnly && state && state.val !== null) { count++; (function (_id, _state, _nullValue) { _state.ts = now; _state.from = 'system.adapter.' + adapter.namespace; nullValue.ts += 4; nullValue.lc += 4; // because of MS SQL adapter.log.debug('Write 1/2 "' + _state.val + '" _id: ' + _id); pushValueIntoDB(_id, _state, function () { // terminate values with null to indicate adapter stop. timestamp + 1# adapter.log.debug('Write 2/2 "null" _id: ' + _id); pushValueIntoDB(_id, _nullValue, function () { if (!--count) { if (clientPool) { clientPool.close(); clientPool = null; } if (typeof finished === 'object') { setTimeout(function (cb) { for (var f = 0; f < cb.length; f++) { cb[f](); } }, 500, finished); finished = true; } } }); }); })(id, state, nullValue); } else { // terminate values with null to indicate adapter stop. timestamp + 1 count++; adapter.log.debug('Write 0 NULL _id: ' + id); pushValueIntoDB(id, nullValue, function () { if (!--count) { if (clientPool) { clientPool.close(); clientPool = null; } if (typeof finished === 'object') { setTimeout(function (cb) { for (var f = 0; f < cb.length; f++) { cb[f](); } }, 500, finished); finished = true; } } }); } } } adapter.unsubscribeForeignStates('*'); var count = 0; if (finished) { if (callback) { if (finished === true) { callback(); } else { finished.push(callback); } } return; } finished = [callback]; var now = new Date().getTime(); var dpcount = 0; var delay = 0; for (var id in sqlDPs) { if (!sqlDPs.hasOwnProperty(id)) continue; dpcount++; delay += (dpcount%50 === 0) ? 1000: 0; setTimeout(finishId, delay, id); } if (!dpcount && callback) { if (clientPool) { clientPool.close(); clientPool = null; } callback(); } } function processMessage(msg) { if (msg.command === 'getHistory') { getHistory(msg); } else if (msg.command === 'test') { testConnection(msg); } else if (msg.command === 'destroy') { destroyDB(msg); } else if (msg.command === 'query') { query(msg); } else if (msg.command === 'storeState') { storeState(msg); } else if (msg.command === 'getDpOverview') { getDpOverview(msg); } else if (msg.command === 'enableHistory') { enableHistory(msg); } else if (msg.command === 'disableHistory') { disableHistory(msg); } else if (msg.command === 'getEnabledDPs') { getEnabledDPs(msg); } else if (msg.command === 'stopInstance') { finish(function () { if (msg.callback) { adapter.sendTo(msg.from, msg.command, 'stopped', msg.callback); setTimeout(function () { process.exit(0); }, 200); } }); } } function fixSelector(callback) { // fix _design/custom object adapter.getForeignObject('_design/custom', function (err, obj) { if (!obj || obj.views.state.map.indexOf('common.history') === -1 || obj.views.state.map.indexOf('common.custom') === -1) { obj = { _id: '_design/custom', language: 'javascript', views: { state: { map: 'function(doc) { if (doc.type===\'state\' && (doc.common.custom || doc.common.history)) emit(doc._id, doc.common.custom || doc.common.history) }' } } }; adapter.setForeignObject('_design/custom', obj, function (err) { if (callback) callback(err); }); } else { if (callback) callback(err); } }); } function processStartValues() { if (tasksStart && tasksStart.length) { var task = tasksStart.shift(); if (sqlDPs[task.id][adapter.namespace].changesOnly) { adapter.getForeignState(sqlDPs[task.id].realId, function (err, state) { var now = task.now || new Date().getTime(); pushHistory(task.id, { val: null, ts: state ? now - 4 : now, // 4 is because of MS SQL lc: state ? now - 4 : now, // 4 is because of MS SQL ack: true, q: 0x40, from: 'system.adapter.' + adapter.namespace }); if (state) { state.ts = now; state.lc = now; state.from = 'system.adapter.' + adapter.namespace; pushHistory(task.id, state); } setTimeout(processStartValues, 0); }); } else { pushHistory(task.id, { val: null, ts: task.now || new Date().getTime(), lc: task.now || new Date().getTime(), ack: true, q: 0x40, from: 'system.adapter.' + adapter.namespace }); setTimeout(processStartValues, 0); } if (sqlDPs[task.id][adapter.namespace] && sqlDPs[task.id][adapter.namespace].changesRelogInterval > 0) { if (sqlDPs[task.id].relogTimeout) clearTimeout(sqlDPs[task.id].relogTimeout); sqlDPs[task.id].relogTimeout = setTimeout(reLogHelper, (sqlDPs[task.id][adapter.namespace].changesRelogInterval * 500 * Math.random()) + sqlDPs[task.id][adapter.namespace].changesRelogInterval * 500, task.id); } } } function writeNulls(id, now) { if (!id) { now = new Date().getTime(); for (var _id in sqlDPs) { if (sqlDPs.hasOwnProperty(_id) && sqlDPs[_id] && sqlDPs[_id][adapter.namespace]) { writeNulls(_id, now); } } } else { now = now || new Date().getTime(); tasksStart.push({id: id, now: now}); if (tasksStart.length === 1 && connected) { processStartValues(); } } } function main() { setConnected(false); adapter.config.dbname = adapter.config.dbname || 'yunkong2'; if (adapter.config.writeNulls === undefined) adapter.config.writeNulls = true; adapter.config.retention = parseInt(adapter.config.retention, 10) || 0; adapter.config.debounce = parseInt(adapter.config.debounce, 10) || 0; adapter.config.requestInterval = (adapter.config.requestInterval === undefined || adapter.config.requestInterval === null || adapter.config.requestInterval === '') ? 0 : parseInt(adapter.config.requestInterval, 10) || 0; if (adapter.config.changesRelogInterval !== null && adapter.config.changesRelogInterval !== undefined) { adapter.config.changesRelogInterval = parseInt(adapter.config.changesRelogInterval, 10); } else { adapter.config.changesRelogInterval = 0; } if (!clients[adapter.config.dbtype]) { adapter.log.error('Unknown DB type: ' + adapter.config.dbtype); adapter.stop(); } if (adapter.config.multiRequests !== undefined && adapter.config.dbtype !== 'SQLite3Client' && adapter.config.dbtype !== 'sqlite') { clients[adapter.config.dbtype].multiRequests = adapter.config.multiRequests; } if (adapter.config.changesMinDelta !== null && adapter.config.changesMinDelta !== undefined) { adapter.config.changesMinDelta = parseFloat(adapter.config.changesMinDelta.toString().replace(/,/g, '.')); } else { adapter.config.changesMinDelta = 0; } multiRequests = clients[adapter.config.dbtype].multiRequests; if (!multiRequests) adapter.config.writeNulls = false; adapter.config.port = parseInt(adapter.config.port, 10) || 0; if (adapter.config.round !== null && adapter.config.round !== undefined) { adapter.config.round = Math.pow(10, parseInt(adapter.config.round, 10)); } else { adapter.config.round = null; } if (adapter.config.dbtype === 'postgresql' && !SQL.PostgreSQLClient) { var postgres = require(__dirname + '/lib/postgresql-client'); for (var attr in postgres) { if (postgres.hasOwnProperty(attr) && !SQL[attr]) { SQL[attr] = postgres[attr]; } } } else if (adapter.config.dbtype === 'mssql' && !SQL.MSSQLClient) { var mssql = require(__dirname + '/lib/mssql-client'); for (var attr_ in mssql) { if (mssql.hasOwnProperty(attr_) && !SQL[attr_]) { SQL[attr_] = mssql[attr_]; } } } SQLFuncs = require(__dirname + '/lib/' + adapter.config.dbtype); fixSelector(function () { // read all custom settings adapter.objects.getObjectView('custom', 'state', {}, function (err, doc) { var count = 0; if (doc && doc.rows) { for (var i = 0, l = doc.rows.length; i < l; i++) { if (doc.rows[i].value) { var id = doc.rows[i].id; var realId = id; if (doc.rows[i].value[adapter.namespace] && doc.rows[i].value[adapter.namespace].aliasId) { aliasMap[id] = doc.rows[i].value[adapter.namespace].aliasId; adapter.log.debug('Found Alias: ' + id + ' --> ' + aliasMap[id]); id = aliasMap[id]; } sqlDPs[id] = doc.rows[i].value; if (!sqlDPs[id][adapter.namespace]) { delete sqlDPs[id]; } else { count++; adapter.log.info('enabled logging of ' + id + ', Alias=' + (id !== realId) + ', ' + count + ' points now activated'); if (sqlDPs[id][adapter.namespace].retention !== undefined && sqlDPs[id][adapter.namespace].retention !== null && sqlDPs[id][adapter.namespace].retention !== '') { sqlDPs[id][adapter.namespace].retention = parseInt(sqlDPs[id][adapter.namespace].retention, 10) || 0; } else { sqlDPs[id][adapter.namespace].retention = adapter.config.retention; } if (sqlDPs[id][adapter.namespace].debounce !== undefined && sqlDPs[id][adapter.namespace].debounce !== null && sqlDPs[id][adapter.namespace].debounce !== '') { sqlDPs[id][adapter.namespace].debounce = parseInt(sqlDPs[id][adapter.namespace].debounce, 10) || 0; } else { sqlDPs[id][adapter.namespace].debounce = adapter.config.debounce; } sqlDPs[id][adapter.namespace].changesOnly = sqlDPs[id][adapter.namespace].changesOnly === 'true' || sqlDPs[id][adapter.namespace].changesOnly === true; if (sqlDPs[id][adapter.namespace].changesRelogInterval !== undefined && sqlDPs[id][adapter.namespace].changesRelogInterval !== null && sqlDPs[id][adapter.namespace].changesRelogInterval !== '') { sqlDPs[id][adapter.namespace].changesRelogInterval = parseInt(sqlDPs[id][adapter.namespace].changesRelogInterval, 10) || 0; } else { sqlDPs[id][adapter.namespace].changesRelogInterval = adapter.config.changesRelogInterval; } if (sqlDPs[id][adapter.namespace].changesRelogInterval > 0) { sqlDPs[id].relogTimeout = setTimeout(reLogHelper, (sqlDPs[id][adapter.namespace].changesRelogInterval * 500 * Math.random()) + sqlDPs[id][adapter.namespace].changesRelogInterval * 500, id); } if (sqlDPs[id][adapter.namespace].changesMinDelta !== undefined && sqlDPs[id][adapter.namespace].changesMinDelta !== null && sqlDPs[id][adapter.namespace].changesMinDelta !== '') { sqlDPs[id][adapter.namespace].changesMinDelta = parseFloat(sqlDPs[id][adapter.namespace].changesMinDelta) || 0; } else { sqlDPs[id][adapter.namespace].changesMinDelta = adapter.config.changesMinDelta; } if (!sqlDPs[id][adapter.namespace].storageType) sqlDPs[id][adapter.namespace].storageType = false; // add one day if retention is too small if (sqlDPs[id][adapter.namespace].retention && sqlDPs[id][adapter.namespace].retention <= 604800) { sqlDPs[id][adapter.namespace].retention += 86400; } if (sqlDPs[id][adapter.namespace] && sqlDPs[id][adapter.namespace].changesRelogInterval > 0) { if (sqlDPs[id].relogTimeout) clearTimeout(sqlDPs[id].relogTimeout); sqlDPs[id].relogTimeout = setTimeout(reLogHelper, (sqlDPs[id][adapter.namespace].changesRelogInterval * 500 * Math.random()) + sqlDPs[id][adapter.namespace].changesRelogInterval * 500, id); } sqlDPs[id].realId = realId; } } } } if (adapter.config.writeNulls) writeNulls(); if (count < 20) { for (var _id in sqlDPs) { if (sqlDPs.hasOwnProperty(_id)) { adapter.subscribeForeignStates(sqlDPs[_id].realId); } } } else { subscribeAll = true; adapter.subscribeForeignStates('*'); } }); }); adapter.subscribeForeignObjects('*'); if (adapter.config.dbtype === 'sqlite' || adapter.config.host) { connect(); } } function pushHistory(id, state, timerRelog) { if (timerRelog === undefined) timerRelog = false; // Push into DB if (sqlDPs[id]) { var settings = sqlDPs[id][adapter.namespace]; if (!settings || !state) return; adapter.log.debug('new value received for ' + id + ', new-value=' + state.val + ', ts=' + state.ts + ', relog=' + timerRelog); if (state.val !== null && typeof state.val === 'string' && settings.storageType !== 'String') { var f = parseFloat(state.val); if (f == state.val) { state.val = f; } } if (sqlDPs[id].state && settings.changesOnly && !timerRelog) { if (settings.changesRelogInterval === 0) { if (state.ts !== state.lc) { sqlDPs[id].skipped = state; // remember new timestamp adapter.log.debug('value not changed ' + id + ', last-value=' + sqlDPs[id].state.val + ', new-value=' + state.val + ', ts=' + state.ts); return; } } else if (sqlDPs[id].lastLogTime) { if ((state.ts !== state.lc) && (Math.abs(sqlDPs[id].lastLogTime - state.ts) < settings.changesRelogInterval * 1000)) { sqlDPs[id].skipped = state; // remember new timestamp adapter.log.debug('value not changed relog' + id + ', last-value=' + sqlDPs[id].state.val + ', new-value=' + state.val + ', ts=' + state.ts); return; } if (state.ts !== state.lc) { adapter.log.debug('value-changed-relog ' + id + ', value=' + state.val + ', lastLogTime=' + sqlDPs[id].lastLogTime + ', ts=' + state.ts); } } if (sqlDPs[id].state.val !== null && (settings.changesMinDelta !== 0) && (typeof state.val === 'number') && (Math.abs(sqlDPs[id].state.val - state.val) < settings.changesMinDelta)) { adapter.log.debug('Min-Delta not reached ' + id + ', last-value=' + sqlDPs[id].state.val + ', new-value=' + state.val + ', ts=' + state.ts); sqlDPs[id].skipped = state; // remember new timestamp return; } else if (typeof state.val === 'number') { adapter.log.debug('Min-Delta reached ' + id + ', last-value=' + sqlDPs[id].state.val + ', new-value=' + state.val + ', ts=' + state.ts); } else { adapter.log.debug('Min-Delta ignored because no number ' + id + ', last-value=' + sqlDPs[id].state.val + ', new-value=' + state.val + ', ts=' + state.ts); } } if (sqlDPs[id].relogTimeout) { clearTimeout(sqlDPs[id].relogTimeout); sqlDPs[id].relogTimeout = null; } if (settings.changesRelogInterval > 0) { sqlDPs[id].relogTimeout = setTimeout(reLogHelper, settings.changesRelogInterval * 1000, id); } var ignoreDebonce = false; if (timerRelog) { state.ts = new Date().getTime(); adapter.log.debug('timed-relog ' + id + ', value=' + state.val + ', lastLogTime=' + sqlDPs[id].lastLogTime + ', ts=' + state.ts); ignoreDebonce = true; } else { if (settings.changesOnly && sqlDPs[id].skipped) { sqlDPs[id].state = sqlDPs[id].skipped; pushHelper(id); } if (sqlDPs[id].state && ((sqlDPs[id].state.val === null && state.val !== null) || (sqlDPs[id].state.val !== null && state.val === null))) { ignoreDebonce = true; } else if (!sqlDPs[id].state && state.val === null) { ignoreDebonce = true; } // only store state if really changed sqlDPs[id].state = state; } sqlDPs[id].lastLogTime = state.ts; sqlDPs[id].skipped = null; if (settings.debounce && !ignoreDebonce) { // Discard changes in debounce time to store last stable value if (sqlDPs[id].timeout) clearTimeout(sqlDPs[id].timeout); sqlDPs[id].timeout = setTimeout(pushHelper, settings.debounce, id); } else { pushHelper(id); } } } function reLogHelper(_id) { if (!sqlDPs[_id]) { adapter.log.info('non-existing id ' + _id); return; } sqlDPs[_id].relogTimeout = null; if (sqlDPs[_id].skipped) { sqlDPs[_id].state = sqlDPs[_id].skipped; sqlDPs[_id].state.from = 'system.adapter.' + adapter.namespace; sqlDPs[_id].skipped = null; pushHistory(_id, sqlDPs[_id].state, true); } else { adapter.getForeignState(sqlDPs[_id].realId, function (err, state) { if (err) { adapter.log.info('init timed Relog: can not get State for ' + _id + ' : ' + err); } else if (!state) { adapter.log.info('init timed Relog: disable relog because state not set so far for ' + _id + ': ' + JSON.stringify(state)); } else { adapter.log.debug('init timed Relog: getState ' + _id + ': Value=' + state.val + ', ack=' + state.ack + ', ts=' + state.ts + ', lc=' + state.lc); sqlDPs[_id].state = state; pushHistory(_id, sqlDPs[_id].state, true); } }); } } function pushHelper(_id) { if (!sqlDPs[_id] || !sqlDPs[_id].state) return; var _settings = sqlDPs[_id][adapter.namespace]; // if it was not deleted in this time if (_settings) { sqlDPs[_id].timeout = null; if (sqlDPs[_id].state.val !== null) { if (typeof sqlDPs[_id].state.val === 'object') { sqlDPs[_id].state.val = JSON.stringify(sqlDPs[_id].state.val); } adapter.log.debug('Datatype ' + _id + ': Currently: ' + typeof sqlDPs[_id].state.val + ', StorageType: ' + _settings.storageType); if (typeof sqlDPs[_id].state.val === 'string' && _settings.storageType !== 'String') { adapter.log.debug('Do Automatic Datatype conversion for ' + _id); var f = parseFloat(sqlDPs[_id].state.val); if (f == sqlDPs[_id].state.val) { sqlDPs[_id].state.val = f; } else if (sqlDPs[_id].state.val === 'true') { sqlDPs[_id].state.val = true; } else if (sqlDPs[_id].state.val === 'false') { sqlDPs[_id].state.val = false; } } if (_settings.storageType === 'String' && typeof sqlDPs[_id].state.val !== 'string') { sqlDPs[_id].state.val = sqlDPs[_id].state.val.toString(); } else if (_settings.storageType === 'Number' && typeof sqlDPs[_id].state.val !== 'number') { if (typeof sqlDPs[_id].state.val === 'boolean') { sqlDPs[_id].state.val = sqlDPs[_id].state.val?1:0; } else { adapter.log.info('Do not store value "' + sqlDPs[_id].state.val + '" for ' + _id + ' because no number'); return; } } else if (_settings.storageType === 'Boolean' && typeof sqlDPs[_id].state.val !== 'boolean') { sqlDPs[_id].state.val = !!sqlDPs[_id].state.val; } } else { adapter.log.debug('Datatype ' + _id + ': Currently: null'); } pushValueIntoDB(_id, sqlDPs[_id].state); } } function getAllIds(cb) { var query = SQLFuncs.getIdSelect(adapter.config.dbname); adapter.log.debug(query); clientPool.borrow(function (err, client) { if (err) { if (cb) cb(err); return; } client.execute(query, function (err, rows /* , fields */) { if (rows && rows.rows) rows = rows.rows; if (err) { adapter.log.error('Cannot select ' + query + ': ' + err); if (cb) cb(err); clientPool.return(client); return; } if (rows.length) { var id; for (var r = 0; r < rows.length; r++) { id = rows[r].name; sqlDPs[id] = sqlDPs[id] || {}; sqlDPs[id].index = rows[r].id; if (rows[r].type !== null) sqlDPs[id].dbtype = rows[r].type; } if (cb) cb(); clientPool.return(client); } }); }); } function getAllFroms(cb) { var query = SQLFuncs.getFromSelect(adapter.config.dbname); adapter.log.debug(query); clientPool.borrow(function (err, client) { if (err) { if (cb) cb(err); return; } client.execute(query, function (err, rows /* , fields */) { if (rows && rows.rows) rows = rows.rows; if (err) { adapter.log.error('Cannot select ' + query + ': ' + err); if (cb) cb(err); clientPool.return(client); return; } if (rows.length) { for (var r = 0; r < rows.length; r++) { from[rows[r].name] = rows[r].id; } if (cb) cb(); clientPool.return(client); } }); }); } function _checkRetention(query, cb) { adapter.log.debug(query); clientPool.borrow(function (err, client) { if (err) { adapter.log.error(err); if (cb) cb(); return; } client.execute(query, function (err /* , rows, fields */ ) { if (err) adapter.log.error('Cannot delete ' + query + ': ' + err); clientPool.return(client); if (cb) cb(); }); }); } function checkRetention(id) { if (sqlDPs[id] && sqlDPs[id][adapter.namespace] && sqlDPs[id][adapter.namespace].retention) { var d = new Date(); var dt = d.getTime(); // check every 6 hours if (!sqlDPs[id].lastCheck || dt - sqlDPs[id].lastCheck >= 21600000/* 6 hours */) { sqlDPs[id].lastCheck = dt; var query = SQLFuncs.retention(adapter.config.dbname, sqlDPs[id].index, dbNames[sqlDPs[id].type], sqlDPs[id][adapter.namespace].retention); if (!multiRequests) { if (tasks.length > 100) { adapter.log.error('Cannot queue new requests, because more than 100'); return; } tasks.push({operation: 'delete', query: query}); if (tasks.length === 1) processTasks(); } else { _checkRetention(query); } } } } function _insertValueIntoDB(query, id, cb) { adapter.log.debug(query); clientPool.borrow(function (err, client) { if (err) { adapter.log.error(err); if (cb) cb(); return; } client.execute(query, function (err /* , rows, fields */) { if (err) adapter.log.error('Cannot insert ' + query + ': ' + err); clientPool.return(client); checkRetention(id); if (cb) cb(); }); }); } function processReadTypes() { if (tasksReadType && tasksReadType.length) { var task = tasksReadType.shift(); adapter.log.debug('Type set in Def for ' + task.id + ': ' + sqlDPs[task.id][adapter.namespace].storageType); if (sqlDPs[task.id][adapter.namespace].storageType) { sqlDPs[task.id].type = types[sqlDPs[task.id][adapter.namespace].storageType.toLowerCase()]; adapter.log.debug('Type (from Def) for ' + task.id + ': ' + sqlDPs[task.id].type); processVerifyTypes(task); } else if (sqlDPs[task.id].dbtype !== undefined) { sqlDPs[task.id].type = sqlDPs[task.id].dbtype; sqlDPs[task.id][adapter.namespace].storageType = storageTypes[sqlDPs[task.id].type]; adapter.log.debug('Type (from DB-Type) for ' + task.id + ': ' + sqlDPs[task.id].type); processVerifyTypes(task); } else { adapter.getForeignObject(sqlDPs[task.id].realId, function (err, obj) { if (err) { adapter.log.warn('Error while get Object for Def: ' + err); } if (obj && obj.common && obj.common.type) { adapter.log.debug(obj.common.type.toLowerCase() + ' / ' + types[obj.common.type.toLowerCase()] + ' / ' + JSON.stringify(obj.common)); sqlDPs[task.id].type = types[obj.common.type.toLowerCase()]; sqlDPs[task.id][adapter.namespace].storageType = storageTypes[sqlDPs[task.id].type]; adapter.log.debug('Type (from Obj) for ' + task.id + ': ' + sqlDPs[task.id].type); processVerifyTypes(task); } if (sqlDPs[task.id].type === undefined) { adapter.getForeignState(sqlDPs[task.id].realId, function (err, state) { if (err) { adapter.log.warn('Store data for ' + task.id + ' as string because no other valid type found (' + obj.common.type.toLowerCase() + ' and no state)'); sqlDPs[task.id].type = 1; // string } else if (state && state.val !== null && state.val !== undefined && types[typeof state.val] !== undefined) { sqlDPs[task.id].type = types[typeof state.val]; sqlDPs[task.id][adapter.namespace].storageType = storageTypes[sqlDPs[task.id].type]; } else { adapter.log.warn('Store data for ' + task.id + ' as string because no other valid type found (' + (state?(typeof state.val):'state not existing') + ')'); sqlDPs[task.id].type = 1; // string } adapter.log.debug('Type (from State) for ' + task.id + ': ' + sqlDPs[task.id].type); processVerifyTypes(task); }); } }); } } } function processVerifyTypes(task) { if (sqlDPs[task.id].index !== undefined && sqlDPs[task.id].type !== undefined && sqlDPs[task.id].type !== sqlDPs[task.id].dbtype) { sqlDPs[task.id].dbtype = sqlDPs[task.id].type; var query = SQLFuncs.getIdUpdate(adapter.config.dbname, sqlDPs[task.id].index, sqlDPs[task.id].type); adapter.log.debug(query); clientPool.borrow(function (err, client) { if (err) { processVerifyTypes(task); return; } client.execute(query, function (err, rows /* , fields */) { if (err) { adapter.log.error('error updating history config for ' + task.id + ' to pin datatype: ' + query + ': ' + err); } else { adapter.log.info('changed history configuration to pin detected datatype for ' + task.id); } clientPool.return(client); processVerifyTypes(task); }); }); return; } pushValueIntoDB(task.id, task.state); setTimeout(processReadTypes, 50); } function pushValueIntoDB(id, state, cb) { if (!sqlDPs[id]) return; if (!clientPool) { adapter.log.warn('No connection to SQL-DB'); if (cb) cb('No connection to SQL-DB'); return; } var type; if (sqlDPs[id].type !== undefined) { type = sqlDPs[id].type; } else { // read type from DB tasksReadType.push({id: id, state: state}); if (tasksReadType.length === 1) { processReadTypes(); } return; } if (type === undefined) { // Can not happen anymore if (state.val === null) { adapter.log.warn('Ignore null value for ' + id + ' because no type defined till now.'); if (cb) cb('Ignore null value for ' + id + ' because no type defined till now.'); return; } adapter.log.warn('Cannot store values of type "' + typeof state.val + '" for ' + id); if (cb) cb('Cannot store values of type "' + typeof state.val + '" ' + id); return; } var tmpState; // get id if state if (sqlDPs[id].index === undefined) { sqlDPs[id].isRunning = sqlDPs[id].isRunning || []; if (Object.assign) { tmpState = Object.assign({}, state); } else { tmpState = JSON.parse(JSON.stringify(state)); } sqlDPs[id].isRunning.push({id: id, state: tmpState, cb: cb}); if (sqlDPs[id].isRunning.length === 1) { // read or create in DB return getId(id, type, function (err, _id) { if (err) { adapter.log.warn('Cannot get index of "' + _id + '": ' + err); if (sqlDPs[_id].isRunning) { for (var t = 0; t < sqlDPs[_id].isRunning.length; t++) { if (sqlDPs[_id].isRunning[t].cb) sqlDPs[_id].isRunning[t].cb('Cannot get index of "' + sqlDPs[_id].isRunning[t].id + '": ' + err); } } } else { if (sqlDPs[_id].isRunning) { for (var k = 0; k < sqlDPs[_id].isRunning.length; k++) { pushValueIntoDB(sqlDPs[_id].isRunning[k].id, sqlDPs[_id].isRunning[k].state, sqlDPs[_id].isRunning[k].cb); } } } sqlDPs[_id].isRunning = null; }); } return; } // get from if (state.from && !from[state.from]) { isFromRunning[state.from] = isFromRunning[state.from] || []; if (Object.assign) { tmpState = Object.assign({}, state); } else { tmpState = JSON.parse(JSON.stringify(state)); } isFromRunning[state.from].push({id: id, state: tmpState, cb: cb}); if (isFromRunning[state.from].length === 1) { // read or create in DB return getFrom(state.from, function (err, from) { if (err) { adapter.log.warn('Cannot get "from" for "' + from + '": ' + err); if (isFromRunning[from]) { for (var t = 0; t < isFromRunning[from].length; t++) { if (isFromRunning[from][t].cb) isFromRunning[from][t].cb('Cannot get "from" for "' + from + '": ' + err); } } } else { if (isFromRunning[from]) { for (var k = 0; k < isFromRunning[from].length; k++) { pushValueIntoDB(isFromRunning[from][k].id, isFromRunning[from][k].state, isFromRunning[from][k].cb); } } } isFromRunning[from] = null; }); } return; } // if greater than 2000.01.01 00:00:00 if (state.ts > 946681200000) { state.ts = parseInt(state.ts, 10); } else { state.ts = parseInt(state.ts, 10) * 1000 + (parseInt(state.ms, 10) || 0); } try { if (state.val !== null && typeof state.val === 'object') { state.val = JSON.stringify(state.val); } } catch (err) { adapter.log.error('Cannot convert the object value "' + id + '"'); if (cb) cb('Cannot convert the object value "' + id + '"'); return; } // increase timestamp if last is the same if (sqlDPs[id].ts && state.ts === sqlDPs[id].ts) { state.ts++; } // remember last timestamp sqlDPs[id].ts = state.ts; var query = SQLFuncs.insert(adapter.config.dbname, sqlDPs[id].index, state, from[state.from] || 0, dbNames[type]); if (!multiRequests) { if (tasks.length > 100) { adapter.log.error('Cannot queue new requests, because more than 100'); if (cb) cb('Cannot queue new requests, because more than 100'); return; } tasks.push({operation: 'insert', query: query, id: id, callback: cb}); if (tasks.length === 1) { processTasks(); } } else { _insertValueIntoDB(query, id, cb); } } var lockTasks = false; function processTasks() { if (lockTasks) { adapter.log.debug('Tries to execute task, but last one not finished!'); return; } lockTasks = true; if (tasks.length) { if (tasks[0].operation === 'insert') { _insertValueIntoDB(tasks[0].query, tasks[0].id, function () { if (tasks[0].callback) tasks[0].callback(); tasks.shift(); lockTasks = false; if (tasks.length) setTimeout(processTasks, adapter.config.requestInterval); }); } else if (tasks[0].operation === 'select') { _getDataFromDB(tasks[0].query, tasks[0].options, function (err, rows) { if (tasks[0].callback) tasks[0].callback(err, rows); tasks.shift(); lockTasks = false; if (tasks.length) setTimeout(processTasks, adapter.config.requestInterval); }); } else if (tasks[0].operation === 'userQuery') { _userQuery(tasks[0].msg, function () { if (tasks[0].callback) tasks[0].callback(); tasks.shift(); lockTasks = false; if (tasks.length) setTimeout(processTasks, adapter.config.requestInterval); }); } else if (tasks[0].operation === 'delete') { _checkRetention(tasks[0].query, function () { if (tasks[0].callback) tasks[0].callback(); tasks.shift(); lockTasks = false; if (tasks.length) setTimeout(processTasks, adapter.config.requestInterval); }); } else { adapter.log.error('unknown task: ' + tasks[0].operation); if (tasks[0].callback) tasks[0].callback(); tasks.shift(); lockTasks = false; if (tasks.length) setTimeout(processTasks, adapter.config.requestInterval); } } } // my be it is required to cache all the data in memory function getId(id, type, cb) { var query = SQLFuncs.getIdSelect(adapter.config.dbname, id); adapter.log.debug(query); if (!clientPool) { if (cb) cb('No connection', id); return; } clientPool.borrow(function (err, client) { if (err) { if (cb) cb(err, id); return; } client.execute(query, function (err, rows /* , fields */) { if (rows && rows.rows) rows = rows.rows; if (err) { adapter.log.error('Cannot select ' + query + ': ' + err); if (cb) cb(err, id); clientPool.return(client); return; } if (!rows.length) { if (type !== null) { // insert query = SQLFuncs.getIdInsert(adapter.config.dbname, id, type); adapter.log.debug(query); client.execute(query, function (err /* , rows, fields */) { if (err) { adapter.log.error('Cannot insert ' + query + ': ' + err); if (cb) cb(err, id); clientPool.return(client); return; } query = SQLFuncs.getIdSelect(adapter.config.dbname,id); adapter.log.debug(query); client.execute(query, function (err, rows /* , fields */) { if (rows && rows.rows) { rows = rows.rows; } if (err) { adapter.log.error('Cannot select ' + query + ': ' + err); if (cb) cb(err, id); clientPool.return(client); return; } sqlDPs[id].index = rows[0].id; sqlDPs[id].type = rows[0].type; if (cb) cb(null, id); clientPool.return(client); }); }); } else { if (cb) cb('id not found', id); clientPool.return(client); } } else { sqlDPs[id].index = rows[0].id; sqlDPs[id].type = rows[0].type; if (cb) cb(null, id); clientPool.return(client); } }); }); } // my be it is required to cache all the data in memory function getFrom(_from, cb) { // var sources = (adapter.config.dbtype !== 'postgresql' ? (adapter.config.dbname + '.') : '') + 'sources'; var query = SQLFuncs.getFromSelect(adapter.config.dbname, _from); adapter.log.debug(query); if (!clientPool) { if (cb) cb('No connection', _from); return; } clientPool.borrow(function (err, client) { if (err) { if (cb) cb(err, _from); return; } client.execute(query, function (err, rows /* , fields */) { if (rows && rows.rows) rows = rows.rows; if (err) { adapter.log.error('Cannot select ' + query + ': ' + err); if (cb) cb(err, _from); clientPool.return(client); return; } if (!rows.length) { // insert query = SQLFuncs.getFromInsert(adapter.config.dbname, _from); adapter.log.debug(query); client.execute(query, function (err /* , rows, fields */) { if (err) { adapter.log.error('Cannot insert ' + query + ': ' + err); if (cb) cb(err, _from); clientPool.return(client); return; } query = SQLFuncs.getFromSelect(adapter.config.dbname, _from); adapter.log.debug(query); client.execute(query, function (err, rows /* , fields */) { if (rows && rows.rows) rows = rows.rows; if (err) { adapter.log.error('Cannot select ' + query + ': ' + err); if (cb) cb(err, _from); clientPool.return(client); return; } from[_from] = rows[0].id; if (cb) cb(null, _from); clientPool.return(client); }); }); } else { from[_from] = rows[0].id; if (cb) cb(null, _from); clientPool.return(client); } }); }); } function sortByTs(a, b) { var aTs = a.ts; var bTs = b.ts; return ((aTs < bTs) ? -1 : ((aTs > bTs) ? 1 : 0)); } function _getDataFromDB(query, options, callback) { adapter.log.debug(query); clientPool.borrow(function (err, client) { if (err) { if (callback) callback(err); return; } client.execute(query, function (err, rows /* , fields */) { if (rows && rows.rows) rows = rows.rows; // because descending if (!err && rows && !options.start && options.count) { rows.sort(sortByTs); } if (rows) { var isNumber = null; for (var c = 0; c < rows.length; c++) { if (isNumber === null && rows[c].val !== null) { isNumber = (parseFloat(rows[c].val) == rows[c].val); } if (typeof rows[c].ts === 'string') { rows[c].ts = parseInt(rows[c].ts, 10); } // if less than 2000.01.01 00:00:00 if (rows[c].ts < 946681200000) { rows[c].ts *= 1000; } if (adapter.common.loglevel === 'debug') { rows[c].date = new Date(parseInt(rows[c].ts, 10)); } if (options.ack) { rows[c].ack = !!rows[c].ack; } if (isNumber && adapter.config.round && rows[c].val !== null) { rows[c].val = Math.round(rows[c].val * adapter.config.round) / adapter.config.round; } if (sqlDPs[options.index].type === 2) { rows[c].val = !!rows[c].val; } } } clientPool.return(client); if (callback) callback(err, rows); }); }); } function getDataFromDB(db, options, callback) { var query = SQLFuncs.getHistory(adapter.config.dbname, db, options); adapter.log.debug(query); if (!multiRequests) { if (tasks.length > 100) { adapter.log.error('Cannot queue new requests, because more than 100'); if (callback) callback('Cannot queue new requests, because more than 100'); return; } tasks.push({operation: 'select', query: query, options: options, callback: callback}); if (tasks.length === 1) processTasks(); } else { _getDataFromDB(query, options, callback); } } function getHistory(msg) { var startTime = new Date().getTime(); var options = { id: msg.message.id === '*' ? null : msg.message.id, start: msg.message.options.start, end: msg.message.options.end || ((new Date()).getTime() + 5000000), step: parseInt(msg.message.options.step, 10) || null, count: parseInt(msg.message.options.count, 10) || 500, ignoreNull: msg.message.options.ignoreNull, aggregate: msg.message.options.aggregate || 'average', // One of: max, min, average, total limit: msg.message.options.limit || adapter.config.limit || 2000, from: msg.message.options.from || false, q: msg.message.options.q || false, ack: msg.message.options.ack || false, ms: msg.message.options.ms || false, addId: msg.message.options.addId || false, sessionId: msg.message.options.sessionId }; if (options.id && aliasMap[options.id]) { options.id = aliasMap[options.id]; } if (options.ignoreNull === 'true') options.ignoreNull = true; // include nulls and replace them with last value if (options.ignoreNull === 'false') options.ignoreNull = false; // include nulls if (options.ignoreNull === '0') options.ignoreNull = 0; // include nulls and replace them with 0 if (options.ignoreNull !== true && options.ignoreNull !== false && options.ignoreNull !== 0) options.ignoreNull = false; if (!sqlDPs[options.id]) { commons.sendResponse(adapter, msg, options, [], startTime); return; } if (options.start > options.end) { var _end = options.end; options.end = options.start; options.start =_end; } if (!options.start && !options.count) { options.start = (new Date()).getTime() - 5030000; // - 1 year } if (sqlDPs[options.id].type === undefined && sqlDPs[options.id].dbtype !== undefined) { if (sqlDPs[options.id][adapter.namespace] && sqlDPs[options.id][adapter.namespace].storageType) { if (storageTypes.indexOf(sqlDPs[options.id][adapter.namespace].storageType) === sqlDPs[options.id].dbtype) { adapter.log.debug('For getHistory for id ' + options.id + ': Type empty, use storageType dbtype ' + sqlDPs[options.id].dbtype); sqlDPs[options.id].type = sqlDPs[options.id].dbtype; } } else { adapter.log.debug('For getHistory for id ' + options.id + ': Type empty, use dbtype ' + sqlDPs[options.id].dbtype); sqlDPs[options.id].type = sqlDPs[options.id].dbtype; } } if (options.id && sqlDPs[options.id].index === undefined) { // read or create in DB return getId(options.id, null, function (err) { if (err) { adapter.log.warn('Cannot get index of "' + options.id + '": ' + err); commons.sendResponse(adapter, msg, options, [], startTime); } else { getHistory(msg); } }); } if (options.id && sqlDPs[options.id].type === undefined) { adapter.log.warn('For getHistory for id ' + options.id + ': Type empty. Need to write data first. Index = ' + sqlDPs[options.id].index); commons.sendResponse(adapter, msg, options, 'Please wait till next data record is logged and reload.', startTime); return; } var type = sqlDPs[options.id].type; if (options.id) { options.index = options.id; options.id = sqlDPs[options.id].index; } // if specific id requested if (options.id || options.id === 0) { getDataFromDB(dbNames[type], options, function (err, data) { commons.sendResponse(adapter, msg, options, (err ? err.toString() : null) || data, startTime); }); } else { // if all IDs requested var rows = []; var count = 0; for (var db = 0; db < dbNames.length; db++) { count++; getDataFromDB(dbNames[db], options, function (err, data) { if (data) rows = rows.concat(data); if (!--count) { rows.sort(sortByTs); commons.sendResponse(adapter, msg, options, rows, startTime); } }); } } } function storeState(msg) { if (!msg.message || !msg.message.id || !msg.message.state) { adapter.log.error('storeState called with invalid data'); adapter.sendTo(msg.from, msg.command, { error: 'Invalid call: ' + JSON.stringify(msg) }, msg.callback); return; } var id; if (Array.isArray(msg.message)) { for (var i = 0; i < msg.message.length; i++) { id = aliasMap[msg.message[i].id] ? aliasMap[msg.message[i].id] : msg.message[i].id; pushValueIntoDB(id, msg.message[i].state); } } else if (Array.isArray(msg.message.state)) { for (var j = 0; j < msg.message.state.length; j++) { id = aliasMap[msg.message.id] ? aliasMap[msg.message.id] : msg.message.id; pushValueIntoDB(id, msg.message.state[j]); } } else { id = aliasMap[msg.message.id] ? aliasMap[msg.message.id] : msg.message.id; pushValueIntoDB(id, msg.message.state); } adapter.sendTo(msg.from, msg.command, { success: true, connected: !!clientPool }, msg.callback); } function getDpOverview(msg) { var result = {}; var query = SQLFuncs.getIdSelect(adapter.config.dbname); adapter.log.info(query); clientPool.borrow(function (err, client) { if (err) { adapter.sendTo(msg.from, msg.command, { error: 'Cannot select ' + query + ': ' + err }, msg.callback); return; } client.execute(query, function (err, rows /* , fields */) { if (rows && rows.rows) rows = rows.rows; if (err) { adapter.log.error('Cannot select ' + query + ': ' + err); adapter.sendTo(msg.from, msg.command, { error: 'Cannot select ' + query + ': ' + err }, msg.callback); clientPool.return(client); return; } adapter.log.info('Query result ' + JSON.stringify(rows)); if (rows.length) { for (var r = 0; r < rows.length; r++) { if (!result[rows[r].type]) result[rows[r].type] = {}; result[rows[r].type][rows[r].id] = {}; result[rows[r].type][rows[r].id].name = rows[r].name; switch(dbNames[rows[r].type]) { case 'ts_number': result[rows[r].type][rows[r].id].type = 'number'; break; case 'ts_string': result[rows[r].type][rows[r].id].type = 'string'; break; case 'ts_bool': result[rows[r].type][rows[r].id].type = 'boolean'; break; } } adapter.log.info('inited result: ' + JSON.stringify(result)); getFirstTsForIds(client, 0, result, msg); } }); }); } function getFirstTsForIds(dbClient, typeId, resultData, msg) { if (typeId < dbNames.length) { if (!resultData[typeId]) { getFirstTsForIds(dbClient, typeId + 1, resultData, msg); } else { var query = SQLFuncs.getFirstTs(adapter.config.dbname, dbNames[typeId]); adapter.log.info(query); dbClient.execute(query, function (err, rows /* , fields */) { if (rows && rows.rows) rows = rows.rows; if (err) { adapter.log.error('Cannot select ' + query + ': ' + err); adapter.sendTo(msg.from, msg.command, { error: 'Cannot select ' + query + ': ' + err }, msg.callback); clientPool.return(dbClient); return; } adapter.log.info('Query result ' + JSON.stringify(rows)); if (rows.length) { for (var r = 0; r < rows.length; r++) { if (resultData[typeId][rows[r].id]) { resultData[typeId][rows[r].id].ts = rows[r].ts; } } } adapter.log.info('enhanced result (' + typeId + '): ' + JSON.stringify(resultData)); setTimeout(getFirstTsForIds, 5000, dbClient, typeId + 1, resultData, msg); }); } } else { clientPool.return(dbClient); adapter.log.info('consolidate data ...'); var result = {}; for (var ti = 0; ti < dbNames.length; ti++ ) { if (resultData[ti]) { for (var index in resultData[ti]) { if (!resultData[ti].hasOwnProperty(index)) continue; var id = resultData[ti][index].name; if (!result[id]) { result[id] = {}; result[id].type = resultData[ti][index].type; result[id].ts = resultData[ti][index].ts; } else { result[id].type = 'undefined'; if (resultData[ti][index].ts < result[id].ts) { result[id].ts = resultData[ti][index].ts; } } } } } adapter.log.info('Result: ' + JSON.stringify(result)); adapter.sendTo(msg.from, msg.command, { succes: true, result: result }, msg.callback); } } function enableHistory(msg) { if (!msg.message || !msg.message.id) { adapter.log.error('enableHistory called with invalid data'); adapter.sendTo(msg.from, msg.command, { error: 'Invalid call' }, msg.callback); return; } var obj = {}; obj.common = {}; obj.common.custom = {}; if (msg.message.options) { obj.common.custom[adapter.namespace] = msg.message.options; } else { obj.common.custom[adapter.namespace] = {}; } obj.common.custom[adapter.namespace].enabled = true; adapter.extendForeignObject(msg.message.id, obj, function (err) { if (err) { adapter.log.error('enableHistory: ' + err); adapter.sendTo(msg.from, msg.command, { error: err }, msg.callback); } else { adapter.log.info(JSON.stringify(obj)); adapter.sendTo(msg.from, msg.command, { success: true }, msg.callback); } }); } function disableHistory(msg) { if (!msg.message || !msg.message.id) { adapter.log.error('disableHistory called with invalid data'); adapter.sendTo(msg.from, msg.command, { error: 'Invalid call' }, msg.callback); return; } var obj = {}; obj.common = {}; obj.common.custom = {}; obj.common.custom[adapter.namespace] = {}; obj.common.custom[adapter.namespace].enabled = false; adapter.extendForeignObject(msg.message.id, obj, function (err) { if (err) { adapter.log.error('disableHistory: ' + err); adapter.sendTo(msg.from, msg.command, { error: err }, msg.callback); } else { adapter.log.info(JSON.stringify(obj)); adapter.sendTo(msg.from, msg.command, { success: true }, msg.callback); } }); } function getEnabledDPs(msg) { var data = {}; for (var id in sqlDPs) { if (sqlDPs.hasOwnProperty(id) && sqlDPs[id] && sqlDPs[id][adapter.namespace]) { data[sqlDPs[id].realId] = sqlDPs[id][adapter.namespace]; } } adapter.sendTo(msg.from, msg.command, data, msg.callback); } process.on('uncaughtException', function(err) { adapter.log.warn('Exception: ' + err); });