fix bug fetching new token every request

pull/1576/head
Audric Ackermann 4 years ago
parent 40793eb74d
commit 35d66d8865
No known key found for this signature in database
GPG Key ID: 999F434D76324AD4

@ -693,7 +693,10 @@ export async function _removeMessages(ids: Array<string>): Promise<void> {
await channels.removeMessage(ids);
}
export async function getMessageIdsFromServerIds(serverIds: Array<string>, conversationId: string) {
export async function getMessageIdsFromServerIds(
serverIds: Array<string> | Array<number>,
conversationId: string
): Promise<Array<string> | undefined> {
return channels.getMessageIdsFromServerIds(serverIds, conversationId);
}

@ -75,11 +75,11 @@ export const setCachedModerators = (
roomId: string,
newModerators: Array<string>
) => {
const allRoomsMods = cachedModerators.get(serverUrl);
let allRoomsMods = cachedModerators.get(serverUrl);
if (!allRoomsMods) {
cachedModerators.set(serverUrl, new Map());
allRoomsMods = cachedModerators.get(serverUrl);
}
// tslint:disable: no-non-null-assertion
if (!allRoomsMods!.get(roomId)) {
allRoomsMods!.set(roomId, new Set());
}

@ -236,11 +236,12 @@ export async function getAuthToken({
return null;
}
if (roomDetails?.token) {
return roomDetails?.token;
return roomDetails.token;
}
await allowOnlyOneAtATime(`getAuthTokenV2${serverUrl}:${roomId}`, async () => {
try {
console.warn('TRIGGERING NEW AUTH TOKEN WITH', { serverUrl, roomId });
const token = await requestNewAuthToken({ serverUrl, roomId });
if (!token) {
window.log.warn('invalid new auth token', token);
@ -371,33 +372,33 @@ export const postMessage = async (
};
/** Those functions are related to moderators management */
export const getModerators = async ({
serverUrl,
roomId,
}: OpenGroupRequestCommonType): Promise<Array<string>> => {
const request: OpenGroupV2Request = {
method: 'GET',
room: roomId,
server: serverUrl,
isAuthRequired: true,
endpoint: 'moderators',
};
const result = await sendOpenGroupV2Request(request);
const statusCode = parseStatusCodeFromOnionRequest(result);
if (statusCode !== 200) {
window.log.error(`Could not getModerators, status code: ${statusCode}`);
return [];
}
const moderators = parseModerators(result);
if (moderators === undefined) {
// if moderators is undefined, do not update the cached moderator list
window.log.warn('Could not getModerators, got no moderatorsGot at all in json.');
return [];
}
setCachedModerators(serverUrl, roomId, moderators || []);
return moderators || [];
};
// export const getModerators = async ({
// serverUrl,
// roomId,
// }: OpenGroupRequestCommonType): Promise<Array<string>> => {
// const request: OpenGroupV2Request = {
// method: 'GET',
// room: roomId,
// server: serverUrl,
// isAuthRequired: true,
// endpoint: 'moderators',
// };
// const result = await sendOpenGroupV2Request(request);
// const statusCode = parseStatusCodeFromOnionRequest(result);
// if (statusCode !== 200) {
// window.log.error(`Could not getModerators, status code: ${statusCode}`);
// return [];
// }
// const moderators = parseModerators(result);
// if (moderators === undefined) {
// // if moderators is undefined, do not update t+++++++++++++++++++++++++he cached moderator list
// window.log.warn('Could not getModerators, got no moderatorsGot at all in json.');
// return [];
// }
// setCachedModerators(serverUrl, roomId, moderators || []);
// return moderators || [];
// };
export const isUserModerator = (
publicKey: string,

@ -3,6 +3,7 @@ import {
OpenGroupRequestCommonType,
OpenGroupV2CompactPollRequest,
parseMessages,
setCachedModerators,
} from './ApiUtil';
import { parseStatusCodeFromOnionRequest } from './OpenGroupAPIV2Parser';
import _ from 'lodash';
@ -132,7 +133,7 @@ async function sendOpenGroupV2RequestCompactPoll(
return null;
}
const results = await parseCompactPollResults(res);
const results = await parseCompactPollResults(res, serverUrl);
if (!results) {
window.log.info('got empty compactPollResults');
return null;
@ -144,6 +145,7 @@ async function sendOpenGroupV2RequestCompactPoll(
const roomPollValidResults = results.filter(ret => ret.statusCode === 200);
if (roomWithTokensToRefresh) {
console.warn('roomWithTokensToRefresh', roomWithTokensToRefresh);
await Promise.all(
roomWithTokensToRefresh.map(async roomId => {
const roomDetails = await getV2OpenGroupRoomByRoomId({
@ -157,7 +159,7 @@ async function sendOpenGroupV2RequestCompactPoll(
// we might need to retry doing the request here, but how to make sure we don't retry indefinetely?
await saveV2OpenGroupRoom(roomDetails);
// do not await for that. We have a only one at a time logic on a per room basis
void getAuthToken({ serverUrl, roomId });
await getAuthToken({ serverUrl, roomId });
})
);
}
@ -165,7 +167,7 @@ async function sendOpenGroupV2RequestCompactPoll(
return roomPollValidResults;
}
type ParsedRoomCompactPollResults = {
export type ParsedRoomCompactPollResults = {
roomId: string;
deletions: Array<number>;
messages: Array<OpenGroupMessageV2>;
@ -174,7 +176,8 @@ type ParsedRoomCompactPollResults = {
};
const parseCompactPollResult = async (
singleRoomResult: any
singleRoomResult: any,
serverUrl: string
): Promise<ParsedRoomCompactPollResults | null> => {
const {
room_id,
@ -196,9 +199,10 @@ const parseCompactPollResult = async (
}
const validMessages = await parseMessages(rawMessages);
const moderators = rawMods as Array<string>;
const moderators = rawMods.sort() as Array<string>;
const deletions = rawDeletions as Array<number>;
const statusCode = rawStatusCode as number;
setCachedModerators(serverUrl, room_id, moderators || []);
return {
roomId: room_id,
@ -210,7 +214,8 @@ const parseCompactPollResult = async (
};
const parseCompactPollResults = async (
res: any
res: any,
serverUrl: string
): Promise<Array<ParsedRoomCompactPollResults> | null> => {
if (!res || !res.result || !res.result.results || !res.result.results.length) {
return null;
@ -218,7 +223,11 @@ const parseCompactPollResults = async (
const arrayOfResults = res.result.results as Array<any>;
const parsedResults: Array<ParsedRoomCompactPollResults> = _.compact(
await Promise.all(arrayOfResults.map(parseCompactPollResult))
await Promise.all(
arrayOfResults.map(async m => {
return parseCompactPollResult(m, serverUrl);
})
)
);
if (!parsedResults || !parsedResults.length) {

@ -88,7 +88,6 @@ export class OpenGroupManagerV2 {
public removeRoomFromPolledRooms(roomInfos: OpenGroupRequestCommonType) {
const poller = this.pollers.get(roomInfos.serverUrl);
if (!poller) {
console.warn('No such poller');
return;
}
// this won't do a thing if the room is already polled for

@ -1,6 +1,18 @@
import { AbortController } from 'abort-controller';
import { ConversationController } from '../../session/conversations';
import { getOpenGroupV2ConversationId } from '../utils/OpenGroupUtils';
import { OpenGroupRequestCommonType } from './ApiUtil';
import { compactFetchEverything } from './OpenGroupAPIV2CompactPoll';
import { compactFetchEverything, ParsedRoomCompactPollResults } from './OpenGroupAPIV2CompactPoll';
import _ from 'lodash';
import { ConversationModel } from '../../models/conversation';
import { getMessageIdsFromServerIds, removeMessage } from '../../data/data';
import {
getV2OpenGroupRoom,
getV2OpenGroupRoomByRoomId,
saveV2OpenGroupRoom,
} from '../../data/opengroups';
import { OpenGroupMessageV2 } from './OpenGroupMessageV2';
import { handleOpenGroupV2Message } from '../../receiver/receiver';
const pollForEverythingInterval = 4 * 1000;
/**
@ -146,6 +158,7 @@ export class OpenGroupServerPoller {
window.log.warn(`compactFetchResults for ${this.serverUrl}:`, compactFetchResults);
// ==> At this point all those results need to trigger conversation updates, so update what we have to update
await handleCompactPollResults(this.serverUrl, compactFetchResults);
} catch (e) {
window.log.warn('Got error while compact fetch:', e);
} finally {
@ -153,3 +166,99 @@ export class OpenGroupServerPoller {
}
}
}
const handleDeletions = async (
deletedIds: Array<number>,
conversationId: string,
convo?: ConversationModel
) => {
try {
const maxDeletedId = Math.max(...deletedIds);
const messageIds = await getMessageIdsFromServerIds(deletedIds, conversationId);
if (!messageIds?.length) {
return;
}
const roomInfos = await getV2OpenGroupRoom(conversationId);
if (roomInfos && roomInfos.lastMessageDeletedServerID !== maxDeletedId) {
roomInfos.lastMessageDeletedServerID = maxDeletedId;
await saveV2OpenGroupRoom(roomInfos);
}
await Promise.all(
messageIds.map(async id => {
if (convo) {
await convo.removeMessage(id);
}
await removeMessage(id);
})
);
// we want to try to update our lastDeletedId
} catch (e) {
window.log.warn('handleDeletions failed:', e);
}
};
const handleNewMessages = async (
newMessages: Array<OpenGroupMessageV2>,
conversationId: string,
convo?: ConversationModel
) => {
try {
const incomingMessageIds = _.compact(newMessages.map(n => n.serverId));
const maxNewMessageId = Math.max(...incomingMessageIds);
// TODO filter out duplicates ?
// tslint:disable-next-line: prefer-for-of
for (let index = 0; index < newMessages.length; index++) {
const newMessage = newMessages[index];
await handleOpenGroupV2Message(newMessage);
}
const roomInfos = await getV2OpenGroupRoom(conversationId);
if (roomInfos && roomInfos.lastMessageFetchedServerID !== maxNewMessageId) {
roomInfos.lastMessageFetchedServerID = maxNewMessageId;
await saveV2OpenGroupRoom(roomInfos);
}
} catch (e) {
// window.log.warn('handleNewMessages failed:', e);
}
};
const handleCompactPollResults = async (
serverUrl: string,
results: Array<ParsedRoomCompactPollResults>
) => {
await Promise.all(
results.map(async res => {
const convoId = getOpenGroupV2ConversationId(serverUrl, res.roomId);
const convo = ConversationController.getInstance().get(convoId);
// we want to do deletions even if we somehow lost the convo.
if (res.deletions.length) {
// new deletions
await handleDeletions(res.deletions, convoId, convo);
}
if (res.messages.length) {
// new deletions
await handleNewMessages(res.messages, convoId, convo);
}
if (!convo) {
window.log.warn('Could not find convo for compactPoll', convoId);
return;
}
const existingModerators = convo.get('moderators') || [];
let changeOnConvo = false;
// res.moderators is already sorted
if (!_.isEqual(existingModerators.sort(), res.moderators)) {
convo.set({ moderators: res.moderators });
changeOnConvo = true;
}
if (changeOnConvo) {
await convo.commit();
}
})
);
};

@ -291,3 +291,35 @@ export async function handlePublicMessage(messageData: any) {
await handleMessageEvent(ev); // open groups
}
export async function handleOpenGroupV2Message(messageData: any) {
const { source } = messageData;
const { group, profile, profileKey } = messageData.message;
const isMe = UserUtils.isUsFromCache(source);
if (!isMe && profile) {
const conversation = await ConversationController.getInstance().getOrCreateAndWait(
source,
ConversationType.PRIVATE
);
await updateProfile(conversation, profile, profileKey);
}
const isPublicVisibleMessage = group && group.id && !!group.id.match(openGroupPrefixRegex);
if (!isPublicVisibleMessage) {
throw new Error('handlePublicMessage Should only be called with public message groups');
}
const ev = {
// Public chat messages from ourselves should be outgoing
type: isMe ? 'sent' : 'message',
data: messageData,
confirm: () => {
/* do nothing */
},
};
await handleMessageEvent(ev); // open groups
}

@ -14,8 +14,12 @@ export class TaskTimedOutError extends Error {
}
// one action resolves all
const snodeGlobalLocks: any = {};
export async function allowOnlyOneAtATime(name: string, process: any, timeoutMs?: number) {
const snodeGlobalLocks: Record<string, Promise<any>> = {};
export async function allowOnlyOneAtATime(
name: string,
process: () => Promise<any>,
timeoutMs?: number
) {
// if currently not in progress
if (snodeGlobalLocks[name] === undefined) {
// set lock
@ -64,6 +68,7 @@ export async function allowOnlyOneAtATime(name: string, process: any, timeoutMs?
// release the kraken
resolve(innerRetVal);
});
} else {
}
return snodeGlobalLocks[name];
}

Loading…
Cancel
Save