Created new table to store the received message hashes. Checking this table when receiving messages to look for duplicates. Should be cleared of expired messages on app start and every hour after

pull/45/head
Beaudan 6 years ago
parent 38d5b6e833
commit 712566ef3b

@ -89,6 +89,9 @@ module.exports = {
getMessageCount,
saveMessage,
cleanSeenMessages,
saveSeenMessageHashes,
saveSeenMessageHash,
saveMessages,
removeMessage,
getUnreadByConversation,
@ -97,6 +100,7 @@ module.exports = {
getAllMessages,
getAllMessageIds,
getMessagesBySentAt,
getSeenMessagesByHashList,
getExpiredMessages,
getOutgoingWithoutExpiresAt,
getNextExpiringMessage,
@ -388,6 +392,14 @@ async function updateToSchemaVersion6(currentVersion, instance) {
console.log('updateToSchemaVersion6: starting...');
await instance.run('BEGIN TRANSACTION;');
await instance.run(
`CREATE TABLE seenMessages(
id STRING PRIMARY KEY ASC,
expiresAt INTEGER,
hash STRING
);`
);
// key-value, ids are strings, one extra column
await instance.run(
`CREATE TABLE sessions(
@ -1223,6 +1235,48 @@ async function saveMessage(data, { forceSave } = {}) {
return toCreate.id;
}
async function saveSeenMessageHashes(arrayOfHashes) {
let promise;
db.serialize(() => {
promise = Promise.all([
db.run('BEGIN TRANSACTION;'),
...map(arrayOfHashes, hashData => saveSeenMessageHash(hashData)),
db.run('COMMIT TRANSACTION;'),
]);
});
await promise;
}
async function saveSeenMessageHash(data) {
const {
expiresAt,
hash,
} = data;
await db.run(
`INSERT INTO seenMessages (
id,
expiresAt,
hash
) values (
$id,
$expiresAt,
$hash
);`, {
$id: generateUUID(),
$expiresAt: expiresAt,
$hash: hash,
}
);
}
async function cleanSeenMessages() {
await db.run('DELETE FROM seenMessages WHERE expiresAt <= $now;', {
$now: Date.now(),
});
}
async function saveMessages(arrayOfMessages, { forceSave } = {}) {
let promise;
@ -1343,6 +1397,15 @@ async function getMessagesBySentAt(sentAt) {
return map(rows, row => jsonToObject(row.json));
}
async function getSeenMessagesByHashList(hashes) {
const rows = await db.all(
`SELECT * FROM seenMessages WHERE hash IN ( ${hashes.map(() => '?').join(', ')} );`,
hashes
);
return map(rows, row => row.hash);
}
async function getExpiredMessages() {
const now = Date.now();

@ -456,7 +456,13 @@
}
});
function manageSeenMessages() {
window.Signal.Data.cleanSeenMessages();
setTimeout(manageSeenMessages, 1000 * 60 * 60);
}
async function start() {
manageSeenMessages();
window.dispatchEvent(new Event('storage_ready'));
window.log.info('listening for registration events');

@ -122,6 +122,9 @@ module.exports = {
getMessageCount,
saveMessage,
cleanSeenMessages,
saveSeenMessageHash,
saveSeenMessageHashes,
saveLegacyMessage,
saveMessages,
removeMessage,
@ -139,6 +142,7 @@ module.exports = {
getOutgoingWithoutExpiresAt,
getNextExpiringMessage,
getMessagesByConversation,
getSeenMessagesByHashList,
getUnprocessedCount,
getAllUnprocessed,
@ -727,6 +731,18 @@ async function getMessageCount() {
return channels.getMessageCount();
}
async function cleanSeenMessages() {
await channels.cleanSeenMessages();
}
async function saveSeenMessageHashes(data) {
await channels.saveSeenMessageHashes(_cleanData(data));
}
async function saveSeenMessageHash(data) {
await channels.saveSeenMessageHash(_cleanData(data));
}
async function saveMessage(data, { forceSave, Message } = {}) {
const updated = keysFromArrayBuffer(MESSAGE_PRE_KEYS, data);
const id = await channels.saveMessage(_cleanData(updated), { forceSave });
@ -854,6 +870,13 @@ async function getMessagesByConversation(
return new MessageCollection(encoded);
}
async function getSeenMessagesByHashList(
hashes
) {
const seenMessages = await channels.getSeenMessagesByHashList(hashes);
return seenMessages;
}
async function removeAllMessagesInConversation(
conversationId,
{ MessageCollection }

@ -68,7 +68,19 @@
setTimeout(() => { pollServer(callBack); }, 5000);
return;
}
result.messages.forEach(async message => {
const incomingHashes = result.messages.map(m => m.hash);
const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(incomingHashes);
if (incomingHashes.length === dupHashes.length) {
setTimeout(() => { pollServer(callBack); }, 5000);
return;
}
const NewMessages = result.messages.filter(m => !dupHashes.includes(m.hash));
const NewHashes = NewMessages.map(m => ({
expiresAt: m.expiration,
hash: m.hash,
}));
await window.Signal.Data.saveMessageHashes(NewHashes);
NewMessages.forEach(async message => {
const { data } = message;
const dataPlaintext = stringToArrayBufferBase64(data);
const messageBuf = textsecure.protobuf.WebSocketMessage.decode(dataPlaintext);

Loading…
Cancel
Save