Как я могу перемежать/объединять асинхронные итерации?
Предположим, у меня есть некоторые asnyc итерируемые объекты, подобные этому:
// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
setTimeout(() => resolve(ms), ms);
});
const a = {
[Symbol.asyncIterator]: async function * () {
yield 'a';
await sleep(1000);
yield 'b';
await sleep(2000);
yield 'c';
},
};
const b = {
[Symbol.asyncIterator]: async function * () {
await sleep(6000);
yield 'i';
yield 'j';
await sleep(2000);
yield 'k';
},
};
const c = {
[Symbol.asyncIterator]: async function * () {
yield 'x';
await sleep(2000);
yield 'y';
await sleep(8000);
yield 'z';
await sleep(10000);
throw new Error('You have gone too far! ');
},
};
Теперь предположим, что я могу конкатрировать их так:
const abcs = async function * () {
yield * a;
yield * b;
yield * c;
};
Полученные (первые 9) предметы будут:
(async () => {
const limit = 9;
let i = 0;
const xs = [];
for await (const x of abcs()) {
xs.push(x);
i++;
if (i === limit) {
break;
}
}
console.log(xs);
})().catch(error => console.error(error));
// [ 'a', 'b', 'c', 'i', 'j', 'k', 'x', 'y', 'z' ]
Но представьте, что меня не волнует порядок, что a
, b
и c
работают c
разной скоростью и что я хочу уступить как можно быстрее.
Как я могу переписать этот цикл так, чтобы x
получен как можно скорее, игнорируя порядок?
Также возможно, что a
, b
или c
являются бесконечными последовательностями, поэтому решение не должно требовать буферизации всех элементов в массив.
Ответы
Ответ 1
Невозможно написать это с помощью оператора цикла. async
/await
код всегда выполняется последовательно, чтобы делать что-то одновременно, вам нужно использовать комбинаторы обещаний напрямую. Для простых обещаний, там Promise.all
, для асинхронных итераторов нет ничего (пока), поэтому нам нужно написать это самостоятельно:
async function* combine(iterable) {
const asyncIterators = Array.from(iterable, o => o[Symbol.asyncIterator]());
const results = [];
let count = asyncIterators.length;
const never = new Promise(() => {});
function getNext(asyncIterator, index) {
return asyncIterator.next().then(result => ({
index,
result,
}));
}
const nextPromises = asyncIterators.map(getNext);
while (count) {
const {index, result} = await Promise.race(nextPromises);
if (result.done) {
nextPromises[index] = never;
results[index] = result.value;
count--;
} else {
nextPromises[index] = getNext(asyncIterators[index], index);
yield result.value;
}
}
return results;
}
Обратите внимание: combine
не поддерживает передачу значений в next
или отмену через .throw
или .return
.
Вы можете назвать это как
(async () => {
for await (const x of combine([a, b, c])) {
console.log(x);
}
})().catch(console.error);
Ответ 2
Если я изменю abcs
чтобы принять генераторы для обработки, я придумал это, см. Встроенные комментарии:
const abcs = async function * (...gens) {
// Worker function to queue up the next result
const queueNext = async (e) => {
e.result = null; // Release previous one as soon as possible
e.result = await e.it.next();
return e;
};
// Map the generators to source objects in a map, get and start their
// first iteration
const sources = new Map(gens.map(gen => [
gen,
queueNext({
key: gen,
it: gen[Symbol.asyncIterator]()
})
]));
// While we still have any sources, race the current promise of
// the sources we have left
while (sources.size) {
const winner = await Promise.race(sources.values());
// Completed the sequence?
if (winner.result.done) {
// Yes, drop it from sources
sources.delete(winner.key);
} else {
// No, grab the value to yield and queue up the next
// Then yield the value
const {value} = winner.result;
sources.set(winner.key, queueNext(winner));
yield value;
}
}
};
Живой пример:
// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
setTimeout(() => resolve(ms), ms);
});
const a = {
[Symbol.asyncIterator]: async function * () {
yield 'a';
await sleep(1000);
yield 'b';
await sleep(2000);
yield 'c';
},
};
const b = {
[Symbol.asyncIterator]: async function * () {
await sleep(6000);
yield 'i';
yield 'j';
await sleep(2000);
yield 'k';
},
};
const c = {
[Symbol.asyncIterator]: async function * () {
yield 'x';
await sleep(2000);
yield 'y';
await sleep(8000);
yield 'z';
},
};
const abcs = async function * (...gens) {
// Worker function to queue up the next result
const queueNext = async (e) => {
e.result = null; // Release previous one as soon as possible
e.result = await e.it.next();
return e;
};
// Map the generators to source objects in a map, get and start their
// first iteration
const sources = new Map(gens.map(gen => [
gen,
queueNext({
key: gen,
it: gen[Symbol.asyncIterator]()
})
]));
// While we still have any sources, race the current promise of
// the sources we have left
while (sources.size) {
const winner = await Promise.race(sources.values());
// Completed the sequence?
if (winner.result.done) {
// Yes, drop it from sources
sources.delete(winner.key);
} else {
// No, grab the value to yield and queue up the next
// Then yield the value
const {value} = winner.result;
sources.set(winner.key, queueNext(winner));
yield value;
}
}
};
(async () => {
console.log("start");
for await (const x of abcs(a, b, c)) {
console.log(x);
}
console.log("done");
})().catch(error => console.error(error));
.as-console-wrapper {
max-height: 100% !important;
}
Ответ 3
Я решил это с помощью генераторов асинхронных сигналов. (Хотелось бы, чтобы я нашел этот вопрос несколько дней назад, я бы сэкономил некоторое время) Будем рады услышать мнение и критику.
async function* mergen(...gens) {
const promises = gens.map((gen, index) =>
gen.next().then(p => ({...p, gen}))
);
while (promises.length > 0) {
yield race(promises).then(({index, value: {value, done, gen}}) => {
promises.splice(index, 1);
if (!done)
promises.push(
gen.next().then(({value: newVal, done: newDone}) => ({
value: newVal,
done: newDone,
gen
}))
);
return value;
});
}
};
// Needed to implement race to provide index of resolved promise
function race(promises) {
return new Promise(resolve =>
promises.forEach((p, index) => {
p.then(value => {
resolve({index, value});
});
})
);
}
Мне потребовалось много времени, чтобы найти, и я так взволновался, что поместил его в пакет npm :) https://www.npmjs.com/package/mergen
Ответ 4
Надеюсь, я правильно понял ваш вопрос, вот как я подхожу к нему:
let results = [];
Promise.all([ a, b, c ].map(async function(source) {
for await (let item of source) {
results.push(item);
}
}))
.then(() => console.log(results));
Я попробовал это с тремя нормальными массивами:
var a = [ 1, 2, 3 ];
var b = [ 4, 5, 6 ];
var c = [ 7, 8, 9 ];
И это привело к [1, 4, 7, 2, 5, 8, 3, 6, 9]
.