TM-SGNL-iOS/SignalServiceKit/Jobs/JobQueueRunner.swift
TeleMessage developers dde0620daf initial commit
2025-05-03 12:28:28 -07:00

388 lines
16 KiB
Swift

//
// Copyright 2023 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
public enum JobAttemptResult {
/// The Job has succeeded or reached a terminal error. A retryable error may
/// become terminal if too many failures occur.
case finished(Result<Void, Error>)
/// The Job encountered a transient, retryable error. If `canRetryEarly` is
/// true, the job may be attempted again before the time interval has
/// elapsed (eg, when Reachability reports that we've reconnected).
case retryAfter(TimeInterval, canRetryEarly: Bool = true)
/// Invokes `block` and handles retryable errors.
///
/// If `block()` succeeds, a terminal success result is returned. In this
/// case, the caller (or, more typically, `block`) is responsible for
/// removing the job from the database.
///
/// If `block()` throws an error, calls `performDefaultErrorHandler`.
public static func executeBlockWithDefaultErrorHandler(
jobRecord: JobRecord,
retryLimit: UInt,
db: any DB,
block: () async throws -> Void
) async -> JobAttemptResult {
do {
try await block()
return .finished(.success(()))
} catch {
return await db.awaitableWrite { tx in
return performDefaultErrorHandler(error: error, jobRecord: jobRecord, retryLimit: retryLimit, tx: tx)
}
}
}
/// Performs default error handling for an error.
///
/// If the job throws a retryable error, `jobRecord.failureCount` is
/// incremented (assuming it's less than `retryLimit`) and this method
/// returns a `.retryAfter` value with exponential backoff.
///
/// If the job throws a terminal error (or retryable error with no retries
/// remaining), the job is removed and a terminal error is returned.
public static func performDefaultErrorHandler(
error: Error,
jobRecord: JobRecord,
retryLimit: UInt,
tx: DBWriteTransaction
) -> JobAttemptResult {
if jobRecord.failureCount < retryLimit, error.isRetryable {
jobRecord.addFailure(tx: SDSDB.shimOnlyBridge(tx))
let delay = OWSOperation.retryIntervalForExponentialBackoff(failureCount: jobRecord.failureCount)
return .retryAfter(delay, canRetryEarly: true)
} else {
jobRecord.anyRemove(transaction: SDSDB.shimOnlyBridge(tx))
return .finished(.failure(error))
}
}
}
public enum JobResult {
/// The Job ran to completion and returned a Result.
case ranJob(Result<Void, Error>)
/// The Job couldn't be found, so it wasn't executed.
case notFound
/// The Job couldn't be fetched because of an Error.
case fetchError(Error)
public var ranSuccessfullyOrError: Result<Void, Error> {
switch self {
case .ranJob(.success(())):
return .success(())
case .ranJob(.failure(let error)), .fetchError(let error):
return .failure(error)
case .notFound:
return .failure(OWSGenericError("JobRecord not found."))
}
}
}
/// A `JobRunner` is responsible for running a `JobRecord`.
///
/// A single JobRunner is used for all attempts of a `JobRecord` (within a
/// single app launch/process). Therefore, a `JobRunner` is a good place to
/// store mutable, transient state (eg a per-job error counter).
///
/// When a job is initially scheduled, the caller can provide its own
/// `JobRunner`. This can be useful for registering completion callbacks.
/// However, it's important to note that those completion callbacks will be
/// lost if the app is relaunched and a new `JobRunner` is created. In
/// practice, this doesn't cause problems because the UX that's waiting for
/// the completion callback is also dismissed if the app is relaunched.
///
/// When should you use this type as opposed to ``TaskQueueLoader`` and ``TaskRecord``?
/// The JobRunner classes use a single db table across all jobs, and thus provide more in-built functionality
/// around scheduling, starting, retrying, and failing jobs. ``TaskQueueLoader`` requires you to write
/// your own db table (or in memory queue) but offers less functionality in exchange. You should consider
/// ``TaskQueueLoader`` mainly if you want to enforce a priority order on your jobs; ``JobRunner``
/// only handles FIFO ordering.
public protocol JobRunner<JobRecordType> {
associatedtype JobRecordType: JobRecord
/// Runs a single attempt of the job.
///
/// Each attempt can return one of two results: `.finished` (eg, "success",
/// "terminal error", "out of retries") or `.retryAfter` (ie "network error;
/// try again in 2 minutes").
///
/// If this method `.finished`, then it's also responsible for removing
/// `jobRecord` from the database. Passing this responsibility to
/// `runJobAttempt` ensures that removing `jobRecord` can be performed
/// atomically with other database operations. (In DEBUG builds, the caller
/// will try to ensure this invariant remains true.)
func runJobAttempt(_ jobRecord: JobRecordType) async -> JobAttemptResult
/// Invoked when a job reaches a terminal result.
///
/// This method is guaranteed to be invoked exactly once for every
/// `JobRunner` provided to `JobQueueRunner.addPersistedJob(...)` (assuming
/// the app doesn't crash just before or during its execution).
///
/// If a `JobRecord` is removed from the database before it's run (or if
/// fetching it throws an error), then `runJobAttempt` won't be invoked, but
/// this method will still be invoked. This method is therefore an excellent
/// place to invoke "exactly once", in-memory completion handlers.
func didFinishJob(_ jobRecordId: JobRecord.RowId, result: JobResult) async
}
/// A `JobRunnerFactory` creates `JobRunner` instances.
///
/// Most `JobRunner` types currently use `Dependencies`, but when they
/// don't, these factories will be responsible for passing dependencies into
/// `JobRunner` instances.
public protocol JobRunnerFactory<JobRunnerType> {
associatedtype JobRunnerType: JobRunner
/// Creates a `JobRunner` for a `JobRecord` loaded from the database.
func buildRunner() -> JobRunnerType
}
public class JobQueueRunner<
JobFinderType: JobRecordFinder,
JobRunnerFactoryType: JobRunnerFactory
> where JobFinderType.JobRecordType == JobRunnerFactoryType.JobRunnerType.JobRecordType {
private let db: any DB
private let jobFinder: JobFinderType
private let jobRunnerFactory: JobRunnerFactoryType
private var observers = [NSObjectProtocol]()
private enum Mode {
/// The runner hasn't been started yet, or the runner is in the process of
/// starting. In both of these states, new jobs are held until persisted
/// jobs have been loaded. (This ensures new jobs execute after old jobs.)
case loading(canExecuteJobsConcurrently: Bool, jobsToEnqueueAfterLoading: [QueuedJob])
/// The runner is operating concurrently. New jobs are started immediately.
case concurrent
/// The runner is operating serially. No jobs are running, so a new job can
/// be started immediately.
case serialPaused
/// The runner is operating serially. A job is being executed, so any new
/// jobs will be added to `nextJobs`. When the current job finishes, it will
/// start the first job in `nextJobs`.
case serialRunning(nextJobs: [QueuedJob])
}
private struct QueuedJob {
var rowId: JobRecord.RowId
var runner: JobRunnerFactoryType.JobRunnerType
}
private struct State {
var mode: Mode
/// If a job encounters a transient failure, it can request to be run again
/// after a delay. While it's waiting, it will store a reference to its
/// waiting Task here. On certain external triggers (eg we reconnect to the
/// Internet), these waiting Tasks can be canceled to trigger the next
/// attempt immediately.
var waitingTasks = [JobRecord.RowId: Task<Void, Never>]()
}
private let state: AtomicValue<State>
public init(canExecuteJobsConcurrently: Bool, db: any DB, jobFinder: JobFinderType, jobRunnerFactory: JobRunnerFactoryType) {
let mode: Mode = .loading(canExecuteJobsConcurrently: canExecuteJobsConcurrently, jobsToEnqueueAfterLoading: [])
self.state = AtomicValue<State>(State(mode: mode), lock: .init())
self.db = db
self.jobFinder = jobFinder
self.jobRunnerFactory = jobRunnerFactory
}
deinit {
observers.forEach { NotificationCenter.default.removeObserver($0) }
}
public func start(shouldRestartExistingJobs: Bool) {
Task { await self._start(shouldRestartExistingJobs: shouldRestartExistingJobs) }
}
private func _start(shouldRestartExistingJobs: Bool) async {
var oldJobs = [JobFinderType.JobRecordType]()
if shouldRestartExistingJobs {
do {
oldJobs.append(contentsOf: try await jobFinder.loadRunnableJobs(updateRunnableJobRecord: { _, _ in }))
} catch {
Logger.error("Couldn't start existing jobs, so no new jobs will start: \(error)")
return
}
}
state.update { state in
let newMode: Mode
switch state.mode {
case .loading(let canExecuteJobsConcurrently, let jobsToEnqueueAfterLoading):
// Every runner must be started before it can execute jobs. When a runner
// is started, it may optionally fetch all previously-persisted jobs to
// execute. Any jobs that are scheduled while previously-persisted jobs are
// being loaded from disk must be scheduled after those
// previously-persisted jobs. If a new job is persisted while
// previously-persisted jobs are being loaded, then it may or may not be
// considered a previously-persisted job (depending on db race conditions).
// If it is present in the previously-persisted jobs, we want to ignore it
// since the new job might have a custom `JobRunner`.
let newRowIds = Set(jobsToEnqueueAfterLoading.map { $0.rowId })
var queuedJobs = [QueuedJob]()
queuedJobs.append(
contentsOf: oldJobs.filter { !newRowIds.contains($0.id!) }.map {
QueuedJob(rowId: $0.id!, runner: jobRunnerFactory.buildRunner())
}
)
queuedJobs.append(contentsOf: jobsToEnqueueAfterLoading)
if canExecuteJobsConcurrently {
queuedJobs.forEach(runJob(_:))
newMode = .concurrent
} else if queuedJobs.isEmpty {
newMode = .serialPaused
} else {
var serialJobs = queuedJobs
runJob(serialJobs.removeFirst())
newMode = .serialRunning(nextJobs: serialJobs)
}
case .concurrent, .serialPaused, .serialRunning:
owsFail("Can't start a JobQueueRunner more than once.")
}
state.mode = newMode
}
}
// MARK: - Reachability Changes
public func listenForReachabilityChanges(reachabilityManager: SSKReachabilityManager) {
observers.append(NotificationCenter.default.addObserver(
forName: SSKReachability.owsReachabilityDidChange,
object: reachabilityManager,
queue: nil,
using: { [weak self] _ in
if reachabilityManager.isReachable {
self?.retryWaitingJobs()
}
}
))
}
func retryWaitingJobs() {
// Cancel each waiting task so that the next retry can commence.
state.update { state in
state.waitingTasks.forEach { (_, waitingTask) in waitingTask.cancel() }
}
}
// MARK: - Queuing Jobs
public func addPersistedJob(_ jobRecord: JobFinderType.JobRecordType, runner: JobRunnerFactoryType.JobRunnerType? = nil) {
enqueueAndStartJob(QueuedJob(rowId: jobRecord.id!, runner: runner ?? jobRunnerFactory.buildRunner()))
}
private func enqueueAndStartJob(_ job: QueuedJob) {
state.update { state in
switch state.mode {
case .loading(let canExecuteJobsConcurrently, let jobsToEnqueueAfterLoading):
state.mode = .loading(
canExecuteJobsConcurrently: canExecuteJobsConcurrently,
jobsToEnqueueAfterLoading: jobsToEnqueueAfterLoading + [job]
)
case .concurrent:
runJob(job)
case .serialPaused:
runJob(job)
state.mode = .serialRunning(nextJobs: [])
case .serialRunning(nextJobs: let nextJobs):
state.mode = .serialRunning(nextJobs: nextJobs + [job])
}
}
}
// MARK: - Running Jobs
private func runJob(_ queuedJob: QueuedJob) {
Task {
let jobResult = await _runJob(queuedJob)
await queuedJob.runner.didFinishJob(queuedJob.rowId, result: jobResult)
state.update { state in startNextJob(state: &state) }
}
}
private func _runJob(_ queuedJob: QueuedJob) async -> JobResult {
while true {
switch await runJobAttempt(queuedJob) {
case .runAgain:
continue
case .notFound:
return .notFound
case .finished(let result):
return .ranJob(result)
case .fetchError(let error):
Logger.warn("Couldn't fetch a job; skipping just that job: \(error)")
return .fetchError(error)
}
}
}
private enum JobAttemptResult {
case runAgain
case finished(Result<Void, Error>)
case notFound
case fetchError(Error)
}
private func runJobAttempt(_ queuedJob: QueuedJob) async -> JobAttemptResult {
let jobRecord: JobFinderType.JobRecordType?
do {
jobRecord = try db.read(block: { tx in try jobFinder.fetchJob(rowId: queuedJob.rowId, tx: tx) })
} catch {
return .fetchError(error)
}
guard let jobRecord else {
// The job no longer exists, so we can start the next one.
return .notFound
}
let result = await queuedJob.runner.runJobAttempt(jobRecord)
switch result {
case .finished(let result):
// In DEBUG builds, make sure that .finished jobs were deleted.
assert(db.read(block: { tx in (try? jobFinder.fetchJob(rowId: queuedJob.rowId, tx: tx)) == nil }))
return .finished(result)
case .retryAfter(let retryAfter, let canRetryEarly):
// Create a Task that waits for `retryAfter`. If `retryWaitingJobs` is
// called, this Task will be canceled, starting the next retry immediately.
let waitingTask = Task { _ = try? await Task.sleep(nanoseconds: UInt64(retryAfter*Double(NSEC_PER_SEC))) }
if canRetryEarly {
state.update { state in state.waitingTasks[queuedJob.rowId] = waitingTask }
}
await waitingTask.value
if canRetryEarly {
state.update { state in state.waitingTasks[queuedJob.rowId] = nil }
}
return .runAgain
}
}
private func startNextJob(state: inout State) {
switch state.mode {
case .loading, .serialPaused:
owsFailBeta("Can't start the next job.")
case .concurrent:
return // All of these are started immediately.
case .serialRunning(nextJobs: var nextJobs):
state.mode = {
if nextJobs.isEmpty {
// There's no more jobs to run, so pause the runner.
return .serialPaused
} else {
let jobToRun = nextJobs.removeFirst()
runJob(jobToRun)
return .serialRunning(nextJobs: nextJobs)
}
}()
}
}
}