From 065417ebbb6b185fd97d0c571915fb2a2410eb47 Mon Sep 17 00:00:00 2001 From: charles Date: Mon, 19 Dec 2022 11:22:13 +1100 Subject: [PATCH] Add synced expiries job --- .../attachments/DatabaseAttachmentProvider.kt | 5 ++ .../securesms/database/LokiMessageDatabase.kt | 7 ++ .../securesms/database/MmsDatabase.kt | 6 ++ .../securesms/database/SmsDatabase.java | 13 ++- .../securesms/database/Storage.kt | 21 ++++- .../database/MessageDataProvider.kt | 1 + .../libsession/database/StorageProtocol.kt | 2 +- .../messaging/jobs/DisappearingMessagesJob.kt | 36 ++++---- .../libsession/messaging/jobs/JobQueue.kt | 3 +- .../messaging/jobs/SyncedExpiriesJob.kt | 84 +++++++++++++++++++ .../messages/ExpirationConfiguration.kt | 2 +- 11 files changed, 151 insertions(+), 29 deletions(-) create mode 100644 libsession/src/main/java/org/session/libsession/messaging/jobs/SyncedExpiriesJob.kt diff --git a/app/src/main/java/org/thoughtcrime/securesms/attachments/DatabaseAttachmentProvider.kt b/app/src/main/java/org/thoughtcrime/securesms/attachments/DatabaseAttachmentProvider.kt index fa0fce7bd3..3282a328db 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/attachments/DatabaseAttachmentProvider.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/attachments/DatabaseAttachmentProvider.kt @@ -201,6 +201,11 @@ class DatabaseAttachmentProvider(context: Context, helper: SQLCipherOpenHelper) return messageDB.getMessageServerHash(messageID) } + override fun getServerHashForMessages(messageIDs: List): List> { + val messageDB = DatabaseComponent.get(context).lokiMessageDatabase() + return messageDB.getMessageServerHashes(messageIDs) + } + override fun getDatabaseAttachment(attachmentId: Long): DatabaseAttachment? { val attachmentDatabase = DatabaseComponent.get(context).attachmentDatabase() return attachmentDatabase.getAttachment(AttachmentId(attachmentId, 0)) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/LokiMessageDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/LokiMessageDatabase.kt index 3cfdd13017..553809e9fe 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/LokiMessageDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/LokiMessageDatabase.kt @@ -165,6 +165,13 @@ class LokiMessageDatabase(context: Context, helper: SQLCipherOpenHelper) : Datab } } + fun getMessageServerHashes(messageIDs: List): List> { + val database = databaseHelper.readableDatabase + return database.getAll(messageHashTable, "${Companion.messageID} IN (?)", arrayOf(messageIDs.joinToString(","))) { cursor -> + cursor.getLong(messageID) to cursor.getStringOrNull(serverHash) + } + } + fun setMessageServerHash(messageID: Long, serverHash: String) { val database = databaseHelper.writableDatabase val contentValues = ContentValues(2) diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.kt b/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.kt index d82c6bb278..ef4e43f973 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/MmsDatabase.kt @@ -288,6 +288,12 @@ class MmsDatabase(context: Context, databaseHelper: SQLCipherOpenHelper) : Messa return readerFor(rawQuery(where, null))!! } + val expireNotStartedMessages: Reader + get() { + val where = "$EXPIRES_IN > 0 AND $EXPIRE_STARTED = 0" + return readerFor(rawQuery(where, null))!! + } + private fun updateMailboxBitmask( id: Long, maskOff: Long, diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/SmsDatabase.java b/app/src/main/java/org/thoughtcrime/securesms/database/SmsDatabase.java index 67243f73b6..9bf2658bd7 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/SmsDatabase.java +++ b/app/src/main/java/org/thoughtcrime/securesms/database/SmsDatabase.java @@ -50,6 +50,7 @@ import org.thoughtcrime.securesms.database.model.ReactionRecord; import org.thoughtcrime.securesms.database.model.SmsMessageRecord; import org.thoughtcrime.securesms.dependencies.DatabaseComponent; +import java.io.Closeable; import java.io.IOException; import java.security.SecureRandom; import java.util.Collections; @@ -567,6 +568,11 @@ public class SmsDatabase extends MessagingDatabase { return rawQuery(where, null); } + public Cursor getExpirationNotStartedMessages() { + String where = EXPIRES_IN + " > 0 AND " + EXPIRE_STARTED + " = 0"; + return rawQuery(where, null); + } + public SmsMessageRecord getMessage(long messageId) throws NoSuchMessageException { Cursor cursor = rawQuery(ID_WHERE, new String[]{messageId + ""}); Reader reader = new Reader(cursor); @@ -740,7 +746,7 @@ public class SmsDatabase extends MessagingDatabase { } } - public class Reader { + public class Reader implements Closeable { private final Cursor cursor; @@ -805,8 +811,11 @@ public class SmsDatabase extends MessagingDatabase { return new LinkedList<>(); } + @Override public void close() { - cursor.close(); + if (cursor != null) { + cursor.close(); + } } } diff --git a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt index 71825feb89..bb7ab440d4 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/database/Storage.kt @@ -983,8 +983,25 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, DatabaseComponent.get(context).expirationConfigurationDatabase().setExpirationConfiguration(config) } - override fun getExpiringMessages(messageIds: LongArray): List> { - return emptyList() + override fun getExpiringMessages(messageIds: List): List> { + val expiringMessages = mutableListOf>() + val smsDb = DatabaseComponent.get(context).smsDatabase() + smsDb.readerFor(smsDb.expirationNotStartedMessages).use { reader -> + while (reader.next != null) { + if (reader.current.id in messageIds) { + expiringMessages.add(reader.current.id to reader.current.expiresIn) + } + } + } + val mmsDb = DatabaseComponent.get(context).mmsDatabase() + mmsDb.expireNotStartedMessages.use { reader -> + while (reader.next != null) { + if (reader.current.id in messageIds) { + expiringMessages.add(reader.current.id to reader.current.expiresIn) + } + } + } + return expiringMessages } override fun updateDisappearingState(address: String, disappearingState: Recipient.DisappearingState) { diff --git a/libsession/src/main/java/org/session/libsession/database/MessageDataProvider.kt b/libsession/src/main/java/org/session/libsession/database/MessageDataProvider.kt index eb40df6e09..f5882a1e2b 100644 --- a/libsession/src/main/java/org/session/libsession/database/MessageDataProvider.kt +++ b/libsession/src/main/java/org/session/libsession/database/MessageDataProvider.kt @@ -23,6 +23,7 @@ interface MessageDataProvider { fun deleteMessage(messageID: Long, isSms: Boolean) fun updateMessageAsDeleted(timestamp: Long, author: String) fun getServerHashForMessage(messageID: Long): String? + fun getServerHashForMessages(messageIDs: List): List> fun getDatabaseAttachment(attachmentId: Long): DatabaseAttachment? fun getAttachmentStream(attachmentId: Long): SessionServiceAttachmentStream? fun getAttachmentPointer(attachmentId: Long): SessionServiceAttachmentPointer? diff --git a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt index c50e20b472..99b384fedd 100644 --- a/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt +++ b/libsession/src/main/java/org/session/libsession/database/StorageProtocol.kt @@ -201,6 +201,6 @@ interface StorageProtocol { fun blockedContacts(): List fun getExpirationConfiguration(threadId: Long): ExpirationConfiguration? fun setExpirationConfiguration(config: ExpirationConfiguration) - fun getExpiringMessages(messageIds: LongArray): List> + fun getExpiringMessages(messageIds: List): List> fun updateDisappearingState(address: String, disappearingState: Recipient.DisappearingState) } diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/DisappearingMessagesJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/DisappearingMessagesJob.kt index 420fcda0ab..bbfb59add9 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/DisappearingMessagesJob.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/DisappearingMessagesJob.kt @@ -2,14 +2,13 @@ package org.session.libsession.messaging.jobs import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.messages.ExpirationConfiguration -import org.session.libsession.messaging.messages.control.SyncedExpiriesMessage -import org.session.libsession.messaging.messages.control.SyncedExpiry -import org.session.libsession.messaging.sending_receiving.MessageSender import org.session.libsession.messaging.utilities.Data -import org.session.libsession.snode.SnodeAPI -import org.session.libsession.utilities.Address -class DisappearingMessagesJob(val messageIds: LongArray = longArrayOf(), val startedAtMs: Long = 0): Job { +class DisappearingMessagesJob( + val messageIds: List = listOf(), + val startedAtMs: Long = 0, + val threadId: Long = 0 +) : Job { override var delegate: JobDelegate? = null override var id: String? = null @@ -18,20 +17,10 @@ class DisappearingMessagesJob(val messageIds: LongArray = longArrayOf(), val sta override fun execute() { if (!ExpirationConfiguration.isNewConfigEnabled) return - val userPublicKey = MessagingModuleConfiguration.shared.storage.getUserPublicKey() ?: return - val module = MessagingModuleConfiguration.shared try { - module.storage.getExpiringMessages(messageIds).groupBy { it.second }.forEach { (expiresInSeconds, messages) -> - val serverHashes = messages.map { it.first } - if (serverHashes.isEmpty()) return - val expirationTimestamp = startedAtMs + expiresInSeconds * 1000 - val syncTarget = "" - val syncedExpiriesMessage = SyncedExpiriesMessage() - syncedExpiriesMessage.conversationExpiries = mapOf( - syncTarget to serverHashes.map { serverHash -> SyncedExpiry(serverHash, expirationTimestamp) } - ) - MessageSender.send(syncedExpiriesMessage, Address.fromSerialized(userPublicKey)) - SnodeAPI.updateExpiry(expirationTimestamp, serverHashes) + val ids = MessagingModuleConfiguration.shared.storage.getExpiringMessages(messageIds).map { it.first } + if (ids.isNotEmpty()) { + JobQueue.shared.add(SyncedExpiriesJob(ids, startedAtMs, threadId)) } } catch (e: Exception) { delegate?.handleJobFailed(this, e) @@ -41,8 +30,9 @@ class DisappearingMessagesJob(val messageIds: LongArray = longArrayOf(), val sta } override fun serialize(): Data = Data.Builder() - .putLongArray(MESSAGE_IDS, messageIds) + .putLongArray(MESSAGE_IDS, messageIds.toLongArray()) .putLong(STARTED_AT_MS, startedAtMs) + .putLong(THREAD_ID, threadId) .build() override fun getFactoryKey(): String = KEY @@ -50,8 +40,9 @@ class DisappearingMessagesJob(val messageIds: LongArray = longArrayOf(), val sta class Factory : Job.Factory { override fun create(data: Data): DisappearingMessagesJob { return DisappearingMessagesJob( - data.getLongArray(MESSAGE_IDS), - data.getLong(STARTED_AT_MS) + data.getLongArray(MESSAGE_IDS).toList(), + data.getLong(STARTED_AT_MS), + data.getLong(THREAD_ID) ) } } @@ -61,6 +52,7 @@ class DisappearingMessagesJob(val messageIds: LongArray = longArrayOf(), val sta private const val MESSAGE_IDS = "messageIds" private const val STARTED_AT_MS = "startedAtMs" + private const val THREAD_ID = "threadId" } } \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt index c47621682e..d7962ab24c 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/JobQueue.kt @@ -115,7 +115,8 @@ class JobQueue : JobDelegate { while (isActive) { when (val job = queue.receive()) { - is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob, is DisappearingMessagesJob -> { + is NotifyPNServerJob, is AttachmentUploadJob, is MessageSendJob, is DisappearingMessagesJob, + is SyncedExpiriesJob -> { txQueue.send(job) } is AttachmentDownloadJob -> { diff --git a/libsession/src/main/java/org/session/libsession/messaging/jobs/SyncedExpiriesJob.kt b/libsession/src/main/java/org/session/libsession/messaging/jobs/SyncedExpiriesJob.kt new file mode 100644 index 0000000000..24f3f420b1 --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/messaging/jobs/SyncedExpiriesJob.kt @@ -0,0 +1,84 @@ +package org.session.libsession.messaging.jobs + +import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.messages.ExpirationConfiguration +import org.session.libsession.messaging.messages.control.SyncedExpiriesMessage +import org.session.libsession.messaging.messages.control.SyncedExpiry +import org.session.libsession.messaging.sending_receiving.MessageSender +import org.session.libsession.messaging.utilities.Data +import org.session.libsession.snode.SnodeAPI +import org.session.libsession.utilities.Address + +class SyncedExpiriesJob( + val messageIds: List = listOf(), + val startedAtMs: Long = 0, + val threadId: Long = 0 +) : Job { + + override var delegate: JobDelegate? = null + override var id: String? = null + override var failureCount: Int = 0 + override val maxFailureCount: Int = 1 + + override fun execute() { + if (!ExpirationConfiguration.isNewConfigEnabled) return + val module = MessagingModuleConfiguration.shared + val userPublicKey = module.storage.getUserPublicKey() ?: return + try { + val messageIdsWithNoServerHashByExpiresIn = mutableMapOf>() + module.storage.getExpiringMessages(messageIds).groupBy { it.second }.forEach { (expiresInSeconds, messageIds) -> + val serverHashesByMessageIds = module.messageDataProvider.getServerHashForMessages(messageIds.map { it.first }) + val messageIdsWithNoHash = serverHashesByMessageIds.filter { it.second == null }.map { it.first } + if (messageIdsWithNoHash.isNotEmpty()) { + messageIdsWithNoServerHashByExpiresIn[expiresInSeconds] = messageIdsWithNoHash + } + val serverHashes = serverHashesByMessageIds.mapNotNull { it.second } + if (serverHashes.isEmpty()) return + val expirationTimestamp = startedAtMs + expiresInSeconds * 1000 + val syncTarget = "" + val syncedExpiriesMessage = SyncedExpiriesMessage() + syncedExpiriesMessage.conversationExpiries = mapOf( + syncTarget to serverHashes.map { serverHash -> SyncedExpiry(serverHash, expirationTimestamp) } + ) + MessageSender.send(syncedExpiriesMessage, Address.fromSerialized(userPublicKey)) + SnodeAPI.updateExpiry(expirationTimestamp, serverHashes) + } + if (messageIdsWithNoServerHashByExpiresIn.isNotEmpty()) { + JobQueue.shared.add( + SyncedExpiriesJob(messageIdsWithNoServerHashByExpiresIn.flatMap { it.value }, startedAtMs, threadId) + ) + } + } catch (e: Exception) { + delegate?.handleJobFailed(this, e) + return + } + delegate?.handleJobSucceeded(this) + } + + override fun serialize(): Data = Data.Builder() + .putLongArray(MESSAGE_IDS, messageIds.toLongArray()) + .putLong(STARTED_AT_MS, startedAtMs) + .putLong(THREAD_ID, threadId) + .build() + + override fun getFactoryKey(): String = KEY + + class Factory : Job.Factory { + override fun create(data: Data): SyncedExpiriesJob { + return SyncedExpiriesJob( + data.getLongArray(MESSAGE_IDS).toList(), + data.getLong(STARTED_AT_MS), + data.getLong(THREAD_ID) + ) + } + } + + companion object { + const val KEY = "DisappearingMessagesJob" + + private const val MESSAGE_IDS = "messageIds" + private const val STARTED_AT_MS = "startedAtMs" + private const val THREAD_ID = "threadId" + } + +} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/messages/ExpirationConfiguration.kt b/libsession/src/main/java/org/session/libsession/messaging/messages/ExpirationConfiguration.kt index 37c13776e5..bc3cd86766 100644 --- a/libsession/src/main/java/org/session/libsession/messaging/messages/ExpirationConfiguration.kt +++ b/libsession/src/main/java/org/session/libsession/messaging/messages/ExpirationConfiguration.kt @@ -12,6 +12,6 @@ class ExpirationConfiguration( val expirationType: ExpirationType? = ExpirationType.valueOf(expirationTypeValue) companion object { - val isNewConfigEnabled = true//System.currentTimeMillis() > 1_674_000_000_000 // 18/01/2023 + val isNewConfigEnabled = true /* TODO: System.currentTimeMillis() > 1_674_000_000_000 // 18/01/2023 */ } } \ No newline at end of file