You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
session-desktop/js/modules/loki_message_api.js

296 lines
8.9 KiB
JavaScript

/* eslint-disable no-await-in-loop */
/* eslint-disable no-loop-func */
/* global log, dcodeIO, window, callWorker, lokiP2pAPI, lokiSnodeAPI, textsecure */
const _ = require('lodash');
const { rpc } = require('./loki_rpc');
const DEFAULT_CONNECTIONS = 2;
const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll';
function sleepFor(time) {
return new Promise(resolve => {
setTimeout(() => resolve(), time);
});
}
const filterIncomingMessages = async messages => {
const incomingHashes = messages.map(m => m.hash);
const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
incomingHashes
);
const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
if (newMessages.length) {
const newHashes = newMessages.map(m => ({
expiresAt: m.expiration,
hash: m.hash,
}));
await window.Signal.Data.saveSeenMessageHashes(newHashes);
}
return newMessages;
};
const calcNonce = (messageEventData, pubKey, data64, timestamp, ttl) => {
const difficulty = window.storage.get('PoWDifficulty', null);
// Nonce is returned as a base64 string to include in header
window.Whisper.events.trigger('calculatingPoW', messageEventData);
return callWorker('calcPoW', timestamp, ttl, pubKey, data64, difficulty);
};
const trySendP2p = async (pubKey, data64, isPing, messageEventData) => {
const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey);
if (!p2pDetails || (!isPing && !p2pDetails.isOnline)) {
return false;
}
try {
await rpc(p2pDetails.address, p2pDetails.port, 'store', {
data: data64,
});
lokiP2pAPI.setContactOnline(pubKey);
window.Whisper.events.trigger('p2pMessageSent', messageEventData);
if (isPing) {
log.info(`Successfully pinged ${pubKey}`);
} else {
log.info(`Successful p2p message to ${pubKey}`);
}
return true;
} catch (e) {
lokiP2pAPI.setContactOffline(pubKey);
if (isPing) {
// If this was just a ping, we don't bother sending to storage server
log.warn('Ping failed, contact marked offline', e);
return true;
}
log.warn('Failed to send P2P message, falling back to storage', e);
return false;
}
};
const retrieveNextMessages = async (nodeUrl, nodeData, ourKey) => {
const params = {
pubKey: ourKey,
lastHash: nodeData.lastHash || '',
};
const options = {
timeout: 40000,
headers: {
[LOKI_LONGPOLL_HEADER]: true,
},
};
const result = await rpc(
`https://${nodeUrl}`,
nodeData.port,
'retrieve',
params,
options
);
return result.messages || [];
};
class LokiMessageAPI {
constructor() {
this.jobQueue = new window.JobQueue();
this.sendingSwarmNodes = {};
}
async sendMessage(pubKey, data, messageTimeStamp, ttl, options = {}) {
const { isPing = false, numConnections = DEFAULT_CONNECTIONS } = options;
// Data required to identify a message in a conversation
const messageEventData = {
pubKey,
timestamp: messageTimeStamp,
};
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
const p2pSuccess = await trySendP2p(
pubKey,
data64,
isPing,
messageEventData
);
if (p2pSuccess) {
return;
}
const timestamp = Date.now();
const nonce = await calcNonce(
messageEventData,
pubKey,
data64,
timestamp,
ttl
);
// Using timestamp as a unique identifier
this.sendingSwarmNodes[timestamp] = lokiSnodeAPI.getSwarmNodesForPubKey(
pubKey
);
if (this.sendingSwarmNodes[timestamp].length < numConnections) {
const freshNodes = await lokiSnodeAPI.getFreshSwarmNodes(pubKey);
await lokiSnodeAPI.updateSwarmNodes(pubKey, freshNodes);
this.sendingSwarmNodes[timestamp] = freshNodes;
}
const params = {
pubKey,
ttl: ttl.toString(),
nonce,
timestamp: timestamp.toString(),
data: data64,
};
const promises = [];
for (let i = 0; i < numConnections; i += 1) {
promises.push(this.openSendConnection(params));
}
let results;
try {
results = await Promise.all(promises);
} catch (e) {
if (e instanceof textsecure.WrongDifficultyError) {
// Force nonce recalculation
this.sendMessage(pubKey, data, messageTimeStamp, ttl, options);
return;
}
throw e;
}
delete this.sendingSwarmNodes[timestamp];
if (results.every(value => value === false)) {
throw new window.textsecure.EmptySwarmError(
pubKey,
'Ran out of swarm nodes to query'
);
}
if (results.every(value => value === true)) {
log.info(`Successful storage message to ${pubKey}`);
} else {
log.warn(`Partially successful storage message to ${pubKey}`);
}
}
async openSendConnection(params) {
while (!_.isEmpty(this.sendingSwarmNodes[params.timestamp])) {
const snode = this.sendingSwarmNodes[params.timestamp].shift();
// TODO: Revert back to using snode address instead of IP
const successfulSend = await this.sendToNode(
snode.ip,
snode.port,
params
);
if (successfulSend) {
return true;
}
}
return false;
}
async sendToNode(address, port, params) {
let successiveFailures = 0;
while (successiveFailures < 3) {
await sleepFor(successiveFailures * 500);
try {
const result = await rpc(`https://${address}`, port, 'store', params);
// Make sure we aren't doing too much PoW
const currentDifficulty = window.storage.get('PoWDifficulty', null);
const newDifficulty = result.difficulty;
if (newDifficulty != null && newDifficulty !== currentDifficulty) {
window.storage.put('PoWDifficulty', newDifficulty);
}
return true;
} catch (e) {
log.warn('Loki send message:', e);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm);
this.sendingSwarmNodes[params.timestamp] = newSwarm;
return false;
} else if (e instanceof textsecure.WrongDifficultyError) {
const { newDifficulty } = e;
if (!Number.isNaN(newDifficulty)) {
window.storage.put('PoWDifficulty', newDifficulty);
}
throw e;
} else if (e instanceof textsecure.NotFoundError) {
// TODO: Handle resolution error
successiveFailures += 1;
} else if (e instanceof textsecure.HTTPError) {
// TODO: Handle working connection but error response
successiveFailures += 1;
} else {
successiveFailures += 1;
}
}
}
log.error(`Failed to send to node: ${address}`);
await lokiSnodeAPI.unreachableNode(params.pubKey, address);
return false;
}
async openConnection(callback) {
const ourKey = window.textsecure.storage.user.getNumber();
while (!_.isEmpty(this.ourSwarmNodes)) {
const address = Object.keys(this.ourSwarmNodes)[0];
const nodeData = this.ourSwarmNodes[address];
delete this.ourSwarmNodes[address];
let successiveFailures = 0;
while (successiveFailures < 3) {
await sleepFor(successiveFailures * 1000);
try {
// TODO: Revert back to using snode address instead of IP
let messages = await retrieveNextMessages(
nodeData.ip,
nodeData,
ourKey
);
successiveFailures = 0;
if (messages.length) {
const lastMessage = _.last(messages);
nodeData.lashHash = lastMessage.hash;
lokiSnodeAPI.updateLastHash(
address,
lastMessage.hash,
lastMessage.expiration
);
messages = await this.jobQueue.add(() =>
filterIncomingMessages(messages)
);
}
// Execute callback even with empty array to signal online status
callback(messages);
} catch (e) {
log.warn('Loki retrieve messages:', e);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateOurSwarmNodes(newSwarm);
// Try another snode
break;
} else if (e instanceof textsecure.NotFoundError) {
// DNS/Lokinet error, needs to bubble up
throw new window.textsecure.DNSResolutionError(
'Retrieving messages'
);
}
successiveFailures += 1;
}
}
}
}
async startLongPolling(numConnections, callback) {
this.ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
const promises = [];
for (let i = 0; i < numConnections; i += 1)
promises.push(this.openConnection(callback));
// blocks until all snodes in our swarms have been removed from the list
// or if there is network issues (ENOUTFOUND due to lokinet)
await Promise.all(promises);
}
}
module.exports = LokiMessageAPI;