From 133706e1d68ab852f215f96cb351a62147fa3be0 Mon Sep 17 00:00:00 2001 From: Ryan ZHAO Date: Thu, 10 Dec 2020 15:34:17 +1100 Subject: [PATCH] WIP pollers --- .../pollers/ClosedGroupPoller.kt | 90 +++++ .../pollers/OpenGroupPoller.kt | 307 ++++++++++++++++++ .../sending_receiving/pollers/Poller.kt | 111 +++++++ 3 files changed, 508 insertions(+) create mode 100644 libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt create mode 100644 libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt create mode 100644 libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt new file mode 100644 index 0000000000..8d990776f3 --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/ClosedGroupPoller.kt @@ -0,0 +1,90 @@ +package org.session.libsession.messaging.sending_receiving.pollers + +import android.os.Handler +import nl.komponents.kovenant.Promise +import nl.komponents.kovenant.functional.bind +import nl.komponents.kovenant.functional.map + +import org.session.libsession.messaging.MessagingConfiguration +import org.session.libsession.messaging.jobs.JobQueue +import org.session.libsession.messaging.jobs.MessageReceiveJob +import org.session.libsession.messaging.utilities.MessageWrapper +import org.session.libsession.snode.SnodeAPI +import org.session.libsession.utilities.successBackground + +import org.session.libsignal.libsignal.logging.Log +import org.session.libsignal.service.internal.util.Base64 +import org.session.libsignal.service.loki.utilities.getRandomElementOrNull + +class ClosedGroupPoller { + private var isPolling = false + private val handler: Handler by lazy { Handler() } + + private val task = object : Runnable { + + override fun run() { + poll() + handler.postDelayed(this, ClosedGroupPoller.pollInterval) + } + } + + // region Settings + companion object { + private val pollInterval: Long = 2 * 1000 + } + // endregion + + // region Error + class InsufficientSnodesException() : Exception("No snodes left to poll.") + class PollingCanceledException() : Exception("Polling canceled.") + // endregion + + // region Public API + public fun startIfNeeded() { + if (isPolling) { return } + isPolling = true + task.run() + } + + public fun pollOnce(): List> { + if (isPolling) { return listOf() } + isPolling = true + return poll() + } + + public fun stopIfNeeded() { + isPolling = false + handler.removeCallbacks(task) + } + // endregion + + // region Private API + private fun poll(): List> { + if (!isPolling) { return listOf() } + val publicKeys = MessagingConfiguration.shared.sskDatabase.getAllClosedGroupPublicKeys() + return publicKeys.map { publicKey -> + val promise = SnodeAPI.getSwarm(publicKey).bind { swarm -> + val snode = swarm.getRandomElementOrNull() ?: throw InsufficientSnodesException() // Should be cryptographically secure + if (!isPolling) { throw PollingCanceledException() } + SnodeAPI.getRawMessages(snode, publicKey).map {SnodeAPI.parseRawMessagesResponse(it, snode, publicKey) } + } + promise.successBackground { messages -> + if (messages.isNotEmpty()) { + Log.d("Loki", "Received ${messages.count()} new message(s) in closed group with public key: $publicKey.") + } + messages.forEach { message -> + val rawMessageAsJSON = message as? Map<*, *> + val base64EncodedData = rawMessageAsJSON?.get("data") as? String + val data = base64EncodedData?.let { Base64.decode(it) } ?: return@forEach + val job = MessageReceiveJob(MessageWrapper.unwrap(data), false) + JobQueue.shared.add(job) + } + } + promise.fail { + Log.d("Loki", "Polling failed for closed group with public key: $publicKey due to error: $it.") + } + promise.map { Unit } + } + } + // endregion +} diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt new file mode 100644 index 0000000000..1c541cb2e2 --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/OpenGroupPoller.kt @@ -0,0 +1,307 @@ +package org.session.libsession.messaging.sending_receiving.pollers + +import android.content.Context +import android.os.Handler +import org.thoughtcrime.securesms.logging.Log +import androidx.annotation.WorkerThread +import nl.komponents.kovenant.Promise +import nl.komponents.kovenant.functional.bind +import nl.komponents.kovenant.functional.map +import org.thoughtcrime.securesms.ApplicationContext +import org.thoughtcrime.securesms.crypto.IdentityKeyUtil +import org.thoughtcrime.securesms.database.Address +import org.thoughtcrime.securesms.database.DatabaseFactory +import org.thoughtcrime.securesms.jobs.PushDecryptJob +import org.thoughtcrime.securesms.jobs.RetrieveProfileAvatarJob +import org.thoughtcrime.securesms.loki.protocol.SessionMetaProtocol +import org.thoughtcrime.securesms.loki.utilities.successBackground +import org.thoughtcrime.securesms.recipients.Recipient +import org.thoughtcrime.securesms.util.TextSecurePreferences +import org.session.libsignal.libsignal.util.guava.Optional +import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer +import org.session.libsignal.service.api.messages.SignalServiceContent +import org.session.libsignal.service.api.messages.SignalServiceDataMessage +import org.session.libsignal.service.api.messages.SignalServiceGroup +import org.session.libsignal.service.api.messages.multidevice.SentTranscriptMessage +import org.session.libsignal.service.api.push.SignalServiceAddress +import org.session.libsignal.service.loki.api.fileserver.FileServerAPI +import org.session.libsignal.service.loki.api.opengroups.PublicChat +import org.session.libsignal.service.loki.api.opengroups.PublicChatAPI +import org.session.libsignal.service.loki.api.opengroups.PublicChatMessage +import org.session.libsignal.service.loki.protocol.shelved.multidevice.MultiDeviceProtocol +import java.security.MessageDigest +import java.util.* +import java.util.concurrent.CompletableFuture + +class OpenGroupPoller(private val context: Context, private val group: PublicChat) { + private val handler by lazy { Handler() } + private var hasStarted = false + private var isPollOngoing = false + public var isCaughtUp = false + + // region Convenience + private val userHexEncodedPublicKey = TextSecurePreferences.getLocalNumber(context) + private var displayNameUpdatees = setOf() + + private val api: PublicChatAPI + get() = { + val userPrivateKey = IdentityKeyUtil.getIdentityKeyPair(context).privateKey.serialize() + val lokiAPIDatabase = DatabaseFactory.getLokiAPIDatabase(context) + val lokiUserDatabase = DatabaseFactory.getLokiUserDatabase(context) + val openGroupDatabase = DatabaseFactory.getGroupDatabase(context) + PublicChatAPI(userHexEncodedPublicKey, userPrivateKey, lokiAPIDatabase, lokiUserDatabase, openGroupDatabase) + }() + // endregion + + // region Tasks + private val pollForNewMessagesTask = object : Runnable { + + override fun run() { + pollForNewMessages() + handler.postDelayed(this, pollForNewMessagesInterval) + } + } + + private val pollForDeletedMessagesTask = object : Runnable { + + override fun run() { + pollForDeletedMessages() + handler.postDelayed(this, pollForDeletedMessagesInterval) + } + } + + private val pollForModeratorsTask = object : Runnable { + + override fun run() { + pollForModerators() + handler.postDelayed(this, pollForModeratorsInterval) + } + } + + private val pollForDisplayNamesTask = object : Runnable { + + override fun run() { + pollForDisplayNames() + handler.postDelayed(this, pollForDisplayNamesInterval) + } + } + // endregion + + // region Settings + companion object { + private val pollForNewMessagesInterval: Long = 4 * 1000 + private val pollForDeletedMessagesInterval: Long = 60 * 1000 + private val pollForModeratorsInterval: Long = 10 * 60 * 1000 + private val pollForDisplayNamesInterval: Long = 60 * 1000 + } + // endregion + + // region Lifecycle + fun startIfNeeded() { + if (hasStarted) return + pollForNewMessagesTask.run() + pollForDeletedMessagesTask.run() + pollForModeratorsTask.run() + pollForDisplayNamesTask.run() + hasStarted = true + } + + fun stop() { + handler.removeCallbacks(pollForNewMessagesTask) + handler.removeCallbacks(pollForDeletedMessagesTask) + handler.removeCallbacks(pollForModeratorsTask) + handler.removeCallbacks(pollForDisplayNamesTask) + hasStarted = false + } + // endregion + + // region Polling + private fun getDataMessage(message: PublicChatMessage): SignalServiceDataMessage { + val id = group.id.toByteArray() + val serviceGroup = SignalServiceGroup(SignalServiceGroup.Type.UPDATE, id, SignalServiceGroup.GroupType.PUBLIC_CHAT, null, null, null, null) + val quote = if (message.quote != null) { + SignalServiceDataMessage.Quote(message.quote!!.quotedMessageTimestamp, SignalServiceAddress(message.quote!!.quoteePublicKey), message.quote!!.quotedMessageBody, listOf()) + } else { + null + } + val attachments = message.attachments.mapNotNull { attachment -> + if (attachment.kind != PublicChatMessage.Attachment.Kind.Attachment) { return@mapNotNull null } + SignalServiceAttachmentPointer( + attachment.serverID, + attachment.contentType, + ByteArray(0), + Optional.of(attachment.size), + Optional.absent(), + attachment.width, attachment.height, + Optional.absent(), + Optional.of(attachment.fileName), + false, + Optional.fromNullable(attachment.caption), + attachment.url) + } + val linkPreview = message.attachments.firstOrNull { it.kind == PublicChatMessage.Attachment.Kind.LinkPreview } + val signalLinkPreviews = mutableListOf() + if (linkPreview != null) { + val attachment = SignalServiceAttachmentPointer( + linkPreview.serverID, + linkPreview.contentType, + ByteArray(0), + Optional.of(linkPreview.size), + Optional.absent(), + linkPreview.width, linkPreview.height, + Optional.absent(), + Optional.of(linkPreview.fileName), + false, + Optional.fromNullable(linkPreview.caption), + linkPreview.url) + signalLinkPreviews.add(SignalServiceDataMessage.Preview(linkPreview.linkPreviewURL!!, linkPreview.linkPreviewTitle!!, Optional.of(attachment))) + } + val body = if (message.body == message.timestamp.toString()) "" else message.body // Workaround for the fact that the back-end doesn't accept messages without a body + return SignalServiceDataMessage(message.timestamp, serviceGroup, attachments, body, false, 0, false, null, false, quote, null, signalLinkPreviews, null) + } + + fun pollForNewMessages(): Promise { + fun processIncomingMessage(message: PublicChatMessage) { + // If the sender of the current message is not a slave device, set the display name in the database + val masterHexEncodedPublicKey = MultiDeviceProtocol.shared.getMasterDevice(message.senderPublicKey) + if (masterHexEncodedPublicKey == null) { + val senderDisplayName = "${message.displayName} (...${message.senderPublicKey.takeLast(8)})" + DatabaseFactory.getLokiUserDatabase(context).setServerDisplayName(group.id, message.senderPublicKey, senderDisplayName) + } + val senderHexEncodedPublicKey = masterHexEncodedPublicKey ?: message.senderPublicKey + val serviceDataMessage = getDataMessage(message) + val serviceContent = SignalServiceContent(serviceDataMessage, senderHexEncodedPublicKey, SignalServiceAddress.DEFAULT_DEVICE_ID, message.serverTimestamp, false, false) + if (serviceDataMessage.quote.isPresent || (serviceDataMessage.attachments.isPresent && serviceDataMessage.attachments.get().size > 0) || serviceDataMessage.previews.isPresent) { + PushDecryptJob(context).handleMediaMessage(serviceContent, serviceDataMessage, Optional.absent(), Optional.of(message.serverID)) + } else { + PushDecryptJob(context).handleTextMessage(serviceContent, serviceDataMessage, Optional.absent(), Optional.of(message.serverID)) + } + // Update profile picture if needed + val senderAsRecipient = Recipient.from(context, Address.fromSerialized(senderHexEncodedPublicKey), false) + if (message.profilePicture != null && message.profilePicture!!.url.isNotEmpty()) { + val profileKey = message.profilePicture!!.profileKey + val url = message.profilePicture!!.url + if (senderAsRecipient.profileKey == null || !MessageDigest.isEqual(senderAsRecipient.profileKey, profileKey)) { + val database = DatabaseFactory.getRecipientDatabase(context) + database.setProfileKey(senderAsRecipient, profileKey) + ApplicationContext.getInstance(context).jobManager.add(RetrieveProfileAvatarJob(senderAsRecipient, url)) + } + } + } + fun processOutgoingMessage(message: PublicChatMessage) { + val messageServerID = message.serverID ?: return + val messageID = DatabaseFactory.getLokiMessageDatabase(context).getMessageID(messageServerID) + var isDuplicate = false + if (messageID != null) { + isDuplicate = DatabaseFactory.getMmsDatabase(context).getThreadIdForMessage(messageID) >= 0 + || DatabaseFactory.getSmsDatabase(context).getThreadIdForMessage(messageID) >= 0 + } + if (isDuplicate) { return } + if (message.body.isEmpty() && message.attachments.isEmpty() && message.quote == null) { return } + val userHexEncodedPublicKey = TextSecurePreferences.getLocalNumber(context) + val dataMessage = getDataMessage(message) + SessionMetaProtocol.dropFromTimestampCacheIfNeeded(message.serverTimestamp) + val transcript = SentTranscriptMessage(userHexEncodedPublicKey, message.serverTimestamp, dataMessage, dataMessage.expiresInSeconds.toLong(), Collections.singletonMap(userHexEncodedPublicKey, false)) + transcript.messageServerID = messageServerID + if (dataMessage.quote.isPresent || (dataMessage.attachments.isPresent && dataMessage.attachments.get().size > 0) || dataMessage.previews.isPresent) { + PushDecryptJob(context).handleSynchronizeSentMediaMessage(transcript) + } else { + PushDecryptJob(context).handleSynchronizeSentTextMessage(transcript) + } + // If we got a message from our master device then make sure our mapping stays in sync + val recipient = Recipient.from(context, Address.fromSerialized(message.senderPublicKey), false) + if (recipient.isUserMasterDevice && message.profilePicture != null) { + val profileKey = message.profilePicture!!.profileKey + val url = message.profilePicture!!.url + if (recipient.profileKey == null || !MessageDigest.isEqual(recipient.profileKey, profileKey)) { + val database = DatabaseFactory.getRecipientDatabase(context) + database.setProfileKey(recipient, profileKey) + database.setProfileAvatar(recipient, url) + ApplicationContext.getInstance(context).updateOpenGroupProfilePicturesIfNeeded() + } + } + } + if (isPollOngoing) { return Promise.of(Unit) } + isPollOngoing = true + val userDevices = MultiDeviceProtocol.shared.getAllLinkedDevices(userHexEncodedPublicKey) + var uniqueDevices = setOf() + val userPrivateKey = IdentityKeyUtil.getIdentityKeyPair(context).privateKey.serialize() + val apiDB = DatabaseFactory.getLokiAPIDatabase(context) + FileServerAPI.configure(userHexEncodedPublicKey, userPrivateKey, apiDB) + // Kovenant propagates a context to chained promises, so LokiPublicChatAPI.sharedContext should be used for all of the below + val promise = api.getMessages(group.channel, group.server).bind(PublicChatAPI.sharedContext) { messages -> + /* + if (messages.isNotEmpty()) { + // We need to fetch the device mapping for any devices we don't have + uniqueDevices = messages.map { it.senderPublicKey }.toSet() + val devicesToUpdate = uniqueDevices.filter { !userDevices.contains(it) && FileServerAPI.shared.hasDeviceLinkCacheExpired(publicKey = it) } + if (devicesToUpdate.isNotEmpty()) { + return@bind FileServerAPI.shared.getDeviceLinks(devicesToUpdate.toSet()).then { messages } + } + } + */ + Promise.of(messages) + } + promise.successBackground { + /* + val newDisplayNameUpdatees = uniqueDevices.mapNotNull { + // This will return null if the current device is a master device + MultiDeviceProtocol.shared.getMasterDevice(it) + }.toSet() + // Fetch the display names of the master devices + displayNameUpdatees = displayNameUpdatees.union(newDisplayNameUpdatees) + */ + } + promise.successBackground { messages -> + // Process messages in the background + messages.forEach { message -> + if (userDevices.contains(message.senderPublicKey)) { + processOutgoingMessage(message) + } else { + processIncomingMessage(message) + } + } + isCaughtUp = true + isPollOngoing = false + } + promise.fail { + Log.d("Loki", "Failed to get messages for group chat with ID: ${group.channel} on server: ${group.server}.") + isPollOngoing = false + } + return promise.map { Unit } + } + + private fun pollForDisplayNames() { + if (displayNameUpdatees.isEmpty()) { return } + val hexEncodedPublicKeys = displayNameUpdatees + displayNameUpdatees = setOf() + api.getDisplayNames(hexEncodedPublicKeys, group.server).successBackground { mapping -> + for (pair in mapping.entries) { + val senderDisplayName = "${pair.value} (...${pair.key.takeLast(8)})" + DatabaseFactory.getLokiUserDatabase(context).setServerDisplayName(group.id, pair.key, senderDisplayName) + } + }.fail { + displayNameUpdatees = displayNameUpdatees.union(hexEncodedPublicKeys) + } + } + + private fun pollForDeletedMessages() { + api.getDeletedMessageServerIDs(group.channel, group.server).success { deletedMessageServerIDs -> + val lokiMessageDatabase = DatabaseFactory.getLokiMessageDatabase(context) + val deletedMessageIDs = deletedMessageServerIDs.mapNotNull { lokiMessageDatabase.getMessageID(it) } + val smsMessageDatabase = DatabaseFactory.getSmsDatabase(context) + val mmsMessageDatabase = DatabaseFactory.getMmsDatabase(context) + deletedMessageIDs.forEach { + smsMessageDatabase.deleteMessage(it) + mmsMessageDatabase.delete(it) + } + }.fail { + Log.d("Loki", "Failed to get deleted messages for group chat with ID: ${group.channel} on server: ${group.server}.") + } + } + + private fun pollForModerators() { + api.getModerators(group.channel, group.server) + } + // endregion +} \ No newline at end of file diff --git a/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt new file mode 100644 index 0000000000..7afce79e85 --- /dev/null +++ b/libsession/src/main/java/org/session/libsession/messaging/sending_receiving/pollers/Poller.kt @@ -0,0 +1,111 @@ +package org.session.libsession.messaging.sending_receiving.pollers + +import nl.komponents.kovenant.* +import nl.komponents.kovenant.functional.bind + +import org.session.libsession.messaging.MessagingConfiguration +import org.session.libsession.messaging.jobs.JobQueue +import org.session.libsession.messaging.jobs.MessageReceiveJob +import org.session.libsession.messaging.utilities.MessageWrapper +import org.session.libsession.snode.Snode +import org.session.libsession.snode.SnodeAPI +import org.session.libsession.snode.SnodeConfiguration + +import org.session.libsignal.libsignal.logging.Log +import org.session.libsignal.service.internal.util.Base64 + +import java.security.SecureRandom +import java.util.* + +private class PromiseCanceledException : Exception("Promise canceled.") + +class Poller { + private val userPublicKey = MessagingConfiguration.shared.storage.getUserPublicKey() ?: "" + private var hasStarted: Boolean = false + private val usedSnodes: MutableSet = mutableSetOf() + public var isCaughtUp = false + + // region Settings + companion object { + private val retryInterval: Long = 1 * 1000 + } + // endregion + + // region Public API + fun startIfNeeded() { + if (hasStarted) { return } + Log.d("Loki", "Started polling.") + hasStarted = true + setUpPolling() + } + + fun stopIfNeeded() { + Log.d("Loki", "Stopped polling.") + hasStarted = false + usedSnodes.clear() + } + // endregion + + // region Private API + private fun setUpPolling() { + if (!hasStarted) { return; } + val thread = Thread.currentThread() + SnodeAPI.getSwarm(userPublicKey).bind(SnodeAPI.messagePollingContext) { + usedSnodes.clear() + val deferred = deferred(SnodeAPI.messagePollingContext) + pollNextSnode(deferred) + deferred.promise + }.always { + Timer().schedule(object : TimerTask() { + + override fun run() { + thread.run { setUpPolling() } + } + }, retryInterval) + } + } + + private fun pollNextSnode(deferred: Deferred) { + val swarm = SnodeConfiguration.shared.storage.getSwarm(userPublicKey) ?: setOf() + val unusedSnodes = swarm.subtract(usedSnodes) + if (unusedSnodes.isNotEmpty()) { + val index = SecureRandom().nextInt(unusedSnodes.size) + val nextSnode = unusedSnodes.elementAt(index) + usedSnodes.add(nextSnode) + Log.d("Loki", "Polling $nextSnode.") + poll(nextSnode, deferred).fail { exception -> + if (exception is PromiseCanceledException) { + Log.d("Loki", "Polling $nextSnode canceled.") + } else { + Log.d("Loki", "Polling $nextSnode failed; dropping it and switching to next snode.") + SnodeAPI.dropSnodeFromSwarmIfNeeded(nextSnode, userPublicKey) + pollNextSnode(deferred) + } + } + } else { + isCaughtUp = true + deferred.resolve() + } + } + + private fun poll(snode: Snode, deferred: Deferred): Promise { + if (!hasStarted) { return Promise.ofFail(PromiseCanceledException()) } + return SnodeAPI.getRawMessages(snode, userPublicKey).bind(SnodeAPI.messagePollingContext) { rawResponse -> + isCaughtUp = true + if (deferred.promise.isDone()) { + task { Unit } // The long polling connection has been canceled; don't recurse + } else { + val messages = SnodeAPI.parseRawMessagesResponse(rawResponse, snode, userPublicKey) + messages.forEach { message -> + val rawMessageAsJSON = message as? Map<*, *> + val base64EncodedData = rawMessageAsJSON?.get("data") as? String + val data = base64EncodedData?.let { Base64.decode(it) } ?: return@forEach + val job = MessageReceiveJob(MessageWrapper.unwrap(data), false) + JobQueue.shared.add(job) + } + poll(snode, deferred) + } + } + } + // endregion +}