533 lines
24 KiB
Swift
533 lines
24 KiB
Swift
//
|
|
// Copyright 2024 Signal Messenger, LLC
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
//
|
|
|
|
import Foundation
|
|
|
|
/// A way to report partial progress on async tasks back to the caller; typically used to drive
|
|
/// some kind of loading bar UI.
|
|
///
|
|
/// You create a ``OWSProgressSink``, add ``OWSProgressSource``(s) to that sink,
|
|
/// and update progress on the sources as the long running task does its work.
|
|
///
|
|
/// The API has three goals/principles:
|
|
/// 1. Progress from multiple independent sources can be combined into a single net total output progress
|
|
/// 2. Adding child progress sources is async-friendly and thread safe
|
|
/// 3. Updating a single source is *fast* but **NOT** thread safe
|
|
///
|
|
/// Of note, workers that increment progress are assumed to be single threaded (or have their own locking).
|
|
/// If your worker is multi-threaded, you should probably generate one source per thread or locking context.
|
|
///
|
|
/// First, call ``OWSProgress/createSink(_:)`` with an observer block which is called with progress updates.
|
|
/// **WARNING**: the block is escaping and strongly held by OWSProgressSink. Beware of retain cycles.
|
|
///
|
|
/// Add one or more sources to the sink with ``OWSProgressSink/addSource(withLabel:unitCount:)``.
|
|
/// When you add a source to a sink, you update the sink's total unit count.
|
|
/// Units can mean anything; what matters is that updates to sources are measured in units,
|
|
/// and progress is reported in units. (Or percentage of units completed, via convenience var).
|
|
/// See ``Foundation/NSProgress``, which uses similar "unit" semantics.
|
|
///
|
|
/// You may add a child sink with ``OWSProgressSink/addChild(withLabel:unitCount:)``.
|
|
/// Child sinks have a unit count and can themselves have their own sources (and child sinks).
|
|
/// The completed unit count of a child sink is proportional to its children's completed unit count.
|
|
/// Put another way:
|
|
/// `parent_complete_units = parent_total_units * Sum(child_completed_units) / Sum(child_total_units)`
|
|
/// In this way a child's units are independent of its parent's (and counsins') units.
|
|
///
|
|
/// For example, say you download a file and then write rows to the db.
|
|
/// Add two child sinks: "Download" and "Write" , each with a unit count of 50.
|
|
/// Add a source to "Download" with unit count of [file byte length].
|
|
/// Add a source to "Write" with unit count of [# of rows to write to db].
|
|
/// In this way, even though "Download" and "Write" use totally different units, units of progress
|
|
/// at the root represent % complete with each counting towards 50% of the work.
|
|
/// If we download half the file, the root completed unit count would be 25 (%).
|
|
///
|
|
/// Note two other implicit advantages in the example above:
|
|
/// 1. We can determine the [# of SQL rows to write] _after_ downloading, by adding
|
|
/// the Write child at the start (proportioning 50% of the "progress" to it), but only
|
|
/// adding its source later after we've downloaded.
|
|
/// 2. A DownloadManager can have a download method that takes an ``OWSProgressSink``
|
|
/// without knowing or caring whether that sink is itself a root or a child; progress units are
|
|
/// re-normalized to parent progress units transparently to callers.
|
|
///
|
|
/// A note on ``Foundation/NSProgress``.
|
|
/// This type _looks_ like NSProgress but behaves very differently.
|
|
/// * NSProgress is a class class meant to be updated and observed with KVO.
|
|
/// OWSProgress is a snapshot-in-time struct;OWSProgressSink manages observation.
|
|
/// * NSProgress uses locks for updates, making rapid updates on a single thread expensive.
|
|
/// OWSProgress optimizes for single-threaded updates; batching observer updates to do so efficiently.
|
|
/// * NSProgress requires you to know unit counts for all children up-front and they must all share units.
|
|
/// OWSProgress lets you add children lazily and renormalizes disparate units at each level of the tree.
|
|
public struct OWSProgress: Equatable {
|
|
/// The completed unit count across all direct children.
|
|
public let completedUnitCount: UInt64
|
|
/// The total unit count of all direct children.
|
|
public let totalUnitCount: UInt64
|
|
/// The chain of labels (ending with the source's label) from the root
|
|
/// sink to the source with the highest percentage completion.
|
|
/// In case of ties, sources added later are preferred.
|
|
public let currentLabels: [String]
|
|
|
|
/// Percentage completion measured as (completedUnitCount / totalUnitCount)
|
|
/// 0 if no children or sources have been added.
|
|
public var percentComplete: Float {
|
|
guard totalUnitCount > 0 else { return 0 }
|
|
return Float(completedUnitCount) / Float(totalUnitCount)
|
|
}
|
|
|
|
/// Percent = 1. False if no children or sources have been added.
|
|
public var isFinished: Bool {
|
|
totalUnitCount != 0 && completedUnitCount == totalUnitCount
|
|
}
|
|
|
|
/// See ``currentLabels``; label of the final source in the chain.
|
|
/// Nil if no progress has been made.
|
|
public var currentSourceLabel: String? { currentLabels.last }
|
|
|
|
/// Create a root sink, taking the single observer block of progress updates.
|
|
/// See class docs for this type for usage.
|
|
public static func createSink(_ observer: @escaping OWSProgressSink.Observer) -> OWSProgressSink {
|
|
return OWSProgressRootNode(observer: observer)
|
|
}
|
|
}
|
|
|
|
/// Sinks are thread-safe and can have children added from any thread context.
|
|
public protocol OWSProgressSink {
|
|
typealias Observer = (OWSProgress) -> Void
|
|
|
|
/// Add a child sink, returning it.
|
|
/// Child sinks contribute to the total unit count of their parent.
|
|
/// A child sink's progress is its own unit count weighted by the completed unit count across all its children.
|
|
/// - precondition: unitCount > 0
|
|
///
|
|
/// **WARNING** adding a child to a parent sink after some sibling has previously updated progress
|
|
/// results in undefined behavior; old progress values are not renormalized to new total unit counts.
|
|
/// Adding grandchildren is allowed; typically you want to "reserve" proportional unit counts
|
|
/// by adding a child up-front and then adding a grandchild to that child later.
|
|
func addChild(withLabel label: String, unitCount: UInt64) async -> OWSProgressSink
|
|
|
|
/// Add a source, returning it.
|
|
/// Sources contribute to the total unit count of their parent.
|
|
/// Sources are **NOT** thread-safe and should only be updated from a single thread or locking context.
|
|
/// - precondition: unitCount > 0
|
|
///
|
|
/// **WARNING** adding a source to a parent sink after some sibling has previously updated progress
|
|
/// results in undefined behavior; old progress values are not renormalized to new total unit counts.
|
|
/// Adding grandchildren is allowed; typically you want to "reserve" proportional unit counts
|
|
/// by adding a child up-front and then adding a source to that child later.
|
|
func addSource(withLabel label: String, unitCount: UInt64) async -> OWSProgressSource
|
|
}
|
|
|
|
/// Sources are **NOT** thread-safe and should only be updated from a single thread or locking context.
|
|
public protocol OWSProgressSource {
|
|
|
|
var completedUnitCount: UInt64 { get }
|
|
var totalUnitCount: UInt64 { get }
|
|
|
|
/// Increment the completed unit count (which can only go up).
|
|
/// You can pass 0, though that does nothing.
|
|
/// You can also continue to increment past the total unit count; the value
|
|
/// will be internally capped to the total and further updates no-op.
|
|
func incrementCompletedUnitCount(by increment: UInt64)
|
|
}
|
|
|
|
extension OWSProgressSource {
|
|
|
|
/// Given some block of asynchronous work, update progress
|
|
/// on the current source periodically (every ``timeInterval`` seconds)
|
|
/// until the work block completes.
|
|
/// Returns with the result of the work block when it completes.
|
|
public func updatePeriodically<T, E>(
|
|
timeInterval: TimeInterval = 0.1,
|
|
estimatedTimeToCompletion: TimeInterval,
|
|
work: @escaping () async throws(E) -> T
|
|
) async throws(E) -> T {
|
|
let sleepDurationMillis = UInt64(timeInterval * 1000)
|
|
let source = self
|
|
let didComplete = AtomicBool(false, lock: .init())
|
|
let startDate = Date()
|
|
var lastCompletedUnitCount = source.completedUnitCount
|
|
// Minus one so the timer can never complete it.
|
|
let maxTimerCompletedUnitCount = source.totalUnitCount - 1
|
|
let timeToUnitsMultiplier = Double(source.totalUnitCount) / estimatedTimeToCompletion
|
|
let result = await withTaskGroup(of: Optional<Result<T, E>>.self) { taskGroup in
|
|
taskGroup.addTask {
|
|
while !didComplete.get() {
|
|
try? await Task.sleep(nanoseconds: sleepDurationMillis * NSEC_PER_MSEC)
|
|
let date = Date()
|
|
var units = UInt64(date.timeIntervalSince(startDate) * timeToUnitsMultiplier)
|
|
units = min(maxTimerCompletedUnitCount, units)
|
|
defer { lastCompletedUnitCount = units }
|
|
let incrementalUnits = units - lastCompletedUnitCount
|
|
if incrementalUnits > 0 {
|
|
source.incrementCompletedUnitCount(by: units)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
taskGroup.addTask {
|
|
let result: Result<T, E>
|
|
do {
|
|
result = .success(try await work())
|
|
} catch let error as E {
|
|
return .failure(error)
|
|
} catch {
|
|
// Impossible; work only throws E
|
|
fatalError()
|
|
}
|
|
didComplete.set(true)
|
|
source.incrementCompletedUnitCount(by: source.totalUnitCount)
|
|
return result
|
|
}
|
|
while let result = await taskGroup.next() {
|
|
switch result {
|
|
case .none:
|
|
break
|
|
case .some(let value):
|
|
return value
|
|
}
|
|
}
|
|
// Impossible to get here; the second task in the group
|
|
// always returns some result.
|
|
fatalError()
|
|
}
|
|
return try result.get()
|
|
}
|
|
}
|
|
|
|
/// Root node for OWSProgress. Does not itself have a unit count or concept of progress;
|
|
/// its children define units entirely.
|
|
private actor OWSProgressRootNode: OWSProgressSink {
|
|
|
|
private var latestEmittedProgress: OWSProgress?
|
|
private let observer: Observer
|
|
|
|
private var totalDirectChildUnitCount: UInt64 = 0
|
|
/// Children hold strong references to their parent, so parents hold weak references to children.
|
|
/// If callers release children, they can't be updated anyway so no point retaining them.
|
|
private var directChildren = [Weak<OWSProgressChildNode>]()
|
|
|
|
private class SourceNode {
|
|
/// Sources hold strong references to their root sink, so the sink must hold weak references to sources.
|
|
/// If callers release sources, they can't be updated anyway so no point retaining them.
|
|
weak var node: OWSProgressSourceNode?
|
|
/// Hold onto the last completed unit count (pre-weighted) in case
|
|
/// the source gets released e.g. after hitting 100%.
|
|
var lastCompletedUnitCount: Float?
|
|
|
|
init(node: OWSProgressSourceNode) {
|
|
self.node = node
|
|
}
|
|
}
|
|
|
|
/// All sources at all nested levels in the tree.
|
|
private var allSources = [SourceNode]()
|
|
|
|
fileprivate init(observer: @escaping Observer) {
|
|
self.observer = observer
|
|
}
|
|
|
|
func addChild(withLabel label: String, unitCount: UInt64) async -> OWSProgressSink {
|
|
owsAssertDebug(unitCount > 0)
|
|
self.totalDirectChildUnitCount += unitCount
|
|
let child = OWSProgressSinkNode(
|
|
label: label,
|
|
parentLabels: [],
|
|
unitCount: unitCount,
|
|
parent: self,
|
|
rootNode: self
|
|
)
|
|
self.directChildren.append(Weak(value: child))
|
|
// Tell all children (including the new one) about the new total unit count.
|
|
await updateUnitCountsOnChildren()
|
|
// Issue a progres update as the total unit count has changed.
|
|
progressDidUpdate()
|
|
return child
|
|
}
|
|
|
|
func addSource(withLabel label: String, unitCount: UInt64) async -> OWSProgressSource {
|
|
owsAssertDebug(unitCount > 0)
|
|
self.totalDirectChildUnitCount += unitCount
|
|
let source = OWSProgressSourceNode(
|
|
label: label,
|
|
parentLabels: [],
|
|
totalUnitCount: unitCount,
|
|
parent: self,
|
|
rootNode: self
|
|
)
|
|
self.directChildren.append(Weak(value: source))
|
|
// Tell all children (including the new one) about the new total unit count.
|
|
await updateUnitCountsOnChildren()
|
|
self.addSource(source)
|
|
// Issue a progres update as the total unit count has changed.
|
|
progressDidUpdate()
|
|
return source
|
|
}
|
|
|
|
fileprivate func addSource(_ source: OWSProgressSourceNode) {
|
|
allSources.append(SourceNode(node: source))
|
|
}
|
|
|
|
private func updateUnitCountsOnChildren() async {
|
|
// Touch each child so it updates its own children's multiplier.
|
|
for child in directChildren {
|
|
// Direct children of the root have a multiplier of 1;
|
|
// 1 unit corresponds to 1 unit on the top-level progress.
|
|
await child.value?.updateCompletedUnitCountMultiplier(1)
|
|
}
|
|
}
|
|
|
|
fileprivate func progressDidUpdate() {
|
|
guard allSources.isEmpty.negated else {
|
|
return
|
|
}
|
|
var completedUnitCount: Float = 0
|
|
var highestPercent: Float = -1
|
|
var currentLabels: [String] = []
|
|
allSources.forEach { sourceNode in
|
|
let sourceCompletedUnitCount: Float
|
|
if let source = sourceNode.node {
|
|
sourceCompletedUnitCount =
|
|
source.completedUnitCountMultiplier
|
|
* Float(source.completedUnitCount)
|
|
let percent = Float(source.completedUnitCount) / Float(source.totalUnitCount)
|
|
if percent > highestPercent {
|
|
highestPercent = percent
|
|
currentLabels = source.labels
|
|
}
|
|
} else {
|
|
sourceCompletedUnitCount = sourceNode.lastCompletedUnitCount ?? 0
|
|
}
|
|
sourceNode.lastCompletedUnitCount = sourceCompletedUnitCount
|
|
completedUnitCount += sourceCompletedUnitCount
|
|
}
|
|
let progress = OWSProgress(
|
|
// Round up optimistically.
|
|
completedUnitCount: UInt64(ceil(completedUnitCount)),
|
|
totalUnitCount: totalDirectChildUnitCount,
|
|
currentLabels: currentLabels
|
|
)
|
|
defer { latestEmittedProgress = progress }
|
|
|
|
// Only update the observer if the units changed;
|
|
// label changes are arbitrary and shouldn't trigger updates.
|
|
var progressDidChange = false
|
|
if progress.completedUnitCount != latestEmittedProgress?.completedUnitCount {
|
|
progressDidChange = true
|
|
}
|
|
if progress.totalUnitCount != latestEmittedProgress?.totalUnitCount {
|
|
progressDidChange = true
|
|
}
|
|
if progressDidChange {
|
|
latestEmittedProgress = progress
|
|
Task { [progress] in
|
|
observer(progress)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Covers both child sinks and sources. Only the root sink is not a child node.
|
|
private protocol OWSProgressChildNode {
|
|
var totalUnitCount: UInt64 { get }
|
|
|
|
/// This is all implementation details (this protocol is fileprivate) but
|
|
/// read on if you want the nitty-gritty.
|
|
///
|
|
/// This is confusing and happens in reverse to common sense.
|
|
/// Say we have the following tree (unit counts in parens):
|
|
/// ```
|
|
/// root
|
|
/// __________|_____________
|
|
/// | |
|
|
/// source 1 (50) child sink A (50)
|
|
/// ___________|__________
|
|
/// | |
|
|
/// source 2 (10) child sink B (10)
|
|
/// ___________|__________
|
|
/// | |
|
|
/// source 3 (100) source 4 (300)
|
|
/// ```
|
|
/// What should the root complete unit count be if all souces' counts are 0,
|
|
/// except source 4 which has progress of 200?
|
|
/// Answer: 13 units.
|
|
/// How do we get there? Source 4 has 200 units, which is half the total
|
|
/// units across the children of sink B (100 + 300 = 400). So it is "worth" half
|
|
/// of B's units, or 5 units. B's siblings have a total unit count of 20, 5/20 = 25%
|
|
/// so it is "worth" 25% of A's units, or 12.5 units, which gets rounded up to 13.
|
|
///
|
|
/// We could do all these calculations at read time (addind up sibling unit counts and dividing by them),
|
|
/// but we want progress updates to be FAST. So we instead calculate a "multiplier" up front, at
|
|
/// the time we add children, so that we can quickly normalize sources' units at read time.
|
|
/// The multiplier at each level is:
|
|
/// `[parent's multiplier] * ([parent unit count] ÷ [total count across siblings])`
|
|
///
|
|
/// In this example, source 4's multiplier would be 0.0625 (`10 ÷ (100 + 300) * (50 ÷ (10 + 10))`)
|
|
/// Other multiplers:
|
|
/// source 1 & child A: 1 (root)
|
|
/// source 2 & child B: 2.5 (`50 ÷ (10 + 10)`)
|
|
/// source 3: (same as source 4).
|
|
func updateCompletedUnitCountMultiplier(_ newValue: Float) async
|
|
}
|
|
|
|
/// A sink that is itself a child to another sink.
|
|
private actor OWSProgressSinkNode: OWSProgressSink, OWSProgressChildNode {
|
|
|
|
/// The chain of labels starting at the root (no label) and ending in this node's label.
|
|
fileprivate nonisolated let labels: [String]
|
|
/// The unit count of this node. Note that child sinks don't have completedUnitCounts
|
|
/// of their own; instead its children determine the unit count as proportion of this total.
|
|
fileprivate nonisolated let totalUnitCount: UInt64
|
|
|
|
/// See ``OWSProgressChildNode/updateCompletedUnitCountMultiplier``.
|
|
/// This gets set immediately after initialization before it can possibly be read.
|
|
fileprivate var completedUnitCountMultiplier: Float = 1
|
|
|
|
private var totalDirectChildUnitCount: UInt64 = 0
|
|
private var directChildren = [Weak<OWSProgressChildNode>]()
|
|
|
|
/// Children hold strong referenced to their parents; as long as callers
|
|
/// hold a reference to some child source (to increment its progress)
|
|
/// the whole tree above that child will be retained.
|
|
private nonisolated let parent: OWSProgressSink
|
|
/// Every node in the tree holds a strong reference to the root (and in turn its observer block).
|
|
/// The root holds only weak references to its children.
|
|
private nonisolated let rootNode: OWSProgressRootNode
|
|
|
|
fileprivate init(
|
|
label: String,
|
|
parentLabels: [String],
|
|
unitCount: UInt64,
|
|
parent: OWSProgressSink,
|
|
rootNode: OWSProgressRootNode
|
|
) {
|
|
self.labels = parentLabels + [label]
|
|
self.totalUnitCount = unitCount
|
|
self.parent = parent
|
|
self.rootNode = rootNode
|
|
}
|
|
|
|
func addChild(withLabel label: String, unitCount: UInt64) async -> OWSProgressSink {
|
|
owsAssertDebug(unitCount > 0)
|
|
self.totalDirectChildUnitCount += unitCount
|
|
let child = OWSProgressSinkNode(
|
|
label: label,
|
|
parentLabels: self.labels,
|
|
unitCount: unitCount,
|
|
parent: self,
|
|
rootNode: rootNode
|
|
)
|
|
self.directChildren.append(Weak(value: child))
|
|
// Tell all children (including the new one) about the new total unit count.
|
|
await updateUnitCountsOnChildren()
|
|
return child
|
|
}
|
|
|
|
func addSource(withLabel label: String, unitCount: UInt64) async -> OWSProgressSource {
|
|
owsAssertDebug(unitCount > 0)
|
|
self.totalDirectChildUnitCount += unitCount
|
|
let source = OWSProgressSourceNode(
|
|
label: label,
|
|
parentLabels: self.labels,
|
|
totalUnitCount: unitCount,
|
|
parent: self,
|
|
rootNode: rootNode
|
|
)
|
|
self.directChildren.append(Weak(value: source))
|
|
// Tell all children (including the new one) about the new total unit count.
|
|
await updateUnitCountsOnChildren()
|
|
// All sources at all levels talk to the root to issue observer updates.
|
|
await rootNode.addSource(source)
|
|
return source
|
|
}
|
|
|
|
/// See ``OWSProgressChildNode/updateCompletedUnitCountMultiplier``.
|
|
func updateCompletedUnitCountMultiplier(_ newValue: Float) async {
|
|
self.completedUnitCountMultiplier = newValue
|
|
// Recursively update children all the way down the tree.
|
|
await updateUnitCountsOnChildren()
|
|
}
|
|
|
|
func updateUnitCountsOnChildren() async {
|
|
// See `updateCompletedUnitCountMultiplier`.
|
|
let childCompletedUnitCountMultiplier = self.completedUnitCountMultiplier
|
|
* Float(totalUnitCount) / Float(totalDirectChildUnitCount)
|
|
for child in directChildren {
|
|
await child.value?.updateCompletedUnitCountMultiplier(
|
|
childCompletedUnitCountMultiplier
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
private class OWSProgressSourceNode: OWSProgressSource, OWSProgressChildNode {
|
|
|
|
/// The chain of labels starting at the root (no label) and ending in this node's label.
|
|
fileprivate let labels: [String]
|
|
var completedUnitCount: UInt64 = 0
|
|
let totalUnitCount: UInt64
|
|
|
|
/// See ``OWSProgressChildNode/updateCompletedUnitCountMultiplier``.
|
|
/// This gets set immediately after initialization before it can possibly be read.
|
|
fileprivate var completedUnitCountMultiplier: Float = 1
|
|
|
|
/// Children hold strong referenced to their parents; as long as callers
|
|
/// hold a reference to some child source (to increment its progress)
|
|
/// the whole tree above that child will be retained.
|
|
private let parent: OWSProgressSink
|
|
/// Every node in the tree holds a strong reference to the root (and in turn its observer block).
|
|
/// The root holds only weak references to its children.
|
|
private let rootNode: OWSProgressRootNode
|
|
|
|
init(
|
|
label: String,
|
|
parentLabels: [String],
|
|
totalUnitCount: UInt64,
|
|
parent: OWSProgressSink,
|
|
rootNode: OWSProgressRootNode
|
|
) {
|
|
self.labels = parentLabels + [label]
|
|
self.totalUnitCount = totalUnitCount
|
|
self.parent = parent
|
|
self.rootNode = rootNode
|
|
}
|
|
|
|
func incrementCompletedUnitCount(by increment: UInt64) {
|
|
owsAssertDebug(increment > 0)
|
|
completedUnitCount = min(
|
|
totalUnitCount,
|
|
completedUnitCount + increment
|
|
)
|
|
emitProgressIfNeeded()
|
|
}
|
|
|
|
/// Tracks whether an async progress update task has been scheduled
|
|
/// but not run yet; if true further calls to ``emitProgressIfNeeded``
|
|
/// will early exit.
|
|
private var dirtyBit = false
|
|
|
|
private func emitProgressIfNeeded() {
|
|
guard !dirtyBit else {
|
|
return
|
|
}
|
|
dirtyBit = true
|
|
// Retain self, so that if the caller updates progress
|
|
// to 100% then discards the reference to self, its
|
|
// still retained long enough to update observers.
|
|
Task { [self, rootNode] in
|
|
// It looks risky to write this value from an
|
|
// arbitrary task thread; but because we read
|
|
// the progress value after setting this it should
|
|
// never result in missed updates (just additional
|
|
// unecessary updates).
|
|
self.dirtyBit = false
|
|
await rootNode.progressDidUpdate()
|
|
}
|
|
}
|
|
|
|
/// See ``OWSProgressChildNode/updateCompletedUnitCountMultiplier``.
|
|
func updateCompletedUnitCountMultiplier(_ newValue: Float) {
|
|
self.completedUnitCountMultiplier = newValue
|
|
}
|
|
}
|