Merge branch 'charlesmchen/callSetup'

pull/1/head
Matthew Chen 6 years ago
commit 4fdb18f50d

@ -4580,8 +4580,6 @@ typedef enum : NSUInteger {
OWSAssertIsOnMainThread();
OWSAssertDebug(self.conversationViewModel);
OWSLogVerbose(@"");
// HACK to work around radar #28167779
// "UICollectionView performBatchUpdates can trigger a crash if the collection view is flagged for layout"
// more: https://github.com/PSPDFKit-labs/radar.apple.com/tree/master/28167779%20-%20CollectionViewBatchingIssue
@ -4604,8 +4602,6 @@ typedef enum : NSUInteger {
return;
}
OWSLogVerbose(@"");
[self updateBackButtonUnreadCount];
[self updateNavigationBarSubtitleLabel];

@ -1321,8 +1321,6 @@ NSString *const kArchivedConversationsReuseIdentifier = @"kArchivedConversations
return;
}
OWSLogVerbose(@"");
NSArray *notifications = [self.uiDatabaseConnection beginLongLivedReadTransaction];
if (![[self.uiDatabaseConnection ext:TSThreadDatabaseViewExtensionName] hasChangesForGroup:self.currentGrouping

@ -79,6 +79,7 @@ public enum CallError: Error {
case timeout(description: String)
case obsoleteCall(description: String)
case fatalError(description: String)
case messageSendFailure(underlyingError: Error)
}
// Should be roughly synced with Android client for consistency
@ -99,8 +100,14 @@ protocol CallServiceObserver: class {
remoteVideoTrack: RTCVideoTrack?)
}
protocol SignalCallDataDelegate: class {
func outgoingIceUpdateDidFail(call: SignalCall, error: Error)
}
// Gather all per-call state in one place.
private class SignalCallData: NSObject {
fileprivate weak var delegate: SignalCallDataDelegate?
public let call: SignalCall
// Used to coordinate promises across delegate methods
@ -147,8 +154,9 @@ private class SignalCallData: NSObject {
}
}
required init(call: SignalCall) {
required init(call: SignalCall, delegate: SignalCallDataDelegate) {
self.call = call
self.delegate = delegate
let (callConnectedPromise, callConnectedResolver) = Promise<Void>.pending()
self.callConnectedPromise = callConnectedPromise
@ -192,11 +200,85 @@ private class SignalCallData: NSObject {
peerConnectionClient?.terminate()
Logger.debug("setting peerConnectionClient")
outgoingIceUpdateQueue.removeAll()
}
// MARK: - Dependencies
private var messageSender: MessageSender {
return SSKEnvironment.shared.messageSender
}
// MARK: - Outgoing ICE updates.
// Setting up a call involves sending many (currently 20+) ICE updates.
// We send messages serially in order to preserve outgoing message order.
// There are so many ICE updates per call that the cost of sending all of
// those messages becomes significant. So we batch outgoing ICE updates,
// making sure that we only have one outgoing ICE update message at a time.
//
// This variable should only be accessed on the main thread.
private var outgoingIceUpdateQueue = [SSKProtoCallMessageIceUpdate]()
private var outgoingIceUpdatesInFlight = false
func sendOrEnqueue(outgoingIceUpdate iceUpdateProto: SSKProtoCallMessageIceUpdate) {
AssertIsOnMainThread()
outgoingIceUpdateQueue.append(iceUpdateProto)
tryToSendIceUpdates()
}
private func tryToSendIceUpdates() {
AssertIsOnMainThread()
guard !outgoingIceUpdatesInFlight else {
Logger.verbose("Enqueued outgoing ice update")
return
}
let iceUpdateProtos = outgoingIceUpdateQueue
guard iceUpdateProtos.count > 0 else {
// Nothing in the queue.
return
}
outgoingIceUpdateQueue.removeAll()
outgoingIceUpdatesInFlight = true
/**
* Sent by both parties out of band of the RTC calling channels, as part of setting up those channels. The messages
* include network accessibility information from the perspective of each client. Once compatible ICEUpdates have been
* exchanged, the clients can connect.
*/
let callMessage = OWSOutgoingCallMessage(thread: call.thread, iceUpdateMessages: iceUpdateProtos)
let sendPromise = self.messageSender.sendPromise(message: callMessage)
.done { [weak self] in
AssertIsOnMainThread()
guard let strongSelf = self else {
return
}
strongSelf.outgoingIceUpdatesInFlight = false
strongSelf.tryToSendIceUpdates()
}.catch { [weak self] (error) in
AssertIsOnMainThread()
guard let strongSelf = self else {
return
}
strongSelf.outgoingIceUpdatesInFlight = false
strongSelf.delegate?.outgoingIceUpdateDidFail(call: strongSelf.call, error: error)
}
sendPromise.retainUntilComplete()
}
}
// This class' state should only be accessed on the main queue.
@objc public class CallService: NSObject, CallObserver, PeerConnectionClientDelegate {
@objc public class CallService: NSObject, CallObserver, PeerConnectionClientDelegate, SignalCallDataDelegate {
// MARK: - Properties
@ -216,6 +298,7 @@ private class SignalCallData: NSObject {
didSet {
AssertIsOnMainThread()
oldValue?.delegate = nil
oldValue?.call.removeObserver(self)
callData?.call.addObserverAndSyncState(observer: self)
@ -363,7 +446,7 @@ private class SignalCallData: NSObject {
return Promise(error: CallError.assertionError(description: errorDescription))
}
let callData = SignalCallData(call: call)
let callData = SignalCallData(call: call, delegate: self)
self.callData = callData
// MJK TODO remove this timestamp param
@ -371,7 +454,8 @@ private class SignalCallData: NSObject {
callRecord.save()
call.callRecord = callRecord
let promise = getIceServers().then { iceServers -> Promise<HardenedRTCSessionDescription> in
let promise = getIceServers()
.then { iceServers -> Promise<HardenedRTCSessionDescription> in
Logger.debug("got ice servers:\(iceServers) for call: \(call.identifiersForLogs)")
guard self.call == call else {
@ -404,9 +488,9 @@ private class SignalCallData: NSObject {
Logger.info("session description for outgoing call: \(call.identifiersForLogs), sdp: \(sessionDescription.logSafeDescription).")
return firstly {
return
peerConnectionClient.setLocalSessionDescription(sessionDescription)
}.then { _ -> Promise<Void> in
.then { _ -> Promise<Void> in
do {
let offerBuilder = SSKProtoCallMessageOffer.builder(id: call.signalingId,
sessionDescription: sessionDescription.sdp)
@ -500,9 +584,8 @@ private class SignalCallData: NSObject {
let sessionDescription = RTCSessionDescription(type: .answer, sdp: sessionDescription)
firstly {
peerConnectionClient.setRemoteSessionDescription(sessionDescription)
}.done {
.done {
Logger.debug("successfully set remote description")
}.catch { error in
if let callError = error as? CallError {
@ -664,7 +747,7 @@ private class SignalCallData: NSObject {
Logger.info("starting new call: \(newCall.identifiersForLogs)")
let callData = SignalCallData(call: newCall)
let callData = SignalCallData(call: newCall, delegate: self)
self.callData = callData
var backgroundTask: OWSBackgroundTask? = OWSBackgroundTask(label: "\(#function)", completionBlock: { [weak self] status in
@ -686,9 +769,8 @@ private class SignalCallData: NSObject {
strongSelf.handleFailedCall(failedCall: newCall, error: timeout)
})
firstly {
getIceServers()
}.then { (iceServers: [RTCIceServer]) -> Promise<HardenedRTCSessionDescription> in
.then { (iceServers: [RTCIceServer]) -> Promise<HardenedRTCSessionDescription> in
// FIXME for first time call recipients I think we'll see mic/camera permission requests here,
// even though, from the users perspective, no incoming call is yet visible.
guard self.call == newCall else {
@ -857,23 +939,24 @@ private class SignalCallData: NSObject {
Logger.info("sending ICE Candidate \(call.identifiersForLogs).")
let iceUpdateProto: SSKProtoCallMessageIceUpdate
do {
/**
* Sent by both parties out of band of the RTC calling channels, as part of setting up those channels. The messages
* include network accessibility information from the perspective of each client. Once compatible ICEUpdates have been
* exchanged, the clients can connect.
*/
let iceUpdateBuilder = SSKProtoCallMessageIceUpdate.builder(id: call.signalingId,
sdpMid: sdpMid,
sdpMlineIndex: UInt32(iceCandidate.sdpMLineIndex),
sdp: iceCandidate.sdp)
let callMessage = OWSOutgoingCallMessage(thread: call.thread, iceUpdateMessage: try iceUpdateBuilder.build())
let sendPromise = self.messageSender.sendPromise(message: callMessage)
sendPromise.retainUntilComplete()
iceUpdateProto = try iceUpdateBuilder.build()
} catch {
owsFailDebug("Couldn't build proto")
throw CallError.fatalError(description: "Couldn't build proto")
}
/**
* Sent by both parties out of band of the RTC calling channels, as part of setting up those channels. The messages
* include network accessibility information from the perspective of each client. Once compatible ICEUpdates have been
* exchanged, the clients can connect.
*/
callData.sendOrEnqueue(outgoingIceUpdate: iceUpdateProto)
}.catch { error in
OWSProdInfo(OWSAnalyticsEvents.callServiceErrorHandleLocalAddedIceCandidate(), file: #file, function: #function, line: #line)
Logger.error("waitUntilReadyToSendIceUpdates failed with error: \(error)")
@ -1201,9 +1284,9 @@ private class SignalCallData: NSObject {
do {
let hangupBuilder = SSKProtoCallMessageHangup.builder(id: call.signalingId)
let callMessage = OWSOutgoingCallMessage(thread: call.thread, hangupMessage: try hangupBuilder.build())
firstly {
self.messageSender.sendPromise(message: callMessage)
}.done {
.done {
Logger.debug("successfully sent hangup call message to \(call.thread.contactIdentifier())")
}.catch { error in
OWSProdInfo(OWSAnalyticsEvents.callServiceErrorHandleLocalHungupCall(), file: #file, function: #function, line: #line)
@ -1520,11 +1603,9 @@ private class SignalCallData: NSObject {
* a list of servers, plus we have fallback servers hardcoded in the app.
*/
private func getIceServers() -> Promise<[RTCIceServer]> {
AssertIsOnMainThread()
return firstly {
accountManager.getTurnServerInfo()
}.map { turnServerInfo -> [RTCIceServer] in
return self.accountManager.getTurnServerInfo()
.map(on: DispatchQueue.global()) { turnServerInfo -> [RTCIceServer] in
Logger.debug("got turn server urls: \(turnServerInfo.urls)")
return turnServerInfo.urls.map { url in
@ -1537,7 +1618,7 @@ private class SignalCallData: NSObject {
return RTCIceServer(urlStrings: [url])
}
} + [CallService.fallbackIceServer]
}.recover { (error: Error) -> Guarantee<[RTCIceServer]> in
}.recover(on: DispatchQueue.global()) { (error: Error) -> Guarantee<[RTCIceServer]> in
Logger.error("fetching ICE servers failed with error: \(error)")
Logger.warn("using fallback ICE Servers")
@ -1812,4 +1893,17 @@ private class SignalCallData: NSObject {
self.activeCallTimer?.invalidate()
self.activeCallTimer = nil
}
// MARK: - SignalCallDataDelegate
func outgoingIceUpdateDidFail(call: SignalCall, error: Error) {
AssertIsOnMainThread()
guard self.call == call else {
Logger.warn("obsolete call")
return
}
handleFailedCall(failedCall: call, error: CallError.messageSendFailure(underlyingError: error))
}
}

@ -1,5 +1,5 @@
//
// Copyright (c) 2018 Open Whisper Systems. All rights reserved.
// Copyright (c) 2019 Open Whisper Systems. All rights reserved.
//
import Foundation
@ -98,7 +98,7 @@ public class ProfileFetcherJob: NSObject {
Logger.error("background task time ran out before profile fetch completed.")
})
DispatchQueue.main.async {
DispatchQueue.global().async {
for recipientId in recipientIds {
self.getAndUpdateProfile(recipientId: recipientId)
}
@ -112,7 +112,7 @@ public class ProfileFetcherJob: NSObject {
public func getAndUpdateProfile(recipientId: String, remainingRetries: Int = 3) {
self.getProfile(recipientId: recipientId).map(on: DispatchQueue.global()) { profile in
self.updateProfile(signalServiceProfile: profile)
}.catch { error in
}.catch(on: DispatchQueue.global()) { error in
switch error {
case ProfileFetcherJobError.throttled(let lastTimeInterval):
Logger.info("skipping updateProfile: \(recipientId), lastTimeInterval: \(lastTimeInterval)")
@ -129,7 +129,6 @@ public class ProfileFetcherJob: NSObject {
}
public func getProfile(recipientId: String) -> Promise<SignalServiceProfile> {
AssertIsOnMainThread()
if !ignoreThrottling {
if let lastDate = ProfileFetcherJob.fetchDateMap[recipientId] {
let lastTimeInterval = fabs(lastDate.timeIntervalSinceNow)
@ -162,8 +161,6 @@ public class ProfileFetcherJob: NSObject {
private func requestProfile(recipientId: String,
udAccess: OWSUDAccess?,
canFailoverUDAuth: Bool) -> Promise<SignalServiceProfile> {
AssertIsOnMainThread()
let requestMaker = RequestMaker(label: "Profile Fetch",
requestFactoryBlock: { (udAccessKeyForRequest) -> TSRequest in
return OWSRequestFactory.getProfileRequest(recipientId: recipientId, udAccessKey: udAccessKeyForRequest)

@ -462,7 +462,7 @@ NSString *NSStringForOutgoingMessageRecipientState(OWSOutgoingMessageRecipientSt
// There's no need to save this message, since it's not displayed to the user.
//
// Should we find a need to save this in the future, we need to exclude any non-serializable properties.
OWSLogDebug(@"Skipping save for group meta message.");
OWSLogDebug(@"Skipping save for transient outgoing message.");
return;
}

@ -380,8 +380,7 @@ NSString *const OWSMessageDecryptJobFinderExtensionGroup = @"OWSMessageProcessin
OWSFailDebug(@"Could not parse proto.");
// TODO: Add analytics.
[[OWSPrimaryStorage.sharedManager newDatabaseConnection]
readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
[self.dbConnection readWriteWithBlock:^(YapDatabaseReadWriteTransaction *transaction) {
TSErrorMessage *errorMessage = [TSErrorMessage corruptedMessageInUnknownThread];
[SSKEnvironment.shared.notificationsManager notifyUserForThreadlessErrorMessage:errorMessage
transaction:transaction];

@ -32,7 +32,6 @@ NS_ASSUME_NONNULL_BEGIN
- (instancetype)initWithThread:(TSThread *)thread offerMessage:(SSKProtoCallMessageOffer *)offerMessage;
- (instancetype)initWithThread:(TSThread *)thread answerMessage:(SSKProtoCallMessageAnswer *)answerMessage;
- (instancetype)initWithThread:(TSThread *)thread iceUpdateMessage:(SSKProtoCallMessageIceUpdate *)iceUpdateMessage;
- (instancetype)initWithThread:(TSThread *)thread
iceUpdateMessages:(NSArray<SSKProtoCallMessageIceUpdate *> *)iceUpdateMessage;
- (instancetype)initWithThread:(TSThread *)thread hangupMessage:(SSKProtoCallMessageHangup *)hangupMessage;

@ -58,18 +58,6 @@ NS_ASSUME_NONNULL_BEGIN
return self;
}
- (instancetype)initWithThread:(TSThread *)thread iceUpdateMessage:(SSKProtoCallMessageIceUpdate *)iceUpdateMessage
{
self = [self initWithThread:thread];
if (!self) {
return self;
}
_iceUpdateMessages = @[ iceUpdateMessage ];
return self;
}
- (instancetype)initWithThread:(TSThread *)thread
iceUpdateMessages:(NSArray<SSKProtoCallMessageIceUpdate *> *)iceUpdateMessages
{
@ -189,7 +177,7 @@ NS_ASSUME_NONNULL_BEGIN
} else if (self.answerMessage) {
payload = @"answerMessage";
} else if (self.iceUpdateMessages.count > 0) {
payload = @"iceUpdateMessage";
payload = [NSString stringWithFormat:@"iceUpdateMessages: %lu", (unsigned long)self.iceUpdateMessages.count];
} else if (self.hangupMessage) {
payload = @"hangupMessage";
} else if (self.busyMessage) {

@ -1,5 +1,5 @@
//
// Copyright (c) 2018 Open Whisper Systems. All rights reserved.
// Copyright (c) 2019 Open Whisper Systems. All rights reserved.
//
import Foundation
@ -95,7 +95,7 @@ public class RequestMaker: NSObject {
@objc
public func makeRequestObjc() -> AnyPromise {
let promise = makeRequest()
.recover { (error: Error) -> Promise<RequestMakerResult> in
.recover(on: DispatchQueue.global()) { (error: Error) -> Promise<RequestMakerResult> in
switch error {
case NetworkManagerError.taskError(_, let underlyingError):
throw underlyingError
@ -173,7 +173,7 @@ public class RequestMaker: NSObject {
}
} else {
return self.networkManager.makePromise(request: request)
.map { (networkManagerResult: TSNetworkManager.NetworkManagerResult) -> RequestMakerResult in
.map(on: DispatchQueue.global()) { (networkManagerResult: TSNetworkManager.NetworkManagerResult) -> RequestMakerResult in
if self.udManager.isUDVerboseLoggingEnabled() {
if isUDRequest {
Logger.debug("UD REST request '\(self.label)' succeeded.")

Loading…
Cancel
Save