|
|
|
@ -217,7 +217,7 @@ class LokiMessageAPI {
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
} catch (e) {
|
|
|
|
|
log.warn('Loki send message:', e);
|
|
|
|
|
log.warn('Loki send message error:', e.code, e.message, `from ${address}`);
|
|
|
|
|
if (e instanceof textsecure.WrongSwarmError) {
|
|
|
|
|
const { newSwarm } = e;
|
|
|
|
|
await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm);
|
|
|
|
@ -272,6 +272,8 @@ class LokiMessageAPI {
|
|
|
|
|
try {
|
|
|
|
|
// TODO: Revert back to using snode address instead of IP
|
|
|
|
|
let messages = await this.retrieveNextMessages(nodeData.ip, nodeData);
|
|
|
|
|
// this only tracks retrieval failures
|
|
|
|
|
// won't include parsing failures...
|
|
|
|
|
successiveFailures = 0;
|
|
|
|
|
if (messages.length) {
|
|
|
|
|
const lastMessage = _.last(messages);
|
|
|
|
@ -288,7 +290,12 @@ class LokiMessageAPI {
|
|
|
|
|
// Execute callback even with empty array to signal online status
|
|
|
|
|
callback(messages);
|
|
|
|
|
} catch (e) {
|
|
|
|
|
log.warn('Loki retrieve messages:', e.code, e.message);
|
|
|
|
|
log.warn(
|
|
|
|
|
'Loki retrieve messages error:',
|
|
|
|
|
e.code,
|
|
|
|
|
e.message,
|
|
|
|
|
`on ${nodeData.ip}:${nodeData.port}`
|
|
|
|
|
);
|
|
|
|
|
if (e instanceof textsecure.WrongSwarmError) {
|
|
|
|
|
const { newSwarm } = e;
|
|
|
|
|
await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm);
|
|
|
|
@ -312,9 +319,24 @@ class LokiMessageAPI {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) {
|
|
|
|
|
log.warn(
|
|
|
|
|
`removing ${nodeData.ip}:${
|
|
|
|
|
nodeData.port
|
|
|
|
|
} from our swarm pool. We have ${
|
|
|
|
|
Object.keys(this.ourSwarmNodes).length
|
|
|
|
|
} usable swarm nodes left`
|
|
|
|
|
);
|
|
|
|
|
await lokiSnodeAPI.unreachableNode(this.ourKey, address);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// if not stopPollingResult
|
|
|
|
|
if (_.isEmpty(this.ourSwarmNodes)) {
|
|
|
|
|
log.error(
|
|
|
|
|
'We no longer have any swarm nodes available to try in pool, closing retrieve connection'
|
|
|
|
|
);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async retrieveNextMessages(nodeUrl, nodeData) {
|
|
|
|
@ -342,12 +364,31 @@ class LokiMessageAPI {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async startLongPolling(numConnections, stopPolling, callback) {
|
|
|
|
|
log.info('startLongPolling for', numConnections, 'connections');
|
|
|
|
|
this.ourSwarmNodes = {};
|
|
|
|
|
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
|
|
|
|
|
log.info('swarmNodes', nodes.length, 'for', this.ourKey);
|
|
|
|
|
Object.keys(nodes).forEach(j => {
|
|
|
|
|
const node = nodes[j];
|
|
|
|
|
log.info(`${j} ${node.ip}:${node.port}`);
|
|
|
|
|
});
|
|
|
|
|
if (nodes.length < numConnections) {
|
|
|
|
|
await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey);
|
|
|
|
|
nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
|
|
|
|
|
log.warn(
|
|
|
|
|
'Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain'
|
|
|
|
|
);
|
|
|
|
|
nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey);
|
|
|
|
|
if (nodes.length < numConnections) {
|
|
|
|
|
log.error(
|
|
|
|
|
'Could not get enough SwarmNodes for our pubkey from blockchain'
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
log.info(
|
|
|
|
|
`There are currently ${
|
|
|
|
|
nodes.length
|
|
|
|
|
} swarmNodes for pubKey in our local database`
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
for (let i = 0; i < nodes.length; i += 1) {
|
|
|
|
|
const lastHash = await window.Signal.Data.getLastHashBySnode(
|
|
|
|
|
nodes[i].address
|
|
|
|
@ -364,9 +405,13 @@ class LokiMessageAPI {
|
|
|
|
|
promises.push(this.openRetrieveConnection(stopPolling, callback));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// blocks until all snodes in our swarms have been removed from the list
|
|
|
|
|
// blocks until numConnections snodes in our swarms have been removed from the list
|
|
|
|
|
// less than numConnections being active is fine, only need to restart if none per Niels 20/02/13
|
|
|
|
|
// or if there is network issues (ENOUTFOUND due to lokinet)
|
|
|
|
|
await Promise.all(promises);
|
|
|
|
|
log.error('All our long poll swarm connections have been removed');
|
|
|
|
|
// should we just call ourself again?
|
|
|
|
|
// no, our caller already handles this...
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|