Started trying to sync new groups between devices

Added a work around for getting an SQLite Busy exception on startup (should only be caused by edge-cases, unsure what the impact of the workaround will be if the db is actually busy)
Updated the auth to work for updated groups
Cleaned up group creation to seemingly work properly (not syncing for some reason)
pull/941/head
Morgan Pretty 2 years ago
parent c77d7ecda1
commit 5d9a2335ba

@ -1 +1 @@
Subproject commit 9b0cdcdc0cfc788b4e46c1b337ceddfbc1deee0f
Subproject commit 194f972d161a57dae07430f92eac44e95c208c84

@ -614,7 +614,7 @@ final class ConversationVC: BaseVC, SessionUtilRespondingViewController, Convers
!SessionUtil.conversationInConfig(
threadId: threadId,
threadVariant: viewModel.threadData.threadVariant,
visibleOnly: true
visibleOnly: false
)
{
Dependencies()[singleton: .storage].writeAsync { db in

@ -20,8 +20,7 @@ enum _017_GroupsRebuildChanges: Migration {
.notNull()
.defaults(to: 0)
t.add(.groupIdentityPrivateKey, .blob)
t.add(.tag, .blob)
t.add(.subkey, .blob)
t.add(.authData, .blob)
t.add(.approved, .boolean)
.notNull()
.defaults(to: true)

@ -28,17 +28,10 @@ public struct ClosedGroup: Codable, Identifiable, FetchableRecord, PersistableRe
case lastDisplayPictureUpdate
case groupIdentityPrivateKey
case tag
case subkey
case authData
case approved
}
/// The Group public key takes up 32 bytes
static let pubkeyByteLength: Int = 32
/// The Group secret key takes up 32 bytes
static let secretKeyByteLength: Int = 32
public var id: String { threadId } // Identifiable
public var publicKey: String { threadId }
@ -64,15 +57,10 @@ public struct ClosedGroup: Codable, Identifiable, FetchableRecord, PersistableRe
/// The private key for performing admin actions on this group
public let groupIdentityPrivateKey: Data?
/// The unique tag for the user within the group
///
/// **Note:** This will be `null` if the `groupIdentityPrivateKey` is set
public let tag: Data?
/// The unique subkey for the user within the group
/// The unique authData for the current user within the group
///
/// **Note:** This will be `null` if the `groupIdentityPrivateKey` is set
public let subkey: Data?
public let authData: Data?
/// A flag indicating whether the user has approved the group invitation
public let approved: Bool
@ -122,8 +110,7 @@ public struct ClosedGroup: Codable, Identifiable, FetchableRecord, PersistableRe
displayPictureEncryptionKey: Data? = nil,
lastDisplayPictureUpdate: TimeInterval = 0,
groupIdentityPrivateKey: Data? = nil,
tag: Data? = nil,
subkey: Data? = nil,
authData: Data? = nil,
approved: Bool
) {
self.threadId = threadId
@ -134,8 +121,7 @@ public struct ClosedGroup: Codable, Identifiable, FetchableRecord, PersistableRe
self.displayPictureEncryptionKey = displayPictureEncryptionKey
self.lastDisplayPictureUpdate = lastDisplayPictureUpdate
self.groupIdentityPrivateKey = groupIdentityPrivateKey
self.tag = tag
self.subkey = subkey
self.authData = authData
self.approved = approved
}
}
@ -172,6 +158,27 @@ public extension ClosedGroup {
case forced
}
/// The Group public key takes up 32 bytes
static func pubKeyByteLength(for variant: SessionThread.Variant) -> Int {
return 32
}
/// The Group secret key size differs between legacy and updated groups
static func secretKeyByteLength(for variant: SessionThread.Variant) -> Int {
switch variant {
case .group: return 64
default: return 32
}
}
/// The Group authData size differs between legacy and updated groups
static func authDataByteLength(for variant: SessionThread.Variant) -> Int {
switch variant {
case .group: return 100
default: return 0
}
}
static func removeKeysAndUnsubscribe(
_ db: Database? = nil,
threadId: String,
@ -271,6 +278,17 @@ public extension ClosedGroup {
.map { $0.id },
using: dependencies
)
// Remove the group config states
threadVariants
.filter { $0.variant == .group }
.forEach { threadIdVariant in
SessionUtil.removeGroupStateIfNeeded(
db,
groupIdentityPublicKey: threadIdVariant.id,
using: dependencies
)
}
}
}
}

@ -57,9 +57,12 @@ public struct ConfigDump: Codable, Equatable, Hashable, FetchableRecord, Persist
// MARK: - Convenience
public extension ConfigDump.Variant {
static let userVariants: [ConfigDump.Variant] = [
static let userVariants: Set<ConfigDump.Variant> = [
.userProfile, .contacts, .convoInfoVolatile, .userGroups
]
static let groupVariants: Set<ConfigDump.Variant> = [
.groupInfo, .groupMembers, .groupKeys
]
var configMessageKind: SharedConfigMessage.Kind {
switch self {
@ -87,6 +90,15 @@ public extension ConfigDump.Variant {
}
}
/// This value defines the order that the ConfigDump records should be loaded in, we need to load the `groupKeys`
/// config _after_ the `groupInfo` and `groupMembers` configs as it requires those to be passed as arguments
var loadOrder: Int {
switch self {
case .groupKeys: return 1
default: return 0
}
}
/// This value defines the order that the SharedConfigMessages should be processed in, while we re-process config
/// messages every time we poll this will prevent an edge-case where data/logic between different config messages
/// could be dependant on each other (eg. there could be `convoInfoVolatile` data related to a new conversation

@ -84,8 +84,8 @@ public enum ConfigurationSyncJob: JobExecutor {
SNLog("[ConfigurationSyncJob] For \(publicKey) started with \(pendingConfigChanges.count) change\(pendingConfigChanges.count == 1 ? "" : "s")")
dependencies[singleton: .storage]
.readPublisher { db in
try pendingConfigChanges.map { change -> MessageSender.PreparedSendData in
.readPublisher { db -> (keyPair: KeyPair, changes: [MessageSender.PreparedSendData]) in
let changes: [MessageSender.PreparedSendData] = try pendingConfigChanges.map { change -> MessageSender.PreparedSendData in
try MessageSender.preparedSendData(
db,
message: change.message,
@ -94,8 +94,39 @@ public enum ConfigurationSyncJob: JobExecutor {
interactionId: nil
)
}
switch destination {
case .contact:
return (
(
try Identity.fetchUserEd25519KeyPair(db, using: dependencies) ??
{ throw SnodeAPIError.noKeyPair }()
),
changes
)
case .closedGroup(let groupPublicKey):
// Only admins can update the group config messages
let keyPair: KeyPair = try {
guard
let group: ClosedGroup = try ClosedGroup.fetchOne(db, id: groupPublicKey),
let adminKey: Data = group.groupIdentityPrivateKey
else {
throw MessageSenderError.invalidClosedGroupUpdate
}
return KeyPair(
publicKey: Array(Data(hex: groupPublicKey).removingIdPrefixIfNeeded()),
secretKey: Array(adminKey)
)
}()
return (keyPair, changes)
default: throw HTTPError.invalidPreparedRequest
}
}
.flatMap { (changes: [MessageSender.PreparedSendData]) -> AnyPublisher<(ResponseInfoType, HTTP.BatchResponse), Error> in
.flatMap { (keyPair: KeyPair, changes: [MessageSender.PreparedSendData]) -> AnyPublisher<(ResponseInfoType, HTTP.BatchResponse), Error> in
SnodeAPI
.sendConfigMessages(
changes.compactMap { change in
@ -106,6 +137,7 @@ public enum ConfigurationSyncJob: JobExecutor {
return (snodeMessage, namespace)
},
signedWith: keyPair,
allObsoleteHashes: Array(allObsoleteHashes),
using: dependencies
)

@ -22,9 +22,8 @@ extension MessageReceiver {
_ db: Database,
groupIdentityPublicKey: String,
groupIdentityPrivateKey: Data?,
name: String,
tag: Data?,
subkey: Data?,
name: String?,
authData: Data?,
created: Int64,
approved: Bool,
calledFromConfigHandling: Bool,
@ -36,23 +35,34 @@ extension MessageReceiver {
.fetchOrCreate(db, id: groupIdentityPublicKey, variant: .group, shouldBeVisible: true)
let closedGroup: ClosedGroup = try ClosedGroup(
threadId: groupIdentityPublicKey,
name: name,
name: (name ?? "GROUP_TITLE_FALLBACK".localized()),
formationTimestamp: TimeInterval(created),
groupIdentityPrivateKey: groupIdentityPrivateKey,
tag: tag,
subkey: subkey,
authData: authData,
approved: approved
).saved(db)
if !calledFromConfigHandling {
// Update libSession
try? SessionUtil.add(
db,
groupIdentityPublicKey: groupIdentityPublicKey,
groupIdentityPrivateKey: groupIdentityPrivateKey,
name: name,
authData: authData,
joinedAt: created,
using: dependencies
)
}
// Only start polling and subscribe for PNs if the user has approved the group
guard approved else { return }
// Start polling
ClosedGroupPoller.shared.startIfNeeded(for: groupIdentityPublicKey, using: dependencies)
dependencies[singleton: .closedGroupPoller].startIfNeeded(for: groupIdentityPublicKey, using: dependencies)
// Resubscribe for group push notifications
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db)
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, using: dependencies)
}
}

@ -9,11 +9,11 @@ import SessionSnodeKit
extension MessageSender {
private typealias PreparedGroupData = (
groupState: [ConfigDump.Variant: SessionUtil.Config],
thread: SessionThread,
group: ClosedGroup,
members: [GroupMember],
preparedNotificationsSubscription: HTTP.PreparedRequest<PushNotificationAPI.SubscribeResponse>?,
currentUserPublicKey: String
preparedNotificationsSubscription: HTTP.PreparedRequest<PushNotificationAPI.SubscribeResponse>?
)
public static func createGroup(
name: String,
@ -21,7 +21,7 @@ extension MessageSender {
members: [(String, Profile?)],
using dependencies: Dependencies = Dependencies()
) -> AnyPublisher<SessionThread, Error> {
Just(())
return Just(())
.setFailureType(to: Error.self)
.flatMap { _ -> AnyPublisher<(url: String, filename: String, encryptionKey: Data)?, Error> in
guard let displayPicture: SignalAttachment = displayPicture else {
@ -34,12 +34,12 @@ extension MessageSender {
.setFailureType(to: Error.self)
.eraseToAnyPublisher()
}
.map { displayPictureInfo -> PreparedGroupData? in
dependencies[singleton: .storage].write(using: dependencies) { db -> PreparedGroupData in
.flatMap { displayPictureInfo -> AnyPublisher<PreparedGroupData, Error> in
dependencies[singleton: .storage].writePublisher(using: dependencies) { db -> PreparedGroupData in
// Create and cache the libSession entries
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, using: dependencies)
let currentUserProfile: Profile = Profile.fetchOrCreateCurrentUser(db, using: dependencies)
let groupData: (identityKeyPair: KeyPair, group: ClosedGroup, members: [GroupMember]) = try SessionUtil.createGroup(
let createdInfo: SessionUtil.CreatedGroupInfo = try SessionUtil.createGroup(
db,
name: name,
displayPictureUrl: displayPictureInfo?.url,
@ -49,58 +49,90 @@ extension MessageSender {
admins: [(currentUserPublicKey, currentUserProfile)],
using: dependencies
)
let preparedNotificationSubscription = try? PushNotificationAPI
.preparedSubscribe(
publicKey: groupData.group.id,
subkey: nil,
ed25519KeyPair: groupData.identityKeyPair,
using: dependencies
)
// Save the relevant objects to the database
let thread: SessionThread = try SessionThread
.fetchOrCreate(
db,
id: groupData.group.id,
id: createdInfo.group.id,
variant: .group,
shouldBeVisible: true,
using: dependencies
)
try groupData.group.insert(db)
try groupData.members.forEach { try $0.insert(db) }
try createdInfo.group.insert(db)
try createdInfo.members.forEach { try $0.insert(db) }
// Prepare the notification subscription
let preparedNotificationSubscription = try? PushNotificationAPI
.preparedSubscribe(
publicKey: createdInfo.group.id,
subkey: nil,
ed25519KeyPair: createdInfo.identityKeyPair,
using: dependencies
)
return (
createdInfo.groupState,
thread,
groupData.group,
groupData.members,
preparedNotificationSubscription,
currentUserPublicKey
createdInfo.group,
createdInfo.members,
preparedNotificationSubscription
)
}
}
.tryFlatMap { maybePreparedData -> AnyPublisher<PreparedGroupData, Error> in
guard let preparedData: PreparedGroupData = maybePreparedData else {
throw StorageError.failedToSave
}
return ConfigurationSyncJob
.run(publicKey: preparedData.group.id, using: dependencies)
.map { _ in preparedData }
.flatMap { preparedGroupData -> AnyPublisher<PreparedGroupData, Error> in
ConfigurationSyncJob
.run(publicKey: preparedGroupData.group.id, using: dependencies)
.flatMap { _ in
dependencies[singleton: .storage].writePublisher(using: dependencies) { db in
// Save the successfully created group and add to the user config
try SessionUtil.saveCreatedGroup(
db,
group: preparedGroupData.group,
groupState: preparedGroupData.groupState,
using: dependencies
)
return preparedGroupData
}
}
.handleEvents(
receiveCompletion: { result in
switch result {
case .finished: break
case .failure:
// Remove the config and database states
dependencies[singleton: .storage].writeAsync(using: dependencies) { db in
SessionUtil.removeGroupStateIfNeeded(
db,
groupIdentityPublicKey: preparedGroupData.group.id,
using: dependencies
)
_ = try? preparedGroupData.thread.delete(db)
_ = try? preparedGroupData.group.delete(db)
try? preparedGroupData.members.forEach { try $0.delete(db) }
}
}
}
)
.eraseToAnyPublisher()
}
.handleEvents(
receiveOutput: { _, group, members, preparedNotificationSubscription, currentUserPublicKey in
receiveOutput: { _, thread, _, members, preparedNotificationSubscription in
// Start polling
dependencies[singleton: .closedGroupPoller].startIfNeeded(for: group.id, using: dependencies)
dependencies[singleton: .closedGroupPoller].startIfNeeded(for: thread.id, using: dependencies)
// Subscribe for push notifications (if PNs are enabled)
preparedNotificationSubscription?
.send(using: dependencies)
.subscribe(on: DispatchQueue.global(qos: .userInitiated))
.subscribe(on: DispatchQueue.global(qos: .userInitiated), using: dependencies)
.sinkUntilComplete()
// Save jobs for sending group member invitations
dependencies[singleton: .storage].write(using: dependencies) { db in
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, using: dependencies)
members
.filter { $0.profileId != currentUserPublicKey }
.forEach { member in
@ -108,6 +140,7 @@ extension MessageSender {
db,
job: Job(
variant: .groupInviteMemberJob,
threadId: thread.id,
details: GroupInviteMemberJob.Details(
memberSubkey: Data(),
memberTag: Data()
@ -124,7 +157,7 @@ extension MessageSender {
}
}
)
.map { thread, _, _, _, _ in thread }
.map { _, thread, _, _, _ in thread }
.eraseToAnyPublisher()
}
}

@ -140,48 +140,38 @@ internal extension SessionUtil {
.fetchOne(db)
let threadExists: Bool = (threadInfo != nil)
let updatedShouldBeVisible: Bool = SessionUtil.shouldBeVisible(priority: data.priority)
switch (updatedShouldBeVisible, threadExists) {
case (false, true):
SessionUtil.kickFromConversationUIIfNeeded(removedThreadIds: [sessionId])
try SessionThread
.deleteOrLeave(
db,
threadId: sessionId,
threadVariant: .contact,
groupLeaveType: .forced,
calledFromConfigHandling: true,
using: dependencies
)
case (true, false):
try SessionThread(
id: sessionId,
variant: .contact,
creationDateTimestamp: data.created,
shouldBeVisible: true,
pinnedPriority: data.priority
).save(db)
case (true, true):
let changes: [ConfigColumnAssignment] = [
(threadInfo?.shouldBeVisible == updatedShouldBeVisible ? nil :
SessionThread.Columns.shouldBeVisible.set(to: updatedShouldBeVisible)
),
(threadInfo?.pinnedPriority == data.priority ? nil :
SessionThread.Columns.pinnedPriority.set(to: data.priority)
)
].compactMap { $0 }
try SessionThread
.filter(id: sessionId)
.updateAll( // Handling a config update so don't use `updateAllAndConfig`
db,
changes
)
case (false, false): break
/// If we are hiding the conversation then kick the user from it if it's currently open
if !updatedShouldBeVisible {
SessionUtil.kickFromConversationUIIfNeeded(removedThreadIds: [sessionId])
}
/// Create the thread if it doesn't exist, otherwise just update it's state
if !threadExists {
try SessionThread(
id: sessionId,
variant: .contact,
creationDateTimestamp: data.created,
shouldBeVisible: updatedShouldBeVisible,
pinnedPriority: data.priority
).save(db)
}
else {
let changes: [ConfigColumnAssignment] = [
(threadInfo?.shouldBeVisible == updatedShouldBeVisible ? nil :
SessionThread.Columns.shouldBeVisible.set(to: updatedShouldBeVisible)
),
(threadInfo?.pinnedPriority == data.priority ? nil :
SessionThread.Columns.pinnedPriority.set(to: data.priority)
)
].compactMap { $0 }
try SessionThread
.filter(id: sessionId)
.updateAll( // Handling a config update so don't use `updateAllAndConfig`
db,
changes
)
}
// Update disappearing messages configuration if needed

@ -10,6 +10,13 @@ import SessionUtilitiesKit
// MARK: - Convenience
internal extension SessionUtil {
typealias CreatedGroupInfo = (
identityKeyPair: KeyPair,
groupState: [ConfigDump.Variant: Config],
group: ClosedGroup,
members: [GroupMember]
)
static func createGroup(
_ db: Database,
name: String,
@ -19,7 +26,7 @@ internal extension SessionUtil {
members: [(id: String, profile: Profile?)],
admins: [(id: String, profile: Profile?)],
using dependencies: Dependencies
) throws -> (identityKeyPair: KeyPair, group: ClosedGroup, members: [GroupMember]) {
) throws -> CreatedGroupInfo {
guard
let groupIdentityKeyPair: KeyPair = dependencies[singleton: .crypto].generate(.ed25519KeyPair()),
let userED25519KeyPair: KeyPair = Identity.fetchUserEd25519KeyPair(db, using: dependencies)
@ -125,42 +132,22 @@ internal extension SessionUtil {
groups_members_set(membersConf, &member)
}
}
// Load them into memory
// Define the config state map and load it into memory
let groupState: [ConfigDump.Variant: Config] = [
.groupKeys: .groupKeys(keysConf, info: infoConf, members: membersConf),
.groupInfo: .object(infoConf),
.groupMembers: .object(membersConf),
]
dependencies.mutate(cache: .sessionUtil) { cache in
groupState.forEach { variant, config in
cache.setConfig(for: variant, publicKey: groupId.hexString, to: config)
}
}
// Create and save dumps for the configs
try groupState.forEach { variant, config in
try SessionUtil.createDump(
config: config,
for: variant,
publicKey: groupId.hexString,
timestampMs: Int64(floor(creationTimestamp * 1000))
)?.save(db)
}
// Add the new group to the USER_GROUPS config message
try SessionUtil.add(
db,
groupIdentityPublicKey: groupId.hexString,
groupIdentityPrivateKey: Data(groupIdentityPrivateKey),
name: name,
tag: nil,
subkey: nil,
joinedAt: Int64(floor(creationTimestamp)),
using: dependencies
)
return (
groupIdentityKeyPair,
groupState,
ClosedGroup(
threadId: groupId.hexString,
name: name,
@ -183,6 +170,50 @@ internal extension SessionUtil {
)
}
static func removeGroupStateIfNeeded(
_ db: Database,
groupIdentityPublicKey: String,
using dependencies: Dependencies
) {
dependencies.mutate(cache: .sessionUtil) { cache in
cache.setConfig(for: .groupKeys, publicKey: groupIdentityPublicKey, to: nil)
cache.setConfig(for: .groupInfo, publicKey: groupIdentityPublicKey, to: nil)
cache.setConfig(for: .groupMembers, publicKey: groupIdentityPublicKey, to: nil)
}
_ = try? ConfigDump
.filter(ConfigDump.Columns.publicKey == groupIdentityPublicKey)
.deleteAll(db)
}
static func saveCreatedGroup(
_ db: Database,
group: ClosedGroup,
groupState: [ConfigDump.Variant: Config],
using dependencies: Dependencies
) throws {
// Create and save dumps for the configs
try groupState.forEach { variant, config in
try SessionUtil.createDump(
config: config,
for: variant,
publicKey: group.id,
timestampMs: Int64(floor(group.formationTimestamp * 1000))
)?.save(db)
}
// Add the new group to the USER_GROUPS config message
try SessionUtil.add(
db,
groupIdentityPublicKey: group.id,
groupIdentityPrivateKey: group.groupIdentityPrivateKey,
name: group.name,
authData: group.authData,
joinedAt: Int64(floor(group.formationTimestamp)),
using: dependencies
)
}
@discardableResult static func addGroup(
_ db: Database,
groupIdentityPublicKey: [UInt8],

@ -40,6 +40,7 @@ internal extension SessionUtil {
var groups: [GroupInfo] = []
var community: ugroups_community_info = ugroups_community_info()
var legacyGroup: ugroups_legacy_group_info = ugroups_legacy_group_info()
var group: ugroups_group_info = ugroups_group_info()
let groupsIterator: OpaquePointer = user_groups_iterator_new(conf)
while !user_groups_iterator_done(groupsIterator) {
@ -76,13 +77,13 @@ internal extension SessionUtil {
threadId: groupId,
publicKey: Data(
libSessionVal: legacyGroup.enc_pubkey,
count: ClosedGroup.pubkeyByteLength
count: ClosedGroup.pubKeyByteLength(for: .legacyGroup)
),
secretKey: Data(
libSessionVal: legacyGroup.enc_seckey,
count: ClosedGroup.secretKeyByteLength
count: ClosedGroup.secretKeyByteLength(for: .legacyGroup)
),
receivedTimestamp: (TimeInterval(SnodeAPI.currentOffsetTimestampMs()) / 1000)
receivedTimestamp: (TimeInterval(SnodeAPI.currentOffsetTimestampMs(using: dependencies)) / 1000)
),
disappearingConfig: DisappearingMessagesConfiguration
.defaultWith(groupId)
@ -116,6 +117,31 @@ internal extension SessionUtil {
)
)
}
else if user_groups_it_is_group(groupsIterator, &group) {
let groupId: String = String(libSessionVal: group.id)
groups.append(
GroupInfo(
groupIdentityPublicKey: groupId,
groupIdentityPrivateKey: (!group.have_secretkey ? nil :
Data(
libSessionVal: group.secretkey,
count: ClosedGroup.secretKeyByteLength(for: .group),
nullIfEmpty: true
)
),
authData: (!group.have_auth_data ? nil :
Data(
libSessionVal: group.auth_data,
count: ClosedGroup.authDataByteLength(for: .group),
nullIfEmpty: true
)
),
priority: group.priority,
joinedAt: group.joined_at
)
)
}
else {
SNLog("Ignoring unknown conversation type when iterating through volatile conversation info update")
}
@ -388,6 +414,89 @@ internal extension SessionUtil {
// MARK: -- Handle Group Changes
let existingGroupIds: Set<String> = Set(existingThreadInfo
.filter { $0.value.variant == .group }
.keys)
let existingGroups: [String: ClosedGroup] = (try? ClosedGroup
.fetchAll(db, ids: existingGroupIds))
.defaulting(to: [])
.reduce(into: [:]) { result, next in result[next.id] = next }
try groups.forEach { group in
guard
let name: String = group.name,
let joinedAt: Int64 = group.joinedAt
else { return }
if !existingGroupIds.contains(group.groupIdentityPublicKey) {
// Add a new group if it doesn't already exist
try MessageReceiver.handleNewGroup(
db,
groupIdentityPublicKey: group.groupIdentityPublicKey,
groupIdentityPrivateKey: group.groupIdentityPrivateKey,
name: name,
authData: group.authData,
created: Int64((group.joinedAt ?? (latestConfigSentTimestampMs / 1000))),
approved: true,// TODO: What to do here???? <#T##Bool#>,
calledFromConfigHandling: true,
using: dependencies
)
}
else {
// Otherwise update the existing group
let groupChanges: [ConfigColumnAssignment] = [
(existingGroups[group.groupIdentityPublicKey]?.name == name ? nil :
ClosedGroup.Columns.name.set(to: name)
),
(existingGroups[group.groupIdentityPublicKey]?.formationTimestamp == TimeInterval(joinedAt) ? nil :
ClosedGroup.Columns.formationTimestamp.set(to: TimeInterval(joinedAt))
),
(existingGroups[group.groupIdentityPublicKey]?.authData == group.authData ? nil :
ClosedGroup.Columns.authData.set(to: group.authData)
),
(existingGroups[group.groupIdentityPublicKey]?.groupIdentityPrivateKey == group.groupIdentityPrivateKey ? nil :
ClosedGroup.Columns.groupIdentityPrivateKey.set(to: group.groupIdentityPrivateKey)
)
].compactMap { $0 }
// Apply any group changes
if !groupChanges.isEmpty {
_ = try? ClosedGroup
.filter(id: group.groupIdentityPublicKey)
.updateAll( // Handling a config update so don't use `updateAllAndConfig`
db,
groupChanges
)
}
}
// Make any thread-specific changes if needed
if existingThreadInfo[group.groupIdentityPublicKey]?.pinnedPriority != group.priority {
_ = try? SessionThread
.filter(id: group.groupIdentityPublicKey)
.updateAll( // Handling a config update so don't use `updateAllAndConfig`
db,
SessionThread.Columns.pinnedPriority.set(to: group.priority)
)
}
}
// Remove any legacy groups which are no longer in the config
let groupIdsToRemove: Set<String> = existingGroupIds
.subtracting(legacyGroups.map { $0.id })
if !groupIdsToRemove.isEmpty {
SessionUtil.kickFromConversationUIIfNeeded(removedThreadIds: Array(groupIdsToRemove))
try SessionThread
.deleteOrLeave(
db,
threadIds: Array(groupIdsToRemove),
threadVariant: .group,
groupLeaveType: .forced,
calledFromConfigHandling: true
)
}
}
fileprivate static func memberInfo(in legacyGroup: UnsafeMutablePointer<ugroups_legacy_group_info>) -> [String: Bool] {
@ -523,6 +632,41 @@ internal extension SessionUtil {
guard case .object(let conf) = config else { throw SessionUtilError.invalidConfigObject }
guard !groups.isEmpty else { return }
try groups
.forEach { group in
var cGroupId: [CChar] = group.groupIdentityPublicKey.cArray.nullTerminated()
var userGroup: ugroups_group_info = ugroups_group_info()
guard user_groups_get_or_construct_group(conf, &userGroup, &cGroupId) else {
/// It looks like there are some situations where this object might not get created correctly (and
/// will throw due to the implicit unwrapping) as a result we put it in a guard and throw instead
SNLog("Unable to upsert group conversation to SessionUtil: \(config.lastError)")
throw SessionUtilError.getOrConstructFailedUnexpectedly
}
/// Assign the non-admin auth data (if it exists)
if let authData: Data = group.authData {
userGroup.auth_data = authData.toLibSession()
userGroup.have_auth_data = true
}
/// Assign the admin key (if it exists)
///
/// **Note:** We do this after assigning the `auth_data` as generally the values are mutually
/// exclusive and if we have a `groupIdentityPrivateKey` we want that to take priority
if let privateKey: Data = group.groupIdentityPrivateKey {
userGroup.secretkey = privateKey.toLibSession()
userGroup.have_secretkey = true
// Store the updated group (needs to happen before variables go out of scope)
user_groups_set_group(conf, &userGroup)
}
// Store the updated group (can't be sure if we made any changes above)
userGroup.joined_at = (group.joinedAt ?? userGroup.joined_at)
userGroup.priority = (group.priority ?? userGroup.priority)
user_groups_set_group(conf, &userGroup)
}
}
static func upsert(
@ -799,9 +943,8 @@ public extension SessionUtil {
_ db: Database,
groupIdentityPublicKey: String,
groupIdentityPrivateKey: Data?,
name: String,
tag: Data?,
subkey: Data?,
name: String?,
authData: Data?,
joinedAt: Int64,
using dependencies: Dependencies
) throws {
@ -811,7 +954,18 @@ public extension SessionUtil {
publicKey: getUserHexEncodedPublicKey(db, using: dependencies),
using: dependencies
) { config in
guard case .object(let conf) = config else { throw SessionUtilError.invalidConfigObject }
try SessionUtil.upsert(
groups: [
GroupInfo(
groupIdentityPublicKey: groupIdentityPublicKey,
groupIdentityPrivateKey: groupIdentityPrivateKey,
name: name,
authData: authData,
joinedAt: joinedAt
)
],
in: config
)
}
}
@ -820,8 +974,7 @@ public extension SessionUtil {
groupIdentityPublicKey: String,
groupIdentityPrivateKey: Data? = nil,
name: String? = nil,
tag: Data? = nil,
subkey: Data? = nil,
authData: Data? = nil,
using dependencies: Dependencies
) throws {
try SessionUtil.performAndPushChange(
@ -836,8 +989,7 @@ public extension SessionUtil {
groupIdentityPublicKey: groupIdentityPublicKey,
groupIdentityPrivateKey: groupIdentityPrivateKey,
name: name,
tag: tag,
subkey: subkey
authData: authData
)
],
in: config
@ -858,6 +1010,14 @@ public extension SessionUtil {
publicKey: getUserHexEncodedPublicKey(db, using: dependencies),
using: dependencies
) { config in
guard case .object(let conf) = config else { throw SessionUtilError.invalidConfigObject }
groupIds.forEach { threadId in
var cGroupId: [CChar] = threadId.cArray.nullTerminated()
// Don't care if the group doesn't exist
user_groups_erase_group(conf, &cGroupId)
}
}
// Remove the volatile info as well
@ -997,8 +1157,7 @@ extension SessionUtil {
let groupIdentityPublicKey: String
let groupIdentityPrivateKey: Data?
let name: String?
let tag: Data?
let subkey: Data?
let authData: Data?
let priority: Int32?
let joinedAt: Int64?
@ -1006,16 +1165,14 @@ extension SessionUtil {
groupIdentityPublicKey: String,
groupIdentityPrivateKey: Data? = nil,
name: String? = nil,
tag: Data? = nil,
subkey: Data? = nil,
authData: Data? = nil,
priority: Int32? = nil,
joinedAt: Int64? = nil
) {
self.groupIdentityPublicKey = groupIdentityPublicKey
self.groupIdentityPrivateKey = groupIdentityPrivateKey
self.name = name
self.tag = tag
self.subkey = subkey
self.authData = authData
self.priority = priority
self.joinedAt = joinedAt
}

@ -55,14 +55,12 @@ public enum SessionUtil {
// Retrieve the existing dumps from the database
let currentUserPublicKey: String = getUserHexEncodedPublicKey(db, using: dependencies)
let existingDumps: Set<ConfigDump> = ((try? ConfigDump.fetchSet(db)) ?? [])
.sorted { lhs, rhs in lhs.variant.processingOrder < rhs.variant.processingOrder }
.asSet()
let existingDumps: [ConfigDump] = ((try? ConfigDump.fetchSet(db)) ?? [])
.sorted { lhs, rhs in lhs.variant.loadOrder < rhs.variant.loadOrder }
let existingDumpVariants: Set<ConfigDump.Variant> = existingDumps
.map { $0.variant }
.asSet()
let missingRequiredVariants: Set<ConfigDump.Variant> = ConfigDump.Variant.userVariants
.asSet()
.subtracting(existingDumpVariants)
let groupsByKey: [String: Data] = (try? ClosedGroup
.filter(ids: existingDumps.map { $0.publicKey })
@ -70,7 +68,7 @@ public enum SessionUtil {
.reduce(into: [:]) { result, next in result[next.threadId] = next.groupIdentityPrivateKey })
.defaulting(to: [:])
// Create the 'config_object' records for each dump
// Create the config records for each dump
dependencies.mutate(cache: .sessionUtil) { cache in
existingDumps.forEach { dump in
cache.setConfig(
@ -280,24 +278,20 @@ public enum SessionUtil {
publicKey: String,
using dependencies: Dependencies
) throws -> [OutgoingConfResult] {
guard Identity.userExists(db) else { throw SessionUtilError.userDoesNotExist }
guard Identity.userExists(db, using: dependencies) else { throw SessionUtilError.userDoesNotExist }
let userPublicKey: String = getUserHexEncodedPublicKey(db)
var existingDumpVariants: Set<ConfigDump.Variant> = try ConfigDump
.select(.variant)
.filter(ConfigDump.Columns.publicKey == publicKey)
.asRequest(of: ConfigDump.Variant.self)
.fetchSet(db)
// Ensure we always check the required user config types for changes even if there is no dump
// data yet (to deal with first launch cases)
if publicKey == userPublicKey {
ConfigDump.Variant.userVariants.forEach { existingDumpVariants.insert($0) }
}
// Get a list of the different config variants for the provided publicKey
let currenUserPublicKey: String = getUserHexEncodedPublicKey(db, using: dependencies)
let targetVariants: Set<ConfigDump.Variant> = {
switch (publicKey, SessionId.Prefix(from: publicKey)) {
case (currenUserPublicKey, _): return ConfigDump.Variant.userVariants
case (_, .group): return ConfigDump.Variant.groupVariants
default: return []
}
}()
// Ensure we always check the required user config types for changes even if there is no dump
// data yet (to deal with first launch cases)
return try existingDumpVariants
// Extract any pending changes from the cached config entry for each variant
return try targetVariants
.compactMap { variant -> OutgoingConfResult? in
try dependencies[cache: .sessionUtil]
.config(for: variant, publicKey: publicKey)
@ -323,8 +317,12 @@ public enum SessionUtil {
result = "\(convo_info_volatile_size(conf)) volatile conversations"
case (_, .groupInfo): result = "1 group info"
case (.object(let conf), .groupMembers): result = ""
case (_, .groupKeys): result = ""
case (.object(let conf), .groupMembers):
result = "\(groups_members_size(conf)) group members"
case (.groupKeys(let conf, _, _), .groupKeys):
result = "\(groups_keys_size(conf)) group keys"
default: break
}
}
@ -415,6 +413,7 @@ public enum SessionUtil {
guard !publicKey.isEmpty else { throw MessageReceiverError.noThread }
let groupedMessages: [ConfigDump.Variant: [SharedConfigMessage]] = messages
.sorted { lhs, rhs in lhs.seqNo < rhs.seqNo }
.grouped(by: \.kind.configDumpVariant)
let needsPush: Bool = try groupedMessages
@ -601,7 +600,7 @@ public extension SessionUtil {
// MARK: - Functions
public func setConfig(for variant: ConfigDump.Variant, publicKey: String, to config: SessionUtil.Config?) {
configStore[Key(variant: variant, publicKey: publicKey)] = Atomic(config)
configStore[Key(variant: variant, publicKey: publicKey)] = config.map { Atomic($0) }
}
public func config(

@ -98,7 +98,7 @@ public extension SnodeReceivedMessageInfo {
return try SnodeReceivedMessageInfo
.select(Column.rowID)
.filter(SnodeReceivedMessageInfo.Columns.key == key(for: snode, publicKey: publicKey, namespace: namespace))
.filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= SnodeAPI.currentOffsetTimestampMs())
.filter(SnodeReceivedMessageInfo.Columns.expirationDateMs <= SnodeAPI.currentOffsetTimestampMs(using: dependencies))
.asRequest(of: Int64.self)
.fetchAll(db)
}

@ -18,7 +18,7 @@ extension SnodeAPI {
messageHashes: [String],
requireSuccessfulDeletion: Bool,
pubkey: String,
ed25519PublicKey: [UInt8],
ed25519PublicKey: [UInt8]?,
ed25519SecretKey: [UInt8]
) {
self.messageHashes = messageHashes

@ -18,7 +18,7 @@ extension SnodeAPI {
namespace: SnodeAPI.Namespace,
subkey: String?,
timestampMs: UInt64,
ed25519PublicKey: [UInt8],
ed25519PublicKey: [UInt8]?,
ed25519SecretKey: [UInt8]
) {
self.message = message

@ -13,7 +13,9 @@ public class SnodeAuthenticatedRequestBody: Encodable {
}
private let pubkey: String
private let ed25519PublicKey: [UInt8]
/// This value should only be provided if the `pubkey` value is an x25519 public key
private let ed25519PublicKey: [UInt8]?
internal let ed25519SecretKey: [UInt8]
private let subkey: String?
internal let timestampMs: UInt64?
@ -22,7 +24,7 @@ public class SnodeAuthenticatedRequestBody: Encodable {
public init(
pubkey: String,
ed25519PublicKey: [UInt8],
ed25519PublicKey: [UInt8]?,
ed25519SecretKey: [UInt8],
subkey: String? = nil,
timestampMs: UInt64? = nil
@ -44,7 +46,7 @@ public class SnodeAuthenticatedRequestBody: Encodable {
try container.encode(pubkey, forKey: .pubkey)
try container.encodeIfPresent(subkey, forKey: .subkey)
try container.encodeIfPresent(timestampMs, forKey: .timestampMs)
try container.encode(ed25519PublicKey.toHexString(), forKey: .ed25519PublicKey)
try container.encodeIfPresent(ed25519PublicKey?.toHexString(), forKey: .ed25519PublicKey)
try container.encode(signatureBase64, forKey: .signatureBase64)
}

@ -740,24 +740,19 @@ public final class SnodeAPI {
public static func sendConfigMessages(
_ messages: [(message: SnodeMessage, namespace: Namespace)],
signedWith ed25519KeyPair: KeyPair,
allObsoleteHashes: [String],
using dependencies: Dependencies = Dependencies()
) -> AnyPublisher<(ResponseInfoType, HTTP.BatchResponse), Error> {
guard
!messages.isEmpty,
let recipient: String = messages.first?.message.recipient
let recipient: String = messages.first?.message.recipient,
let recipientPrefix: SessionId.Prefix = SessionId.Prefix(from: recipient)
else {
return Fail(error: SnodeAPIError.generic)
.eraseToAnyPublisher()
}
// TODO: Need to get either the closed group subKey or the userEd25519 key for auth
guard let userED25519KeyPair = Identity.fetchUserEd25519KeyPair() else {
return Fail(error: SnodeAPIError.noKeyPair)
.eraseToAnyPublisher()
}
let userX25519PublicKey: String = getUserHexEncodedPublicKey(using: dependencies)
let publicKey: String = recipient
var requests: [SnodeAPI.BatchRequest.Info] = messages
.map { message, namespace in
// Check if this namespace requires authentication
@ -782,8 +777,8 @@ public final class SnodeAPI {
namespace: namespace,
subkey: nil, // TODO: Need to get this
timestampMs: UInt64(SnodeAPI.currentOffsetTimestampMs()),
ed25519PublicKey: userED25519KeyPair.publicKey,
ed25519SecretKey: userED25519KeyPair.secretKey
ed25519PublicKey: (recipientPrefix != .standard ? nil : ed25519KeyPair.publicKey),
ed25519SecretKey: ed25519KeyPair.secretKey
)
),
responseType: SendMessagesResponse.self
@ -799,9 +794,9 @@ public final class SnodeAPI {
body: DeleteMessagesRequest(
messageHashes: allObsoleteHashes,
requireSuccessfulDeletion: false,
pubkey: userX25519PublicKey,
ed25519PublicKey: userED25519KeyPair.publicKey,
ed25519SecretKey: userED25519KeyPair.secretKey
pubkey: recipient,
ed25519PublicKey: (recipientPrefix != .standard ? nil : ed25519KeyPair.publicKey),
ed25519SecretKey: ed25519KeyPair.secretKey
)
),
responseType: DeleteMessagesResponse.self
@ -811,7 +806,7 @@ public final class SnodeAPI {
let responseTypes = requests.map { $0.responseType }
return getSwarm(for: publicKey)
return getSwarm(for: recipient)
.tryFlatMapWithRandomSnode(retry: maxRetryCount) { snode -> AnyPublisher<(ResponseInfoType, HTTP.BatchResponse), Error> in
SnodeAPI
.send(
@ -820,7 +815,7 @@ public final class SnodeAPI {
body: BatchRequest(requests: requests)
),
to: snode,
associatedWith: publicKey,
associatedWith: recipient,
using: dependencies
)
.eraseToAnyPublisher()

@ -146,10 +146,32 @@ open class Storage {
// Create the DatabasePool to allow us to connect to the database and mark the storage as valid
do {
dbWriter = try DatabasePool(
path: "\(Storage.sharedDatabaseDirectoryPath)/\(Storage.dbFileName)",
configuration: config
)
do {
dbWriter = try DatabasePool(
path: "\(Storage.sharedDatabaseDirectoryPath)/\(Storage.dbFileName)",
configuration: config
)
}
catch {
switch error {
case DatabaseError.SQLITE_BUSY:
/// According to the docs in GRDB there are a few edge-cases where opening the database
/// can fail due to it reporting a "busy" state, by changing the behaviour from `immediateError`
/// to `timeout(1)` we give the database a 1 second grace period to deal with it's issues
/// and get back into a valid state - adding this helps the database resolve situations where it
/// can get confused due to crashing mid-transaction
config.busyMode = .timeout(1)
SNLog("[Database Warning] Database reported busy state during statup, adding grace period to allow startup to continue")
// Try to initialise the dbWriter again (hoping the above resolves the lock)
dbWriter = try DatabasePool(
path: "\(Storage.sharedDatabaseDirectoryPath)/\(Storage.dbFileName)",
configuration: config
)
default: throw error
}
}
isValid = true
Storage.internalHasCreatedValidInstance.mutate { $0 = true }
}

Loading…
Cancel
Save