Added an exponential back-off to polling open groups when they fail to poll

pull/612/head
Morgan Pretty 3 years ago
parent ae4999c3a7
commit c022f7cda2

@ -140,6 +140,9 @@ enum _001_InitialSetupMigration: Migration {
t.column(.sequenceNumber, .integer).notNull() t.column(.sequenceNumber, .integer).notNull()
t.column(.inboxLatestMessageId, .integer).notNull() t.column(.inboxLatestMessageId, .integer).notNull()
t.column(.outboxLatestMessageId, .integer).notNull() t.column(.outboxLatestMessageId, .integer).notNull()
t.column(.pollFailureCount, .integer)
.notNull()
.defaults(to: 0)
} }
/// Create a full-text search table synchronized with the OpenGroup table /// Create a full-text search table synchronized with the OpenGroup table

@ -26,6 +26,7 @@ public struct OpenGroup: Codable, Identifiable, FetchableRecord, PersistableReco
case sequenceNumber case sequenceNumber
case inboxLatestMessageId case inboxLatestMessageId
case outboxLatestMessageId case outboxLatestMessageId
case pollFailureCount
} }
public var id: String { threadId } // Identifiable public var id: String { threadId } // Identifiable
@ -86,6 +87,9 @@ public struct OpenGroup: Codable, Identifiable, FetchableRecord, PersistableReco
/// updated whenever this value changes) /// updated whenever this value changes)
public let outboxLatestMessageId: Int64 public let outboxLatestMessageId: Int64
/// The number of times this room has failed to poll since the last successful poll
public let pollFailureCount: Int64
// MARK: - Relationships // MARK: - Relationships
public var thread: QueryInterfaceRequest<SessionThread> { public var thread: QueryInterfaceRequest<SessionThread> {
@ -117,7 +121,8 @@ public struct OpenGroup: Codable, Identifiable, FetchableRecord, PersistableReco
infoUpdates: Int64, infoUpdates: Int64,
sequenceNumber: Int64 = 0, sequenceNumber: Int64 = 0,
inboxLatestMessageId: Int64 = 0, inboxLatestMessageId: Int64 = 0,
outboxLatestMessageId: Int64 = 0 outboxLatestMessageId: Int64 = 0,
pollFailureCount: Int64 = 0
) { ) {
self.threadId = OpenGroup.idFor(roomToken: roomToken, server: server) self.threadId = OpenGroup.idFor(roomToken: roomToken, server: server)
self.server = server.lowercased() self.server = server.lowercased()
@ -133,6 +138,7 @@ public struct OpenGroup: Codable, Identifiable, FetchableRecord, PersistableReco
self.sequenceNumber = sequenceNumber self.sequenceNumber = sequenceNumber
self.inboxLatestMessageId = inboxLatestMessageId self.inboxLatestMessageId = inboxLatestMessageId
self.outboxLatestMessageId = outboxLatestMessageId self.outboxLatestMessageId = outboxLatestMessageId
self.pollFailureCount = pollFailureCount
} }
} }
@ -159,7 +165,8 @@ public extension OpenGroup {
infoUpdates: 0, infoUpdates: 0,
sequenceNumber: 0, sequenceNumber: 0,
inboxLatestMessageId: 0, inboxLatestMessageId: 0,
outboxLatestMessageId: 0 outboxLatestMessageId: 0,
pollFailureCount: 0
) )
} }
@ -192,7 +199,8 @@ extension OpenGroup: CustomStringConvertible, CustomDebugStringConvertible {
"infoUpdates: \(infoUpdates)", "infoUpdates: \(infoUpdates)",
"sequenceNumber: \(sequenceNumber)", "sequenceNumber: \(sequenceNumber)",
"inboxLatestMessageId: \(inboxLatestMessageId)", "inboxLatestMessageId: \(inboxLatestMessageId)",
"outboxLatestMessageId: \(outboxLatestMessageId))" "outboxLatestMessageId: \(outboxLatestMessageId)",
"pollFailureCount: \(pollFailureCount))"
].joined(separator: ", ") ].joined(separator: ", ")
} }
} }

@ -15,7 +15,8 @@ extension OpenGroupAPI {
// MARK: - Settings // MARK: - Settings
private static let pollInterval: TimeInterval = 4 private static let minPollInterval: TimeInterval = 3
private static let maxPollInterval: Double = (60 * 60)
internal static let maxInactivityPeriod: Double = (14 * 24 * 60 * 60) internal static let maxInactivityPeriod: Double = (14 * 24 * 60 * 60)
// MARK: - Lifecycle // MARK: - Lifecycle
@ -28,10 +29,7 @@ extension OpenGroupAPI {
guard !hasStarted else { return } guard !hasStarted else { return }
hasStarted = true hasStarted = true
timer = Timer.scheduledTimerOnMainThread(withTimeInterval: Poller.pollInterval, repeats: true) { _ in pollRecursively(using: dependencies)
self.poll(using: dependencies).retainUntilComplete()
}
poll(using: dependencies).retainUntilComplete()
} }
@objc public func stop() { @objc public func stop() {
@ -41,6 +39,30 @@ extension OpenGroupAPI {
// MARK: - Polling // MARK: - Polling
private func pollRecursively(using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies()) {
guard hasStarted else { return }
let minPollFailureCount: TimeInterval = Storage.shared
.read { db in
try OpenGroup
.filter(OpenGroup.Columns.server == server)
.select(min(OpenGroup.Columns.pollFailureCount))
.asRequest(of: TimeInterval.self)
.fetchOne(db)
}
.defaulting(to: 0)
let nextPollInterval: TimeInterval = getInterval(for: minPollFailureCount, minInterval: Poller.minPollInterval, maxInterval: Poller.maxPollInterval)
poll(using: dependencies).retainUntilComplete()
timer = Timer.scheduledTimerOnMainThread(withTimeInterval: nextPollInterval, repeats: false) { [weak self] timer in
timer.invalidate()
Threading.pollerQueue.async {
self?.pollRecursively(using: dependencies)
}
}
}
@discardableResult @discardableResult
public func poll(using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies()) -> Promise<Void> { public func poll(using dependencies: OpenGroupManager.OGMDependencies = OpenGroupManager.OGMDependencies()) -> Promise<Void> {
return poll(isBackgroundPoll: false, isPostCapabilitiesRetry: false, using: dependencies) return poll(isBackgroundPoll: false, isPostCapabilitiesRetry: false, using: dependencies)
@ -83,6 +105,14 @@ extension OpenGroupAPI {
cache.timeSinceLastPoll[server] = Date().timeIntervalSince1970 cache.timeSinceLastPoll[server] = Date().timeIntervalSince1970
UserDefaults.standard[.lastOpen] = Date() UserDefaults.standard[.lastOpen] = Date()
} }
// Reset the failure count
Storage.shared.writeAsync { db in
try OpenGroup
.filter(OpenGroup.Columns.server == server)
.updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: 0))
}
SNLog("Open group polling finished for \(server).") SNLog("Open group polling finished for \(server).")
seal.fulfill(()) seal.fulfill(())
} }
@ -97,7 +127,24 @@ extension OpenGroupAPI {
) )
.done(on: OpenGroupAPI.workQueue) { [weak self] didHandleError in .done(on: OpenGroupAPI.workQueue) { [weak self] didHandleError in
if !didHandleError { if !didHandleError {
SNLog("Open group polling failed due to error: \(error).") // Increase the failure count
let pollFailureCount: Int64 = Storage.shared
.read { db in
try OpenGroup
.filter(OpenGroup.Columns.server == server)
.select(max(OpenGroup.Columns.pollFailureCount))
.asRequest(of: Int64.self)
.fetchOne(db)
}
.defaulting(to: 0)
Storage.shared.writeAsync { db in
try OpenGroup
.filter(OpenGroup.Columns.server == server)
.updateAll(db, OpenGroup.Columns.pollFailureCount.set(to: (pollFailureCount + 1)))
}
SNLog("Open group polling failed due to error: \(error). Setting failure count to \(pollFailureCount).")
} }
self?.isPolling = false self?.isPolling = false
@ -265,4 +312,11 @@ extension OpenGroupAPI {
} }
} }
} }
// MARK: - Convenience
fileprivate static func getInterval(for failureCount: TimeInterval, minInterval: TimeInterval, maxInterval: TimeInterval) -> TimeInterval {
// Arbitrary backoff factor...
return min(maxInterval, minInterval + pow(2, failureCount))
}
} }

@ -200,6 +200,14 @@ public final class Storage {
WHERE openGroup.infoUpdates = -1 WHERE openGroup.infoUpdates = -1
""") """)
// TODO: Remove this once everyone has updated // TODO: Remove this once everyone has updated
let openGroupTableInfo: [Row] = (try? Row.fetchAll(db, sql: "PRAGMA table_info(openGroup)"))
.defaulting(to: [])
if !openGroupTableInfo.contains(where: { $0["name"] == "pollFailureCount" }) {
try? db.execute(literal: """
ALTER TABLE openGroup
ADD pollFailureCount INTEGER NOT NULL DEFAULT 0
""")
}
onComplete(finalError, needsConfigSync) onComplete(finalError, needsConfigSync)
} }

Loading…
Cancel
Save