From 503373899403d2efea66c5e12ea18da91bfdd342 Mon Sep 17 00:00:00 2001 From: Morgan Pretty Date: Mon, 5 Dec 2022 17:39:40 +1100 Subject: [PATCH] Fixed a few issues caused by the PromiseKit refactor Started cleaning up the TODOs Fixed a couple of merge conflict issues Fixed a bug with the state of attachments which failed to download # Conflicts: # SessionMessagingKit/Database/Models/Attachment.swift --- Session/Closed Groups/NewClosedGroupVC.swift | 2 + .../ConversationVC+Interaction.swift | 9 +- Session/Conversations/ConversationVC.swift | 26 +- Session/Home/New Conversation/NewDMVC.swift | 1 + .../PhotoCaptureViewController.swift | 6 +- Session/Notifications/SyncPushTokensJob.swift | 6 +- Session/Open Groups/JoinOpenGroupVC.swift | 2 +- .../Open Groups/OpenGroupSuggestionGrid.swift | 31 +- Session/Settings/SettingsViewModel.swift | 63 ---- Session/Utilities/BackgroundPoller.swift | 1 - .../Database/Models/Attachment.swift | 270 +++++++++--------- .../Database/Models/Contact.swift | 19 -- .../Jobs/Types/AttachmentUploadJob.swift | 31 +- .../Jobs/Types/MessageSendJob.swift | 4 +- .../Visible Messages/VisibleMessage.swift | 8 +- .../Open Groups/OpenGroupAPI.swift | 7 +- .../Open Groups/OpenGroupManager.swift | 15 +- .../Errors/MessageSenderError.swift | 2 + .../MessageSender+ClosedGroups.swift | 2 +- .../MessageSender+Convenience.swift | 95 +++--- .../Sending & Receiving/MessageSender.swift | 26 ++ .../Pollers/OpenGroupPoller.swift | 10 +- .../Utilities/ProfileManager.swift | 2 +- SessionShareExtension/ThreadPickerVC.swift | 4 +- SessionSnodeKit/Networking/SnodeAPI.swift | 3 - .../OnionRequestAPI+Encryption.swift | 2 - SessionUtilitiesKit/Networking/HTTP.swift | 2 +- 27 files changed, 267 insertions(+), 382 deletions(-) diff --git a/Session/Closed Groups/NewClosedGroupVC.swift b/Session/Closed Groups/NewClosedGroupVC.swift index 32aa77af9..eee544e92 100644 --- a/Session/Closed Groups/NewClosedGroupVC.swift +++ b/Session/Closed Groups/NewClosedGroupVC.swift @@ -319,6 +319,8 @@ final class NewClosedGroupVC: BaseVC, UITableViewDataSource, UITableViewDelegate .writePublisherFlatMap { db in MessageSender.createClosedGroup(db, name: name, members: selectedContacts) } + .subscribe(on: DispatchQueue.global(qos: .userInitiated)) + .receive(on: DispatchQueue.main) .sinkUntilComplete( receiveCompletion: { result in switch result { diff --git a/Session/Conversations/ConversationVC+Interaction.swift b/Session/Conversations/ConversationVC+Interaction.swift index 130616234..91d739cf8 100644 --- a/Session/Conversations/ConversationVC+Interaction.swift +++ b/Session/Conversations/ConversationVC+Interaction.swift @@ -348,10 +348,7 @@ extension ConversationVC: .attachmentPublisher .sinkUntilComplete( receiveValue: { [weak self] attachment in - guard - !modalActivityIndicator.wasCancelled, - let attachment = attachment as? SignalAttachment - else { return } + guard !modalActivityIndicator.wasCancelled else { return } modalActivityIndicator.dismiss { guard !attachment.hasError else { @@ -1680,12 +1677,10 @@ extension ConversationVC: // Remote deletion logic func deleteRemotely(from viewController: UIViewController?, request: AnyPublisher, onComplete: (() -> ())?) { - // TODO: Test that this works // Show a loading indicator Future { resolver in ModalActivityIndicatorViewController.present(fromViewController: viewController, canCancel: false) { _ in - // TODO: Remove the 'Swift.' - resolver(Swift.Result.success(())) + resolver(Result.success(())) } } .flatMap { _ in request } diff --git a/Session/Conversations/ConversationVC.swift b/Session/Conversations/ConversationVC.swift index e906db92d..87376ffa3 100644 --- a/Session/Conversations/ConversationVC.swift +++ b/Session/Conversations/ConversationVC.swift @@ -291,7 +291,7 @@ final class ConversationVC: BaseVC, ConversationSearchControllerDelegate, UITabl result.textAlignment = .center result.numberOfLines = 0 result.isHidden = ( - !self.messageRequestView.isHidden || + !self.messageRequestStackView.isHidden || self.viewModel.threadData.threadRequiresApproval == false ) @@ -350,7 +350,8 @@ final class ConversationVC: BaseVC, ConversationSearchControllerDelegate, UITabl // Message requests view & scroll to bottom view.addSubview(scrollButton) - view.addSubview(messageRequestView) + view.addSubview(messageRequestBackgroundView) + view.addSubview(messageRequestStackView) view.addSubview(pendingMessageRequestExplanationLabel) messageRequestView.addSubview(messageRequestBlockButton) @@ -364,7 +365,7 @@ final class ConversationVC: BaseVC, ConversationSearchControllerDelegate, UITabl self.messageRequestsViewBotomConstraint = messageRequestView.pin(.bottom, to: .bottom, of: view, withInset: -16) self.scrollButtonBottomConstraint = scrollButton.pin(.bottom, to: .bottom, of: view, withInset: -16) self.scrollButtonBottomConstraint?.isActive = false // Note: Need to disable this to avoid a conflict with the other bottom constraint - self.scrollButtonMessageRequestsBottomConstraint = scrollButton.pin(.bottom, to: .top, of: messageRequestView, withInset: -16) + self.scrollButtonMessageRequestsBottomConstraint = scrollButton.pin(.bottom, to: .top, of: messageRequestStackView) self.scrollButtonPendingMessageRequestInfoBottomConstraint = scrollButton.pin(.bottom, to: .top, of: pendingMessageRequestExplanationLabel, withInset: -16) messageRequestBlockButton.pin(.top, to: .top, of: messageRequestView, withInset: 10) @@ -383,10 +384,14 @@ final class ConversationVC: BaseVC, ConversationSearchControllerDelegate, UITabl messageRequestDeleteButton.pin(.right, to: .right, of: messageRequestView, withInset: -20) messageRequestDeleteButton.pin(.bottom, to: .bottom, of: messageRequestView) messageRequestDeleteButton.set(.width, to: .width, of: messageRequestAcceptButton) + messageRequestBackgroundView.pin(.top, to: .top, of: messageRequestStackView) + messageRequestBackgroundView.pin(.leading, to: .leading, of: view) + messageRequestBackgroundView.pin(.trailing, to: .trailing, of: view) + messageRequestBackgroundView.pin(.bottom, to: .bottom, of: view) - pendingMessageRequestExplanationLabel.pin(.left, to: .left, of: messageRequestView, withInset: 40) - pendingMessageRequestExplanationLabel.pin(.right, to: .right, of: messageRequestView, withInset: -40) - pendingMessageRequestExplanationLabel.pin(.bottom, to: .bottom, of: messageRequestView, withInset: -16) + pendingMessageRequestExplanationLabel.pin(.left, to: .left, of: messageRequestStackView, withInset: 40) + pendingMessageRequestExplanationLabel.pin(.right, to: .right, of: messageRequestStackView, withInset: -40) + pendingMessageRequestExplanationLabel.pin(.bottom, to: .bottom, of: messageRequestStackView, withInset: -16) // Unread count view view.addSubview(unreadCountView) @@ -621,7 +626,7 @@ final class ConversationVC: BaseVC, ConversationSearchControllerDelegate, UITabl updateNavBarButtons(threadData: updatedThreadData, initialVariant: viewModel.initialThreadVariant) let messageRequestsViewWasVisible: Bool = ( - messageRequestView.isHidden == false + messageRequestStackView.isHidden == false ) let pendingMessageRequestInfoWasVisible: Bool = ( pendingMessageRequestExplanationLabel.isHidden == false @@ -632,13 +637,14 @@ final class ConversationVC: BaseVC, ConversationSearchControllerDelegate, UITabl updatedThreadData.threadIsMessageRequest == false || updatedThreadData.threadRequiresApproval == true ) + self?.messageRequestBackgroundView.isHidden = (self?.messageRequestStackView.isHidden == true) self?.pendingMessageRequestExplanationLabel.isHidden = ( - self?.messageRequestView.isHidden == false || + self?.messageRequestStackView.isHidden == false || updatedThreadData.threadRequiresApproval == false ) self?.scrollButtonMessageRequestsBottomConstraint?.isActive = ( - self?.messageRequestView.isHidden == false + self?.messageRequestStackView.isHidden == false ) self?.scrollButtonPendingMessageRequestInfoBottomConstraint?.isActive = ( self?.scrollButtonPendingMessageRequestInfoBottomConstraint?.isActive == false && @@ -1162,7 +1168,7 @@ final class ConversationVC: BaseVC, ConversationSearchControllerDelegate, UITabl } let keyboardTop = (UIScreen.main.bounds.height - keyboardRect.minY) - let messageRequestsOffset: CGFloat = (messageRequestView.isHidden ? 0 : messageRequestView.bounds.height + 16) + let messageRequestsOffset: CGFloat = (messageRequestStackView.isHidden ? 0 : messageRequestStackView.bounds.height + 16) let pendingMessageRequestsOffset: CGFloat = (pendingMessageRequestExplanationLabel.isHidden ? 0 : (pendingMessageRequestExplanationLabel.bounds.height + (16 * 2))) let oldContentInset: UIEdgeInsets = tableView.contentInset let newContentInset: UIEdgeInsets = UIEdgeInsets( diff --git a/Session/Home/New Conversation/NewDMVC.swift b/Session/Home/New Conversation/NewDMVC.swift index 410b4fa92..b7497acb9 100644 --- a/Session/Home/New Conversation/NewDMVC.swift +++ b/Session/Home/New Conversation/NewDMVC.swift @@ -184,6 +184,7 @@ final class NewDMVC: BaseVC, UIPageViewControllerDataSource, UIPageViewControlle .present(fromViewController: navigationController!, canCancel: false) { [weak self] modalActivityIndicator in SnodeAPI .getSessionID(for: onsNameOrPublicKey) + .receive(on: DispatchQueue.main) .sinkUntilComplete( receiveCompletion: { result in switch result { diff --git a/Session/Media Viewing & Editing/PhotoCaptureViewController.swift b/Session/Media Viewing & Editing/PhotoCaptureViewController.swift index 66e696112..19fd7b31b 100644 --- a/Session/Media Viewing & Editing/PhotoCaptureViewController.swift +++ b/Session/Media Viewing & Editing/PhotoCaptureViewController.swift @@ -195,7 +195,7 @@ class PhotoCaptureViewController: OWSViewController { } photoCapture.switchCamera() - .receiveOnMain() + .receive(on: DispatchQueue.main) .sinkUntilComplete( receiveCompletion: { [weak self] result in switch result { @@ -210,7 +210,7 @@ class PhotoCaptureViewController: OWSViewController { func didTapFlashMode() { Logger.debug("") photoCapture.switchFlashMode() - .receiveOnMain() + .receive(on: DispatchQueue.main) .sinkUntilComplete( receiveCompletion: { [weak self] _ in self?.updateFlashModeControl() @@ -306,7 +306,7 @@ class PhotoCaptureViewController: OWSViewController { previewView = CapturePreviewView(session: photoCapture.session) photoCapture.startCapture() - .receiveOnMain() + .receive(on: DispatchQueue.main) .sinkUntilComplete( receiveCompletion: { [weak self] result in switch result { diff --git a/Session/Notifications/SyncPushTokensJob.swift b/Session/Notifications/SyncPushTokensJob.swift index c2fe63429..19fb41e43 100644 --- a/Session/Notifications/SyncPushTokensJob.swift +++ b/Session/Notifications/SyncPushTokensJob.swift @@ -78,10 +78,8 @@ public enum SyncPushTokensJob: JobExecutor { pushToken: pushToken, voipToken: voipToken, isForcedUpdate: shouldUploadTokens, - // TODO: Remove the 'Swift.' - success: { resolver(Swift.Result.success(())) }, - // TODO: Remove the 'Swift.' - failure: { resolver(Swift.Result.failure($0)) } + success: { resolver(Result.success(())) }, + failure: { resolver(Result.failure($0)) } ) } .handleEvents( diff --git a/Session/Open Groups/JoinOpenGroupVC.swift b/Session/Open Groups/JoinOpenGroupVC.swift index 8e2ff7134..60923ce50 100644 --- a/Session/Open Groups/JoinOpenGroupVC.swift +++ b/Session/Open Groups/JoinOpenGroupVC.swift @@ -168,7 +168,7 @@ final class JoinOpenGroupVC: BaseVC, UIPageViewControllerDataSource, UIPageViewC ModalActivityIndicatorViewController.present(fromViewController: navigationController, canCancel: false) { [weak self] _ in Storage.shared - .writePublisher { db in + .writePublisherFlatMap { db in OpenGroupManager.shared.add( db, roomToken: roomToken, diff --git a/Session/Open Groups/OpenGroupSuggestionGrid.swift b/Session/Open Groups/OpenGroupSuggestionGrid.swift index e62f8ec18..eb12b2e66 100644 --- a/Session/Open Groups/OpenGroupSuggestionGrid.swift +++ b/Session/Open Groups/OpenGroupSuggestionGrid.swift @@ -143,6 +143,7 @@ final class OpenGroupSuggestionGrid: UIView, UICollectionViewDataSource, UIColle widthAnchor.constraint(greaterThanOrEqualToConstant: OpenGroupSuggestionGrid.cellHeight).isActive = true OpenGroupManager.getDefaultRoomsIfNeeded() + .receive(on: DispatchQueue.main) .sinkUntilComplete( receiveCompletion: { [weak self] _ in self?.update() }, receiveValue: { [weak self] rooms in self?.rooms = rooms } @@ -316,7 +317,7 @@ extension OpenGroupSuggestionGrid { return } - imageView.image = nil // TODO: Test this + imageView.image = nil Publishers .MergeMany( @@ -331,15 +332,19 @@ extension OpenGroupSuggestionGrid { // we can ignore this 'Just' call which is used to hide the image while loading Just((Data(), false)) .setFailureType(to: Error.self) -// .delay(for: .milliseconds(10), scheduler: DispatchQueue.main) + .delay(for: .milliseconds(10), scheduler: DispatchQueue.main) .eraseToAnyPublisher() ) .receiveOnMain(immediately: true) .sinkUntilComplete( receiveValue: { [weak self] imageData, hasData in - // TODO: Test this behaviour guard hasData else { - self?.imageView.isHidden = true + // This will emit twice (once with the data and once without it), if we + // have actually received the images then we don't want the second emission + // to hide the imageView anymore + if self?.imageView.image == nil { + self?.imageView.isHidden = true + } return } @@ -347,24 +352,6 @@ extension OpenGroupSuggestionGrid { self?.imageView.isHidden = (self?.imageView.image == nil) } ) - -// OpenGroupManager.roomImage(db, fileId: imageId, for: room.token, on: OpenGroupAPI.defaultServer) -// .values -// -// if let imageData: Data = promise.value { -// imageView.image = UIImage(data: imageData) -// imageView.isHidden = (imageView.image == nil) -// } -// else { -// imageView.isHidden = true -// -// _ = promise.done { [weak self] imageData in -// DispatchQueue.main.async { -// self?.imageView.image = UIImage(data: imageData) -// self?.imageView.isHidden = (self?.imageView.image == nil) -// } -// } -// } } } } diff --git a/Session/Settings/SettingsViewModel.swift b/Session/Settings/SettingsViewModel.swift index a1277a29c..44003c2d3 100644 --- a/Session/Settings/SettingsViewModel.swift +++ b/Session/Settings/SettingsViewModel.swift @@ -575,66 +575,3 @@ class SettingsViewModel: SessionTableViewModel Void) { - // TODO: Test this works Publishers .MergeMany( [pollForMessages()] diff --git a/SessionMessagingKit/Database/Models/Attachment.swift b/SessionMessagingKit/Database/Models/Attachment.swift index 84d9cf806..1129c7b26 100644 --- a/SessionMessagingKit/Database/Models/Attachment.swift +++ b/SessionMessagingKit/Database/Models/Attachment.swift @@ -972,6 +972,18 @@ extension Attachment { // MARK: - Upload extension Attachment { + public enum Destination { + case fileServer + case openGroup(OpenGroup) + + var shouldEncrypt: Bool { + switch self { + case .fileServer: return true + case .openGroup: return false + } + } + } + public static func prepare(_ db: Database, attachments: [SignalAttachment], for interactionId: Int64) throws { // Prepare any attachments try attachments.enumerated() @@ -979,7 +991,7 @@ extension Attachment { let maybeAttachment: Attachment? = Attachment( variant: (signalAttachment.isVoiceMessage ? .voiceMessage : - .standard + .standard ), contentType: signalAttachment.mimeType, dataSource: signalAttachment.dataSource, @@ -1001,176 +1013,160 @@ extension Attachment { } internal func upload( - _ db: Database? = nil, - queue: DispatchQueue, - using upload: @escaping (Database, Data) -> AnyPublisher, - encrypt: Bool, - success: ((String?) -> Void)?, - failure: ((Error) -> Void)? - ) { + to destination: Attachment.Destination, + queue: DispatchQueue + ) -> AnyPublisher { // This can occur if an AttachmnetUploadJob was explicitly created for a message // dependant on the attachment being uploaded (in this case the attachment has // already been uploaded so just succeed) guard state != .uploaded else { - success?(Attachment.fileId(for: self.downloadUrl)) - return + return Just(Attachment.fileId(for: self.downloadUrl)) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() } // Get the attachment guard var data = try? readDataFromFile() else { SNLog("Couldn't read attachment from disk.") - failure?(AttachmentError.noAttachment) - return + return Fail(error: AttachmentError.noAttachment) + .eraseToAnyPublisher() } let attachmentId: String = self.id - // If the attachment is a downloaded attachment, check if it came from the server - // and if so just succeed immediately (no use re-uploading an attachment that is - // already present on the server) - or if we want it to be encrypted and it's not - // then encrypt it - // - // Note: The most common cases for this will be for LinkPreviews or Quotes - guard - state != .downloaded || - serverId == nil || - downloadUrl == nil || - !encrypt || - encryptionKey == nil || - digest == nil - else { - // Save the final upload info - let uploadedAttachment: Attachment? = { - guard let db: Database = db else { - Storage.shared.write { db in - try? Attachment - .filter(id: attachmentId) - .updateAll(db, Attachment.Columns.state.set(to: Attachment.State.uploaded)) + return Storage.shared + .writePublisherFlatMap { db -> AnyPublisher<(String?, Data?, Data?), Error> in + // If the attachment is a downloaded attachment, check if it came from + // the server and if so just succeed immediately (no use re-uploading + // an attachment that is already present on the server) - or if we want + // it to be encrypted and it's not then encrypt it + // + // Note: The most common cases for this will be for LinkPreviews or Quotes + guard + state != .downloaded || + serverId == nil || + downloadUrl == nil || + !destination.shouldEncrypt || + encryptionKey == nil || + digest == nil + else { + // Save the final upload info + _ = try? Attachment + .filter(id: attachmentId) + .updateAll(db, Attachment.Columns.state.set(to: Attachment.State.uploaded)) + + return Just((Attachment.fileId(for: self.downloadUrl), nil, nil)) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() + } + + var encryptionKey: NSData = NSData() + var digest: NSData = NSData() + + // Encrypt the attachment if needed + if destination.shouldEncrypt { + guard let ciphertext = Cryptography.encryptAttachmentData(data, shouldPad: true, outKey: &encryptionKey, outDigest: &digest) else { + SNLog("Couldn't encrypt attachment.") + return Fail(error: AttachmentError.encryptionFailed) + .eraseToAnyPublisher() } - return self.with(state: .uploaded) + data = ciphertext + } + + // Check the file size + SNLog("File size: \(data.count) bytes.") + if Double(data.count) > Double(FileServerAPI.maxFileSize) / FileServerAPI.fileSizeORMultiplier { + return Fail(error: HTTPError.maxFileSizeExceeded) + .eraseToAnyPublisher() } + // Update the attachment to the 'uploading' state _ = try? Attachment .filter(id: attachmentId) - .updateAll(db, Attachment.Columns.state.set(to: Attachment.State.uploaded)) + .updateAll(db, Attachment.Columns.state.set(to: Attachment.State.uploading)) - return self.with(state: .uploaded) - }() - - guard uploadedAttachment != nil else { - SNLog("Couldn't update attachmentUpload job.") - failure?(StorageError.failedToSave) - return - } - - success?(Attachment.fileId(for: self.downloadUrl)) - return - } - - var processedAttachment: Attachment = self - - // Encrypt the attachment if needed - if encrypt { - var encryptionKey: NSData = NSData() - var digest: NSData = NSData() - - guard let ciphertext = Cryptography.encryptAttachmentData(data, shouldPad: true, outKey: &encryptionKey, outDigest: &digest) else { - SNLog("Couldn't encrypt attachment.") - failure?(AttachmentError.encryptionFailed) - return + switch destination { + case .openGroup(let openGroup): + return OpenGroupAPI + .uploadFile( + db, + bytes: data.bytes, + to: openGroup.roomToken, + on: openGroup.server + ) + .map { _, response -> (String, Data?, Data?) in + ( + response.id, + (destination.shouldEncrypt ? encryptionKey as Data : nil), + (destination.shouldEncrypt ? digest as Data : nil) + ) + } + .eraseToAnyPublisher() + + case .fileServer: + /// **Note:** FileServer uploads don't need database access so + return Just(( + nil, + (destination.shouldEncrypt ? encryptionKey as Data : nil), + (destination.shouldEncrypt ? digest as Data : nil) + )) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() + } } - - processedAttachment = processedAttachment.with( - encryptionKey: encryptionKey as Data, - digest: digest as Data - ) - data = ciphertext - } - - // Check the file size - SNLog("File size: \(data.count) bytes.") - if Double(data.count) > Double(FileServerAPI.maxFileSize) / FileServerAPI.fileSizeORMultiplier { - failure?(HTTP.Error.maxFileSizeExceeded) - return - } - - // Update the attachment to the 'uploading' state - let updatedAttachment: Attachment? = { - guard let db: Database = db else { - Storage.shared.write { db in - try? Attachment - .filter(id: attachmentId) - .updateAll(db, Attachment.Columns.state.set(to: Attachment.State.uploading)) + .flatMap { maybeFileId, encryptionKey, digest -> AnyPublisher<(String?, Data?, Data?), Error> in + switch destination { + case .openGroup: + /// **Note:** OpenGroup uploads need database access so this should + /// have already been uploaded + return Just((maybeFileId, encryptionKey, digest)) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() + + case .fileServer: + return FileServerAPI.upload(data) + .map { response -> (String, Data?, Data?) in (response.id, encryptionKey, digest) } + .eraseToAnyPublisher() } - - return processedAttachment.with(state: .uploading) } - - _ = try? Attachment - .filter(id: attachmentId) - .updateAll(db, Attachment.Columns.state.set(to: Attachment.State.uploading)) - - return processedAttachment.with(state: .uploading) - }() - - guard updatedAttachment != nil else { - SNLog("Couldn't update attachmentUpload job.") - failure?(StorageError.failedToSave) - return - } - - // Perform the upload - let uploadPublisher: AnyPublisher = { - guard let db: Database = db else { - return Storage.shared.readPublisherFlatMap { db in upload(db, data) } + .flatMap { fileId, encryptionKey, digest -> AnyPublisher in + /// Save the final upload info + /// + /// **Note:** We **MUST** use the `.with` function here to ensure the `isValid` flag is + /// updated correctly + Storage.shared + .writePublisher { db in + try self + .with( + serverId: fileId, + state: .uploaded, + creationTimestamp: ( + self.creationTimestamp ?? + Date().timeIntervalSince1970 + ), + downloadUrl: fileId.map { "\(FileServerAPI.server)/file/\($0)" }, + encryptionKey: encryptionKey, + digest: digest + ) + .saved(db) + } + .map { _ in fileId } + .eraseToAnyPublisher() } - - return upload(db, data) - }() - - uploadPublisher - .sinkUntilComplete( + .handleEvents( receiveCompletion: { result in switch result { case .finished: break - case .failure(let error): + case .failure: Storage.shared.write { db in try Attachment .filter(id: attachmentId) .updateAll(db, Attachment.Columns.state.set(to: Attachment.State.failedUpload)) } - - failure?(error) } - }, - receiveValue: { fileId in - /// Save the final upload info - /// - /// **Note:** We **MUST** use the `.with` function here to ensure the `isValid` flag is - /// updated correctly - let uploadedAttachment: Attachment? = Storage.shared.write { db in - try updatedAttachment? - .with( - serverId: "\(fileId)", - state: .uploaded, - creationTimestamp: ( - updatedAttachment?.creationTimestamp ?? - Date().timeIntervalSince1970 - ), - downloadUrl: "\(FileServerAPI.server)/files/\(fileId)" - ) - .saved(db) - } - - guard uploadedAttachment != nil else { - SNLog("Couldn't update attachmentUpload job.") - failure?(StorageError.failedToSave) - return - } - - success?(fileId) } ) + .eraseToAnyPublisher() } } diff --git a/SessionMessagingKit/Database/Models/Contact.swift b/SessionMessagingKit/Database/Models/Contact.swift index ab85bb808..524fd7300 100644 --- a/SessionMessagingKit/Database/Models/Contact.swift +++ b/SessionMessagingKit/Database/Models/Contact.swift @@ -100,22 +100,3 @@ public extension Contact { return ((try? fetchOne(db, id: id)) ?? Contact(id: id)) } } - -// MARK: - Objective-C Support - -// TODO: Remove this when possible -@objc(SMKContact) -public class SMKContact: NSObject { - @objc(isBlockedFor:) - public static func isBlocked(id: String) -> Bool { - return Storage.shared - .read { db in - try Contact - .filter(id: id) - .select(.isBlocked) - .asRequest(of: Bool.self) - .fetchOne(db) - } - .defaulting(to: false) - } -} diff --git a/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift b/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift index 4abb5ca6a..9b1a8cfec 100644 --- a/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift +++ b/SessionMessagingKit/Jobs/Types/AttachmentUploadJob.swift @@ -52,29 +52,16 @@ public enum AttachmentUploadJob: JobExecutor { // reentrancy issues when the success/failure closures get called before the upload as the JobRunner // will attempt to update the state of the job immediately attachment.upload( - queue: queue, - using: { db, data in - SNLog("[AttachmentUpload] Started for message \(interactionId) (\(attachment.byteCount) bytes)") - - if let openGroup: OpenGroup = openGroup { - return OpenGroupAPI - .uploadFile( - db, - bytes: data.bytes, - to: openGroup.roomToken, - on: openGroup.server - ) - .map { _, response -> String in response.id } - .eraseToAnyPublisher() + to: (openGroup.map { .openGroup($0) } ?? .fileServer), + queue: queue + ) + .sinkUntilComplete( + receiveCompletion: { result in + switch result { + case .failure(let error): failure(job, error, false) + case .finished: success(job, false) } - - return FileServerAPI.upload(data) - .map { response -> String in response.id } - .eraseToAnyPublisher() - }, - encrypt: (openGroup == nil), - success: { _ in success(job, false) }, - failure: { error in failure(job, error, false) } + } ) } } diff --git a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift index d39e438f8..27c234908 100644 --- a/SessionMessagingKit/Jobs/Types/MessageSendJob.swift +++ b/SessionMessagingKit/Jobs/Types/MessageSendJob.swift @@ -169,12 +169,12 @@ public enum MessageSendJob: JobExecutor { try MessageSender.preparedSendData( db, message: details.message, - to: details.destination - .with(fileIds: messageFileIds), + to: details.destination, interactionId: job.interactionId ) } .subscribe(on: queue) + .map { sendData in sendData.with(fileIds: messageFileIds) } .flatMap { MessageSender.sendImmediate(preparedSendData: $0) } .sinkUntilComplete( receiveCompletion: { result in diff --git a/SessionMessagingKit/Messages/Visible Messages/VisibleMessage.swift b/SessionMessagingKit/Messages/Visible Messages/VisibleMessage.swift index 980d90b70..e48da7f33 100644 --- a/SessionMessagingKit/Messages/Visible Messages/VisibleMessage.swift +++ b/SessionMessagingKit/Messages/Visible Messages/VisibleMessage.swift @@ -158,13 +158,7 @@ public final class VisibleMessage: Message { // Attachments - let attachments: [Attachment]? = try? Attachment.fetchAll(db, ids: self.attachmentIds) - - if !(attachments ?? []).allSatisfy({ $0.state == .uploaded }) { - #if DEBUG - preconditionFailure("Sending a message before all associated attachments have been uploaded.") - #endif - } + let attachments: [Attachment]? = try? Attachment.fetchAll(db, ids: self.attachmentIds) let attachmentProtos = (attachments ?? []).compactMap { $0.buildProto() } dataMessage.setAttachments(attachmentProtos) diff --git a/SessionMessagingKit/Open Groups/OpenGroupAPI.swift b/SessionMessagingKit/Open Groups/OpenGroupAPI.swift index d08ff23bf..1261d47b3 100644 --- a/SessionMessagingKit/Open Groups/OpenGroupAPI.swift +++ b/SessionMessagingKit/Open Groups/OpenGroupAPI.swift @@ -1434,9 +1434,12 @@ public enum OpenGroupAPI { .eraseToAnyPublisher() } - return dependencies.onionApi - .sendOnionRequest(signedRequest, to: request.server, with: publicKey) + // We was to avoid blocking the db write thread so we dispatch the API call to a different thread + return Just(()) + .setFailureType(to: Error.self) .subscribe(on: OpenGroupAPI.workQueue) + .receive(on: OpenGroupAPI.workQueue) + .flatMap { dependencies.onionApi.sendOnionRequest(signedRequest, to: request.server, with: publicKey) } .eraseToAnyPublisher() } } diff --git a/SessionMessagingKit/Open Groups/OpenGroupManager.swift b/SessionMessagingKit/Open Groups/OpenGroupManager.swift index abbfbdde3..88f0aa8ad 100644 --- a/SessionMessagingKit/Open Groups/OpenGroupManager.swift +++ b/SessionMessagingKit/Open Groups/OpenGroupManager.swift @@ -246,11 +246,13 @@ public final class OpenGroupManager: NSObject { OpenGroup.Columns.sequenceNumber.set(to: 0) ) + // We was to avoid blocking the db write thread so we dispatch the API call to a different thread + // // Note: We don't do this after the db commit as it can fail (resulting in endless loading) - return Future { resolver in - OpenGroupAPI.workQueue.async { resolver(Result.success(())) } - } + return Just(()) + .setFailureType(to: Error.self) .subscribe(on: OpenGroupAPI.workQueue) + .receive(on: OpenGroupAPI.workQueue) .flatMap { _ in dependencies.storage .readPublisherFlatMap { db in @@ -286,8 +288,7 @@ public final class OpenGroupManager: NSObject { on: targetServer, dependencies: dependencies ) { - // TODO: Remove the 'Swift.' - resolver(Swift.Result.success(())) + resolver(Result.success(())) } } } @@ -943,10 +944,6 @@ public final class OpenGroupManager: NSObject { @discardableResult public static func getDefaultRoomsIfNeeded( using dependencies: OGMDependencies = OGMDependencies() ) -> AnyPublisher<[OpenGroupAPI.Room], Error> { - return Just([]) // TODO: Remove this - .setFailureType(to: Error.self) - .eraseToAnyPublisher() - // Note: If we already have a 'defaultRoomsPromise' then there is no need to get it again if let existingPublisher: AnyPublisher<[OpenGroupAPI.Room], Error> = dependencies.cache.defaultRoomsPublisher { return existingPublisher diff --git a/SessionMessagingKit/Sending & Receiving/Errors/MessageSenderError.swift b/SessionMessagingKit/Sending & Receiving/Errors/MessageSenderError.swift index fb7a304d6..0ee8cd684 100644 --- a/SessionMessagingKit/Sending & Receiving/Errors/MessageSenderError.swift +++ b/SessionMessagingKit/Sending & Receiving/Errors/MessageSenderError.swift @@ -10,6 +10,7 @@ public enum MessageSenderError: LocalizedError { case signingFailed case encryptionFailed case noUsername + case attachmentsNotUploaded // Closed groups case noThread @@ -34,6 +35,7 @@ public enum MessageSenderError: LocalizedError { case .signingFailed: return "Couldn't sign message." case .encryptionFailed: return "Couldn't encrypt message." case .noUsername: return "Missing username." + case .attachmentsNotUploaded: return "Attachments for this message have not been uploaded." // Closed groups case .noThread: return "Couldn't find a thread associated with the given group public key." diff --git a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageSender+ClosedGroups.swift b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageSender+ClosedGroups.swift index fc2748805..f051c4750 100644 --- a/SessionMessagingKit/Sending & Receiving/Message Handling/MessageSender+ClosedGroups.swift +++ b/SessionMessagingKit/Sending & Receiving/Message Handling/MessageSender+ClosedGroups.swift @@ -99,7 +99,7 @@ extension MessageSender { // the 'ClosedGroup' object we created sentTimestampMs: UInt64(floor(formationTimestamp * 1000)) ), - to: try Message.Destination.from(db, thread: thread), + to: .contact(publicKey: memberId), interactionId: nil ) } diff --git a/SessionMessagingKit/Sending & Receiving/MessageSender+Convenience.swift b/SessionMessagingKit/Sending & Receiving/MessageSender+Convenience.swift index f387ffbc5..1b934ead5 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageSender+Convenience.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageSender+Convenience.swift @@ -93,66 +93,57 @@ extension MessageSender { case .openGroupInbox(_, _, let blindedPublicKey): return blindedPublicKey } }() - let fileIdPublisher: AnyPublisher<[String?], Error> = Storage.shared - .write { db -> AnyPublisher<[String?], Error>? in + + return Storage.shared + .readPublisherFlatMap { db -> AnyPublisher<(attachments: [Attachment], openGroup: OpenGroup?), Error> in let attachmentStateInfo: [Attachment.StateInfo] = (try? Attachment .stateInfo(interactionId: interactionId, state: .uploading) .fetchAll(db)) .defaulting(to: []) // If there is no attachment data then just return early - guard !attachmentStateInfo.isEmpty else { return nil } - // TODO: Just run an AttachmentUploadJob directly??? - // Otherwise we need to generate the upload requests - let openGroup: OpenGroup? = try? OpenGroup.fetchOne(db, id: threadId) + guard !attachmentStateInfo.isEmpty else { + return Just(([], nil)) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() + } + + // Otherwise fetch the open group (if there is one) + return Just(( + (try? Attachment + .filter(ids: attachmentStateInfo.map { $0.attachmentId }) + .fetchAll(db)) + .defaulting(to: []), + try? OpenGroup.fetchOne(db, id: threadId) + )) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() + } + .flatMap { attachments, openGroup -> AnyPublisher<[String?], Error> in + guard !attachments.isEmpty else { + return Just<[String?]>([]) + .setFailureType(to: Error.self) + .eraseToAnyPublisher() + } return Publishers .MergeMany( - (try? Attachment - .filter(ids: attachmentStateInfo.map { $0.attachmentId }) - .fetchAll(db)) - .defaulting(to: []) + attachments .map { attachment -> AnyPublisher in - Future { resolver in - attachment.upload( - db, - queue: DispatchQueue.global(qos: .userInitiated), - using: { db, data in - if let openGroup: OpenGroup = openGroup { - return OpenGroupAPI - .uploadFile( - db, - bytes: data.bytes, - to: openGroup.roomToken, - on: openGroup.server - ) - .map { _, response -> String in response.id } - .eraseToAnyPublisher() - } - - return FileServerAPI.upload(data) - .map { response -> String in response.id } - .eraseToAnyPublisher() - }, - encrypt: (openGroup == nil), - success: { fileId in resolver(Swift.Result.success(fileId)) }, - failure: { resolver(Swift.Result.failure($0)) } + attachment + .upload( + to: ( + openGroup.map { Attachment.Destination.openGroup($0) } ?? + .fileServer + ), + queue: DispatchQueue.global(qos: .userInitiated) ) - } - .eraseToAnyPublisher() } ) .collect() .eraseToAnyPublisher() } - .defaulting( - to: Just<[String?]>([]) - .setFailureType(to: Error.self) - .eraseToAnyPublisher() - ) - - return fileIdPublisher - .map { results in + .map { results -> PreparedSendData in // Once the attachments are processed then update the PreparedSendData with // the fileIds associated to the message let fileIds: [String] = results.compactMap { result -> String? in result } @@ -244,15 +235,11 @@ extension MessageSender { .catch { _ in seal.reject(StorageError.generic) } .retainUntilComplete() - // TODO: Test this (does it break anything? want to stop the db write asap) - /// We don't want to block the db write thread so we trigger the actual message sending after the query has - /// finished - return Future { resolver in - db.afterNextTransaction { _ in - resolver(Result.success(())) - } - } - .flatMap { _ in MessageSender.sendImmediate(preparedSendData: sendData) } - .eraseToAnyPublisher() + /// We want to avoid blocking the db write thread so we dispatch the API call to a different thread + return Just(()) + .setFailureType(to: Error.self) + .receive(on: DispatchQueue.global(qos: .userInitiated)) + .flatMap { _ in MessageSender.sendImmediate(preparedSendData: sendData) } + .eraseToAnyPublisher() } } diff --git a/SessionMessagingKit/Sending & Receiving/MessageSender.swift b/SessionMessagingKit/Sending & Receiving/MessageSender.swift index 5dbc6213b..5b8eccd82 100644 --- a/SessionMessagingKit/Sending & Receiving/MessageSender.swift +++ b/SessionMessagingKit/Sending & Receiving/MessageSender.swift @@ -17,6 +17,7 @@ public final class MessageSender { let destination: Message.Destination? let interactionId: Int64? let isSyncMessage: Bool? + let totalAttachmentsUploaded: Int let snodeMessage: SnodeMessage? let plaintext: Data? @@ -32,6 +33,7 @@ public final class MessageSender { destination: Message.Destination?, interactionId: Int64?, isSyncMessage: Bool?, + totalAttachmentsUploaded: Int = 0, snodeMessage: SnodeMessage?, plaintext: Data?, ciphertext: Data?, @@ -44,6 +46,7 @@ public final class MessageSender { self.destination = destination self.interactionId = interactionId self.isSyncMessage = isSyncMessage + self.totalAttachmentsUploaded = totalAttachmentsUploaded self.snodeMessage = snodeMessage self.plaintext = plaintext @@ -60,6 +63,7 @@ public final class MessageSender { self.destination = nil self.interactionId = nil self.isSyncMessage = nil + self.totalAttachmentsUploaded = 0 self.snodeMessage = nil self.plaintext = nil @@ -84,6 +88,7 @@ public final class MessageSender { self.destination = destination self.interactionId = interactionId self.isSyncMessage = isSyncMessage + self.totalAttachmentsUploaded = 0 self.snodeMessage = snodeMessage self.plaintext = nil @@ -105,6 +110,7 @@ public final class MessageSender { self.destination = destination self.interactionId = interactionId self.isSyncMessage = false + self.totalAttachmentsUploaded = 0 self.snodeMessage = nil self.plaintext = plaintext @@ -126,6 +132,7 @@ public final class MessageSender { self.destination = destination self.interactionId = interactionId self.isSyncMessage = false + self.totalAttachmentsUploaded = 0 self.snodeMessage = nil self.plaintext = nil @@ -143,6 +150,7 @@ public final class MessageSender { destination: destination?.with(fileIds: fileIds), interactionId: interactionId, isSyncMessage: isSyncMessage, + totalAttachmentsUploaded: fileIds.count, snodeMessage: snodeMessage, plaintext: plaintext, ciphertext: ciphertext, @@ -607,6 +615,24 @@ public final class MessageSender { .eraseToAnyPublisher() } + // We now allow the creation of message data without validating it's attachments have finished + // uploading first, this is here to ensure we don't send a message which should have uploaded + // files + // + // If you see this error then you need to call `MessageSender.performUploadsIfNeeded(preparedSendData:)` + // before calling this function + switch preparedSendData.message { + case let visibleMessage as VisibleMessage: + guard visibleMessage.attachmentIds.count == preparedSendData.totalAttachmentsUploaded else { + return Fail(error: MessageSenderError.attachmentsNotUploaded) + .eraseToAnyPublisher() + } + + break + + default: break + } + switch preparedSendData.destination { case .contact, .closedGroup: return sendToSnodeDestination(data: preparedSendData, using: dependencies) case .openGroup: return sendToOpenGroupDestination(data: preparedSendData, using: dependencies) diff --git a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift index bc779d665..65827d871 100644 --- a/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift +++ b/SessionMessagingKit/Sending & Receiving/Pollers/OpenGroupPoller.swift @@ -28,7 +28,6 @@ extension OpenGroupAPI { } public func startIfNeeded(using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies()) { - return// TODO: Remove this (reentrancy issues - looks like it could be resolved by splitting out the OpenGroupAPI request signing into it's own step) guard !hasStarted else { return } hasStarted = true @@ -113,14 +112,7 @@ extension OpenGroupAPI { .map { response in (failureCount, response) } .eraseToAnyPublisher() } - .subscribe( - // If this was run via the background poller then don't run on the pollerQueue - // TODO: Need to test if this dispatches to the next run loop or blocks (want it to block) - on: (calledFromBackgroundPoller ? - DispatchQueue.main : - Threading.pollerQueue - ) - ) + .subscribe(on: Threading.pollerQueue) .handleEvents( receiveOutput: { [weak self] failureCount, response in guard !calledFromBackgroundPoller || isBackgroundPollerValid() else { diff --git a/SessionMessagingKit/Utilities/ProfileManager.swift b/SessionMessagingKit/Utilities/ProfileManager.swift index f44ebc548..39aa0f270 100644 --- a/SessionMessagingKit/Utilities/ProfileManager.swift +++ b/SessionMessagingKit/Utilities/ProfileManager.swift @@ -472,7 +472,7 @@ public struct ProfileManager { } }, receiveValue: { fileUploadResponse in - let downloadUrl: String = "\(FileServerAPI.server)/files/\(fileUploadResponse.id)" + let downloadUrl: String = "\(FileServerAPI.server)/file/\(fileUploadResponse.id)" // Update the cached avatar image value profileAvatarCache.mutate { $0[fileName] = data } diff --git a/SessionShareExtension/ThreadPickerVC.swift b/SessionShareExtension/ThreadPickerVC.swift index 8981335ef..c5b298362 100644 --- a/SessionShareExtension/ThreadPickerVC.swift +++ b/SessionShareExtension/ThreadPickerVC.swift @@ -155,7 +155,6 @@ final class ThreadPickerVC: UIViewController, UITableViewDataSource, UITableView receiveValue: { [weak self] attachments in guard let strongSelf = self else { return } - // TODO: Test this let approvalVC: UINavigationController = AttachmentApprovalViewController.wrappedInNavController( threadId: strongSelf.viewModel.viewData[indexPath.row].threadId, attachments: attachments, @@ -190,7 +189,7 @@ final class ThreadPickerVC: UIViewController, UITableViewDataSource, UITableView NotificationCenter.default.post(name: Database.resumeNotification, object: self) Storage.shared - .writePublisher { [weak self] db -> MessageSender.PreparedSendData in + .writePublisher { db -> MessageSender.PreparedSendData in guard let thread: SessionThread = try SessionThread.fetchOne(db, id: threadId) else { throw MessageSenderError.noThread } @@ -251,6 +250,7 @@ final class ThreadPickerVC: UIViewController, UITableViewDataSource, UITableView } .flatMap { MessageSender.performUploadsIfNeeded(preparedSendData: $0) } .flatMap { MessageSender.sendImmediate(preparedSendData: $0) } + .receive(on: DispatchQueue.main) .sinkUntilComplete( receiveCompletion: { [weak self] result in // Suspend the database diff --git a/SessionSnodeKit/Networking/SnodeAPI.swift b/SessionSnodeKit/Networking/SnodeAPI.swift index aef9ebb3b..3c88ada34 100644 --- a/SessionSnodeKit/Networking/SnodeAPI.swift +++ b/SessionSnodeKit/Networking/SnodeAPI.swift @@ -88,9 +88,6 @@ public final class SnodeAPI { } private static func dropSnodeFromSnodePool(_ snode: Snode) { - #if DEBUG - dispatchPrecondition(condition: .onQueue(Threading.workQueue)) - #endif var snodePool = SnodeAPI.snodePool.wrappedValue snodePool.remove(snode) setSnodePool(to: snodePool) diff --git a/SessionSnodeKit/OnionRequestAPI+Encryption.swift b/SessionSnodeKit/OnionRequestAPI+Encryption.swift index 2878973fe..4d39e5954 100644 --- a/SessionSnodeKit/OnionRequestAPI+Encryption.swift +++ b/SessionSnodeKit/OnionRequestAPI+Encryption.swift @@ -29,7 +29,6 @@ internal extension OnionRequestAPI { _ payload: Data, for destination: OnionRequestAPIDestination ) -> AnyPublisher { - // TODO: Test performance switch destination { case .snode(let snode): // Need to wrap the payload for snode requests @@ -66,7 +65,6 @@ internal extension OnionRequestAPI { to rhs: OnionRequestAPIDestination, using previousEncryptionResult: AESGCM.EncryptionResult ) -> AnyPublisher { - // TODO: Test performance var parameters: JSON switch rhs { diff --git a/SessionUtilitiesKit/Networking/HTTP.swift b/SessionUtilitiesKit/Networking/HTTP.swift index 3882bec0e..5dbd54a64 100644 --- a/SessionUtilitiesKit/Networking/HTTP.swift +++ b/SessionUtilitiesKit/Networking/HTTP.swift @@ -89,7 +89,7 @@ public enum HTTP { public static func execute( _ method: HTTPMethod, _ url: String, - body: Data?, // TODO: Default Value? + body: Data?, timeout: TimeInterval = HTTP.defaultTimeout, useSeedNodeURLSession: Bool = false ) -> AnyPublisher {