Batch outgoing ICE updates.

pull/1/head
Matthew Chen 6 years ago
parent 5bb78cba25
commit 70185dd872

@ -192,6 +192,70 @@ private class SignalCallData: NSObject {
peerConnectionClient?.terminate() peerConnectionClient?.terminate()
Logger.debug("setting peerConnectionClient") Logger.debug("setting peerConnectionClient")
outgoingIceUpdateQueue.removeAll()
}
// MARK: - Dependencies
private var messageSender: MessageSender {
return SSKEnvironment.shared.messageSender
}
// MARK: - Outgoing ICE updates.
// Setting up a call involves sending many (currently 20+) ICE updates.
// We send messages serially in order to preserve outgoing message order.
// There are so many ICE updates per call that the cost of sending all of
// those messages becomes significant. So we batch outgoing ICE updates,
// making sure that we only have one outgoing ICE update message at a time.
//
// This variable should only be accessed on the main thread.
private var outgoingIceUpdateQueue = [SSKProtoCallMessageIceUpdate]()
private var outgoingIceUpdatesInFlight = false
func sendOrEnqueue(outgoingIceUpdate iceUpdateProto: SSKProtoCallMessageIceUpdate) {
AssertIsOnMainThread()
outgoingIceUpdateQueue.append(iceUpdateProto)
tryToSendIceUpdates()
}
private func tryToSendIceUpdates() {
guard !outgoingIceUpdatesInFlight else {
Logger.verbose("Enqueued outgoing ice update")
return
}
let iceUpdateProtos = outgoingIceUpdateQueue
guard iceUpdateProtos.count > 0 else {
// Nothing in the queue.
return
}
outgoingIceUpdateQueue.removeAll()
outgoingIceUpdatesInFlight = true
/**
* Sent by both parties out of band of the RTC calling channels, as part of setting up those channels. The messages
* include network accessibility information from the perspective of each client. Once compatible ICEUpdates have been
* exchanged, the clients can connect.
*/
let callMessage = OWSOutgoingCallMessage(thread: call.thread, iceUpdateMessages: iceUpdateProtos)
let sendPromise = self.messageSender.sendPromise(message: callMessage)
.ensure { [weak self] in
AssertIsOnMainThread()
guard let strongSelf = self else {
return
}
strongSelf.outgoingIceUpdatesInFlight = false
strongSelf.tryToSendIceUpdates()
}
sendPromise.retainUntilComplete()
} }
} }
@ -249,9 +313,6 @@ private class SignalCallData: NSObject {
return callData?.call return callData?.call
} }
didSet {
AssertIsOnMainThread()
}
} }
var peerConnectionClient: PeerConnectionClient? { var peerConnectionClient: PeerConnectionClient? {
get { get {
@ -259,9 +320,6 @@ private class SignalCallData: NSObject {
return callData?.peerConnectionClient return callData?.peerConnectionClient
} }
didSet {
AssertIsOnMainThread()
}
} }
weak var localCaptureSession: AVCaptureSession? { weak var localCaptureSession: AVCaptureSession? {
@ -270,9 +328,6 @@ private class SignalCallData: NSObject {
return callData?.localCaptureSession return callData?.localCaptureSession
} }
didSet {
AssertIsOnMainThread()
}
} }
var remoteVideoTrack: RTCVideoTrack? { var remoteVideoTrack: RTCVideoTrack? {
@ -281,9 +336,6 @@ private class SignalCallData: NSObject {
return callData?.remoteVideoTrack return callData?.remoteVideoTrack
} }
didSet {
AssertIsOnMainThread()
}
} }
var isRemoteVideoEnabled: Bool { var isRemoteVideoEnabled: Bool {
get { get {
@ -294,9 +346,6 @@ private class SignalCallData: NSObject {
} }
return callData.isRemoteVideoEnabled return callData.isRemoteVideoEnabled
} }
didSet {
AssertIsOnMainThread()
}
} }
@objc public override init() { @objc public override init() {
@ -420,9 +469,9 @@ private class SignalCallData: NSObject {
Logger.info("session description for outgoing call: \(call.identifiersForLogs), sdp: \(sessionDescription.logSafeDescription).") Logger.info("session description for outgoing call: \(call.identifiersForLogs), sdp: \(sessionDescription.logSafeDescription).")
return firstly { return
peerConnectionClient.setLocalSessionDescription(sessionDescription) peerConnectionClient.setLocalSessionDescription(sessionDescription)
}.then { _ -> Promise<Void> in .then { _ -> Promise<Void> in
do { do {
let offerBuilder = SSKProtoCallMessageOffer.builder(id: call.signalingId, let offerBuilder = SSKProtoCallMessageOffer.builder(id: call.signalingId,
sessionDescription: sessionDescription.sdp) sessionDescription: sessionDescription.sdp)
@ -516,9 +565,8 @@ private class SignalCallData: NSObject {
let sessionDescription = RTCSessionDescription(type: .answer, sdp: sessionDescription) let sessionDescription = RTCSessionDescription(type: .answer, sdp: sessionDescription)
firstly { peerConnectionClient.setRemoteSessionDescription(sessionDescription)
peerConnectionClient.setRemoteSessionDescription(sessionDescription) .done {
}.done {
Logger.debug("successfully set remote description") Logger.debug("successfully set remote description")
}.catch { error in }.catch { error in
if let callError = error as? CallError { if let callError = error as? CallError {
@ -872,23 +920,24 @@ private class SignalCallData: NSObject {
Logger.info("sending ICE Candidate \(call.identifiersForLogs).") Logger.info("sending ICE Candidate \(call.identifiersForLogs).")
let iceUpdateProto: SSKProtoCallMessageIceUpdate
do { do {
/**
* Sent by both parties out of band of the RTC calling channels, as part of setting up those channels. The messages
* include network accessibility information from the perspective of each client. Once compatible ICEUpdates have been
* exchanged, the clients can connect.
*/
let iceUpdateBuilder = SSKProtoCallMessageIceUpdate.builder(id: call.signalingId, let iceUpdateBuilder = SSKProtoCallMessageIceUpdate.builder(id: call.signalingId,
sdpMid: sdpMid, sdpMid: sdpMid,
sdpMlineIndex: UInt32(iceCandidate.sdpMLineIndex), sdpMlineIndex: UInt32(iceCandidate.sdpMLineIndex),
sdp: iceCandidate.sdp) sdp: iceCandidate.sdp)
let callMessage = OWSOutgoingCallMessage(thread: call.thread, iceUpdateMessage: try iceUpdateBuilder.build()) iceUpdateProto = try iceUpdateBuilder.build()
let sendPromise = self.messageSender.sendPromise(message: callMessage)
sendPromise.retainUntilComplete()
} catch { } catch {
owsFailDebug("Couldn't build proto") owsFailDebug("Couldn't build proto")
throw CallError.fatalError(description: "Couldn't build proto") throw CallError.fatalError(description: "Couldn't build proto")
} }
/**
* Sent by both parties out of band of the RTC calling channels, as part of setting up those channels. The messages
* include network accessibility information from the perspective of each client. Once compatible ICEUpdates have been
* exchanged, the clients can connect.
*/
callData.sendOrEnqueue(outgoingIceUpdate: iceUpdateProto)
}.catch { error in }.catch { error in
OWSProdInfo(OWSAnalyticsEvents.callServiceErrorHandleLocalAddedIceCandidate(), file: #file, function: #function, line: #line) OWSProdInfo(OWSAnalyticsEvents.callServiceErrorHandleLocalAddedIceCandidate(), file: #file, function: #function, line: #line)
Logger.error("waitUntilReadyToSendIceUpdates failed with error: \(error)") Logger.error("waitUntilReadyToSendIceUpdates failed with error: \(error)")
@ -898,12 +947,14 @@ private class SignalCallData: NSObject {
/** /**
* The clients can now communicate via WebRTC. * The clients can now communicate via WebRTC.
* *
* Called by both caller and callee. Compatible ICE messages have been exchanged between the local and remote * Called by both caller and callee. Compatible ICE messages have been exchanged between the local and remote
* client. * client.
*/ */
private func handleIceConnected() { private func handleIceConnected() {
AssertIsOnMainThread() AssertIsOnMainThread()
Logger.verbose("-----.")
guard let call = self.call else { guard let call = self.call else {
// This will only be called for the current peerConnectionClient, so // This will only be called for the current peerConnectionClient, so
// fail the current call. // fail the current call.
@ -1216,9 +1267,9 @@ private class SignalCallData: NSObject {
do { do {
let hangupBuilder = SSKProtoCallMessageHangup.builder(id: call.signalingId) let hangupBuilder = SSKProtoCallMessageHangup.builder(id: call.signalingId)
let callMessage = OWSOutgoingCallMessage(thread: call.thread, hangupMessage: try hangupBuilder.build()) let callMessage = OWSOutgoingCallMessage(thread: call.thread, hangupMessage: try hangupBuilder.build())
firstly {
self.messageSender.sendPromise(message: callMessage) self.messageSender.sendPromise(message: callMessage)
}.done { .done {
Logger.debug("successfully sent hangup call message to \(call.thread.contactIdentifier())") Logger.debug("successfully sent hangup call message to \(call.thread.contactIdentifier())")
}.catch { error in }.catch { error in
OWSProdInfo(OWSAnalyticsEvents.callServiceErrorHandleLocalHungupCall(), file: #file, function: #function, line: #line) OWSProdInfo(OWSAnalyticsEvents.callServiceErrorHandleLocalHungupCall(), file: #file, function: #function, line: #line)
@ -1536,7 +1587,7 @@ private class SignalCallData: NSObject {
*/ */
private func getIceServers() -> Promise<[RTCIceServer]> { private func getIceServers() -> Promise<[RTCIceServer]> {
self.accountManager.getTurnServerInfo() return self.accountManager.getTurnServerInfo()
.map(on: DispatchQueue.global()) { turnServerInfo -> [RTCIceServer] in .map(on: DispatchQueue.global()) { turnServerInfo -> [RTCIceServer] in
Logger.debug("got turn server urls: \(turnServerInfo.urls)") Logger.debug("got turn server urls: \(turnServerInfo.urls)")

@ -32,7 +32,6 @@ NS_ASSUME_NONNULL_BEGIN
- (instancetype)initWithThread:(TSThread *)thread offerMessage:(SSKProtoCallMessageOffer *)offerMessage; - (instancetype)initWithThread:(TSThread *)thread offerMessage:(SSKProtoCallMessageOffer *)offerMessage;
- (instancetype)initWithThread:(TSThread *)thread answerMessage:(SSKProtoCallMessageAnswer *)answerMessage; - (instancetype)initWithThread:(TSThread *)thread answerMessage:(SSKProtoCallMessageAnswer *)answerMessage;
- (instancetype)initWithThread:(TSThread *)thread iceUpdateMessage:(SSKProtoCallMessageIceUpdate *)iceUpdateMessage;
- (instancetype)initWithThread:(TSThread *)thread - (instancetype)initWithThread:(TSThread *)thread
iceUpdateMessages:(NSArray<SSKProtoCallMessageIceUpdate *> *)iceUpdateMessage; iceUpdateMessages:(NSArray<SSKProtoCallMessageIceUpdate *> *)iceUpdateMessage;
- (instancetype)initWithThread:(TSThread *)thread hangupMessage:(SSKProtoCallMessageHangup *)hangupMessage; - (instancetype)initWithThread:(TSThread *)thread hangupMessage:(SSKProtoCallMessageHangup *)hangupMessage;

@ -58,18 +58,6 @@ NS_ASSUME_NONNULL_BEGIN
return self; return self;
} }
- (instancetype)initWithThread:(TSThread *)thread iceUpdateMessage:(SSKProtoCallMessageIceUpdate *)iceUpdateMessage
{
self = [self initWithThread:thread];
if (!self) {
return self;
}
_iceUpdateMessages = @[ iceUpdateMessage ];
return self;
}
- (instancetype)initWithThread:(TSThread *)thread - (instancetype)initWithThread:(TSThread *)thread
iceUpdateMessages:(NSArray<SSKProtoCallMessageIceUpdate *> *)iceUpdateMessages iceUpdateMessages:(NSArray<SSKProtoCallMessageIceUpdate *> *)iceUpdateMessages
{ {
@ -189,7 +177,7 @@ NS_ASSUME_NONNULL_BEGIN
} else if (self.answerMessage) { } else if (self.answerMessage) {
payload = @"answerMessage"; payload = @"answerMessage";
} else if (self.iceUpdateMessages.count > 0) { } else if (self.iceUpdateMessages.count > 0) {
payload = @"iceUpdateMessage"; payload = [NSString stringWithFormat:@"iceUpdateMessages: %lu", (unsigned long)self.iceUpdateMessages.count];
} else if (self.hangupMessage) { } else if (self.hangupMessage) {
payload = @"hangupMessage"; payload = @"hangupMessage";
} else if (self.busyMessage) { } else if (self.busyMessage) {

Loading…
Cancel
Save