TM-SGNL-iOS/SignalServiceKit/Jobs/BulkDeleteInteractionJobQueue.swift
TeleMessage developers dde0620daf initial commit
2025-05-03 12:28:28 -07:00

292 lines
11 KiB
Swift
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
public final class BulkDeleteInteractionJobQueue {
private let jobRunnerFactory: BulkDeleteInteractionJobRunnerFactory
private let jobQueueRunner: JobQueueRunner<
JobRecordFinderImpl<BulkDeleteInteractionJobRecord>,
BulkDeleteInteractionJobRunnerFactory
>
private var jobSerializer = CompletionSerializer()
init(
addressableMessageFinder: DeleteForMeAddressableMessageFinder,
db: any DB,
interactionDeleteManager: InteractionDeleteManager,
threadSoftDeleteManager: ThreadSoftDeleteManager,
threadStore: ThreadStore
) {
self.jobRunnerFactory = BulkDeleteInteractionJobRunnerFactory(
addressableMessageFinder: addressableMessageFinder,
db: db,
interactionDeleteManager: interactionDeleteManager,
threadSoftDeleteManager: threadSoftDeleteManager,
threadStore: threadStore
)
self.jobQueueRunner = JobQueueRunner(
canExecuteJobsConcurrently: false,
db: db,
jobFinder: JobRecordFinderImpl(db: db),
jobRunnerFactory: self.jobRunnerFactory
)
}
func start(appContext: AppContext) {
jobQueueRunner.start(shouldRestartExistingJobs: appContext.isMainApp)
}
func addJob(
anchorMessageRowId: Int64,
isFullThreadDelete: Bool,
threadUniqueId: String,
tx: SDSAnyWriteTransaction
) {
let jobRecord = BulkDeleteInteractionJobRecord(
anchorMessageRowId: anchorMessageRowId,
fullThreadDeletionAnchorMessageRowId: { () -> Int64? in
if isFullThreadDelete {
return InteractionFinder(threadUniqueId: threadUniqueId)
.mostRecentRowId(tx: tx)
}
return nil
}(),
threadUniqueId: threadUniqueId
)
jobRecord.anyInsert(transaction: tx)
jobSerializer.addOrderedSyncCompletion(tx: tx.asV2Write) {
self.jobQueueRunner.addPersistedJob(jobRecord)
}
}
}
// MARK: -
private class BulkDeleteInteractionJobRunner: JobRunner {
typealias JobRecordType = BulkDeleteInteractionJobRecord
private enum Constants {
static let maxRetries: UInt = 110
static let deletionBatchSize: Int = 500
}
private let addressableMessageFinder: DeleteForMeAddressableMessageFinder
private let db: any DB
private let interactionDeleteManager: InteractionDeleteManager
private let threadSoftDeleteManager: ThreadSoftDeleteManager
private let threadStore: ThreadStore
private let logger = PrefixedLogger(prefix: "[DeleteForMe]")
init(
addressableMessageFinder: DeleteForMeAddressableMessageFinder,
db: any DB,
interactionDeleteManager: InteractionDeleteManager,
threadSoftDeleteManager: ThreadSoftDeleteManager,
threadStore: ThreadStore
) {
self.addressableMessageFinder = addressableMessageFinder
self.db = db
self.interactionDeleteManager = interactionDeleteManager
self.threadSoftDeleteManager = threadSoftDeleteManager
self.threadStore = threadStore
}
func runJobAttempt(
_ jobRecord: BulkDeleteInteractionJobRecord
) async -> JobAttemptResult {
return await JobAttemptResult.executeBlockWithDefaultErrorHandler(
jobRecord: jobRecord,
retryLimit: Constants.maxRetries,
db: db,
block: {
try await _runJobAttempt(jobRecord)
}
)
}
func didFinishJob(_ jobRecordId: JobRecord.RowId, result: JobResult) async {
switch result.ranSuccessfullyOrError {
case .success:
break
case .failure(let failure):
logger.error("Failed to perform delete-for-me bulk action! \(failure)")
}
}
private func _runJobAttempt(
_ jobRecord: BulkDeleteInteractionJobRecord
) async throws {
let anchorMessageRowId = jobRecord.anchorMessageRowId
let fullThreadDeletionAnchorMessageRowId = jobRecord.fullThreadDeletionAnchorMessageRowId
let threadUniqueId = jobRecord.threadUniqueId
logger.info("Attempting to bulk-delete interactions for thread \(threadUniqueId), isFullThreadDelete \(fullThreadDeletionAnchorMessageRowId != nil).")
let deletedCount = await TimeGatedBatch.processAllAsync(db: db) { tx -> Int in
return self.deleteSomeInteractions(
threadUniqueId: threadUniqueId,
anchorMessageRowId: anchorMessageRowId,
tx: tx
)
}
logger.info("Deleted \(deletedCount) messages for thread \(threadUniqueId), isFullThreadDelete \(fullThreadDeletionAnchorMessageRowId != nil).")
await db.awaitableWrite { tx in
let sdsTx: SDSAnyWriteTransaction = SDSDB.shimOnlyBridge(tx)
jobRecord.anyRemove(transaction: sdsTx)
guard
let fullThreadDeletionAnchorMessageRowId,
let thread = self.threadStore.fetchThread(uniqueId: threadUniqueId, tx: tx)
else { return }
/// At this point we've deleted all the messages at or before our
/// view of the most-recent addressable message. Since we also know
/// that the user's intent was a "full thread delete", we'll try and
/// go further and additionally soft-delete the thread.
///
/// This will have a couple desirable side-effects: at the time of
/// writing, these include deleting associated story messages and
/// hiding the thread from the chat list. Additionally, if there
/// were any non-addressable messages that were newer than our
/// bulk-delete anchor, those will also be deleted.
///
/// Caveats:
///
/// 1. We'll abort the soft-delete if there are any addressable
/// messages remaining. This would indicate that the user sent or
/// received messages newer than our bulk-delete anchor.
///
/// 2. We'll abort the soft-delete if the most-recent message in the
/// thread now is newer than the most-recent message when we
/// created the bulk-delete job. This would indicate that while
/// all the remaining messages are non-addressable, one of them
/// was inserted while the bulk-delete was running.
if self.addressableMessageFinder.threadContainsAnyAddressableMessages(
threadUniqueId: threadUniqueId,
tx: tx
) {
self.logger.warn("Not doing thread soft-delete thread contains addressable messages after delete.")
} else if InteractionFinder(threadUniqueId: threadUniqueId)
.mostRecentRowId(tx: sdsTx) > fullThreadDeletionAnchorMessageRowId
{
self.logger.warn("Not doing thread soft-delete most recent row ID was newer than when we started delete.")
} else {
self.threadSoftDeleteManager.softDelete(
threads: [thread],
sendDeleteForMeSyncMessage: false,
tx: tx
)
}
}
}
/// Delete a batch of interactions.
/// - Returns
/// The number of interactions deleted. A return value of 0 indicates that
/// there are no more interactions to delete.
private func deleteSomeInteractions(
threadUniqueId: String,
anchorMessageRowId: Int64,
tx: DBWriteTransaction
) -> Int {
let sdsTx: SDSAnyWriteTransaction = SDSDB.shimOnlyBridge(tx)
let interactionsToDelete: [TSInteraction]
do {
interactionsToDelete = try InteractionFinder(
threadUniqueId: threadUniqueId
).fetchAllInteractions(
rowIdFilter: .atOrBefore(anchorMessageRowId),
limit: Constants.deletionBatchSize,
tx: sdsTx
)
} catch {
owsFailDebug("Failed to get interactions to delete!")
return 0
}
if interactionsToDelete.isEmpty { return 0 }
for interaction in interactionsToDelete {
interactionDeleteManager.delete(
interaction,
sideEffects: .custom(
associatedCallDelete: .localDeleteOnly,
updateThreadOnInteractionDelete: .doNotUpdate
),
tx: tx
)
}
/// Above, we're skipping a per-interaction thread update that would
/// otherwise set various "last visible" properties on the thread. To
/// compensate, we'll do a single thread update at the end of each
/// transaction (note that because we're in a `TimeGatedBatch`, we don't
/// know how many interactions will be deleted in a single transaction).
/// This will ensure that anyone who opens a transaction between our
/// time-gated batches sees a thread with appropriately-updated values.
sdsTx.addTransactionFinalizationBlock(
forKey: "BulkDeleteInteractionJobQueue"
) { finalizingTransaction in
if let thread = self.threadStore.fetchThread(
uniqueId: threadUniqueId,
tx: finalizingTransaction.asV2Write
) {
thread.updateOnInteractionsRemoved(
needsToUpdateLastInteractionRowId: true,
needsToUpdateLastVisibleSortId: true,
transaction: finalizingTransaction
)
}
}
return interactionsToDelete.count
}
}
private extension InteractionFinder {
}
// MARK: -
private class BulkDeleteInteractionJobRunnerFactory: JobRunnerFactory {
typealias JobRunnerType = BulkDeleteInteractionJobRunner
private let addressableMessageFinder: DeleteForMeAddressableMessageFinder
private let db: any DB
private let interactionDeleteManager: InteractionDeleteManager
private let threadSoftDeleteManager: ThreadSoftDeleteManager
private let threadStore: ThreadStore
init(
addressableMessageFinder: DeleteForMeAddressableMessageFinder,
db: any DB,
interactionDeleteManager: InteractionDeleteManager,
threadSoftDeleteManager: ThreadSoftDeleteManager,
threadStore: ThreadStore
) {
self.addressableMessageFinder = addressableMessageFinder
self.db = db
self.interactionDeleteManager = interactionDeleteManager
self.threadSoftDeleteManager = threadSoftDeleteManager
self.threadStore = threadStore
}
func buildRunner() -> BulkDeleteInteractionJobRunner {
return BulkDeleteInteractionJobRunner(
addressableMessageFinder: addressableMessageFinder,
db: db,
interactionDeleteManager: interactionDeleteManager,
threadSoftDeleteManager: threadSoftDeleteManager,
threadStore: threadStore
)
}
}