@ -7,13 +7,9 @@ import SessionSnodeKit
import SessionMessagingKit
import SessionMessagingKit
import SessionUtilitiesKit
import SessionUtilitiesKit
@objc ( LKBackgroundPoller )
public final class BackgroundPoller {
public final class BackgroundPoller : NSObject {
private static var promises : [ Promise < Void > ] = [ ]
private static var promises : [ Promise < Void > ] = [ ]
private override init ( ) { }
@objc ( pollWithCompletionHandler : )
public static func poll ( completionHandler : @ escaping ( UIBackgroundFetchResult ) -> Void ) {
public static func poll ( completionHandler : @ escaping ( UIBackgroundFetchResult ) -> Void ) {
promises = [ ]
promises = [ ]
. appending ( pollForMessages ( ) )
. appending ( pollForMessages ( ) )
@ -40,12 +36,29 @@ public final class BackgroundPoller: NSObject {
}
}
)
)
// B a c k g r o u n d t a s k s w i l l a u t o m a t i c a l l y b e t e r m i n a t e d a f t e r 3 0 s e c o n d s ( w h i c h r e s u l t s i n a c r a s h
// a n d a p r o m p t t o a p p e a r f o r t h e u s e r ) w e w a n t t o a v o i d t h i s s o w e s t a r t a t i m e r w h i c h e x p i r e s
// a f t e r 2 5 s e c o n d s a l l o w i n g u s t o c a n c e l a l l p e n d i n g p r o m i s e s
let cancelTimer : Timer = Timer . scheduledTimerOnMainThread ( withTimeInterval : 25 , repeats : false ) { timer in
timer . invalidate ( )
guard promises . contains ( where : { ! $0 . isResolved } ) else { return }
SNLog ( " Background poll failed due to manual timeout " )
completionHandler ( . failed )
}
when ( resolved : promises )
when ( resolved : promises )
. done { _ in
. done { _ in
cancelTimer . invalidate ( )
completionHandler ( . newData )
completionHandler ( . newData )
}
}
. catch { error in
. catch { error in
// I f w e h a v e a l r e a d y i n v a l i d a t e d t h e t i m e r t h e n d o n o t h i n g ( w e e s s e n t i a l l y t i m e d o u t )
guard cancelTimer . isValid else { return }
SNLog ( " Background poll failed due to error: \( error ) " )
SNLog ( " Background poll failed due to error: \( error ) " )
cancelTimer . invalidate ( )
completionHandler ( . failed )
completionHandler ( . failed )
}
}
}
}
@ -74,7 +87,7 @@ public final class BackgroundPoller: NSObject {
ClosedGroupPoller . poll (
ClosedGroupPoller . poll (
groupPublicKey ,
groupPublicKey ,
on : DispatchQueue . main ,
on : DispatchQueue . main ,
maxRetryCount : 4 ,
maxRetryCount : 0 ,
isBackgroundPoll : true
isBackgroundPoll : true
)
)
}
}
@ -85,78 +98,76 @@ public final class BackgroundPoller: NSObject {
. then ( on : DispatchQueue . main ) { swarm -> Promise < Void > in
. then ( on : DispatchQueue . main ) { swarm -> Promise < Void > in
guard let snode = swarm . randomElement ( ) else { throw SnodeAPIError . generic }
guard let snode = swarm . randomElement ( ) else { throw SnodeAPIError . generic }
return attempt ( maxRetryCount : 4 , recoveringOn : DispatchQueue . main ) {
return SnodeAPI . getMessages ( from : snode , associatedWith : publicKey )
return SnodeAPI . getMessages ( from : snode , associatedWith : publicKey )
. then ( on : DispatchQueue . main ) { messages -> Promise < Void > in
. then ( on : DispatchQueue . main ) { messages -> Promise < Void > in
guard ! messages . isEmpty else { return Promise . value ( ( ) ) }
guard ! messages . isEmpty else { return Promise . value ( ( ) ) }
var jobsToRun : [ Job ] = [ ]
var jobsToRun : [ Job ] = [ ]
Storage . shared . write { db in
var threadMessages : [ String : [ MessageReceiveJob . Details . MessageInfo ] ] = [ : ]
Storage . shared . write { db in
messages . forEach { message in
var threadMessages : [ String : [ MessageReceiveJob . Details . MessageInfo ] ] = [ : ]
do {
let processedMessage : ProcessedMessage ? = try Message . processRawReceivedMessage ( db , rawMessage : message )
messages . forEach { message in
let key : String = ( processedMessage ? . threadId ? ? Message . nonThreadMessageId )
do {
let processedMessage : ProcessedMessage ? = try Message . processRawReceivedMessage ( db , rawMessage : message )
threadMessages [ key ] = ( threadMessages [ key ] ? ? [ ] )
let key : String = ( processedMessage ? . threadId ? ? Message . nonThreadMessageId )
. appending ( processedMessage ? . messageInfo )
threadMessages [ key ] = ( threadMessages [ key ] ? ? [ ] )
. appending ( processedMessage ? . messageInfo )
}
catch {
switch error {
// I g n o r e d u p l i c a t e & s e l f S e n d m e s s a g e e r r o r s ( a n d d o n ' t b o t h e r l o g g i n g
// t h e m a s t h e r e w i l l b e a l o t s i n c e w e e a c h s e r v i c e n o d e d u p l i c a t e s m e s s a g e s )
case DatabaseError . SQLITE_CONSTRAINT_UNIQUE ,
MessageReceiverError . duplicateMessage ,
MessageReceiverError . duplicateControlMessage ,
MessageReceiverError . selfSend :
break
default : SNLog ( " Failed to deserialize envelope due to error: \( error ) . " )
}
}
}
}
catch {
threadMessages
switch error {
. forEach { threadId , threadMessages in
// I g n o r e d u p l i c a t e & s e l f S e n d m e s s a g e e r r o r s ( a n d d o n ' t b o t h e r l o g g i n g
let maybeJob : Job ? = Job (
// t h e m a s t h e r e w i l l b e a l o t s i n c e w e e a c h s e r v i c e n o d e d u p l i c a t e s m e s s a g e s )
variant : . messageReceive ,
case DatabaseError . SQLITE_CONSTRAINT_UNIQUE ,
behaviour : . runOnce ,
MessageReceiverError . duplicateMessage ,
threadId : threadId ,
MessageReceiverError . duplicateControlMessage ,
details : MessageReceiveJob . Details (
MessageReceiverError . selfSend :
messages : threadMessages ,
break
isBackgroundPoll : true
)
)
guard let job : Job = maybeJob else { return }
// A d d t o t h e J o b R u n n e r s o t h e y a r e p e r s i s t e n t a n d w i l l r e t r y o n
default : SNLog ( " Failed to deserialize envelope due to error: \( error ) . " )
// t h e n e x t a p p r u n i f t h e y f a i l
JobRunner . add ( db , job : job , canStartJob : false )
jobsToRun . append ( job )
}
}
}
}
}
let promises : [ Promise < Void > ] = jobsToRun . map { job -> Promise < Void > in
threadMessages
let ( promise , seal ) = Promise < Void > . pending ( )
. forEach { threadId , threadMessages in
let maybeJob : Job ? = Job (
// N o t e : I n t h e b a c k g r o u n d w e j u s t w a n t j o b s t o f a i l s i l e n t l y
variant : . messageReceive ,
MessageReceiveJob . run (
behaviour : . runOnce ,
job ,
threadId : threadId ,
queue : DispatchQueue . main ,
details : MessageReceiveJob . Details (
success : { _ , _ in seal . fulfill ( ( ) ) } ,
messages : threadMessages ,
failure : { _ , _ , _ in seal . fulfill ( ( ) ) } ,
isBackgroundPoll : true
deferred : { _ in seal . fulfill ( ( ) ) }
)
)
)
return promise
guard let job : Job = maybeJob else { return }
}
// A d d t o t h e J o b R u n n e r s o t h e y a r e p e r s i s t e n t a n d w i l l r e t r y o n
// t h e n e x t a p p r u n i f t h e y f a i l
JobRunner . add ( db , job : job , canStartJob : false )
jobsToRun . append ( job )
}
}
let promises : [ Promise < Void > ] = jobsToRun . map { job -> Promise < Void > in
let ( promise , seal ) = Promise < Void > . pending ( )
// N o t e : I n t h e b a c k g r o u n d w e j u s t w a n t j o b s t o f a i l s i l e n t l y
MessageReceiveJob . run (
job ,
queue : DispatchQueue . main ,
success : { _ , _ in seal . fulfill ( ( ) ) } ,
failure : { _ , _ , _ in seal . fulfill ( ( ) ) } ,
deferred : { _ in seal . fulfill ( ( ) ) }
)
return when ( fulfilled : promises )
return promise
}
}
}
return when ( fulfilled : promises )
}
}
}
}
}
}
}