Start attempt at closing long polling connections when offline event is triggered

pull/344/head
Beaudan 6 years ago
parent d3aba65e41
commit 6ef94fa43d

@ -253,13 +253,22 @@ class LokiMessageAPI {
return false;
}
async openRetrieveConnection(callback) {
while (!_.isEmpty(this.ourSwarmNodes)) {
async openRetrieveConnection(stopPollingPromise, callback) {
let stopPollingResult = false;
// eslint-disable-next-line more/no-then
stopPollingPromise.then(result => {
stopPollingResult = result;
});
while (!stopPollingResult && !_.isEmpty(this.ourSwarmNodes)) {
const address = Object.keys(this.ourSwarmNodes)[0];
const nodeData = this.ourSwarmNodes[address];
delete this.ourSwarmNodes[address];
let successiveFailures = 0;
while (successiveFailures < MAX_ACCEPTABLE_FAILURES) {
while (
!stopPollingResult &&
successiveFailures < MAX_ACCEPTABLE_FAILURES
) {
await sleepFor(successiveFailures * 1000);
try {
@ -332,7 +341,7 @@ class LokiMessageAPI {
return result.messages || [];
}
async startLongPolling(numConnections, callback) {
async startLongPolling(numConnections, stopPolling, callback) {
this.ourSwarmNodes = {};
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
if (nodes.length < numConnections) {
@ -353,7 +362,7 @@ class LokiMessageAPI {
const promises = [];
for (let i = 0; i < numConnections; i += 1)
promises.push(this.openRetrieveConnection(callback));
promises.push(this.openRetrieveConnection(stopPolling, callback));
// blocks until all snodes in our swarms have been removed from the list
// or if there is network issues (ENOUTFOUND due to lokinet)

@ -49,6 +49,11 @@
handleRequest = request => request.respond(404, 'Not found');
}
let connected = true;
this.calledStop = false;
let resolveStopPolling;
const stopPolling = new Promise(res => {
resolveStopPolling = res;
});
this.handleMessage = (message, options = {}) => {
try {
@ -83,18 +88,26 @@
// This blocking call will return only when all attempts
// at reaching snodes are exhausted or a DNS error occured
try {
await server.startLongPolling(NUM_CONCURRENT_CONNECTIONS, messages => {
connected = true;
callback(connected);
messages.forEach(message => {
const { data } = message;
this.handleMessage(data);
});
});
await server.startLongPolling(
NUM_CONCURRENT_CONNECTIONS,
stopPolling,
messages => {
connected = true;
callback(connected);
messages.forEach(message => {
const { data } = message;
this.handleMessage(data);
});
}
);
} catch (e) {
// we'll try again anyway
}
if (this.calledStop) {
return;
}
connected = false;
// Exhausted all our snodes urls, trying again later from scratch
setTimeout(() => {
@ -105,5 +118,10 @@
this.isConnected = function isConnected() {
return connected;
};
this.close = () => {
this.calledStop = true;
resolveStopPolling(true);
};
};
})();

@ -190,6 +190,10 @@ MessageReceiver.prototype.extend({
localLokiServer.close();
}
if (this.httpPollingResource) {
this.httpPollingResource.close();
}
return this.drain();
},
onopen() {

Loading…
Cancel
Save