Merge pull request #258 from BeaudanBrown/storage-swarm

Storage swarm
pull/261/head
Beaudan Campbell-Brown 6 years ago committed by GitHub
commit ff452cd7f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -3,8 +3,7 @@
"localUrl": "localhost.loki",
"cdnUrl": "random.snode",
"localServerPort": "8081",
"messageServerPort": "8080",
"swarmServerPort": "8079",
"snodeServerPort": "8080",
"disableAutoUpdate": false,
"openDevTools": false,
"buildExpiration": 0,

@ -1,121 +1,16 @@
/* eslint-disable no-await-in-loop */
/* eslint-disable no-loop-func */
/* global log, dcodeIO, window, callWorker, lokiP2pAPI, lokiSnodeAPI, libloki */
/* global log, dcodeIO, window, callWorker, lokiP2pAPI, lokiSnodeAPI, textsecure */
const nodeFetch = require('node-fetch');
const _ = require('lodash');
const { parse } = require('url');
const endpointBase = '/v1/storage_rpc';
const LOKI_EPHEMKEY_HEADER = 'X-Loki-EphemKey';
class HTTPError extends Error {
constructor(response) {
super(response.statusText);
this.name = 'HTTPError';
this.response = response;
}
}
class NotFoundError extends Error {
constructor() {
super('ENOTFOUND');
this.name = 'NotFoundError';
}
}
// A small wrapper around node-fetch which deserializes response
const fetch = async (url, options = {}) => {
const timeout = options.timeout || 10000;
const method = options.method || 'GET';
const address = parse(url).hostname;
const doEncryptChannel = address.endsWith('.snode');
if (doEncryptChannel) {
try {
// eslint-disable-next-line no-param-reassign
options.body = await libloki.crypto.snodeCipher.encrypt(
address,
options.body
);
// eslint-disable-next-line no-param-reassign
options.headers = {
...options.headers,
'Content-Type': 'text/plain',
[LOKI_EPHEMKEY_HEADER]: libloki.crypto.snodeCipher.getChannelPublicKeyHex(),
};
} catch (e) {
log.warn(`Could not encrypt channel for ${address}: `, e);
}
}
try {
const response = await nodeFetch(url, {
...options,
timeout,
method,
});
if (!response.ok) {
throw new HTTPError(response);
}
let result;
if (response.headers.get('Content-Type') === 'application/json') {
result = await response.json();
} else if (options.responseType === 'arraybuffer') {
result = await response.buffer();
} else {
result = await response.text();
if (doEncryptChannel) {
try {
result = await libloki.crypto.snodeCipher.decrypt(address, result);
} catch (e) {
log.warn(`Could not decrypt response from ${address}`, e);
}
try {
result = JSON.parse(result);
} catch (e) {
log.warn(`Could not parse string to json ${result}`, e);
}
}
}
return result;
} catch (e) {
if (e.code === 'ENOTFOUND') {
throw new NotFoundError();
}
throw e;
}
};
// Wrapper for a JSON RPC request
const rpc = (address, port, method, params, options = {}) => {
const headers = options.headers || {};
const url = `${address}${port}${endpointBase}`;
const body = {
method,
params,
};
const fetchOptions = {
method: 'POST',
...options,
body: JSON.stringify(body),
headers,
};
return fetch(url, fetchOptions);
};
const { rpc } = require('./loki_rpc');
// Will be raised (to 3?) when we get more nodes
const MINIMUM_SUCCESSFUL_REQUESTS = 2;
class LokiMessageAPI {
constructor({ messageServerPort }) {
this.messageServerPort = messageServerPort ? `:${messageServerPort}` : '';
constructor({ snodeServerPort }) {
this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : '';
}
async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) {
@ -195,15 +90,19 @@ class LokiMessageAPI {
};
try {
await rpc(nodeUrl, this.messageServerPort, 'store', params);
await rpc(`http://${nodeUrl}`, this.snodeServerPort, 'store', params);
nodeComplete(nodeUrl);
successfulRequests += 1;
} catch (e) {
log.warn('Loki send message:', e);
if (e instanceof NotFoundError) {
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(pubKey, newSwarm);
completedNodes.push(nodeUrl);
} else if (e instanceof textsecure.NotFoundError) {
canResolve = false;
} else if (e instanceof HTTPError) {
} else if (e instanceof textsecure.HTTPError) {
// We mark the node as complete as we could still reach it
nodeComplete(nodeUrl);
} else {
@ -270,29 +169,34 @@ class LokiMessageAPI {
const doRequest = async (nodeUrl, nodeData) => {
const params = {
pubKey: ourKey,
lastHash: nodeData.lastHash,
lastHash: nodeData.lastHash || '',
};
try {
const result = await rpc(
nodeUrl,
this.messageServerPort,
`http://${nodeUrl}`,
this.snodeServerPort,
'retrieve',
params
);
nodeComplete(nodeUrl);
if (result.lastHash) {
lokiSnodeAPI.updateLastHash(nodeUrl, result.lastHash);
if (Array.isArray(result.messages) && result.messages.length) {
const lastHash = _.last(result.messages).hash;
lokiSnodeAPI.updateLastHash(nodeUrl, lastHash);
callback(result.messages);
}
successfulRequests += 1;
} catch (e) {
log.warn('Loki retrieve messages:', e);
if (e instanceof NotFoundError) {
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
lokiSnodeAPI.updateOurSwarmNodes(newSwarm);
completedNodes.push(nodeUrl);
} else if (e instanceof textsecure.NotFoundError) {
canResolve = false;
} else if (e instanceof HTTPError) {
} else if (e instanceof textsecure.HTTPError) {
// We mark the node as complete as we could still reach it
nodeComplete(nodeUrl);
} else {

@ -0,0 +1,116 @@
/* global log, libloki, textsecure */
const nodeFetch = require('node-fetch');
const { parse } = require('url');
const LOKI_EPHEMKEY_HEADER = 'X-Loki-EphemKey';
const endpointBase = '/v1/storage_rpc';
// A small wrapper around node-fetch which deserializes response
const fetch = async (url, options = {}) => {
const timeout = options.timeout || 10000;
const method = options.method || 'GET';
const address = parse(url).hostname;
const doEncryptChannel = address.endsWith('.snode');
if (doEncryptChannel) {
try {
// eslint-disable-next-line no-param-reassign
options.body = await libloki.crypto.snodeCipher.encrypt(
address,
options.body
);
// eslint-disable-next-line no-param-reassign
options.headers = {
...options.headers,
'Content-Type': 'text/plain',
[LOKI_EPHEMKEY_HEADER]: libloki.crypto.snodeCipher.getChannelPublicKeyHex(),
};
} catch (e) {
log.warn(`Could not encrypt channel for ${address}: `, e);
}
}
try {
const response = await nodeFetch(url, {
...options,
timeout,
method,
});
if (response.status === 421) {
let newSwarm = await response.text();
if (doEncryptChannel) {
try {
newSwarm = await libloki.crypto.snodeCipher.decrypt(
address,
newSwarm
);
} catch (e) {
log.warn(`Could not decrypt response from ${address}`, e);
}
try {
newSwarm = newSwarm === '' ? {} : JSON.parse(newSwarm);
} catch (e) {
log.warn(`Could not parse string to json ${newSwarm}`, e);
}
}
throw new textsecure.WrongSwarmError(newSwarm);
}
if (!response.ok) {
throw new textsecure.HTTPError('Loki_rpc error', response);
}
let result;
if (response.headers.get('Content-Type') === 'application/json') {
result = await response.json();
} else if (options.responseType === 'arraybuffer') {
result = await response.buffer();
} else {
result = await response.text();
if (doEncryptChannel) {
try {
result = await libloki.crypto.snodeCipher.decrypt(address, result);
} catch (e) {
log.warn(`Could not decrypt response from ${address}`, e);
}
try {
result = result === '' ? {} : JSON.parse(result);
} catch (e) {
log.warn(`Could not parse string to json ${result}`, e);
}
}
}
return result;
} catch (e) {
if (e.code === 'ENOTFOUND') {
throw new textsecure.NotFoundError('Failed to resolve address', e);
}
throw e;
}
};
// Wrapper for a JSON RPC request
const rpc = (address, port, method, params, options = {}) => {
const headers = options.headers || {};
const url = `${address}${port}${endpointBase}`;
const body = {
method,
params,
};
const fetchOptions = {
method: 'POST',
...options,
body: JSON.stringify(body),
headers,
};
return fetch(url, fetchOptions);
};
module.exports = {
rpc,
};

@ -1,10 +1,10 @@
/* eslint-disable class-methods-use-this */
/* global window, ConversationController */
const fetch = require('node-fetch');
const is = require('@sindresorhus/is');
const dns = require('dns');
const process = require('process');
const { rpc } = require('./loki_rpc');
// Will be raised (to 3?) when we get more nodes
const MINIMUM_SWARM_NODES = 1;
@ -33,13 +33,13 @@ const resolveCname = url =>
});
class LokiSnodeAPI {
constructor({ serverUrl, localUrl, swarmServerPort }) {
constructor({ serverUrl, localUrl, snodeServerPort }) {
if (!is.string(serverUrl)) {
throw new Error('WebAPI.initialize: Invalid server url');
}
this.serverUrl = serverUrl;
this.localUrl = localUrl;
this.swarmServerPort = swarmServerPort ? `:${swarmServerPort}` : '';
this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : '';
this.swarmsPendingReplenish = {};
this.ourSwarmNodes = {};
this.contactSwarmNodes = {};
@ -139,6 +139,15 @@ class LokiSnodeAPI {
}
}
updateOurSwarmNodes(newNodes) {
this.ourSwarmNodes = {};
newNodes.forEach(url => {
this.ourSwarmNodes[url] = {
failureCount: 0,
};
});
}
async getOurSwarmNodes() {
if (
!this.ourSwarmNodes ||
@ -183,65 +192,17 @@ class LokiSnodeAPI {
async getSwarmNodes(pubKey) {
// TODO: Hit multiple random nodes and merge lists?
const node = await this.getRandomSnodeAddress();
// TODO: Confirm final API URL and sensible timeout
const options = {
url: `http://${node}${this.swarmServerPort}/json_rpc`,
type: 'POST',
responseType: 'json',
timeout: 10000,
};
const body = {
jsonrpc: '2.0',
id: '0',
method: 'get_swarm_list_for_messenger_pubkey',
params: {
pubkey: pubKey,
},
};
const fetchOptions = {
method: options.type,
body: JSON.stringify(body),
headers: {
'Content-Type': 'application/json',
},
timeout: options.timeout,
};
let response;
try {
response = await fetch(options.url, fetchOptions);
} catch (e) {
throw new window.textsecure.EmptySwarmError(
pubKey,
'Could not retrieve swarm nodes'
);
}
const nodeUrl = await this.getRandomSnodeAddress();
let result;
if (
options.responseType === 'json' &&
response.headers.get('Content-Type') === 'application/json'
) {
result = await response.json();
} else if (options.responseType === 'arraybuffer') {
result = await response.buffer();
} else {
result = await response.text();
}
// TODO: Handle wrong swarm error from snode
if (!response.ok || !result.nodes || result.nodes === []) {
throw new window.textsecure.EmptySwarmError(
const result = await rpc(
`http://${nodeUrl}`,
this.snodeServerPort,
'get_snodes_for_pubkey',
{
pubKey,
'Could not retrieve swarm nodes'
);
}
return result.nodes;
}
);
return result.snodes;
}
}

@ -179,6 +179,49 @@
appendStack(this, resolutionError);
}
function NotFoundError(message, error) {
this.name = 'NotFoundError';
this.message = message;
this.error = error;
Error.call(this, message);
// Maintains proper stack trace, where our error was thrown (only available on V8)
// via https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error
if (Error.captureStackTrace) {
Error.captureStackTrace(this);
}
appendStack(this, error);
}
function HTTPError(message, response) {
this.name = 'HTTPError';
this.message = `${response.status} Error: ${message}`;
this.response = response;
Error.call(this, message);
// Maintains proper stack trace, where our error was thrown (only available on V8)
// via https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error
if (Error.captureStackTrace) {
Error.captureStackTrace(this);
}
}
function WrongSwarmError(newSwarm) {
this.name = 'WrongSwarmError';
this.newSwarm = newSwarm;
Error.call(this, this.name);
// Maintains proper stack trace, where our error was thrown (only available on V8)
// via https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error
if (Error.captureStackTrace) {
Error.captureStackTrace(this);
}
}
window.textsecure.UnregisteredUserError = UnregisteredUserError;
window.textsecure.SendMessageNetworkError = SendMessageNetworkError;
window.textsecure.IncomingIdentityKeyError = IncomingIdentityKeyError;
@ -191,4 +234,7 @@
window.textsecure.EmptySwarmError = EmptySwarmError;
window.textsecure.DNSResolutionError = DNSResolutionError;
window.textsecure.LokiIpError = LokiIpError;
window.textsecure.HTTPError = HTTPError;
window.textsecure.NotFoundError = NotFoundError;
window.textsecure.WrongSwarmError = WrongSwarmError;
})();

@ -147,8 +147,7 @@ function prepareURL(pathSegments, moreKeys) {
serverUrl: config.get('serverUrl'),
localUrl: config.get('localUrl'),
cdnUrl: config.get('cdnUrl'),
messageServerPort: config.get('messageServerPort'),
swarmServerPort: config.get('swarmServerPort'),
snodeServerPort: config.get('snodeServerPort'),
localServerPort: config.get('localServerPort'),
certificateAuthority: config.get('certificateAuthority'),
environment: config.environment,

@ -292,7 +292,7 @@ const LokiSnodeAPI = require('./js/modules/loki_snode_api');
window.lokiSnodeAPI = new LokiSnodeAPI({
serverUrl: config.serverUrl,
localUrl: config.localUrl,
swarmServerPort: config.swarmServerPort,
snodeServerPort: config.snodeServerPort,
});
window.LokiP2pAPI = require('./js/modules/loki_p2p_api');
@ -301,7 +301,7 @@ const LokiMessageAPI = require('./js/modules/loki_message_api');
window.lokiMessageAPI = new LokiMessageAPI({
url: config.serverUrl,
messageServerPort: config.messageServerPort,
snodeServerPort: config.snodeServerPort,
});
const LocalLokiServer = require('./libloki/modules/local_loki_server');

Loading…
Cancel
Save