basic classes for message sending
parent
15560a4cb5
commit
e0f27ba712
@ -0,0 +1,39 @@
|
||||
import { EncryptionType } from '../types/EncryptionType';
|
||||
import { SignalService } from '../../protobuf';
|
||||
|
||||
function padPlainTextBuffer(messageBuffer: Uint8Array): Uint8Array {
|
||||
const plaintext = new Uint8Array(
|
||||
getPaddedMessageLength(messageBuffer.byteLength + 1) - 1
|
||||
);
|
||||
plaintext.set(new Uint8Array(messageBuffer));
|
||||
plaintext[messageBuffer.byteLength] = 0x80;
|
||||
|
||||
return plaintext;
|
||||
}
|
||||
|
||||
function getPaddedMessageLength(originalLength: number): number {
|
||||
const messageLengthWithTerminator = originalLength + 1;
|
||||
let messagePartCount = Math.floor(messageLengthWithTerminator / 160);
|
||||
|
||||
if (messageLengthWithTerminator % 160 !== 0) {
|
||||
messagePartCount += 1;
|
||||
}
|
||||
|
||||
return messagePartCount * 160;
|
||||
}
|
||||
|
||||
export function encrypt(
|
||||
device: string,
|
||||
plainTextBuffer: Uint8Array,
|
||||
encryptionType: EncryptionType
|
||||
): {
|
||||
envelopeType: SignalService.Envelope.Type;
|
||||
cipherText: Uint8Array;
|
||||
} {
|
||||
const plainText = padPlainTextBuffer(plainTextBuffer);
|
||||
// TODO: Do encryption here?
|
||||
return {
|
||||
envelopeType: SignalService.Envelope.Type.CIPHERTEXT,
|
||||
cipherText: new Uint8Array(),
|
||||
};
|
||||
}
|
@ -0,0 +1,3 @@
|
||||
import * as MessageEncrypter from './MessageEncrypter';
|
||||
|
||||
export { MessageEncrypter };
|
@ -0,0 +1,8 @@
|
||||
import * as Messages from './messages';
|
||||
import * as Protocols from './protocols';
|
||||
|
||||
// TODO: Do we export class instances here?
|
||||
// E.g
|
||||
// export const messageQueue = new MessageQueue()
|
||||
|
||||
export { Messages, Protocols };
|
@ -0,0 +1,6 @@
|
||||
// TODO: Populate this with multi device specific code, e.g getting linked devices for a user etc...
|
||||
// We need to deprecate the multi device code we have in js and slowly transition to this file
|
||||
|
||||
export function implementStuffHere() {
|
||||
throw new Error("Don't call me :(");
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
// TODO: Need to flesh out these functions
|
||||
// Structure of this can be changed for example sticking this all in a class
|
||||
// The reason i haven't done it is to avoid having instances of the protocol, rather you should be able to call the functions directly
|
||||
|
||||
import { OutgoingContentMessage } from '../messages/outgoing';
|
||||
|
||||
export function hasSession(device: string): boolean {
|
||||
return false; // TODO: Implement
|
||||
}
|
||||
|
||||
export function hasSentSessionRequest(device: string): boolean {
|
||||
// TODO: need a way to keep track of if we've sent a session request
|
||||
// My idea was to use the timestamp of when it was sent but there might be another better approach
|
||||
return false;
|
||||
}
|
||||
|
||||
export async function sendSessionRequestIfNeeded(
|
||||
device: string
|
||||
): Promise<void> {
|
||||
if (hasSession(device) || hasSentSessionRequest(device)) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
// TODO: Call sendSessionRequest with SessionReset
|
||||
return Promise.reject(new Error('Need to implement this function'));
|
||||
}
|
||||
|
||||
// TODO: Replace OutgoingContentMessage with SessionReset
|
||||
export async function sendSessionRequest(
|
||||
message: OutgoingContentMessage
|
||||
): Promise<void> {
|
||||
// TODO: Optimistically store timestamp of when session request was sent
|
||||
// TODO: Send out the request via MessageSender
|
||||
// TODO: On failure, unset the timestamp
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
export async function sessionEstablished(device: string) {
|
||||
// TODO: this is called when we receive an encrypted message from the other user
|
||||
// Maybe it should be renamed to something else
|
||||
// TODO: This should make `hasSentSessionRequest` return `false`
|
||||
}
|
||||
|
||||
export async function shouldProcessSessionRequest(
|
||||
device: string,
|
||||
messageTimestamp: number
|
||||
): boolean {
|
||||
// TODO: Need to do the following here
|
||||
// messageTimestamp > session request sent timestamp && messageTimestamp > session request processed timestamp
|
||||
return false;
|
||||
}
|
||||
|
||||
export async function sessionRequestProcessed(device: string) {
|
||||
// TODO: this is called when we process the session request
|
||||
// This should store the processed timestamp
|
||||
// Again naming is crap so maybe some other name is better
|
||||
}
|
@ -0,0 +1,4 @@
|
||||
import * as SessionProtocol from './SessionProtocol';
|
||||
import * as MultiDeviceProtocol from './MultiDeviceProtocol';
|
||||
|
||||
export { SessionProtocol, MultiDeviceProtocol };
|
@ -0,0 +1,54 @@
|
||||
import { MessageQueueInterface } from './MessageQueueInterface';
|
||||
import { OutgoingContentMessage, OpenGroupMessage } from '../messages/outgoing';
|
||||
import { JobQueue } from '../utils/JobQueue';
|
||||
import { PendingMessageCache } from './PendingMessageCache';
|
||||
|
||||
export class MessageQueue implements MessageQueueInterface {
|
||||
private readonly jobQueues: Map<string, JobQueue> = new Map();
|
||||
private readonly cache: PendingMessageCache;
|
||||
|
||||
constructor() {
|
||||
this.cache = new PendingMessageCache();
|
||||
this.processAllPending();
|
||||
}
|
||||
|
||||
public sendUsingMultiDevice(user: string, message: OutgoingContentMessage) {
|
||||
throw new Error('Method not implemented.');
|
||||
}
|
||||
public send(device: string, message: OutgoingContentMessage) {
|
||||
throw new Error('Method not implemented.');
|
||||
}
|
||||
public sendToGroup(message: OutgoingContentMessage | OpenGroupMessage) {
|
||||
throw new Error('Method not implemented.');
|
||||
}
|
||||
public sendSyncMessage(message: OutgoingContentMessage) {
|
||||
throw new Error('Method not implemented.');
|
||||
}
|
||||
|
||||
public processPending(device: string) {
|
||||
// TODO: implement
|
||||
}
|
||||
|
||||
private processAllPending() {
|
||||
// TODO: Get all devices which are pending here
|
||||
}
|
||||
|
||||
private queue(device: string, message: OutgoingContentMessage) {
|
||||
// TODO: implement
|
||||
}
|
||||
|
||||
private queueOpenGroupMessage(message: OpenGroupMessage) {
|
||||
// TODO: Do we need to queue open group messages?
|
||||
// If so we can get open group job queue and add the send job here
|
||||
}
|
||||
|
||||
private getJobQueue(device: string): JobQueue {
|
||||
let queue = this.jobQueues.get(device);
|
||||
if (!queue) {
|
||||
queue = new JobQueue();
|
||||
this.jobQueues.set(device, queue);
|
||||
}
|
||||
|
||||
return queue;
|
||||
}
|
||||
}
|
@ -0,0 +1,13 @@
|
||||
import { OutgoingContentMessage, OpenGroupMessage } from '../messages/outgoing';
|
||||
|
||||
// TODO: add all group messages here, replace OutgoingContentMessage with them
|
||||
type GroupMessageType = OpenGroupMessage | OutgoingContentMessage;
|
||||
export interface MessageQueueInterface {
|
||||
sendUsingMultiDevice(user: string, message: OutgoingContentMessage): void;
|
||||
send(device: string, message: OutgoingContentMessage): void;
|
||||
sendToGroup(message: GroupMessageType): void;
|
||||
sendSyncMessage(message: OutgoingContentMessage): void;
|
||||
// TODO: Find a good way to handle events in this
|
||||
// E.g if we do queue.onMessageSent() we want to also be able to stop listening to the event
|
||||
// TODO: implement events here
|
||||
}
|
@ -0,0 +1,14 @@
|
||||
// REMOVE COMMENT AFTER: This can just export pure functions as it doesn't need state
|
||||
|
||||
import { RawMessage } from '../types/RawMessage';
|
||||
import { OpenGroupMessage } from '../messages/outgoing';
|
||||
|
||||
export async function send(message: RawMessage): Promise<void> {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
export async function sendToOpenGroup(
|
||||
message: OpenGroupMessage
|
||||
): Promise<void> {
|
||||
return Promise.resolve();
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
import { RawMessage } from '../types/RawMessage';
|
||||
import { OutgoingContentMessage } from '../messages/outgoing';
|
||||
|
||||
// TODO: We should be able to import functions straight from the db here without going through the window object
|
||||
|
||||
export class PendingMessageCache {
|
||||
private cachedMessages: Array<RawMessage> = [];
|
||||
|
||||
constructor() {
|
||||
// TODO: We should load pending messages from db here
|
||||
}
|
||||
|
||||
public addPendingMessage(
|
||||
device: string,
|
||||
message: OutgoingContentMessage
|
||||
): RawMessage {
|
||||
// TODO: Maybe have a util for converting OutgoingContentMessage to RawMessage?
|
||||
// TODO: Raw message has uuid, how are we going to set that? maybe use a different identifier?
|
||||
// One could be device + timestamp would make a unique identifier
|
||||
// TODO: Return previous pending message if it exists
|
||||
return {} as RawMessage;
|
||||
}
|
||||
|
||||
public removePendingMessage(message: RawMessage) {
|
||||
// TODO: implement
|
||||
}
|
||||
|
||||
public getPendingDevices(): Array<String> {
|
||||
// TODO: this should return all devices which have pending messages
|
||||
return [];
|
||||
}
|
||||
|
||||
public getPendingMessages(device: string): Array<RawMessage> {
|
||||
return [];
|
||||
}
|
||||
}
|
@ -0,0 +1,5 @@
|
||||
import * as MessageSender from './MessageSender';
|
||||
import { MessageQueue } from './MessageQueue';
|
||||
import { MessageQueueInterface } from './MessageQueueInterface';
|
||||
|
||||
export { MessageSender, MessageQueue, MessageQueueInterface };
|
@ -0,0 +1,5 @@
|
||||
export enum EncryptionType {
|
||||
Signal,
|
||||
SessionReset,
|
||||
MediumGroup,
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
import { EncryptionType } from './EncryptionType';
|
||||
|
||||
// TODO: Should we store failure count on raw messages??
|
||||
// Might be better to have a seperate interface which takes in a raw message aswell as a failure count
|
||||
export interface RawMessage {
|
||||
identifier: string;
|
||||
plainTextBuffer: Uint8Array;
|
||||
timestamp: number;
|
||||
device: string;
|
||||
ttl: number;
|
||||
encryption: EncryptionType;
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
// TODO: This needs to replace js/modules/job_queue.js
|
||||
export class JobQueue {
|
||||
private pending: Promise<any> = Promise.resolve();
|
||||
private readonly jobs: Map<string, Promise<any>> = new Map();
|
||||
|
||||
public has(id: string): boolean {
|
||||
return this.jobs.has(id);
|
||||
}
|
||||
|
||||
public async add(job: () => any): Promise<any> {
|
||||
const id = uuid();
|
||||
|
||||
return this.addWithId(id, job);
|
||||
}
|
||||
|
||||
public async addWithId(id: string, job: () => any): Promise<any> {
|
||||
if (this.jobs.has(id)) {
|
||||
return this.jobs.get(id);
|
||||
}
|
||||
|
||||
const previous = this.pending || Promise.resolve();
|
||||
this.pending = previous.then(job, job);
|
||||
|
||||
const current = this.pending;
|
||||
current
|
||||
.finally(() => {
|
||||
if (this.pending === current) {
|
||||
delete this.pending;
|
||||
}
|
||||
this.jobs.delete(id);
|
||||
})
|
||||
.ignore();
|
||||
|
||||
this.jobs.set(id, current);
|
||||
|
||||
return current;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue