Big ol' hunk o' chunk. Now using random.snode to populate swarm lists, now making multiple requests from said lists and they are processed as they complete rather than waiting for all to resolve

pull/132/head
Beaudan 6 years ago
parent 714a5ab8b1
commit 7b1799c418

@ -1,8 +1,8 @@
{ {
"serverUrl": "http://qp994mrc8z7fqmsynzdumd35b5918q599gno46br86e537f7qzzy.snode", "serverUrl": "random.snode",
"cdnUrl": "http://qp994mrc8z7fqmsynzdumd35b5918q599gno46br86e537f7qzzy.snode", "cdnUrl": "random.snode",
"messageServerPort": ":8080", "messageServerPort": "8080",
"swarmServerPort": ":8079", "swarmServerPort": "8079",
"disableAutoUpdate": false, "disableAutoUpdate": false,
"openDevTools": false, "openDevTools": false,
"buildExpiration": 0, "buildExpiration": 0,

@ -174,7 +174,7 @@
return conversation; return conversation;
} }
window.libloki.replenishSwarm(id); window.LokiSnodeAPI.replenishSwarm(id);
try { try {
await window.Signal.Data.saveConversation(conversation.attributes, { await window.Signal.Data.saveConversation(conversation.attributes, {
Conversation: Whisper.Conversation, Conversation: Whisper.Conversation,

@ -1200,9 +1200,6 @@
// Add the message sending on another queue so that our UI doesn't get blocked // Add the message sending on another queue so that our UI doesn't get blocked
this.queueMessageSend(async () => { this.queueMessageSend(async () => {
if (this.get('swarmNodes').length === 0) {
await window.libloki.replenishSwarm(destination);
}
message.send( message.send(
this.wrapSend( this.wrapSend(
sendFunction( sendFunction(

@ -1,90 +1,25 @@
/* eslint-disable no-await-in-loop */
/* global log, dcodeIO, window, callWorker */ /* global log, dcodeIO, window, callWorker */
const fetch = require('node-fetch'); const fetch = require('node-fetch');
const is = require('@sindresorhus/is');
class LokiServer { // eslint-disable-next-line
const invert = p => new Promise((res, rej) => p.then(rej, res));
const firstOf = ps => invert(Promise.all(ps.map(invert)));
constructor({ urls, messageServerPort, swarmServerPort }) { // Will be raised (to 3?) when we get more nodes
this.nodes = []; const MINIMUM_SUCCESSFUL_REQUESTS = 2;
this.messageServerPort = messageServerPort; class LokiMessageAPI {
this.swarmServerPort = swarmServerPort;
urls.forEach(url => {
if (!is.string(url)) {
throw new Error('WebAPI.initialize: Invalid server url');
}
this.nodes.push({ url });
});
}
async loadOurSwarm() {
const ourKey = window.textsecure.storage.user.getNumber();
const nodeAddresses = await this.getSwarmNodes(ourKey);
this.ourSwarmNodes = [];
nodeAddresses.forEach(url => {
this.ourSwarmNodes.push({ url });
})
}
async getSwarmNodes(pubKey) {
const currentNode = this.nodes[0];
const options = {
url: `${currentNode.url}${this.swarmServerPort}/json_rpc`,
type: 'POST',
responseType: 'json',
timeout: undefined,
};
const body = {
jsonrpc: '2.0',
id: '0',
method: 'get_swarm_list_for_messenger_pubkey',
params: {
pubkey: pubKey,
},
}
const fetchOptions = { constructor({ messageServerPort }) {
method: options.type, this.messageServerPort = messageServerPort
body: JSON.stringify(body), ? `:${messageServerPort}`
headers: { : '';
'Content-Type': 'application/json',
},
timeout: options.timeout,
};
let response;
try {
response = await fetch(options.url, fetchOptions);
} catch (e) {
log.error(options.type, options.url, 0, 'Error');
throw HTTPError('fetch error', 0, e.toString());
}
let result;
if (
options.responseType === 'json' &&
response.headers.get('Content-Type') === 'application/json'
) {
result = await response.json();
} else if (options.responseType === 'arraybuffer') {
result = await response.buffer();
} else {
result = await response.text();
}
if (response.status >= 0 && response.status < 400) {
return result.nodes;
}
log.error(options.type, options.url, response.status, 'Error');
throw HTTPError('sendMessage: error response', response.status, result);
} }
async sendMessage(pubKey, data, messageTimeStamp, ttl) { async sendMessage(pubKey, data, messageTimeStamp, ttl) {
const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey); const swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey)
if (!swarmNodes || swarmNodes.length === 0) { if (!swarmNodes || swarmNodes.length === 0) {
// TODO: Refresh the swarm nodes list
throw Error('No swarm nodes to query!'); throw Error('No swarm nodes to query!');
} }
@ -101,12 +36,12 @@ class LokiServer {
nonce = await callWorker('calcPoW', timestamp, ttl, pubKey, data64, development); nonce = await callWorker('calcPoW', timestamp, ttl, pubKey, data64, development);
} catch (err) { } catch (err) {
// Something went horribly wrong // Something went horribly wrong
// TODO: Handle gracefully
throw err; throw err;
} }
const requests = swarmNodes.map(async node => {
const options = { const options = {
url: `${swarmNodes[0]}${this.messageServerPort}/store`, url: `${node}${this.messageServerPort}/store`,
type: 'POST', type: 'POST',
responseType: undefined, responseType: undefined,
timeout: undefined, timeout: undefined,
@ -128,7 +63,8 @@ class LokiServer {
try { try {
response = await fetch(options.url, fetchOptions); response = await fetch(options.url, fetchOptions);
} catch (e) { } catch (e) {
log.error(options.type, options.url, 0, 'Error'); log.error(options.type, options.url, 0, 'Error sending message');
window.LokiSnodeAPI.unreachableNode(pubKey, node);
throw HTTPError('fetch error', 0, e.toString()); throw HTTPError('fetch error', 0, e.toString());
} }
@ -147,28 +83,37 @@ class LokiServer {
if (response.status >= 0 && response.status < 400) { if (response.status >= 0 && response.status < 400) {
return result; return result;
} }
log.error(options.type, options.url, response.status, 'Error'); log.error(options.type, options.url, response.status, 'Error sending message');
throw HTTPError('sendMessage: error response', response.status, result); throw HTTPError('sendMessage: error response', response.status, result);
});
try {
// TODO: Possibly change this to require more than a single response?
const result = await firstOf(requests);
return result;
} catch(err) {
throw err;
} }
async retrieveMessages(pubKey) {
if (!this.ourSwarmNodes || this.ourSwarmNodes.length === 0) {
await this.loadOurSwarm();
} }
const currentNode = this.ourSwarmNodes[0];
async retrieveMessages(callback) {
let ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes();
const ourKey = window.textsecure.storage.user.getNumber();
let completedRequests = 0;
const doRequest = async (nodeUrl, nodeData) => {
const options = { const options = {
url: `${currentNode.url}${this.messageServerPort}/retrieve`, url: `${nodeUrl}${this.messageServerPort}/retrieve`,
type: 'GET', type: 'GET',
responseType: 'json', responseType: 'json',
timeout: undefined, timeout: undefined,
}; };
const headers = { const headers = {
'X-Loki-recipient': pubKey, 'X-Loki-recipient': ourKey,
}; };
if (currentNode.lastHash) { if (nodeData.lastHash) {
headers['X-Loki-last-hash'] = currentNode.lastHash; headers['X-Loki-last-hash'] = nodeData.lastHash;
} }
const fetchOptions = { const fetchOptions = {
@ -176,12 +121,13 @@ class LokiServer {
headers, headers,
timeout: options.timeout, timeout: options.timeout,
}; };
let response; let response;
try { try {
response = await fetch(options.url, fetchOptions); response = await fetch(options.url, fetchOptions);
} catch (e) { } catch (e) {
log.error(options.type, options.url, 0, 'Error'); // TODO: Maybe we shouldn't immediately delete?
log.error(options.type, options.url, 0, `Error retrieving messages from ${nodeUrl}`);
window.LokiSnodeAPI.unreachableNode(ourKey, nodeUrl);
throw HTTPError('fetch error', 0, e.toString()); throw HTTPError('fetch error', 0, e.toString());
} }
@ -196,16 +142,38 @@ class LokiServer {
} else { } else {
result = await response.text(); result = await response.text();
} }
completedRequests += 1;
if (response.status >= 0 && response.status < 400) { if (response.status >= 0 && response.status < 400) {
if (result.lastHash) { if (result.lastHash) {
currentNode.lastHash = result.lastHash; window.LokiSnodeAPI.updateLastHash(nodeUrl, result.lastHash);
} }
return result; return result;
} }
log.error(options.type, options.url, response.status, 'Error'); log.error(options.type, options.url, response.status, 'Error');
throw HTTPError('retrieveMessages: error response', response.status, result); throw HTTPError('retrieveMessages: error response', response.status, result);
} }
while (completedRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests;
ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes();
if (Object.keys(ourSwarmNodes).length < remainingRequests) {
if (completedRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
}
return;
}
const requests = await Promise.all(
Object.entries(ourSwarmNodes)
.splice(0, remainingRequests)
.map(([nodeUrl, lastHash]) => doRequest(nodeUrl, lastHash).catch(() => null))
);
// Requests is now an array of null for failed requests and the json for success
requests.filter(v => v !== null && 'messages' in v)
.forEach(v => callback(v.messages));
}
}
} }
function HTTPError(message, providedCode, response, stack) { function HTTPError(message, providedCode, response, stack) {
@ -223,5 +191,5 @@ function HTTPError(message, providedCode, response, stack) {
} }
module.exports = { module.exports = {
LokiServer, LokiMessageAPI,
}; };

@ -0,0 +1,170 @@
/* global log, window, Whisper */
const fetch = require('node-fetch');
const is = require('@sindresorhus/is');
const dns = require('dns');
// Will be raised (to 3?) when we get more nodes
const MINIMUM_SWARM_NODES = 1;
class LokiSnodeAPI {
constructor({ url, swarmServerPort }) {
if (!is.string(url)) {
throw new Error('WebAPI.initialize: Invalid server url');
}
this.url = url;
this.swarmServerPort = swarmServerPort
? `:${swarmServerPort}`
: '';
this.swarmsPendingReplenish = {};
this.ourSwarmNodes = {};
}
getRandomSnodeAddress() {
/* resolve random snode */
return new Promise((resolve, reject) => {
dns.resolveCname(this.url, (err, address) => {
if(err) {
reject(err);
} else {
resolve(address[0]);
}
});
});
}
unreachableNode(pubKey, nodeUrl) {
if (pubKey === window.textsecure.storage.user.getNumber()) {
delete this.ourSwarmNodes[nodeUrl];
}
}
updateLastHash(nodeUrl, hash) {
if (!this.ourSwarmNodes[nodeUrl]) {
this.ourSwarmNodes[nodeUrl] = {
lastHash: hash,
}
} else {
this.ourSwarmNodes[nodeUrl].lastHash = hash;
}
}
async getOurSwarmNodes() {
if (
!this.ourSwarmNodes ||
Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES
) {
// Try refresh our swarm list once
const ourKey = window.textsecure.storage.user.getNumber();
const nodeAddresses = await window.LokiSnodeAPI.getSwarmNodes(ourKey);
this.ourSwarmNodes = {};
nodeAddresses.forEach(url => {
this.ourSwarmNodes[url] = {};
})
if (!this.ourSwarmNodes || Object.keys(this.ourSwarmNodes).length === 0) {
throw Error('Could not load our swarm')
}
}
return this.ourSwarmNodes;
}
async getSwarmNodesByPubkey(pubKey) {
const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey);
// TODO: Check if swarm list is below a threshold rather than empty
if (swarmNodes && swarmNodes.length !== 0) {
return swarmNodes;
}
return this.replenishSwarm(pubKey);
}
async replenishSwarm(pubKey) {
const conversation = window.ConversationController.get(pubKey);
if (!(pubKey in this.swarmsPendingReplenish)) {
this.swarmsPendingReplenish[pubKey] = new Promise(async (resolve) => {
const newSwarmNodes = await this.getSwarmNodes(pubKey);
conversation.set({ swarmNodes: newSwarmNodes });
await window.Signal.Data.updateConversation(conversation.id, conversation.attributes, {
Conversation: Whisper.Conversation,
});
resolve(newSwarmNodes);
});
}
const newSwarmNodes = await this.swarmsPendingReplenish[pubKey];
delete this.swarmsPendingReplenish[pubKey];
return newSwarmNodes;
}
async getSwarmNodes(pubKey) {
const node = await this.getRandomSnodeAddress();
const options = {
url: `http://${node}${this.swarmServerPort}/json_rpc`,
type: 'POST',
responseType: 'json',
timeout: undefined,
};
const body = {
jsonrpc: '2.0',
id: '0',
method: 'get_swarm_list_for_messenger_pubkey',
params: {
pubkey: pubKey,
},
}
const fetchOptions = {
method: options.type,
body: JSON.stringify(body),
headers: {
'Content-Type': 'application/json',
},
timeout: options.timeout,
};
let response;
try {
response = await fetch(options.url, fetchOptions);
} catch (e) {
log.error(options.type, options.url, 0, `Error getting swarm nodes for ${pubKey}`);
throw HTTPError('fetch error', 0, e.toString());
}
let result;
if (
options.responseType === 'json' &&
response.headers.get('Content-Type') === 'application/json'
) {
result = await response.json();
} else if (options.responseType === 'arraybuffer') {
result = await response.buffer();
} else {
result = await response.text();
}
if (response.status >= 0 && response.status < 400) {
return result.nodes;
}
log.error(options.type, options.url, response.status, `Error getting swarm nodes for ${pubKey}`);
throw HTTPError('sendMessage: error response', response.status, result);
}
}
function HTTPError(message, providedCode, response, stack) {
const code = providedCode > 999 || providedCode < 100 ? -1 : providedCode;
const e = new Error(`${message}; code: ${code}`);
e.name = 'HTTPError';
e.code = code;
if (stack) {
e.stack += `\nOriginal stack:\n${stack}`;
}
if (response) {
e.response = response;
}
return e;
}
module.exports = {
LokiSnodeAPI,
};

@ -1,4 +1,4 @@
/* global window, dcodeIO, textsecure, StringView */ /* global window, dcodeIO, textsecure */
// eslint-disable-next-line func-names // eslint-disable-next-line func-names
(function () { (function () {
@ -62,26 +62,8 @@
}; };
let connected = false; let connected = false;
this.startPolling = async function pollServer(callBack) { const processMessages = async messages => {
const myKeys = await textsecure.storage.protocol.getIdentityKeyPair(); const newMessages = await filterIncomingMessages(messages);
const pubKey = StringView.arrayBufferToHex(myKeys.pubKey)
let result;
try {
result = await server.retrieveMessages(pubKey);
connected = true;
} catch (err) {
connected = false;
setTimeout(() => { pollServer(callBack); }, pollTime);
return;
}
if (typeof callBack === 'function') {
callBack(connected);
}
if (!result.messages) {
setTimeout(() => { pollServer(callBack); }, pollTime);
return;
}
const newMessages = await filterIncomingMessages(result.messages);
newMessages.forEach(async message => { newMessages.forEach(async message => {
const { data } = message; const { data } = message;
const dataPlaintext = stringToArrayBufferBase64(data); const dataPlaintext = stringToArrayBufferBase64(data);
@ -97,7 +79,17 @@
); );
} }
}); });
setTimeout(() => { pollServer(callBack); }, pollTime); }
this.startPolling = async function pollServer(callback) {
try {
await server.retrieveMessages(processMessages);
connected = true;
} catch (err) {
connected = false;
}
callback(connected);
setTimeout(() => { pollServer(callback); }, pollTime);
}; };
this.isConnected = function isConnected() { this.isConnected = function isConnected() {

@ -22,7 +22,7 @@ function MessageReceiver(username, password, signalingKey, options = {}) {
this.signalingKey = signalingKey; this.signalingKey = signalingKey;
this.username = username; this.username = username;
this.password = password; this.password = password;
this.lokiserver = window.LokiAPI; this.lokiMessageAPI = window.LokiMessageAPI;
if (!options.serverTrustRoot) { if (!options.serverTrustRoot) {
throw new Error('Server trust root is required!'); throw new Error('Server trust root is required!');
@ -67,7 +67,7 @@ MessageReceiver.prototype.extend({
} }
this.hasConnected = true; this.hasConnected = true;
this.httpPollingResource = new HttpResource(this.lokiserver, { this.httpPollingResource = new HttpResource(this.lokiMessageAPI, {
handleRequest: this.handleRequest.bind(this), handleRequest: this.handleRequest.bind(this),
}); });
this.httpPollingResource.startPolling((connected) => { this.httpPollingResource.startPolling((connected) => {

@ -34,7 +34,7 @@ function OutgoingMessage(
this.callback = callback; this.callback = callback;
this.silent = silent; this.silent = silent;
this.lokiserver = window.LokiAPI; this.lokiMessageAPI = window.LokiMessageAPI;
this.numbersCompleted = 0; this.numbersCompleted = 0;
this.errors = []; this.errors = [];
@ -184,7 +184,7 @@ OutgoingMessage.prototype = {
async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) { async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) {
const pubKey = number; const pubKey = number;
try { try {
const result = await this.lokiserver.sendMessage(pubKey, data, timestamp, ttl); const result = await this.lokiMessageAPI.sendMessage(pubKey, data, timestamp, ttl);
return result; return result;
} catch (e) { } catch (e) {
if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) { if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) {

@ -265,14 +265,20 @@ window.WebAPI = initializeWebAPI({
proxyUrl: config.proxyUrl, proxyUrl: config.proxyUrl,
}); });
const { LokiServer } = require('./js/modules/loki_message_api'); const { LokiSnodeAPI } = require('./js/modules/loki_snode_api');
window.LokiAPI = new LokiServer({ window.LokiSnodeAPI = new LokiSnodeAPI({
urls: [config.serverUrl], url: config.serverUrl,
messageServerPort: config.messageServerPort,
swarmServerPort: config.swarmServerPort, swarmServerPort: config.swarmServerPort,
}); });
const { LokiMessageAPI } = require('./js/modules/loki_message_api');
window.LokiMessageAPI = new LokiMessageAPI({
url: config.serverUrl,
messageServerPort: config.messageServerPort,
});
window.mnemonic = require('./libloki/mnemonic'); window.mnemonic = require('./libloki/mnemonic');
const { WorkerInterface } = require('./js/modules/util_worker_interface'); const { WorkerInterface } = require('./js/modules/util_worker_interface');

Loading…
Cancel
Save