From 960bd3fff25a6d91631452a682100e04bfddbec6 Mon Sep 17 00:00:00 2001 From: Beaudan Date: Mon, 15 Apr 2019 11:04:02 +1000 Subject: [PATCH 1/7] Move JobQueue to be a module so that we can use it in preload --- background.html | 1 - js/job_queue.js | 28 ---------------------------- js/modules/job_queue.js | 24 ++++++++++++++++++++++++ preload.js | 2 ++ test/index.html | 1 - 5 files changed, 26 insertions(+), 30 deletions(-) delete mode 100644 js/job_queue.js create mode 100644 js/modules/job_queue.js diff --git a/background.html b/background.html index b153ca8ca..b2665f5bf 100644 --- a/background.html +++ b/background.html @@ -737,7 +737,6 @@ - diff --git a/js/job_queue.js b/js/job_queue.js deleted file mode 100644 index 348d0868a..000000000 --- a/js/job_queue.js +++ /dev/null @@ -1,28 +0,0 @@ -/* eslint-disable more/no-then */ - -// eslint-disable-next-line func-names -(function() { - 'use strict'; - - class JobQueue { - constructor() { - this.pending = Promise.resolve(); - } - - add(job) { - const previous = this.pending || Promise.resolve(); - this.pending = previous.then(job, job); - const current = this.pending; - - current.then(() => { - if (this.pending === current) { - delete this.pending; - } - }); - - return current; - } - } - - window.JobQueue = JobQueue; -})(); diff --git a/js/modules/job_queue.js b/js/modules/job_queue.js new file mode 100644 index 000000000..4a4a14b41 --- /dev/null +++ b/js/modules/job_queue.js @@ -0,0 +1,24 @@ +/* eslint-disable more/no-then */ +class JobQueue { + constructor() { + this.pending = Promise.resolve(); + } + + add(job) { + const previous = this.pending || Promise.resolve(); + this.pending = previous.then(job, job); + const current = this.pending; + + current.then(() => { + if (this.pending === current) { + delete this.pending; + } + }); + + return current; + } +} + +module.exports = { + JobQueue, +} diff --git a/preload.js b/preload.js index 64586c599..c2052acf9 100644 --- a/preload.js +++ b/preload.js @@ -5,6 +5,7 @@ const electron = require('electron'); const semver = require('semver'); const { deferredToPromise } = require('./js/modules/deferred_to_promise'); +const { JobQueue } = require('./js/modules/job_queue'); const { app } = electron.remote; const { clipboard } = electron; @@ -31,6 +32,7 @@ window.getNodeVersion = () => config.node_version; window.getHostName = () => config.hostname; window.getServerTrustRoot = () => config.serverTrustRoot; window.isBehindProxy = () => Boolean(config.proxyUrl); +window.JobQueue = JobQueue; window.isBeforeVersion = (toCheck, baseVersion) => { try { diff --git a/test/index.html b/test/index.html index 08c57c08d..c541e717b 100644 --- a/test/index.html +++ b/test/index.html @@ -373,7 +373,6 @@ - From 796181e00cb8e5dbcdfc6b0c2ee0cb1ccf283f1e Mon Sep 17 00:00:00 2001 From: Beaudan Date: Mon, 15 Apr 2019 13:19:06 +1000 Subject: [PATCH 2/7] Add database functions for storing, retrieving and cleaning last hash for snodes --- app/sql.js | 51 ++++++++++++++++++++++++++++++++++++++++++++++ js/background.js | 7 ++++--- js/modules/data.js | 16 +++++++++++++++ 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/app/sql.js b/app/sql.js index cf8fb38ac..96f2ff7e9 100644 --- a/app/sql.js +++ b/app/sql.js @@ -102,8 +102,10 @@ module.exports = { getMessageCount, saveMessage, cleanSeenMessages, + cleanLastHashes, saveSeenMessageHashes, saveSeenMessageHash, + updateLastHash, saveMessages, removeMessage, getUnreadByConversation, @@ -114,6 +116,7 @@ module.exports = { getAllUnsentMessages, getMessagesBySentAt, getSeenMessagesByHashList, + getLastHashBySnode, getExpiredMessages, getOutgoingWithoutExpiresAt, getNextExpiringMessage, @@ -421,6 +424,14 @@ async function updateToSchemaVersion6(currentVersion, instance) { ADD COLUMN friendRequestStatus INTEGER;` ); + await instance.run( + `CREATE TABLE lastHashes( + snode STRING PRIMARY KEY, + hash STRING, + expiresAt INTEGER + );` + ); + await instance.run( `CREATE TABLE seenMessages( hash STRING PRIMARY KEY, @@ -1556,6 +1567,27 @@ async function saveSeenMessageHashes(arrayOfHashes) { await promise; } +async function updateLastHash(data) { + const { snode, hash, expiresAt } = data; + + await db.run( + `INSERT OR REPLACE INTO lastHashes ( + snode, + hash, + expiresAt + ) values ( + $snode, + $hash, + $expiresAt + )`, + { + $snode: snode, + $hash: hash, + $expiresAt: expiresAt, + } + ); +} + async function saveSeenMessageHash(data) { const { expiresAt, hash } = data; await db.run( @@ -1573,6 +1605,12 @@ async function saveSeenMessageHash(data) { ); } +async function cleanLastHashes() { + await db.run('DELETE FROM lastHashes WHERE expiresAt <= $now;', { + $now: Date.now(), + }); +} + async function cleanSeenMessages() { await db.run('DELETE FROM seenMessages WHERE expiresAt <= $now;', { $now: Date.now(), @@ -1710,6 +1748,19 @@ async function getMessagesBySentAt(sentAt) { return map(rows, row => jsonToObject(row.json)); } +async function getLastHashBySnode(snode) { + const row = await db.get('SELECT * FROM lastHashes WHERE snode = $snode;', { + $snode: snode, + }); + + + if (!row) { + return null; + } + + return row.lastHash; +} + async function getSeenMessagesByHashList(hashes) { const rows = await db.all( `SELECT * FROM seenMessages WHERE hash IN ( ${hashes diff --git a/js/background.js b/js/background.js index 7ae4592ad..7ee457b05 100644 --- a/js/background.js +++ b/js/background.js @@ -501,13 +501,14 @@ } }); - function manageSeenMessages() { + function manageExpiringData() { window.Signal.Data.cleanSeenMessages(); - setTimeout(manageSeenMessages, 1000 * 60 * 60); + window.Signal.Data.cleanLastHashes(); + setTimeout(manageExpiringData, 1000 * 60 * 60); } async function start() { - manageSeenMessages(); + manageExpiringData(); window.dispatchEvent(new Event('storage_ready')); window.log.info('listening for registration events'); diff --git a/js/modules/data.js b/js/modules/data.js index c9c3dddbb..768eb9530 100644 --- a/js/modules/data.js +++ b/js/modules/data.js @@ -130,7 +130,9 @@ module.exports = { getMessageCount, saveMessage, cleanSeenMessages, + cleanLastHashes, saveSeenMessageHash, + updateLastHash, saveSeenMessageHashes, saveLegacyMessage, saveMessages, @@ -151,6 +153,7 @@ module.exports = { getNextExpiringMessage, getMessagesByConversation, getSeenMessagesByHashList, + getLastHashBySnode, getUnprocessedCount, getAllUnprocessed, @@ -778,10 +781,18 @@ async function cleanSeenMessages() { await channels.cleanSeenMessages(); } +async function cleanLastHashes() { + await channels.cleanLastHashes(); +} + async function saveSeenMessageHashes(data) { await channels.saveSeenMessageHashes(_cleanData(data)); } +async function updateLastHash(data) { + await channels.updateLastHash(_cleanData(data)); +} + async function saveSeenMessageHash(data) { await channels.saveSeenMessageHash(_cleanData(data)); } @@ -909,6 +920,11 @@ async function getMessagesByConversation( return new MessageCollection(messages); } +async function getLastHashBySnode(snode) { + const lastHash = await channels.getLastHashBySnode(snode); + return lastHash; +} + async function getSeenMessagesByHashList(hashes) { const seenMessages = await channels.getSeenMessagesByHashList(hashes); return seenMessages; From 620380d2d9fee1b921dd752d651ba179c93dc05b Mon Sep 17 00:00:00 2001 From: Beaudan Date: Mon, 15 Apr 2019 14:45:55 +1000 Subject: [PATCH 3/7] Move filter messages to message api and some other cleaning --- js/modules/loki_message_api.js | 32 ++++++++++++++++++++++---- js/modules/loki_snode_api.js | 24 +++++++++---------- libtextsecure/http-resources.js | 38 +++++++------------------------ libtextsecure/message_receiver.js | 2 +- 4 files changed, 47 insertions(+), 49 deletions(-) diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 175fdc479..75e47a075 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -12,6 +12,7 @@ const LOKI_LONGPOLL_HEADER = 'X-Loki-Long-Poll'; class LokiMessageAPI { constructor({ snodeServerPort }) { this.snodeServerPort = snodeServerPort ? `:${snodeServerPort}` : ''; + this.jobQueue = new window.JobQueue(); } async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) { @@ -162,6 +163,22 @@ class LokiMessageAPI { let ourSwarmNodes = await lokiSnodeAPI.getOurSwarmNodes(); + const filterIncomingMessages = async messages => { + const incomingHashes = messages.map(m => m.hash); + const dupHashes = await window.Signal.Data.getSeenMessagesByHashList( + incomingHashes + ); + const newMessages = messages.filter(m => !dupHashes.includes(m.hash)); + const newHashes = newMessages.map(m => ({ + expiresAt: m.expiration, + hash: m.hash, + })); + if (newHashes.length) { + await window.Signal.Data.saveSeenMessageHashes(newHashes); + } + return newMessages; + }; + const nodeComplete = nodeUrl => { completedNodes.push(nodeUrl); delete ourSwarmNodes[nodeUrl]; @@ -189,18 +206,23 @@ class LokiMessageAPI { ); nodeComplete(nodeUrl); + successfulRequests += 1; if (Array.isArray(result.messages) && result.messages.length) { - const lastHash = _.last(result.messages).hash; - lokiSnodeAPI.updateLastHash(nodeUrl, lastHash); - callback(result.messages); + const lastMessage = _.last(result.messages); + lokiSnodeAPI.updateLastHash(nodeUrl, lastMessage.hash, lastMessage.expiration); + const filteredMessages = await this.jobQueue.add(() => + filterIncomingMessages(result.messages) + ); + if (filteredMessages.length) { + callback(filteredMessages); + } } - successfulRequests += 1; } catch (e) { log.warn('Loki retrieve messages:', e); if (e instanceof textsecure.WrongSwarmError) { const { newSwarm } = e; - lokiSnodeAPI.updateOurSwarmNodes(newSwarm); + await lokiSnodeAPI.updateOurSwarmNodes(newSwarm); completedNodes.push(nodeUrl); } else if (e instanceof textsecure.NotFoundError) { canResolve = false; diff --git a/js/modules/loki_snode_api.js b/js/modules/loki_snode_api.js index 8edd758ba..79667f13f 100644 --- a/js/modules/loki_snode_api.js +++ b/js/modules/loki_snode_api.js @@ -106,13 +106,15 @@ class LokiSnodeAPI { return true; } - updateLastHash(nodeUrl, hash) { + async updateLastHash(nodeUrl, lastHash, expiresAt) { + await window.Signal.Data.updateLastHash({ nodeUrl, lastHash, expiresAt }); if (!this.ourSwarmNodes[nodeUrl]) { this.ourSwarmNodes[nodeUrl] = { - lastHash: hash, + failureCount: 0, + lastHash, }; } else { - this.ourSwarmNodes[nodeUrl].lastHash = hash; + this.ourSwarmNodes[nodeUrl].lastHash = lastHash; } } @@ -139,13 +141,16 @@ class LokiSnodeAPI { } } - updateOurSwarmNodes(newNodes) { + async updateOurSwarmNodes(newNodes) { this.ourSwarmNodes = {}; - newNodes.forEach(url => { + const ps = newNodes.map(async url => { + const lastHash = await window.Signal.Data.getLastHashBySnode(url); this.ourSwarmNodes[url] = { failureCount: 0, + lastHash, }; }); + await Promise.all(ps); } async getOurSwarmNodes() { @@ -153,16 +158,9 @@ class LokiSnodeAPI { !this.ourSwarmNodes || Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES ) { - this.ourSwarmNodes = {}; - // Try refresh our swarm list once const ourKey = window.textsecure.storage.user.getNumber(); const nodeAddresses = await this.getSwarmNodes(ourKey); - - nodeAddresses.forEach(url => { - this.ourSwarmNodes[url] = { - failureCount: 0, - }; - }); + await this.updateOurSwarmNodes(nodeAddresses); } return { ...this.ourSwarmNodes }; } diff --git a/libtextsecure/http-resources.js b/libtextsecure/http-resources.js index 65237f750..02f1eb95b 100644 --- a/libtextsecure/http-resources.js +++ b/libtextsecure/http-resources.js @@ -41,22 +41,6 @@ }; }; - const filterIncomingMessages = async function filterIncomingMessages( - messages - ) { - const incomingHashes = messages.map(m => m.hash); - const dupHashes = await window.Signal.Data.getSeenMessagesByHashList( - incomingHashes - ); - const newMessages = messages.filter(m => !dupHashes.includes(m.hash)); - const newHashes = newMessages.map(m => ({ - expiresAt: m.expiration, - hash: m.hash, - })); - await window.Signal.Data.saveSeenMessageHashes(newHashes); - return newMessages; - }; - window.HttpResource = function HttpResource(_server, opts = {}) { server = _server; let { handleRequest } = opts; @@ -64,17 +48,6 @@ handleRequest = request => request.respond(404, 'Not found'); } let connected = true; - const jobQueue = new window.JobQueue(); - - const processMessages = async messages => { - const newMessages = await jobQueue.add(() => - filterIncomingMessages(messages) - ); - newMessages.forEach(async message => { - const { data } = message; - this.handleMessage(data); - }); - }; this.handleMessage = (message, options = {}) => { try { @@ -104,16 +77,21 @@ } }; - this.startPolling = async function pollServer(callback) { + this.pollServer = async callback => { try { - await server.retrieveMessages(processMessages); + await server.retrieveMessages(messages => { + messages.forEach(message => { + const { data } = message; + this.handleMessage(data); + }); + }); connected = true; } catch (err) { connected = false; } callback(connected); setTimeout(() => { - pollServer(callback); + this.pollServer(callback); }, POLL_TIME); }; diff --git a/libtextsecure/message_receiver.js b/libtextsecure/message_receiver.js index d689c4416..e83ff7509 100644 --- a/libtextsecure/message_receiver.js +++ b/libtextsecure/message_receiver.js @@ -73,7 +73,7 @@ MessageReceiver.prototype.extend({ this.httpPollingResource = new HttpResource(lokiMessageAPI, { handleRequest: this.handleRequest.bind(this), }); - this.httpPollingResource.startPolling(connected => { + this.httpPollingResource.pollServer(connected => { // Emulate receiving an 'empty' websocket messages from the server. // This is required to update the internal logic that checks // if we are connected to the server. Without this, for example, From faf1b4b66b64c7182a0b94af27dc0b73937a7a45 Mon Sep 17 00:00:00 2001 From: Beaudan Date: Mon, 15 Apr 2019 14:53:01 +1000 Subject: [PATCH 4/7] Lint --- app/sql.js | 1 - js/modules/job_queue.js | 2 +- js/modules/loki_message_api.js | 6 +++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/app/sql.js b/app/sql.js index 96f2ff7e9..576c8ef93 100644 --- a/app/sql.js +++ b/app/sql.js @@ -1753,7 +1753,6 @@ async function getLastHashBySnode(snode) { $snode: snode, }); - if (!row) { return null; } diff --git a/js/modules/job_queue.js b/js/modules/job_queue.js index 4a4a14b41..fa0ca6038 100644 --- a/js/modules/job_queue.js +++ b/js/modules/job_queue.js @@ -21,4 +21,4 @@ class JobQueue { module.exports = { JobQueue, -} +}; diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index 75e47a075..d09052ef2 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -210,7 +210,11 @@ class LokiMessageAPI { if (Array.isArray(result.messages) && result.messages.length) { const lastMessage = _.last(result.messages); - lokiSnodeAPI.updateLastHash(nodeUrl, lastMessage.hash, lastMessage.expiration); + lokiSnodeAPI.updateLastHash( + nodeUrl, + lastMessage.hash, + lastMessage.expiration + ); const filteredMessages = await this.jobQueue.add(() => filterIncomingMessages(result.messages) ); From e63e9b9053a622f2c596a75240ee802e95942a18 Mon Sep 17 00:00:00 2001 From: Beaudan Date: Mon, 15 Apr 2019 15:26:21 +1000 Subject: [PATCH 5/7] Small optimisation --- js/modules/loki_message_api.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/js/modules/loki_message_api.js b/js/modules/loki_message_api.js index d09052ef2..102ea3090 100644 --- a/js/modules/loki_message_api.js +++ b/js/modules/loki_message_api.js @@ -169,11 +169,11 @@ class LokiMessageAPI { incomingHashes ); const newMessages = messages.filter(m => !dupHashes.includes(m.hash)); - const newHashes = newMessages.map(m => ({ - expiresAt: m.expiration, - hash: m.hash, - })); - if (newHashes.length) { + if (newMessages.length) { + const newHashes = newMessages.map(m => ({ + expiresAt: m.expiration, + hash: m.hash, + })); await window.Signal.Data.saveSeenMessageHashes(newHashes); } return newMessages; From 3522513590e717603e870df1ea2ffaeb2f001636 Mon Sep 17 00:00:00 2001 From: Beaudan Date: Mon, 15 Apr 2019 15:43:35 +1000 Subject: [PATCH 6/7] Use TEXT over STRING --- app/sql.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/sql.js b/app/sql.js index 576c8ef93..44d5ebf6d 100644 --- a/app/sql.js +++ b/app/sql.js @@ -426,15 +426,15 @@ async function updateToSchemaVersion6(currentVersion, instance) { await instance.run( `CREATE TABLE lastHashes( - snode STRING PRIMARY KEY, - hash STRING, + snode TEXT PRIMARY KEY, + hash TEXT, expiresAt INTEGER );` ); await instance.run( `CREATE TABLE seenMessages( - hash STRING PRIMARY KEY, + hash TEXT PRIMARY KEY, expiresAt INTEGER );` ); From 8ff647e3cd4669bc3b8e1d90c81199eb7c632958 Mon Sep 17 00:00:00 2001 From: Beaudan Date: Tue, 16 Apr 2019 08:41:31 +1000 Subject: [PATCH 7/7] Return instead of await --- js/modules/data.js | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/js/modules/data.js b/js/modules/data.js index 768eb9530..9fb5d1f07 100644 --- a/js/modules/data.js +++ b/js/modules/data.js @@ -921,13 +921,11 @@ async function getMessagesByConversation( } async function getLastHashBySnode(snode) { - const lastHash = await channels.getLastHashBySnode(snode); - return lastHash; + return channels.getLastHashBySnode(snode); } async function getSeenMessagesByHashList(hashes) { - const seenMessages = await channels.getSeenMessagesByHashList(hashes); - return seenMessages; + return channels.getSeenMessagesByHashList(hashes); } async function removeAllMessagesInConversation(