I'm trying to wrap my head around async/await
, and I have the following code:
class AsyncQueue<T> {
queue = Array<T>()
maxSize = 1
async enqueue(x: T) {
if (this.queue.length > this.maxSize) {
// Block until available
}
this.queue.unshift(x)
}
async dequeue() {
if (this.queue.length == 0) {
// Block until available
}
return this.queue.pop()!
}
}
async function produce<T>(q: AsyncQueue, x: T) {
await q.enqueue(x)
}
async function consume<T>(q: AsyncQueue): T {
return await q.dequeue()
}
// Expecting 3 4 in the console
(async () => {
const q = new AsyncQueue<number>()
consume(q).then(console.log)
consume(q).then(console.log)
produce(q, 3)
produce(q, 4)
consume(q).then(console.log)
consume(q).then(console.log)
})()
My problem, of course, is in the "Block until available" parts of the code. I was expecting to be able to "halt" the execution until something happens (for example, dequeue halts until an enqueue exists, and vice-versa given the available space). I have the feeling I might need to use coroutines for this, but I really wanted to make sure I am just not missing any async/await
magic here.
I'm trying to wrap my head around async/await
, and I have the following code:
class AsyncQueue<T> {
queue = Array<T>()
maxSize = 1
async enqueue(x: T) {
if (this.queue.length > this.maxSize) {
// Block until available
}
this.queue.unshift(x)
}
async dequeue() {
if (this.queue.length == 0) {
// Block until available
}
return this.queue.pop()!
}
}
async function produce<T>(q: AsyncQueue, x: T) {
await q.enqueue(x)
}
async function consume<T>(q: AsyncQueue): T {
return await q.dequeue()
}
// Expecting 3 4 in the console
(async () => {
const q = new AsyncQueue<number>()
consume(q).then(console.log)
consume(q).then(console.log)
produce(q, 3)
produce(q, 4)
consume(q).then(console.log)
consume(q).then(console.log)
})()
My problem, of course, is in the "Block until available" parts of the code. I was expecting to be able to "halt" the execution until something happens (for example, dequeue halts until an enqueue exists, and vice-versa given the available space). I have the feeling I might need to use coroutines for this, but I really wanted to make sure I am just not missing any async/await
magic here.
-
1
You don't want to
block
, that would freeze the script - you should haveenqueue
anddequeue
await
promises that resolve once whatever they're waiting on is available. Also, you should call constructors with()
– CertainPerformance Commented May 17, 2018 at 2:39 -
1
It seems I am pushing the
async/await
game further and further, and it isn't clear to me how it would all solve down. – Hugo Sereno Ferreira Commented May 17, 2018 at 2:42 - 1 Have a look at the possible duplicate How to implement a pseudo blocking async queue in JS/TS? – Bergi Commented May 17, 2018 at 5:56
-
1
No, you don't need coroutines, you just need the
new Promise
constructor to wait for things that happen externally. You are however implementing coroutines, CSP-style. – Bergi Commented May 17, 2018 at 6:03
1 Answer
Reset to default 917/04/2019 Update: Long story short, there's a bug in the AsyncSemaphore implementation below, that was caught using property-based testing. You can read all about this "tale" here. Here's the fixed version:
class AsyncSemaphore {
private promises = Array<() => void>()
constructor(private permits: number) {}
signal() {
this.permits += 1
if (this.promises.length > 0) this.promises.pop()!()
}
async wait() {
this.permits -= 1
if (this.permits < 0 || this.promises.length > 0)
await new Promise(r => this.promises.unshift(r))
}
}
Finally, after considerable effort, and inspired by @Titian answer, I think I solved this. The code is filled with debug messages, but it might serve pedagogical purposes regarding the flow of control:
class AsyncQueue<T> {
waitingEnqueue = new Array<() => void>()
waitingDequeue = new Array<() => void>()
enqueuePointer = 0
dequeuePointer = 0
queue = Array<T>()
maxSize = 1
trace = 0
async enqueue(x: T) {
this.trace += 1
const localTrace = this.trace
if ((this.queue.length + 1) > this.maxSize || this.waitingDequeue.length > 0) {
console.debug(`[${localTrace}] Producer Waiting`)
this.dequeuePointer += 1
await new Promise(r => this.waitingDequeue.unshift(r))
this.waitingDequeue.pop()
console.debug(`[${localTrace}] Producer Ready`)
}
this.queue.unshift(x)
console.debug(`[${localTrace}] Enqueueing ${x} Queue is now [${this.queue.join(', ')}]`)
if (this.enqueuePointer > 0) {
console.debug(`[${localTrace}] Notify Consumer`)
this.waitingEnqueue[this.enqueuePointer-1]()
this.enqueuePointer -= 1
}
}
async dequeue() {
this.trace += 1
const localTrace = this.trace
console.debug(`[${localTrace}] Queue length before pop: ${this.queue.length}`)
if (this.queue.length == 0 || this.waitingEnqueue.length > 0) {
console.debug(`[${localTrace}] Consumer Waiting`)
this.enqueuePointer += 1
await new Promise(r => this.waitingEnqueue.unshift(r))
this.waitingEnqueue.pop()
console.debug(`[${localTrace}] Consumer Ready`)
}
const x = this.queue.pop()!
console.debug(`[${localTrace}] Queue length after pop: ${this.queue.length} Popping ${x}`)
if (this.dequeuePointer > 0) {
console.debug(`[${localTrace}] Notify Producer`)
this.waitingDequeue[this.dequeuePointer - 1]()
this.dequeuePointer -= 1
}
return x
}
}
Update: Here's a clean version using an AsyncSemaphore
, that really encapsulates the way things are usually done using concurrency primitives, but adapted to the asynchronous-CPS-single-threaded-event-loop™ style of JavaScript with async/await
. You can see that the logic of AsyncQueue
bees much more intuitive, and the double synchronisation through Promises is delegated to the two semaphores:
class AsyncSemaphore {
private promises = Array<() => void>()
constructor(private permits: number) {}
signal() {
this.permits += 1
if (this.promises.length > 0) this.promises.pop()()
}
async wait() {
if (this.permits == 0 || this.promises.length > 0)
await new Promise(r => this.promises.unshift(r))
this.permits -= 1
}
}
class AsyncQueue<T> {
private queue = Array<T>()
private waitingEnqueue: AsyncSemaphore
private waitingDequeue: AsyncSemaphore
constructor(readonly maxSize: number) {
this.waitingEnqueue = new AsyncSemaphore(0)
this.waitingDequeue = new AsyncSemaphore(maxSize)
}
async enqueue(x: T) {
await this.waitingDequeue.wait()
this.queue.unshift(x)
this.waitingEnqueue.signal()
}
async dequeue() {
await this.waitingEnqueue.wait()
this.waitingDequeue.signal()
return this.queue.pop()!
}
}
Update 2: There seemed to be a subtle bug hidden in the above code, that became evident when trying to use an AsyncQueue
of size 0. The semantics do make sense: it is a queue without any buffer, where the publisher always awaits for an consumer to exist. The lines that were preventing it to work were:
await this.waitingEnqueue.wait()
this.waitingDequeue.signal()
If you look closely, you'll see that dequeue()
isn't perfectly symmetric to enqueue()
. In fact, if one swaps the order of these two instructions:
this.waitingDequeue.signal()
await this.waitingEnqueue.wait()
Then all works again; it seems intuitive to me that we signal that there's something interested in dequeuing()
before actually waiting for an enqueuing
to take place.
I'm still not sure this doesn't reintroduce subtle bugs, without extensive testing. I'll leave this as a challenge ;)
发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744965438a4603650.html
评论列表(0条)