PeerConnectionClient thread safety.

pull/1/head
Matthew Chen 7 years ago
parent c3e8fde24c
commit e63a7f8fb0

@ -174,7 +174,9 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
private func createSignalingDataChannel() {
SwiftAssertIsOnMainThread(#function)
defer {
ensureTeardown(#function)
}
guard let peerConnection = peerConnection else {
owsFail("\(logTag) in \(#function) peerConnection was unexpectedly nil")
return
@ -195,6 +197,9 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
fileprivate func createVideoSender() {
SwiftAssertIsOnMainThread(#function)
defer {
ensureTeardown(#function)
}
Logger.debug("\(logTag) in \(#function)")
assert(self.videoSender == nil, "\(#function) should only be called once.")
@ -240,9 +245,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
guard let localVideoSource = strongSelf.localVideoSource else {
owsFail("\(strongSelf.logTag) in \(#function) localVideoSource was unexpectedly nil")
@ -261,9 +264,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
public func setLocalVideoEnabled(enabled: Bool) {
SwiftAssertIsOnMainThread(#function)
defer {
ensureTeardown(#function)
}
let completion = { [weak self] in
guard let strongSelf = self else { return }
defer {
strongSelf.ensureTeardown(#function)
}
guard let localVideoTrack = strongSelf.localVideoTrack else { return }
guard let strongDelegate = strongSelf.delegate else { return }
strongDelegate.peerConnectionClient(strongSelf, didUpdateLocal: enabled ? localVideoTrack : nil)
@ -273,9 +282,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
guard strongSelf.peerConnection != nil else {
Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client")
@ -309,6 +316,9 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
fileprivate func createAudioSender() {
SwiftAssertIsOnMainThread(#function)
defer {
ensureTeardown(#function)
}
Logger.debug("\(logTag) in \(#function)")
assert(self.audioSender == nil, "\(#function) should only be called once.")
@ -336,22 +346,17 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
public func setAudioEnabled(enabled: Bool) {
SwiftAssertIsOnMainThread(#function)
Logger.info("\(self.logTag) setAudioEnabled 1.")
Logger.flush()
defer {
ensureTeardown(#function)
}
PeerConnectionClient.signalingQueue.async { [weak self] in
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
Logger.info("\(strongSelf.logTag) setAudioEnabled 2.")
Logger.flush()
guard strongSelf.peerConnection != nil else {
Logger.debug("\(strongSelf.logTag) \(#function) Ignoring obsolete event in terminated client")
return
@ -363,9 +368,6 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
}
audioTrack.isEnabled = enabled
Logger.info("\(strongSelf.logTag) setAudioEnabled 3.")
Logger.flush()
}
}
@ -381,14 +383,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
public func createOffer() -> Promise<HardenedRTCSessionDescription> {
SwiftAssertIsOnMainThread(#function)
defer {
ensureTeardown(#function)
}
let completion: ((((HardenedRTCSessionDescription) -> Void), ((Error) -> Void), RTCSessionDescription?, Error?) -> Void) = { [weak self] (fulfill, reject, sdp, error) in
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
strongSelf.assertOnSignalingQueue()
guard strongSelf.peerConnection != nil else {
@ -415,9 +418,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
strongSelf.assertOnSignalingQueue()
guard let peerConnection = strongSelf.peerConnection else {
@ -443,13 +444,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
}
public func setLocalSessionDescriptionInternal(_ sessionDescription: HardenedRTCSessionDescription) -> Promise<Void> {
defer {
ensureTeardown(#function)
}
return PromiseKit.wrap { [weak self] resolve in
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
strongSelf.assertOnSignalingQueue()
@ -465,14 +468,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
public func setLocalSessionDescription(_ sessionDescription: HardenedRTCSessionDescription) -> Promise<Void> {
SwiftAssertIsOnMainThread(#function)
defer {
ensureTeardown(#function)
}
let workBlock : ((@escaping (() -> Void), @escaping ((Error) -> Void)) -> Void) = { [weak self] (fulfill, reject) in
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
strongSelf.assertOnSignalingQueue()
guard let peerConnection = strongSelf.peerConnection else {
@ -501,6 +505,9 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
public func negotiateSessionDescription(remoteDescription: RTCSessionDescription, constraints: RTCMediaConstraints) -> Promise<HardenedRTCSessionDescription> {
SwiftAssertIsOnMainThread(#function)
defer {
ensureTeardown(#function)
}
return setRemoteSessionDescription(remoteDescription)
.then(on: PeerConnectionClient.signalingQueue) { [weak self] in
@ -511,9 +518,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
}
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
return strongSelf.negotiateAnswerSessionDescription(constraints: constraints)
}
@ -521,14 +526,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
public func setRemoteSessionDescription(_ sessionDescription: RTCSessionDescription) -> Promise<Void> {
SwiftAssertIsOnMainThread(#function)
defer {
ensureTeardown(#function)
}
let workBlock : ((@escaping (() -> Void), @escaping ((Error) -> Void)) -> Void) = { [weak self] (fulfill, reject) in
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
strongSelf.assertOnSignalingQueue()
guard let peerConnection = strongSelf.peerConnection else {
@ -556,14 +562,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
private func negotiateAnswerSessionDescription(constraints: RTCMediaConstraints) -> Promise<HardenedRTCSessionDescription> {
assertOnSignalingQueue()
defer {
ensureTeardown(#function)
}
let completion : ((@escaping ((HardenedRTCSessionDescription) -> Void), @escaping ((Error) -> Void), RTCSessionDescription?, Error?) -> Void) = { [weak self] (fulfill, reject, sdp, error) in
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
strongSelf.assertOnSignalingQueue()
guard strongSelf.peerConnection != nil else {
@ -597,9 +604,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
strongSelf.assertOnSignalingQueue()
@ -619,13 +624,14 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
}
public func addRemoteIceCandidate(_ candidate: RTCIceCandidate) {
defer {
ensureTeardown(#function)
}
PeerConnectionClient.signalingQueue.async { [weak self] in
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
guard let peerConnection = strongSelf.peerConnection else {
@ -639,6 +645,10 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
public func terminate() {
SwiftAssertIsOnMainThread(#function)
defer {
ensureTeardown(#function)
}
Logger.debug("\(logTag) in \(#function)")
Logger.flush()
@ -650,9 +660,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
strongSelf.terminateInternal()
@ -661,6 +669,9 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
private func terminateInternal() {
assertOnSignalingQueue()
defer {
ensureTeardown(#function)
}
Logger.debug("\(logTag) in \(#function)")
Logger.flush()
@ -711,14 +722,15 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
public func sendDataChannelMessage(data: Data, description: String, isCritical: Bool) {
SwiftAssertIsOnMainThread(#function)
defer {
ensureTeardown(#function)
}
PeerConnectionClient.signalingQueue.async { [weak self] in
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
guard strongSelf.peerConnection != nil else {
@ -756,15 +768,24 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
/** The data channel state changed. */
internal func dataChannelDidChangeState(_ dataChannel: RTCDataChannel) {
defer {
ensureTeardown(#function)
}
Logger.debug("\(logTag) dataChannelDidChangeState: \(dataChannel)")
}
/** The data channel successfully received a data buffer. */
internal func dataChannel(_ dataChannel: RTCDataChannel, didReceiveMessageWith buffer: RTCDataBuffer) {
defer {
ensureTeardown(#function)
}
let completion: (OWSWebRTCProtosData) -> Void = { [weak self] (dataChannelMessage) in
SwiftAssertIsOnMainThread(#function)
guard let strongSelf = self else { return }
defer {
strongSelf.ensureTeardown(#function)
}
guard let strongDelegate = strongSelf.delegate else { return }
strongDelegate.peerConnectionClient(strongSelf, received: dataChannelMessage)
}
@ -773,9 +794,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
guard strongSelf.peerConnection != nil else {
@ -798,6 +817,9 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
/** The data channel's |bufferedAmount| changed. */
internal func dataChannel(_ dataChannel: RTCDataChannel, didChangeBufferedAmount amount: UInt64) {
defer {
ensureTeardown(#function)
}
Logger.debug("\(logTag) didChangeBufferedAmount: \(amount)")
}
@ -805,15 +827,24 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
/** Called when the SignalingState changed. */
internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didChange stateChanged: RTCSignalingState) {
defer {
ensureTeardown(#function)
}
Logger.debug("\(logTag) didChange signalingState:\(stateChanged.debugDescription)")
}
/** Called when media is received on a new stream from remote peer. */
internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didAdd stream: RTCMediaStream) {
defer {
ensureTeardown(#function)
}
let completion: (RTCVideoTrack) -> Void = { [weak self] (remoteVideoTrack) in
SwiftAssertIsOnMainThread(#function)
guard let strongSelf = self else { return }
defer {
strongSelf.ensureTeardown(#function)
}
guard let strongDelegate = strongSelf.delegate else { return }
// TODO: Consider checking for termination here.
@ -825,9 +856,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
guard let peerConnection = strongSelf.peerConnection else {
@ -855,32 +884,50 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
/** Called when a remote peer closes a stream. */
internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didRemove stream: RTCMediaStream) {
defer {
ensureTeardown(#function)
}
Logger.debug("\(logTag) didRemove Stream:\(stream)")
}
/** Called when negotiation is needed, for example ICE has restarted. */
internal func peerConnectionShouldNegotiate(_ peerConnectionParam: RTCPeerConnection) {
defer {
ensureTeardown(#function)
}
Logger.debug("\(logTag) shouldNegotiate")
}
/** Called any time the IceConnectionState changes. */
internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didChange newState: RTCIceConnectionState) {
defer {
ensureTeardown(#function)
}
let connectedCompletion : () -> Void = { [weak self] in
SwiftAssertIsOnMainThread(#function)
guard let strongSelf = self else { return }
defer {
strongSelf.ensureTeardown(#function)
}
guard let strongDelegate = strongSelf.delegate else { return }
strongDelegate.peerConnectionClientIceConnected(strongSelf)
}
let failedCompletion : () -> Void = { [weak self] in
SwiftAssertIsOnMainThread(#function)
guard let strongSelf = self else { return }
defer {
strongSelf.ensureTeardown(#function)
}
guard let strongDelegate = strongSelf.delegate else { return }
strongDelegate.peerConnectionClientIceFailed(strongSelf)
}
let disconnectedCompletion : () -> Void = { [weak self] in
SwiftAssertIsOnMainThread(#function)
guard let strongSelf = self else { return }
defer {
strongSelf.ensureTeardown(#function)
}
guard let strongDelegate = strongSelf.delegate else { return }
strongDelegate.peerConnectionClientIceDisconnected(strongSelf)
}
@ -889,9 +936,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
guard let peerConnection = strongSelf.peerConnection else {
@ -921,15 +966,24 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
/** Called any time the IceGatheringState changes. */
internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didChange newState: RTCIceGatheringState) {
defer {
ensureTeardown(#function)
}
Logger.info("\(logTag) didChange IceGatheringState:\(newState.debugDescription)")
}
/** New ice candidate has been found. */
internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didGenerate candidate: RTCIceCandidate) {
defer {
ensureTeardown(#function)
}
let completion: (RTCIceCandidate) -> Void = { [weak self] (candidate) in
SwiftAssertIsOnMainThread(#function)
guard let strongSelf = self else { return }
defer {
strongSelf.ensureTeardown(#function)
}
guard let strongDelegate = strongSelf.delegate else { return }
strongDelegate.peerConnectionClient(strongSelf, addedLocalIceCandidate: candidate)
}
@ -938,9 +992,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
guard let peerConnection = strongSelf.peerConnection else {
@ -960,15 +1012,24 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
/** Called when a group of local Ice candidates have been removed. */
internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didRemove candidates: [RTCIceCandidate]) {
defer {
ensureTeardown(#function)
}
Logger.debug("\(logTag) didRemove IceCandidates:\(candidates)")
}
/** New data channel has been opened. */
internal func peerConnection(_ peerConnectionParam: RTCPeerConnection, didOpen dataChannel: RTCDataChannel) {
defer {
ensureTeardown(#function)
}
let completion: ([PendingDataChannelMessage]) -> Void = { [weak self] (pendingMessages) in
SwiftAssertIsOnMainThread(#function)
guard let strongSelf = self else { return }
defer {
strongSelf.ensureTeardown(#function)
}
pendingMessages.forEach { message in
strongSelf.sendDataChannelMessage(data: message.data, description: message.description, isCritical: message.isCritical)
}
@ -978,9 +1039,7 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
guard let strongSelf = self else { return }
Logger.info("\(strongSelf.logTag) \(#function) starting."); Logger.flush()
defer {
DispatchQueue.main.async {
Logger.info("\(strongSelf.logTag) \(#function) complete."); Logger.flush()
}
strongSelf.ensureTeardown(#function)
}
guard let peerConnection = strongSelf.peerConnection else {
@ -1047,7 +1106,13 @@ class PeerConnectionClient: NSObject, RTCPeerConnectionDelegate, RTCDataChannelD
}
}
private func
private func ensureTeardown(_ functionName: String) {
PeerConnectionClient.signalingQueue.async {
// DispatchQueue.main.async {
Logger.info("\(self.logTag) \(functionName) complete.")
Logger.flush()
}
}
}
/**

Loading…
Cancel
Save