TM-SGNL-iOS/SignalServiceKit/Storage/TimeGatedBatch.swift
TeleMessage developers dde0620daf initial commit
2025-05-03 12:28:28 -07:00

150 lines
6.2 KiB
Swift

//
// Copyright 2023 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
enum TimeGatedBatch {
    /// Processes `objects` within one or more transactions.
    ///
    /// You probably don't need this method and shouldn't use it. Splitting an
    /// operation across multiple transactions (whether you're using this method
    /// or not) requires careful consideration to ensure data integrity.
    ///
    /// Most loops should require approximately the same amount of time for each
    /// object. In those cases, it's simpler to split `objects` into batches
    /// with a fixed size and process each batch within its own transaction.
    ///
    /// However, if you are processing objects with significant and
    /// unpredictable variability in their processing time, this pattern may be
    /// useful. If a few elements take orders of magnitude longer to process
    /// than the majority, this will provide a reasonable balance between the
    /// number of transactions and the amount of work performed in each
    /// transaction. You must ensure that it's safe to process each object in a
    /// separate transaction AND you also must ensure it's safe to process all
    /// objects within a single transaction BECAUSE this method will split
    /// transactions at arbitrary points.
    ///
    /// Each invocation of `block` runs inside its own autorelease pool (the
    /// same way `processAll` wraps each batch), so autoreleased temporaries
    /// created while handling one object don't pile up for the lifetime of the
    /// enclosing transaction.
    ///
    /// - Parameters:
    ///   - objects: The elements to process. They are consumed lazily, one at a
    ///     time, through a single iterator shared by every transaction.
    ///   - db: The database used to open each write transaction.
    ///   - yieldTxAfter: A suggestion for the maximum amount of time to keep
    ///     the transaction open for a single batch. This method will start a
    ///     new transaction when `block` returns if at least `yieldTxAfter`
    ///     seconds have elapsed since the transaction was opened. Note: This
    ///     means the actual maximum transaction duration is unbounded because
    ///     `block` may never return or may run extremely slow queries. The
    ///     elapsed time is only checked after `block` returns, so every
    ///     transaction processes at least one object.
    ///   - file: Forwarded unchanged to `db.awaitableWrite`.
    ///   - function: Forwarded unchanged to `db.awaitableWrite`.
    ///   - line: Forwarded unchanged to `db.awaitableWrite`.
    ///   - block: Invoked once per object with the currently open write
    ///     transaction.
    /// - Throws: Rethrows any error thrown by `block`. Objects that have not
    ///   been pulled from the iterator yet are not processed in that case.
    public static func enumerateObjects<T>(
        _ objects: some Sequence<T>,
        db: any DB,
        yieldTxAfter: TimeInterval = 1.0,
        file: String = #file,
        function: String = #function,
        line: Int = #line,
        block: (T, DBWriteTransaction) throws -> Void
    ) async rethrows {
        var isDone = false
        // One iterator is shared across all transactions so that each new
        // transaction resumes exactly where the previous one stopped.
        var objectEnumerator = objects.makeIterator()
        while !isDone {
            try await db.awaitableWrite(file: file, function: function, line: line) { tx in
                let startTime = CACurrentMediaTime()
                while true {
                    guard let object = objectEnumerator.next() else {
                        // We're done with everything, so exit the outer loop.
                        isDone = true
                        return
                    }
                    // Drain autoreleased temporaries after every object; this
                    // loop may run for a long time inside one transaction.
                    try autoreleasepool { try block(object, tx) }
                    let elapsedTime = CACurrentMediaTime() - startTime
                    guard elapsedTime < yieldTxAfter else {
                        // We're done with this batch, so we want another transaction.
                        return
                    }
                    // Process another object with this transaction...
                }
            }
        }
    }

    /// Processes all elements in batches bound by time, asynchronously.
    ///
    /// Each pass opens a write transaction via `db.awaitableWrite` and keeps
    /// invoking `processBatch` inside it until either an empty batch is
    /// reported or `maximumDuration` seconds have passed; in the latter case
    /// another transaction is opened and processing continues.
    ///
    /// - SeeAlso
    /// ``TimeGatedBatch/processAll(db:yieldTxAfter:processBatch:)``. This method
    /// is an `async` variant of that method; see its docs for more details.
    ///
    /// - Parameters:
    ///   - db: The database used to open each write transaction.
    ///   - maximumDuration: How long (in seconds) a single transaction may keep
    ///     starting new batches before yielding.
    ///   - processBatch: Processes one batch within the given transaction and
    ///     returns the number of items handled; zero means there's nothing left.
    /// - Returns: The total number of items processed across all batches.
    /// - Throws: Rethrows any error thrown by `processBatch`.
    static func processAllAsync(
        db: any DB,
        yieldTxAfter maximumDuration: TimeInterval = 0.5,
        processBatch: @escaping (DBWriteTransaction) throws -> Int
    ) async rethrows -> Int {
        var itemCount = 0
        while true {
            let (txItemCount, mightHaveMore) = try await db.awaitableWrite { tx in
                // The deadline is measured from the moment the transaction's
                // block starts running, not from when the write was requested.
                let startTime = CACurrentMediaTime()
                return try processSome(
                    yieldDeadline: startTime + maximumDuration,
                    processBatch: processBatch,
                    tx: tx
                )
            }
            itemCount += txItemCount
            guard mightHaveMore else {
                break
            }
        }
        return itemCount
    }

    /// Processes all elements in batches bound by time.
    ///
    /// This method invokes `processBatch` repeatedly & in a tight loop. It
    /// stops when `processBatch` returns zero (indicating an empty batch).
    /// Callers must ensure `processBatch` eventually returns zero; they likely
    /// need to delete objects as part of each batch or maintain a cursor to
    /// avoid processing the same elements multiple times.
    ///
    /// This method is most useful for "fetch and delete" operations that are
    /// trying to avoid DELETE-ing rows from the database while SELECT-ing them
    /// via enumeration. This method will execute multiple batches within a
    /// single transaction (if time allows), so those operations can fetch &
    /// delete in small batches without exploding the number of transactions.
    ///
    /// - Parameters:
    ///   - db: The database used to open each write transaction.
    ///   - maximumDuration: How long (in seconds) a single transaction may keep
    ///     starting new batches before yielding.
    ///   - processBatch: Processes one batch within the given transaction and
    ///     returns the number of items handled; zero means there's nothing left.
    /// - Returns: The total number of items processed across all batches.
    /// - Throws: Rethrows any error thrown by `processBatch`.
    static func processAll(
        db: any DB,
        yieldTxAfter maximumDuration: TimeInterval = 0.5,
        processBatch: (DBWriteTransaction) throws -> Int
    ) rethrows -> Int {
        var itemCount = 0
        while true {
            let (txItemCount, mightHaveMore) = try db.write { tx in
                // The deadline is measured from the moment the transaction's
                // block starts running.
                let startTime = CACurrentMediaTime()
                return try processSome(
                    yieldDeadline: startTime + maximumDuration,
                    processBatch: processBatch,
                    tx: tx
                )
            }
            itemCount += txItemCount
            guard mightHaveMore else {
                break
            }
        }
        return itemCount
    }

    /// Runs `processBatch` repeatedly within `tx` until it reports an empty
    /// batch or the deadline has passed.
    ///
    /// The deadline is only consulted after a batch completes, so at least one
    /// batch always runs and a slow batch may overrun `yieldDeadline`. Each
    /// batch executes inside its own autorelease pool.
    ///
    /// - Parameters:
    ///   - yieldDeadline: A `CACurrentMediaTime()`-based timestamp after which
    ///     no further batches are started in this transaction.
    ///   - processBatch: Processes one batch and returns its item count.
    ///   - tx: The write transaction handed to every batch.
    /// - Returns: The number of items processed in this transaction, and
    ///   whether the caller should open another transaction (`true` when the
    ///   deadline was hit before an empty batch was seen).
    /// - Throws: Rethrows any error thrown by `processBatch`.
    private static func processSome(
        yieldDeadline: CFTimeInterval,
        processBatch: (DBWriteTransaction) throws -> Int,
        tx: DBWriteTransaction
    ) rethrows -> (txItemCount: Int, mightHaveMore: Bool) {
        var itemCount = 0
        while true {
            let batchCount = try autoreleasepool { try processBatch(tx) }
            if batchCount == 0 {
                // An empty batch means there's nothing left to process.
                return (itemCount, mightHaveMore: false)
            }
            itemCount += batchCount
            guard CACurrentMediaTime() < yieldDeadline else {
                // Out of time for this transaction; the caller will start another.
                return (itemCount, mightHaveMore: true)
            }
        }
    }
}