From 19f3b6014d5477e28bac390479a884043a61aaa6 Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Thu, 22 Apr 2021 17:00:02 +1000 Subject: [PATCH] add poller for open group v2 --- ts/components/session/ActionsPanel.tsx | 19 +-- .../opengroupV2/OpenGroupAPIV2CompactPoll.ts | 108 ++++++++---- .../opengroupV2/OpenGroupManagerV2.ts | 1 - ts/opengroup/opengroupV2/OpenGroupPollerV2.ts | 52 ------ .../opengroupV2/OpenGroupServerPoller.ts | 155 ++++++++++++++++++ ts/session/onions/onionSend.ts | 36 ++-- ts/session/snode_api/onions.ts | 47 ++++-- 7 files changed, 298 insertions(+), 120 deletions(-) delete mode 100644 ts/opengroup/opengroupV2/OpenGroupPollerV2.ts create mode 100644 ts/opengroup/opengroupV2/OpenGroupServerPoller.ts diff --git a/ts/components/session/ActionsPanel.tsx b/ts/components/session/ActionsPanel.tsx index 113f4763b..3cdff8148 100644 --- a/ts/components/session/ActionsPanel.tsx +++ b/ts/components/session/ActionsPanel.tsx @@ -190,11 +190,11 @@ export const ActionsPanel = () => { ); if (parsedRoom) { setTimeout(async () => { - await joinOpenGroupV2(parsedRoom); - const oldMessages = await getMessages({ - serverUrl: parsedRoom.serverUrl, - roomId: parsedRoom.roomId, - }); + // await joinOpenGroupV2(parsedRoom); + // const oldMessages = await getMessages({ + // serverUrl: parsedRoom.serverUrl, + // roomId: parsedRoom.roomId, + // }); // const msg = new OpenGroupMessageV2({ // base64EncodedData: 'dffdldfkldf', // sentTimestamp: Date.now(), @@ -207,11 +207,10 @@ export const ActionsPanel = () => { // serverUrl: parsedRoom.serverUrl, // roomId: parsedRoom.roomId, // }); - - const rooms = [ - { serverUrl: 'https://opengroup.bilb.us', roomId: 'main' }, - ]; - await compactFetchEverything(rooms); + // const rooms = [ + // { serverUrl: 'https://opengroup.bilb.us', roomId: 'main' }, + // ]; + // await compactFetchEverything(rooms); }, 6000); } }, []); diff --git a/ts/opengroup/opengroupV2/OpenGroupAPIV2CompactPoll.ts b/ts/opengroup/opengroupV2/OpenGroupAPIV2CompactPoll.ts index 16956f834..e0cd8dca9 100644 --- a/ts/opengroup/opengroupV2/OpenGroupAPIV2CompactPoll.ts +++ b/ts/opengroup/opengroupV2/OpenGroupAPIV2CompactPoll.ts @@ -1,4 +1,7 @@ -import { getV2OpenGroupRoomByRoomId } from '../../data/opengroups'; +import { + getV2OpenGroupRoomByRoomId, + saveV2OpenGroupRoom, +} from '../../data/opengroups'; import { OpenGroupRequestCommonType, OpenGroupV2CompactPollRequest, @@ -7,22 +10,27 @@ import { import { parseStatusCodeFromOnionRequest } from './OpenGroupAPIV2Parser'; import _ from 'lodash'; import { sendViaOnion } from '../../session/onions/onionSend'; -import { OpenGroupManagerV2 } from './OpenGroupManagerV2'; import { OpenGroupMessageV2 } from './OpenGroupMessageV2'; +import { getAuthToken } from './OpenGroupAPIV2'; const COMPACT_POLL_ENDPOINT = 'compact_poll'; export const compactFetchEverything = async ( - rooms: Array -): Promise => { + serverUrl: string, + rooms: Set, + abortSignal: AbortSignal +): Promise | null> => { // fetch all we need - const compactPollRequest = await getCompactPollRequest(rooms); + const compactPollRequest = await getCompactPollRequest(serverUrl, rooms); if (!compactPollRequest) { window.log.info('Nothing found to be fetched. returning'); return null; } - const result = await sendOpenGroupV2RequestCompactPoll(compactPollRequest); + const result = await sendOpenGroupV2RequestCompactPoll( + compactPollRequest, + abortSignal + ); const statusCode = parseStatusCodeFromOnionRequest(result); if (statusCode !== 200) { return null; @@ -34,26 +42,14 @@ export const compactFetchEverything = async ( * This return body to be used to do the compactPoll */ const getCompactPollRequest = async ( - rooms: Array + serverUrl: string, + rooms: Set ): Promise => { - // first verify the rooms we got are all from on the same server - let firstUrl: string; - if (rooms) { - firstUrl = rooms[0].serverUrl; - const anotherUrl = rooms.some(r => r.serverUrl !== firstUrl); - if (anotherUrl) { - throw new Error('CompactPoll is designed for a single server'); - } - } else { - window.log.warn('CompactPoll: No room given. nothing to do'); - return null; - } - const allServerPubKeys: Array = []; const roomsRequestInfos = _.compact( await Promise.all( - rooms.map(async ({ roomId, serverUrl }) => { + [...rooms].map(async roomId => { try { const fetchedInfo = await getV2OpenGroupRoomByRoomId({ serverUrl, @@ -112,7 +108,7 @@ const getCompactPollRequest = async ( }); return { body, - server: firstUrl, + server: serverUrl, serverPubKey: firstPubkey, endpoint: COMPACT_POLL_ENDPOINT, }; @@ -122,18 +118,25 @@ const getCompactPollRequest = async ( * This call is separate as a lot of the logic is custom (statusCode handled separately, etc) */ async function sendOpenGroupV2RequestCompactPoll( - request: OpenGroupV2CompactPollRequest -): Promise { - const { server, endpoint, body, serverPubKey } = request; + request: OpenGroupV2CompactPollRequest, + abortSignal: AbortSignal +): Promise | null> { + const { server: serverUrl, endpoint, body, serverPubKey } = request; // this will throw if the url is not valid - const builtUrl = new URL(`${server}/${endpoint}`); + const builtUrl = new URL(`${serverUrl}/${endpoint}`); console.warn(`sending compactPoll request: ${request.body}`); - const res = await sendViaOnion(serverPubKey, builtUrl, { - method: 'POST', - body, - }); + const res = await sendViaOnion( + serverPubKey, + builtUrl, + { + method: 'POST', + body, + }, + {}, + abortSignal + ); const statusCode = parseStatusCodeFromOnionRequest(res); if (!statusCode) { @@ -141,16 +144,43 @@ async function sendOpenGroupV2RequestCompactPoll( 'sendOpenGroupV2Request Got unknown status code; res:', res ); - return res as object; + return null; } const results = await parseCompactPollResults(res); + if (!results) { + window.log.info('got empty compactPollResults'); + return null; + } + // get all roomIds which needs a refreshed token + const roomTokensToRefresh = results + .filter(ret => ret.statusCode === 401) + .map(r => r.roomId); + + if (roomTokensToRefresh) { + await Promise.all( + roomTokensToRefresh.map(async roomId => { + const roomDetails = await getV2OpenGroupRoomByRoomId({ + serverUrl, + roomId, + }); + if (!roomDetails) { + return; + } + roomDetails.token = undefined; + // we might need to retry doing the request here, but how to make sure we don't retry indefinetely? + await saveV2OpenGroupRoom(roomDetails); + // do not await for that. We have a only one at a time logic on a per room basis + void getAuthToken({ serverUrl, roomId }); + }) + ); + } throw new Error( 'See how we handle needs of new tokens, and save stuff to db (last deleted, ... conversation commit, etc' ); - return res as object; + return results; } type ParsedRoomCompactPollResults = { @@ -158,6 +188,7 @@ type ParsedRoomCompactPollResults = { deletions: Array; messages: Array; moderators: Array; + statusCode: number; }; const parseCompactPollResult = async ( @@ -168,13 +199,15 @@ const parseCompactPollResult = async ( deletions: rawDeletions, messages: rawMessages, moderators: rawMods, + status_code: rawStatusCode, } = singleRoomResult; if ( !room_id || rawDeletions === undefined || rawMessages === undefined || - rawMods === undefined + rawMods === undefined || + !rawStatusCode ) { window.log.warn('Invalid compactPoll result', singleRoomResult); return null; @@ -183,8 +216,15 @@ const parseCompactPollResult = async ( const validMessages = await parseMessages(rawMessages); const moderators = rawMods as Array; const deletions = rawDeletions as Array; + const statusCode = rawStatusCode as number; - return { roomId: room_id, deletions, messages: validMessages, moderators }; + return { + roomId: room_id, + deletions, + messages: validMessages, + moderators, + statusCode, + }; }; const parseCompactPollResults = async ( diff --git a/ts/opengroup/opengroupV2/OpenGroupManagerV2.ts b/ts/opengroup/opengroupV2/OpenGroupManagerV2.ts index a7db84931..4b055374d 100644 --- a/ts/opengroup/opengroupV2/OpenGroupManagerV2.ts +++ b/ts/opengroup/opengroupV2/OpenGroupManagerV2.ts @@ -8,7 +8,6 @@ import { ConversationController } from '../../session/conversations'; import { allowOnlyOneAtATime } from '../../session/utils/Promise'; import { getOpenGroupV2ConversationId } from '../utils/OpenGroupUtils'; import { openGroupV2GetRoomInfo } from './OpenGroupAPIV2'; -import { OpenGroupPollerV2 } from './OpenGroupPollerV2'; /** * When we get our configuration from the network, we might get a few times the same open group on two different messages. diff --git a/ts/opengroup/opengroupV2/OpenGroupPollerV2.ts b/ts/opengroup/opengroupV2/OpenGroupPollerV2.ts deleted file mode 100644 index 4a71a1b96..000000000 --- a/ts/opengroup/opengroupV2/OpenGroupPollerV2.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { AbortController } from 'abort-controller'; -import { OpenGroupV2Room } from '../../data/opengroups'; - -export class OpenGroupPollerV2 { - private static readonly pollForEverythingInterval = 4 * 1000; - - private readonly openGroupRoom: OpenGroupV2Room; - - private pollForEverythingTimer?: NodeJS.Timeout; - - private abortController?: AbortController; - - private hasStarted = false; - private isPolling = false; - - constructor(openGroupRoom: OpenGroupV2Room) { - this.openGroupRoom = openGroupRoom; - } - - public startIfNeeded() { - if (this.hasStarted) { - return; - } - - this.hasStarted = true; - this.abortController = new AbortController(); - this.pollForEverythingTimer = global.setInterval( - this.compactPoll, - OpenGroupPollerV2.pollForEverythingInterval - ); - } - - public stop() { - if (this.pollForEverythingTimer) { - global.clearInterval(this.pollForEverythingTimer); - this.abortController?.abort(); - this.abortController = undefined; - this.pollForEverythingTimer = undefined; - } - } - - private async compactPoll() { - // return early if a poll is already in progress - if (this.isPolling) { - return; - } - this.isPolling = true; - window.log.warn('compactPoll TODO'); - // use abortController and do not trigger new messages if it was canceled - this.isPolling = false; - } -} diff --git a/ts/opengroup/opengroupV2/OpenGroupServerPoller.ts b/ts/opengroup/opengroupV2/OpenGroupServerPoller.ts new file mode 100644 index 000000000..492344e20 --- /dev/null +++ b/ts/opengroup/opengroupV2/OpenGroupServerPoller.ts @@ -0,0 +1,155 @@ +import { AbortController } from 'abort-controller'; +import { OpenGroupRequestCommonType } from './ApiUtil'; +import { compactFetchEverything } from './OpenGroupAPIV2CompactPoll'; +const pollForEverythingInterval = 4 * 1000; + +/** + * An OpenGroupServerPollerV2 polls for everything for a particular server. We should + * have only have one OpenGroupServerPollerV2 per opengroup polling. + * + * So even if you have several rooms on the same server, you should only have one OpenGroupServerPollerV2 + * for this server. + */ +export class OpenGroupServerPoller { + private readonly serverUrl: string; + private readonly roomIdsToPoll: Set = new Set(); + private pollForEverythingTimer?: NodeJS.Timeout; + private abortController?: AbortController; + + /** + * isPolling is set to true when we have a request going for this serverUrl. + * If we have an interval tick while we still doing a request, the new one will be dropped + * and only the current one will finish. + * This is to ensure that we don't trigger too many request at the same time + */ + private isPolling = false; + private wasStopped = false; + + constructor(roomInfos: Array) { + if (!roomInfos?.length) { + throw new Error('Empty roomInfos list'); + } + // check that all rooms are from the same serverUrl + const firstUrl = roomInfos[0].serverUrl; + const every = roomInfos.every(r => r.serverUrl === firstUrl); + if (!every) { + throw new Error('All rooms must be for the same serverUrl'); + } + this.serverUrl = firstUrl; + roomInfos.forEach(r => { + this.roomIdsToPoll.add(r.roomId); + }); + + this.abortController = new AbortController(); + this.pollForEverythingTimer = global.setInterval( + this.compactPoll, + pollForEverythingInterval + ); + + // first verify the rooms we got are all from on the same server + } + + /** + * Add a room to the polled room for this server. + * If a request is already in progress, it will be added only on the next run. + * The interval is always ticking, even doing nothing except realizing it has nothing to do + */ + public addRoomToPoll(room: OpenGroupRequestCommonType) { + if (room.serverUrl !== this.serverUrl) { + throw new Error('All rooms must be for the same serverUrl'); + } + if (this.roomIdsToPoll.has(room.roomId)) { + window.log.info('skipping addRoomToPoll of already polled room:', room); + return; + } + this.roomIdsToPoll.add(room.roomId); + } + + public removeRoomFromPoll(room: OpenGroupRequestCommonType) { + if (room.serverUrl !== this.serverUrl) { + window.log.info('this is not the correct ServerPoller'); + return; + } + if (this.roomIdsToPoll.has(room.roomId)) { + window.log.info( + `Removing ${room.roomId} from polling for ${this.serverUrl}` + ); + this.roomIdsToPoll.delete(room.roomId); + } else { + window.log.info( + `Cannot remove polling of ${room.roomId} as it is not polled on ${this.serverUrl}` + ); + } + } + + /** + * Stop polling. + * Requests currently being made will we canceled. + * You can NOT restart for now a stopped serverPoller. + * This has to be used only for quiting the app. + */ + public stop() { + if (this.pollForEverythingTimer) { + global.clearInterval(this.pollForEverythingTimer); + this.abortController?.abort(); + this.pollForEverythingTimer = undefined; + this.wasStopped = true; + } + } + + private async compactPoll() { + if (this.wasStopped) { + window.log.error( + 'serverpoller was stopped. CompactPoll should not happen' + ); + return; + } + if (!this.roomIdsToPoll.size) { + return; + } + // return early if a poll is already in progress + if (this.isPolling) { + return; + } + // do everything with throwing so we can check only at one place + // what we have to clean + try { + this.isPolling = true; + if (!this.abortController || this.abortController.signal.aborted) { + throw new Error('Poller aborted'); + } + + let compactFetchResults = await compactFetchEverything( + this.serverUrl, + this.roomIdsToPoll, + this.abortController.signal + ); + + if (this.abortController && this.abortController.signal.aborted) { + this.abortController = undefined; + window.log.warn('Abort controller was canceled. dropping request'); + return; + } + if (!compactFetchResults) { + window.log.info('compactFetch: no results'); + return; + } + // we were not aborted, just make sure to filter out roomIds we are not polling for anymore + compactFetchResults = compactFetchResults.filter(result => + this.roomIdsToPoll.has(result.roomId) + ); + window.log.warn( + `compactFetchResults for ${this.serverUrl}:`, + compactFetchResults + ); + } catch (e) { + window.log.warn('Got error while compact fetch:', e); + } finally { + if (this.abortController && this.abortController.signal.aborted) { + this.abortController = undefined; + window.log.warn('Abort controller was canceled. dropping request'); + } + this.isPolling = false; + } + } +} diff --git a/ts/session/onions/onionSend.ts b/ts/session/onions/onionSend.ts index 9b2e01ef8..5428456bd 100644 --- a/ts/session/onions/onionSend.ts +++ b/ts/session/onions/onionSend.ts @@ -36,11 +36,12 @@ type OnionFetchBasicOptions = { }; const handleSendViaOnionRetry = async ( - result: number, + result: RequestError, options: OnionFetchBasicOptions, srvPubKey: string, url: URL, - fetchOptions: OnionFetchOptions + fetchOptions: OnionFetchOptions, + abortSignal?: AbortSignal ) => { window.log.error( 'sendOnionRequestLsrpcDest() returned a number indicating an error: ', @@ -59,11 +60,17 @@ const handleSendViaOnionRetry = async ( ); } // retry the same request, and increment the counter - return sendViaOnion(srvPubKey, url, fetchOptions, { - ...options, - retry: (options.retry as number) + 1, - counter: options.requestNumber, - }); + return sendViaOnion( + srvPubKey, + url, + fetchOptions, + { + ...options, + retry: (options.retry as number) + 1, + counter: options.requestNumber, + }, + abortSignal + ); }; const buildSendViaOnionPayload = ( @@ -137,7 +144,8 @@ export const sendViaOnion = async ( srvPubKey: string, url: URL, fetchOptions: OnionFetchOptions, - options: OnionFetchBasicOptions = {} + options: OnionFetchBasicOptions = {}, + abortSignal?: AbortSignal ): Promise<{ result: SnodeResponse; txtResponse: string; @@ -176,7 +184,8 @@ export const sendViaOnion = async ( srvPubKey, finalRelayOptions, payloadObj, - defaultedOptions.requestNumber + defaultedOptions.requestNumber, + abortSignal ); } catch (e) { window.log.error('sendViaOnion - lokiRpcUtils error', e.code, e.message); @@ -184,13 +193,18 @@ export const sendViaOnion = async ( } // RequestError return type is seen as number (as it is an enum) - if (typeof result === 'number') { + if (typeof result === 'string') { + if (result === RequestError.ABORTED) { + window.log.info('sendViaOnion aborted. not retrying'); + return null; + } const retriedResult = await handleSendViaOnionRetry( result, defaultedOptions, srvPubKey, url, - fetchOptions + fetchOptions, + abortSignal ); // keep the await separate so we can log it easily return retriedResult; diff --git a/ts/session/snode_api/onions.ts b/ts/session/snode_api/onions.ts index 3de0736a5..2e3d843aa 100644 --- a/ts/session/snode_api/onions.ts +++ b/ts/session/snode_api/onions.ts @@ -5,11 +5,11 @@ import { Snode } from './snodePool'; import ByteBuffer from 'bytebuffer'; import { StringUtils } from '../utils'; import { OnionPaths } from '../onions'; -import { toHex } from '../utils/String'; export enum RequestError { - BAD_PATH, - OTHER, + BAD_PATH = 'BAD_PATH', + OTHER = 'OTHER', + ABORTED = 'ABORTED', } /** @@ -253,10 +253,16 @@ const processOnionResponse = async ( reqIdx: number, response: any, sharedKey: ArrayBuffer, - debug: boolean + debug: boolean, + abortSignal?: AbortSignal ): Promise => { const { log, libloki, dcodeIO, StringView } = window; + if (abortSignal?.aborted) { + log.warn(`(${reqIdx}) [path] Call aborted`); + return RequestError.ABORTED; + } + // FIXME: 401/500 handling? // detect SNode is not ready (not in swarm; not done syncing) @@ -400,13 +406,9 @@ export type FinalDestinationOptions = { * 1, 2, 3 = onion Snodes * * - * @param reqIdx * @param nodePath the onion path to use to send the request - * @param destX25519Any * @param finalDestOptions those are the options for the request from 3 to R. It contains for instance the payload and headers. * @param finalRelayOptions those are the options 3 will use to make a request to R. It contains for instance the host to make the request to - * @param lsrpcIdx - * @returns */ const sendOnionRequest = async ( reqIdx: number, @@ -418,7 +420,8 @@ const sendOnionRequest = async ( body?: string; }, finalRelayOptions?: FinalRelayOptions, - lsrpcIdx?: any + lsrpcIdx?: any, + abortSignal?: AbortSignal ): Promise => { const { log, StringView } = window; @@ -503,6 +506,7 @@ const sendOnionRequest = async ( body: payload, // we are talking to a snode... agent: snodeHttpsAgent, + abortSignal, }; const target = useV2 ? '/onion_req/v2' : '/onion_req'; @@ -513,7 +517,13 @@ const sendOnionRequest = async ( const response = await insecureNodeFetch(guardUrl, guardFetchOptions); - return processOnionResponse(reqIdx, response, destCtx.symmetricKey, false); + return processOnionResponse( + reqIdx, + response, + destCtx.symmetricKey, + false, + abortSignal + ); }; async function sendOnionRequestSnodeDest( @@ -543,7 +553,8 @@ export async function sendOnionRequestLsrpcDest( destX25519Any: string, finalRelayOptions: FinalRelayOptions, payloadObj: FinalDestinationOptions, - lsrpcIdx: number + lsrpcIdx: number, + abortSignal?: AbortSignal ): Promise { return sendOnionRequest( reqIdx, @@ -551,7 +562,8 @@ export async function sendOnionRequestLsrpcDest( destX25519Any, payloadObj, finalRelayOptions, - lsrpcIdx + lsrpcIdx, + abortSignal ); } @@ -602,6 +614,17 @@ export async function lokiOnionFetch( )} to ${targetNode.ip}:${targetNode.port}` ); return false; + } else if (result === RequestError.ABORTED) { + // could mean, fail to parse results + // or status code wasn't 200 + // or can't decrypt + // it's not a bad_path, so we don't need to mark the path as bad + log.error( + `[path] sendOnionRequest gave aborted for path: ${getPathString( + path + )} to ${targetNode.ip}:${targetNode.port}` + ); + return false; } else { return result; }