pull/934/head
Ryan Tharp 5 years ago
parent 3038a8c7d2
commit 2b29b76d66

@ -1259,7 +1259,7 @@ class LokiPublicChannelAPI {
// local file avatar
const resolvedAvatar = path.normalize(note.value.avatar);
const base = path.normalize('images/');
const re = new RegExp(`^${base}`)
const re = new RegExp(`^${base}`);
// do we at least ends up inside images/ somewhere?
if (re.test(resolvedAvatar)) {
this.conversation.set('avatar', resolvedAvatar);
@ -1743,10 +1743,10 @@ class LokiPublicChannelAPI {
// early send
// split off count from pendingMessages
let sendNow = [];
([sendNow, pendingMessages] = _.partition(
[sendNow, pendingMessages] = _.partition(
pendingMessages,
message => message.serverId < firstSlaveId
));
);
sendNow.forEach(message => {
// send them out now
log.info('emitting primary message', message.serverId);

@ -217,7 +217,12 @@ class LokiMessageAPI {
}
return true;
} catch (e) {
log.warn('Loki send message error:', e.code, e.message, `from ${address}`);
log.warn(
'Loki send message error:',
e.code,
e.message,
`from ${address}`
);
if (e instanceof textsecure.WrongSwarmError) {
const { newSwarm } = e;
await lokiSnodeAPI.updateSwarmNodes(params.pubKey, newSwarm);

@ -0,0 +1,209 @@
/* global log, textsecure */
const EventEmitter = require('events');
const nodeFetch = require('node-fetch');
const { URL, URLSearchParams } = require('url');
const GROUPCHAT_POLL_EVERY = 1000; // 1 second
// singleton to relay events to libtextsecure/message_receiver
class LokiPublicChatAPI extends EventEmitter {
constructor(ourKey) {
super();
this.ourKey = ourKey;
this.lastGot = {};
this.servers = [];
}
findOrCreateServer(hostport) {
let thisServer = null;
log.info(`LokiPublicChatAPI looking for ${hostport}`);
this.servers.some(server => {
// if we already have this hostport registered
if (server.server === hostport) {
thisServer = server;
return true;
}
return false;
});
if (thisServer === null) {
thisServer = new LokiPublicServerAPI(this, hostport);
this.servers.push(thisServer);
}
return thisServer;
}
}
class LokiPublicServerAPI {
constructor(chatAPI, hostport) {
this.chatAPI = chatAPI;
this.server = hostport;
this.channels = [];
}
findOrCreateChannel(channelId, conversationId) {
let thisChannel = null;
this.channels.forEach(channel => {
if (
channel.channelId === channelId &&
channel.conversationId === conversationId
) {
thisChannel = channel;
}
});
if (thisChannel === null) {
thisChannel = new LokiPublicChannelAPI(this, channelId, conversationId);
this.channels.push(thisChannel);
}
return thisChannel;
}
unregisterChannel(channelId) {
// find it, remove it
// if no channels left, request we deregister server
return channelId || this; // this is just to make eslint happy
}
}
class LokiPublicChannelAPI {
constructor(serverAPI, channelId, conversationId) {
this.serverAPI = serverAPI;
this.channelId = channelId;
this.baseChannelUrl = `${serverAPI.server}/channels/${this.channelId}`;
this.groupName = 'unknown';
this.conversationId = conversationId;
this.lastGot = 0;
log.info(`registered LokiPublicChannel ${channelId}`);
// start polling
this.pollForMessages();
}
async pollForChannel(source, endpoint) {
// groupName will be loaded from server
const url = new URL(this.baseChannelUrl);
/*
const params = {
include_annotations: 1,
};
*/
let res;
let success = true;
try {
res = await nodeFetch(url);
} catch (e) {
success = false;
}
const response = await res.json();
if (response.meta.code !== 200) {
success = false;
}
// update this.groupId
return endpoint || success;
}
async pollForDeletions() {
// let id = 0;
// read all messages from 0 to current
// delete local copies if server state has changed to delete
// run every minute
const url = new URL(this.baseChannelUrl);
/*
const params = {
include_annotations: 1,
};
*/
let res;
let success = true;
try {
res = await nodeFetch(url);
} catch (e) {
success = false;
}
const response = await res.json();
if (response.meta.code !== 200) {
success = false;
}
return success;
}
async pollForMessages() {
const url = new URL(`${this.baseChannelUrl}/messages`);
const params = {
include_annotations: 1,
count: -20,
};
if (this.lastGot) {
params.since_id = this.lastGot;
}
url.search = new URLSearchParams(params);
let res;
let success = true;
try {
res = await nodeFetch(url);
} catch (e) {
success = false;
}
const response = await res.json();
if (response.meta.code !== 200) {
success = false;
}
if (success) {
let receivedAt = new Date().getTime();
response.data.forEach(adnMessage => {
// FIXME: create proper message for this message.DataMessage.body
let timestamp = new Date(adnMessage.created_at).getTime();
let from = adnMessage.user.username;
let source;
if (adnMessage.annotations.length) {
const noteValue = adnMessage.annotations[0].value;
({ from, timestamp, source } = noteValue);
}
const messageData = {
friendRequest: false,
source,
sourceDevice: 1,
timestamp,
serverTimestamp: timestamp,
receivedAt,
isPublic: true,
message: {
body: adnMessage.text,
attachments: [],
group: {
id: this.conversationId,
type: textsecure.protobuf.GroupContext.Type.DELIVER,
},
flags: 0,
expireTimer: 0,
profileKey: null,
timestamp,
received_at: receivedAt,
sent_at: timestamp,
quote: null,
contact: [],
preview: [],
profile: {
displayName: from,
},
},
};
receivedAt += 1; // Ensure different arrival times
this.serverAPI.chatAPI.emit('publicMessage', {
message: messageData,
});
this.lastGot = !this.lastGot
? adnMessage.id
: Math.max(this.lastGot, adnMessage.id);
});
}
setTimeout(() => {
this.pollForMessages();
}, GROUPCHAT_POLL_EVERY);
}
}
module.exports = LokiPublicChatAPI;

@ -70,7 +70,10 @@ const sendToProxy = async (options = {}, targetNode) => {
// detect SNode is not ready (not in swarm; not done syncing)
if (response.status === 503) {
const ciphertext = await response.text();
log.error(`lokiRpc sendToProxy snode ${randSnode.ip}:${randSnode.port} error`, ciphertext);
log.error(
`lokiRpc sendToProxy snode ${randSnode.ip}:${randSnode.port} error`,
ciphertext
);
// mark as bad for this round (should give it some time and improve success rates)
lokiSnodeAPI.markRandomNodeUnreachable(randSnode);
// retry for a new working snode
@ -104,7 +107,7 @@ const sendToProxy = async (options = {}, targetNode) => {
const textDecoder = new TextDecoder();
plaintext = textDecoder.decode(plaintextBuffer);
} catch(e) {
} catch (e) {
log.error(
'lokiRpc sendToProxy decode error',
e.code,

@ -31,7 +31,10 @@ class LokiSnodeAPI {
];
}
async initialiseRandomPool(seedNodes = [...window.seedNodeList], consecutiveErrors = 0) {
async initialiseRandomPool(
seedNodes = [...window.seedNodeList],
consecutiveErrors = 0
) {
const params = {
limit: 20,
active_only: true,
@ -71,7 +74,10 @@ class LokiSnodeAPI {
if (consecutiveErrors < 3) {
// retry after a possible delay
setTimeout(() => {
log.info('Retrying initialising random snode pool, try #', consecutiveErrors);
log.info(
'Retrying initialising random snode pool, try #',
consecutiveErrors
);
this.initialiseRandomPool(seedNodes, consecutiveErrors + 1);
}, consecutiveErrors * consecutiveErrors * 5000);
} else {
@ -181,7 +187,12 @@ class LokiSnodeAPI {
const snodes = result.snodes.filter(tSnode => tSnode.ip !== '0.0.0.0');
return snodes;
} catch (e) {
log.error('getSnodesForPubkey error', e.code, e.message, `for ${snode.ip}:${snode.port}`);
log.error(
'getSnodesForPubkey error',
e.code,
e.message,
`for ${snode.ip}:${snode.port}`
);
this.markRandomNodeUnreachable(snode);
return [];
}
@ -197,7 +208,9 @@ class LokiSnodeAPI {
const resList = await this.getSnodesForPubkey(rSnode, pubKey);
// should we only activate entries that are in all results?
resList.map(item => {
const hasItem = snodes.some(hItem => item.ip === hItem.ip && item.port === hItem.port);
const hasItem = snodes.some(
hItem => item.ip === hItem.ip && item.port === hItem.port
);
if (!hasItem) {
snodes.push(item);
}

@ -59,7 +59,10 @@ function MessageReceiver(username, password, signalingKey, options = {}) {
lokiPublicChatAPI.removeAllListeners('publicMessage');
// we only need one MR in the system handling these
// bind events
lokiPublicChatAPI.on('publicMessage', this.handleUnencryptedMessage.bind(this));
lokiPublicChatAPI.on(
'publicMessage',
this.handleUnencryptedMessage.bind(this)
);
openGroupBound = true;
}
} else {

@ -95,7 +95,11 @@ export class CreateGroupDialog extends React.Component<Props, State> {
);
return (
<SessionModal title={titleText} onClose={this.closeDialog} onOk={() => null}>
<SessionModal
title={titleText}
onClose={this.closeDialog}
onOk={() => null}
>
<div className="spacer-lg" />
<p className={errorMessageClasses}>{this.state.errorMessage}</p>

Loading…
Cancel
Save