diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt index ffe7e230e0..fc3e7c1bba 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/BackgroundPollWorker.kt @@ -10,8 +10,7 @@ import nl.komponents.kovenant.functional.map import org.session.libsession.messaging.jobs.MessageReceiveJob import org.session.libsession.messaging.open_groups.OpenGroupV2 import org.session.libsession.messaging.sending_receiving.pollers.ClosedGroupPoller -import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller -import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupV2Poller +import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPollerV2 import org.session.libsession.snode.SnodeAPI import org.session.libsession.utilities.TextSecurePreferences import org.session.libsignal.utilities.Log @@ -61,12 +60,14 @@ class BackgroundPollWorker(val context: Context, params: WorkerParameters) : Wor promises.addAll(ClosedGroupPoller().pollOnce()) // Open Groups - val v2OpenGroups = DatabaseFactory.getLokiThreadDatabase(context).getAllV2OpenGroups().values.groupBy(OpenGroupV2::server) + val threadDB = DatabaseFactory.getLokiThreadDatabase(context) + val v2OpenGroups = threadDB.getAllV2OpenGroups() + val v2OpenGroupServers = v2OpenGroups.map { it.value.server }.toSet() - v2OpenGroups.values.map { groups -> - OpenGroupV2Poller(groups) - }.forEach { poller -> - promises.add(poller.compactPoll(true).map { }) + for (server in v2OpenGroupServers) { + val poller = OpenGroupPollerV2(server, null) + poller.hasStarted = true + promises.add(poller.poll(true)) } // Wait until all the promises are resolved diff --git a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt index 70bb78d8b6..8048c99e62 100644 --- a/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt +++ b/app/src/main/java/org/thoughtcrime/securesms/loki/api/PublicChatManager.kt @@ -8,7 +8,7 @@ import androidx.annotation.WorkerThread import org.session.libsession.messaging.MessagingModuleConfiguration import org.session.libsession.messaging.open_groups.* import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPoller -import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupV2Poller +import org.session.libsession.messaging.sending_receiving.pollers.OpenGroupPollerV2 import org.session.libsession.utilities.TextSecurePreferences import org.session.libsession.utilities.Util import org.session.libsignal.utilities.ThreadUtils @@ -22,7 +22,7 @@ class PublicChatManager(private val context: Context) { private var chats = mutableMapOf() private var v2Chats = mutableMapOf() private val pollers = mutableMapOf() - private val v2Pollers = mutableMapOf() + private val v2Pollers = mutableMapOf() private val observers = mutableMapOf() private var isPolling = false private val executorService = Executors.newScheduledThreadPool(4) @@ -43,9 +43,12 @@ class PublicChatManager(private val context: Context) { val poller = pollers[threadID] ?: OpenGroupPoller(chat, executorService) poller.isCaughtUp = false } + /* + // FIXME: This wasn't even being used actually... for ((_,poller) in v2Pollers) { poller.isCaughtUp = false } + */ } public fun startPollersIfNeeded() { @@ -60,7 +63,7 @@ class PublicChatManager(private val context: Context) { v2Pollers.values.forEach { it.stop() } v2Pollers.clear() v2Chats.entries.groupBy { (_, group) -> group.server }.forEach { (server, threadedRooms) -> - val poller = OpenGroupV2Poller(threadedRooms.map { it.value }, executorService) + val poller = OpenGroupPollerV2(server, executorService) poller.startIfNeeded() threadedRooms.forEach { (thread, _) -> listenToThreadDeletion(thread) diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt new file mode 100644 index 0000000000..8fb2f24cef --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPollerV2.kt @@ -0,0 +1,91 @@ +package org.session.libsession.messaging.sending_receiving.pollers + +import nl.komponents.kovenant.Promise +import nl.komponents.kovenant.functional.map +import org.session.libsession.messaging.MessagingModuleConfiguration +import org.session.libsession.messaging.jobs.JobQueue +import org.session.libsession.messaging.jobs.MessageReceiveJob +import org.session.libsession.messaging.open_groups.OpenGroupAPIV2 +import org.session.libsession.messaging.open_groups.OpenGroupMessageV2 +import org.session.libsession.utilities.Address +import org.session.libsession.utilities.GroupUtil +import org.session.libsignal.protos.SignalServiceProtos +import org.session.libsignal.utilities.Log +import org.session.libsignal.utilities.successBackground +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit + +class OpenGroupPollerV2(private val server: String, private val executorService: ScheduledExecutorService?) { + var hasStarted = false + private var future: ScheduledFuture<*>? = null + + companion object { + private val pollInterval: Long = 4 * 1000 + } + + fun startIfNeeded() { + if (hasStarted) { return } + hasStarted = true + future = executorService?.schedule(::poll, 0, TimeUnit.MILLISECONDS) + } + + fun stop() { + future?.cancel(false) + hasStarted = false + } + + fun poll(isBackgroundPoll: Boolean = false): Promise { + val rooms: List = listOf() + return OpenGroupAPIV2.compactPoll(rooms, server).successBackground { responses -> + responses.forEach { (room, response) -> + val openGroupID = "$server.$room" + handleNewMessages(openGroupID, response.messages, isBackgroundPoll) + handleDeletedMessages(openGroupID, response.deletions) + } + }.always { + executorService?.schedule(this@OpenGroupPollerV2::poll, OpenGroupPollerV2.pollInterval, TimeUnit.MILLISECONDS) + }.map { } + } + + private fun handleNewMessages(openGroupID: String, messages: List, isBackgroundPoll: Boolean) { + if (!hasStarted) { return } + messages.sortedBy { it.serverID!! }.forEach { message -> + try { + val senderPublicKey = message.sender!! + val builder = SignalServiceProtos.Envelope.newBuilder() + builder.type = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE + builder.source = senderPublicKey + builder.sourceDevice = 1 + builder.content = message.toProto().toByteString() + builder.timestamp = message.sentTimestamp + val envelope = builder.build() + val job = MessageReceiveJob(envelope.toByteArray(), message.serverID, openGroupID) + if (isBackgroundPoll) { + job.executeAsync() + } else { + JobQueue.shared.add(job) + } + } catch (e: Exception) { + Log.e("Loki", "Exception parsing message", e) + } + } + } + + private fun handleDeletedMessages(openGroupID: String, deletedMessageServerIDs: List) { + val storage = MessagingModuleConfiguration.shared.storage + val dataProvider = MessagingModuleConfiguration.shared.messageDataProvider + val groupID = GroupUtil.getEncodedOpenGroupID(openGroupID.toByteArray()) + val threadID = storage.getThreadIdFor(Address.fromSerialized(groupID)) ?: return + val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverID -> + val messageID = dataProvider.getMessageID(serverID, threadID) + if (messageID == null) { + Log.d("Loki", "Couldn't find message ID for message with serverID: $serverID.") + } + messageID + } + deletedMessageIDs.forEach { (messageId, isSms) -> + MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms) + } + } +} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupV2Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupV2Poller.kt deleted file mode 100644 index f0d03e2c29..0000000000 --- a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupV2Poller.kt +++ /dev/null @@ -1,134 +0,0 @@ -package org.session.libsession.messaging.sending_receiving.pollers - -import nl.komponents.kovenant.Promise -import org.session.libsession.messaging.MessagingModuleConfiguration -import org.session.libsession.messaging.jobs.JobQueue -import org.session.libsession.messaging.jobs.MessageReceiveJob -import org.session.libsession.messaging.open_groups.OpenGroupAPIV2 -import org.session.libsession.messaging.open_groups.OpenGroupMessageV2 -import org.session.libsession.messaging.open_groups.OpenGroupV2 -import org.session.libsession.utilities.Address -import org.session.libsession.utilities.GroupUtil -import org.session.libsignal.protos.SignalServiceProtos -import org.session.libsignal.utilities.Log -import org.session.libsignal.utilities.successBackground -import java.util.concurrent.ScheduledExecutorService -import java.util.concurrent.ScheduledFuture -import java.util.concurrent.TimeUnit - -class OpenGroupV2Poller(private val openGroups: List, private val executorService: ScheduledExecutorService? = null) { - - private var hasStarted = false - @Volatile private var isPollOngoing = false - var isCaughtUp = false - - private val cancellableFutures = mutableListOf>() - - // use this as a receive time-based window to calculate re-poll interval - private val receivedQueue = ArrayDeque(50) - - private fun calculatePollInterval(): Long { - // sample last default poll time * 2 - while (receivedQueue.size > 50) { - receivedQueue.removeLast() - } - val sampleWindow = System.currentTimeMillis() - pollForNewMessagesInterval * 2 - val numberInSample = receivedQueue.toList().filter { it > sampleWindow }.size.coerceAtLeast(1) - return ((2 + (50 / numberInSample / 20)*5) * 1000).toLong() - } - - // region Settings - companion object { - private val pollForNewMessagesInterval: Long = 10 * 1000 - } - // endregion - - // region Lifecycle - fun startIfNeeded() { - if (hasStarted || executorService == null) return - cancellableFutures += executorService.schedule(::compactPoll, 0, TimeUnit.MILLISECONDS) - hasStarted = true - } - - fun stop() { - cancellableFutures.forEach { future -> - future.cancel(false) - } - cancellableFutures.clear() - hasStarted = false - } - // endregion - - // region Polling - - private fun compactPoll(): Promise { - return compactPoll(false) - } - - fun compactPoll(isBackgroundPoll: Boolean): Promise { - if (isPollOngoing || !hasStarted) return Promise.of(Unit) - isPollOngoing = true - val server = openGroups.first().server // assume all the same server - val rooms = openGroups.map { it.room } - return OpenGroupAPIV2.compactPoll(rooms = rooms, server).successBackground { results -> - results.forEach { (room, results) -> - val serverRoomId = "$server.$room" - handleNewMessages(serverRoomId, results.messages.sortedBy { it.serverID }, isBackgroundPoll) - handleDeletedMessages(serverRoomId,results.deletions) - } - }.always { - isPollOngoing = false - if (!isBackgroundPoll) { - val delay = calculatePollInterval() - executorService?.schedule(this@OpenGroupV2Poller::compactPoll, delay, TimeUnit.MILLISECONDS) - } - } - } - - private fun handleNewMessages(serverRoomId: String, newMessages: List, isBackgroundPoll: Boolean) { - if (!hasStarted) return - newMessages.forEach { message -> - try { - val senderPublicKey = message.sender!! - // Main message - // Envelope - val builder = SignalServiceProtos.Envelope.newBuilder() - builder.type = SignalServiceProtos.Envelope.Type.SESSION_MESSAGE - builder.source = senderPublicKey - builder.sourceDevice = 1 - builder.content = message.toProto().toByteString() - builder.timestamp = message.sentTimestamp - val envelope = builder.build() - val job = MessageReceiveJob(envelope.toByteArray(), message.serverID, serverRoomId) - Log.d("Loki", "Scheduling Job $job") - if (isBackgroundPoll) { - job.executeAsync() - // The promise is just used to keep track of when we're done - } else { - JobQueue.shared.add(job) - } - receivedQueue.addFirst(message.sentTimestamp) - } catch (e: Exception) { - Log.e("Loki", "Exception parsing message", e) - } - } - } - - private fun handleDeletedMessages(serverRoomId: String, deletedMessageServerIDs: List) { - val messagingModule = MessagingModuleConfiguration.shared - val address = GroupUtil.getEncodedOpenGroupID(serverRoomId.toByteArray()) - val threadId = messagingModule.storage.getThreadIdFor(Address.fromSerialized(address)) ?: return - - val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { serverId -> - val id = messagingModule.messageDataProvider.getMessageID(serverId, threadId) - if (id == null) { - Log.d("Loki", "Couldn't find server ID $serverId") - } - id - } - deletedMessageIDs.forEach { (messageId, isSms) -> - MessagingModuleConfiguration.shared.messageDataProvider.deleteMessage(messageId, isSms) - } - } - // endregion -} \ No newline at end of file