在實際生產(chǎn)環(huán)境中,避免不了有很多后臺運行的任務(wù)和定時任務(wù),對任務(wù)狀態(tài)的監(jiān)控與及時告警可以盡量減少程序出錯時對用戶造成的影響。針對常見的兩種任務(wù)類型:定時任務(wù)、守護進程內(nèi)批處理任務(wù),利用 Node.js child_process 實現(xiàn)了任務(wù)狀態(tài)的監(jiān)控、重啟與郵件告警。具體的實現(xiàn),本文將為大家進行詳細的分析,希望對大家學習Node.js有所幫助。
現(xiàn)在的互聯(lián)網(wǎng)已經(jīng)不是單機作戰(zhàn)的時代了,分布式部署是非常常見的方式,一個項目中的任務(wù)可能運行在多臺服務(wù)器上,我們的監(jiān)控平臺要做到重啟某個任務(wù)就需要知道任務(wù)運行的具體服務(wù)器,針對這一個問題我們需要獲取到任務(wù)與服務(wù)器關(guān)系的確切信息,所以每臺運行任務(wù)的服務(wù)器需要在啟動任務(wù)時向任務(wù)狀態(tài)管理平臺注冊自己的信息。
任務(wù)狀態(tài)的維護依賴于任務(wù)運行服務(wù)器的心跳上報,每個任務(wù)設(shè)置一個超時時間,在任務(wù)啟動時向任務(wù)狀態(tài)管理平臺發(fā)送開始運行信號,在任務(wù)運行結(jié)束后向管理平臺發(fā)送運行完成信號。任務(wù)管理平臺根據(jù)任務(wù)設(shè)置的超時時間,在超時后仍然沒有接收到任務(wù)完成信號則判定任務(wù)失敗,將任務(wù)失敗信號發(fā)送回任務(wù)運行的服務(wù)器。再有任務(wù)運行服務(wù)器自行處理,如重啟任務(wù)或者結(jié)束任務(wù)等。
根據(jù)以上的邏輯,實際需要就是在任務(wù)運行的服務(wù)器實現(xiàn)一個任務(wù)調(diào)度功能與 HTTP 服務(wù)器用來監(jiān)聽管理平臺發(fā)送的信號;在管理平臺這邊實現(xiàn)任務(wù)服務(wù)器信息注冊、任務(wù)狀態(tài)監(jiān)管與超時告警。文字表述比較晦澀,具體流程可以參考一下的流程圖。
實現(xiàn)代碼
后續(xù)會把關(guān)鍵信息從代碼中抽離出來放到配置文件中,然后放到 GitHub 上,暫時以貼代碼的形式簡單展示一下。
// 任務(wù)運行服務(wù)器調(diào)度系統(tǒng)
'use strict';
// 內(nèi)建模塊
const fork = require('child_process').fork;
const path = require('path');
// 第三方模塊
const _ = require('lodash');
const CronJob = require('cron').CronJob;
const bodyParser = require('body-parser');
const express = require('express');
const request = require('request');
const uuid = require('uuid');
class TaskStatusManagementClient {
/**
* 初始化 TaskStatusClient
* @param {Object} taskClientConfig 服務(wù)器配置信息
*/
constructor(taskClientConfig) {
this.taskClientConfig = taskClientConfig;
this.taskHomePath = taskClientConfig.taskHomePath;
this.childsInstance = {};
this.crontabJobsInstance = {};
}
/**
* start TaskStatusClient
*/
start() {
this._process();
}
_process() {
let self = this;
// 根據(jù)服務(wù)器配置信息啟動所有任務(wù)
for(let taskConfig of self.taskClientConfig.tasks) {
switch(taskConfig.type) {
case 'daemon': {
self._daemonTaskHandler(taskConfig);
break;
}
case 'crontab': {
if(taskConfig.crontabRule) {
self._crontabTaskHanlder(taskConfig);
}
break;
}
default: {
console.log('unknow task type');
break;
}
}
}
// 在程序退出時結(jié)束所有子進程任務(wù)
process.on('exit', function (code) {
for(let child in self.childsInstance) {
if(self.childsInstance.hasOwnProperty(child)) {
self.childsInstance[child].kill('SIGHUP');
}
}
});
// 啟動 HTTP 服務(wù)器,監(jiān)聽任務(wù)狀態(tài)監(jiān)控平臺發(fā)回的信號
let app = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: false }));
app.post('/', function (req, res) {
let body;
try {
body = typeof req.body !== 'object' ? JSON.parse(req.body) : req.body;
} catch (error) {
return res.status(400).json({ message: 'invalid json' });
}
res.status(200).json({ message: 'ok' });
// 收到任務(wù)狀態(tài)監(jiān)控平臺發(fā)回的信號后重啟任務(wù)
let taskConfig = _.find(self.taskClientConfig.tasks, { name: body.name });
let taskIdentifier = '';
// daemon 類型的任務(wù)在 childsInstance 中的 key 是任務(wù)名
// crontab 類型的任務(wù)在 childsInstance 中的 key 是任務(wù)名 + 任務(wù) ID
switch (taskConfig.type) {
case 'daemon': {
taskIdentifier = taskConfig.name;
self.childsInstance[taskIdentifier].kill('SIGHUP');
delete self.childsInstance[taskIdentifier];
if(self.handlers.daemonTaskHandler) {
self.handlers.daemonTaskHandler(taskConfig);
} else {
self._daemonTaskHandler(taskConfig);
}
break;
}
case 'crontab': {
taskIdentifier = taskConfig.name + taskConfig.id;
self.childsInstance[taskIdentifier].kill('SIGHUP');
delete self.childsInstance[taskIdentifier];
if(self.handlers.crontabTaskHandler) {
self.handlers.crontabTaskHandler(taskConfig);
} else {
self._crontabTaskHanlder(taskConfig);
}
break;
}
default: {
console.log('unknow task type');
break;
}
}
});
app.listen(self.taskClientConfig.server.port, function () {
console.log('server start at: ' + self.taskClientConfig.server.port);
self._registerServer(self.taskClientConfig, function (error, result) {
if (error) {
console.log(error);
}
});
});
}
_daemonTaskHandler(taskConfig) {
let self = this;
let taskInfo = {
appName: self.appName,
name: taskConfig.name,
expires: taskConfig.timeout
};
let child = fork(path.join(self.taskHomePath,taskConfig.file));
self.childsInstance[taskInfo.name] = child;
child.on('message', function (message) {
switch (message.signal) {
case 'start': {
taskInfo.id = message.id;
self._startTask(taskInfo, function (error) {
if (error) {
console.log(error);
}
});
break;
}
case 'end': {
taskInfo.id = message.id;
self._endTask(taskInfo, function (error) {
if (error) {
console.log(error)
}
});
break;
}
default: {
console.log('unknow signal');
break;
}
}
});
}
_crontabTaskHanlder(taskConfig) {
let self = this;
let taskInfo = {
appName: self.appName,
name: taskConfig.name,
id: uuid.v4(),
expires: taskConfig.timeout
};
self.crontabJobsInstance[taskInfo.name + taskInfo.id] = new CronJob(taskConfig.crontabRule, function () {
self._startTask(taskInfo, function (error) {
if(error) {
console.log(error);
} else {
let child = fork(path.join(self.taskHomePath, taskConfig.file));
self.childsInstance[taskInfo.name + taskInfo.id] = child;
child.on('exit', function (code) {
// 子進程退出 code 為 0,代表正常退出,這時可以向監(jiān)控平臺發(fā)送任務(wù)已完成信號
if(code === 0) {
self._endTask(taskInfo, function (error) {
if (error) {
console.log(error);
}
});
}
});
}
});
}, undefined, true);
}
_startTask(taskInfo, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/start',
method: 'POST',
timeout: 5000,
form: taskInfo
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
_endTask(taskInfo, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/end',
method: 'POST',
timeout: 5000,
form: taskInfo
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
_registerServer(taskClientConfig, callback) {
let requestOptions = {
uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/servers',
method: 'POST',
timeout: 5000,
form: taskClientConfig
};
request(requestOptions, function (error, response, body) {
return callback(error, body);
});
}
}
module.exports = TaskStatusManagementClient;
// 調(diào)度系統(tǒng)的使用
'use strict';
const config = {
// 監(jiān)控平臺信息,必須
management: {
host: 'http://127.0.0.1',
port: 3000
},
// 當前服務(wù)器信息,必須
server: {
host: 'http://127.0.0.1',
port: 3001
},
// 當前服務(wù)器任務(wù)文件地址(絕對路徑),必須
taskHomePath: path.join(__dirname, 'tasks'),
// 任務(wù)配置信息,必須
tasks:[{
name: 'exampleTaskOne',
type: 'daemon',
file: 'example_task_one.js',
timeout: 10000
}, {
name: 'exampleTaskTwo',
type: 'crontab',
file: 'example_task_two.js',
crontabRule: '*/20 * * * * *', // 任務(wù)類型為 crontab 是此字段為必須
timeout: 10000
}]
};
let taskStatusManagementClient = new TaskStatusManagementClient(config);
taskStatusManagementClient.start();
監(jiān)控平臺 HTTP 服務(wù)器比較簡單,三個 API,用來將服務(wù)器信息、任務(wù)開始狀態(tài)、任務(wù)結(jié)束狀態(tài)寫入數(shù)據(jù)庫,這里就不在贅述。
// 監(jiān)控平臺任務(wù)狀態(tài)監(jiān)管
'use strict';
const async = require('async');
const config = require('config');
const mail = require('nodemailer').createTransport({
service: 'your email service',
auth: {
user: 'your email username',
pass: 'your email password'
}
});
const request = require('request');
const logger = require('../log/logger');
const moment = require('../libs/moment');
// 一下三個為數(shù)據(jù)庫操作連接,不需要關(guān)注內(nèi)部代碼,不影響代碼閱讀
const TaskResult = require('../models/task_result');
const TaskStatus = require('../models/task_status');
const TaskServer = require('../models/task_server');
/**
* 發(fā)送告警郵件
* @param {String} to 郵件接收者
* @param {String} subject 郵件主題
* @param {String} message 郵件內(nèi)容
* @param {Function} callback
*/
function sendWarningMessage(to, subject, message, callback) {
var options = {
'from': 'xxx@xxx.xxx',
'to': to,
'subject': subject,
'text': message,
'encoding': 'UTF-8'
};
mail.sendMail(options, function(error) {
console.log('send message to: ' + to);
return callback(error);
});
}
/**
* 處理執(zhí)行成功任務(wù)
* @param {String} task 任務(wù)狀態(tài)對象
* @param {Function} callback
*/
function handleSingleSucceedTask(task, callback) {
TaskResult.increaseTaskSuccessCount(task.name, function (error) {
return callback(error);
});
}
/**
* 處理執(zhí)行失敗的任務(wù)
* @param {String} task 任務(wù)狀態(tài)對象
* @param {Function} callback
*/
function handleSingleErrorTask(task, callback) {
async.waterfall([
// 增加任務(wù)執(zhí)行失敗次數(shù)
async.apply(TaskResult.increaseTaskErrorCount.bind(TaskResult), task.name),
async.apply(TaskResult.getTaskErrorCount.bind(TaskResult), task.name),
function (count, callback) {
callback(undefined);
// 給調(diào)度系統(tǒng)發(fā)信號
TaskServer.findTaskServerHost(task.name, function (error, serverHost) {
// 超過限制失敗次數(shù),發(fā)送告警郵件
if (count >= config.get('task.limitErrorCount')) {
sendWarningMessage(config.get('task.noticeUserEmailAddress'), '定時任務(wù)告警', `${ serverHost }: "${ task.name }" 執(zhí)行失敗超過預(yù)定失敗次數(shù)`, function (error) {
if (error) {
logger.error(error);
}
});
TaskResult.resetTaskErrorCount(task.name, function (error) {
if (error) {
logger.error(error);
}
});
}
if (!error && serverHost) {
// send 'error' signal to task server
request({
uri: serverHost,
method: 'POST',
timeout: 5000,
form: {
name: task.name,
id: task.taskId,
pid: task.pid
}
}, function (error, response, body) {
if (error) {
logger.error(error);
}
});
}
});
}
], function (error) {
return callback(error);
});
}
/**
* 獲取所有已經(jīng)到達預(yù)定超時時間的任務(wù)并處理
*/
function process() {
let currentTime = new Date().getTime();
async.waterfall([
async.apply(TaskStatus.getExpiredTasks.bind(TaskStatus), currentTime),
// 并行處理每個任務(wù)
function (tasks, callback) {
if (tasks.length > 0) {
async.parallel(tasks.map(function (task) {
return function (callback) {
// 超時后任務(wù)狀態(tài)仍然為 running,代表任務(wù)執(zhí)行失敗
if (task.status === 'running') {
handleSingleErrorTask(task, function (error) {
return callback(error);
});
} else {
handleSingleSucceedTask(task, function (error) {
return callback(error);
});
}
}
}), function (error) {
return callback(error);
});
} else {
return callback(new Error('no tasks need to exec'));
}
},
// 刪除已經(jīng)處理完成的任務(wù)
async.apply(TaskStatus.removeExpiredTasks.bind(TaskStatus), currentTime)
], function (error) {
if (error && error.message === 'no tasks need to exec') {
let delay = moment.millisecondToDayMinuteHourSecond(config.get('task.noTaskDelayTime'));
console.log(`no tasks, delay ${ delay }`);
setTimeout(process, config.get('task.noTaskDelayTime'));
} else if (error) {
logger.error(error);
process();
} else {
process();
}
});
}
module.exports = process;
文章來源:DuanPengfei's Blog