Ответ 1
Предварительное наблюдение: вы пытались получить желаемые результаты, используя несколько подходов. При сравнении подходов, которые вы использовали, сложность заключается в том, что они не все выполняют одинаковую работу. Если вы запускаете тесты для файлового дерева, которое содержит только обычные файлы, это дерево не содержит точек монтирования, вы, вероятно, сможете сравнить подходы справедливо, но когда вы начнете добавлять точки монтирования, символические ссылки и т.д., Вы можете получить различную статистику памяти и времени просто из-за того, что один подход исключает файлы, которые включает другой подход.
Первоначально я пытался readdirp
с использованием readdirp
, но, к сожалению, эта библиотека кажется мне readdirp
. Запустив его в моей системе, я получил противоречивые результаты. Один прогон выдаст 10 Мб данных, другой прогон с теми же входными параметрами выведет 22 Мб, затем я получу другое число и т.д. Я посмотрел на код и обнаружил, что он не учитывает возвращаемое значение push
:
_push(entry) {
if (this.readable) {
this.push(entry);
}
}
Согласно документации, метод push
может возвращать false
значение, и в этом случае поток Readable
должен прекратить производить данные и ждать _read
вызова _read
. readdirp
полностью игнорирует эту часть спецификации. Крайне важно обратить внимание на возвращаемое значение push
чтобы получить правильную обработку противодавления. Есть и другие вещи, которые казались сомнительными в этом коде.
Поэтому я отказался от этого и работал над проверкой концепции, показывающей, как это можно сделать. Важнейшие части:
-
Когда метод
push
возвращаетfalse
необходимо прекратить добавление данных в поток. Вместо этого мы записываем, где мы были, и останавливаемся. -
Мы начинаем снова только тогда, когда
_read
вызывается.
Если вы раскомментируете операторы console.log
которые выводят START
и STOP
. Вы увидите их распечатанные последовательно на консоли. Мы начинаем, производим данные до тех пор, пока Node не скажет нам остановиться, а затем мы остановимся, пока Node не скажет нам начать снова, и так далее.
const stream = require("stream");
const fs = require("fs");
const { readdir, lstat } = fs.promises;
const path = require("path");
class Walk extends stream.Readable {
constructor(root, maxDepth = Infinity) {
super();
this._maxDepth = maxDepth;
// These fields allow us to remember where we were when we have to pause our
// work.
// The path of the directory to process when we resume processing, and the
// depth of this directory.
this._curdir = [root, 1];
// The directories still to process.
this._dirs = [this._curdir];
// The list of files to process when we resume processing.
this._files = [];
// The location in 'this._files' were to continue processing when we resume.
this._ix = 0;
// A flag recording whether or not the fetching of files is currently going
// on.
this._started = false;
}
async _fetch() {
// Recall where we were by loading the state in local variables.
let files = this._files;
let dirs = this._dirs;
let [dir, depth] = this._curdir;
let ix = this._ix;
while (true) {
// If we've gone past the end of the files we were processing, then
// just forget about them. This simplifies the code that follows a bit.
if (ix >= files.length) {
ix = 0;
files = [];
}
// Read directories until we have files to process.
while (!files.length) {
// We've read everything, end the stream.
if (dirs.length === 0) {
// This is how the stream API requires us to indicate the stream has
// ended.
this.push(null);
// We're no longer running.
this._started = false;
return;
}
// Here, we get the next directory to process and get the list of
// files in it.
[dir, depth] = dirs.pop();
try {
files = await readdir(dir, { withFileTypes: true });
}
catch (ex) {
// This is a proof-of-concept. In a real application, you should
// determine what exceptions you want to ignore (e.g. EPERM).
}
}
// Process each file.
for (; ix < files.length; ++ix) {
const dirent = files[ix];
// Don't include in the results those files that are not directories,
// files or symbolic links.
if (!(dirent.isFile() || dirent.isDirectory() || dirent.isSymbolicLink())) {
continue;
}
const fullPath = path.join(dir, dirent.name);
if (dirent.isDirectory() & depth < this._maxDepth) {
// Keep track that we need to walk this directory.
dirs.push([fullPath, depth + 1]);
}
// Finally, we can put the data into the stream!
if (!this.push('${fullPath}\n')) {
// If the push returned false, we have to stop pushing results to the
// stream until _read is called again, so we have to stop.
// Uncomment this if you want to see when the stream stops.
// console.log("STOP");
// Record where we were in our processing.
this._files = files;
// The element at ix *has* been processed, so ix + 1.
this._ix = ix + 1;
this._curdir = [dir, depth];
// We're stopping, so indicate that!
this._started = false;
return;
}
}
}
}
async _read() {
// Do not start the process that puts data on the stream over and over
// again.
if (this._started) {
return;
}
this._started = true; // Yep, we've started.
// Uncomment this if you want to see when the stream starts.
// console.log("START");
await this._fetch();
}
}
// Change the paths to something that makes sense for you.
stream.pipeline(new Walk("/home/", 5),
fs.createWriteStream("/tmp/paths3.txt"),
(err) => console.log("ended with", err));
Когда я запускаю первую попытку, которую вы сделали здесь с помощью walkdir
, я получаю следующую статистику:
- Истекшее время (настенные часы): 59 сек
- Максимальный размер резидентного набора: 2,90 ГБ
Когда я использую код, который я показал выше:
- Истекшее время (настенные часы): 35 сек
- Максимальный размер резидентного набора: 0,1 ГБ
Дерево файлов, которое я использую для тестов, выдает список файлов размером 792 МБ