Merge pull request #45 from BeaudanBrown/filter-messages

Filtering incoming messages
pull/49/head
sachaaaaa 6 years ago committed by GitHub
commit 97d0df9b79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -89,6 +89,9 @@ module.exports = {
getMessageCount,
saveMessage,
cleanSeenMessages,
saveSeenMessageHashes,
saveSeenMessageHash,
saveMessages,
removeMessage,
getUnreadByConversation,
@ -98,6 +101,7 @@ module.exports = {
getAllMessageIds,
getAllUnsentMessages,
getMessagesBySentAt,
getSeenMessagesByHashList,
getExpiredMessages,
getOutgoingWithoutExpiresAt,
getNextExpiringMessage,
@ -390,6 +394,13 @@ async function updateToSchemaVersion6(currentVersion, instance) {
console.log('updateToSchemaVersion6: starting...');
await instance.run('BEGIN TRANSACTION;');
await instance.run(
`CREATE TABLE seenMessages(
hash STRING PRIMARY KEY,
expiresAt INTEGER
);`
);
// key-value, ids are strings, one extra column
await instance.run(
`CREATE TABLE sessions(
@ -1230,6 +1241,45 @@ async function saveMessage(data, { forceSave } = {}) {
return toCreate.id;
}
async function saveSeenMessageHashes(arrayOfHashes) {
let promise;
db.serialize(() => {
promise = Promise.all([
db.run('BEGIN TRANSACTION;'),
...map(arrayOfHashes, hashData => saveSeenMessageHash(hashData)),
db.run('COMMIT TRANSACTION;'),
]);
});
await promise;
}
async function saveSeenMessageHash(data) {
const {
expiresAt,
hash,
} = data;
await db.run(
`INSERT INTO seenMessages (
expiresAt,
hash
) values (
$expiresAt,
$hash
);`, {
$expiresAt: expiresAt,
$hash: hash,
}
);
}
async function cleanSeenMessages() {
await db.run('DELETE FROM seenMessages WHERE expiresAt <= $now;', {
$now: Date.now(),
});
}
async function saveMessages(arrayOfMessages, { forceSave } = {}) {
let promise;
@ -1360,6 +1410,15 @@ async function getMessagesBySentAt(sentAt) {
return map(rows, row => jsonToObject(row.json));
}
async function getSeenMessagesByHashList(hashes) {
const rows = await db.all(
`SELECT * FROM seenMessages WHERE hash IN ( ${hashes.map(() => '?').join(', ')} );`,
hashes
);
return map(rows, row => row.hash);
}
async function getExpiredMessages() {
const now = Date.now();

@ -464,7 +464,13 @@
}
});
function manageSeenMessages() {
window.Signal.Data.cleanSeenMessages();
setTimeout(manageSeenMessages, 1000 * 60 * 60);
}
async function start() {
manageSeenMessages();
window.dispatchEvent(new Event('storage_ready'));
window.log.info('listening for registration events');

@ -122,6 +122,9 @@ module.exports = {
getMessageCount,
saveMessage,
cleanSeenMessages,
saveSeenMessageHash,
saveSeenMessageHashes,
saveLegacyMessage,
saveMessages,
removeMessage,
@ -140,6 +143,7 @@ module.exports = {
getOutgoingWithoutExpiresAt,
getNextExpiringMessage,
getMessagesByConversation,
getSeenMessagesByHashList,
getUnprocessedCount,
getAllUnprocessed,
@ -728,6 +732,18 @@ async function getMessageCount() {
return channels.getMessageCount();
}
async function cleanSeenMessages() {
await channels.cleanSeenMessages();
}
async function saveSeenMessageHashes(data) {
await channels.saveSeenMessageHashes(_cleanData(data));
}
async function saveSeenMessageHash(data) {
await channels.saveSeenMessageHash(_cleanData(data));
}
async function saveMessage(data, { forceSave, Message } = {}) {
const updated = keysFromArrayBuffer(MESSAGE_PRE_KEYS, data);
const id = await channels.saveMessage(_cleanData(updated), { forceSave });
@ -861,6 +877,13 @@ async function getMessagesByConversation(
return new MessageCollection(encoded);
}
async function getSeenMessagesByHashList(
hashes
) {
const seenMessages = await channels.getSeenMessagesByHashList(hashes);
return seenMessages;
}
async function removeAllMessagesInConversation(
conversationId,
{ MessageCollection }

@ -40,6 +40,17 @@
};
};
const filterIncomingMessages = async function filterIncomingMessages(messages) {
const incomingHashes = messages.map(m => m.hash);
const dupHashes = await window.Signal.Data.getSeenMessagesByHashList(incomingHashes);
const newMessages = messages.filter(m => !dupHashes.includes(m.hash));
const newHashes = newMessages.map(m => ({
expiresAt: m.expiration,
hash: m.hash,
}));
await window.Signal.Data.saveSeenMessageHashes(newHashes);
return newMessages;
};
window.HttpResource = function HttpResource(_server, opts = {}) {
server = _server;
@ -68,7 +79,8 @@
setTimeout(() => { pollServer(callBack); }, 5000);
return;
}
result.messages.forEach(async message => {
const newMessages = await filterIncomingMessages(result.messages);
newMessages.forEach(async message => {
const { data } = message;
const dataPlaintext = stringToArrayBufferBase64(data);
const messageBuf = textsecure.protobuf.WebSocketMessage.decode(dataPlaintext);

Loading…
Cancel
Save