result guard, mark internal-only intended functions with _ prefix and simplify parameters, logging improvements

pull/988/head
Ryan Tharp 5 years ago
parent 241e64b94b
commit 455bfa4ab7

@ -94,6 +94,7 @@ class LokiMessageAPI {
await this.refreshSendingSwarm(pubKey, timestamp);
}
// send parameters
const params = {
pubKey,
ttl: ttl.toString(),
@ -104,7 +105,7 @@ class LokiMessageAPI {
const promises = [];
let completedConnections = 0;
for (let i = 0; i < numConnections; i += 1) {
const connectionPromise = this.openSendConnection(params).finally(() => {
const connectionPromise = this._openSendConnection(params).finally(() => {
completedConnections += 1;
if (completedConnections >= numConnections) {
delete this.sendingData[timestamp];
@ -151,7 +152,7 @@ class LokiMessageAPI {
'Ran out of swarm nodes to query'
);
}
log.info(`Successful storage message to ${pubKey}`);
log.info(`loki_message: Successfully stored message to ${pubKey}`);
}
async refreshSendingSwarm(pubKey, timestamp) {
@ -162,16 +163,11 @@ class LokiMessageAPI {
return true;
}
async openSendConnection(params) {
async _openSendConnection(params) {
while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) {
const snode = this.sendingData[params.timestamp].swarm.shift();
// TODO: Revert back to using snode address instead of IP
const successfulSend = await this.sendToNode(
snode.ip,
snode.port,
snode,
params
);
const successfulSend = await this._sendToNode(snode, params);
if (successfulSend) {
return true;
}
@ -189,19 +185,19 @@ class LokiMessageAPI {
}
await this.sendingData[params.timestamp].refreshPromise;
// Retry with a fresh list again
return this.openSendConnection(params);
return this._openSendConnection(params);
}
return false;
}
async sendToNode(address, port, targetNode, params) {
async _sendToNode(targetNode, params) {
let successiveFailures = 0;
while (successiveFailures < MAX_ACCEPTABLE_FAILURES) {
await sleepFor(successiveFailures * 500);
try {
const result = await lokiRpc(
`https://${address}`,
port,
`https://${targetNode.ip}`,
targetNode.port,
'store',
params,
{},
@ -211,17 +207,19 @@ class LokiMessageAPI {
// Make sure we aren't doing too much PoW
const currentDifficulty = window.storage.get('PoWDifficulty', null);
const newDifficulty = result.difficulty;
if (newDifficulty != null && newDifficulty !== currentDifficulty) {
window.storage.put('PoWDifficulty', newDifficulty);
}
if (result && result.difficulty) {
const newDifficulty = result.difficulty;
if (newDifficulty != null && newDifficulty !== currentDifficulty) {
window.storage.put('PoWDifficulty', newDifficulty);
}
} // else should we return false?
return true;
} catch (e) {
log.warn(
'Loki send message error:',
'loki_message: send error:',
e.code,
e.message,
`from ${address}`
`destination ${targetNode.ip}:${targetNode.port}`
);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
@ -238,26 +236,35 @@ class LokiMessageAPI {
} else if (e instanceof textsecure.NotFoundError) {
// TODO: Handle resolution error
} else if (e instanceof textsecure.TimestampError) {
log.warn('Timestamp is invalid');
log.warn('loki_message: Timestamp is invalid');
throw e;
} else if (e instanceof textsecure.HTTPError) {
// TODO: Handle working connection but error response
const body = await e.response.text();
log.warn('HTTPError body:', body);
log.warn('loki_message: HTTPError body:', body);
}
successiveFailures += 1;
}
}
log.error(`Failed to send to node: ${address}`);
await lokiSnodeAPI.unreachableNode(params.pubKey, address);
const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode(
params.pubKey,
targetNode
);
log.error(
`loki_message: Too many successive failures trying to send to node ${
targetNode.ip
}:${targetNode.port}, ${remainingSwarmSnodes.lengt} remaining swarm nodes`
);
return false;
}
async openRetrieveConnection(stopPollingPromise, callback) {
async _openRetrieveConnection(stopPollingPromise, callback) {
let stopPollingResult = false;
// When message_receiver restarts from onoffline/ononline events it closes
// http-resources, which will then resolve the stopPollingPromise with true. We then
// want to cancel these polling connections because new ones will be created
// eslint-disable-next-line more/no-then
stopPollingPromise.then(result => {
stopPollingResult = result;
@ -274,9 +281,13 @@ class LokiMessageAPI {
) {
await sleepFor(successiveFailures * 1000);
// TODO: Revert back to using snode address instead of IP
try {
// TODO: Revert back to using snode address instead of IP
let messages = await this.retrieveNextMessages(nodeData.ip, nodeData);
// in general, I think we want exceptions to bubble up
// so the user facing UI can report unhandled errors
// except in this case of living inside http-resource pollServer
// because it just restarts more connections...
let messages = await this._retrieveNextMessages(nodeData);
// this only tracks retrieval failures
// won't include parsing failures...
successiveFailures = 0;
@ -296,7 +307,7 @@ class LokiMessageAPI {
callback(messages);
} catch (e) {
log.warn(
'Loki retrieve messages error:',
'loki_message: retrieve error:',
e.code,
e.message,
`on ${nodeData.ip}:${nodeData.port}`
@ -324,40 +335,49 @@ class LokiMessageAPI {
}
}
if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) {
const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode(
this.ourKey,
nodeData
);
log.warn(
`removing ${nodeData.ip}:${
`loki_message: removing ${nodeData.ip}:${
nodeData.port
} from our swarm pool. We have ${
Object.keys(this.ourSwarmNodes).length
} usable swarm nodes left`
} usable swarm nodes left (${
remainingSwarmSnodes.length
} in local db)`
);
await lokiSnodeAPI.unreachableNode(this.ourKey, address);
}
}
// if not stopPollingResult
if (_.isEmpty(this.ourSwarmNodes)) {
log.error(
'We no longer have any swarm nodes available to try in pool, closing retrieve connection'
'loki_message: We no longer have any swarm nodes available to try in pool, closing retrieve connection'
);
return false;
}
return true;
}
async retrieveNextMessages(nodeUrl, nodeData) {
// we don't throw or catch here
// mark private (_ prefix) since no error handling is done here...
async _retrieveNextMessages(nodeData) {
const params = {
pubKey: this.ourKey,
lastHash: nodeData.lastHash || '',
};
const options = {
timeout: 40000,
ourPubKey: this.ourKey,
headers: {
[LOKI_LONGPOLL_HEADER]: true,
},
};
// let exceptions bubble up
const result = await lokiRpc(
`https://${nodeUrl}`,
`https://${nodeData.ip}`,
nodeData.port,
'retrieve',
params,
@ -365,34 +385,39 @@ class LokiMessageAPI {
'/storage_rpc/v1',
nodeData
);
return result.messages || [];
}
// we don't throw or catch here
async startLongPolling(numConnections, stopPolling, callback) {
log.info('startLongPolling for', numConnections, 'connections');
this.ourSwarmNodes = {};
// load from local DB
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
log.info('swarmNodes', nodes.length, 'for', this.ourKey);
Object.keys(nodes).forEach(j => {
const node = nodes[j];
log.info(`${j} ${node.ip}:${node.port}`);
});
if (nodes.length < numConnections) {
log.warn(
'Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain'
'loki_message: Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain'
);
// load from blockchain
nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey);
if (nodes.length < numConnections) {
log.error(
'Could not get enough SwarmNodes for our pubkey from blockchain'
'loki_message: Could not get enough SwarmNodes for our pubkey from blockchain'
);
}
}
log.info(
`There are currently ${
nodes.length
} swarmNodes for pubKey in our local database`
'loki_message: startLongPolling for',
numConnections,
'connections. We have swarmNodes',
nodes.length,
'for',
this.ourKey
);
Object.keys(nodes).forEach(j => {
const node = nodes[j];
log.info(`loki_message: ${j} ${node.ip}:${node.port}`);
});
for (let i = 0; i < nodes.length; i += 1) {
const lastHash = await window.Signal.Data.getLastHashBySnode(
@ -406,15 +431,28 @@ class LokiMessageAPI {
const promises = [];
let unresolved = numConnections;
for (let i = 0; i < numConnections; i += 1) {
promises.push(this.openRetrieveConnection(stopPolling, callback));
promises.push(
// eslint-disable-next-line more/no-then
this._openRetrieveConnection(stopPolling, callback).then(() => {
unresolved -= 1;
log.info(
'loki_message: There are',
unresolved,
'open retrieve connections left'
);
})
);
}
// blocks until numConnections snodes in our swarms have been removed from the list
// less than numConnections being active is fine, only need to restart if none per Niels 20/02/13
// or if there is network issues (ENOUTFOUND due to lokinet)
await Promise.all(promises);
log.error('All our long poll swarm connections have been removed');
log.error(
'loki_message: All our long poll swarm connections have been removed'
);
// should we just call ourself again?
// no, our caller already handles this...
}

Loading…
Cancel
Save