feat: use new closed and open group pollers

pull/486/head
jubb 3 years ago
parent 2460afd1a8
commit 8ee58459dd

@ -34,6 +34,7 @@ import org.session.libsession.messaging.MessagingConfiguration;
import org.session.libsession.messaging.avatars.AvatarHelper; import org.session.libsession.messaging.avatars.AvatarHelper;
import org.session.libsession.messaging.jobs.JobQueue; import org.session.libsession.messaging.jobs.JobQueue;
import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier; import org.session.libsession.messaging.sending_receiving.notifications.MessageNotifier;
import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller;
import org.session.libsession.messaging.sending_receiving.pollers.Poller; import org.session.libsession.messaging.sending_receiving.pollers.Poller;
import org.session.libsession.messaging.threads.Address; import org.session.libsession.messaging.threads.Address;
import org.session.libsession.snode.SnodeConfiguration; import org.session.libsession.snode.SnodeConfiguration;
@ -69,7 +70,6 @@ import org.thoughtcrime.securesms.logging.PersistentLogger;
import org.thoughtcrime.securesms.logging.UncaughtExceptionLogger; import org.thoughtcrime.securesms.logging.UncaughtExceptionLogger;
import org.thoughtcrime.securesms.loki.activities.HomeActivity; import org.thoughtcrime.securesms.loki.activities.HomeActivity;
import org.thoughtcrime.securesms.loki.api.BackgroundPollWorker; import org.thoughtcrime.securesms.loki.api.BackgroundPollWorker;
import org.thoughtcrime.securesms.loki.api.ClosedGroupPoller;
import org.thoughtcrime.securesms.loki.api.LokiPushNotificationManager; import org.thoughtcrime.securesms.loki.api.LokiPushNotificationManager;
import org.thoughtcrime.securesms.loki.api.PublicChatManager; import org.thoughtcrime.securesms.loki.api.PublicChatManager;
import org.thoughtcrime.securesms.loki.api.SessionProtocolImpl; import org.thoughtcrime.securesms.loki.api.SessionProtocolImpl;
@ -478,10 +478,9 @@ public class ApplicationContext extends MultiDexApplication implements Dependenc
} }
LokiAPIDatabase apiDB = DatabaseFactory.getLokiAPIDatabase(this); LokiAPIDatabase apiDB = DatabaseFactory.getLokiAPIDatabase(this);
SwarmAPI.Companion.configureIfNeeded(apiDB); SwarmAPI.Companion.configureIfNeeded(apiDB);
SnodeAPI.Companion.configureIfNeeded(userPublicKey, apiDB, broadcaster); SnodeAPI.Companion.configureIfNeeded(userPublicKey, apiDB, broadcaster);
poller = new Poller(); poller = new Poller();
ClosedGroupPoller.Companion.configureIfNeeded(this); closedGroupPoller = new ClosedGroupPoller();
closedGroupPoller = ClosedGroupPoller.Companion.getShared();
} }
public void startPollingIfNeeded() { public void startPollingIfNeeded() {

@ -8,10 +8,7 @@ import org.session.libsession.messaging.jobs.AttachmentUploadJob
import org.session.libsession.messaging.jobs.Job import org.session.libsession.messaging.jobs.Job
import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageSendJob import org.session.libsession.messaging.jobs.MessageSendJob
import org.session.libsession.messaging.messages.signal.IncomingEncryptedMessage import org.session.libsession.messaging.messages.signal.*
import org.session.libsession.messaging.messages.signal.IncomingGroupMessage
import org.session.libsession.messaging.messages.signal.IncomingTextMessage
import org.session.libsession.messaging.messages.signal.OutgoingTextMessage
import org.session.libsession.messaging.messages.visible.Attachment import org.session.libsession.messaging.messages.visible.Attachment
import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.messaging.opengroups.OpenGroup import org.session.libsession.messaging.opengroups.OpenGroup
@ -23,6 +20,7 @@ import org.session.libsession.messaging.threads.Address
import org.session.libsession.messaging.threads.GroupRecord import org.session.libsession.messaging.threads.GroupRecord
import org.session.libsession.messaging.threads.recipients.Recipient import org.session.libsession.messaging.threads.recipients.Recipient
import org.session.libsession.utilities.GroupUtil import org.session.libsession.utilities.GroupUtil
import org.session.libsession.utilities.IdentityKeyUtil
import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsession.utilities.preferences.ProfileKeyUtil import org.session.libsession.utilities.preferences.ProfileKeyUtil
import org.session.libsignal.libsignal.ecc.ECKeyPair import org.session.libsignal.libsignal.ecc.ECKeyPair
@ -31,18 +29,13 @@ import org.session.libsignal.libsignal.util.guava.Optional
import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer
import org.session.libsignal.service.api.messages.SignalServiceGroup import org.session.libsignal.service.api.messages.SignalServiceGroup
import org.session.libsignal.service.internal.push.SignalServiceProtos import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.service.loki.api.opengroups.PublicChat
import org.session.libsignal.utilities.logging.Log import org.session.libsignal.utilities.logging.Log
import org.session.libsession.utilities.IdentityKeyUtil
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper
import org.thoughtcrime.securesms.loki.database.LokiThreadDatabase import org.thoughtcrime.securesms.loki.database.LokiThreadDatabase
import org.thoughtcrime.securesms.loki.protocol.SessionMetaProtocol import org.thoughtcrime.securesms.loki.protocol.SessionMetaProtocol
import org.thoughtcrime.securesms.loki.utilities.OpenGroupUtilities import org.thoughtcrime.securesms.loki.utilities.OpenGroupUtilities
import org.thoughtcrime.securesms.loki.utilities.get import org.thoughtcrime.securesms.loki.utilities.get
import org.thoughtcrime.securesms.loki.utilities.getString import org.thoughtcrime.securesms.loki.utilities.getString
import org.session.libsession.messaging.messages.signal.IncomingMediaMessage
import org.session.libsession.messaging.messages.signal.OutgoingGroupMediaMessage
import org.session.libsession.messaging.messages.signal.OutgoingMediaMessage
import org.thoughtcrime.securesms.mms.PartAuthority import org.thoughtcrime.securesms.mms.PartAuthority
class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, helper), StorageProtocol { class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context, helper), StorageProtocol {
@ -104,7 +97,10 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
val senderRecipient = Recipient.from(context, senderAddress, false) val senderRecipient = Recipient.from(context, senderAddress, false)
val group: Optional<SignalServiceGroup> = when { val group: Optional<SignalServiceGroup> = when {
openGroupID != null -> Optional.of(SignalServiceGroup(openGroupID.toByteArray(), SignalServiceGroup.GroupType.PUBLIC_CHAT)) openGroupID != null -> Optional.of(SignalServiceGroup(openGroupID.toByteArray(), SignalServiceGroup.GroupType.PUBLIC_CHAT))
groupPublicKey != null -> Optional.of(SignalServiceGroup(groupPublicKey.toByteArray(), SignalServiceGroup.GroupType.SIGNAL)) groupPublicKey != null -> {
val doubleEncoded = GroupUtil.doubleEncodeGroupID(groupPublicKey)
Optional.of(SignalServiceGroup(GroupUtil.getDecodedGroupIDAsData(doubleEncoded), SignalServiceGroup.GroupType.SIGNAL))
}
else -> Optional.absent() else -> Optional.absent()
} }
val pointerAttachments = attachments.mapNotNull { val pointerAttachments = attachments.mapNotNull {
@ -453,8 +449,10 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
DatabaseFactory.getLokiAPIDatabase(context).removeAllClosedGroupEncryptionKeyPairs(groupPublicKey) DatabaseFactory.getLokiAPIDatabase(context).removeAllClosedGroupEncryptionKeyPairs(groupPublicKey)
} }
override fun getAllOpenGroups(): Map<Long, PublicChat> { override fun getAllOpenGroups(): Map<Long, OpenGroup> {
return DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats() return DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().mapValues { (_,chat)->
OpenGroup(chat.channel, chat.server, chat.displayName, chat.isDeletable)
}
} }
override fun addOpenGroup(server: String, channel: Long) { override fun addOpenGroup(server: String, channel: Long) {
@ -481,7 +479,7 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
val recipient = Recipient.from(context, Address.fromSerialized(openGroupID), false) val recipient = Recipient.from(context, Address.fromSerialized(openGroupID), false)
return database.getOrCreateThreadIdFor(recipient) return database.getOrCreateThreadIdFor(recipient)
} else if (!groupPublicKey.isNullOrEmpty()) { } else if (!groupPublicKey.isNullOrEmpty()) {
val recipient = Recipient.from(context, Address.fromSerialized(groupPublicKey), false) val recipient = Recipient.from(context, Address.fromSerialized(GroupUtil.doubleEncodeGroupID(groupPublicKey)), false)
return database.getOrCreateThreadIdFor(recipient) return database.getOrCreateThreadIdFor(recipient)
} else { } else {
val recipient = Recipient.from(context, Address.fromSerialized(publicKey), false) val recipient = Recipient.from(context, Address.fromSerialized(publicKey), false)

@ -7,12 +7,12 @@ import androidx.work.*
import nl.komponents.kovenant.Promise import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.all import nl.komponents.kovenant.all
import nl.komponents.kovenant.functional.map import nl.komponents.kovenant.functional.map
import org.thoughtcrime.securesms.database.DatabaseFactory
import org.thoughtcrime.securesms.jobs.PushContentReceiveJob
import org.session.libsignal.utilities.logging.Log
import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsignal.service.api.messages.SignalServiceEnvelope import org.session.libsignal.service.api.messages.SignalServiceEnvelope
import org.session.libsignal.service.loki.api.SnodeAPI import org.session.libsignal.service.loki.api.SnodeAPI
import org.session.libsignal.utilities.logging.Log
import org.thoughtcrime.securesms.database.DatabaseFactory
import org.thoughtcrime.securesms.jobs.PushContentReceiveJob
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) { class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Worker(context, params) {
@ -76,8 +76,8 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor
promises.add(privateChatsPromise) promises.add(privateChatsPromise)
// Closed groups // Closed groups
ClosedGroupPoller.configureIfNeeded(context) // ClosedGroupPoller.configureIfNeeded(context)
promises.addAll(ClosedGroupPoller.shared.pollOnce()) // promises.addAll(ClosedGroupPoller.shared.pollOnce())
// Open Groups // Open Groups
val openGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().map { it.value } val openGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats().map { it.value }

@ -5,20 +5,22 @@ import android.database.ContentObserver
import android.graphics.Bitmap import android.graphics.Bitmap
import android.text.TextUtils import android.text.TextUtils
import androidx.annotation.WorkerThread import androidx.annotation.WorkerThread
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.opengroups.OpenGroup
import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsession.utilities.Util
import org.session.libsignal.service.loki.api.opengroups.PublicChat
import org.session.libsignal.service.loki.api.opengroups.PublicChatInfo
import org.thoughtcrime.securesms.ApplicationContext import org.thoughtcrime.securesms.ApplicationContext
import org.thoughtcrime.securesms.database.DatabaseContentProviders import org.thoughtcrime.securesms.database.DatabaseContentProviders
import org.thoughtcrime.securesms.database.DatabaseFactory import org.thoughtcrime.securesms.database.DatabaseFactory
import org.thoughtcrime.securesms.groups.GroupManager import org.thoughtcrime.securesms.groups.GroupManager
import org.thoughtcrime.securesms.util.BitmapUtil import org.thoughtcrime.securesms.util.BitmapUtil
import org.session.libsession.utilities.TextSecurePreferences
import org.session.libsession.utilities.Util
import org.session.libsignal.service.loki.api.opengroups.PublicChatInfo
import org.session.libsignal.service.loki.api.opengroups.PublicChat
import kotlin.jvm.Throws
class PublicChatManager(private val context: Context) { class PublicChatManager(private val context: Context) {
private var chats = mutableMapOf<Long, PublicChat>() private var chats = mutableMapOf<Long, OpenGroup>()
private val pollers = mutableMapOf<Long, PublicChatPoller>() private val pollers = mutableMapOf<Long, OpenGroupPoller>()
private val observers = mutableMapOf<Long, ContentObserver>() private val observers = mutableMapOf<Long, ContentObserver>()
private var isPolling = false private var isPolling = false
@ -35,7 +37,7 @@ class PublicChatManager(private val context: Context) {
public fun markAllAsNotCaughtUp() { public fun markAllAsNotCaughtUp() {
refreshChatsAndPollers() refreshChatsAndPollers()
for ((threadID, chat) in chats) { for ((threadID, chat) in chats) {
val poller = pollers[threadID] ?: PublicChatPoller(context, chat) val poller = pollers[threadID] ?: OpenGroupPoller(chat)
poller.isCaughtUp = false poller.isCaughtUp = false
} }
} }
@ -44,7 +46,7 @@ class PublicChatManager(private val context: Context) {
refreshChatsAndPollers() refreshChatsAndPollers()
for ((threadId, chat) in chats) { for ((threadId, chat) in chats) {
val poller = pollers[threadId] ?: PublicChatPoller(context, chat) val poller = pollers[threadId] ?: OpenGroupPoller(chat)
poller.startIfNeeded() poller.startIfNeeded()
listenToThreadDeletion(threadId) listenToThreadDeletion(threadId)
if (!pollers.containsKey(threadId)) { pollers[threadId] = poller } if (!pollers.containsKey(threadId)) { pollers[threadId] = poller }
@ -109,7 +111,8 @@ class PublicChatManager(private val context: Context) {
} }
private fun refreshChatsAndPollers() { private fun refreshChatsAndPollers() {
val chatsInDB = DatabaseFactory.getLokiThreadDatabase(context).getAllPublicChats() val storage = MessagingConfiguration.shared.storage
val chatsInDB = storage.getAllOpenGroups()
val removedChatThreadIds = chats.keys.filter { !chatsInDB.keys.contains(it) } val removedChatThreadIds = chats.keys.filter { !chatsInDB.keys.contains(it) }
removedChatThreadIds.forEach { pollers.remove(it)?.stop() } removedChatThreadIds.forEach { pollers.remove(it)?.stop() }

@ -20,7 +20,6 @@ import org.session.libsignal.libsignal.ecc.ECKeyPair
import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer
import org.session.libsignal.service.api.messages.SignalServiceGroup import org.session.libsignal.service.api.messages.SignalServiceGroup
import org.session.libsignal.service.internal.push.SignalServiceProtos import org.session.libsignal.service.internal.push.SignalServiceProtos
import org.session.libsignal.service.loki.api.opengroups.PublicChat
interface StorageProtocol { interface StorageProtocol {
@ -56,7 +55,7 @@ interface StorageProtocol {
// Open Groups // Open Groups
fun getOpenGroup(threadID: String): OpenGroup? fun getOpenGroup(threadID: String): OpenGroup?
fun getThreadID(openGroupID: String): String? fun getThreadID(openGroupID: String): String?
fun getAllOpenGroups(): Map<Long, PublicChat> fun getAllOpenGroups(): Map<Long, OpenGroup>
fun addOpenGroup(server: String, channel: Long) fun addOpenGroup(server: String, channel: Long)
fun setOpenGroupServerMessageID(messageID: Long, serverID: Long) fun setOpenGroupServerMessageID(messageID: Long, serverID: Long)
fun getQuoteServerID(quoteID: Long, publicKey: String): Long? fun getQuoteServerID(quoteID: Long, publicKey: String): Long?

@ -4,6 +4,7 @@ import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.control.* import org.session.libsession.messaging.messages.control.*
import org.session.libsession.messaging.messages.visible.VisibleMessage import org.session.libsession.messaging.messages.visible.VisibleMessage
import org.session.libsession.utilities.GroupUtil
import org.session.libsignal.service.internal.push.PushTransportDetails import org.session.libsignal.service.internal.push.PushTransportDetails
import org.session.libsignal.service.internal.push.SignalServiceProtos import org.session.libsignal.service.internal.push.SignalServiceProtos
@ -50,7 +51,7 @@ object MessageReceiver {
// If the message failed to process the first time around we retry it later (if the error is retryable). In this case the timestamp // If the message failed to process the first time around we retry it later (if the error is retryable). In this case the timestamp
// will already be in the database but we don't want to treat the message as a duplicate. The isRetry flag is a simple workaround // will already be in the database but we don't want to treat the message as a duplicate. The isRetry flag is a simple workaround
// for this issue. // for this issue.
if (storage.isMessageDuplicated(envelope.timestamp, envelope.source) && !isRetry) throw Error.DuplicateMessage if (storage.isMessageDuplicated(envelope.timestamp, GroupUtil.doubleEncodeGroupID(envelope.source)) && !isRetry) throw Error.DuplicateMessage
storage.addReceivedMessageTimestamp(envelope.timestamp) storage.addReceivedMessageTimestamp(envelope.timestamp)
// Decrypt the contents // Decrypt the contents
val ciphertext = envelope.content ?: throw Error.NoData val ciphertext = envelope.content ?: throw Error.NoData
@ -70,7 +71,7 @@ object MessageReceiver {
} }
SignalServiceProtos.Envelope.Type.CLOSED_GROUP_CIPHERTEXT -> { SignalServiceProtos.Envelope.Type.CLOSED_GROUP_CIPHERTEXT -> {
val hexEncodedGroupPublicKey = envelope.source val hexEncodedGroupPublicKey = envelope.source
if (hexEncodedGroupPublicKey == null || MessagingConfiguration.shared.storage.isClosedGroup(hexEncodedGroupPublicKey)) { if (hexEncodedGroupPublicKey == null || !MessagingConfiguration.shared.storage.isClosedGroup(hexEncodedGroupPublicKey)) {
throw Error.InvalidGroupPublicKey throw Error.InvalidGroupPublicKey
} }
val encryptionKeyPairs = MessagingConfiguration.shared.storage.getClosedGroupEncryptionKeyPairs(hexEncodedGroupPublicKey) val encryptionKeyPairs = MessagingConfiguration.shared.storage.getClosedGroupEncryptionKeyPairs(hexEncodedGroupPublicKey)

@ -186,8 +186,8 @@ fun MessageReceiver.handleVisibleMessage(message: VisibleMessage, proto: SignalS
} }
// Parse stickers if needed // Parse stickers if needed
// Persist the message // Persist the message
val messageID = storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID, attachments) ?: throw MessageReceiver.Error.NoThread
message.threadID = threadID message.threadID = threadID
val messageID = storage.persist(message, quoteModel, linkPreviews, message.groupPublicKey, openGroupID, attachments) ?: throw MessageReceiver.Error.NoThread
// Parse & persist attachments // Parse & persist attachments
// Start attachment downloads if needed // Start attachment downloads if needed
storage.getAttachmentsForMessage(messageID).forEach { attachment -> storage.getAttachmentsForMessage(messageID).forEach { attachment ->

@ -4,17 +4,15 @@ import android.os.Handler
import nl.komponents.kovenant.Promise import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.bind import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map import nl.komponents.kovenant.functional.map
import org.session.libsession.messaging.MessagingConfiguration import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.jobs.JobQueue import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.utilities.MessageWrapper import org.session.libsession.messaging.utilities.MessageWrapper
import org.session.libsession.snode.SnodeAPI import org.session.libsession.snode.SnodeAPI
import org.session.libsignal.utilities.successBackground
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.Base64
import org.session.libsignal.service.loki.utilities.getRandomElementOrNull import org.session.libsignal.service.loki.utilities.getRandomElementOrNull
import org.session.libsignal.utilities.Base64
import org.session.libsignal.utilities.logging.Log
import org.session.libsignal.utilities.successBackground
class ClosedGroupPoller { class ClosedGroupPoller {
private var isPolling = false private var isPolling = false
@ -24,7 +22,7 @@ class ClosedGroupPoller {
override fun run() { override fun run() {
poll() poll()
handler.postDelayed(this, ClosedGroupPoller.pollInterval) handler.postDelayed(this, pollInterval)
} }
} }

Loading…
Cancel
Save