You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
session-desktop/js/modules/loki_message_api.js

598 lines
19 KiB
JavaScript

/* eslint-disable no-await-in-loop */
/* eslint-disable no-loop-func */
/* global log, dcodeIO, window, callWorker, lokiSnodeAPI, textsecure */
const _ = require('lodash');
const { lokiRpc } = require('./loki_rpc');
const primitives = require('./loki_primitives');
const DEFAULT_CONNECTIONS = 3;
const MAX_ACCEPTABLE_FAILURES = 10;
const filterIncomingMessages = async messages => {
const incomingHashes = messages.map(m => m.hash);
const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
incomingHashes
);
const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
if (newMessages.length) {
const newHashes = newMessages.map(m => ({
expiresAt: m.expiration,
hash: m.hash,
}));
await window.Signal.Data.saveSeenMessageHashes(newHashes);
}
return newMessages;
};
const calcNonce = (messageEventData, pubKey, data64, timestamp, ttl) => {
const difficulty = window.storage.get('PoWDifficulty', null);
// Nonce is returned as a base64 string to include in header
window.Whisper.events.trigger('calculatingPoW', messageEventData);
return callWorker('calcPoW', timestamp, ttl, pubKey, data64, difficulty);
};
// we don't throw or catch here
// mark private (_ prefix) since no error handling is done here...
async function _retrieveNextMessages(nodeData, pubkey) {
const params = {
pubKey: pubkey,
lastHash: nodeData.lastHash || '',
};
const options = {
timeout: 40000,
ourPubKey: pubkey,
};
// let exceptions bubble up
const result = await lokiRpc(
`https://${nodeData.ip}`,
nodeData.port,
'retrieve',
params,
options,
'/storage_rpc/v1',
nodeData
);
if (result === false) {
// make a note of it because of caller doesn't care...
log.warn(
`loki_message:::_retrieveNextMessages - lokiRpc could not talk to ${
nodeData.ip
}:${nodeData.port}`
);
return [];
}
return result.messages || [];
}
class LokiMessageAPI {
constructor(ourKey) {
this.jobQueue = new window.JobQueue();
this.sendingData = {};
this.ourKey = ourKey;
}
async sendMessage(pubKey, data, messageTimeStamp, ttl, options = {}) {
const {
isPublic = false,
numConnections = DEFAULT_CONNECTIONS,
publicSendData = null,
} = options;
// Data required to identify a message in a conversation
const messageEventData = {
pubKey,
timestamp: messageTimeStamp,
};
if (isPublic) {
if (!publicSendData) {
throw new window.textsecure.PublicChatError(
'Missing public send data for public chat message'
);
}
const res = await publicSendData.sendMessage(data, messageTimeStamp);
if (res === false) {
throw new window.textsecure.PublicChatError(
'Failed to send public chat message'
);
}
messageEventData.serverId = res;
window.Whisper.events.trigger('publicMessageSent', messageEventData);
return;
}
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
const timestamp = Date.now();
const nonce = await calcNonce(
messageEventData,
window.getStoragePubKey(pubKey),
data64,
timestamp,
ttl
);
// Using timestamp as a unique identifier
const swarm = await lokiSnodeAPI.getSwarmNodesForPubKey(pubKey);
this.sendingData[timestamp] = {
swarm,
hasFreshList: false,
};
if (this.sendingData[timestamp].swarm.length < numConnections) {
await this.refreshSendingSwarm(pubKey, timestamp);
}
// send parameters
const params = {
pubKey,
ttl: ttl.toString(),
nonce,
timestamp: timestamp.toString(),
data: data64,
};
const promises = [];
let completedConnections = 0;
for (let i = 0; i < numConnections; i += 1) {
const connectionPromise = this._openSendConnection(params).finally(() => {
completedConnections += 1;
if (completedConnections >= numConnections) {
delete this.sendingData[timestamp];
}
});
promises.push(connectionPromise);
}
let snode;
try {
// eslint-disable-next-line more/no-then
snode = await primitives.firstTrue(promises);
} catch (e) {
log.warn(
`loki_message:::sendMessage - ${e.code} ${e.message} to ${pubKey} via ${
snode.ip
}:${snode.port}`
);
if (e instanceof textsecure.WrongDifficultyError) {
// Force nonce recalculation
// NOTE: Currently if there are snodes with conflicting difficulties we
// will send the message twice (or more). Won't affect client side but snodes
// could store the same message multiple times because they will have different
// timestamps (and therefore nonces)
await this.sendMessage(pubKey, data, messageTimeStamp, ttl, options);
return;
}
throw e;
}
if (!snode) {
throw new window.textsecure.EmptySwarmError(
pubKey,
'Ran out of swarm nodes to query'
);
}
log.info(
`loki_message:::sendMessage - Successfully stored message to ${pubKey} via ${
snode.ip
}:${snode.port}`
);
}
async refreshSendingSwarm(pubKey, timestamp) {
const freshNodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(pubKey);
this.sendingData[timestamp].swarm = freshNodes;
this.sendingData[timestamp].hasFreshList = true;
return true;
}
async _openSendConnection(params) {
// timestamp is likely the current second...
while (!_.isEmpty(this.sendingData[params.timestamp].swarm)) {
const snode = this.sendingData[params.timestamp].swarm.shift();
// TODO: Revert back to using snode address instead of IP
const successfulSend = await this._sendToNode(snode, params);
if (successfulSend) {
return snode;
}
// should we mark snode as bad if it can't store our message?
}
if (!this.sendingData[params.timestamp].hasFreshList) {
// Ensure that there is only a single refresh per outgoing message
if (!this.sendingData[params.timestamp].refreshPromise) {
this.sendingData[
params.timestamp
].refreshPromise = this.refreshSendingSwarm(
params.pubKey,
params.timestamp
);
}
await this.sendingData[params.timestamp].refreshPromise;
// Retry with a fresh list again
return this._openSendConnection(params);
}
return false;
}
async _sendToNode(targetNode, params) {
let successiveFailures = 0;
while (successiveFailures < MAX_ACCEPTABLE_FAILURES) {
// the higher this is, the longer the user delay is
// we don't want to burn through all our retries quickly
// we need to give the node a chance to heal
// also failed the user quickly, just means they pound the retry faster
// this favors a lot more retries and lower delays
// but that may chew up the bandwidth...
await primitives.sleepFor(successiveFailures * 500);
try {
const result = await lokiRpc(
`https://${targetNode.ip}`,
targetNode.port,
'store',
params,
{},
'/storage_rpc/v1',
targetNode
);
// succcessful messages should look like
// `{\"difficulty\":1}`
// but so does invalid pow, so be careful!
// do not return true if we get false here...
if (result === false) {
// this means the node we asked for is likely down
log.warn(
`loki_message:::_sendToNode - Try #${successiveFailures}/${MAX_ACCEPTABLE_FAILURES} ${
targetNode.ip
}:${targetNode.port} failed`
);
successiveFailures += 1;
// eslint-disable-next-line no-continue
continue;
}
// Make sure we aren't doing too much PoW
const currentDifficulty = window.storage.get('PoWDifficulty', null);
if (
result &&
result.difficulty &&
result.difficulty !== currentDifficulty
) {
window.storage.put('PoWDifficulty', result.difficulty);
// should we return false?
}
return true;
} catch (e) {
log.warn(
'loki_message:::_sendToNode - send error:',
e.code,
e.message,
`destination ${targetNode.ip}:${targetNode.port}`
);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm);
this.sendingData[params.timestamp].swarm = newSwarm;
this.sendingData[params.timestamp].hasFreshList = true;
return false;
} else if (e instanceof textsecure.WrongDifficultyError) {
const { newDifficulty } = e;
if (!Number.isNaN(newDifficulty)) {
window.storage.put('PoWDifficulty', newDifficulty);
}
throw e;
} else if (e instanceof textsecure.NotFoundError) {
// TODO: Handle resolution error
} else if (e instanceof textsecure.TimestampError) {
log.warn('loki_message:::_sendToNode - Timestamp is invalid');
throw e;
} else if (e instanceof textsecure.HTTPError) {
// TODO: Handle working connection but error response
const body = await e.response.text();
log.warn('loki_message:::_sendToNode - HTTPError body:', body);
}
successiveFailures += 1;
}
}
const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode(
params.pubKey,
targetNode
);
log.error(
`loki_message:::_sendToNode - Too many successive failures trying to send to node ${
targetNode.ip
}:${targetNode.port}, ${remainingSwarmSnodes.lengt} remaining swarm nodes`
);
return false;
}
async pollNodeForGroupId(groupId, nodeParam, onMessages) {
const node = nodeParam;
const lastHash = await window.Signal.Data.getLastHashBySnode(
groupId,
node.address
);
node.lastHash = lastHash;
log.debug(
`[last hash] lashHash for group id ${groupId.substr(0, 5)}: node ${
node.port
}`,
lastHash
);
// eslint-disable-next-line no-constant-condition
while (true) {
try {
let messages = await _retrieveNextMessages(node, groupId);
if (messages.length > 0) {
const lastMessage = _.last(messages);
// TODO: this is for groups, so need to specify ID
// Is this too early to update last hash??
await lokiSnodeAPI.updateLastHash(
groupId,
node.address,
lastMessage.hash,
lastMessage.expiration
);
log.debug(
`Updated lashHash for group id ${groupId.substr(0, 5)}: node ${
node.port
}`,
lastMessage.hash
);
node.lastHash = lastMessage.hash;
messages = await this.jobQueue.add(() =>
filterIncomingMessages(messages)
);
// At this point we still know what servier identity the messages
// are associated with, so we save it in messages' conversationId field:
const modifiedMessages = messages.map(m => {
// eslint-disable-next-line no-param-reassign
m.conversationId = groupId;
return m;
});
onMessages(modifiedMessages);
}
} catch (e) {
log.warn('');
log.warn(
'pollNodeForGroupId - retrieve error:',
e.code,
e.message,
`on ${node.ip}:${node.port}`
);
// TODO: Handle unreachable nodes and wrong swarms
}
await primitives.sleepFor(5000);
}
}
async pollForGroupId(groupId, onMessages) {
log.info(`Starting to poll for group id: ${groupId}`);
// Get nodes for groupId
const nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(groupId);
log.info('Nodes for group id:', nodes);
_.sampleSize(nodes, 3).forEach(node =>
this.pollNodeForGroupId(groupId, node, onMessages)
);
}
async _openRetrieveConnection(pSwarmPool, stopPollingPromise, onMessages) {
const swarmPool = pSwarmPool; // lint
let stopPollingResult = false;
// When message_receiver restarts from onoffline/ononline events it closes
// http-resources, which will then resolve the stopPollingPromise with true. We then
// want to cancel these polling connections because new ones will be created
// eslint-disable-next-line more/no-then
stopPollingPromise.then(result => {
stopPollingResult = result;
});
while (!stopPollingResult && !_.isEmpty(swarmPool)) {
const address = Object.keys(swarmPool)[0]; // X.snode hostname
const nodeData = swarmPool[address];
delete swarmPool[address];
let successiveFailures = 0;
while (
!stopPollingResult &&
successiveFailures < MAX_ACCEPTABLE_FAILURES
) {
// TODO: Revert back to using snode address instead of IP
try {
// in general, I think we want exceptions to bubble up
// so the user facing UI can report unhandled errors
// except in this case of living inside http-resource pollServer
// because it just restarts more connections...
let messages = await _retrieveNextMessages(nodeData, this.ourKey);
// this only tracks retrieval failures
// won't include parsing failures...
successiveFailures = 0;
if (messages.length) {
const lastMessage = _.last(messages);
nodeData.lastHash = lastMessage.hash;
await lokiSnodeAPI.updateLastHash(
this.ourKey,
address,
lastMessage.hash,
lastMessage.expiration
);
messages = await this.jobQueue.add(() =>
filterIncomingMessages(messages)
);
}
// Execute callback even with empty array to signal online status
onMessages(messages);
} catch (e) {
log.warn(
'loki_message:::_openRetrieveConnection - retrieve error:',
e.code,
e.message,
`on ${nodeData.ip}:${nodeData.port}`
);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
// Is this a security concern that we replace the list of snodes
// based on a response from a single snode?
await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm);
// FIXME: restart all openRetrieves when this happens...
// FIXME: lokiSnode should handle this
for (let i = 0; i < newSwarm.length; i += 1) {
const lastHash = await window.Signal.Data.getLastHashBySnode(
this.ourKey,
newSwarm[i]
);
swarmPool[newSwarm[i]] = {
lastHash,
};
}
// Try another snode
break;
} else if (e instanceof textsecure.NotFoundError) {
// DNS/Lokinet error, needs to bubble up
throw new window.textsecure.DNSResolutionError(
'Retrieving messages'
);
}
successiveFailures += 1;
}
// Always wait a bit as we are no longer long-polling
await primitives.sleepFor(Math.max(successiveFailures, 2) * 1000);
}
if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) {
const remainingSwarmSnodes = await lokiSnodeAPI.unreachableNode(
this.ourKey,
nodeData
);
log.warn(
`loki_message:::_openRetrieveConnection - too many successive failures, removing ${
nodeData.ip
}:${nodeData.port} from our swarm pool. We have ${
Object.keys(swarmPool).length
} usable swarm nodes left for our connection (${
remainingSwarmSnodes.length
} in local db)`
);
}
}
// if not stopPollingResult
if (_.isEmpty(swarmPool)) {
log.error(
'loki_message:::_openRetrieveConnection - We no longer have any swarm nodes available to try in pool, closing retrieve connection'
);
return false;
}
return true;
}
// we don't throw or catch here
async startLongPolling(numConnections, stopPolling, onMessages) {
// load from local DB
let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey, {
fetchHashes: true,
});
// Start polling for medium size groups as well (they might be in different swarms)
{
const convos = window
.getConversations()
.filter(c => c.get('is_medium_group'));
const self = this;
convos.forEach(c => {
self.pollForGroupId(c.id, onMessages);
// TODO: unsubscribe if the group is deleted
});
}
if (nodes.length < numConnections) {
log.warn(
'loki_message:::startLongPolling - Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain'
);
// load from blockchain
nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey);
if (nodes.length < numConnections) {
log.error(
'loki_message:::startLongPolling - Could not get enough SwarmNodes for our pubkey from blockchain'
);
}
}
log.info(
'loki_message:::startLongPolling - start polling for',
numConnections,
'connections. We have swarmNodes',
nodes.length,
'for',
this.ourKey
);
// ok now split up our swarm pool into numConnections number of pools
// one for each retrieve connection
// floor or ceil probably doesn't matter, since it's likely always uneven
const poolSize = Math.floor(nodes.length / numConnections);
const pools = [];
while (nodes.length) {
const poolList = nodes.splice(0, poolSize);
const byAddressObj = poolList.reduce((result, node) => {
// eslint-disable-next-line no-param-reassign
result[node.address] = node;
return result;
}, {});
pools.push(byAddressObj);
}
const promises = [];
let unresolved = numConnections;
for (let i = 0; i < numConnections; i += 1) {
promises.push(
// eslint-disable-next-line more/no-then
this._openRetrieveConnection(pools[i], stopPolling, onMessages).then(
stoppedPolling => {
unresolved -= 1;
log.info(
`loki_message:::startLongPolling - There are ${unresolved}`,
`open retrieve connections left. Stopped? ${stoppedPolling}`
);
}
)
);
}
// blocks until numConnections snodes in our swarms have been removed from the list
// less than numConnections being active is fine, only need to restart if none per Niels 20/02/13
// or if there is network issues (ENOUTFOUND due to lokinet)
await Promise.all(promises);
log.warn(
'loki_message:::startLongPolling - All our long poll swarm connections have been removed'
);
// should we just call ourself again?
// no, our caller already handles this...
}
}
module.exports = LokiMessageAPI;