Refresh swarm list 1 time for outgoing messages

pull/327/head
Beaudan 6 years ago
parent 0435050916
commit 804fc076cc

@ -70,7 +70,7 @@ const trySendP2p = async (pubKey, data64, isPing, messageEventData) => {
class LokiMessageAPI {
constructor(ourKey) {
this.jobQueue = new window.JobQueue();
this.sendingSwarmNodes = {};
this.sendingData = {};
this.ourKey = ourKey;
}
@ -102,13 +102,13 @@ class LokiMessageAPI {
ttl
);
// Using timestamp as a unique identifier
this.sendingSwarmNodes[timestamp] = lokiSnodeAPI.getSwarmNodesForPubKey(
pubKey
);
if (this.sendingSwarmNodes[timestamp].length < numConnections) {
const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey);
await lokiSnodeAPI.updateSwarmNodes(pubKey, freshNodes);
this.sendingSwarmNodes[timestamp] = freshNodes;
const swarm = await lokiSnodeAPI.getSwarmNodesForPubKey(pubKey);
this.sendingData[timestamp] = {
swarm,
freshList: false,
};
if (this.sendingData[timestamp].swarm.length < numConnections) {
await this.refreshSendingSwarm(pubKey, timestamp);
}
const params = {
@ -124,7 +124,7 @@ class LokiMessageAPI {
const connectionPromise = this.openSendConnection(params).finally(() => {
completedConnections += 1;
if (completedConnections >= numConnections) {
delete this.sendingSwarmNodes[timestamp];
delete this.sendingData[timestamp];
}
});
promises.push(connectionPromise);
@ -167,9 +167,17 @@ class LokiMessageAPI {
log.info(`Successful storage message to ${pubKey}`);
}
async refreshSendingSwarm(pubKey, timestamp) {
const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey);
await lokiSnodeAPI.updateSwarmNodes(pubKey, freshNodes);
this.sendingData[timestamp].swarm = freshNodes;
this.sendingData[timestamp].freshList = true;
return true;
}
async openSendConnection(params) {
while (!_.isEmpty(this.sendingSwarmNodes[params.timestamp])) {
const snode = this.sendingSwarmNodes[params.timestamp].shift();
while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) {
const snode = this.sendingData[params.timestamp].swarm.shift();
// TODO: Revert back to using snode address instead of IP
const successfulSend = await this.sendToNode(
snode.ip,
@ -180,6 +188,21 @@ class LokiMessageAPI {
return true;
}
}
if (!this.sendingData[params.timestamp].freshList) {
// Ensure that there is only a single refresh per outgoing message
if (!this.sendingData[params.timestamp].refreshPromise) {
this.sendingData[
params.timestamp
].refreshPromise = this.refreshSendingSwarm(
params.pubKey,
params.timestamp
);
}
await this.sendingData[params.timestamp].refreshPromise;
// Retry with a fresh list again
return this.openSendConnection(params);
}
return false;
}
@ -202,7 +225,8 @@ class LokiMessageAPI {
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm);
this.sendingSwarmNodes[params.timestamp] = newSwarm;
this.sendingData[params.timestamp].swarm = newSwarm;
this.sendingData[params.timestamp].freshList = true;
return false;
} else if (e instanceof textsecure.WrongDifficultyError) {
const { newDifficulty } = e;

@ -12,7 +12,7 @@
/* eslint-disable more/no-then */
/* eslint-disable no-unreachable */
const NUM_SEND_CONNECTIONS = 2;
const NUM_SEND_CONNECTIONS = 3;
function OutgoingMessage(
server,

Loading…
Cancel
Save