TM-SGNL-iOS/Signal/Calls/CallLinkFetchJobRunner.swift
TeleMessage developers dde0620daf initial commit
2025-05-03 12:28:28 -07:00

103 lines
3.3 KiB
Swift

//
// Copyright 2024 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
import SignalServiceKit
/// Refreshes call links that need to be updated.
actor CallLinkFetchJobRunner: DatabaseChangeDelegate {
private let callLinkStore: any CallLinkRecordStore
private let callLinkStateUpdater: CallLinkStateUpdater
private let db: any DB
init(
callLinkStore: any CallLinkRecordStore,
callLinkStateUpdater: CallLinkStateUpdater,
db: any DB
) {
self.callLinkStore = callLinkStore
self.callLinkStateUpdater = callLinkStateUpdater
self.db = db
}
/// If true, we need to issue a query to check for pending fetches.
private var mightHavePendingFetch = false
/// If true, we're currently fetching and don't need another Task to fetch.
private var isFetching = false
/// Indicates that a fetch may have been scheduled.
///
/// It's harmless to call this when it's not required.
nonisolated func setMightHavePendingFetchAndFetch() {
Task { await self._setMightHavePendingFetchAndFetch() }
}
private func _setMightHavePendingFetchAndFetch() async {
self.mightHavePendingFetch = true
await self.fetchIfNeeded()
}
private func fetchIfNeeded() async {
guard mightHavePendingFetch, !isFetching else {
return
}
isFetching = true
defer {
isFetching = false
}
var sequentialFailureCount = 0
while true {
let callLinkToFetch: CallLinkRecord?
do {
callLinkToFetch = try db.read(block: callLinkStore.fetchAnyPendingRecord(tx:))
} catch {
owsFailDebug("Can't fetch pending record: \(error)")
mightHavePendingFetch = false
return
}
guard let callLinkToFetch else {
// Nothing to fetch.
mightHavePendingFetch = false
return
}
do {
Logger.info("Refreshing \(callLinkToFetch.rootKey); pendingFetchCounter = \(callLinkToFetch.pendingFetchCounter)")
_ = try await callLinkStateUpdater.readCallLink(rootKey: callLinkToFetch.rootKey)
sequentialFailureCount = 0
} catch {
sequentialFailureCount += 1
let retryDelayNs = OWSOperation.retryIntervalForExponentialBackoffNs(failureCount: sequentialFailureCount, maxBackoff: 6 * kHourInterval)
Logger.warn("Retrying persistent call link fetch after ≈\(OWSOperation.formattedNs(retryDelayNs))s; \(error)")
try? await Task.sleep(nanoseconds: retryDelayNs)
}
}
}
// MARK: - DatabaseChangeDelegate
@MainActor
func observeDatabase(_ databaseChangeObserver: DatabaseChangeObserver) {
databaseChangeObserver.appendDatabaseChangeDelegate(self)
}
@MainActor
func databaseChangesDidReset() {}
@MainActor
func databaseChangesDidUpdateExternally() {
setMightHavePendingFetchAndFetch()
}
@MainActor
func databaseChangesDidUpdate(databaseChanges: any DatabaseChanges) {
guard databaseChanges.didUpdate(tableName: CallLinkRecord.databaseTableName) else {
return
}
setMightHavePendingFetchAndFetch()
}
}