Make typing indicators work & implement job resuming

pull/313/head
nielsandriesse 4 years ago
parent b030b5999b
commit 7e9eb2f138

@ -57,18 +57,27 @@ extension Storage : SessionMessagingKitStorageProtocol {
// MARK: - Jobs
private static let jobCollection = "SNJobCollection"
public func persist(_ job: Job, using transaction: Any) {
(transaction as! YapDatabaseReadWriteTransaction).setObject(job, forKey: job.id!, inCollection: Storage.jobCollection)
(transaction as! YapDatabaseReadWriteTransaction).setObject(job, forKey: job.id!, inCollection: type(of: job).collection)
}
public func markJobAsSucceeded(_ job: Job, using transaction: Any) {
(transaction as! YapDatabaseReadWriteTransaction).removeObject(forKey: job.id!, inCollection: Storage.jobCollection)
(transaction as! YapDatabaseReadWriteTransaction).removeObject(forKey: job.id!, inCollection: type(of: job).collection)
}
public func markJobAsFailed(_ job: Job, using transaction: Any) {
(transaction as! YapDatabaseReadWriteTransaction).removeObject(forKey: job.id!, inCollection: Storage.jobCollection)
(transaction as! YapDatabaseReadWriteTransaction).removeObject(forKey: job.id!, inCollection: type(of: job).collection)
}
public func getAllPendingJobs(of type: Job.Type) -> [Job] {
var result: [Job] = []
Storage.read { transaction in
transaction.enumerateRows(inCollection: type.collection) { _, object, _, _ in
guard let job = object as? Job else { return }
result.append(job)
}
}
return result
}
@ -216,8 +225,48 @@ extension Storage : SessionMessagingKitStorageProtocol {
return (thread.uniqueId!, message)
}
public func cancelTypingIndicatorsIfNeeded(for threadID: String, senderPublicKey: String) {
guard let thread = TSThread.fetch(uniqueId: threadID) else { return }
public func showTypingIndicatorIfNeeded(for senderPublicKey: String) {
var threadOrNil: TSContactThread?
Storage.read { transaction in
threadOrNil = TSContactThread.getWithContactId(senderPublicKey, transaction: transaction)
}
guard let thread = threadOrNil else { return } // Ignore if the thread doesn't exist yet
func showTypingIndicatorsIfNeeded() {
SSKEnvironment.shared.typingIndicators.didReceiveTypingStartedMessage(inThread: thread, recipientId: senderPublicKey, deviceId: 1)
}
if Thread.current.isMainThread {
showTypingIndicatorsIfNeeded()
} else {
DispatchQueue.main.async {
showTypingIndicatorsIfNeeded()
}
}
}
public func hideTypingIndicatorIfNeeded(for senderPublicKey: String) {
var threadOrNil: TSContactThread?
Storage.read { transaction in
threadOrNil = TSContactThread.getWithContactId(senderPublicKey, transaction: transaction)
}
guard let thread = threadOrNil else { return } // Ignore if the thread doesn't exist yet
func hideTypingIndicatorsIfNeeded() {
SSKEnvironment.shared.typingIndicators.didReceiveTypingStoppedMessage(inThread: thread, recipientId: senderPublicKey, deviceId: 1)
}
if Thread.current.isMainThread {
hideTypingIndicatorsIfNeeded()
} else {
DispatchQueue.main.async {
hideTypingIndicatorsIfNeeded()
}
}
}
public func cancelTypingIndicatorsIfNeeded(for senderPublicKey: String) {
var threadOrNil: TSContactThread?
Storage.read { transaction in
threadOrNil = TSContactThread.getWithContactId(senderPublicKey, transaction: transaction)
}
guard let thread = threadOrNil else { return } // Ignore if the thread doesn't exist yet
func cancelTypingIndicatorsIfNeeded() {
SSKEnvironment.shared.typingIndicators.didReceiveIncomingMessage(inThread: thread, recipientId: senderPublicKey, deviceId: 1)
}

@ -1172,8 +1172,7 @@ NSString *NSStringForOWSMessageCellType(OWSMessageCellType cellType)
// Only allow deletion on incoming messages if the user has moderation permission
return [SNOpenGroupAPI isUserModerator:self.userHexEncodedPublicKey forChannel:publicChat.channel onServer:publicChat.server];
} else {
// Only allow deletion on outgoing messages if the user was the sender (i.e. it was not sent from another linked device)
return [self.interaction.actualSenderHexEncodedPublicKey isEqual:self.userHexEncodedPublicKey];
return YES;
}
}

@ -8,6 +8,7 @@ public final class AttachmentDownloadJob : NSObject, Job, NSCoding { // NSObject
public var failureCount: UInt = 0
// MARK: Settings
public class var collection: String { return "AttachmentDownloadJobCollection" }
public static let maxFailureCount: UInt = 20
// MARK: Coding

@ -8,6 +8,7 @@ public final class AttachmentUploadJob : NSObject, Job, NSCoding { // NSObject/N
public var failureCount: UInt = 0
// MARK: Settings
public class var collection: String { return "AttachmentUploadJobCollection" }
public static let maxFailureCount: UInt = 20
// MARK: Coding

@ -5,6 +5,7 @@ public protocol Job : class, NSCoding {
var id: String? { get set }
var failureCount: UInt { get set }
static var collection: String { get }
static var maxFailureCount: UInt { get }
func execute()

@ -6,12 +6,20 @@ public final class JobQueue : NSObject, JobDelegate {
@objc public static let shared = JobQueue()
@objc public func add(_ job: Job, using transaction: Any) {
job.id = UUID().uuidString
job.id = String(NSDate.millisecondTimestamp())
Configuration.shared.storage.persist(job, using: transaction)
job.delegate = self
job.execute()
}
@objc public func resumePendingJobs() {
let allJobTypes: [Job.Type] = [ AttachmentDownloadJob.self, AttachmentUploadJob.self, MessageReceiveJob.self, MessageSendJob.self, NotifyPNServerJob.self ]
allJobTypes.forEach { type in
let allPendingJobs = Configuration.shared.storage.getAllPendingJobs(of: type)
allPendingJobs.sorted(by: { $0.id! < $1.id! }).forEach { $0.execute() } // Retry the oldest jobs first
}
}
public func handleJobSucceeded(_ job: Job) {
Configuration.shared.storage.withAsync({ transaction in
Configuration.shared.storage.markJobAsSucceeded(job, using: transaction)

@ -8,6 +8,7 @@ public final class MessageReceiveJob : NSObject, Job, NSCoding { // NSObject/NSC
public var failureCount: UInt = 0
// MARK: Settings
public class var collection: String { return "MessageReceiveJobCollection" }
public static let maxFailureCount: UInt = 10
// MARK: Initialization

@ -9,6 +9,7 @@ public final class MessageSendJob : NSObject, Job, NSCoding { // NSObject/NSCodi
public var failureCount: UInt = 0
// MARK: Settings
public class var collection: String { return "MessageSendJobCollection" }
public static let maxFailureCount: UInt = 20
// MARK: Initialization

@ -9,6 +9,7 @@ public final class NotifyPNServerJob : NSObject, Job, NSCoding { // NSObject/NSC
public var failureCount: UInt = 0
// MARK: Settings
public class var collection: String { return "NotifyPNServerJobCollection" }
public static let maxFailureCount: UInt = 20
// MARK: Initialization

@ -76,16 +76,40 @@ internal enum MessageReceiver {
internal static func handle(_ message: Message, messageServerID: UInt64?, using transaction: Any) {
switch message {
case is ReadReceipt: break
case is SessionRequest: break
case is TypingIndicator: break
case is ClosedGroupUpdate: break
case is ExpirationTimerUpdate: break
case let message as ReadReceipt: handleReadReceipt(message, using: transaction)
case let message as SessionRequest: handleSessionRequest(message, using: transaction)
case let message as TypingIndicator: handleTypingIndicator(message, using: transaction)
case let message as ClosedGroupUpdate: handleClosedGroupUpdate(message, using: transaction)
case let message as ExpirationTimerUpdate: handleExpirationTimerUpdate(message, using: transaction)
case let message as VisibleMessage: handleVisibleMessage(message, using: transaction)
default: fatalError()
}
}
private static func handleReadReceipt(_ message: ReadReceipt, using transaction: Any) {
}
private static func handleSessionRequest(_ message: SessionRequest, using transaction: Any) {
}
private static func handleTypingIndicator(_ message: TypingIndicator, using transaction: Any) {
let storage = Configuration.shared.storage
switch message.kind! {
case .started: storage.showTypingIndicatorIfNeeded(for: message.sender!)
case .stopped: storage.hideTypingIndicatorIfNeeded(for: message.sender!)
}
}
private static func handleClosedGroupUpdate(_ message: ClosedGroupUpdate, using transaction: Any) {
}
private static func handleExpirationTimerUpdate(_ message: ExpirationTimerUpdate, using transaction: Any) {
}
private static func handleVisibleMessage(_ message: VisibleMessage, using transaction: Any) {
let storage = Configuration.shared.storage
// Update profile if needed
@ -96,7 +120,7 @@ internal enum MessageReceiver {
let (threadID, tsIncomingMessage) = storage.persist(message, using: transaction)
message.threadID = threadID
// Cancel any typing indicators
storage.cancelTypingIndicatorsIfNeeded(for: message.threadID!, senderPublicKey: message.sender!)
storage.cancelTypingIndicatorsIfNeeded(for: message.sender!)
// Notify the user if needed
storage.notifyUserIfNeeded(for: tsIncomingMessage, threadID: threadID)
}

@ -2,38 +2,72 @@ import SessionProtocolKit
public protocol SessionMessagingKitStorageProtocol {
// MARK: - Shared
func with(_ work: @escaping (Any) -> Void)
func withAsync(_ work: @escaping (Any) -> Void, completion: @escaping () -> Void)
// MARK: - General
func getUserPublicKey() -> String?
func getUserKeyPair() -> ECKeyPair?
func getUserDisplayName() -> String?
// MARK: - Signal Protocol
func getOrGenerateRegistrationID(using transaction: Any) -> UInt32
func isClosedGroup(_ publicKey: String) -> Bool
func getSenderCertificate(for publicKey: String) -> SMKSenderCertificate
// MARK: - Shared Sender Keys
func getClosedGroupPrivateKey(for publicKey: String) -> String?
func isClosedGroup(_ publicKey: String) -> Bool
// MARK: - Jobs
func persist(_ job: Job, using transaction: Any)
func markJobAsSucceeded(_ job: Job, using transaction: Any)
func markJobAsFailed(_ job: Job, using transaction: Any)
func getSenderCertificate(for publicKey: String) -> SMKSenderCertificate
func getAllPendingJobs(of type: Job.Type) -> [Job]
// MARK: - Authorization
func getAuthToken(for server: String) -> String?
func setAuthToken(for server: String, to newValue: String, using transaction: Any)
func removeAuthToken(for server: String, using transaction: Any)
// MARK: - Open Group Public Keys
func getOpenGroupPublicKey(for server: String) -> String?
func setOpenGroupPublicKey(for server: String, to newValue: String, using transaction: Any)
// MARK: - Last Message Server ID
func getLastMessageServerID(for group: UInt64, on server: String) -> UInt64?
func setLastMessageServerID(for group: UInt64, on server: String, to newValue: UInt64, using transaction: Any)
func removeLastMessageServerID(for group: UInt64, on server: String, using transaction: Any)
// MARK: - Last Deletion Server ID
func getLastDeletionServerID(for group: UInt64, on server: String) -> UInt64?
func setLastDeletionServerID(for group: UInt64, on server: String, to newValue: UInt64, using transaction: Any)
func removeLastDeletionServerID(for group: UInt64, on server: String, using transaction: Any)
// MARK: - Open Group Metadata
func setUserCount(to newValue: Int, forOpenGroupWithID openGroupID: String, using transaction: Any)
func getIDForMessage(withServerID serverID: UInt64) -> UInt64?
func setOpenGroupDisplayName(to displayName: String, for publicKey: String, on channel: UInt64, server: String, using transaction: Any)
func setLastProfilePictureUploadDate(_ date: Date) // Stored in user defaults so no transaction is needed
// MARK: - Message Handling
func isBlocked(_ publicKey: String) -> Bool
func updateProfile(for publicKey: String, from profile: VisibleMessage.Profile, using transaction: Any)
/// Returns the ID of the thread the message was stored under along with the `TSIncomingMessage` that was constructed.
func persist(_ message: VisibleMessage, using transaction: Any) -> (String, Any)
func cancelTypingIndicatorsIfNeeded(for threadID: String, senderPublicKey: String)
func showTypingIndicatorIfNeeded(for senderPublicKey: String)
func hideTypingIndicatorIfNeeded(for senderPublicKey: String)
func cancelTypingIndicatorsIfNeeded(for senderPublicKey: String)
func notifyUserIfNeeded(for message: Any, threadID: String)
}

@ -40,8 +40,8 @@ NSString *NSStringFromOWSInteractionType(OWSInteractionType value);
@property (nonatomic, readonly) uint64_t sortId;
@property (nonatomic, readonly) uint64_t receivedAtTimestamp;
@property (nonatomic, readonly) BOOL shouldUseServerTime;
/// Used for public chats where a message sent from a slave device is interpreted as having been sent from the master device.
@property (nonatomic) NSString *actualSenderHexEncodedPublicKey;
// Push notifications
@property (nonatomic) BOOL hasUnfetchedAttachmentsFromPN;
- (void)setServerTimestampToReceivedTimestamp:(uint64_t)receivedAtTimestamp;

@ -31,8 +31,6 @@ NS_ASSUME_NONNULL_BEGIN
// Open groups
@property (nonatomic) uint64_t openGroupServerMessageID;
@property (nonatomic, readonly) BOOL isOpenGroupMessage;
// Push notifications
@property (nonatomic) BOOL hasUnfetchedAttachmentsFromPN;
- (instancetype)initInteractionWithTimestamp:(uint64_t)timestamp inThread:(TSThread *)thread NS_UNAVAILABLE;

Loading…
Cancel
Save