/* eslint-disable no-loop-func */ /* eslint-disable no-restricted-syntax */ import { AbortController } from 'abort-controller'; import { isNumber, isObject } from 'lodash'; import autoBind from 'auto-bind'; import { getOpenGroupV2ConversationId } from '../utils/OpenGroupUtils'; import { OpenGroupRequestCommonType } from './ApiUtil'; import { OpenGroupData } from '../../../../data/opengroups'; import { OpenGroupMessageV2 } from './OpenGroupMessageV2'; import { DURATION } from '../../../constants'; import { batchGlobalIsSuccess, OpenGroupBatchRow, parseBatchGlobalStatusCode, sogsBatchSend, SubRequestMessagesObjectType, } from '../sogsv3/sogsV3BatchPoll'; import { handleBatchPollResults } from '../sogsv3/sogsApiV3'; import { fetchCapabilitiesAndUpdateRelatedRoomsOfServerUrl } from '../sogsv3/sogsV3Capabilities'; import { OpenGroupReaction } from '../../../../types/Reaction'; import { markConversationInitialLoadingInProgress, openConversationWithMessages, } from '../../../../state/ducks/conversations'; import { roomHasBlindEnabled } from '../../../../types/sqlSharedTypes'; export type OpenGroupMessageV4 = { /** AFAIK: indicates the number of the message in the group. e.g. 2nd message will be 1 or 2 */ seqno: number; session_id?: string; /** base64 */ signature?: string; /** timestamp number with decimal */ posted?: number; id: number; data?: string; deleted?: boolean; reactions: Record; }; // seqno is not set for SOGS < 1.3.4 export type OpenGroupReactionMessageV4 = Omit & { seqno: number | undefined; }; const pollForEverythingInterval = DURATION.SECONDS * 10; export const invalidAuthRequiresBlinding = 'Invalid authentication: this server requires the use of blinded ids'; /** * An OpenGroupServerPollerV2 polls for everything for a particular server. We should * have only have one OpenGroupServerPollerV2 per opengroup polling. * * So even if you have several rooms on the same server, you should only have one OpenGroupServerPollerV2 * for this server. */ export class OpenGroupServerPoller { /** * The server url to poll for this opengroup poller. * Remember, we have one poller per opengroup poller, no matter how many rooms we have joined on this same server */ private readonly serverUrl: string; /** * The set of rooms to poll from. * */ private readonly roomIdsToPoll: Set = new Set(); /** * This timer is used to tick for compact Polling for this opengroup server * It ticks every `pollForEverythingInterval` except. * If the last run is still in progress, the new one won't start and just return. */ private pollForEverythingTimer?: NodeJS.Timeout; private readonly abortController: AbortController; /** * isPolling is set to true when we have a request going for this serverUrl. * If we have an interval tick while we still doing a request, the new one will be dropped * and only the current one will finish. * This is to ensure that we don't trigger too many request at the same time */ private isPolling = false; private wasStopped = false; constructor(roomInfos: Array) { autoBind(this); if (!roomInfos?.length) { throw new Error('Empty roomInfos list'); } // check that all rooms are from the same serverUrl const firstUrl = roomInfos[0].serverUrl; const every = roomInfos.every(r => r.serverUrl === firstUrl); if (!every) { throw new Error('All rooms must be for the same serverUrl'); } // first verify the rooms we got are all from on the same server window?.log?.info(`Creating a new OpenGroupServerPoller for url ${firstUrl}`); this.serverUrl = firstUrl; roomInfos.forEach(r => { window?.log?.info( `Adding room on construct for url serverUrl: ${firstUrl}, roomId:'${r.roomId}' to poller:${this.serverUrl}` ); this.roomIdsToPoll.add(r.roomId); }); this.abortController = new AbortController(); // eslint-disable-next-line @typescript-eslint/no-misused-promises this.pollForEverythingTimer = global.setInterval(this.compactPoll, pollForEverythingInterval); if (this.roomIdsToPoll.size) { void this.triggerPollAfterAdd(); } } /** * Add a room to the polled room for this server. * If a request is already in progress, it will be added only on the next run. */ public addRoomToPoll(room: OpenGroupRequestCommonType) { if (room.serverUrl !== this.serverUrl) { throw new Error('All rooms must be for the same serverUrl'); } if (this.roomIdsToPoll.has(room.roomId)) { window?.log?.info('skipping addRoomToPoll of already polled room:', room); return; } window?.log?.info( `Adding room on addRoomToPoll for url serverUrl: ${this.serverUrl}, roomId:'${room.roomId}' to poller:${this.serverUrl}` ); this.roomIdsToPoll.add(room.roomId); // if we are not already polling right now, trigger a polling void this.triggerPollAfterAdd(room); } public removeRoomFromPoll(room: OpenGroupRequestCommonType) { if (room.serverUrl !== this.serverUrl) { window?.log?.info('this is not the correct ServerPoller'); return; } if (this.roomIdsToPoll.has(room.roomId) || this.roomIdsToPoll.has(room.roomId.toLowerCase())) { window?.log?.info(`Removing ${room.roomId} from polling for ${this.serverUrl}`); this.roomIdsToPoll.delete(room.roomId); this.roomIdsToPoll.delete(room.roomId.toLowerCase()); } else { window?.log?.info( `Cannot remove polling of ${room.roomId} as it is not polled on ${this.serverUrl}` ); } } public getPolledRoomsCount() { return this.roomIdsToPoll.size; } /** * Stop polling. * Requests currently being made will we canceled. * You can NOT restart for now a stopped serverPoller. * This has to be used only for quitting the app. */ public stop() { if (this.pollForEverythingTimer) { // cancel next ticks for each timer global.clearInterval(this.pollForEverythingTimer); // abort current requests this.abortController?.abort(); this.pollForEverythingTimer = undefined; this.wasStopped = true; } } private async triggerPollAfterAdd(_room?: OpenGroupRequestCommonType) { await this.compactPoll(); } private shouldPoll() { if (this.wasStopped) { window?.log?.error('Serverpoller was stopped. CompactPoll should not happen'); return false; } if (!this.roomIdsToPoll.size) { return false; } // return early if a poll is already in progress if (this.isPolling) { return false; } if (!window.getGlobalOnlineStatus()) { window?.log?.info('OpenGroupServerPoller: offline'); return false; } return true; } /** * creates subrequest options for a batch request. * We need: capabilities, pollInfo, recent messages, DM request inbox messages * @returns Array of subrequest options for our main batch request */ private async makeSubrequestInfo() { const subrequestOptions: Array = []; // capabilities subrequestOptions.push({ type: 'capabilities', }); // adding room specific SOGS subrequests this.roomIdsToPoll.forEach(roomId => { // poll info subrequestOptions.push({ type: 'pollInfo', pollInfo: { roomId, infoUpdated: 0, }, }); const convoId = getOpenGroupV2ConversationId(this.serverUrl, roomId); const roomInfos = OpenGroupData.getV2OpenGroupRoom(convoId); // messages subrequestOptions.push({ type: 'messages', messages: { roomId, sinceSeqNo: roomInfos?.maxMessageFetchedSeqNo, }, }); }); if (this.serverUrl) { const rooms = OpenGroupData.getV2OpenGroupRoomsByServerUrl(this.serverUrl); if (rooms?.length) { if (roomHasBlindEnabled(rooms[0])) { const maxInboxId = Math.max(...rooms.map(r => r.lastInboxIdFetched || 0)); // This only works for servers with blinding capabilities // adding inbox subrequest info subrequestOptions.push({ type: 'inbox', inboxSince: { id: isNumber(maxInboxId) && maxInboxId > 0 ? maxInboxId : undefined }, }); const maxOutboxId = Math.max(...rooms.map(r => r.lastOutboxIdFetched || 0)); // This only works for servers with blinding capabilities // adding outbox subrequest info subrequestOptions.push({ type: 'outbox', outboxSince: { id: isNumber(maxOutboxId) && maxOutboxId > 0 ? maxOutboxId : undefined }, }); } } } return subrequestOptions; } private async compactPoll() { if (!this.shouldPoll()) { return; } // do everything with throwing so we can check only at one place // what we have to clean try { this.isPolling = true; // don't try to make the request if we are aborted if (this.abortController.signal.aborted) { throw new Error('Poller aborted'); } const subrequestOptions: Array = await this.makeSubrequestInfo(); if (!subrequestOptions || subrequestOptions.length === 0) { throw new Error('compactFetch: no subrequestOptions'); } const batchPollResults = await sogsBatchSend( this.serverUrl, this.roomIdsToPoll, this.abortController.signal, subrequestOptions, 'batch' ); if (!batchPollResults) { throw new Error('compactFetch: no batchPollResults'); } // check that we are still not aborted if (this.abortController.signal.aborted) { throw new Error('Abort controller was cancelled. dropping request'); } // if we get a plaintext response from the sogs, it is stored under plainText field // see decodeV4Response() if ( parseBatchGlobalStatusCode(batchPollResults) === 400 && batchPollResults.body && isObject(batchPollResults.body) ) { const bodyPlainText = (batchPollResults.body as any).plainText; // this is temporary (as of 27/06/2022) as we want to not support unblinded sogs after some time if (bodyPlainText === invalidAuthRequiresBlinding) { await fetchCapabilitiesAndUpdateRelatedRoomsOfServerUrl(this.serverUrl); throw new Error('batchPollResults just detected switch to blinded enforced.'); } } if (!batchGlobalIsSuccess(batchPollResults)) { throw new Error('batchPollResults general status code is not 200'); } // ==> At this point all those results need to trigger conversation updates, so update what we have to update await handleBatchPollResults(this.serverUrl, batchPollResults, subrequestOptions); // this is very hacky but is needed to remove the spinner of an opengroup conversation while it loads the first patch of messages. // Absolutely not the react way, but well. for (const room of subrequestOptions) { if (room.type === 'messages' && !room.messages?.sinceSeqNo && room.messages?.roomId) { const conversationKey = getOpenGroupV2ConversationId( this.serverUrl, room.messages.roomId ); global.setTimeout(() => { const stateConversations = window.inboxStore?.getState().conversations; if ( stateConversations.conversationLookup?.[conversationKey]?.isInitialFetchingInProgress ) { if ( stateConversations.selectedConversation && conversationKey === stateConversations.selectedConversation ) { // eslint-disable-next-line more/no-then void openConversationWithMessages({ conversationKey, messageId: null }).then(() => { window.inboxStore?.dispatch( markConversationInitialLoadingInProgress({ conversationKey, isInitialFetchingInProgress: false, }) ); }); } else { window.inboxStore?.dispatch( markConversationInitialLoadingInProgress({ conversationKey, isInitialFetchingInProgress: false, }) ); } } }, 5000); } } } catch (e) { window?.log?.warn('Got error while compact fetch:', e.message); } finally { this.isPolling = false; } } } export const getRoomAndUpdateLastFetchTimestamp = async ( conversationId: string, newMessages: Array, _subRequest: SubRequestMessagesObjectType ) => { const roomInfos = OpenGroupData.getV2OpenGroupRoom(conversationId); if (!roomInfos || !roomInfos.serverUrl || !roomInfos.roomId) { throw new Error(`No room for convo ${conversationId}`); } if (!newMessages.length) { // if we got no new messages, just write our last update timestamp to the db roomInfos.lastFetchTimestamp = Date.now(); await OpenGroupData.saveV2OpenGroupRoom(roomInfos); return null; } return roomInfos; };