Merge pull request #267 from BeaudanBrown/move-filtering

Persistent last hash
pull/271/head
Beaudan Campbell-Brown 6 years ago committed by GitHub
commit cab389fe2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -102,8 +102,10 @@ module.exports = {
getMessageCount,
saveMessage,
cleanSeenMessages,
cleanLastHashes,
saveSeenMessageHashes,
saveSeenMessageHash,
updateLastHash,
saveMessages,
removeMessage,
getUnreadByConversation,
@ -114,6 +116,7 @@ module.exports = {
getAllUnsentMessages,
getMessagesBySentAt,
getSeenMessagesByHashList,
getLastHashBySnode,
getExpiredMessages,
getOutgoingWithoutExpiresAt,
getNextExpiringMessage,
@ -421,9 +424,17 @@ async function updateToSchemaVersion6(currentVersion, instance) {
ADD COLUMN friendRequestStatus INTEGER;`
);
await instance.run(
`CREATE TABLE lastHashes(
snode TEXT PRIMARY KEY,
hash TEXT,
expiresAt INTEGER
);`
);
await instance.run(
`CREATE TABLE seenMessages(
hash STRING PRIMARY KEY,
hash TEXT PRIMARY KEY,
expiresAt INTEGER
);`
);
@ -1556,6 +1567,27 @@ async function saveSeenMessageHashes(arrayOfHashes) {
await promise;
}
async function updateLastHash(data) {
const { snode, hash, expiresAt } = data;
await db.run(
`INSERT OR REPLACE INTO lastHashes (
snode,
hash,
expiresAt
) values (
$snode,
$hash,
$expiresAt
)`,
{
$snode: snode,
$hash: hash,
$expiresAt: expiresAt,
}
);
}
async function saveSeenMessageHash(data) {
const { expiresAt, hash } = data;
await db.run(
@ -1573,6 +1605,12 @@ async function saveSeenMessageHash(data) {
);
}
async function cleanLastHashes() {
await db.run('DELETE FROM lastHashes WHERE expiresAt <= $now;', {
$now: Date.now(),
});
}
async function cleanSeenMessages() {
await db.run('DELETE FROM seenMessages WHERE expiresAt <= $now;', {
$now: Date.now(),
@ -1710,6 +1748,18 @@ async function getMessagesBySentAt(sentAt) {
return map(rows, row => jsonToObject(row.json));
}
async function getLastHashBySnode(snode) {
const row = await db.get('SELECT * FROM lastHashes WHERE snode = $snode;', {
$snode: snode,
});
if (!row) {
return null;
}
return row.lastHash;
}
async function getSeenMessagesByHashList(hashes) {
const rows = await db.all(
`SELECT * FROM seenMessages WHERE hash IN ( ${hashes

@ -753,7 +753,6 @@
<script type='text/javascript' src='js/models/blockedNumbers.js'></script>
<script type='text/javascript' src='js/models/profile.js'></script>
<script type='text/javascript' src='js/expiring_messages.js'></script>
<script type='text/javascript' src='js/job_queue.js'></script>
<script type='text/javascript' src='js/chromium.js'></script>
<script type='text/javascript' src='js/registration.js'></script>

@ -501,13 +501,14 @@
}
});
function manageSeenMessages() {
function manageExpiringData() {
window.Signal.Data.cleanSeenMessages();
setTimeout(manageSeenMessages, 1000 * 60 * 60);
window.Signal.Data.cleanLastHashes();
setTimeout(manageExpiringData, 1000 * 60 * 60);
}
async function start() {
manageSeenMessages();
manageExpiringData();
window.dispatchEvent(new Event('storage_ready'));
window.log.info('listening for registration events');

@ -1,28 +0,0 @@
/* eslint-disable more/no-then */
// eslint-disable-next-line func-names
(function() {
'use strict';
class JobQueue {
constructor() {
this.pending = Promise.resolve();
}
add(job) {
const previous = this.pending || Promise.resolve();
this.pending = previous.then(job, job);
const current = this.pending;
current.then(() => {
if (this.pending === current) {
delete this.pending;
}
});
return current;
}
}
window.JobQueue = JobQueue;
})();

@ -130,7 +130,9 @@ module.exports = {
getMessageCount,
saveMessage,
cleanSeenMessages,
cleanLastHashes,
saveSeenMessageHash,
updateLastHash,
saveSeenMessageHashes,
saveLegacyMessage,
saveMessages,
@ -151,6 +153,7 @@ module.exports = {
getNextExpiringMessage,
getMessagesByConversation,
getSeenMessagesByHashList,
getLastHashBySnode,
getUnprocessedCount,
getAllUnprocessed,
@ -778,10 +781,18 @@ async function cleanSeenMessages() {
await channels.cleanSeenMessages();
}
async function cleanLastHashes() {
await channels.cleanLastHashes();
}
async function saveSeenMessageHashes(data) {
await channels.saveSeenMessageHashes(_cleanData(data));
}
async function updateLastHash(data) {
await channels.updateLastHash(_cleanData(data));
}
async function saveSeenMessageHash(data) {
await channels.saveSeenMessageHash(_cleanData(data));
}
@ -909,9 +920,12 @@ async function getMessagesByConversation(
return new MessageCollection(messages);
}
async function getLastHashBySnode(snode) {
return channels.getLastHashBySnode(snode);
}
async function getSeenMessagesByHashList(hashes) {
const seenMessages = await channels.getSeenMessagesByHashList(hashes);
return seenMessages;
return channels.getSeenMessagesByHashList(hashes);
}
async function removeAllMessagesInConversation(

@ -0,0 +1,24 @@
/* eslint-disable more/no-then */
class JobQueue {
constructor() {
this.pending = Promise.resolve();
}
add(job) {
const previous = this.pending || Promise.resolve();
this.pending = previous.then(job, job);
const current = this.pending;
current.then(() => {
if (this.pending === current) {
delete this.pending;
}
});
return current;
}
}
module.exports = {
JobQueue,
};

@ -12,6 +12,7 @@ const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll';
class LokiMessageAPI {
constructor({ snodeServerPort }) {
this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : '';
this.jobQueue = new window.JobQueue();
}
async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) {
@ -162,6 +163,22 @@ class LokiMessageAPI {
let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes();
const filterIncomingMessages = async messages => {
const incomingHashes = messages.map(m => m.hash);
const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
incomingHashes
);
const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
if (newMessages.length) {
const newHashes = newMessages.map(m => ({
expiresAt: m.expiration,
hash: m.hash,
}));
await window.Signal.Data.saveSeenMessageHashes(newHashes);
}
return newMessages;
};
const nodeComplete = nodeUrl => {
completedNodes.push(nodeUrl);
delete ourSwarmNodes[nodeUrl];
@ -189,18 +206,27 @@ class LokiMessageAPI {
);
nodeComplete(nodeUrl);
successfulRequests += 1;
if (Array.isArray(result.messages) && result.messages.length) {
const lastHash = _.last(result.messages).hash;
lokiSnodeAPI.updateLastHash(nodeUrl, lastHash);
callback(result.messages);
const lastMessage = _.last(result.messages);
lokiSnodeAPI.updateLastHash(
nodeUrl,
lastMessage.hash,
lastMessage.expiration
);
const filteredMessages = await this.jobQueue.add(() =>
filterIncomingMessages(result.messages)
);
if (filteredMessages.length) {
callback(filteredMessages);
}
}
successfulRequests += 1;
} catch (e) {
log.warn('Loki retrieve messages:', e);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
lokiSnodeAPI.updateOurSwarmNodes(newSwarm);
await lokiSnodeAPI.updateOurSwarmNodes(newSwarm);
completedNodes.push(nodeUrl);
} else if (e instanceof textsecure.NotFoundError) {
canResolve = false;

@ -106,13 +106,15 @@ class LokiSnodeAPI {
return true;
}
updateLastHash(nodeUrl, hash) {
async updateLastHash(nodeUrl, lastHash, expiresAt) {
await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt });
if (!this.ourSwarmNodes[nodeUrl]) {
this.ourSwarmNodes[nodeUrl] = {
lastHash: hash,
failureCount: 0,
lastHash,
};
} else {
this.ourSwarmNodes[nodeUrl].lastHash = hash;
this.ourSwarmNodes[nodeUrl].lastHash = lastHash;
}
}
@ -139,13 +141,16 @@ class LokiSnodeAPI {
}
}
updateOurSwarmNodes(newNodes) {
async updateOurSwarmNodes(newNodes) {
this.ourSwarmNodes = {};
newNodes.forEach(url => {
const ps = newNodes.map(async url => {
const lastHash = await window.Signal.Data.getLastHashBySnode(url);
this.ourSwarmNodes[url] = {
failureCount: 0,
lastHash,
};
});
await Promise.all(ps);
}
async getOurSwarmNodes() {
@ -153,16 +158,9 @@ class LokiSnodeAPI {
!this.ourSwarmNodes ||
Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES
) {
this.ourSwarmNodes = {};
// Try refresh our swarm list once
const ourKey = window.textsecure.storage.user.getNumber();
const nodeAddresses = await this.getSwarmNodes(ourKey);
nodeAddresses.forEach(url => {
this.ourSwarmNodes[url] = {
failureCount: 0,
};
});
await this.updateOurSwarmNodes(nodeAddresses);
}
return { ...this.ourSwarmNodes };
}

@ -41,22 +41,6 @@
};
};
const filterIncomingMessages = async function filterIncomingMessages(
messages
) {
const incomingHashes = messages.map(m => m.hash);
const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(
incomingHashes
);
const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
const newHashes = newMessages.map(m => ({
expiresAt: m.expiration,
hash: m.hash,
}));
await window.Signal.Data.saveSeenMessageHashes(newHashes);
return newMessages;
};
window.HttpResource = function HttpResource(_server, opts = {}) {
server = _server;
let { handleRequest } = opts;
@ -64,17 +48,6 @@
handleRequest = request => request.respond(404, 'Not found');
}
let connected = true;
const jobQueue = new window.JobQueue();
const processMessages = async messages => {
const newMessages = await jobQueue.add(() =>
filterIncomingMessages(messages)
);
newMessages.forEach(async message => {
const { data } = message;
this.handleMessage(data);
});
};
this.handleMessage = (message, options = {}) => {
try {
@ -104,16 +77,21 @@
}
};
this.startPolling = async function pollServer(callback) {
this.pollServer = async callback => {
try {
await server.retrieveMessages(processMessages);
await server.retrieveMessages(messages => {
messages.forEach(message => {
const { data } = message;
this.handleMessage(data);
});
});
connected = true;
} catch (err) {
connected = false;
}
callback(connected);
setTimeout(() => {
pollServer(callback);
this.pollServer(callback);
}, POLL_TIME);
};

@ -73,7 +73,7 @@ MessageReceiver.prototype.extend({
this.httpPollingResource = new HttpResource(lokiMessageAPI, {
handleRequest: this.handleRequest.bind(this),
});
this.httpPollingResource.startPolling(connected => {
this.httpPollingResource.pollServer(connected => {
// Emulate receiving an 'empty' websocket messages from the server.
// This is required to update the internal logic that checks
// if we are connected to the server. Without this, for example,

@ -5,6 +5,7 @@ const electron = require('electron');
const semver = require('semver');
const { deferredToPromise } = require('./js/modules/deferred_to_promise');
const { JobQueue } = require('./js/modules/job_queue');
const { app } = electron.remote;
const { clipboard } = electron;
@ -31,6 +32,7 @@ window.getNodeVersion = () => config.node_version;
window.getHostName = () => config.hostname;
window.getServerTrustRoot = () => config.serverTrustRoot;
window.isBehindProxy = () => Boolean(config.proxyUrl);
window.JobQueue = JobQueue;
window.isBeforeVersion = (toCheck, baseVersion) => {
try {

@ -373,7 +373,6 @@
<script type='text/javascript' src='../js/expiring_messages.js' data-cover></script>
<script type='text/javascript' src='../js/notifications.js' data-cover></script>
<script type='text/javascript' src='../js/focus_listener.js'></script>
<script type='text/javascript' src='../js/job_queue.js'></script>
<script type="text/javascript" src="../js/chromium.js" data-cover></script>

Loading…
Cancel
Save