Как записать живой набор данных на диск с асинхронным вводом-выводом?
Я новичок в разработке в node.js(хотя и относительно опытный на клиентском javascript), и у меня много вопросов о хороших практиках при работе с асинхронными операциями в node.js.
Моя конкретная проблема (хотя я думаю, что это довольно общая тема) заключается в том, что у меня есть приложение node.js(работает на малине Pi), которое записывает показания от нескольких датчиков температуры каждые 10 секунд до в структуре данных памяти. Это прекрасно работает. Данные накапливаются с течением времени в памяти и, поскольку они накапливаются и достигают определенного порога размера, данные регулярно стареют (сохраняя только последние N дней данных), чтобы он не превышал определенный размер. Эти данные температуры используются для управления некоторыми другими устройствами.
Затем у меня есть отдельный таймер с интервалом, который так часто записывает эти данные на диск (чтобы сохранить его, если процесс выходит из строя). Я использую async node.js(fs.open()
, fs.write()
и fs.close()
) disk IO для записи данных на диск.
И, из-за асинхронного характера дискового ввода-вывода, мне приходит в голову, что сама структура данных, которую я пытаюсь записать на диск, может быть модифицирована прямо посреди меня, записывая ее на диск. Возможно, это будет плохо. Если данные добавляются только к структуре данных при записи на диск, это фактически не вызовет проблемы с тем, как я пишу данные, но есть некоторые обстоятельства, когда более ранние данные могут быть изменены при записи новых данных и это действительно будет бесполезно с целостностью того, что я нахожусь в середине записи на диск.
Я могу придумать всевозможные несколько уродливых гарантий, которые я мог бы добавить в свой код, например:
- Переключитесь на синхронный ввод-вывод, чтобы записать данные на диск (на самом деле не хотите делать это для ответа на сервер).
- Установите флаг, когда я начал записывать данные и не записывал никаких новых данных, пока этот флаг установлен (заставляет меня потерять запись данных во время записи).
- Более сложные версии опции 2, где я устанавливаю флаг, и когда установлен флаг, новые данные поступают в отдельную временную структуру данных, которая, когда файл IO выполняется, затем объединяется с реальными данными (выполнимо, но кажется некрасиво).
- Сделайте копию моментальных копий исходных данных и не спешите писать эту копию на диск, зная, что никто не будет изменять копию. Я не хочу этого делать, потому что набор данных относительно велик, и я в ограниченной среде памяти (малиновый PI).
Итак, мой вопрос заключается в том, какие шаблоны проектирования для записи большого набора данных с асинхронным IO, когда другие операции могут захотеть изменить эти данные во время асинхронного ввода-вывода? Существуют ли более общие способы решения моей проблемы, чем конкретные описанные выше работы?
Ответы
Ответ 1
Ваша проблема синхронизация данных. Традиционно это решается с помощью locks/mutexes, но javascript/ node на самом деле не имеет ничего подобного встроенному.
Итак, как мы решаем это в node? Мы используем очереди. Лично я использую функцию queue из асинхронный модуль.
Очереди работают, сохраняя список задач, которые необходимо выполнить, и выполняйте эти задачи только в том порядке, в котором они добавлены в очередь, после завершения предыдущей задачи (аналогично вашему варианту 3).
![queue animation]()
Примечание. Метод очереди асинхронного модуля может фактически запускать несколько задач одновременно (например, анимация выше показывает), но, поскольку мы говорим о синхронизации данных здесь, мы этого не хотим. К счастью, мы можем сказать, чтобы он просто запускал по одному.
В вашей конкретной ситуации то, что вы хотите сделать, это настроить очередь, которая может выполнять два типа задач:
- Изменить структуру данных
- Напишите вашу структуру данных на диск
Всякий раз, когда вы получаете новые данные от ваших температурных зондов, добавьте задачу в свою очередь, чтобы изменить структуру данных с помощью этих новых данных. Затем, когда срабатывает ваш интервальный таймер, добавьте задачу в свою очередь, которая записывает вашу структуру данных на диск.
Поскольку очередь будет запускать только одну задачу за раз, в том порядке, в котором они добавлены в очередь, она гарантирует, что вы никогда не будете изменять свою структуру данных в памяти во время записи данных на диск.
Очень простая реализация может выглядеть так:
var dataQueue = async.queue(function(task, callback) {
if (task.type === "newData") {
memoryStore.add(task.data); // modify your data structure however you do it now
callback(); // let the queue know the task is done; you can pass an error here as usual if needed
} else if (task.type === "writeData") {
fs.writeFile(task.filename, JSON.stringify(memoryStore), function(err) {
// error handling
callback(err); // let the queue know the task is done
})
} else {
callback(new Error("Unknown Task")); // just in case we get a task we don't know about
}
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time
// call when you get new probe data
funcion addNewData(data) {
dataQueue.push({task: "newData", data: data}, function(err) {
// called when the task is complete; optional
});
}
// write to disk every 5 minutes
setInterval(function() {
dataQueue.push({task: "writeData", filename: "somefile.dat"}, function(err) {
// called when the task is complete; optional
});
}, 18000);
Также обратите внимание, что теперь вы можете добавлять данные в свою структуру данных асинхронно. Скажем, вы добавляете новый зонд, который запускает событие, когда изменяется его значение. Вы можете просто addNewData(data)
, как и в случае с существующими пробниками, и не беспокоиться о том, что это противоречит изменениям в процессе или записи на диск (это действительно происходит, если вы начинаете писать в базу данных, а не в хранилище данных в памяти).
Обновление: Более элегантная реализация с использованием bind()
Идея состоит в том, что вы используете bind()
для привязки аргументов к функции, а затем нажмите новую связанную функцию, которая возвращает bind()
в очередь. Таким образом, вам не нужно вставлять какой-либо пользовательский объект в очередь, которую он должен интерпретировать; вы можете просто дать ему функцию для вызова, все настройки с правильными аргументами уже есть. Единственное предостережение в том, что функция должна принимать обратный вызов в качестве последнего аргумента.
Это должно позволить вам использовать все существующие функции (возможно, с небольшими изменениями) и просто вставлять их в очередь, когда вам нужно убедиться, что они не запускаются одновременно.
Я собрал это вместе, чтобы проверить концепцию:
var async = require('async');
var dataQueue = async.queue(function(task, callback) {
// task is just a function that takes a callback; call it
task(callback);
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time
function storeData(data, callback) {
setTimeout(function() { // simulate async op
console.log('store', data);
callback(); // let the queue know the task is done
}, 50);
}
function writeToDisk(filename, callback) {
setTimeout(function() { // simulate async op
console.log('write', filename);
callback(); // let the queue know the task is done
}, 250);
}
// store data every second
setInterval(function() {
var data = {date: Date.now()}
var boundStoreData = storeData.bind(null, data);
dataQueue.push(boundStoreData, function(err) {
console.log('store complete', data.date);
})
}, 1000)
// write to disk every 2 seconds
setInterval(function() {
var filename = Date.now() + ".dat"
var boundWriteToDisk = writeToDisk.bind(null, filename);
dataQueue.push(boundWriteToDisk, function(err) {
console.log('write complete', filename);
});
}, 2000);
Ответ 2
Сначала - покажите практическое решение, а затем дайте понять, как и почему он работает:
var chain = Promise.resolve(); // Create a resolved promise
var fs = Promise.promisifyAll(require("fs"));
chain = chain.then(function(){
return fs.writeAsync(...); // A
});
// some time in the future
chain = chain.then(function(){
return fs.writeAsync(...); // This will always execute after A is done
})
Поскольку вы отметили свой вопрос с помощью promises - стоит упомянуть, что promises решить эту (довольно сложную) проблему очень хорошо самостоятельно и сделать это довольно легко.
Проблема синхронизации данных называется проблемой производителя. Есть много способов решить синхронизацию в JavaScript - этот недавний фрагмент Q KrisKowal является хорошим показанием по этому вопросу.
Введите: Promises
Самый простой способ решить это с помощью promises - это объединить все с помощью единого обещания. Я знаю, что вы опытны с promises самостоятельно, но для более свежих читателей давайте вспомнить:
Promises являются абстракцией над понятием самого секвенирования. Обещание - это единый (читаемый дискретный) блок действий. Цепочка promises, очень похожая на ;
на некоторых языках, отмечает конец одной операции и начало следующего. promises в JavaScript абстрактно две основные вещи - понятие действия, требующее времени и исключительных условий.
Здесь есть "более высокая" абстракция, называемая монадой, а A + promises не строго соблюдают законы монады (для удобства) существуют реализации promises. promises аннотация определенного вида обработки, где монады абстрагируют понятие самой обработки, вы можете сказать, что обещание - это монада или, по крайней мере, что они являются монадическими.
Promises начинаются как отложенные, означая, что они представляют действие, которое уже было запущено, но еще не завершено. В какой-то момент они могут пройти разрешение, в течение которых они устанавливают в одном из двух состояний:
- Выполнено - указывает, что действие выполнено успешно.
- Отклонено - указывает, что действие не завершено успешно.
Как только обещание будет разрешено, оно больше не может изменить свое состояние. Так же, как вы можете продолжить ;
на следующей строке - вы можете продолжить обещание с помощью ключевого слова .then
, которое привязывает предыдущее действие к следующему.
Решение производителя - потребителя.
Традиционное решение проблемы производителя/потребителя может быть выполнено с помощью традиционных конструкций concurrency, таких как семафоры Дийкстры. Действительно, такое решение существует через promises или простые обратные вызовы, но я считаю, что мы можем сделать что-то подобное.
Вместо этого мы будем поддерживать запущенную программу и каждый раз добавлять к ней новые действия.
var fsQueue = Promise.resolve(); // start a new chain
// one place
fsQueue = fsQueue.then(function(){ // assuming promisified fs here
return fs.writeAsync(...);
});
// some other place
fsQueue = fsQueue.then(function(){
return fs.writeAsync(...);
});
Добавление действий в очередь гарантирует, что мы заказали синхронизацию, и действия будут выполняться только после того, как предыдущие завершены. Это самое простое решение для синхронизации этой проблемы и требует обертывания fs.asyncFunction
вызовов .then
их в вашу очередь.
Альтернативное решение будет использовать что-то похожее на "монитор" - мы можем гарантировать, что доступ согласован изнутри путем обертывания fs:
var fs = B.promisifyAll(require("fs")); // bluebird promisified fs
var syncFs = { // sync stands for synchronized, not synchronous
queue: B.resolve();
writeAsync = function(){
var args = arguments
return (queue = queue.then( // only execute later
return fs.writeAsync.apply(fs,arguments);
});
} // promisify other used functions similarly
};
Что создаст синхронизированные версии действий fs. Также возможно автоматизировать это (не проверено) с помощью чего-то подобного:
// assumes module is promisified and ignores nested functions
function synchronize(module){
var ret = {}, queue = B.resolve();
for(var fn in module){
ret[fn] = function(){
var args = arguments;
queue = queue.then(function(){
return module[fn].apply(module, args);
})
};
}
ret.queue = queue; // expose the queue for handling errors
return ret;
}
Какая должна быть версия модуля, которая синхронизирует все его действия. Обратите внимание, что мы получаем дополнительное преимущество: ошибки не подавляются, и файловая система не будет находиться в противоречивом состоянии, потому что действия не будут выполняться до тех пор, пока не будет обработана ошибка, из-за которой не будет выполнено действие.
Разве это не похоже на очередь?
Да! Очереди делают что-то очень похожее (что вы можете увидеть в другом ответе), предоставляя первую в первой структуре структуру действий. Очень похоже на программный код, который выполняется в этом порядке. promises - это просто более сильная сторона той же монеты, на мой взгляд.
Другой ответ также обеспечивает жизнеспособную возможность через очереди.
О предлагаемых подходах
Переключитесь на синхронный ввод-вывод, чтобы записать данные на диск (на самом деле не хотите делать это для ответа на сервер).
В то время как я согласен, что это самый простой - подход "монитора" для цепочки всех действий, которые необходимо синхронизировать в одной очереди, очень похож.
Установите флаг, когда я начал писать данные и не записывал никаких новых данных, пока этот флаг установлен (заставляет меня потерять запись данных во время записи).
Этот флаг эффективно является мьютеком. Если вы блокируете (или уступаете и помещаете действие в очередь), когда кто-то пытается сделать это, у вас есть реальный мьютекс, который содержит "гарантии мьютекса".
Повторная попытка с этим флагом и сохранение списка следующих действий для хранения флага на самом деле очень распространены в реализациях семафора - один пример находится в ядре linux.
Более сложные версии опции 2, где я устанавливаю флаг и когда установлен флаг, новые данные поступают в отдельную временную структуру данных, которая, когда файл IO выполняется, затем объединяется с реальными данными (выполнимо, но кажется некрасиво). Сделайте копию моментальных копий исходных данных и не спешите писать эту копию на диск, зная, что никто не будет изменять копию. Я не хочу этого делать, потому что набор данных относительно велик, и я в ограниченной среде памяти (малина PI).
Эти подходы обычно называются транзакционными обновлениями RCU, в некоторых случаях они очень современные и очень быстрые - например, для "проблемы писателей-писателей" (что очень похоже на то, что у вас есть). Родная поддержка для них ударила в ядре linux совсем недавно. Выполнение этого в определенных случаях на самом деле является жизнеспособным и эффективным, хотя в вашем случае это слишком сложно, как вы полагаете.
Итак, подведем итоги
- Это непростая задача, но интересная.
- К счастью, promises решить это довольно хорошо, они были построены именно для решения этой проблемы путем абстрагирования понятия последовательности.
Счастливое кодирование, проект Pi NodeJS звучит потрясающе. Дайте мне знать, если я смогу прояснить это дальше.