From e63a7f8fb0a723332c284270779c77a366b0eaf6 Mon Sep 17 00:00:00 2001 From: Matthew Chen Date: Mon, 21 May 2018 12:39:49 -0400 Subject: [PATCH] PeerConnectionClient thread safety. --- Signal/src/call/PeerConnectionClient.swift | 201 ++++++++++++++------- 1 file changed, 133 insertions(+), 68 deletions(-) diff --git a/Signal/src/call/PeerConnectionClient.swift b/Signal/src/call/PeerConnectionClient.swift index 7ecb51b04..18bd90cf7 100644 --- a/Signal/src/call/PeerConnectionClient.swift +++ b/Signal/src/call/PeerConnectionClient.swift @@ -174,7 +174,9 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD private func createSignalingDataChannel() { SwiftAssertIsOnMainThread(#function) - + defer { + ensureTeardown(#function) + } guard let peerConnection = peerConnection else { owsFail("\(logTag) in \(#function) peerConnection was unexpectedly nil") return @@ -195,6 +197,9 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD fileprivate func createVideoSender() { SwiftAssertIsOnMainThread(#function) + defer { + ensureTeardown(#function) + } Logger.debug("\(logTag) in \(#function)") assert(self.videoSender == nil, "\(#function) should only be called once.") @@ -240,9 +245,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } guard let localVideoSource = strongSelf.localVideoSource else { owsFail("\(strongSelf.logTag) in \(#function) localVideoSource was unexpectedly nil") @@ -261,9 +264,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD public func setLocalVideoEnabled(enabled: Bool) { SwiftAssertIsOnMainThread(#function) + defer { + ensureTeardown(#function) + } let completion = { [weak self] in guard let strongSelf = self else { return } + defer { + strongSelf.ensureTeardown(#function) + } guard let localVideoTrack = strongSelf.localVideoTrack else { return } guard let strongDelegate = strongSelf.delegate else { return } strongDelegate.peerConnectionClient(strongSelf, didUpdateLocal: enabled ? localVideoTrack : nil) @@ -273,9 +282,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } guard strongSelf.peerConnection != nil else { Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client") @@ -309,6 +316,9 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD fileprivate func createAudioSender() { SwiftAssertIsOnMainThread(#function) + defer { + ensureTeardown(#function) + } Logger.debug("\(logTag) in \(#function)") assert(self.audioSender == nil, "\(#function) should only be called once.") @@ -336,22 +346,17 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD public func setAudioEnabled(enabled: Bool) { SwiftAssertIsOnMainThread(#function) - - Logger.info("\(self.logTag) setAudioEnabled 1.") - Logger.flush() + defer { + ensureTeardown(#function) + } PeerConnectionClient.signalingQueue.async { [weak self] in guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } - Logger.info("\(strongSelf.logTag) setAudioEnabled 2.") - Logger.flush() - guard strongSelf.peerConnection != nil else { Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client") return @@ -363,9 +368,6 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD } audioTrack.isEnabled = enabled - - Logger.info("\(strongSelf.logTag) setAudioEnabled 3.") - Logger.flush() } } @@ -381,14 +383,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD public func createOffer() -> Promise { SwiftAssertIsOnMainThread(#function) + defer { + ensureTeardown(#function) + } let completion: ((((HardenedRTCSessionDescription) -> Void), ((Error) -> Void), RTCSessionDescription?, Error?) -> Void) = { [weak self] (fulfill, reject, sdp, error) in guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } strongSelf.assertOnSignalingQueue() guard strongSelf.peerConnection != nil else { @@ -415,9 +418,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } strongSelf.assertOnSignalingQueue() guard let peerConnection = strongSelf.peerConnection else { @@ -443,13 +444,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD } public func setLocalSessionDescriptionInternal(_ sessionDescription: HardenedRTCSessionDescription) -> Promise { + defer { + ensureTeardown(#function) + } + return PromiseKit.wrap { [weak self] resolve in guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } strongSelf.assertOnSignalingQueue() @@ -465,14 +468,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD public func setLocalSessionDescription(_ sessionDescription: HardenedRTCSessionDescription) -> Promise { SwiftAssertIsOnMainThread(#function) + defer { + ensureTeardown(#function) + } let workBlock : ((@escaping (() -> Void), @escaping ((Error) -> Void)) -> Void) = { [weak self] (fulfill, reject) in guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } strongSelf.assertOnSignalingQueue() guard let peerConnection = strongSelf.peerConnection else { @@ -501,6 +505,9 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD public func negotiateSessionDescription(remoteDescription: RTCSessionDescription, constraints: RTCMediaConstraints) -> Promise { SwiftAssertIsOnMainThread(#function) + defer { + ensureTeardown(#function) + } return setRemoteSessionDescription(remoteDescription) .then(on: PeerConnectionClient.signalingQueue) { [weak self] in @@ -511,9 +518,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } return strongSelf.negotiateAnswerSessionDescription(constraints: constraints) } @@ -521,14 +526,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD public func setRemoteSessionDescription(_ sessionDescription: RTCSessionDescription) -> Promise { SwiftAssertIsOnMainThread(#function) + defer { + ensureTeardown(#function) + } let workBlock : ((@escaping (() -> Void), @escaping ((Error) -> Void)) -> Void) = { [weak self] (fulfill, reject) in guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } strongSelf.assertOnSignalingQueue() guard let peerConnection = strongSelf.peerConnection else { @@ -556,14 +562,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD private func negotiateAnswerSessionDescription(constraints: RTCMediaConstraints) -> Promise { assertOnSignalingQueue() + defer { + ensureTeardown(#function) + } let completion : ((@escaping ((HardenedRTCSessionDescription) -> Void), @escaping ((Error) -> Void), RTCSessionDescription?, Error?) -> Void) = { [weak self] (fulfill, reject, sdp, error) in guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } strongSelf.assertOnSignalingQueue() guard strongSelf.peerConnection != nil else { @@ -597,9 +604,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } strongSelf.assertOnSignalingQueue() @@ -619,13 +624,14 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD } public func addRemoteIceCandidate(_ candidate: RTCIceCandidate) { + defer { + ensureTeardown(#function) + } PeerConnectionClient.signalingQueue.async { [weak self] in guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } guard let peerConnection = strongSelf.peerConnection else { @@ -639,6 +645,10 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD public func terminate() { SwiftAssertIsOnMainThread(#function) + defer { + ensureTeardown(#function) + } + Logger.debug("\(logTag) in \(#function)") Logger.flush() @@ -650,9 +660,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } strongSelf.terminateInternal() @@ -661,6 +669,9 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD private func terminateInternal() { assertOnSignalingQueue() + defer { + ensureTeardown(#function) + } Logger.debug("\(logTag) in \(#function)") Logger.flush() @@ -711,14 +722,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD public func sendDataChannelMessage(data: Data, description: String, isCritical: Bool) { SwiftAssertIsOnMainThread(#function) + defer { + ensureTeardown(#function) + } PeerConnectionClient.signalingQueue.async { [weak self] in guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } guard strongSelf.peerConnection != nil else { @@ -756,15 +768,24 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD /** The data channel state changed. */ internal func dataChannelDidChangeState(_ dataChannel: RTCDataChannel) { + defer { + ensureTeardown(#function) + } Logger.debug("\(logTag) dataChannelDidChangeState: \(dataChannel)") } /** The data channel successfully received a data buffer. */ internal func dataChannel(_ dataChannel: RTCDataChannel, didReceiveMessageWith buffer: RTCDataBuffer) { + defer { + ensureTeardown(#function) + } let completion: (OWSWebRTCProtosData) -> Void = { [weak self] (dataChannelMessage) in SwiftAssertIsOnMainThread(#function) guard let strongSelf = self else { return } + defer { + strongSelf.ensureTeardown(#function) + } guard let strongDelegate = strongSelf.delegate else { return } strongDelegate.peerConnectionClient(strongSelf, received: dataChannelMessage) } @@ -773,9 +794,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } guard strongSelf.peerConnection != nil else { @@ -798,6 +817,9 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD /** The data channel's |bufferedAmount| changed. */ internal func dataChannel(_ dataChannel: RTCDataChannel, didChangeBufferedAmount amount: UInt64) { + defer { + ensureTeardown(#function) + } Logger.debug("\(logTag) didChangeBufferedAmount: \(amount)") } @@ -805,15 +827,24 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD /** Called when the SignalingState changed. */ internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didChange stateChanged: RTCSignalingState) { + defer { + ensureTeardown(#function) + } Logger.debug("\(logTag) didChange signalingState:\(stateChanged.debugDescription)") } /** Called when media is received on a new stream from remote peer. */ internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didAdd stream: RTCMediaStream) { + defer { + ensureTeardown(#function) + } let completion: (RTCVideoTrack) -> Void = { [weak self] (remoteVideoTrack) in SwiftAssertIsOnMainThread(#function) guard let strongSelf = self else { return } + defer { + strongSelf.ensureTeardown(#function) + } guard let strongDelegate = strongSelf.delegate else { return } // TODO: Consider checking for termination here. @@ -825,9 +856,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } guard let peerConnection = strongSelf.peerConnection else { @@ -855,32 +884,50 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD /** Called when a remote peer closes a stream. */ internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didRemove stream: RTCMediaStream) { + defer { + ensureTeardown(#function) + } Logger.debug("\(logTag) didRemove Stream:\(stream)") } /** Called when negotiation is needed, for example ICE has restarted. */ internal func peerConnectionShouldNegotiate(_ peerConnectionParam: RTCPeerConnection) { + defer { + ensureTeardown(#function) + } Logger.debug("\(logTag) shouldNegotiate") } /** Called any time the IceConnectionState changes. */ internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didChange newState: RTCIceConnectionState) { + defer { + ensureTeardown(#function) + } let connectedCompletion : () -> Void = { [weak self] in SwiftAssertIsOnMainThread(#function) guard let strongSelf = self else { return } + defer { + strongSelf.ensureTeardown(#function) + } guard let strongDelegate = strongSelf.delegate else { return } strongDelegate.peerConnectionClientIceConnected(strongSelf) } let failedCompletion : () -> Void = { [weak self] in SwiftAssertIsOnMainThread(#function) guard let strongSelf = self else { return } + defer { + strongSelf.ensureTeardown(#function) + } guard let strongDelegate = strongSelf.delegate else { return } strongDelegate.peerConnectionClientIceFailed(strongSelf) } let disconnectedCompletion : () -> Void = { [weak self] in SwiftAssertIsOnMainThread(#function) guard let strongSelf = self else { return } + defer { + strongSelf.ensureTeardown(#function) + } guard let strongDelegate = strongSelf.delegate else { return } strongDelegate.peerConnectionClientIceDisconnected(strongSelf) } @@ -889,9 +936,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } guard let peerConnection = strongSelf.peerConnection else { @@ -921,15 +966,24 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD /** Called any time the IceGatheringState changes. */ internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didChange newState: RTCIceGatheringState) { + defer { + ensureTeardown(#function) + } Logger.info("\(logTag) didChange IceGatheringState:\(newState.debugDescription)") } /** New ice candidate has been found. */ internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didGenerate candidate: RTCIceCandidate) { + defer { + ensureTeardown(#function) + } let completion: (RTCIceCandidate) -> Void = { [weak self] (candidate) in SwiftAssertIsOnMainThread(#function) guard let strongSelf = self else { return } + defer { + strongSelf.ensureTeardown(#function) + } guard let strongDelegate = strongSelf.delegate else { return } strongDelegate.peerConnectionClient(strongSelf, addedLocalIceCandidate: candidate) } @@ -938,9 +992,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } guard let peerConnection = strongSelf.peerConnection else { @@ -960,15 +1012,24 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD /** Called when a group of local Ice candidates have been removed. */ internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didRemove candidates: [RTCIceCandidate]) { + defer { + ensureTeardown(#function) + } Logger.debug("\(logTag) didRemove IceCandidates:\(candidates)") } /** New data channel has been opened. */ internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didOpen dataChannel: RTCDataChannel) { + defer { + ensureTeardown(#function) + } let completion: ([PendingDataChannelMessage]) -> Void = { [weak self] (pendingMessages) in SwiftAssertIsOnMainThread(#function) guard let strongSelf = self else { return } + defer { + strongSelf.ensureTeardown(#function) + } pendingMessages.forEach { message in strongSelf.sendDataChannelMessage(data: message.data, description: message.description, isCritical: message.isCritical) } @@ -978,9 +1039,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD guard let strongSelf = self else { return } Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush() defer { - DispatchQueue.main.async { - Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush() - } + strongSelf.ensureTeardown(#function) } guard let peerConnection = strongSelf.peerConnection else { @@ -1047,7 +1106,13 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD } } - private func + private func ensureTeardown(_ functionName: String) { + PeerConnectionClient.signalingQueue.async { +// DispatchQueue.main.async { + Logger.info("\(self.logTag) \(functionName) complete.") + Logger.flush() + } + } } /**