Merge pull request #988 from neuroscr/patchopensnapps

snode communication and logging clean up #2 & AppDotNot minor refactor
pull/993/head
Vince 5 years ago committed by GitHub
commit 78ba6f03e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -14,6 +14,13 @@ const PUBLICCHAT_DELETION_POLL_EVERY = 5 * 1000; // 5s
const PUBLICCHAT_MOD_POLL_EVERY = 30 * 1000; // 30s const PUBLICCHAT_MOD_POLL_EVERY = 30 * 1000; // 30s
const PUBLICCHAT_MIN_TIME_BETWEEN_DUPLICATE_MESSAGES = 10 * 1000; // 10s const PUBLICCHAT_MIN_TIME_BETWEEN_DUPLICATE_MESSAGES = 10 * 1000; // 10s
const FILESERVER_HOSTS = [
'file-dev.lokinet.org',
'file.lokinet.org',
'file-dev.getsession.org',
'file.getsession.org',
];
const HOMESERVER_USER_ANNOTATION_TYPE = 'network.loki.messenger.homeserver'; const HOMESERVER_USER_ANNOTATION_TYPE = 'network.loki.messenger.homeserver';
const AVATAR_USER_ANNOTATION_TYPE = 'network.loki.messenger.avatar'; const AVATAR_USER_ANNOTATION_TYPE = 'network.loki.messenger.avatar';
const SETTINGS_CHANNEL_ANNOTATION_TYPE = 'net.patter-app.settings'; const SETTINGS_CHANNEL_ANNOTATION_TYPE = 'net.patter-app.settings';
@ -25,6 +32,288 @@ const snodeHttpsAgent = new https.Agent({
rejectUnauthorized: false, rejectUnauthorized: false,
}); });
const sendToProxy = async (
srvPubKey,
endpoint,
pFetchOptions,
options = {}
) => {
if (!srvPubKey) {
log.error(
'loki_app_dot_net: sendToProxy called without a server public key'
);
return {};
}
const randSnode = await lokiSnodeAPI.getRandomSnodeAddress();
const url = `https://${randSnode.ip}:${randSnode.port}/file_proxy`;
const fetchOptions = pFetchOptions; // make lint happy
// safety issue with file server, just safer to have this
if (fetchOptions.headers === undefined) {
fetchOptions.headers = {};
}
const payloadObj = {
body: fetchOptions.body, // might need to b64 if binary...
endpoint,
method: fetchOptions.method,
headers: fetchOptions.headers,
};
// from https://github.com/sindresorhus/is-stream/blob/master/index.js
if (
payloadObj.body &&
typeof payloadObj.body === 'object' &&
typeof payloadObj.body.pipe === 'function'
) {
const fData = payloadObj.body.getBuffer();
const fHeaders = payloadObj.body.getHeaders();
// update headers for boundary
payloadObj.headers = { ...payloadObj.headers, ...fHeaders };
// update body with base64 chunk
payloadObj.body = {
fileUpload: fData.toString('base64'),
};
}
// convert our payload to binary buffer
const payloadData = Buffer.from(
dcodeIO.ByteBuffer.wrap(JSON.stringify(payloadObj)).toArrayBuffer()
);
payloadObj.body = false; // free memory
// make temporary key for this request/response
const ephemeralKey = libsignal.Curve.generateKeyPair();
// mix server pub key with our priv key
const symKey = libsignal.Curve.calculateAgreement(
srvPubKey, // server's pubkey
ephemeralKey.privKey // our privkey
);
const ivAndCiphertext = await libloki.crypto.DHEncrypt(symKey, payloadData);
// convert final buffer to base64
const cipherText64 = dcodeIO.ByteBuffer.wrap(ivAndCiphertext).toString(
'base64'
);
const ephemeralPubKey64 = dcodeIO.ByteBuffer.wrap(
ephemeralKey.pubKey
).toString('base64');
const finalRequestHeader = {
'X-Loki-File-Server-Ephemeral-Key': ephemeralPubKey64,
};
const firstHopOptions = {
method: 'POST',
// not sure why I can't use anything but json...
// text/plain would be preferred...
body: JSON.stringify({ cipherText64 }),
headers: {
'Content-Type': 'application/json',
'X-Loki-File-Server-Target': '/loki/v1/secure_rpc',
'X-Loki-File-Server-Verb': 'POST',
'X-Loki-File-Server-Headers': JSON.stringify(finalRequestHeader),
},
// we are talking to a snode...
agent: snodeHttpsAgent,
};
// weird this doesn't need NODE_TLS_REJECT_UNAUTHORIZED = '0'
const result = await nodeFetch(url, firstHopOptions);
const txtResponse = await result.text();
if (txtResponse.match(/^Service node is not ready: not in any swarm/i)) {
// mark snode bad
const randomPoolRemainingCount = lokiSnodeAPI.markRandomNodeUnreachable(
randSnode
);
log.warn(
`loki_app_dot_net: Marking random snode bad, internet address ${
randSnode.ip
}:${
randSnode.port
}. ${randomPoolRemainingCount} snodes remaining in randomPool`
);
// retry (hopefully with new snode)
// FIXME: max number of retries...
return sendToProxy(srvPubKey, endpoint, fetchOptions);
}
let response = {};
try {
response = JSON.parse(txtResponse);
} catch (e) {
log.warn(
`loki_app_dot_net: sendToProxy Could not parse outer JSON [${txtResponse}]`,
endpoint,
'on',
url
);
}
if (response.meta && response.meta.code === 200) {
// convert base64 in response to binary
const ivAndCiphertextResponse = dcodeIO.ByteBuffer.wrap(
response.data,
'base64'
).toArrayBuffer();
const decrypted = await libloki.crypto.DHDecrypt(
symKey,
ivAndCiphertextResponse
);
const textDecoder = new TextDecoder();
const respStr = textDecoder.decode(decrypted);
// replace response
try {
response = options.textResponse ? respStr : JSON.parse(respStr);
} catch (e) {
log.warn(
`loki_app_dot_net: sendToProxy Could not parse inner JSON [${respStr}]`,
endpoint,
'on',
url
);
}
} else {
log.warn(
'loki_app_dot_net: file server secure_rpc gave an non-200 response: ',
response,
` txtResponse[${txtResponse}]`,
endpoint
);
}
return { result, txtResponse, response };
};
const serverRequest = async (endpoint, options = {}) => {
const {
params = {},
method,
rawBody,
objBody,
token,
srvPubKey,
forceFreshToken = false,
} = options;
const url = new URL(endpoint);
if (params) {
url.search = new URLSearchParams(params);
}
const fetchOptions = {};
const headers = {};
try {
if (token) {
headers.Authorization = `Bearer ${token}`;
}
if (method) {
fetchOptions.method = method;
}
if (objBody) {
headers['Content-Type'] = 'application/json';
fetchOptions.body = JSON.stringify(objBody);
} else if (rawBody) {
fetchOptions.body = rawBody;
}
fetchOptions.headers = headers;
// domain ends in .loki
if (url.host.match(/\.loki$/i)) {
fetchOptions.agent = snodeHttpsAgent;
}
} catch (e) {
log.info('serverRequest set up error:', e.code, e.message);
return {
err: e,
};
}
let response;
let result;
let txtResponse;
let mode = 'nodeFetch';
try {
const host = url.host.toLowerCase();
// log.info('host', host, FILESERVER_HOSTS);
if (
window.lokiFeatureFlags.useSnodeProxy &&
FILESERVER_HOSTS.includes(host)
) {
mode = 'sendToProxy';
// strip trailing slash
const search = url.search ? `?${url.search}` : '';
const endpointWithQS = `${url.pathname}${search}`.replace(/^\//, '');
// log.info('endpointWithQS', endpointWithQS)
({ response, txtResponse, result } = await sendToProxy(
srvPubKey,
endpointWithQS,
fetchOptions,
options
));
} else {
// disable check for .loki
process.env.NODE_TLS_REJECT_UNAUTHORIZED = url.host.match(/\.loki$/i)
? '0'
: '1';
result = await nodeFetch(url, fetchOptions);
// always make sure this check is enabled
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
txtResponse = await result.text();
// cloudflare timeouts (504s) will be html...
response = options.textResponse ? txtResponse : JSON.parse(txtResponse);
}
} catch (e) {
if (txtResponse) {
log.info(
`serverRequest ${mode} error`,
e.code,
e.message,
`json: ${txtResponse}`,
'attempting connection to',
url
);
} else {
log.info(
`serverRequest ${mode} error`,
e.code,
e.message,
'attempting connection to',
url
);
}
if (mode === '_sendToProxy') {
// if we can detect, certain types of failures, we can retry...
if (e.code === 'ECONNRESET') {
// retry with counter?
}
}
return {
err: e,
};
}
// if it's a response style with a meta
if (result.status !== 200) {
if (!forceFreshToken && (!response.meta || response.meta.code === 401)) {
// retry with forcing a fresh token
return this.serverRequest(endpoint, {
...options,
forceFreshToken: true,
});
}
return {
err: 'statusCode',
statusCode: result.status,
response,
};
}
return {
statusCode: result.status,
response,
};
};
// the core ADN class that handles all communication with a specific server // the core ADN class that handles all communication with a specific server
class LokiAppDotNetServerAPI { class LokiAppDotNetServerAPI {
constructor(ourKey, url) { constructor(ourKey, url) {
@ -394,276 +683,21 @@ class LokiAppDotNetServerAPI {
if (urlStr.match(/\.loki\//)) { if (urlStr.match(/\.loki\//)) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
} }
const result = await nodeFetch(urlObj, fetchOptions, options); const result = nodeFetch(urlObj, fetchOptions, options);
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
return result; return result;
} }
async _sendToProxy(endpoint, pFetchOptions, options = {}) {
const randSnode = await lokiSnodeAPI.getRandomSnodeAddress();
const url = `https://${randSnode.ip}:${randSnode.port}/file_proxy`;
const fetchOptions = pFetchOptions; // make lint happy
// safety issue with file server, just safer to have this
if (fetchOptions.headers === undefined) {
fetchOptions.headers = {};
}
const payloadObj = {
body: fetchOptions.body, // might need to b64 if binary...
endpoint,
method: fetchOptions.method,
headers: fetchOptions.headers,
};
// from https://github.com/sindresorhus/is-stream/blob/master/index.js
if (
payloadObj.body &&
typeof payloadObj.body === 'object' &&
typeof payloadObj.body.pipe === 'function'
) {
log.info('detected body is a stream');
const fData = payloadObj.body.getBuffer();
const fHeaders = payloadObj.body.getHeaders();
// update headers for boundary
payloadObj.headers = { ...payloadObj.headers, ...fHeaders };
// update body with base64 chunk
payloadObj.body = {
fileUpload: fData.toString('base64'),
};
}
// convert our payload to binary buffer
const payloadData = Buffer.from(
dcodeIO.ByteBuffer.wrap(JSON.stringify(payloadObj)).toArrayBuffer()
);
payloadObj.body = false; // free memory
// make temporary key for this request/response
const ephemeralKey = libsignal.Curve.generateKeyPair();
// mix server pub key with our priv key
const symKey = libsignal.Curve.calculateAgreement(
this.pubKey, // server's pubkey
ephemeralKey.privKey // our privkey
);
const ivAndCiphertext = await libloki.crypto.DHEncrypt(symKey, payloadData);
// convert final buffer to base64
const cipherText64 = dcodeIO.ByteBuffer.wrap(ivAndCiphertext).toString(
'base64'
);
const ephemeralPubKey64 = dcodeIO.ByteBuffer.wrap(
ephemeralKey.pubKey
).toString('base64');
const finalRequestHeader = {
'X-Loki-File-Server-Ephemeral-Key': ephemeralPubKey64,
};
const firstHopOptions = {
method: 'POST',
// not sure why I can't use anything but json...
// text/plain would be preferred...
body: JSON.stringify({ cipherText64 }),
headers: {
'Content-Type': 'application/json',
'X-Loki-File-Server-Target': '/loki/v1/secure_rpc',
'X-Loki-File-Server-Verb': 'POST',
'X-Loki-File-Server-Headers': JSON.stringify(finalRequestHeader),
},
// we are talking to a snode...
agent: snodeHttpsAgent,
};
// weird this doesn't need NODE_TLS_REJECT_UNAUTHORIZED = 0
const result = await nodeFetch(url, firstHopOptions);
const txtResponse = await result.text();
if (txtResponse.match(/^Service node is not ready: not in any swarm/i)) {
// mark snode bad
log.warn(
`Marking random snode bad, internet address ${randSnode.ip}:${
randSnode.port
}`
);
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
// retry (hopefully with new snode)
// FIXME: max number of retries...
return this._sendToProxy(endpoint, fetchOptions);
}
let response = {};
try {
response = JSON.parse(txtResponse);
} catch (e) {
log.warn(
`_sendToProxy Could not parse outer JSON [${txtResponse}]`,
endpoint
);
}
if (response.meta && response.meta.code === 200) {
// convert base64 in response to binary
const ivAndCiphertextResponse = dcodeIO.ByteBuffer.wrap(
response.data,
'base64'
).toArrayBuffer();
const decrypted = await libloki.crypto.DHDecrypt(
symKey,
ivAndCiphertextResponse
);
const textDecoder = new TextDecoder();
const respStr = textDecoder.decode(decrypted);
// replace response
try {
response = options.textResponse ? respStr : JSON.parse(respStr);
} catch (e) {
log.warn(
`_sendToProxy Could not parse inner JSON [${respStr}]`,
endpoint
);
}
} else {
log.warn(
'file server secure_rpc gave an non-200 response: ',
response,
` txtResponse[${txtResponse}]`,
endpoint
);
}
return { result, txtResponse, response };
}
// make a request to the server // make a request to the server
async serverRequest(endpoint, options = {}) { async serverRequest(endpoint, options = {}) {
const { if (options.forceFreshToken) {
params = {}, await this.getOrRefreshServerToken(true);
method,
rawBody,
objBody,
forceFreshToken = false,
} = options;
const url = new URL(`${this.baseServerUrl}/${endpoint}`);
if (params) {
url.search = new URLSearchParams(params);
}
const fetchOptions = {};
const headers = {};
try {
if (forceFreshToken) {
await this.getOrRefreshServerToken(true);
}
if (this.token) {
headers.Authorization = `Bearer ${this.token}`;
}
if (method) {
fetchOptions.method = method;
}
if (objBody) {
headers['Content-Type'] = 'application/json';
fetchOptions.body = JSON.stringify(objBody);
} else if (rawBody) {
fetchOptions.body = rawBody;
}
fetchOptions.headers = headers;
// domain ends in .loki
if (url.toString().match(/\.loki\//)) {
fetchOptions.agent = snodeHttpsAgent;
}
} catch (e) {
log.info('serverRequest set up error:', e.code, e.message);
return {
err: e,
};
} }
return serverRequest(`${this.baseServerUrl}/${endpoint}`, {
let response; ...options,
let result; token: this.token,
let txtResponse; srvPubKey: this.pubKey,
let mode = 'nodeFetch'; });
try {
if (
window.lokiFeatureFlags.useSnodeProxy &&
(this.baseServerUrl === 'https://file-dev.lokinet.org' ||
this.baseServerUrl === 'https://file.lokinet.org' ||
this.baseServerUrl === 'https://file-dev.getsession.org' ||
this.baseServerUrl === 'https://file.getsession.org')
) {
mode = '_sendToProxy';
const endpointWithQS = url
.toString()
.replace(`${this.baseServerUrl}/`, '');
({ response, txtResponse, result } = await this._sendToProxy(
endpointWithQS,
fetchOptions,
options
));
} else {
// disable check for .loki
if (url.toString().match(/\.loki/)) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
}
result = await nodeFetch(url, fetchOptions);
// always make sure this check is enabled
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1;
txtResponse = await result.text();
// hrm cloudflare timeouts (504s) will be html...
response = options.textResponse ? txtResponse : JSON.parse(txtResponse);
}
} catch (e) {
if (txtResponse) {
log.info(
`serverRequest ${mode} error`,
e.code,
e.message,
`json: ${txtResponse}`,
'attempting connection to',
url
);
} else {
log.info(
`serverRequest ${mode} error`,
e.code,
e.message,
'attempting connection to',
url
);
}
if (mode === '_sendToProxy') {
// if we can detect, certain types of failures, we can retry...
if (e.code === 'ECONNRESET') {
// retry with counter?
}
}
return {
err: e,
};
}
// if it's a response style with a meta
if (result.status !== 200) {
if (!forceFreshToken && (!response.meta || response.meta.code === 401)) {
// copy options because lint complains if we modify this directly
const updatedOptions = options;
// force it this time
updatedOptions.forceFreshToken = true;
// retry with updated options
return this.serverRequest(endpoint, updatedOptions);
}
return {
err: 'statusCode',
statusCode: result.status,
response,
};
}
return {
statusCode: result.status,
response,
};
} }
async getUserAnnotations(pubKey) { async getUserAnnotations(pubKey) {
@ -946,6 +980,8 @@ class LokiPublicChannelAPI {
this.deleteLastId = 1; this.deleteLastId = 1;
this.timers = {}; this.timers = {};
this.myPrivateKey = false; this.myPrivateKey = false;
this.messagesPollLock = false;
// can escalated to SQL if it start uses too much memory // can escalated to SQL if it start uses too much memory
this.logMop = {}; this.logMop = {};
@ -1322,7 +1358,6 @@ class LokiPublicChannelAPI {
Conversation: Whisper.Conversation, Conversation: Whisper.Conversation,
} }
); );
await this.pollForChannelOnce();
} }
// get moderation actions // get moderation actions
@ -1523,6 +1558,20 @@ class LokiPublicChannelAPI {
} }
async pollOnceForMessages() { async pollOnceForMessages() {
if (this.messagesPollLock) {
// TODO: check if lock is stale
log.warn(
'pollOnceForModerators locked',
'on',
this.channelId,
'at',
this.serverAPI.baseServerUrl
);
return;
}
// disable locking system for now as it's not quite perfect yet
// this.messagesPollLock = Date.now();
const params = { const params = {
include_annotations: 1, include_annotations: 1,
include_user_annotations: 1, // to get the home server include_user_annotations: 1, // to get the home server
@ -1551,6 +1600,7 @@ class LokiPublicChannelAPI {
if (res.err) { if (res.err) {
log.error('pollOnceForMessages receive error', res.err); log.error('pollOnceForMessages receive error', res.err);
} }
this.messagesPollLock = false;
return; return;
} }
@ -1706,6 +1756,7 @@ class LokiPublicChannelAPI {
// do we really need this? // do we really need this?
if (!pendingMessages.length) { if (!pendingMessages.length) {
this.conversation.setLastRetrievedMessage(this.lastGot); this.conversation.setLastRetrievedMessage(this.lastGot);
this.messagesPollLock = false;
return; return;
} }
@ -1755,7 +1806,14 @@ class LokiPublicChannelAPI {
); );
sendNow.forEach(message => { sendNow.forEach(message => {
// send them out now // send them out now
log.info('emitting primary message', message.serverId); log.info(
'emitting primary message',
message.serverId,
'on',
this.channelId,
'at',
this.serverAPI.baseServerUrl
);
this.chatAPI.emit('publicMessage', { this.chatAPI.emit('publicMessage', {
message, message,
}); });
@ -1832,7 +1890,14 @@ class LokiPublicChannelAPI {
return; return;
} }
} }
log.info('emitting pending message', message.serverId); log.info(
'emitting pending message',
message.serverId,
'on',
this.channelId,
'at',
this.serverAPI.baseServerUrl
);
this.chatAPI.emit('publicMessage', { this.chatAPI.emit('publicMessage', {
message, message,
}); });
@ -1854,6 +1919,7 @@ class LokiPublicChannelAPI {
// finally update our position // finally update our position
this.conversation.setLastRetrievedMessage(this.lastGot); this.conversation.setLastRetrievedMessage(this.lastGot);
this.messagesPollLock = false;
} }
static getPreviewFromAnnotation(annotation) { static getPreviewFromAnnotation(annotation) {

@ -94,6 +94,7 @@ class LokiMessageAPI {
await this.refreshSendingSwarm(pubKey, timestamp); await this.refreshSendingSwarm(pubKey, timestamp);
} }
// send parameters
const params = { const params = {
pubKey, pubKey,
ttl: ttl.toString(), ttl: ttl.toString(),
@ -104,7 +105,7 @@ class LokiMessageAPI {
const promises = []; const promises = [];
let completedConnections = 0; let completedConnections = 0;
for (let i = 0; i < numConnections; i += 1) { for (let i = 0; i < numConnections; i += 1) {
const connectionPromise = this.openSendConnection(params).finally(() => { const connectionPromise = this._openSendConnection(params).finally(() => {
completedConnections += 1; completedConnections += 1;
if (completedConnections >= numConnections) { if (completedConnections >= numConnections) {
delete this.sendingData[timestamp]; delete this.sendingData[timestamp];
@ -151,7 +152,9 @@ class LokiMessageAPI {
'Ran out of swarm nodes to query' 'Ran out of swarm nodes to query'
); );
} }
log.info(`Successful storage message to ${pubKey}`); log.info(
`loki_message:::sendMessage - Successfully stored message to ${pubKey}`
);
} }
async refreshSendingSwarm(pubKey, timestamp) { async refreshSendingSwarm(pubKey, timestamp) {
@ -162,16 +165,11 @@ class LokiMessageAPI {
return true; return true;
} }
async openSendConnection(params) { async _openSendConnection(params) {
while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) { while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) {
const snode = this.sendingData[params.timestamp].swarm.shift(); const snode = this.sendingData[params.timestamp].swarm.shift();
// TODO: Revert back to using snode address instead of IP // TODO: Revert back to using snode address instead of IP
const successfulSend = await this.sendToNode( const successfulSend = await this._sendToNode(snode, params);
snode.ip,
snode.port,
snode,
params
);
if (successfulSend) { if (successfulSend) {
return true; return true;
} }
@ -189,19 +187,19 @@ class LokiMessageAPI {
} }
await this.sendingData[params.timestamp].refreshPromise; await this.sendingData[params.timestamp].refreshPromise;
// Retry with a fresh list again // Retry with a fresh list again
return this.openSendConnection(params); return this._openSendConnection(params);
} }
return false; return false;
} }
async sendToNode(address, port, targetNode, params) { async _sendToNode(targetNode, params) {
let successiveFailures = 0; let successiveFailures = 0;
while (successiveFailures < MAX_ACCEPTABLE_FAILURES) { while (successiveFailures < MAX_ACCEPTABLE_FAILURES) {
await sleepFor(successiveFailures * 500); await sleepFor(successiveFailures * 500);
try { try {
const result = await lokiRpc( const result = await lokiRpc(
`https://${address}`, `https://${targetNode.ip}`,
port, targetNode.port,
'store', 'store',
params, params,
{}, {},
@ -211,17 +209,21 @@ class LokiMessageAPI {
// Make sure we aren't doing too much PoW // Make sure we aren't doing too much PoW
const currentDifficulty = window.storage.get('PoWDifficulty', null); const currentDifficulty = window.storage.get('PoWDifficulty', null);
const newDifficulty = result.difficulty; if (
if (newDifficulty != null && newDifficulty !== currentDifficulty) { result &&
window.storage.put('PoWDifficulty', newDifficulty); result.difficulty &&
result.difficulty !== currentDifficulty
) {
window.storage.put('PoWDifficulty', result.difficulty);
// should we return false?
} }
return true; return true;
} catch (e) { } catch (e) {
log.warn( log.warn(
'Loki send message error:', 'loki_message:::_sendToNode - send error:',
e.code, e.code,
e.message, e.message,
`from ${address}` `destination ${targetNode.ip}:${targetNode.port}`
); );
if (e instanceof textsecure.WrongSwarmError) { if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e; const { newSwarm } = e;
@ -238,26 +240,35 @@ class LokiMessageAPI {
} else if (e instanceof textsecure.NotFoundError) { } else if (e instanceof textsecure.NotFoundError) {
// TODO: Handle resolution error // TODO: Handle resolution error
} else if (e instanceof textsecure.TimestampError) { } else if (e instanceof textsecure.TimestampError) {
log.warn('Timestamp is invalid'); log.warn('loki_message:::_sendToNode - Timestamp is invalid');
throw e; throw e;
} else if (e instanceof textsecure.HTTPError) { } else if (e instanceof textsecure.HTTPError) {
// TODO: Handle working connection but error response // TODO: Handle working connection but error response
const body = await e.response.text(); const body = await e.response.text();
log.warn('HTTPError body:', body); log.warn('loki_message:::_sendToNode - HTTPError body:', body);
} }
successiveFailures += 1; successiveFailures += 1;
} }
} }
log.error(`Failed to send to node: ${address}`); const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode(
await lokiSnodeAPI.unreachableNode(params.pubKey, address); params.pubKey,
targetNode
);
log.error(
`loki_message:::_sendToNode - Too many successive failures trying to send to node ${
targetNode.ip
}:${targetNode.port}, ${remainingSwarmSnodes.lengt} remaining swarm nodes`
);
return false; return false;
} }
async openRetrieveConnection(stopPollingPromise, callback) { async _openRetrieveConnection(stopPollingPromise, callback) {
let stopPollingResult = false; let stopPollingResult = false;
// When message_receiver restarts from onoffline/ononline events it closes // When message_receiver restarts from onoffline/ononline events it closes
// http-resources, which will then resolve the stopPollingPromise with true. We then // http-resources, which will then resolve the stopPollingPromise with true. We then
// want to cancel these polling connections because new ones will be created // want to cancel these polling connections because new ones will be created
// eslint-disable-next-line more/no-then // eslint-disable-next-line more/no-then
stopPollingPromise.then(result => { stopPollingPromise.then(result => {
stopPollingResult = result; stopPollingResult = result;
@ -274,9 +285,13 @@ class LokiMessageAPI {
) { ) {
await sleepFor(successiveFailures * 1000); await sleepFor(successiveFailures * 1000);
// TODO: Revert back to using snode address instead of IP
try { try {
// TODO: Revert back to using snode address instead of IP // in general, I think we want exceptions to bubble up
let messages = await this.retrieveNextMessages(nodeData.ip, nodeData); // so the user facing UI can report unhandled errors
// except in this case of living inside http-resource pollServer
// because it just restarts more connections...
let messages = await this._retrieveNextMessages(nodeData);
// this only tracks retrieval failures // this only tracks retrieval failures
// won't include parsing failures... // won't include parsing failures...
successiveFailures = 0; successiveFailures = 0;
@ -296,7 +311,7 @@ class LokiMessageAPI {
callback(messages); callback(messages);
} catch (e) { } catch (e) {
log.warn( log.warn(
'Loki retrieve messages error:', 'loki_message:::_openRetrieveConnection - retrieve error:',
e.code, e.code,
e.message, e.message,
`on ${nodeData.ip}:${nodeData.port}` `on ${nodeData.ip}:${nodeData.port}`
@ -324,40 +339,49 @@ class LokiMessageAPI {
} }
} }
if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) { if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) {
const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode(
this.ourKey,
nodeData
);
log.warn( log.warn(
`removing ${nodeData.ip}:${ `loki_message:::_openRetrieveConnection - too many successive failures, removing ${
nodeData.port nodeData.ip
} from our swarm pool. We have ${ }:${nodeData.port} from our swarm pool. We have ${
Object.keys(this.ourSwarmNodes).length Object.keys(this.ourSwarmNodes).length
} usable swarm nodes left` } usable swarm nodes left (${
remainingSwarmSnodes.length
} in local db)`
); );
await lokiSnodeAPI.unreachableNode(this.ourKey, address);
} }
} }
// if not stopPollingResult // if not stopPollingResult
if (_.isEmpty(this.ourSwarmNodes)) { if (_.isEmpty(this.ourSwarmNodes)) {
log.error( log.error(
'We no longer have any swarm nodes available to try in pool, closing retrieve connection' 'loki_message:::_openRetrieveConnection - We no longer have any swarm nodes available to try in pool, closing retrieve connection'
); );
return false; return false;
} }
return true; return true;
} }
async retrieveNextMessages(nodeUrl, nodeData) { // we don't throw or catch here
// mark private (_ prefix) since no error handling is done here...
async _retrieveNextMessages(nodeData) {
const params = { const params = {
pubKey: this.ourKey, pubKey: this.ourKey,
lastHash: nodeData.lastHash || '', lastHash: nodeData.lastHash || '',
}; };
const options = { const options = {
timeout: 40000, timeout: 40000,
ourPubKey: this.ourKey,
headers: { headers: {
[LOKI_LONGPOLL_HEADER]: true, [LOKI_LONGPOLL_HEADER]: true,
}, },
}; };
// let exceptions bubble up
const result = await lokiRpc( const result = await lokiRpc(
`https://${nodeUrl}`, `https://${nodeData.ip}`,
nodeData.port, nodeData.port,
'retrieve', 'retrieve',
params, params,
@ -365,34 +389,39 @@ class LokiMessageAPI {
'/storage_rpc/v1', '/storage_rpc/v1',
nodeData nodeData
); );
return result.messages || []; return result.messages || [];
} }
// we don't throw or catch here
async startLongPolling(numConnections, stopPolling, callback) { async startLongPolling(numConnections, stopPolling, callback) {
log.info('startLongPolling for', numConnections, 'connections');
this.ourSwarmNodes = {}; this.ourSwarmNodes = {};
// load from local DB
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey); let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey);
log.info('swarmNodes', nodes.length, 'for', this.ourKey);
Object.keys(nodes).forEach(j => {
const node = nodes[j];
log.info(`${j} ${node.ip}:${node.port}`);
});
if (nodes.length < numConnections) { if (nodes.length < numConnections) {
log.warn( log.warn(
'Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain' 'loki_message:::startLongPolling - Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain'
); );
// load from blockchain
nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey); nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey);
if (nodes.length < numConnections) { if (nodes.length < numConnections) {
log.error( log.error(
'Could not get enough SwarmNodes for our pubkey from blockchain' 'loki_message:::startLongPolling - Could not get enough SwarmNodes for our pubkey from blockchain'
); );
} }
} }
log.info( log.info(
`There are currently ${ 'loki_message:::startLongPolling - start polling for',
nodes.length numConnections,
} swarmNodes for pubKey in our local database` 'connections. We have swarmNodes',
nodes.length,
'for',
this.ourKey
); );
Object.keys(nodes).forEach(j => {
const node = nodes[j];
log.info(`loki_message: ${j} ${node.ip}:${node.port}`);
});
for (let i = 0; i < nodes.length; i += 1) { for (let i = 0; i < nodes.length; i += 1) {
const lastHash = await window.Signal.Data.getLastHashBySnode( const lastHash = await window.Signal.Data.getLastHashBySnode(
@ -406,15 +435,28 @@ class LokiMessageAPI {
const promises = []; const promises = [];
let unresolved = numConnections;
for (let i = 0; i < numConnections; i += 1) { for (let i = 0; i < numConnections; i += 1) {
promises.push(this.openRetrieveConnection(stopPolling, callback)); promises.push(
// eslint-disable-next-line more/no-then
this._openRetrieveConnection(stopPolling, callback).then(() => {
unresolved -= 1;
log.info(
'loki_message:::startLongPolling - There are',
unresolved,
'open retrieve connections left'
);
})
);
} }
// blocks until numConnections snodes in our swarms have been removed from the list // blocks until numConnections snodes in our swarms have been removed from the list
// less than numConnections being active is fine, only need to restart if none per Niels 20/02/13 // less than numConnections being active is fine, only need to restart if none per Niels 20/02/13
// or if there is network issues (ENOUTFOUND due to lokinet) // or if there is network issues (ENOUTFOUND due to lokinet)
await Promise.all(promises); await Promise.all(promises);
log.error('All our long poll swarm connections have been removed'); log.error(
'loki_message:::startLongPolling - All our long poll swarm connections have been removed'
);
// should we just call ourself again? // should we just call ourself again?
// no, our caller already handles this... // no, our caller already handles this...
} }

@ -1,4 +1,4 @@
/* global log, window, process */ /* global log, window, process, URL */
const EventEmitter = require('events'); const EventEmitter = require('events');
const nodeFetch = require('node-fetch'); const nodeFetch = require('node-fetch');
const LokiAppDotNetAPI = require('./loki_app_dot_net_api'); const LokiAppDotNetAPI = require('./loki_app_dot_net_api');
@ -27,16 +27,18 @@ class LokiPublicChatFactoryAPI extends EventEmitter {
static async validServer(serverUrl) { static async validServer(serverUrl) {
// test to make sure it's online (and maybe has a valid SSL cert) // test to make sure it's online (and maybe has a valid SSL cert)
try { try {
const url = new URL(serverUrl);
// allow .loki (may only need an agent but not sure // allow .loki (may only need an agent but not sure
// until we have a .loki to test with) // until we have a .loki to test with)
if (serverUrl.match(/\.loki$/)) { process.env.NODE_TLS_REJECT_UNAUTHORIZED = url.host.match(/\.loki$/i)
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; ? '0'
} : '1';
// FIXME: use proxy when we have open groups that work with proxy
await nodeFetch(serverUrl); await nodeFetch(serverUrl);
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1'; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
// const txt = await res.text(); // const txt = await res.text();
} catch (e) { } catch (e) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
log.warn(`failing to created ${serverUrl}`, e.code, e.message); log.warn(`failing to created ${serverUrl}`, e.code, e.message);
// bail out if not valid enough // bail out if not valid enough
return false; return false;

@ -29,7 +29,9 @@ const decryptResponse = async (response, address) => {
return {}; return {};
}; };
const sendToProxy = async (options = {}, targetNode) => { const timeoutDelay = ms => new Promise(resolve => setTimeout(resolve, ms));
const sendToProxy = async (options = {}, targetNode, retryNumber = 0) => {
const randSnode = await lokiSnodeAPI.getRandomSnodeAddress(); const randSnode = await lokiSnodeAPI.getRandomSnodeAddress();
// Don't allow arbitrary URLs, only snodes and loki servers // Don't allow arbitrary URLs, only snodes and loki servers
@ -60,36 +62,101 @@ const sendToProxy = async (options = {}, targetNode) => {
'X-Sender-Public-Key': StringView.arrayBufferToHex(myKeys.pubKey), 'X-Sender-Public-Key': StringView.arrayBufferToHex(myKeys.pubKey),
'X-Target-Snode-Key': targetNode.pubkey_ed25519, 'X-Target-Snode-Key': targetNode.pubkey_ed25519,
}, },
agent: snodeHttpsAgent,
}; };
// we only proxy to snodes... // we only proxy to snodes...
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 0;
const response = await nodeFetch(url, firstHopOptions); const response = await nodeFetch(url, firstHopOptions);
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1;
if (response.status === 401) {
// decom or dereg
// remove
// but which the proxy or the target...
// we got a ton of randomPool nodes, let's just not worry about this one
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
const randomPoolRemainingCount = lokiSnodeAPI.getRandomPoolLength();
log.warn(
`lokiRpc sendToProxy`,
`snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
}`,
`snode is decom or dereg: `,
ciphertext,
// `marking random snode bad ${randomPoolRemainingCount} remaining`
`Try #${retryNumber}`,
`removing randSnode leaving ${randomPoolRemainingCount} in the random pool`
);
// retry, just count it happening 5 times to be the target for now
return sendToProxy(options, targetNode, retryNumber + 1);
}
// detect SNode is not ready (not in swarm; not done syncing) // detect SNode is not ready (not in swarm; not done syncing)
if (response.status === 503) { if (response.status === 503 || response.status === 500) {
const ciphertext = await response.text(); const ciphertext = await response.text();
log.error( // we shouldn't do these,
`lokiRpc sendToProxy snode ${randSnode.ip}:${randSnode.port} error`, // it's seems to be not the random node that's always bad
ciphertext // but the target node
// we got a ton of randomPool nodes, let's just not worry about this one
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
const randomPoolRemainingCount = lokiSnodeAPI.getRandomPoolLength();
log.warn(
`lokiRpc sendToProxy`,
`snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
}`,
`code ${response.status} error`,
ciphertext,
// `marking random snode bad ${randomPoolRemainingCount} remaining`
`Try #${retryNumber}`,
`removing randSnode leaving ${randomPoolRemainingCount} in the random pool`
); );
// mark as bad for this round (should give it some time and improve success rates) // mark as bad for this round (should give it some time and improve success rates)
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
// retry for a new working snode // retry for a new working snode
return sendToProxy(options, targetNode); const pRetryNumber = retryNumber + 1;
if (pRetryNumber > 5) {
// it's likely a net problem or an actual problem on the target node
// lets mark the target node bad for now
// we'll just rotate it back in if it's a net problem
log.warn(`Failing ${targetNode.ip}:${targetNode.port} after 5 retries`);
if (options.ourPubKey) {
lokiSnodeAPI.unreachableNode(options.ourPubKey, targetNode);
}
return false;
}
// 500 burns through a node too fast,
// let's slow the retry to give it more time to recover
if (response.status === 500) {
await timeoutDelay(5000);
}
return sendToProxy(options, targetNode, pRetryNumber);
} }
/*
if (response.status === 500) {
// usually when the server returns nothing...
}
*/
// FIXME: handle nodeFetch errors/exceptions... // FIXME: handle nodeFetch errors/exceptions...
if (response.status !== 200) { if (response.status !== 200) {
// let us know we need to create handlers for new unhandled codes // let us know we need to create handlers for new unhandled codes
log.warn('lokiRpc sendToProxy fetch non-200 statusCode', response.status); log.warn(
'lokiRpc sendToProxy fetch non-200 statusCode',
response.status,
`from snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
}`
);
return false;
} }
const ciphertext = await response.text(); const ciphertext = await response.text();
if (!ciphertext) { if (!ciphertext) {
// avoid base64 decode failure // avoid base64 decode failure
log.warn('Server did not return any data for', options); // usually a 500 but not always
// could it be a timeout?
log.warn('Server did not return any data for', options, targetNode);
return false;
} }
let plaintext; let plaintext;
@ -112,7 +179,9 @@ const sendToProxy = async (options = {}, targetNode) => {
'lokiRpc sendToProxy decode error', 'lokiRpc sendToProxy decode error',
e.code, e.code,
e.message, e.message,
`from ${randSnode.ip}:${randSnode.port} ciphertext:`, `from ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
} ciphertext:`,
ciphertext ciphertext
); );
if (ciphertextBuffer) { if (ciphertextBuffer) {
@ -138,6 +207,15 @@ const sendToProxy = async (options = {}, targetNode) => {
} }
return false; return false;
}; };
if (retryNumber) {
log.info(
`lokiRpc sendToProxy request succeeded,`,
`snode ${randSnode.ip}:${randSnode.port} to ${targetNode.ip}:${
targetNode.port
}`,
`on retry #${retryNumber}`
);
}
return jsonRes; return jsonRes;
} catch (e) { } catch (e) {
log.error( log.error(
@ -182,22 +260,24 @@ const lokiFetch = async (url, options = {}, targetNode = null) => {
timeout, timeout,
method, method,
}; };
if (url.match(/https:\/\//)) {
fetchOptions.agent = snodeHttpsAgent;
}
try { try {
if (window.lokiFeatureFlags.useSnodeProxy && targetNode) { if (window.lokiFeatureFlags.useSnodeProxy && targetNode) {
const result = await sendToProxy(fetchOptions, targetNode); const result = await sendToProxy(fetchOptions, targetNode);
return result ? result.json() : false; // if not result, maybe we should throw??
return result ? result.json() : {};
} }
if (url.match(/https:\/\//)) { if (url.match(/https:\/\//)) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 0; // import that this does not get set in sendToProxy fetchOptions
fetchOptions.agent = snodeHttpsAgent;
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
} else {
log.info('lokiRpc http communication', url);
} }
const response = await nodeFetch(url, fetchOptions); const response = await nodeFetch(url, fetchOptions);
// restore TLS checking // restore TLS checking
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1; process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
let result; let result;
// Wrong swarm // Wrong swarm

@ -1,10 +1,12 @@
/* eslint-disable class-methods-use-this */ /* eslint-disable class-methods-use-this */
/* global window, ConversationController, _, log */ /* global window, ConversationController, _, log, clearTimeout */
const is = require('@sindresorhus/is'); const is = require('@sindresorhus/is');
const { lokiRpc } = require('./loki_rpc'); const { lokiRpc } = require('./loki_rpc');
const RANDOM_SNODES_TO_USE = 3; const RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM = 3;
const RANDOM_SNODES_POOL_SIZE = 1024;
const SEED_NODE_RETRIES = 3;
class LokiSnodeAPI { class LokiSnodeAPI {
constructor({ serverUrl, localUrl }) { constructor({ serverUrl, localUrl }) {
@ -15,13 +17,14 @@ class LokiSnodeAPI {
this.localUrl = localUrl; // localhost.loki this.localUrl = localUrl; // localhost.loki
this.randomSnodePool = []; this.randomSnodePool = [];
this.swarmsPendingReplenish = {}; this.swarmsPendingReplenish = {};
this.refreshRandomPoolPromise = false;
} }
async getRandomSnodeAddress() { async getRandomSnodeAddress() {
/* resolve random snode */ /* resolve random snode */
if (this.randomSnodePool.length === 0) { if (this.randomSnodePool.length === 0) {
// allow exceptions to pass through upwards // allow exceptions to pass through upwards
await this.initialiseRandomPool(); await this.refreshRandomPool();
} }
if (this.randomSnodePool.length === 0) { if (this.randomSnodePool.length === 0) {
throw new window.textsecure.SeedNodeError('Invalid seed node response'); throw new window.textsecure.SeedNodeError('Invalid seed node response');
@ -31,74 +34,150 @@ class LokiSnodeAPI {
]; ];
} }
async initialiseRandomPool( async refreshRandomPool(seedNodes = [...window.seedNodeList]) {
seedNodes = [...window.seedNodeList], // if currently not in progress
consecutiveErrors = 0 if (this.refreshRandomPoolPromise === false) {
) { // set lock
const params = { this.refreshRandomPoolPromise = new Promise(async (resolve, reject) => {
limit: 20, let timeoutTimer = null;
active_only: true, // private retry container
fields: { const trySeedNode = async (consecutiveErrors = 0) => {
public_ip: true, const params = {
storage_port: true, limit: RANDOM_SNODES_POOL_SIZE,
pubkey_x25519: true, active_only: true,
pubkey_ed25519: true, fields: {
}, public_ip: true,
}; storage_port: true,
const seedNode = seedNodes.splice( pubkey_x25519: true,
Math.floor(Math.random() * seedNodes.length), pubkey_ed25519: true,
1 },
)[0]; };
let snodes = []; const seedNode = seedNodes.splice(
Math.floor(Math.random() * seedNodes.length),
1
)[0];
let snodes = [];
try {
log.info(
'loki_snodes:::refreshRandomPoolPromise - Refreshing random snode pool'
);
const response = await lokiRpc(
`http://${seedNode.ip}`,
seedNode.port,
'get_n_service_nodes',
params,
{}, // Options
'/json_rpc' // Seed request endpoint
);
// Filter 0.0.0.0 nodes which haven't submitted uptime proofs
snodes = response.result.service_node_states.filter(
snode => snode.public_ip !== '0.0.0.0'
);
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
pubkey_x25519: snode.pubkey_x25519,
pubkey_ed25519: snode.pubkey_ed25519,
}));
log.info(
'loki_snodes:::refreshRandomPoolPromise - Refreshed random snode pool with',
this.randomSnodePool.length,
'snodes'
);
// clear lock
this.refreshRandomPoolPromise = null;
if (timeoutTimer !== null) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
resolve();
} catch (e) {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - error',
e.code,
e.message
);
if (consecutiveErrors < SEED_NODE_RETRIES) {
// retry after a possible delay
setTimeout(() => {
log.info(
'loki_snodes:::refreshRandomPoolPromise - Retrying initialising random snode pool, try #',
consecutiveErrors
);
trySeedNode(consecutiveErrors + 1);
}, consecutiveErrors * consecutiveErrors * 5000);
} else {
log.error(
'loki_snodes:::refreshRandomPoolPromise - Giving up trying to contact seed node'
);
if (snodes.length === 0) {
this.refreshRandomPoolPromise = null; // clear lock
if (timeoutTimer !== null) {
clearTimeout(timeoutTimer);
timeoutTimer = null;
}
reject();
}
}
}
};
const delay = (SEED_NODE_RETRIES + 1) * (SEED_NODE_RETRIES + 1) * 5000;
timeoutTimer = setTimeout(() => {
log.warn(
'loki_snodes:::refreshRandomPoolPromise - TIMEDOUT after',
delay,
's'
);
reject();
}, delay);
trySeedNode();
});
}
try { try {
const response = await lokiRpc( await this.refreshRandomPoolPromise;
`http://${seedNode.ip}`,
seedNode.port,
'get_n_service_nodes',
params,
{}, // Options
'/json_rpc' // Seed request endpoint
);
// Filter 0.0.0.0 nodes which haven't submitted uptime proofs
snodes = response.result.service_node_states.filter(
snode => snode.public_ip !== '0.0.0.0'
);
this.randomSnodePool = snodes.map(snode => ({
ip: snode.public_ip,
port: snode.storage_port,
pubkey_x25519: snode.pubkey_x25519,
pubkey_ed25519: snode.pubkey_ed25519,
}));
} catch (e) { } catch (e) {
log.warn('initialiseRandomPool error', e.code, e.message); // we will throw for each time initialiseRandomPool has been called in parallel
if (consecutiveErrors < 3) { log.error(
// retry after a possible delay 'loki_snodes:::refreshRandomPoolPromise - error',
setTimeout(() => { e.code,
log.info( e.message
'Retrying initialising random snode pool, try #', );
consecutiveErrors throw new window.textsecure.SeedNodeError('Failed to contact seed node');
);
this.initialiseRandomPool(seedNodes, consecutiveErrors + 1);
}, consecutiveErrors * consecutiveErrors * 5000);
} else {
log.error('Giving up trying to contact seed node');
if (snodes.length === 0) {
throw new window.textsecure.SeedNodeError(
'Failed to contact seed node'
);
}
}
} }
log.info('loki_snodes:::refreshRandomPoolPromise - RESOLVED');
} }
// nodeUrl is like 9hrje1bymy7hu6nmtjme9idyu3rm8gr3mkstakjyuw1997t7w4ny.snode // unreachableNode.url is like 9hrje1bymy7hu6nmtjme9idyu3rm8gr3mkstakjyuw1997t7w4ny.snode
async unreachableNode(pubKey, nodeUrl) { async unreachableNode(pubKey, unreachableNode) {
const conversation = ConversationController.get(pubKey); const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')]; const swarmNodes = [...conversation.get('swarmNodes')];
const filteredNodes = swarmNodes.filter( if (typeof unreachableNode === 'string') {
node => node.address !== nodeUrl && node.ip !== nodeUrl log.warn(
); 'loki_snodes:::unreachableNode - String passed as unreachableNode to unreachableNode'
);
return swarmNodes;
}
let found = false;
const filteredNodes = swarmNodes.filter(node => {
// keep all but thisNode
const thisNode =
node.address === unreachableNode.address &&
node.ip === unreachableNode.ip &&
node.port === unreachableNode.port;
if (thisNode) {
found = true;
}
return !thisNode;
});
if (!found) {
log.warn(
`loki_snodes:::unreachableNode - snode ${unreachableNode.ip}:${
unreachableNode.port
} has already been marked as bad`
);
}
await conversation.updateSwarmNodes(filteredNodes); await conversation.updateSwarmNodes(filteredNodes);
return filteredNodes;
} }
markRandomNodeUnreachable(snode) { markRandomNodeUnreachable(snode) {
@ -108,6 +187,10 @@ class LokiSnodeAPI {
); );
} }
getRandomPoolLength() {
return this.randomSnodePool.length;
}
async updateLastHash(snode, hash, expiresAt) { async updateLastHash(snode, hash, expiresAt) {
await window.Signal.Data.updateLastHash({ snode, hash, expiresAt }); await window.Signal.Data.updateLastHash({ snode, hash, expiresAt });
} }
@ -150,7 +233,11 @@ class LokiSnodeAPI {
try { try {
newSwarmNodes = await this.getSwarmNodes(pubKey); newSwarmNodes = await this.getSwarmNodes(pubKey);
} catch (e) { } catch (e) {
log.error('getFreshSwarmNodes error', e.code, e.message); log.error(
'loki_snodes:::getFreshSwarmNodes - error',
e.code,
e.message
);
// TODO: Handle these errors sensibly // TODO: Handle these errors sensibly
newSwarmNodes = []; newSwarmNodes = [];
} }
@ -177,30 +264,43 @@ class LokiSnodeAPI {
); );
if (!result) { if (!result) {
log.warn( log.warn(
`getSnodesForPubkey lokiRpc on ${snode.ip}:${ `loki_snode:::getSnodesForPubkey - lokiRpc on ${snode.ip}:${
snode.port snode.port
} returned falsish value`, } returned falsish value`,
result result
); );
return []; return [];
} }
if (!result.snodes) {
// we hit this when snode gives 500s
log.warn(
`loki_snode:::getSnodesForPubkey - lokiRpc on ${snode.ip}:${
snode.port
} returned falsish value for snodes`,
result
);
return [];
}
const snodes = result.snodes.filter(tSnode => tSnode.ip !== '0.0.0.0'); const snodes = result.snodes.filter(tSnode => tSnode.ip !== '0.0.0.0');
return snodes; return snodes;
} catch (e) { } catch (e) {
this.markRandomNodeUnreachable(snode);
const randomPoolRemainingCount = this.getRandomPoolLength();
log.error( log.error(
'getSnodesForPubkey error', 'loki_snodes:::getSnodesForPubkey - error',
e.code, e.code,
e.message, e.message,
`for ${snode.ip}:${snode.port}` `for ${snode.ip}:${
snode.port
}. ${randomPoolRemainingCount} snodes remaining in randomPool`
); );
this.markRandomNodeUnreachable(snode);
return []; return [];
} }
} }
async getSwarmNodes(pubKey) { async getSwarmNodes(pubKey) {
const snodes = []; const snodes = [];
const questions = [...Array(RANDOM_SNODES_TO_USE).keys()]; const questions = [...Array(RANDOM_SNODES_TO_USE_FOR_PUBKEY_SWARM).keys()];
await Promise.all( await Promise.all(
questions.map(async () => { questions.map(async () => {
// allow exceptions to pass through upwards // allow exceptions to pass through upwards

@ -223,7 +223,7 @@
if (item) { if (item) {
return item.value; return item.value;
} }
window.log.error('Could not load identityKey from SignalData');
return undefined; return undefined;
}, },
async getLocalRegistrationId() { async getLocalRegistrationId() {

@ -84,6 +84,8 @@
}; };
this.pollServer = async () => { this.pollServer = async () => {
// bg.connect calls mr connect after storage system is ready
window.log.info('http-resource pollServer start');
// This blocking call will return only when all attempts // This blocking call will return only when all attempts
// at reaching snodes are exhausted or a DNS error occured // at reaching snodes are exhausted or a DNS error occured
try { try {
@ -93,25 +95,30 @@
messages => { messages => {
connected = true; connected = true;
messages.forEach(message => { messages.forEach(message => {
const { data } = message; this.handleMessage(message.data);
this.handleMessage(data);
}); });
} }
); );
} catch (e) { } catch (e) {
// we'll try again anyway // we'll try again anyway
window.log.error('http-resource pollServer error', e.code, e.message); window.log.error(
'http-resource pollServer error',
e.code,
e.message,
e.stack
);
} }
connected = false;
if (this.calledStop) { if (this.calledStop) {
// don't restart
return; return;
} }
connected = false;
// Exhausted all our snodes urls, trying again later from scratch // Exhausted all our snodes urls, trying again later from scratch
setTimeout(() => { setTimeout(() => {
window.log.info( window.log.info(
`Exhausted all our snodes urls, trying again in ${EXHAUSTED_SNODES_RETRY_DELAY / `http-resource: Exhausted all our snodes urls, trying again in ${EXHAUSTED_SNODES_RETRY_DELAY /
1000}s from scratch` 1000}s from scratch`
); );
this.pollServer(); this.pollServer();

Loading…
Cancel
Save