|
|
|
@ -224,7 +224,11 @@ MessageReceiver.prototype.extend({
|
|
|
|
|
window.log.error('websocket error');
|
|
|
|
|
},
|
|
|
|
|
dispatchAndWait(event) {
|
|
|
|
|
return Promise.all(this.dispatchEvent(event));
|
|
|
|
|
const promise = this.appPromise || Promise.resolve();
|
|
|
|
|
const appJobPromise = Promise.all(this.dispatchEvent(event));
|
|
|
|
|
const job = () => appJobPromise;
|
|
|
|
|
|
|
|
|
|
this.appPromise = promise.then(job, job);
|
|
|
|
|
},
|
|
|
|
|
onclose(ev) {
|
|
|
|
|
window.log.info(
|
|
|
|
@ -354,23 +358,34 @@ MessageReceiver.prototype.extend({
|
|
|
|
|
const { incoming } = this;
|
|
|
|
|
this.incoming = [];
|
|
|
|
|
|
|
|
|
|
const dispatchEmpty = () => {
|
|
|
|
|
const emitEmpty = () => {
|
|
|
|
|
window.log.info("MessageReceiver: emitting 'empty' event");
|
|
|
|
|
const ev = new Event('empty');
|
|
|
|
|
return this.dispatchAndWait(ev);
|
|
|
|
|
this.dispatchAndWait(ev);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
const waitForApplication = async () => {
|
|
|
|
|
window.log.info(
|
|
|
|
|
"MessageReceiver: finished processing messages after 'empty', now waiting for application"
|
|
|
|
|
);
|
|
|
|
|
const promise = this.appPromise || Promise.resolve();
|
|
|
|
|
this.appPromise = Promise.resolve();
|
|
|
|
|
|
|
|
|
|
// We don't await here because we don't this to gate future message processing
|
|
|
|
|
promise.then(emitEmpty, emitEmpty);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
const queueDispatch = () => {
|
|
|
|
|
const waitForEmptyQueue = () => {
|
|
|
|
|
// resetting count to zero so everything queued after this starts over again
|
|
|
|
|
this.count = 0;
|
|
|
|
|
|
|
|
|
|
this.addToQueue(dispatchEmpty);
|
|
|
|
|
this.addToQueue(waitForApplication);
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// We first wait for all recently-received messages (this.incoming) to be queued,
|
|
|
|
|
// then we add a task to emit the 'empty' event to the queue, so all message
|
|
|
|
|
// processing is complete by the time it runs.
|
|
|
|
|
Promise.all(incoming).then(queueDispatch, queueDispatch);
|
|
|
|
|
// then we queue a task to wait for the application to finish its processing, then
|
|
|
|
|
// finally we emit the 'empty' event to the queue.
|
|
|
|
|
Promise.all(incoming).then(waitForEmptyQueue, waitForEmptyQueue);
|
|
|
|
|
},
|
|
|
|
|
drain() {
|
|
|
|
|
const { incoming } = this;
|
|
|
|
|