You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
319 lines
8.9 KiB
JavaScript
319 lines
8.9 KiB
JavaScript
/* eslint-disable no-await-in-loop */
|
|
/* eslint-disable no-loop-func */
|
|
/* global log, dcodeIO, window, callWorker, lokiP2pAPI, lokiSnodeAPI */
|
|
|
|
const nodeFetch = require('node-fetch');
|
|
const _ = require('lodash');
|
|
|
|
class HTTPError extends Error {
|
|
constructor(response) {
|
|
super(response.statusText);
|
|
this.name = 'HTTPError';
|
|
this.response = response;
|
|
}
|
|
}
|
|
|
|
class NotFoundError extends Error {
|
|
constructor() {
|
|
super('ENOTFOUND');
|
|
this.name = 'NotFoundError';
|
|
}
|
|
}
|
|
|
|
// A small wrapper around node-fetch which deserializes response
|
|
const fetch = async (url, options = {}) => {
|
|
const timeout = options.timeout || 10000;
|
|
const method = options.method || 'GET';
|
|
|
|
try {
|
|
const response = await nodeFetch(url, {
|
|
...options,
|
|
timeout,
|
|
method,
|
|
});
|
|
|
|
if (!response.ok) {
|
|
throw new HTTPError(response);
|
|
}
|
|
|
|
let result;
|
|
if (response.headers.get('Content-Type') === 'application/json') {
|
|
result = await response.json();
|
|
} else if (options.responseType === 'arraybuffer') {
|
|
result = await response.buffer();
|
|
} else {
|
|
result = await response.text();
|
|
}
|
|
|
|
return result;
|
|
} catch (e) {
|
|
if (e.code === 'ENOTFOUND') {
|
|
throw new NotFoundError();
|
|
}
|
|
|
|
throw e;
|
|
}
|
|
};
|
|
|
|
// Will be raised (to 3?) when we get more nodes
|
|
const MINIMUM_SUCCESSFUL_REQUESTS = 2;
|
|
|
|
class LokiMessageAPI {
|
|
constructor({ messageServerPort }) {
|
|
this.messageServerPort = messageServerPort ? `:${messageServerPort}` : '';
|
|
}
|
|
|
|
async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) {
|
|
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
|
|
const timestamp = Math.floor(Date.now() / 1000);
|
|
const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey);
|
|
if (p2pDetails && (isPing || p2pDetails.isOnline)) {
|
|
try {
|
|
const port = p2pDetails.port ? `:${p2pDetails.port}` : '';
|
|
const url = `${p2pDetails.address}${port}/store`;
|
|
const fetchOptions = {
|
|
method: 'POST',
|
|
body: data64,
|
|
};
|
|
|
|
await fetch(url, fetchOptions);
|
|
lokiP2pAPI.setContactOnline(pubKey);
|
|
return;
|
|
} catch (e) {
|
|
log.warn('Failed to send P2P message, falling back to storage', e);
|
|
lokiP2pAPI.setContactOffline(pubKey);
|
|
if (isPing) {
|
|
// If this was just a ping, we don't bother sending to storage server
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Nonce is returned as a base64 string to include in header
|
|
let nonce;
|
|
try {
|
|
window.Whisper.events.trigger('calculatingPoW', {
|
|
pubKey,
|
|
timestamp: messageTimeStamp,
|
|
});
|
|
const development = window.getEnvironment() !== 'production';
|
|
nonce = await callWorker(
|
|
'calcPoW',
|
|
timestamp,
|
|
ttl,
|
|
pubKey,
|
|
data64,
|
|
development
|
|
);
|
|
} catch (err) {
|
|
// Something went horribly wrong
|
|
throw err;
|
|
}
|
|
const completedNodes = [];
|
|
const failedNodes = [];
|
|
let successfulRequests = 0;
|
|
let canResolve = true;
|
|
|
|
let swarmNodes = await lokiSnodeAPI.getSwarmNodesForPubKey(pubKey);
|
|
|
|
const nodeComplete = nodeUrl => {
|
|
completedNodes.push(nodeUrl);
|
|
swarmNodes = swarmNodes.filter(node => node !== nodeUrl);
|
|
};
|
|
|
|
const doRequest = async nodeUrl => {
|
|
const url = `${nodeUrl}${this.messageServerPort}/v1/storage_rpc`;
|
|
const body = {
|
|
method: 'store',
|
|
params: {
|
|
pubKey,
|
|
ttl: ttl.toString(),
|
|
nonce,
|
|
timestamp: timestamp.toString(),
|
|
data: data64,
|
|
},
|
|
};
|
|
const fetchOptions = {
|
|
method: 'POST',
|
|
body: JSON.stringify(body),
|
|
headers: {
|
|
'X-Loki-EphemKey': 'not implemented yet',
|
|
},
|
|
};
|
|
|
|
try {
|
|
await fetch(url, fetchOptions);
|
|
|
|
nodeComplete(nodeUrl);
|
|
successfulRequests += 1;
|
|
} catch (e) {
|
|
log.warn('Send message error:', e);
|
|
if (e instanceof NotFoundError) {
|
|
canResolve = false;
|
|
} else if (e instanceof HTTPError) {
|
|
log.error(
|
|
`POST ${e.response.url} (store)`,
|
|
e.response.status,
|
|
'Error sending message'
|
|
);
|
|
|
|
// We mark the node as complete as we could still reach it
|
|
nodeComplete(nodeUrl);
|
|
} else {
|
|
const removeNode = await lokiSnodeAPI.unreachableNode(
|
|
pubKey,
|
|
nodeUrl
|
|
);
|
|
if (removeNode) {
|
|
log.error('Loki SendMessages:', e);
|
|
nodeComplete(nodeUrl);
|
|
failedNodes.push(nodeUrl);
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
|
|
if (!canResolve) {
|
|
throw new window.textsecure.DNSResolutionError('Sending messages');
|
|
}
|
|
if (swarmNodes.length === 0) {
|
|
const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey);
|
|
const goodNodes = _.difference(freshNodes, failedNodes);
|
|
await lokiSnodeAPI.updateSwarmNodes(pubKey, goodNodes);
|
|
swarmNodes = _.difference(freshNodes, completedNodes);
|
|
if (swarmNodes.length === 0) {
|
|
if (successfulRequests !== 0) {
|
|
// TODO: Decide how to handle some completed requests but not enough
|
|
return;
|
|
}
|
|
throw new window.textsecure.EmptySwarmError(
|
|
pubKey,
|
|
new Error('Ran out of swarm nodes to query')
|
|
);
|
|
}
|
|
}
|
|
|
|
const remainingRequests =
|
|
MINIMUM_SUCCESSFUL_REQUESTS - successfulRequests;
|
|
|
|
await Promise.all(
|
|
swarmNodes
|
|
.splice(0, remainingRequests)
|
|
.map(nodeUrl => doRequest(nodeUrl))
|
|
);
|
|
}
|
|
}
|
|
|
|
async retrieveMessages(callback) {
|
|
const ourKey = window.textsecure.storage.user.getNumber();
|
|
const completedNodes = [];
|
|
let canResolve = true;
|
|
let successfulRequests = 0;
|
|
|
|
let ourSwarmNodes;
|
|
try {
|
|
ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
|
|
} catch (e) {
|
|
throw new window.textsecure.EmptySwarmError(ourKey, e);
|
|
}
|
|
|
|
const nodeComplete = nodeUrl => {
|
|
completedNodes.push(nodeUrl);
|
|
delete ourSwarmNodes[nodeUrl];
|
|
};
|
|
|
|
const doRequest = async (nodeUrl, nodeData) => {
|
|
const url = `${nodeUrl}${this.messageServerPort}/v1/storage_rpc`;
|
|
const body = {
|
|
method: 'retrieve',
|
|
params: {
|
|
pubKey: ourKey,
|
|
lastHash: nodeData.lastHash,
|
|
},
|
|
};
|
|
const headers = {
|
|
'X-Loki-EphemKey': 'not implemented yet',
|
|
};
|
|
const fetchOptions = {
|
|
method: 'POST',
|
|
body: JSON.stringify(body),
|
|
headers,
|
|
};
|
|
try {
|
|
const result = await fetch(url, fetchOptions);
|
|
|
|
nodeComplete(nodeUrl);
|
|
|
|
if (result.lastHash) {
|
|
lokiSnodeAPI.updateLastHash(nodeUrl, result.lastHash);
|
|
callback(result.messages);
|
|
}
|
|
successfulRequests += 1;
|
|
} catch (e) {
|
|
log.warn('Retrieve message error:', e);
|
|
if (e instanceof NotFoundError) {
|
|
canResolve = false;
|
|
} else if (e instanceof HTTPError) {
|
|
log.error(
|
|
`POST ${e.response.url} (retrieve)`,
|
|
e.response.status,
|
|
`Error retrieving messages from ${nodeUrl}`
|
|
);
|
|
|
|
// We mark the node as complete as we could still reach it
|
|
nodeComplete(nodeUrl);
|
|
} else {
|
|
const removeNode = await lokiSnodeAPI.unreachableNode(
|
|
ourKey,
|
|
nodeUrl
|
|
);
|
|
if (removeNode) {
|
|
log.error('Loki RetrieveMessages:', e);
|
|
nodeComplete(nodeUrl);
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
while (successfulRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
|
|
if (!canResolve) {
|
|
throw new window.textsecure.DNSResolutionError('Retrieving messages');
|
|
}
|
|
if (Object.keys(ourSwarmNodes).length === 0) {
|
|
try {
|
|
ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
|
|
// Filter out the nodes we have already got responses from
|
|
completedNodes.forEach(nodeUrl => delete ourSwarmNodes[nodeUrl]);
|
|
} catch (e) {
|
|
throw new window.textsecure.EmptySwarmError(
|
|
window.textsecure.storage.user.getNumber(),
|
|
e
|
|
);
|
|
}
|
|
if (Object.keys(ourSwarmNodes).length === 0) {
|
|
if (successfulRequests !== 0) {
|
|
// TODO: Decide how to handle some completed requests but not enough
|
|
return;
|
|
}
|
|
throw new window.textsecure.EmptySwarmError(
|
|
window.textsecure.storage.user.getNumber(),
|
|
new Error('Ran out of swarm nodes to query')
|
|
);
|
|
}
|
|
}
|
|
|
|
const remainingRequests =
|
|
MINIMUM_SUCCESSFUL_REQUESTS - successfulRequests;
|
|
|
|
await Promise.all(
|
|
Object.entries(ourSwarmNodes)
|
|
.splice(0, remainingRequests)
|
|
.map(([nodeUrl, nodeData]) => doRequest(nodeUrl, nodeData))
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
module.exports = LokiMessageAPI;
|