Merge branch 'dev' into menu-redesign

pull/689/head
ryanzhao 2 years ago
commit 681fe92ee5

@ -139,6 +139,8 @@
7B81682328A4C1210069F315 /* UpdateTypes.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B81682228A4C1210069F315 /* UpdateTypes.swift */; };
7B81682828B310D50069F315 /* _007_HomeQueryOptimisationIndexes.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B81682728B310D50069F315 /* _007_HomeQueryOptimisationIndexes.swift */; };
7B8C44C528B49DDA00FBE25F /* NewConversationVC.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B8C44C428B49DDA00FBE25F /* NewConversationVC.swift */; };
7B81682A28B6F1420069F315 /* ReactionResponse.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B81682928B6F1420069F315 /* ReactionResponse.swift */; };
7B81682C28B72F480069F315 /* PendingChange.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B81682B28B72F480069F315 /* PendingChange.swift */; };
7B8D5FC428332600008324D9 /* VisibleMessage+Reaction.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B8D5FC328332600008324D9 /* VisibleMessage+Reaction.swift */; };
7B93D06A27CF173D00811CB6 /* MessageRequestsViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B93D06927CF173D00811CB6 /* MessageRequestsViewController.swift */; };
7B93D07027CF194000811CB6 /* ConfigurationMessage+Convenience.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7B93D06E27CF194000811CB6 /* ConfigurationMessage+Convenience.swift */; };
@ -1179,6 +1181,8 @@
7B81682228A4C1210069F315 /* UpdateTypes.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UpdateTypes.swift; sourceTree = "<group>"; };
7B81682728B310D50069F315 /* _007_HomeQueryOptimisationIndexes.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = _007_HomeQueryOptimisationIndexes.swift; sourceTree = "<group>"; };
7B8C44C428B49DDA00FBE25F /* NewConversationVC.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = NewConversationVC.swift; sourceTree = "<group>"; };
7B81682928B6F1420069F315 /* ReactionResponse.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ReactionResponse.swift; sourceTree = "<group>"; };
7B81682B28B72F480069F315 /* PendingChange.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PendingChange.swift; sourceTree = "<group>"; };
7B8D5FC328332600008324D9 /* VisibleMessage+Reaction.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "VisibleMessage+Reaction.swift"; sourceTree = "<group>"; };
7B93D06927CF173D00811CB6 /* MessageRequestsViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = MessageRequestsViewController.swift; sourceTree = "<group>"; };
7B93D06E27CF194000811CB6 /* ConfigurationMessage+Convenience.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "ConfigurationMessage+Convenience.swift"; sourceTree = "<group>"; };
@ -3853,6 +3857,8 @@
FDC438A327BB107F00C60D73 /* UserBanRequest.swift */,
FDC438A527BB113A00C60D73 /* UserUnbanRequest.swift */,
FDC438A927BB12BB00C60D73 /* UserModeratorRequest.swift */,
7B81682928B6F1420069F315 /* ReactionResponse.swift */,
7B81682B28B72F480069F315 /* PendingChange.swift */,
);
path = Models;
sourceTree = "<group>";
@ -5190,6 +5196,7 @@
C3471ECB2555356A00297E91 /* MessageSender+Encryption.swift in Sources */,
FDF40CDE2897A1BC006A0CC4 /* _004_RemoveLegacyYDB.swift in Sources */,
FDF0B74928060D13004C14C5 /* QuotedReplyModel.swift in Sources */,
7B81682C28B72F480069F315 /* PendingChange.swift in Sources */,
FD77289A284AF1BD0018502F /* Sodium+Utilities.swift in Sources */,
FD5C7309285007920029977D /* BlindedIdLookup.swift in Sources */,
7B4C75CB26B37E0F0000AC89 /* UnsendRequest.swift in Sources */,
@ -5203,6 +5210,7 @@
FD6A7A6B2818C17C00035AC1 /* UpdateProfilePictureJob.swift in Sources */,
FD716E6A2850327900C96BF4 /* EndCallMode.swift in Sources */,
FDF0B75C2807F41D004C14C5 /* MessageSender+Convenience.swift in Sources */,
7B81682A28B6F1420069F315 /* ReactionResponse.swift in Sources */,
FD09799727FFA84A00936362 /* RecipientState.swift in Sources */,
FDA8EB00280E8D58002B68E5 /* FailedAttachmentDownloadsJob.swift in Sources */,
FD09798927FD1C5A00936362 /* OpenGroup.swift in Sources */,
@ -5784,7 +5792,7 @@
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO;
CURRENT_PROJECT_VERSION = 372;
CURRENT_PROJECT_VERSION = 373;
DEBUG_INFORMATION_FORMAT = dwarf;
DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = "$(inherited)";
@ -5857,7 +5865,7 @@
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO;
CURRENT_PROJECT_VERSION = 372;
CURRENT_PROJECT_VERSION = 373;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
DEVELOPMENT_TEAM = SUQ8J2PCT7;
ENABLE_NS_ASSERTIONS = NO;
@ -5923,7 +5931,7 @@
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO;
CURRENT_PROJECT_VERSION = 372;
CURRENT_PROJECT_VERSION = 373;
DEBUG_INFORMATION_FORMAT = dwarf;
DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = "$(inherited)";
@ -5997,7 +6005,7 @@
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CODE_SIGN_STYLE = Automatic;
COPY_PHASE_STRIP = NO;
CURRENT_PROJECT_VERSION = 372;
CURRENT_PROJECT_VERSION = 373;
DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym";
DEVELOPMENT_TEAM = SUQ8J2PCT7;
ENABLE_NS_ASSERTIONS = NO;
@ -6935,7 +6943,7 @@
CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements;
CODE_SIGN_IDENTITY = "iPhone Developer";
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CURRENT_PROJECT_VERSION = 372;
CURRENT_PROJECT_VERSION = 373;
DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = (
"$(inherited)",
@ -7007,7 +7015,7 @@
CODE_SIGN_ENTITLEMENTS = Session/Meta/Signal.entitlements;
CODE_SIGN_IDENTITY = "iPhone Developer";
"CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer";
CURRENT_PROJECT_VERSION = 372;
CURRENT_PROJECT_VERSION = 373;
DEVELOPMENT_TEAM = SUQ8J2PCT7;
FRAMEWORK_SEARCH_PATHS = (
"$(inherited)",

@ -150,7 +150,8 @@ extension ContextMenuVC {
)
let canDelete: Bool = (
cellViewModel.threadVariant != .openGroup ||
currentUserIsOpenGroupModerator
currentUserIsOpenGroupModerator ||
cellViewModel.state == .failed
)
let canBan: Bool = (
cellViewModel.threadVariant == .openGroup &&

@ -1137,6 +1137,14 @@ extension ConversationVC:
else { return }
if remove {
let pendingChange = OpenGroupManager
.addPendingReaction(
emoji: emoji,
id: openGroupServerMessageId,
in: openGroup.roomToken,
on: openGroup.server,
type: .remove
)
OpenGroupAPI
.reactionDelete(
db,
@ -1145,8 +1153,23 @@ extension ConversationVC:
in: openGroup.roomToken,
on: openGroup.server
)
.map { _, response in
OpenGroupManager
.updatePendingChange(
pendingChange,
seqNo: response.seqNo
)
}
.retainUntilComplete()
} else {
let pendingChange = OpenGroupManager
.addPendingReaction(
emoji: emoji,
id: openGroupServerMessageId,
in: openGroup.roomToken,
on: openGroup.server,
type: .react
)
OpenGroupAPI
.reactionAdd(
db,
@ -1155,6 +1178,13 @@ extension ConversationVC:
in: openGroup.roomToken,
on: openGroup.server
)
.map { _, response in
OpenGroupManager
.updatePendingChange(
pendingChange,
seqNo: response.seqNo
)
}
.retainUntilComplete()
}
@ -1383,7 +1413,60 @@ extension ConversationVC:
on: openGroup.server
)
)
else { return }
else {
// If the message hasn't been sent yet then just delete locally
guard cellViewModel.state == .sending || cellViewModel.state == .failed else {
return
}
// Retrieve any message send jobs for this interaction
let jobs: [Job] = Storage.shared
.read { db in
try? Job
.filter(Job.Columns.variant == Job.Variant.messageSend)
.filter(Job.Columns.interactionId == cellViewModel.id)
.fetchAll(db)
}
.defaulting(to: [])
// If the job is currently running then wait until it's done before triggering
// the deletion
let targetJob: Job? = jobs.first(where: { JobRunner.isCurrentlyRunning($0) })
guard targetJob == nil else {
JobRunner.afterCurrentlyRunningJob(targetJob) { [weak self] result in
switch result {
// If it succeeded then we'll need to delete from the server so re-run
// this function (if we still don't have the server id for some reason
// then this would result in a local-only deletion which should be fine
case .succeeded: self?.delete(cellViewModel)
// Otherwise we just need to cancel the pending job (in case it retries)
// and delete the interaction
default:
JobRunner.removePendingJob(targetJob)
Storage.shared.writeAsync { db in
_ = try Interaction
.filter(id: cellViewModel.id)
.deleteAll(db)
}
}
}
return
}
// If it's not currently running then remove any pending jobs (just to be safe) and
// delete the interaction locally
jobs.forEach { JobRunner.removePendingJob($0) }
Storage.shared.writeAsync { db in
_ = try Interaction
.filter(id: cellViewModel.id)
.deleteAll(db)
}
return
}
// Delete the message from the open group
deleteRemotely(

@ -259,15 +259,21 @@ public class NotificationPresenter: NSObject, NotificationsProtocol {
threadId: thread.id,
threadVariant: thread.variant
)
let fallbackSound: Preferences.Sound = db[.defaultNotificationSound]
.defaulting(to: Preferences.Sound.defaultNotificationSound)
DispatchQueue.main.async {
let sound: Preferences.Sound? = self.requestSound(
thread: thread,
fallbackSound: fallbackSound
)
notificationBody = MentionUtilities.highlightMentions(
in: (notificationBody ?? ""),
threadVariant: thread.variant,
currentUserPublicKey: userPublicKey,
currentUserBlindedPublicKey: userBlindedKey
)
let sound: Preferences.Sound? = self.requestSound(thread: thread)
self.adaptee.notify(
category: category,
@ -322,9 +328,14 @@ public class NotificationPresenter: NSObject, NotificationsProtocol {
threadName
)
}
let fallbackSound: Preferences.Sound = db[.defaultNotificationSound]
.defaulting(to: Preferences.Sound.defaultNotificationSound)
DispatchQueue.main.async {
let sound = self.requestSound(thread: thread)
let sound = self.requestSound(
thread: thread,
fallbackSound: fallbackSound
)
self.adaptee.notify(
category: category,
@ -373,9 +384,14 @@ public class NotificationPresenter: NSObject, NotificationsProtocol {
closedGroupName: nil, // Not supported
openGroupName: nil // Not supported
)
let fallbackSound: Preferences.Sound = db[.defaultNotificationSound]
.defaulting(to: Preferences.Sound.defaultNotificationSound)
DispatchQueue.main.async {
let sound = self.requestSound(thread: thread)
let sound = self.requestSound(
thread: thread,
fallbackSound: fallbackSound
)
self.adaptee.notify(
category: category,
@ -420,9 +436,14 @@ public class NotificationPresenter: NSObject, NotificationsProtocol {
let userInfo = [
AppNotificationUserInfoKey.threadId: thread.id
]
let fallbackSound: Preferences.Sound = db[.defaultNotificationSound]
.defaulting(to: Preferences.Sound.defaultNotificationSound)
DispatchQueue.main.async {
let sound: Preferences.Sound? = self.requestSound(thread: thread)
let sound: Preferences.Sound? = self.requestSound(
thread: thread,
fallbackSound: fallbackSound
)
self.adaptee.notify(
category: .errorMessage,
@ -458,12 +479,12 @@ public class NotificationPresenter: NSObject, NotificationsProtocol {
var mostRecentNotifications = TruncatedList<UInt64>(maxLength: kAudioNotificationsThrottleCount)
private func requestSound(thread: SessionThread) -> Preferences.Sound? {
private func requestSound(thread: SessionThread, fallbackSound: Preferences.Sound) -> Preferences.Sound? {
guard checkIfShouldPlaySound() else {
return nil
}
return thread.notificationSound
return (thread.notificationSound ?? fallbackSound)
}
private func checkIfShouldPlaySound() -> Bool {

@ -353,6 +353,7 @@ public extension Message {
_ db: Database,
openGroupId: String,
message: OpenGroupAPI.Message,
associatedPendingChanges: [OpenGroupAPI.PendingChange],
dependencies: SMKDependencies = SMKDependencies()
) -> [Reaction] {
var results: [Reaction] = []
@ -364,13 +365,46 @@ public extension Message {
threadVariant: .openGroup
)
for (encodedEmoji, rawReaction) in reactions {
if let emoji = encodedEmoji.removingPercentEncoding,
if let decodedEmoji = encodedEmoji.removingPercentEncoding,
rawReaction.count > 0,
let reactors = rawReaction.reactors
{
// Decide whether we need to add an extra reaction from current user
let pendingChangeSelfReaction: Bool? = {
// Find the newest 'PendingChange' entry with a matching emoji, if one exists, and
// set the "self reaction" value based on it's action
let maybePendingChange: OpenGroupAPI.PendingChange? = associatedPendingChanges
.sorted(by: { lhs, rhs -> Bool in (lhs.seqNo ?? Int64.max) > (rhs.seqNo ?? Int64.max) })
.first { pendingChange in
if case .reaction(_, let emoji, _) = pendingChange.metadata {
return emoji == decodedEmoji
}
return false
}
// If there is no pending change for this reaction then return nil
guard
let pendingChange: OpenGroupAPI.PendingChange = maybePendingChange,
case .reaction(_, _, let action) = pendingChange.metadata
else { return nil }
// Otherwise add/remove accordingly
return (action == .react)
}()
let shouldAddSelfReaction: Bool = (
pendingChangeSelfReaction ??
(rawReaction.you || reactors.contains(userPublicKey))
)
let count: Int64 = rawReaction.you ? rawReaction.count - 1 : rawReaction.count
let timestampMs: Int64 = Int64(floor((Date().timeIntervalSince1970 * 1000)))
let maxLength: Int = shouldAddSelfReaction ? 4 : 5
let desiredReactorIds: [String] = reactors
.filter { $0 != blindedUserPublicKey }
.filter { $0 != blindedUserPublicKey && $0 != userPublicKey } // Remove current user for now, will add back if needed
.prefix(maxLength)
.map{ $0 }
results = results
.appending( // Add the first reaction (with the count)
@ -381,8 +415,8 @@ public extension Message {
serverHash: nil,
timestampMs: timestampMs,
authorId: reactor,
emoji: emoji,
count: rawReaction.count,
emoji: decodedEmoji,
count: count,
sortId: rawReaction.index
)
}
@ -398,22 +432,22 @@ public extension Message {
serverHash: nil,
timestampMs: timestampMs,
authorId: reactor,
emoji: emoji,
emoji: decodedEmoji,
count: 0, // Only want this on the first reaction
sortId: rawReaction.index
)
}
)
.appending( // Add the current user reaction (if applicable and not already included)
!rawReaction.you || reactors.contains(userPublicKey) ?
!shouldAddSelfReaction ?
nil :
Reaction(
interactionId: message.id,
serverHash: nil,
timestampMs: timestampMs,
authorId: userPublicKey,
emoji: emoji,
count: (desiredReactorIds.isEmpty ? rawReaction.count : 0),
emoji: decodedEmoji,
count: 1,
sortId: rawReaction.index
)
)

@ -0,0 +1,41 @@
// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Foundation
extension OpenGroupAPI {
public struct PendingChange: Equatable {
enum ChangeType {
case reaction
}
enum Metadata {
case reaction(messageId: Int64, emoji: String, action: VisibleMessage.VMReaction.Kind)
}
let server: String
let room: String
let changeType: ChangeType
var seqNo: Int64?
let metadata: Metadata
public static func == (lhs: OpenGroupAPI.PendingChange, rhs: OpenGroupAPI.PendingChange) -> Bool {
guard lhs.server == rhs.server &&
lhs.room == rhs.room &&
lhs.changeType == rhs.changeType &&
lhs.seqNo == rhs.seqNo
else {
return false
}
switch lhs.changeType {
case .reaction:
if case .reaction(let lhsMessageId, let lhsEmoji, let lhsAction) = lhs.metadata,
case .reaction(let rhsMessageId, let rhsEmoji, let rhsAction) = rhs.metadata {
return lhsMessageId == rhsMessageId && lhsEmoji == rhsEmoji && lhsAction == rhsAction
} else {
return false
}
}
}
}
}

@ -0,0 +1,44 @@
// Copyright © 2022 Rangeproof Pty Ltd. All rights reserved.
import Foundation
extension OpenGroupAPI {
public struct ReactionAddResponse: Codable, Equatable {
enum CodingKeys: String, CodingKey {
case added
case seqNo = "seqno"
}
/// This field indicates whether the reaction was added (true) or already present (false).
public let added: Bool
/// The seqNo after the reaction is added.
public let seqNo: Int64
}
public struct ReactionRemoveResponse: Codable, Equatable {
enum CodingKeys: String, CodingKey {
case removed
case seqNo = "seqno"
}
/// This field indicates whether the reaction was removed (true) or was not present to begin with (false).
public let removed: Bool
/// The seqNo after the reaction is removed.
public let seqNo: Int64
}
public struct ReactionRemoveAllResponse: Codable, Equatable {
enum CodingKeys: String, CodingKey {
case removed
case seqNo = "seqno"
}
/// This field shows the total number of reactions that were deleted.
public let removed: Int64
/// The seqNo after the reactions is all removed.
public let seqNo: Int64
}
}

@ -99,7 +99,7 @@ public enum OpenGroupAPI {
),
queryParameters: [
.updateTypes: UpdateTypes.reaction.rawValue,
.reactors: "20"
.reactors: "5"
]
),
responseType: [Failable<Message>].self
@ -701,7 +701,7 @@ public enum OpenGroupAPI {
in roomToken: String,
on server: String,
using dependencies: SMKDependencies = SMKDependencies()
) -> Promise<OnionRequestResponseInfoType> {
) -> Promise<(OnionRequestResponseInfoType, ReactionAddResponse)> {
/// URL(String:) won't convert raw emojis, so need to do a little encoding here.
/// The raw emoji will come back when calling url.path
guard let encodedEmoji: String = emoji.addingPercentEncoding(withAllowedCharacters: .urlPathAllowed) else {
@ -718,7 +718,7 @@ public enum OpenGroupAPI {
),
using: dependencies
)
.map { responseInfo, _ in responseInfo }
.decoded(as: ReactionAddResponse.self, on: OpenGroupAPI.workQueue, using: dependencies)
}
public static func reactionDelete(
@ -728,7 +728,7 @@ public enum OpenGroupAPI {
in roomToken: String,
on server: String,
using dependencies: SMKDependencies = SMKDependencies()
) -> Promise<OnionRequestResponseInfoType> {
) -> Promise<(OnionRequestResponseInfoType, ReactionRemoveResponse)> {
/// URL(String:) won't convert raw emojis, so need to do a little encoding here.
/// The raw emoji will come back when calling url.path
guard let encodedEmoji: String = emoji.addingPercentEncoding(withAllowedCharacters: .urlPathAllowed) else {
@ -745,7 +745,7 @@ public enum OpenGroupAPI {
),
using: dependencies
)
.map { responseInfo, _ in responseInfo }
.decoded(as: ReactionRemoveResponse.self, on: OpenGroupAPI.workQueue, using: dependencies)
}
public static func reactionDeleteAll(
@ -755,7 +755,7 @@ public enum OpenGroupAPI {
in roomToken: String,
on server: String,
using dependencies: SMKDependencies = SMKDependencies()
) -> Promise<OnionRequestResponseInfoType> {
) -> Promise<(OnionRequestResponseInfoType, ReactionRemoveAllResponse)> {
/// URL(String:) won't convert raw emojis, so need to do a little encoding here.
/// The raw emoji will come back when calling url.path
guard let encodedEmoji: String = emoji.addingPercentEncoding(withAllowedCharacters: .urlPathAllowed) else {
@ -772,7 +772,7 @@ public enum OpenGroupAPI {
),
using: dependencies
)
.map { responseInfo, _ in responseInfo }
.decoded(as: ReactionRemoveAllResponse.self, on: OpenGroupAPI.workQueue, using: dependencies)
}
// MARK: - Pinning

@ -20,6 +20,8 @@ public protocol OGMCacheType {
var timeSinceLastPoll: [String: TimeInterval] { get set }
func getTimeSinceLastOpen(using dependencies: Dependencies) -> TimeInterval
var pendingChanges: [OpenGroupAPI.PendingChange] { get set }
}
// MARK: - OpenGroupManager
@ -53,6 +55,8 @@ public final class OpenGroupManager: NSObject {
_timeSinceLastOpen = dependencies.date.timeIntervalSince(lastOpen)
return dependencies.date.timeIntervalSince(lastOpen)
}
public var pendingChanges: [OpenGroupAPI.PendingChange] = []
}
// MARK: - Variables
@ -529,11 +533,17 @@ public final class OpenGroupManager: NSObject {
.filter { $0.deleted == true }
.map { $0.id }
// Update the 'openGroupSequenceNumber' value (Note: SOGS V4 uses the 'seqNo' instead of the 'serverId')
if let seqNo: Int64 = seqNo {
// Update the 'openGroupSequenceNumber' value (Note: SOGS V4 uses the 'seqNo' instead of the 'serverId')
_ = try? OpenGroup
.filter(id: openGroup.id)
.updateAll(db, OpenGroup.Columns.sequenceNumber.set(to: seqNo))
// Update pendingChange cache
dependencies.mutableCache.mutate {
$0.pendingChanges = $0.pendingChanges
.filter { $0.seqNo == nil || $0.seqNo! > seqNo }
}
}
// Process the messages
@ -589,11 +599,23 @@ public final class OpenGroupManager: NSObject {
db,
openGroupId: openGroup.id,
message: message,
associatedPendingChanges: dependencies.cache.pendingChanges
.filter {
guard $0.server == server && $0.room == roomToken && $0.changeType == .reaction else {
return false
}
if case .reaction(let messageId, _, _) = $0.metadata {
return messageId == message.id
}
return false
},
dependencies: dependencies
)
try MessageReceiver.handleOpenGroupReactions(
db,
threadId: openGroup.threadId,
openGroupMessageServerId: message.id,
openGroupReactions: reactions
)
@ -608,6 +630,7 @@ public final class OpenGroupManager: NSObject {
guard !messageServerIdsToRemove.isEmpty else { return }
_ = try? Interaction
.filter(Interaction.Columns.threadId == openGroup.threadId)
.filter(messageServerIdsToRemove.contains(Interaction.Columns.openGroupServerMessageId))
.deleteAll(db)
}
@ -736,6 +759,44 @@ public final class OpenGroupManager: NSObject {
// MARK: - Convenience
public static func addPendingReaction(
emoji: String,
id: Int64,
in roomToken: String,
on server: String,
type: VisibleMessage.VMReaction.Kind,
using dependencies: OGMDependencies = OGMDependencies()
) -> OpenGroupAPI.PendingChange {
let pendingChange = OpenGroupAPI.PendingChange(
server: server,
room: roomToken,
changeType: .reaction,
metadata: .reaction(
messageId: id,
emoji: emoji,
action: type
)
)
dependencies.mutableCache.mutate {
$0.pendingChanges.append(pendingChange)
}
return pendingChange
}
public static func updatePendingChange(
_ pendingChange: OpenGroupAPI.PendingChange,
seqNo: Int64,
using dependencies: OGMDependencies = OGMDependencies()
) {
dependencies.mutableCache.mutate {
if let index = $0.pendingChanges.firstIndex(of: pendingChange) {
$0.pendingChanges[index].seqNo = seqNo
}
}
}
/// This method specifies if the given capability is supported on a specified Open Group
public static func isOpenGroupSupport(
_ capability: Capability.Variant,

@ -21,7 +21,11 @@ extension MessageReceiver {
case .screenshot: return .infoScreenshotNotification
case .mediaSaved: return .infoMediaSavedNotification
}
}()
}(),
timestampMs: (
message.sentTimestamp.map { Int64($0) } ??
Int64(floor(Date().timeIntervalSince1970 * 1000))
)
).inserted(db)
}
}

@ -249,11 +249,13 @@ public enum MessageReceiver {
public static func handleOpenGroupReactions(
_ db: Database,
threadId: String,
openGroupMessageServerId: Int64,
openGroupReactions: [Reaction]
) throws {
guard let interactionId: Int64 = try? Interaction
.select(.id)
.filter(Interaction.Columns.threadId == threadId)
.filter(Interaction.Columns.openGroupServerMessageId == openGroupMessageServerId)
.asRequest(of: Int64.self)
.fetchOne(db)

@ -637,6 +637,7 @@ public extension MessageViewModel {
let attachmentIdColumnLiteral: SQL = SQL(stringLiteral: Attachment.Columns.id.name)
let groupMemberModeratorTableLiteral: SQL = SQL(stringLiteral: "groupMemberModerator")
let groupMemberAdminTableLiteral: SQL = SQL(stringLiteral: "groupMemberAdmin")
let groupMemberGroupIdColumnLiteral: SQL = SQL(stringLiteral: GroupMember.Columns.groupId.name)
let groupMemberProfileIdColumnLiteral: SQL = SQL(stringLiteral: GroupMember.Columns.profileId.name)
let groupMemberRoleColumnLiteral: SQL = SQL(stringLiteral: GroupMember.Columns.role.name)
@ -715,11 +716,13 @@ public extension MessageViewModel {
)
LEFT JOIN \(GroupMember.self) AS \(groupMemberModeratorTableLiteral) ON (
\(SQL("\(thread[.variant]) = \(SessionThread.Variant.openGroup)")) AND
\(groupMemberModeratorTableLiteral).\(groupMemberGroupIdColumnLiteral) = \(interaction[.threadId]) AND
\(groupMemberModeratorTableLiteral).\(groupMemberProfileIdColumnLiteral) = \(interaction[.authorId]) AND
\(SQL("\(groupMemberModeratorTableLiteral).\(groupMemberRoleColumnLiteral) = \(GroupMember.Role.moderator)"))
)
LEFT JOIN \(GroupMember.self) AS \(groupMemberAdminTableLiteral) ON (
\(SQL("\(thread[.variant]) = \(SessionThread.Variant.openGroup)")) AND
\(groupMemberAdminTableLiteral).\(groupMemberGroupIdColumnLiteral) = \(interaction[.threadId]) AND
\(groupMemberAdminTableLiteral).\(groupMemberProfileIdColumnLiteral) = \(interaction[.authorId]) AND
\(SQL("\(groupMemberAdminTableLiteral).\(groupMemberRoleColumnLiteral) = \(GroupMember.Role.admin)"))
)

@ -110,9 +110,39 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
// changes only include table and column info at this stage
guard allObservedTableNames.contains(event.tableName) else { return }
// When generating the tracked change we need to check if the change was
// a deletion to a related table (if so then once the change is performed
// there won't be a way to associated the deleted related record to the
// original so we need to retrieve the association in here)
let trackedChange: PagedData.TrackedChange = {
guard
event.tableName != pagedTableName,
event.kind == .delete,
let observedChange: PagedData.ObservedChanges = observedTableChangeTypes[event.tableName],
let joinToPagedType: SQL = observedChange.joinToPagedType
else { return PagedData.TrackedChange(event: event) }
// Retrieve the pagedRowId for the related value that is
// getting deleted
let pagedRowIds: [Int64] = Storage.shared
.read { db in
PagedData.pagedRowIdsForRelatedRowIds(
db,
tableName: event.tableName,
pagedTableName: pagedTableName,
relatedRowIds: [event.rowID],
joinToPagedType: joinToPagedType
)
}
.defaulting(to: [])
return PagedData.TrackedChange(event: event, pagedRowIdsForRelatedDeletion: pagedRowIds)
}()
// The 'event' object only exists during this method so we need to copy the info
// from it, otherwise it will cease to exist after this metod call finishes
changesInCommit.mutate { $0.insert(PagedData.TrackedChange(event: event)) }
changesInCommit.mutate { $0.insert(trackedChange) }
}
// Note: We will process all updates which come through this method even if
@ -180,13 +210,17 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
.filter { $0.tableName == pagedTableName }
let relatedChanges: [String: [PagedData.TrackedChange]] = committedChanges
.filter { $0.tableName != pagedTableName }
.filter { $0.kind != .delete }
.reduce(into: [:]) { result, next in
guard observedTableChangeTypes[next.tableName] != nil else { return }
result[next.tableName] = (result[next.tableName] ?? []).appending(next)
}
let relatedDeletions: [PagedData.TrackedChange] = committedChanges
.filter { $0.tableName != pagedTableName }
.filter { $0.kind == .delete }
guard !directChanges.isEmpty || !relatedChanges.isEmpty else {
guard !directChanges.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else {
updateDataAndCallbackIfNeeded(self.dataCache.wrappedValue, self.pageInfo.wrappedValue, false)
return
}
@ -219,7 +253,7 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
let changesToQuery: [PagedData.TrackedChange] = directChanges
.filter { $0.kind != .delete }
guard !changesToQuery.isEmpty || !relatedChanges.isEmpty else {
guard !changesToQuery.isEmpty || !relatedChanges.isEmpty || !relatedDeletions.isEmpty else {
updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty)
return
}
@ -248,7 +282,7 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
.asSet()
}()
guard !changesToQuery.isEmpty || !pagedRowIdsForRelatedChanges.isEmpty else {
guard !changesToQuery.isEmpty || !pagedRowIdsForRelatedChanges.isEmpty || !relatedDeletions.isEmpty else {
updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, !deletionChanges.isEmpty)
return
}
@ -270,6 +304,16 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
orderSQL: orderSQL,
filterSQL: filterSQL
)
let relatedDeletionIndexes: [PagedData.RowIndexInfo] = PagedData.indexes(
db,
rowIds: relatedDeletions
.compactMap { $0.pagedRowIdsForRelatedDeletion }
.flatMap { $0 },
tableName: pagedTableName,
requiredJoinSQL: joinSQL,
orderSQL: orderSQL,
filterSQL: filterSQL
)
// Determine if the indexes for the row ids should be displayed on the screen and remove any
// which shouldn't - values less than 'currentCount' or if there is at least one value less than
@ -306,6 +350,7 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
}
let validChangeRowIds: [Int64] = determineValidChanges(for: itemIndexes)
let validRelatedChangeRowIds: [Int64] = determineValidChanges(for: relatedChangeIndexes)
let validRelatedDeletionRowIds: [Int64] = determineValidChanges(for: relatedDeletionIndexes)
let countBefore: Int = itemIndexes.filter { $0.rowIndex < updatedPageInfo.pageOffset }.count
// Update the offset and totalCount even if the rows are outside of the current page (need to
@ -325,13 +370,13 @@ public class PagedDatabaseObserver<ObservedTable, T>: TransactionObserver where
// If there are no valid row ids then stop here (trigger updates though since the page info
// has changes)
guard !validChangeRowIds.isEmpty || !validRelatedChangeRowIds.isEmpty else {
guard !validChangeRowIds.isEmpty || !validRelatedChangeRowIds.isEmpty || !validRelatedDeletionRowIds.isEmpty else {
updateDataAndCallbackIfNeeded(updatedDataCache, updatedPageInfo, true)
return
}
// Fetch the inserted/updated rows
let targetRowIds: [Int64] = Array((validChangeRowIds + validRelatedChangeRowIds).asSet())
let targetRowIds: [Int64] = Array((validChangeRowIds + validRelatedChangeRowIds + validRelatedDeletionRowIds).asSet())
let updatedItems: [T] = (try? dataQuery(targetRowIds)
.fetchAll(db))
.defaulting(to: [])
@ -904,11 +949,13 @@ public enum PagedData {
let tableName: String
let kind: DatabaseEvent.Kind
let rowId: Int64
let pagedRowIdsForRelatedDeletion: [Int64]?
init(event: DatabaseEvent) {
init(event: DatabaseEvent, pagedRowIdsForRelatedDeletion: [Int64]? = nil) {
self.tableName = event.tableName
self.kind = event.kind
self.rowId = event.rowID
self.pagedRowIdsForRelatedDeletion = pagedRowIdsForRelatedDeletion
}
}

@ -36,6 +36,13 @@ public protocol JobExecutor {
}
public final class JobRunner {
public enum JobResult {
case succeeded
case failed
case deferred
case notFound
}
private static let blockingQueue: Atomic<JobQueue?> = Atomic(
JobQueue(
type: .blocking,
@ -332,6 +339,15 @@ public final class JobRunner {
.defaulting(to: [:])
}
public static func afterCurrentlyRunningJob(_ job: Job?, callback: @escaping (JobResult) -> ()) {
guard let job: Job = job, let jobId: Int64 = job.id, let queue: JobQueue = queues.wrappedValue[job.variant] else {
callback(.notFound)
return
}
queue.afterCurrentlyRunningJob(jobId, callback: callback)
}
public static func hasPendingOrRunningJob<T: Encodable>(with variant: Job.Variant, details: T) -> Bool {
guard let targetQueue: JobQueue = queues.wrappedValue[variant] else { return false }
guard let detailsData: Data = try? JSONEncoder().encode(details) else { return false }
@ -339,6 +355,12 @@ public final class JobRunner {
return targetQueue.hasPendingOrRunningJob(with: detailsData)
}
public static func removePendingJob(_ job: Job?) {
guard let job: Job = job, let jobId: Int64 = job.id else { return }
queues.wrappedValue[job.variant]?.removePendingJob(jobId)
}
// MARK: - Convenience
fileprivate static func getRetryInterval(for job: Job) -> TimeInterval {
@ -445,6 +467,7 @@ private final class JobQueue {
fileprivate var isRunning: Atomic<Bool> = Atomic(false)
private var queue: Atomic<[Job]> = Atomic([])
private var jobsCurrentlyRunning: Atomic<Set<Int64>> = Atomic([])
private var jobCallbacks: Atomic<[Int64: [(JobRunner.JobResult) -> ()]]> = Atomic([:])
private var detailsForCurrentlyRunningJobs: Atomic<[Int64: Data?]> = Atomic([:])
private var deferLoopTracker: Atomic<[Int64: (count: Int, times: [TimeInterval])]> = Atomic([:])
@ -560,12 +583,29 @@ private final class JobQueue {
return detailsForCurrentlyRunningJobs.wrappedValue
}
fileprivate func afterCurrentlyRunningJob(_ jobId: Int64, callback: @escaping (JobRunner.JobResult) -> ()) {
guard isCurrentlyRunning(jobId) else {
callback(.notFound)
return
}
jobCallbacks.mutate { jobCallbacks in
jobCallbacks[jobId] = (jobCallbacks[jobId] ?? []).appending(callback)
}
}
fileprivate func hasPendingOrRunningJob(with detailsData: Data?) -> Bool {
let pendingJobs: [Job] = queue.wrappedValue
return pendingJobs.contains { job in job.details == detailsData }
}
fileprivate func removePendingJob(_ jobId: Int64) {
queue.mutate { queue in
queue = queue.filter { $0.id != jobId }
}
}
// MARK: - Job Running
fileprivate func start(force: Bool = false) {
@ -900,10 +940,8 @@ private final class JobQueue {
}
}
// The job is removed from the queue before it runs so all we need to to is remove it
// from the 'currentlyRunning' set and start the next one
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
// Perform job cleanup and start the next job
performCleanUp(for: job, result: .succeeded)
internalQueue.async { [weak self] in
self?.runNextJob()
}
@ -914,8 +952,7 @@ private final class JobQueue {
private func handleJobFailed(_ job: Job, error: Error?, permanentFailure: Bool) {
guard Storage.shared.read({ db in try Job.exists(db, id: job.id ?? -1) }) == true else {
SNLog("[JobRunner] \(queueContext) \(job.variant) job canceled")
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
performCleanUp(for: job, result: .failed)
internalQueue.async { [weak self] in
self?.runNextJob()
@ -923,12 +960,30 @@ private final class JobQueue {
return
}
// If this is the blocking queue and a "blocking" job failed then rerun it immediately
// If this is the blocking queue and a "blocking" job failed then rerun it
// immediately (in this case we don't trigger any job callbacks because the
// job isn't actually done, it's going to try again immediately)
if self.type == .blocking && job.shouldBlock {
SNLog("[JobRunner] \(queueContext) \(job.variant) job failed; retrying immediately")
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
queue.mutate { $0.insert(job, at: 0) }
// If it was a possible deferral loop then we don't actually want to
// retry the job (even if it's a blocking one, this gives a small chance
// that the app could continue to function)
let wasPossibleDeferralLoop: Bool = {
if let error = error, case JobRunnerError.possibleDeferralLoop = error { return true }
return false
}()
performCleanUp(
for: job,
result: .failed,
shouldTriggerCallbacks: wasPossibleDeferralLoop
)
// Only add it back to the queue if it wasn't a deferral loop
if !wasPossibleDeferralLoop {
queue.mutate { $0.insert(job, at: 0) }
}
internalQueue.async { [weak self] in
self?.runNextJob()
@ -1003,8 +1058,7 @@ private final class JobQueue {
}
}
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
performCleanUp(for: job, result: .failed)
internalQueue.async { [weak self] in
self?.runNextJob()
}
@ -1014,8 +1068,7 @@ private final class JobQueue {
/// on other jobs, and it should automatically manage those dependencies)
private func handleJobDeferred(_ job: Job) {
var stuckInDeferLoop: Bool = false
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
deferLoopTracker.mutate {
guard let lastRecord: (count: Int, times: [TimeInterval]) = $0[job.id] else {
$0 = $0.setting(
@ -1055,8 +1108,29 @@ private final class JobQueue {
return
}
performCleanUp(for: job, result: .deferred)
internalQueue.async { [weak self] in
self?.runNextJob()
}
}
private func performCleanUp(for job: Job, result: JobRunner.JobResult, shouldTriggerCallbacks: Bool = true) {
// The job is removed from the queue before it runs so all we need to to is remove it
// from the 'currentlyRunning' set
jobsCurrentlyRunning.mutate { $0 = $0.removing(job.id) }
detailsForCurrentlyRunningJobs.mutate { $0 = $0.removingValue(forKey: job.id) }
guard shouldTriggerCallbacks else { return }
// Run any job callbacks now that it's done
var jobCallbacksToRun: [(JobRunner.JobResult) -> ()] = []
jobCallbacks.mutate { jobCallbacks in
jobCallbacksToRun = (jobCallbacks[job.id] ?? [])
jobCallbacks = jobCallbacks.removingValue(forKey: job.id)
}
DispatchQueue.global(qos: .default).async {
jobCallbacksToRun.forEach { $0(result) }
}
}
}

Loading…
Cancel
Save