TM-SGNL-iOS/SignalServiceKit/Jobs/SessionResetJob.swift
TeleMessage developers dde0620daf initial commit
2025-05-03 12:28:28 -07:00

111 lines
5.2 KiB
Swift

//
// Copyright 2016 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
public class SessionResetJobQueue {
private let jobQueueRunner: JobQueueRunner<JobRecordFinderImpl<SessionResetJobRecord>, SessionResetJobRunnerFactory>
public init(db: any DB, reachabilityManager: SSKReachabilityManager) {
self.jobQueueRunner = JobQueueRunner(
canExecuteJobsConcurrently: true,
db: db,
jobFinder: JobRecordFinderImpl(db: db),
jobRunnerFactory: SessionResetJobRunnerFactory()
)
self.jobQueueRunner.listenForReachabilityChanges(reachabilityManager: reachabilityManager)
}
public func start(appContext: AppContext) {
guard appContext.isMainApp else { return }
jobQueueRunner.start(shouldRestartExistingJobs: true)
}
public func add(contactThread: TSContactThread, transaction tx: SDSAnyWriteTransaction) {
let jobRecord = SessionResetJobRecord(contactThread: contactThread)
jobRecord.anyInsert(transaction: tx)
tx.addSyncCompletion { self.jobQueueRunner.addPersistedJob(jobRecord) }
}
}
private class SessionResetJobRunnerFactory: JobRunnerFactory {
func buildRunner() -> SessionResetJobRunner { SessionResetJobRunner() }
}
private class SessionResetJobRunner: JobRunner {
private enum Constants {
static let maxRetries: UInt = 10
}
private var hasArchivedAllSessions = false
func runJobAttempt(_ jobRecord: SessionResetJobRecord) async -> JobAttemptResult {
do {
try await _runJobAttempt(jobRecord)
return .finished(.success(()))
} catch {
return await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
let result = JobAttemptResult.performDefaultErrorHandler(
error: error, jobRecord: jobRecord, retryLimit: Constants.maxRetries, tx: tx.asV2Write
)
if case .finished(.failure) = result {
// Even though this is the failure handler - which means probably the
// recipient didn't receive the message - there's a chance that our send
// did succeed and the server just timed out our response or something.
// Since the cost of sending a future message using a session the recipient
// doesn't have is so high, we archive the session just in case.
Logger.warn("Terminal failure: \(error.userErrorDescription)")
if let contactThread = try? self.fetchThread(jobRecord: jobRecord, tx: tx) {
self.archiveAllSessions(for: contactThread, tx: tx)
}
}
return result
}
}
}
func didFinishJob(_ jobRecordId: JobRecord.RowId, result: JobResult) async {}
private func _runJobAttempt(_ jobRecord: SessionResetJobRecord) async throws {
let endSessionMessagePromise = try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
let contactThread = try self.fetchThread(jobRecord: jobRecord, tx: tx)
if !self.hasArchivedAllSessions {
self.archiveAllSessions(for: contactThread, tx: tx)
}
let endSessionMessage = EndSessionMessage(thread: contactThread, transaction: tx)
let preparedMessage = PreparedOutgoingMessage.preprepared(
transientMessageWithoutAttachments: endSessionMessage
)
return ThreadUtil.enqueueMessagePromise(message: preparedMessage, isHighPriority: true, transaction: tx)
}
self.hasArchivedAllSessions = true
try await endSessionMessagePromise.awaitable()
Logger.info("successfully sent EndSessionMessage.")
try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
let contactThread = try self.fetchThread(jobRecord: jobRecord, tx: tx)
// Archive the just-created session since the recipient should delete their
// corresponding session upon receiving and decrypting our EndSession
// message. Otherwise if we send another message before them, they won't
// have the session to decrypt it.
self.archiveAllSessions(for: contactThread, tx: tx)
let message = TSInfoMessage(thread: contactThread, messageType: .typeLocalUserEndedSession)
message.anyInsert(transaction: tx)
jobRecord.anyRemove(transaction: tx)
}
}
private func fetchThread(jobRecord: SessionResetJobRecord, tx: SDSAnyReadTransaction) throws -> TSContactThread {
let threadId = jobRecord.contactThreadId
guard let contactThread = TSContactThread.anyFetchContactThread(uniqueId: threadId, transaction: tx) else {
throw OWSGenericError("thread for session reset no longer exists")
}
return contactThread
}
private func archiveAllSessions(for contactThread: TSContactThread, tx: SDSAnyWriteTransaction) {
let sessionStore = DependenciesBridge.shared.signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore
sessionStore.archiveAllSessions(for: contactThread.contactAddress, tx: tx.asV2Write)
}
}