Move filter messages to message api and some other cleaning

pull/267/head
Beaudan 6 years ago
parent 796181e00c
commit 620380d2d9

@ -12,6 +12,7 @@ const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll';
class LokiMessageAPI { class LokiMessageAPI {
constructor({ snodeServerPort }) { constructor({ snodeServerPort }) {
this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : ''; this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : '';
this.jobQueue = new window.JobQueue();
} }
async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) { async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) {
@ -162,6 +163,22 @@ class LokiMessageAPI {
let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
const filterIncomingMessages = async messages => {
const incomingHashes = messages.map(m => m.hash);
const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
incomingHashes
);
const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
const newHashes = newMessages.map(m => ({
expiresAt: m.expiration,
hash: m.hash,
}));
if (newHashes.length) {
await window.Signal.Data.saveSeenMessageHashes(newHashes);
}
return newMessages;
};
const nodeComplete = nodeUrl => { const nodeComplete = nodeUrl => {
completedNodes.push(nodeUrl); completedNodes.push(nodeUrl);
delete ourSwarmNodes[nodeUrl]; delete ourSwarmNodes[nodeUrl];
@ -189,18 +206,23 @@ class LokiMessageAPI {
); );
nodeComplete(nodeUrl); nodeComplete(nodeUrl);
successfulRequests += 1;
if (Array.isArray(result.messages) && result.messages.length) { if (Array.isArray(result.messages) && result.messages.length) {
const lastHash = _.last(result.messages).hash; const lastMessage = _.last(result.messages);
lokiSnodeAPI.updateLastHash(nodeUrl, lastHash); lokiSnodeAPI.updateLastHash(nodeUrl, lastMessage.hash, lastMessage.expiration);
callback(result.messages); const filteredMessages = await this.jobQueue.add(() =>
filterIncomingMessages(result.messages)
);
if (filteredMessages.length) {
callback(filteredMessages);
}
} }
successfulRequests += 1;
} catch (e) { } catch (e) {
log.warn('Loki retrieve messages:', e); log.warn('Loki retrieve messages:', e);
if (e instanceof textsecure.WrongSwarmError) { if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e; const { newSwarm } = e;
lokiSnodeAPI.updateOurSwarmNodes(newSwarm); await lokiSnodeAPI.updateOurSwarmNodes(newSwarm);
completedNodes.push(nodeUrl); completedNodes.push(nodeUrl);
} else if (e instanceof textsecure.NotFoundError) { } else if (e instanceof textsecure.NotFoundError) {
canResolve = false; canResolve = false;

@ -106,13 +106,15 @@ class LokiSnodeAPI {
return true; return true;
} }
updateLastHash(nodeUrl, hash) { async updateLastHash(nodeUrl, lastHash, expiresAt) {
await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt });
if (!this.ourSwarmNodes[nodeUrl]) { if (!this.ourSwarmNodes[nodeUrl]) {
this.ourSwarmNodes[nodeUrl] = { this.ourSwarmNodes[nodeUrl] = {
lastHash: hash, failureCount: 0,
lastHash,
}; };
} else { } else {
this.ourSwarmNodes[nodeUrl].lastHash = hash; this.ourSwarmNodes[nodeUrl].lastHash = lastHash;
} }
} }
@ -139,13 +141,16 @@ class LokiSnodeAPI {
} }
} }
updateOurSwarmNodes(newNodes) { async updateOurSwarmNodes(newNodes) {
this.ourSwarmNodes = {}; this.ourSwarmNodes = {};
newNodes.forEach(url => { const ps = newNodes.map(async url => {
const lastHash = await window.Signal.Data.getLastHashBySnode(url);
this.ourSwarmNodes[url] = { this.ourSwarmNodes[url] = {
failureCount: 0, failureCount: 0,
lastHash,
}; };
}); });
await Promise.all(ps);
} }
async getOurSwarmNodes() { async getOurSwarmNodes() {
@ -153,16 +158,9 @@ class LokiSnodeAPI {
!this.ourSwarmNodes || !this.ourSwarmNodes ||
Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES
) { ) {
this.ourSwarmNodes = {};
// Try refresh our swarm list once
const ourKey = window.textsecure.storage.user.getNumber(); const ourKey = window.textsecure.storage.user.getNumber();
const nodeAddresses = await this.getSwarmNodes(ourKey); const nodeAddresses = await this.getSwarmNodes(ourKey);
await this.updateOurSwarmNodes(nodeAddresses);
nodeAddresses.forEach(url => {
this.ourSwarmNodes[url] = {
failureCount: 0,
};
});
} }
return { ...this.ourSwarmNodes }; return { ...this.ourSwarmNodes };
} }

@ -41,22 +41,6 @@
}; };
}; };
const filterIncomingMessages = async function filterIncomingMessages(
messages
) {
const incomingHashes = messages.map(m => m.hash);
const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
incomingHashes
);
const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
const newHashes = newMessages.map(m => ({
expiresAt: m.expiration,
hash: m.hash,
}));
await window.Signal.Data.saveSeenMessageHashes(newHashes);
return newMessages;
};
window.HttpResource = function HttpResource(_server, opts = {}) { window.HttpResource = function HttpResource(_server, opts = {}) {
server = _server; server = _server;
let { handleRequest } = opts; let { handleRequest } = opts;
@ -64,17 +48,6 @@
handleRequest = request => request.respond(404, 'Not found'); handleRequest = request => request.respond(404, 'Not found');
} }
let connected = true; let connected = true;
const jobQueue = new window.JobQueue();
const processMessages = async messages => {
const newMessages = await jobQueue.add(() =>
filterIncomingMessages(messages)
);
newMessages.forEach(async message => {
const { data } = message;
this.handleMessage(data);
});
};
this.handleMessage = (message, options = {}) => { this.handleMessage = (message, options = {}) => {
try { try {
@ -104,16 +77,21 @@
} }
}; };
this.startPolling = async function pollServer(callback) { this.pollServer = async callback => {
try { try {
await server.retrieveMessages(processMessages); await server.retrieveMessages(messages => {
messages.forEach(message => {
const { data } = message;
this.handleMessage(data);
});
});
connected = true; connected = true;
} catch (err) { } catch (err) {
connected = false; connected = false;
} }
callback(connected); callback(connected);
setTimeout(() => { setTimeout(() => {
pollServer(callback); this.pollServer(callback);
}, POLL_TIME); }, POLL_TIME);
}; };

@ -73,7 +73,7 @@ MessageReceiver.prototype.extend({
this.httpPollingResource = new HttpResource(lokiMessageAPI, { this.httpPollingResource = new HttpResource(lokiMessageAPI, {
handleRequest: this.handleRequest.bind(this), handleRequest: this.handleRequest.bind(this),
}); });
this.httpPollingResource.startPolling(connected => { this.httpPollingResource.pollServer(connected => {
// Emulate receiving an 'empty' websocket messages from the server. // Emulate receiving an 'empty' websocket messages from the server.
// This is required to update the internal logic that checks // This is required to update the internal logic that checks
// if we are connected to the server. Without this, for example, // if we are connected to the server. Without this, for example,

Loading…
Cancel
Save