|
|
|
/* eslint-disable no-await-in-loop */
|
|
|
|
/* eslint-disable no-loop-func */
|
|
|
|
/* global log, dcodeIO, window, callWorker, lokiP2pAPI, lokiSnodeAPI, textsecure */
|
|
|
|
|
|
|
|
const _ = require('lodash');
|
|
|
|
const { rpc } = require('./loki_rpc');
|
|
|
|
|
|
|
|
// Number of parallel send connections used by LokiMessageAPI.sendMessage when
// the caller does not pass `options.numConnections`.
const DEFAULT_CONNECTIONS = 2;
|
|
|
|
// Name of the request header that retrieveNextMessages sets to `true` on its
// 'retrieve' RPC calls.
const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll';
|
|
|
|
|
|
|
|
/**
 * Returns a promise that resolves (with no value) once `time` milliseconds
 * have elapsed on a timer.
 *
 * @param {number} time - Delay in milliseconds, passed straight to setTimeout.
 * @returns {Promise<void>}
 */
function sleepFor(time) {
  const startTimer = done => {
    setTimeout(() => {
      done();
    }, time);
  };
  return new Promise(startTimer);
}
|
|
|
|
|
|
|
|
/**
 * Drops messages whose hash is already recorded as seen, then records the
 * hashes of the remaining messages so that later calls filter them out too.
 *
 * @param {Array<Object>} messages - Retrieved messages; each is read for its
 *   `hash` and `expiration` properties.
 * @returns {Promise<Array<Object>>} The messages whose hash was not reported
 *   as seen, in their original order.
 */
const filterIncomingMessages = async messages => {
  const incomingHashes = messages.map(m => m.hash);
  const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
    incomingHashes
  );

  // Build the lookup once so filtering is linear rather than rescanning the
  // duplicate list for every incoming message.
  const seenHashes = new Set(dupHashes);
  const newMessages = messages.filter(m => !seenHashes.has(m.hash));

  // Only touch the database when there is something new to record.
  if (newMessages.length) {
    const newHashes = newMessages.map(m => ({
      expiresAt: m.expiration,
      hash: m.hash,
    }));
    await window.Signal.Data.saveSeenMessageHashes(newHashes);
  }
  return newMessages;
};
|
|
|
|
|
|
|
|
/**
 * Announces that proof-of-work is being calculated and delegates the
 * calculation to the 'calcPoW' worker job.
 *
 * @param {Object} messageEventData - Payload for the 'calculatingPoW' event.
 * @param {string} pubKey - Recipient public key.
 * @param {string} data64 - Message payload (base64 string per the caller).
 * @param {number} timestamp - Timestamp passed to the worker.
 * @param {number} ttl - Time-to-live passed to the worker.
 * @returns {Promise<*>} Whatever the worker returns; the nonce is sent as a
 *   base64 string.
 */
const calcNonce = async (messageEventData, pubKey, data64, timestamp, ttl) => {
  window.Whisper.events.trigger('calculatingPoW', messageEventData);

  // Any environment other than 'production' is flagged as development.
  const isDevelopment = window.getEnvironment() !== 'production';
  const workerArgs = [timestamp, ttl, pubKey, data64, isDevelopment];
  return callWorker('calcPoW', ...workerArgs);
};
|
|
|
|
|
|
|
|
/**
 * Attempts to deliver a payload directly to a contact over P2P.
 *
 * The attempt is skipped when no P2P details are known for the contact, or
 * when the contact is not marked online and this is not a ping.
 *
 * @param {string} pubKey - Contact public key.
 * @param {string} data64 - Payload sent as the `data` field of the 'store' RPC.
 * @param {boolean} isPing - Whether this message is only a ping.
 * @param {Object} messageEventData - Payload for the 'p2pMessageSent' event.
 * @returns {Promise<boolean>} `true` when no further delivery is needed
 *   (P2P succeeded, or a ping failed); `false` when the caller should fall
 *   back to storage.
 */
const trySendP2p = async (pubKey, data64, isPing, messageEventData) => {
  const contact = lokiP2pAPI.getContactP2pDetails(pubKey);
  const shouldAttempt = Boolean(contact) && (isPing || contact.isOnline);
  if (!shouldAttempt) {
    return false;
  }

  try {
    const portSuffix = contact.port ? `:${contact.port}` : '';
    await rpc(contact.address, portSuffix, 'store', { data: data64 });

    lokiP2pAPI.setContactOnline(pubKey);
    window.Whisper.events.trigger('p2pMessageSent', messageEventData);
    log.info(
      isPing
        ? `Successfully pinged ${pubKey}`
        : `Successful p2p message to ${pubKey}`
    );
    return true;
  } catch (e) {
    lokiP2pAPI.setContactOffline(pubKey);
    // A failed ping is not worth sending through the storage server, so it is
    // reported as handled; a real message must fall back to storage.
    const warning = isPing
      ? 'Ping failed, contact marked offline'
      : 'Failed to send P2P message, falling back to storage';
    log.warn(warning, e);
    return Boolean(isPing);
  }
};
|
|
|
|
|
|
|
|
class LokiMessageAPI {
|
|
|
|
constructor({ snodeServerPort }) {
|
|
|
|
this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : '';
|
|
|
|
this.jobQueue = new window.JobQueue();
|
|
|
|
this.sendingSwarmNodes = {};
|
|
|
|
}
|
|
|
|
|
|
|
|
async sendMessage(pubKey, data, messageTimeStamp, ttl, options = {}) {
|
|
|
|
const { isPing = false, numConnections = DEFAULT_CONNECTIONS } = options;
|
|
|
|
// Data required to identify a message in a conversation
|
|
|
|
const messageEventData = {
|
|
|
|
pubKey,
|
|
|
|
timestamp: messageTimeStamp,
|
|
|
|
};
|
|
|
|
|
|
|
|
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
|
|
|
|
const p2pSuccess = await trySendP2p(
|
|
|
|
pubKey,
|
|
|
|
data64,
|
|
|
|
isPing,
|
|
|
|
messageEventData
|
|
|
|
);
|
|
|
|
if (p2pSuccess) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
const timestamp = Date.now();
|
|
|
|
const nonce = await calcNonce(
|
|
|
|
messageEventData,
|
|
|
|
pubKey,
|
|
|
|
data64,
|
|
|
|
timestamp,
|
|
|
|
ttl
|
|
|
|
);
|
|
|
|
// Using timestamp as a unique identifier
|
|
|
|
this.sendingSwarmNodes[timestamp] = lokiSnodeAPI.getSwarmNodesForPubKey(
|
|
|
|
pubKey
|
|
|
|
);
|
|
|
|
if (this.sendingSwarmNodes[timestamp].length < numConnections) {
|
|
|
|
const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey);
|
|
|
|
await lokiSnodeAPI.updateSwarmNodes(pubKey, freshNodes);
|
|
|
|
this.sendingSwarmNodes[timestamp] = freshNodes;
|
|
|
|
}
|
|
|
|
|
|
|
|
const params = {
|
|
|
|
pubKey,
|
|
|
|
ttl: ttl.toString(),
|
|
|
|
nonce,
|
|
|
|
timestamp: timestamp.toString(),
|
|
|
|
data: data64,
|
|
|
|
};
|
|
|
|
const promises = [];
|
|
|
|
for (let i = 0; i < numConnections; i += 1) {
|
|
|
|
promises.push(this.openSendConnection(params));
|
|
|
|
}
|
|
|
|
|
|
|
|
const results = await Promise.all(promises);
|
|
|
|
delete this.sendingSwarmNodes[timestamp];
|
|
|
|
if (results.every(value => value === false)) {
|
|
|
|
throw new window.textsecure.EmptySwarmError(
|
|
|
|
pubKey,
|
|
|
|
'Ran out of swarm nodes to query'
|
|
|
|
);
|
|
|
|
}
|
|
|
|
if (results.every(value => value === true)) {
|
|
|
|
log.info(`Successful storage message to ${pubKey}`);
|
|
|
|
} else {
|
|
|
|
log.warn(`Partially successful storage message to ${pubKey}`);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async openSendConnection(params) {
|
|
|
|
while (!_.isEmpty(this.sendingSwarmNodes[params.timestamp])) {
|
|
|
|
const url = this.sendingSwarmNodes[params.timestamp].shift();
|
|
|
|
const successfulSend = await this.sendToNode(url, params);
|
|
|
|
if (successfulSend) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
async sendToNode(url, params) {
|
|
|
|
let successiveFailures = 0;
|
|
|
|
while (successiveFailures < 3) {
|
|
|
|
await sleepFor(successiveFailures * 500);
|
|
|
|
try {
|
|
|
|
await rpc(`http://${url}`, this.snodeServerPort, 'store', params);
|
|
|
|
return true;
|
|
|
|
} catch (e) {
|
|
|
|
log.warn('Loki send message:', e);
|
|
|
|
if (e instanceof textsecure.WrongSwarmError) {
|
|
|
|
const { newSwarm } = e;
|
|
|
|
await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm);
|
|
|
|
this.sendingSwarmNodes[params.timestamp] = newSwarm;
|
|
|
|
return false;
|
|
|
|
} else if (e instanceof textsecure.NotFoundError) {
|
|
|
|
// TODO: Handle resolution error
|
|
|
|
successiveFailures += 1;
|
|
|
|
} else if (e instanceof textsecure.HTTPError) {
|
|
|
|
// TODO: Handle working connection but error response
|
|
|
|
successiveFailures += 1;
|
|
|
|
} else {
|
|
|
|
successiveFailures += 1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
log.error(`Failed to send to node: ${url}`);
|
|
|
|
await lokiSnodeAPI.unreachableNode(params.pubKey, url);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
async retrieveNextMessages(nodeUrl, nodeData, ourKey) {
|
|
|
|
const params = {
|
|
|
|
pubKey: ourKey,
|
|
|
|
lastHash: nodeData.lastHash || '',
|
|
|
|
};
|
|
|
|
const options = {
|
|
|
|
timeout: 40000,
|
|
|
|
headers: {
|
|
|
|
[LOKI_LONGPOLL_HEADER]: true,
|
|
|
|
},
|
|
|
|
};
|
|
|
|
|
|
|
|
const result = await rpc(
|
|
|
|
`http://${nodeUrl}`,
|
|
|
|
this.snodeServerPort,
|
|
|
|
'retrieve',
|
|
|
|
params,
|
|
|
|
options
|
|
|
|
);
|
|
|
|
return result.messages || [];
|
|
|
|
}
|
|
|
|
|
|
|
|
async openConnection(callback) {
|
|
|
|
const ourKey = window.textsecure.storage.user.getNumber();
|
|
|
|
while (!_.isEmpty(this.ourSwarmNodes)) {
|
|
|
|
const url = Object.keys(this.ourSwarmNodes)[0];
|
|
|
|
const nodeData = this.ourSwarmNodes[url];
|
|
|
|
delete this.ourSwarmNodes[url];
|
|
|
|
let successiveFailures = 0;
|
|
|
|
while (successiveFailures < 3) {
|
|
|
|
await sleepFor(successiveFailures * 1000);
|
|
|
|
|
|
|
|
try {
|
|
|
|
let messages = await this.retrieveNextMessages(url, nodeData, ourKey);
|
|
|
|
successiveFailures = 0;
|
|
|
|
if (messages.length) {
|
|
|
|
const lastMessage = _.last(messages);
|
|
|
|
nodeData.lashHash = lastMessage.hash;
|
|
|
|
lokiSnodeAPI.updateLastHash(
|
|
|
|
url,
|
|
|
|
lastMessage.hash,
|
|
|
|
lastMessage.expiration
|
|
|
|
);
|
|
|
|
messages = await this.jobQueue.add(() =>
|
|
|
|
filterIncomingMessages(messages)
|
|
|
|
);
|
|
|
|
}
|
|
|
|
// Execute callback even with empty array to signal online status
|
|
|
|
callback(messages);
|
|
|
|
} catch (e) {
|
|
|
|
log.warn('Loki retrieve messages:', e);
|
|
|
|
if (e instanceof textsecure.WrongSwarmError) {
|
|
|
|
const { newSwarm } = e;
|
|
|
|
await lokiSnodeAPI.updateOurSwarmNodes(newSwarm);
|
|
|
|
// Try another snode
|
|
|
|
break;
|
|
|
|
} else if (e instanceof textsecure.NotFoundError) {
|
|
|
|
// DNS/Lokinet error, needs to bubble up
|
|
|
|
throw new window.textsecure.DNSResolutionError(
|
|
|
|
'Retrieving messages'
|
|
|
|
);
|
|
|
|
}
|
|
|
|
successiveFailures += 1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async startLongPolling(numConnections, callback) {
|
|
|
|
this.ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
|
|
|
|
|
|
|
|
const promises = [];
|
|
|
|
|
|
|
|
for (let i = 0; i < numConnections; i += 1)
|
|
|
|
promises.push(this.openConnection(callback));
|
|
|
|
|
|
|
|
// blocks until all snodes in our swarms have been removed from the list
|
|
|
|
// or if there is network issues (ENOUTFOUND due to lokinet)
|
|
|
|
await Promise.all(promises);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = LokiMessageAPI;
|