From 35d66d88654b17db88965d1ab51f9c74ee9e40ae Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Fri, 23 Apr 2021 17:05:10 +1000 Subject: [PATCH] fix bug fetching new token every request --- ts/data/data.ts | 5 +- ts/opengroup/opengroupV2/ApiUtil.ts | 4 +- ts/opengroup/opengroupV2/OpenGroupAPIV2.ts | 57 ++++----- .../opengroupV2/OpenGroupAPIV2CompactPoll.ts | 23 ++-- .../opengroupV2/OpenGroupManagerV2.ts | 1 - .../opengroupV2/OpenGroupServerPoller.ts | 111 +++++++++++++++++- ts/receiver/receiver.ts | 32 +++++ ts/session/utils/Promise.ts | 9 +- 8 files changed, 200 insertions(+), 42 deletions(-) diff --git a/ts/data/data.ts b/ts/data/data.ts index 0563dfbd9..d1fced994 100644 --- a/ts/data/data.ts +++ b/ts/data/data.ts @@ -693,7 +693,10 @@ export async function _removeMessages(ids: Array): Promise { await channels.removeMessage(ids); } -export async function getMessageIdsFromServerIds(serverIds: Array, conversationId: string) { +export async function getMessageIdsFromServerIds( + serverIds: Array | Array, + conversationId: string +): Promise | undefined> { return channels.getMessageIdsFromServerIds(serverIds, conversationId); } diff --git a/ts/opengroup/opengroupV2/ApiUtil.ts b/ts/opengroup/opengroupV2/ApiUtil.ts index 9119ac139..a6ab72bc3 100644 --- a/ts/opengroup/opengroupV2/ApiUtil.ts +++ b/ts/opengroup/opengroupV2/ApiUtil.ts @@ -75,11 +75,11 @@ export const setCachedModerators = ( roomId: string, newModerators: Array ) => { - const allRoomsMods = cachedModerators.get(serverUrl); + let allRoomsMods = cachedModerators.get(serverUrl); if (!allRoomsMods) { cachedModerators.set(serverUrl, new Map()); + allRoomsMods = cachedModerators.get(serverUrl); } - // tslint:disable: no-non-null-assertion if (!allRoomsMods!.get(roomId)) { allRoomsMods!.set(roomId, new Set()); } diff --git a/ts/opengroup/opengroupV2/OpenGroupAPIV2.ts b/ts/opengroup/opengroupV2/OpenGroupAPIV2.ts index 41bda880a..66864b313 100644 --- a/ts/opengroup/opengroupV2/OpenGroupAPIV2.ts +++ b/ts/opengroup/opengroupV2/OpenGroupAPIV2.ts @@ -236,11 +236,12 @@ export async function getAuthToken({ return null; } if (roomDetails?.token) { - return roomDetails?.token; + return roomDetails.token; } await allowOnlyOneAtATime(`getAuthTokenV2${serverUrl}:${roomId}`, async () => { try { + console.warn('TRIGGERING NEW AUTH TOKEN WITH', { serverUrl, roomId }); const token = await requestNewAuthToken({ serverUrl, roomId }); if (!token) { window.log.warn('invalid new auth token', token); @@ -371,33 +372,33 @@ export const postMessage = async ( }; /** Those functions are related to moderators management */ -export const getModerators = async ({ - serverUrl, - roomId, -}: OpenGroupRequestCommonType): Promise> => { - const request: OpenGroupV2Request = { - method: 'GET', - room: roomId, - server: serverUrl, - isAuthRequired: true, - endpoint: 'moderators', - }; - const result = await sendOpenGroupV2Request(request); - const statusCode = parseStatusCodeFromOnionRequest(result); - - if (statusCode !== 200) { - window.log.error(`Could not getModerators, status code: ${statusCode}`); - return []; - } - const moderators = parseModerators(result); - if (moderators === undefined) { - // if moderators is undefined, do not update the cached moderator list - window.log.warn('Could not getModerators, got no moderatorsGot at all in json.'); - return []; - } - setCachedModerators(serverUrl, roomId, moderators || []); - return moderators || []; -}; +// export const getModerators = async ({ +// serverUrl, +// roomId, +// }: OpenGroupRequestCommonType): Promise> => { +// const request: OpenGroupV2Request = { +// method: 'GET', +// room: roomId, +// server: serverUrl, +// isAuthRequired: true, +// endpoint: 'moderators', +// }; +// const result = await sendOpenGroupV2Request(request); +// const statusCode = parseStatusCodeFromOnionRequest(result); + +// if (statusCode !== 200) { +// window.log.error(`Could not getModerators, status code: ${statusCode}`); +// return []; +// } +// const moderators = parseModerators(result); +// if (moderators === undefined) { +// // if moderators is undefined, do not update t+++++++++++++++++++++++++he cached moderator list +// window.log.warn('Could not getModerators, got no moderatorsGot at all in json.'); +// return []; +// } +// setCachedModerators(serverUrl, roomId, moderators || []); +// return moderators || []; +// }; export const isUserModerator = ( publicKey: string, diff --git a/ts/opengroup/opengroupV2/OpenGroupAPIV2CompactPoll.ts b/ts/opengroup/opengroupV2/OpenGroupAPIV2CompactPoll.ts index 08826d867..04f0436c6 100644 --- a/ts/opengroup/opengroupV2/OpenGroupAPIV2CompactPoll.ts +++ b/ts/opengroup/opengroupV2/OpenGroupAPIV2CompactPoll.ts @@ -3,6 +3,7 @@ import { OpenGroupRequestCommonType, OpenGroupV2CompactPollRequest, parseMessages, + setCachedModerators, } from './ApiUtil'; import { parseStatusCodeFromOnionRequest } from './OpenGroupAPIV2Parser'; import _ from 'lodash'; @@ -132,7 +133,7 @@ async function sendOpenGroupV2RequestCompactPoll( return null; } - const results = await parseCompactPollResults(res); + const results = await parseCompactPollResults(res, serverUrl); if (!results) { window.log.info('got empty compactPollResults'); return null; @@ -144,6 +145,7 @@ async function sendOpenGroupV2RequestCompactPoll( const roomPollValidResults = results.filter(ret => ret.statusCode === 200); if (roomWithTokensToRefresh) { + console.warn('roomWithTokensToRefresh', roomWithTokensToRefresh); await Promise.all( roomWithTokensToRefresh.map(async roomId => { const roomDetails = await getV2OpenGroupRoomByRoomId({ @@ -157,7 +159,7 @@ async function sendOpenGroupV2RequestCompactPoll( // we might need to retry doing the request here, but how to make sure we don't retry indefinetely? await saveV2OpenGroupRoom(roomDetails); // do not await for that. We have a only one at a time logic on a per room basis - void getAuthToken({ serverUrl, roomId }); + await getAuthToken({ serverUrl, roomId }); }) ); } @@ -165,7 +167,7 @@ async function sendOpenGroupV2RequestCompactPoll( return roomPollValidResults; } -type ParsedRoomCompactPollResults = { +export type ParsedRoomCompactPollResults = { roomId: string; deletions: Array; messages: Array; @@ -174,7 +176,8 @@ type ParsedRoomCompactPollResults = { }; const parseCompactPollResult = async ( - singleRoomResult: any + singleRoomResult: any, + serverUrl: string ): Promise => { const { room_id, @@ -196,9 +199,10 @@ const parseCompactPollResult = async ( } const validMessages = await parseMessages(rawMessages); - const moderators = rawMods as Array; + const moderators = rawMods.sort() as Array; const deletions = rawDeletions as Array; const statusCode = rawStatusCode as number; + setCachedModerators(serverUrl, room_id, moderators || []); return { roomId: room_id, @@ -210,7 +214,8 @@ const parseCompactPollResult = async ( }; const parseCompactPollResults = async ( - res: any + res: any, + serverUrl: string ): Promise | null> => { if (!res || !res.result || !res.result.results || !res.result.results.length) { return null; @@ -218,7 +223,11 @@ const parseCompactPollResults = async ( const arrayOfResults = res.result.results as Array; const parsedResults: Array = _.compact( - await Promise.all(arrayOfResults.map(parseCompactPollResult)) + await Promise.all( + arrayOfResults.map(async m => { + return parseCompactPollResult(m, serverUrl); + }) + ) ); if (!parsedResults || !parsedResults.length) { diff --git a/ts/opengroup/opengroupV2/OpenGroupManagerV2.ts b/ts/opengroup/opengroupV2/OpenGroupManagerV2.ts index 5c8644332..05565216e 100644 --- a/ts/opengroup/opengroupV2/OpenGroupManagerV2.ts +++ b/ts/opengroup/opengroupV2/OpenGroupManagerV2.ts @@ -88,7 +88,6 @@ export class OpenGroupManagerV2 { public removeRoomFromPolledRooms(roomInfos: OpenGroupRequestCommonType) { const poller = this.pollers.get(roomInfos.serverUrl); if (!poller) { - console.warn('No such poller'); return; } // this won't do a thing if the room is already polled for diff --git a/ts/opengroup/opengroupV2/OpenGroupServerPoller.ts b/ts/opengroup/opengroupV2/OpenGroupServerPoller.ts index db165283e..95df97a4e 100644 --- a/ts/opengroup/opengroupV2/OpenGroupServerPoller.ts +++ b/ts/opengroup/opengroupV2/OpenGroupServerPoller.ts @@ -1,6 +1,18 @@ import { AbortController } from 'abort-controller'; +import { ConversationController } from '../../session/conversations'; +import { getOpenGroupV2ConversationId } from '../utils/OpenGroupUtils'; import { OpenGroupRequestCommonType } from './ApiUtil'; -import { compactFetchEverything } from './OpenGroupAPIV2CompactPoll'; +import { compactFetchEverything, ParsedRoomCompactPollResults } from './OpenGroupAPIV2CompactPoll'; +import _ from 'lodash'; +import { ConversationModel } from '../../models/conversation'; +import { getMessageIdsFromServerIds, removeMessage } from '../../data/data'; +import { + getV2OpenGroupRoom, + getV2OpenGroupRoomByRoomId, + saveV2OpenGroupRoom, +} from '../../data/opengroups'; +import { OpenGroupMessageV2 } from './OpenGroupMessageV2'; +import { handleOpenGroupV2Message } from '../../receiver/receiver'; const pollForEverythingInterval = 4 * 1000; /** @@ -146,6 +158,7 @@ export class OpenGroupServerPoller { window.log.warn(`compactFetchResults for ${this.serverUrl}:`, compactFetchResults); // ==> At this point all those results need to trigger conversation updates, so update what we have to update + await handleCompactPollResults(this.serverUrl, compactFetchResults); } catch (e) { window.log.warn('Got error while compact fetch:', e); } finally { @@ -153,3 +166,99 @@ export class OpenGroupServerPoller { } } } + +const handleDeletions = async ( + deletedIds: Array, + conversationId: string, + convo?: ConversationModel +) => { + try { + const maxDeletedId = Math.max(...deletedIds); + const messageIds = await getMessageIdsFromServerIds(deletedIds, conversationId); + if (!messageIds?.length) { + return; + } + const roomInfos = await getV2OpenGroupRoom(conversationId); + if (roomInfos && roomInfos.lastMessageDeletedServerID !== maxDeletedId) { + roomInfos.lastMessageDeletedServerID = maxDeletedId; + await saveV2OpenGroupRoom(roomInfos); + } + + await Promise.all( + messageIds.map(async id => { + if (convo) { + await convo.removeMessage(id); + } + await removeMessage(id); + }) + ); + // we want to try to update our lastDeletedId + } catch (e) { + window.log.warn('handleDeletions failed:', e); + } +}; + +const handleNewMessages = async ( + newMessages: Array, + conversationId: string, + convo?: ConversationModel +) => { + try { + const incomingMessageIds = _.compact(newMessages.map(n => n.serverId)); + const maxNewMessageId = Math.max(...incomingMessageIds); + // TODO filter out duplicates ? + + // tslint:disable-next-line: prefer-for-of + for (let index = 0; index < newMessages.length; index++) { + const newMessage = newMessages[index]; + await handleOpenGroupV2Message(newMessage); + } + + const roomInfos = await getV2OpenGroupRoom(conversationId); + if (roomInfos && roomInfos.lastMessageFetchedServerID !== maxNewMessageId) { + roomInfos.lastMessageFetchedServerID = maxNewMessageId; + await saveV2OpenGroupRoom(roomInfos); + } + } catch (e) { + // window.log.warn('handleNewMessages failed:', e); + } +}; + +const handleCompactPollResults = async ( + serverUrl: string, + results: Array +) => { + await Promise.all( + results.map(async res => { + const convoId = getOpenGroupV2ConversationId(serverUrl, res.roomId); + const convo = ConversationController.getInstance().get(convoId); + + // we want to do deletions even if we somehow lost the convo. + if (res.deletions.length) { + // new deletions + await handleDeletions(res.deletions, convoId, convo); + } + + if (res.messages.length) { + // new deletions + await handleNewMessages(res.messages, convoId, convo); + } + + if (!convo) { + window.log.warn('Could not find convo for compactPoll', convoId); + return; + } + const existingModerators = convo.get('moderators') || []; + let changeOnConvo = false; + // res.moderators is already sorted + if (!_.isEqual(existingModerators.sort(), res.moderators)) { + convo.set({ moderators: res.moderators }); + changeOnConvo = true; + } + + if (changeOnConvo) { + await convo.commit(); + } + }) + ); +}; diff --git a/ts/receiver/receiver.ts b/ts/receiver/receiver.ts index c5265f231..f688ec184 100644 --- a/ts/receiver/receiver.ts +++ b/ts/receiver/receiver.ts @@ -291,3 +291,35 @@ export async function handlePublicMessage(messageData: any) { await handleMessageEvent(ev); // open groups } + +export async function handleOpenGroupV2Message(messageData: any) { + const { source } = messageData; + const { group, profile, profileKey } = messageData.message; + + const isMe = UserUtils.isUsFromCache(source); + + if (!isMe && profile) { + const conversation = await ConversationController.getInstance().getOrCreateAndWait( + source, + ConversationType.PRIVATE + ); + await updateProfile(conversation, profile, profileKey); + } + + const isPublicVisibleMessage = group && group.id && !!group.id.match(openGroupPrefixRegex); + + if (!isPublicVisibleMessage) { + throw new Error('handlePublicMessage Should only be called with public message groups'); + } + + const ev = { + // Public chat messages from ourselves should be outgoing + type: isMe ? 'sent' : 'message', + data: messageData, + confirm: () => { + /* do nothing */ + }, + }; + + await handleMessageEvent(ev); // open groups +} diff --git a/ts/session/utils/Promise.ts b/ts/session/utils/Promise.ts index ea3214748..38b59a7e1 100644 --- a/ts/session/utils/Promise.ts +++ b/ts/session/utils/Promise.ts @@ -14,8 +14,12 @@ export class TaskTimedOutError extends Error { } // one action resolves all -const snodeGlobalLocks: any = {}; -export async function allowOnlyOneAtATime(name: string, process: any, timeoutMs?: number) { +const snodeGlobalLocks: Record> = {}; +export async function allowOnlyOneAtATime( + name: string, + process: () => Promise, + timeoutMs?: number +) { // if currently not in progress if (snodeGlobalLocks[name] === undefined) { // set lock @@ -64,6 +68,7 @@ export async function allowOnlyOneAtATime(name: string, process: any, timeoutMs? // release the kraken resolve(innerRetVal); }); + } else { } return snodeGlobalLocks[name]; }