Merge pull request #147 from BeaudanBrown/swarm-error-handling

Swarm error handling
pull/159/head
sachaaaaa 6 years ago committed by GitHub
commit 79a28fe993
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -181,8 +181,9 @@
return conversation;
}
window.LokiSnodeAPI.replenishSwarm(id);
try {
const swarmNodes = await window.LokiSnodeAPI.getFreshSwarmNodes(id);
conversation.set({ swarmNodes});
await window.Signal.Data.saveConversation(conversation.attributes, {
Conversation: Whisper.Conversation,
});

@ -847,6 +847,8 @@
e.name === 'SendMessageNetworkError' ||
e.name === 'SignedPreKeyRotationError' ||
e.name === 'OutgoingIdentityKeyError' ||
e.name === 'DNSResolutionError' ||
e.name === 'EmptySwarmError' ||
e.name === 'PoWError'
);
},

@ -662,19 +662,15 @@ async function removeAllSessions(id) {
function setifyProperty(data, propertyName) {
if (!data) return data;
const returnData = data;
if (returnData[propertyName]) {
const returnData = { ...data };
if (Array.isArray(returnData[propertyName])) {
returnData[propertyName] = new Set(returnData[propertyName]);
}
return returnData;
}
async function getSwarmNodesByPubkey(pubkey) {
let swarmNodes = await channels.getSwarmNodesByPubkey(pubkey);
if (Array.isArray(swarmNodes)) {
swarmNodes = new Set(swarmNodes);
}
return swarmNodes;
return channels.getSwarmNodesByPubkey(pubkey);
}
async function getConversationCount() {
@ -682,11 +678,7 @@ async function getConversationCount() {
}
async function saveConversation(data) {
const storeData = data;
if (storeData.swarmNodes) {
storeData.swarmNodes = Array.from(storeData.swarmNodes);
}
await channels.saveConversation(storeData);
await channels.saveConversation(data);
}
async function saveConversations(data) {
@ -694,8 +686,7 @@ async function saveConversations(data) {
}
async function getConversationById(id, { Conversation }) {
const rawData = await channels.getConversationById(id);
const data = setifyProperty(rawData, 'swarmNodes');
const data = await channels.getConversationById(id);
return new Conversation(data);
}
@ -704,8 +695,10 @@ async function updateConversation(id, data, { Conversation }) {
if (!existing) {
throw new Error(`Conversation ${id} does not exist!`);
}
const setData = setifyProperty(data, 'swarmNodes');
const setExisting = setifyProperty(existing.attributes, 'swarmNodes');
const merged = merge({}, existing.attributes, data);
const merged = merge({}, setExisting, setData);
if (merged.swarmNodes instanceof Set) {
merged.swarmNodes = Array.from(merged.swarmNodes);
}
@ -729,9 +722,7 @@ async function _removeConversations(ids) {
}
async function getAllConversations({ ConversationCollection }) {
const conversations = (await channels.getAllConversations()).map(c =>
setifyProperty(c, 'swarmNodes')
);
const conversations = await channels.getAllConversations();
const collection = new ConversationCollection();
collection.add(conversations);
@ -744,9 +735,7 @@ async function getAllConversationIds() {
}
async function getAllPrivateConversations({ ConversationCollection }) {
const conversations = (await channels.getAllPrivateConversations()).map(c =>
setifyProperty(c, 'swarmNodes')
);
const conversations = await channels.getAllPrivateConversations();
const collection = new ConversationCollection();
collection.add(conversations);
@ -754,9 +743,7 @@ async function getAllPrivateConversations({ ConversationCollection }) {
}
async function getAllGroupsInvolvingId(id, { ConversationCollection }) {
const conversations = (await channels.getAllGroupsInvolvingId(id)).map(c =>
setifyProperty(c, 'swarmNodes')
);
const conversations = await channels.getAllGroupsInvolvingId(id);
const collection = new ConversationCollection();
collection.add(conversations);
@ -764,9 +751,7 @@ async function getAllGroupsInvolvingId(id, { ConversationCollection }) {
}
async function searchConversations(query, { ConversationCollection }) {
const conversations = (await channels.searchConversations(query)).map(c =>
setifyProperty(c, 'swarmNodes')
);
const conversations = await channels.searchConversations(query);
const collection = new ConversationCollection();
collection.add(conversations);

@ -1,25 +1,18 @@
/* eslint-disable no-await-in-loop */
/* eslint-disable no-loop-func */
/* global log, dcodeIO, window, callWorker */
const fetch = require('node-fetch');
// eslint-disable-next-line
const invert = p => new Promise((res, rej) => p.then(rej, res));
const firstOf = ps => invert(Promise.all(ps.map(invert)));
// Will be raised (to 3?) when we get more nodes
const MINIMUM_SUCCESSFUL_REQUESTS = 2;
class LokiMessageAPI {
constructor({ messageServerPort }) {
this.messageServerPort = messageServerPort ? `:${messageServerPort}` : '';
}
async sendMessage(pubKey, data, messageTimeStamp, ttl) {
const swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey);
if (!swarmNodes || swarmNodes.size === 0) {
throw Error('No swarm nodes to query!');
}
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
const timestamp = Math.floor(Date.now() / 1000);
// Nonce is returned as a base64 string to include in header
@ -42,14 +35,17 @@ class LokiMessageAPI {
// Something went horribly wrong
throw err;
}
const completedNodes = [];
let successfulRequests = 0;
let canResolve = true;
const requests = Array.from(swarmNodes).map(async node => {
const doRequest = async nodeUrl => {
// TODO: Confirm sensible timeout
const options = {
url: `${node}${this.messageServerPort}/store`,
url: `${nodeUrl}${this.messageServerPort}/store`,
type: 'POST',
responseType: undefined,
timeout: 5000,
timeout: 10000,
};
const fetchOptions = {
@ -68,9 +64,17 @@ class LokiMessageAPI {
try {
response = await fetch(options.url, fetchOptions);
} catch (e) {
if (e.code === 'ENOTFOUND') {
// TODO: Handle the case where lokinet is not working
canResolve = false;
return;
}
log.error(options.type, options.url, 0, 'Error sending message');
window.LokiSnodeAPI.unreachableNode(pubKey, node);
throw HTTPError('fetch error', 0, e.toString());
if (window.LokiSnodeAPI.unreachableNode(pubKey, nodeUrl)) {
completedNodes.push(nodeUrl);
swarmNodes = swarmNodes.filter(node => node !== nodeUrl);
}
return;
}
let result;
@ -86,7 +90,10 @@ class LokiMessageAPI {
}
if (response.status >= 0 && response.status < 400) {
return result;
completedNodes.push(nodeUrl);
swarmNodes = swarmNodes.filter(node => node !== nodeUrl);
successfulRequests += 1;
return;
}
log.error(
options.type,
@ -95,19 +102,48 @@ class LokiMessageAPI {
'Error sending message'
);
throw HTTPError('sendMessage: error response', response.status, result);
});
};
let swarmNodes;
try {
// TODO: Possibly change this to require more than a single response?
const result = await firstOf(requests);
return result;
} catch (err) {
throw err;
swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey);
} catch (e) {
throw new window.textsecure.EmptySwarmError(pubKey, e);
}
while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
if (!canResolve) {
throw new window.textsecure.DNSResolutionError('Sending messages');
}
if (!swarmNodes || swarmNodes.length === 0) {
swarmNodes = await window.LokiSnodeAPI.getFreshSwarmNodes(pubKey);
swarmNodes = swarmNodes.filter(node => !(node in completedNodes));
if (!swarmNodes || swarmNodes.length === 0) {
if (successfulRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
return;
}
throw new window.textsecure.EmptySwarmError(
pubKey,
new Error('Ran out of swarm nodes to query')
);
}
await window.LokiSnodeAPI.saveSwarmNodes(pubKey, swarmNodes);
}
const remainingRequests =
MINIMUM_SUCCESSFUL_REQUESTS - completedNodes.length;
await Promise.all(
swarmNodes
.splice(0, remainingRequests)
.map(nodeUrl => doRequest(nodeUrl))
);
}
}
async retrieveMessages(callback) {
const ourKey = window.textsecure.storage.user.getNumber();
let completedRequests = 0;
const completedNodes = [];
let canResolve = true;
let successfulRequests = 0;
const doRequest = async (nodeUrl, nodeData) => {
// TODO: Confirm sensible timeout
@ -115,7 +151,7 @@ class LokiMessageAPI {
url: `${nodeUrl}${this.messageServerPort}/retrieve`,
type: 'GET',
responseType: 'json',
timeout: 5000,
timeout: 10000,
};
const headers = {
@ -135,15 +171,21 @@ class LokiMessageAPI {
try {
response = await fetch(options.url, fetchOptions);
} catch (e) {
// TODO: Maybe we shouldn't immediately delete?
// And differentiate between different connectivity issues
if (e.code === 'ENOTFOUND') {
// TODO: Handle the case where lokinet is not working
canResolve = false;
return;
}
log.error(
options.type,
options.url,
0,
`Error retrieving messages from ${nodeUrl}`
);
window.LokiSnodeAPI.unreachableNode(ourKey, nodeUrl);
if (window.LokiSnodeAPI.unreachableNode(ourKey, nodeUrl)) {
completedNodes.push(nodeUrl);
delete ourSwarmNodes[nodeUrl];
}
return;
}
@ -158,29 +200,59 @@ class LokiMessageAPI {
} else {
result = await response.text();
}
completedRequests += 1;
completedNodes.push(nodeUrl);
delete ourSwarmNodes[nodeUrl];
if (response.status === 200) {
if (result.lastHash) {
window.LokiSnodeAPI.updateLastHash(nodeUrl, result.lastHash);
callback(result.messages);
}
successfulRequests += 1;
return;
}
// Handle error from snode
log.error(options.type, options.url, response.status, 'Error');
};
while (completedRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests;
const ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes();
if (Object.keys(ourSwarmNodes).length < remainingRequests) {
// This means we don't have enough swarm nodes to meet the minimum threshold
if (completedRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
let ourSwarmNodes;
try {
ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes();
} catch (e) {
throw new window.textsecure.EmptySwarmError(
window.textsecure.storage.user.getNumber(),
e
);
}
while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
if (!canResolve) {
throw new window.textsecure.DNSResolutionError('Retrieving messages');
}
if (Object.keys(ourSwarmNodes).length === 0) {
try {
ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes();
// Filter out the nodes we have already got responses from
completedNodes.forEach(nodeUrl => delete ourSwarmNodes[nodeUrl]);
} catch (e) {
throw new window.textsecure.EmptySwarmError(
window.textsecure.storage.user.getNumber(),
e
);
}
if (Object.keys(ourSwarmNodes).length === 0) {
if (successfulRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
return;
}
throw new window.textsecure.EmptySwarmError(
window.textsecure.storage.user.getNumber(),
new Error('Ran out of swarm nodes to query')
);
}
}
const remainingRequests =
MINIMUM_SUCCESSFUL_REQUESTS - completedNodes.length;
await Promise.all(
Object.entries(ourSwarmNodes)
.splice(0, remainingRequests)

@ -1,3 +1,4 @@
/* eslint-disable class-methods-use-this */
/* global log, window, Whisper */
const fetch = require('node-fetch');
@ -6,6 +7,7 @@ const dns = require('dns');
// Will be raised (to 3?) when we get more nodes
const MINIMUM_SWARM_NODES = 1;
const FAILURE_THRESHOLD = 3;
class LokiSnodeAPI {
constructor({ url, swarmServerPort }) {
@ -16,6 +18,7 @@ class LokiSnodeAPI {
this.swarmServerPort = swarmServerPort ? `:${swarmServerPort}` : '';
this.swarmsPendingReplenish = {};
this.ourSwarmNodes = {};
this.contactSwarmNodes = {};
}
getRandomSnodeAddress() {
@ -33,8 +36,27 @@ class LokiSnodeAPI {
async unreachableNode(pubKey, nodeUrl) {
if (pubKey === window.textsecure.storage.user.getNumber()) {
delete this.ourSwarmNodes[nodeUrl];
return;
if (!this.ourSwarmNodes[nodeUrl]) {
this.ourSwarmNodes[nodeUrl] = {
failureCount: 1,
};
} else {
this.ourSwarmNodes[nodeUrl].failureCount += 1;
}
if (this.ourSwarmNodes[nodeUrl].failureCount >= FAILURE_THRESHOLD) {
delete this.ourSwarmNodes[nodeUrl];
}
return false;
}
if (!this.contactSwarmNodes[nodeUrl]) {
this.contactSwarmNodes[nodeUrl] = {
failureCount: 1,
};
} else {
this.contactSwarmNodes[nodeUrl].failureCount += 1;
}
if (this.contactSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) {
return false;
}
const conversation = window.ConversationController.get(pubKey);
const swarmNodes = conversation.get('swarmNodes');
@ -47,7 +69,9 @@ class LokiSnodeAPI {
Conversation: Whisper.Conversation,
}
);
delete this.contactSwarmNodes[nodeUrl];
}
return true;
}
updateLastHash(nodeUrl, hash) {
@ -74,7 +98,9 @@ class LokiSnodeAPI {
}
nodeAddresses.forEach(url => {
this.ourSwarmNodes[url] = {};
this.ourSwarmNodes[url] = {
failureCount: 0,
};
});
}
return this.ourSwarmNodes;
@ -82,32 +108,34 @@ class LokiSnodeAPI {
async getSwarmNodesByPubkey(pubKey) {
const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey);
// TODO: Check if swarm list is below a threshold rather than empty
if (swarmNodes && swarmNodes.size !== 0) {
if (swarmNodes) {
return swarmNodes;
}
return this.replenishSwarm(pubKey);
return [];
}
async replenishSwarm(pubKey) {
async saveSwarmNodes(pubKey, swarmNodes) {
const conversation = window.ConversationController.get(pubKey);
conversation.set({ swarmNodes });
await window.Signal.Data.updateConversation(
conversation.id,
conversation.attributes,
{
Conversation: Whisper.Conversation,
}
);
}
async getFreshSwarmNodes(pubKey) {
if (!(pubKey in this.swarmsPendingReplenish)) {
this.swarmsPendingReplenish[pubKey] = new Promise(async resolve => {
let newSwarmNodes;
try {
newSwarmNodes = new Set(await this.getSwarmNodes(pubKey));
newSwarmNodes = await this.getSwarmNodes(pubKey);
} catch (e) {
// TODO: Handle these errors sensibly
newSwarmNodes = new Set([]);
newSwarmNodes = [];
}
conversation.set({ swarmNodes: newSwarmNodes });
await window.Signal.Data.updateConversation(
conversation.id,
conversation.attributes,
{
Conversation: Whisper.Conversation,
}
);
resolve(newSwarmNodes);
});
}
@ -124,7 +152,7 @@ class LokiSnodeAPI {
url: `http://${node}${this.swarmServerPort}/json_rpc`,
type: 'POST',
responseType: 'json',
timeout: 5000,
timeout: 10000,
};
const body = {
@ -155,7 +183,7 @@ class LokiSnodeAPI {
0,
`Error getting swarm nodes for ${pubKey}`
);
throw HTTPError('fetch error', 0, e.toString());
throw HTTPError('getSwarmNodes fetch error', 0, e.toString());
}
let result;
@ -179,7 +207,7 @@ class LokiSnodeAPI {
response.status,
`Error getting swarm nodes for ${pubKey}`
);
throw HTTPError('sendMessage: error response', response.status, result);
throw HTTPError('getSwarmNodes: error response', response.status, result);
}
}

@ -127,6 +127,21 @@
}
inherit(Error, UnregisteredUserError);
function EmptySwarmError(number, error) {
// eslint-disable-next-line prefer-destructuring
this.number = number.split('.')[0];
ReplayableError.call(this, {
name: 'EmptySwarmError',
message: 'Could not get any swarm nodes to query',
});
if (error) {
appendStack(this, error);
}
}
inherit(ReplayableError, EmptySwarmError);
function PoWError(number, error) {
// eslint-disable-next-line prefer-destructuring
this.number = number.split('.')[0];
@ -142,6 +157,16 @@
}
inherit(ReplayableError, PoWError);
function DNSResolutionError(message) {
// eslint-disable-next-line prefer-destructuring
ReplayableError.call(this, {
name: 'DNSResolutionError',
message: `Error resolving url: ${message}`,
});
}
inherit(ReplayableError, DNSResolutionError);
window.textsecure.UnregisteredUserError = UnregisteredUserError;
window.textsecure.SendMessageNetworkError = SendMessageNetworkError;
window.textsecure.IncomingIdentityKeyError = IncomingIdentityKeyError;
@ -151,4 +176,6 @@
window.textsecure.MessageError = MessageError;
window.textsecure.SignedPreKeyRotationError = SignedPreKeyRotationError;
window.textsecure.PoWError = PoWError;
window.textsecure.EmptySwarmError = EmptySwarmError;
window.textsecure.DNSResolutionError = DNSResolutionError;
})();

@ -186,13 +186,7 @@ OutgoingMessage.prototype = {
async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) {
const pubKey = number;
try {
const result = await this.lokiMessageAPI.sendMessage(
pubKey,
data,
timestamp,
ttl
);
return result;
await this.lokiMessageAPI.sendMessage(pubKey, data, timestamp, ttl);
} catch (e) {
if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) {
// 409 and 410 should bubble and be handled by doSendMessage

Loading…
Cancel
Save