@ -1,56 +1,33 @@
package org.session.libsession.messaging.sending_receiving.pollers
import android.content.Context
import android.os.Handler
import org.thoughtcrime.securesms.logging.Lo g
import androidx.annotation.WorkerThread
import com.google.protobuf.ByteStrin g
import nl.komponents.kovenant.Promise
import nl.komponents.kovenant.functional.bind
import nl.komponents.kovenant.functional.map
import org.thoughtcrime.securesms.ApplicationContext
import org.thoughtcrime.securesms.crypto.IdentityKeyUtil
import org.thoughtcrime.securesms.database.Address
import org.thoughtcrime.securesms.database.DatabaseFactory
import org.thoughtcrime.securesms.jobs.PushDecryptJob
import org.thoughtcrime.securesms.jobs.RetrieveProfileAvatarJob
import org.thoughtcrime.securesms.loki.protocol.SessionMetaProtocol
import org.thoughtcrime.securesms.loki.utilities.successBackground
import org.thoughtcrime.securesms.recipients.Recipient
import org.thoughtcrime.securesms.util.TextSecurePreferences
import org.session.libsignal.libsignal.util.guava.Optional
import org.session.libsignal.service.api.messages.SignalServiceAttachmentPointer
import org.session.libsignal.service.api.messages.SignalServiceContent
import org.session.libsignal.service.api.messages.SignalServiceDataMessage
import org.session.libsignal.service.api.messages.SignalServiceGroup
import org.session.libsignal.service.api.messages.multidevice.SentTranscriptMessage
import org.session.libsignal.service.api.push.SignalServiceAddress
import org.session.libsignal.service.loki.api.fileserver.FileServerAPI
import org.session.libsignal.service.loki.api.opengroups.PublicChat
import org.session.libsignal.service.loki.api.opengroups.PublicChatAPI
import org.session.libsignal.service.loki.api.opengroups.PublicChatMessage
import org.session.libsignal.service.loki.protocol.shelved.multidevice.MultiDeviceProtocol
import java.security.MessageDigest
import nl.komponents.kovenant.deferred
import org.session.libsession.messaging.MessagingConfiguration
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageReceiveJob
import org.session.libsession.messaging.opengroups.OpenGroup
import org.session.libsession.messaging.opengroups.OpenGroupAPI
import org.session.libsession.messaging.opengroups.OpenGroupMessage
import org.session.libsession.utilities.GroupUtil
import org.session.libsession.utilities.successBackground
import org.session.libsignal.libsignal.logging.Log
import org.session.libsignal.service.internal.push.SignalServiceProtos.*
import java.util.*
import java.util.concurrent.CompletableFuture
class OpenGroupPoller ( private val context: Context , private val group : PublicChat ) {
class OpenGroupPoller ( private val openGroup : OpenGroup ) {
private val handler by lazy { Handler ( ) }
private var hasStarted = false
private var isPollOngoing = false
public var isCaughtUp = false
// region Convenience
private val userHexEncodedPublicKey = TextSecurePreferences. getLocalNumber ( context )
private val userHexEncodedPublicKey = MessagingConfiguration . shared . storage . getUserPublicKey ( ) ?: " "
private var displayNameUpdatees = setOf < String > ( )
private val api : PublicChatAPI
get ( ) = {
val userPrivateKey = IdentityKeyUtil . getIdentityKeyPair ( context ) . privateKey . serialize ( )
val lokiAPIDatabase = DatabaseFactory . getLokiAPIDatabase ( context )
val lokiUserDatabase = DatabaseFactory . getLokiUserDatabase ( context )
val openGroupDatabase = DatabaseFactory . getGroupDatabase ( context )
PublicChatAPI ( userHexEncodedPublicKey , userPrivateKey , lokiAPIDatabase , lokiUserDatabase , openGroupDatabase )
} ( )
// endregion
// region Tasks
@ -116,169 +93,145 @@ class OpenGroupPoller(private val context: Context, private val group: PublicCha
// endregion
// region Polling
private fun getDataMessage ( message : PublicChatMessage ) : SignalServiceDataMessage {
val id = group . id . toByteArray ( )
val serviceGroup = SignalServiceGroup ( SignalServiceGroup . Type . UPDATE , id , SignalServiceGroup . GroupType . PUBLIC _CHAT , null , null , null , null )
val quote = if ( message . quote != null ) {
SignalServiceDataMessage . Quote ( message . quote !! . quotedMessageTimestamp , SignalServiceAddress ( message . quote !! . quoteePublicKey ) , message . quote !! . quotedMessageBody , listOf ( ) )
} else {
null
}
val attachments = message . attachments . mapNotNull { attachment ->
if ( attachment . kind != PublicChatMessage . Attachment . Kind . Attachment ) { return @mapNotNull null }
SignalServiceAttachmentPointer (
attachment . serverID ,
attachment . contentType ,
ByteArray ( 0 ) ,
Optional . of ( attachment . size ) ,
Optional . absent ( ) ,
attachment . width , attachment . height ,
Optional . absent ( ) ,
Optional . of ( attachment . fileName ) ,
false ,
Optional . fromNullable ( attachment . caption ) ,
attachment . url )
}
val linkPreview = message . attachments . firstOrNull { it . kind == PublicChatMessage . Attachment . Kind . LinkPreview }
val signalLinkPreviews = mutableListOf < SignalServiceDataMessage . Preview > ( )
if ( linkPreview != null ) {
val attachment = SignalServiceAttachmentPointer (
linkPreview . serverID ,
linkPreview . contentType ,
ByteArray ( 0 ) ,
Optional . of ( linkPreview . size ) ,
Optional . absent ( ) ,
linkPreview . width , linkPreview . height ,
Optional . absent ( ) ,
Optional . of ( linkPreview . fileName ) ,
false ,
Optional . fromNullable ( linkPreview . caption ) ,
linkPreview . url )
signalLinkPreviews . add ( SignalServiceDataMessage . Preview ( linkPreview . linkPreviewURL !! , linkPreview . linkPreviewTitle !! , Optional . of ( attachment ) ) )
}
val body = if ( message . body == message . timestamp . toString ( ) ) " " else message . body // Workaround for the fact that the back-end doesn't accept messages without a body
return SignalServiceDataMessage ( message . timestamp , serviceGroup , attachments , body , false , 0 , false , null , false , quote , null , signalLinkPreviews , null )
fun pollForNewMessages ( ) : Promise < Unit , Exception > {
return pollForNewMessages ( false )
}
fun pollForNewMessages ( ) : Promise < Unit , Exception > {
fun processIncomingMessage ( message : PublicChatMessage ) {
// If the sender of the current message is not a slave device, set the display name in the database
val masterHexEncodedPublicKey = MultiDeviceProtocol . shared . getMasterDevice ( message . senderPublicKey )
if ( masterHexEncodedPublicKey == null ) {
val senderDisplayName = " ${message.displayName} (... ${message.senderPublicKey.takeLast(8)} ) "
DatabaseFactory . getLokiUserDatabase ( context ) . setServerDisplayName ( group . id , message . senderPublicKey , senderDisplayName )
}
val senderHexEncodedPublicKey = masterHexEncodedPublicKey ?: message . senderPublicKey
val serviceDataMessage = getDataMessage ( message )
val serviceContent = SignalServiceContent ( serviceDataMessage , senderHexEncodedPublicKey , SignalServiceAddress . DEFAULT _DEVICE _ID , message . serverTimestamp , false , false )
if ( serviceDataMessage . quote . isPresent || ( serviceDataMessage . attachments . isPresent && serviceDataMessage . attachments . get ( ) . size > 0 ) || serviceDataMessage . previews . isPresent ) {
PushDecryptJob ( context ) . handleMediaMessage ( serviceContent , serviceDataMessage , Optional . absent ( ) , Optional . of ( message . serverID ) )
} else {
PushDecryptJob ( context ) . handleTextMessage ( serviceContent , serviceDataMessage , Optional . absent ( ) , Optional . of ( message . serverID ) )
}
// Update profile picture if needed
val senderAsRecipient = Recipient . from ( context , Address . fromSerialized ( senderHexEncodedPublicKey ) , false )
if ( message . profilePicture != null && message . profilePicture !! . url . isNotEmpty ( ) ) {
val profileKey = message . profilePicture !! . profileKey
val url = message . profilePicture !! . url
if ( senderAsRecipient . profileKey == null || ! MessageDigest . isEqual ( senderAsRecipient . profileKey , profileKey ) ) {
val database = DatabaseFactory . getRecipientDatabase ( context )
database . setProfileKey ( senderAsRecipient , profileKey )
ApplicationContext . getInstance ( context ) . jobManager . add ( RetrieveProfileAvatarJob ( senderAsRecipient , url ) )
}
}
}
fun processOutgoingMessage ( message : PublicChatMessage ) {
val messageServerID = message . serverID ?: return
val messageID = DatabaseFactory . getLokiMessageDatabase ( context ) . getMessageID ( messageServerID )
var isDuplicate = false
if ( messageID != null ) {
isDuplicate = DatabaseFactory . getMmsDatabase ( context ) . getThreadIdForMessage ( messageID ) >= 0
|| DatabaseFactory . getSmsDatabase ( context ) . getThreadIdForMessage ( messageID ) >= 0
}
if ( isDuplicate ) { return }
if ( message . body . isEmpty ( ) && message . attachments . isEmpty ( ) && message . quote == null ) { return }
val userHexEncodedPublicKey = TextSecurePreferences . getLocalNumber ( context )
val dataMessage = getDataMessage ( message )
SessionMetaProtocol . dropFromTimestampCacheIfNeeded ( message . serverTimestamp )
val transcript = SentTranscriptMessage ( userHexEncodedPublicKey , message . serverTimestamp , dataMessage , dataMessage . expiresInSeconds . toLong ( ) , Collections . singletonMap ( userHexEncodedPublicKey , false ) )
transcript . messageServerID = messageServerID
if ( dataMessage . quote . isPresent || ( dataMessage . attachments . isPresent && dataMessage . attachments . get ( ) . size > 0 ) || dataMessage . previews . isPresent ) {
PushDecryptJob ( context ) . handleSynchronizeSentMediaMessage ( transcript )
} else {
PushDecryptJob ( context ) . handleSynchronizeSentTextMessage ( transcript )
}
// If we got a message from our master device then make sure our mapping stays in sync
val recipient = Recipient . from ( context , Address . fromSerialized ( message . senderPublicKey ) , false )
if ( recipient . isUserMasterDevice && message . profilePicture != null ) {
val profileKey = message . profilePicture !! . profileKey
val url = message . profilePicture !! . url
if ( recipient . profileKey == null || ! MessageDigest . isEqual ( recipient . profileKey , profileKey ) ) {
val database = DatabaseFactory . getRecipientDatabase ( context )
database . setProfileKey ( recipient , profileKey )
database . setProfileAvatar ( recipient , url )
ApplicationContext . getInstance ( context ) . updateOpenGroupProfilePicturesIfNeeded ( )
}
}
}
fun pollForNewMessages ( isBackgroundPoll : Boolean ) : Promise < Unit , Exception > {
if ( isPollOngoing ) { return Promise . of ( Unit ) }
isPollOngoing = true
val userDevices = MultiDeviceProtocol . shared . getAllLinkedDevices ( userHexEncodedPublicKey )
var uniqueDevices = setOf < String > ( )
val userPrivateKey = IdentityKeyUtil . getIdentityKeyPair ( context ) . privateKey . serialize ( )
val apiDB = DatabaseFactory . getLokiAPIDatabase ( context )
FileServerAPI . configure ( userHexEncodedPublicKey , userPrivateKey , apiDB )
// Kovenant propagates a context to chained promises, so LokiPublicChatAPI.sharedContext should be used for all of the below
val promise = api . getMessages ( group . channel , group . server ) . bind ( PublicChatAPI . sharedContext ) { messages ->
/ *
if ( messages . isNotEmpty ( ) ) {
// We need to fetch the device mapping for any devices we don't have
uniqueDevices = messages . map { it . senderPublicKey } . toSet ( )
val devicesToUpdate = uniqueDevices . filter { ! userDevices . contains ( it ) && FileServerAPI . shared . hasDeviceLinkCacheExpired ( publicKey = it ) }
if ( devicesToUpdate . isNotEmpty ( ) ) {
return @bind FileServerAPI . shared . getDeviceLinks ( devicesToUpdate . toSet ( ) ) . then { messages }
}
}
* /
Promise . of ( messages )
}
promise . successBackground {
/ *
val newDisplayNameUpdatees = uniqueDevices . mapNotNull {
// This will return null if the current device is a master device
MultiDeviceProtocol . shared . getMasterDevice ( it )
} . toSet ( )
// Fetch the display names of the master devices
displayNameUpdatees = displayNameUpdatees . union ( newDisplayNameUpdatees )
* /
}
promise . successBackground { messages ->
val deferred = deferred < Unit , Exception > ( )
// Kovenant propagates a context to chained promises, so OpenGroupAPI.sharedContext should be used for all of the below
OpenGroupAPI . getMessages ( openGroup . channel , openGroup . server ) . successBackground { messages ->
// Process messages in the background
messages . forEach { message ->
if ( userDevices . contains ( message . senderPublicKey ) ) {
processOutgoingMessage ( message )
val senderPublicKey = message . senderPublicKey
val wasSentByCurrentUser = ( senderPublicKey == userHexEncodedPublicKey )
fun generateDisplayName ( rawDisplayName : String ) : String {
return " ${rawDisplayName} ( ${senderPublicKey.takeLast(8)} ) "
}
val senderDisplayName = MessagingConfiguration . shared . storage . getOpenGroupDisplayName ( senderPublicKey , openGroup . channel , openGroup . server ) ?: generateDisplayName ( " Anonymous " )
val id = GroupUtil . getEncodedOpenGroupIDAsData ( openGroup . id )
// Main message
val dataMessageProto = DataMessage . newBuilder ( )
val body = if ( message . body == message . timestamp . toString ( ) ) { " " } else { message . body }
dataMessageProto . setBody ( body )
dataMessageProto . setTimestamp ( message . timestamp )
// Attachments
val attachmentProtos = message . attachments . mapNotNull { attachment ->
if ( attachment . kind != OpenGroupMessage . Attachment . Kind . Attachment ) { return @mapNotNull null }
val attachmentProto = AttachmentPointer . newBuilder ( )
attachmentProto . setId ( attachment . serverID )
attachmentProto . setContentType ( attachment . contentType )
attachmentProto . setSize ( attachment . size )
attachmentProto . setFileName ( attachment . fileName )
attachmentProto . setFlags ( attachment . flags )
attachmentProto . setWidth ( attachment . width )
attachmentProto . setHeight ( attachment . height )
attachment . caption . let { attachmentProto . setCaption ( it ) }
attachmentProto . setUrl ( attachment . url )
attachmentProto . build ( )
}
dataMessageProto . addAllAttachments ( attachmentProtos )
// Link preview
val linkPreview = message . attachments . firstOrNull { it . kind == OpenGroupMessage . Attachment . Kind . LinkPreview }
if ( linkPreview != null ) {
val linkPreviewProto = DataMessage . Preview . newBuilder ( )
linkPreviewProto . setUrl ( linkPreview . linkPreviewURL !! )
linkPreviewProto . setTitle ( linkPreview . linkPreviewTitle !! )
val attachmentProto = AttachmentPointer . newBuilder ( )
attachmentProto . setId ( linkPreview . serverID )
attachmentProto . setContentType ( linkPreview . contentType )
attachmentProto . setSize ( linkPreview . size )
attachmentProto . setFileName ( linkPreview . fileName )
attachmentProto . setFlags ( linkPreview . flags )
attachmentProto . setWidth ( linkPreview . width )
attachmentProto . setHeight ( linkPreview . height )
linkPreview . caption . let { attachmentProto . setCaption ( it ) }
attachmentProto . setUrl ( linkPreview . url )
linkPreviewProto . setImage ( attachmentProto . build ( ) )
dataMessageProto . addPreview ( linkPreviewProto . build ( ) )
}
// Quote
val quote = message . quote
if ( quote != null ) {
val quoteProto = DataMessage . Quote . newBuilder ( )
quoteProto . setId ( quote . quotedMessageTimestamp )
quoteProto . setAuthor ( quote . quoteePublicKey )
if ( quote . quotedMessageBody != quote . quotedMessageTimestamp . toString ( ) ) { quoteProto . setText ( quote . quotedMessageBody ) }
dataMessageProto . setQuote ( quoteProto . build ( ) )
}
// Profile
val profileProto = DataMessage . LokiProfile . newBuilder ( )
profileProto . setDisplayName ( message . displayName )
val profilePicture = message . profilePicture
if ( profilePicture != null ) {
profileProto . setProfilePicture ( profilePicture . url )
dataMessageProto . setProfileKey ( ByteString . copyFrom ( profilePicture . profileKey ) )
}
dataMessageProto . setProfile ( profileProto . build ( ) )
// Open group info
val messageServerID = message . serverID
if ( messageServerID != null ) {
val openGroupProto = PublicChatInfo . newBuilder ( )
openGroupProto . setServerID ( messageServerID )
dataMessageProto . setPublicChatInfo ( openGroupProto . build ( ) )
}
// Signal group context
val groupProto = GroupContext . newBuilder ( )
groupProto . setId ( ByteString . copyFrom ( id ) )
groupProto . setType ( GroupContext . Type . DELIVER )
groupProto . setName ( openGroup . displayName )
dataMessageProto . setGroup ( groupProto . build ( ) )
// Content
val content = Content . newBuilder ( )
if ( ! wasSentByCurrentUser ) { // Incoming message
content . setDataMessage ( dataMessageProto . build ( ) )
} else { // Outgoing message
// FIXME: This needs to be updated as we removed sync message handling
val syncMessageSentBuilder = SyncMessage . Sent . newBuilder ( )
syncMessageSentBuilder . setMessage ( dataMessageProto )
syncMessageSentBuilder . setDestination ( userHexEncodedPublicKey )
syncMessageSentBuilder . setTimestamp ( message . timestamp )
val syncMessageSent = syncMessageSentBuilder . build ( )
val syncMessageBuilder = SyncMessage . newBuilder ( )
syncMessageBuilder . setSent ( syncMessageSent )
content . setSyncMessage ( syncMessageBuilder . build ( ) )
}
// Envelope
val builder = Envelope . newBuilder ( )
builder . type = Envelope . Type . UNIDENTIFIED _SENDER
builder . source = senderPublicKey
builder . sourceDevice = 1
builder . setContent ( content . build ( ) . toByteString ( ) )
builder . serverTimestamp = message . serverTimestamp
val envelope = builder . build ( )
val job = MessageReceiveJob ( envelope . toByteArray ( ) , isBackgroundPoll , messageServerID , openGroup . id )
if ( isBackgroundPoll ) {
job . executeAsync ( ) . success { deferred . resolve ( Unit ) } . fail { deferred . resolve ( Unit ) }
// The promise is just used to keep track of when we're done
} else {
processIncomingMessage ( message )
JobQueue . shared . add ( job )
deferred . resolve ( Unit )
}
}
isCaughtUp = true
isPollOngoing = false
}
promise . fail {
Log . d ( " Loki " , " Failed to get messages for group chat with ID: ${group.channel} on server: ${group.server} . " )
} . fail {
Log . d ( " Loki " , " Failed to get messages for group chat with ID: ${openGroup.channel} on server: ${openGroup.server} . " )
isPollOngoing = false
}
return promise . map { Unit }
return deferred. promise
}
private fun pollForDisplayNames ( ) {
if ( displayNameUpdatees . isEmpty ( ) ) { return }
val hexEncodedPublicKeys = displayNameUpdatees
displayNameUpdatees = setOf ( )
api . getDisplayNames ( hexEncodedPublicKeys , group . server ) . successBackground { mapping ->
OpenGroupAPI . getDisplayNames ( hexEncodedPublicKeys , openG roup. server ) . successBackground { mapping ->
for ( pair in mapping . entries ) {
val senderDisplayName = " ${pair.value} (... ${pair.key.takeLast(8)} ) "
DatabaseFactory . getLokiUserDatabase ( context ) . setServerDisplayName ( group . id , pair . key , senderDisplayName )
MessagingConfiguration. shared . storage . setOpenGroupDisplayName ( pair . key , openGroup . channel , openGroup . server , senderDisplayName )
}
} . fail {
displayNameUpdatees = displayNameUpdatees . union ( hexEncodedPublicKeys )
@ -286,22 +239,18 @@ class OpenGroupPoller(private val context: Context, private val group: PublicCha
}
private fun pollForDeletedMessages ( ) {
api . getDeletedMessageServerIDs ( group . channel , group . server ) . success { deletedMessageServerIDs ->
val lokiMessageDatabase = DatabaseFactory . getLokiMessageDatabase ( context )
val deletedMessageIDs = deletedMessageServerIDs . mapNotNull { lokiMessageDatabase . getMessageID ( it ) }
val smsMessageDatabase = DatabaseFactory . getSmsDatabase ( context )
val mmsMessageDatabase = DatabaseFactory . getMmsDatabase ( context )
OpenGroupAPI . getDeletedMessageServerIDs ( openGroup . channel , openGroup . server ) . success { deletedMessageServerIDs ->
val deletedMessageIDs = deletedMessageServerIDs . mapNotNull { MessagingConfiguration . shared . messageDataProvider . getMessageID ( it ) }
deletedMessageIDs . forEach {
smsMessageDatabase . deleteMessage ( it )
mmsMessageDatabase . delete ( it )
MessagingConfiguration . shared . messageDataProvider . deleteMessage ( it )
}
} . fail {
Log . d ( " Loki " , " Failed to get deleted messages for group chat with ID: ${ group.channel} on server: ${g roup.server}. " )
Log . d ( " Loki " , " Failed to get deleted messages for group chat with ID: ${ openGroup.channel} on server: ${openG roup.server}. " )
}
}
private fun pollForModerators ( ) {
api . getModerators ( group . channel , g roup. server )
OpenGroupAPI . getModerators ( openGroup . channel , openG roup. server )
}
// endregion
}