serverRequest/sendToProxy refactor, start messagesPollLock implementation, improve logging

pull/988/head
Ryan Tharp 5 years ago
parent 2d76b1eda9
commit 07ce97aa56

@ -14,6 +14,13 @@ const PUBLICCHAT_DELETION_POLL_EVERY = 5 * 1000; // 5s
const PUBLICCHAT_MOD_POLL_EVERY = 30 * 1000; // 30s
const PUBLICCHAT_MIN_TIME_BETWEEN_DUPLICATE_MESSAGES = 10 * 1000; // 10s
const FILESERVER_HOSTS = [
'file-dev.lokinet.org',
'file.lokinet.org',
'file-dev.getsession.org',
'file.getsession.org',
];
const HOMESERVER_USER_ANNOTATION_TYPE = 'network.loki.messenger.homeserver';
const AVATAR_USER_ANNOTATION_TYPE = 'network.loki.messenger.avatar';
const SETTINGS_CHANNEL_ANNOTATION_TYPE = 'net.patter-app.settings';
@ -25,6 +32,290 @@ const snodeHttpsAgent = new https.Agent({
rejectUnauthorized: false,
});
const sendToProxy = async (
srvPubKey,
endpoint,
pFetchOptions,
options = {}
) => {
if (!srvPubKey) {
log.error(
'loki_app_dot_net: sendToProxy called without a server public key'
);
return {};
}
const randSnode = await lokiSnodeAPI.getRandomSnodeAddress();
const url = `https://${randSnode.ip}:${randSnode.port}/file_proxy`;
const fetchOptions = pFetchOptions; // make lint happy
// safety issue with file server, just safer to have this
if (fetchOptions.headers === undefined) {
fetchOptions.headers = {};
}
const payloadObj = {
body: fetchOptions.body, // might need to b64 if binary...
endpoint,
method: fetchOptions.method,
headers: fetchOptions.headers,
};
// from https://github.com/sindresorhus/is-stream/blob/master/index.js
if (
payloadObj.body &&
typeof payloadObj.body === 'object' &&
typeof payloadObj.body.pipe === 'function'
) {
const fData = payloadObj.body.getBuffer();
const fHeaders = payloadObj.body.getHeaders();
// update headers for boundary
payloadObj.headers = { ...payloadObj.headers, ...fHeaders };
// update body with base64 chunk
payloadObj.body = {
fileUpload: fData.toString('base64'),
};
}
// convert our payload to binary buffer
const payloadData = Buffer.from(
dcodeIO.ByteBuffer.wrap(JSON.stringify(payloadObj)).toArrayBuffer()
);
payloadObj.body = false; // free memory
// make temporary key for this request/response
const ephemeralKey = libsignal.Curve.generateKeyPair();
// mix server pub key with our priv key
const symKey = libsignal.Curve.calculateAgreement(
srvPubKey, // server's pubkey
ephemeralKey.privKey // our privkey
);
const ivAndCiphertext = await libloki.crypto.DHEncrypt(symKey, payloadData);
// convert final buffer to base64
const cipherText64 = dcodeIO.ByteBuffer.wrap(ivAndCiphertext).toString(
'base64'
);
const ephemeralPubKey64 = dcodeIO.ByteBuffer.wrap(
ephemeralKey.pubKey
).toString('base64');
const finalRequestHeader = {
'X-Loki-File-Server-Ephemeral-Key': ephemeralPubKey64,
};
const firstHopOptions = {
method: 'POST',
// not sure why I can't use anything but json...
// text/plain would be preferred...
body: JSON.stringify({ cipherText64 }),
headers: {
'Content-Type': 'application/json',
'X-Loki-File-Server-Target': '/loki/v1/secure_rpc',
'X-Loki-File-Server-Verb': 'POST',
'X-Loki-File-Server-Headers': JSON.stringify(finalRequestHeader),
},
// we are talking to a snode...
agent: snodeHttpsAgent,
};
// weird this doesn't need NODE_TLS_REJECT_UNAUTHORIZED = '0'
const result = await nodeFetch(url, firstHopOptions);
const txtResponse = await result.text();
if (txtResponse.match(/^Service node is not ready: not in any swarm/i)) {
// mark snode bad
const randomPoolRemainingCount = lokiSnodeAPI.markRandomNodeUnreachable(
randSnode
);
log.warn(
`loki_app_dot_net: Marking random snode bad, internet address ${
randSnode.ip
}:${
randSnode.port
}. ${randomPoolRemainingCount} snodes remaining in randomPool`
);
// retry (hopefully with new snode)
// FIXME: max number of retries...
return sendToProxy(srvPubKey, endpoint, fetchOptions);
}
let response = {};
try {
response = JSON.parse(txtResponse);
} catch (e) {
log.warn(
`loki_app_dot_net: sendToProxy Could not parse outer JSON [${txtResponse}]`,
endpoint,
'on',
url
);
}
if (response.meta && response.meta.code === 200) {
// convert base64 in response to binary
const ivAndCiphertextResponse = dcodeIO.ByteBuffer.wrap(
response.data,
'base64'
).toArrayBuffer();
const decrypted = await libloki.crypto.DHDecrypt(
symKey,
ivAndCiphertextResponse
);
const textDecoder = new TextDecoder();
const respStr = textDecoder.decode(decrypted);
// replace response
try {
response = options.textResponse ? respStr : JSON.parse(respStr);
} catch (e) {
log.warn(
`loki_app_dot_net: sendToProxy Could not parse inner JSON [${respStr}]`,
endpoint,
'on',
url
);
}
} else {
log.warn(
'loki_app_dot_net: file server secure_rpc gave an non-200 response: ',
response,
` txtResponse[${txtResponse}]`,
endpoint
);
}
return { result, txtResponse, response };
};
const serverRequest = async (endpoint, options = {}) => {
const {
params = {},
method,
rawBody,
objBody,
token,
srvPubKey,
forceFreshToken = false,
} = options;
const url = new URL(endpoint);
if (params) {
url.search = new URLSearchParams(params);
}
const fetchOptions = {};
const headers = {};
try {
if (token) {
headers.Authorization = `Bearer ${token}`;
}
if (method) {
fetchOptions.method = method;
}
if (objBody) {
headers['Content-Type'] = 'application/json';
fetchOptions.body = JSON.stringify(objBody);
} else if (rawBody) {
fetchOptions.body = rawBody;
}
fetchOptions.headers = headers;
// domain ends in .loki
if (url.host.match(/\.loki$/i)) {
fetchOptions.agent = snodeHttpsAgent;
}
} catch (e) {
log.info('serverRequest set up error:', e.code, e.message);
return {
err: e,
};
}
let response;
let result;
let txtResponse;
let mode = 'nodeFetch';
try {
const host = url.host.toLowerCase();
// log.info('host', host, FILESERVER_HOSTS);
if (
window.lokiFeatureFlags.useSnodeProxy &&
FILESERVER_HOSTS.includes(host)
) {
mode = 'sendToProxy';
// strip trailing slash
const endpointWithQS = (
url.pathname + (url.search ? '?' + url.search : '')
).replace(/^\//, '');
// log.info('endpointWithQS', endpointWithQS)
({ response, txtResponse, result } = await sendToProxy(
srvPubKey,
endpointWithQS,
fetchOptions,
options
));
} else {
// disable check for .loki
process.env.NODE_TLS_REJECT_UNAUTHORIZED = url.host.match(/\.loki$/i)
? '0'
: '1';
result = await nodeFetch(url, fetchOptions);
// always make sure this check is enabled
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
txtResponse = await result.text();
// cloudflare timeouts (504s) will be html...
response = options.textResponse ? txtResponse : JSON.parse(txtResponse);
}
} catch (e) {
if (txtResponse) {
log.info(
`serverRequest ${mode} error`,
e.code,
e.message,
`json: ${txtResponse}`,
'attempting connection to',
url
);
} else {
log.info(
`serverRequest ${mode} error`,
e.code,
e.message,
'attempting connection to',
url
);
}
if (mode === '_sendToProxy') {
// if we can detect, certain types of failures, we can retry...
if (e.code === 'ECONNRESET') {
// retry with counter?
}
}
return {
err: e,
};
}
// if it's a response style with a meta
if (result.status !== 200) {
if (!forceFreshToken && (!response.meta || response.meta.code === 401)) {
// copy options because lint complains if we modify this directly
const updatedOptions = options;
// force it this time
updatedOptions.forceFreshToken = true;
// retry with updated options
return this.serverRequest(endpoint, updatedOptions);
}
return {
err: 'statusCode',
statusCode: result.status,
response,
};
}
return {
statusCode: result.status,
response,
};
};
// the core ADN class that handles all communication with a specific server
class LokiAppDotNetServerAPI {
constructor(ourKey, url) {
@ -394,276 +685,19 @@ class LokiAppDotNetServerAPI {
if (urlStr.match(/\.loki\//)) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
}
const result = await nodeFetch(urlObj, fetchOptions, options);
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1;
const result = nodeFetch(urlObj, fetchOptions, options);
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '1';
return result;
}
async _sendToProxy(endpoint, pFetchOptions, options = {}) {
const randSnode = await lokiSnodeAPI.getRandomSnodeAddress();
const url = `https://${randSnode.ip}:${randSnode.port}/file_proxy`;
const fetchOptions = pFetchOptions; // make lint happy
// safety issue with file server, just safer to have this
if (fetchOptions.headers === undefined) {
fetchOptions.headers = {};
}
const payloadObj = {
body: fetchOptions.body, // might need to b64 if binary...
endpoint,
method: fetchOptions.method,
headers: fetchOptions.headers,
};
// from https://github.com/sindresorhus/is-stream/blob/master/index.js
if (
payloadObj.body &&
typeof payloadObj.body === 'object' &&
typeof payloadObj.body.pipe === 'function'
) {
log.info('detected body is a stream');
const fData = payloadObj.body.getBuffer();
const fHeaders = payloadObj.body.getHeaders();
// update headers for boundary
payloadObj.headers = { ...payloadObj.headers, ...fHeaders };
// update body with base64 chunk
payloadObj.body = {
fileUpload: fData.toString('base64'),
};
}
// convert our payload to binary buffer
const payloadData = Buffer.from(
dcodeIO.ByteBuffer.wrap(JSON.stringify(payloadObj)).toArrayBuffer()
);
payloadObj.body = false; // free memory
// make temporary key for this request/response
const ephemeralKey = libsignal.Curve.generateKeyPair();
// mix server pub key with our priv key
const symKey = libsignal.Curve.calculateAgreement(
this.pubKey, // server's pubkey
ephemeralKey.privKey // our privkey
);
const ivAndCiphertext = await libloki.crypto.DHEncrypt(symKey, payloadData);
// convert final buffer to base64
const cipherText64 = dcodeIO.ByteBuffer.wrap(ivAndCiphertext).toString(
'base64'
);
const ephemeralPubKey64 = dcodeIO.ByteBuffer.wrap(
ephemeralKey.pubKey
).toString('base64');
const finalRequestHeader = {
'X-Loki-File-Server-Ephemeral-Key': ephemeralPubKey64,
};
const firstHopOptions = {
method: 'POST',
// not sure why I can't use anything but json...
// text/plain would be preferred...
body: JSON.stringify({ cipherText64 }),
headers: {
'Content-Type': 'application/json',
'X-Loki-File-Server-Target': '/loki/v1/secure_rpc',
'X-Loki-File-Server-Verb': 'POST',
'X-Loki-File-Server-Headers': JSON.stringify(finalRequestHeader),
},
// we are talking to a snode...
agent: snodeHttpsAgent,
};
// weird this doesn't need NODE_TLS_REJECT_UNAUTHORIZED = 0
const result = await nodeFetch(url, firstHopOptions);
const txtResponse = await result.text();
if (txtResponse.match(/^Service node is not ready: not in any swarm/i)) {
// mark snode bad
log.warn(
`Marking random snode bad, internet address ${randSnode.ip}:${
randSnode.port
}`
);
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
// retry (hopefully with new snode)
// FIXME: max number of retries...
return this._sendToProxy(endpoint, fetchOptions);
}
let response = {};
try {
response = JSON.parse(txtResponse);
} catch (e) {
log.warn(
`_sendToProxy Could not parse outer JSON [${txtResponse}]`,
endpoint
);
}
if (response.meta && response.meta.code === 200) {
// convert base64 in response to binary
const ivAndCiphertextResponse = dcodeIO.ByteBuffer.wrap(
response.data,
'base64'
).toArrayBuffer();
const decrypted = await libloki.crypto.DHDecrypt(
symKey,
ivAndCiphertextResponse
);
const textDecoder = new TextDecoder();
const respStr = textDecoder.decode(decrypted);
// replace response
try {
response = options.textResponse ? respStr : JSON.parse(respStr);
} catch (e) {
log.warn(
`_sendToProxy Could not parse inner JSON [${respStr}]`,
endpoint
);
}
} else {
log.warn(
'file server secure_rpc gave an non-200 response: ',
response,
` txtResponse[${txtResponse}]`,
endpoint
);
}
return { result, txtResponse, response };
}
// make a request to the server
async serverRequest(endpoint, options = {}) {
const {
params = {},
method,
rawBody,
objBody,
forceFreshToken = false,
} = options;
const url = new URL(`${this.baseServerUrl}/${endpoint}`);
if (params) {
url.search = new URLSearchParams(params);
}
const fetchOptions = {};
const headers = {};
try {
if (forceFreshToken) {
await this.getOrRefreshServerToken(true);
}
if (this.token) {
headers.Authorization = `Bearer ${this.token}`;
}
if (method) {
fetchOptions.method = method;
}
if (objBody) {
headers['Content-Type'] = 'application/json';
fetchOptions.body = JSON.stringify(objBody);
} else if (rawBody) {
fetchOptions.body = rawBody;
}
fetchOptions.headers = headers;
// domain ends in .loki
if (url.toString().match(/\.loki\//)) {
fetchOptions.agent = snodeHttpsAgent;
}
} catch (e) {
log.info('serverRequest set up error:', e.code, e.message);
return {
err: e,
};
}
let response;
let result;
let txtResponse;
let mode = 'nodeFetch';
try {
if (
window.lokiFeatureFlags.useSnodeProxy &&
(this.baseServerUrl === 'https://file-dev.lokinet.org' ||
this.baseServerUrl === 'https://file.lokinet.org' ||
this.baseServerUrl === 'https://file-dev.getsession.org' ||
this.baseServerUrl === 'https://file.getsession.org')
) {
mode = '_sendToProxy';
const endpointWithQS = url
.toString()
.replace(`${this.baseServerUrl}/`, '');
({ response, txtResponse, result } = await this._sendToProxy(
endpointWithQS,
fetchOptions,
options
));
} else {
// disable check for .loki
if (url.toString().match(/\.loki/)) {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
}
result = await nodeFetch(url, fetchOptions);
// always make sure this check is enabled
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 1;
txtResponse = await result.text();
// hrm cloudflare timeouts (504s) will be html...
response = options.textResponse ? txtResponse : JSON.parse(txtResponse);
}
} catch (e) {
if (txtResponse) {
log.info(
`serverRequest ${mode} error`,
e.code,
e.message,
`json: ${txtResponse}`,
'attempting connection to',
url
);
} else {
log.info(
`serverRequest ${mode} error`,
e.code,
e.message,
'attempting connection to',
url
);
}
if (mode === '_sendToProxy') {
// if we can detect, certain types of failures, we can retry...
if (e.code === 'ECONNRESET') {
// retry with counter?
}
}
return {
err: e,
};
options.token = this.token;
options.srvPubKey = this.pubKey;
if (options.forceFreshToken) {
await this.getOrRefreshServerToken(true);
}
// if it's a response style with a meta
if (result.status !== 200) {
if (!forceFreshToken && (!response.meta || response.meta.code === 401)) {
// copy options because lint complains if we modify this directly
const updatedOptions = options;
// force it this time
updatedOptions.forceFreshToken = true;
// retry with updated options
return this.serverRequest(endpoint, updatedOptions);
}
return {
err: 'statusCode',
statusCode: result.status,
response,
};
}
return {
statusCode: result.status,
response,
};
return serverRequest(`${this.baseServerUrl}/${endpoint}`, options);
}
async getUserAnnotations(pubKey) {
@ -946,6 +980,8 @@ class LokiPublicChannelAPI {
this.deleteLastId = 1;
this.timers = {};
this.myPrivateKey = false;
this.messagesPollLock = false;
// can escalated to SQL if it start uses too much memory
this.logMop = {};
@ -1322,7 +1358,6 @@ class LokiPublicChannelAPI {
Conversation: Whisper.Conversation,
}
);
await this.pollForChannelOnce();
}
// get moderation actions
@ -1523,6 +1558,20 @@ class LokiPublicChannelAPI {
}
async pollOnceForMessages() {
if (this.messagesPollLock) {
// TODO: check if lock is stale
log.warn(
'pollOnceForModerators locked',
'on',
this.channelId,
'at',
this.serverAPI.baseServerUrl
);
return;
}
// disable locking system for now as it's not quite perfect yet
// this.messagesPollLock = Date.now();
const params = {
include_annotations: 1,
include_user_annotations: 1, // to get the home server
@ -1551,6 +1600,7 @@ class LokiPublicChannelAPI {
if (res.err) {
log.error('pollOnceForMessages receive error', res.err);
}
this.messagesPollLock = false;
return;
}
@ -1706,6 +1756,7 @@ class LokiPublicChannelAPI {
// do we really need this?
if (!pendingMessages.length) {
this.conversation.setLastRetrievedMessage(this.lastGot);
this.messagesPollLock = false;
return;
}
@ -1755,7 +1806,14 @@ class LokiPublicChannelAPI {
);
sendNow.forEach(message => {
// send them out now
log.info('emitting primary message', message.serverId);
log.info(
'emitting primary message',
message.serverId,
'on',
this.channelId,
'at',
this.serverAPI.baseServerUrl
);
this.chatAPI.emit('publicMessage', {
message,
});
@ -1832,7 +1890,14 @@ class LokiPublicChannelAPI {
return;
}
}
log.info('emitting pending message', message.serverId);
log.info(
'emitting pending message',
message.serverId,
'on',
this.channelId,
'at',
this.serverAPI.baseServerUrl
);
this.chatAPI.emit('publicMessage', {
message,
});
@ -1854,6 +1919,7 @@ class LokiPublicChannelAPI {
// finally update our position
this.conversation.setLastRetrievedMessage(this.lastGot);
this.messagesPollLock = false;
}
static getPreviewFromAnnotation(annotation) {

Loading…
Cancel
Save