Add synced expiries job

pull/1014/head
charles 2 years ago
parent 8a51c8882c
commit 065417ebbb

@ -201,6 +201,11 @@ class DatabaseAttachmentProvider(context: Context, helper: SQLCipherOpenHelper)
return messageDB.getMessageServerHash(messageID) return messageDB.getMessageServerHash(messageID)
} }
override fun getServerHashForMessages(messageIDs: List<Long>): List<Pair<Long, String?>> {
val messageDB = DatabaseComponent.get(context).lokiMessageDatabase()
return messageDB.getMessageServerHashes(messageIDs)
}
override fun getDatabaseAttachment(attachmentId: Long): DatabaseAttachment? { override fun getDatabaseAttachment(attachmentId: Long): DatabaseAttachment? {
val attachmentDatabase = DatabaseComponent.get(context).attachmentDatabase() val attachmentDatabase = DatabaseComponent.get(context).attachmentDatabase()
return attachmentDatabase.getAttachment(AttachmentId(attachmentId, 0)) return attachmentDatabase.getAttachment(AttachmentId(attachmentId, 0))

@ -165,6 +165,13 @@ class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Datab
} }
} }
fun getMessageServerHashes(messageIDs: List<Long>): List<Pair<Long, String?>> {
val database = databaseHelper.readableDatabase
return database.getAll(messageHashTable, "${Companion.messageID} IN (?)", arrayOf(messageIDs.joinToString(","))) { cursor ->
cursor.getLong(messageID) to cursor.getStringOrNull(serverHash)
}
}
fun setMessageServerHash(messageID: Long, serverHash: String) { fun setMessageServerHash(messageID: Long, serverHash: String) {
val database = databaseHelper.writableDatabase val database = databaseHelper.writableDatabase
val contentValues = ContentValues(2) val contentValues = ContentValues(2)

@ -288,6 +288,12 @@ class MmsDatabase(context: Context, databaseHelper: SQLCipherOpenHelper) : Messa
return readerFor(rawQuery(where, null))!! return readerFor(rawQuery(where, null))!!
} }
val expireNotStartedMessages: Reader
get() {
val where = "$EXPIRES_IN > 0 AND $EXPIRE_STARTED = 0"
return readerFor(rawQuery(where, null))!!
}
private fun updateMailboxBitmask( private fun updateMailboxBitmask(
id: Long, id: Long,
maskOff: Long, maskOff: Long,

@ -50,6 +50,7 @@ import org.thoughtcrime.securesms.database.model.ReactionRecord;
import org.thoughtcrime.securesms.database.model.SmsMessageRecord; import org.thoughtcrime.securesms.database.model.SmsMessageRecord;
import org.thoughtcrime.securesms.dependencies.DatabaseComponent; import org.thoughtcrime.securesms.dependencies.DatabaseComponent;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.Collections; import java.util.Collections;
@ -567,6 +568,11 @@ public class SmsDatabase extends MessagingDatabase {
return rawQuery(where, null); return rawQuery(where, null);
} }
public Cursor getExpirationNotStartedMessages() {
String where = EXPIRES_IN + " > 0 AND " + EXPIRE_STARTED + " = 0";
return rawQuery(where, null);
}
public SmsMessageRecord getMessage(long messageId) throws NoSuchMessageException { public SmsMessageRecord getMessage(long messageId) throws NoSuchMessageException {
Cursor cursor = rawQuery(ID_WHERE, new String[]{messageId + ""}); Cursor cursor = rawQuery(ID_WHERE, new String[]{messageId + ""});
Reader reader = new Reader(cursor); Reader reader = new Reader(cursor);
@ -740,7 +746,7 @@ public class SmsDatabase extends MessagingDatabase {
} }
} }
public class Reader { public class Reader implements Closeable {
private final Cursor cursor; private final Cursor cursor;
@ -805,8 +811,11 @@ public class SmsDatabase extends MessagingDatabase {
return new LinkedList<>(); return new LinkedList<>();
} }
@Override
public void close() { public void close() {
cursor.close(); if (cursor != null) {
cursor.close();
}
} }
} }

@ -983,8 +983,25 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
DatabaseComponent.get(context).expirationConfigurationDatabase().setExpirationConfiguration(config) DatabaseComponent.get(context).expirationConfigurationDatabase().setExpirationConfiguration(config)
} }
override fun getExpiringMessages(messageIds: LongArray): List<Pair<String, Int>> { override fun getExpiringMessages(messageIds: List<Long>): List<Pair<Long, Long>> {
return emptyList() val expiringMessages = mutableListOf<Pair<Long, Long>>()
val smsDb = DatabaseComponent.get(context).smsDatabase()
smsDb.readerFor(smsDb.expirationNotStartedMessages).use { reader ->
while (reader.next != null) {
if (reader.current.id in messageIds) {
expiringMessages.add(reader.current.id to reader.current.expiresIn)
}
}
}
val mmsDb = DatabaseComponent.get(context).mmsDatabase()
mmsDb.expireNotStartedMessages.use { reader ->
while (reader.next != null) {
if (reader.current.id in messageIds) {
expiringMessages.add(reader.current.id to reader.current.expiresIn)
}
}
}
return expiringMessages
} }
override fun updateDisappearingState(address: String, disappearingState: Recipient.DisappearingState) { override fun updateDisappearingState(address: String, disappearingState: Recipient.DisappearingState) {

@ -23,6 +23,7 @@ interface MessageDataProvider {
fun deleteMessage(messageID: Long, isSms: Boolean) fun deleteMessage(messageID: Long, isSms: Boolean)
fun updateMessageAsDeleted(timestamp: Long, author: String) fun updateMessageAsDeleted(timestamp: Long, author: String)
fun getServerHashForMessage(messageID: Long): String? fun getServerHashForMessage(messageID: Long): String?
fun getServerHashForMessages(messageIDs: List<Long>): List<Pair<Long, String?>>
fun getDatabaseAttachment(attachmentId: Long): DatabaseAttachment? fun getDatabaseAttachment(attachmentId: Long): DatabaseAttachment?
fun getAttachmentStream(attachmentId: Long): SessionServiceAttachmentStream? fun getAttachmentStream(attachmentId: Long): SessionServiceAttachmentStream?
fun getAttachmentPointer(attachmentId: Long): SessionServiceAttachmentPointer? fun getAttachmentPointer(attachmentId: Long): SessionServiceAttachmentPointer?

@ -201,6 +201,6 @@ interface StorageProtocol {
fun blockedContacts(): List<Recipient> fun blockedContacts(): List<Recipient>
fun getExpirationConfiguration(threadId: Long): ExpirationConfiguration? fun getExpirationConfiguration(threadId: Long): ExpirationConfiguration?
fun setExpirationConfiguration(config: ExpirationConfiguration) fun setExpirationConfiguration(config: ExpirationConfiguration)
fun getExpiringMessages(messageIds: LongArray): List<Pair<String, Int>> fun getExpiringMessages(messageIds: List<Long>): List<Pair<Long, Long>>
fun updateDisappearingState(address: String, disappearingState: Recipient.DisappearingState) fun updateDisappearingState(address: String, disappearingState: Recipient.DisappearingState)
} }

@ -2,14 +2,13 @@ package org.session.libsession.messaging.jobs
import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.ExpirationConfiguration import org.session.libsession.messaging.messages.ExpirationConfiguration
import org.session.libsession.messaging.messages.control.SyncedExpiriesMessage
import org.session.libsession.messaging.messages.control.SyncedExpiry
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.Address
class DisappearingMessagesJob(val messageIds: LongArray = longArrayOf(), val startedAtMs: Long = 0): Job { class DisappearingMessagesJob(
val messageIds: List<Long> = listOf(),
val startedAtMs: Long = 0,
val threadId: Long = 0
) : Job {
override var delegate: JobDelegate? = null override var delegate: JobDelegate? = null
override var id: String? = null override var id: String? = null
@ -18,20 +17,10 @@ class DisappearingMessagesJob(val messageIds: LongArray = longArrayOf(), val sta
override fun execute() { override fun execute() {
if (!ExpirationConfiguration.isNewConfigEnabled) return if (!ExpirationConfiguration.isNewConfigEnabled) return
val userPublicKey = MessagingModuleConfiguration.shared.storage.getUserPublicKey() ?: return
val module = MessagingModuleConfiguration.shared
try { try {
module.storage.getExpiringMessages(messageIds).groupBy { it.second }.forEach { (expiresInSeconds, messages) -> val ids = MessagingModuleConfiguration.shared.storage.getExpiringMessages(messageIds).map { it.first }
val serverHashes = messages.map { it.first } if (ids.isNotEmpty()) {
if (serverHashes.isEmpty()) return JobQueue.shared.add(SyncedExpiriesJob(ids, startedAtMs, threadId))
val expirationTimestamp = startedAtMs + expiresInSeconds * 1000
val syncTarget = ""
val syncedExpiriesMessage = SyncedExpiriesMessage()
syncedExpiriesMessage.conversationExpiries = mapOf(
syncTarget to serverHashes.map { serverHash -> SyncedExpiry(serverHash, expirationTimestamp) }
)
MessageSender.send(syncedExpiriesMessage, Address.fromSerialized(userPublicKey))
SnodeAPI.updateExpiry(expirationTimestamp, serverHashes)
} }
} catch (e: Exception) { } catch (e: Exception) {
delegate?.handleJobFailed(this, e) delegate?.handleJobFailed(this, e)
@ -41,8 +30,9 @@ class DisappearingMessagesJob(val messageIds: LongArray = longArrayOf(), val sta
} }
override fun serialize(): Data = Data.Builder() override fun serialize(): Data = Data.Builder()
.putLongArray(MESSAGE_IDS, messageIds) .putLongArray(MESSAGE_IDS, messageIds.toLongArray())
.putLong(STARTED_AT_MS, startedAtMs) .putLong(STARTED_AT_MS, startedAtMs)
.putLong(THREAD_ID, threadId)
.build() .build()
override fun getFactoryKey(): String = KEY override fun getFactoryKey(): String = KEY
@ -50,8 +40,9 @@ class DisappearingMessagesJob(val messageIds: LongArray = longArrayOf(), val sta
class Factory : Job.Factory<DisappearingMessagesJob> { class Factory : Job.Factory<DisappearingMessagesJob> {
override fun create(data: Data): DisappearingMessagesJob { override fun create(data: Data): DisappearingMessagesJob {
return DisappearingMessagesJob( return DisappearingMessagesJob(
data.getLongArray(MESSAGE_IDS), data.getLongArray(MESSAGE_IDS).toList(),
data.getLong(STARTED_AT_MS) data.getLong(STARTED_AT_MS),
data.getLong(THREAD_ID)
) )
} }
} }
@ -61,6 +52,7 @@ class DisappearingMessagesJob(val messageIds: LongArray = longArrayOf(), val sta
private const val MESSAGE_IDS = "messageIds" private const val MESSAGE_IDS = "messageIds"
private const val STARTED_AT_MS = "startedAtMs" private const val STARTED_AT_MS = "startedAtMs"
private const val THREAD_ID = "threadId"
} }
} }

@ -115,7 +115,8 @@ class JobQueue : JobDelegate {
while (isActive) { while (isActive) {
when (val job = queue.receive()) { when (val job = queue.receive()) {
is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob, is DisappearingMessagesJob -> { is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob, is DisappearingMessagesJob,
is SyncedExpiriesJob -> {
txQueue.send(job) txQueue.send(job)
} }
is AttachmentDownloadJob -> { is AttachmentDownloadJob -> {

@ -0,0 +1,84 @@
package org.session.libsession.messaging.jobs
import org.session.libsession.messaging.MessagingModuleConfiguration
import org.session.libsession.messaging.messages.ExpirationConfiguration
import org.session.libsession.messaging.messages.control.SyncedExpiriesMessage
import org.session.libsession.messaging.messages.control.SyncedExpiry
import org.session.libsession.messaging.sending_receiving.MessageSender
import org.session.libsession.messaging.utilities.Data
import org.session.libsession.snode.SnodeAPI
import org.session.libsession.utilities.Address
class SyncedExpiriesJob(
val messageIds: List<Long> = listOf(),
val startedAtMs: Long = 0,
val threadId: Long = 0
) : Job {
override var delegate: JobDelegate? = null
override var id: String? = null
override var failureCount: Int = 0
override val maxFailureCount: Int = 1
override fun execute() {
if (!ExpirationConfiguration.isNewConfigEnabled) return
val module = MessagingModuleConfiguration.shared
val userPublicKey = module.storage.getUserPublicKey() ?: return
try {
val messageIdsWithNoServerHashByExpiresIn = mutableMapOf<Long, List<Long>>()
module.storage.getExpiringMessages(messageIds).groupBy { it.second }.forEach { (expiresInSeconds, messageIds) ->
val serverHashesByMessageIds = module.messageDataProvider.getServerHashForMessages(messageIds.map { it.first })
val messageIdsWithNoHash = serverHashesByMessageIds.filter { it.second == null }.map { it.first }
if (messageIdsWithNoHash.isNotEmpty()) {
messageIdsWithNoServerHashByExpiresIn[expiresInSeconds] = messageIdsWithNoHash
}
val serverHashes = serverHashesByMessageIds.mapNotNull { it.second }
if (serverHashes.isEmpty()) return
val expirationTimestamp = startedAtMs + expiresInSeconds * 1000
val syncTarget = ""
val syncedExpiriesMessage = SyncedExpiriesMessage()
syncedExpiriesMessage.conversationExpiries = mapOf(
syncTarget to serverHashes.map { serverHash -> SyncedExpiry(serverHash, expirationTimestamp) }
)
MessageSender.send(syncedExpiriesMessage, Address.fromSerialized(userPublicKey))
SnodeAPI.updateExpiry(expirationTimestamp, serverHashes)
}
if (messageIdsWithNoServerHashByExpiresIn.isNotEmpty()) {
JobQueue.shared.add(
SyncedExpiriesJob(messageIdsWithNoServerHashByExpiresIn.flatMap { it.value }, startedAtMs, threadId)
)
}
} catch (e: Exception) {
delegate?.handleJobFailed(this, e)
return
}
delegate?.handleJobSucceeded(this)
}
override fun serialize(): Data = Data.Builder()
.putLongArray(MESSAGE_IDS, messageIds.toLongArray())
.putLong(STARTED_AT_MS, startedAtMs)
.putLong(THREAD_ID, threadId)
.build()
override fun getFactoryKey(): String = KEY
class Factory : Job.Factory<SyncedExpiriesJob> {
override fun create(data: Data): SyncedExpiriesJob {
return SyncedExpiriesJob(
data.getLongArray(MESSAGE_IDS).toList(),
data.getLong(STARTED_AT_MS),
data.getLong(THREAD_ID)
)
}
}
companion object {
const val KEY = "DisappearingMessagesJob"
private const val MESSAGE_IDS = "messageIds"
private const val STARTED_AT_MS = "startedAtMs"
private const val THREAD_ID = "threadId"
}
}

@ -12,6 +12,6 @@ class ExpirationConfiguration(
val expirationType: ExpirationType? = ExpirationType.valueOf(expirationTypeValue) val expirationType: ExpirationType? = ExpirationType.valueOf(expirationTypeValue)
companion object { companion object {
val isNewConfigEnabled = true//System.currentTimeMillis() > 1_674_000_000_000 // 18/01/2023 val isNewConfigEnabled = true /* TODO: System.currentTimeMillis() > 1_674_000_000_000 // 18/01/2023 */
} }
} }
Loading…
Cancel
Save