Лучший способ выполнить параллельную обработку в Node.js
Я пытаюсь написать небольшое приложение node, которое будет искать и анализировать большое количество файлов в файловой системе.
Чтобы ускорить поиск, мы пытаемся использовать какое-то сокращение карты. План был бы следующим упрощенным сценарием:
- Веб-запрос приходит с поисковым запросом
- запускается 3 процесса, каждый из которых получает 1000 (разных) файлов
- Как только процесс завершится, он вернется к основному потоку
- Как только все процессы завершатся, основной поток будет продолжаться, возвращая комбинированный результат как результат JSON
Вопросы, которые у меня есть, следующие:
Это можно сделать в Node?
Каков рекомендуемый способ сделать это?
Я играл в fiddling, но не последую дальше, следуя примеру Process:
Инициатор:
function Worker() { return child_process.fork("myProcess.js); }
for(var i = 0; i < require('os').cpus().length; i++){
var process = new Worker();
process.send(workItems.slice(i * itemsPerProcess, (i+1) * itemsPerProcess));
}
myProcess.js
process.on('message', function(msg) {
var valuesToReturn = [];
// Do file reading here
//How would I return valuesToReturn?
process.exit(0);
}
Несколько побочных элементов:
- Я знаю, что количество процессов должно зависеть от количества процессоров на сервере.
- Я также знаю ограничения скорости в файловой системе. Рассмотрите это как доказательство концепции, прежде чем переместить это в базу данных или экземпляр Lucene: -)
Ответы
Ответ 1
Должно быть выполнимо. В качестве простого примера:
// parent.js
var child_process = require('child_process');
var numchild = require('os').cpus().length;
var done = 0;
for (var i = 0; i < numchild; i++){
var child = child_process.fork('./child');
child.send((i + 1) * 1000);
child.on('message', function(message) {
console.log('[parent] received message from child:', message);
done++;
if (done === numchild) {
console.log('[parent] received all results');
...
}
});
}
// child.js
process.on('message', function(message) {
console.log('[child] received message from server:', message);
setTimeout(function() {
process.send({
child : process.pid,
result : message + 1
});
process.disconnect();
}, (0.5 + Math.random()) * 5000);
});
Таким образом, родительский процесс генерирует X число дочерних процессов и передает им сообщение. Он также устанавливает обработчик событий для прослушивания любых сообщений, отправленных обратно от ребенка (например, с результатом).
Дочерний процесс ожидает сообщений от родителя и начинает обработку (в этом случае он просто запускает таймер со случайным таймаутом, чтобы имитировать проделанную работу). После этого он возвращает результат в родительский процесс и использует process.disconnect()
, чтобы отключиться от родителя (в основном останавливая дочерний процесс).
Родительский процесс отслеживает количество запущенных дочерних процессов и число из них, которые отправили результат. Когда эти числа равны, родитель получил все результаты от дочерних процессов, чтобы он мог объединить все результаты и вернуть результат JSON.
Ответ 2
Для распределенной задачи, подобной этой, я использовал zmq, и он работал очень хорошо. Я дам вам аналогичную проблему, с которой столкнулся, и попытался решить с помощью процессов (но не удалось.), А затем повернулся к zmq.
Использование bcrypt или дорогостоящего алгоритма хэширования является разумным, но он блокирует процесс node примерно на 0,5 секунды. Нам пришлось разгрузить это на другой сервер, и в качестве быстрого исправления я использовал, по сути, то, что вы сделали. Запустите дочерний процесс и отправьте ему сообщения и
отвечать. Единственная проблема, с которой мы столкнулись, по какой-то причине наш дочерний процесс привязывает целое ядро, когда он абсолютно не работает. (Я до сих пор не понял, почему это произошло, мы провели трассировку, и оказалось, что epoll не работает на stdout/stdin. Это также произойдет только на наших Linux-боксах и отлично работает на OSX.)
изменить:
Пиннинг ядра был зафиксирован в https://github.com/joyent/libuv/commit/12210fe и был связан с https://github.com/joyent/node/issues/5504, поэтому, если вы столкнулись с проблемой, и вы используете centos + kernel v2.6.32: update node или обновите свое ядро!
Независимо от проблем, с которыми я столкнулся с child_process.fork(), здесь отличный шаблон, который я всегда использую
клиент:
var child_process = require('child_process');
function FileParser() {
this.__callbackById = [];
this.__callbackIdIncrement = 0;
this.__process = child_process.fork('./child');
this.__process.on('message', this.handleMessage.bind(this));
}
FileParser.prototype.handleMessage = function handleMessage(message) {
var error = message.error;
var result = message.result;
var callbackId = message.callbackId;
var callback = this.__callbackById[callbackId];
if (! callback) {
return;
}
callback(error, result);
delete this.__callbackById[callbackId];
};
FileParser.prototype.parse = function parse(data, callback) {
this.__callbackIdIncrement = (this.__callbackIdIncrement + 1) % 10000000;
this.__callbackById[this.__callbackIdIncrement] = callback;
this.__process.send({
data: data, // optionally you could pass in the path of the file, and open it in the child process.
callbackId: this.__callbackIdIncrement
});
};
module.exports = FileParser;
дочерний процесс:
process.on('message', function(message) {
var callbackId = message.callbackId;
var data = message.data;
function respond(error, response) {
process.send({
callbackId: callbackId,
error: error,
result: response
});
}
// parse data..
respond(undefined, "computed data");
});
Нам также нужен шаблон для синхронизации различных процессов, когда каждый процесс завершает свою задачу, он будет реагировать на нас, и мы увеличим счетчик для каждого завершаемого процесса, а затем вызываем обратный вызов Семафора, когда мы "Мы попали в счет, который хотим.
function Semaphore(wait, callback) {
this.callback = callback;
this.wait = wait;
this.counted = 0;
}
Semaphore.prototype.signal = function signal() {
this.counted++;
if (this.counted >= this.wait) {
this.callback();
}
}
module.exports = Semaphore;
здесь используется случай, который связывает все вышеприведенные шаблоны вместе:
var FileParser = require('./FileParser');
var Semaphore = require('./Semaphore');
var arrFileParsers = [];
for(var i = 0; i < require('os').cpus().length; i++){
var fileParser = new FileParser();
arrFileParsers.push(fileParser);
}
function getFiles() {
return ["file", "file"];
}
var arrResults = [];
function onAllFilesParsed() {
console.log('all results completed', JSON.stringify(arrResults));
}
var lock = new Semaphore(arrFileParsers.length, onAllFilesParsed);
arrFileParsers.forEach(function(fileParser) {
var arrFiles = getFiles(); // you need to decide how to split the files into 1k chunks
fileParser.parse(arrFiles, function (error, result) {
arrResults.push(result);
lock.signal();
});
});
В конце концов я использовал http://zguide.zeromq.org/page:all#The-Load-Balancing-Pattern, где клиент использовал клиент nodejs zmq, а работники/брокер были написаны на C. Это позволило нам масштабировать это на нескольких машинах, а не только на локальной машине с подпроцессами.