diff --git a/integration_test/add_friends_test.js b/integration_test/add_friends_test.js index 034064bb5..400da84ee 100644 --- a/integration_test/add_friends_test.js +++ b/integration_test/add_friends_test.js @@ -56,18 +56,15 @@ describe('Add friends', function() { .should.eventually.equal(common.TEST_PUBKEY2); await app.client.element(ConversationPage.nextButton).click(); - await app.client.waitForExist( - ConversationPage.sendFriendRequestTextarea, - 1000 - ); + await app.client.waitForExist(ConversationPage.sendMessageTextarea, 1000); // send a text message to that user (will be a friend request) await app.client - .element(ConversationPage.sendFriendRequestTextarea) + .element(ConversationPage.sendMessageTextarea) .setValue(textMessage); await app.client.keys('Enter'); await app.client.waitForExist( - ConversationPage.existingFriendRequestText(textMessage), + ConversationPage.existingSendMessageText(textMessage), 1000 ); @@ -76,63 +73,13 @@ describe('Add friends', function() { await app.client.isExisting(ConversationPage.retrySendButton).should .eventually.be.false; - // wait for left notification Friend Request count to go to 1 and click it - await app2.client.waitForExist( - ConversationPage.oneNotificationFriendRequestLeft, - 5000 - ); - await app2.client - .element(ConversationPage.oneNotificationFriendRequestLeft) - .click(); - // open the dropdown from the top friend request count - await app2.client.isExisting( - ConversationPage.oneNotificationFriendRequestTop - ).should.eventually.be.true; - await app2.client - .element(ConversationPage.oneNotificationFriendRequestTop) - .click(); - - // we should have our app1 friend request here - await app2.client.isExisting( - ConversationPage.friendRequestFromUser( - common.TEST_DISPLAY_NAME1, - common.TEST_PUBKEY1 - ) - ).should.eventually.be.true; - await app2.client.isExisting(ConversationPage.acceptFriendRequestButton) - .should.eventually.be.true; - - // accept the friend request and validate that on both side the "accepted FR" message is shown - await app2.client - .element(ConversationPage.acceptFriendRequestButton) - .click(); + await app2.client.waitForExist(ConversationPage.conversationItem, 5000); + + await app2.client.element(ConversationPage.conversationItem).click(); + await app2.client.waitForExist( - ConversationPage.acceptedFriendRequestMessage, + ConversationPage.existingReceivedMessageText(textMessage), 1000 ); - await app.client.waitForExist( - ConversationPage.acceptedFriendRequestMessage, - 5000 - ); - - // app trigger the friend request logic first - const aliceLogs = await app.client.getRenderProcessLogs(); - const bobLogs = await app2.client.getRenderProcessLogs(); - await common.logsContains( - aliceLogs, - `Sending undefined:friend-request message to ${common.TEST_PUBKEY2}` - ); - await common.logsContains( - bobLogs, - `Received a NORMAL_FRIEND_REQUEST from source: ${common.TEST_PUBKEY1}, primarySource: ${common.TEST_PUBKEY1},` - ); - await common.logsContains( - bobLogs, - `Sending incoming-friend-request-accept:onlineBroadcast message to ${common.TEST_PUBKEY1}` - ); - await common.logsContains( - aliceLogs, - `Sending outgoing-friend-request-accepted:onlineBroadcast message to ${common.TEST_PUBKEY2}` - ); }); }); diff --git a/integration_test/page-objects/conversation.page.js b/integration_test/page-objects/conversation.page.js index 4a0b2394f..4110edd05 100644 --- a/integration_test/page-objects/conversation.page.js +++ b/integration_test/page-objects/conversation.page.js @@ -28,6 +28,8 @@ module.exports = { `${number} members` ), + conversationItem: commonPage.divWithClass('module-conversation-list-item'), + attachmentInput: '//*[contains(@class, "choose-file")]/input[@type="file"]', attachmentButton: '//*[contains(@class, "choose-file")]/button', diff --git a/preload.js b/preload.js index 3b6c133db..15ad38c5e 100644 --- a/preload.js +++ b/preload.js @@ -443,8 +443,13 @@ window.NewSnodeAPI = require('./ts/session/snode_api/serviceNodeAPI'); window.SnodePool = require('./ts/session/snode_api/snodePool'); const { SwarmPolling } = require('./ts/session/snode_api/swarmPolling'); +const { SwarmPollingStub } = require('./ts/session/snode_api/swarmPollingStub'); -window.SwarmPolling = new SwarmPolling(); +if (process.env.USE_STUBBED_NETWORK) { + window.SwarmPolling = new SwarmPollingStub(); +} else { + window.SwarmPolling = new SwarmPolling(); +} window.shortenPubkey = pubkey => { const pk = pubkey.key ? pubkey.key : pubkey; diff --git a/ts/receiver/dataMessage.ts b/ts/receiver/dataMessage.ts index 2bdf50f06..b0d3e9530 100644 --- a/ts/receiver/dataMessage.ts +++ b/ts/receiver/dataMessage.ts @@ -508,7 +508,7 @@ function createMessage( } function sendDeliveryReceipt(source: string, timestamp: any) { -// FIXME audric + // FIXME audric // const receiptMessage = new DeliveryReceiptMessage({ // timestamp: Date.now(), // timestamps: [timestamp], diff --git a/ts/session/snode_api/serviceNodeAPI.ts b/ts/session/snode_api/serviceNodeAPI.ts index a451cf7a5..472204d87 100644 --- a/ts/session/snode_api/serviceNodeAPI.ts +++ b/ts/session/snode_api/serviceNodeAPI.ts @@ -352,123 +352,6 @@ export async function storeOnNode( return false; } -// export async function openRetrieveConnection(pSwarmPool: any, stopPollingPromise: Promise, onMessages: any) { -// const swarmPool = pSwarmPool; // lint -// let stopPollingResult = false; - -// // When message_receiver restarts from onoffline/ononline events it closes -// // http-resources, which will then resolve the stopPollingPromise with true. We then -// // want to cancel these polling connections because new ones will be created - -// // tslint:disable-next-line no-floating-promises -// stopPollingPromise.then((result: any) => { -// stopPollingResult = result; -// }); - -// while (!stopPollingResult && !_.isEmpty(swarmPool)) { -// const address = Object.keys(swarmPool)[0]; // X.snode hostname -// const nodeData = swarmPool[address]; -// delete swarmPool[address]; -// let successiveFailures = 0; -// while ( -// !stopPollingResult && -// successiveFailures < MAX_ACCEPTABLE_FAILURES -// ) { -// // TODO: Revert back to using snode address instead of IP -// try { -// // in general, I think we want exceptions to bubble up -// // so the user facing UI can report unhandled errors -// // except in this case of living inside http-resource pollServer -// // because it just restarts more connections... -// let messages = await retrieveNextMessages( -// nodeData, -// nodeData.lastHash, -// this.ourKey -// ); - -// // this only tracks retrieval failures -// // won't include parsing failures... -// successiveFailures = 0; -// if (messages.length) { -// const lastMessage = _.last(messages); -// nodeData.lastHash = lastMessage.hash; -// await lokiSnodeAPI.updateLastHash( -// this.ourKey, -// address, -// lastMessage.hash, -// lastMessage.expiration -// ); -// messages = await this.jobQueue.add(() => -// filterIncomingMessages(messages) -// ); -// } -// // Execute callback even with empty array to signal online status -// onMessages(messages); -// } catch (e) { -// log.warn( -// 'loki_message:::_openRetrieveConnection - retrieve error:', -// e.code, -// e.message, -// `on ${nodeData.ip}:${nodeData.port}` -// ); -// if (e instanceof textsecure.WrongSwarmError) { -// const { newSwarm } = e; - -// // Is this a security concern that we replace the list of snodes -// // based on a response from a single snode? -// await lokiSnodeAPI.updateSwarmNodes(this.ourKey, newSwarm); -// // FIXME: restart all openRetrieves when this happens... -// // FIXME: lokiSnode should handle this -// for (let i = 0; i < newSwarm.length; i += 1) { -// const lastHash = await window.Signal.Data.getLastHashBySnode( -// this.ourKey, -// newSwarm[i] -// ); -// swarmPool[newSwarm[i]] = { -// lastHash, -// }; -// } -// // Try another snode -// break; -// } else if (e instanceof textsecure.NotFoundError) { -// // DNS/Lokinet error, needs to bubble up -// throw new window.textsecure.DNSResolutionError( -// 'Retrieving messages' -// ); -// } -// successiveFailures += 1; -// } - -// // Always wait a bit as we are no longer long-polling -// await sleepFor(Math.max(successiveFailures, 2) * 1000); -// } -// if (successiveFailures >= MAX_ACCEPTABLE_FAILURES) { -// const remainingSwarmSnodes = await markUnreachableForPubkey( -// this.ourKey, -// nodeData -// ); -// log.warn( -// `loki_message:::_openRetrieveConnection - too many successive failures, removing ${ -// nodeData.ip -// }:${nodeData.port} from our swarm pool. We have ${ -// Object.keys(swarmPool).length -// } usable swarm nodes left for our connection (${ -// remainingSwarmSnodes.length -// } in local db)` -// ); -// } -// } -// // if not stopPollingResult -// if (_.isEmpty(swarmPool)) { -// log.error( -// 'loki_message:::_openRetrieveConnection - We no longer have any swarm nodes available to try in pool, closing retrieve connection' -// ); -// return false; -// } -// return true; -// } - -// mark private (_ prefix) since no error handling is done here... export async function retrieveNextMessages( nodeData: Snode, lastHash: string, diff --git a/ts/session/snode_api/snodePool.ts b/ts/session/snode_api/snodePool.ts index 923aa57d7..d6672930d 100644 --- a/ts/session/snode_api/snodePool.ts +++ b/ts/session/snode_api/snodePool.ts @@ -142,12 +142,8 @@ export function markNodeUnreachable(snode: Snode): void { export async function getRandomSnodeAddress(): Promise { // resolve random snode if (randomSnodePool.length === 0) { - // allow exceptions to pass through upwards without the unhandled promise rejection - try { - await refreshRandomPool([]); - } catch (e) { - throw e; - } + await refreshRandomPool([]); + if (randomSnodePool.length === 0) { throw new window.textsecure.SeedNodeError('Invalid seed node response'); } diff --git a/ts/session/snode_api/swarmPolling.ts b/ts/session/snode_api/swarmPolling.ts index 13683f0f0..ed2b98d5c 100644 --- a/ts/session/snode_api/swarmPolling.ts +++ b/ts/session/snode_api/swarmPolling.ts @@ -65,6 +65,82 @@ export class SwarmPolling { this.groupPubkeys = this.groupPubkeys.filter(key => !pubkey.isEqual(key)); } + protected async pollOnceForKey(pubkey: PubKey, isGroup: boolean) { + // NOTE: sometimes pubkey is string, sometimes it is object, so + // accept both until this is fixed: + const pk = (pubkey.key ? pubkey.key : pubkey) as string; + + const snodes = await getSnodesFor(pk); + + // Select nodes for which we already have lastHashes + const alreadyPolled = snodes.filter( + (n: Snode) => this.lastHashes[n.pubkey_ed25519] + ); + + // If we need more nodes, select randomly from the remaining nodes: + + // Use 1 node for now: + const COUNT = 1; + + let nodesToPoll = _.sampleSize(alreadyPolled, COUNT); + + if (nodesToPoll.length < COUNT) { + const notPolled = _.difference(snodes, alreadyPolled); + + const newNeeded = COUNT - alreadyPolled.length; + + const newNodes = _.sampleSize(notPolled, newNeeded); + + nodesToPoll = _.concat(nodesToPoll, newNodes); + } + + const results = await Promise.all( + nodesToPoll.map(async (n: Snode) => { + return this.pollNodeForKey(n, pubkey); + }) + ); + + // Merge results into one list of unique messages + const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash); + + const newMessages = await this.handleSeenMessages(messages); + + newMessages.forEach((m: Message) => { + const options = isGroup ? { conversationId: pk } : {}; + processMessage(m.data, options); + }); + } + + // Fetches messages for `pubkey` from `node` potentially updating + // the lash hash record + protected async pollNodeForKey( + node: Snode, + pubkey: PubKey + ): Promise> { + const edkey = node.pubkey_ed25519; + + const pkStr = pubkey.key ? pubkey.key : pubkey; + + const prevHash = await this.getLastHash(edkey, pkStr as string); + + const messages = await retrieveNextMessages(node, prevHash, pubkey); + + if (!messages.length) { + return []; + } + + const lastMessage = _.last(messages); + + this.updateLastHash( + edkey, + pubkey, + lastMessage.hash, + lastMessage.expiration + ); + + return messages; + } + private loadGroupIds() { // Start polling for medium size groups as well (they might be in different swarms) const convos = window @@ -103,11 +179,11 @@ export class SwarmPolling { private async pollForAllKeys() { const directPromises = this.pubkeys.map(async pk => { - return this.pollOnceForKey(pk); + return this.pollOnceForKey(pk, false); }); const groupPromises = this.groupPubkeys.map(async pk => { - return this.pollOnceForKey(pk); + return this.pollOnceForKey(pk, true); }); await Promise.all(_.concat(directPromises, groupPromises)); @@ -154,81 +230,4 @@ export class SwarmPolling { return nodeRecords[pubkey]; } } - - // Fetches messages for `pubkey` from `node` potentially updating - // the lash hash record - private async pollNodeForKey( - node: Snode, - pubkey: PubKey - ): Promise> { - const edkey = node.pubkey_ed25519; - - const pkStr = pubkey.key ? pubkey.key : pubkey; - - const prevHash = await this.getLastHash(edkey, pkStr as string); - - const messages = await retrieveNextMessages(node, prevHash, pubkey); - - if (!messages.length) { - return []; - } - - const lastMessage = _.last(messages); - - this.updateLastHash( - edkey, - pubkey, - lastMessage.hash, - lastMessage.expiration - ); - - return messages; - } - - private async pollOnceForKey(pubkey: PubKey) { - // NOTE: sometimes pubkey is string, sometimes it is object, so - // accept both until this is fixed: - const pk = pubkey.key ? pubkey.key : pubkey; - - const snodes = await getSnodesFor(pk as string); - - // Select nodes for which we already have lastHashes - const alreadyPolled = snodes.filter( - (n: Snode) => this.lastHashes[n.pubkey_ed25519] - ); - - // If we need more nodes, select randomly from the remaining nodes: - - // Use 1 node for now: - const COUNT = 1; - - let nodesToPoll = _.sampleSize(alreadyPolled, COUNT); - - if (nodesToPoll.length < COUNT) { - const notPolled = _.difference(snodes, alreadyPolled); - - const newNeeded = COUNT - alreadyPolled.length; - - const newNodes = _.sampleSize(notPolled, newNeeded); - - nodesToPoll = _.concat(nodesToPoll, newNodes); - } - - const results = await Promise.all( - nodesToPoll.map(async (n: Snode) => { - return this.pollNodeForKey(n, pubkey); - }) - ); - - // Merge results into one list of unique messages - const messages = _.uniqBy(_.flatten(results), (x: any) => x.hash); - - const newMessages = await this.handleSeenMessages(messages); - - newMessages.forEach((m: Message) => { - processMessage(m.data, { conversationId: pubkey.key }); - }); - - // TODO: `onMessages` - } } diff --git a/ts/session/snode_api/swarmPollingStub.ts b/ts/session/snode_api/swarmPollingStub.ts new file mode 100644 index 000000000..90e9b3fd6 --- /dev/null +++ b/ts/session/snode_api/swarmPollingStub.ts @@ -0,0 +1,32 @@ +import { processMessage, SwarmPolling } from './swarmPolling'; +import fetch from 'node-fetch'; +import { PubKey } from '../types'; + +export class SwarmPollingStub extends SwarmPolling { + private readonly baseUrl = 'http://localhost:3000'; + + protected async pollOnceForKey(pubkey: PubKey, isGroup: boolean) { + const pubkeyStr = pubkey.key ? pubkey.key : pubkey; + + const get = { + method: 'GET', + }; + + const res = await fetch( + `${this.baseUrl}/messages?pubkey=${pubkeyStr}`, + get + ); + + try { + const json = await res.json(); + + const options = isGroup ? { conversationId: pubkeyStr } : {}; + + json.messages.forEach((m: any) => { + processMessage(m.data, options); + }); + } catch (e) { + window.log.error('invalid json: ', e); + } + } +}