//
// Copyright 2018 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//

import Foundation

/// Durably enqueues outgoing messages.
///
/// Calls `MessageSender` to send messages.
///
/// # Retries
///
/// Both `MessageSenderJobQueue` and `MessageSender` implement retry
/// handling.
///
/// The latter (`MessageSender`) retries only specific errors that the
/// server indicates are immediately retryable (e.g., "you are missing a
/// device for the destination; add it and try again"). These retries aren't
/// "configurable", nor do they have any backoff. They are expected when the
/// system is operating normally, and they are part of the expected flow for
/// sending a message.
///
/// The former (`MessageSenderJobQueue`) retries generic/unknown failures
/// (e.g., "the server gave us a 5xx error; try after a few seconds", "there
/// isn't any Internet; try when we reconnect"). These retries are
/// "configurable", meaning we can decide how many occur and how often they
/// occur. These only happen when something is operating abnormally (e.g.,
/// "the server is down", "the user isn't connected to the network").
///
/// Both respect `IsRetryableProvider` and only retry retryable errors.
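///
/// # Usage
///
/// A minimal sketch of enqueueing a message from within a database write.
/// `messageSenderJobQueue` and `preparedMessage` are placeholder values; how
/// the `PreparedOutgoingMessage` is built is outside this file.
///
/// ```swift
/// SSKEnvironment.shared.databaseStorageRef.write { tx in
///     messageSenderJobQueue.add(message: preparedMessage, transaction: tx)
/// }
/// ```
///
/// The `Promise`-returning overload can be used instead when the caller
/// needs to observe the final send result.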
public class MessageSenderJobQueue {

    private var jobSerializer = CompletionSerializer()

    public init(appReadiness: AppReadiness) {
        appReadiness.runNowOrWhenAppDidBecomeReadyAsync {
            self.setUp()
        }
    }

    public func add(
        message: PreparedOutgoingMessage,
        limitToCurrentProcessLifetime: Bool = false,
        isHighPriority: Bool = false,
        transaction: SDSAnyWriteTransaction
    ) {
        self.add(
            message: message,
            exclusiveToCurrentProcessIdentifier: limitToCurrentProcessLifetime,
            isHighPriority: isHighPriority,
            future: nil,
            transaction: transaction
        )
    }

    public func add(
        _ namespace: PromiseNamespace,
        message: PreparedOutgoingMessage,
        limitToCurrentProcessLifetime: Bool = false,
        isHighPriority: Bool = false,
        transaction: SDSAnyWriteTransaction
    ) -> Promise<Void> {
        return Promise { future in
            self.add(
                message: message,
                exclusiveToCurrentProcessIdentifier: limitToCurrentProcessLifetime,
                isHighPriority: isHighPriority,
                future: future,
                transaction: transaction
            )
        }
    }

    private func add(
        message: PreparedOutgoingMessage,
        exclusiveToCurrentProcessIdentifier: Bool,
        isHighPriority: Bool,
        future: Future<Void>?,
        transaction: SDSAnyWriteTransaction
    ) {
        // Mark as sending now so the UI updates immediately.
        message.updateAllUnsentRecipientsAsSending(tx: transaction)
        let jobRecord: MessageSenderJobRecord
        do {
            jobRecord = try message.asMessageSenderJobRecord(isHighPriority: isHighPriority, tx: transaction)
        } catch {
            message.updateWithAllSendingRecipientsMarkedAsFailed(error: error, tx: transaction)
            future?.reject(error)
            return
        }
        owsAssertDebug(jobRecord.status == .ready)
        if exclusiveToCurrentProcessIdentifier {
            // Nothing to do. Just don't insert it into the database.
        } else {
            jobRecord.anyInsert(transaction: transaction)
        }

        self.state.update {
            $0.pendingJobs.append(Job(record: jobRecord, isInMemoryOnly: exclusiveToCurrentProcessIdentifier))
            if let future {
                $0.jobFutures[jobRecord.uniqueId] = future
            }
        }

        transaction.addTransactionFinalizationBlock(forKey: "\(#fileID):\(#line)") { _ in
            self.startPendingJobRecordsIfPossible()
        }
    }

    // MARK: JobQueue

    /// A job that needs to be executed.
    private struct Job {
        let record: MessageSenderJobRecord
        let isInMemoryOnly: Bool
    }

    /// A job that's been queued but hasn't started yet.
    private struct QueuedOperationState {
        let job: Job
        let message: PreparedOutgoingMessage
        let future: Future<Void>?
    }

    /// A job that's actively executing; it may be suspended due to errors.
    private struct ActiveOperationState {
        let job: Job
        let message: PreparedOutgoingMessage
        let future: Future<Void>?
        let externalRetryTriggerState = AtomicValue(ExternalRetryTriggerState(), lock: .init())

        init(queuedOperation: QueuedOperationState) {
            self.job = queuedOperation.job
            self.message = queuedOperation.message
            self.future = queuedOperation.future
        }

        /// "Consume" any triggers that have already fired.
        ///
        /// Callers should do this before performing any retryable action that might
        /// fail due to one of the triggers. For example, if a message might fail to
        /// send because there's no Internet, this should be called before
        /// attempting to send the message.
        ///
        /// This pattern ensures no triggers are missed due to concurrently
        /// executing operations & triggers. For example, if Internet isn't
        /// available, you start sending a message, Internet becomes available, and
        /// then the message fails to send with a "network failure" error, we want
        /// to immediately retry. If we don't retry, we'd be stuck until something
        /// *else* triggers a retry (e.g., losing & gaining Internet again).
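        ///
        /// A rough sketch of the intended call pattern, as used by
        /// `_runOperation` below (`send()` is a placeholder for the retryable
        /// action):
        ///
        /// ```swift
        /// operation.clearExternalRetryTriggers()
        /// do {
        ///     try await send()
        /// } catch {
        ///     // Resumes immediately if a trigger fired while sending.
        ///     try await operation.waitForAnyExternalRetryTrigger(
        ///         fromExternalRetryTriggers: [.networkBecameReachable]
        ///     )
        /// }
        /// ```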
        func clearExternalRetryTriggers() {
            self.externalRetryTriggerState.update {
                $0.reportedExternalRetryTriggers = []
            }
        }

        /// Triggers any jobs that failed because of `externalRetryTrigger`.
        ///
        /// This also triggers any in-progress jobs (after they fail) that fail
        /// because of `externalRetryTrigger`. This avoids race conditions (see above).
        func reportExternalRetryTrigger(_ externalRetryTrigger: ExternalRetryTriggers) {
            self.externalRetryTriggerState.update {
                $0.reportedExternalRetryTriggers.formUnion(externalRetryTrigger)
                notifyIfPossible(mutableState: &$0)
            }
        }

        /// Waits until any of `externalRetryTriggers` has been triggered.
        func waitForAnyExternalRetryTrigger(fromExternalRetryTriggers externalRetryTriggers: ExternalRetryTriggers) async throws {
            let waitingContinuation = CancellableContinuation<Void>()
            self.externalRetryTriggerState.update {
                $0.waitingState = (waitingContinuation, externalRetryTriggers)
                notifyIfPossible(mutableState: &$0)
            }
            return try await waitingContinuation.wait()
        }

        private func notifyIfPossible(mutableState: inout ExternalRetryTriggerState) {
            guard let waitingState = mutableState.waitingState else {
                return
            }
            if mutableState.reportedExternalRetryTriggers.isDisjoint(with: waitingState.externalRetryTriggers) {
                return
            }
            waitingState.continuation.resume(with: .success(()))
        }
    }

    /// Tracks information about failures with external retry triggers.
    private struct ExternalRetryTriggerState {
        var reportedExternalRetryTriggers: ExternalRetryTriggers = []
        var waitingState: (continuation: CancellableContinuation<Void>, externalRetryTriggers: ExternalRetryTriggers)?
    }

    /// Tracks failure types with external retry triggers.
    ///
    /// For example, a "network failure" error can be retried before its
    /// timer-based retry interval if Internet suddenly becomes available.
    /// Conversely, 5xx errors are transient but can only be retried when their
    /// timer-based retry fires, so they're not included here.
    private struct ExternalRetryTriggers: OptionSet {
        let rawValue: Int

        static let networkBecameReachable = ExternalRetryTriggers(rawValue: 1 << 0)
    }

    private enum JobPriority: Hashable {
        case high
        case renderableContent
        case low
    }

    private struct State {
        var isLoaded = false
        var pendingJobs = [Job]()
        var queueStates = [QueueKey: QueueState]()
        var jobFutures = [String: Future<Void>]()
    }

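    /// Identifies an independent sub-queue of sends.
    ///
    /// Jobs are grouped by conversation and priority; `startNextJobIfNeeded`
    /// runs each sub-queue's operations one at a time (aside from the extra
    /// non-media slot described there).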
    private struct QueueKey: Hashable {
        let threadId: String?
        let priority: JobPriority
    }

    private struct QueueState {
        var activeOperations = [ActiveOperationState]()
        var queuedOperations = [QueuedOperationState]()

        var isEmpty: Bool {
            return activeOperations.isEmpty && queuedOperations.isEmpty
        }

        var hasExactlyOneActiveOperationThatUsesTheMediaQueue: Bool {
            return activeOperations.count == 1 && activeOperations[0].job.record.useMediaQueue
        }
    }

    private let state = AtomicValue<State>(State(), lock: .init())

    private func didMarkAsReady(oldJobRecord: MessageSenderJobRecord, transaction: SDSAnyWriteTransaction) {
        // TODO: Remove this method and status swapping logic entirely.
        let uniqueId: String
        switch oldJobRecord.messageType {
        case .persisted(let messageId, _):
            uniqueId = messageId
        case .editMessage(let editedMessageId, _, _):
            uniqueId = editedMessageId
        case .transient, .none:
            return
        }

        TSOutgoingMessage
            .anyFetch(
                uniqueId: uniqueId,
                transaction: transaction
            )
            .flatMap { $0 as? TSOutgoingMessage }?
            .updateAllUnsentRecipientsAsSending(transaction: transaction)
    }

    private let pendingJobQueue = DispatchQueue(label: "MessageSenderJobQueue.pendingJobRecords")

    private func startPendingJobRecordsIfPossible() {
        // Use a queue to ensure "pendingJobs" get passed to queueJob in the correct order.
        pendingJobQueue.async {
            let pendingJobs = self.state.update {
                if $0.isLoaded {
                    let result = $0.pendingJobs
                    $0.pendingJobs = []
                    return result
                }
                return []
            }
            if !pendingJobs.isEmpty {
                SSKEnvironment.shared.databaseStorageRef.write { tx in
                    for pendingJob in pendingJobs {
                        self.queueJob(pendingJob, tx: tx)
                    }
                }
            }
        }
    }

    private func queueJob(_ job: Job, tx transaction: SDSAnyWriteTransaction) {
        let future = self.state.update { $0.jobFutures.removeValue(forKey: job.record.uniqueId) }

        guard let message = PreparedOutgoingMessage.restore(from: job.record, tx: transaction) else {
            if !job.isInMemoryOnly {
                job.record.anyRemove(transaction: transaction)
            }
            future?.reject(OWSAssertionError("Can't start job that can't be prepared."))
            return
        }

        let sendPriority: JobPriority
        if job.record.isHighPriority {
            sendPriority = .high
        } else if message.hasRenderableContent(tx: transaction) {
            sendPriority = .renderableContent
        } else {
            sendPriority = .low
        }

        let operation = QueuedOperationState(
            job: job,
            message: message,
            future: future
        )

        let queueKey = QueueKey(threadId: job.record.threadId, priority: sendPriority)
        self.jobSerializer.addOrderedSyncCompletion(tx: transaction.asV2Write) {
            self.state.update {
                $0.queueStates[queueKey, default: QueueState()].queuedOperations.append(operation)
            }
            self.startNextJobIfNeeded(queueKey: queueKey)
        }
    }

    public func setUp() {
        let jobRecordFinder = JobRecordFinderImpl<MessageSenderJobRecord>(db: DependenciesBridge.shared.db)
        Task {
            if CurrentAppContext().isMainApp {
                do {
                    let jobRecords = try await jobRecordFinder.loadRunnableJobs(updateRunnableJobRecord: { jobRecord, tx in
                        self.didMarkAsReady(oldJobRecord: jobRecord, transaction: SDSDB.shimOnlyBridge(tx))
                    })
                    let jobRecordUniqueIds = Set(jobRecords.lazy.map(\.uniqueId))
                    self.state.update {
                        var newlyPendingJobs = $0.pendingJobs
                        newlyPendingJobs.removeAll(where: { jobRecordUniqueIds.contains($0.record.uniqueId) })
                        $0.pendingJobs = jobRecords.map { Job(record: $0, isInMemoryOnly: false) }
                        $0.pendingJobs.append(contentsOf: newlyPendingJobs)
                    }
                } catch {
                    owsFailDebug("Couldn't load existing message send jobs: \(error)")
                }
            }

            // FIXME: The returned observer token is never unregistered.
            // In practice all our JobQueues live forever, so this isn't a problem.
            // We use "unowned" so that we don't silently fail (or leak) when this changes.
            NotificationCenter.default.addObserver(
                forName: SSKReachability.owsReachabilityDidChange,
                object: nil,
                queue: nil
            ) { [unowned self] _ in
                if SSKEnvironment.shared.reachabilityManagerRef.isReachable {
                    self.becameReachable()
                }
            }

            // No matter what, mark it as loaded. This keeps things semi-functional.
            self.state.update { $0.isLoaded = true }
            startPendingJobRecordsIfPossible()
        }
    }

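    /// Wakes any active operations that are waiting on network reachability.
    ///
    /// Called from the reachability observer registered in `setUp()`.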
    func becameReachable() {
        self.state.update {
            for (_, queueState) in $0.queueStates {
                for activeOperation in queueState.activeOperations {
                    activeOperation.reportExternalRetryTrigger(.networkBecameReachable)
                }
            }
        }
    }

    private func startNextJobIfNeeded(queueKey: QueueKey) {
        self.state.update {
            var queueState = $0.queueStates[queueKey, default: QueueState()]

            // If nothing is running, start *any* operation that needs to be started.
            if queueState.activeOperations.isEmpty {
                if let nextIndex = queueState.queuedOperations.indices.first {
                    startNextJob(atQueuedIndex: nextIndex, forQueueKey: queueKey, in: &queueState)
                }
            }

            // Non-media messages get an extra slot to run so that they don't get stuck
            // behind media messages. If the first slot got filled by a media message,
            // this one can be filled by a non-media message. If the first slot is
            // filled by a non-media message, we can't schedule anything else.

            // For example, if you send A, B, C, and D, where C is media and everything
            // else is a text message, then only orderings ABCD and ABDC are allowed.
            // This block exists to start sending "D" concurrently with "C".
            if queueState.hasExactlyOneActiveOperationThatUsesTheMediaQueue {
                if let nextIndex = queueState.queuedOperations.firstIndex(where: { !$0.job.record.useMediaQueue }) {
                    startNextJob(atQueuedIndex: nextIndex, forQueueKey: queueKey, in: &queueState)
                }
            }

            $0.queueStates[queueKey] = queueState.isEmpty ? nil : queueState
        }
    }

    private func startNextJob(atQueuedIndex index: Int, forQueueKey queueKey: QueueKey, in queueState: inout QueueState) {
        let queuedOperation = queueState.queuedOperations.remove(at: index)
        let activeOperation = ActiveOperationState(queuedOperation: queuedOperation)
        queueState.activeOperations.append(activeOperation)
        Task(priority: Self.taskPriority(forJobPriority: queueKey.priority)) {
            await self.runOperation(activeOperation)
            self.state.update {
                $0.queueStates[queueKey]!.activeOperations.removeAll(where: { $0.job.record.uniqueId == activeOperation.job.record.uniqueId })
            }
            startNextJobIfNeeded(queueKey: queueKey)
        }
    }

    private static func taskPriority(forJobPriority jobPriority: JobPriority) -> TaskPriority {
        switch jobPriority {
        case .high, .renderableContent:
            return .userInitiated
        case .low:
            return .medium
        }
    }

    /// Runs a job to send a particular message.
    ///
    /// This method returns after the operation reaches a terminal result and
    /// the job record has been deleted.
    private func runOperation(_ operation: ActiveOperationState) async {
        let result = await Result { try await self._runOperation(operation) }
        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            if !operation.job.isInMemoryOnly {
                operation.job.record.anyRemove(transaction: tx)
            }
            if case .failure(let error) = result {
                operation.message.updateWithAllSendingRecipientsMarkedAsFailed(error: error, tx: tx)
            }
        }
        switch result {
        case .success(()):
            operation.future?.resolve()
        case .failure(let error):
            operation.future?.reject(error)
        }
    }

    /// Runs a job to send a particular message.
    ///
    /// This method returns after the operation has reached a terminal result
    /// but before that result has been processed.
    private func _runOperation(_ operation: ActiveOperationState) async throws {
        var attemptCount = Int(operation.job.record.failureCount)
        let maxRetries = 110
        while true {
            assert(!Task.isCancelled, "Cancellation isn't supported.")
            do {
                operation.clearExternalRetryTriggers()
                try await SSKEnvironment.shared.messageSenderRef.sendMessage(operation.message)
                return
            } catch where error.isRetryable && !error.isFatalError && attemptCount < maxRetries {
                attemptCount += 1
                if !operation.job.isInMemoryOnly {
                    await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
                        operation.job.record.addFailure(tx: tx)
                    }
                }
                var externalRetryTriggers: ExternalRetryTriggers = []
                // If there's a network failure, we can retry when we reconnect.
                if error.isNetworkFailureOrTimeout {
                    externalRetryTriggers.insert(.networkBecameReachable)
                }
                try? await withCooperativeTimeout(
                    seconds: OWSOperation.retryIntervalForExponentialBackoff(failureCount: UInt(attemptCount)),
                    operation: { try await operation.waitForAnyExternalRetryTrigger(fromExternalRetryTriggers: externalRetryTriggers) }
                )
            }
        }
    }
}