From 07ce97aa560e803d761506c5002186d8cb1e3fa9 Mon Sep 17 00:00:00 2001 From: Ryan Tharp Date: Wed, 18 Mar 2020 17:40:48 -0700 Subject: [PATCH] serverRequest/sendToProxy refactor, start messagesPollLock implementation, improve logging --- js/modules/loki_app_dot_net_api.js | 600 ++++++++++++++++------------- 1 file changed, 333 insertions(+), 267 deletions(-) diff --git a/js/modules/loki_app_dot_net_api.js b/js/modules/loki_app_dot_net_api.js index 70ff7e8a1..d26ec16b3 100644 --- a/js/modules/loki_app_dot_net_api.js +++ b/js/modules/loki_app_dot_net_api.js @@ -14,6 +14,13 @@ const PUBLICCHAT_DELETION_POLL_EVERY = 5 * 1000; // 5s const PUBLICCHAT_MOD_POLL_EVERY = 30 * 1000; // 30s const PUBLICCHAT_MIN_TIME_BETWEEN_DUPLICATE_MESSAGES = 10 * 1000; // 10s +const FILESERVER_HOSTS = [ + 'file-dev.lokinet.org', + 'file.lokinet.org', + 'file-dev.getsession.org', + 'file.getsession.org', +]; + const HOMESERVER_USER_ANNOTATION_TYPE = 'network.loki.messenger.homeserver'; const AVATAR_USER_ANNOTATION_TYPE = 'network.loki.messenger.avatar'; const SETTINGS_CHANNEL_ANNOTATION_TYPE = 'net.patter-app.settings'; @@ -25,6 +32,290 @@ const snodeHttpsAgent = new https.Agent({ rejectUnauthorized: false, }); +const sendToProxy = async ( + srvPubKey, + endpoint, + pFetchOptions, + options = {} +) => { + if (!srvPubKey) { + log.error( + 'loki_app_dot_net: sendToProxy called without a server public key' + ); + return {}; + } + const randSnode = await lokiSnodeAPI.getRandomSnodeAddress(); + const url = `https://${randSnode.ip}:${randSnode.port}/file_proxy`; + + const fetchOptions = pFetchOptions; // make lint happy + // safety issue with file server, just safer to have this + if (fetchOptions.headers === undefined) { + fetchOptions.headers = {}; + } + + const payloadObj = { + body: fetchOptions.body, // might need to b64 if binary... + endpoint, + method: fetchOptions.method, + headers: fetchOptions.headers, + }; + + // from https://github.com/sindresorhus/is-stream/blob/master/index.js + if ( + payloadObj.body && + typeof payloadObj.body === 'object' && + typeof payloadObj.body.pipe === 'function' + ) { + const fData = payloadObj.body.getBuffer(); + const fHeaders = payloadObj.body.getHeaders(); + // update headers for boundary + payloadObj.headers = { ...payloadObj.headers, ...fHeaders }; + // update body with base64 chunk + payloadObj.body = { + fileUpload: fData.toString('base64'), + }; + } + + // convert our payload to binary buffer + const payloadData = Buffer.from( + dcodeIO.ByteBuffer.wrap(JSON.stringify(payloadObj)).toArrayBuffer() + ); + payloadObj.body = false; // free memory + + // make temporary key for this request/response + const ephemeralKey = libsignal.Curve.generateKeyPair(); + + // mix server pub key with our priv key + const symKey = libsignal.Curve.calculateAgreement( + srvPubKey, // server's pubkey + ephemeralKey.privKey // our privkey + ); + + const ivAndCiphertext = await libloki.crypto.DHEncrypt(symKey, payloadData); + + // convert final buffer to base64 + const cipherText64 = dcodeIO.ByteBuffer.wrap(ivAndCiphertext).toString( + 'base64' + ); + + const ephemeralPubKey64 = dcodeIO.ByteBuffer.wrap( + ephemeralKey.pubKey + ).toString('base64'); + + const finalRequestHeader = { + 'X-Loki-File-Server-Ephemeral-Key': ephemeralPubKey64, + }; + + const firstHopOptions = { + method: 'POST', + // not sure why I can't use anything but json... + // text/plain would be preferred... + body: JSON.stringify({ cipherText64 }), + headers: { + 'Content-Type': 'application/json', + 'X-Loki-File-Server-Target': '/loki/v1/secure_rpc', + 'X-Loki-File-Server-Verb': 'POST', + 'X-Loki-File-Server-Headers': JSON.stringify(finalRequestHeader), + }, + // we are talking to a snode... + agent: snodeHttpsAgent, + }; + // weird this doesn't need NODE_TLS_REJECT_UNAUTHORIZED = '0' + const result = await nodeFetch(url, firstHopOptions); + + const txtResponse = await result.text(); + if (txtResponse.match(/^Service node is not ready: not in any swarm/i)) { + // mark snode bad + const randomPoolRemainingCount = lokiSnodeAPI.markRandomNodeUnreachable( + randSnode + ); + log.warn( + `loki_app_dot_net: Marking random snode bad, internet address ${ + randSnode.ip + }:${ + randSnode.port + }. ${randomPoolRemainingCount} snodes remaining in randomPool` + ); + // retry (hopefully with new snode) + // FIXME: max number of retries... + return sendToProxy(srvPubKey, endpoint, fetchOptions); + } + + let response = {}; + try { + response = JSON.parse(txtResponse); + } catch (e) { + log.warn( + `loki_app_dot_net: sendToProxy Could not parse outer JSON [${txtResponse}]`, + endpoint, + 'on', + url + ); + } + + if (response.meta && response.meta.code === 200) { + // convert base64 in response to binary + const ivAndCiphertextResponse = dcodeIO.ByteBuffer.wrap( + response.data, + 'base64' + ).toArrayBuffer(); + const decrypted = await libloki.crypto.DHDecrypt( + symKey, + ivAndCiphertextResponse + ); + const textDecoder = new TextDecoder(); + const respStr = textDecoder.decode(decrypted); + // replace response + try { + response = options.textResponse ? respStr : JSON.parse(respStr); + } catch (e) { + log.warn( + `loki_app_dot_net: sendToProxy Could not parse inner JSON [${respStr}]`, + endpoint, + 'on', + url + ); + } + } else { + log.warn( + 'loki_app_dot_net: file server secure_rpc gave an non-200 response: ', + response, + ` txtResponse[${txtResponse}]`, + endpoint + ); + } + return { result, txtResponse, response }; +}; + +const serverRequest = async (endpoint, options = {}) => { + const { + params = {}, + method, + rawBody, + objBody, + token, + srvPubKey, + forceFreshToken = false, + } = options; + + const url = new URL(endpoint); + if (params) { + url.search = new URLSearchParams(params); + } + const fetchOptions = {}; + const headers = {}; + try { + if (token) { + headers.Authorization = `Bearer ${token}`; + } + if (method) { + fetchOptions.method = method; + } + if (objBody) { + headers['Content-Type'] = 'application/json'; + fetchOptions.body = JSON.stringify(objBody); + } else if (rawBody) { + fetchOptions.body = rawBody; + } + fetchOptions.headers = headers; + + // domain ends in .loki + if (url.host.match(/\.loki$/i)) { + fetchOptions.agent = snodeHttpsAgent; + } + } catch (e) { + log.info('serverRequest set up error:', e.code, e.message); + return { + err: e, + }; + } + + let response; + let result; + let txtResponse; + let mode = 'nodeFetch'; + try { + const host = url.host.toLowerCase(); + // log.info('host', host, FILESERVER_HOSTS); + if ( + window.lokiFeatureFlags.useSnodeProxy && + FILESERVER_HOSTS.includes(host) + ) { + mode = 'sendToProxy'; + // strip trailing slash + const endpointWithQS = ( + url.pathname + (url.search ? '?' + url.search : '') + ).replace(/^\//, ''); + // log.info('endpointWithQS', endpointWithQS) + ({ response, txtResponse, result } = await sendToProxy( + srvPubKey, + endpointWithQS, + fetchOptions, + options + )); + } else { + // disable check for .loki + process.env.NODE_TLS_REJECT_UNAUTHORIZED = url.host.match(/\.loki$/i) + ? '0' + : '1'; + result = await nodeFetch(url, fetchOptions); + // always make sure this check is enabled + process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1'; + txtResponse = await result.text(); + // cloudflare timeouts (504s) will be html... + response = options.textResponse ? txtResponse : JSON.parse(txtResponse); + } + } catch (e) { + if (txtResponse) { + log.info( + `serverRequest ${mode} error`, + e.code, + e.message, + `json: ${txtResponse}`, + 'attempting connection to', + url + ); + } else { + log.info( + `serverRequest ${mode} error`, + e.code, + e.message, + 'attempting connection to', + url + ); + } + if (mode === '_sendToProxy') { + // if we can detect, certain types of failures, we can retry... + if (e.code === 'ECONNRESET') { + // retry with counter? + } + } + return { + err: e, + }; + } + // if it's a response style with a meta + if (result.status !== 200) { + if (!forceFreshToken && (!response.meta || response.meta.code === 401)) { + // copy options because lint complains if we modify this directly + const updatedOptions = options; + // force it this time + updatedOptions.forceFreshToken = true; + // retry with updated options + return this.serverRequest(endpoint, updatedOptions); + } + return { + err: 'statusCode', + statusCode: result.status, + response, + }; + } + return { + statusCode: result.status, + response, + }; +}; + // the core ADN class that handles all communication with a specific server class LokiAppDotNetServerAPI { constructor(ourKey, url) { @@ -394,276 +685,19 @@ class LokiAppDotNetServerAPI { if (urlStr.match(/\.loki\//)) { process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; } - const result = await nodeFetch(urlObj, fetchOptions, options); - process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1; + const result = nodeFetch(urlObj, fetchOptions, options); + process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1'; return result; } - async _sendToProxy(endpoint, pFetchOptions, options = {}) { - const randSnode = await lokiSnodeAPI.getRandomSnodeAddress(); - const url = `https://${randSnode.ip}:${randSnode.port}/file_proxy`; - - const fetchOptions = pFetchOptions; // make lint happy - // safety issue with file server, just safer to have this - if (fetchOptions.headers === undefined) { - fetchOptions.headers = {}; - } - - const payloadObj = { - body: fetchOptions.body, // might need to b64 if binary... - endpoint, - method: fetchOptions.method, - headers: fetchOptions.headers, - }; - - // from https://github.com/sindresorhus/is-stream/blob/master/index.js - if ( - payloadObj.body && - typeof payloadObj.body === 'object' && - typeof payloadObj.body.pipe === 'function' - ) { - log.info('detected body is a stream'); - const fData = payloadObj.body.getBuffer(); - const fHeaders = payloadObj.body.getHeaders(); - // update headers for boundary - payloadObj.headers = { ...payloadObj.headers, ...fHeaders }; - // update body with base64 chunk - payloadObj.body = { - fileUpload: fData.toString('base64'), - }; - } - - // convert our payload to binary buffer - const payloadData = Buffer.from( - dcodeIO.ByteBuffer.wrap(JSON.stringify(payloadObj)).toArrayBuffer() - ); - payloadObj.body = false; // free memory - - // make temporary key for this request/response - const ephemeralKey = libsignal.Curve.generateKeyPair(); - - // mix server pub key with our priv key - const symKey = libsignal.Curve.calculateAgreement( - this.pubKey, // server's pubkey - ephemeralKey.privKey // our privkey - ); - - const ivAndCiphertext = await libloki.crypto.DHEncrypt(symKey, payloadData); - - // convert final buffer to base64 - const cipherText64 = dcodeIO.ByteBuffer.wrap(ivAndCiphertext).toString( - 'base64' - ); - - const ephemeralPubKey64 = dcodeIO.ByteBuffer.wrap( - ephemeralKey.pubKey - ).toString('base64'); - - const finalRequestHeader = { - 'X-Loki-File-Server-Ephemeral-Key': ephemeralPubKey64, - }; - - const firstHopOptions = { - method: 'POST', - // not sure why I can't use anything but json... - // text/plain would be preferred... - body: JSON.stringify({ cipherText64 }), - headers: { - 'Content-Type': 'application/json', - 'X-Loki-File-Server-Target': '/loki/v1/secure_rpc', - 'X-Loki-File-Server-Verb': 'POST', - 'X-Loki-File-Server-Headers': JSON.stringify(finalRequestHeader), - }, - // we are talking to a snode... - agent: snodeHttpsAgent, - }; - // weird this doesn't need NODE_TLS_REJECT_UNAUTHORIZED = 0 - const result = await nodeFetch(url, firstHopOptions); - - const txtResponse = await result.text(); - if (txtResponse.match(/^Service node is not ready: not in any swarm/i)) { - // mark snode bad - log.warn( - `Marking random snode bad, internet address ${randSnode.ip}:${ - randSnode.port - }` - ); - lokiSnodeAPI.markRandomNodeUnreachable(randSnode); - // retry (hopefully with new snode) - // FIXME: max number of retries... - return this._sendToProxy(endpoint, fetchOptions); - } - - let response = {}; - try { - response = JSON.parse(txtResponse); - } catch (e) { - log.warn( - `_sendToProxy Could not parse outer JSON [${txtResponse}]`, - endpoint - ); - } - - if (response.meta && response.meta.code === 200) { - // convert base64 in response to binary - const ivAndCiphertextResponse = dcodeIO.ByteBuffer.wrap( - response.data, - 'base64' - ).toArrayBuffer(); - const decrypted = await libloki.crypto.DHDecrypt( - symKey, - ivAndCiphertextResponse - ); - const textDecoder = new TextDecoder(); - const respStr = textDecoder.decode(decrypted); - // replace response - try { - response = options.textResponse ? respStr : JSON.parse(respStr); - } catch (e) { - log.warn( - `_sendToProxy Could not parse inner JSON [${respStr}]`, - endpoint - ); - } - } else { - log.warn( - 'file server secure_rpc gave an non-200 response: ', - response, - ` txtResponse[${txtResponse}]`, - endpoint - ); - } - return { result, txtResponse, response }; - } - // make a request to the server async serverRequest(endpoint, options = {}) { - const { - params = {}, - method, - rawBody, - objBody, - forceFreshToken = false, - } = options; - - const url = new URL(`${this.baseServerUrl}/${endpoint}`); - if (params) { - url.search = new URLSearchParams(params); - } - const fetchOptions = {}; - const headers = {}; - try { - if (forceFreshToken) { - await this.getOrRefreshServerToken(true); - } - if (this.token) { - headers.Authorization = `Bearer ${this.token}`; - } - if (method) { - fetchOptions.method = method; - } - if (objBody) { - headers['Content-Type'] = 'application/json'; - fetchOptions.body = JSON.stringify(objBody); - } else if (rawBody) { - fetchOptions.body = rawBody; - } - fetchOptions.headers = headers; - - // domain ends in .loki - if (url.toString().match(/\.loki\//)) { - fetchOptions.agent = snodeHttpsAgent; - } - } catch (e) { - log.info('serverRequest set up error:', e.code, e.message); - return { - err: e, - }; - } - - let response; - let result; - let txtResponse; - let mode = 'nodeFetch'; - try { - if ( - window.lokiFeatureFlags.useSnodeProxy && - (this.baseServerUrl === 'https://file-dev.lokinet.org' || - this.baseServerUrl === 'https://file.lokinet.org' || - this.baseServerUrl === 'https://file-dev.getsession.org' || - this.baseServerUrl === 'https://file.getsession.org') - ) { - mode = '_sendToProxy'; - - const endpointWithQS = url - .toString() - .replace(`${this.baseServerUrl}/`, ''); - ({ response, txtResponse, result } = await this._sendToProxy( - endpointWithQS, - fetchOptions, - options - )); - } else { - // disable check for .loki - if (url.toString().match(/\.loki/)) { - process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; - } - result = await nodeFetch(url, fetchOptions); - // always make sure this check is enabled - process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1; - txtResponse = await result.text(); - // hrm cloudflare timeouts (504s) will be html... - response = options.textResponse ? txtResponse : JSON.parse(txtResponse); - } - } catch (e) { - if (txtResponse) { - log.info( - `serverRequest ${mode} error`, - e.code, - e.message, - `json: ${txtResponse}`, - 'attempting connection to', - url - ); - } else { - log.info( - `serverRequest ${mode} error`, - e.code, - e.message, - 'attempting connection to', - url - ); - } - if (mode === '_sendToProxy') { - // if we can detect, certain types of failures, we can retry... - if (e.code === 'ECONNRESET') { - // retry with counter? - } - } - return { - err: e, - }; + options.token = this.token; + options.srvPubKey = this.pubKey; + if (options.forceFreshToken) { + await this.getOrRefreshServerToken(true); } - // if it's a response style with a meta - if (result.status !== 200) { - if (!forceFreshToken && (!response.meta || response.meta.code === 401)) { - // copy options because lint complains if we modify this directly - const updatedOptions = options; - // force it this time - updatedOptions.forceFreshToken = true; - // retry with updated options - return this.serverRequest(endpoint, updatedOptions); - } - return { - err: 'statusCode', - statusCode: result.status, - response, - }; - } - return { - statusCode: result.status, - response, - }; + return serverRequest(`${this.baseServerUrl}/${endpoint}`, options); } async getUserAnnotations(pubKey) { @@ -946,6 +980,8 @@ class LokiPublicChannelAPI { this.deleteLastId = 1; this.timers = {}; this.myPrivateKey = false; + this.messagesPollLock = false; + // can escalated to SQL if it start uses too much memory this.logMop = {}; @@ -1322,7 +1358,6 @@ class LokiPublicChannelAPI { Conversation: Whisper.Conversation, } ); - await this.pollForChannelOnce(); } // get moderation actions @@ -1523,6 +1558,20 @@ class LokiPublicChannelAPI { } async pollOnceForMessages() { + if (this.messagesPollLock) { + // TODO: check if lock is stale + log.warn( + 'pollOnceForModerators locked', + 'on', + this.channelId, + 'at', + this.serverAPI.baseServerUrl + ); + return; + } + // disable locking system for now as it's not quite perfect yet + // this.messagesPollLock = Date.now(); + const params = { include_annotations: 1, include_user_annotations: 1, // to get the home server @@ -1551,6 +1600,7 @@ class LokiPublicChannelAPI { if (res.err) { log.error('pollOnceForMessages receive error', res.err); } + this.messagesPollLock = false; return; } @@ -1706,6 +1756,7 @@ class LokiPublicChannelAPI { // do we really need this? if (!pendingMessages.length) { this.conversation.setLastRetrievedMessage(this.lastGot); + this.messagesPollLock = false; return; } @@ -1755,7 +1806,14 @@ class LokiPublicChannelAPI { ); sendNow.forEach(message => { // send them out now - log.info('emitting primary message', message.serverId); + log.info( + 'emitting primary message', + message.serverId, + 'on', + this.channelId, + 'at', + this.serverAPI.baseServerUrl + ); this.chatAPI.emit('publicMessage', { message, }); @@ -1832,7 +1890,14 @@ class LokiPublicChannelAPI { return; } } - log.info('emitting pending message', message.serverId); + log.info( + 'emitting pending message', + message.serverId, + 'on', + this.channelId, + 'at', + this.serverAPI.baseServerUrl + ); this.chatAPI.emit('publicMessage', { message, }); @@ -1854,6 +1919,7 @@ class LokiPublicChannelAPI { // finally update our position this.conversation.setLastRetrievedMessage(this.lastGot); + this.messagesPollLock = false; } static getPreviewFromAnnotation(annotation) {