@ -1,10 +1,11 @@
import _ from 'lodash' ;
import { from_hex , to_hex } from 'libsodium-wrappers-sumo' ;
import _ , { compact , isNumber } from 'lodash' ;
import { Data } from '../../data/data' ;
import { Storage } from '../../util/storage' ;
import { SnodeNamespaces } from '../apis/snode_api/namespaces' ;
import { ContentMessage } from '../messages/outgoing' ;
import { PubKey } from '../types' ;
import { PartialRawMessage, RawMessage } from '../types/RawMessage' ;
import { OutgoingRawMessage, Stored RawMessage } from '../types/RawMessage' ;
import { MessageUtils } from '../utils' ;
// This is an abstraction for storing pending messages.
@ -15,18 +16,18 @@ import { MessageUtils } from '../utils';
// memory and sync its state with the database on modification (add or remove).
export class PendingMessageCache {
public callbacks : Map < string , ( message : RawMessage) = > Promise < void > > = new Map ( ) ;
public callbacks : Map < string , ( message : Outgoing RawMessage) = > Promise < void > > = new Map ( ) ;
protected loadPromise : Promise < void > | undefined ;
protected cache : Array < RawMessage> = [ ] ;
protected cache : Array < Outgoing RawMessage> = [ ] ;
public async getAllPending ( ) : Promise < Array < RawMessage> > {
public async getAllPending ( ) : Promise < Array < Outgoing RawMessage> > {
await this . loadFromDBIfNeeded ( ) ;
// Get all pending from cache
return [ . . . this . cache ] ;
}
public async getForDevice ( device : PubKey ) : Promise < Array < RawMessage> > {
public async getForDevice ( device : PubKey ) : Promise < Array < Outgoing RawMessage> > {
const pending = await this . getAllPending ( ) ;
return pending . filter ( m = > m . device === device . key ) ;
}
@ -46,7 +47,7 @@ export class PendingMessageCache {
namespace : SnodeNamespaces ,
sentCb ? : ( message : any ) = > Promise < void > ,
isGroup = false
) : Promise < RawMessage> {
) : Promise < Outgoing RawMessage> {
await this . loadFromDBIfNeeded ( ) ;
const rawMessage = await MessageUtils . toRawMessage (
destinationPubKey ,
@ -69,7 +70,7 @@ export class PendingMessageCache {
return rawMessage ;
}
public async remove ( message : RawMessage) : Promise < Array < RawMessage> | undefined > {
public async remove ( message : Outgoing RawMessage) : Promise < Array < Outgoing RawMessage> | undefined > {
await this . loadFromDBIfNeeded ( ) ;
// Should only be called after message is processed
@ -89,7 +90,7 @@ export class PendingMessageCache {
return updatedCache ;
}
public find ( message : RawMessage) : RawMessage | undefined {
public find ( message : Outgoing RawMessage) : Outgoing RawMessage | undefined {
// Find a message in the cache
return this . cache . find ( m = > m . device === message . device && m . identifier === message . identifier ) ;
}
@ -114,33 +115,62 @@ export class PendingMessageCache {
this . cache = messages ;
}
protected async getFromStorage ( ) : Promise < Array < RawMessage> > {
protected async getFromStorage ( ) : Promise < Array < Outgoing RawMessage> > {
const data = await Data . getItemById ( 'pendingMessages' ) ;
if ( ! data || ! data . value ) {
return [ ] ;
}
const barePending = JSON . parse ( String ( data . value ) ) as Array < PartialRawMessage > ;
// Rebuild plainTextBuffer
return barePending . map ( ( message : PartialRawMessage ) = > {
return {
. . . message ,
plainTextBuffer : new Uint8Array ( message . plainTextBuffer ) ,
} as RawMessage ;
} ) ;
try {
// let's do some cleanup, read what we have in DB, remove what is invalid, write to DB, and return filtered data.
// this is because we've added some mandatory fields recently, and the current stored messages won't have them.
const barePending = JSON . parse ( String ( data . value ) ) as Array < StoredRawMessage > ;
const filtered = compact (
barePending . map ( ( message : StoredRawMessage ) = > {
try {
// let's skip outgoing messages which have no networkTimestamp associated with them, as we need one to send a message (mapped to the envelope one)
if (
! message . networkTimestampCreated ||
! isNumber ( message . networkTimestampCreated ) ||
message . networkTimestampCreated <= 0
) {
throw new Error ( 'networkTimestampCreated is emptyo <=0' ) ;
}
const plainTextBuffer = from_hex ( message . plainTextBufferHex ) ; // if a plaintextBufferHex is unset or not hex, this throws and we remove that message entirely
return {
. . . message ,
plainTextBuffer ,
} as OutgoingRawMessage ;
} catch ( e ) {
window . log . warn ( 'failed to decode from message cache:' , e . message ) ;
return null ;
}
// let's also remove that logic with the plaintextbuffer stored as array of numbers, and use base64 strings instead
} )
) ;
await this . saveToDBWithData ( filtered ) ;
return filtered ;
} catch ( e ) {
window . log . warn ( 'getFromStorage failed with' , e . message ) ;
return [ ] ;
}
}
protected async saveToDB() {
// For each plainTextBuffer in cache, save in as a simple Array<number> to avoid
// Node issues with JSON stringifying Buffer without strict typing
const encodedCache = [ . . . this . cache ] . map ( item = > {
const plainTextBuffer = Array . from ( item . plainTextBuffer ) ;
return { . . . item , plainTextBuffer } ;
private async saveToDBWithData ( msg : Array < OutgoingRawMessage > ) {
// For each plainTextBuffer in cache, save it as hex (because Uint8Array are not serializable as is)
const encodedCache = msg . map ( item = > {
return { . . . item , plainTextBufferHex : to_hex ( item . plainTextBuffer ) } ;
} ) ;
const encodedPendingMessages = JSON . stringify ( encodedCache ) || '[]' ;
await Storage . put ( 'pendingMessages' , encodedPendingMessages ) ;
}
protected async saveToDB() {
await this . saveToDBWithData ( this . cache ) ;
}
}