|
|
|
@ -42,10 +42,16 @@ public final class MessageSender {
|
|
|
|
|
|
|
|
|
|
// MARK: - Convenience
|
|
|
|
|
|
|
|
|
|
public static func sendImmediate(_ db: Database, message: Message, to destination: Message.Destination, interactionId: Int64?) throws -> Promise<Void> {
|
|
|
|
|
public static func sendImmediate(
|
|
|
|
|
_ db: Database,
|
|
|
|
|
message: Message,
|
|
|
|
|
to destination: Message.Destination,
|
|
|
|
|
interactionId: Int64?,
|
|
|
|
|
isSyncMessage: Bool
|
|
|
|
|
) throws -> Promise<Void> {
|
|
|
|
|
switch destination {
|
|
|
|
|
case .contact, .closedGroup:
|
|
|
|
|
return try sendToSnodeDestination(db, message: message, to: destination, interactionId: interactionId)
|
|
|
|
|
return try sendToSnodeDestination(db, message: message, to: destination, interactionId: interactionId, isSyncMessage: isSyncMessage)
|
|
|
|
|
|
|
|
|
|
case .openGroup:
|
|
|
|
|
return sendToOpenGroupDestination(db, message: message, to: destination, interactionId: interactionId)
|
|
|
|
@ -65,7 +71,7 @@ public final class MessageSender {
|
|
|
|
|
isSyncMessage: Bool = false
|
|
|
|
|
) throws -> Promise<Void> {
|
|
|
|
|
let (promise, seal) = Promise<Void>.pending()
|
|
|
|
|
let userPublicKey: String = getUserHexEncodedPublicKey(db)
|
|
|
|
|
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db)
|
|
|
|
|
let messageSendTimestamp: Int64 = SnodeAPI.currentOffsetTimestampMs()
|
|
|
|
|
|
|
|
|
|
// Set the timestamp, sender and recipient
|
|
|
|
@ -73,7 +79,7 @@ public final class MessageSender {
|
|
|
|
|
message.sentTimestamp ?? // Visible messages will already have their sent timestamp set
|
|
|
|
|
UInt64(messageSendTimestamp)
|
|
|
|
|
)
|
|
|
|
|
message.sender = userPublicKey
|
|
|
|
|
message.sender = currentUserPublicKey
|
|
|
|
|
message.recipient = {
|
|
|
|
|
switch destination {
|
|
|
|
|
case .contact(let publicKey): return publicKey
|
|
|
|
@ -84,7 +90,7 @@ public final class MessageSender {
|
|
|
|
|
|
|
|
|
|
// Set the failure handler (need it here already for precondition failure handling)
|
|
|
|
|
func handleFailure(_ db: Database, with error: MessageSenderError) {
|
|
|
|
|
MessageSender.handleFailedMessageSend(db, message: message, with: error, interactionId: interactionId)
|
|
|
|
|
MessageSender.handleFailedMessageSend(db, message: message, with: error, interactionId: interactionId, isSyncMessage: isSyncMessage)
|
|
|
|
|
seal.reject(error)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -94,21 +100,8 @@ public final class MessageSender {
|
|
|
|
|
return promise
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Stop here if this is a self-send, unless we should sync the message
|
|
|
|
|
let isSelfSend: Bool = (message.recipient == userPublicKey)
|
|
|
|
|
|
|
|
|
|
guard
|
|
|
|
|
!isSelfSend ||
|
|
|
|
|
isSyncMessage ||
|
|
|
|
|
Message.shouldSync(message: message)
|
|
|
|
|
else {
|
|
|
|
|
try MessageSender.handleSuccessfulMessageSend(db, message: message, to: destination, interactionId: interactionId)
|
|
|
|
|
seal.fulfill(())
|
|
|
|
|
return promise
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Attach the user's profile if needed
|
|
|
|
|
if var messageWithProfile: MessageWithProfile = message as? MessageWithProfile {
|
|
|
|
|
if !isSyncMessage, var messageWithProfile: MessageWithProfile = message as? MessageWithProfile {
|
|
|
|
|
let profile: Profile = Profile.fetchOrCreateCurrentUser(db)
|
|
|
|
|
|
|
|
|
|
if let profileKey: Data = profile.profileEncryptionKey?.keyData, let profilePictureUrl: String = profile.profilePictureUrl {
|
|
|
|
@ -123,6 +116,9 @@ public final class MessageSender {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Perform any pre-send actions
|
|
|
|
|
handleMessageWillSend(db, message: message, interactionId: interactionId, isSyncMessage: isSyncMessage)
|
|
|
|
|
|
|
|
|
|
// Convert it to protobuf
|
|
|
|
|
guard let proto = message.toProto(db) else {
|
|
|
|
|
handleFailure(db, with: .protoConversionFailed)
|
|
|
|
@ -233,6 +229,9 @@ public final class MessageSender {
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
let shouldNotify: Bool = {
|
|
|
|
|
// Don't send a notification when sending messages in 'Note to Self'
|
|
|
|
|
guard message.recipient != currentUserPublicKey else { return false }
|
|
|
|
|
|
|
|
|
|
switch message {
|
|
|
|
|
case is VisibleMessage, is UnsendRequest: return !isSyncMessage
|
|
|
|
|
case let callMessage as CallMessage:
|
|
|
|
@ -319,7 +318,6 @@ public final class MessageSender {
|
|
|
|
|
dependencies: SMKDependencies = SMKDependencies()
|
|
|
|
|
) -> Promise<Void> {
|
|
|
|
|
let (promise, seal) = Promise<Void>.pending()
|
|
|
|
|
let threadId: String
|
|
|
|
|
|
|
|
|
|
// Set the timestamp, sender and recipient
|
|
|
|
|
if message.sentTimestamp == nil { // Visible messages will already have their sent timestamp set
|
|
|
|
@ -329,7 +327,6 @@ public final class MessageSender {
|
|
|
|
|
switch destination {
|
|
|
|
|
case .contact, .closedGroup, .openGroupInbox: preconditionFailure()
|
|
|
|
|
case .openGroup(let roomToken, let server, let whisperTo, let whisperMods, _):
|
|
|
|
|
threadId = OpenGroup.idFor(roomToken: roomToken, server: server)
|
|
|
|
|
message.recipient = [
|
|
|
|
|
server,
|
|
|
|
|
roomToken,
|
|
|
|
@ -344,34 +341,12 @@ public final class MessageSender {
|
|
|
|
|
// which would go into this case, so rather than handling it as an invalid state we just want to
|
|
|
|
|
// error in a non-retryable way
|
|
|
|
|
guard
|
|
|
|
|
let openGroup: OpenGroup = try? OpenGroup.fetchOne(db, id: threadId),
|
|
|
|
|
let userEdKeyPair: Box.KeyPair = Identity.fetchUserEd25519KeyPair(db),
|
|
|
|
|
case .openGroup(let roomToken, let server, let whisperTo, let whisperMods, let fileIds) = destination
|
|
|
|
|
else {
|
|
|
|
|
seal.reject(MessageSenderError.invalidMessage)
|
|
|
|
|
return promise
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
message.sender = {
|
|
|
|
|
let capabilities: [Capability.Variant] = (try? Capability
|
|
|
|
|
.select(.variant)
|
|
|
|
|
.filter(Capability.Columns.openGroupServer == server)
|
|
|
|
|
.filter(Capability.Columns.isMissing == false)
|
|
|
|
|
.asRequest(of: Capability.Variant.self)
|
|
|
|
|
.fetchAll(db))
|
|
|
|
|
.defaulting(to: [])
|
|
|
|
|
|
|
|
|
|
// If the server doesn't support blinding then go with an unblinded id
|
|
|
|
|
guard capabilities.isEmpty || capabilities.contains(.blind) else {
|
|
|
|
|
return SessionId(.unblinded, publicKey: userEdKeyPair.publicKey).hexString
|
|
|
|
|
}
|
|
|
|
|
guard let blindedKeyPair: Box.KeyPair = dependencies.sodium.blindedKeyPair(serverPublicKey: openGroup.publicKey, edKeyPair: userEdKeyPair, genericHash: dependencies.genericHash) else {
|
|
|
|
|
preconditionFailure()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return SessionId(.blinded, publicKey: blindedKeyPair.publicKey).hexString
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Set the failure handler (need it here already for precondition failure handling)
|
|
|
|
|
func handleFailure(_ db: Database, with error: MessageSenderError) {
|
|
|
|
|
MessageSender.handleFailedMessageSend(db, message: message, with: error, interactionId: interactionId)
|
|
|
|
@ -402,6 +377,9 @@ public final class MessageSender {
|
|
|
|
|
return promise
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Perform any pre-send actions
|
|
|
|
|
handleMessageWillSend(db, message: message, interactionId: interactionId)
|
|
|
|
|
|
|
|
|
|
// Convert it to protobuf
|
|
|
|
|
guard let proto = message.toProto(db) else {
|
|
|
|
|
handleFailure(db, with: .protoConversionFailed)
|
|
|
|
@ -465,7 +443,6 @@ public final class MessageSender {
|
|
|
|
|
dependencies: SMKDependencies = SMKDependencies()
|
|
|
|
|
) -> Promise<Void> {
|
|
|
|
|
let (promise, seal) = Promise<Void>.pending()
|
|
|
|
|
let userPublicKey: String = getUserHexEncodedPublicKey(db, dependencies: dependencies)
|
|
|
|
|
|
|
|
|
|
guard case .openGroupInbox(let server, let openGroupPublicKey, let recipientBlindedPublicKey) = destination else {
|
|
|
|
|
preconditionFailure()
|
|
|
|
@ -476,7 +453,6 @@ public final class MessageSender {
|
|
|
|
|
message.sentTimestamp = UInt64(SnodeAPI.currentOffsetTimestampMs())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
message.sender = userPublicKey
|
|
|
|
|
message.recipient = recipientBlindedPublicKey
|
|
|
|
|
|
|
|
|
|
// Set the failure handler (need it here already for precondition failure handling)
|
|
|
|
@ -501,6 +477,9 @@ public final class MessageSender {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Perform any pre-send actions
|
|
|
|
|
handleMessageWillSend(db, message: message, interactionId: interactionId)
|
|
|
|
|
|
|
|
|
|
// Convert it to protobuf
|
|
|
|
|
guard let proto = message.toProto(db) else {
|
|
|
|
|
handleFailure(db, with: .protoConversionFailed)
|
|
|
|
@ -569,6 +548,32 @@ public final class MessageSender {
|
|
|
|
|
|
|
|
|
|
// MARK: Success & Failure Handling
|
|
|
|
|
|
|
|
|
|
public static func handleMessageWillSend(
|
|
|
|
|
_ db: Database,
|
|
|
|
|
message: Message,
|
|
|
|
|
interactionId: Int64?,
|
|
|
|
|
isSyncMessage: Bool = false
|
|
|
|
|
) {
|
|
|
|
|
// If the message was a reaction then we don't want to do anything to the original
|
|
|
|
|
// interaction (which the 'interactionId' is pointing to
|
|
|
|
|
guard (message as? VisibleMessage)?.reaction == nil else { return }
|
|
|
|
|
|
|
|
|
|
// Mark messages as "sending"/"syncing" if needed (this is for retries)
|
|
|
|
|
_ = try? RecipientState
|
|
|
|
|
.filter(RecipientState.Columns.interactionId == interactionId)
|
|
|
|
|
.filter(isSyncMessage ?
|
|
|
|
|
RecipientState.Columns.state == RecipientState.State.failedToSync :
|
|
|
|
|
RecipientState.Columns.state == RecipientState.State.failed
|
|
|
|
|
)
|
|
|
|
|
.updateAll(
|
|
|
|
|
db,
|
|
|
|
|
RecipientState.Columns.state.set(to: isSyncMessage ?
|
|
|
|
|
RecipientState.State.syncing :
|
|
|
|
|
RecipientState.State.sending
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static func handleSuccessfulMessageSend(
|
|
|
|
|
_ db: Database,
|
|
|
|
|
message: Message,
|
|
|
|
@ -578,7 +583,7 @@ public final class MessageSender {
|
|
|
|
|
isSyncMessage: Bool = false
|
|
|
|
|
) throws {
|
|
|
|
|
// If the message was a reaction then we want to update the reaction instead of the original
|
|
|
|
|
// interaciton (which the 'interactionId' is pointing to
|
|
|
|
|
// interaction (which the 'interactionId' is pointing to
|
|
|
|
|
if let visibleMessage: VisibleMessage = message as? VisibleMessage, let reaction: VisibleMessage.VMReaction = visibleMessage.reaction {
|
|
|
|
|
try Reaction
|
|
|
|
|
.filter(Reaction.Columns.interactionId == interactionId)
|
|
|
|
@ -597,7 +602,6 @@ public final class MessageSender {
|
|
|
|
|
// real message has no use when we delete a message. It is OK to let it be.
|
|
|
|
|
try interaction.with(
|
|
|
|
|
serverHash: message.serverHash,
|
|
|
|
|
|
|
|
|
|
// Track the open group server message ID and update server timestamp (use server
|
|
|
|
|
// timestamp for open group messages otherwise the quote messages may not be able
|
|
|
|
|
// to be found by the timestamp on other devices
|
|
|
|
@ -610,6 +614,7 @@ public final class MessageSender {
|
|
|
|
|
|
|
|
|
|
// Mark the message as sent
|
|
|
|
|
try interaction.recipientStates
|
|
|
|
|
.filter(RecipientState.Columns.state != RecipientState.State.sent)
|
|
|
|
|
.updateAll(db, RecipientState.Columns.state.set(to: RecipientState.State.sent))
|
|
|
|
|
|
|
|
|
|
// Start the disappearing messages timer if needed
|
|
|
|
@ -624,18 +629,20 @@ public final class MessageSender {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let threadId: String = {
|
|
|
|
|
switch destination {
|
|
|
|
|
case .contact(let publicKey): return publicKey
|
|
|
|
|
case .closedGroup(let groupPublicKey): return groupPublicKey
|
|
|
|
|
case .openGroup(let roomToken, let server, _, _, _):
|
|
|
|
|
return OpenGroup.idFor(roomToken: roomToken, server: server)
|
|
|
|
|
|
|
|
|
|
case .openGroupInbox(_, _, let blindedPublicKey): return blindedPublicKey
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Prevent ControlMessages from being handled multiple times if not supported
|
|
|
|
|
try? ControlMessageProcessRecord(
|
|
|
|
|
threadId: {
|
|
|
|
|
switch destination {
|
|
|
|
|
case .contact(let publicKey): return publicKey
|
|
|
|
|
case .closedGroup(let groupPublicKey): return groupPublicKey
|
|
|
|
|
case .openGroup(let roomToken, let server, _, _, _):
|
|
|
|
|
return OpenGroup.idFor(roomToken: roomToken, server: server)
|
|
|
|
|
|
|
|
|
|
case .openGroupInbox(_, _, let blindedPublicKey): return blindedPublicKey
|
|
|
|
|
}
|
|
|
|
|
}(),
|
|
|
|
|
threadId: threadId,
|
|
|
|
|
message: message,
|
|
|
|
|
serverExpirationTimestamp: (
|
|
|
|
|
(TimeInterval(SnodeAPI.currentOffsetTimestampMs()) / 1000) +
|
|
|
|
@ -643,35 +650,27 @@ public final class MessageSender {
|
|
|
|
|
)
|
|
|
|
|
)?.insert(db)
|
|
|
|
|
|
|
|
|
|
// Sync the message if:
|
|
|
|
|
// • it's a visible message or an expiration timer update
|
|
|
|
|
// • the destination was a contact
|
|
|
|
|
// • we didn't sync it already
|
|
|
|
|
let userPublicKey = getUserHexEncodedPublicKey(db)
|
|
|
|
|
if case .contact(let publicKey) = destination, !isSyncMessage {
|
|
|
|
|
if let message = message as? VisibleMessage { message.syncTarget = publicKey }
|
|
|
|
|
if let message = message as? ExpirationTimerUpdate { message.syncTarget = publicKey }
|
|
|
|
|
|
|
|
|
|
// FIXME: Make this a job
|
|
|
|
|
try sendToSnodeDestination(
|
|
|
|
|
db,
|
|
|
|
|
message: message,
|
|
|
|
|
to: .contact(publicKey: userPublicKey),
|
|
|
|
|
interactionId: interactionId,
|
|
|
|
|
isSyncMessage: true
|
|
|
|
|
).retainUntilComplete()
|
|
|
|
|
}
|
|
|
|
|
// Sync the message if needed
|
|
|
|
|
scheduleSyncMessageIfNeeded(
|
|
|
|
|
db,
|
|
|
|
|
message: message,
|
|
|
|
|
destination: destination,
|
|
|
|
|
threadId: threadId,
|
|
|
|
|
interactionId: interactionId,
|
|
|
|
|
isAlreadySyncMessage: isSyncMessage
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static func handleFailedMessageSend(
|
|
|
|
|
_ db: Database,
|
|
|
|
|
message: Message,
|
|
|
|
|
with error: MessageSenderError,
|
|
|
|
|
interactionId: Int64?
|
|
|
|
|
interactionId: Int64?,
|
|
|
|
|
isSyncMessage: Bool = false
|
|
|
|
|
) {
|
|
|
|
|
// TODO: Revert the local database change
|
|
|
|
|
// If the message was a reaction then we don't want to do anything to the original
|
|
|
|
|
// interaciton (which the 'interactionId' is pointing to
|
|
|
|
|
// interaction (which the 'interactionId' is pointing to
|
|
|
|
|
guard (message as? VisibleMessage)?.reaction == nil else { return }
|
|
|
|
|
|
|
|
|
|
// Check if we need to mark any "sending" recipients as "failed"
|
|
|
|
@ -682,7 +681,12 @@ public final class MessageSender {
|
|
|
|
|
let rowIds: [Int64] = (try? RecipientState
|
|
|
|
|
.select(Column.rowID)
|
|
|
|
|
.filter(RecipientState.Columns.interactionId == interactionId)
|
|
|
|
|
.filter(RecipientState.Columns.state == RecipientState.State.sending)
|
|
|
|
|
.filter(!isSyncMessage ?
|
|
|
|
|
RecipientState.Columns.state == RecipientState.State.sending : (
|
|
|
|
|
RecipientState.Columns.state == RecipientState.State.syncing ||
|
|
|
|
|
RecipientState.Columns.state == RecipientState.State.sent
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
.asRequest(of: Int64.self)
|
|
|
|
|
.fetchAll(db))
|
|
|
|
|
.defaulting(to: [])
|
|
|
|
@ -697,7 +701,9 @@ public final class MessageSender {
|
|
|
|
|
.filter(rowIds.contains(Column.rowID))
|
|
|
|
|
.updateAll(
|
|
|
|
|
db,
|
|
|
|
|
RecipientState.Columns.state.set(to: RecipientState.State.failed),
|
|
|
|
|
RecipientState.Columns.state.set(
|
|
|
|
|
to: (isSyncMessage ? RecipientState.State.failedToSync : RecipientState.State.failed)
|
|
|
|
|
),
|
|
|
|
|
RecipientState.Columns.mostRecentFailureText.set(to: error.localizedDescription)
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
@ -719,6 +725,43 @@ public final class MessageSender {
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static func scheduleSyncMessageIfNeeded(
|
|
|
|
|
_ db: Database,
|
|
|
|
|
message: Message,
|
|
|
|
|
destination: Message.Destination,
|
|
|
|
|
threadId: String?,
|
|
|
|
|
interactionId: Int64?,
|
|
|
|
|
isAlreadySyncMessage: Bool
|
|
|
|
|
) {
|
|
|
|
|
// Sync the message if it's not a sync message, wasn't already sent to the current user and
|
|
|
|
|
// it's a message type which should be synced
|
|
|
|
|
let currentUserPublicKey = getUserHexEncodedPublicKey(db)
|
|
|
|
|
|
|
|
|
|
if
|
|
|
|
|
case .contact(let publicKey) = destination,
|
|
|
|
|
!isAlreadySyncMessage,
|
|
|
|
|
publicKey != currentUserPublicKey,
|
|
|
|
|
Message.shouldSync(message: message)
|
|
|
|
|
{
|
|
|
|
|
if let message = message as? VisibleMessage { message.syncTarget = publicKey }
|
|
|
|
|
if let message = message as? ExpirationTimerUpdate { message.syncTarget = publicKey }
|
|
|
|
|
|
|
|
|
|
JobRunner.add(
|
|
|
|
|
db,
|
|
|
|
|
job: Job(
|
|
|
|
|
variant: .messageSend,
|
|
|
|
|
threadId: threadId,
|
|
|
|
|
interactionId: interactionId,
|
|
|
|
|
details: MessageSendJob.Details(
|
|
|
|
|
destination: .contact(publicKey: currentUserPublicKey),
|
|
|
|
|
message: message,
|
|
|
|
|
isSyncMessage: true
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// MARK: - Objective-C Support
|
|
|
|
|