feat: open group messages are now procesed via the cache

cached entries are now added or removed based on the "optimistic" state that we want
pull/2454/head
William Grant 3 years ago
parent 469de252cb
commit 5ebd1775c0

@ -34,7 +34,7 @@ import { handleOutboxMessageModel } from '../../../../receiver/dataMessage';
import { ConversationTypeEnum } from '../../../../models/conversationAttributes';
import { createSwarmMessageSentFromUs } from '../../../../models/messageFactory';
import { Data } from '../../../../data/data';
import { handleOpenGroupMessageReactions } from '../../../../util/reactions';
import { processMessagesUsingCache } from './sogsV3MutationCache';
/**
* Get the convo matching those criteria and make sure it is an opengroup convo, or return null.
@ -312,7 +312,7 @@ const handleMessagesResponseV4 = async (
if (groupConvo && groupConvo.isOpenGroupV2()) {
for (const message of messagesWithReactions) {
void groupConvo.queueJob(async () => {
await handleOpenGroupMessageReactions(message.reactions, message.id);
await processMessagesUsingCache(serverUrl, roomId, message);
});
}
}

@ -3,7 +3,9 @@
* Currently only supports message reactions 26/08/2022
*/
import { findIndex } from 'lodash';
import { filter, findIndex, remove } from 'lodash';
import { handleOpenGroupMessageReactions } from '../../../../util/reactions';
import { OpenGroupMessageV4 } from '../opengroupV2/OpenGroupServerPoller';
export enum ChangeType {
REACTIONS = 0,
@ -19,8 +21,8 @@ type ReactionChange = {
export type SogsV3Mutation = {
seqno: number | null; // null until mutating API request returns
server: string; // server address
room: string; // room name
server: string; // serverUrl
room: string; // roomId
changeType: ChangeType;
metadata: ReactionChange; // For now we only support message reactions
};
@ -42,27 +44,92 @@ function verifyEntry(entry: SogsV3Mutation): boolean {
export function addToMutationCache(entry: SogsV3Mutation) {
if (!verifyEntry(entry)) {
window.log.error('SOGS Mutation Cache: Entry verification failed!');
window.log.error('SOGS Mutation Cache: Entry verification on add failed!', entry);
} else {
sogsMutationCache.push(entry);
window.log.info('SOGS Mutation Cache: Entry added!', entry);
}
}
export function updateMutationCache(entry: SogsV3Mutation) {
export function updateMutationCache(entry: SogsV3Mutation, seqno: number) {
if (!verifyEntry(entry)) {
window.log.error('SOGS Mutation Cache: Entry verification failed!');
window.log.error('SOGS Mutation Cache: Entry verification on update failed!');
} else {
const entryIndex = findIndex(sogsMutationCache, entry);
if (entryIndex >= 0) {
sogsMutationCache[entryIndex] = entry;
window.log.info('SOGS Mutation Cache: Entry updated!', entry);
const updatedEntry = entry;
updatedEntry.seqno = seqno;
sogsMutationCache[entryIndex] = updatedEntry;
window.log.info('SOGS Mutation Cache: Entry updated!', updatedEntry);
} else {
window.log.error('SOGS Mutation Cache: Updated failed! Cannot find entry');
window.log.error('SOGS Mutation Cache: Updated failed! Cannot find entry', entry);
}
}
}
export function removeFromMutationCache() {
// TODO
export async function processMessagesUsingCache(
server: string,
room: string,
message: OpenGroupMessageV4
) {
const updatedReactions = message.reactions;
const matches: Array<SogsV3Mutation> = filter(sogsMutationCache, { server, room });
if (matches?.length) {
for (const match of matches) {
if (message.seqno && match.seqno && match.seqno <= message.seqno) {
const removedEntry = remove(sogsMutationCache, match);
window.log.info('SOGS Mutation Cache: Entry ignored and removed!', removedEntry);
} else if (!message.seqno || (message.seqno && match.seqno && match.seqno > message.seqno)) {
for (const reaction of Object.keys(message.reactions)) {
const _matches = filter(sogsMutationCache, {
server,
room,
changeType: ChangeType.REACTIONS,
metadata: {
messageId: message.id,
emoji: reaction,
},
});
if (_matches?.length) {
for (const match of _matches) {
switch (match.metadata.action) {
case 'ADD':
updatedReactions[reaction].you = true;
updatedReactions[reaction].count += 1;
window.log.info(
'SOGS Mutation Cache: Added our reaction based on the cache',
updatedReactions[reaction]
);
break;
case 'REMOVE':
updatedReactions[reaction].you = false;
updatedReactions[reaction].count -= 1;
window.log.info(
'SOGS Mutation Cache: Removed our reaction based on the cache',
updatedReactions[reaction]
);
break;
default:
window.log.warn(
'SOGS Mutation Cache: Unsupported metadata action in OpenGroupMessageV4',
match
);
break;
}
}
const removedMatches = remove(sogsMutationCache, ...matches);
window.log.info(
'SOGS Mutation Cache: Removed processed entries from cache!',
removedMatches
);
}
}
}
}
}
message.reactions = updatedReactions;
await handleOpenGroupMessageReactions(message.reactions, message.id);
}

@ -36,7 +36,7 @@ export const hasReactionSupport = async (serverId: number): Promise<boolean> =>
export const sendSogsReactionOnionV4 = async (
serverUrl: string,
room: string,
room: string, // this is the roomId
abortSignal: AbortSignal,
reaction: Reaction,
blinded: boolean
@ -107,16 +107,10 @@ export const sendSogsReactionOnionV4 = async (
throw new Error('putReaction parsing failed');
}
window.log.info(
`You ${reaction.action === Action.REACT ? 'added' : 'removed'} a`,
reaction.emoji,
`reaction on ${serverUrl}/${room}`
);
const success = Boolean(reaction.action === Action.REACT ? rawMessage.added : rawMessage.removed);
if (success && rawMessage.seqno) {
cacheEntry.seqno = rawMessage.seqno;
updateMutationCache(cacheEntry);
if (success) {
updateMutationCache(cacheEntry, rawMessage.seqno);
}
return success;

@ -124,7 +124,11 @@ export const sendMessageReaction = async (messageId: string, emoji: string) => {
`You ${action === Action.REACT ? 'added' : 'removed'} a`,
emoji,
'reaction for message',
id
id,
found.get('isPublic') &&
`on ${conversationModel.toOpenGroupV2().serverUrl}/${
conversationModel.toOpenGroupV2().roomId
}`
);
return reaction;
} else {

Loading…
Cancel
Save