99热99这里只有精品6国产,亚洲中文字幕在线天天更新,在线观看亚洲精品国产福利片 ,久久久久综合网

歡迎加入QQ討論群258996829
麥子學院 頭像
蘋果6袋
6
麥子學院

Node.js 如何監(jiān)控任務(wù)狀態(tài)?

發(fā)布時間:2016-11-18 16:27  回復(fù):0  查看:2405   最后回復(fù):2016-11-18 16:27  

在實際生產(chǎn)環(huán)境中,避免不了有很多后臺運行的任務(wù)和定時任務(wù),對任務(wù)狀態(tài)的監(jiān)控與及時告警可以盡量減少程序出錯時對用戶造成的影響。針對常見的兩種任務(wù)類型:定時任務(wù)、守護進程內(nèi)批處理任務(wù),利用 Node.js child_process 實現(xiàn)了任務(wù)狀態(tài)的監(jiān)控、重啟與郵件告警。具體的實現(xiàn),本文將為大家進行詳細的分析,希望對大家學習Node.js有所幫助。

 

現(xiàn)在的互聯(lián)網(wǎng)已經(jīng)不是單機作戰(zhàn)的時代了,分布式部署是非常常見的方式,一個項目中的任務(wù)可能運行在多臺服務(wù)器上,我們的監(jiān)控平臺要做到重啟某個任務(wù)就需要知道任務(wù)運行的具體服務(wù)器,針對這一個問題我們需要獲取到任務(wù)與服務(wù)器關(guān)系的確切信息,所以每臺運行任務(wù)的服務(wù)器需要在啟動任務(wù)時向任務(wù)狀態(tài)管理平臺注冊自己的信息。

任務(wù)狀態(tài)的維護依賴于任務(wù)運行服務(wù)器的心跳上報,每個任務(wù)設(shè)置一個超時時間,在任務(wù)啟動時向任務(wù)狀態(tài)管理平臺發(fā)送開始運行信號,在任務(wù)運行結(jié)束后向管理平臺發(fā)送運行完成信號。任務(wù)管理平臺根據(jù)任務(wù)設(shè)置的超時時間,在超時后仍然沒有接收到任務(wù)完成信號則判定任務(wù)失敗,將任務(wù)失敗信號發(fā)送回任務(wù)運行的服務(wù)器。再有任務(wù)運行服務(wù)器自行處理,如重啟任務(wù)或者結(jié)束任務(wù)等。

根據(jù)以上的邏輯,實際需要就是在任務(wù)運行的服務(wù)器實現(xiàn)一個任務(wù)調(diào)度功能與 HTTP 服務(wù)器用來監(jiān)聽管理平臺發(fā)送的信號;在管理平臺這邊實現(xiàn)任務(wù)服務(wù)器信息注冊、任務(wù)狀態(tài)監(jiān)管與超時告警。文字表述比較晦澀,具體流程可以參考一下的流程圖。

Node.js 如何監(jiān)控任務(wù)狀態(tài)?

實現(xiàn)代碼

后續(xù)會把關(guān)鍵信息從代碼中抽離出來放到配置文件中,然后放到 GitHub 上,暫時以貼代碼的形式簡單展示一下。

 

// 任務(wù)運行服務(wù)器調(diào)度系統(tǒng)

'use strict';

 

// 內(nèi)建模塊

const fork = require('child_process').fork;

const path = require('path');

 

// 第三方模塊

const _ = require('lodash');

const CronJob = require('cron').CronJob;

const bodyParser = require('body-parser');

const express = require('express');

const request = require('request');

const uuid = require('uuid');

 

class TaskStatusManagementClient {

    /**

     * 初始化 TaskStatusClient

     * @param {Object} taskClientConfig 服務(wù)器配置信息

     */

    constructor(taskClientConfig) {

        this.taskClientConfig = taskClientConfig;

        this.taskHomePath = taskClientConfig.taskHomePath;

        this.childsInstance = {};

        this.crontabJobsInstance = {};

    }

 

    /**

     * start TaskStatusClient

     */

    start() {

        this._process();

    }

 

    _process() {

        let self = this;

 

        // 根據(jù)服務(wù)器配置信息啟動所有任務(wù)

        for(let taskConfig of self.taskClientConfig.tasks) {

            switch(taskConfig.type) {

                case 'daemon': {

                    self._daemonTaskHandler(taskConfig);

                    break;

                }

                case 'crontab': {

                    if(taskConfig.crontabRule) {

                        self._crontabTaskHanlder(taskConfig);

                    }

                    break;

                }

                default: {

                    console.log('unknow task type');

                    break;

                }

            }

        }

 

        // 在程序退出時結(jié)束所有子進程任務(wù)

        process.on('exit', function (code) {

            for(let child in self.childsInstance) {

                if(self.childsInstance.hasOwnProperty(child)) {

                    self.childsInstance[child].kill('SIGHUP');

                }

            }

        });

 

        // 啟動 HTTP 服務(wù)器,監(jiān)聽任務(wù)狀態(tài)監(jiān)控平臺發(fā)回的信號

        let app = express();

        app.use(bodyParser.json());

        app.use(bodyParser.urlencoded({ extended: false }));

        app.post('/', function (req, res) {

            let body;

            try {

                body = typeof req.body !== 'object' ? JSON.parse(req.body) : req.body;

            } catch (error) {

                return res.status(400).json({ message: 'invalid json' });

            }

            res.status(200).json({ message: 'ok' });

 

            // 收到任務(wù)狀態(tài)監(jiān)控平臺發(fā)回的信號后重啟任務(wù)

            let taskConfig = _.find(self.taskClientConfig.tasks, { name: body.name });

            let taskIdentifier = '';

            // daemon 類型的任務(wù)在 childsInstance 中的 key 是任務(wù)名

            // crontab 類型的任務(wù)在 childsInstance 中的 key 是任務(wù)名 任務(wù) ID

            switch (taskConfig.type) {

                case 'daemon': {

                    taskIdentifier = taskConfig.name;

                    self.childsInstance[taskIdentifier].kill('SIGHUP');

                    delete self.childsInstance[taskIdentifier];

                    if(self.handlers.daemonTaskHandler) {

                        self.handlers.daemonTaskHandler(taskConfig);

                    } else {

                        self._daemonTaskHandler(taskConfig);

                    }

                    break;

                }

                case 'crontab': {

                    taskIdentifier = taskConfig.name + taskConfig.id;

                    self.childsInstance[taskIdentifier].kill('SIGHUP');

                    delete self.childsInstance[taskIdentifier];

                    if(self.handlers.crontabTaskHandler) {

                        self.handlers.crontabTaskHandler(taskConfig);

                    } else {

                        self._crontabTaskHanlder(taskConfig);

                    }

                    break;

                }

                default: {

                    console.log('unknow task type');

                    break;

                }

            }

 

        });

        app.listen(self.taskClientConfig.server.port, function () {

            console.log('server start at: ' + self.taskClientConfig.server.port);

            self._registerServer(self.taskClientConfig, function (error, result) {

                if (error) {

                    console.log(error);

                }

            });

        });

    }

 

    _daemonTaskHandler(taskConfig) {

        let self = this;

        let taskInfo = {

            appName: self.appName,

            name: taskConfig.name,

            expires: taskConfig.timeout

        };

        let child = fork(path.join(self.taskHomePath,taskConfig.file));

        self.childsInstance[taskInfo.name] = child;

        child.on('message', function (message) {

            switch (message.signal) {

                case 'start': {

                    taskInfo.id = message.id;

                    self._startTask(taskInfo, function (error) {

                        if (error) {

                            console.log(error);

                        }

                    });

                    break;

                }

                case 'end': {

                    taskInfo.id = message.id;

                    self._endTask(taskInfo, function (error) {

                        if (error) {

                            console.log(error)

                        }

                    });

                    break;

                }

                default: {

                    console.log('unknow signal');

                    break;

                }

            }

        });

    }

 

    _crontabTaskHanlder(taskConfig) {

        let self = this;

        let taskInfo = {

            appName: self.appName,

            name: taskConfig.name,

            id: uuid.v4(),

            expires: taskConfig.timeout

        };

        self.crontabJobsInstance[taskInfo.name + taskInfo.id] = new CronJob(taskConfig.crontabRule, function () {

            self._startTask(taskInfo, function (error) {

                if(error) {

                    console.log(error);

                } else {

                    let child = fork(path.join(self.taskHomePath, taskConfig.file));

                    self.childsInstance[taskInfo.name + taskInfo.id] = child;

                    child.on('exit', function (code) {

                        // 子進程退出 code 為 0,代表正常退出,這時可以向監(jiān)控平臺發(fā)送任務(wù)已完成信號

                        if(code === 0) {

                            self._endTask(taskInfo, function (error) {

                                if (error) {

                                    console.log(error);

                                }

                            });

                        }

                    });

                }

            });

        }, undefined, true);

    }

 

    _startTask(taskInfo, callback) {

        let requestOptions = {

            uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/start',

            method: 'POST',

            timeout: 5000,

            form: taskInfo

        };

        request(requestOptions, function (error, response, body) {

            return callback(error, body);

        });

    }

 

    _endTask(taskInfo, callback) {

        let requestOptions = {

            uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/status/end',

            method: 'POST',

            timeout: 5000,

            form: taskInfo

        };

        request(requestOptions, function (error, response, body) {

            return callback(error, body);

        });

    }

 

    _registerServer(taskClientConfig, callback) {

        let requestOptions = {

            uri: this.taskClientConfig.management.host + ':' + this.taskClientConfig.management.port + '/tasks/servers',

            method: 'POST',

            timeout: 5000,

            form: taskClientConfig

        };

        request(requestOptions, function (error, response, body) {

            return callback(error, body);

        });

    }

}

 

module.exports = TaskStatusManagementClient;

// 調(diào)度系統(tǒng)的使用

'use strict';

 

const config = {

    // 監(jiān)控平臺信息,必須

    management: {

        host: 'http://127.0.0.1',

        port: 3000

    },

    // 當前服務(wù)器信息,必須

    server: {

        host: 'http://127.0.0.1',

        port: 3001

    },

    // 當前服務(wù)器任務(wù)文件地址(絕對路徑),必須

    taskHomePath: path.join(__dirname, 'tasks'),

    // 任務(wù)配置信息,必須

    tasks:[{

        name: 'exampleTaskOne',

        type: 'daemon',

        file: 'example_task_one.js',

        timeout: 10000

    }, {

        name: 'exampleTaskTwo',

        type: 'crontab',

        file: 'example_task_two.js',

        crontabRule: '*/20 * * * * *',  // 任務(wù)類型為 crontab 是此字段為必須

        timeout: 10000

    }]

};

let taskStatusManagementClient = new TaskStatusManagementClient(config);

taskStatusManagementClient.start();

 

監(jiān)控平臺 HTTP 服務(wù)器比較簡單,三個 API,用來將服務(wù)器信息、任務(wù)開始狀態(tài)、任務(wù)結(jié)束狀態(tài)寫入數(shù)據(jù)庫,這里就不在贅述。

 

// 監(jiān)控平臺任務(wù)狀態(tài)監(jiān)管

'use strict';

 

const async = require('async');

const config = require('config');

const mail = require('nodemailer').createTransport({

    service: 'your email service',

    auth: {

        user: 'your email username',

        pass: 'your email password'

    }

});

const request = require('request');

 

const logger = require('../log/logger');

const moment = require('../libs/moment');

// 一下三個為數(shù)據(jù)庫操作連接,不需要關(guān)注內(nèi)部代碼,不影響代碼閱讀

const TaskResult = require('../models/task_result');

const TaskStatus = require('../models/task_status');

const TaskServer = require('../models/task_server');

 

/**

 * 發(fā)送告警郵件

 * @param {String} to 郵件接收者

 * @param {String} subject 郵件主題

 * @param {String} message 郵件內(nèi)容

 * @param {Function} callback

 */

function sendWarningMessage(to, subject, message, callback) {

    var options = {

        'from': 'xxx@xxx.xxx',

        'to': to,

        'subject': subject,

        'text': message,

        'encoding': 'UTF-8'

    };

    mail.sendMail(options, function(error) {

        console.log('send message to: ' + to);

        return callback(error);

    });

}

 

/**

 * 處理執(zhí)行成功任務(wù)

 * @param {String} task 任務(wù)狀態(tài)對象

 * @param {Function} callback

 */

function handleSingleSucceedTask(task, callback) {

        TaskResult.increaseTaskSuccessCount(task.name, function (error) {

            return callback(error);

        });

}

 

/**

 * 處理執(zhí)行失敗的任務(wù)

 * @param {String} task 任務(wù)狀態(tài)對象

 * @param {Function} callback

 */

function handleSingleErrorTask(task, callback) {

    async.waterfall([

        // 增加任務(wù)執(zhí)行失敗次數(shù)

        async.apply(TaskResult.increaseTaskErrorCount.bind(TaskResult), task.name),

        async.apply(TaskResult.getTaskErrorCount.bind(TaskResult), task.name),

        function (count, callback) {

            callback(undefined);

            // 給調(diào)度系統(tǒng)發(fā)信號

            TaskServer.findTaskServerHost(task.name, function (error, serverHost) {

                // 超過限制失敗次數(shù),發(fā)送告警郵件

                if (count >= config.get('task.limitErrorCount')) {

                    sendWarningMessage(config.get('task.noticeUserEmailAddress'), '定時任務(wù)告警', `${ serverHost }: "${ task.name }" 執(zhí)行失敗超過預(yù)定失敗次數(shù)`,  function (error) {

                        if (error) {

                            logger.error(error);

                        }

                    });

                    TaskResult.resetTaskErrorCount(task.name, function (error) {

                        if (error) {

                            logger.error(error);

                        }

                    });

                }

                if (!error && serverHost) {

                    // send 'error' signal to task server

                    request({

                        uri: serverHost,

                        method: 'POST',

                        timeout: 5000,

                        form: {

                            name: task.name,

                            id: task.taskId,

                            pid: task.pid

                        }

                    }, function (error, response, body) {

                        if (error) {

                            logger.error(error);

                        }

                    });

                }

            });

        }

    ], function (error) {

        return callback(error);

    });

}

 

/**

 * 獲取所有已經(jīng)到達預(yù)定超時時間的任務(wù)并處理

 */

function process() {

    let currentTime = new Date().getTime();

    async.waterfall([

        async.apply(TaskStatus.getExpiredTasks.bind(TaskStatus), currentTime),

        // 并行處理每個任務(wù)

        function (tasks, callback) {

            if (tasks.length > 0) {

                async.parallel(tasks.map(function (task) {

                    return function (callback) {

                        // 超時后任務(wù)狀態(tài)仍然為 running,代表任務(wù)執(zhí)行失敗

                        if (task.status === 'running') {

                            handleSingleErrorTask(task, function (error) {

                                return callback(error);

                            });

                        } else {

                            handleSingleSucceedTask(task, function (error) {

                                return callback(error);

                            });

                        }

                    }

                }), function (error) {

                    return callback(error);

                });

            } else {

                return callback(new Error('no tasks need to exec'));

            }

        },

        // 刪除已經(jīng)處理完成的任務(wù)

        async.apply(TaskStatus.removeExpiredTasks.bind(TaskStatus), currentTime)

    ], function (error) {

        if (error && error.message === 'no tasks need to exec') {

            let delay = moment.millisecondToDayMinuteHourSecond(config.get('task.noTaskDelayTime'));

            console.log(`no tasks, delay ${ delay }`);

            setTimeout(process, config.get('task.noTaskDelayTime'));

        } else if (error) {

            logger.error(error);

            process();

        } else {

            process();

        }

    });

}

 

module.exports = process;

 

 

文章來源:DuanPengfei's Blog 


您還未登錄,請先登錄

熱門帖子

最新帖子

?