Ответ 1
17/04/2019 Обновление: Короче говоря, в реализации AsyncSemaphore есть ошибка, обнаруженная при тестировании на основе свойств. Вы можете прочитать все об этой "сказке" здесь. Вот исправленная версия:
class AsyncSemaphore {
private promises = Array<() => void>()
constructor(private permits: number) {}
signal() {
this.permits += 1
if (this.promises.length > 0) this.promises.pop()!()
}
async wait() {
this.permits -= 1
if (this.permits < 0 || this.promises.length > 0)
await new Promise(r => this.promises.unshift(r))
}
}
Наконец, после значительных усилий, вдохновленных ответом @Titian, я думаю, что решил это. Код заполнен отладочными сообщениями, но он может служить педагогическим целям относительно потока управления:
class AsyncQueue<T> {
waitingEnqueue = new Array<() => void>()
waitingDequeue = new Array<() => void>()
enqueuePointer = 0
dequeuePointer = 0
queue = Array<T>()
maxSize = 1
trace = 0
async enqueue(x: T) {
this.trace += 1
const localTrace = this.trace
if ((this.queue.length + 1) > this.maxSize || this.waitingDequeue.length > 0) {
console.debug('[${localTrace}] Producer Waiting')
this.dequeuePointer += 1
await new Promise(r => this.waitingDequeue.unshift(r))
this.waitingDequeue.pop()
console.debug('[${localTrace}] Producer Ready')
}
this.queue.unshift(x)
console.debug('[${localTrace}] Enqueueing ${x} Queue is now [${this.queue.join(', ')}]')
if (this.enqueuePointer > 0) {
console.debug('[${localTrace}] Notify Consumer')
this.waitingEnqueue[this.enqueuePointer-1]()
this.enqueuePointer -= 1
}
}
async dequeue() {
this.trace += 1
const localTrace = this.trace
console.debug('[${localTrace}] Queue length before pop: ${this.queue.length}')
if (this.queue.length == 0 || this.waitingEnqueue.length > 0) {
console.debug('[${localTrace}] Consumer Waiting')
this.enqueuePointer += 1
await new Promise(r => this.waitingEnqueue.unshift(r))
this.waitingEnqueue.pop()
console.debug('[${localTrace}] Consumer Ready')
}
const x = this.queue.pop()!
console.debug('[${localTrace}] Queue length after pop: ${this.queue.length} Popping ${x}')
if (this.dequeuePointer > 0) {
console.debug('[${localTrace}] Notify Producer')
this.waitingDequeue[this.dequeuePointer - 1]()
this.dequeuePointer -= 1
}
return x
}
}
Обновление: здесь чистая версия с использованием AsyncSemaphore
, которая действительно инкапсулирует способ, которым все обычно делается с использованием примитивов параллелизма, но адаптирована к асинхронному стилю JavaScript CPS-single-threadaded-event-loop ™ с помощью async/await
. Вы можете видеть, что логика AsyncQueue
становится намного более интуитивной, и двойная синхронизация через Promises делегируется двум семафорам:
class AsyncSemaphore {
private promises = Array<() => void>()
constructor(private permits: number) {}
signal() {
this.permits += 1
if (this.promises.length > 0) this.promises.pop()()
}
async wait() {
if (this.permits == 0 || this.promises.length > 0)
await new Promise(r => this.promises.unshift(r))
this.permits -= 1
}
}
class AsyncQueue<T> {
private queue = Array<T>()
private waitingEnqueue: AsyncSemaphore
private waitingDequeue: AsyncSemaphore
constructor(readonly maxSize: number) {
this.waitingEnqueue = new AsyncSemaphore(0)
this.waitingDequeue = new AsyncSemaphore(maxSize)
}
async enqueue(x: T) {
await this.waitingDequeue.wait()
this.queue.unshift(x)
this.waitingEnqueue.signal()
}
async dequeue() {
await this.waitingEnqueue.wait()
this.waitingDequeue.signal()
return this.queue.pop()!
}
}
Обновление 2: в приведенном выше коде, по- AsyncQueue
ошибка, которая стала очевидной при попытке использовать AsyncQueue
размера 0. Семантика имеет смысл: это очередь без буфера, где издатель всегда ожидает потребитель существует. Линии, которые мешали ему работать, были:
await this.waitingEnqueue.wait()
this.waitingDequeue.signal()
Если вы посмотрите внимательно, вы увидите, что dequeue()
не совсем симметрична enqueue()
. На самом деле, если поменять местами порядок этих двух инструкций:
this.waitingDequeue.signal()
await this.waitingEnqueue.wait()
Тогда все снова работает; мне кажется интуитивно понятным, что мы сигнализируем, что есть что-то заинтересованное в dequeuing()
прежде чем фактически ожидать, что произойдет постановка в enqueuing
.
Я до сих пор не уверен, что это не приведет к появлению мелких ошибок без тщательного тестирования. Я оставлю это как вызов;)