Fixed a few bugs with sync messages

Fixed an issue where sync messages were failing to send if the message wasn't disappearing
Fixed an issue where the 'Failed to Sync' statuses wouldn't be shown
Updated the MessageSenderError, SnodeAPIError and OnionRequestAPIError to actually output our error strings when included in strings
Updated the tryFlatMapWithRandomSnode function to include context of the last error thrown
Cleaned up the 'isSyncMessage' logic
pull/959/head
Morgan Pretty 1 year ago
parent c35f712d93
commit 686768f8b2

@ -133,4 +133,4 @@ SPEC CHECKSUMS:
PODFILE CHECKSUM: 2c877a533db6e82eaa94407c95be114d80c2f893
COCOAPODS: 1.15.0
COCOAPODS: 1.14.3

@ -2807,11 +2807,11 @@
C300A5BB2554AFFB00555489 /* Messages */ = {
isa = PBXGroup;
children = (
C300A5C62554B02D00555489 /* Visible Messages */,
C300A5C72554B03900555489 /* Control Messages */,
C3C2A74325539EB700C340D1 /* Message.swift */,
C352A30825574D8400338F3E /* Message+Destination.swift */,
943C6D812B75E061004ACE64 /* Message+DisappearingMessages.swift */,
C300A5C62554B02D00555489 /* Visible Messages */,
C300A5C72554B03900555489 /* Control Messages */,
);
path = Messages;
sourceTree = "<group>";
@ -7147,6 +7147,7 @@
PRODUCT_BUNDLE_IDENTIFIER = "com.loki-project.SignalUtilitiesKit";
PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)";
SKIP_INSTALL = YES;
STRIP_INSTALLED_PRODUCT = NO;
SUPPORTS_MACCATALYST = NO;
SWIFT_ACTIVE_COMPILATION_CONDITIONS = DEBUG;
SWIFT_OPTIMIZATION_LEVEL = "-Onone";
@ -7291,6 +7292,7 @@
PRODUCT_BUNDLE_IDENTIFIER = "com.loki-project.SessionSnodeKit";
PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)";
SKIP_INSTALL = YES;
STRIP_INSTALLED_PRODUCT = NO;
SUPPORTS_MACCATALYST = NO;
SWIFT_ACTIVE_COMPILATION_CONDITIONS = DEBUG;
SWIFT_OPTIMIZATION_LEVEL = "-Onone";
@ -7592,6 +7594,7 @@
PRODUCT_BUNDLE_IDENTIFIER = "com.loki-project.SessionMessagingKit";
PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)";
SKIP_INSTALL = YES;
STRIP_INSTALLED_PRODUCT = NO;
SUPPORTS_MACCATALYST = NO;
SWIFT_ACTIVE_COMPILATION_CONDITIONS = DEBUG;
SWIFT_INCLUDE_PATHS = "$(inherited) \"${PODS_XCFRAMEWORKS_BUILD_DIR}/Clibsodium\" \"$(TARGET_BUILD_DIR)/libSessionUtil\"";
@ -7697,6 +7700,7 @@
PRODUCT_NAME = "$(TARGET_NAME:c99extidentifier)";
SDKROOT = iphoneos;
SKIP_INSTALL = YES;
STRIP_INSTALLED_PRODUCT = YES;
SUPPORTS_MACCATALYST = NO;
SWIFT_COMPILATION_MODE = wholemodule;
SWIFT_INCLUDE_PATHS = "$(inherited) \"${PODS_XCFRAMEWORKS_BUILD_DIR}/Clibsodium\" \"$(TARGET_BUILD_DIR)/libSessionUtil\"";
@ -7921,6 +7925,7 @@
PROVISIONING_PROFILE_SPECIFIER = "";
RUN_CLANG_STATIC_ANALYZER = YES;
SDKROOT = iphoneos;
STRIP_INSTALLED_PRODUCT = NO;
SWIFT_OBJC_BRIDGING_HEADER = "Session/Meta/Signal-Bridging-Header.h";
SWIFT_OBJC_INTERFACE_HEADER_NAME = "Session-Swift.h";
SWIFT_OPTIMIZATION_LEVEL = "-Onone";

@ -221,18 +221,15 @@ final class NewDMVC: BaseVC, UIPageViewControllerDataSource, UIPageViewControlle
case .finished: break
case .failure(let error):
modalActivityIndicator.dismiss {
var messageOrNil: String?
let message: String = {
if let error = error as? SnodeAPIError {
switch error {
case .decryptionFailed, .hashingFailed, .validationFailed:
messageOrNil = error.errorDescription
return "\(error)"
default: break
}
}
let message: String = {
if let messageOrNil: String = messageOrNil {
return messageOrNil
}
return (maybeSessionId?.prefix == .blinded15 || maybeSessionId?.prefix == .blinded25 ?
"DM_ERROR_DIRECT_BLINDED_ID".localized() :

@ -63,8 +63,8 @@ public enum AttachmentUploadJob: JobExecutor {
MessageSender.handleMessageWillSend(
db,
message: details.message,
interactionId: interactionId,
isSyncMessage: details.isSyncMessage
destination: details.destination,
interactionId: interactionId
)
}
@ -93,9 +93,9 @@ public enum AttachmentUploadJob: JobExecutor {
MessageSender.handleFailedMessageSend(
db,
message: details.message,
destination: nil,
with: .other(error),
interactionId: interactionId,
isSyncMessage: details.isSyncMessage,
using: dependencies
)
}

@ -51,7 +51,6 @@ public enum GroupLeavingJob: JobExecutor {
to: destination,
namespace: destination.defaultNamespace,
interactionId: job.interactionId,
isSyncMessage: false,
using: dependencies
)
}

@ -176,7 +176,6 @@ public enum MessageSendJob: JobExecutor {
to: details.destination,
namespace: details.destination.defaultNamespace,
interactionId: job.interactionId,
isSyncMessage: details.isSyncMessage,
using: dependencies
)
}
@ -197,7 +196,7 @@ public enum MessageSendJob: JobExecutor {
SNLog("[MessageSendJob] Couldn't send message due to error: \(error) (paths: \(OnionRequestAPI.paths.prettifiedDescription)).")
default:
SNLog("[MessageSendJob] Couldn't send message due to error: \(error).")
SNLog("[MessageSendJob] Couldn't send message due to error: \(error)")
}
// Actual error handling
@ -240,25 +239,22 @@ extension MessageSendJob {
private enum CodingKeys: String, CodingKey {
case destination
case message
case isSyncMessage
@available(*, deprecated, message: "replaced by 'Message.Destination.syncMessage'") case isSyncMessage
case variant
}
public let destination: Message.Destination
public let message: Message
public let isSyncMessage: Bool
public let variant: Message.Variant?
// MARK: - Initialization
public init(
destination: Message.Destination,
message: Message,
isSyncMessage: Bool = false
message: Message
) {
self.destination = destination
self.message = message
self.isSyncMessage = isSyncMessage
self.variant = Message.Variant(from: message)
}
@ -272,10 +268,42 @@ extension MessageSendJob {
throw StorageError.decodingFailed
}
let message: Message = try variant.decode(from: container, forKey: .message)
var destination: Message.Destination = try container.decode(Message.Destination.self, forKey: .destination)
/// Handle the legacy 'isSyncMessage' flag - this flag was deprecated in `2.5.2` (April 2024) and can be removed in a
/// subsequent release after May 2024
if ((try? container.decode(Bool.self, forKey: .isSyncMessage)) ?? false) {
switch (destination, message) {
case (.contact, let message as VisibleMessage):
guard let targetPublicKey: String = message.syncTarget else {
SNLog("Unable to decode messageSend job due to missing syncTarget")
throw StorageError.decodingFailed
}
destination = .syncMessage(originalRecipientPublicKey: targetPublicKey)
case (.contact, let message as ExpirationTimerUpdate):
guard let targetPublicKey: String = message.syncTarget else {
SNLog("Unable to decode messageSend job due to missing syncTarget")
throw StorageError.decodingFailed
}
destination = .syncMessage(originalRecipientPublicKey: targetPublicKey)
case (.contact(let publicKey), _):
SNLog("Sync message in messageSend job was missing explicit syncTarget (falling back to specified value)")
destination = .syncMessage(originalRecipientPublicKey: publicKey)
default:
SNLog("Unable to decode messageSend job due to invalid sync message state")
throw StorageError.decodingFailed
}
}
self = Details(
destination: try container.decode(Message.Destination.self, forKey: .destination),
message: try variant.decode(from: container, forKey: .message),
isSyncMessage: ((try? container.decode(Bool.self, forKey: .isSyncMessage)) ?? false)
destination: destination,
message: message
)
}
@ -289,7 +317,6 @@ extension MessageSendJob {
try container.encode(destination, forKey: .destination)
try container.encode(message, forKey: .message)
try container.encode(isSyncMessage, forKey: .isSyncMessage)
try container.encode(variant, forKey: .variant)
}
}

@ -43,8 +43,7 @@ public enum SendReadReceiptsJob: JobExecutor {
),
to: details.destination,
namespace: details.destination.defaultNamespace,
interactionId: nil,
isSyncMessage: false
interactionId: nil
)
}
.flatMap { MessageSender.sendImmediate(data: $0, using: dependencies) }

@ -7,8 +7,16 @@ import SessionUtilitiesKit
public extension Message {
enum Destination: Codable, Hashable {
/// A message directed to another user
case contact(publicKey: String)
/// A message that was originally sent to another user but needs to be replicated to the current users swarm
case syncMessage(originalRecipientPublicKey: String)
/// A message directed to group conversation
case closedGroup(groupPublicKey: String)
/// A message directed to an open group
case openGroup(
roomToken: String,
server: String,
@ -16,13 +24,15 @@ public extension Message {
whisperMods: Bool = false,
fileIds: [String]? = nil
)
/// A message directed to an open group inbox
case openGroupInbox(server: String, openGroupPublicKey: String, blindedPublicKey: String)
public var defaultNamespace: SnodeAPI.Namespace? {
switch self {
case .contact: return .`default`
case .contact, .syncMessage: return .`default`
case .closedGroup: return .legacyClosedGroup
default: return nil
case .openGroup, .openGroupInbox: return nil
}
}

@ -246,7 +246,7 @@ public extension Message {
static func threadId(forMessage message: Message, destination: Message.Destination) -> String {
switch destination {
case .contact(let publicKey):
case .contact(let publicKey), .syncMessage(let publicKey):
// Extract the 'syncTarget' value if there is one
let maybeSyncTarget: String?
@ -661,29 +661,24 @@ public extension Message {
internal static func getSpecifiedTTL(
message: Message,
isGroupMessage: Bool,
isSyncMessage: Bool
destination: Message.Destination
) -> UInt64 {
guard Features.useNewDisappearingMessagesConfig else { return message.ttl }
// Not disappearing messages
guard let expiresInSeconds = message.expiresInSeconds else { return message.ttl }
// Sync message should be read already, it is the same for disappear after read and disappear after sent
guard !isSyncMessage else { return UInt64(expiresInSeconds * 1000) }
// Disappear after read messages that have not be read
guard let expiresStartedAtMs = message.expiresStartedAtMs else { return message.ttl }
// Disappear after read messages that have already be read
guard message.sentTimestamp == UInt64(expiresStartedAtMs) else { return message.ttl }
switch (destination, message) {
// Disappear after sent messages with exceptions
switch message {
case is ClosedGroupControlMessage, is UnsendRequest:
case (_, is UnsendRequest): return message.ttl
case (.closedGroup, is ClosedGroupControlMessage), (.closedGroup, is ExpirationTimerUpdate):
return message.ttl
case is ExpirationTimerUpdate:
return isGroupMessage ? message.ttl : UInt64(expiresInSeconds * 1000)
default:
guard
let expiresInSeconds = message.expiresInSeconds, // Not disappearing messages
expiresInSeconds > 0, // Not disappearing messages (0 == disabled)
let expiresStartedAtMs = message.expiresStartedAtMs, // Unread disappear after read message
message.sentTimestamp == UInt64(expiresStartedAtMs) // Already read disappear after read message
else { return message.ttl }
return UInt64(expiresInSeconds * 1000)
}
}

@ -4,7 +4,7 @@
import Foundation
public enum MessageSenderError: LocalizedError, Equatable {
public enum MessageSenderError: Error, CustomStringConvertible, Equatable {
case invalidMessage
case protoConversionFailed
case noUserX25519KeyPair
@ -33,24 +33,28 @@ public enum MessageSenderError: LocalizedError, Equatable {
}
}
public var errorDescription: String? {
public var description: String {
switch self {
case .invalidMessage: return "Invalid message."
case .protoConversionFailed: return "Couldn't convert message to proto."
case .noUserX25519KeyPair: return "Couldn't find user X25519 key pair."
case .noUserED25519KeyPair: return "Couldn't find user ED25519 key pair."
case .signingFailed: return "Couldn't sign message."
case .encryptionFailed: return "Couldn't encrypt message."
case .noUsername: return "Missing username."
case .attachmentsNotUploaded: return "Attachments for this message have not been uploaded."
case .blindingFailed: return "Couldn't blind the sender"
case .sendJobTimeout: return "Send job timeout (likely due to path building taking too long)."
case .invalidMessage: return "Invalid message (MessageSenderError.invalidMessage)."
case .protoConversionFailed: return "Couldn't convert message to proto (MessageSenderError.protoConversionFailed)."
case .noUserX25519KeyPair: return "Couldn't find user X25519 key pair (MessageSenderError.noUserX25519KeyPair)."
case .noUserED25519KeyPair: return "Couldn't find user ED25519 key pair (MessageSenderError.noUserED25519KeyPair)."
case .signingFailed: return "Couldn't sign message (MessageSenderError.signingFailed)."
case .encryptionFailed: return "Couldn't encrypt message (MessageSenderError.encryptionFailed)."
case .noUsername: return "Missing username (MessageSenderError.noUsername)."
case .attachmentsNotUploaded: return "Attachments for this message have not been uploaded (MessageSenderError.attachmentsNotUploaded)."
case .blindingFailed: return "Couldn't blind the sender (MessageSenderError.blindingFailed)."
case .sendJobTimeout: return "Send job timeout (likely due to path building taking too long - MessageSenderError.sendJobTimeout)."
// Closed groups
case .noThread: return "Couldn't find a thread associated with the given group public key."
case .noKeyPair: return "Couldn't find a private key associated with the given group public key."
case .invalidClosedGroupUpdate: return "Invalid group update."
case .other(let error): return error.localizedDescription
case .noThread: return "Couldn't find a thread associated with the given group public key (MessageSenderError.noThread)."
case .noKeyPair: return "Couldn't find a private key associated with the given group public key (MessageSenderError.noKeyPair)."
case .invalidClosedGroupUpdate: return "Invalid group update (MessageSenderError.invalidClosedGroupUpdate)."
case .other(let error):
switch error {
case is CustomStringConvertible: return "\(error)"
default: return error.localizedDescription
}
}
}

@ -70,7 +70,6 @@ extension MessageSender {
destination: destination,
threadId: threadId,
interactionId: interactionId,
isAlreadySyncMessage: false,
using: dependencies
)
return
@ -84,8 +83,7 @@ extension MessageSender {
interactionId: interactionId,
details: MessageSendJob.Details(
destination: destination,
message: message,
isSyncMessage: isSyncMessage
message: message
)
),
canStartJob: true,
@ -132,6 +130,7 @@ extension MessageSender {
let threadId: String = {
switch preparedSendData.destination {
case .contact(let publicKey): return publicKey
case .syncMessage(let originalRecipientPublicKey): return originalRecipientPublicKey
case .closedGroup(let groupPublicKey): return groupPublicKey
case .openGroup(let roomToken, let server, _, _, _):
return OpenGroup.idFor(roomToken: roomToken, server: server)

@ -17,7 +17,6 @@ public final class MessageSender {
let message: Message?
let interactionId: Int64?
let isSyncMessage: Bool?
let totalAttachmentsUploaded: Int
let snodeMessage: SnodeMessage?
@ -30,7 +29,6 @@ public final class MessageSender {
destination: Message.Destination,
namespace: SnodeAPI.Namespace?,
interactionId: Int64?,
isSyncMessage: Bool?,
totalAttachmentsUploaded: Int = 0,
snodeMessage: SnodeMessage?,
plaintext: Data?,
@ -42,7 +40,6 @@ public final class MessageSender {
self.destination = destination
self.namespace = namespace
self.interactionId = interactionId
self.isSyncMessage = isSyncMessage
self.totalAttachmentsUploaded = totalAttachmentsUploaded
self.snodeMessage = snodeMessage
@ -56,7 +53,6 @@ public final class MessageSender {
destination: Message.Destination,
namespace: SnodeAPI.Namespace,
interactionId: Int64?,
isSyncMessage: Bool?,
snodeMessage: SnodeMessage
) {
self.shouldSend = true
@ -65,7 +61,6 @@ public final class MessageSender {
self.destination = destination
self.namespace = namespace
self.interactionId = interactionId
self.isSyncMessage = isSyncMessage
self.totalAttachmentsUploaded = 0
self.snodeMessage = snodeMessage
@ -86,7 +81,6 @@ public final class MessageSender {
self.destination = destination
self.namespace = nil
self.interactionId = interactionId
self.isSyncMessage = false
self.totalAttachmentsUploaded = 0
self.snodeMessage = nil
@ -107,7 +101,6 @@ public final class MessageSender {
self.destination = destination
self.namespace = nil
self.interactionId = interactionId
self.isSyncMessage = false
self.totalAttachmentsUploaded = 0
self.snodeMessage = nil
@ -124,7 +117,6 @@ public final class MessageSender {
destination: destination.with(fileIds: fileIds),
namespace: namespace,
interactionId: interactionId,
isSyncMessage: isSyncMessage,
totalAttachmentsUploaded: fileIds.count,
snodeMessage: snodeMessage,
plaintext: plaintext,
@ -139,7 +131,6 @@ public final class MessageSender {
to destination: Message.Destination,
namespace: SnodeAPI.Namespace?,
interactionId: Int64?,
isSyncMessage: Bool = false,
using dependencies: Dependencies = Dependencies()
) throws -> PreparedSendData {
// Common logic for all destinations
@ -154,7 +145,7 @@ public final class MessageSender {
)
switch destination {
case .contact, .closedGroup:
case .contact, .syncMessage, .closedGroup:
return try prepareSendToSnodeDestination(
db,
message: updatedMessage,
@ -163,7 +154,6 @@ public final class MessageSender {
interactionId: interactionId,
userPublicKey: currentUserPublicKey,
messageSendTimestamp: messageSendTimestamp,
isSyncMessage: isSyncMessage,
using: dependencies
)
@ -198,13 +188,13 @@ public final class MessageSender {
interactionId: Int64?,
userPublicKey: String,
messageSendTimestamp: Int64,
isSyncMessage: Bool = false,
using dependencies: Dependencies
) throws -> PreparedSendData {
message.sender = userPublicKey
message.recipient = {
switch destination {
case .contact(let publicKey): return publicKey
case .syncMessage: return userPublicKey
case .closedGroup(let groupPublicKey): return groupPublicKey
case .openGroup, .openGroupInbox: preconditionFailure()
}
@ -215,6 +205,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend(
db,
message: message,
destination: destination,
with: .invalidMessage,
interactionId: interactionId,
using: dependencies
@ -223,9 +214,9 @@ public final class MessageSender {
// Attach the user's profile if needed (no need to do so for 'Note to Self' or sync
// messages as they will be managed by the user config handling
let isSelfSend: Bool = (message.recipient == userPublicKey)
if !isSelfSend, !isSyncMessage, var messageWithProfile: MessageWithProfile = message as? MessageWithProfile {
switch (destination, (message.recipient == userPublicKey), message as? MessageWithProfile) {
case (.syncMessage, _, _), (_, true, _), (_, _, .none): break
case (_, _, .some(var messageWithProfile)):
let profile: Profile = Profile.fetchOrCreateCurrentUser(db)
if let profileKey: Data = profile.profileEncryptionKey, let profilePictureUrl: String = profile.profilePictureUrl {
@ -241,7 +232,7 @@ public final class MessageSender {
}
// Perform any pre-send actions
handleMessageWillSend(db, message: message, interactionId: interactionId, isSyncMessage: isSyncMessage)
handleMessageWillSend(db, message: message, destination: destination, interactionId: interactionId)
// Convert it to protobuf
let threadId: String = Message.threadId(forMessage: message, destination: destination)
@ -250,6 +241,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend(
db,
message: message,
destination: destination,
with: .protoConversionFailed,
interactionId: interactionId,
using: dependencies
@ -267,6 +259,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend(
db,
message: message,
destination: destination,
with: .other(error),
interactionId: interactionId,
using: dependencies
@ -280,6 +273,9 @@ public final class MessageSender {
case .contact(let publicKey):
ciphertext = try encryptWithSessionProtocol(db, plaintext: plaintext, for: publicKey, using: dependencies)
case .syncMessage:
ciphertext = try encryptWithSessionProtocol(db, plaintext: plaintext, for: userPublicKey, using: dependencies)
case .closedGroup(let groupPublicKey):
guard let encryptionKeyPair: ClosedGroupKeyPair = try? ClosedGroupKeyPair.fetchLatestKeyPair(db, threadId: groupPublicKey) else {
throw MessageSenderError.noKeyPair
@ -300,6 +296,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend(
db,
message: message,
destination: destination,
with: .other(error),
interactionId: interactionId,
using: dependencies
@ -311,7 +308,7 @@ public final class MessageSender {
let senderPublicKey: String
switch destination {
case .contact:
case .contact, .syncMessage:
kind = .sessionMessage
senderPublicKey = ""
@ -336,6 +333,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend(
db,
message: message,
destination: destination,
with: .other(error),
interactionId: interactionId,
using: dependencies
@ -350,13 +348,7 @@ public final class MessageSender {
data: base64EncodedData,
ttl: Message.getSpecifiedTTL(
message: message,
isGroupMessage: {
switch destination {
case .closedGroup: return true
default: return false
}
}(),
isSyncMessage: isSyncMessage
destination: destination
),
timestampMs: UInt64(messageSendTimestamp)
)
@ -366,7 +358,6 @@ public final class MessageSender {
destination: destination,
namespace: namespace,
interactionId: interactionId,
isSyncMessage: isSyncMessage,
snodeMessage: snodeMessage
)
}
@ -382,7 +373,7 @@ public final class MessageSender {
let threadId: String
switch destination {
case .contact, .closedGroup, .openGroupInbox: preconditionFailure()
case .contact, .syncMessage, .closedGroup, .openGroupInbox: preconditionFailure()
case .openGroup(let roomToken, let server, let whisperTo, let whisperMods, _):
threadId = OpenGroup.idFor(roomToken: roomToken, server: server)
message.recipient = [
@ -439,6 +430,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend(
db,
message: message,
destination: destination,
with: .invalidMessage,
interactionId: interactionId,
using: dependencies
@ -455,6 +447,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend(
db,
message: message,
destination: destination,
with: .noUsername,
interactionId: interactionId,
using: dependencies
@ -462,13 +455,14 @@ public final class MessageSender {
}
// Perform any pre-send actions
handleMessageWillSend(db, message: message, interactionId: interactionId)
handleMessageWillSend(db, message: message, destination: destination, interactionId: interactionId)
// Convert it to protobuf
guard let proto = message.toProto(db, threadId: threadId) else {
throw MessageSender.handleFailedMessageSend(
db,
message: message,
destination: destination,
with: .protoConversionFailed,
interactionId: interactionId,
using: dependencies
@ -486,6 +480,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend(
db,
message: message,
destination: destination,
with: .other(error),
interactionId: interactionId,
using: dependencies
@ -533,13 +528,14 @@ public final class MessageSender {
}
// Perform any pre-send actions
handleMessageWillSend(db, message: message, interactionId: interactionId)
handleMessageWillSend(db, message: message, destination: destination, interactionId: interactionId)
// Convert it to protobuf
guard let proto = message.toProto(db, threadId: recipientBlindedPublicKey) else {
throw MessageSender.handleFailedMessageSend(
db,
message: message,
destination: destination,
with: .protoConversionFailed,
interactionId: interactionId,
using: dependencies
@ -557,6 +553,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend(
db,
message: message,
destination: destination,
with: .other(error),
interactionId: interactionId,
using: dependencies
@ -580,6 +577,7 @@ public final class MessageSender {
throw MessageSender.handleFailedMessageSend(
db,
message: message,
destination: destination,
with: .other(error),
interactionId: interactionId,
using: dependencies
@ -628,9 +626,9 @@ public final class MessageSender {
MessageSender.handleFailedMessageSend(
db,
message: message,
destination: data.destination,
with: .attachmentsNotUploaded,
interactionId: data.interactionId,
isSyncMessage: (data.isSyncMessage == true),
using: dependencies
)
}
@ -646,7 +644,7 @@ public final class MessageSender {
}
switch data.destination {
case .contact, .closedGroup: return sendToSnodeDestination(data: data, using: dependencies)
case .contact, .syncMessage, .closedGroup: return sendToSnodeDestination(data: data, using: dependencies)
case .openGroup: return sendToOpenGroupDestination(data: data, using: dependencies)
case .openGroupInbox: return sendToOpenGroupInbox(data: data, using: dependencies)
}
@ -661,7 +659,6 @@ public final class MessageSender {
guard
let message: Message = data.message,
let namespace: SnodeAPI.Namespace = data.namespace,
let isSyncMessage: Bool = data.isSyncMessage,
let snodeMessage: SnodeMessage = data.snodeMessage
else {
return Fail(error: MessageSenderError.invalidMessage)
@ -680,9 +677,10 @@ public final class MessageSender {
details: NotifyPushServerJob.Details(message: snodeMessage)
)
let shouldNotify: Bool = {
switch updatedMessage {
case is VisibleMessage, is UnsendRequest: return !isSyncMessage
case let callMessage as CallMessage:
switch (updatedMessage, data.destination) {
case (is VisibleMessage, .syncMessage), (is UnsendRequest, .syncMessage): return false
case (is VisibleMessage, _), (is UnsendRequest, _): return true
case (let callMessage as CallMessage, _):
// Note: Other 'CallMessage' types are too big to send as push notifications
// so only send the 'preOffer' message as a notification
switch callMessage.kind {
@ -701,7 +699,6 @@ public final class MessageSender {
message: updatedMessage,
to: data.destination,
interactionId: data.interactionId,
isSyncMessage: isSyncMessage,
using: dependencies
)
@ -758,6 +755,7 @@ public final class MessageSender {
MessageSender.handleFailedMessageSend(
db,
message: message,
destination: data.destination,
with: .other(error),
interactionId: data.interactionId,
using: dependencies
@ -829,6 +827,7 @@ public final class MessageSender {
MessageSender.handleFailedMessageSend(
db,
message: message,
destination: data.destination,
with: .other(error),
interactionId: data.interactionId,
using: dependencies
@ -893,6 +892,7 @@ public final class MessageSender {
MessageSender.handleFailedMessageSend(
db,
message: message,
destination: data.destination,
with: .other(error),
interactionId: data.interactionId,
using: dependencies
@ -909,28 +909,34 @@ public final class MessageSender {
public static func handleMessageWillSend(
_ db: Database,
message: Message,
interactionId: Int64?,
isSyncMessage: Bool = false
destination: Message.Destination,
interactionId: Int64?
) {
// If the message was a reaction then we don't want to do anything to the original
// interaction (which the 'interactionId' is pointing to
guard (message as? VisibleMessage)?.reaction == nil else { return }
// Mark messages as "sending"/"syncing" if needed (this is for retries)
switch destination {
case .syncMessage:
_ = try? RecipientState
.filter(RecipientState.Columns.interactionId == interactionId)
.filter(isSyncMessage ?
RecipientState.Columns.state == RecipientState.State.failedToSync :
RecipientState.Columns.state == RecipientState.State.failed
)
.filter(RecipientState.Columns.state == RecipientState.State.failedToSync)
.updateAll(
db,
RecipientState.Columns.state.set(to: isSyncMessage ?
RecipientState.State.syncing :
RecipientState.State.sending
RecipientState.Columns.state.set(to: RecipientState.State.syncing)
)
default:
_ = try? RecipientState
.filter(RecipientState.Columns.interactionId == interactionId)
.filter(RecipientState.Columns.state == RecipientState.State.failed)
.updateAll(
db,
RecipientState.Columns.state.set(to: RecipientState.State.sending)
)
}
}
private static func handleSuccessfulMessageSend(
_ db: Database,
@ -938,7 +944,6 @@ public final class MessageSender {
to destination: Message.Destination,
interactionId: Int64?,
serverTimestampMs: UInt64? = nil,
isSyncMessage: Bool = false,
using dependencies: Dependencies
) throws {
// If the message was a reaction then we want to update the reaction instead of the original
@ -957,7 +962,9 @@ public final class MessageSender {
// Get the visible message if possible
if let interaction: Interaction = interaction {
// Only store the server hash of a sync message if the message is self send valid
if (message.isSelfSendValid && isSyncMessage || !isSyncMessage) {
switch (message.isSelfSendValid, destination) {
case (false, .syncMessage): break
case (true, .syncMessage), (_, .contact), (_, .closedGroup), (_, .openGroup), (_, .openGroupInbox):
try interaction.with(
serverHash: message.serverHash,
// Track the open group server message ID and update server timestamp (use server
@ -987,7 +994,7 @@ public final class MessageSender {
)
if
isSyncMessage,
case .syncMessage = destination,
let startedAtMs: Double = interaction.expiresStartedAtMs,
let expiresInSeconds: TimeInterval = interaction.expiresInSeconds,
let serverHash: String = message.serverHash
@ -1038,7 +1045,6 @@ public final class MessageSender {
destination: destination,
threadId: threadId,
interactionId: interactionId,
isAlreadySyncMessage: isSyncMessage,
using: dependencies
)
}
@ -1046,9 +1052,9 @@ public final class MessageSender {
@discardableResult internal static func handleFailedMessageSend(
_ db: Database,
message: Message,
destination: Message.Destination?,
with error: MessageSenderError,
interactionId: Int64?,
isSyncMessage: Bool = false,
using dependencies: Dependencies
) -> Error {
// If the message was a reaction then we don't want to do anything to the original
@ -1060,15 +1066,24 @@ public final class MessageSender {
// Note: The 'db' could be either read-only or writeable so we determine
// if a change is required, and if so dispatch to a separate queue for the
// actual write
let rowIds: [Int64] = (try? RecipientState
let rowIds: [Int64] = (try? {
switch destination {
case .syncMessage:
return RecipientState
.select(Column.rowID)
.filter(RecipientState.Columns.interactionId == interactionId)
.filter(!isSyncMessage ?
RecipientState.Columns.state == RecipientState.State.sending : (
.filter(
RecipientState.Columns.state == RecipientState.State.syncing ||
RecipientState.Columns.state == RecipientState.State.sent
)
)
default:
return RecipientState
.select(Column.rowID)
.filter(RecipientState.Columns.interactionId == interactionId)
.filter(RecipientState.Columns.state == RecipientState.State.sending)
}
}()
.asRequest(of: Int64.self)
.fetchAll(db))
.defaulting(to: [])
@ -1079,17 +1094,27 @@ public final class MessageSender {
// issue from occuring in some cases
DispatchQueue.global(qos: .background).async {
dependencies.storage.write { db in
switch destination {
case .syncMessage:
try RecipientState
.filter(rowIds.contains(Column.rowID))
.updateAll(
db,
RecipientState.Columns.state.set(
to: (isSyncMessage ? RecipientState.State.failedToSync : RecipientState.State.failed)
),
RecipientState.Columns.mostRecentFailureText.set(to: error.localizedDescription)
RecipientState.Columns.state.set(to: RecipientState.State.failedToSync),
RecipientState.Columns.mostRecentFailureText.set(to: "\(error)")
)
default:
try RecipientState
.filter(rowIds.contains(Column.rowID))
.updateAll(
db,
RecipientState.Columns.state.set(to: RecipientState.State.failed),
RecipientState.Columns.mostRecentFailureText.set(to: "\(error)")
)
}
}
}
return error
}
@ -1116,7 +1141,6 @@ public final class MessageSender {
destination: Message.Destination,
threadId: String?,
interactionId: Int64?,
isAlreadySyncMessage: Bool,
using dependencies: Dependencies
) {
// Sync the message if it's not a sync message, wasn't already sent to the current user and
@ -1125,7 +1149,6 @@ public final class MessageSender {
if
case .contact(let publicKey) = destination,
!isAlreadySyncMessage,
publicKey != currentUserPublicKey,
Message.shouldSync(message: message)
{
@ -1139,9 +1162,8 @@ public final class MessageSender {
threadId: threadId,
interactionId: interactionId,
details: MessageSendJob.Details(
destination: .contact(publicKey: currentUserPublicKey),
message: message,
isSyncMessage: true
destination: .syncMessage(originalRecipientPublicKey: publicKey),
message: message
)
),
canStartJob: true,

@ -1430,15 +1430,27 @@ public extension Publisher where Output == Set<Snode> {
.mapError { $0 }
.flatMap(maxPublishers: maxPublishers) { swarm -> AnyPublisher<T, Error> in
var remainingSnodes: Set<Snode> = swarm
var lastError: Error?
return Just(())
.setFailureType(to: Error.self)
.tryFlatMap(maxPublishers: maxPublishers) { _ -> AnyPublisher<T, Error> in
let snode: Snode = try remainingSnodes.popRandomElement() ?? { throw SnodeAPIError.generic }()
let snode: Snode = try remainingSnodes.popRandomElement() ?? {
throw SnodeAPIError.ranOutOfRandomSnodes(lastError)
}()
return try transform(snode)
.eraseToAnyPublisher()
}
.mapError { error in
// Prevent nesting the 'ranOutOfRandomSnodes' errors
switch error {
case SnodeAPIError.ranOutOfRandomSnodes: break
default: lastError = error
}
return error
}
.retry(retries)
.eraseToAnyPublisher()
}

@ -5,7 +5,7 @@
import Foundation
import SessionUtilitiesKit
public enum OnionRequestAPIError: LocalizedError {
public enum OnionRequestAPIError: Error, CustomStringConvertible {
case httpRequestFailedAtDestination(statusCode: UInt, data: Data, destination: OnionRequestAPIDestination)
case insufficientSnodes
case invalidURL
@ -14,25 +14,25 @@ public enum OnionRequestAPIError: LocalizedError {
case unsupportedSnodeVersion(String)
case invalidRequestInfo
public var errorDescription: String? {
public var description: String {
switch self {
case .httpRequestFailedAtDestination(let statusCode, let data, let destination):
if statusCode == 429 { return "Rate limited." }
if statusCode == 429 { return "Rate limited (OnionRequestAPIError.httpRequestFailedAtDestination)." }
if let processedResponseBodyData: Data = OnionRequestAPI.process(bencodedData: data)?.body, let errorResponse: String = String(data: processedResponseBodyData, encoding: .utf8) {
return "HTTP request failed at destination (\(destination)) with status code: \(statusCode), error body: \(errorResponse)."
return "HTTP request failed at destination (\(destination)) with status code: \(statusCode), error body: \(errorResponse) (OnionRequestAPIError.httpRequestFailedAtDestination)."
}
if let errorResponse: String = String(data: data, encoding: .utf8) {
return "HTTP request failed at destination (\(destination)) with status code: \(statusCode), error body: \(errorResponse)."
return "HTTP request failed at destination (\(destination)) with status code: \(statusCode), error body: \(errorResponse) (OnionRequestAPIError.httpRequestFailedAtDestination)."
}
return "HTTP request failed at destination (\(destination)) with status code: \(statusCode)."
return "HTTP request failed at destination (\(destination)) with status code: \(statusCode) (OnionRequestAPIError.httpRequestFailedAtDestination)."
case .insufficientSnodes: return "Couldn't find enough Service Nodes to build a path."
case .invalidURL: return "Invalid URL"
case .missingSnodeVersion: return "Missing Service Node version."
case .snodePublicKeySetMissing: return "Missing Service Node public key set."
case .unsupportedSnodeVersion(let version): return "Unsupported Service Node version: \(version)."
case .invalidRequestInfo: return "Invalid Request Info"
case .insufficientSnodes: return "Couldn't find enough Service Nodes to build a path (OnionRequestAPIError.insufficientSnodes)."
case .invalidURL: return "Invalid URL (OnionRequestAPIError.invalidURL)."
case .missingSnodeVersion: return "Missing Service Node version (OnionRequestAPIError.missingSnodeVersion)."
case .snodePublicKeySetMissing: return "Missing Service Node public key set (OnionRequestAPIError.snodePublicKeySetMissing)."
case .unsupportedSnodeVersion(let version): return "Unsupported Service Node version: \(version) (OnionRequestAPIError.unsupportedSnodeVersion)."
case .invalidRequestInfo: return "Invalid Request Info (OnionRequestAPIError.invalidRequestInfo)."
}
}
}

@ -4,7 +4,7 @@
import Foundation
public enum SnodeAPIError: LocalizedError {
public enum SnodeAPIError: Error, CustomStringConvertible {
case generic
case clockOutOfSync
case snodePoolUpdatingFailed
@ -15,29 +15,37 @@ public enum SnodeAPIError: LocalizedError {
case invalidIP
case emptySnodePool
case responseFailedValidation
case ranOutOfRandomSnodes(Error?)
// ONS
case decryptionFailed
case hashingFailed
case validationFailed
public var errorDescription: String? {
public var description: String {
switch self {
case .generic: return "An error occurred."
case .clockOutOfSync: return "Your clock is out of sync with the Service Node network. Please check that your device's clock is set to automatic time."
case .snodePoolUpdatingFailed: return "Failed to update the Service Node pool."
case .inconsistentSnodePools: return "Received inconsistent Service Node pool information from the Service Node network."
case .noKeyPair: return "Missing user key pair."
case .signingFailed: return "Couldn't sign message."
case .signatureVerificationFailed: return "Failed to verify the signature."
case .invalidIP: return "Invalid IP."
case .emptySnodePool: return "Service Node pool is empty."
case .responseFailedValidation: return "Response failed validation."
case .generic: return "An error occurred (SnodeAPIError.generic)."
case .clockOutOfSync: return "Your clock is out of sync with the Service Node network. Please check that your device's clock is set to automatic time (SnodeAPIError.clockOutOfSync)."
case .snodePoolUpdatingFailed: return "Failed to update the Service Node pool (SnodeAPIError.snodePoolUpdatingFailed)."
case .inconsistentSnodePools: return "Received inconsistent Service Node pool information from the Service Node network (SnodeAPIError.inconsistentSnodePools)."
case .noKeyPair: return "Missing user key pair (SnodeAPIError.noKeyPair)."
case .signingFailed: return "Couldn't sign message (SnodeAPIError.signingFailed)."
case .signatureVerificationFailed: return "Failed to verify the signature (SnodeAPIError.signatureVerificationFailed)."
case .invalidIP: return "Invalid IP (SnodeAPIError.invalidIP)."
case .emptySnodePool: return "Service Node pool is empty (SnodeAPIError.emptySnodePool)."
case .responseFailedValidation: return "Response failed validation (SnodeAPIError.responseFailedValidation)."
case .ranOutOfRandomSnodes(let maybeError):
switch maybeError {
case .none: return "Ran out of random snodes (SnodeAPIError.ranOutOfRandomSnodes(nil))."
case .some(let error):
let errorDesc = "\(error)".trimmingCharacters(in: CharacterSet(["."]))
return "Ran out of random snodes (SnodeAPIError.ranOutOfRandomSnodes(\(errorDesc))."
}
// ONS
case .decryptionFailed: return "Couldn't decrypt ONS name."
case .hashingFailed: return "Couldn't compute ONS name hash."
case .validationFailed: return "ONS name validation failed."
case .decryptionFailed: return "Couldn't decrypt ONS name (SnodeAPIError.decryptionFailed)."
case .hashingFailed: return "Couldn't compute ONS name hash (SnodeAPIError.hashingFailed)."
case .validationFailed: return "ONS name validation failed (SnodeAPIError.validationFailed)."
}
}
}

Loading…
Cancel
Save