add poller for open group v2

pull/1576/head
Audric Ackermann 4 years ago
parent ad1d5a3c4c
commit 19f3b6014d
No known key found for this signature in database
GPG Key ID: 999F434D76324AD4

@ -190,11 +190,11 @@ export const ActionsPanel = () => {
);
if (parsedRoom) {
setTimeout(async () => {
await joinOpenGroupV2(parsedRoom);
const oldMessages = await getMessages({
serverUrl: parsedRoom.serverUrl,
roomId: parsedRoom.roomId,
});
// await joinOpenGroupV2(parsedRoom);
// const oldMessages = await getMessages({
// serverUrl: parsedRoom.serverUrl,
// roomId: parsedRoom.roomId,
// });
// const msg = new OpenGroupMessageV2({
// base64EncodedData: 'dffdldfkldf',
// sentTimestamp: Date.now(),
@ -207,11 +207,10 @@ export const ActionsPanel = () => {
// serverUrl: parsedRoom.serverUrl,
// roomId: parsedRoom.roomId,
// });
const rooms = [
{ serverUrl: 'https://opengroup.bilb.us', roomId: 'main' },
];
await compactFetchEverything(rooms);
// const rooms = [
// { serverUrl: 'https://opengroup.bilb.us', roomId: 'main' },
// ];
// await compactFetchEverything(rooms);
}, 6000);
}
}, []);

@ -1,4 +1,7 @@
import { getV2OpenGroupRoomByRoomId } from '../../data/opengroups';
import {
getV2OpenGroupRoomByRoomId,
saveV2OpenGroupRoom,
} from '../../data/opengroups';
import {
OpenGroupRequestCommonType,
OpenGroupV2CompactPollRequest,
@ -7,22 +10,27 @@ import {
import { parseStatusCodeFromOnionRequest } from './OpenGroupAPIV2Parser';
import _ from 'lodash';
import { sendViaOnion } from '../../session/onions/onionSend';
import { OpenGroupManagerV2 } from './OpenGroupManagerV2';
import { OpenGroupMessageV2 } from './OpenGroupMessageV2';
import { getAuthToken } from './OpenGroupAPIV2';
const COMPACT_POLL_ENDPOINT = 'compact_poll';
export const compactFetchEverything = async (
rooms: Array<OpenGroupRequestCommonType>
): Promise<null | any> => {
serverUrl: string,
rooms: Set<string>,
abortSignal: AbortSignal
): Promise<Array<ParsedRoomCompactPollResults> | null> => {
// fetch all we need
const compactPollRequest = await getCompactPollRequest(rooms);
const compactPollRequest = await getCompactPollRequest(serverUrl, rooms);
if (!compactPollRequest) {
window.log.info('Nothing found to be fetched. returning');
return null;
}
const result = await sendOpenGroupV2RequestCompactPoll(compactPollRequest);
const result = await sendOpenGroupV2RequestCompactPoll(
compactPollRequest,
abortSignal
);
const statusCode = parseStatusCodeFromOnionRequest(result);
if (statusCode !== 200) {
return null;
@ -34,26 +42,14 @@ export const compactFetchEverything = async (
* This return body to be used to do the compactPoll
*/
const getCompactPollRequest = async (
rooms: Array<OpenGroupRequestCommonType>
serverUrl: string,
rooms: Set<string>
): Promise<null | OpenGroupV2CompactPollRequest> => {
// first verify the rooms we got are all from on the same server
let firstUrl: string;
if (rooms) {
firstUrl = rooms[0].serverUrl;
const anotherUrl = rooms.some(r => r.serverUrl !== firstUrl);
if (anotherUrl) {
throw new Error('CompactPoll is designed for a single server');
}
} else {
window.log.warn('CompactPoll: No room given. nothing to do');
return null;
}
const allServerPubKeys: Array<string> = [];
const roomsRequestInfos = _.compact(
await Promise.all(
rooms.map(async ({ roomId, serverUrl }) => {
[...rooms].map(async roomId => {
try {
const fetchedInfo = await getV2OpenGroupRoomByRoomId({
serverUrl,
@ -112,7 +108,7 @@ const getCompactPollRequest = async (
});
return {
body,
server: firstUrl,
server: serverUrl,
serverPubKey: firstPubkey,
endpoint: COMPACT_POLL_ENDPOINT,
};
@ -122,18 +118,25 @@ const getCompactPollRequest = async (
* This call is separate as a lot of the logic is custom (statusCode handled separately, etc)
*/
async function sendOpenGroupV2RequestCompactPoll(
request: OpenGroupV2CompactPollRequest
): Promise<Object | null> {
const { server, endpoint, body, serverPubKey } = request;
request: OpenGroupV2CompactPollRequest,
abortSignal: AbortSignal
): Promise<Array<ParsedRoomCompactPollResults> | null> {
const { server: serverUrl, endpoint, body, serverPubKey } = request;
// this will throw if the url is not valid
const builtUrl = new URL(`${server}/${endpoint}`);
const builtUrl = new URL(`${serverUrl}/${endpoint}`);
console.warn(`sending compactPoll request: ${request.body}`);
const res = await sendViaOnion(serverPubKey, builtUrl, {
method: 'POST',
body,
});
const res = await sendViaOnion(
serverPubKey,
builtUrl,
{
method: 'POST',
body,
},
{},
abortSignal
);
const statusCode = parseStatusCodeFromOnionRequest(res);
if (!statusCode) {
@ -141,16 +144,43 @@ async function sendOpenGroupV2RequestCompactPoll(
'sendOpenGroupV2Request Got unknown status code; res:',
res
);
return res as object;
return null;
}
const results = await parseCompactPollResults(res);
if (!results) {
window.log.info('got empty compactPollResults');
return null;
}
// get all roomIds which needs a refreshed token
const roomTokensToRefresh = results
.filter(ret => ret.statusCode === 401)
.map(r => r.roomId);
if (roomTokensToRefresh) {
await Promise.all(
roomTokensToRefresh.map(async roomId => {
const roomDetails = await getV2OpenGroupRoomByRoomId({
serverUrl,
roomId,
});
if (!roomDetails) {
return;
}
roomDetails.token = undefined;
// we might need to retry doing the request here, but how to make sure we don't retry indefinetely?
await saveV2OpenGroupRoom(roomDetails);
// do not await for that. We have a only one at a time logic on a per room basis
void getAuthToken({ serverUrl, roomId });
})
);
}
throw new Error(
'See how we handle needs of new tokens, and save stuff to db (last deleted, ... conversation commit, etc'
);
return res as object;
return results;
}
type ParsedRoomCompactPollResults = {
@ -158,6 +188,7 @@ type ParsedRoomCompactPollResults = {
deletions: Array<number>;
messages: Array<OpenGroupMessageV2>;
moderators: Array<string>;
statusCode: number;
};
const parseCompactPollResult = async (
@ -168,13 +199,15 @@ const parseCompactPollResult = async (
deletions: rawDeletions,
messages: rawMessages,
moderators: rawMods,
status_code: rawStatusCode,
} = singleRoomResult;
if (
!room_id ||
rawDeletions === undefined ||
rawMessages === undefined ||
rawMods === undefined
rawMods === undefined ||
!rawStatusCode
) {
window.log.warn('Invalid compactPoll result', singleRoomResult);
return null;
@ -183,8 +216,15 @@ const parseCompactPollResult = async (
const validMessages = await parseMessages(rawMessages);
const moderators = rawMods as Array<string>;
const deletions = rawDeletions as Array<number>;
const statusCode = rawStatusCode as number;
return { roomId: room_id, deletions, messages: validMessages, moderators };
return {
roomId: room_id,
deletions,
messages: validMessages,
moderators,
statusCode,
};
};
const parseCompactPollResults = async (

@ -8,7 +8,6 @@ import { ConversationController } from '../../session/conversations';
import { allowOnlyOneAtATime } from '../../session/utils/Promise';
import { getOpenGroupV2ConversationId } from '../utils/OpenGroupUtils';
import { openGroupV2GetRoomInfo } from './OpenGroupAPIV2';
import { OpenGroupPollerV2 } from './OpenGroupPollerV2';
/**
* When we get our configuration from the network, we might get a few times the same open group on two different messages.

@ -1,52 +0,0 @@
import { AbortController } from 'abort-controller';
import { OpenGroupV2Room } from '../../data/opengroups';
export class OpenGroupPollerV2 {
private static readonly pollForEverythingInterval = 4 * 1000;
private readonly openGroupRoom: OpenGroupV2Room;
private pollForEverythingTimer?: NodeJS.Timeout;
private abortController?: AbortController;
private hasStarted = false;
private isPolling = false;
constructor(openGroupRoom: OpenGroupV2Room) {
this.openGroupRoom = openGroupRoom;
}
public startIfNeeded() {
if (this.hasStarted) {
return;
}
this.hasStarted = true;
this.abortController = new AbortController();
this.pollForEverythingTimer = global.setInterval(
this.compactPoll,
OpenGroupPollerV2.pollForEverythingInterval
);
}
public stop() {
if (this.pollForEverythingTimer) {
global.clearInterval(this.pollForEverythingTimer);
this.abortController?.abort();
this.abortController = undefined;
this.pollForEverythingTimer = undefined;
}
}
private async compactPoll() {
// return early if a poll is already in progress
if (this.isPolling) {
return;
}
this.isPolling = true;
window.log.warn('compactPoll TODO');
// use abortController and do not trigger new messages if it was canceled
this.isPolling = false;
}
}

@ -0,0 +1,155 @@
import { AbortController } from 'abort-controller';
import { OpenGroupRequestCommonType } from './ApiUtil';
import { compactFetchEverything } from './OpenGroupAPIV2CompactPoll';
const pollForEverythingInterval = 4 * 1000;
/**
* An OpenGroupServerPollerV2 polls for everything for a particular server. We should
* have only have one OpenGroupServerPollerV2 per opengroup polling.
*
* So even if you have several rooms on the same server, you should only have one OpenGroupServerPollerV2
* for this server.
*/
export class OpenGroupServerPoller {
private readonly serverUrl: string;
private readonly roomIdsToPoll: Set<string> = new Set();
private pollForEverythingTimer?: NodeJS.Timeout;
private abortController?: AbortController;
/**
* isPolling is set to true when we have a request going for this serverUrl.
* If we have an interval tick while we still doing a request, the new one will be dropped
* and only the current one will finish.
* This is to ensure that we don't trigger too many request at the same time
*/
private isPolling = false;
private wasStopped = false;
constructor(roomInfos: Array<OpenGroupRequestCommonType>) {
if (!roomInfos?.length) {
throw new Error('Empty roomInfos list');
}
// check that all rooms are from the same serverUrl
const firstUrl = roomInfos[0].serverUrl;
const every = roomInfos.every(r => r.serverUrl === firstUrl);
if (!every) {
throw new Error('All rooms must be for the same serverUrl');
}
this.serverUrl = firstUrl;
roomInfos.forEach(r => {
this.roomIdsToPoll.add(r.roomId);
});
this.abortController = new AbortController();
this.pollForEverythingTimer = global.setInterval(
this.compactPoll,
pollForEverythingInterval
);
// first verify the rooms we got are all from on the same server
}
/**
* Add a room to the polled room for this server.
* If a request is already in progress, it will be added only on the next run.
* The interval is always ticking, even doing nothing except realizing it has nothing to do
*/
public addRoomToPoll(room: OpenGroupRequestCommonType) {
if (room.serverUrl !== this.serverUrl) {
throw new Error('All rooms must be for the same serverUrl');
}
if (this.roomIdsToPoll.has(room.roomId)) {
window.log.info('skipping addRoomToPoll of already polled room:', room);
return;
}
this.roomIdsToPoll.add(room.roomId);
}
public removeRoomFromPoll(room: OpenGroupRequestCommonType) {
if (room.serverUrl !== this.serverUrl) {
window.log.info('this is not the correct ServerPoller');
return;
}
if (this.roomIdsToPoll.has(room.roomId)) {
window.log.info(
`Removing ${room.roomId} from polling for ${this.serverUrl}`
);
this.roomIdsToPoll.delete(room.roomId);
} else {
window.log.info(
`Cannot remove polling of ${room.roomId} as it is not polled on ${this.serverUrl}`
);
}
}
/**
* Stop polling.
* Requests currently being made will we canceled.
* You can NOT restart for now a stopped serverPoller.
* This has to be used only for quiting the app.
*/
public stop() {
if (this.pollForEverythingTimer) {
global.clearInterval(this.pollForEverythingTimer);
this.abortController?.abort();
this.pollForEverythingTimer = undefined;
this.wasStopped = true;
}
}
private async compactPoll() {
if (this.wasStopped) {
window.log.error(
'serverpoller was stopped. CompactPoll should not happen'
);
return;
}
if (!this.roomIdsToPoll.size) {
return;
}
// return early if a poll is already in progress
if (this.isPolling) {
return;
}
// do everything with throwing so we can check only at one place
// what we have to clean
try {
this.isPolling = true;
if (!this.abortController || this.abortController.signal.aborted) {
throw new Error('Poller aborted');
}
let compactFetchResults = await compactFetchEverything(
this.serverUrl,
this.roomIdsToPoll,
this.abortController.signal
);
if (this.abortController && this.abortController.signal.aborted) {
this.abortController = undefined;
window.log.warn('Abort controller was canceled. dropping request');
return;
}
if (!compactFetchResults) {
window.log.info('compactFetch: no results');
return;
}
// we were not aborted, just make sure to filter out roomIds we are not polling for anymore
compactFetchResults = compactFetchResults.filter(result =>
this.roomIdsToPoll.has(result.roomId)
);
window.log.warn(
`compactFetchResults for ${this.serverUrl}:`,
compactFetchResults
);
} catch (e) {
window.log.warn('Got error while compact fetch:', e);
} finally {
if (this.abortController && this.abortController.signal.aborted) {
this.abortController = undefined;
window.log.warn('Abort controller was canceled. dropping request');
}
this.isPolling = false;
}
}
}

@ -36,11 +36,12 @@ type OnionFetchBasicOptions = {
};
const handleSendViaOnionRetry = async (
result: number,
result: RequestError,
options: OnionFetchBasicOptions,
srvPubKey: string,
url: URL,
fetchOptions: OnionFetchOptions
fetchOptions: OnionFetchOptions,
abortSignal?: AbortSignal
) => {
window.log.error(
'sendOnionRequestLsrpcDest() returned a number indicating an error: ',
@ -59,11 +60,17 @@ const handleSendViaOnionRetry = async (
);
}
// retry the same request, and increment the counter
return sendViaOnion(srvPubKey, url, fetchOptions, {
...options,
retry: (options.retry as number) + 1,
counter: options.requestNumber,
});
return sendViaOnion(
srvPubKey,
url,
fetchOptions,
{
...options,
retry: (options.retry as number) + 1,
counter: options.requestNumber,
},
abortSignal
);
};
const buildSendViaOnionPayload = (
@ -137,7 +144,8 @@ export const sendViaOnion = async (
srvPubKey: string,
url: URL,
fetchOptions: OnionFetchOptions,
options: OnionFetchBasicOptions = {}
options: OnionFetchBasicOptions = {},
abortSignal?: AbortSignal
): Promise<{
result: SnodeResponse;
txtResponse: string;
@ -176,7 +184,8 @@ export const sendViaOnion = async (
srvPubKey,
finalRelayOptions,
payloadObj,
defaultedOptions.requestNumber
defaultedOptions.requestNumber,
abortSignal
);
} catch (e) {
window.log.error('sendViaOnion - lokiRpcUtils error', e.code, e.message);
@ -184,13 +193,18 @@ export const sendViaOnion = async (
}
// RequestError return type is seen as number (as it is an enum)
if (typeof result === 'number') {
if (typeof result === 'string') {
if (result === RequestError.ABORTED) {
window.log.info('sendViaOnion aborted. not retrying');
return null;
}
const retriedResult = await handleSendViaOnionRetry(
result,
defaultedOptions,
srvPubKey,
url,
fetchOptions
fetchOptions,
abortSignal
);
// keep the await separate so we can log it easily
return retriedResult;

@ -5,11 +5,11 @@ import { Snode } from './snodePool';
import ByteBuffer from 'bytebuffer';
import { StringUtils } from '../utils';
import { OnionPaths } from '../onions';
import { toHex } from '../utils/String';
export enum RequestError {
BAD_PATH,
OTHER,
BAD_PATH = 'BAD_PATH',
OTHER = 'OTHER',
ABORTED = 'ABORTED',
}
/**
@ -253,10 +253,16 @@ const processOnionResponse = async (
reqIdx: number,
response: any,
sharedKey: ArrayBuffer,
debug: boolean
debug: boolean,
abortSignal?: AbortSignal
): Promise<SnodeResponse | RequestError> => {
const { log, libloki, dcodeIO, StringView } = window;
if (abortSignal?.aborted) {
log.warn(`(${reqIdx}) [path] Call aborted`);
return RequestError.ABORTED;
}
// FIXME: 401/500 handling?
// detect SNode is not ready (not in swarm; not done syncing)
@ -400,13 +406,9 @@ export type FinalDestinationOptions = {
* 1, 2, 3 = onion Snodes
*
*
* @param reqIdx
* @param nodePath the onion path to use to send the request
* @param destX25519Any
* @param finalDestOptions those are the options for the request from 3 to R. It contains for instance the payload and headers.
* @param finalRelayOptions those are the options 3 will use to make a request to R. It contains for instance the host to make the request to
* @param lsrpcIdx
* @returns
*/
const sendOnionRequest = async (
reqIdx: number,
@ -418,7 +420,8 @@ const sendOnionRequest = async (
body?: string;
},
finalRelayOptions?: FinalRelayOptions,
lsrpcIdx?: any
lsrpcIdx?: any,
abortSignal?: AbortSignal
): Promise<SnodeResponse | RequestError> => {
const { log, StringView } = window;
@ -503,6 +506,7 @@ const sendOnionRequest = async (
body: payload,
// we are talking to a snode...
agent: snodeHttpsAgent,
abortSignal,
};
const target = useV2 ? '/onion_req/v2' : '/onion_req';
@ -513,7 +517,13 @@ const sendOnionRequest = async (
const response = await insecureNodeFetch(guardUrl, guardFetchOptions);
return processOnionResponse(reqIdx, response, destCtx.symmetricKey, false);
return processOnionResponse(
reqIdx,
response,
destCtx.symmetricKey,
false,
abortSignal
);
};
async function sendOnionRequestSnodeDest(
@ -543,7 +553,8 @@ export async function sendOnionRequestLsrpcDest(
destX25519Any: string,
finalRelayOptions: FinalRelayOptions,
payloadObj: FinalDestinationOptions,
lsrpcIdx: number
lsrpcIdx: number,
abortSignal?: AbortSignal
): Promise<SnodeResponse | RequestError> {
return sendOnionRequest(
reqIdx,
@ -551,7 +562,8 @@ export async function sendOnionRequestLsrpcDest(
destX25519Any,
payloadObj,
finalRelayOptions,
lsrpcIdx
lsrpcIdx,
abortSignal
);
}
@ -602,6 +614,17 @@ export async function lokiOnionFetch(
)} to ${targetNode.ip}:${targetNode.port}`
);
return false;
} else if (result === RequestError.ABORTED) {
// could mean, fail to parse results
// or status code wasn't 200
// or can't decrypt
// it's not a bad_path, so we don't need to mark the path as bad
log.error(
`[path] sendOnionRequest gave aborted for path: ${getPathString(
path
)} to ${targetNode.ip}:${targetNode.port}`
);
return false;
} else {
return result;
}

Loading…
Cancel
Save