Merge pull request #29 from BeaudanBrown/message-polling

Message server is polled every 5 seconds to check for new messages
pull/30/head
sachaaaaa 7 years ago committed by GitHub
commit 7887786b00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -76,6 +76,7 @@ module.exports = grunt => {
'libtextsecure/event_target.js',
'libtextsecure/account_manager.js',
'libtextsecure/websocket-resources.js',
'libtextsecure/http-resources.js',
'libtextsecure/message_receiver.js',
'libtextsecure/outgoing_message.js',
'libtextsecure/sendmessage.js',

@ -1,4 +1,4 @@
/* global log */
/* global log, dcodeIO */
const fetch = require('node-fetch');
const is = require('@sindresorhus/is');
@ -20,6 +20,7 @@ function initialize({ url }) {
function connect() {
return {
sendMessage,
retrieveMessages,
};
function getPoWNonce(timestamp, ttl, pubKey, data) {
@ -32,7 +33,7 @@ function initialize({ url }) {
timestamp,
ttl,
pubKey,
data: Array.from(data),
data,
});
// Handle child process error (should never happen)
@ -52,12 +53,60 @@ function initialize({ url }) {
});
}
async function retrieveMessages(pubKey) {
const options = {
url: `${url}/retrieve`,
type: 'GET',
responseType: 'json',
timeout: undefined,
};
log.info(options.type, options.url);
const fetchOptions = {
method: options.type,
headers: {
'X-Loki-recipient': pubKey,
},
timeout: options.timeout,
};
let response;
try {
response = await fetch(options.url, fetchOptions);
} catch (e) {
log.error(options.type, options.url, 0, 'Error');
throw HTTPError('fetch error', 0, e.toString());
}
let result;
if (
options.responseType === 'json' &&
response.headers.get('Content-Type') === 'application/json'
) {
result = await response.json();
} else if (options.responseType === 'arraybuffer') {
result = await response.buffer();
} else {
result = await response.text();
}
if (response.status >= 0 && response.status < 400) {
log.info(options.type, options.url, response.status, 'Success');
return result;
}
log.error(options.type, options.url, response.status, 'Error');
throw HTTPError('retrieveMessages: error response', response.status, result);
}
async function sendMessage(pubKey, data, ttl) {
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
const timestamp = Math.floor(Date.now() / 1000);
// Nonce is returned as a base64 string to include in header
let nonce;
try {
nonce = await getPoWNonce(timestamp, ttl, pubKey, data);
nonce = await getPoWNonce(timestamp, ttl, pubKey, data64);
} catch (err) {
// Something went horribly wrong
// TODO: Handle gracefully
@ -75,13 +124,12 @@ function initialize({ url }) {
const fetchOptions = {
method: options.type,
body: data,
body: data64,
headers: {
'X-Loki-pow-nonce': nonce,
'X-Loki-timestamp': timestamp.toString(),
'X-Loki-ttl': ttl.toString(),
'X-Loki-recipient': pubKey,
'Content-Length': data.byteLength,
},
timeout: options.timeout,
};
@ -108,7 +156,7 @@ function initialize({ url }) {
if (response.status >= 0 && response.status < 400) {
log.info(options.type, options.url, response.status, 'Success');
return [result, response.status];
return result;
}
log.error(options.type, options.url, response.status, 'Error');
throw HTTPError('sendMessage: error response', response.status, result);

@ -58,15 +58,9 @@ function greaterThan(arr1, arr2) {
// Return nonce that hashes together with payload lower than the target
function calcPoW(timestamp, ttl, pubKey, data) {
const leadingString = timestamp.toString() + ttl.toString() + pubKey;
const leadingArray = new Uint8Array(
bb.wrap(leadingString, 'binary').toArrayBuffer()
const payload = new Uint8Array(
bb.wrap(timestamp.toString() + ttl.toString() + pubKey + data, 'binary').toArrayBuffer()
);
// Payload constructed from concatenating timestamp, ttl and pubkey strings,
// converting to Uint8Array and then appending to the message data array
const payload = new Uint8Array(leadingArray.length + data.length);
payload.set(leadingArray);
payload.set(data, leadingArray.length);
// payloadLength + NONCE_LEN
const totalLen = new BigInteger(payload.length.toString()).add(
@ -118,7 +112,7 @@ process.on('message', msg => {
msg.timestamp,
msg.ttl,
msg.pubKey,
new Uint8Array(msg.data)
msg.data
),
});
});

@ -0,0 +1,83 @@
/* global window, dcodeIO, textsecure, StringView */
// eslint-disable-next-line func-names
(function() {
let server;
function stringToArrayBufferBase64(string) {
return dcodeIO.ByteBuffer.wrap(string, 'base64').toArrayBuffer();
}
const Response = function Response(options) {
this.verb = options.verb || options.type;
this.path = options.path || options.url;
this.body = options.body || options.data;
this.success = options.success;
this.error = options.error;
this.id = options.id;
if (this.id === undefined) {
const bits = new Uint32Array(2);
window.crypto.getRandomValues(bits);
this.id = dcodeIO.Long.fromBits(bits[0], bits[1], true);
}
if (this.body === undefined) {
this.body = null;
}
};
const IncomingHttpResponse = function IncomingHttpResponse(options) {
const request = new Response(options);
this.verb = request.verb;
this.path = request.path;
this.body = request.body;
this.respond = (status, message) => {
// Mock websocket response
window.log.info(status, message);
};
};
window.HttpResource = function HttpResource(_server, opts = {}) {
server = _server;
let { handleRequest } = opts;
if (typeof handleRequest !== 'function') {
handleRequest = request => request.respond(404, 'Not found');
};
this.startPolling = async function pollServer() {
const myKeys = await textsecure.storage.protocol.getIdentityKeyPair();
const pubKey = StringView.arrayBufferToHex(myKeys.pubKey)
let result;
try {
result = await server.retrieveMessages(pubKey);
} catch(err) {
setTimeout(() => { pollServer(); }, 5000);
return;
}
if (!result.messages) {
setTimeout(() => { pollServer(); }, 5000);
return;
}
result.messages.forEach(async message => {
const { data } = message;
const dataPlaintext = stringToArrayBufferBase64(data);
const messageBuf = textsecure.protobuf.WebSocketMessage.decode(dataPlaintext);
if (messageBuf.type === textsecure.protobuf.WebSocketMessage.Type.REQUEST) {
handleRequest(
new IncomingHttpResponse({
verb: messageBuf.request.verb,
path: messageBuf.request.path,
body: messageBuf.request.body,
id: messageBuf.request.id,
})
);
}
});
setTimeout(() => { pollServer(); }, 5000);
};
};
})();

@ -1,12 +1,12 @@
/* global window: false */
/* global textsecure: false */
/* global WebAPI: false */
/* global StringView: false */
/* global libsignal: false */
/* global WebSocketResource: false */
/* global WebSocket: false */
/* global Event: false */
/* global dcodeIO: false */
/* global _: false */
/* global HttpResource: false */
/* global ContactBuffer: false */
/* global GroupBuffer: false */
/* global Worker: false */
@ -121,7 +121,7 @@ function MessageReceiver(username, password, signalingKey, options = {}) {
this.signalingKey = signalingKey;
this.username = username;
this.password = password;
this.server = WebAPI.connect({ username, password });
this.lokiserver = window.LokiAPI.connect();
if (!options.serverTrustRoot) {
throw new Error('Server trust root is required!');
@ -166,6 +166,12 @@ MessageReceiver.prototype.extend({
}
this.hasConnected = true;
this.hr = new HttpResource(this.lokiserver, {
handleRequest: this.handleRequest.bind(this),
});
this.hr.startPolling();
// TODO: Rework this socket stuff to work with online messaging
return;
if (this.socket && this.socket.readyState !== WebSocket.CLOSED) {
this.socket.close();
@ -236,7 +242,6 @@ MessageReceiver.prototype.extend({
);
// TODO: handle properly
return;
this.shutdown();
if (this.calledClose) {
@ -276,8 +281,8 @@ MessageReceiver.prototype.extend({
return;
}
const promise = Promise.resolve(request.body.toArrayBuffer()) //textsecure.crypto
//.decryptWebsocketMessage(request.body, this.signalingKey)
const promise = Promise.resolve(request.body.toArrayBuffer()) // textsecure.crypto
// .decryptWebsocketMessage(request.body, this.signalingKey)
.then(plaintext => {
const envelope = textsecure.protobuf.Envelope.decode(plaintext);
// After this point, decoding errors are not the server's
@ -1222,6 +1227,8 @@ MessageReceiver.prototype.extend({
return textsecure.storage.get('blocked-groups', []).indexOf(groupId) >= 0;
},
handleAttachment(attachment) {
console.log("Not handling attachments.");
return;
// eslint-disable-next-line no-param-reassign
attachment.id = attachment.id.toString();
// eslint-disable-next-line no-param-reassign

@ -177,8 +177,8 @@ OutgoingMessage.prototype = {
async transmitMessage(number, data, timestamp, ttl = 24 * 60 * 60) {
const pubKey = number;
try {
const [response] = await this.lokiserver.sendMessage(pubKey, data, ttl);
return response;
const result = await this.lokiserver.sendMessage(pubKey, data, ttl);
return result;
} catch (e) {
if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) {
// 409 and 410 should bubble and be handled by doSendMessage
@ -220,8 +220,8 @@ OutgoingMessage.prototype = {
async wrapInWebsocketMessage(outgoingObject) {
const messageEnvelope = new textsecure.protobuf.Envelope({
type: outgoingObject.type,
source: outgoingObject.address.getName(),
sourceDevice: outgoingObject.address.getDeviceId(),
source: outgoingObject.ourKey,
sourceDevice: outgoingObject.sourceDevice,
timestamp: this.timestamp,
content: outgoingObject.content,
});
@ -275,11 +275,11 @@ OutgoingMessage.prototype = {
return Promise.all(
deviceIds.map(async deviceId => {
const address = new libsignal.SignalProtocolAddress(number, deviceId);
const ourNumber = textsecure.storage.user.getNumber();
const ourKey = textsecure.storage.user.getNumber();
const options = {};
// No limit on message keys if we're communicating with our other devices
if (ourNumber === number) {
if (ourKey === number) {
options.messageKeysLimit = false;
}
@ -309,8 +309,8 @@ OutgoingMessage.prototype = {
})
.then(ciphertext => ({
type: ciphertext.type,
address,
destinationDeviceId: address.getDeviceId(),
ourKey,
sourceDevice: 1,
destinationRegistrationId: ciphertext.registrationId,
content: ciphertext.body,
}));

Loading…
Cancel
Save