Лучший способ выполнить параллельную обработку в Node.js

Я пытаюсь написать небольшое приложение node, которое будет искать и анализировать большое количество файлов в файловой системе. Чтобы ускорить поиск, мы пытаемся использовать какое-то сокращение карты. План был бы следующим упрощенным сценарием:

  • Веб-запрос приходит с поисковым запросом
  • запускается 3 процесса, каждый из которых получает 1000 (разных) файлов
  • Как только процесс завершится, он вернется к основному потоку
  • Как только все процессы завершатся, основной поток будет продолжаться, возвращая комбинированный результат как результат JSON

Вопросы, которые у меня есть, следующие: Это можно сделать в Node? Каков рекомендуемый способ сделать это?

Я играл в fiddling, но не последую дальше, следуя примеру Process:

Инициатор:

function Worker() { return child_process.fork("myProcess.js); }
for(var i = 0; i < require('os').cpus().length; i++){
        var process = new Worker();
        process.send(workItems.slice(i * itemsPerProcess, (i+1) * itemsPerProcess));
}

myProcess.js

process.on('message', function(msg) {
    var valuesToReturn = [];
    // Do file reading here
    //How would I return valuesToReturn?
    process.exit(0);
}

Несколько побочных элементов:

  • Я знаю, что количество процессов должно зависеть от количества процессоров на сервере.
  • Я также знаю ограничения скорости в файловой системе. Рассмотрите это как доказательство концепции, прежде чем переместить это в базу данных или экземпляр Lucene: -)

Ответы

Ответ 1

Должно быть выполнимо. В качестве простого примера:

// parent.js
var child_process = require('child_process');

var numchild  = require('os').cpus().length;
var done      = 0;

for (var i = 0; i < numchild; i++){
  var child = child_process.fork('./child');
  child.send((i + 1) * 1000);
  child.on('message', function(message) {
    console.log('[parent] received message from child:', message);
    done++;
    if (done === numchild) {
      console.log('[parent] received all results');
      ...
    }
  });
}

// child.js
process.on('message', function(message) {
  console.log('[child] received message from server:', message);
  setTimeout(function() {
    process.send({
      child   : process.pid,
      result  : message + 1
    });
    process.disconnect();
  }, (0.5 + Math.random()) * 5000);
});

Таким образом, родительский процесс генерирует X число дочерних процессов и передает им сообщение. Он также устанавливает обработчик событий для прослушивания любых сообщений, отправленных обратно от ребенка (например, с результатом).

Дочерний процесс ожидает сообщений от родителя и начинает обработку (в этом случае он просто запускает таймер со случайным таймаутом, чтобы имитировать проделанную работу). После этого он возвращает результат в родительский процесс и использует process.disconnect(), чтобы отключиться от родителя (в основном останавливая дочерний процесс).

Родительский процесс отслеживает количество запущенных дочерних процессов и число из них, которые отправили результат. Когда эти числа равны, родитель получил все результаты от дочерних процессов, чтобы он мог объединить все результаты и вернуть результат JSON.

Ответ 2

Для распределенной задачи, подобной этой, я использовал zmq, и он работал очень хорошо. Я дам вам аналогичную проблему, с которой столкнулся, и попытался решить с помощью процессов (но не удалось.), А затем повернулся к zmq.

Использование bcrypt или дорогостоящего алгоритма хэширования является разумным, но он блокирует процесс node примерно на 0,5 секунды. Нам пришлось разгрузить это на другой сервер, и в качестве быстрого исправления я использовал, по сути, то, что вы сделали. Запустите дочерний процесс и отправьте ему сообщения и отвечать. Единственная проблема, с которой мы столкнулись, по какой-то причине наш дочерний процесс привязывает целое ядро, когда он абсолютно не работает. (Я до сих пор не понял, почему это произошло, мы провели трассировку, и оказалось, что epoll не работает на stdout/stdin. Это также произойдет только на наших Linux-боксах и отлично работает на OSX.)

изменить:

Пиннинг ядра был зафиксирован в https://github.com/joyent/libuv/commit/12210fe и был связан с https://github.com/joyent/node/issues/5504, поэтому, если вы столкнулись с проблемой, и вы используете centos + kernel v2.6.32: update node или обновите свое ядро!

Независимо от проблем, с которыми я столкнулся с child_process.fork(), здесь отличный шаблон, который я всегда использую

клиент:

var child_process = require('child_process');

function FileParser() {

    this.__callbackById = [];
    this.__callbackIdIncrement = 0;
    this.__process = child_process.fork('./child');
    this.__process.on('message', this.handleMessage.bind(this));

}

FileParser.prototype.handleMessage = function handleMessage(message) {

    var error = message.error;
    var result = message.result;
    var callbackId = message.callbackId;
    var callback = this.__callbackById[callbackId];

    if (! callback) {
        return;
    }
    callback(error, result);
    delete this.__callbackById[callbackId];

};

FileParser.prototype.parse = function parse(data, callback) {

    this.__callbackIdIncrement = (this.__callbackIdIncrement + 1) % 10000000;
    this.__callbackById[this.__callbackIdIncrement] = callback;
    this.__process.send({
        data: data, // optionally you could pass in the path of the file, and open it in the child process.
        callbackId: this.__callbackIdIncrement
    });

};

module.exports = FileParser;

дочерний процесс:

process.on('message', function(message) {

    var callbackId = message.callbackId;
    var data = message.data;
    function respond(error, response) {
        process.send({
            callbackId: callbackId,
            error: error,
            result: response
        });
    }
    // parse data..
    respond(undefined, "computed data");
});

Нам также нужен шаблон для синхронизации различных процессов, когда каждый процесс завершает свою задачу, он будет реагировать на нас, и мы увеличим счетчик для каждого завершаемого процесса, а затем вызываем обратный вызов Семафора, когда мы "Мы попали в счет, который хотим.

function Semaphore(wait, callback) {

    this.callback = callback;
    this.wait = wait;
    this.counted = 0;

}

Semaphore.prototype.signal = function signal() {
    this.counted++;
    if (this.counted >= this.wait) {
        this.callback();
    }
}

module.exports = Semaphore;

здесь используется случай, который связывает все вышеприведенные шаблоны вместе:

var FileParser = require('./FileParser');
var Semaphore  = require('./Semaphore');

var arrFileParsers = [];
for(var i = 0; i < require('os').cpus().length; i++){
    var fileParser = new FileParser();
    arrFileParsers.push(fileParser);
}

function getFiles() {
    return ["file", "file"];
}

var arrResults = [];
function onAllFilesParsed() {
    console.log('all results completed', JSON.stringify(arrResults));
}

var lock = new Semaphore(arrFileParsers.length, onAllFilesParsed);
arrFileParsers.forEach(function(fileParser) {
    var arrFiles  = getFiles(); // you need to decide how to split the files into 1k chunks
    fileParser.parse(arrFiles, function (error, result) {
        arrResults.push(result);
        lock.signal();
    });
});

В конце концов я использовал http://zguide.zeromq.org/page:all#The-Load-Balancing-Pattern, где клиент использовал клиент nodejs zmq, а работники/брокер были написаны на C. Это позволило нам масштабировать это на нескольких машинах, а не только на локальной машине с подпроцессами.