diff --git a/ts/session/sending/PendingMessageCache.ts b/ts/session/sending/PendingMessageCache.ts index 899d934bd..30bfe35d6 100644 --- a/ts/session/sending/PendingMessageCache.ts +++ b/ts/session/sending/PendingMessageCache.ts @@ -1,36 +1,135 @@ -import { RawMessage } from '../types/RawMessage'; +import { createOrUpdateItem, getItemById } from '../../../js/modules/data'; +import { PartialRawMessage, RawMessage } from '../types/RawMessage'; import { ContentMessage } from '../messages/outgoing'; +import { PubKey } from '../types'; +import * as MessageUtils from '../utils'; -// TODO: We should be able to import functions straight from the db here without going through the window object +// This is an abstraction for storing pending messages. +// Ideally we want to store pending messages in the database so that +// on next launch we can re-send the pending messages, but we don't want +// to constantly fetch pending messages from the database. +// Thus we have an intermediary cache which will store pending messagesin +// memory and sync its state with the database on modification (add or remove). export class PendingMessageCache { - private readonly cachedMessages: Array = []; + public readonly isReady: Promise; + private cache: Array; constructor() { - // TODO: We should load pending messages from db here + // Load pending messages from the database + // You should await isReady on making a new PendingMessageCache + // if you'd like to have instant access to the cache + this.cache = []; + + this.isReady = new Promise(async resolve => { + await this.loadFromDB(); + resolve(true); + }); + } + + public getAllPending(): Array { + // Get all pending from cache, sorted with oldest first + return [...this.cache].sort((a, b) => a.timestamp - b.timestamp); + } + + public getForDevice(device: PubKey): Array { + return this.getAllPending().filter(m => m.device === device.key); + } + + public getDevices(): Array { + // Gets all unique devices with pending messages + const pubkeyStrings = [...new Set(this.cache.map(m => m.device))]; + + return pubkeyStrings.map(PubKey.from).filter((k): k is PubKey => !!k); } - public addPendingMessage( - device: string, + public async add( + device: PubKey, message: ContentMessage - ): RawMessage { - // TODO: Maybe have a util for converting OutgoingContentMessage to RawMessage? - // TODO: Raw message has uuid, how are we going to set that? maybe use a different identifier? - // One could be device + timestamp would make a unique identifier - // TODO: Return previous pending message if it exists - return {} as RawMessage; + ): Promise { + const rawMessage = MessageUtils.toRawMessage(device, message); + + // Does it exist in cache already? + if (this.find(rawMessage)) { + return rawMessage; + } + + this.cache.push(rawMessage); + await this.saveToDB(); + + return rawMessage; + } + + public async remove( + message: RawMessage + ): Promise | undefined> { + // Should only be called after message is processed + + // Return if message doesn't exist in cache + if (!this.find(message)) { + return; + } + + // Remove item from cache and sync with database + const updatedCache = this.cache.filter( + m => m.identifier !== message.identifier + ); + this.cache = updatedCache; + await this.saveToDB(); + + return updatedCache; + } + + public find(message: RawMessage): RawMessage | undefined { + // Find a message in the cache + return this.cache.find( + m => m.device === message.device && m.timestamp === message.timestamp + ); } - public removePendingMessage(message: RawMessage) { - // TODO: implement + public async clear() { + // Clears the cache and syncs to DB + this.cache = []; + await this.saveToDB(); } - public getPendingDevices(): Array { - // TODO: this should return all devices which have pending messages - return []; + public async loadFromDB() { + const messages = await this.getFromStorage(); + this.cache = messages; } - public getPendingMessages(device: string): Array { - return []; + private async getFromStorage(): Promise> { + const data = await getItemById('pendingMessages'); + if (!data || !data.value) { + return []; + } + + const barePending = JSON.parse(String(data.value)) as Array< + PartialRawMessage + >; + + // Rebuild plainTextBuffer + return barePending.map((message: PartialRawMessage) => { + return { + ...message, + plainTextBuffer: new Uint8Array(message.plainTextBuffer), + } as RawMessage; + }); + } + + private async saveToDB() { + // For each plainTextBuffer in cache, save in as a simple Array to avoid + // Node issues with JSON stringifying Buffer without strict typing + const encodedCache = [...this.cache].map(item => { + const plainTextBuffer = Array.from(item.plainTextBuffer); + + return { ...item, plainTextBuffer }; + }); + + const encodedPendingMessages = JSON.stringify(encodedCache) || '[]'; + await createOrUpdateItem({ + id: 'pendingMessages', + value: encodedPendingMessages, + }); } } diff --git a/ts/session/types/PubKey.ts b/ts/session/types/PubKey.ts new file mode 100644 index 000000000..5653db5b5 --- /dev/null +++ b/ts/session/types/PubKey.ts @@ -0,0 +1,28 @@ +export class PubKey { + public static readonly PUBKEY_LEN = 66; + private static readonly regex: string = `^05[0-9a-fA-F]{${PubKey.PUBKEY_LEN - + 2}}$`; + public readonly key: string; + + constructor(pubkeyString: string) { + PubKey.validate(pubkeyString); + this.key = pubkeyString; + } + + public static from(pubkeyString: string): PubKey | undefined { + // Returns a new instance if the pubkey is valid + if (PubKey.validate(pubkeyString)) { + return new PubKey(pubkeyString); + } + + return undefined; + } + + public static validate(pubkeyString: string): boolean { + if (pubkeyString.match(PubKey.regex)) { + return true; + } + + return false; + } +} diff --git a/ts/session/types/RawMessage.ts b/ts/session/types/RawMessage.ts index 30d2e0d9b..a0333c603 100644 --- a/ts/session/types/RawMessage.ts +++ b/ts/session/types/RawMessage.ts @@ -10,3 +10,13 @@ export interface RawMessage { ttl: number; encryption: EncryptionType; } + +// For building RawMessages from JSON +export interface PartialRawMessage { + identifier: string; + plainTextBuffer: any; + timestamp: number; + device: string; + ttl: number; + encryption: number; +} diff --git a/ts/session/types/index.ts b/ts/session/types/index.ts new file mode 100644 index 000000000..c7c994c52 --- /dev/null +++ b/ts/session/types/index.ts @@ -0,0 +1,3 @@ +export * from './EncryptionType'; +export * from './RawMessage'; +export * from './PubKey'; diff --git a/ts/session/utils/Messages.ts b/ts/session/utils/Messages.ts new file mode 100644 index 000000000..04ae0f815 --- /dev/null +++ b/ts/session/utils/Messages.ts @@ -0,0 +1,24 @@ +import { RawMessage } from '../types/RawMessage'; +import { ContentMessage } from '../messages/outgoing'; +import { EncryptionType, PubKey } from '../types'; + +export function toRawMessage( + device: PubKey, + message: ContentMessage +): RawMessage { + const ttl = message.ttl(); + const timestamp = message.timestamp; + const plainTextBuffer = message.plainTextBuffer(); + + // tslint:disable-next-line: no-unnecessary-local-variable + const rawMessage: RawMessage = { + identifier: message.identifier, + plainTextBuffer, + timestamp, + device: device.key, + ttl, + encryption: EncryptionType.Signal, + }; + + return rawMessage; +} diff --git a/ts/session/utils/index.ts b/ts/session/utils/index.ts index a33d528ba..96904245d 100644 --- a/ts/session/utils/index.ts +++ b/ts/session/utils/index.ts @@ -1,2 +1,3 @@ export * from './TypedEmitter'; export * from './JobQueue'; +export * from './Messages'; diff --git a/ts/test/session/sending/PendingMessageCache_test.ts b/ts/test/session/sending/PendingMessageCache_test.ts new file mode 100644 index 000000000..d732d859e --- /dev/null +++ b/ts/test/session/sending/PendingMessageCache_test.ts @@ -0,0 +1,259 @@ +import { expect } from 'chai'; +import * as _ from 'lodash'; +import * as MessageUtils from '../../../session/utils'; +import { TestUtils } from '../../../test/test-utils'; +import { PendingMessageCache } from '../../../session/sending/PendingMessageCache'; + +// Equivalent to Data.StorageItem +interface StorageItem { + id: string; + value: any; +} + +describe('PendingMessageCache', () => { + // Initialize new stubbed cache + let data: StorageItem; + let pendingMessageCacheStub: PendingMessageCache; + + beforeEach(async () => { + // Stub out methods which touch the database + const storageID = 'pendingMessages'; + data = { + id: storageID, + value: '[]', + }; + + TestUtils.stubData('getItemById') + .withArgs('pendingMessages') + .callsFake(async () => { + return data; + }); + + TestUtils.stubData('createOrUpdateItem').callsFake((item: StorageItem) => { + if (item.id === storageID) { + data = item; + } + }); + + pendingMessageCacheStub = new PendingMessageCache(); + await pendingMessageCacheStub.isReady; + }); + + afterEach(() => { + TestUtils.restoreStubs(); + }); + + it('can initialize cache', async () => { + const cache = pendingMessageCacheStub.getAllPending(); + + // We expect the cache to initialise as an empty array + expect(cache).to.be.instanceOf(Array); + expect(cache).to.have.length(0); + }); + + it('can add to cache', async () => { + const device = TestUtils.generateFakePubkey(); + const message = TestUtils.generateUniqueChatMessage(); + const rawMessage = MessageUtils.toRawMessage(device, message); + + await pendingMessageCacheStub.add(device, message); + + // Verify that the message is in the cache + const finalCache = pendingMessageCacheStub.getAllPending(); + + expect(finalCache).to.have.length(1); + + const addedMessage = finalCache[0]; + expect(addedMessage.device).to.deep.equal(rawMessage.device); + expect(addedMessage.timestamp).to.deep.equal(rawMessage.timestamp); + }); + + it('can remove from cache', async () => { + const device = TestUtils.generateFakePubkey(); + const message = TestUtils.generateUniqueChatMessage(); + const rawMessage = MessageUtils.toRawMessage(device, message); + + await pendingMessageCacheStub.add(device, message); + + const initialCache = pendingMessageCacheStub.getAllPending(); + expect(initialCache).to.have.length(1); + + // Remove the message + await pendingMessageCacheStub.remove(rawMessage); + + const finalCache = pendingMessageCacheStub.getAllPending(); + + // Verify that the message was removed + expect(finalCache).to.have.length(0); + }); + + it('can get devices', async () => { + const cacheItems = [ + { + device: TestUtils.generateFakePubkey(), + message: TestUtils.generateUniqueChatMessage(), + }, + { + device: TestUtils.generateFakePubkey(), + message: TestUtils.generateUniqueChatMessage(), + }, + { + device: TestUtils.generateFakePubkey(), + message: TestUtils.generateUniqueChatMessage(), + }, + ]; + + cacheItems.forEach(async item => { + await pendingMessageCacheStub.add(item.device, item.message); + }); + + const cache = pendingMessageCacheStub.getAllPending(); + expect(cache).to.have.length(cacheItems.length); + + // Get list of devices + const devicesKeys = cacheItems.map(item => item.device.key); + const pulledDevices = pendingMessageCacheStub.getDevices(); + const pulledDevicesKeys = pulledDevices.map(d => d.key); + + // Verify that device list from cache is equivalent to devices added + expect(pulledDevicesKeys).to.have.members(devicesKeys); + }); + + it('can get pending for device', async () => { + const cacheItems = [ + { + device: TestUtils.generateFakePubkey(), + message: TestUtils.generateUniqueChatMessage(), + }, + { + device: TestUtils.generateFakePubkey(), + message: TestUtils.generateUniqueChatMessage(), + }, + ]; + + cacheItems.forEach(async item => { + await pendingMessageCacheStub.add(item.device, item.message); + }); + + const initialCache = pendingMessageCacheStub.getAllPending(); + expect(initialCache).to.have.length(cacheItems.length); + + // Get pending for each specific device + cacheItems.forEach(item => { + const pendingForDevice = pendingMessageCacheStub.getForDevice( + item.device + ); + expect(pendingForDevice).to.have.length(1); + expect(pendingForDevice[0].device).to.equal(item.device.key); + }); + }); + + it('can find nothing when empty', async () => { + const device = TestUtils.generateFakePubkey(); + const message = TestUtils.generateUniqueChatMessage(); + const rawMessage = MessageUtils.toRawMessage(device, message); + + const foundMessage = pendingMessageCacheStub.find(rawMessage); + expect(foundMessage, 'a message was found in empty cache').to.be.undefined; + }); + + it('can find message in cache', async () => { + const device = TestUtils.generateFakePubkey(); + const message = TestUtils.generateUniqueChatMessage(); + const rawMessage = MessageUtils.toRawMessage(device, message); + + await pendingMessageCacheStub.add(device, message); + + const finalCache = pendingMessageCacheStub.getAllPending(); + expect(finalCache).to.have.length(1); + + const foundMessage = pendingMessageCacheStub.find(rawMessage); + expect(foundMessage, 'message not found in cache').to.be.ok; + foundMessage && expect(foundMessage.device).to.equal(device.key); + }); + + it('can clear cache', async () => { + const cacheItems = [ + { + device: TestUtils.generateFakePubkey(), + message: TestUtils.generateUniqueChatMessage(), + }, + { + device: TestUtils.generateFakePubkey(), + message: TestUtils.generateUniqueChatMessage(), + }, + { + device: TestUtils.generateFakePubkey(), + message: TestUtils.generateUniqueChatMessage(), + }, + ]; + + cacheItems.forEach(async item => { + await pendingMessageCacheStub.add(item.device, item.message); + }); + + const initialCache = pendingMessageCacheStub.getAllPending(); + expect(initialCache).to.have.length(cacheItems.length); + + // Clear cache + await pendingMessageCacheStub.clear(); + + const finalCache = pendingMessageCacheStub.getAllPending(); + expect(finalCache).to.have.length(0); + }); + + it('can restore from db', async () => { + const cacheItems = [ + { + device: TestUtils.generateFakePubkey(), + message: TestUtils.generateUniqueChatMessage(), + }, + { + device: TestUtils.generateFakePubkey(), + message: TestUtils.generateUniqueChatMessage(), + }, + { + device: TestUtils.generateFakePubkey(), + message: TestUtils.generateUniqueChatMessage(), + }, + ]; + + cacheItems.forEach(async item => { + await pendingMessageCacheStub.add(item.device, item.message); + }); + + const addedMessages = pendingMessageCacheStub.getAllPending(); + expect(addedMessages).to.have.length(cacheItems.length); + + // Rebuild from DB + const freshCache = new PendingMessageCache(); + await freshCache.isReady; + + // Verify messages + const rebuiltMessages = freshCache.getAllPending(); + + rebuiltMessages.forEach((message, index) => { + const addedMessage = addedMessages[index]; + + // Pull out plainTextBuffer for a separate check + const buffersCompare = + Buffer.compare( + message.plainTextBuffer, + addedMessage.plainTextBuffer + ) === 0; + expect(buffersCompare).to.equal( + true, + 'buffers were not loaded properly from database' + ); + + // Compare all other valures + const trimmedAdded = _.omit(addedMessage, ['plainTextBuffer']); + const trimmedRebuilt = _.omit(message, ['plainTextBuffer']); + + expect(_.isEqual(trimmedAdded, trimmedRebuilt)).to.equal( + true, + 'cached messages were not rebuilt properly' + ); + }); + }); +}); diff --git a/ts/test/test-utils/testUtils.ts b/ts/test/test-utils/testUtils.ts index 317186827..a8ced37d5 100644 --- a/ts/test/test-utils/testUtils.ts +++ b/ts/test/test-utils/testUtils.ts @@ -1,7 +1,12 @@ import * as sinon from 'sinon'; -import { ImportMock } from 'ts-mock-imports'; -import * as DataShape from '../../../js/modules/data'; +import * as crypto from 'crypto'; import * as window from '../../window'; +import * as DataShape from '../../../js/modules/data'; +import { v4 as uuid } from 'uuid'; + +import { ImportMock } from 'ts-mock-imports'; +import { PubKey } from '../../../ts/session/types'; +import { ChatMessage } from '../../session/messages/outgoing'; const sandbox = sinon.createSandbox(); @@ -40,3 +45,25 @@ export function restoreStubs() { ImportMock.restore(); sandbox.restore(); } + +export function generateFakePubkey(): PubKey { + // Generates a mock pubkey for testing + const numBytes = PubKey.PUBKEY_LEN / 2 - 1; + const hexBuffer = crypto.randomBytes(numBytes).toString('hex'); + const pubkeyString = `05${hexBuffer}`; + + return new PubKey(pubkeyString); +} + +export function generateUniqueChatMessage(): ChatMessage { + return new ChatMessage({ + body: 'Lorem ipsum dolor sit amet, consectetur adipiscing elit', + identifier: uuid(), + timestamp: Date.now(), + attachments: undefined, + quote: undefined, + expireTimer: undefined, + lokiProfile: undefined, + preview: undefined, + }); +}