Идиоматический способ ожидания нескольких обратных вызовов в Node.js
Предположим, вам нужно выполнить некоторые операции, зависящие от некоторого временного файла. поскольку
мы говорим о Node здесь, эти операции, очевидно, асинхронны.
Какой идиоматический способ дождаться завершения всех операций, чтобы
знаете, когда временный файл можно удалить?
Вот какой код показывает, что я хочу сделать:
do_something(tmp_file_name, function(err) {});
do_something_other(tmp_file_name, function(err) {});
fs.unlink(tmp_file_name);
Но если я напишу его таким образом, третий вызов может быть выполнен до первых двух
получите возможность использовать файл. Мне нужно каким-то образом гарантировать, что первые два
звонки уже завершены (вызывается их обратные вызовы), прежде чем перемещаться без вложенности
(и делает их синхронными на практике).
Я думал об использовании излучателей событий в обратных вызовах и регистрации счетчика
как приемник. Счетчик получит готовые события и подсчитает, сколько
операции продолжались. Когда последний закончен, он удалит
файл. Но есть риск состояния гонки, и я не уверен, что это
обычно, как это делается.
Как люди Node решают эту проблему?
Ответы
Ответ 1
Update:
Теперь я бы посоветовал посмотреть:
-
Promises
Объект Promise используется для отложенных и асинхронных вычислений. Обещание представляет собой операцию, которая еще не завершена, но ожидаемый в будущем.
Популярная библиотека promises bluebird. A советовал бы взглянуть на почему promises.
Вы должны использовать promises, чтобы включить это:
fs.readFile("file.json", function (err, val) {
if (err) {
console.error("unable to read file");
}
else {
try {
val = JSON.parse(val);
console.log(val.success);
}
catch (e) {
console.error("invalid json in file");
}
}
});
В это:
fs.readFileAsync("file.json").then(JSON.parse).then(function (val) {
console.log(val.success);
})
.catch(SyntaxError, function (e) {
console.error("invalid json in file");
})
.catch(function (e) {
console.error("unable to read file");
});
-
генераторы: Например, через co.
Эффективность потока управления на основе генератора для узлов и браузера, используя promises, позволяя вам писать неблокирующий код с хорошим способом.
var co = require('co');
co(function *(){
// yield any promise
var result = yield Promise.resolve(true);
}).catch(onerror);
co(function *(){
// resolve multiple promises in parallel
var a = Promise.resolve(1);
var b = Promise.resolve(2);
var c = Promise.resolve(3);
var res = yield [a, b, c];
console.log(res);
// => [1, 2, 3]
}).catch(onerror);
// errors can be try/catched
co(function *(){
try {
yield Promise.reject(new Error('boom'));
} catch (err) {
console.error(err.message); // "boom"
}
}).catch(onerror);
function onerror(err) {
// log any uncaught errors
// co will not throw any errors you do not handle!!!
// HANDLE ALL YOUR ERRORS!!!
console.error(err.stack);
}
Если я правильно понимаю, я думаю, вам стоит взглянуть на очень хорошую библиотеку async. Вы должны особенно взглянуть на series. Просто копия из фрагментов страницы github:
async.series([
function(callback){
// do some stuff ...
callback(null, 'one');
},
function(callback){
// do some more stuff ...
callback(null, 'two');
},
],
// optional callback
function(err, results){
// results is now equal to ['one', 'two']
});
// an example using an object instead of an array
async.series({
one: function(callback){
setTimeout(function(){
callback(null, 1);
}, 200);
},
two: function(callback){
setTimeout(function(){
callback(null, 2);
}, 100);
},
},
function(err, results) {
// results is now equals to: {one: 1, two: 2}
});
Как плюс, эта библиотека также может запускаться в браузере.
Ответ 2
Самый простой способ увеличить счетчик целых чисел при запуске операции async, а затем в обратном вызове уменьшить счетчик. В зависимости от сложности обратный вызов может проверять счетчик на нуль и затем удалять файл.
Немного сложнее было бы поддерживать список объектов, и каждый объект имел бы какие-либо атрибуты, необходимые для идентификации операции (это может быть даже вызов функции), а также код состояния. Обратные вызовы задали бы код состояния для завершения.
Тогда у вас будет цикл, который ждет (используя process.nextTick
) и проверяет, завершены ли все задачи. Преимущество этого метода над счетчиком заключается в том, что если все выдающиеся задачи могут быть выполнены, прежде чем все задания будут выпущены, техника счетчика заставит вас преждевременно удалить файл.
Ответ 3
// simple countdown latch
function CDL(countdown, completion) {
this.signal = function() {
if(--countdown < 1) completion();
};
}
// usage
var latch = new CDL(10, function() {
console.log("latch.signal() was called 10 times.");
});
Ответ 4
Нет "родного" решения, но есть миллионные библиотеки управления потоками для node. Вам может понадобиться Step:
Step(
function(){
do_something(tmp_file_name, this.parallel());
do_something_else(tmp_file_name, this.parallel());
},
function(err) {
if (err) throw err;
fs.unlink(tmp_file_name);
}
)
Или, как предположил Майкл, счетчики могут быть более простым решением. Взгляните на этот макет семафора. Вы бы использовали его следующим образом:
do_something1(file, queue('myqueue'));
do_something2(file, queue('myqueue'));
queue.done('myqueue', function(){
fs.unlink(file);
});
Ответ 5
Я хотел бы предложить другое решение, которое использует скорость и эффективность парадигмы программирования в самом ядре событий Node:.
Все, что вы можете сделать с помощью Promises или модулей, предназначенных для управления потоком-контролем, например async
, может быть выполнено с помощью событий и простой государственной машины, что, по моему мнению, предлагает методологию, которая, возможно, чем другие варианты.
Например, предположим, что вы хотите суммировать длину нескольких файлов параллельно:
const EventEmitter = require('events').EventEmitter;
// simple event-driven state machine
const sm = new EventEmitter();
// running state
let context={
tasks: 0, // number of total tasks
active: 0, // number of active tasks
results: [] // task results
};
const next = (result) => { // must be called when each task chain completes
if(result) { // preserve result of task chain
context.results.push(result);
}
// decrement the number of running tasks
context.active -= 1;
// when all tasks complete, trigger done state
if(!context.active) {
sm.emit('done');
}
};
// operational states
// start state - initializes context
sm.on('start', (paths) => {
const len=paths.length;
console.log(`start: beginning processing of ${len} paths`);
context.tasks = len; // total number of tasks
context.active = len; // number of active tasks
sm.emit('forEachPath', paths); // go to next state
});
// start processing of each path
sm.on('forEachPath', (paths)=>{
console.log(`forEachPath: starting ${paths.length} process chains`);
paths.forEach((path) => sm.emit('readPath', path));
});
// read contents from path
sm.on('readPath', (path) => {
console.log(` readPath: ${path}`);
fs.readFile(path,(err,buf) => {
if(err) {
sm.emit('error',err);
return;
}
sm.emit('processContent', buf.toString(), path);
});
});
// compute length of path contents
sm.on('processContent', (str, path) => {
console.log(` processContent: ${path}`);
next(str.length);
});
// when processing is complete
sm.on('done', () => {
const total = context.results.reduce((sum,n) => sum + n);
console.log(`The total of ${context.tasks} files is ${total}`);
});
// error state
sm.on('error', (err) => { throw err; });
// ======================================================
// start processing - ok, let go
// ======================================================
sm.emit('start', ['file1','file2','file3','file4']);
Будет выводиться:
start: beginning processing of 4 paths
forEachPath: starting 4 process chains
readPath: file1
readPath: file2
processContent: file1
readPath: file3
processContent: file2
processContent: file3
readPath: file4
processContent: file4
The total of 4 files is 4021
Обратите внимание, что упорядочение задач цепочки процессов зависит от загрузки системы.
Вы можете представить программный поток как:
start -> forEachPath -+-> readPath1 -> processContent1 -+-> done
+-> readFile2 -> processContent2 -+
+-> readFile3 -> processContent3 -+
+-> readFile4 -> processContent4 -+
Для повторного использования было бы тривиально создать модуль для поддержки различных шаблонов управления потоком, то есть рядов, параллельных, пакетных, пока, и так далее.
Ответ 6
Простейшим решением является запуск do_something * и отсоединение последовательности следующим образом:
do_something(tmp_file_name, function(err) {
do_something_other(tmp_file_name, function(err) {
fs.unlink(tmp_file_name);
});
});
Если по соображениям производительности вы хотите выполнять do_something() и do_something_other() параллельно, я предлагаю сохранить его простым и идти этим путем.
Ответ 7
Wait.for https://github.com/luciotato/waitfor
используя Wait.for:
var wait=require('wait.for');
...in a fiber...
wait.for(do_something,tmp_file_name);
wait.for(do_something_other,tmp_file_name);
fs.unlink(tmp_file_name);
Ответ 8
С чистыми Обещаниями это может быть немного более грязно, но если вы используете Отложенные Обещания, тогда это не так уж и плохо:
Установка:
npm install --save @bitbar/deferred-promise
Измените свой код:
const DeferredPromise = require('@bitbar/deferred-promise');
const promises = [
new DeferredPromise(),
new DeferredPromise()
];
do_something(tmp_file_name, (err) => {
if (err) {
promises[0].reject(err);
} else {
promises[0].resolve();
}
});
do_something_other(tmp_file_name, (err) => {
if (err) {
promises[1].reject(err);
} else {
promises[1].resolve();
}
});
Promise.all(promises).then( () => {
fs.unlink(tmp_file_name);
});