Merge pull request #132 from BeaudanBrown/swarm-requests

Swarm requests
pull/152/head
Mikunj Varsani 6 years ago committed by GitHub
commit 27f95a24bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,4 +1,3 @@
const fs = require('fs');
const path = require('path'); const path = require('path');
const mkdirp = require('mkdirp'); const mkdirp = require('mkdirp');
const rimraf = require('rimraf'); const rimraf = require('rimraf');
@ -81,6 +80,8 @@ module.exports = {
removeSessionsByNumber, removeSessionsByNumber,
removeAllSessions, removeAllSessions,
getSwarmNodesByPubkey,
getConversationCount, getConversationCount,
saveConversation, saveConversation,
saveConversations, saveConversations,
@ -1025,6 +1026,18 @@ async function removeAllFromTable(table) {
// Conversations // Conversations
async function getSwarmNodesByPubkey(pubkey) {
const row = await db.get('SELECT * FROM conversations WHERE id = $pubkey;', {
$pubkey: pubkey,
});
if (!row) {
return null;
}
return jsonToObject(row.json).swarmNodes;
}
async function getConversationCount() { async function getConversationCount() {
const row = await db.get('SELECT count(*) from conversations;'); const row = await db.get('SELECT count(*) from conversations;');

@ -1,6 +1,8 @@
{ {
"serverUrl": "http://localhost:8080", "serverUrl": "random.snode",
"cdnUrl": "http://localhost", "cdnUrl": "random.snode",
"messageServerPort": "8080",
"swarmServerPort": "8079",
"disableAutoUpdate": false, "disableAutoUpdate": false,
"openDevTools": false, "openDevTools": false,
"buildExpiration": 0, "buildExpiration": 0,

@ -174,6 +174,7 @@
return conversation; return conversation;
} }
window.LokiSnodeAPI.replenishSwarm(id);
try { try {
await window.Signal.Data.saveConversation(conversation.attributes, { await window.Signal.Data.saveConversation(conversation.attributes, {
Conversation: Whisper.Conversation, Conversation: Whisper.Conversation,

@ -86,6 +86,7 @@
friendRequestStatus: FriendRequestStatusEnum.none, friendRequestStatus: FriendRequestStatusEnum.none,
unlockTimestamp: null, // Timestamp used for expiring friend requests. unlockTimestamp: null, // Timestamp used for expiring friend requests.
sessionResetStatus: SessionResetEnum.none, sessionResetStatus: SessionResetEnum.none,
swarmNodes: new Set([]),
}; };
}, },
@ -1198,7 +1199,7 @@
options.messageType = message.get('type'); options.messageType = message.get('type');
// Add the message sending on another queue so that our UI doesn't get blocked // Add the message sending on another queue so that our UI doesn't get blocked
this.queueMessageSend(async () => this.queueMessageSend(async () => {
message.send( message.send(
this.wrapSend( this.wrapSend(
sendFunction( sendFunction(
@ -1213,7 +1214,7 @@
) )
) )
) )
); });
return true; return true;
}); });

@ -108,6 +108,8 @@ module.exports = {
removeSessionsByNumber, removeSessionsByNumber,
removeAllSessions, removeAllSessions,
getSwarmNodesByPubkey,
getConversationCount, getConversationCount,
saveConversation, saveConversation,
saveConversations, saveConversations,
@ -654,12 +656,33 @@ async function removeAllSessions(id) {
// Conversation // Conversation
function setifyProperty(data, propertyName) {
if (!data) return data;
const returnData = data;
if (returnData[propertyName]) {
returnData[propertyName] = new Set(returnData[propertyName]);
}
return returnData;
}
async function getSwarmNodesByPubkey(pubkey) {
let swarmNodes = await channels.getSwarmNodesByPubkey(pubkey);
if (Array.isArray(swarmNodes)) {
swarmNodes = new Set(swarmNodes);
}
return swarmNodes;
}
async function getConversationCount() { async function getConversationCount() {
return channels.getConversationCount(); return channels.getConversationCount();
} }
async function saveConversation(data) { async function saveConversation(data) {
await channels.saveConversation(data); const storeData = data;
if (storeData.swarmNodes) {
storeData.swarmNodes = Array.from(storeData.swarmNodes);
}
await channels.saveConversation(storeData);
} }
async function saveConversations(data) { async function saveConversations(data) {
@ -667,7 +690,8 @@ async function saveConversations(data) {
} }
async function getConversationById(id, { Conversation }) { async function getConversationById(id, { Conversation }) {
const data = await channels.getConversationById(id); const rawData = await channels.getConversationById(id)
const data = setifyProperty(rawData, 'swarmNodes');
return new Conversation(data); return new Conversation(data);
} }
@ -678,6 +702,9 @@ async function updateConversation(id, data, { Conversation }) {
} }
const merged = merge({}, existing.attributes, data); const merged = merge({}, existing.attributes, data);
if (merged.swarmNodes instanceof Set) {
merged.swarmNodes = Array.from(merged.swarmNodes);
}
await channels.updateConversation(merged); await channels.updateConversation(merged);
} }
@ -698,7 +725,8 @@ async function _removeConversations(ids) {
} }
async function getAllConversations({ ConversationCollection }) { async function getAllConversations({ ConversationCollection }) {
const conversations = await channels.getAllConversations(); const conversations = (await channels.getAllConversations())
.map(c => setifyProperty(c, 'swarmNodes'));
const collection = new ConversationCollection(); const collection = new ConversationCollection();
collection.add(conversations); collection.add(conversations);
@ -711,7 +739,8 @@ async function getAllConversationIds() {
} }
async function getAllPrivateConversations({ ConversationCollection }) { async function getAllPrivateConversations({ ConversationCollection }) {
const conversations = await channels.getAllPrivateConversations(); const conversations = (await channels.getAllPrivateConversations())
.map(c => setifyProperty(c, 'swarmNodes'));
const collection = new ConversationCollection(); const collection = new ConversationCollection();
collection.add(conversations); collection.add(conversations);
@ -719,7 +748,8 @@ async function getAllPrivateConversations({ ConversationCollection }) {
} }
async function getAllGroupsInvolvingId(id, { ConversationCollection }) { async function getAllGroupsInvolvingId(id, { ConversationCollection }) {
const conversations = await channels.getAllGroupsInvolvingId(id); const conversations = (await channels.getAllGroupsInvolvingId(id))
.map(c => setifyProperty(c, 'swarmNodes'));
const collection = new ConversationCollection(); const collection = new ConversationCollection();
collection.add(conversations); collection.add(conversations);
@ -727,7 +757,8 @@ async function getAllGroupsInvolvingId(id, { ConversationCollection }) {
} }
async function searchConversations(query, { ConversationCollection }) { async function searchConversations(query, { ConversationCollection }) {
const conversations = await channels.searchConversations(query); const conversations = (await channels.searchConversations(query))
.map(c => setifyProperty(c, 'swarmNodes'));
const collection = new ConversationCollection(); const collection = new ConversationCollection();
collection.add(conversations); collection.add(conversations);

@ -1,25 +1,29 @@
/* eslint-disable no-await-in-loop */
/* global log, dcodeIO, window, callWorker */ /* global log, dcodeIO, window, callWorker */
const fetch = require('node-fetch'); const fetch = require('node-fetch');
const is = require('@sindresorhus/is');
class LokiServer { // eslint-disable-next-line
const invert = p => new Promise((res, rej) => p.then(rej, res));
const firstOf = ps => invert(Promise.all(ps.map(invert)));
constructor({ urls }) { // Will be raised (to 3?) when we get more nodes
this.nodes = []; const MINIMUM_SUCCESSFUL_REQUESTS = 2;
urls.forEach(url => { class LokiMessageAPI {
if (!is.string(url)) {
throw new Error('WebAPI.initialize: Invalid server url'); constructor({ messageServerPort }) {
} this.messageServerPort = messageServerPort
this.nodes.push({ url }); ? `:${messageServerPort}`
}); : '';
} }
async sendMessage(pubKey, data, messageTimeStamp, ttl) { async sendMessage(pubKey, data, messageTimeStamp, ttl) {
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64'); const swarmNodes = await window.LokiSnodeAPI.getSwarmNodesByPubkey(pubKey)
// Hardcoded to use a single node/server for now if (!swarmNodes || swarmNodes.size === 0) {
const currentNode = this.nodes[0]; throw Error('No swarm nodes to query!');
}
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
const timestamp = Math.floor(Date.now() / 1000); const timestamp = Math.floor(Date.now() / 1000);
// Nonce is returned as a base64 string to include in header // Nonce is returned as a base64 string to include in header
let nonce; let nonce;
@ -32,109 +36,143 @@ class LokiServer {
nonce = await callWorker('calcPoW', timestamp, ttl, pubKey, data64, development); nonce = await callWorker('calcPoW', timestamp, ttl, pubKey, data64, development);
} catch (err) { } catch (err) {
// Something went horribly wrong // Something went horribly wrong
// TODO: Handle gracefully
throw err; throw err;
} }
const options = { const requests = Array.from(swarmNodes).map(async node => {
url: `${currentNode.url}/store`, // TODO: Confirm sensible timeout
type: 'POST', const options = {
responseType: undefined, url: `${node}${this.messageServerPort}/store`,
timeout: undefined, type: 'POST',
}; responseType: undefined,
timeout: 5000,
const fetchOptions = { };
method: options.type,
body: data64, const fetchOptions = {
headers: { method: options.type,
'X-Loki-pow-nonce': nonce, body: data64,
'X-Loki-timestamp': timestamp.toString(), headers: {
'X-Loki-ttl': ttl.toString(), 'X-Loki-pow-nonce': nonce,
'X-Loki-recipient': pubKey, 'X-Loki-timestamp': timestamp.toString(),
}, 'X-Loki-ttl': ttl.toString(),
timeout: options.timeout, 'X-Loki-recipient': pubKey,
}; },
timeout: options.timeout,
let response; };
try {
response = await fetch(options.url, fetchOptions); let response;
} catch (e) { try {
log.error(options.type, options.url, 0, 'Error'); response = await fetch(options.url, fetchOptions);
throw HTTPError('fetch error', 0, e.toString()); } catch (e) {
} log.error(options.type, options.url, 0, 'Error sending message');
window.LokiSnodeAPI.unreachableNode(pubKey, node);
throw HTTPError('fetch error', 0, e.toString());
}
let result; let result;
if ( if (
options.responseType === 'json' && options.responseType === 'json' &&
response.headers.get('Content-Type') === 'application/json' response.headers.get('Content-Type') === 'application/json'
) { ) {
result = await response.json(); result = await response.json();
} else if (options.responseType === 'arraybuffer') { } else if (options.responseType === 'arraybuffer') {
result = await response.buffer(); result = await response.buffer();
} else { } else {
result = await response.text(); result = await response.text();
} }
if (response.status >= 0 && response.status < 400) { if (response.status >= 0 && response.status < 400) {
return result;
}
log.error(options.type, options.url, response.status, 'Error sending message');
throw HTTPError('sendMessage: error response', response.status, result);
});
try {
// TODO: Possibly change this to require more than a single response?
const result = await firstOf(requests);
return result; return result;
} catch(err) {
throw err;
} }
log.error(options.type, options.url, response.status, 'Error');
throw HTTPError('sendMessage: error response', response.status, result);
} }
async retrieveMessages(pubKey) { async retrieveMessages(callback) {
// Hardcoded to use a single node/server for now const ourKey = window.textsecure.storage.user.getNumber();
const currentNode = this.nodes[0]; let completedRequests = 0;
const options = { const doRequest = async (nodeUrl, nodeData) => {
url: `${currentNode.url}/retrieve`, // TODO: Confirm sensible timeout
type: 'GET', const options = {
responseType: 'json', url: `${nodeUrl}${this.messageServerPort}/retrieve`,
timeout: undefined, type: 'GET',
}; responseType: 'json',
timeout: 5000,
const headers = { };
'X-Loki-recipient': pubKey,
}; const headers = {
'X-Loki-recipient': ourKey,
if (currentNode.lastHash) { };
headers['X-Loki-last-hash'] = currentNode.lastHash;
} if (nodeData.lastHash) {
headers['X-Loki-last-hash'] = nodeData.lastHash;
const fetchOptions = { }
method: options.type,
headers,
timeout: options.timeout,
};
let response; const fetchOptions = {
try { method: options.type,
response = await fetch(options.url, fetchOptions); headers,
} catch (e) { timeout: options.timeout,
log.error(options.type, options.url, 0, 'Error'); };
throw HTTPError('fetch error', 0, e.toString()); let response;
} try {
response = await fetch(options.url, fetchOptions);
} catch (e) {
// TODO: Maybe we shouldn't immediately delete?
// And differentiate between different connectivity issues
log.error(options.type, options.url, 0, `Error retrieving messages from ${nodeUrl}`);
window.LokiSnodeAPI.unreachableNode(ourKey, nodeUrl);
return;
}
let result; let result;
if ( if (
options.responseType === 'json' && options.responseType === 'json' &&
response.headers.get('Content-Type') === 'application/json' response.headers.get('Content-Type') === 'application/json'
) { ) {
result = await response.json(); result = await response.json();
} else if (options.responseType === 'arraybuffer') { } else if (options.responseType === 'arraybuffer') {
result = await response.buffer(); result = await response.buffer();
} else { } else {
result = await response.text(); result = await response.text();
}
completedRequests += 1;
if (response.status === 200) {
if (result.lastHash) {
window.LokiSnodeAPI.updateLastHash(nodeUrl, result.lastHash);
callback(result.messages);
}
return;
}
// Handle error from snode
log.error(options.type, options.url, response.status, 'Error');
} }
if (response.status >= 0 && response.status < 400) { while (completedRequests < MINIMUM_SUCCESSFUL_REQUESTS) {
if (result.lastHash) { const remainingRequests = MINIMUM_SUCCESSFUL_REQUESTS - completedRequests;
currentNode.lastHash = result.lastHash; const ourSwarmNodes = await window.LokiSnodeAPI.getOurSwarmNodes();
if (Object.keys(ourSwarmNodes).length < remainingRequests) {
// This means we don't have enough swarm nodes to meet the minimum threshold
if (completedRequests !== 0) {
// TODO: Decide how to handle some completed requests but not enough
}
} }
return result;
await Promise.all(
Object.entries(ourSwarmNodes)
.splice(0, remainingRequests)
.map(([nodeUrl, lastHash]) => doRequest(nodeUrl, lastHash))
);
} }
log.error(options.type, options.url, response.status, 'Error');
throw HTTPError('retrieveMessages: error response', response.status, result);
} }
} }
@ -153,5 +191,5 @@ function HTTPError(message, providedCode, response, stack) {
} }
module.exports = { module.exports = {
LokiServer, LokiMessageAPI,
}; };

@ -0,0 +1,187 @@
/* global log, window, Whisper */
const fetch = require('node-fetch');
const is = require('@sindresorhus/is');
const dns = require('dns');
// Will be raised (to 3?) when we get more nodes
const MINIMUM_SWARM_NODES = 1;
class LokiSnodeAPI {
constructor({ url, swarmServerPort }) {
if (!is.string(url)) {
throw new Error('WebAPI.initialize: Invalid server url');
}
this.url = url;
this.swarmServerPort = swarmServerPort
? `:${swarmServerPort}`
: '';
this.swarmsPendingReplenish = {};
this.ourSwarmNodes = {};
}
getRandomSnodeAddress() {
/* resolve random snode */
return new Promise((resolve, reject) => {
dns.resolveCname(this.url, (err, address) => {
if(err) {
reject(err);
} else {
resolve(address[0]);
}
});
});
}
async unreachableNode(pubKey, nodeUrl) {
if (pubKey === window.textsecure.storage.user.getNumber()) {
delete this.ourSwarmNodes[nodeUrl];
return;
}
const conversation = window.ConversationController.get(pubKey);
const swarmNodes = conversation.get('swarmNodes');
if (swarmNodes.delete(nodeUrl)) {
conversation.set({ swarmNodes });
await window.Signal.Data.updateConversation(conversation.id, conversation.attributes, {
Conversation: Whisper.Conversation,
});
}
}
updateLastHash(nodeUrl, hash) {
if (!this.ourSwarmNodes[nodeUrl]) {
this.ourSwarmNodes[nodeUrl] = {
lastHash: hash,
}
} else {
this.ourSwarmNodes[nodeUrl].lastHash = hash;
}
}
async getOurSwarmNodes() {
if (
!this.ourSwarmNodes ||
Object.keys(this.ourSwarmNodes).length < MINIMUM_SWARM_NODES
) {
this.ourSwarmNodes = {};
// Try refresh our swarm list once
const ourKey = window.textsecure.storage.user.getNumber();
const nodeAddresses = await window.LokiSnodeAPI.getSwarmNodes(ourKey);
if (!nodeAddresses || nodeAddresses.length === 0) {
throw Error('Could not load our swarm')
}
nodeAddresses.forEach(url => {
this.ourSwarmNodes[url] = {};
});
}
return this.ourSwarmNodes;
}
async getSwarmNodesByPubkey(pubKey) {
const swarmNodes = await window.Signal.Data.getSwarmNodesByPubkey(pubKey);
// TODO: Check if swarm list is below a threshold rather than empty
if (swarmNodes && swarmNodes.size !== 0) {
return swarmNodes;
}
return this.replenishSwarm(pubKey);
}
async replenishSwarm(pubKey) {
const conversation = window.ConversationController.get(pubKey);
if (!(pubKey in this.swarmsPendingReplenish)) {
this.swarmsPendingReplenish[pubKey] = new Promise(async (resolve) => {
let newSwarmNodes
try {
newSwarmNodes = new Set(await this.getSwarmNodes(pubKey));
} catch (e) {
// TODO: Handle these errors sensibly
newSwarmNodes = new Set([]);
}
conversation.set({ swarmNodes: newSwarmNodes });
await window.Signal.Data.updateConversation(conversation.id, conversation.attributes, {
Conversation: Whisper.Conversation,
});
resolve(newSwarmNodes);
});
}
const newSwarmNodes = await this.swarmsPendingReplenish[pubKey];
delete this.swarmsPendingReplenish[pubKey];
return newSwarmNodes;
}
async getSwarmNodes(pubKey) {
// TODO: Hit multiple random nodes and merge lists?
const node = await this.getRandomSnodeAddress();
// TODO: Confirm final API URL and sensible timeout
const options = {
url: `http://${node}${this.swarmServerPort}/json_rpc`,
type: 'POST',
responseType: 'json',
timeout: 5000,
};
const body = {
jsonrpc: '2.0',
id: '0',
method: 'get_swarm_list_for_messenger_pubkey',
params: {
pubkey: pubKey,
},
}
const fetchOptions = {
method: options.type,
body: JSON.stringify(body),
headers: {
'Content-Type': 'application/json',
},
timeout: options.timeout,
};
let response;
try {
response = await fetch(options.url, fetchOptions);
} catch (e) {
log.error(options.type, options.url, 0, `Error getting swarm nodes for ${pubKey}`);
throw HTTPError('fetch error', 0, e.toString());
}
let result;
if (
options.responseType === 'json' &&
response.headers.get('Content-Type') === 'application/json'
) {
result = await response.json();
} else if (options.responseType === 'arraybuffer') {
result = await response.buffer();
} else {
result = await response.text();
}
if (response.status >= 0 && response.status < 400) {
return result.nodes;
}
log.error(options.type, options.url, response.status, `Error getting swarm nodes for ${pubKey}`);
throw HTTPError('sendMessage: error response', response.status, result);
}
}
function HTTPError(message, providedCode, response, stack) {
const code = providedCode > 999 || providedCode < 100 ? -1 : providedCode;
const e = new Error(`${message}; code: ${code}`);
e.name = 'HTTPError';
e.code = code;
if (stack) {
e.stack += `\nOriginal stack:\n${stack}`;
}
if (response) {
e.response = response;
}
return e;
}
module.exports = {
LokiSnodeAPI,
};

@ -4,30 +4,36 @@
(function () { (function () {
window.libloki = window.libloki || {}; window.libloki = window.libloki || {};
function consolidateLists(lists, threshold = 1){ function consolidateLists(lists, threshold, selector = (x) => x){
if (typeof threshold !== 'number') { if (typeof threshold !== 'number') {
throw Error('Provided threshold is not a number'); throw Error('Provided threshold is not a number');
} }
if (typeof selector !== 'function') {
throw Error('Provided selector is not a function');
}
// calculate list size manually since `Set` // calculate list size manually since `Set`
// does not have a `length` attribute // does not have a `length` attribute
let numLists = 0; let numLists = 0;
const occurences = {}; const occurences = {};
const values = {};
lists.forEach(list => { lists.forEach(list => {
numLists += 1; numLists += 1;
list.forEach(item => { list.forEach(item => {
if (!(item in occurences)) { const key = selector(item);
occurences[item] = 1; if (!(key in occurences)) {
occurences[key] = 1;
values[key] = item;
} else { } else {
occurences[item] += 1; occurences[key] += 1;
} }
}); });
}); });
const scaledThreshold = numLists * threshold; const scaledThreshold = numLists * threshold;
return Object.entries(occurences) return Object.keys(occurences)
.filter(keyValue => keyValue[1] >= scaledThreshold) .filter(key => occurences[key] >= scaledThreshold)
.map(keyValue => keyValue[0]); .map(key => values[key]);
} }
window.libloki.serviceNodes = { window.libloki.serviceNodes = {

@ -17,13 +17,22 @@ describe('ServiceNodes', () => {
); );
}); });
it('should throw when provided a non-function selector', () => {
[1, 'a', 0xffffffff, { really: 'not a function' }].forEach(x => {
assert.throws(() =>
libloki.serviceNodes.consolidateLists([], 1, x),
'Provided selector is not a function'
)
});
});
it('should return an empty array when the input is an empty array', () => { it('should return an empty array when the input is an empty array', () => {
const result = libloki.serviceNodes.consolidateLists([]); const result = libloki.serviceNodes.consolidateLists([], 1);
assert.deepEqual(result, []); assert.deepEqual(result, []);
}); });
it('should return the input when only 1 list is provided', () => { it('should return the input when only 1 list is provided', () => {
const result = libloki.serviceNodes.consolidateLists([['a', 'b', 'c']]); const result = libloki.serviceNodes.consolidateLists([['a', 'b', 'c']], 1);
assert.deepEqual(result, ['a', 'b', 'c']); assert.deepEqual(result, ['a', 'b', 'c']);
}); });
@ -36,6 +45,25 @@ describe('ServiceNodes', () => {
assert.deepEqual(result.sort(), ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']); assert.deepEqual(result.sort(), ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']);
}); });
it('should use the selector to identify the elements', () => {
const result = libloki.serviceNodes.consolidateLists([
[{ id: 1, val: 'a'}, { id: 2, val: 'b'}, { id: 3, val: 'c'}, { id: 8, val: 'h'}],
[{ id: 4, val: 'd'}, { id: 5, val: 'e'}, { id: 6, val: 'f'}, { id: 7, val: 'g'}],
[{ id: 7, val: 'g'}, { id: 8, val: 'h'}],
], 0, x => x.id);
const expected = [
{ id: 1, val: 'a'},
{ id: 2, val: 'b'},
{ id: 3, val: 'c'},
{ id: 4, val: 'd'},
{ id: 5, val: 'e'},
{ id: 6, val: 'f'},
{ id: 7, val: 'g'},
{ id: 8, val: 'h'},
];
assert.deepEqual(result.sort((a, b) => a.val > b.val), expected);
});
it('should return the intersection of all lists when threshold is 1', () => { it('should return the intersection of all lists when threshold is 1', () => {
const result = libloki.serviceNodes.consolidateLists([ const result = libloki.serviceNodes.consolidateLists([
['a', 'b', 'c', 'd'], ['a', 'b', 'c', 'd'],

@ -1,4 +1,4 @@
/* global window, dcodeIO, textsecure, StringView */ /* global window, dcodeIO, textsecure */
// eslint-disable-next-line func-names // eslint-disable-next-line func-names
(function () { (function () {
@ -62,26 +62,8 @@
}; };
let connected = false; let connected = false;
this.startPolling = async function pollServer(callBack) { const processMessages = async messages => {
const myKeys = await textsecure.storage.protocol.getIdentityKeyPair(); const newMessages = await filterIncomingMessages(messages);
const pubKey = StringView.arrayBufferToHex(myKeys.pubKey)
let result;
try {
result = await server.retrieveMessages(pubKey);
connected = true;
} catch (err) {
connected = false;
setTimeout(() => { pollServer(callBack); }, pollTime);
return;
}
if (typeof callBack === 'function') {
callBack(connected);
}
if (!result.messages) {
setTimeout(() => { pollServer(callBack); }, pollTime);
return;
}
const newMessages = await filterIncomingMessages(result.messages);
newMessages.forEach(async message => { newMessages.forEach(async message => {
const { data } = message; const { data } = message;
const dataPlaintext = stringToArrayBufferBase64(data); const dataPlaintext = stringToArrayBufferBase64(data);
@ -97,7 +79,17 @@
); );
} }
}); });
setTimeout(() => { pollServer(callBack); }, pollTime); }
this.startPolling = async function pollServer(callback) {
try {
await server.retrieveMessages(processMessages);
connected = true;
} catch (err) {
connected = false;
}
callback(connected);
setTimeout(() => { pollServer(callback); }, pollTime);
}; };
this.isConnected = function isConnected() { this.isConnected = function isConnected() {

@ -22,7 +22,7 @@ function MessageReceiver(username, password, signalingKey, options = {}) {
this.signalingKey = signalingKey; this.signalingKey = signalingKey;
this.username = username; this.username = username;
this.password = password; this.password = password;
this.lokiserver = window.LokiAPI; this.lokiMessageAPI = window.LokiMessageAPI;
if (!options.serverTrustRoot) { if (!options.serverTrustRoot) {
throw new Error('Server trust root is required!'); throw new Error('Server trust root is required!');
@ -67,7 +67,7 @@ MessageReceiver.prototype.extend({
} }
this.hasConnected = true; this.hasConnected = true;
this.httpPollingResource = new HttpResource(this.lokiserver, { this.httpPollingResource = new HttpResource(this.lokiMessageAPI, {
handleRequest: this.handleRequest.bind(this), handleRequest: this.handleRequest.bind(this),
}); });
this.httpPollingResource.startPolling((connected) => { this.httpPollingResource.startPolling((connected) => {

@ -34,7 +34,7 @@ function OutgoingMessage(
this.callback = callback; this.callback = callback;
this.silent = silent; this.silent = silent;
this.lokiserver = window.LokiAPI; this.lokiMessageAPI = window.LokiMessageAPI;
this.numbersCompleted = 0; this.numbersCompleted = 0;
this.errors = []; this.errors = [];
@ -184,7 +184,7 @@ OutgoingMessage.prototype = {
async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) { async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) {
const pubKey = number; const pubKey = number;
try { try {
const result = await this.lokiserver.sendMessage(pubKey, data, timestamp, ttl); const result = await this.lokiMessageAPI.sendMessage(pubKey, data, timestamp, ttl);
return result; return result;
} catch (e) { } catch (e) {
if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) { if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) {

@ -144,6 +144,8 @@ function prepareURL(pathSegments, moreKeys) {
buildExpiration: config.get('buildExpiration'), buildExpiration: config.get('buildExpiration'),
serverUrl: config.get('serverUrl'), serverUrl: config.get('serverUrl'),
cdnUrl: config.get('cdnUrl'), cdnUrl: config.get('cdnUrl'),
messageServerPort: config.get('messageServerPort'),
swarmServerPort: config.get('swarmServerPort'),
certificateAuthority: config.get('certificateAuthority'), certificateAuthority: config.get('certificateAuthority'),
environment: config.environment, environment: config.environment,
node_version: process.versions.node, node_version: process.versions.node,

@ -265,10 +265,18 @@ window.WebAPI = initializeWebAPI({
proxyUrl: config.proxyUrl, proxyUrl: config.proxyUrl,
}); });
const { LokiServer } = require('./js/modules/loki_message_api'); const { LokiSnodeAPI } = require('./js/modules/loki_snode_api');
window.LokiAPI = new LokiServer({ window.LokiSnodeAPI = new LokiSnodeAPI({
urls: [config.serverUrl], url: config.serverUrl,
swarmServerPort: config.swarmServerPort,
});
const { LokiMessageAPI } = require('./js/modules/loki_message_api');
window.LokiMessageAPI = new LokiMessageAPI({
url: config.serverUrl,
messageServerPort: config.messageServerPort,
}); });
window.mnemonic = require('./libloki/mnemonic'); window.mnemonic = require('./libloki/mnemonic');

Loading…
Cancel
Save