@ -20,22 +20,54 @@ public extension LibSession {
static var hasPaths : Bool { ! lastPaths . wrappedValue . isEmpty }
static var hasPaths : Bool { ! lastPaths . wrappedValue . isEmpty }
static var pathsDescription : String { lastPaths . wrappedValue . prettifiedDescription }
static var pathsDescription : String { lastPaths . wrappedValue . prettifiedDescription }
typealias NodesCallback = ( UnsafeMutablePointer < network_service_node > ? , Int ) -> Void
private class CallbackWrapper < Output > {
typealias NetworkCallback = ( Bool , Bool , Int16 , Data ? ) -> Void
public let resultPublisher : CurrentValueSubject < Output ? , Error > = CurrentValueSubject ( nil )
private class CWrapper < Callback > {
let callback : Callback
private var pointersToDeallocate : [ UnsafeRawPointer ? ] = [ ]
private var pointersToDeallocate : [ UnsafeRawPointer ? ] = [ ]
public init ( _ callback : Callback ) {
// MARK: - I n i t i a l i z a t i o n
self . callback = callback
deinit {
pointersToDeallocate . forEach { $0 ? . deallocate ( ) }
}
}
public func addUnsafePointerToCleanup < T > ( _ pointer : UnsafePointer < T > ? ) {
// MARK: - F u n c t i o n s
pointersToDeallocate . append ( UnsafeRawPointer ( pointer ) )
public static func create ( _ callback : @ escaping ( CallbackWrapper < Output > ) throws -> Void ) -> AnyPublisher < Output , Error > {
let wrapper : CallbackWrapper < Output > = CallbackWrapper ( )
return Deferred {
Future < Void , Error > { resolver in
do {
try callback ( wrapper )
resolver ( Result . success ( ( ) ) )
}
catch { resolver ( Result . failure ( error ) ) }
}
}
. flatMap { _ -> AnyPublisher < Output , Error > in
wrapper
. resultPublisher
. compactMap { $0 }
. first ( )
. eraseToAnyPublisher ( )
}
. eraseToAnyPublisher ( )
}
}
deinit {
public static func run ( _ ctx : UnsafeMutableRawPointer ? , _ output : Output ) {
pointersToDeallocate . forEach { $0 ? . deallocate ( ) }
guard let ctx : UnsafeMutableRawPointer = ctx else {
return Log . error ( " [LibSession] CallbackWrapper called with null context. " )
}
// D i s p a t c h a s y n c s o w e d o n ' t b l o c k l i b S e s s i o n ' s i n t e r n a l s w i t h S w i f t l o g i c ( w h i c h c a n b l o c k o t h e r r e q u e s t s )
let wrapper : CallbackWrapper < Output > = Unmanaged < CallbackWrapper < Output > > . fromOpaque ( ctx ) . takeRetainedValue ( )
DispatchQueue . global ( qos : . default ) . async { [ wrapper ] in wrapper . resultPublisher . send ( output ) }
}
public func unsafePointer ( ) -> UnsafeMutableRawPointer { Unmanaged . passRetained ( self ) . toOpaque ( ) }
public func addUnsafePointerToCleanup < T > ( _ pointer : UnsafePointer < T > ? ) {
pointersToDeallocate . append ( UnsafeRawPointer ( pointer ) )
}
}
}
}
@ -95,79 +127,60 @@ public extension LibSession {
}
}
static func getSwarm ( swarmPublicKey : String ) -> AnyPublisher < Set < Snode > , Error > {
static func getSwarm ( swarmPublicKey : String ) -> AnyPublisher < Set < Snode > , Error > {
typealias Output = Result < Set < Snode > , Error >
return getOrCreateNetwork ( )
return getOrCreateNetwork ( )
. flatMap { network in
. flatMap { network in
Deferred {
CallbackWrapper< Output >
Future < Set < Snode > , Error > { resolv er in
. create { wrapp er in
let cSwarmPublicKey : [ CChar ] = swarmPublicKey
let cSwarmPublicKey : [ CChar ] = swarmPublicKey
. suffix ( 64 ) // Q u i c k w a y t o d r o p ' 0 5 ' p r e f i x i f p r e s e n t
. suffix ( 64 ) // Q u i c k w a y t o d r o p ' 0 5 ' p r e f i x i f p r e s e n t
. cArray
. cArray
. nullTerminated ( )
. nullTerminated ( )
let callbackWrapper : CWrapper < NodesCallback > = CWrapper { swarmPtr , swarmSize in
network_get_swarm ( network , cSwarmPublicKey , { swarmPtr , swarmSize , ctx in
guard
guard
swarmSize > 0 ,
swarmSize > 0 ,
let cSwarm : UnsafeMutablePointer < network_service_node > = swarmPtr
let cSwarm : UnsafeMutablePointer < network_service_node > = swarmPtr
else {
else { return CallbackWrapper < Output > . run ( ctx , . failure ( SnodeAPIError . unableToRetrieveSwarm ) ) }
// D i s p a t c h a s y n c s o w e d o n ' t h o l d u p t h e l i b S e s s i o n t h r e a d ( w h i c h c a n b l o c k o t h e r r e q u e s t s )
DispatchQueue . global ( qos : . default ) . async {
resolver ( Result . failure ( SnodeAPIError . unableToRetrieveSwarm ) )
}
return
}
var nodes : Set < Snode > = [ ]
var nodes : Set < Snode > = [ ]
( 0. . < swarmSize ) . forEach { index in nodes . insert ( Snode ( cSwarm [ index ] ) ) }
( 0. . < swarmSize ) . forEach { index in nodes . insert ( Snode ( cSwarm [ index ] ) ) }
CallbackWrapper < Output > . run ( ctx , . success ( nodes ) )
// D i s p a t c h a s y n c s o w e d o n ' t h o l d u p t h e l i b S e s s i o n t h r e a d ( w h i c h c a n b l o c k o t h e r r e q u e s t s )
} , wrapper . unsafePointer ( ) ) ;
DispatchQueue . global ( qos : . default ) . async {
resolver ( Result . success ( nodes ) )
}
}
let cWrapperPtr : UnsafeMutableRawPointer = Unmanaged . passRetained ( callbackWrapper ) . toOpaque ( )
network_get_swarm ( network , cSwarmPublicKey , { swarmPtr , swarmSize , ctx in
Unmanaged < CWrapper < NodesCallback > > . fromOpaque ( ctx ! ) . takeRetainedValue ( )
. callback ( swarmPtr , swarmSize )
} , cWrapperPtr ) ;
}
}
}
. tryMap { result in try result . successOrThrow ( ) }
}
}
. eraseToAnyPublisher ( )
. eraseToAnyPublisher ( )
}
}
static func getRandomNodes ( count : Int ) -> AnyPublisher < Set < Snode > , Error > {
static func getRandomNodes ( count : Int ) -> AnyPublisher < Set < Snode > , Error > {
typealias Output = Result < Set < Snode > , Error >
return getOrCreateNetwork ( )
return getOrCreateNetwork ( )
. flatMap { network in
. flatMap { network in
Deferred {
CallbackWrapper< Output >
Future < Set < Snode > , Error > { resolv er in
. create { wrapp er in
let callbackWrapper : CWrapper < NodesCallback > = CWrapper { nodesPtr , nodesSize in
network_get_random_nodes ( network , UInt16 ( count ) , { nodesPtr , nodesSize , ctx in
guard
guard
nodesSize > = count ,
nodesSize > 0 ,
let cSwarm : UnsafeMutablePointer < network_service_node > = nodesPtr
let cSwarm : UnsafeMutablePointer < network_service_node > = nodesPtr
else {
else { return CallbackWrapper < Output > . run ( ctx , . failure ( SnodeAPIError . unableToRetrieveSwarm ) ) }
// D i s p a t c h a s y n c s o w e d o n ' t h o l d u p t h e l i b S e s s i o n t h r e a d ( w h i c h c a n b l o c k o t h e r r e q u e s t s )
DispatchQueue . global ( qos : . default ) . async {
resolver ( Result . failure ( SnodeAPIError . unableToRetrieveSwarm ) )
}
return
}
var nodes : Set < Snode > = [ ]
var nodes : Set < Snode > = [ ]
( 0. . < nodesSize ) . forEach { index in nodes . insert ( Snode ( cSwarm [ index ] ) ) }
( 0. . < nodesSize ) . forEach { index in nodes . insert ( Snode ( cSwarm [ index ] ) ) }
CallbackWrapper < Output > . run ( ctx , . success ( nodes ) )
} , wrapper . unsafePointer ( ) ) ;
}
. tryMap { result in
switch result {
case . failure ( let error ) : throw error
case . success ( let nodes ) :
guard nodes . count > count else { throw SnodeAPIError . unableToRetrieveSwarm }
// D i s p a t c h a s y n c s o w e d o n ' t h o l d u p t h e l i b S e s s i o n t h r e a d ( w h i c h c a n b l o c k o t h e r r e q u e s t s )
return nodes
DispatchQueue . global ( qos : . default ) . async {
resolver ( Result . success ( nodes ) )
}
}
}
let cWrapperPtr : UnsafeMutableRawPointer = Unmanaged . passRetained ( callbackWrapper ) . toOpaque ( )
network_get_random_nodes ( network , UInt16 ( count ) , { nodesPtr , nodesSize , ctx in
Unmanaged < CWrapper < NodesCallback > > . fromOpaque ( ctx ! ) . takeRetainedValue ( )
. callback ( nodesPtr , nodesSize )
} , cWrapperPtr ) ;
}
}
}
}
}
. eraseToAnyPublisher ( )
. eraseToAnyPublisher ( )
}
}
@ -179,6 +192,8 @@ public extension LibSession {
timeout : TimeInterval ,
timeout : TimeInterval ,
using dependencies : Dependencies
using dependencies : Dependencies
) -> AnyPublisher < ( ResponseInfoType , Data ? ) , Error > {
) -> AnyPublisher < ( ResponseInfoType , Data ? ) , Error > {
typealias Output = ( success : Bool , timeout : Bool , statusCode : Int , data : Data ? )
return getOrCreateNetwork ( )
return getOrCreateNetwork ( )
. tryFlatMap { network in
. tryFlatMap { network in
// P r e p a r e t h e p a r a m e t e r s
// P r e p a r e t h e p a r a m e t e r s
@ -196,22 +211,8 @@ public extension LibSession {
cPayloadBytes = Array ( encodedBody )
cPayloadBytes = Array ( encodedBody )
}
}
return Deferred {
return CallbackWrapper < Output >
Future < ( ResponseInfoType , Data ? ) , Error > { resolver in
. create { wrapper in
let callbackWrapper : CWrapper < NetworkCallback > = CWrapper { success , timeout , statusCode , data in
let maybeError : Error ? = processError ( success , timeout , statusCode , data , using : dependencies )
// D i s p a t c h a s y n c s o w e d o n ' t h o l d u p t h e l i b S e s s i o n t h r e a d ( w h i c h c a n b l o c k o t h e r r e q u e s t s )
DispatchQueue . global ( qos : . default ) . async {
switch maybeError {
case . some ( let error ) : resolver ( Result . failure ( error ) )
case . none :
resolver ( Result . success ( ( Network . ResponseInfo ( code : Int ( statusCode ) , headers : [ : ] ) , data ) ) )
}
}
}
let cWrapperPtr : UnsafeMutableRawPointer = Unmanaged . passRetained ( callbackWrapper ) . toOpaque ( )
// T r i g g e r t h e r e q u e s t
// T r i g g e r t h e r e q u e s t
switch destination {
switch destination {
case . snode ( let snode ) :
case . snode ( let snode ) :
@ -219,7 +220,7 @@ public extension LibSession {
// Q u i c k w a y t o d r o p ' 0 5 ' p r e f i x i f p r e s e n t
// Q u i c k w a y t o d r o p ' 0 5 ' p r e f i x i f p r e s e n t
$0 . suffix ( 64 ) . cString ( using : . utf8 ) ? . unsafeCopy ( )
$0 . suffix ( 64 ) . cString ( using : . utf8 ) ? . unsafeCopy ( )
}
}
callbackW rapper. addUnsafePointerToCleanup ( cSwarmPublicKey )
w rapper. addUnsafePointerToCleanup ( cSwarmPublicKey )
network_send_onion_request_to_snode_destination (
network_send_onion_request_to_snode_destination (
network ,
network ,
@ -230,10 +231,9 @@ public extension LibSession {
Int64 ( floor ( timeout * 1000 ) ) ,
Int64 ( floor ( timeout * 1000 ) ) ,
{ success , timeout , statusCode , dataPtr , dataLen , ctx in
{ success , timeout , statusCode , dataPtr , dataLen , ctx in
let data : Data ? = dataPtr . map { Data ( bytes : $0 , count : dataLen ) }
let data : Data ? = dataPtr . map { Data ( bytes : $0 , count : dataLen ) }
Unmanaged < CWrapper < NetworkCallback > > . fromOpaque ( ctx ! ) . takeRetainedValue ( )
CallbackWrapper < Output > . run ( ctx , ( success , timeout , Int ( statusCode ) , data ) )
. callback ( success , timeout , statusCode , data )
} ,
} ,
cWrapperPtr
wrapper. unsafePointer ( )
)
)
case . server ( let method , let scheme , let host , let endpoint , let port , let headers , let x25519PublicKey ) :
case . server ( let method , let scheme , let host , let endpoint , let port , let headers , let x25519PublicKey ) :
@ -256,8 +256,7 @@ public extension LibSession {
else {
else {
cHeaderKeysContent . forEach { $0 ? . deallocate ( ) }
cHeaderKeysContent . forEach { $0 ? . deallocate ( ) }
cHeaderValuesContent . forEach { $0 ? . deallocate ( ) }
cHeaderValuesContent . forEach { $0 ? . deallocate ( ) }
cWrapperPtr . deallocate ( )
throw LibSessionError . invalidCConversion
return resolver ( Result . failure ( LibSessionError . invalidCConversion ) )
}
}
// C o n v e r t t h e o t h e r t y p e s
// C o n v e r t t h e o t h e r t y p e s
@ -295,15 +294,15 @@ public extension LibSession {
)
)
// A d d a c l e a n u p c a l l b a c k t o d e a l l o c a t e t h e h e a d e r a r r a y s
// A d d a c l e a n u p c a l l b a c k t o d e a l l o c a t e t h e h e a d e r a r r a y s
callbackW rapper. addUnsafePointerToCleanup ( cMethod )
w rapper. addUnsafePointerToCleanup ( cMethod )
callbackW rapper. addUnsafePointerToCleanup ( cTargetScheme )
w rapper. addUnsafePointerToCleanup ( cTargetScheme )
callbackW rapper. addUnsafePointerToCleanup ( cHost )
w rapper. addUnsafePointerToCleanup ( cHost )
callbackW rapper. addUnsafePointerToCleanup ( cEndpoint )
w rapper. addUnsafePointerToCleanup ( cEndpoint )
callbackW rapper. addUnsafePointerToCleanup ( cX25519Pubkey )
w rapper. addUnsafePointerToCleanup ( cX25519Pubkey )
cHeaderKeysContent . forEach { callbackW rapper. addUnsafePointerToCleanup ( $0 ) }
cHeaderKeysContent . forEach { w rapper. addUnsafePointerToCleanup ( $0 ) }
cHeaderValuesContent . forEach { callbackW rapper. addUnsafePointerToCleanup ( $0 ) }
cHeaderValuesContent . forEach { w rapper. addUnsafePointerToCleanup ( $0 ) }
callbackW rapper. addUnsafePointerToCleanup ( cHeaderKeys )
w rapper. addUnsafePointerToCleanup ( cHeaderKeys )
callbackW rapper. addUnsafePointerToCleanup ( cHeaderValues )
w rapper. addUnsafePointerToCleanup ( cHeaderValues )
network_send_onion_request_to_server_destination (
network_send_onion_request_to_server_destination (
network ,
network ,
@ -313,14 +312,16 @@ public extension LibSession {
Int64 ( floor ( timeout * 1000 ) ) ,
Int64 ( floor ( timeout * 1000 ) ) ,
{ success , timeout , statusCode , dataPtr , dataLen , ctx in
{ success , timeout , statusCode , dataPtr , dataLen , ctx in
let data : Data ? = dataPtr . map { Data ( bytes : $0 , count : dataLen ) }
let data : Data ? = dataPtr . map { Data ( bytes : $0 , count : dataLen ) }
Unmanaged < CWrapper < NetworkCallback > > . fromOpaque ( ctx ! ) . takeRetainedValue ( )
CallbackWrapper < Output > . run ( ctx , ( success , timeout , Int ( statusCode ) , data ) )
. callback ( success , timeout , statusCode , data )
} ,
} ,
cWrapperPtr
wrapper. unsafePointer ( )
)
)
}
}
}
}
}
. tryMap { success , timeout , statusCode , data -> ( any ResponseInfoType , Data ? ) in
try throwErrorIfNeeded ( success , timeout , statusCode , data , using : dependencies )
return ( Network . ResponseInfo ( code : statusCode ) , data )
}
}
}
. eraseToAnyPublisher ( )
. eraseToAnyPublisher ( )
}
}
@ -435,45 +436,39 @@ public extension LibSession {
}
}
}
}
private static func processError (
private static func throwErrorIfNeeded (
_ success : Bool ,
_ success : Bool ,
_ timeout : Bool ,
_ timeout : Bool ,
_ statusCode : Int 16 ,
_ statusCode : Int ,
_ data : Data ? ,
_ data : Data ? ,
using dependencies : Dependencies
using dependencies : Dependencies
) -> Error ? {
) throws {
guard ! success || statusCode < 200 || statusCode > 299 else { return nil }
guard ! success || statusCode < 200 || statusCode > 299 else { return }
guard ! timeout else { return NetworkError . timeout }
guard ! timeout else { throw NetworkError . timeout }
// / H a n d l e s t a t u s c o d e s w i t h s p e c i f i c m e a n i n g s
// / H a n d l e s t a t u s c o d e s w i t h s p e c i f i c m e a n i n g s
switch ( statusCode , data . map { String ( data : $0 , encoding : . ascii ) } ) {
switch ( statusCode , data . map { String ( data : $0 , encoding : . ascii ) } ) {
case ( 400 , . none ) :
case ( 400 , . none ) :
return NetworkError . badRequest ( error : NetworkError . unknown . errorDescription ? ? " Bad Request " , rawData : data )
throw NetworkError . badRequest ( error : NetworkError . unknown . errorDescription ? ? " Bad Request " , rawData : data )
case ( 400 , . some ( let responseString ) ) : return NetworkError . badRequest ( error : responseString , rawData : data )
case ( 400 , . some ( let responseString ) ) : throw NetworkError . badRequest ( error : responseString , rawData : data )
case ( 401 , _ ) :
case ( 401 , _ ) :
Log . warn ( " Unauthorised (Failed to verify the signature). " )
Log . warn ( " Unauthorised (Failed to verify the signature). " )
return NetworkError . unauthorised
throw NetworkError . unauthorised
case ( 404 , _ ) : return NetworkError . notFound
case ( 404 , _ ) : throw NetworkError . notFound
// / A s n o d e w i l l r e t u r n a ` 4 0 6 ` b u t o n i o n r e q u e s t s v 4 s e e m s t o r e t u r n ` 4 2 5 ` s o h a n d l e b o t h
// / A s n o d e w i l l r e t u r n a ` 4 0 6 ` b u t o n i o n r e q u e s t s v 4 s e e m s t o r e t u r n ` 4 2 5 ` s o h a n d l e b o t h
case ( 406 , _ ) , ( 425 , _ ) :
case ( 406 , _ ) , ( 425 , _ ) :
Log . warn ( " The user's clock is out of sync with the service node network. " )
Log . warn ( " The user's clock is out of sync with the service node network. " )
return SnodeAPIError . clockOutOfSync
throw SnodeAPIError . clockOutOfSync
case ( 421 , _ ) : return SnodeAPIError . unassociatedPubkey
case ( 429 , _ ) : return SnodeAPIError . rateLimited
case ( 500 , _ ) , ( 502 , _ ) , ( 503 , _ ) : return SnodeAPIError . internalServerError
case ( _ , . none ) : return NetworkError . unknown
case ( _ , . some ( let responseString ) ) :
// A n i n t e r n a l s e r v e r e r r o r c o u l d r e t u r n H T M L d a t a , t h i s i s a n a t t e m p t t o i n t e r c e p t t h a t c a s e
guard ! responseString . starts ( with : " 500 Internal Server Error " ) else {
return SnodeAPIError . internalServerError
}
return NetworkError . requestFailed ( error : responseString , rawData : data )
case ( 421 , _ ) : throw SnodeAPIError . unassociatedPubkey
case ( 429 , _ ) : throw SnodeAPIError . rateLimited
case ( 500 , _ ) , ( 502 , _ ) , ( 503 , _ ) : throw SnodeAPIError . internalServerError
case ( _ , . none ) : throw NetworkError . unknown
case ( _ , . some ( let responseString ) ) : throw NetworkError . requestFailed ( error : responseString , rawData : data )
}
}
}
}
}
}