Session jobs database implementation

pull/420/head
Brice 3 years ago
parent 13f94c2cfd
commit 5f0a5c5db6

@ -33,6 +33,7 @@ import org.thoughtcrime.securesms.loki.database.LokiBackupFilesDatabase;
import org.thoughtcrime.securesms.loki.database.LokiMessageDatabase; import org.thoughtcrime.securesms.loki.database.LokiMessageDatabase;
import org.thoughtcrime.securesms.loki.database.LokiThreadDatabase; import org.thoughtcrime.securesms.loki.database.LokiThreadDatabase;
import org.thoughtcrime.securesms.loki.database.LokiUserDatabase; import org.thoughtcrime.securesms.loki.database.LokiUserDatabase;
import org.thoughtcrime.securesms.loki.database.SessionJobDatabase;
import org.thoughtcrime.securesms.loki.database.SharedSenderKeysDatabase; import org.thoughtcrime.securesms.loki.database.SharedSenderKeysDatabase;
public class DatabaseFactory { public class DatabaseFactory {
@ -67,6 +68,7 @@ public class DatabaseFactory {
private final LokiUserDatabase lokiUserDatabase; private final LokiUserDatabase lokiUserDatabase;
private final LokiBackupFilesDatabase lokiBackupFilesDatabase; private final LokiBackupFilesDatabase lokiBackupFilesDatabase;
private final SharedSenderKeysDatabase sskDatabase; private final SharedSenderKeysDatabase sskDatabase;
private final SessionJobDatabase sessionJobDatabase;
// Refactor // Refactor
private final Storage storage; private final Storage storage;
@ -177,6 +179,10 @@ public class DatabaseFactory {
public static SharedSenderKeysDatabase getSSKDatabase(Context context) { public static SharedSenderKeysDatabase getSSKDatabase(Context context) {
return getInstance(context).sskDatabase; return getInstance(context).sskDatabase;
} }
public static SessionJobDatabase getSessionJobDatabase(Context context) {
return getInstance(context).sessionJobDatabase;
}
// endregion // endregion
// region Refactor // region Refactor
@ -226,6 +232,7 @@ public class DatabaseFactory {
this.sskDatabase = new SharedSenderKeysDatabase(context, databaseHelper); this.sskDatabase = new SharedSenderKeysDatabase(context, databaseHelper);
this.storage = new Storage(context, databaseHelper); this.storage = new Storage(context, databaseHelper);
this.attachmentProvider = new DatabaseAttachmentProvider(context, databaseHelper); this.attachmentProvider = new DatabaseAttachmentProvider(context, databaseHelper);
this.sessionJobDatabase = new SessionJobDatabase(context, databaseHelper);
} }
} }

@ -6,6 +6,7 @@ import com.google.protobuf.ByteString
import org.session.libsession.messaging.StorageProtocol import org.session.libsession.messaging.StorageProtocol
import org.session.libsession.messaging.jobs.AttachmentUploadJob import org.session.libsession.messaging.jobs.AttachmentUploadJob
import org.session.libsession.messaging.jobs.Job import org.session.libsession.messaging.jobs.Job
import org.session.libsession.messaging.jobs.JobQueue
import org.session.libsession.messaging.jobs.MessageSendJob import org.session.libsession.messaging.jobs.MessageSendJob
import org.session.libsession.messaging.messages.Message import org.session.libsession.messaging.messages.Message
import org.session.libsession.messaging.messages.visible.Attachment import org.session.libsession.messaging.messages.visible.Attachment
@ -80,10 +81,6 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
return registrationID return registrationID
} }
override fun persist(job: Job) {
TODO("Not yet implemented")
}
override fun persist(attachments: List<Attachment>): List<Long> { override fun persist(attachments: List<Attachment>): List<Long> {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
@ -128,34 +125,44 @@ class Storage(context: Context, helper: SQLCipherOpenHelper) : Database(context,
return messageID return messageID
} }
// JOBS
override fun persist(job: Job) {
DatabaseFactory.getSessionJobDatabase(context).persistJob(job)
}
override fun markJobAsSucceeded(job: Job) { override fun markJobAsSucceeded(job: Job) {
TODO("Not yet implemented") DatabaseFactory.getSessionJobDatabase(context).markJobAsSucceeded(job)
} }
override fun markJobAsFailed(job: Job) { override fun markJobAsFailed(job: Job) {
TODO("Not yet implemented") DatabaseFactory.getSessionJobDatabase(context).markJobAsFailed(job)
} }
override fun getAllPendingJobs(type: String): List<Job> { override fun getAllPendingJobs(type: String): List<Job> {
TODO("Not yet implemented") return DatabaseFactory.getSessionJobDatabase(context).getAllPendingJobs(type)
} }
override fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? { override fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? {
TODO("Not yet implemented") return DatabaseFactory.getSessionJobDatabase(context).getAttachmentUploadJob(attachmentID)
} }
override fun getMessageSendJob(messageSendJobID: String): MessageSendJob? { override fun getMessageSendJob(messageSendJobID: String): MessageSendJob? {
TODO("Not yet implemented") return DatabaseFactory.getSessionJobDatabase(context).getMessageSendJob(messageSendJobID)
} }
override fun resumeMessageSendJobIfNeeded(messageSendJobID: String) { override fun resumeMessageSendJobIfNeeded(messageSendJobID: String) {
TODO("Not yet implemented") val job = DatabaseFactory.getSessionJobDatabase(context).getMessageSendJob(messageSendJobID) ?: return
job.delegate = JobQueue.shared
job.execute()
} }
override fun isJobCanceled(job: Job): Boolean { override fun isJobCanceled(job: Job): Boolean {
TODO("Not yet implemented") return DatabaseFactory.getSessionJobDatabase(context).isJobCanceled(job)
} }
// Authorization
override fun getAuthToken(server: String): String? { override fun getAuthToken(server: String): String? {
return DatabaseFactory.getLokiAPIDatabase(context).getAuthToken(server) return DatabaseFactory.getLokiAPIDatabase(context).getAuthToken(server)
} }

@ -1,4 +1,94 @@
package org.thoughtcrime.securesms.loki.database package org.thoughtcrime.securesms.loki.database
class SessionJobDatabase { import android.content.ContentValues
import android.content.Context
import net.sqlcipher.Cursor
import org.session.libsession.messaging.jobs.*
import org.thoughtcrime.securesms.database.Database
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper
import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer
import org.thoughtcrime.securesms.loki.utilities.*
class SessionJobDatabase(context: Context, helper: SQLCipherOpenHelper) : Database(context, helper) {
companion object {
private val sessionJobTable = "loki_thread_session_reset_database"
val jobID = "job_id"
val jobType = "job_type"
val failureCount = "failure_count"
val serializedData = "serialized_data"
@JvmStatic val createSessionJobTableCommand = "CREATE TABLE $sessionJobTable ($jobID INTEGER PRIMARY KEY, $jobType STRING, $failureCount INTEGER DEFAULT 0, $serializedData TEXT);"
}
fun persistJob(job: Job) {
val database = databaseHelper.writableDatabase
val contentValues = ContentValues(2)
contentValues.put(jobID, job.id)
contentValues.put(jobType, job.getFactoryKey())
contentValues.put(failureCount, job.failureCount)
contentValues.put(serializedData, SessionJobHelper.dataSerializer.serialize(job.serialize()))
database.insertOrUpdate(sessionJobTable, contentValues, "$jobID = ?", arrayOf(jobID.toString()))
}
fun markJobAsSucceeded(job: Job) {
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(job.id))
}
fun markJobAsFailed(job: Job) {
databaseHelper.writableDatabase.delete(sessionJobTable, "$jobID = ?", arrayOf(job.id))
}
fun getAllPendingJobs(type: String): List<Job> {
val database = databaseHelper.readableDatabase
return database.getAll(sessionJobTable, "$jobType = ?", arrayOf(type)) { cursor ->
jobFromCursor(cursor)
}
}
fun getAttachmentUploadJob(attachmentID: Long): AttachmentUploadJob? {
val database = databaseHelper.readableDatabase
var result = mutableListOf<AttachmentUploadJob>()
database.getAll(sessionJobTable, "$jobType = ?", arrayOf(AttachmentUploadJob.KEY)) { cursor ->
result.add(jobFromCursor(cursor) as AttachmentUploadJob)
}
return result.first { job -> job.attachmentID == attachmentID }
}
fun getMessageSendJob(messageSendJobID: String): MessageSendJob? {
val database = databaseHelper.readableDatabase
return database.get(sessionJobTable, "$jobID = ? AND $jobType = ?", arrayOf(messageSendJobID, MessageSendJob.KEY)) { cursor ->
jobFromCursor(cursor) as MessageSendJob
}
}
fun isJobCanceled(job: Job): Boolean {
val database = databaseHelper.readableDatabase
var cursor: android.database.Cursor? = null
try {
cursor = database.rawQuery("SELECT * FROM $sessionJobTable WHERE $jobID = ?", arrayOf(job.id))
return cursor != null && cursor.moveToFirst()
} catch (e: Exception) {
// Do nothing
} finally {
cursor?.close()
}
return false
}
private fun jobFromCursor(cursor: Cursor): Job {
val type = cursor.getString(jobType)
val data = SessionJobHelper.dataSerializer.deserialize(cursor.getString(serializedData))
val job = SessionJobHelper.sessionJobInstantiator.instantiate(type, data)
job.id = cursor.getString(jobID)
job.failureCount = cursor.getInt(failureCount)
return job
}
}
class SessionJobHelper() {
companion object {
val dataSerializer: Data.Serializer = JsonDataSerializer()
val sessionJobInstantiator: SessionJobInstantiator = SessionJobInstantiator(SessionJobManagerFactories.getSessionJobFactories())
}
} }

@ -24,7 +24,7 @@ class AttachmentDownloadJob(val attachmentID: Long, val tsIncomingMessageID: Lon
// Settings // Settings
override val maxFailureCount: Int = 20 override val maxFailureCount: Int = 20
companion object { companion object {
val collection: String = "AttachmentDownloadJobCollection" val KEY: String = "AttachmentDownloadJob"
//keys used for database storage purpose //keys used for database storage purpose
private val KEY_ATTACHMENT_ID = "attachment_id" private val KEY_ATTACHMENT_ID = "attachment_id"
@ -89,17 +89,18 @@ class AttachmentDownloadJob(val attachmentID: Long, val tsIncomingMessageID: Lon
//database functions //database functions
override fun serialize(): Data { override fun serialize(): Data {
val builder = this.createJobDataBuilder() return Data.Builder().putLong(KEY_ATTACHMENT_ID, attachmentID)
return builder.putLong(KEY_ATTACHMENT_ID, attachmentID)
.putLong(KEY_TS_INCOMING_MESSAGE_ID, tsIncomingMessageID) .putLong(KEY_TS_INCOMING_MESSAGE_ID, tsIncomingMessageID)
.build(); .build();
} }
override fun getFactoryKey(): String {
return KEY
}
class Factory: Job.Factory<AttachmentDownloadJob> { class Factory: Job.Factory<AttachmentDownloadJob> {
override fun create(data: Data): AttachmentDownloadJob { override fun create(data: Data): AttachmentDownloadJob {
val job = AttachmentDownloadJob(data.getLong(KEY_ATTACHMENT_ID), data.getLong(KEY_TS_INCOMING_MESSAGE_ID)) return AttachmentDownloadJob(data.getLong(KEY_ATTACHMENT_ID), data.getLong(KEY_TS_INCOMING_MESSAGE_ID))
job.initJob(data)
return job
} }
} }
} }

@ -29,8 +29,8 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
override val maxFailureCount: Int = 20 override val maxFailureCount: Int = 20
companion object { companion object {
val TAG = AttachmentUploadJob::class.qualifiedName val TAG = AttachmentUploadJob::class.qualifiedName
val KEY: String = "AttachmentUploadJob"
val collection: String = "AttachmentUploadJobCollection"
val maxFailureCount: Int = 20 val maxFailureCount: Int = 20
//keys used for database storage purpose //keys used for database storage purpose
@ -104,7 +104,6 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
//database functions //database functions
override fun serialize(): Data { override fun serialize(): Data {
val builder = this.createJobDataBuilder()
//serialize Message property //serialize Message property
val kryo = Kryo() val kryo = Kryo()
kryo.isRegistrationRequired = false kryo.isRegistrationRequired = false
@ -112,13 +111,17 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
val output = Output(serializedMessage) val output = Output(serializedMessage)
kryo.writeObject(output, message) kryo.writeObject(output, message)
output.close() output.close()
return builder.putLong(KEY_ATTACHMENT_ID, attachmentID) return Data.Builder().putLong(KEY_ATTACHMENT_ID, attachmentID)
.putString(KEY_THREAD_ID, threadID) .putString(KEY_THREAD_ID, threadID)
.putByteArray(KEY_MESSAGE, serializedMessage) .putByteArray(KEY_MESSAGE, serializedMessage)
.putString(KEY_MESSAGE_SEND_JOB_ID, messageSendJobID) .putString(KEY_MESSAGE_SEND_JOB_ID, messageSendJobID)
.build(); .build();
} }
override fun getFactoryKey(): String {
return AttachmentDownloadJob.KEY
}
class Factory: Job.Factory<AttachmentUploadJob> { class Factory: Job.Factory<AttachmentUploadJob> {
override fun create(data: Data): AttachmentUploadJob { override fun create(data: Data): AttachmentUploadJob {
val serializedMessage = data.getByteArray(KEY_MESSAGE) val serializedMessage = data.getByteArray(KEY_MESSAGE)
@ -127,9 +130,7 @@ class AttachmentUploadJob(val attachmentID: Long, val threadID: String, val mess
val input = Input(serializedMessage) val input = Input(serializedMessage)
val message: Message = kryo.readObject(input, Message::class.java) val message: Message = kryo.readObject(input, Message::class.java)
input.close() input.close()
val job = AttachmentUploadJob(data.getLong(KEY_ATTACHMENT_ID), data.getString(KEY_THREAD_ID)!!, message, data.getString(KEY_MESSAGE_SEND_JOB_ID)!!) return AttachmentUploadJob(data.getLong(KEY_ATTACHMENT_ID), data.getString(KEY_THREAD_ID)!!, message, data.getString(KEY_MESSAGE_SEND_JOB_ID)!!)
job.initJob(data)
return job
} }
} }
} }

@ -19,15 +19,10 @@ interface Job {
fun serialize(): Data fun serialize(): Data
fun initJob(data: Data) { /**
id = data.getString(KEY_ID) * Returns the key that can be used to find the relevant factory needed to create your job.
failureCount = data.getInt(KEY_FAILURE_COUNT) */
} fun getFactoryKey(): String
fun createJobDataBuilder(): Data.Builder {
return Data.Builder().putString(KEY_ID, id)
.putInt(KEY_FAILURE_COUNT, failureCount)
}
interface Factory<T : Job> { interface Factory<T : Job> {
fun create(data: Data): T fun create(data: Data): T

@ -16,8 +16,7 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
override val maxFailureCount: Int = 10 override val maxFailureCount: Int = 10
companion object { companion object {
val TAG = MessageReceiveJob::class.qualifiedName val TAG = MessageReceiveJob::class.qualifiedName
val KEY: String = "AttachmentUploadJob"
val collection: String = "MessageReceiveJobCollection"
//keys used for database storage purpose //keys used for database storage purpose
private val KEY_DATA = "data" private val KEY_DATA = "data"
@ -64,20 +63,20 @@ class MessageReceiveJob(val data: ByteArray, val isBackgroundPoll: Boolean, val
//database functions //database functions
override fun serialize(): Data { override fun serialize(): Data {
val builder = this.createJobDataBuilder() val builder = Data.Builder().putByteArray(KEY_DATA, data)
builder.putByteArray(KEY_DATA, data)
.putBoolean(KEY_IS_BACKGROUND_POLL, isBackgroundPoll) .putBoolean(KEY_IS_BACKGROUND_POLL, isBackgroundPoll)
openGroupMessageServerID?.let { builder.putLong(KEY_OPEN_GROUP_MESSAGE_SERVER_ID, openGroupMessageServerID) } openGroupMessageServerID?.let { builder.putLong(KEY_OPEN_GROUP_MESSAGE_SERVER_ID, openGroupMessageServerID) }
openGroupID?.let { builder.putString(KEY_OPEN_GROUP_ID, openGroupID) } openGroupID?.let { builder.putString(KEY_OPEN_GROUP_ID, openGroupID) }
return builder.build(); return builder.build();
} }
override fun getFactoryKey(): String {
return AttachmentDownloadJob.KEY
}
class Factory: Job.Factory<MessageReceiveJob> { class Factory: Job.Factory<MessageReceiveJob> {
override fun create(data: Data): MessageReceiveJob { override fun create(data: Data): MessageReceiveJob {
val job = MessageReceiveJob(data.getByteArray(KEY_DATA), data.getBoolean(KEY_IS_BACKGROUND_POLL), data.getLong(KEY_OPEN_GROUP_MESSAGE_SERVER_ID), data.getString(KEY_OPEN_GROUP_ID)) return MessageReceiveJob(data.getByteArray(KEY_DATA), data.getBoolean(KEY_IS_BACKGROUND_POLL), data.getLong(KEY_OPEN_GROUP_MESSAGE_SERVER_ID), data.getString(KEY_OPEN_GROUP_ID))
job.initJob(data)
return job
} }
} }
} }

@ -20,8 +20,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
override val maxFailureCount: Int = 10 override val maxFailureCount: Int = 10
companion object { companion object {
val TAG = MessageSendJob::class.qualifiedName val TAG = MessageSendJob::class.qualifiedName
val KEY: String = "MessageSendJob"
val collection: String = "MessageSendJobCollection"
//keys used for database storage purpose //keys used for database storage purpose
private val KEY_MESSAGE = "message" private val KEY_MESSAGE = "message"
@ -77,7 +76,6 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
//database functions //database functions
override fun serialize(): Data { override fun serialize(): Data {
val builder = this.createJobDataBuilder()
//serialize Message and Destination properties //serialize Message and Destination properties
val kryo = Kryo() val kryo = Kryo()
kryo.isRegistrationRequired = false kryo.isRegistrationRequired = false
@ -89,11 +87,15 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
output = Output(serializedDestination) output = Output(serializedDestination)
kryo.writeObject(output, destination) kryo.writeObject(output, destination)
output.close() output.close()
return builder.putByteArray(KEY_MESSAGE, serializedMessage) return Data.Builder().putByteArray(KEY_MESSAGE, serializedMessage)
.putByteArray(KEY_DESTINATION, serializedDestination) .putByteArray(KEY_DESTINATION, serializedDestination)
.build(); .build();
} }
override fun getFactoryKey(): String {
return AttachmentDownloadJob.KEY
}
class Factory: Job.Factory<MessageSendJob> { class Factory: Job.Factory<MessageSendJob> {
override fun create(data: Data): MessageSendJob { override fun create(data: Data): MessageSendJob {
val serializedMessage = data.getByteArray(KEY_MESSAGE) val serializedMessage = data.getByteArray(KEY_MESSAGE)
@ -106,9 +108,7 @@ class MessageSendJob(val message: Message, val destination: Destination) : Job {
input = Input(serializedDestination) input = Input(serializedDestination)
val destination: Destination = kryo.readObject(input, Destination::class.java) val destination: Destination = kryo.readObject(input, Destination::class.java)
input.close() input.close()
val job = MessageSendJob(message, destination) return MessageSendJob(message, destination)
job.initJob(data)
return job
} }
} }
} }

@ -24,7 +24,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
// Settings // Settings
override val maxFailureCount: Int = 20 override val maxFailureCount: Int = 20
companion object { companion object {
val collection: String = "NotifyPNServerJobCollection" val KEY: String = "NotifyPNServerJob"
//keys used for database storage purpose //keys used for database storage purpose
private val KEY_MESSAGE = "message" private val KEY_MESSAGE = "message"
@ -64,7 +64,6 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
//database functions //database functions
override fun serialize(): Data { override fun serialize(): Data {
val builder = this.createJobDataBuilder()
//serialize SnodeMessage property //serialize SnodeMessage property
val kryo = Kryo() val kryo = Kryo()
kryo.isRegistrationRequired = false kryo.isRegistrationRequired = false
@ -72,10 +71,14 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
val output = Output(serializedMessage) val output = Output(serializedMessage)
kryo.writeObject(output, message) kryo.writeObject(output, message)
output.close() output.close()
return builder.putByteArray(KEY_MESSAGE, serializedMessage) return Data.Builder().putByteArray(KEY_MESSAGE, serializedMessage)
.build(); .build();
} }
override fun getFactoryKey(): String {
return AttachmentDownloadJob.KEY
}
class Factory: Job.Factory<NotifyPNServerJob> { class Factory: Job.Factory<NotifyPNServerJob> {
override fun create(data: Data): NotifyPNServerJob { override fun create(data: Data): NotifyPNServerJob {
val serializedMessage = data.getByteArray(KEY_MESSAGE) val serializedMessage = data.getByteArray(KEY_MESSAGE)
@ -84,9 +87,7 @@ class NotifyPNServerJob(val message: SnodeMessage) : Job {
val input = Input(serializedMessage) val input = Input(serializedMessage)
val message: SnodeMessage = kryo.readObject(input, SnodeMessage::class.java) val message: SnodeMessage = kryo.readObject(input, SnodeMessage::class.java)
input.close() input.close()
val job = NotifyPNServerJob(message) return NotifyPNServerJob(message)
job.initJob(data)
return job
} }
} }
} }

@ -0,0 +1,14 @@
package org.session.libsession.messaging.jobs
import java.util.*
class SessionJobInstantiator(private val jobFactories: Map<String, Job.Factory<out Job>>) {
fun instantiate(jobFactoryKey: String, data: Data): Job {
if (jobFactories.containsKey(jobFactoryKey)) {
return jobFactories[jobFactoryKey]?.create(data) ?: throw IllegalStateException("Tried to instantiate a job with key '$jobFactoryKey', but no matching factory was found.")
} else {
throw IllegalStateException("Tried to instantiate a job with key '$jobFactoryKey', but no matching factory was found.")
}
}
}

@ -0,0 +1,18 @@
package org.session.libsession.messaging.jobs
import java.util.*
class SessionJobManagerFactories {
companion object {
fun getSessionJobFactories(): Map<String, Job.Factory<out Job>> {
return mapOf(
AttachmentDownloadJob.KEY to AttachmentDownloadJob.Factory(),
AttachmentUploadJob.KEY to AttachmentUploadJob.Factory(),
MessageReceiveJob.KEY to MessageReceiveJob.Factory(),
MessageSendJob.KEY to MessageSendJob.Factory(),
NotifyPNServerJob.KEY to NotifyPNServerJob.Factory()
)
}
}
}

@ -18,7 +18,7 @@ class VisibleMessage : Message() {
var profile: Profile? = null var profile: Profile? = null
companion object { companion object {
const val TAG = "BaseVisibleMessage" const val TAG = "VisibleMessage"
fun fromProto(proto: SignalServiceProtos.Content): VisibleMessage? { fun fromProto(proto: SignalServiceProtos.Content): VisibleMessage? {
val dataMessage = proto.dataMessage ?: return null val dataMessage = proto.dataMessage ?: return null

Loading…
Cancel
Save