Integrate transaction handling utility part 1

pull/213/head
nielsandriesse 5 years ago
parent a193200023
commit e9fc5b748e

@ -63,7 +63,7 @@ public final class LokiRSSFeedPoller : NSObject {
envelope.setSource(NSLocalizedString("Loki", comment: "")) envelope.setSource(NSLocalizedString("Loki", comment: ""))
envelope.setSourceDevice(OWSDevicePrimaryDeviceId) envelope.setSourceDevice(OWSDevicePrimaryDeviceId)
envelope.setContent(try! content.build().serializedData()) envelope.setContent(try! content.build().serializedData())
OWSPrimaryStorage.shared().dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
SSKEnvironment.shared.messageManager.throws_processEnvelope(try! envelope.build(), plaintextData: try! content.build().serializedData(), wasReceivedByUD: false, transaction: transaction, serverID: 0) SSKEnvironment.shared.messageManager.throws_processEnvelope(try! envelope.build(), plaintextData: try! content.build().serializedData(), wasReceivedByUD: false, transaction: transaction, serverID: 0)
} }
} }

@ -186,7 +186,7 @@ final class DeviceLinkingModal : Modal, DeviceLinkingSessionDelegate {
SSKEnvironment.shared.messageSender.send(linkingAuthorizationMessage, success: { SSKEnvironment.shared.messageSender.send(linkingAuthorizationMessage, success: {
let storage = OWSPrimaryStorage.shared() let storage = OWSPrimaryStorage.shared()
let slaveHexEncodedPublicKey = deviceLink.slave.hexEncodedPublicKey let slaveHexEncodedPublicKey = deviceLink.slave.hexEncodedPublicKey
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
let thread = TSContactThread.getOrCreateThread(withContactId: slaveHexEncodedPublicKey, transaction: transaction) let thread = TSContactThread.getOrCreateThread(withContactId: slaveHexEncodedPublicKey, transaction: transaction)
thread.save(with: transaction) thread.save(with: transaction)
} }
@ -196,7 +196,7 @@ final class DeviceLinkingModal : Modal, DeviceLinkingSessionDelegate {
let _ = SSKEnvironment.shared.syncManager.syncAllContacts() let _ = SSKEnvironment.shared.syncManager.syncAllContacts()
} }
let _ = SSKEnvironment.shared.syncManager.syncAllOpenGroups() let _ = SSKEnvironment.shared.syncManager.syncAllOpenGroups()
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
storage.setFriendRequestStatus(.friends, for: slaveHexEncodedPublicKey, transaction: transaction) storage.setFriendRequestStatus(.friends, for: slaveHexEncodedPublicKey, transaction: transaction)
} }
DispatchQueue.main.async { DispatchQueue.main.async {
@ -251,9 +251,8 @@ final class DeviceLinkingModal : Modal, DeviceLinkingSessionDelegate {
session.markLinkingRequestAsProcessed() // Only relevant in master mode session.markLinkingRequestAsProcessed() // Only relevant in master mode
delegate?.handleDeviceLinkingModalDismissed() // Only relevant in slave mode delegate?.handleDeviceLinkingModalDismissed() // Only relevant in slave mode
if let deviceLink = deviceLink { if let deviceLink = deviceLink {
let storage = OWSPrimaryStorage.shared() try! Storage.syncWrite { transaction in
storage.dbReadWriteConnection.readWrite { transaction in OWSPrimaryStorage.shared().removePreKeyBundle(forContact: deviceLink.slave.hexEncodedPublicKey, transaction: transaction)
storage.removePreKeyBundle(forContact: deviceLink.slave.hexEncodedPublicKey, transaction: transaction)
} }
} }
dismiss(animated: true, completion: nil) dismiss(animated: true, completion: nil)

@ -146,14 +146,14 @@ final class DeviceLinksVC : BaseVC, UITableViewDataSource, UITableViewDelegate,
let unlinkDeviceMessage = UnlinkDeviceMessage(thread: thread) let unlinkDeviceMessage = UnlinkDeviceMessage(thread: thread)
SSKEnvironment.shared.messageSender.send(unlinkDeviceMessage, success: { SSKEnvironment.shared.messageSender.send(unlinkDeviceMessage, success: {
let storage = OWSPrimaryStorage.shared() let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
storage.removePreKeyBundle(forContact: linkedDeviceHexEncodedPublicKey, transaction: transaction) storage.removePreKeyBundle(forContact: linkedDeviceHexEncodedPublicKey, transaction: transaction)
storage.deleteAllSessions(forContact: linkedDeviceHexEncodedPublicKey, protocolContext: transaction) storage.deleteAllSessions(forContact: linkedDeviceHexEncodedPublicKey, protocolContext: transaction)
} }
}, failure: { _ in }, failure: { _ in
print("[Loki] Failed to send unlink device message.") print("[Loki] Failed to send unlink device message.")
let storage = OWSPrimaryStorage.shared() let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
storage.removePreKeyBundle(forContact: linkedDeviceHexEncodedPublicKey, transaction: transaction) storage.removePreKeyBundle(forContact: linkedDeviceHexEncodedPublicKey, transaction: transaction)
storage.deleteAllSessions(forContact: linkedDeviceHexEncodedPublicKey, protocolContext: transaction) storage.deleteAllSessions(forContact: linkedDeviceHexEncodedPublicKey, protocolContext: transaction)
} }

@ -135,7 +135,7 @@ final class JoinPublicChatVC : BaseVC, UIPageViewControllerDataSource, UIPageVie
let urlAsString = url.absoluteString let urlAsString = url.absoluteString
let displayName = OWSProfileManager.shared().localProfileName() let displayName = OWSProfileManager.shared().localProfileName()
// TODO: Profile picture & profile key // TODO: Profile picture & profile key
OWSPrimaryStorage.shared().dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
transaction.removeObject(forKey: "\(urlAsString).\(channelID)", inCollection: LokiPublicChatAPI.lastMessageServerIDCollection) transaction.removeObject(forKey: "\(urlAsString).\(channelID)", inCollection: LokiPublicChatAPI.lastMessageServerIDCollection)
transaction.removeObject(forKey: "\(urlAsString).\(channelID)", inCollection: LokiPublicChatAPI.lastDeletionServerIDCollection) transaction.removeObject(forKey: "\(urlAsString).\(channelID)", inCollection: LokiPublicChatAPI.lastDeletionServerIDCollection)
} }

@ -53,12 +53,9 @@ public extension LokiAPI {
} }
}.done(on: DispatchQueue.global()) { snode in }.done(on: DispatchQueue.global()) { snode in
seal.fulfill(snode) seal.fulfill(snode)
// Dispatch async on the main queue to avoid nested write transactions try! Storage.syncWrite { transaction in
DispatchQueue.main.async { print("[Loki] Persisting snode pool to database.")
storage.dbReadWriteConnection.readWrite { transaction in storage.setSnodePool(LokiAPI.snodePool, in: transaction)
print("[Loki] Persisting snode pool to database.")
storage.setSnodePool(LokiAPI.snodePool, in: transaction)
}
} }
}.catch(on: DispatchQueue.global()) { error in }.catch(on: DispatchQueue.global()) { error in
print("[Loki] Failed to contact seed node at: \(target).") print("[Loki] Failed to contact seed node at: \(target).")
@ -90,12 +87,8 @@ public extension LokiAPI {
parseTargets(from: $0) parseTargets(from: $0)
}.get { swarm in }.get { swarm in
swarmCache[hexEncodedPublicKey] = swarm swarmCache[hexEncodedPublicKey] = swarm
// Dispatch async on the main queue to avoid nested write transactions try! Storage.syncWrite { transaction in
DispatchQueue.main.async { storage.setSwarm(swarm, for: hexEncodedPublicKey, in: transaction)
let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in
storage.setSwarm(swarm, for: hexEncodedPublicKey, in: transaction)
}
} }
} }
} }
@ -108,12 +101,8 @@ public extension LokiAPI {
internal static func dropSnodeFromSnodePool(_ target: LokiAPITarget) { internal static func dropSnodeFromSnodePool(_ target: LokiAPITarget) {
LokiAPI.snodePool.remove(target) LokiAPI.snodePool.remove(target)
// Dispatch async on the main queue to avoid nested write transactions try! Storage.syncWrite { transaction in
DispatchQueue.main.async { storage.dropSnodeFromSnodePool(target, in: transaction)
let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in
storage.dropSnodeFromSnodePool(target, in: transaction)
}
} }
} }
@ -122,12 +111,8 @@ public extension LokiAPI {
if var swarm = swarm, let index = swarm.firstIndex(of: target) { if var swarm = swarm, let index = swarm.firstIndex(of: target) {
swarm.remove(at: index) swarm.remove(at: index)
LokiAPI.swarmCache[hexEncodedPublicKey] = swarm LokiAPI.swarmCache[hexEncodedPublicKey] = swarm
// Dispatch async on the main queue to avoid nested write transactions try! Storage.syncWrite { transaction in
DispatchQueue.main.async { storage.setSwarm(swarm, for: hexEncodedPublicKey, in: transaction)
let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in
storage.setSwarm(swarm, for: hexEncodedPublicKey, in: transaction)
}
} }
} }
} }
@ -135,11 +120,8 @@ public extension LokiAPI {
// MARK: Public API // MARK: Public API
@objc public static func clearSnodePool() { @objc public static func clearSnodePool() {
snodePool.removeAll() snodePool.removeAll()
// Dispatch async on the main queue to avoid nested write transactions try! Storage.syncWrite { transaction in
DispatchQueue.main.async { storage.clearSnodePool(in: transaction)
storage.dbReadWriteConnection.readWrite { transaction in
storage.clearSnodePool(in: transaction)
}
} }
} }

@ -194,14 +194,14 @@ public final class LokiAPI : NSObject {
var result: String? = nil var result: String? = nil
// Uses a read/write connection because getting the last message hash value also removes expired messages as needed // Uses a read/write connection because getting the last message hash value also removes expired messages as needed
// TODO: This shouldn't be the case; a getter shouldn't have an unexpected side effect // TODO: This shouldn't be the case; a getter shouldn't have an unexpected side effect
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
result = storage.getLastMessageHash(forSnode: target.address, transaction: transaction) result = storage.getLastMessageHash(forSnode: target.address, transaction: transaction)
} }
return result return result
} }
private static func setLastMessageHashValue(for target: LokiAPITarget, hashValue: String, expirationDate: UInt64) { private static func setLastMessageHashValue(for target: LokiAPITarget, hashValue: String, expirationDate: UInt64) {
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
storage.setLastMessageHash(forSnode: target.address, hash: hashValue, expiresAt: expirationDate, transaction: transaction) storage.setLastMessageHash(forSnode: target.address, hash: hashValue, expiresAt: expirationDate, transaction: transaction)
} }
} }
@ -218,7 +218,7 @@ public final class LokiAPI : NSObject {
} }
private static func setReceivedMessageHashValues(to receivedMessageHashValues: Set<String>) { private static func setReceivedMessageHashValues(to receivedMessageHashValues: Set<String>) {
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
transaction.setObject(receivedMessageHashValues, forKey: receivedMessageHashValuesKey, inCollection: receivedMessageHashValuesCollection) transaction.setObject(receivedMessageHashValues, forKey: receivedMessageHashValuesKey, inCollection: receivedMessageHashValuesCollection)
} }
} }

@ -38,11 +38,8 @@ public class LokiDotNetAPI : NSObject {
return Promise.value(token) return Promise.value(token)
} else { } else {
return requestNewAuthToken(for: server).then(on: DispatchQueue.global()) { submitAuthToken($0, for: server) }.map(on: DispatchQueue.global()) { token in return requestNewAuthToken(for: server).then(on: DispatchQueue.global()) { submitAuthToken($0, for: server) }.map(on: DispatchQueue.global()) { token in
// Dispatch async on the main queue to avoid nested write transactions try! Storage.syncWrite { transaction in
DispatchQueue.main.async { setAuthToken(for: server, to: token, in: transaction)
storage.dbReadWriteConnection.readWrite { transaction in
setAuthToken(for: server, to: token, in: transaction)
}
} }
return token return token
} }
@ -54,11 +51,8 @@ public class LokiDotNetAPI : NSObject {
} }
public static func clearAuthToken(for server: String) { public static func clearAuthToken(for server: String) {
// Dispatch async on the main queue to avoid nested write transactions try! Storage.syncWrite { transaction in
DispatchQueue.main.async { transaction.removeObject(forKey: server, inCollection: authTokenCollection)
storage.dbReadWriteConnection.readWrite { transaction in
transaction.removeObject(forKey: server, inCollection: authTokenCollection)
}
} }
} }

@ -81,15 +81,7 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
} }
}) })
}.map(on: DispatchQueue.global()) { deviceLinks in }.map(on: DispatchQueue.global()) { deviceLinks in
storage.cacheDeviceLinks(deviceLinks) storage.setDeviceLinks(deviceLinks)
/*
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
storage.dbReadWriteConnection.readWrite { transaction in
storage.setDeviceLinks(deviceLinks, in: transaction)
}
}
*/
return deviceLinks return deviceLinks
} }
}.handlingInvalidAuthTokenIfNeeded(for: server) }.handlingInvalidAuthTokenIfNeeded(for: server)
@ -122,16 +114,8 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
deviceLinks = storage.getDeviceLinks(for: getUserHexEncodedPublicKey(), in: transaction) deviceLinks = storage.getDeviceLinks(for: getUserHexEncodedPublicKey(), in: transaction)
} }
deviceLinks.insert(deviceLink) deviceLinks.insert(deviceLink)
return setDeviceLinks(deviceLinks).then(on: LokiAPI.workQueue) { _ -> Promise<Void> in return setDeviceLinks(deviceLinks).map(on: LokiAPI.workQueue) { _ in
let (promise, seal) = Promise<Void>.pending() storage.addDeviceLink(deviceLink)
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
storage.dbReadWriteConnection.readWrite { transaction in
storage.addDeviceLink(deviceLink, in: transaction)
}
seal.fulfill(())
}
return promise
} }
} }
@ -142,16 +126,8 @@ public final class LokiFileServerAPI : LokiDotNetAPI {
deviceLinks = storage.getDeviceLinks(for: getUserHexEncodedPublicKey(), in: transaction) deviceLinks = storage.getDeviceLinks(for: getUserHexEncodedPublicKey(), in: transaction)
} }
deviceLinks.remove(deviceLink) deviceLinks.remove(deviceLink)
return setDeviceLinks(deviceLinks).then(on: LokiAPI.workQueue) { _ -> Promise<Void> in return setDeviceLinks(deviceLinks).map(on: LokiAPI.workQueue) { _ in
let (promise, seal) = Promise<Void>.pending() storage.removeDeviceLink(deviceLink)
// Dispatch async on the main queue to avoid nested write transactions
DispatchQueue.main.async {
storage.dbReadWriteConnection.readWrite { transaction in
storage.removeDeviceLink(deviceLink, in: transaction)
}
seal.fulfill(())
}
return promise
} }
} }

@ -123,13 +123,11 @@ public enum OnionRequestAPI {
} }
}.map(on: LokiAPI.workQueue) { paths in }.map(on: LokiAPI.workQueue) { paths in
OnionRequestAPI.paths = paths OnionRequestAPI.paths = paths
// Dispatch async on the main queue to avoid nested write transactions try! Storage.syncWrite { transaction in
print("[Loki] Persisting onion request paths to database.")
OWSPrimaryStorage.shared().setOnionRequestPaths(paths, in: transaction)
}
DispatchQueue.main.async { DispatchQueue.main.async {
let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in
print("[Loki] Persisting onion request paths to database.")
storage.setOnionRequestPaths(paths, in: transaction)
}
NotificationCenter.default.post(name: .pathsBuilt, object: nil) NotificationCenter.default.post(name: .pathsBuilt, object: nil)
} }
return paths return paths
@ -165,12 +163,8 @@ public enum OnionRequestAPI {
private static func dropAllPaths() { private static func dropAllPaths() {
paths.removeAll() paths.removeAll()
// Dispatch async on the main queue to avoid nested write transactions try! Storage.syncWrite { transaction in
DispatchQueue.main.async { OWSPrimaryStorage.shared().clearOnionRequestPaths(in: transaction)
let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in
storage.clearOnionRequestPaths(in: transaction)
}
} }
} }

@ -37,13 +37,13 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
} }
private static func setLastMessageServerID(for group: UInt64, on server: String, to newValue: UInt64) { private static func setLastMessageServerID(for group: UInt64, on server: String, to newValue: UInt64) {
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
transaction.setObject(newValue, forKey: "\(server).\(group)", inCollection: lastMessageServerIDCollection) transaction.setObject(newValue, forKey: "\(server).\(group)", inCollection: lastMessageServerIDCollection)
} }
} }
private static func removeLastMessageServerID(for group: UInt64, on server: String) { private static func removeLastMessageServerID(for group: UInt64, on server: String) {
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
transaction.removeObject(forKey: "\(server).\(group)", inCollection: lastMessageServerIDCollection) transaction.removeObject(forKey: "\(server).\(group)", inCollection: lastMessageServerIDCollection)
} }
} }
@ -57,13 +57,13 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
} }
private static func setLastDeletionServerID(for group: UInt64, on server: String, to newValue: UInt64) { private static func setLastDeletionServerID(for group: UInt64, on server: String, to newValue: UInt64) {
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
transaction.setObject(newValue, forKey: "\(server).\(group)", inCollection: lastDeletionServerIDCollection) transaction.setObject(newValue, forKey: "\(server).\(group)", inCollection: lastDeletionServerIDCollection)
} }
} }
private static func removeLastDeletionServerID(for group: UInt64, on server: String) { private static func removeLastDeletionServerID(for group: UInt64, on server: String) {
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
transaction.removeObject(forKey: "\(server).\(group)", inCollection: lastDeletionServerIDCollection) transaction.removeObject(forKey: "\(server).\(group)", inCollection: lastDeletionServerIDCollection)
} }
} }
@ -271,7 +271,7 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
print("[Loki] Couldn't parse display names for users: \(hexEncodedPublicKeys) from: \(rawResponse).") print("[Loki] Couldn't parse display names for users: \(hexEncodedPublicKeys) from: \(rawResponse).")
throw LokiDotNetAPIError.parsingFailed throw LokiDotNetAPIError.parsingFailed
} }
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
data.forEach { data in data.forEach { data in
guard let user = data["user"] as? JSON, let hexEncodedPublicKey = user["username"] as? String, let rawDisplayName = user["name"] as? String else { return } guard let user = data["user"] as? JSON, let hexEncodedPublicKey = user["username"] as? String, let rawDisplayName = user["name"] as? String else { return }
let endIndex = hexEncodedPublicKey.endIndex let endIndex = hexEncodedPublicKey.endIndex
@ -355,7 +355,7 @@ public final class LokiPublicChatAPI : LokiDotNetAPI {
throw LokiDotNetAPIError.parsingFailed throw LokiDotNetAPIError.parsingFailed
} }
let storage = OWSPrimaryStorage.shared() let storage = OWSPrimaryStorage.shared()
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
storage.setUserCount(memberCount, forPublicChatWithID: "\(server).\(channel)", in: transaction) storage.setUserCount(memberCount, forPublicChatWithID: "\(server).\(channel)", in: transaction)
} }
// TODO: Use this to update open group names as needed // TODO: Use this to update open group names as needed

@ -71,7 +71,7 @@ public final class LokiPublicChatManager : NSObject {
let model = TSGroupModel(title: chat.displayName, memberIds: [userHexEncodedPublicKey!, chat.server], image: nil, groupId: LKGroupUtilities.getEncodedOpenGroupIDAsData(chat.id), groupType: .openGroup, adminIds: []) let model = TSGroupModel(title: chat.displayName, memberIds: [userHexEncodedPublicKey!, chat.server], image: nil, groupId: LKGroupUtilities.getEncodedOpenGroupIDAsData(chat.id), groupType: .openGroup, adminIds: [])
// Store the group chat mapping // Store the group chat mapping
self.storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
let thread = TSGroupThread.getOrCreateThread(with: model, transaction: transaction) let thread = TSGroupThread.getOrCreateThread(with: model, transaction: transaction)
// Save the group chat // Save the group chat
@ -118,7 +118,7 @@ public final class LokiPublicChatManager : NSObject {
} }
// Remove the chat from the db // Remove the chat from the db
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
LokiDatabaseUtilities.removePublicChat(for: threadId, in: transaction) LokiDatabaseUtilities.removePublicChat(for: threadId, in: transaction)
} }

@ -163,7 +163,7 @@ public final class LokiPublicChatPoller : NSObject {
envelope.setSource(senderHexEncodedPublicKey) envelope.setSource(senderHexEncodedPublicKey)
envelope.setSourceDevice(OWSDevicePrimaryDeviceId) envelope.setSourceDevice(OWSDevicePrimaryDeviceId)
envelope.setContent(try! content.build().serializedData()) envelope.setContent(try! content.build().serializedData())
storage.dbReadWriteConnection.readWrite { transaction in try! Storage.syncWrite { transaction in
transaction.setObject(senderDisplayName, forKey: senderHexEncodedPublicKey, inCollection: publicChat.id) transaction.setObject(senderDisplayName, forKey: senderHexEncodedPublicKey, inCollection: publicChat.id)
let messageServerID = message.serverID let messageServerID = message.serverID
SSKEnvironment.shared.messageManager.throws_processEnvelope(try! envelope.build(), plaintextData: try! content.build().serializedData(), wasReceivedByUD: false, transaction: transaction, serverID: messageServerID ?? 0) SSKEnvironment.shared.messageManager.throws_processEnvelope(try! envelope.build(), plaintextData: try! content.build().serializedData(), wasReceivedByUD: false, transaction: transaction, serverID: messageServerID ?? 0)
@ -213,9 +213,8 @@ public final class LokiPublicChatPoller : NSObject {
private func pollForDeletedMessages() { private func pollForDeletedMessages() {
let publicChat = self.publicChat let publicChat = self.publicChat
let _ = LokiPublicChatAPI.getDeletedMessageServerIDs(for: publicChat.channel, on: publicChat.server).done(on: DispatchQueue.global()) { deletedMessageServerIDs in let _ = LokiPublicChatAPI.getDeletedMessageServerIDs(for: publicChat.channel, on: publicChat.server).done(on: DispatchQueue.global()) { deletedMessageServerIDs in
let storage = OWSPrimaryStorage.shared() try! Storage.syncWrite { transaction in
storage.dbReadWriteConnection.readWrite { transaction in let deletedMessageIDs = deletedMessageServerIDs.compactMap { OWSPrimaryStorage.shared().getIDForMessage(withServerID: UInt($0), in: transaction) }
let deletedMessageIDs = deletedMessageServerIDs.compactMap { storage.getIDForMessage(withServerID: UInt($0), in: transaction) }
deletedMessageIDs.forEach { messageID in deletedMessageIDs.forEach { messageID in
TSMessage.fetch(uniqueId: messageID)?.remove(with: transaction) TSMessage.fetch(uniqueId: messageID)?.remove(with: transaction)
} }

@ -4,22 +4,20 @@
public extension OWSPrimaryStorage { public extension OWSPrimaryStorage {
// MARK: - Snode Pool // MARK: - Snode Pool
private static let snodePoolCollection = "LokiSnodePoolCollection"
public func setSnodePool(_ snodePool: Set<LokiAPITarget>, in transaction: YapDatabaseReadWriteTransaction) { public func setSnodePool(_ snodePool: Set<LokiAPITarget>, in transaction: YapDatabaseReadWriteTransaction) {
clearSnodePool(in: transaction) clearSnodePool(in: transaction)
snodePool.forEach { snode in snodePool.forEach { snode in
transaction.setObject(snode, forKey: snode.description, inCollection: OWSPrimaryStorage.snodePoolCollection) transaction.setObject(snode, forKey: snode.description, inCollection: Storage.snodePoolCollection)
} }
} }
public func clearSnodePool(in transaction: YapDatabaseReadWriteTransaction) { public func clearSnodePool(in transaction: YapDatabaseReadWriteTransaction) {
transaction.removeAllObjects(inCollection: OWSPrimaryStorage.snodePoolCollection) transaction.removeAllObjects(inCollection: Storage.snodePoolCollection)
} }
public func getSnodePool(in transaction: YapDatabaseReadTransaction) -> Set<LokiAPITarget> { public func getSnodePool(in transaction: YapDatabaseReadTransaction) -> Set<LokiAPITarget> {
var result: Set<LokiAPITarget> = [] var result: Set<LokiAPITarget> = []
transaction.enumerateKeysAndObjects(inCollection: OWSPrimaryStorage.snodePoolCollection) { _, object, _ in transaction.enumerateKeysAndObjects(inCollection: Storage.snodePoolCollection) { _, object, _ in
guard let snode = object as? LokiAPITarget else { return } guard let snode = object as? LokiAPITarget else { return }
result.insert(snode) result.insert(snode)
} }
@ -27,33 +25,29 @@ public extension OWSPrimaryStorage {
} }
public func dropSnodeFromSnodePool(_ snode: LokiAPITarget, in transaction: YapDatabaseReadWriteTransaction) { public func dropSnodeFromSnodePool(_ snode: LokiAPITarget, in transaction: YapDatabaseReadWriteTransaction) {
transaction.removeObject(forKey: snode.description, inCollection: OWSPrimaryStorage.snodePoolCollection) transaction.removeObject(forKey: snode.description, inCollection: Storage.snodePoolCollection)
} }
// MARK: - Swarm // MARK: - Swarm
private func getSwarmCollection(for publicKey: String) -> String {
return "LokiSwarmCollection-\(publicKey)"
}
public func setSwarm(_ swarm: [Snode], for publicKey: String, in transaction: YapDatabaseReadWriteTransaction) { public func setSwarm(_ swarm: [Snode], for publicKey: String, in transaction: YapDatabaseReadWriteTransaction) {
print("[Loki] Caching swarm for: \(publicKey).") print("[Loki] Caching swarm for: \(publicKey).")
clearSwarm(for: publicKey, in: transaction) clearSwarm(for: publicKey, in: transaction)
let collection = getSwarmCollection(for: publicKey) let collection = Storage.getSwarmCollection(for: publicKey)
swarm.forEach { snode in swarm.forEach { snode in
transaction.setObject(snode, forKey: snode.description, inCollection: collection) transaction.setObject(snode, forKey: snode.description, inCollection: collection)
} }
} }
public func clearSwarm(for publicKey: String, in transaction: YapDatabaseReadWriteTransaction) { public func clearSwarm(for publicKey: String, in transaction: YapDatabaseReadWriteTransaction) {
let collection = getSwarmCollection(for: publicKey) let collection = Storage.getSwarmCollection(for: publicKey)
transaction.removeAllObjects(inCollection: collection) transaction.removeAllObjects(inCollection: collection)
} }
public func getSwarm(for publicKey: String, in transaction: YapDatabaseReadTransaction) -> [Snode] { public func getSwarm(for publicKey: String, in transaction: YapDatabaseReadTransaction) -> [Snode] {
var result: [Snode] = [] var result: [Snode] = []
let collection = getSwarmCollection(for: publicKey) let collection = Storage.getSwarmCollection(for: publicKey)
transaction.enumerateKeysAndObjects(inCollection: collection) { _, object, _ in transaction.enumerateKeysAndObjects(inCollection: collection) { _, object, _ in
guard let snode = object as? Snode else { return } guard let snode = object as? Snode else { return }
result.append(snode) result.append(snode)
@ -64,15 +58,13 @@ public extension OWSPrimaryStorage {
// MARK: - Onion Request Paths // MARK: - Onion Request Paths
private static let onionRequestPathCollection = "LokiOnionRequestPathCollection"
public func setOnionRequestPaths(_ paths: [OnionRequestAPI.Path], in transaction: YapDatabaseReadWriteTransaction) { public func setOnionRequestPaths(_ paths: [OnionRequestAPI.Path], in transaction: YapDatabaseReadWriteTransaction) {
// FIXME: This is a bit of a dirty approach that assumes 2 paths of length 3 each. We should do better than this. // FIXME: This is a bit of a dirty approach that assumes 2 paths of length 3 each. We should do better than this.
guard paths.count == 2 else { return } guard paths.count == 2 else { return }
let path0 = paths[0] let path0 = paths[0]
let path1 = paths[1] let path1 = paths[1]
guard path0.count == 3, path1.count == 3 else { return } guard path0.count == 3, path1.count == 3 else { return }
let collection = OWSPrimaryStorage.onionRequestPathCollection let collection = Storage.onionRequestPathCollection
transaction.setObject(path0[0], forKey: "0-0", inCollection: collection) transaction.setObject(path0[0], forKey: "0-0", inCollection: collection)
transaction.setObject(path0[1], forKey: "0-1", inCollection: collection) transaction.setObject(path0[1], forKey: "0-1", inCollection: collection)
transaction.setObject(path0[2], forKey: "0-2", inCollection: collection) transaction.setObject(path0[2], forKey: "0-2", inCollection: collection)
@ -82,7 +74,7 @@ public extension OWSPrimaryStorage {
} }
public func getOnionRequestPaths(in transaction: YapDatabaseReadTransaction) -> [OnionRequestAPI.Path] { public func getOnionRequestPaths(in transaction: YapDatabaseReadTransaction) -> [OnionRequestAPI.Path] {
let collection = OWSPrimaryStorage.onionRequestPathCollection let collection = Storage.onionRequestPathCollection
guard guard
let path0Snode0 = transaction.object(forKey: "0-0", inCollection: collection) as? LokiAPITarget, let path0Snode0 = transaction.object(forKey: "0-0", inCollection: collection) as? LokiAPITarget,
let path0Snode1 = transaction.object(forKey: "0-1", inCollection: collection) as? LokiAPITarget, let path0Snode1 = transaction.object(forKey: "0-1", inCollection: collection) as? LokiAPITarget,
@ -94,20 +86,18 @@ public extension OWSPrimaryStorage {
} }
public func clearOnionRequestPaths(in transaction: YapDatabaseReadWriteTransaction) { public func clearOnionRequestPaths(in transaction: YapDatabaseReadWriteTransaction) {
transaction.removeAllObjects(inCollection: OWSPrimaryStorage.onionRequestPathCollection) transaction.removeAllObjects(inCollection: Storage.onionRequestPathCollection)
} }
// MARK: - Session Requests // MARK: - Session Requests
private static let sessionRequestTimestampCollection = "LokiSessionRequestTimestampCollection"
public func setSessionRequestTimestamp(for publicKey: String, to timestamp: Date, in transaction: YapDatabaseReadWriteTransaction) { public func setSessionRequestTimestamp(for publicKey: String, to timestamp: Date, in transaction: YapDatabaseReadWriteTransaction) {
transaction.setDate(timestamp, forKey: publicKey, inCollection: OWSPrimaryStorage.sessionRequestTimestampCollection) transaction.setDate(timestamp, forKey: publicKey, inCollection: Storage.sessionRequestTimestampCollection)
} }
public func getSessionRequestTimestamp(for publicKey: String, in transaction: YapDatabaseReadTransaction) -> Date? { public func getSessionRequestTimestamp(for publicKey: String, in transaction: YapDatabaseReadTransaction) -> Date? {
transaction.date(forKey: publicKey, inCollection: OWSPrimaryStorage.sessionRequestTimestampCollection) transaction.date(forKey: publicKey, inCollection: Storage.sessionRequestTimestampCollection)
} }
@ -115,23 +105,15 @@ public extension OWSPrimaryStorage {
// MARK: - Multi Device // MARK: - Multi Device
private static var deviceLinkCache: Set<DeviceLink> = [] private static var deviceLinkCache: Set<DeviceLink> = []
private func getDeviceLinkCollection(for masterHexEncodedPublicKey: String) -> String { public func setDeviceLinks(_ deviceLinks: Set<DeviceLink>) {
return "LokiDeviceLinkCollection-\(masterHexEncodedPublicKey)" deviceLinks.forEach { addDeviceLink($0) }
}
public func cacheDeviceLinks(_ deviceLinks: Set<DeviceLink>) {
OWSPrimaryStorage.deviceLinkCache.formUnion(deviceLinks)
} }
public func setDeviceLinks(_ deviceLinks: Set<DeviceLink>, in transaction: YapDatabaseReadWriteTransaction) { public func addDeviceLink(_ deviceLink: DeviceLink) {
deviceLinks.forEach { addDeviceLink($0, in: transaction) }
}
public func addDeviceLink(_ deviceLink: DeviceLink, in transaction: YapDatabaseReadWriteTransaction) {
OWSPrimaryStorage.deviceLinkCache.insert(deviceLink) OWSPrimaryStorage.deviceLinkCache.insert(deviceLink)
} }
public func removeDeviceLink(_ deviceLink: DeviceLink, in transaction: YapDatabaseReadWriteTransaction) { public func removeDeviceLink(_ deviceLink: DeviceLink) {
OWSPrimaryStorage.deviceLinkCache.remove(deviceLink) OWSPrimaryStorage.deviceLinkCache.remove(deviceLink)
} }
@ -150,13 +132,11 @@ public extension OWSPrimaryStorage {
// MARK: - Open Groups // MARK: - Open Groups
private static let openGroupUserCountCollection = "LokiPublicChatUserCountCollection"
public func getUserCount(for publicChat: LokiPublicChat, in transaction: YapDatabaseReadTransaction) -> Int? { public func getUserCount(for publicChat: LokiPublicChat, in transaction: YapDatabaseReadTransaction) -> Int? {
return transaction.object(forKey: publicChat.id, inCollection: OWSPrimaryStorage.openGroupUserCountCollection) as? Int return transaction.object(forKey: publicChat.id, inCollection: Storage.openGroupUserCountCollection) as? Int
} }
public func setUserCount(_ userCount: Int, forPublicChatWithID publicChatID: String, in transaction: YapDatabaseReadWriteTransaction) { public func setUserCount(_ userCount: Int, forPublicChatWithID publicChatID: String, in transaction: YapDatabaseReadWriteTransaction) {
transaction.setObject(userCount, forKey: publicChatID, inCollection: OWSPrimaryStorage.openGroupUserCountCollection) transaction.setObject(userCount, forKey: publicChatID, inCollection: Storage.openGroupUserCountCollection)
} }
} }

@ -62,12 +62,9 @@ public final class MultiDeviceProtocol : NSObject {
} else if let thread = TSContactThread.getWithContactId(destination.hexEncodedPublicKey, transaction: transaction) { } else if let thread = TSContactThread.getWithContactId(destination.hexEncodedPublicKey, transaction: transaction) {
threadPromiseSeal.fulfill(thread) threadPromiseSeal.fulfill(thread)
} else { } else {
// Dispatch async on the main queue to avoid nested write transactions Storage.write { transaction in
DispatchQueue.main.async { let thread = TSContactThread.getOrCreateThread(withContactId: destination.hexEncodedPublicKey, transaction: transaction)
storage.dbReadWriteConnection.readWrite { transaction in threadPromiseSeal.fulfill(thread)
let thread = TSContactThread.getOrCreateThread(withContactId: destination.hexEncodedPublicKey, transaction: transaction)
threadPromiseSeal.fulfill(thread)
}
} }
} }
return threadPromise.then(on: OWSDispatch.sendingQueue()) { thread -> Promise<Void> in return threadPromise.then(on: OWSDispatch.sendingQueue()) { thread -> Promise<Void> in
@ -81,13 +78,10 @@ public final class MultiDeviceProtocol : NSObject {
let messageSendCopy = copy(messageSend, for: destination, with: seal) let messageSendCopy = copy(messageSend, for: destination, with: seal)
messageSender.sendMessage(messageSendCopy) messageSender.sendMessage(messageSendCopy)
} else { } else {
// Dispatch async on the main queue to avoid nested write transactions Storage.write { transaction in
DispatchQueue.main.async { getAutoGeneratedMultiDeviceFRMessageSend(for: destination.hexEncodedPublicKey, in: transaction, seal: seal)
storage.dbReadWriteConnection.readWrite { transaction in .done(on: OWSDispatch.sendingQueue()) { autoGeneratedFRMessageSend in
getAutoGeneratedMultiDeviceFRMessageSend(for: destination.hexEncodedPublicKey, in: transaction, seal: seal) messageSender.sendMessage(autoGeneratedFRMessageSend)
.done(on: OWSDispatch.sendingQueue()) { autoGeneratedFRMessageSend in
messageSender.sendMessage(autoGeneratedFRMessageSend)
}
} }
} }
} }

@ -141,14 +141,12 @@ public final class SessionManagementProtocol : NSObject {
@objc(repairSessionIfNeededForMessage:to:) @objc(repairSessionIfNeededForMessage:to:)
public static func repairSessionIfNeeded(for message: TSOutgoingMessage, to hexEncodedPublicKey: String) { public static func repairSessionIfNeeded(for message: TSOutgoingMessage, to hexEncodedPublicKey: String) {
guard (message.thread as? TSGroupThread)?.groupModel.groupType == .closedGroup else { return } guard (message.thread as? TSGroupThread)?.groupModel.groupType == .closedGroup else { return }
DispatchQueue.main.async { Storage.write { transaction in
storage.dbReadWriteConnection.readWrite { transaction in let thread = TSContactThread.getOrCreateThread(withContactId: hexEncodedPublicKey, transaction: transaction)
let thread = TSContactThread.getOrCreateThread(withContactId: hexEncodedPublicKey, transaction: transaction) let sessionRequestMessage = SessionRequestMessage(thread: thread)
let sessionRequestMessage = SessionRequestMessage(thread: thread) storage.setSessionRequestTimestamp(for: hexEncodedPublicKey, to: Date(), in: transaction)
storage.setSessionRequestTimestamp(for: hexEncodedPublicKey, to: Date(), in: transaction) let messageSenderJobQueue = SSKEnvironment.shared.messageSenderJobQueue
let messageSenderJobQueue = SSKEnvironment.shared.messageSenderJobQueue messageSenderJobQueue.add(message: sessionRequestMessage, transaction: transaction)
messageSenderJobQueue.add(message: sessionRequestMessage, transaction: transaction)
}
} }
} }
@ -205,7 +203,7 @@ public final class SessionManagementProtocol : NSObject {
closedGroupMembers.formUnion(group.groupModel.groupMemberIds) closedGroupMembers.formUnion(group.groupModel.groupMemberIds)
} }
LokiFileServerAPI.getDeviceLinks(associatedWith: closedGroupMembers).ensure { LokiFileServerAPI.getDeviceLinks(associatedWith: closedGroupMembers).ensure {
storage.dbReadWriteConnection.readWrite { transaction in Storage.write { transaction in
let validHEPKs = closedGroupMembers.flatMap { let validHEPKs = closedGroupMembers.flatMap {
LokiDatabaseUtilities.getLinkedDeviceHexEncodedPublicKeys(for: $0, in: transaction) LokiDatabaseUtilities.getLinkedDeviceHexEncodedPublicKeys(for: $0, in: transaction)
} }

Loading…
Cancel
Save