|
|
|
@ -3,6 +3,11 @@ package org.session.libsession.messaging.jobs
|
|
|
|
|
import com.esotericsoftware.kryo.Kryo
|
|
|
|
|
import com.esotericsoftware.kryo.io.Input
|
|
|
|
|
import com.esotericsoftware.kryo.io.Output
|
|
|
|
|
import kotlinx.coroutines.flow.Flow
|
|
|
|
|
import kotlinx.coroutines.flow.filter
|
|
|
|
|
import kotlinx.coroutines.flow.first
|
|
|
|
|
import kotlinx.coroutines.flow.map
|
|
|
|
|
import kotlinx.coroutines.flow.onStart
|
|
|
|
|
import kotlinx.coroutines.withTimeout
|
|
|
|
|
import org.session.libsession.messaging.MessagingModuleConfiguration
|
|
|
|
|
import org.session.libsession.messaging.jobs.Job.Companion.MAX_BUFFER_SIZE_BYTES
|
|
|
|
@ -12,6 +17,9 @@ import org.session.libsession.messaging.messages.visible.VisibleMessage
|
|
|
|
|
import org.session.libsession.messaging.sending_receiving.MessageSender
|
|
|
|
|
import org.session.libsession.messaging.utilities.Data
|
|
|
|
|
import org.session.libsession.snode.utilities.await
|
|
|
|
|
import org.session.libsession.utilities.ConfigFactoryProtocol
|
|
|
|
|
import org.session.libsession.utilities.ConfigUpdateNotification
|
|
|
|
|
import org.session.libsignal.utilities.AccountId
|
|
|
|
|
import org.session.libsignal.utilities.HTTP
|
|
|
|
|
import org.session.libsignal.utilities.Log
|
|
|
|
|
|
|
|
|
@ -77,6 +85,12 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
withTimeout(20_000L) {
|
|
|
|
|
// Shouldn't send message to group when the group has no keys available
|
|
|
|
|
if (destination is Destination.ClosedGroup) {
|
|
|
|
|
MessagingModuleConfiguration.shared.configFactory
|
|
|
|
|
.waitForGroupEncryptionKeys(AccountId(destination.publicKey))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
MessageSender.send(this@MessageSendJob.message, destination, isSync).await()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -92,6 +106,17 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private suspend fun ConfigFactoryProtocol.waitForGroupEncryptionKeys(groupId: AccountId) {
|
|
|
|
|
(configUpdateNotifications
|
|
|
|
|
.filter { it is ConfigUpdateNotification.GroupConfigsUpdated && it.groupId == groupId }
|
|
|
|
|
as Flow<*>
|
|
|
|
|
).onStart { emit(Unit) }
|
|
|
|
|
.filter {
|
|
|
|
|
withGroupConfigs(groupId) { configs -> configs.groupKeys.keys().isNotEmpty() }
|
|
|
|
|
}
|
|
|
|
|
.first()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private fun handleSuccess(dispatcherName: String) {
|
|
|
|
|
delegate?.handleJobSucceeded(this, dispatcherName)
|
|
|
|
|
}
|
|
|
|
|