Make sendMessage take options, remove redundant retrieve function and get constant

pull/295/head
Beaudan 6 years ago
parent b43978ece1
commit 709db4bf54

@ -5,8 +5,7 @@
const _ = require('lodash');
const { rpc } = require('./loki_rpc');
// Will be raised (to 3?) when we get more nodes
const MINIMUM_SUCCESSFUL_REQUESTS = 2;
const DEFAULT_CONNECTIONS = 2;
const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll';
function sleepFor(time) {
@ -88,7 +87,8 @@ class LokiMessageAPI {
this.sendingSwarmNodes = {};
}
async sendMessage(numConnections, pubKey, data, messageTimeStamp, ttl, isPing = false) {
async sendMessage(pubKey, data, messageTimeStamp, ttl, options = {}) {
const { isPing = false, numConnections = DEFAULT_CONNECTIONS } = options;
// Data required to identify a message in a conversation
const messageEventData = {
pubKey,
@ -267,114 +267,6 @@ class LokiMessageAPI {
await Promise.all(promises);
}
// stale function, kept around to reduce diff noise
// TODO: remove
async retrieveMessages(callback) {
const ourKey = window.textsecure.storage.user.getNumber();
const completedNodes = [];
let canResolve = true;
let successfulRequests = 0;
let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
const nodeComplete = nodeUrl => {
completedNodes.push(nodeUrl);
delete ourSwarmNodes[nodeUrl];
};
const doRequest = async (nodeUrl, nodeData) => {
const params = {
pubKey: ourKey,
lastHash: nodeData.lastHash || '',
};
const options = {
timeout: 40000,
headers: {
[LOKI_LONGPOLL_HEADER]: true,
},
};
try {
const result = await rpc(
`http://${nodeUrl}`,
this.snodeServerPort,
'retrieve',
params,
options
);
nodeComplete(nodeUrl);
successfulRequests += 1;
if (Array.isArray(result.messages) && result.messages.length) {
const lastMessage = _.last(result.messages);
lokiSnodeAPI.updateLastHash(
nodeUrl,
lastMessage.hash,
lastMessage.expiration
);
const filteredMessages = await this.jobQueue.add(() =>
filterIncomingMessages(result.messages)
);
if (filteredMessages.length) {
callback(filteredMessages);
}
}
} catch (e) {
log.warn('Loki retrieve messages:', e);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateOurSwarmNodes(newSwarm);
completedNodes.push(nodeUrl);
} else if (e instanceof textsecure.NotFoundError) {
canResolve = false;
} else if (e instanceof textsecure.HTTPError) {
// We mark the node as complete as we could still reach it
nodeComplete(nodeUrl);
} else {
const removeNode = await lokiSnodeAPI.unreachableNode(
ourKey,
nodeUrl
);
if (removeNode) {
log.error('Loki retrieve messages:', e);
nodeComplete(nodeUrl);
}
}
}
};
while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
if (!canResolve) {
throw new window.textsecure.DNSResolutionError('Retrieving messages');
}
if (Object.keys(ourSwarmNodes).length === 0) {
ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
// Filter out the nodes we have already got responses from
completedNodes.forEach(nodeUrl => delete ourSwarmNodes[nodeUrl]);
if (Object.keys(ourSwarmNodes).length === 0) {
if (successfulRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
return;
}
throw new window.textsecure.EmptySwarmError(
ourKey,
'Ran out of swarm nodes to query'
);
}
}
const remainingRequests =
MINIMUM_SUCCESSFUL_REQUESTS - successfulRequests;
await Promise.all(
Object.entries(ourSwarmNodes)
.splice(0, remainingRequests)
.map(([nodeUrl, nodeData]) => doRequest(nodeUrl, nodeData))
);
}
}
}
module.exports = LokiMessageAPI;

@ -12,6 +12,7 @@
/* eslint-disable more/no-then */
/* eslint-disable no-unreachable */
const NUM_SEND_CONNECTIONS = 2;
function OutgoingMessage(
server,
@ -188,13 +189,16 @@ OutgoingMessage.prototype = {
const pubKey = number;
try {
// TODO: Make NUM_CONCURRENT_CONNECTIONS a global constant
const options = {
numConnections: NUM_SEND_CONNECTIONS,
isPing: this.isPing,
}
await lokiMessageAPI.sendMessage(
2,
pubKey,
data,
timestamp,
ttl,
this.isPing
options
);
} catch (e) {
if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) {

Loading…
Cancel
Save