TM-SGNL-iOS/Signal/Attachments/AttachmentDownloadRetryRunner.swift
TeleMessage developers dde0620daf initial commit
2025-05-03 12:28:28 -07:00

158 lines
5.4 KiB
Swift

//
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
import GRDB
public import SignalServiceKit
public class AttachmentDownloadRetryRunner {
private let db: SDSDatabaseStorage
private let runner: Runner
private let dbObserver: DownloadTableObserver
public init(
attachmentDownloadManager: AttachmentDownloadManager,
attachmentDownloadStore: AttachmentDownloadStore,
db: SDSDatabaseStorage
) {
self.db = db
self.runner = Runner(
attachmentDownloadManager: attachmentDownloadManager,
attachmentDownloadStore: attachmentDownloadStore,
db: db
)
self.dbObserver = DownloadTableObserver(runner: runner)
}
public static let shared = AttachmentDownloadRetryRunner(
attachmentDownloadManager: DependenciesBridge.shared.attachmentDownloadManager,
attachmentDownloadStore: DependenciesBridge.shared.attachmentDownloadStore,
db: SSKEnvironment.shared.databaseStorageRef
)
public func beginObserving() {
db.grdbStorage.pool.add(transactionObserver: dbObserver)
Task {
await runner.runIfNotRunning()
}
NotificationCenter.default.addObserver(
self,
selector: #selector(didEnterForeground),
name: .OWSApplicationWillEnterForeground,
object: nil
)
}
@objc
private func didEnterForeground() {
Task {
// Trigger any ready-to-go downloads; this method exits early and cheaply
// if there is nothing to download.
self.runner.attachmentDownloadManager.beginDownloadingIfNecessary()
// Check for downloads with retry timers and wait for those timers.
await runner.runIfNotRunning()
}
}
private actor Runner {
nonisolated let attachmentDownloadManager: AttachmentDownloadManager
nonisolated let attachmentDownloadStore: AttachmentDownloadStore
nonisolated let db: SDSDatabaseStorage
init(
attachmentDownloadManager: AttachmentDownloadManager,
attachmentDownloadStore: AttachmentDownloadStore,
db: SDSDatabaseStorage
) {
self.attachmentDownloadManager = attachmentDownloadManager
self.attachmentDownloadStore = attachmentDownloadStore
self.db = db
}
private var isRunning = false
fileprivate func runIfNotRunning() async {
if self.isRunning { return }
await self.run()
}
private func run() async {
do {
self.isRunning = true
defer { self.isRunning = false }
let nextTimestamp = db.read { tx in
return try? self.attachmentDownloadStore.nextRetryTimestamp(tx: tx.asV2Read)
}
guard let nextTimestamp else {
return
}
let nowMs = Date().ows_millisecondsSince1970
if nowMs < nextTimestamp {
try? await Task.sleep(nanoseconds: (nextTimestamp - nowMs) * NSEC_PER_MSEC)
}
await db.awaitableWrite { tx in
try? self.attachmentDownloadStore.updateRetryableDownloads(tx: tx.asV2Write)
}
// Kick the tires to start any downloads.
attachmentDownloadManager.beginDownloadingIfNecessary()
}
// Run again to wait for the next timestamp.
await self.run()
}
}
// MARK: - Observation
private class DownloadTableObserver: TransactionObserver {
private let runner: Runner
init(runner: Runner) {
self.runner = runner
}
func observes(eventsOfKind eventKind: DatabaseEventKind) -> Bool {
switch eventKind {
case let .update(tableName, columnNames):
return
tableName == QueuedAttachmentDownloadRecord.databaseTableName
&& columnNames.contains(QueuedAttachmentDownloadRecord.CodingKeys.minRetryTimestamp.rawValue)
case .insert, .delete:
// We _never_ insert a download in the retry state to begin with,
// so we really only care about observing updates.
return false
}
}
/// `observes(eventsOfKind:)` filtering _only_ applies to `databaseDidChange`, _not_ `databaseDidCommit`.
/// We want to filter, but only want to _do_ anything after the changes commit.
/// Use this bool to track when the filter is passed (didChange) so we know whether to do anything on didCommit .
private var shouldRunOnNextCommit = false
func databaseDidChange(with event: DatabaseEvent) {
shouldRunOnNextCommit = true
}
func databaseDidCommit(_ db: GRDB.Database) {
guard shouldRunOnNextCommit else {
return
}
shouldRunOnNextCommit = false
// When we get a matching event, run the next job _after_ committing.
// The job should pick up whatever new row(s) got updated in the table.
Task { [runner] in
await runner.runIfNotRunning()
}
}
func databaseDidRollback(_ db: GRDB.Database) {}
}
}