TM-SGNL-iOS/SignalServiceKit/Messages/Attachments/V2/OrphanedAttachments/OrphanedAttachmentCleaner.swift
TeleMessage developers dde0620daf initial commit
2025-05-03 12:28:28 -07:00

362 lines
14 KiB
Swift

//
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
import GRDB
/// Observes insertions to the OrphanedAttachmentRecord table and deletes the associated files added to it.
///
/// Also exposes a "pending attachment" reservation API: rows can be inserted ahead of time
/// to mark not-yet-created attachment files for deletion, and released again once the
/// attachment row has been created.
public protocol OrphanedAttachmentCleaner {
/// Begin observing changes to the ``OrphanedAttachmentRecord`` table.
/// Should be called on every app launch.
///
/// Whenever a new row is inserted into the table, starts up a job to delete any files
/// associated with rows in the table (cleaning up a deleted attachment's files) and
/// removes the row once file deletion is confirmed.
///
/// Also fires immediately to clean up existing rows in the table, if any remained from prior app launches.
func beginObserving()
/// Marks pending attachment files for deletion.
/// Call `releasePendingAttachment` to un-mark the files for deletion
/// once the attachment has been created.
///
/// This method opens a write transaction and commits the changes; this is required
/// so that after this method returns attachment files can be safely created/moved at
/// the target file paths.
///
/// - Parameter record: The not-yet-inserted record describing the reserved file locations.
/// - Returns: The id which can be used to release the pending attachment.
/// - Throws: If the record cannot be inserted.
func commitPendingAttachmentWithSneakyTransaction(
_ record: OrphanedAttachmentRecord
) throws -> OrphanedAttachmentRecord.IDType
/// Un-marks a pending attachment for deletion IFF currently marked for deletion.
///
/// If the id is not found, the release fails.
/// NOTE(review): this method is not declared `throws`, so how a failed release is
/// surfaced (and how it rolls back the caller's transaction) is up to the
/// implementation -- confirm against the conforming type.
///
/// Why must a missing id be a failure? Here is the expected sequence:
/// 1. Reserve attachment file locations (assign random file UUIDs)
/// 2. `commitPendingAttachmentWithSneakyTransaction`
/// 3. Copy/write files into the reserved file locations
/// 4. Open write transaction
/// 5. Create Attachment table row
/// 6. Call `releasePendingAttachment`
/// 7. Close write transaction
///
/// If the attachment file(s) get deleted between steps 2 and 4, then this
/// method is expected to fail in step 6, rolling back the write transaction from steps 4/5.
///
/// This ensures that when we reach step 7, either:
/// A. Step 6 succeeded, attachment is created and not marked for deletion
/// B. Step 6 failed, everything is rolled back and we start from step 1 again.
/// There is never a case where step 5 succeeds but we have deleted files,
/// or step 5 fails but we didn't delete the files.
///
/// - Parameters:
///   - withId: The id returned by `commitPendingAttachmentWithSneakyTransaction`.
///   - tx: The write transaction in which the Attachment table row is being created.
func releasePendingAttachment(
withId: OrphanedAttachmentRecord.IDType,
tx: DBWriteTransaction
)
}
public class OrphanedAttachmentCleanerImpl: OrphanedAttachmentCleaner {
private let dbProvider: () -> DatabaseWriter
private let taskScheduler: Shims.TaskScheduler
private var observer: OrphanTableObserver!
/// Creates a cleaner that writes through the GRDB pool of `db`.
///
/// - Parameters:
///   - db: Database storage whose `grdbStorage.pool` is used as the database writer.
///   - fileSystem: File deletion shim; defaults to the wrapper declared in this file.
///   - taskScheduler: Async task shim; defaults to the wrapper declared in this file.
public convenience init(
    db: SDSDatabaseStorage,
    fileSystem: Shims.OWSFileSystem = Wrappers.OWSFileSystem(),
    taskScheduler: Shims.TaskScheduler = Wrappers.TaskScheduler()
) {
    // The pool is looked up every time a writer is requested, not once up front.
    let poolProvider: () -> DatabaseWriter = { db.grdbStorage.pool }
    self.init(
        dbProvider: poolProvider,
        fileSystem: fileSystem,
        taskScheduler: taskScheduler
    )
}
/// Designated initializer; wires up the job runner and the table observer.
///
/// - Parameters:
///   - dbProvider: Returns the database writer to use for reads, writes and observation.
///   - fileSystem: File deletion shim handed to the job runner.
///   - taskScheduler: Async task shim used to kick off cleanup runs.
internal init(
    dbProvider: @escaping () -> DatabaseWriter,
    fileSystem: Shims.OWSFileSystem,
    taskScheduler: Shims.TaskScheduler
) {
    self.taskScheduler = taskScheduler
    self.dbProvider = dbProvider
    // The runner receives `self` so it can consult the skipped row ids.
    let runner = JobRunner(
        dbProvider: dbProvider,
        fileSystem: fileSystem,
        cleaner: self
    )
    self.observer = OrphanTableObserver(jobRunner: runner, taskScheduler: taskScheduler)
}
/// Starts an initial cleanup run for rows already in the table, then registers
/// the transaction observer so that later inserts trigger further runs.
public func beginObserving() {
    let tableObserver: OrphanTableObserver = observer
    let runner = tableObserver.jobRunner

    // First pass: handle whatever rows are already present in the database.
    taskScheduler.task {
        await runner.runNextCleanupJob()
    }

    // From here on, get notified about database changes.
    dbProvider().add(transactionObserver: tableObserver)
}
/// Inserts `record` as a pending attachment in its own write transaction and
/// registers the new row id as skipped for this process.
///
/// - Parameter record: A record that has not been inserted yet (`sqliteId` is nil).
/// - Returns: The row id assigned to the inserted record.
/// - Throws: `OWSAssertionError` if the record already has a row id or does not
///   get one after insertion; any error raised by the write is propagated.
public func commitPendingAttachmentWithSneakyTransaction(
    _ record: OrphanedAttachmentRecord
) throws -> OrphanedAttachmentRecord.IDType {
    if record.sqliteId != nil {
        throw OWSAssertionError("Reinserting existing record")
    }
    return try dbProvider().write { db in
        // Work on a copy that is flagged as pending before it is inserted.
        var pendingRecord = record
        pendingRecord.isPendingAttachment = true
        try pendingRecord.insert(db)

        guard let insertedId = pendingRecord.sqliteId else {
            throw OWSAssertionError("Unable to insert")
        }
        skippedRowIds.update(block: { $0.insert(insertedId) })
        return insertedId
    }
}
/// Deletes the pending row with the given id inside the caller's write transaction.
///
/// If no row with that id exists, reports the problem via `owsFailDebug` and returns
/// without touching anything else.
///
/// - Parameters:
///   - id: The row id returned when the pending attachment was committed.
///   - tx: The caller's write transaction.
public func releasePendingAttachment(withId id: OrphanedAttachmentRecord.IDType, tx: any DBWriteTransaction) {
    let connection = tx.databaseConnection
    guard let pendingRecord = try! OrphanedAttachmentRecord.fetchOne(connection, key: id) else {
        owsFailDebug("Pending attachment not marked for deletion")
        return
    }
    try! pendingRecord.delete(connection)

    // Housekeeping only: with the row gone, a skipped id no longer has any effect.
    skippedRowIds.update(block: { $0.remove(id) })
}
// Tracks the row ids that should be skipped for the current in-memory process.
// This can be because they failed to delete, or they are pending attachments.
// We track these so we can skip them and not block subsequent rows from deletion.
// We keep this in memory; we will retry on next app launch.
//
// Normally modified from within a write transaction; the cleanup job also reads it
// while fetching the next row to process.
fileprivate var skippedRowIds = AtomicValue<Set<OrphanedAttachmentRecord.IDType>>.init(Set(), lock: .init())
private actor JobRunner {
private let dbProvider: () -> DatabaseWriter
private nonisolated let fileSystem: Shims.OWSFileSystem
private weak var cleaner: OrphanedAttachmentCleanerImpl?
/// - Parameters:
///   - dbProvider: Returns the database writer used for reads and writes.
///   - fileSystem: File deletion shim.
///   - cleaner: Owner of the skipped row ids; stored in a weak property.
init(
    dbProvider: @escaping () -> DatabaseWriter,
    fileSystem: Shims.OWSFileSystem,
    cleaner: OrphanedAttachmentCleanerImpl
) {
    self.cleaner = cleaner
    self.fileSystem = fileSystem
    self.dbProvider = dbProvider
}
/// True while a cleanup run is in progress on this actor; used to make
/// concurrent calls to `runNextCleanupJob` return immediately.
private var isRunning = false

/// Processes orphan table rows one at a time until none are left to process.
///
/// For each row: deletes the row's files and then the row itself, all inside a
/// write transaction. Rows that fail are added to the cleaner's skipped row ids
/// so that they are not picked up again by this process.
///
/// Does nothing outside the main app, or if a run is already in progress.
/// The run stops early if the owning cleaner has been deallocated, because
/// failures could then no longer be recorded as skipped and the same row
/// would be retried without end.
func runNextCleanupJob() async {
    guard CurrentAppContext().isMainApp else {
        // Don't run the cleaner outside the main app.
        return
    }
    guard !isRunning else {
        return
    }
    isRunning = true
    defer { isRunning = false }

    // Iterate instead of recursing so that a long backlog of rows does not
    // build up one nested async call per row.
    while let nextRecord = fetchNextOrphanRecord() {
        guard let nextRecordId = nextRecord.sqliteId else {
            // Without a row id the row can never be marked as skipped;
            // stop rather than fetch it again and again.
            Logger.error("Fetched orphan table row without a row id")
            return
        }

        await Task.yield()

        if nextRecord.isPendingAttachment {
            // This deletion job is potentially racing with the
            // share extension's attachment sending flow. This job wants to
            // delete the files of the pending attachment being sent.
            //
            // If this job wins, the attachment send will fail when
            // calling `releasePendingAttachment`.
            // That's recoverable, but it's better if the send flow wins.
            // Add a delay to increase the chances of the send flow winning.
            try? await Task.sleep(nanoseconds: 5 * NSEC_PER_SEC)
        }

        guard let cleaner = self.cleaner else {
            Logger.info("Stopping orphan cleanup; cleaner was deallocated")
            return
        }

        do {
            try await dbProvider().write { db in
                // Ensure the record is still around; if it was a pending attachment
                // and the send flow finished while this job slept, just skip & exit.
                // This happens within the database write lock to ensure correctness.
                guard try nextRecord.exists(db) else {
                    Logger.info("Skipping since-deleted orphan row")
                    return
                }
                if cleaner.skippedRowIds.get().contains(nextRecordId) {
                    Logger.info("Skipping a marked-as-skipped row id")
                    return
                }
                do {
                    // Delete within the database write lock to ensure we don't
                    // conflict with the pending attachment send flow.
                    try self.deleteFiles(record: nextRecord)
                    _ = try nextRecord.delete(db)
                    Logger.info("Cleaned up orphaned attachment files")
                } catch {
                    Logger.error("Failed to clean up orphan table row: \(error)")
                    cleaner.skippedRowIds.update(block: { $0.insert(nextRecordId) })
                }
            }
        } catch {
            // The transaction itself failed (e.g. the existence check threw).
            // Mark the row as skipped so the next iteration moves on to a
            // different row instead of retrying this one immediately.
            Logger.error("Failed to run orphan cleanup transaction: \(error)")
            cleaner.skippedRowIds.update(block: { $0.insert(nextRecordId) })
        }
        // Continue with the next row whether this one succeeded or not.
    }
}
/// Fetches one orphan table row whose id is not among the cleaner's skipped row ids.
///
/// - Returns: A row to process, or nil if there is none or the read failed.
///   Read failures are logged rather than silently discarded.
private func fetchNextOrphanRecord() -> OrphanedAttachmentRecord? {
    do {
        return try dbProvider().read { db -> OrphanedAttachmentRecord? in
            guard let skippedRowIds = cleaner?.skippedRowIds.get(), !skippedRowIds.isEmpty else {
                // Nothing to exclude (or no cleaner to ask): any row will do.
                return try OrphanedAttachmentRecord.fetchOne(db)
            }
            let rowIdColumn = Column(OrphanedAttachmentRecord.CodingKeys.sqliteId)
            // Add one "row id != skipped id" condition per skipped id.
            let request = skippedRowIds.reduce(OrphanedAttachmentRecord.all()) { partialRequest, skippedRowId in
                partialRequest.filter(rowIdColumn != skippedRowId)
            }
            return try request.fetchOne(db)
        }
    } catch {
        Logger.error("Failed to fetch next orphan table row: \(error)")
        return nil
    }
}
/// Deletes every file referenced by `record`, plus the cached thumbnails derived
/// from its primary file path (if it has one).
///
/// - Throws: The first error raised by the file system shim; remaining files are
///   not attempted after a failure.
private nonisolated func deleteFiles(record: OrphanedAttachmentRecord) throws {
    let candidatePaths: [String?] = [
        record.localRelativeFilePath,
        record.localRelativeFilePathThumbnail,
        record.localRelativeFilePathAudioWaveform,
        record.localRelativeFilePathVideoStillFrame
    ]
    for relativePath in candidatePaths.compactMap({ $0 }) {
        let attachmentFileUrl = AttachmentStream.absoluteAttachmentFileURL(relativeFilePath: relativePath)
        try fileSystem.deleteFileIfExists(url: attachmentFileUrl)
    }

    // Cached thumbnails are keyed off the primary file path only.
    guard let primaryPath = record.localRelativeFilePath else {
        return
    }
    try AttachmentThumbnailQuality.allCases.forEach { quality in
        let cachedThumbnailUrl = AttachmentThumbnailQuality.thumbnailCacheFileUrl(
            attachmentLocalRelativeFilePath: primaryPath,
            at: quality
        )
        try fileSystem.deleteFileIfExists(url: cachedThumbnailUrl)
    }
}
}
// MARK: - Observation
/// Transaction observer that triggers a cleanup run after a transaction that
/// inserted into the orphaned attachment table has committed.
private class OrphanTableObserver: TransactionObserver {
    /// The runner that performs the actual cleanup.
    fileprivate let jobRunner: JobRunner
    private let taskScheduler: Shims.TaskScheduler

    init(
        jobRunner: JobRunner,
        taskScheduler: Shims.TaskScheduler
    ) {
        self.jobRunner = jobRunner
        self.taskScheduler = taskScheduler
    }

    /// Only inserts into the orphaned attachment table are of interest.
    func observes(eventsOfKind eventKind: DatabaseEventKind) -> Bool {
        switch eventKind {
        case .insert:
            return eventKind.tableName == OrphanedAttachmentRecord.databaseTableName
        case .delete, .update:
            return false
        }
    }

    /// `observes(eventsOfKind:)` filtering _only_ applies to `databaseDidChange`, _not_ `databaseDidCommit`.
    /// We want to filter, but only want to _do_ anything after the changes commit.
    /// Use this bool to track when the filter is passed (didChange) so we know whether to do anything on didCommit.
    private var shouldRunOnNextCommit = false

    func databaseDidChange(with event: DatabaseEvent) {
        shouldRunOnNextCommit = true
    }

    func databaseDidCommit(_ db: GRDB.Database) {
        guard shouldRunOnNextCommit else {
            return
        }
        shouldRunOnNextCommit = false

        // When we get a matching event, run the next job _after_ committing.
        // The job should pick up whatever new row(s) got added to the table.
        taskScheduler.task { [jobRunner] in
            await jobRunner.runNextCleanupJob()
        }
    }

    func databaseDidRollback(_ db: GRDB.Database) {
        // The observed insert(s) never made it into the database. Clear the flag
        // so that the next, unrelated commit does not trigger a cleanup run.
        shouldRunOnNextCommit = false
    }
}
}
/// Namespaced aliases for the dependency seams of `OrphanedAttachmentCleanerImpl`.
extension OrphanedAttachmentCleanerImpl {
/// Protocols the cleaner depends on; these are the parameter types of its initializers.
public enum Shims {
public typealias OWSFileSystem = _OrphanedAttachmentCleanerImpl_OWSFileSystemShim
public typealias TaskScheduler = _OrphanedAttachmentCleanerImpl_TaskSchedulerShim
}
/// Concrete implementations of the shims; used as the default arguments of the public initializer.
public enum Wrappers {
public typealias OWSFileSystem = _OrphanedAttachmentCleanerImpl_OWSFileSystemWrapper
public typealias TaskScheduler = _OrphanedAttachmentCleanerImpl_TaskSchedulerWrapper
}
}
/// File system operations needed by the orphaned attachment cleaner.
public protocol _OrphanedAttachmentCleanerImpl_OWSFileSystemShim {
/// Deletes the file at `url`.
/// Presumably succeeds without error when no file exists there -- confirm against the implementation.
func deleteFileIfExists(url: URL) throws
}
/// File system shim implementation that forwards to `OWSFileSystem`.
public class _OrphanedAttachmentCleanerImpl_OWSFileSystemWrapper: _OrphanedAttachmentCleanerImpl_OWSFileSystemShim {
public init() {}
/// Forwards to `OWSFileSystem.deleteFileIfExists(url:)`, propagating any error it throws.
public func deleteFileIfExists(url: URL) throws {
try OWSFileSystem.deleteFileIfExists(url: url)
}
}
/// Schedules asynchronous work on behalf of the orphaned attachment cleaner.
public protocol _OrphanedAttachmentCleanerImpl_TaskSchedulerShim {
/// Schedules `block` to run; the call itself does not return a handle or a result.
func task(_ block: @Sendable @escaping () async throws -> Void)
}
/// Task scheduler shim implementation backed by Swift's unstructured `Task`.
public class _OrphanedAttachmentCleanerImpl_TaskSchedulerWrapper: _OrphanedAttachmentCleanerImpl_TaskSchedulerShim {
    public init() {}

    /// Starts a new `Task` running `block`. The task handle is discarded, so any
    /// error thrown by `block` is not observed by the caller.
    public func task(_ block: @Sendable @escaping () async throws -> Void) {
        Task {
            try await block()
        }
    }
}