@@ -6,23 +6,6 @@ const _ = require('lodash');

const primitives = require('./loki_primitives');

const DEFAULT_CONNECTIONS = 3;

const MAX_ACCEPTABLE_FAILURES = 10;

const filterIncomingMessages = async messages => {
  const incomingHashes = messages.map(m => m.hash);
  const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
    incomingHashes
  );
  const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
  if (newMessages.length) {
    const newHashes = newMessages.map(m => ({
      expiresAt: m.expiration,
      hash: m.hash,
    }));
    await window.Signal.Data.saveSeenMessageHashes(newHashes);
  }
  return newMessages;
};
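
// Usage sketch (added, illustrative only): deduplicating a batch retrieved
// from a snode. Assumes each message object carries `hash` and `expiration`
// fields, as the mapping above implies; `handleIncomingMessage` is a
// hypothetical name, not part of this file.
//
//   const fresh = await filterIncomingMessages(retrievedMessages);
//   fresh.forEach(m => handleIncomingMessage(m));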

const calcNonce = (messageEventData, pubKey, data64, timestamp, ttl) => {
  const difficulty = window.storage.get('PoWDifficulty', null);
@@ -32,12 +15,8 @@ const calcNonce = (messageEventData, pubKey, data64, timestamp, ttl) => {
};
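
// Note (added commentary): the proof-of-work difficulty target is read from
// storage ('PoWDifficulty'), and the nonce is presumably computed over the
// remaining parameters (pubKey, data64, timestamp, ttl). The body is elided
// in this hunk, so the exact computation is not shown here.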

class LokiMessageAPI {
  constructor(ourKey) {
    this.jobQueue = new window.JobQueue();
    this.sendingData = {};
    this.ourKey = ourKey;
    // stop polling for a group if its id is no longer found here
    this.groupIdsToPoll = {};
  }

  /**
@@ -188,283 +167,6 @@ class LokiMessageAPI {
    }
    return false;
  }

  async pollNodeForGroupId(groupId, nodeParam, onMessages) {
    const node = nodeParam;
    const lastHash = await window.Signal.Data.getLastHashBySnode(
      groupId,
      node.address
    );

    node.lastHash = lastHash;

    log.debug(
      `[last hash] lastHash for group id ${groupId.substr(0, 5)}: node ${
        node.port
      }`,
      lastHash
    );

    while (this.groupIdsToPoll[groupId]) {
      try {
        let messages = await window.NewSnodeAPI.retrieveNextMessages(
          node,
          node.lastHash,
          groupId
        );

        if (messages.length > 0) {
          const lastMessage = _.last(messages);

          // TODO: this is for groups, so need to specify ID

          // Is this too early to update last hash??
          await lokiSnodeAPI.updateLastHash(
            groupId,
            node.address,
            lastMessage.hash,
            lastMessage.expiration
          );
          log.debug(
            `Updated lastHash for group id ${groupId.substr(0, 5)}: node ${
              node.port
            }`,
            lastMessage.hash
          );

          node.lastHash = lastMessage.hash;

          messages = await this.jobQueue.add(() =>
            filterIncomingMessages(messages)
          );

          // At this point we still know what server identity the messages
          // are associated with, so we save it in messages' conversationId field:
          const modifiedMessages = messages.map(m => {
            // eslint-disable-next-line no-param-reassign
            m.conversationId = groupId;
            return m;
          });

          onMessages(modifiedMessages);
        }
      } catch (e) {
        log.warn(
          'pollNodeForGroupId - retrieve error:',
          e.code,
          e.message,
          `on ${node.ip}:${node.port}`
        );

        // TODO: Handle unreachable nodes and wrong swarms
      }

      await primitives.sleepFor(5000);
    }
  }
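
  // Usage sketch (added, illustrative only; `swarmNodes` and
  // `handleGroupMessages` are hypothetical names): polling runs while the
  // group id is registered in groupIdsToPoll, and stops once
  // stopPollingForGroup() deletes it.
  //
  //   this.groupIdsToPoll[groupId] = true;
  //   swarmNodes.forEach(node =>
  //     this.pollNodeForGroupId(groupId, node, handleGroupMessages)
  //   );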

  async stopPollingForGroup(groupId) {
    if (!this.groupIdsToPoll[groupId]) {
      log.warn(`Already not polling for group id: ${groupId}`);
      return;
    }

    log.warn(`Stop polling for group id: ${groupId}`);
    delete this.groupIdsToPoll[groupId];
  }

  async _openRetrieveConnection(pSwarmPool, stopPollingPromise, onMessages) {
    const swarmPool = pSwarmPool; // lint
    let stopPollingResult = false;

    // When message_receiver restarts from onoffline/ononline events it closes
    // http-resources, which will then resolve the stopPollingPromise with true.
    // We then want to cancel these polling connections because new ones will
    // be created.

    // eslint-disable-next-line more/no-then
    stopPollingPromise.then(result => {
      stopPollingResult = result;
    });

    while (!stopPollingResult && !_.isEmpty(swarmPool)) {
      const address = Object.keys(swarmPool)[0]; // X.snode hostname
      const nodeData = swarmPool[address];
      delete swarmPool[address];
      let successiveFailures = 0;
      while (
        !stopPollingResult &&
        successiveFailures < MAX_ACCEPTABLE_FAILURES
      ) {
        // TODO: Revert back to using snode address instead of IP
        try {
          // In general we want exceptions to bubble up so the user-facing UI
          // can report unhandled errors, except in this case of living inside
          // http-resource pollServer, because it just restarts more
          // connections...
          let messages = await window.NewSnodeAPI.retrieveNextMessages(
            nodeData,
            nodeData.lastHash,
            this.ourKey
          );

          // This only tracks retrieval failures;
          // it won't include parsing failures...
          successiveFailures = 0;
          if (messages.length) {
            const lastMessage = _.last(messages);
            nodeData.lastHash = lastMessage.hash;
            await lokiSnodeAPI.updateLastHash(
              this.ourKey,
              address,
              lastMessage.hash,
              lastMessage.expiration
            );
            messages = await this.jobQueue.add(() =>
              filterIncomingMessages(messages)
            );
          }
          // Execute callback even with an empty array to signal online status
          onMessages(messages);
        } catch (e) {
          log.warn(
            'loki_message:::_openRetrieveConnection - retrieve error:',
            e.code,
            e.message,
            `on ${nodeData.ip}:${nodeData.port}`
          );
          if (e instanceof textsecure.WrongSwarmError) {
            const { newSwarm } = e;

            // Is this a security concern that we replace the list of snodes
            // based on a response from a single snode?
            await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm);
            // FIXME: restart all openRetrieves when this happens...
            // FIXME: lokiSnode should handle this
            for (let i = 0; i < newSwarm.length; i += 1) {
              const lastHash = await window.Signal.Data.getLastHashBySnode(
                this.ourKey,
                newSwarm[i]
              );
              swarmPool[newSwarm[i]] = {
                lastHash,
              };
            }
            // Try another snode
            break;
          } else if (e instanceof textsecure.NotFoundError) {
            // DNS/Lokinet error, needs to bubble up
            throw new window.textsecure.DNSResolutionError(
              'Retrieving messages'
            );
          }
          successiveFailures += 1;
        }

        // Always wait a bit as we are no longer long-polling
        await primitives.sleepFor(Math.max(successiveFailures, 2) * 1000);
      }
      if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) {
        const remainingSwarmSnodes = await window.SnodePool.markUnreachableForPubkey(
          this.ourKey,
          nodeData
        );
        log.warn(
          `loki_message:::_openRetrieveConnection - too many successive failures, removing ${
            nodeData.ip
          }:${nodeData.port} from our swarm pool. We have ${
            Object.keys(swarmPool).length
          } usable swarm nodes left for our connection (${
            remainingSwarmSnodes.length
          } in local db)`
        );
      }
    }
    // We get here either because polling was stopped or because the swarm
    // pool for this connection is exhausted:
    if (_.isEmpty(swarmPool)) {
      log.error(
        'loki_message:::_openRetrieveConnection - We no longer have any swarm nodes available to try in pool, closing retrieve connection'
      );
      return false;
    }
    return true;
  }
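
  // Contract note (added): _openRetrieveConnection resolves false when this
  // connection's swarm pool is exhausted, and true when polling was stopped
  // via stopPollingPromise. A minimal wiring sketch, with `resolveStop` as a
  // hypothetical name:
  //
  //   let resolveStop;
  //   const stopPolling = new Promise(res => {
  //     resolveStop = res;
  //   });
  //   // later, e.g. on an 'offline' event:
  //   resolveStop(true);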

  // we don't throw or catch here
  async startLongPolling(numConnections, stopPolling, onMessages) {
    // load from local DB
    let nodes = await lokiSnodeAPI.getSwarmNodesForPubKey(this.ourKey, {
      fetchHashes: true,
    });

    if (nodes.length < numConnections) {
      log.warn(
        'loki_message:::startLongPolling - Not enough SwarmNodes for our pubkey in local database, getting current list from blockchain'
      );
      // load from blockchain
      nodes = await lokiSnodeAPI.refreshSwarmNodesForPubKey(this.ourKey);
      if (nodes.length < numConnections) {
        log.error(
          'loki_message:::startLongPolling - Could not get enough SwarmNodes for our pubkey from blockchain'
        );
      }
    }
    log.info(
      'loki_message:::startLongPolling - start polling for',
      numConnections,
      'connections. We have swarmNodes',
      nodes.length,
      'for',
      this.ourKey
    );

    // ok now split up our swarm pool into numConnections number of pools,
    // one for each retrieve connection

    // floor or ceil probably doesn't matter, since it's likely always uneven;
    // guard against a poolSize of 0 (fewer nodes than connections), which
    // would make the splice loop below spin forever
    const poolSize = Math.max(1, Math.floor(nodes.length / numConnections));
    const pools = [];
    while (nodes.length) {
      const poolList = nodes.splice(0, poolSize);
      const byAddressObj = poolList.reduce((result, node) => {
        // eslint-disable-next-line no-param-reassign
        result[node.address] = node;
        return result;
      }, {});
      pools.push(byAddressObj);
    }
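
    // Worked example (added): with 10 swarm nodes and numConnections = 3,
    // poolSize = 3, so the loop above yields pools of size 3, 3, 3 and 1.
    // Only the first numConnections pools are handed to retrieve connections
    // below; any remainder pool is simply never polled.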

    const promises = [];

    let unresolved = numConnections;
    for (let i = 0; i < numConnections; i += 1) {
      promises.push(
        // eslint-disable-next-line more/no-then
        this._openRetrieveConnection(pools[i], stopPolling, onMessages).then(
          stoppedPolling => {
            unresolved -= 1;
            log.info(
              `loki_message:::startLongPolling - There are ${unresolved}`,
              `open retrieve connections left. Stopped? ${stoppedPolling}`
            );
          }
        )
      );
    }

    // blocks until numConnections snodes in our swarms have been removed from the list
    // less than numConnections being active is fine, only need to restart if none per Niels 20/02/13
    // or if there are network issues (ENOTFOUND due to lokinet)
    await Promise.all(promises);
    log.warn(
      'loki_message:::startLongPolling - All our long poll swarm connections have been removed'
    );
    // should we just call ourselves again?
    // no, our caller already handles this...
  }
}
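
// Usage sketch (added, illustrative only; `ourPubKey`, `stopPolling` and
// `onMessages` are assumed names, not part of this file):
//
//   const lokiMessageAPI = new LokiMessageAPI(ourPubKey);
//   await lokiMessageAPI.startLongPolling(
//     DEFAULT_CONNECTIONS,
//     stopPolling, // promise that resolves true to stop all connections
//     onMessages // called with each batch of deduplicated messages
//   );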

// These files are expected to be in commonjs so we can't use es6 syntax :(
module.exports = LokiMessageAPI;