@ -88,7 +88,7 @@ public class Poller {
getSnodeForPolling ( for : publicKey )
. subscribe ( on : Threading . pollerQueue )
. flatMap { snode -> AnyPublisher < Void , Error > in
. flatMap { snode -> AnyPublisher < [ Message ] , Error > in
Poller . poll (
namespaces : namespaces ,
from : snode ,
@ -126,7 +126,7 @@ public class Poller {
self ? . getSnodeForPolling ( for : publicKey )
. subscribe ( on : Threading . pollerQueue )
. flatMap { snode -> AnyPublisher < Void , Error > in
. flatMap { snode -> AnyPublisher < [ Message ] , Error > in
Poller . poll (
namespaces : namespaces ,
from : snode ,
@ -177,6 +177,11 @@ public class Poller {
}
}
// / P o l l s t h e s p e c i f i e d n a m e s p a c e s a n d p r o c e s s e s a n y m e s s a g e s , r e t u r n i n g a n a r r a y o f m e s s a g e s t h a t w e r e
// / s u c c e s s f u l l y p r o c e s s e d
// /
// / * * N o t e : * * T h e r e t u r n e d m e s s a g e s w i l l h a v e a l r e a d y b e e n p r o c e s s e d b y t h e ` P o l l e r ` , t h e y a r e o n l y r e t u r n e d
// / f o r c a s e s w h e r e w e n e e d e x p l i c i t / c u s t o m b e h a v i o u r s t o o c c u r ( e g . O n b o a r d i n g )
public static func poll (
namespaces : [ SnodeAPI . Namespace ] ,
from snode : Snode ,
@ -185,13 +190,13 @@ public class Poller {
calledFromBackgroundPoller : Bool = false ,
isBackgroundPollValid : @ escaping ( ( ) -> Bool ) = { true } ,
poller : Poller ? = nil
) -> AnyPublisher < Void , Error > {
) -> AnyPublisher < [ Message ] , Error > {
// I f t h e p o l l i n g h a s b e e n c a n c e l l e d t h e n d o n ' t c o n t i n u e
guard
( calledFromBackgroundPoller && isBackgroundPollValid ( ) ) ||
poller ? . isPolling . wrappedValue [ publicKey ] = = true
else {
return Just ( () )
return Just ( [] )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
@ -210,26 +215,25 @@ public class Poller {
from : snode ,
associatedWith : publicKey
)
. flatMap { namespacedResults -> AnyPublisher < Void , Error > in
. flatMap { namespacedResults -> AnyPublisher < [ Message ] , Error > in
guard
( calledFromBackgroundPoller && isBackgroundPollValid ( ) ) ||
poller ? . isPolling . wrappedValue [ publicKey ] = = true
else {
return Just ( () )
return Just ( [] )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
let allMessages Count: Int = namespacedResults
. map { $0 . value . data ? . messages . count ? ? 0 }
. reduce ( 0 , + )
let allMessages : [ SnodeReceivedMessage ] = namespacedResults
. compactMap { _ , result -> [ SnodeReceivedMessage ] ? in result . data ? . messages }
. flatMap { $0 }
// N o n e e d t o d o a n y t h i n g i f t h e r e a r e n o m e s s a g e s
guard allMessagesCount > 0 else {
if ! calledFromBackgroundPoller {
SNLog ( " Received no new messages in \( pollerName ) " )
}
return Just ( ( ) )
guard ! allMessages . isEmpty else {
if ! calledFromBackgroundPoller { SNLog ( " Received no new messages in \( pollerName ) " ) }
return Just ( [ ] )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
@ -237,90 +241,92 @@ public class Poller {
// O t h e r w i s e p r o c e s s t h e m e s s a g e s a n d a d d t h e m t o t h e q u e u e f o r h a n d l i n g
let lastHashes : [ String ] = namespacedResults
. compactMap { $0 . value . data ? . lastHash }
let otherKnownHashes : [ String ] = namespacedResults
. filter { $0 . key . shouldDedupeMessages }
. compactMap { $0 . value . data ? . messages . map { $0 . info . hash } }
. reduce ( [ ] , + )
var messageCount : Int = 0
var processedMessages : [ Message ] = [ ]
var hadValidHashUpdate : Bool = false
var jobsToRun : [ Job ] = [ ]
Storage . shared . write { db in
namespacedResults . forEach { namespace , result in
result . data ? . messages
. compactMap { message -> ProcessedMessage ? in
do {
return try Message . processRawReceivedMessage ( db , rawMessage : message )
}
catch {
switch error {
// I g n o r e d u p l i c a t e & s e l f S e n d m e s s a g e e r r o r s ( a n d d o n ' t b o t h e r l o g g i n g
// t h e m a s t h e r e w i l l b e a l o t s i n c e w e e a c h s e r v i c e n o d e d u p l i c a t e s m e s s a g e s )
case DatabaseError . SQLITE_CONSTRAINT_UNIQUE ,
MessageReceiverError . duplicateMessage ,
MessageReceiverError . duplicateControlMessage ,
MessageReceiverError . selfSend :
break
case MessageReceiverError . duplicateMessageNewSnode :
hadValidHashUpdate = true
break
case DatabaseError . SQLITE_ABORT :
// I n t h e b a c k g r o u n d i g n o r e ' S Q L I T E _ A B O R T ' ( i t g e n e r a l l y m e a n s
// t h e B a c k g r o u n d P o l l e r h a s t i m e d o u t
if ! calledFromBackgroundPoller {
SNLog ( " Failed to the database being suspended (running in background with no background task). " )
}
break
default : SNLog ( " Failed to deserialize envelope due to error: \( error ) . " )
}
return nil
}
allMessages
. compactMap { message -> ProcessedMessage ? in
do {
return try Message . processRawReceivedMessage ( db , rawMessage : message )
}
. grouped { threadId , _ , _ in ( threadId ? ? Message . nonThreadMessageId ) }
. forEach { threadId , threadMessages in
messageCount += threadMessages . count
let jobToRun : Job ? = Job (
variant : . messageReceive ,
behaviour : . runOnce ,
threadId : threadId ,
details : MessageReceiveJob . Details (
messages : threadMessages . map { $0 . messageInfo } ,
calledFromBackgroundPoller : calledFromBackgroundPoller
)
)
jobsToRun = jobsToRun . appending ( jobToRun )
catch {
switch error {
// I g n o r e d u p l i c a t e & s e l f S e n d m e s s a g e e r r o r s ( a n d d o n ' t b o t h e r l o g g i n g
// t h e m a s t h e r e w i l l b e a l o t s i n c e w e e a c h s e r v i c e n o d e d u p l i c a t e s m e s s a g e s )
case DatabaseError . SQLITE_CONSTRAINT_UNIQUE ,
MessageReceiverError . duplicateMessage ,
MessageReceiverError . duplicateControlMessage ,
MessageReceiverError . selfSend :
break
case MessageReceiverError . duplicateMessageNewSnode :
hadValidHashUpdate = true
break
case DatabaseError . SQLITE_ABORT :
// I n t h e b a c k g r o u n d i g n o r e ' S Q L I T E _ A B O R T ' ( i t g e n e r a l l y m e a n s
// t h e B a c k g r o u n d P o l l e r h a s t i m e d o u t
if ! calledFromBackgroundPoller {
SNLog ( " Failed to the database being suspended (running in background with no background task). " )
}
break
default : SNLog ( " Failed to deserialize envelope due to error: \( error ) . " )
}
// I f w e a r e f o r c e - p o l l i n g t h e n a d d t o t h e J o b R u n n e r s o t h e y a r e
// p e r s i s t e n t a n d w i l l r e t r y o n t h e n e x t a p p r u n i f t h e y f a i l b u t
// d o n ' t l e t t h e m a u t o - s t a r t
JobRunner . add ( db , job : jobToRun , canStartJob : ! calledFromBackgroundPoller )
return nil
}
}
}
. grouped { threadId , _ , _ in ( threadId ? ? Message . nonThreadMessageId ) }
. forEach { threadId , threadMessages in
messageCount += threadMessages . count
processedMessages += threadMessages . map { $0 . messageInfo . message }
let jobToRun : Job ? = Job (
variant : . messageReceive ,
behaviour : . runOnce ,
threadId : threadId ,
details : MessageReceiveJob . Details (
messages : threadMessages . map { $0 . messageInfo } ,
calledFromBackgroundPoller : calledFromBackgroundPoller
)
)
jobsToRun = jobsToRun . appending ( jobToRun )
// I f w e a r e f o r c e - p o l l i n g t h e n a d d t o t h e J o b R u n n e r s o t h e y a r e
// p e r s i s t e n t a n d w i l l r e t r y o n t h e n e x t a p p r u n i f t h e y f a i l b u t
// d o n ' t l e t t h e m a u t o - s t a r t
JobRunner . add ( db , job : jobToRun , canStartJob : ! calledFromBackgroundPoller )
}
// C l e a n u p m e s s a g e h a s h e s a n d a d d s o m e l o g s a b o u t t h e p o l l r e s u l t s
if allMessagesCount = = 0 && ! hadValidHashUpdate {
if allMessages . isEmpty && ! hadValidHashUpdate {
if ! calledFromBackgroundPoller {
SNLog ( " Received \( allMessagesCount ) new message \( allMessagesCount = = 1 ? " " : " s " ) , all duplicates - marking the hash we polled with as invalid " )
SNLog ( " Received \( allMessages . count ) new message \( allMessages . c ount = = 1 ? " " : " s " ) , all duplicates - marking the hash we polled with as invalid " )
}
// U p d a t e t h e c a c h e d v a l i d i t y o f t h e m e s s a g e s
try SnodeReceivedMessageInfo . handlePotentialDeletedOrInvalidHash (
db ,
potentiallyInvalidHashes : lastHashes ,
otherKnownValidHashes : namespacedResults
. compactMap { $0 . value . data ? . messages . map { $0 . info . hash } }
. reduce ( [ ] , + )
otherKnownValidHashes : otherKnownHashes
)
}
else if ! calledFromBackgroundPoller {
SNLog ( " Received \( messageCount ) new message \( messageCount = = 1 ? " " : " s " ) in \( pollerName ) (duplicates: \( allMessagesCount - messageCount ) ) " )
SNLog ( " Received \( messageCount ) new message \( messageCount = = 1 ? " " : " s " ) in \( pollerName ) (duplicates: \( allMessages . c ount - messageCount ) ) " )
}
}
// I f w e a r e n ' t r u n i n g i n a b a c k g r o u n d p o l l e r t h e n j u s t f i n i s h i m m e d i a t e l y
guard calledFromBackgroundPoller else {
return Just ( ( ) )
return Just ( processedMessages )
. setFailureType ( to : Error . self )
. eraseToAnyPublisher ( )
}
@ -345,7 +351,7 @@ public class Poller {
}
)
. collect ( )
. map { _ in ( ) }
. map { _ in processedMessages }
. eraseToAnyPublisher ( )
}
. eraseToAnyPublisher ( )