fix: auto retry forever to push group notification msgs

pull/3281/head
Audric Ackermann 3 months ago
parent 91a71f0604
commit a56cd3c577
No known key found for this signature in database

@ -103,7 +103,7 @@ const StyledMessageInfoContainer = styled.div`
`;
type MessageInfoProps = {
errors: Array<Error>;
errors?: string;
attachments: Array<PropsForAttachment>;
};
@ -151,10 +151,7 @@ async function getPropsForMessageInfo(
}
}
// This will make the error message for outgoing key errors a bit nicer
const errors = (found.get('errors') || []).map((error: any) => {
return error;
});
const errors = found.get('errors');
const toRet: MessageInfoProps = {
errors,

@ -1,5 +1,3 @@
import { isEmpty } from 'lodash';
import styled from 'styled-components';
import { MessageFrom } from '.';
import {
@ -117,7 +115,7 @@ const DebugMessageInfo = ({ messageId }: { messageId: string }) => {
);
};
export const MessageInfo = ({ messageId, errors }: { messageId: string; errors: Array<Error> }) => {
export const MessageInfo = ({ messageId, errors }: { messageId: string; errors?: string }) => {
const sender = useMessageSender(messageId);
const direction = useMessageDirection(messageId);
const sentAt = useMessageTimestamp(messageId);
@ -138,15 +136,6 @@ export const MessageInfo = ({ messageId, errors }: { messageId: string; errors:
formatStr: formatTimestampStr,
});
const hasError = !isEmpty(errors);
const errorString = hasError
? errors?.reduce((previous, current, currentIndex) => {
return `${previous}${current.message}${
errors.length > 1 && currentIndex < errors.length - 1 ? ', ' : ''
}`;
}, '')
: null;
return (
<Flex container={true} flexDirection="column">
<LabelWithInfo label={window.i18n('sent')} info={sentAtStr} />
@ -157,13 +146,13 @@ export const MessageInfo = ({ messageId, errors }: { messageId: string; errors:
) : null}
<SpacerSM />
<MessageFrom sender={sender} isSenderAdmin={isSenderAdmin} />
{hasError && (
{!!errors && (
<>
<SpacerSM />
<LabelWithInfo
title={window.i18n('helpReportABugExportLogsSaveToDesktopDescription')}
label={`${window.i18n('theError')}:`}
info={errorString || window.i18n('errorUnknown')}
info={errors || window.i18n('errorUnknown')}
dataColor={'var(--danger-color)'}
onClick={() => {
void saveLogToDesktop();

@ -616,6 +616,16 @@ async function findAllMessageHashesInConversationMatchingAuthor(
return msgAttrs.map((msg: any) => new MessageModel(msg));
}
async function fetchAllGroupUpdateFailedMessage(groupPk: GroupPubkeyType) {
const msgAttrs = await channels.fetchAllGroupUpdateFailedMessage(groupPk);
if (!msgAttrs || isEmpty(msgAttrs) || !isArray(msgAttrs)) {
return [];
}
return msgAttrs.map((msg: any) => new MessageModel(msg));
}
async function getMessagesBySentAt(sentAt: number): Promise<MessageCollection> {
const messages = await channels.getMessagesBySentAt(sentAt);
return new MessageCollection(messages);
@ -902,6 +912,7 @@ export const Data = {
findAllMessageFromSendersInConversation,
findAllMessageHashesInConversation,
findAllMessageHashesInConversationMatchingAuthor,
fetchAllGroupUpdateFailedMessage,
getMessagesBySentAt,
getExpiredMessages,
getOutgoingWithoutExpiresAt,

@ -56,6 +56,7 @@ const channelsToMake = new Set([
'findAllMessageFromSendersInConversation',
'findAllMessageHashesInConversation',
'findAllMessageHashesInConversationMatchingAuthor',
'fetchAllGroupUpdateFailedMessage',
'getMessageCount',
'filterAlreadyFetchedOpengroupMessage',
'getMessagesBySenderAndSentAt',

@ -841,7 +841,7 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
const error = new Error('Network is not available');
error.name = 'SendMessageNetworkError';
(error as any).number = this.id;
await messageModel.saveErrors([error]);
await messageModel.saveErrors(error);
await this.commit();
return;
@ -2204,8 +2204,10 @@ export class ConversationModel extends Backbone.Model<ConversationAttributes> {
}
throw new TypeError(`Invalid conversation type: '${this.get('type')}'`);
} catch (e) {
await message.saveErrors(e);
} catch (e: unknown) {
if (e instanceof Error) {
await message.saveErrors(e);
}
}
}

@ -3,7 +3,7 @@ import Backbone from 'backbone';
import autoBind from 'auto-bind';
import filesize from 'filesize';
import { GroupPubkeyType, PubkeyType } from 'libsession_util_nodejs';
import { cloneDeep, debounce, isEmpty, size as lodashSize, partition, pick, uniq } from 'lodash';
import { cloneDeep, debounce, isEmpty, size as lodashSize, uniq } from 'lodash';
import { SignalService } from '../protobuf';
import { ConvoHub } from '../session/conversations';
import { ContentMessage } from '../session/messages/outgoing';
@ -886,7 +886,7 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
return null;
}
this.set({ errors: null, sent: false, sent_to: [] });
this.set({ errors: undefined, sent: false, sent_to: [] });
await this.commit();
try {
const conversation: ConversationModel | undefined = this.getConversation();
@ -987,21 +987,14 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
message: closedGroupVisibleMessage,
namespace: SnodeNamespaces.LegacyClosedGroup,
});
} catch (e) {
await this.saveErrors(e);
} catch (e: unknown) {
if (e instanceof Error) {
await this.saveErrors(e);
}
return null;
}
}
public removeOutgoingErrors(number: string) {
const errors = partition(
this.get('errors'),
e => e.number === number && e.name === 'SendMessageNetworkError'
);
this.set({ errors: errors[1] });
return errors[0][0];
}
public getConversation(): ConversationModel | undefined {
// This needs to be an unsafe call, because this method is called during
// initial module setup. We may be in the middle of the initial fetch to
@ -1114,32 +1107,14 @@ export class MessageModel extends Backbone.Model<MessageAttributes> {
await this.commit();
}
public async saveErrors(providedErrors: any) {
let errors = providedErrors;
if (!(errors instanceof Array)) {
errors = [errors];
public async saveErrors(providedError: Error) {
if (!(providedError instanceof Error)) {
throw new Error('saveErrors expects a single error to be provided');
}
errors.forEach((e: any) => {
window?.log?.error(
'Message.saveErrors:',
e && e.reason ? e.reason : null,
e && e.stack ? e.stack : e
);
});
errors = errors.map((e: any) => {
if (
e.constructor === Error ||
e.constructor === TypeError ||
e.constructor === ReferenceError
) {
return pick(e, 'name', 'message', 'code', 'number', 'reason');
}
return e;
});
errors = errors.concat(this.get('errors') || []);
this.set({ errors });
const errorStr = `${providedError.name} - "${providedError.message || 'unknown error message'}"`;
this.set({ errors: errorStr });
await this.commit();
}

@ -44,7 +44,7 @@ export interface MessageAttributes {
groupInvitation?: { url: string | undefined; name: string } | undefined;
attachments?: any;
conversationId: string;
errors?: any;
errors?: string;
flags?: number;
hasAttachments: 1 | 0;
hasFileAttachments: 1 | 0;

@ -108,6 +108,7 @@ const LOKI_SCHEMA_VERSIONS = [
updateToSessionSchemaVersion37,
updateToSessionSchemaVersion38,
updateToSessionSchemaVersion39,
updateToSessionSchemaVersion40,
];
function updateToSessionSchemaVersion1(currentVersion: number, db: BetterSqlite3.Database) {
@ -2025,6 +2026,27 @@ function updateToSessionSchemaVersion39(currentVersion: number, db: BetterSqlite
console.log(`updateToSessionSchemaVersion${targetVersion}: success!`);
}
function updateToSessionSchemaVersion40(currentVersion: number, db: BetterSqlite3.Database) {
const targetVersion = 40;
if (currentVersion >= targetVersion) {
return;
}
console.log(`updateToSessionSchemaVersion${targetVersion}: starting...`);
db.transaction(() => {
// the 'errors' field used to do different things. We need to extract it to be a single string.
db.exec(`ALTER TABLE ${MESSAGES_TABLE} ADD COLUMN errors TEXT;`);
db.exec(`UPDATE ${MESSAGES_TABLE}
SET json = json_remove(json, '$.errors');
`);
writeSessionSchemaVersion(targetVersion, db);
})();
console.log(`updateToSessionSchemaVersion${targetVersion}: success!`);
}
export function printTableColumns(table: string, db: BetterSqlite3.Database) {
console.info(db.pragma(`table_info('${table}');`));
}

@ -851,6 +851,7 @@ function saveMessage(data: MessageAttributes) {
expirationStartTimestamp,
flags,
messageHash,
errors,
} = data;
if (!id) {
@ -884,6 +885,7 @@ function saveMessage(data: MessageAttributes) {
unread,
flags: flags ?? 0,
messageHash,
errors,
};
assertGlobalInstance()
@ -909,7 +911,8 @@ function saveMessage(data: MessageAttributes) {
type,
unread,
flags,
messageHash
messageHash,
errors
) values (
$id,
$json,
@ -931,7 +934,8 @@ function saveMessage(data: MessageAttributes) {
$type,
$unread,
$flags,
$messageHash
$messageHash,
$errors
);`
)
.run(payload);
@ -1191,6 +1195,29 @@ function findAllMessageHashesInConversationMatchingAuthor(
return map(rows, row => jsonToObject(row.json));
}
function fetchAllGroupUpdateFailedMessage(
groupPk: GroupPubkeyType,
instance?: BetterSqlite3.Database
) {
if (!groupPk) {
return [];
}
const rows = assertGlobalInstanceOrInstance(instance)
.prepare(
`SELECT json FROM ${MESSAGES_TABLE} WHERE conversationId = ? AND JSON_EXTRACT(json, '$.group_update') IS NOT NULL AND errors IS NOT NULL;`
)
.all(groupPk);
if (!rows || isEmpty(rows)) {
return [];
}
const objs = map(rows, row => jsonToObject(row.json)).filter(m => {
return !isEmpty(m);
});
return objs;
}
function cleanUpExpirationTimerUpdateHistory(
conversationId: string,
isPrivate: boolean,
@ -2730,6 +2757,7 @@ export const sqlNode = {
findAllMessageFromSendersInConversation,
findAllMessageHashesInConversation,
findAllMessageHashesInConversationMatchingAuthor,
fetchAllGroupUpdateFailedMessage,
getUnreadByConversation,
getUnreadDisappearingByConversation,
markAllAsReadByConversationNoExpiration,

@ -312,7 +312,7 @@ async function handleRegularMessage(
body: rawDataMessage.body,
conversationId: conversation.id,
messageHash,
errors: [],
errors: undefined,
});
const serverTimestamp = message.get('serverTimestamp');

@ -1384,3 +1384,18 @@ export type BatchStoreWithExtraParams =
| DeleteHashesFromUserNodeSubRequest
| SubaccountRevokeSubRequest
| SubaccountUnrevokeSubRequest;
export type StoreUserInitiatedMessage =
| StoreGroupMessageSubRequest
| StoreLegacyGroupMessageSubRequest
| StoreUserMessageSubRequest;
export function isStoreUserInitiatedMessage(
request: SnodeAPISubRequest<string>
): request is StoreUserInitiatedMessage {
return (
request instanceof StoreGroupMessageSubRequest ||
request instanceof StoreLegacyGroupMessageSubRequest ||
request instanceof StoreUserMessageSubRequest
);
}

@ -16,6 +16,7 @@ import {
DeleteAllFromGroupMsgNodeSubRequest,
DeleteHashesFromGroupNodeSubRequest,
DeleteHashesFromUserNodeSubRequest,
isStoreUserInitiatedMessage,
MethodBatchType,
RawSnodeSubRequests,
StoreGroupInfoSubRequest,
@ -559,6 +560,18 @@ async function sendEncryptedDataToSnode<T extends GroupPubkeyType | PubkeyType>(
return batchResults;
} catch (e) {
window.log.warn(`sendEncryptedDataToSnode failed with ${e.message}`);
const sortedSubRequestsWithMsg = sortedSubRequests.filter(r => isStoreUserInitiatedMessage(r));
for (let index = 0; index < sortedSubRequestsWithMsg.length; index++) {
const request = sortedSubRequestsWithMsg[index];
if (request.dbMessageIdentifier) {
// eslint-disable-next-line no-await-in-loop
await MessageSentHandler.handleSwarmMessageSentFailure(
{ device: destination, identifier: request.dbMessageIdentifier },
e
);
}
}
return null;
}
}
@ -660,11 +673,7 @@ async function handleBatchResultWithSubRequests({
// there are some things we need to do when storing messages
// for groups/legacy groups or user (but not for config messages)
if (
subRequest instanceof StoreGroupMessageSubRequest ||
subRequest instanceof StoreLegacyGroupMessageSubRequest ||
subRequest instanceof StoreUserMessageSubRequest
) {
if (isStoreUserInitiatedMessage(subRequest)) {
const storedAt = batchResult?.[index]?.body?.t;
const storedHash = batchResult?.[index]?.body?.hash;
const subRequestStatusCode = batchResult?.[index]?.code;

@ -143,6 +143,7 @@ async function handleSwarmMessageSentSuccess(
sent_to: sentTo,
sent: true,
sent_at: effectiveTimestamp,
errors: undefined,
});
DisappearingMessages.checkForExpiringOutgoingMessage(fetchedMessage, 'handleMessageSentSuccess');

@ -13,6 +13,7 @@ import {
import {
DeleteAllFromGroupMsgNodeSubRequest,
DeleteHashesFromGroupNodeSubRequest,
MAX_SUBREQUESTS_COUNT,
StoreGroupKeysSubRequest,
StoreGroupMessageSubRequest,
SubaccountRevokeSubRequest,
@ -38,6 +39,13 @@ import {
import { DURATION } from '../../../constants';
import { WithAllow401s } from '../../../types/with';
import type { WithTimeoutMs } from '../../../apis/snode_api/requestWith';
import { Data } from '../../../../data/data';
import { GroupUpdateInfoChangeMessage } from '../../../messages/outgoing/controlMessage/group_v2/to_group/GroupUpdateInfoChangeMessage';
import { NetworkTime } from '../../../../util/NetworkTime';
import { SignalService } from '../../../../protobuf';
import { getSodiumRenderer } from '../../../crypto';
import { DisappearingMessages } from '../../../disappearing_messages';
import { GroupUpdateMemberChangeMessage } from '../../../messages/outgoing/controlMessage/group_v2/to_group/GroupUpdateMemberChangeMessage';
const defaultMsBetweenRetries = 15000; // a long time between retries, to avoid running multiple jobs at the same time, when one was postponed at the same time as one already planned (5s)
const defaultMaxAttempts = 2;
@ -223,6 +231,111 @@ async function pushChangesToGroupSwarmIfNeeded({
return RunJobResult.Success;
}
async function allFailedToSentGroupControlMessagesToRetry(groupPk: GroupPubkeyType) {
try {
const sodium = await getSodiumRenderer();
const msgsToResend = await Data.fetchAllGroupUpdateFailedMessage(groupPk);
const firstChunk = msgsToResend.slice(0, Math.floor(MAX_SUBREQUESTS_COUNT / 2));
const convo = ConvoHub.use().get(groupPk);
if (!convo) {
throw new Error('allFailedToSentGroupControlMessagesToRetry: convo not found');
}
const group = await UserGroupsWrapperActions.getGroup(groupPk);
if (!group || !group.secretKey || isEmpty(group.secretKey)) {
throw new Error('allFailedToSentGroupControlMessagesToRetry: group secret key is not found');
}
const secretKey = group.secretKey;
const extraStoreRequests = await StoreGroupRequestFactory.makeGroupMessageSubRequest(
firstChunk.map(m => {
const groupUpdate = m.get('group_update');
const createAtNetworkTimestamp = m.get('sent_at') || NetworkTime.now();
const identifier = m.get('id');
if (!group.secretKey) {
return null;
}
const shared = {
groupPk,
identifier,
createAtNetworkTimestamp,
secretKey,
sodium,
...DisappearingMessages.getExpireDetailsForOutgoingMessage(
convo,
createAtNetworkTimestamp
),
};
if (groupUpdate?.avatarChange) {
return new GroupUpdateInfoChangeMessage({
typeOfChange: SignalService.GroupUpdateInfoChangeMessage.Type.AVATAR,
...shared,
});
}
if (groupUpdate?.name) {
return new GroupUpdateInfoChangeMessage({
typeOfChange: SignalService.GroupUpdateInfoChangeMessage.Type.NAME,
updatedName: groupUpdate.name || '',
...shared,
});
}
if (groupUpdate?.joined?.length) {
return new GroupUpdateMemberChangeMessage({
typeOfChange: 'added',
added: groupUpdate.joined,
...shared,
});
}
if (groupUpdate?.joinedWithHistory?.length) {
return new GroupUpdateMemberChangeMessage({
typeOfChange: 'addedWithHistory',
added: groupUpdate.joinedWithHistory,
...shared,
});
}
if (groupUpdate?.kicked?.length) {
return new GroupUpdateMemberChangeMessage({
typeOfChange: 'removed',
removed: groupUpdate.kicked,
...shared,
});
}
if (groupUpdate?.promoted?.length) {
return new GroupUpdateMemberChangeMessage({
typeOfChange: 'promoted',
promoted: groupUpdate.promoted,
...shared,
});
}
const expirationTimerUpdate = m.get('expirationTimerUpdate');
if (expirationTimerUpdate) {
return new GroupUpdateInfoChangeMessage({
typeOfChange: SignalService.GroupUpdateInfoChangeMessage.Type.DISAPPEARING_MESSAGES,
...shared,
updatedExpirationSeconds: expirationTimerUpdate.expireTimer,
expirationType: expirationTimerUpdate.expirationType || 'unknown',
});
}
window.log.warn(
`allFailedToSentGroupControlMessagesToRetry unhandled result for ms ${shared.identifier}`
);
return null;
}),
group
);
if (!extraStoreRequests.length) {
return;
}
await pushChangesToGroupSwarmIfNeeded({
groupPk,
extraStoreRequests,
allow401s: false,
});
} catch (e) {
window.log.warn('failed');
}
}
class GroupSyncJob extends PersistedJob<GroupSyncPersistedData> {
constructor({
identifier, // this has to be the group's pubkey
@ -263,6 +376,8 @@ class GroupSyncJob extends PersistedJob<GroupSyncPersistedData> {
return RunJobResult.PermanentFailure;
}
await allFailedToSentGroupControlMessagesToRetry(thisJobDestination);
// return await so we catch exceptions in here
return await GroupSync.pushChangesToGroupSwarmIfNeeded({
groupPk: thisJobDestination,

Loading…
Cancel
Save