first commit

This commit is contained in:
2023-05-19 00:42:48 +08:00
commit 53de9c6c51
243 changed files with 39485 additions and 0 deletions

122
batch/README.md Normal file
View File

@@ -0,0 +1,122 @@
# Batch Queries
This document describes the features of Batch Queries and details some internals that might be useful to maintainers
and developers.
## Redis data structures
### Jobs definition
Redis Hash: `batch:jobs:{UUID}`.
Redis DB: 5.
It stores the job definition, the user, and some metadata like the final status, the failure reason, and so.
### Job queues
Redis List: `batch:queue:{username}`.
Redis DB: 5.
It stores a pending list of jobs per user. It points to a job definition with the `{UUID}`.
### Job notifications
Redis Pub/Sub channel: `batch:users`.
Redis DB: 0.
In order to notify new jobs, it uses a Pub/Sub channel where the username for the queued job is published.
## Job types
Format for the currently supported query types, and what they are missing in terms of features.
### Simple
```json
{
"query": "update ..."
}
```
Does not support main fallback queries. Ideally it should support something like:
```json
{
"query": "update ...",
"onsuccess": "select 'general success fallback'",
"onerror": "select 'general error fallback'"
}
```
### Multiple
```json
{
"query": [
"update ...",
"select ... into ..."
]
}
```
Does not support main fallback queries. Ideally it should support something like:
```json
{
"query": [
"update ...",
"select ... into ..."
],
"onsuccess": "select 'general success fallback'",
"onerror": "select 'general error fallback'"
}
```
### Fallback
```json
{
"query": {
"query": [
{
"query": "select 1",
"onsuccess": "select 'success fallback query 1'",
"onerror": "select 'error fallback query 1'"
},
{
"query": "select 2",
"onerror": "select 'error fallback query 2'"
}
],
"onsuccess": "select 'general success fallback'",
"onerror": "select 'general error fallback'"
}
}
```
It's weird to have two nested `query` attributes. Also, it's not possible to mix _plain_ with _fallback_ ones.
Ideally it should support something like:
```json
{
"query": [
{
"query": "select 1",
"onsuccess": "select 'success fallback query 1'",
"onerror": "select 'error fallback query 1'"
},
"select 2"
],
"onsuccess": "select 'general success fallback'",
"onerror": "select 'general error fallback'"
}
```
Where you don't need a nested `query` attribute, it's just an array as in Multiple job type, and you can mix objects and
plain queries.

16
batch/batch-logger.js Normal file
View File

@@ -0,0 +1,16 @@
'use strict';
const Logger = require('../app/services/logger');
/**
 * Logger specialised for Batch Queries: delegates the actual write to the
 * job instance, which knows how to serialise itself through the logger.
 */
class BatchLogger extends Logger {
    // No explicit constructor: the implicit one forwards (path, name)
    // to Logger unchanged (the original constructor only called super).

    /**
     * Ask the job to log itself through the underlying logger.
     * @param {Object} job - job model exposing `log(logger)`
     * @returns {*} whatever job.log returns
     */
    log (job) {
        // NOTE(review): `this.logger` is presumably set by the Logger base
        // class constructor — confirm in app/services/logger.
        return job.log(this.logger);
    }
}

module.exports = BatchLogger;

217
batch/batch.js Normal file
View File

@@ -0,0 +1,217 @@
'use strict';
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var debug = require('./util/debug')('batch');
var queue = require('queue-async');
var HostScheduler = require('./scheduler/host-scheduler');
var EMPTY_QUEUE = true;
var MINUTE = 60 * 1000;
var SCHEDULE_INTERVAL = 1 * MINUTE;
/**
 * Batch Queries service: listens for queued jobs and schedules their
 * execution, one running job per database host at a time.
 */
function Batch(name, userDatabaseMetadataService, jobSubscriber, jobQueue, jobRunner, jobService, redisPool, logger) {
    EventEmitter.call(this);

    this.name = name || 'batch';
    Object.assign(this, {
        userDatabaseMetadataService: userDatabaseMetadataService,
        jobSubscriber: jobSubscriber,
        jobQueue: jobQueue,
        jobRunner: jobRunner,
        jobService: jobService,
        logger: logger
    });

    // The scheduler calls back into processJob to consume jobs for a user.
    this.hostScheduler = new HostScheduler(this.name, { run: this.processJob.bind(this) }, redisPool);

    // map: user => jobId. Will be used for draining jobs.
    this.workInProgressJobs = {};
}
util.inherits(Batch, EventEmitter);

module.exports = Batch;
/**
 * Starts the service: replays queues already present in Redis, begins the
 * periodic re-scan, and subscribes to new-job notifications.
 * Emits 'ready' when subscribed, or 'error' when startup fails.
 */
Batch.prototype.start = function () {
    var self = this;
    var handleUserJobs = createJobHandler(this.name, this.userDatabaseMetadataService, this.hostScheduler);

    this.jobQueue.scanQueues(function (err, queues) {
        if (err) {
            return self.emit('error', err);
        }

        // Kick off any work that was already queued before we started.
        queues.forEach(handleUserJobs);
        self._startScheduleInterval(handleUserJobs);

        self.jobSubscriber.subscribe(handleUserJobs, function (subscribeErr) {
            if (subscribeErr) {
                return self.emit('error', subscribeErr);
            }
            self.emit('ready');
        });
    });
};
/**
 * Builds the callback invoked whenever a user has pending work: it resolves
 * the user's database host and hands the (host, user) pair to the scheduler.
 * All failures are logged via debug and otherwise ignored.
 */
function createJobHandler (name, userDatabaseMetadataService, hostScheduler) {
    return function onJobHandler (user) {
        userDatabaseMetadataService.getUserMetadata(user, function (err, metadata) {
            if (err) {
                return debug('Could not get host user=%s from %s. Reason: %s', user, name, err.message);
            }

            var host = metadata.host;
            debug('[%s] onJobHandler(%s, %s)', name, user, host);

            hostScheduler.add(host, user, function (schedulingErr) {
                if (!schedulingErr) {
                    return;
                }
                debug('Could not schedule host=%s user=%s from %s. Reason: %s', host, user, name, schedulingErr.message);
            });
        });
    };
}
/**
 * Periodic safety net: every SCHEDULE_INTERVAL re-visit every known queue
 * in case a Pub/Sub notification was missed.
 */
Batch.prototype._startScheduleInterval = function (onJobHandler) {
    var self = this;

    this.scheduleInterval = setInterval(function revisitQueues () {
        self.jobQueue.getQueues(function (err, queues) {
            if (err) {
                return debug('Could not get queues from %s. Reason: %s', self.name, err.message);
            }
            queues.forEach(onJobHandler);
        });
    }, SCHEDULE_INTERVAL);
};
/**
 * Stops the periodic queue re-scan started by _startScheduleInterval.
 * Also clears the stored handle so repeated calls are harmless and the
 * dead timer reference is not kept alive after shutdown.
 */
Batch.prototype._stopScheduleInterval = function () {
    if (this.scheduleInterval) {
        clearInterval(this.scheduleInterval);
        this.scheduleInterval = null;
    }
};
/**
 * Consumes one job from the given user's queue.
 *
 * The callback's second argument tells the host scheduler whether the
 * queue is now empty (EMPTY_QUEUE) or may still hold work (!EMPTY_QUEUE).
 */
Batch.prototype.processJob = function (user, callback) {
    var self = this;

    self.jobQueue.dequeue(user, function (err, jobId) {
        if (err) {
            // Dequeue failed: report the error, assume the queue may still hold jobs.
            return callback(new Error('Could not get job from "' + user + '". Reason: ' + err.message), !EMPTY_QUEUE);
        }

        if (!jobId) {
            debug('Queue empty user=%s', user);
            return callback(null, EMPTY_QUEUE);
        }

        self._processWorkInProgressJob(user, jobId, function (err, job) {
            if (err) {
                debug(err);
                // A job that cannot run right now is skipped without failing the queue.
                if (err.name === 'JobNotRunnable') {
                    return callback(null, !EMPTY_QUEUE);
                }
                return callback(err, !EMPTY_QUEUE);
            }

            debug(
                '[%s] Job=%s status=%s user=%s (failed_reason=%s)',
                self.name, jobId, job.data.status, user, job.failed_reason
            );

            self.logger.log(job);

            return callback(null, !EMPTY_QUEUE);
        });
    });
};
/**
 * Runs a job while tracking it in the work-in-progress registry so it can
 * be re-enqueued if the service is drained mid-run.
 *
 * Registry bookkeeping failures are only logged; they never abort the run,
 * and the runner's outcome is what gets propagated to the callback.
 */
Batch.prototype._processWorkInProgressJob = function (user, jobId, callback) {
    var self = this;

    self.setWorkInProgressJob(user, jobId, function (errSet) {
        if (errSet) {
            // Non-fatal: the job still runs, we just may lose drain tracking.
            debug(new Error('Could not add job to work-in-progress list. Reason: ' + errSet.message));
        }

        self.jobRunner.run(jobId, function (err, job) {
            self.clearWorkInProgressJob(user, jobId, function (errClear) {
                if (errClear) {
                    debug(new Error('Could not clear job from work-in-progress list. Reason: ' + errClear.message));
                }

                // Propagate the runner's result, not the bookkeeping one.
                return callback(err, job);
            });
        });
    });
};
/**
 * Drains every work-in-progress job so the service can shut down cleanly.
 * Errors are logged but never propagated: the callback is always invoked
 * without arguments.
 */
Batch.prototype.drain = function (callback) {
    var self = this;
    var usersToDrain = this.getWorkInProgressUsers();
    var drainQueue = queue(usersToDrain.length);

    usersToDrain.forEach(function (user) {
        drainQueue.defer(self._drainJob.bind(self), user);
    });

    drainQueue.awaitAll(function (err) {
        if (err) {
            debug('Something went wrong draining', err);
        } else {
            debug('Drain complete');
        }
        callback();
    });
};
/**
 * Drains the in-flight job of a single user: cancels its running query,
 * flips it back to pending, and re-enqueues it at the front of the queue.
 */
Batch.prototype._drainJob = function (user, callback) {
    var self = this;
    var jobId = this.getWorkInProgressJob(user);

    if (!jobId) {
        // Nothing in flight for this user; stay asynchronous for consistency.
        return process.nextTick(function () {
            return callback();
        });
    }

    this.jobService.drain(jobId, function (err) {
        if (err) {
            // A job whose cancellation is not allowed is simply left alone.
            if (err.name === 'CancelNotAllowedError') {
                return callback();
            }
            return callback(err);
        }

        // Put the job back at the front so it resumes first after restart.
        self.jobQueue.enqueueFirst(user, jobId, callback);
    });
};
/**
 * Stops the batch service: drops listeners, cancels the periodic queue
 * re-scan, and unsubscribes from job notifications.
 */
Batch.prototype.stop = function (callback) {
    this.removeAllListeners();
    this._stopScheduleInterval();
    this.jobSubscriber.unsubscribe(callback);
};
/* Work in progress jobs */

// Tracks jobId as the in-flight job for user, both in the in-memory map
// and in the persistent registry kept by jobService.
Batch.prototype.setWorkInProgressJob = function(user, jobId, callback) {
    this.workInProgressJobs[user] = jobId;
    this.jobService.addWorkInProgressJob(user, jobId, callback);
};

// Returns the in-flight jobId for user, or undefined when none.
Batch.prototype.getWorkInProgressJob = function(user) {
    return this.workInProgressJobs[user];
};

// Removes the in-flight entry for user from both the in-memory map and
// the persistent registry.
Batch.prototype.clearWorkInProgressJob = function(user, jobId, callback) {
    delete this.workInProgressJobs[user];
    this.jobService.clearWorkInProgressJob(user, jobId, callback);
};

// Lists the users that currently have a job in flight.
Batch.prototype.getWorkInProgressUsers = function() {
    return Object.keys(this.workInProgressJobs);
};

39
batch/index.js Normal file
View File

@@ -0,0 +1,39 @@
'use strict';
var JobRunner = require('./job_runner');
var QueryRunner = require('./query_runner');
var JobCanceller = require('./job_canceller');
var JobSubscriber = require('./pubsub/job-subscriber');
var UserDatabaseMetadataService = require('./user_database_metadata_service');
var JobPublisher = require('./pubsub/job-publisher');
var JobQueue = require('./job_queue');
var JobBackend = require('./job_backend');
var JobService = require('./job_service');
var BatchLogger = require('./batch-logger');
var Batch = require('./batch');
module.exports = function batchFactory (metadataBackend, redisPool, name, statsdClient, loggerPath) {
var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend);
var jobSubscriber = new JobSubscriber(redisPool);
var jobPublisher = new JobPublisher(redisPool);
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
var jobBackend = new JobBackend(metadataBackend, jobQueue);
var queryRunner = new QueryRunner(userDatabaseMetadataService);
var jobCanceller = new JobCanceller();
var jobService = new JobService(jobBackend, jobCanceller);
var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, metadataBackend, statsdClient);
var logger = new BatchLogger(loggerPath, 'batch-queries');
return new Batch(
name,
userDatabaseMetadataService,
jobSubscriber,
jobQueue,
jobRunner,
jobService,
redisPool,
logger
);
};

290
batch/job_backend.js Normal file
View File

@@ -0,0 +1,290 @@
'use strict';
// Redis key prefix / database where job definitions (hashes) live.
var REDIS_PREFIX = 'batch:jobs:';
var REDIS_DB = 5;
var JobStatus = require('./job_status');
var queue = require('queue-async');
var debug = require('./util/debug')('job-backend');

/**
 * Persistence layer for batch jobs: stores each job as a Redis hash and
 * coordinates with the per-user job queue.
 */
function JobBackend(metadataBackend, jobQueue) {
    this.metadataBackend = metadataBackend;
    this.jobQueue = jobQueue;
    // Per-user cap on queued jobs before creation is rejected.
    this.maxNumberOfQueuedJobs = global.settings.batch_max_queued_jobs || 64;
    // TTL applied to jobs that reached a final status, so Redis reclaims them.
    this.inSecondsJobTTLAfterFinished = global.settings.finished_jobs_ttl_in_seconds || 2 * 3600; // 2 hours
    this.hostname = global.settings.api_hostname || 'batch';
}
/**
 * Serialises a job into the flat [key, field, value, ...] argument list
 * expected by the Redis HMSET command. The job_id becomes part of the key
 * and is dropped from the fields.
 */
function toRedisParams(job) {
    // Deep-copy via JSON so the caller's object is never mutated.
    var plainJob = JSON.parse(JSON.stringify(job));
    delete plainJob.job_id;

    return Object.keys(plainJob).reduce(function (params, property) {
        var value = plainJob[property];
        params.push(property);
        // Structured queries (arrays/objects) are stored as JSON text.
        params.push((property === 'query' && typeof value !== 'string') ? JSON.stringify(value) : value);
        return params;
    }, [REDIS_PREFIX + job.job_id]);
}
/**
 * Rebuilds a job object from the parallel arrays of hash field names and
 * the values HMGET returned for them.
 *
 * NOTE: mutates redisParams, dropping the leading redis key and the
 * trailing element (a stray item pushed by metadataBackend).
 */
function toObject(job_id, redisParams, redisValues) {
    redisParams.shift(); // drop the redis key
    redisParams.pop(); // WARN: weird function pushed by metadataBackend

    var obj = {};
    redisParams.forEach(function (field, i) {
        var value = redisValues[i];
        // TODO: this should be moved to job model
        if (field === 'query') {
            // Structured queries were stored as JSON; plain SQL stays a string.
            try {
                obj[field] = JSON.parse(value);
            } catch (e) {
                obj[field] = value;
            }
        } else if (value) {
            obj[field] = value;
        }
    });

    obj.job_id = job_id; // adds redisKey as object property
    return obj;
}
/**
 * A job is considered found when the five mandatory hash fields
 * (user, status, query, created_at, updated_at) all came back non-empty.
 */
function isJobFound(redisValues) {
    return [0, 1, 2, 3, 4].every(function (i) {
        return Boolean(redisValues[i]);
    });
}
/**
 * Builds the NotFoundError reported when a job id has no hash in Redis.
 */
function getNotFoundError(job_id) {
    var error = new Error('Job with id ' + job_id + ' not found');
    error.name = 'NotFoundError';
    return error;
}
/**
 * Fetches a job hash from Redis and materialises it as a plain object.
 * Responds with a NotFoundError when the id is falsy or unknown.
 */
JobBackend.prototype.get = function (job_id, callback) {
    if (!job_id) {
        return callback(getNotFoundError(job_id));
    }

    var self = this;
    // Fields to read; the first five are mandatory (see isJobFound).
    // FIX: the original requested 'host' twice; the duplicate is removed.
    var redisParams = [
        REDIS_PREFIX + job_id,
        'user',
        'status',
        'query',
        'created_at',
        'updated_at',
        'host',
        'failed_reason',
        'fallback_status',
        'port',
        'pass',
        'dbname',
        'dbuser'
    ];

    self.metadataBackend.redisCmd(REDIS_DB, 'HMGET', redisParams, function (err, redisValues) {
        if (err) {
            return callback(err);
        }

        if (!isJobFound(redisValues)) {
            return callback(getNotFoundError(job_id));
        }

        var jobData = toObject(job_id, redisParams, redisValues);

        callback(null, jobData);
    });
};
/**
 * Creates a new job: enforces the per-user queue cap, persists the job
 * definition, and enqueues its id on the user's queue.
 *
 * NOTE(review): if a job with the same id already exists, this path still
 * saves over it (only unexpected get() errors abort) — presumably job ids
 * are freshly generated UUIDs; confirm.
 */
JobBackend.prototype.create = function (job, callback) {
    var self = this;

    this.jobQueue.size(job.user, function(err, size) {
        if (err) {
            return callback(new Error('Failed to create job, could not determine user queue size'));
        }

        // Reject when the user already has too many queued jobs.
        if (size >= self.maxNumberOfQueuedJobs) {
            return callback(new Error(
                'Failed to create job. ' +
                'Max number of jobs (' + self.maxNumberOfQueuedJobs + ') queued reached'
            ));
        }

        self.get(job.job_id, function (err) {
            // Only a missing job is acceptable here.
            if (err && err.name !== 'NotFoundError') {
                return callback(err);
            }

            self.save(job, function (err, jobSaved) {
                if (err) {
                    return callback(err);
                }

                self.jobQueue.enqueue(job.user, job.job_id, function (err) {
                    if (err) {
                        return callback(err);
                    }

                    return callback(null, jobSaved);
                });
            });
        });
    });
};
/**
 * Updates an existing job; fails with NotFoundError when the job is gone
 * (e.g. its hash expired after finishing).
 */
JobBackend.prototype.update = function (job, callback) {
    var self = this;

    // Ensure the job still exists before overwriting its hash.
    self.get(job.job_id, function (err) {
        return err ? callback(err) : self.save(job, callback);
    });
};
/**
 * Writes the job hash to Redis, refreshes its TTL when it reached a final
 * status, and returns the freshly re-read job data.
 */
JobBackend.prototype.save = function (job, callback) {
    var self = this;

    self.metadataBackend.redisCmd(REDIS_DB, 'HMSET', toRedisParams(job), function (err) {
        if (err) {
            return callback(err);
        }

        self.setTTL(job, function (ttlErr) {
            if (ttlErr) {
                return callback(ttlErr);
            }

            // Read back so callers get the canonical stored representation.
            self.get(job.job_id, callback);
        });
    });
};
// Redis layout for the work-in-progress registry:
// - batch:wip:users        (set)  index of users with jobs in flight
// - batch:wip:user:{user}  (list) ordered in-flight job ids per user
var WORK_IN_PROGRESS_JOB = {
    DB: 5,
    PREFIX_USER: 'batch:wip:user:',
    USER_INDEX_KEY: 'batch:wip:users'
};
/**
 * Registers a job as in-flight: adds the user to the WIP user index and
 * appends the job id to the user's WIP list, in a single MULTI.
 */
JobBackend.prototype.addWorkInProgressJob = function (user, jobId, callback) {
    var userWIPKey = WORK_IN_PROGRESS_JOB.PREFIX_USER + user;
    debug('add job %s to user %s (%s)', jobId, user, userWIPKey);

    var commands = [
        ['SADD', WORK_IN_PROGRESS_JOB.USER_INDEX_KEY, user],
        ['RPUSH', userWIPKey, jobId]
    ];
    this.metadataBackend.redisMultiCmd(WORK_IN_PROGRESS_JOB.DB, commands, callback);
};
/**
 * Removes a job from the user's work-in-progress list; when that was the
 * user's last in-flight job, the user is also dropped from the WIP index.
 */
JobBackend.prototype.clearWorkInProgressJob = function (user, jobId, callback) {
    var self = this;
    var DB = WORK_IN_PROGRESS_JOB.DB;
    var userWIPKey = WORK_IN_PROGRESS_JOB.PREFIX_USER + user;

    var params = [userWIPKey, 0, jobId];
    self.metadataBackend.redisCmd(DB, 'LREM', params, function (err) {
        if (err) {
            return callback(err);
        }

        params = [userWIPKey, 0, -1];
        self.metadataBackend.redisCmd(DB, 'LRANGE', params, function (err, workInProgressJobs) {
            if (err) {
                return callback(err);
            }

            debug('user %s has work in progress jobs %j', user, workInProgressJobs);

            // BUG FIX: the original tested `length < 0` (never true), so the
            // user was removed from the index even with jobs still in flight.
            if (workInProgressJobs.length > 0) {
                return callback();
            }

            debug('delete user %s from index', user);

            params = [WORK_IN_PROGRESS_JOB.USER_INDEX_KEY, user];
            self.metadataBackend.redisCmd(DB, 'SREM', params, function (err) {
                if (err) {
                    return callback(err);
                }

                return callback();
            });
        });
    });
};
/**
 * Returns the ordered list of in-flight job ids for a single user.
 */
JobBackend.prototype.listWorkInProgressJobByUser = function (user, callback) {
    var params = [WORK_IN_PROGRESS_JOB.PREFIX_USER + user, 0, -1];
    this.metadataBackend.redisCmd(WORK_IN_PROGRESS_JOB.DB, 'LRANGE', params, callback);
};
/**
 * Builds a map of user => [in-flight job ids] for every user present in
 * the work-in-progress index (at most 4 per-user lookups in flight).
 */
JobBackend.prototype.listWorkInProgressJobs = function (callback) {
    var self = this;
    var DB = WORK_IN_PROGRESS_JOB.DB;

    var params = [WORK_IN_PROGRESS_JOB.USER_INDEX_KEY];
    this.metadataBackend.redisCmd(DB, 'SMEMBERS', params, function (err, workInProgressUsers) {
        if (err) {
            return callback(err);
        }

        // BUG FIX: the original compared the array itself (`users < 1`),
        // which only worked for the empty array by accident of coercion.
        if (workInProgressUsers.length < 1) {
            return callback(null, {});
        }

        debug('found %j work in progress users', workInProgressUsers);

        var usersQueue = queue(4);
        workInProgressUsers.forEach(function (user) {
            usersQueue.defer(self.listWorkInProgressJobByUser.bind(self), user);
        });

        usersQueue.awaitAll(function (err, userWorkInProgressJobs) {
            if (err) {
                return callback(err);
            }

            // Zip users with their job lists (defer preserves order).
            var workInProgressJobs = workInProgressUsers.reduce(function (users, user, index) {
                users[user] = userWorkInProgressJobs[index];
                debug('found %j work in progress jobs for user %s', userWorkInProgressJobs[index], user);
                return users;
            }, {});

            callback(null, workInProgressJobs);
        });
    });
};
/**
 * Applies the configured expiry to a job's hash once the job reaches a
 * final status; non-final jobs get no TTL.
 */
JobBackend.prototype.setTTL = function (job, callback) {
    if (!JobStatus.isFinal(job.status)) {
        return callback();
    }

    var redisKey = REDIS_PREFIX + job.job_id;
    this.metadataBackend.redisCmd(REDIS_DB, 'EXPIRE', [ redisKey, this.inSecondsJobTTLAfterFinished ], callback);
};

module.exports = JobBackend;

78
batch/job_canceller.js Normal file
View File

@@ -0,0 +1,78 @@
'use strict';
var PSQL = require('cartodb-psql');
/**
 * Cancels running batch queries by asking PostgreSQL to kill the backend
 * executing them.
 */
function JobCanceller() {
}

module.exports = JobCanceller;

/**
 * Cancels the query currently running for the given job, if any, using
 * the connection details stored in job.data.
 */
JobCanceller.prototype.cancel = function (job, callback) {
    const data = job.data;

    doCancel(data.job_id, {
        host: data.host,
        port: data.port,
        dbname: data.dbname,
        user: data.dbuser,
        pass: data.pass
    }, callback);
};
/**
 * Looks up the backend PID running the job's query and asks PostgreSQL to
 * cancel it. Succeeds silently when the query is no longer running.
 */
function doCancel(job_id, dbConfiguration, callback) {
    var pg = new PSQL(dbConfiguration);

    getQueryPID(pg, job_id, function (err, pid) {
        if (err) {
            return callback(err);
        }

        // No PID: the query already finished, nothing to cancel.
        if (!pid) {
            return callback();
        }

        doCancelQuery(pg, pid, function (cancelErr, isCancelled) {
            if (cancelErr) {
                return callback(cancelErr);
            }

            return isCancelled ? callback() : callback(new Error('Query has not been cancelled'));
        });
    });
}
// Finds the PostgreSQL backend PID executing the job's query. Batch queries
// are tagged with a leading `/* {job_id} *` `/` comment, which is what the
// LIKE pattern matches on.
// NOTE(review): job_id is interpolated into the SQL unescaped; it appears to
// be an internally generated id, but confirm it can never be user-controlled.
function getQueryPID(pg, job_id, callback) {
    var getPIDQuery = "SELECT pid FROM pg_stat_activity WHERE query LIKE '/* " + job_id + " */%'";

    pg.query(getPIDQuery, function(err, result) {
        if (err) {
            return callback(err);
        }

        if (!result.rows[0] || !result.rows[0].pid) {
            // query is not running actually, but we have to callback w/o error to cancel the job anyway.
            return callback();
        }

        callback(null, result.rows[0].pid);
    });
}
// Issues pg_cancel_backend() for the given PID and reports whether
// PostgreSQL accepted the cancellation.
function doCancelQuery(pg, pid, callback) {
    var cancelQuery = 'SELECT pg_cancel_backend(' + pid + ')';

    pg.query(cancelQuery, function (err, result) {
        if (err) {
            return callback(err);
        }

        callback(null, result.rows[0].pg_cancel_backend);
    });
}

165
batch/job_queue.js Normal file
View File

@@ -0,0 +1,165 @@
'use strict';
var debug = require('./util/debug')('queue');
var queueAsync = require('queue-async');
/**
 * Per-user FIFO of pending job ids stored in Redis, plus an index set of
 * the users that currently have queued jobs.
 */
function JobQueue(metadataBackend, jobPublisher) {
    this.metadataBackend = metadataBackend;
    this.jobPublisher = jobPublisher;
}

module.exports = JobQueue;

// Redis layout: batch:queue:{user} (list) and batch:indexes:queue (set).
var QUEUE = {
    DB: 5,
    PREFIX: 'batch:queue:',
    INDEX: 'batch:indexes:queue'
};
module.exports.QUEUE = QUEUE;
/**
 * Pushes a job onto the user's queue, records the user in the queue index,
 * and notifies listeners that the user has pending work.
 */
JobQueue.prototype.enqueue = function (user, jobId, callback) {
    var self = this;
    debug('JobQueue.enqueue user=%s, jobId=%s', user, jobId);

    var commands = [
        ['LPUSH', QUEUE.PREFIX + user, jobId],
        ['SADD', QUEUE.INDEX, user]
    ];

    this.metadataBackend.redisMultiCmd(QUEUE.DB, commands, function (err) {
        if (err) {
            return callback(err);
        }

        // Fire-and-forget notification over Pub/Sub.
        self.jobPublisher.publish(user);
        callback();
    });
};
// Number of jobs currently queued for the given user.
JobQueue.prototype.size = function (user, callback) {
    this.metadataBackend.redisCmd(QUEUE.DB, 'LLEN', [ QUEUE.PREFIX + user ], callback);
};
/**
 * Atomically pops the next job id for a user and, when the queue becomes
 * empty, removes the user from the queue index. Implemented as a single
 * Lua script so no other client can observe an inconsistent state between
 * the RPOP and the SREM.
 */
JobQueue.prototype.dequeue = function (user, callback) {
    var dequeueScript = [
        'local job_id = redis.call("RPOP", KEYS[1])',
        'if redis.call("LLEN", KEYS[1]) == 0 then',
        ' redis.call("SREM", KEYS[2], ARGV[1])',
        'end',
        'return job_id'
    ].join('\n');

    var redisParams = [
        dequeueScript, //lua source code
        2, // Two "keys" to pass
        QUEUE.PREFIX + user, //KEYS[1], the key of the queue
        QUEUE.INDEX, //KEYS[2], the key of the index
        user // ARGV[1] - value of the element to remove from the index
    ];

    this.metadataBackend.redisCmd(QUEUE.DB, 'EVAL', redisParams, function (err, jobId) {
        debug('JobQueue.dequeued user=%s, jobId=%s', user, jobId);
        return callback(err, jobId);
    });
};
/**
 * Re-enqueues a job at the FRONT of the user's queue (used when a running
 * job is drained or still has queries pending) and notifies listeners.
 */
JobQueue.prototype.enqueueFirst = function (user, jobId, callback) {
    var self = this;
    debug('JobQueue.enqueueFirst user=%s, jobId=%s', user, jobId);

    var commands = [
        ['RPUSH', QUEUE.PREFIX + user, jobId],
        ['SADD', QUEUE.INDEX, user]
    ];

    this.metadataBackend.redisMultiCmd(QUEUE.DB, commands, function (err) {
        if (err) {
            return callback(err);
        }

        self.jobPublisher.publish(user);
        callback();
    });
};
/**
 * Lists every user currently present in the queue index.
 */
JobQueue.prototype.getQueues = function (callback) {
    this.metadataBackend.redisCmd(QUEUE.DB, 'SMEMBERS', [ QUEUE.INDEX ], function (err, queues) {
        return err ? callback(err) : callback(null, queues);
    });
};
/**
 * Scans Redis for user queues, rebuilds the queue index from what it
 * finds, and returns the discovered user list.
 */
JobQueue.prototype.scanQueues = function (callback) {
    var self = this;

    self.scan(function (err, queues) {
        if (err) {
            return callback(err);
        }

        self.addToQueueIndex(queues, function (indexErr) {
            return indexErr ? callback(indexErr) : callback(null, queues);
        });
    });
};
/**
 * Walks the keyspace with SCAN collecting every user that has a queue key.
 */
JobQueue.prototype.scan = function (callback) {
    // Start at cursor '0' with an empty user accumulator.
    this._scan(['0'], {}, function (err, users) {
        if (err) {
            return callback(err);
        }
        callback(null, Object.keys(users));
    });
};
/**
 * Recursive SCAN step: accumulates users found under the queue prefix into
 * `users` until the cursor wraps back to '0'.
 */
JobQueue.prototype._scan = function (cursor, users, callback) {
    var self = this;
    var redisParams = [cursor[0], 'MATCH', QUEUE.PREFIX + '*'];

    self.metadataBackend.redisCmd(QUEUE.DB, 'SCAN', redisParams, function (err, currentCursor) {
        if (err) {
            // BUG FIX: the original reported success with a partial user set
            // here; propagate the error instead.
            return callback(err);
        }

        var queues = currentCursor[1];
        if (queues) {
            queues.forEach(function (queue) {
                var user = queue.substr(QUEUE.PREFIX.length);
                users[user] = true;
            });
        }

        var hasMore = currentCursor[0] !== '0';
        if (!hasMore) {
            return callback(null, users);
        }

        self._scan(currentCursor, users, callback);
    });
};
/**
 * Adds every given user to the queue index set.
 */
JobQueue.prototype.addToQueueIndex = function (users, callback) {
    var self = this;
    var usersQueues = queueAsync(users.length);

    users.forEach(function (user) {
        usersQueues.defer(function (user, done) {
            self.metadataBackend.redisCmd(QUEUE.DB, 'SADD', [ QUEUE.INDEX, user], done);
        }, user);
    });

    usersQueues.awaitAll(function (err) {
        return err ? callback(err) : callback(null);
    });
};

144
batch/job_runner.js Normal file
View File

@@ -0,0 +1,144 @@
'use strict';
var errorCodes = require('../app/postgresql/error_codes').codeToCondition;
var jobStatus = require('./job_status');
var Profiler = require('step-profiler');
var _ = require('underscore');
// Redis hash holding per-user batch limits (e.g. a custom query timeout):
// limits:batch:{username}
var REDIS_LIMITS = {
    DB: 5,
    PREFIX: 'limits:batch:' // + username
};

/**
 * Executes batch jobs: picks a job's next query, resolves the statement
 * timeout for its user, runs the query, and updates the job status.
 */
function JobRunner(jobService, jobQueue, queryRunner, metadataBackend, statsdClient) {
    this.jobService = jobService;
    this.jobQueue = jobQueue;
    this.queryRunner = queryRunner;
    this.metadataBackend = metadataBackend;
    this.statsdClient = statsdClient;
}
/**
 * Runs the next query of a job: loads the job, resolves the statement
 * timeout, marks the job RUNNING, persists that status, and executes.
 */
JobRunner.prototype.run = function (job_id, callback) {
    var self = this;
    var profiler = new Profiler({ statsd_client: self.statsdClient });
    profiler.start('sqlapi.batch.job');

    self.jobService.get(job_id, function (err, job) {
        if (err) {
            return callback(err);
        }

        self.getQueryStatementTimeout(job.data.user, function(err, timeout) {
            if (err) {
                return callback(err);
            }

            var query = job.getNextQuery();

            // Fallback-style entries are objects: honour a per-query timeout
            // (when smaller than the user's) and unwrap the SQL text.
            if (_.isObject(query)) {
                if (Number.isFinite(query.timeout) && query.timeout > 0) {
                    timeout = Math.min(timeout, query.timeout);
                }
                query = query.query;
            }

            try {
                job.setStatus(jobStatus.RUNNING);
            } catch (err) {
                return callback(err);
            }

            self.jobService.save(job, function (err, job) {
                if (err) {
                    return callback(err);
                }

                profiler.done('running');

                self._run(job, query, timeout, profiler, callback);
            });
        });
    });
};
/**
 * Resolves the statement timeout (ms) for a user's batch queries: the
 * global default (batch_query_timeout, falling back to 12 hours) possibly
 * overridden by a per-user limit stored in Redis.
 *
 * NOTE(review): a Redis error is silently ignored and the default timeout
 * is used — presumably deliberate best-effort; confirm.
 */
JobRunner.prototype.getQueryStatementTimeout = function(username, callback) {
    var timeout = 12 * 3600 * 1000;
    if (Number.isFinite(global.settings.batch_query_timeout)) {
        timeout = global.settings.batch_query_timeout;
    }

    var batchLimitsKey = REDIS_LIMITS.PREFIX + username;
    this.metadataBackend.redisCmd(REDIS_LIMITS.DB, 'HGET', [batchLimitsKey, 'timeout'], function(err, timeoutLimit) {
        // Per-user override wins when present and numeric.
        if (timeoutLimit !== null && Number.isFinite(+timeoutLimit)) {
            timeout = +timeoutLimit;
        }

        return callback(null, timeout);
    });
};
/**
 * Executes one query against the job owner's database and updates the job
 * status from the outcome (DONE/FAILED, or the status the canceller saved
 * when the user requested cancellation). Jobs with more queries pending
 * are re-enqueued at the front of the user's queue.
 */
JobRunner.prototype._run = function (job, query, timeout, profiler, callback) {
    var self = this;

    const dbparams = {
        pass: job.data.pass,
        user: job.data.dbuser,
        dbname: job.data.dbname,
        port: job.data.port,
        host: job.data.host
    };

    self.queryRunner.run(job.data.job_id, query, job.data.user, timeout, dbparams, function (err /*, result */) {
        if (err) {
            // Errors without a pg error code are infrastructure failures.
            if (!err.code) {
                return callback(err);
            }

            // if query has been cancelled then it's going to get the current
            // job status saved by query_canceller
            if (cancelledByUser(err)) {
                return self.jobService.get(job.data.job_id, callback);
            }
        }

        try {
            if (err) {
                profiler.done('failed');
                job.setStatus(jobStatus.FAILED, err.message);
            } else {
                profiler.done('success');
                job.setStatus(jobStatus.DONE);
            }
        } catch (err) {
            return callback(err);
        }

        self.jobService.save(job, function (err, job) {
            if (err) {
                return callback(err);
            }

            profiler.done('done');
            profiler.end();
            profiler.sendStats();

            // Single-query (or exhausted) jobs are finished here.
            if (!job.hasNextQuery()) {
                return callback(null, job);
            }

            // Multi-query jobs go back to the front of the user's queue.
            self.jobQueue.enqueueFirst(job.data.user, job.data.job_id, function (err) {
                if (err) {
                    return callback(err);
                }

                callback(null, job);
            });
        });
    });
};
/**
 * True when the query failed because PostgreSQL cancelled it at the user's
 * request (pg condition `query_canceled` with a "...user...request" message).
 * Returns a strict boolean (the original leaked the match array/null).
 */
function cancelledByUser(err) {
    return errorCodes[err.code.toString()] === 'query_canceled' && /user.*request/.test(err.message);
}

module.exports = JobRunner;

136
batch/job_service.js Normal file
View File

@@ -0,0 +1,136 @@
'use strict';
var debug = require('./util/debug')('job-service');
var JobFactory = require('./models/job_factory');
var jobStatus = require('./job_status');
/**
 * High-level job operations (get/create/save/cancel/drain) built on the
 * persistence layer (jobBackend) and the query canceller.
 */
function JobService(jobBackend, jobCanceller) {
    this.jobBackend = jobBackend;
    this.jobCanceller = jobCanceller;
}

module.exports = JobService;
/**
 * Loads a job by id and wraps the raw data in a job model instance.
 */
JobService.prototype.get = function (job_id, callback) {
    this.jobBackend.get(job_id, function (err, data) {
        if (err) {
            return callback(err);
        }

        var job;
        try {
            job = JobFactory.create(data);
        } catch (modelErr) {
            return callback(modelErr);
        }

        callback(null, job);
    });
};
/**
 * Validates and persists a brand-new job.
 *
 * The try/catch guards only the synchronous model creation/validation.
 * The original also wrapped the async create call, so a callback that
 * threw could be invoked a second time with its own error.
 */
JobService.prototype.create = function (data, callback) {
    var job;
    try {
        job = JobFactory.create(data);
        job.validate();
    } catch (err) {
        return callback(err);
    }

    this.jobBackend.create(job.data, function (err) {
        if (err) {
            return callback(err);
        }
        callback(null, job);
    });
};
/**
 * Validates a job model, persists its data, and hands back a model rebuilt
 * from the stored representation.
 */
JobService.prototype.save = function (job, callback) {
    try {
        job.validate();
    } catch (err) {
        return callback(err);
    }

    this.jobBackend.update(job.data, function (err, data) {
        if (err) {
            return callback(err);
        }

        var savedJob;
        try {
            savedJob = JobFactory.create(data);
        } catch (modelErr) {
            return callback(modelErr);
        }

        callback(null, savedJob);
    });
};
/**
 * Cancels a job: pending jobs are simply marked cancelled; running jobs
 * get their database query killed before the new status is saved.
 */
JobService.prototype.cancel = function (job_id, callback) {
    var self = this;

    self.get(job_id, function (err, job) {
        if (err) {
            return callback(err);
        }

        var wasPending = job.isPending();

        try {
            job.setStatus(jobStatus.CANCELLED);
        } catch (statusErr) {
            return callback(statusErr);
        }

        // A job that never started has no query to kill.
        if (wasPending) {
            return self.save(job, callback);
        }

        self.jobCanceller.cancel(job, function (cancelErr) {
            if (cancelErr) {
                return callback(cancelErr);
            }

            self.save(job, callback);
        });
    });
};
/**
 * Drains a running job for shutdown: kills its query and flips it back to
 * PENDING so it can be re-enqueued and resumed later.
 */
JobService.prototype.drain = function (job_id, callback) {
    var self = this;

    self.get(job_id, function (err, job) {
        if (err) {
            return callback(err);
        }

        self.jobCanceller.cancel(job, function (cancelErr) {
            if (cancelErr) {
                debug('There was an error while draining job %s, %s ', job_id, cancelErr);
                return callback(cancelErr);
            }

            try {
                job.setStatus(jobStatus.PENDING);
            } catch (statusErr) {
                return callback(statusErr);
            }

            self.jobBackend.update(job.data, callback);
        });
    });
};
// Thin delegations to the backend's work-in-progress registry.

JobService.prototype.addWorkInProgressJob = function (user, jobId, callback) {
    this.jobBackend.addWorkInProgressJob(user, jobId, callback);
};

JobService.prototype.clearWorkInProgressJob = function (user, jobId, callback) {
    this.jobBackend.clearWorkInProgressJob(user, jobId, callback);
};

JobService.prototype.listWorkInProgressJobs = function (callback) {
    this.jobBackend.listWorkInProgressJobs(callback);
};

23
batch/job_status.js Normal file
View File

@@ -0,0 +1,23 @@
'use strict';
/**
 * Lifecycle states for a batch job. The module exports the enum object
 * itself, with an isFinal() helper attached to it.
 */
var JOB_STATUS_ENUM = {
    PENDING: 'pending',
    RUNNING: 'running',
    DONE: 'done',
    CANCELLED: 'cancelled',
    FAILED: 'failed',
    SKIPPED: 'skipped',
    UNKNOWN: 'unknown'
};

module.exports = JOB_STATUS_ENUM;

// States that terminate a job: final jobs get a TTL and are never resumed.
var finalStatus = [
    JOB_STATUS_ENUM.CANCELLED,
    JOB_STATUS_ENUM.DONE,
    JOB_STATUS_ENUM.FAILED,
    JOB_STATUS_ENUM.UNKNOWN
];

module.exports.isFinal = function(status) {
    return finalStatus.indexOf(status) >= 0;
};

74
batch/leader/locker.js Normal file
View File

@@ -0,0 +1,74 @@
'use strict';
var RedisDistlockLocker = require('./provider/redis-distlock');
var debug = require('../util/debug')('leader-locker');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
// Default lock TTL in ms; locks are renewed at TTL/5 intervals.
var LOCK = {
    TTL: 5000
};

/**
 * Wraps a lock provider (currently redis-distlock) adding automatic lock
 * renewal while the resource is held. Emits 'error' when a renewal fails.
 */
function Locker(locker, ttl) {
    EventEmitter.call(this);
    this.locker = locker;
    this.ttl = (Number.isFinite(ttl) && ttl > 0) ? ttl : LOCK.TTL;
    // Renew well before expiry so a slow roundtrip doesn't lose the lock.
    this.renewInterval = this.ttl / 5;
    // resource => interval id of its renewal timer.
    this.intervalIds = {};
}
util.inherits(Locker, EventEmitter);
module.exports = Locker;
/**
 * Acquires the lock for a resource and, on success, starts auto-renewal.
 */
Locker.prototype.lock = function(resource, callback) {
    var self = this;
    debug('Locker.lock(%s, %d)', resource, this.ttl);

    this.locker.lock(resource, this.ttl, function onLocked (err, lock) {
        var acquired = !err;
        if (acquired) {
            self.startRenewal(resource);
        }
        return callback(err, lock);
    });
};
/**
 * Releases the lock and stops its renewal timer (even when the provider
 * reports an error).
 */
Locker.prototype.unlock = function(resource, callback) {
    var self = this;
    debug('Locker.unlock(%s)', resource);

    this.locker.unlock(resource, function onUnlocked (err) {
        self.stopRenewal(resource);
        return callback(err);
    });
};
/**
 * Begins periodically extending the lock on `resource`; a renewal failure
 * emits 'error' and cancels further renewal attempts. Idempotent per
 * resource: an existing renewal timer is left untouched.
 */
Locker.prototype.startRenewal = function(resource) {
    var self = this;

    if (this.intervalIds.hasOwnProperty(resource)) {
        return; // already renewing this resource
    }

    this.intervalIds[resource] = setInterval(function renew () {
        debug('Trying to extend lock resource=%s', resource);
        self.locker.lock(resource, self.ttl, function(err, _lock) {
            if (err) {
                self.emit('error', err, resource);
                return self.stopRenewal(resource);
            }
            if (_lock) {
                debug('Extended lock resource=%s', resource);
            }
        });
    }, this.renewInterval);
};
/**
 * Cancels the renewal timer for `resource`, if one is running.
 */
Locker.prototype.stopRenewal = function(resource) {
    var intervalId = this.intervalIds[resource];
    if (intervalId !== undefined) {
        clearInterval(intervalId);
        delete this.intervalIds[resource];
    }
};
/**
 * Factory for Locker instances. Only the 'redis-distlock' provider is
 * supported.
 * @throws {Error} when an unknown provider type is requested
 */
module.exports.create = function createLocker(type, config) {
    if (type !== 'redis-distlock') {
        // Fixed message: it previously read "Invalid type Locker type".
        throw new Error('Invalid Locker type. Valid types are: "redis-distlock"');
    }
    var locker = new RedisDistlockLocker(config.pool);
    return new Locker(locker, config.ttl);
};

View File

@@ -0,0 +1,111 @@
'use strict';
// Redis DB/prefix where distributed locks are kept.
var REDIS_DISTLOCK = {
    DB: 5,
    PREFIX: 'batch:locks:'
};

var Redlock = require('redlock');
var debug = require('../../util/debug')('leader:redis-distlock');

/**
 * Lock provider built on the redlock algorithm.
 *
 * Redlock is constructed with a dummy server ({}); before every command
 * the actual pooled client is injected via `this.redlock.servers = [client]`.
 */
function RedisDistlockLocker(redisPool) {
    this.pool = redisPool;
    this.redlock = new Redlock([{}], {
        // see http://redis.io/topics/distlock
        driftFactor: 0.01, // time in ms
        // the max number of times Redlock will attempt to lock a resource before failing
        retryCount: 3,
        // the time in ms between attempts
        retryDelay: 100
    });
    // resource id => redlock Lock instance currently held by this process.
    this._locks = {};
}

module.exports = RedisDistlockLocker;
module.exports.type = 'redis-distlock';

// Namespaces a resource name into the lock keyspace.
function resourceId(resource) {
    return REDIS_DISTLOCK.PREFIX + resource;
}
/**
 * Acquires the lock for `resource`, or extends it when this process
 * already holds it. When extending fails (e.g. the lock expired), a fresh
 * acquisition is attempted.
 */
RedisDistlockLocker.prototype.lock = function(resource, ttl, callback) {
    var self = this;
    debug('RedisDistlockLocker.lock(%s, %d)', resource, ttl);
    var lockId = resourceId(resource);

    var lock = this._getLock(lockId);
    // Shared continuation for fresh acquisitions: cache then report the lock.
    function acquireCallback(err, _lock) {
        if (err) {
            return callback(err);
        }
        self._setLock(lockId, _lock);
        return callback(null, _lock);
    }
    if (lock) {
        return this._tryExtend(lock, ttl, function(err, _lock) {
            if (err) {
                // Extension failed: fall back to acquiring from scratch.
                return self._tryAcquire(lockId, ttl, acquireCallback);
            }
            return callback(null, _lock);
        });
    } else {
        return this._tryAcquire(lockId, ttl, acquireCallback);
    }
};
/**
 * Releases the lock held for `resource`.
 *
 * BUG FIX: the original never invoked the callback when no lock was held,
 * leaving callers hanging; it now always calls back.
 */
RedisDistlockLocker.prototype.unlock = function(resource, callback) {
    var self = this;
    var lock = this._getLock(resourceId(resource));

    if (!lock) {
        // Nothing to release; stay asynchronous for the caller.
        return process.nextTick(callback);
    }

    this.pool.acquire(REDIS_DISTLOCK.DB, function (err, client) {
        if (err) {
            return callback(err);
        }

        self.redlock.servers = [client];
        return self.redlock.unlock(lock, function(err) {
            self.pool.release(REDIS_DISTLOCK.DB, client);
            return callback(err);
        });
    });
};
// Returns the cached lock instance for a resource id, or null when this
// process holds no lock for it.
RedisDistlockLocker.prototype._getLock = function(resource) {
    return this._locks.hasOwnProperty(resource) ? this._locks[resource] : null;
};

// Caches the lock instance held for a resource id.
RedisDistlockLocker.prototype._setLock = function(resource, lock) {
    this._locks[resource] = lock;
};
// Extends an already-held redlock lock using a client from the pool.
RedisDistlockLocker.prototype._tryExtend = function(lock, ttl, callback) {
    var self = this;

    this.pool.acquire(REDIS_DISTLOCK.DB, function (err, client) {
        if (err) {
            return callback(err);
        }

        // Point redlock at the freshly acquired client.
        self.redlock.servers = [client];
        lock.extend(ttl, function(extendErr, extendedLock) {
            self.pool.release(REDIS_DISTLOCK.DB, client);
            return callback(extendErr, extendedLock);
        });
    });
};
// Acquires a brand-new redlock lock using a client from the pool.
RedisDistlockLocker.prototype._tryAcquire = function(resource, ttl, callback) {
    var self = this;

    this.pool.acquire(REDIS_DISTLOCK.DB, function (err, client) {
        if (err) {
            return callback(err);
        }

        // Point redlock at the freshly acquired client.
        self.redlock.servers = [client];
        self.redlock.lock(resource, ttl, function(lockErr, acquiredLock) {
            self.pool.release(REDIS_DISTLOCK.DB, client);
            return callback(lockErr, acquiredLock);
        });
    });
};

View File

@@ -0,0 +1,144 @@
'use strict';
var asyncQ = require('queue-async');
var debug = require('../util/debug')('queue-mover');
var forever = require('../util/forever');
// Legacy (per-host) and current (per-user) queue locations; both in Redis DB 5.
var QUEUE = {
    OLD: {
        DB: 5,
        PREFIX: 'batch:queues:' // host
    },
    NEW: {
        DB: 5,
        PREFIX: 'batch:queue:' // user
    }
};
/**
 * Migration helper: drains jobs from the legacy per-host queues and re-enqueues
 * them into the per-user queues consumed by the current batch scheduler.
 */
function HostUserQueueMover(jobQueue, jobService, locker, redisPool) {
    this.jobQueue = jobQueue;
    this.jobService = jobService;
    this.locker = locker;
    this.pool = redisPool;
}
module.exports = HostUserQueueMover;
/**
 * Moves every job from all legacy per-host queues, processing up to 4 hosts
 * concurrently. Invokes `callback` when all hosts have been drained.
 *
 * Fix: the error from getOldQueues was ignored, so on failure `hosts` was
 * undefined and `hosts.forEach` threw. Now the error is reported instead.
 */
HostUserQueueMover.prototype.moveOldJobs = function(callback) {
    var self = this;
    this.getOldQueues(function(err, hosts) {
        if (err) {
            debug('Could not retrieve old queues', err);
            return callback(err);
        }
        var async = asyncQ(4);
        hosts.forEach(function(host) {
            async.defer(self.moveOldQueueJobs.bind(self), host);
        });
        async.awaitAll(function (err) {
            if (err) {
                debug('Something went wrong moving jobs', err);
            } else {
                debug('Finished moving all jobs');
            }
            callback();
        });
    });
};
/**
 * Drains a single legacy host queue: repeatedly locks the host and moves the
 * next job until the queue reports empty (EmptyQueue) or an error occurs,
 * then releases the lock and invokes `callback`.
 */
HostUserQueueMover.prototype.moveOldQueueJobs = function(host, callback) {
    var self = this;
    // do forever, it does not throw a stack overflow
    forever(
        function (next) {
            self.locker.lock(host, function(err) {
                // we didn't get the lock for the host
                if (err) {
                    debug('Could not lock host=%s. Reason: %s', host, err.message);
                    return next(err);
                }
                debug('Locked host=%s', host);
                self.processNextJob(host, next);
            });
        },
        function (err) {
            // EmptyQueue is the normal termination signal, not a failure.
            if (err) {
                debug(err.name === 'EmptyQueue' ? err.message : err);
            }
            self.locker.unlock(host, callback);
        }
    );
};
/**
 * Pops the next job id from the legacy host queue and re-enqueues it (first in
 * line) into the owning user's queue. Signals an 'EmptyQueue' error when the
 * host queue is exhausted so the forever() loop in moveOldQueueJobs stops.
 *
 * Fix: LPOP errors were silently treated as an empty queue; they are now
 * propagated to the caller. Dead commented-out code removed.
 */
HostUserQueueMover.prototype.processNextJob = function (host, callback) {
    var self = this;
    this.pool.acquire(QUEUE.OLD.DB, function(err, client) {
        if (err) {
            return callback(err);
        }
        client.lpop(QUEUE.OLD.PREFIX + host, function(err, jobId) {
            self.pool.release(QUEUE.OLD.DB, client);
            if (err) {
                return callback(err);
            }
            debug('Found jobId=%s at queue=%s', jobId, host);
            if (!jobId) {
                var emptyQueueError = new Error('Empty queue');
                emptyQueueError.name = 'EmptyQueue';
                return callback(emptyQueueError);
            }
            self.jobService.get(jobId, function(err, job) {
                if (err) {
                    // Keep draining the queue even if one job cannot be loaded.
                    debug(err);
                    return callback();
                }
                if (job) {
                    return self.jobQueue.enqueueFirst(job.data.user, jobId, function() {
                        return callback();
                    });
                }
                return callback();
            });
        });
    });
};
/**
 * Scans Redis for all legacy per-host queue keys and yields the host names.
 *
 * Fix: the pooled client was released with `QUEUE.DB`, which is undefined
 * (the constant only has OLD/NEW sub-objects); it must be `QUEUE.OLD.DB`,
 * otherwise the client is returned to the wrong (undefined) pool bucket.
 */
HostUserQueueMover.prototype.getOldQueues = function(callback) {
    var initialCursor = ['0'];
    var hosts = {};
    var self = this;
    this.pool.acquire(QUEUE.OLD.DB, function(err, client) {
        if (err) {
            return callback(err);
        }
        self._getOldQueues(client, initialCursor, hosts, function(err, hosts) {
            self.pool.release(QUEUE.OLD.DB, client);
            return callback(err, Object.keys(hosts));
        });
    });
};
/**
 * Recursively walks SCAN cursors collecting legacy queue keys into `hosts`
 * (used as a set). The SCAN reply is a [nextCursor, keys] pair.
 * NOTE(review): SCAN errors are swallowed and the partial result returned —
 * presumably a deliberate best-effort choice for a one-off migration; confirm.
 */
HostUserQueueMover.prototype._getOldQueues = function (client, cursor, hosts, callback) {
    var self = this;
    var redisParams = [cursor[0], 'MATCH', QUEUE.OLD.PREFIX + '*'];
    client.scan(redisParams, function(err, currentCursor) {
        if (err) {
            return callback(null, hosts);
        }
        var queues = currentCursor[1];
        if (queues) {
            queues.forEach(function (queue) {
                // NOTE(review): despite the name, the OLD prefix is per-host,
                // so `user` here actually holds a host name — confirm.
                var user = queue.substr(QUEUE.OLD.PREFIX.length);
                hosts[user] = true;
            });
        }
        // A returned cursor of '0' means the scan completed a full iteration.
        var hasMore = currentCursor[0] !== '0';
        if (!hasMore) {
            return callback(null, hosts);
        }
        self._getOldQueues(client, currentCursor, hosts, callback);
    });
};

122
batch/models/job_base.js Normal file
View File

@@ -0,0 +1,122 @@
'use strict';
var util = require('util');
var uuid = require('node-uuid');
var JobStateMachine = require('./job_state_machine');
var jobStatus = require('../job_status');
var mandatoryProperties = [
'job_id',
'status',
'query',
'created_at',
'updated_at',
'host',
'user'
];
function JobBase(data) {
JobStateMachine.call(this);
var now = new Date().toISOString();
this.data = data;
if (!this.data.job_id) {
this.data.job_id = uuid.v4();
}
if (!this.data.created_at) {
this.data.created_at = now;
}
if (!this.data.updated_at) {
this.data.updated_at = now;
}
}
util.inherits(JobBase, JobStateMachine);
module.exports = JobBase;
// should be implemented by childs
JobBase.prototype.getNextQuery = function () {
    throw new Error('Unimplemented method');
};
// A job has work left as long as a subclass can produce another query.
JobBase.prototype.hasNextQuery = function () {
    return !!this.getNextQuery();
};
// Status predicates: one per possible job status.
JobBase.prototype.isPending = function () {
    return this.data.status === jobStatus.PENDING;
};
JobBase.prototype.isRunning = function () {
    return this.data.status === jobStatus.RUNNING;
};
JobBase.prototype.isDone = function () {
    return this.data.status === jobStatus.DONE;
};
JobBase.prototype.isCancelled = function () {
    return this.data.status === jobStatus.CANCELLED;
};
JobBase.prototype.isFailed = function () {
    return this.data.status === jobStatus.FAILED;
};
JobBase.prototype.isUnknown = function () {
    return this.data.status === jobStatus.UNKNOWN;
};
/**
 * Replaces the job's SQL. Only allowed while the job is still pending;
 * bumps updated_at.
 */
JobBase.prototype.setQuery = function (query) {
    var now = new Date().toISOString();
    if (!this.isPending()) {
        throw new Error('Job is not pending, it cannot be updated');
    }
    this.data.updated_at = now;
    this.data.query = query;
};
/**
 * Transitions the job to `finalStatus`, validating the move against the
 * state machine. On FAILED, stores `errorMesssage` as the failure reason.
 */
JobBase.prototype.setStatus = function (finalStatus, errorMesssage) {
    var now = new Date().toISOString();
    var initialStatus = this.data.status;
    var isValid = this.isValidTransition(initialStatus, finalStatus);
    if (!isValid) {
        throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus);
    }
    this.data.updated_at = now;
    this.data.status = finalStatus;
    if (finalStatus === jobStatus.FAILED && errorMesssage) {
        this.data.failed_reason = errorMesssage;
    }
};
// Throws if any mandatory property is missing or falsy; checks in order.
JobBase.prototype.validate = function () {
    mandatoryProperties.forEach(function (property) {
        if (!this.data[property]) {
            throw new Error('property "' + property + '" is mandatory');
        }
    }.bind(this));
};
// Deep-copies the job data and strips DB connection details before exposing it.
JobBase.prototype.serialize = function () {
    var data = JSON.parse(JSON.stringify(this.data));
    var privateProperties = ['host', 'dbuser', 'port', 'dbname', 'pass'];
    privateProperties.forEach(function (property) {
        delete data[property];
    });
    return data;
};
// Base jobs emit nothing; subclasses (e.g. JobFallback) override this.
JobBase.prototype.log = function(/*logger*/) {
    return false;
};

View File

@@ -0,0 +1,26 @@
'use strict';
var JobSimple = require('./job_simple');
var JobMultiple = require('./job_multiple');
var JobFallback = require('./job_fallback');
var Models = [ JobSimple, JobMultiple, JobFallback ];
function JobFactory() {
}
module.exports = JobFactory;
/**
 * Instantiates the job model matching the shape of `data.query`:
 * string => JobSimple, array => JobMultiple, nested object => JobFallback.
 * Models are probed in order; the first whose .is() accepts the query wins.
 */
JobFactory.create = function (data) {
    if (!data.query) {
        throw new Error('You must indicate a valid SQL');
    }
    for (var i = 0; i < Models.length; i++) {
        if (Models[i].is(data.query)) {
            return new Models[i](data);
        }
    }
    throw new Error('there is no job class for the provided query');
};

View File

@@ -0,0 +1,279 @@
'use strict';
var util = require('util');
var JobBase = require('./job_base');
var JobStatus = require('../job_status');
var QueryFallback = require('./query/query_fallback');
var MainFallback = require('./query/main_fallback');
var QueryFactory = require('./query/query_factory');
/**
 * Job model supporting per-query and job-level onsuccess/onerror fallbacks.
 * Builds one QueryFallback wrapper per entry in data.query.query, plus a
 * MainFallback when the job itself declares onsuccess/onerror.
 */
function JobFallback(jobDefinition) {
    JobBase.call(this, jobDefinition);
    this.init();
    this.queries = [];
    for (var i = 0; i < this.data.query.query.length; i++) {
        this.queries[i] = QueryFactory.create(this.data, i);
    }
    if (MainFallback.is(this.data)) {
        this.fallback = new MainFallback();
    }
}
util.inherits(JobFallback, JobBase);
module.exports = JobFallback;
// 1. from user: {
// query: {
// query: [{
// query: 'select ...',
// onsuccess: 'select ..'
// }],
// onerror: 'select ...'
// }
// }
//
// 2. from redis: {
// status: 'pending',
// fallback_status: 'pending'
// query: {
// query: [{
// query: 'select ...',
// onsuccess: 'select ..'
// status: 'pending',
// fallback_status: 'pending',
// }],
// onerror: 'select ...'
// }
// }
// A fallback job carries a nested `query` array whose every entry is a
// valid fallback-style query (see QueryFallback.is).
JobFallback.is = function (query) {
    var candidates = query.query;
    if (!candidates || !Array.isArray(candidates)) {
        return false;
    }
    return candidates.every(function (candidate) {
        return QueryFallback.is(candidate);
    });
};
/**
 * Normalizes a raw user definition into the redis-persisted shape by seeding
 * missing `status` / `fallback_status` fields with PENDING, both per-query
 * and at the job level. Definitions loaded from redis already carry them.
 */
JobFallback.prototype.init = function () {
    for (var i = 0; i < this.data.query.query.length; i++) {
        if (shouldInitStatus(this.data.query.query[i])){
            this.data.query.query[i].status = JobStatus.PENDING;
        }
        if (shouldInitQueryFallbackStatus(this.data.query.query[i])) {
            this.data.query.query[i].fallback_status = JobStatus.PENDING;
        }
    }
    if (shouldInitStatus(this.data)) {
        this.data.status = JobStatus.PENDING;
    }
    if (shouldInitFallbackStatus(this.data)) {
        this.data.fallback_status = JobStatus.PENDING;
    }
};
// A missing status means the definition comes straight from the user.
function shouldInitStatus(jobOrQuery) {
    return jobOrQuery.status ? false : true;
}
// A query needs a fallback status when it declares a fallback but has none yet.
function shouldInitQueryFallbackStatus(query) {
    var hasFallback = query.onsuccess || query.onerror;
    return hasFallback && !query.fallback_status;
}
// Same rule at the job level, looking at the job-wide onsuccess/onerror.
function shouldInitFallbackStatus(job) {
    var hasFallback = job.query.onsuccess || job.query.onerror;
    return hasFallback && !job.fallback_status;
}
// First pending query (or its fallback) among the per-query wrappers, in order.
JobFallback.prototype.getNextQueryFromQueries = function () {
    for (var i = 0; i < this.queries.length; i++) {
        if (this.queries[i].hasNextQuery(this.data)) {
            return this.queries[i].getNextQuery(this.data);
        }
    }
};
JobFallback.prototype.hasNextQueryFromQueries = function () {
    return !!this.getNextQueryFromQueries();
};
// Job-level onsuccess/onerror query, only once all regular queries are done.
JobFallback.prototype.getNextQueryFromFallback = function () {
    if (this.fallback && this.fallback.hasNextQuery(this.data)) {
        return this.fallback.getNextQuery(this.data);
    }
};
// Regular queries take precedence over the job-level fallback.
JobFallback.prototype.getNextQuery = function () {
    var query = this.getNextQueryFromQueries();
    if (!query) {
        query = this.getNextQueryFromFallback();
    }
    return query;
};
// Rejects anything that does not match the fallback job layout, then delegates
// to JobBase.setQuery (which enforces the pending-only update rule).
JobFallback.prototype.setQuery = function (query) {
    var isValidQuery = JobFallback.is(query);
    if (!isValidQuery) {
        throw new Error('You must indicate a valid SQL');
    }
    JobFallback.super_.prototype.setQuery.call(this, query);
};
/**
 * Applies a status transition in three layers — per-query, job-level, then
 * job-level fallback — threading a {isValid, appliedToFallback} result through
 * each step. Throws when no layer accepted the transition.
 */
JobFallback.prototype.setStatus = function (status, errorMesssage) {
    var now = new Date().toISOString();
    var hasChanged = this.setQueryStatus(status, this.data, errorMesssage);
    hasChanged = this.setJobStatus(status, this.data, hasChanged, errorMesssage);
    hasChanged = this.setFallbackStatus(status, this.data, hasChanged);
    if (!hasChanged.isValid) {
        throw new Error('Cannot set status to ' + status);
    }
    this.data.updated_at = now;
};
// Offers the transition to each query wrapper; keeps the last accepted result.
JobFallback.prototype.setQueryStatus = function (status, job, errorMesssage) {
    return this.queries.reduce(function (hasChanged, query) {
        var result = query.setStatus(status, this.data, hasChanged, errorMesssage);
        return result.isValid ? result : hasChanged;
    }.bind(this), { isValid: false, appliedToFallback: false });
};
// Applies the (possibly shifted, see shiftStatus) transition to the job itself.
JobFallback.prototype.setJobStatus = function (status, job, hasChanged, errorMesssage) {
    var result = {
        isValid: false,
        appliedToFallback: false
    };
    status = this.shiftStatus(status, hasChanged);
    result.isValid = this.isValidTransition(job.status, status);
    if (result.isValid) {
        job.status = status;
        // Only record the failure reason when it was not already consumed by a fallback.
        if (status === JobStatus.FAILED && errorMesssage && !hasChanged.appliedToFallback) {
            job.failed_reason = errorMesssage;
        }
    }
    return result.isValid ? result : hasChanged;
};
// Forwards the transition to the job-level fallback once regular queries are exhausted.
JobFallback.prototype.setFallbackStatus = function (status, job, hasChanged) {
    var result = hasChanged;
    if (this.fallback && !this.hasNextQueryFromQueries()) {
        result = this.fallback.setStatus(status, job, hasChanged);
    }
    return result.isValid ? result : hasChanged;
};
/**
 * Adjusts the requested job status: while more regular queries remain the job
 * stays PENDING, and when a fallback query just finished the job takes the
 * last finished status of its regular queries instead.
 */
JobFallback.prototype.shiftStatus = function (status, hasChanged) {
    // jshint maxcomplexity: 7
    if (hasChanged.appliedToFallback) {
        if (!this.hasNextQueryFromQueries() && (status === JobStatus.DONE || status === JobStatus.FAILED)) {
            status = this.getLastFinishedStatus();
        } else if (status === JobStatus.DONE || status === JobStatus.FAILED){
            status = JobStatus.PENDING;
        }
    } else if (this.hasNextQueryFromQueries() && status !== JobStatus.RUNNING) {
        status = JobStatus.PENDING;
    }
    return status;
};
// Last final (done/failed/cancelled...) status among the regular queries;
// defaults to DONE when none reached a final status.
JobFallback.prototype.getLastFinishedStatus = function () {
    return this.queries.reduce(function (lastFinished, query) {
        var status = query.getStatus(this.data);
        return this.isFinalStatus(status) ? status : lastFinished;
    }.bind(this), JobStatus.DONE);
};
/**
 * Emits one structured log entry per query once the whole job (including any
 * job-level fallback) reached a final status. Entries tagged 'analysis' when
 * the query id encodes an analysis node, 'query' otherwise.
 * Returns false (and logs nothing) while the job is still in progress.
 */
JobFallback.prototype.log = function(logger) {
    if (!isFinished(this)) {
        return false;
    }
    var queries = this.data.query.query;
    for (var i = 0; i < queries.length; i++) {
        var query = queries[i];
        var logEntry = {
            created: this.data.created_at,
            // Time spent queued before this query started.
            waiting: elapsedTime(this.data.created_at, query.started_at),
            time: query.started_at,
            endtime: query.ended_at,
            username: this.data.user,
            dbhost: this.data.host,
            job: this.data.job_id,
            status: query.status,
            elapsed: elapsedTime(query.started_at, query.ended_at)
        };
        var queryId = query.id;
        var tag = 'query';
        if (queryId) {
            logEntry.query_id = queryId;
            var node = parseQueryId(queryId);
            if (node) {
                logEntry.analysis = node.analysisId;
                logEntry.node = node.nodeId;
                logEntry.type = node.nodeType;
                tag = 'analysis';
            }
        }
        logger.info(logEntry, tag);
    }
    return true;
};
// A fallback job is finished when its status is final and, if it has a
// job-level fallback, that fallback's status is final too.
function isFinished (job) {
    return JobStatus.isFinal(job.data.status) &&
        (!job.data.fallback_status || JobStatus.isFinal(job.data.fallback_status));
}
// Query ids produced by analyses look like '<analysisId>:<nodeId>:<nodeType>';
// any other segment count yields null.
function parseQueryId (queryId) {
    var parts = queryId.split(':');
    if (parts.length !== 3) {
        return null;
    }
    return {
        analysisId: parts[0],
        nodeId: parts[1],
        nodeType: parts[2]
    };
}
// Milliseconds between two ISO timestamps; undefined when either end is missing.
function elapsedTime (started_at, ended_at) {
    if (!started_at || !ended_at) {
        return;
    }
    return new Date(ended_at).getTime() - new Date(started_at).getTime();
}

View File

@@ -0,0 +1,91 @@
'use strict';
var util = require('util');
var JobBase = require('./job_base');
var jobStatus = require('../job_status');
/**
 * Job model for an ordered list of independent SQL queries.
 */
function JobMultiple(jobDefinition) {
    JobBase.call(this, jobDefinition);
    this.init();
}
util.inherits(JobMultiple, JobBase);
module.exports = JobMultiple;
/**
 * Whether `query` looks like a multiple-query job. Accepts:
 * 1. From user: ['select * from ...', 'select * from ...']
 * 2. From redis: [ { query: 'select * from ...', status: 'pending' },
 *                  { query: 'select * from ...', status: 'pending' } ]
 *
 * Fix: entries that are neither strings nor objects (null/undefined in the
 * user's JSON array) previously threw a TypeError when reading `.query`;
 * they are now rejected gracefully.
 */
JobMultiple.is = function (query) {
    if (!Array.isArray(query)) {
        return false;
    }
    for (var i = 0; i < query.length; i++) {
        if (typeof query[i] !== 'string') {
            if (!query[i] || typeof query[i].query !== 'string') {
                return false;
            }
        }
    }
    return true;
};
/**
 * Normalizes the job to the redis-persisted shape: seeds the job status and
 * wraps each plain-string query into a { query, status: PENDING } entry.
 * Entries coming from redis already have both fields and are left untouched.
 */
JobMultiple.prototype.init = function () {
    if (!this.data.status) {
        this.data.status = jobStatus.PENDING;
    }
    for (var i = 0; i < this.data.query.length; i++) {
        if (!this.data.query[i].query && !this.data.query[i].status) {
            this.data.query[i] = {
                query: this.data.query[i],
                status: jobStatus.PENDING
            };
        }
    }
};
// SQL of the first query still flagged as pending; undefined when none remain.
JobMultiple.prototype.getNextQuery = function () {
    var pending = this.data.query.filter(function (entry) {
        return entry.status === jobStatus.PENDING;
    });
    if (pending.length > 0) {
        return pending[0].query;
    }
};
// Validates the multiple-query shape before delegating to JobBase.setQuery.
JobMultiple.prototype.setQuery = function (query) {
    var isValidQuery = JobMultiple.is(query);
    if (!isValidQuery) {
        throw new Error('You must indicate a valid SQL');
    }
    JobMultiple.super_.prototype.setQuery.call(this, query);
};
/**
 * Transitions the job and the first eligible per-query status. The job stays
 * PENDING instead of DONE while queries remain; the per-query loop applies the
 * transition to the first query that accepts it and stops there.
 */
JobMultiple.prototype.setStatus = function (finalStatus, errorMesssage) {
    var initialStatus = this.data.status;
    // if transition is to "done" and there are more queries to run
    // then job status must be "pending" instead of "done"
    // else job status transition to done (if "running")
    if (finalStatus === jobStatus.DONE && this.hasNextQuery()) {
        JobMultiple.super_.prototype.setStatus.call(this, jobStatus.PENDING);
    } else {
        JobMultiple.super_.prototype.setStatus.call(this, finalStatus, errorMesssage);
    }
    for (var i = 0; i < this.data.query.length; i++) {
        var isValid = JobMultiple.super_.prototype.isValidTransition(this.data.query[i].status, finalStatus);
        if (isValid) {
            this.data.query[i].status = finalStatus;
            if (finalStatus === jobStatus.FAILED && errorMesssage) {
                this.data.query[i].failed_reason = errorMesssage;
            }
            // Only the first query that accepts the transition is updated.
            return;
        }
    }
    throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus);
};

View File

@@ -0,0 +1,34 @@
'use strict';
var util = require('util');
var JobBase = require('./job_base');
var jobStatus = require('../job_status');
/**
 * Job model for a single plain-SQL query; seeds a PENDING status when new.
 */
function JobSimple(jobDefinition) {
    JobBase.call(this, jobDefinition);
    if (!this.data.status) {
        this.data.status = jobStatus.PENDING;
    }
}
util.inherits(JobSimple, JobBase);
module.exports = JobSimple;
// A simple job is just a raw SQL string.
JobSimple.is = function (query) {
    return typeof query === 'string';
};
// The single query is only runnable while the job is pending.
JobSimple.prototype.getNextQuery = function () {
    if (this.isPending()) {
        return this.data.query;
    }
};
// Validates the plain-string shape before delegating to JobBase.setQuery.
JobSimple.prototype.setQuery = function (query) {
    if (!JobSimple.is(query)) {
        throw new Error('You must indicate a valid SQL');
    }
    JobSimple.super_.prototype.setQuery.call(this, query);
};

View File

@@ -0,0 +1,39 @@
'use strict';
var assert = require('assert');
var JobStatus = require('../job_status');
// Whitelist of [from, to] status transitions; anything else is rejected.
// DONE/FAILED/CANCELLED/UNKNOWN are terminal except where listed below.
var validStatusTransitions = [
    [JobStatus.PENDING, JobStatus.RUNNING],
    [JobStatus.PENDING, JobStatus.CANCELLED],
    [JobStatus.PENDING, JobStatus.UNKNOWN],
    [JobStatus.PENDING, JobStatus.SKIPPED],
    [JobStatus.RUNNING, JobStatus.DONE],
    [JobStatus.RUNNING, JobStatus.FAILED],
    [JobStatus.RUNNING, JobStatus.CANCELLED],
    [JobStatus.RUNNING, JobStatus.PENDING],
    [JobStatus.RUNNING, JobStatus.UNKNOWN]
];
/**
 * Tiny state machine shared by jobs and queries: validates status transitions
 * against the whitelist above and recognizes final statuses.
 */
function JobStateMachine () {
}
module.exports = JobStateMachine;
/**
 * Whether moving from `initialStatus` to `finalStatus` is an allowed transition.
 *
 * Fix: the previous implementation abused assert.deepEqual inside try/catch as
 * an equality test, using exceptions for control flow on every mismatch; a
 * direct element comparison is equivalent for these string statuses.
 */
JobStateMachine.prototype.isValidTransition = function (initialStatus, finalStatus) {
    for (var i = 0; i < validStatusTransitions.length; i++) {
        var transition = validStatusTransitions[i];
        if (transition[0] === initialStatus && transition[1] === finalStatus) {
            return true;
        }
    }
    return false;
};
// Delegates to the shared JobStatus helper (done/failed/cancelled/...).
JobStateMachine.prototype.isFinalStatus = function (status) {
    return JobStatus.isFinal(status);
};

View File

@@ -0,0 +1,78 @@
'use strict';
var util = require('util');
var QueryBase = require('./query_base');
var jobStatus = require('../../job_status');
/**
 * Per-query fallback: runs the query's onsuccess/onerror SQL (at position
 * `index` inside job.query.query) once the main query reached DONE/FAILED.
 */
function Fallback(index) {
    QueryBase.call(this, index);
}
util.inherits(Fallback, QueryBase);
module.exports = Fallback;
// A query has a fallback when it declares onsuccess and/or onerror SQL.
Fallback.is = function (query) {
    if (query.onsuccess || query.onerror) {
        return true;
    }
    return false;
};
Fallback.prototype.getNextQuery = function (job) {
    if (this.hasOnSuccess(job)) {
        return this.getOnSuccess(job);
    }
    if (this.hasOnError(job)) {
        return this.getOnError(job);
    }
};
// onsuccess SQL is runnable when the main query is DONE and the fallback pending.
// The <%= job_id %> template placeholder is substituted with the actual job id.
Fallback.prototype.getOnSuccess = function (job) {
    if (job.query.query[this.index].status === jobStatus.DONE &&
        job.query.query[this.index].fallback_status === jobStatus.PENDING) {
        var onsuccessQuery = job.query.query[this.index].onsuccess;
        if (onsuccessQuery) {
            onsuccessQuery = onsuccessQuery.replace(/<%=\s*job_id\s*%>/g, job.job_id);
        }
        return onsuccessQuery;
    }
};
Fallback.prototype.hasOnSuccess = function (job) {
    return !!this.getOnSuccess(job);
};
// onerror SQL is runnable when the main query FAILED and the fallback is pending.
// Substitutes <%= job_id %> and <%= error_message %> placeholders.
// NOTE(review): failed_reason may be undefined at this point, which would
// inject the literal string 'undefined' — confirm upstream always sets it.
Fallback.prototype.getOnError = function (job) {
    if (job.query.query[this.index].status === jobStatus.FAILED &&
        job.query.query[this.index].fallback_status === jobStatus.PENDING) {
        var onerrorQuery = job.query.query[this.index].onerror;
        if (onerrorQuery) {
            onerrorQuery = onerrorQuery.replace(/<%=\s*job_id\s*%>/g, job.job_id);
            onerrorQuery = onerrorQuery.replace(/<%=\s*error_message\s*%>/g, job.query.query[this.index].failed_reason);
        }
        return onerrorQuery;
    }
};
Fallback.prototype.hasOnError = function (job) {
    return !!this.getOnError(job);
};
/**
 * Transitions this query's fallback_status; records the failure reason on
 * FAILED. Returns whether the transition was accepted.
 */
Fallback.prototype.setStatus = function (status, job, errorMessage) {
    var isValid = false;
    isValid = this.isValidTransition(job.query.query[this.index].fallback_status, status);
    if (isValid) {
        job.query.query[this.index].fallback_status = status;
        if (status === jobStatus.FAILED && errorMessage) {
            job.query.query[this.index].failed_reason = errorMessage;
        }
    }
    return isValid;
};
Fallback.prototype.getStatus = function (job) {
    return job.query.query[this.index].fallback_status;
};

View File

@@ -0,0 +1,74 @@
'use strict';
var util = require('util');
var QueryBase = require('./query_base');
var jobStatus = require('../../job_status');
/**
 * Job-level fallback: runs the job-wide onsuccess/onerror SQL once the whole
 * job reached a final status. Unlike Fallback it has no index — it reads the
 * job-level `status` / `fallback_status` fields.
 */
function MainFallback() {
    QueryBase.call(this);
}
util.inherits(MainFallback, QueryBase);
module.exports = MainFallback;
// A job has a main fallback when it declares job-wide onsuccess and/or onerror.
MainFallback.is = function (job) {
    if (job.query.onsuccess || job.query.onerror) {
        return true;
    }
    return false;
};
MainFallback.prototype.getNextQuery = function (job) {
    if (this.hasOnSuccess(job)) {
        return this.getOnSuccess(job);
    }
    if (this.hasOnError(job)) {
        return this.getOnError(job);
    }
};
// Job-level onsuccess runs when the job is DONE and the fallback still pending.
MainFallback.prototype.getOnSuccess = function (job) {
    if (job.status === jobStatus.DONE && job.fallback_status === jobStatus.PENDING) {
        return job.query.onsuccess;
    }
};
MainFallback.prototype.hasOnSuccess = function (job) {
    return !!this.getOnSuccess(job);
};
// Job-level onerror runs when the job FAILED and the fallback is still pending.
MainFallback.prototype.getOnError = function (job) {
    if (job.status === jobStatus.FAILED && job.fallback_status === jobStatus.PENDING) {
        return job.query.onerror;
    }
};
MainFallback.prototype.hasOnError = function (job) {
    return !!this.getOnError(job);
};
/**
 * Transitions the job-level fallback_status given the result (`previous`) of
 * the earlier layers: when the main transition was applied and the job ends
 * with no fallback query to run, the fallback is SKIPPED; when nothing else
 * accepted the transition, it is applied to the fallback itself.
 * Returns { isValid, appliedToFallback }.
 */
MainFallback.prototype.setStatus = function (status, job, previous) {
    var isValid = false;
    var appliedToFallback = false;
    if (previous.isValid && !previous.appliedToFallback) {
        if (this.isFinalStatus(status) && !this.hasNextQuery(job)) {
            isValid = this.isValidTransition(job.fallback_status, jobStatus.SKIPPED);
            if (isValid) {
                job.fallback_status = jobStatus.SKIPPED;
                appliedToFallback = true;
            }
        }
    } else if (!previous.isValid) {
        isValid = this.isValidTransition(job.fallback_status, status);
        if (isValid) {
            job.fallback_status = status;
            appliedToFallback = true;
        }
    }
    return { isValid: isValid, appliedToFallback: appliedToFallback };
};

View File

@@ -0,0 +1,57 @@
'use strict';
var util = require('util');
var QueryBase = require('./query_base');
var jobStatus = require('../../job_status');
/**
 * Wrapper for the main SQL of the query at position `index` in
 * job.query.query; tracks its status and start/end timestamps.
 */
function Query(index) {
    QueryBase.call(this, index);
}
util.inherits(Query, QueryBase);
module.exports = Query;
// A main query entry must carry a plain-string `query` field.
Query.is = function (query) {
    if (query.query && typeof query.query === 'string') {
        return true;
    }
    return false;
};
// Runnable while pending; carries the per-query timeout through when numeric.
Query.prototype.getNextQuery = function (job) {
    if (job.query.query[this.index].status === jobStatus.PENDING) {
        var query = {
            query: job.query.query[this.index].query
        };
        if (Number.isFinite(job.query.query[this.index].timeout)) {
            query.timeout = job.query.query[this.index].timeout;
        }
        return query;
    }
};
/**
 * Transitions this query's status, stamping started_at on RUNNING and
 * ended_at on any final status; records the failure reason on FAILED.
 * Returns whether the transition was accepted.
 */
Query.prototype.setStatus = function (status, job, errorMesssage) {
    var isValid = false;
    isValid = this.isValidTransition(job.query.query[this.index].status, status);
    if (isValid) {
        job.query.query[this.index].status = status;
        if (status === jobStatus.RUNNING) {
            job.query.query[this.index].started_at = new Date().toISOString();
        }
        if (this.isFinalStatus(status)) {
            job.query.query[this.index].ended_at = new Date().toISOString();
        }
        if (status === jobStatus.FAILED && errorMesssage) {
            job.query.query[this.index].failed_reason = errorMesssage;
        }
    }
    return isValid;
};
Query.prototype.getStatus = function (job) {
    return job.query.query[this.index].status;
};

View File

@@ -0,0 +1,31 @@
'use strict';
var util = require('util');
var JobStateMachine = require('../job_state_machine');
/**
 * Abstract base for query wrappers (Query, Fallback, MainFallback); stores the
 * position of the wrapped query inside job.query.query and inherits the
 * status transition rules from JobStateMachine.
 */
function QueryBase(index) {
    JobStateMachine.call(this);
    this.index = index;
}
util.inherits(QueryBase, JobStateMachine);
module.exports = QueryBase;
// should be implemented
QueryBase.prototype.setStatus = function () {
    throw new Error('Unimplemented method');
};
// should be implemented
QueryBase.prototype.getNextQuery = function () {
    throw new Error('Unimplemented method');
};
// A wrapper has pending work whenever it can produce another query to run.
QueryBase.prototype.hasNextQuery = function (job) {
    return !!this.getNextQuery(job);
};
// should be implemented
QueryBase.prototype.getStatus = function () {
    throw new Error('Unimplemented method');
};

View File

@@ -0,0 +1,16 @@
'use strict';
var QueryFallback = require('./query_fallback');
function QueryFactory() {
}
module.exports = QueryFactory;
/**
 * Builds the wrapper for the query at `index` of job.query.query.
 * QueryFallback is currently the only wrapper type.
 */
QueryFactory.create = function (job, index) {
    if (QueryFallback.is(job.query.query[index])) {
        return new QueryFallback(job, index);
    }
    throw new Error('there is no query class for the provided query');
};

View File

@@ -0,0 +1,75 @@
'use strict';
var util = require('util');
var QueryBase = require('./query_base');
var Query = require('./query');
var Fallback = require('./fallback');
var jobStatus = require('../../job_status');
/**
 * Combines a main Query with its optional per-query Fallback, both addressing
 * position `index` of job.query.query.
 */
function QueryFallback(job, index) {
    QueryBase.call(this, index);
    this.init(job, index);
}
util.inherits(QueryFallback, QueryBase);
// Any entry with a plain-string `query` qualifies; the fallback part is optional.
QueryFallback.is = function (query) {
    if (Query.is(query)) {
        return true;
    }
    return false;
};
QueryFallback.prototype.init = function (job, index) {
    this.query = new Query(index);
    // Only wire a Fallback when the entry declares onsuccess/onerror.
    if (Fallback.is(job.query.query[index])) {
        this.fallback = new Fallback(index);
    }
};
// The main query runs first; its fallback only once the main query finished.
QueryFallback.prototype.getNextQuery = function (job) {
    if (this.query.hasNextQuery(job)) {
        return this.query.getNextQuery(job);
    }
    if (this.fallback && this.fallback.hasNextQuery(job)) {
        return this.fallback.getNextQuery(job);
    }
};
/**
 * Transitions the main query and/or its fallback given the result (`previous`)
 * of earlier queries in the job: once another query already consumed the
 * transition, a FAILED/CANCELLED job skips this pair; otherwise the transition
 * is tried on the main query first and falls through to the fallback.
 * Returns { isValid, appliedToFallback }.
 */
QueryFallback.prototype.setStatus = function (status, job, previous, errorMesssage) {
    // jshint maxcomplexity: 9
    var isValid = false;
    var appliedToFallback = false;
    if (previous.isValid && !previous.appliedToFallback) {
        if (status === jobStatus.FAILED || status === jobStatus.CANCELLED) {
            // A failure earlier in the job cancels the remaining pair.
            this.query.setStatus(jobStatus.SKIPPED, job, errorMesssage);
            if (this.fallback) {
                this.fallback.setStatus(jobStatus.SKIPPED, job);
            }
        }
    } else if (!previous.isValid) {
        isValid = this.query.setStatus(status, job, errorMesssage);
        if (this.fallback) {
            if (!isValid) {
                isValid = this.fallback.setStatus(status, job, errorMesssage);
                appliedToFallback = true;
            } else if (isValid && this.isFinalStatus(status) && !this.fallback.hasNextQuery(job)) {
                // Main query ended and the fallback has nothing to run: skip it.
                this.fallback.setStatus(jobStatus.SKIPPED, job);
            }
        }
    }
    return { isValid: isValid, appliedToFallback: appliedToFallback };
};
// The pair's visible status is the main query's status.
QueryFallback.prototype.getStatus = function (job) {
    return this.query.getStatus(job);
};
module.exports = QueryFallback;

4
batch/pubsub/channel.js Normal file
View File

@@ -0,0 +1,4 @@
// Redis Pub/Sub channel (in DB 0) where usernames with newly queued jobs are published.
module.exports = {
    DB: 0,
    NAME: 'batch:users'
};

View File

@@ -0,0 +1,31 @@
'use strict';
var Channel = require('./channel');
var debug = require('./../util/debug')('pubsub:publisher');
var error = require('./../util/debug')('pubsub:publisher:error');
/**
 * Publishes usernames to the batch Pub/Sub channel to notify subscribers that
 * the user has new queued jobs. Fire-and-forget: failures are only logged.
 */
function JobPublisher(pool) {
    this.pool = pool;
}
JobPublisher.prototype.publish = function (user) {
    var self = this;
    this.pool.acquire(Channel.DB, function (err, client) {
        if (err) {
            return error('Error adquiring redis client: ' + err.message);
        }
        client.publish(Channel.NAME, user, function (err) {
            // Release the client back to the pool whether publish worked or not.
            self.pool.release(Channel.DB, client);
            if (err) {
                return error('Error publishing to ' + Channel.NAME + ':' + user + ', ' + err.message);
            }
            debug('publish to ' + Channel.NAME + ':' + user);
        });
    });
};
module.exports = JobPublisher;

View File

@@ -0,0 +1,54 @@
'use strict';
var Channel = require('./channel');
var debug = require('./../util/debug')('pubsub:subscriber');
var error = require('./../util/debug')('pubsub:subscriber:error');
function JobSubscriber(pool) {
this.pool = pool;
}
module.exports = JobSubscriber;
/**
 * Subscribes to the batch Pub/Sub channel and invokes `onJobHandler(user)` for
 * every published username. On client error the subscription is torn down and
 * re-established with a freshly acquired client.
 *
 * Fix: only 'message' listeners were cleared when reusing a pooled client, so
 * each reconnect stacked another 'error' listener (listener leak and multiple
 * re-subscriptions per error). 'error' listeners are now cleared as well.
 */
JobSubscriber.prototype.subscribe = function (onJobHandler, callback) {
    var self = this;
    self.pool.acquire(Channel.DB, function(err, client) {
        if (err) {
            if (callback) {
                callback(err);
            }
            return error('Error adquiring redis client: ' + err.message);
        }
        self.client = client;
        // Pooled clients may carry listeners from a previous subscription.
        client.removeAllListeners('message');
        client.removeAllListeners('error');
        client.unsubscribe(Channel.NAME);
        client.subscribe(Channel.NAME);
        client.on('message', function (channel, user) {
            debug('message received in channel=%s from user=%s', channel, user);
            onJobHandler(user);
        });
        client.on('error', function () {
            self.unsubscribe();
            self.pool.release(Channel.DB, client);
            self.subscribe(onJobHandler);
        });
        if (callback) {
            callback();
        }
    });
};
// Issues UNSUBSCRIBE only over a live connection; otherwise reports success
// right away (when a callback was provided).
JobSubscriber.prototype.unsubscribe = function (callback) {
    var isConnected = this.client && this.client.connected;
    if (isConnected) {
        this.client.unsubscribe(Channel.NAME, callback);
        return;
    }
    if (callback) {
        return callback(null);
    }
};

54
batch/query_runner.js Normal file
View File

@@ -0,0 +1,54 @@
'use strict';
var PSQL = require('cartodb-psql');
var debug = require('./util/debug')('query-runner');
/**
 * Executes batch job SQL against the owning user's database.
 */
function QueryRunner(userDatabaseMetadataService) {
    this.userDatabaseMetadataService = userDatabaseMetadataService;
}
module.exports = QueryRunner;
// True-ish when every connection parameter needed to reach the user DB is present.
function hasDBParams (dbparams) {
    var required = ['user', 'host', 'port', 'dbname', 'pass'];
    for (var i = 0; i < required.length; i++) {
        if (!dbparams[required[i]]) {
            return false;
        }
    }
    return true;
}
// Runs `sql` for the given job, refusing up front when the DB connection
// settings are incomplete.
QueryRunner.prototype.run = function (job_id, sql, user, timeout, dbparams, callback) {
    if (!hasDBParams(dbparams)) {
        const dbConfigurationError = new Error('Batch Job DB misconfiguration');
        return callback(dbConfigurationError);
    }
    return this._run(dbparams, job_id, sql, timeout, callback);
};
/**
 * Sets the statement timeout, tags the SQL with the job id (so users can find
 * and cancel it via pg_stat_activity) and runs it as an evented query.
 * The callback fires once: either from the 'error' listener, or from 'end'
 * with a result — 'end' without a result means an error was already handled.
 */
QueryRunner.prototype._run = function (dbparams, job_id, sql, timeout, callback) {
    var pg = new PSQL(dbparams);
    pg.query('SET statement_timeout=' + timeout, function (err) {
        if(err) {
            return callback(err);
        }
        // mark query to allow to users cancel their queries
        sql = '/* ' + job_id + ' */ ' + sql;
        debug('Running query [timeout=%d] %s', timeout, sql);
        pg.eventedQuery(sql, function (err, query) {
            if (err) {
                return callback(err);
            }
            query.on('error', callback);
            query.on('end', function (result) {
                // only if result is present then query is done sucessfully otherwise an error has happened
                // and it was handled by error listener
                if (result) {
                    callback(null, result);
                }
            });
        });
    });
};

View File

@@ -0,0 +1,11 @@
'use strict';
/**
 * Capacity provider returning a constant number of slots (at least 1),
 * regardless of the target host's load.
 */
function FixedCapacity(capacity) {
    this.capacity = Math.max(1, capacity);
}
module.exports = FixedCapacity;
FixedCapacity.prototype.getCapacity = function(callback) {
    return callback(null, this.capacity);
};

View File

@@ -0,0 +1,32 @@
'use strict';
var util = require('util');
var debug = require('../../util/debug')('capacity-http-load');
var HttpSimpleCapacity = require('./http-simple');
/**
 * Capacity provider deriving available slots from the host's reported core
 * count and relative load: floor((1 - relative_load) * cores) - 1, clamped
 * to a minimum of 1. Falls back to a capacity of 1 on any endpoint error.
 */
function HttpLoadCapacity(host, capacityEndpoint) {
    HttpSimpleCapacity.call(this, host, capacityEndpoint);
}
util.inherits(HttpLoadCapacity, HttpSimpleCapacity);
module.exports = HttpLoadCapacity;
HttpLoadCapacity.prototype.getCapacity = function(callback) {
    this.getResponse(function(err, values) {
        var capacity = 1;
        if (err) {
            return callback(null, capacity);
        }
        var cores = parseInt(values.cores, 10);
        var relativeLoad = parseFloat(values.relative_load);
        capacity = Math.max(1, Math.floor(((1 - relativeLoad) * cores) - 1));
        // Guard against NaN from a malformed endpoint response.
        capacity = Number.isFinite(capacity) ? capacity : 1;
        debug('host=%s, capacity=%s', this.host, capacity);
        return callback(null, capacity);
    }.bind(this));
};

View File

@@ -0,0 +1,62 @@
'use strict';
var request = require('request');
var debug = require('../../util/debug')('capacity-http-simple');
/**
 * Capacity provider that asks an HTTP endpoint for the host's available
 * cores. Responses are cached briefly (see getResponse) to throttle polling.
 */
function HttpSimpleCapacity(host, capacityEndpoint) {
    this.host = host;
    this.capacityEndpoint = capacityEndpoint;
    // Last successful payload and its timestamp, for the 500 ms throttle.
    this.lastResponse = null;
    this.lastResponseTime = 0;
}
module.exports = HttpSimpleCapacity;
HttpSimpleCapacity.prototype.getCapacity = function(callback) {
    this.getResponse(function(err, values) {
        var capacity = 1;
        if (err) {
            // Degrade to a capacity of 1 rather than failing the scheduler.
            return callback(null, capacity);
        }
        var availableCores = parseInt(values.available_cores, 10);
        capacity = Math.max(availableCores, 1);
        capacity = Number.isFinite(capacity) ? capacity : 1;
        debug('host=%s, capacity=%s', this.host, capacity);
        return callback(null, capacity);
    }.bind(this));
};
/**
 * POSTs to the capacity endpoint and yields its `return_values` payload.
 * Successful responses are cached for 500 ms; a non-zero `retcode` or
 * transport failure is reported as an error (and not cached).
 */
HttpSimpleCapacity.prototype.getResponse = function(callback) {
    var requestParams = {
        method: 'POST',
        url: this.capacityEndpoint,
        timeout: 2000,
        json: true
    };
    debug('getCapacity(%s)', this.host);
    // throttle requests for 500 ms
    var now = Date.now();
    if (this.lastResponse !== null && ((now - this.lastResponseTime) < 500)) {
        return callback(null, this.lastResponse);
    }
    request.post(requestParams, function(err, res, jsonRes) {
        if (err) {
            return callback(err);
        }
        if (jsonRes && jsonRes.retcode === 0) {
            this.lastResponse = jsonRes.return_values || {};
            // We could go more aggressive by updating lastResponseTime on failures.
            this.lastResponseTime = now;
            return callback(null, this.lastResponse);
        }
        return callback(new Error('Could not retrieve information from endpoint'));
    }.bind(this));
};

View File

@@ -0,0 +1,85 @@
'use strict';
var _ = require('underscore');
var debug = require('../util/debug')('host-scheduler');
var Scheduler = require('./scheduler');
var Locker = require('../leader/locker');
var FixedCapacity = require('./capacity/fixed');
var HttpSimpleCapacity = require('./capacity/http-simple');
var HttpLoadCapacity = require('./capacity/http-load');
// Coordinates one Scheduler per database host, guarded by a distributed
// Redis lock so only one process drains a given host at a time.
function HostScheduler(name, taskRunner, redisPool) {
    var self = this;

    this.name = name || 'scheduler';
    this.taskRunner = taskRunner;

    this.locker = Locker.create('redis-distlock', { pool: redisPool });
    this.locker.on('error', function(err, host) {
        debug('[%s] Locker.error %s', self.name, err.message);
        self.unlock(host);
    });

    // host => Scheduler
    this.schedulers = {};
}
module.exports = HostScheduler;
// Queues work for `user` on `host`: locks the host, registers the user in
// the host's scheduler and (re)starts its scheduling loop. Calls back with
// whether the loop was already running.
HostScheduler.prototype.add = function(host, user, callback) {
    var self = this;
    this.lock(host, function(err, scheduler) {
        if (err) {
            debug('[%s] Could not lock host=%s', self.name, host);
            return callback(err);
        }

        scheduler.add(user);
        var wasRunning = scheduler.schedule();
        debug('[%s] Scheduler host=%s was running=%s', self.name, host, wasRunning);
        return callback(err, wasRunning);
    });
};
// Builds the capacity provider for a host based on the configured strategy.
// HTTP strategies require a URL template; otherwise a fixed capacity
// (defaulting to 4 slots) is used.
HostScheduler.prototype.getCapacityProvider = function(host) {
    var strategy = global.settings.batch_capacity_strategy;
    var urlTemplate = global.settings.batch_capacity_http_url_template;
    var isHttpStrategy = strategy === 'http-simple' || strategy === 'http-load';

    if (isHttpStrategy && urlTemplate) {
        var endpoint = _.template(urlTemplate, { dbhost: host });
        debug('Using strategy=%s capacity. Endpoint=%s', strategy, endpoint);

        return (strategy === 'http-simple') ?
            new HttpSimpleCapacity(host, endpoint) :
            new HttpLoadCapacity(host, endpoint);
    }

    var fixedCapacity = global.settings.batch_capacity_fixed_amount || 4;
    debug('Using strategy=fixed capacity=%d', fixedCapacity);
    return new FixedCapacity(fixedCapacity);
};
// Acquires the distributed lock for `host` and calls back with that host's
// Scheduler, lazily creating it on first use. The scheduler unlocks the
// host when it emits 'done'.
HostScheduler.prototype.lock = function(host, callback) {
    debug('[%s] lock(%s)', this.name, host);

    this.locker.lock(host, function(err) {
        if (err) {
            debug('[%s] Could not lock host=%s. Reason: %s', this.name, host, err.message);
            return callback(err);
        }

        if (!Object.prototype.hasOwnProperty.call(this.schedulers, host)) {
            var scheduler = new Scheduler(this.getCapacityProvider(host), this.taskRunner);
            scheduler.on('done', this.unlock.bind(this, host));
            this.schedulers[host] = scheduler;
        }

        debug('[%s] Locked host=%s', this.name, host);
        return callback(null, this.schedulers[host]);
    }.bind(this));
};
// Drops the cached Scheduler for `host` and releases its distributed lock.
HostScheduler.prototype.unlock = function(host) {
    debug('[%s] unlock(%s)', this.name, host);

    if (Object.prototype.hasOwnProperty.call(this.schedulers, host)) {
        // TODO stop scheduler?
        delete this.schedulers[host];
    }

    this.locker.unlock(host, debug);
};

View File

@@ -0,0 +1,201 @@
'use strict';
// Inspiration from:
// - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt
// - https://www.kernel.org/doc/Documentation/rbtree.txt
// - http://www.ibm.com/developerworks/linux/library/l-completely-fair-scheduler/
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var RBTree = require('bintrees').RBTree;
var debug = require('../util/debug')('scheduler');
var forever = require('../util/forever');
// CFS-inspired per-host scheduler: tracks one TaskEntity per user and picks
// the next user to run via a red-black tree ordered by priority.
function Scheduler(capacity, taskRunner) {
    EventEmitter.call(this);
    debug('new Scheduler');

    this.taskRunner = taskRunner;
    this.capacity = capacity;
    this.tasks = [];
    this.users = {};

    // Ordering: fewer executed jobs wins, then the older entity wins.
    // Entities for the same user compare equal; any remaining tie is
    // deliberately broken with -1.
    this.tasksTree = new RBTree(function(entityA, entityB) {
        // if the user is the same it's the same entity
        if (entityA.user === entityB.user) {
            return 0;
        }

        // priority for entity with less executed jobs
        var byJobs = entityA.jobs - entityB.jobs;
        if (byJobs !== 0) {
            return byJobs;
        }

        // priority for oldest job
        var byAge = entityA.createdAt - entityB.createdAt;
        if (byAge !== 0) {
            return byAge;
        }

        // we don't care if we arrive here
        return -1;
    });
}
util.inherits(Scheduler, EventEmitter);
module.exports = Scheduler;
// Registers (or re-queues) a user. Returns true when the user was already
// known to this scheduler, false when it is a brand new user.
Scheduler.prototype.add = function(user) {
    debug('add(%s)', user);

    var taskEntity = this.users[user];

    if (!taskEntity) {
        // First time we see this user: track it and queue it.
        taskEntity = new TaskEntity(user, this.tasks.length);
        this.tasks.push(taskEntity);
        this.users[user] = taskEntity;
        this.tasksTree.insert(taskEntity);
        this.emit('add');
        return false;
    }

    // Known user: only re-queue it if its previous work had finished.
    if (taskEntity.is(STATUS.DONE)) {
        taskEntity.status = STATUS.PENDING;
        this.tasksTree.insert(taskEntity);
        this.emit('add');
    }
    return true;
};
// Starts the scheduling loop that drains this host's user queues. Returns
// true when a loop was already running, false when this call started one.
// The loop ends (emitting 'done') once acquire() reports no users left.
Scheduler.prototype.schedule = function() {
    if (this.running) {
        return true;
    }
    this.running = true;

    var self = this;
    forever(
        function (next) {
            debug('Waiting for task');
            // acquire() may call back later, after an 'add'/'release' event,
            // once there is both free capacity and a pending user.
            self.acquire(function(err, taskEntity) {
                debug('Acquired user=%j', taskEntity);

                if (!taskEntity) {
                    // No candidate means every task is done: passing an error
                    // to next() is how the forever loop is terminated.
                    return next(new Error('all users finished'));
                }

                self.tasksTree.remove(taskEntity);
                taskEntity.running();

                debug('Running task for user=%s', taskEntity.user);
                self.taskRunner.run(taskEntity.user, function(err, userQueueIsEmpty) {
                    debug('Run task=%j, done=%s', taskEntity, userQueueIsEmpty);
                    taskEntity.ran(userQueueIsEmpty);
                    // release() re-inserts still-pending users and frees a slot.
                    self.release(err, taskEntity);
                });

                // try to acquire next user
                // will block until capacity slot is available
                next();
            });
        },
        function (err) {
            // Loop finished: reset state and notify listeners (e.g. the
            // host unlock), then drop all listeners for this run.
            debug('done: %s', err.message);
            self.running = false;
            self.emit('done');
            self.removeAllListeners();
        }
    );

    return false;
};
// Picks the next task entity to run. Calls back with (null, null) when every
// task is DONE, or (null, candidate) once there is a free capacity slot and
// a pending user; otherwise it re-arms itself on 'add'/'release' events.
Scheduler.prototype.acquire = function(callback) {
    // Drop listeners from any previous acquire() attempt so that only one
    // pending acquisition is wired at a time.
    this.removeAllListeners('add');
    this.removeAllListeners('release');

    if (this.tasks.every(is(STATUS.DONE))) {
        return callback(null, null);
    }

    var self = this;
    this.capacity.getCapacity(function(err, capacity) {
        if (err) {
            return callback(err);
        }

        debug('Trying to acquire task');
        var running = self.tasks.filter(is(STATUS.RUNNING));
        debug('[capacity=%d, running=%d] candidates=%j', capacity, running.length, self.tasks);

        // Retry this acquisition whenever a new user is added or a running
        // slot is released.
        // NOTE(review): these listeners stay armed even when we call back
        // with a candidate below; an 'add'/'release' fired before the next
        // acquire() clears them could invoke the same callback again —
        // confirm callers tolerate that.
        self.once('add', function() {
            debug('Got a new task');
            self.acquire(callback);
        });
        self.once('release', function() {
            debug('Slot was released');
            self.acquire(callback);
        });

        if (running.length >= capacity) {
            // No free slot: wait for a 'release' (handled above).
            debug('Not enough capacity');
            return null;
        }

        var isRunningAny = self.tasks.some(is(STATUS.RUNNING));
        var candidate = self.tasksTree.min();
        if (isRunningAny && candidate === null) {
            // Nothing pending but something still running: wait for it, as
            // its release() may re-insert the user into the tree.
            debug('Waiting for last task to finish');
            return null;
        }

        if (candidate) {
            self.emit('acquired', candidate.user);
        }
        return callback(null, candidate);
    });
};
// Frees the capacity slot used by `taskEntity` and notifies any pending
// acquire(). Entities with queued work left go back into the priority tree.
Scheduler.prototype.release = function(err, taskEntity) {
    debug('Released %j', taskEntity);

    var stillPending = taskEntity.is(STATUS.PENDING);
    if (stillPending) {
        this.tasksTree.insert(taskEntity);
    }

    this.emit('release');
};
/* Task entities */

// Lifecycle states for a task entity.
var STATUS = {
    PENDING: 'pending',
    RUNNING: 'running',
    DONE: 'done'
};

// One scheduling unit per user. `createdAt` is the insertion index (used as
// age for priority) and `jobs` counts how many jobs were run for the user.
function TaskEntity(user, createdAt) {
    this.user = user;
    this.createdAt = createdAt;
    this.status = STATUS.PENDING;
    this.jobs = 0;
}

// Whether the entity currently has the given status.
TaskEntity.prototype.is = function(status) {
    return this.status === status;
};

// Marks the entity as being executed.
TaskEntity.prototype.running = function() {
    this.status = STATUS.RUNNING;
};

// Records one executed job; the entity is DONE only when the user queue
// became empty, otherwise it goes back to PENDING.
TaskEntity.prototype.ran = function(userQueueIsEmpty) {
    this.jobs += 1;
    this.status = userQueueIsEmpty ? STATUS.DONE : STATUS.PENDING;
};

// Builds a predicate matching entities with the given status.
function is(status) {
    return function(entity) {
        return entity.is(status);
    };
}

View File

@@ -0,0 +1,31 @@
'use strict';
// Resolves per-user database connection options from the metadata backend.
function UserDatabaseMetadataService(metadataBackend) {
    this.metadataBackend = metadataBackend;
}

// Looks up all DB params for `username` and calls back with the minimal
// connection options ({ port, host, dbname }).
UserDatabaseMetadataService.prototype.getUserMetadata = function (username, callback) {
    var self = this;
    this.metadataBackend.getAllUserDBParams(username, function (err, userDatabaseMetadata) {
        if (err) {
            return callback(err);
        }
        callback(null, self.parseMetadataToDatabase(userDatabaseMetadata));
    });
};

// Maps the raw metadata into connection options. The port falls back to the
// batch-specific port and then to the general db port from the settings.
UserDatabaseMetadataService.prototype.parseMetadataToDatabase = function (userDatabaseMetadata) {
    var dbParams = userDatabaseMetadata;
    return {
        port: dbParams.dbport || global.settings.db_batch_port || global.settings.db_port,
        host: dbParams.dbhost,
        dbname: dbParams.dbname
    };
};
module.exports = UserDatabaseMetadataService;

7
batch/util/debug.js Normal file
View File

@@ -0,0 +1,7 @@
'use strict';
var debug = require('debug');
module.exports = function batchDebug (ns) {
return debug(['batch', ns].join(':'));
};

11
batch/util/forever.js Normal file
View File

@@ -0,0 +1,11 @@
'use strict';
module.exports = function forever(fn, done) {
function next(err) {
if (err) {
return done(err);
}
fn(next);
}
next();
};