TM-SGNL-iOS/SignalServiceKit/Messages/MessageProcessor.swift
TeleMessage developers dde0620daf initial commit
2025-05-03 12:28:28 -07:00

878 lines
36 KiB
Swift

//
// Copyright 2021 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
import GRDB
import LibSignalClient
public class MessageProcessor: NSObject {
public static let messageProcessorDidDrainQueue = Notification.Name("messageProcessorDidDrainQueue")
private var hasPendingEnvelopes: Bool {
!pendingEnvelopes.isEmpty
}
/// When calling `waitForProcessingComplete` while message processing is
/// suspended, there is a problem. We may have pending messages waiting to
/// be processed once the suspension is lifted. But what's more, we may have
/// started processing messages, then suspended, then called
/// `waitForProcessingComplete` before that initial processing finished.
/// Suspending does not interrupt processing if it already started.
///
/// So there are 4 cases to worry about:
/// 1. Message processing isn't suspended
/// 2. Suspended with no pending messages
/// 3. Suspended with pending messages and no active processing underway
/// 4. Suspended but still processing from before the suspension took effect
///
/// Cases 1 and 2 are easy and behave the same in all cases.
///
/// Case 3 differs in behavior; sometimes we want to wait for suspension to
/// be lifted and those pending messages to be processed, other times we
/// don't want to wait to unsuspend.
///
/// Case 4 is once again the same in all cases; processing has started and
/// can't be stopped, so we should always wait until it finishes.
public enum SuspensionBehavior {
/// Default value. (Legacy behavior)
/// If suspended with pending messages and no processing underway, wait for
/// suspension to be lifted and those messages to be processed.
case alwaysWait
/// If suspended with pending messages, only wait if processing has already
/// started. If it hasn't started, don't wait for it to start, so that the
/// promise can resolve before suspension is lifted.
case onlyWaitIfAlreadyInProgress
}
/// - parameter suspensionBehavior: What the promise should wait for if
/// message processing is suspended; see `SuspensionBehavior` documentation
/// for details.
public func waitForProcessingComplete(
suspensionBehavior: SuspensionBehavior = .alwaysWait
) -> Guarantee<Void> {
guard CurrentAppContext().shouldProcessIncomingMessages else {
return Guarantee.value(())
}
// Check if processing is suspended; if so we need to fork behavior.
let shouldWaitForEverything: Bool
if SSKEnvironment.shared.messagePipelineSupervisorRef.isMessageProcessingPermitted {
shouldWaitForEverything = true
} else {
switch suspensionBehavior {
case .alwaysWait:
shouldWaitForEverything = true
case .onlyWaitIfAlreadyInProgress:
shouldWaitForEverything = false
}
}
let shouldWaitForMessageProcessing: () -> Bool = {
// Check if we are already processing, if so wait for that to finish.
// If not don't wait even if we have pending messages; those won't process
// until we unsuspend.
return shouldWaitForEverything ? self.hasPendingEnvelopes : self.isDrainingPendingEnvelopes.get()
}
if shouldWaitForMessageProcessing() {
let messageProcessingPromise = NotificationCenter.default.observe(once: Self.messageProcessorDidDrainQueue)
// We must check (again) after setting up the observer in case we miss the
// notification. If you check before setting up the observer, the
// notification might fire while the thread is sleeping.
if shouldWaitForMessageProcessing() {
return messageProcessingPromise.then { _ in
// Recur, in case we've enqueued messages handled in another block.
self.waitForProcessingComplete(suspensionBehavior: suspensionBehavior)
}.asVoid()
}
}
let shouldWaitForGroupMessageProcessing: () -> Bool = {
if shouldWaitForEverything {
return SSKEnvironment.shared.databaseStorageRef.read {
SSKEnvironment.shared.groupsV2MessageProcessorRef.hasPendingJobs(tx: $0)
}
} else {
return SSKEnvironment.shared.groupsV2MessageProcessorRef.isActivelyProcessing()
}
}
if shouldWaitForGroupMessageProcessing() {
let groupMessageProcessingPromise = NotificationCenter.default.observe(once: GroupsV2MessageProcessor.didFlushGroupsV2MessageQueue)
if shouldWaitForGroupMessageProcessing() {
return groupMessageProcessingPromise.then { _ in
// Recur, in case we've enqueued messages handled in another block.
self.waitForProcessingComplete(suspensionBehavior: suspensionBehavior)
}.asVoid()
}
}
return Guarantee.value(())
}
/// Suspends message processing, but before doing so processes any messages
/// received so far.
/// This suppression will persist until the suspension is explicitly lifted.
/// For this reason calling this method is highly dangerous, please use with care.
public func waitForProcessingCompleteAndThenSuspend(
for suspension: MessagePipelineSupervisor.Suspension
) -> Guarantee<Void> {
// We need to:
// 1. wait to process
// 2. suspend
// 3. wait to process again
// This is because steps 1 and 2 are not transactional, and in between a message
// may get queued up for processing. After 2, nothing new can come in, so we only
// need to wait the once.
// In most cases nothing sneaks in between 1 and 2, so 3 resolves instantly.
return waitForProcessingComplete(suspensionBehavior: .onlyWaitIfAlreadyInProgress).then(on: DispatchQueue.main) {
SSKEnvironment.shared.messagePipelineSupervisorRef.suspendMessageProcessingWithoutHandle(for: suspension)
return self.waitForProcessingComplete(suspensionBehavior: .onlyWaitIfAlreadyInProgress)
}.recover(on: SyncScheduler()) { _ in return () }
}
public func waitForFetchingAndProcessing(
suspensionBehavior: SuspensionBehavior = .alwaysWait
) -> Guarantee<Void> {
return firstly { () -> Guarantee<Void> in
return SSKEnvironment.shared.messageFetcherJobRef.waitForFetchingComplete()
}.then { () -> Guarantee<Void> in
return self.waitForProcessingComplete(suspensionBehavior: suspensionBehavior)
}
}
private let appReadiness: AppReadiness
public init(appReadiness: AppReadiness) {
self.appReadiness = appReadiness
super.init()
SwiftSingletons.register(self)
NotificationCenter.default.addObserver(
self,
selector: #selector(registrationStateDidChange),
name: .registrationStateDidChange,
object: nil
)
appReadiness.runNowOrWhenAppDidBecomeReadySync {
SSKEnvironment.shared.messagePipelineSupervisorRef.register(pipelineStage: self)
SSKEnvironment.shared.databaseStorageRef.read { transaction in
// We may have legacy process jobs queued. We want to schedule them for
// processing immediately when we launch, so that we can drain the old queue.
let legacyProcessingJobRecords = LegacyMessageJobFinder().allJobs(transaction: transaction)
for jobRecord in legacyProcessingJobRecords {
let completion: (Error?) -> Void = { _ in
SSKEnvironment.shared.databaseStorageRef.write { jobRecord.anyRemove(transaction: $0) }
}
do {
let envelope = try SSKProtoEnvelope(serializedData: jobRecord.envelopeData)
self.processReceivedEnvelope(
ReceivedEnvelope(
envelope: envelope,
encryptionStatus: .decrypted(plaintextData: jobRecord.plaintextData, wasReceivedByUD: jobRecord.wasReceivedByUD),
serverDeliveryTimestamp: jobRecord.serverDeliveryTimestamp,
completion: completion
),
envelopeSource: .unknown
)
} catch {
completion(error)
}
}
// We may have legacy decrypt jobs queued. We want to schedule them for
// processing immediately when we launch, so that we can drain the old queue.
let legacyDecryptJobRecords: [LegacyMessageDecryptJobRecord]
do {
let jobRecordFinder = JobRecordFinderImpl<LegacyMessageDecryptJobRecord>(db: DependenciesBridge.shared.db)
legacyDecryptJobRecords = try jobRecordFinder.allRecords(
status: .ready,
transaction: transaction.asV2Read
)
} catch {
legacyDecryptJobRecords = []
Logger.error("Couldn't fetch legacy job records: \(error)")
}
for jobRecord in legacyDecryptJobRecords {
let completion: (Error?) -> Void = { _ in
SSKEnvironment.shared.databaseStorageRef.write { jobRecord.anyRemove(transaction: $0) }
}
do {
guard let envelopeData = jobRecord.envelopeData else {
throw OWSAssertionError("Skipping job with no envelope data")
}
self.processReceivedEnvelopeData(
envelopeData,
serverDeliveryTimestamp: jobRecord.serverDeliveryTimestamp,
envelopeSource: .unknown,
completion: completion
)
} catch {
completion(error)
}
}
}
}
}
public func processReceivedEnvelopeData(
_ envelopeData: Data,
serverDeliveryTimestamp: UInt64,
envelopeSource: EnvelopeSource,
completion: @escaping (Error?) -> Void
) {
guard !envelopeData.isEmpty else {
completion(OWSAssertionError("Empty envelope, envelopeSource: \(envelopeSource)."))
return
}
let protoEnvelope: SSKProtoEnvelope
do {
protoEnvelope = try SSKProtoEnvelope(serializedData: envelopeData)
} catch {
owsFailDebug("Failed to parse encrypted envelope \(error), envelopeSource: \(envelopeSource)")
completion(error)
return
}
// Drop any too-large messages on the floor. Well behaving clients should never send them.
guard (protoEnvelope.content ?? Data()).count <= Self.maxEnvelopeByteCount else {
completion(OWSAssertionError("Oversize envelope, envelopeSource: \(envelopeSource)."))
return
}
processReceivedEnvelope(
ReceivedEnvelope(
envelope: protoEnvelope,
encryptionStatus: .encrypted,
serverDeliveryTimestamp: serverDeliveryTimestamp,
completion: completion
),
envelopeSource: envelopeSource
)
}
public func processReceivedEnvelope(
_ envelopeProto: SSKProtoEnvelope,
serverDeliveryTimestamp: UInt64,
envelopeSource: EnvelopeSource,
completion: @escaping (Error?) -> Void
) {
processReceivedEnvelope(
ReceivedEnvelope(
envelope: envelopeProto,
encryptionStatus: .encrypted,
serverDeliveryTimestamp: serverDeliveryTimestamp,
completion: completion
),
envelopeSource: envelopeSource
)
}
private func processReceivedEnvelope(_ receivedEnvelope: ReceivedEnvelope, envelopeSource: EnvelopeSource) {
let replacedEnvelope = pendingEnvelopes.enqueue(receivedEnvelope)
if let replacedEnvelope {
Logger.warn("Replaced \(replacedEnvelope.envelope.timestamp) serverGuid: \(replacedEnvelope.envelope.serverGuid as Optional)")
replacedEnvelope.completion(MessageProcessingError.replacedEnvelope)
}
drainPendingEnvelopes()
}
private static let maxEnvelopeByteCount = 256 * 1024
private let serialQueue = DispatchQueue(
label: "org.signal.message-processor",
autoreleaseFrequency: .workItem
)
private var pendingEnvelopes = PendingEnvelopes()
private let isDrainingPendingEnvelopes = AtomicBool(false, lock: .init())
private func drainPendingEnvelopes() {
guard CurrentAppContext().shouldProcessIncomingMessages else { return }
guard DependenciesBridge.shared.tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered else { return }
guard SSKEnvironment.shared.messagePipelineSupervisorRef.isMessageProcessingPermitted else { return }
serialQueue.async {
self.isDrainingPendingEnvelopes.set(true)
while autoreleasepool(invoking: { self.drainNextBatch() }) {}
self.isDrainingPendingEnvelopes.set(false)
if self.pendingEnvelopes.isEmpty {
NotificationCenter.default.postNotificationNameAsync(Self.messageProcessorDidDrainQueue, object: nil)
}
}
}
/// Returns whether or not to continue draining the queue.
private func drainNextBatch() -> Bool {
assertOnQueue(serialQueue)
guard SSKEnvironment.shared.messagePipelineSupervisorRef.isMessageProcessingPermitted else {
return false
}
// We want a value that is just high enough to yield perf benefits.
let kIncomingMessageBatchSize = 16
// If the app is in the background, use batch size of 1.
// This reduces the risk of us never being able to drain any
// messages from the queue. We should fine tune this number
// to yield the best perf we can get.
let batchSize = CurrentAppContext().isInBackground() ? 1 : kIncomingMessageBatchSize
let batch = pendingEnvelopes.nextBatch(batchSize: batchSize)
let batchEnvelopes = batch.batchEnvelopes
let pendingEnvelopesCount = batch.pendingEnvelopesCount
guard !batchEnvelopes.isEmpty else {
return false
}
let startTime = CACurrentMediaTime()
var processedEnvelopesCount = 0
SSKEnvironment.shared.databaseStorageRef.write { tx in
// This is only called via `drainPendingEnvelopes`, and that confirms that
// we're registered. If we're registered, we must have `LocalIdentifiers`,
// so this (generally) shouldn't fail.
guard let localIdentifiers = DependenciesBridge.shared.tsAccountManager.localIdentifiers(tx: tx.asV2Read) else {
return
}
let localDeviceId = DependenciesBridge.shared.tsAccountManager.storedDeviceId(tx: tx.asV2Read)
var remainingEnvelopes = batchEnvelopes
while !remainingEnvelopes.isEmpty {
guard SSKEnvironment.shared.messagePipelineSupervisorRef.isMessageProcessingPermitted else {
break
}
autoreleasepool {
// If we build a request, we must handle it to ensure it's not lost if we
// stop processing envelopes.
let combinedRequest = buildNextCombinedRequest(
envelopes: &remainingEnvelopes,
localIdentifiers: localIdentifiers,
localDeviceId: localDeviceId,
tx: tx
)
handle(
combinedRequest: combinedRequest,
localIdentifiers: localIdentifiers,
transaction: tx
)
}
}
processedEnvelopesCount += batchEnvelopes.count - remainingEnvelopes.count
}
pendingEnvelopes.removeProcessedEnvelopes(processedEnvelopesCount)
let endTime = CACurrentMediaTime()
let formattedDuration = String(format: "%.1f", (endTime - startTime) * 1000)
Logger.info("Processed \(processedEnvelopesCount) envelopes (of \(pendingEnvelopesCount) total) in \(formattedDuration)ms")
return true
}
// If envelopes is not empty, this will emit a single request for a non-delivery receipt or one or more requests
// all for delivery receipts.
private func buildNextCombinedRequest(
envelopes: inout [ReceivedEnvelope],
localIdentifiers: LocalIdentifiers,
localDeviceId: UInt32,
tx: SDSAnyWriteTransaction
) -> RelatedProcessingRequests {
let result = RelatedProcessingRequests()
while let envelope = envelopes.first {
envelopes.removeFirst()
let request = processingRequest(
for: envelope,
localIdentifiers: localIdentifiers,
localDeviceId: localDeviceId,
tx: tx
)
result.add(request)
if request.deliveryReceiptMessageTimestamps == nil {
// If we hit a non-delivery receipt envelope, handle it immediately to avoid
// keeping potentially large decrypted envelopes in memory.
break
}
}
return result
}
private func handle(combinedRequest: RelatedProcessingRequests, localIdentifiers: LocalIdentifiers, transaction: SDSAnyWriteTransaction) {
// Efficiently handle delivery receipts for the same message by fetching the sent message only
// once and only using one updateWith... to update the message with new recipient state.
BatchingDeliveryReceiptContext.withDeferredUpdates(transaction: transaction) { context in
for request in combinedRequest.processingRequests {
handleProcessingRequest(request, context: context, localIdentifiers: localIdentifiers, tx: transaction)
}
}
}
private func reallyHandleProcessingRequest(
_ request: ProcessingRequest,
context: DeliveryReceiptContext,
localIdentifiers: LocalIdentifiers,
transaction: SDSAnyWriteTransaction
) -> Error? {
switch request.state {
case .completed(error: let error):
Logger.info("Envelope completed early with error \(String(describing: error))")
return error
case .enqueueForGroup(let decryptedEnvelope, let envelopeData):
SSKEnvironment.shared.groupsV2MessageProcessorRef.enqueue(
envelopeData: envelopeData,
plaintextData: decryptedEnvelope.plaintextData,
wasReceivedByUD: decryptedEnvelope.wasReceivedByUD,
serverDeliveryTimestamp: request.receivedEnvelope.serverDeliveryTimestamp,
tx: transaction
)
SSKEnvironment.shared.messageReceiverRef.finishProcessingEnvelope(decryptedEnvelope, tx: transaction)
return nil
case .messageReceiverRequest(let messageReceiverRequest):
SSKEnvironment.shared.messageReceiverRef.handleRequest(messageReceiverRequest, context: context, localIdentifiers: localIdentifiers, tx: transaction)
SSKEnvironment.shared.messageReceiverRef.finishProcessingEnvelope(messageReceiverRequest.decryptedEnvelope, tx: transaction)
return nil
case .clearPlaceholdersOnly(let decryptedEnvelope):
SSKEnvironment.shared.messageReceiverRef.finishProcessingEnvelope(decryptedEnvelope, tx: transaction)
return nil
case .serverReceipt(let serverReceiptEnvelope):
SSKEnvironment.shared.messageReceiverRef.handleDeliveryReceipt(envelope: serverReceiptEnvelope, context: context, tx: transaction)
return nil
}
}
private func handleProcessingRequest(
_ request: ProcessingRequest,
context: DeliveryReceiptContext,
localIdentifiers: LocalIdentifiers,
tx: SDSAnyWriteTransaction
) {
let error = reallyHandleProcessingRequest(request, context: context, localIdentifiers: localIdentifiers, transaction: tx)
tx.addSyncCompletion { request.receivedEnvelope.completion(error) }
}
@objc
private func registrationStateDidChange() {
appReadiness.runNowOrWhenAppDidBecomeReadySync {
self.drainPendingEnvelopes()
}
}
public enum MessageAckBehavior {
case shouldAck
case shouldNotAck(error: Error)
}
public static func handleMessageProcessingOutcome(error: Error?) -> MessageAckBehavior {
guard let error = error else {
// Success.
return .shouldAck
}
if case MessageProcessingError.replacedEnvelope = error {
// _DO NOT_ ACK if de-duplicated before decryption.
return .shouldNotAck(error: error)
} else if case MessageProcessingError.blockedSender = error {
return .shouldAck
} else if let owsError = error as? OWSError,
owsError.errorCode == OWSErrorCode.failedToDecryptDuplicateMessage.rawValue {
// _DO_ ACK if de-duplicated during decryption.
return .shouldAck
} else {
Logger.warn("Failed to process message: \(error)")
// This should only happen for malformed envelopes. We may eventually
// want to show an error in this case.
return .shouldAck
}
}
}
private struct ProcessingRequest {
enum State {
case completed(error: Error?)
case enqueueForGroup(decryptedEnvelope: DecryptedIncomingEnvelope, envelopeData: Data)
case messageReceiverRequest(MessageReceiverRequest)
case serverReceipt(ServerReceiptEnvelope)
// Message decrypted but had an invalid protobuf.
case clearPlaceholdersOnly(DecryptedIncomingEnvelope)
}
let receivedEnvelope: ReceivedEnvelope
let state: State
// If this request is for a delivery receipt, return the timestamps for the sent-messages it
// corresponds to.
var deliveryReceiptMessageTimestamps: [UInt64]? {
switch state {
case .completed, .enqueueForGroup, .clearPlaceholdersOnly:
return nil
case .serverReceipt(let envelope):
return [envelope.validatedEnvelope.timestamp]
case .messageReceiverRequest(let request):
guard
case .receiptMessage = request.messageType,
let receiptMessage = request.protoContent.receiptMessage,
receiptMessage.type == .delivery
else {
return nil
}
return receiptMessage.timestamp
}
}
init(_ receivedEnvelope: ReceivedEnvelope, state: State) {
self.receivedEnvelope = receivedEnvelope
self.state = state
}
}
private class RelatedProcessingRequests {
private(set) var processingRequests = [ProcessingRequest]()
func add(_ processingRequest: ProcessingRequest) {
processingRequests.append(processingRequest)
}
}
private struct ProcessingRequestBuilder {
let receivedEnvelope: ReceivedEnvelope
let blockingManager: BlockingManager
let localDeviceId: UInt32
let localIdentifiers: LocalIdentifiers
let messageDecrypter: OWSMessageDecrypter
let messageReceiver: MessageReceiver
init(
_ receivedEnvelope: ReceivedEnvelope,
blockingManager: BlockingManager,
localDeviceId: UInt32,
localIdentifiers: LocalIdentifiers,
messageDecrypter: OWSMessageDecrypter,
messageReceiver: MessageReceiver
) {
self.receivedEnvelope = receivedEnvelope
self.blockingManager = blockingManager
self.localDeviceId = localDeviceId
self.localIdentifiers = localIdentifiers
self.messageDecrypter = messageDecrypter
self.messageReceiver = messageReceiver
}
func build(tx: SDSAnyWriteTransaction) -> ProcessingRequest.State {
do {
let decryptionResult = try receivedEnvelope.decryptIfNeeded(
messageDecrypter: messageDecrypter,
localIdentifiers: localIdentifiers,
localDeviceId: localDeviceId,
tx: tx
)
switch decryptionResult {
case .serverReceipt(let receiptEnvelope):
return .serverReceipt(receiptEnvelope)
case .decryptedMessage(let decryptedEnvelope):
return processingRequest(for: decryptedEnvelope, tx: tx)
}
} catch {
return .completed(error: error)
}
}
private enum ProcessingStep {
case discard
case enqueueForGroupProcessing
case processNow(shouldDiscardVisibleMessages: Bool)
}
private func processingStep(
for decryptedEnvelope: DecryptedIncomingEnvelope,
tx: SDSAnyWriteTransaction
) -> ProcessingStep {
guard
let contentProto = decryptedEnvelope.content,
let groupContextV2 = GroupsV2MessageProcessor.groupContextV2(from: contentProto)
else {
// Non-v2-group messages can be processed immediately.
return .processNow(shouldDiscardVisibleMessages: false)
}
guard GroupsV2MessageProcessor.canContextBeProcessedImmediately(
groupContext: groupContextV2,
tx: tx
) else {
// Some v2 group messages required group state to be
// updated before they can be processed.
return .enqueueForGroupProcessing
}
let discardMode = GroupsMessageProcessor.discardMode(
forMessageFrom: decryptedEnvelope.sourceAci,
groupContext: groupContextV2,
tx: tx
)
switch discardMode {
case .discard:
// Some v2 group messages should be discarded and not processed.
return .discard
case .doNotDiscard:
return .processNow(shouldDiscardVisibleMessages: false)
case .discardVisibleMessages:
// Some v2 group messages should be processed, but discarding any "visible"
// messages, e.g. text messages or calls.
return .processNow(shouldDiscardVisibleMessages: true)
}
}
private func processingRequest(
for decryptedEnvelope: DecryptedIncomingEnvelope,
tx: SDSAnyWriteTransaction
) -> ProcessingRequest.State {
owsPrecondition(CurrentAppContext().shouldProcessIncomingMessages)
// Pre-processing has to happen during the same transaction that performed
// decryption.
messageReceiver.preprocessEnvelope(decryptedEnvelope, tx: tx)
// If the sender is in the block list, we can skip scheduling any additional processing.
let sourceAddress = SignalServiceAddress(decryptedEnvelope.sourceAci)
if blockingManager.isAddressBlocked(sourceAddress, transaction: tx) {
Logger.info("Skipping processing for blocked envelope from \(decryptedEnvelope.sourceAci)")
return .completed(error: MessageProcessingError.blockedSender)
}
if decryptedEnvelope.localIdentity == .pni {
let identityManager = DependenciesBridge.shared.identityManager
identityManager.setShouldSharePhoneNumber(with: decryptedEnvelope.sourceAci, tx: tx.asV2Write)
}
switch processingStep(for: decryptedEnvelope, tx: tx) {
case .discard:
// Do nothing.
return .completed(error: nil)
case .enqueueForGroupProcessing:
// If we can't process the message immediately, we enqueue it for
// for processing in the same transaction within which it was decrypted
// to prevent data loss.
let envelopeData: Data
do {
envelopeData = try decryptedEnvelope.envelope.serializedData()
} catch {
owsFailDebug("failed to reserialize envelope: \(error)")
return .completed(error: error)
}
return .enqueueForGroup(decryptedEnvelope: decryptedEnvelope, envelopeData: envelopeData)
case .processNow(let shouldDiscardVisibleMessages):
// Envelopes can be processed immediately if they're:
// 1. Not a GV2 message.
// 2. A GV2 message that doesn't require updating the group.
//
// The advantage to processing the message immediately is that we can full
// process the message in the same transaction that we used to decrypt it.
// This results in a significant perf benefit verse queueing the message
// and waiting for that queue to open new transactions and process
// messages. The downside is that if we *fail* to process this message
// (e.g. the app crashed or was killed), we'll have to re-decrypt again
// before we process. This is safe since the decrypt operation would also
// be rolled back (since the transaction didn't commit) and should be rare.
messageReceiver.checkForUnknownLinkedDevice(in: decryptedEnvelope, tx: tx)
let buildResult = MessageReceiverRequest.buildRequest(
for: decryptedEnvelope,
serverDeliveryTimestamp: receivedEnvelope.serverDeliveryTimestamp,
shouldDiscardVisibleMessages: shouldDiscardVisibleMessages,
tx: tx
)
switch buildResult {
case .discard:
return .completed(error: nil)
case .noContent:
return .clearPlaceholdersOnly(decryptedEnvelope)
case .request(let messageReceiverRequest):
return .messageReceiverRequest(messageReceiverRequest)
}
}
}
}
private extension MessageProcessor {
func processingRequest(
for envelope: ReceivedEnvelope,
localIdentifiers: LocalIdentifiers,
localDeviceId: UInt32,
tx: SDSAnyWriteTransaction
) -> ProcessingRequest {
assertOnQueue(serialQueue)
let builder = ProcessingRequestBuilder(
envelope,
blockingManager: SSKEnvironment.shared.blockingManagerRef,
localDeviceId: localDeviceId,
localIdentifiers: localIdentifiers,
messageDecrypter: SSKEnvironment.shared.messageDecrypterRef,
messageReceiver: SSKEnvironment.shared.messageReceiverRef
)
return ProcessingRequest(envelope, state: builder.build(tx: tx))
}
}
// MARK: -
extension MessageProcessor: MessageProcessingPipelineStage {
public func supervisorDidResumeMessageProcessing(_ supervisor: MessagePipelineSupervisor) {
drainPendingEnvelopes()
}
}
// MARK: -
private struct ReceivedEnvelope {
enum EncryptionStatus {
case encrypted
/// Kept for historical purposes -- unused by new clients.
case decrypted(plaintextData: Data?, wasReceivedByUD: Bool)
}
let envelope: SSKProtoEnvelope
let encryptionStatus: EncryptionStatus
let serverDeliveryTimestamp: UInt64
let completion: (Error?) -> Void
enum DecryptionResult {
case serverReceipt(ServerReceiptEnvelope)
case decryptedMessage(DecryptedIncomingEnvelope)
}
func decryptIfNeeded(
messageDecrypter: OWSMessageDecrypter,
localIdentifiers: LocalIdentifiers,
localDeviceId: UInt32,
tx: SDSAnyWriteTransaction
) throws -> DecryptionResult {
// Figure out what type of envelope we're dealing with.
let validatedEnvelope = try ValidatedIncomingEnvelope(envelope, localIdentifiers: localIdentifiers)
switch encryptionStatus {
case .encrypted:
switch validatedEnvelope.kind {
case .serverReceipt:
return .serverReceipt(try ServerReceiptEnvelope(validatedEnvelope))
case .identifiedSender(let cipherType):
return .decryptedMessage(
try messageDecrypter.decryptIdentifiedEnvelope(
validatedEnvelope, cipherType: cipherType, localIdentifiers: localIdentifiers, tx: tx
)
)
case .unidentifiedSender:
return .decryptedMessage(
try messageDecrypter.decryptUnidentifiedSenderEnvelope(
validatedEnvelope, localIdentifiers: localIdentifiers, localDeviceId: localDeviceId, tx: tx
)
)
}
case .decrypted(let plaintextData, let wasReceivedByUD):
switch validatedEnvelope.kind {
case .serverReceipt:
return .serverReceipt(try ServerReceiptEnvelope(validatedEnvelope))
case .identifiedSender, .unidentifiedSender:
// In this flow, we've already decrypted the sender and added them to our
// local copy of the envelope. So we can grab the source from the envelope
// in both cases.
let (sourceAci, sourceDeviceId) = try validatedEnvelope.validateSource(Aci.self)
guard let plaintextData else {
throw OWSAssertionError("Missing plaintextData for previously-encrypted message.")
}
return .decryptedMessage(DecryptedIncomingEnvelope(
validatedEnvelope: validatedEnvelope,
updatedEnvelope: envelope,
sourceAci: sourceAci,
sourceDeviceId: sourceDeviceId,
wasReceivedByUD: wasReceivedByUD,
plaintextData: plaintextData
))
}
}
}
func isDuplicateOf(_ other: ReceivedEnvelope) -> Bool {
guard let serverGuid = self.envelope.serverGuid else {
owsFailDebug("Missing serverGuid.")
return false
}
guard let otherServerGuid = other.envelope.serverGuid else {
owsFailDebug("Missing other.serverGuid.")
return false
}
return serverGuid == otherServerGuid
}
}
// MARK: -
public enum EnvelopeSource {
case unknown
case websocketIdentified
case websocketUnidentified
case rest
// We re-decrypt incoming messages after accepting a safety number change.
case identityChangeError
case debugUI
case tests
}
// MARK: -
private class PendingEnvelopes {
private let unfairLock = UnfairLock()
private var pendingEnvelopes = [ReceivedEnvelope]()
var isEmpty: Bool {
unfairLock.withLock { pendingEnvelopes.isEmpty }
}
var count: Int {
unfairLock.withLock { pendingEnvelopes.count }
}
struct Batch {
let batchEnvelopes: [ReceivedEnvelope]
let pendingEnvelopesCount: Int
}
func nextBatch(batchSize: Int) -> Batch {
unfairLock.withLock {
Batch(
batchEnvelopes: Array(pendingEnvelopes.prefix(batchSize)),
pendingEnvelopesCount: pendingEnvelopes.count
)
}
}
func removeProcessedEnvelopes(_ processedEnvelopesCount: Int) {
unfairLock.withLock {
pendingEnvelopes.removeFirst(processedEnvelopesCount)
}
}
func enqueue(_ receivedEnvelope: ReceivedEnvelope) -> ReceivedEnvelope? {
return unfairLock.withLock { () -> ReceivedEnvelope? in
if let indexToReplace = pendingEnvelopes.firstIndex(where: { receivedEnvelope.isDuplicateOf($0) }) {
let replacedEnvelope = pendingEnvelopes[indexToReplace]
pendingEnvelopes[indexToReplace] = receivedEnvelope
return replacedEnvelope
} else {
pendingEnvelopes.append(receivedEnvelope)
return nil
}
}
}
}
// MARK: -
public enum MessageProcessingError: Error {
case wrongDestinationUuid
case invalidMessageTypeForDestinationUuid
case replacedEnvelope
case blockedSender
}