From d55c96cb67399b401a5dab35d857c0415009ed6e Mon Sep 17 00:00:00 2001 From: Audric Ackermann Date: Tue, 21 Sep 2021 17:51:23 +1000 Subject: [PATCH] setup webrtc between two pubkey --- preload.js | 1 + ts/receiver/callMessage.ts | 80 +++++ ts/receiver/contentMessage.ts | 4 + .../outgoing/controlMessage/CallMessage.ts | 24 +- ts/session/sending/MessageQueue.ts | 3 +- ts/session/snode_api/SNodeAPI.ts | 4 +- ts/session/utils/CallManager.ts | 300 ++++++++++++++++++ ts/session/utils/index.ts | 2 + ts/window.d.ts | 1 + 9 files changed, 405 insertions(+), 14 deletions(-) create mode 100644 ts/receiver/callMessage.ts create mode 100644 ts/session/utils/CallManager.ts diff --git a/preload.js b/preload.js index 51864a310..050116a71 100644 --- a/preload.js +++ b/preload.js @@ -53,6 +53,7 @@ window.lokiFeatureFlags = { padOutgoingAttachments: true, enablePinConversations: true, useUnsendRequests: false, + useCallMessage: true, }; window.isBeforeVersion = (toCheck, baseVersion) => { diff --git a/ts/receiver/callMessage.ts b/ts/receiver/callMessage.ts new file mode 100644 index 000000000..4e610dddf --- /dev/null +++ b/ts/receiver/callMessage.ts @@ -0,0 +1,80 @@ +import _ from 'lodash'; +import { SignalService } from '../protobuf'; +import { TTL_DEFAULT } from '../session/constants'; +import { SNodeAPI } from '../session/snode_api'; +import { CallManager } from '../session/utils'; +import { removeFromCache } from './cache'; +import { EnvelopePlus } from './types'; + +// audric FIXME: refactor this out to persistence, just to help debug the flow and send/receive in synchronous testing + +export async function handleCallMessage( + envelope: EnvelopePlus, + callMessage: SignalService.CallMessage +) { + const sender = envelope.senderIdentity || envelope.source; + + const currentOffset = SNodeAPI.getLatestTimestampOffset(); + const sentTimestamp = _.toNumber(envelope.timestamp); + + const { type } = callMessage; + switch (type) { + case SignalService.CallMessage.Type.END_CALL: + window.log.info('handling callMessage END_CALL'); + break; + case SignalService.CallMessage.Type.ANSWER: + window.log.info('handling callMessage ANSWER'); + break; + case SignalService.CallMessage.Type.ICE_CANDIDATES: + window.log.info('handling callMessage ICE_CANDIDATES'); + break; + case SignalService.CallMessage.Type.OFFER: + window.log.info('handling callMessage OFFER'); + break; + case SignalService.CallMessage.Type.PROVISIONAL_ANSWER: + window.log.info('handling callMessage PROVISIONAL_ANSWER'); + break; + default: + window.log.info('handling callMessage unknown'); + } + if (type === SignalService.CallMessage.Type.OFFER) { + if (Math.max(sentTimestamp - (Date.now() - currentOffset)) > TTL_DEFAULT.CALL_MESSAGE) { + window?.log?.info('Dropping incoming OFFER callMessage sent a while ago: ', sentTimestamp); + await removeFromCache(envelope); + + return; + } + await removeFromCache(envelope); + + await CallManager.handleOfferCallMessage(sender, callMessage); + + return; + } + + if (type === SignalService.CallMessage.Type.END_CALL) { + await removeFromCache(envelope); + + CallManager.handleEndCallMessage(sender); + + return; + } + + if (type === SignalService.CallMessage.Type.ANSWER) { + await removeFromCache(envelope); + + await CallManager.handleCallAnsweredMessage(sender, callMessage); + + return; + } + if (type === SignalService.CallMessage.Type.ICE_CANDIDATES) { + await removeFromCache(envelope); + + await CallManager.handleIceCandidatesMessage(sender, callMessage); + + return; + } + await removeFromCache(envelope); + + // if this another type of call message, just add it to the manager + await CallManager.handleOtherCallMessage(sender, callMessage); +} diff --git a/ts/receiver/contentMessage.ts b/ts/receiver/contentMessage.ts index f5422bc33..c3f900f33 100644 --- a/ts/receiver/contentMessage.ts +++ b/ts/receiver/contentMessage.ts @@ -18,6 +18,7 @@ import { removeMessagePadding } from '../session/crypto/BufferPadding'; import { perfEnd, perfStart } from '../session/utils/Performance'; import { getAllCachedECKeyPair } from './closedGroups'; import { getMessageBySenderAndTimestamp } from '../data/data'; +import { handleCallMessage } from './callMessage'; export async function handleContentMessage(envelope: EnvelopePlus, messageHash?: string) { try { @@ -399,6 +400,9 @@ export async function innerHandleContentMessage( if (content.unsendMessage && window.lokiFeatureFlags?.useUnsendRequests) { await handleUnsendMessage(envelope, content.unsendMessage as SignalService.Unsend); } + if (content.callMessage && window.lokiFeatureFlags?.useCallMessage) { + await handleCallMessage(envelope, content.callMessage as SignalService.CallMessage); + } } catch (e) { window?.log?.warn(e); } diff --git a/ts/session/messages/outgoing/controlMessage/CallMessage.ts b/ts/session/messages/outgoing/controlMessage/CallMessage.ts index 59e3a5b79..307ea9991 100644 --- a/ts/session/messages/outgoing/controlMessage/CallMessage.ts +++ b/ts/session/messages/outgoing/controlMessage/CallMessage.ts @@ -3,28 +3,30 @@ import { MessageParams } from '../Message'; import { ContentMessage } from '..'; import { signalservice } from '../../../../protobuf/compiled'; import { TTL_DEFAULT } from '../../../constants'; -interface CallMessageMessageParams extends MessageParams { +interface CallMessageParams extends MessageParams { type: SignalService.CallMessage.Type; - sdpMLineIndexes: Array; - sdpMids: Array; - sdps: Array; - referencedAttachmentTimestamp: number; + sdpMLineIndexes?: Array; + sdpMids?: Array; + sdps?: Array; } -export class CallMessageMessage extends ContentMessage { +export class CallMessage extends ContentMessage { public readonly type: signalservice.CallMessage.Type; - public readonly sdpMLineIndexes: Array; - public readonly sdpMids: Array; - public readonly sdps: Array; + public readonly sdpMLineIndexes?: Array; + public readonly sdpMids?: Array; + public readonly sdps?: Array; - constructor(params: CallMessageMessageParams) { + constructor(params: CallMessageParams) { super({ timestamp: params.timestamp, identifier: params.identifier }); this.type = params.type; this.sdpMLineIndexes = params.sdpMLineIndexes; this.sdpMids = params.sdpMids; this.sdps = params.sdps; // this does not make any sense - if (this.type !== signalservice.CallMessage.Type.END_CALL && this.sdps.length === 0) { + if ( + this.type !== signalservice.CallMessage.Type.END_CALL && + (!this.sdps || this.sdps.length === 0) + ) { throw new Error('sdps must be set unless this is a END_CALL type message'); } } diff --git a/ts/session/sending/MessageQueue.ts b/ts/session/sending/MessageQueue.ts index e7ffdb4e2..1a7cf9db4 100644 --- a/ts/session/sending/MessageQueue.ts +++ b/ts/session/sending/MessageQueue.ts @@ -21,6 +21,7 @@ import { SyncMessageType } from '../utils/syncUtils'; import { OpenGroupRequestCommonType } from '../../opengroup/opengroupV2/ApiUtil'; import { OpenGroupVisibleMessage } from '../messages/outgoing/visibleMessage/OpenGroupVisibleMessage'; import { UnsendMessage } from '../messages/outgoing/controlMessage/UnsendMessage'; +import { CallMessage } from '../messages/outgoing/controlMessage/CallMessage'; type ClosedGroupMessageType = | ClosedGroupVisibleMessage @@ -129,7 +130,7 @@ export class MessageQueue { */ public async sendToPubKeyNonDurably( user: PubKey, - message: ClosedGroupNewMessage + message: ClosedGroupNewMessage | CallMessage ): Promise { let rawMessage; try { diff --git a/ts/session/snode_api/SNodeAPI.ts b/ts/session/snode_api/SNodeAPI.ts index 5feddefc1..e1e52ed9c 100644 --- a/ts/session/snode_api/SNodeAPI.ts +++ b/ts/session/snode_api/SNodeAPI.ts @@ -30,11 +30,11 @@ export const ERROR_CODE_NO_CONNECT = 'ENETUNREACH: No network connection.'; let latestTimestampOffset = Number.MAX_SAFE_INTEGER; -function handleTimestampOffset(request: string, snodeTimestamp: number) { +function handleTimestampOffset(_request: string, snodeTimestamp: number) { if (snodeTimestamp && _.isNumber(snodeTimestamp) && snodeTimestamp > 1609419600 * 1000) { // first january 2021. Arbitrary, just want to make sure the return timestamp is somehow valid and not some crazy low value const now = Date.now(); - window?.log?.info(`timestamp offset from request ${request}: ${now - snodeTimestamp}ms`); + // window?.log?.info(`timestamp offset from request ${request}: ${now - snodeTimestamp}ms`); latestTimestampOffset = now - snodeTimestamp; } } diff --git a/ts/session/utils/CallManager.ts b/ts/session/utils/CallManager.ts new file mode 100644 index 000000000..06ca19ada --- /dev/null +++ b/ts/session/utils/CallManager.ts @@ -0,0 +1,300 @@ +import _ from 'lodash'; +import { SignalService } from '../../protobuf'; +import { CallMessage } from '../messages/outgoing/controlMessage/CallMessage'; +import { ed25519Str } from '../onions/onionPath'; +import { getMessageQueue } from '../sending'; +import { PubKey } from '../types'; + +const incomingCall = ({ sender }: { sender: string }) => { + return { type: 'incomingCall', payload: sender }; +}; +const endCall = ({ sender }: { sender: string }) => { + return { type: 'endCall', payload: sender }; +}; +const answerCall = ({ sender, sdps }: { sender: string; sdps: Array }) => { + return { + type: 'answerCall', + payload: { + sender, + sdps, + }, + }; +}; + +/** + * This field stores all the details received by a sender about a call in separate messages. + */ +const callCache = new Map>(); + +let peerConnection: RTCPeerConnection | null; + +const ENABLE_VIDEO = false; + +const configuration = { + configuration: { + offerToReceiveAudio: true, + offerToReceiveVideo: ENABLE_VIDEO, + }, + iceServers: [ + { urls: 'stun:stun.l.google.com:19302' }, + { urls: 'stun:stun1.l.google.com:19302' }, + { urls: 'stun:stun2.l.google.com:19302' }, + { urls: 'stun:stun3.l.google.com:19302' }, + { urls: 'stun:stun4.l.google.com:19302' }, + ], +}; + +export async function USER_callRecipient(recipient: string) { + window?.log?.info(`starting call with ${ed25519Str(recipient)}..`); + + if (peerConnection) { + window.log.info('closing existing peerconnection'); + peerConnection.close(); + peerConnection = null; + } + peerConnection = new RTCPeerConnection(configuration); + + const mediadevices = await openMediaDevices(); + mediadevices.getTracks().map(track => { + window.log.info('USER_callRecipient adding track: ', track); + peerConnection?.addTrack(track); + }); + peerConnection.addEventListener('connectionstatechange', _event => { + window.log.info('peerConnection?.connectionState:', peerConnection?.connectionState); + if (peerConnection?.connectionState === 'connected') { + // Peers connected! + } + }); + peerConnection.addEventListener('icecandidate', event => { + // window.log.warn('event.candidate', event.candidate); + + if (event.candidate) { + iceCandidates.push(event.candidate); + void iceSenderDebouncer(recipient); + } + }); + const offerDescription = await peerConnection.createOffer({ + offerToReceiveAudio: true, + offerToReceiveVideo: ENABLE_VIDEO, + }); + + if (!offerDescription || !offerDescription.sdp || !offerDescription.sdp.length) { + window.log.warn(`failed to createOffer for recipient ${ed25519Str(recipient)}`); + return; + } + await peerConnection.setLocalDescription(offerDescription); + const callOfferMessage = new CallMessage({ + timestamp: Date.now(), + type: SignalService.CallMessage.Type.OFFER, + sdps: [offerDescription.sdp], + }); + window.log.info('sending OFFER MESSAGE'); + await getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(recipient), callOfferMessage); + // FIXME audric dispatch UI update to show the calling UI +} + +const iceCandidates: Array = new Array(); +const iceSenderDebouncer = _.debounce(async (recipient: string) => { + if (!iceCandidates) { + return; + } + const validCandidates = _.compact( + iceCandidates.map(c => { + if ( + c.sdpMLineIndex !== null && + c.sdpMLineIndex !== undefined && + c.sdpMid !== null && + c.candidate + ) { + return { + sdpMLineIndex: c.sdpMLineIndex, + sdpMid: c.sdpMid, + candidate: c.candidate, + }; + } + return null; + }) + ); + const callIceCandicates = new CallMessage({ + timestamp: Date.now(), + type: SignalService.CallMessage.Type.ICE_CANDIDATES, + sdpMLineIndexes: validCandidates.map(c => c.sdpMLineIndex), + sdpMids: validCandidates.map(c => c.sdpMid), + sdps: validCandidates.map(c => c.candidate), + }); + window.log.info('sending ICE CANDIDATES MESSAGE to ', recipient); + + await getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(recipient), callIceCandicates); +}, 2000); + +const openMediaDevices = async () => { + return navigator.mediaDevices.getUserMedia({ + // video: { + // width: 320, + // height: 240, + // }, + video: ENABLE_VIDEO, + audio: true, + }); +}; + +export async function USER_acceptIncomingCallRequest(fromSender: string) { + const msgCacheFromSender = callCache.get(fromSender); + if (!msgCacheFromSender) { + window?.log?.info( + 'incoming call request cannot be accepted as the corresponding message is not found' + ); + return; + } + const lastOfferMessage = _.findLast( + msgCacheFromSender, + m => m.type === SignalService.CallMessage.Type.OFFER + ); + + if (!lastOfferMessage) { + window?.log?.info( + 'incoming call request cannot be accepted as the corresponding message is not found' + ); + return; + } + + if (peerConnection) { + window.log.info('closing existing peerconnection'); + peerConnection.close(); + peerConnection = null; + } + peerConnection = new RTCPeerConnection(configuration); + const mediadevices = await openMediaDevices(); + mediadevices.getTracks().map(track => { + window.log.info('USER_acceptIncomingCallRequest adding track ', track); + peerConnection?.addTrack(track); + }); + peerConnection.addEventListener('connectionstatechange', _event => { + window.log.info('peerConnection?.connectionState:', peerConnection?.connectionState); + if (peerConnection?.connectionState === 'connected') { + // Peers connected! + } + }); + + const { sdps } = lastOfferMessage; + if (!sdps || sdps.length === 0) { + window?.log?.info( + 'incoming call request cannot be accepted as the corresponding sdps is empty' + ); + return; + } + await peerConnection.setRemoteDescription( + new RTCSessionDescription({ sdp: sdps[0], type: 'offer' }) + ); + const answer = await peerConnection.createAnswer({ + offerToReceiveAudio: true, + offerToReceiveVideo: ENABLE_VIDEO, + }); + if (!answer?.sdp || answer.sdp.length === 0) { + window.log.warn('failed to create answer'); + return; + } + await peerConnection.setLocalDescription(answer); + const answerSdp = answer.sdp; + const callAnswerMessage = new CallMessage({ + timestamp: Date.now(), + type: SignalService.CallMessage.Type.ANSWER, + sdps: [answerSdp], + }); + window.log.info('sending ANSWER MESSAGE'); + + await getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(fromSender), callAnswerMessage); + + window.inboxStore?.dispatch(answerCall({ sender: fromSender, sdps })); +} + +export async function USER_rejectIncomingCallRequest(fromSender: string) { + const endCallMessage = new CallMessage({ + type: SignalService.CallMessage.Type.END_CALL, + timestamp: Date.now(), + }); + callCache.delete(fromSender); + + window.inboxStore?.dispatch(endCall({ sender: fromSender })); + window.log.info('sending END_CALL MESSAGE'); + + await getMessageQueue().sendToPubKeyNonDurably(PubKey.cast(fromSender), endCallMessage); +} + +export function handleEndCallMessage(sender: string) { + callCache.delete(sender); + // + // FIXME audric trigger UI cleanup + window.inboxStore?.dispatch(endCall({ sender })); +} + +export async function handleOfferCallMessage( + sender: string, + callMessage: SignalService.CallMessage +) { + if (!callCache.has(sender)) { + callCache.set(sender, new Array()); + } + callCache.get(sender)?.push(callMessage); + window.inboxStore?.dispatch(incomingCall({ sender })); + //FIXME audric. thiis should not be auto accepted here + await USER_acceptIncomingCallRequest(sender); +} + +export async function handleCallAnsweredMessage( + sender: string, + callMessage: SignalService.CallMessage +) { + if (!callMessage.sdps || callMessage.sdps.length === 0) { + window.log.warn('cannot handle answered message without sdps'); + return; + } + if (!callCache.has(sender)) { + callCache.set(sender, new Array()); + } + + callCache.get(sender)?.push(callMessage); + window.inboxStore?.dispatch(incomingCall({ sender })); + const remoteDesc = new RTCSessionDescription({ type: 'answer', sdp: callMessage.sdps[0] }); + if (peerConnection) { + await peerConnection.setRemoteDescription(remoteDesc); + } else { + window.log.info('call answered by recipient but we do not have a peerconnection set'); + } +} + +export async function handleIceCandidatesMessage( + sender: string, + callMessage: SignalService.CallMessage +) { + if (!callMessage.sdps || callMessage.sdps.length === 0) { + window.log.warn('cannot handle iceCandicates message without candidates'); + return; + } + if (!callCache.has(sender)) { + callCache.set(sender, new Array()); + } + + callCache.get(sender)?.push(callMessage); + window.inboxStore?.dispatch(incomingCall({ sender })); + if (peerConnection) { + // tslint:disable-next-line: prefer-for-of + for (let index = 0; index < callMessage.sdps.length; index++) { + const sdp = callMessage.sdps[index]; + const sdpMLineIndex = callMessage.sdpMLineIndexes[index]; + const sdpMid = callMessage.sdpMids[index]; + const candicate = new RTCIceCandidate({ sdpMid, sdpMLineIndex, candidate: sdp }); + await peerConnection.addIceCandidate(candicate); + } + } else { + window.log.info('handleIceCandidatesMessage but we do not have a peerconnection set'); + } +} + +// tslint:disable-next-line: no-async-without-await +export async function handleOtherCallMessage( + sender: string, + callMessage: SignalService.CallMessage +) { + callCache.get(sender)?.push(callMessage); +} diff --git a/ts/session/utils/index.ts b/ts/session/utils/index.ts index fa626cbd8..2329b8836 100644 --- a/ts/session/utils/index.ts +++ b/ts/session/utils/index.ts @@ -9,6 +9,7 @@ import * as UserUtils from './User'; import * as SyncUtils from './syncUtils'; import * as AttachmentsV2Utils from './AttachmentsV2'; import * as AttachmentDownloads from './AttachmentsDownload'; +import * as CallManager from './CallManager'; export * from './Attachments'; export * from './TypedEmitter'; @@ -26,4 +27,5 @@ export { SyncUtils, AttachmentsV2Utils, AttachmentDownloads, + CallManager, }; diff --git a/ts/window.d.ts b/ts/window.d.ts index 61d1a538f..ff6c7ba51 100644 --- a/ts/window.d.ts +++ b/ts/window.d.ts @@ -48,6 +48,7 @@ declare global { padOutgoingAttachments: boolean; enablePinConversations: boolean; useUnsendRequests: boolean; + useCallMessage: boolean; }; lokiSnodeAPI: LokiSnodeAPI; onLogin: any;