You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
125 lines
3.8 KiB
TypeScript
125 lines
3.8 KiB
TypeScript
|
2 years ago
|
/* eslint-disable no-await-in-loop */
|
||
|
|
import { compact, isEmpty, isNumber, uniq } from 'lodash';
|
||
|
|
import { v4 } from 'uuid';
|
||
|
|
import { Data } from '../../../../data/data';
|
||
|
|
import { MessageModel } from '../../../../models/message';
|
||
|
|
import { isSignInByLinking } from '../../../../util/storage';
|
||
|
|
import { getExpiriesFromSnode } from '../../../apis/snode_api/getExpiriesRequest';
|
||
|
|
import { DisappearingMessages } from '../../../disappearing_messages';
|
||
|
|
import { runners } from '../JobRunner';
|
||
|
|
import {
|
||
|
|
AddJobCheckReturn,
|
||
|
|
FetchMsgExpirySwarmPersistedData,
|
||
|
|
PersistedJob,
|
||
|
|
RunJobResult,
|
||
|
|
} from '../PersistedJob';
|
||
|
|
|
||
|
|
/**
 * Persisted job which asks the snode for the expiry of a set of messages
 * (looked up through their message hashes) and writes any differing value
 * back to the `expires_at` attribute of the corresponding message models.
 */
class FetchMsgExpirySwarmJob extends PersistedJob<FetchMsgExpirySwarmPersistedData> {
  /**
   * `msgIds` is required and is de-duplicated before being persisted.
   * The remaining fields are optional and fall back to: a fresh v4 uuid for
   * `identifier`, 2 for `maxAttempts`, 0 for `currentRetry` and `Date.now()`
   * for `nextAttemptTimestamp`.
   */
  constructor({
    identifier,
    nextAttemptTimestamp,
    maxAttempts,
    currentRetry,
    msgIds,
  }: Partial<
    Pick<
      FetchMsgExpirySwarmPersistedData,
      'identifier' | 'nextAttemptTimestamp' | 'currentRetry' | 'maxAttempts'
    >
  > &
    Pick<FetchMsgExpirySwarmPersistedData, 'msgIds'>) {
    super({
      jobType: 'FetchMsgExpirySwarmJobType',
      identifier: identifier || v4(),
      delayBetweenRetries: 2000, // presumably milliseconds - confirm against PersistedJob
      maxAttempts: isNumber(maxAttempts) ? maxAttempts : 2,
      currentRetry: isNumber(currentRetry) ? currentRetry : 0,
      nextAttemptTimestamp: nextAttemptTimestamp || Date.now(),
      msgIds: uniq(msgIds),
    });
  }

  /**
   * Fetches the expiries of the tracked messages and commits every model
   * whose `expires_at` differs from the fetched value, then triggers
   * `DisappearingMessages.destroyExpiredMessages()`.
   *
   * Every path that does not throw resolves to `RunJobResult.Success`; the
   * elapsed time is logged at debug level whether or not an error is thrown.
   */
  public async run(): Promise<RunJobResult> {
    const startedAt = Date.now();

    try {
      // nothing tracked by this job: nothing to do
      if (!this.persistedData.msgIds || isEmpty(this.persistedData.msgIds)) {
        return RunJobResult.Success;
      }

      const initialModels = await Data.getMessagesById(this.persistedData.msgIds);
      // models without a message hash are dropped from the request
      const messageHashes = compact(initialModels.map(model => model.getMessageHash()));

      if (isEmpty(initialModels) || isEmpty(messageHashes)) {
        return RunJobResult.Success;
      }

      const fetchedExpiries = await getExpiriesFromSnode({ messageHashes });
      const modelsToCommit: Array<MessageModel> = [];

      if (fetchedExpiries.length) {
        // reload the models so we work on fresh attributes after the network call
        const freshModels = await Data.getMessagesById(this.persistedData.msgIds);

        for (const { messageHash, fetchedExpiry } of fetchedExpiries) {
          // only strictly positive expiries are applied
          if (fetchedExpiry <= 0) {
            continue;
          }

          const match = freshModels.find(model => model.getMessageHash() === messageHash);

          // unknown hashes and already up-to-date models are left untouched
          if (match && match.get('expires_at') !== fetchedExpiry) {
            match.set('expires_at', fetchedExpiry);
            modelsToCommit.push(match);
          }
        }
      }

      await Promise.all(modelsToCommit.map(model => model.commit()));
      await DisappearingMessages.destroyExpiredMessages();

      return RunJobResult.Success;
    } finally {
      window.log.debug(`FetchMsgExpirySwarmJob run() took ${Date.now() - startedAt}ms`);
    }
  }

  /** Returns the persisted data exactly as produced by the parent `serializeBase()`. */
  public serializeJob(): FetchMsgExpirySwarmPersistedData {
    return super.serializeBase();
  }

  /** Delegates the "can this job be added" decision to `addJobCheckEveryMsgIdsAlreadyPresent`. */
  public addJobCheck(jobs: Array<FetchMsgExpirySwarmPersistedData>): AddJobCheckReturn {
    return this.addJobCheckEveryMsgIdsAlreadyPresent(jobs);
  }

  /** This job never asks for other non-running jobs to be removed. */
  public nonRunningJobsToRemove(_jobs: Array<FetchMsgExpirySwarmPersistedData>) {
    return [];
  }

  /** Timeout of a single run of this job, in milliseconds. */
  public getJobTimeoutMs(): number {
    return 20000;
  }
}
|
||
|
|
|
||
|
|
/**
 * Schedules a `FetchMsgExpirySwarmJob` for the given message ids, with its
 * first attempt set one second (1000ms) from now.
 *
 * Nothing is scheduled when a device is currently being linked (this is
 * logged at info level) or when `msgIds` is empty.
 */
async function queueNewJobIfNeeded(msgIds: Array<string>) {
  if (isSignInByLinking()) {
    window.log.info('NOT Scheduling FetchMsgExpirySwarmJob: as we are linking a device');

    return;
  }

  if (isEmpty(msgIds)) {
    return;
  }

  const expiryRunner = runners.fetchSwarmMsgExpiryRunner;
  const firstAttemptAt = Date.now() + 1000;
  const job = new FetchMsgExpirySwarmJob({ nextAttemptTimestamp: firstAttemptAt, msgIds });

  await expiryRunner.addJob(job);
}
|
||
|
|
|
||
|
|
/** Exported surface of this module: the job class and the helper that queues it. */
export const FetchMsgExpirySwarm = { FetchMsgExpirySwarmJob, queueNewJobIfNeeded };
|