TM-SGNL-iOS/SignalServiceKit/Messages/MessageSender.swift
TeleMessage developers dde0620daf initial commit
2025-05-03 12:28:28 -07:00

1833 lines
78 KiB
Swift

//
// Copyright 2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
import LibSignalClient
// MARK: - Message "isXYZ" properties
private extension TSOutgoingMessage {
var isTransientSKDM: Bool {
(self as? OWSOutgoingSenderKeyDistributionMessage)?.isSentOnBehalfOfOnlineMessage ?? false
}
var isResendRequest: Bool {
self is OWSOutgoingResendRequest
}
var isSyncMessage: Bool { self is OWSOutgoingSyncMessage }
var canSendToLocalAddress: Bool {
return (isSyncMessage ||
self is OWSOutgoingCallMessage ||
self is OWSOutgoingResendRequest ||
self is OWSOutgoingResendResponse)
}
}
// MARK: - MessageSender
public class MessageSender {
private var preKeyManager: PreKeyManager { DependenciesBridge.shared.preKeyManager }
public init() {
SwiftSingletons.register(self)
}
private let pendingTasks = PendingTasks(label: "Message Sends")
public func pendingSendsPromise() -> Promise<Void> {
// This promise blocks on all operations already in the queue,
// but will not block on new operations added after this promise
// is created. That's intentional to ensure that NotificationService
// instances complete in a timely way.
pendingTasks.pendingTasksPromise()
}
// MARK: - Creating Signal Protocol Sessions
private func containsValidSession(for serviceId: ServiceId, deviceId: UInt32, tx: DBReadTransaction) throws -> Bool {
let sessionStore = DependenciesBridge.shared.signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore
do {
guard let session = try sessionStore.loadSession(for: serviceId, deviceId: deviceId, tx: tx) else {
return false
}
return session.hasCurrentState
} catch {
switch error {
case RecipientIdError.mustNotUsePniBecauseAciExists:
throw error
default:
return false
}
}
}
/// Establishes a session with the recipient if one doesn't already exist.
private func ensureRecipientHasSession(
recipientUniqueId: RecipientUniqueId,
serviceId: ServiceId,
deviceId: UInt32,
isOnlineMessage: Bool,
isTransientSenderKeyDistributionMessage: Bool,
isStoryMessage: Bool,
sealedSenderParameters: SealedSenderParameters?
) async throws {
let hasSession = try SSKEnvironment.shared.databaseStorageRef.read { tx in
try containsValidSession(for: serviceId, deviceId: deviceId, tx: tx.asV2Read)
}
if hasSession {
return
}
let preKeyBundle = try await makePrekeyRequest(
recipientUniqueId: recipientUniqueId,
serviceId: serviceId,
deviceId: deviceId,
isOnlineMessage: isOnlineMessage,
isTransientSenderKeyDistributionMessage: isTransientSenderKeyDistributionMessage,
isStoryMessage: isStoryMessage,
sealedSenderParameters: sealedSenderParameters
)
try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
try self.createSession(
for: preKeyBundle,
recipientUniqueId: recipientUniqueId,
serviceId: serviceId,
deviceId: deviceId,
transaction: tx
)
}
}
private func makePrekeyRequest(
recipientUniqueId: RecipientUniqueId?,
serviceId: ServiceId,
deviceId: UInt32,
isOnlineMessage: Bool,
isTransientSenderKeyDistributionMessage: Bool,
isStoryMessage: Bool,
sealedSenderParameters: SealedSenderParameters?
) async throws -> SignalServiceKit.PreKeyBundle {
Logger.info("serviceId: \(serviceId).\(deviceId)")
// As an optimization, skip the request if an error is guaranteed.
if willDefinitelyHaveUntrustedIdentityError(for: serviceId) {
Logger.info("Skipping prekey request due to untrusted identity.")
throw UntrustedIdentityError(serviceId: serviceId)
}
if let recipientUniqueId, willLikelyHaveInvalidKeySignatureError(for: recipientUniqueId) {
Logger.info("Skipping prekey request due to invalid prekey signature.")
// Check if this error is happening repeatedly for this recipientUniqueId.
// If so, return an InvalidKeySignatureError as a terminal failure.
throw InvalidKeySignatureError(serviceId: serviceId, isTerminalFailure: true)
}
if isOnlineMessage || isTransientSenderKeyDistributionMessage {
Logger.info("Skipping prekey request for transient message")
throw MessageSenderNoSessionForTransientMessageError()
}
let requestMaker = RequestMaker(
label: "Prekey Fetch",
serviceId: serviceId,
// Don't use UD for story preKey fetches, we don't have a valid UD auth key
// TODO: (PreKey Cleanup)
accessKey: isStoryMessage ? nil : sealedSenderParameters?.accessKey,
authedAccount: .implicit(),
options: []
)
do {
let result = try await requestMaker.makeRequest {
return OWSRequestFactory.recipientPreKeyRequest(serviceId: serviceId, deviceId: deviceId, auth: $0)
}
guard let responseData = result.response.responseBodyData else {
throw OWSAssertionError("Prekey fetch missing response object.")
}
guard let bundle = try? JSONDecoder().decode(SignalServiceKit.PreKeyBundle.self, from: responseData) else {
throw OWSAssertionError("Prekey fetch returned an invalid bundle.")
}
return bundle
} catch {
switch error.httpStatusCode {
case 404:
throw MessageSenderError.missingDevice
case 429:
throw MessageSenderError.prekeyRateLimit
default:
throw error
}
}
}
private func createSession(
for preKeyBundle: SignalServiceKit.PreKeyBundle,
recipientUniqueId: String,
serviceId: ServiceId,
deviceId: UInt32,
transaction: SDSAnyWriteTransaction
) throws {
assert(!Thread.isMainThread)
if try containsValidSession(for: serviceId, deviceId: deviceId, tx: transaction.asV2Write) {
Logger.warn("Session already exists for \(serviceId), deviceId: \(deviceId).")
return
}
guard let deviceBundle = preKeyBundle.devices.first(where: { $0.deviceId == deviceId }) else {
throw OWSAssertionError("Server didn't provide a bundle for the requested device.")
}
Logger.info("Creating session for \(serviceId), deviceId: \(deviceId); signed \(deviceBundle.signedPreKey.keyId), one-time \(deviceBundle.preKey?.keyId as Optional), kyber \(deviceBundle.pqPreKey?.keyId as Optional)")
let bundle: LibSignalClient.PreKeyBundle
if let preKey = deviceBundle.preKey {
if let pqPreKey = deviceBundle.pqPreKey {
bundle = try LibSignalClient.PreKeyBundle(
registrationId: deviceBundle.registrationId,
deviceId: deviceId,
prekeyId: preKey.keyId,
prekey: preKey.publicKey,
signedPrekeyId: deviceBundle.signedPreKey.keyId,
signedPrekey: deviceBundle.signedPreKey.publicKey,
signedPrekeySignature: deviceBundle.signedPreKey.signature,
identity: preKeyBundle.identityKey,
kyberPrekeyId: pqPreKey.keyId,
kyberPrekey: pqPreKey.publicKey,
kyberPrekeySignature: pqPreKey.signature
)
} else {
bundle = try LibSignalClient.PreKeyBundle(
registrationId: deviceBundle.registrationId,
deviceId: deviceId,
prekeyId: preKey.keyId,
prekey: preKey.publicKey,
signedPrekeyId: deviceBundle.signedPreKey.keyId,
signedPrekey: deviceBundle.signedPreKey.publicKey,
signedPrekeySignature: deviceBundle.signedPreKey.signature,
identity: preKeyBundle.identityKey
)
}
} else {
if let pqPreKey = deviceBundle.pqPreKey {
bundle = try LibSignalClient.PreKeyBundle(
registrationId: deviceBundle.registrationId,
deviceId: deviceId,
signedPrekeyId: deviceBundle.signedPreKey.keyId,
signedPrekey: deviceBundle.signedPreKey.publicKey,
signedPrekeySignature: deviceBundle.signedPreKey.signature,
identity: preKeyBundle.identityKey,
kyberPrekeyId: pqPreKey.keyId,
kyberPrekey: pqPreKey.publicKey,
kyberPrekeySignature: pqPreKey.signature
)
} else {
bundle = try LibSignalClient.PreKeyBundle(
registrationId: deviceBundle.registrationId,
deviceId: deviceId,
signedPrekeyId: deviceBundle.signedPreKey.keyId,
signedPrekey: deviceBundle.signedPreKey.publicKey,
signedPrekeySignature: deviceBundle.signedPreKey.signature,
identity: preKeyBundle.identityKey
)
}
}
do {
let identityManager = DependenciesBridge.shared.identityManager
let protocolAddress = ProtocolAddress(serviceId, deviceId: deviceId)
try processPreKeyBundle(
bundle,
for: protocolAddress,
sessionStore: DependenciesBridge.shared.signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore,
identityStore: identityManager.libSignalStore(for: .aci, tx: transaction.asV2Write),
context: transaction
)
} catch SignalError.untrustedIdentity(_), IdentityManagerError.identityKeyMismatchForOutgoingMessage {
Logger.warn("Found untrusted identity for \(serviceId)")
handleUntrustedIdentityKeyError(
serviceId: serviceId,
recipientUniqueId: recipientUniqueId,
preKeyBundle: preKeyBundle,
transaction: transaction
)
throw UntrustedIdentityError(serviceId: serviceId)
} catch SignalError.invalidSignature(_) {
Logger.error("Invalid key signature for \(serviceId)")
// Received this error from the server, so this could either be
// an invalid key due to a broken client, or it may be a random
// corruption in transit. Mark having encountered an error for
// this recipient so later checks can determine if this has happend
// more than once and fail early.
// The error thrown here is considered non-terminal which allows
// the request to be retried.
hadInvalidKeySignatureError(for: recipientUniqueId)
throw InvalidKeySignatureError(serviceId: serviceId, isTerminalFailure: false)
}
owsAssertDebug(try containsValidSession(for: serviceId, deviceId: deviceId, tx: transaction.asV2Write), "Couldn't create session.")
}
// MARK: - Untrusted Identities
private func handleUntrustedIdentityKeyError(
serviceId: ServiceId,
recipientUniqueId: RecipientUniqueId,
preKeyBundle: SignalServiceKit.PreKeyBundle,
transaction tx: SDSAnyWriteTransaction
) {
let identityManager = DependenciesBridge.shared.identityManager
identityManager.saveIdentityKey(preKeyBundle.identityKey, for: serviceId, tx: tx.asV2Write)
}
/// If true, we expect fetching a bundle will fail no matter what it contains.
///
/// If we're noLongerVerified, nothing we fetch can alter the state. The
/// user must manually accept the new identity key and then retry the
/// message.
///
/// If we're implicit & it's not trusted, it means it changed recently. It
/// would work if we waited a few seconds, but we want to surface the error
/// to the user.
///
/// Even though it's only a few seconds, we must not talk to the server in
/// the implicit case because there could be many messages queued up that
/// would all try to fetch their own bundle.
private func willDefinitelyHaveUntrustedIdentityError(for serviceId: ServiceId) -> Bool {
assert(!Thread.isMainThread)
// Prekey rate limits are strict. Therefore, we want to avoid requesting
// prekey bundles that can't be processed. After a prekey request, we might
// not be able to process it if the new identity key isn't trusted.
let identityManager = DependenciesBridge.shared.identityManager
return SSKEnvironment.shared.databaseStorageRef.read { tx in
return identityManager.untrustedIdentityForSending(
to: SignalServiceAddress(serviceId),
untrustedThreshold: nil,
tx: tx.asV2Read
) != nil
}
}
// MARK: - Invalid Signatures
private typealias InvalidSignatureCache = [RecipientUniqueId: InvalidSignatureCacheItem]
private struct InvalidSignatureCacheItem {
let lastErrorDate: Date
let errorCount: UInt32
}
private let invalidKeySignatureCache = AtomicValue(InvalidSignatureCache(), lock: .init())
private func hadInvalidKeySignatureError(for recipientUniqueId: RecipientUniqueId) {
invalidKeySignatureCache.update { cache in
var errorCount: UInt32 = 1
if let mostRecentError = cache[recipientUniqueId] {
errorCount = mostRecentError.errorCount + 1
}
cache[recipientUniqueId] = InvalidSignatureCacheItem(
lastErrorDate: Date(),
errorCount: errorCount
)
}
}
private func willLikelyHaveInvalidKeySignatureError(for recipientUniqueId: RecipientUniqueId) -> Bool {
assert(!Thread.isMainThread)
// Similar to untrusted identity errors, when an invalid signature for a prekey
// is encountered, it will probably be encountered for a while until the
// target client rotates prekeys and hopfully fixes the bad signature.
// To avoid running into prekey rate limits, remember when an error is
// encountered and slow down sending prekey requests for this recipient.
//
// Additionally, there is always a chance of corruption of the prekey
// bundle during data transmission, which would result in an invalid
// signature of an otherwise correct bundle. To handle this rare case,
// don't begin limiting the prekey request until after encounting the
// second bad signature for a particular recipient.
guard let mostRecentError = invalidKeySignatureCache.get()[recipientUniqueId] else {
return false
}
let staleIdentityLifetime = kMinuteInterval * 5
guard abs(mostRecentError.lastErrorDate.timeIntervalSinceNow) < staleIdentityLifetime else {
// Error has expired, remove it to reset the count
invalidKeySignatureCache.update { cache in
_ = cache.removeValue(forKey: recipientUniqueId)
}
return false
}
// Let the first error go, only skip starting on the second error
guard mostRecentError.errorCount > 1 else {
return false
}
return true
}
// MARK: - Sending Attachments
public func sendTransientContactSyncAttachment(
dataSource: DataSource,
thread: TSThread
) async throws {
let uploadResult = try await DependenciesBridge.shared.attachmentUploadManager.uploadTransientAttachment(
dataSource: dataSource
)
let message = SSKEnvironment.shared.databaseStorageRef.read { tx in
return OWSSyncContactsMessage(uploadedAttachment: uploadResult, thread: thread, tx: tx)
}
let preparedMessage = PreparedOutgoingMessage.preprepared(contactSyncMessage: message)
let result = await Result { try await sendMessage(preparedMessage) }
try result.get()
}
// MARK: - Constructing Message Sends
public func sendMessage(_ preparedOutgoingMessage: PreparedOutgoingMessage) async throws {
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
preparedOutgoingMessage.updateAllUnsentRecipientsAsSending(tx: tx)
}
Logger.info("Sending \(preparedOutgoingMessage)")
// We create a PendingTask so we can block on flushing all current message sends.
let pendingTask = pendingTasks.buildPendingTask(label: "Message Send")
defer { pendingTask.complete() }
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
let uploadOperations = SSKEnvironment.shared.databaseStorageRef.read { tx in
preparedOutgoingMessage.attachmentUploadOperations(tx: tx)
}
for uploadOperation in uploadOperations {
taskGroup.addTask {
try await Upload.uploadQueue.run(uploadOperation)
}
}
try await taskGroup.waitForAll()
}
try await preparedOutgoingMessage.send(self.sendPreparedMessage(_:))
}
private func waitForPreKeyRotationIfNeeded() async throws {
while let taskToWaitFor = preKeyRotationTaskIfNeeded() {
try await taskToWaitFor.value
}
}
private let pendingPreKeyRotation = AtomicValue<Task<Void, Error>?>(nil, lock: .init())
private func preKeyRotationTaskIfNeeded() -> Task<Void, Error>? {
return pendingPreKeyRotation.map { existingTask in
if let existingTask {
return existingTask
}
let shouldRunPreKeyRotation = SSKEnvironment.shared.databaseStorageRef.read { tx in
preKeyManager.isAppLockedDueToPreKeyUpdateFailures(tx: tx.asV2Read)
}
if shouldRunPreKeyRotation {
Logger.info("Rotating signed pre-key before sending message.")
// Retry prekey update every time user tries to send a message while app is
// disabled due to prekey update failures.
//
// Only try to update the signed prekey; updating it is sufficient to
// re-enable message sending.
return Task {
try await self.preKeyManager.rotateSignedPreKeys().value
self.pendingPreKeyRotation.set(nil)
}
}
return nil
}
}
// Mark skipped recipients as such. We may skip because:
//
// * A recipient is no longer in the group.
// * A recipient is blocked.
// * A recipient is unregistered.
// * A recipient does not have the required capability.
private func markSkippedRecipients(
of message: TSOutgoingMessage,
sendingRecipients: [ServiceId],
tx: SDSAnyWriteTransaction
) {
let skippedRecipients = Set(message.sendingRecipientAddresses())
.subtracting(sendingRecipients.lazy.map { SignalServiceAddress($0) })
for address in skippedRecipients {
// Mark this recipient as "skipped".
message.updateWithSkippedRecipient(address, transaction: tx)
}
}
private func unsentRecipients(
of message: TSOutgoingMessage,
in thread: TSThread,
localIdentifiers: LocalIdentifiers,
tx: SDSAnyReadTransaction
) throws -> [SignalServiceAddress] {
if message.isSyncMessage {
return [localIdentifiers.aciAddress]
}
if let groupThread = thread as? TSGroupThread {
// Send to the intersection of:
//
// * "sending" recipients of the message.
// * members of the group.
//
// I.e. try to send a message IFF:
//
// * The recipient was in the group when the message was first tried to be sent.
// * The recipient is still in the group.
// * The recipient is in the "sending" state.
var recipientAddresses = Set<SignalServiceAddress>()
recipientAddresses.formUnion(message.sendingRecipientAddresses())
// Only send to members in the latest known group member list.
// If a member has left the group since this message was enqueued,
// they should not receive the message.
let groupMembership = groupThread.groupModel.groupMembership
var currentValidRecipients = groupMembership.fullMembers
// ...or latest known list of "additional recipients".
//
// This is used to send group update messages for v2 groups to
// pending members who are not included in .sendingRecipientAddresses().
if GroupManager.shouldMessageHaveAdditionalRecipients(message, groupThread: groupThread) {
currentValidRecipients.formUnion(groupMembership.invitedMembers)
}
currentValidRecipients.remove(localIdentifiers.aciAddress)
recipientAddresses.formIntersection(currentValidRecipients)
let blockedAddresses = SSKEnvironment.shared.blockingManagerRef.blockedAddresses(transaction: tx)
recipientAddresses.subtract(blockedAddresses)
return Array(recipientAddresses)
} else if let contactAddress = (thread as? TSContactThread)?.contactAddress {
// Treat 1:1 sends to blocked contacts as failures.
// If we block a user, don't send 1:1 messages to them. The UI
// should prevent this from occurring, but in some edge cases
// you might, for example, have a pending outgoing message when
// you block them.
if SSKEnvironment.shared.blockingManagerRef.isAddressBlocked(contactAddress, transaction: tx) {
Logger.info("Skipping 1:1 send to blocked contact: \(contactAddress).")
throw MessageSenderError.blockedContactRecipient
} else {
return [contactAddress]
}
} else {
// Send to the intersection of:
//
// * "sending" recipients of the message.
// * recipients of the thread
//
// I.e. try to send a message IFF:
//
// * The recipient was part of the thread when the message was first tried to be sent.
// * The recipient is still part of the thread.
// * The recipient is in the "sending" state.
var recipientAddresses = Set(message.sendingRecipientAddresses())
// Only send to members in the latest known thread recipients list.
let currentValidThreadRecipients = thread.recipientAddresses(with: tx)
recipientAddresses.formIntersection(currentValidThreadRecipients)
let blockedAddresses = SSKEnvironment.shared.blockingManagerRef.blockedAddresses(transaction: tx)
recipientAddresses.subtract(blockedAddresses)
if recipientAddresses.contains(localIdentifiers.aciAddress) {
owsFailDebug("Message send recipients should not include self.")
}
return Array(recipientAddresses)
}
}
private static func partitionAddresses(_ addresses: [SignalServiceAddress]) -> ([ServiceId], [E164]) {
var serviceIds = [ServiceId]()
var phoneNumbers = [E164]()
for address in addresses {
if let serviceId = address.serviceId {
serviceIds.append(serviceId)
} else if let phoneNumber = address.e164 {
phoneNumbers.append(phoneNumber)
} else {
owsFailDebug("Recipient has neither ServiceId nor E164.")
}
}
return (serviceIds, phoneNumbers)
}
private func lookUpPhoneNumbers(_ phoneNumbers: [E164]) async throws {
_ = try await SSKEnvironment.shared.contactDiscoveryManagerRef.lookUp(
phoneNumbers: Set(phoneNumbers.lazy.map { $0.stringValue }),
mode: .outgoingMessage
)
}
private func areAttachmentsUploadedWithSneakyTransaction(for message: TSOutgoingMessage) -> Bool {
if message.shouldBeSaved == false {
// Unsaved attachments come in two types:
// * no attachments
// * contact sync, already-uploaded attachment required on init
// So checking for upload state for unsaved attachments is pointless
// (and will, in fact, fail, because of foreign key constraints).
return true
}
return SSKEnvironment.shared.databaseStorageRef.read { tx in
for attachment in message.allAttachments(transaction: tx) {
guard attachment.isUploadedToTransitTier else {
return false
}
}
return true
}
}
private func sendPreparedMessage(_ message: TSOutgoingMessage) async throws {
if !areAttachmentsUploadedWithSneakyTransaction(for: message) {
throw OWSUnretryableMessageSenderError()
}
if DependenciesBridge.shared.appExpiry.isExpired {
throw AppExpiredError()
}
if DependenciesBridge.shared.tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered.negated {
throw AppDeregisteredError()
}
if message.shouldBeSaved {
let latestCopy = SSKEnvironment.shared.databaseStorageRef.read { tx in
TSInteraction.anyFetch(uniqueId: message.uniqueId, transaction: tx) as? TSOutgoingMessage
}
guard let latestCopy, latestCopy.wasRemotelyDeleted.negated else {
throw MessageDeletedBeforeSentError()
}
}
if DebugFlags.messageSendsFail.get() {
throw OWSUnretryableMessageSenderError()
}
do {
try await waitForPreKeyRotationIfNeeded()
let senderCertificates = try await SSKEnvironment.shared.udManagerRef.fetchSenderCertificates(certificateExpirationPolicy: .permissive)
try await sendPreparedMessage(
message,
recoveryState: OuterRecoveryState(),
senderCertificates: senderCertificates
)
} catch {
if message.wasSentToAnyRecipient {
// Always ignore the sync error...
try? await handleMessageSentLocally(message)
}
// ...so that we can throw the original error for the caller. (Note that we
// throw this error even if the sync message is sent successfully.)
throw error
}
try await handleMessageSentLocally(message)
}
private enum SendMessageNextAction {
/// Look up missing phone numbers & then try sending again.
case lookUpPhoneNumbersAndTryAgain([E164])
/// Perform the `sendPreparedMessage` step.
case sendPreparedMessage(PreparedState)
struct PreparedState {
let serializedMessage: SerializedMessage
let thread: TSThread
let fanoutRecipients: [ServiceId]
let sendViaSenderKey: (@Sendable () async -> [(ServiceId, any Error)])?
let senderCertificate: SenderCertificate
let udAccess: [ServiceId: OWSUDAccess]
let localIdentifiers: LocalIdentifiers
}
}
/// Certain errors are "correctable" and result in immediate retries. For
/// example, if there's a newly-added device, we should encrypt the message
/// for that device and try to send it immediately. However, some of these
/// errors can *theoretically* happen ad nauseam (but they shouldn't). To
/// avoid tight retry loops, we handle them immediately just once and then
/// use the standard retry logic if they happen repeatedly.
private struct OuterRecoveryState {
var canLookUpPhoneNumbers = true
// Sender key sends will fail if a single recipient has an invalid access
// token, but the server can't identify the recipient for us. To recover,
// fall back to a fanout; this will fail only for the affect recipient.
var canUseMultiRecipientSealedSender = true
var canHandleMultiRecipientMismatchedDevices = true
var canHandleMultiRecipientStaleDevices = true
func mutated(_ block: (inout Self) -> Void) -> Self {
var mutableSelf = self
block(&mutableSelf)
return mutableSelf
}
}
private func sendPreparedMessage(
_ message: TSOutgoingMessage,
recoveryState: OuterRecoveryState,
senderCertificates: SenderCertificates
) async throws {
let nextAction: SendMessageNextAction? = try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
guard let thread = message.thread(tx: tx) else {
throw MessageSenderError.threadMissing
}
let canSendToThread: Bool = {
if message is OWSOutgoingReactionMessage {
return thread.canSendReactionToThread
}
let isChatMessage = (
(
message.shouldBeSaved
&& message.insertedMessageHasRenderableContent(rowId: message.sqliteRowId!, tx: tx)
)
|| message is OutgoingGroupCallUpdateMessage
|| message is OWSOutgoingCallMessage
)
return isChatMessage ? thread.canSendChatMessagesToThread() : thread.canSendNonChatMessagesToThread
}()
guard canSendToThread else {
if message.shouldBeSaved {
throw OWSAssertionError("Sending to thread blocked.")
}
// Pretend to succeed for non-visible messages like read receipts, etc.
return nil
}
let tsAccountManager = DependenciesBridge.shared.tsAccountManager
guard let localIdentifiers = tsAccountManager.localIdentifiers(tx: tx.asV2Read) else {
throw OWSAssertionError("Not registered.")
}
let proposedAddresses = try self.unsentRecipients(of: message, in: thread, localIdentifiers: localIdentifiers, tx: tx)
let (serviceIds, phoneNumbersToFetch) = Self.partitionAddresses(proposedAddresses)
// If we haven't yet tried to look up phone numbers, send an asynchronous
// request to look up phone numbers, and then try to go through this logic
// *again* in a new transaction. Things may change for that subsequent
// attempt, and if there's still missing phone numbers at that point, we'll
// skip them for this message.
if recoveryState.canLookUpPhoneNumbers, !phoneNumbersToFetch.isEmpty {
return .lookUpPhoneNumbersAndTryAgain(phoneNumbersToFetch)
}
self.markSkippedRecipients(of: message, sendingRecipients: serviceIds, tx: tx)
if let contactThread = thread as? TSContactThread {
// In the "self-send" aka "Note to Self" special case, we only need to send
// certain kinds of messages. (In particular, regular data messages are
// sent via their implicit sync message only.)
if contactThread.contactAddress.isLocalAddress, !message.canSendToLocalAddress {
owsAssertDebug(serviceIds.count == 1)
Logger.info("Dropping \(type(of: message)) sent to local address (it should be sent by sync message)")
// Don't mark self-sent messages as read (or sent) until the sync transcript is sent.
return nil
}
}
if serviceIds.isEmpty {
// All recipients are already sent or can be skipped. NOTE: We might still
// need to send a sync transcript.
return nil
}
guard let serializedMessage = self.buildAndRecordMessage(message, in: thread, tx: tx) else {
throw OWSAssertionError("Couldn't build message.")
}
let senderCertificate: SenderCertificate = {
switch SSKEnvironment.shared.udManagerRef.phoneNumberSharingMode(tx: tx.asV2Read).orDefault {
case .everybody:
return senderCertificates.defaultCert
case .nobody:
return senderCertificates.uuidOnlyCert
}
}()
let udAccessMap = self.fetchSealedSenderAccess(
for: serviceIds,
message: message,
senderCertificate: senderCertificate,
localIdentifiers: localIdentifiers,
tx: tx
)
let senderKeyRecipients: [ServiceId]
let sendViaSenderKey: (@Sendable () async -> [(ServiceId, any Error)])?
if recoveryState.canUseMultiRecipientSealedSender, thread.usesSenderKey {
(senderKeyRecipients, sendViaSenderKey) = self.prepareSenderKeyMessageSend(
for: serviceIds,
in: thread,
message: message,
serializedMessage: serializedMessage,
udAccessMap: udAccessMap,
senderCertificate: senderCertificate,
localIdentifiers: localIdentifiers,
tx: tx
)
} else {
senderKeyRecipients = []
sendViaSenderKey = nil
}
return .sendPreparedMessage(SendMessageNextAction.PreparedState(
serializedMessage: serializedMessage,
thread: thread,
fanoutRecipients: Array(Set(serviceIds).subtracting(senderKeyRecipients)),
sendViaSenderKey: sendViaSenderKey,
senderCertificate: senderCertificate,
udAccess: udAccessMap,
localIdentifiers: localIdentifiers
))
}
let retryRecoveryState: OuterRecoveryState
switch nextAction {
case .none:
return
case .lookUpPhoneNumbersAndTryAgain(let phoneNumbers):
try await lookUpPhoneNumbers(phoneNumbers)
retryRecoveryState = recoveryState.mutated({ $0.canLookUpPhoneNumbers = false })
case .sendPreparedMessage(let state):
let perRecipientErrors = await sendPreparedMessage(
message: message,
serializedMessage: state.serializedMessage,
in: state.thread,
viaFanoutTo: state.fanoutRecipients,
viaSenderKey: state.sendViaSenderKey,
senderCertificate: state.senderCertificate,
udAccess: state.udAccess,
localIdentifiers: state.localIdentifiers
)
let recipientErrors = MessageSenderRecipientErrors(recipientErrors: perRecipientErrors)
if recipientErrors.containsAny(of: .invalidAuthHeader, .invalidRecipient) {
retryRecoveryState = recoveryState.mutated({ $0.canUseMultiRecipientSealedSender = false })
break
}
if recoveryState.canHandleMultiRecipientMismatchedDevices, recipientErrors.containsAny(of: .deviceUpdate) {
retryRecoveryState = recoveryState.mutated({ $0.canHandleMultiRecipientMismatchedDevices = false })
break
}
if recoveryState.canHandleMultiRecipientStaleDevices, recipientErrors.containsAny(of: .staleDevices) {
retryRecoveryState = recoveryState.mutated({ $0.canHandleMultiRecipientStaleDevices = false })
break
}
if !perRecipientErrors.isEmpty {
try await handleSendFailure(message: message, thread: state.thread, perRecipientErrors: perRecipientErrors)
}
return
}
try await sendPreparedMessage(
message,
recoveryState: retryRecoveryState,
senderCertificates: senderCertificates
)
}
private func sendPreparedMessage(
message: TSOutgoingMessage,
serializedMessage: SerializedMessage,
in thread: TSThread,
viaFanoutTo fanoutRecipients: [ServiceId],
viaSenderKey sendViaSenderKey: (@Sendable () async -> [(ServiceId, any Error)])?,
senderCertificate: SenderCertificate,
udAccess sendingAccessMap: [ServiceId: OWSUDAccess],
localIdentifiers: LocalIdentifiers
) async -> [(ServiceId, any Error)] {
// Both types are Arrays because Sender Key Tasks may return N errors when
// sending to N participants. (Fanout Tasks always send to one recipient
// and will therefore return either no error or exactly one error.)
return await withTaskGroup(
of: [(ServiceId, any Error)].self,
returning: [(ServiceId, any Error)].self
) { taskGroup in
if let sendViaSenderKey {
taskGroup.addTask(operation: sendViaSenderKey)
}
// Perform an "OWSMessageSend" for each non-senderKey recipient.
for serviceId in fanoutRecipients {
let messageSend = OWSMessageSend(
message: message,
plaintextContent: serializedMessage.plaintextData,
plaintextPayloadId: serializedMessage.payloadId,
thread: thread,
serviceId: serviceId,
localIdentifiers: localIdentifiers
)
let sealedSenderParameters = SealedSenderParameters(
message: message,
senderCertificate: senderCertificate,
accessKey: sendingAccessMap[serviceId]
)
taskGroup.addTask {
do {
try await self.performMessageSend(messageSend, sealedSenderParameters: sealedSenderParameters)
return []
} catch {
return [(messageSend.serviceId, error)]
}
}
}
return await taskGroup.reduce(into: [], { $0.append(contentsOf: $1) })
}
}
private func fetchSealedSenderAccess(
for serviceIds: [ServiceId],
message: TSOutgoingMessage,
senderCertificate: SenderCertificate,
localIdentifiers: LocalIdentifiers,
tx: SDSAnyReadTransaction
) -> [ServiceId: OWSUDAccess] {
if DebugFlags.disableUD.get() {
return [:]
}
var result = [ServiceId: OWSUDAccess]()
for serviceId in serviceIds {
if localIdentifiers.contains(serviceId: serviceId) {
continue
}
result[serviceId] = (
message.isStorySend ? SSKEnvironment.shared.udManagerRef.storyUdAccess() : SSKEnvironment.shared.udManagerRef.udAccess(for: serviceId, tx: tx)
)
}
return result
}
private func handleSendFailure(
message: TSOutgoingMessage,
thread: TSThread,
perRecipientErrors allErrors: [(serviceId: ServiceId, error: any Error)]
) async throws {
// Some errors should be ignored when sending messages to non 1:1 threads.
// See discussion on NSError (MessageSender) category.
let shouldIgnoreError = { (error: Error) -> Bool in
return !(thread is TSContactThread) && error.shouldBeIgnoredForNonContactThreads
}
let filteredErrors = allErrors.lazy.filter { !shouldIgnoreError($0.error) }
// If we only received errors that we should ignore, consider this send a
// success, unless the message could not be sent to any recipient.
guard let anyError = filteredErrors.first?.error else {
if message.sentRecipientAddresses().count == 0 {
throw MessageSenderErrorNoValidRecipients()
}
return
}
// Record the individual error for each "failed" recipient.
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
message.updateWithFailedRecipients(filteredErrors, tx: tx)
self.normalizeRecipientStatesIfNeeded(message: message, recipientErrors: filteredErrors, tx: tx)
}
// Some errors should never be retried, in order to avoid hitting rate
// limits, for example. Unfortunately, since group send retry is
// all-or-nothing, we need to fail immediately even if some of the other
// recipients had retryable errors.
if let fatalError = filteredErrors.map({ $0.error }).first(where: { $0.isFatalError }) {
throw fatalError
}
// If any of the send errors are retryable, we want to retry. Therefore,
// prefer to propagate a retryable error.
if let retryableError = filteredErrors.map({ $0.error }).first(where: { $0.isRetryable }) {
throw retryableError
}
// Otherwise, if we have any error at all, propagate it.
throw anyError
}
private func normalizeRecipientStatesIfNeeded(
message: TSOutgoingMessage,
recipientErrors: some Sequence<(serviceId: ServiceId, error: Error)>,
tx: SDSAnyWriteTransaction
) {
guard recipientErrors.contains(where: {
switch $0.error {
case RecipientIdError.mustNotUsePniBecauseAciExists:
return true
default:
return false
}
}) else {
return
}
let recipientStateMerger = RecipientStateMerger(
recipientDatabaseTable: DependenciesBridge.shared.recipientDatabaseTable,
signalServiceAddressCache: SSKEnvironment.shared.signalServiceAddressCacheRef
)
message.anyUpdateOutgoingMessage(transaction: tx) { message in
recipientStateMerger.normalize(&message.recipientAddressStates, tx: tx.asV2Read)
}
}
/// Sending a reply to a hidden recipient unhides them. But how we
/// define "reply" is not inclusive of all outgoing messages. We unhide
/// when the message indicates the user's intent to resume association
/// with the hidden recipient.
///
/// It is important to be conservative about which messages unhide a
/// recipient. It is far better to not unhide when should than to
/// unhide when we should not.
private func shouldMessageSendUnhideRecipient(_ message: TSOutgoingMessage, tx: SDSAnyReadTransaction) -> Bool {
if
message.shouldBeSaved,
let rowId = message.sqliteRowId,
// Its a persisted message; check if its renderable
message.insertedMessageHasRenderableContent(rowId: rowId, tx: tx)
{
return true
}
if message is OWSOutgoingReactionMessage {
return true
}
if
let message = message as? OWSOutgoingCallMessage,
/// OWSOutgoingCallMessages include not only calling
/// someone (ie, an "offer message"), but also sending
/// hangup messages, busy messages, and other kinds of
/// call-related "messages" that do not indicate the
/// sender's intent to resume association with a recipient.
message.offerMessage != nil
{
return true
}
return false
}
private func handleMessageSentLocally(_ message: TSOutgoingMessage) async throws {
try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
if
let thread = message.thread(tx: tx) as? TSContactThread,
self.shouldMessageSendUnhideRecipient(message, tx: tx),
let localAddress = DependenciesBridge.shared.tsAccountManager.localIdentifiers(tx: tx.asV2Read)?.aciAddress,
!localAddress.isEqualToAddress(thread.contactAddress)
{
try DependenciesBridge.shared.recipientHidingManager.removeHiddenRecipient(
thread.contactAddress,
wasLocallyInitiated: true,
tx: tx.asV2Write
)
}
if message.shouldBeSaved {
let latestInteraction = TSInteraction.anyFetch(uniqueId: message.uniqueId, transaction: tx)
guard let latestMessage = latestInteraction as? TSOutgoingMessage else {
Logger.warn("Could not update expiration for deleted message.")
return
}
ViewOnceMessages.completeIfNecessary(message: latestMessage, transaction: tx)
}
}
try await sendSyncTranscriptIfNeeded(for: message)
// Don't mark self-sent messages as read (or sent) until the sync
// transcript is sent.
//
// NOTE: This only applies to the 'note to self' conversation.
if message.isSyncMessage {
return
}
let thread = SSKEnvironment.shared.databaseStorageRef.read { tx in message.thread(tx: tx) }
guard let contactThread = thread as? TSContactThread, contactThread.contactAddress.isLocalAddress else {
return
}
owsAssertDebug(message.recipientAddresses().count == 1)
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
let deviceId = DependenciesBridge.shared.tsAccountManager.storedDeviceId(tx: tx.asV2Read)
for sendingAddress in message.sendingRecipientAddresses() {
message.update(
withReadRecipient: sendingAddress,
deviceId: deviceId,
readTimestamp: message.timestamp,
tx: tx
)
if message.isVoiceMessage || message.isViewOnceMessage {
message.update(
withViewedRecipient: sendingAddress,
deviceId: deviceId,
viewedTimestamp: message.timestamp,
tx: tx
)
}
}
}
}
private func sendSyncTranscriptIfNeeded(for message: TSOutgoingMessage) async throws {
guard message.shouldSyncTranscript() else {
return
}
try await message.sendSyncTranscript()
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
message.update(withHasSyncedTranscript: true, transaction: tx)
}
}
// MARK: - Performing Message Sends
struct SerializedMessage {
let plaintextData: Data
let payloadId: Int64?
}
func buildAndRecordMessage(
_ message: TSOutgoingMessage,
in thread: TSThread,
tx: SDSAnyWriteTransaction
) -> SerializedMessage? {
guard let plaintextData = message.buildPlainTextData(thread, transaction: tx) else {
return nil
}
let messageSendLog = SSKEnvironment.shared.messageSendLogRef
let payloadId = messageSendLog.recordPayload(plaintextData, for: message, tx: tx)
return SerializedMessage(plaintextData: plaintextData, payloadId: payloadId)
}
private struct InnerRecoveryState {
var canHandleMismatchedDevices = true
var canHandleStaleDevices = true
var canHandleCaptcha = true
func mutated(_ block: (inout Self) -> Void) -> Self {
var mutableSelf = self
block(&mutableSelf)
return mutableSelf
}
}
@discardableResult
func performMessageSend(
_ messageSend: OWSMessageSend,
sealedSenderParameters: SealedSenderParameters?
) async throws -> [SentDeviceMessage] {
return try await performMessageSendAttempt(messageSend, recoveryState: InnerRecoveryState(), sealedSenderParameters: sealedSenderParameters)
}
private func performMessageSendAttempt(
_ messageSend: OWSMessageSend,
recoveryState: InnerRecoveryState,
sealedSenderParameters: SealedSenderParameters?
) async throws -> [SentDeviceMessage] {
let message = messageSend.message
let serviceId = messageSend.serviceId
Logger.info("Sending message: \(type(of: message)); timestamp: \(message.timestamp); serviceId: \(serviceId)")
let retryRecoveryState: InnerRecoveryState
do {
let deviceMessages = try await buildDeviceMessages(
messageSend: messageSend,
sealedSenderParameters: sealedSenderParameters
)
if shouldSkipMessageSend(messageSend, deviceMessages: deviceMessages) {
// This emulates the completion logic of an actual successful send (see below).
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
message.updateWithSkippedRecipient(messageSend.localIdentifiers.aciAddress, transaction: tx)
}
return []
}
for deviceMessage in deviceMessages {
let hasValidMessageType: Bool = {
switch deviceMessage.type {
case .unidentifiedSender:
return sealedSenderParameters != nil
case .ciphertext, .prekeyBundle, .plaintextContent:
return sealedSenderParameters == nil
case .unknown, .keyExchange, .receipt, .senderkeyMessage:
return false
}
}()
guard hasValidMessageType else {
owsFailDebug("Invalid message type: \(deviceMessage.type)")
throw OWSUnretryableMessageSenderError()
}
}
return try await sendDeviceMessages(
deviceMessages,
messageSend: messageSend,
sealedSenderParameters: sealedSenderParameters
)
} catch RequestMakerUDAuthError.udAuthFailure {
owsPrecondition(sealedSenderParameters != nil)
// This failure can happen on pre key fetches or message sends.
return try await performMessageSendAttempt(
messageSend,
recoveryState: recoveryState,
sealedSenderParameters: nil // Retry as an unsealed send.
)
} catch DeviceMessagesError.mismatchedDevices where recoveryState.canHandleMismatchedDevices {
retryRecoveryState = recoveryState.mutated({ $0.canHandleMismatchedDevices = false })
} catch DeviceMessagesError.staleDevices where recoveryState.canHandleStaleDevices {
retryRecoveryState = recoveryState.mutated({ $0.canHandleStaleDevices = false })
} catch where error.httpStatusCode == 428 && recoveryState.canHandleCaptcha {
retryRecoveryState = recoveryState.mutated({ $0.canHandleCaptcha = false })
}
return try await performMessageSendAttempt(
messageSend,
recoveryState: retryRecoveryState,
sealedSenderParameters: sealedSenderParameters
)
}
/// We can skip sending sync messages if we know that we have no linked
/// devices. However, we need to be sure to handle the case where the linked
/// device list has just changed.
///
/// The linked device list is reflected in two separate pieces of state:
///
/// * OWSDevice's state is updated when you link or unlink a device.
/// * SignalRecipient's state is updated by 409 "Mismatched devices"
/// responses from the service.
///
/// If _both_ of these pieces of state agree that there are no linked
/// devices, then can safely skip sending sync message.
private func shouldSkipMessageSend(_ messageSend: OWSMessageSend, deviceMessages: [DeviceMessage]) -> Bool {
guard messageSend.localIdentifiers.contains(serviceId: messageSend.serviceId) else {
return false
}
owsAssertDebug(messageSend.message.canSendToLocalAddress)
let hasMessageForLinkedDevice = deviceMessages.contains(where: {
$0.destinationDeviceId != DependenciesBridge.shared.tsAccountManager.storedDeviceIdWithMaybeTransaction
})
if hasMessageForLinkedDevice {
return false
}
let mightHaveUnknownLinkedDevice = SSKEnvironment.shared.databaseStorageRef.read { tx in
DependenciesBridge.shared.deviceManager.mightHaveUnknownLinkedDevice(transaction: tx.asV2Read)
}
if mightHaveUnknownLinkedDevice {
// We may have just linked a new secondary device which is not yet
// reflected in the SignalRecipient that corresponds to ourself. Continue
// sending, where we expect to learn about new devices via a 409 response.
return false
}
return true
}
private func buildDeviceMessages(
messageSend: OWSMessageSend,
sealedSenderParameters: SealedSenderParameters?
) async throws -> [DeviceMessage] {
let recipientDatabaseTable = DependenciesBridge.shared.recipientDatabaseTable
let recipient = SSKEnvironment.shared.databaseStorageRef.read { tx in
return recipientDatabaseTable.fetchRecipient(serviceId: messageSend.serviceId, transaction: tx.asV2Read)
}
// If we think the recipient isn't registered, don't build any device
// messages. Instead, send an empty message to the server to learn if the
// account has any devices.
guard let recipient, recipient.isRegistered else {
return []
}
var recipientDeviceIds = recipient.deviceIds
if messageSend.localIdentifiers.contains(serviceId: messageSend.serviceId) {
let localDeviceId = DependenciesBridge.shared.tsAccountManager.storedDeviceIdWithMaybeTransaction
recipientDeviceIds.removeAll(where: { $0 == localDeviceId })
}
var results = [DeviceMessage]()
for deviceId in recipientDeviceIds {
let deviceMessage = try await buildDeviceMessage(
messagePlaintextContent: messageSend.plaintextContent,
messageEncryptionStyle: messageSend.message.encryptionStyle,
recipientUniqueId: recipient.uniqueId,
serviceId: messageSend.serviceId,
deviceId: deviceId,
isOnlineMessage: messageSend.message.isOnline,
isTransientSenderKeyDistributionMessage: messageSend.message.isTransientSKDM,
isStoryMessage: messageSend.message.isStorySend,
isResendRequestMessage: messageSend.message.isResendRequest,
sealedSenderParameters: sealedSenderParameters
)
if let deviceMessage {
results.append(deviceMessage)
}
}
return results
}
/// Build a ``DeviceMessage`` for the given parameters describing a message.
///
/// A `nil` return value indicates that the given message could not be built
/// due to an invalid device ID.
func buildDeviceMessage(
messagePlaintextContent: Data,
messageEncryptionStyle: EncryptionStyle,
recipientUniqueId: RecipientUniqueId,
serviceId: ServiceId,
deviceId: UInt32,
isOnlineMessage: Bool,
isTransientSenderKeyDistributionMessage: Bool,
isStoryMessage: Bool,
isResendRequestMessage: Bool,
sealedSenderParameters: SealedSenderParameters?
) async throws -> DeviceMessage? {
AssertNotOnMainThread()
do {
try await ensureRecipientHasSession(
recipientUniqueId: recipientUniqueId,
serviceId: serviceId,
deviceId: deviceId,
isOnlineMessage: isOnlineMessage,
isTransientSenderKeyDistributionMessage: isTransientSenderKeyDistributionMessage,
isStoryMessage: isStoryMessage,
sealedSenderParameters: sealedSenderParameters
)
} catch let error {
switch error {
case MessageSenderError.missingDevice:
// If we have an invalid device exception, remove this device from the
// recipient and suppress the error.
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
self.updateDevices(
serviceId: serviceId,
devicesToAdd: [],
devicesToRemove: [deviceId],
transaction: tx
)
}
return nil
case is MessageSenderNoSessionForTransientMessageError:
// When users re-register, we don't want transient messages (like typing
// indicators) to cause users to hit the prekey fetch rate limit. So we
// silently discard these message if there is no pre-existing session for
// the recipient.
throw error
case is UntrustedIdentityError:
// This *can* happen under normal usage, but it should happen relatively
// rarely. We expect it to happen whenever Bob reinstalls, and Alice
// messages Bob before she can pull down his latest identity. If it's
// happening a lot, we should rethink our profile fetching strategy.
throw error
case is InvalidKeySignatureError:
// This should never happen unless a broken client is uploading invalid
// keys. The server should now enforce valid signatures on upload,
// resulting in this become exceedingly rare as time goes by.
throw error
case MessageSenderError.prekeyRateLimit:
throw SignalServiceRateLimitedError()
case is SpamChallengeRequiredError, is SpamChallengeResolvedError:
throw error
case RecipientIdError.mustNotUsePniBecauseAciExists:
throw error
case RequestMakerUDAuthError.udAuthFailure:
throw error
default:
owsAssertDebug(error.isNetworkFailureOrTimeout)
throw OWSRetryableMessageSenderError()
}
}
return try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
do {
switch messageEncryptionStyle {
case .whisper:
return try self.encryptMessage(
plaintextContent: messagePlaintextContent,
serviceId: serviceId,
deviceId: deviceId,
sealedSenderParameters: sealedSenderParameters,
transaction: tx
)
case .plaintext:
return try self.wrapPlaintextMessage(
plaintextContent: messagePlaintextContent,
serviceId: serviceId,
deviceId: deviceId,
isResendRequestMessage: isResendRequestMessage,
sealedSenderParameters: sealedSenderParameters,
transaction: tx
)
@unknown default:
throw OWSAssertionError("Unrecognized encryption style")
}
} catch IdentityManagerError.identityKeyMismatchForOutgoingMessage {
Logger.warn("Found identity key mismatch on outgoing message to \(serviceId).\(deviceId). Archiving session before retrying...")
let signalProtocolStoreManager = DependenciesBridge.shared.signalProtocolStoreManager
let aciSessionStore = signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore
aciSessionStore.archiveSession(for: serviceId, deviceId: deviceId, tx: tx.asV2Write)
throw OWSRetryableMessageSenderError()
} catch SignalError.untrustedIdentity {
Logger.warn("Found untrusted identity on outgoing message to \(serviceId). Wrapping error and throwing...")
throw UntrustedIdentityError(serviceId: serviceId)
} catch {
Logger.warn("Failed to encrypt message \(error)")
throw error
}
}
}
private enum DeviceMessagesError: Error, IsRetryableProvider {
case mismatchedDevices
case staleDevices
var isRetryableProvider: Bool { true }
}
private func sendDeviceMessages(
_ deviceMessages: [DeviceMessage],
messageSend: OWSMessageSend,
sealedSenderParameters: SealedSenderParameters?
) async throws -> [SentDeviceMessage] {
let message: TSOutgoingMessage = messageSend.message
let requestMaker = RequestMaker(
label: "Message Send",
serviceId: messageSend.serviceId,
accessKey: sealedSenderParameters?.accessKey,
authedAccount: .implicit(),
options: []
)
do {
let result = try await requestMaker.makeRequest {
return OWSRequestFactory.submitMessageRequest(
serviceId: messageSend.serviceId,
messages: deviceMessages,
timestamp: message.timestamp,
isOnline: message.isOnline,
isUrgent: message.isUrgent,
isStory: message.isStorySend,
auth: $0
)
}
return await messageSendDidSucceed(
messageSend,
deviceMessages: deviceMessages,
wasSentByUD: result.wasSentByUD
)
} catch {
return try await messageSendDidFail(
messageSend,
responseError: error,
sealedSenderParameters: sealedSenderParameters
)
}
}
private func messageSendDidSucceed(
_ messageSend: OWSMessageSend,
deviceMessages: [DeviceMessage],
wasSentByUD: Bool
) async -> [SentDeviceMessage] {
let message: TSOutgoingMessage = messageSend.message
Logger.info("Successfully sent message: \(type(of: message)), serviceId: \(messageSend.serviceId), timestamp: \(message.timestamp), wasSentByUD: \(wasSentByUD)")
let sentDeviceMessages = deviceMessages.map {
return SentDeviceMessage(
destinationDeviceId: $0.destinationDeviceId,
destinationRegistrationId: $0.destinationRegistrationId
)
}
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
if deviceMessages.isEmpty, messageSend.localIdentifiers.contains(serviceId: messageSend.serviceId) {
// Since we know we have no linked devices, we can record that
// fact to later avoid unnecessary sync message sends unless we
// later learn of a new linked device.
Logger.info("Sent a message with no device messages. Recording no linked devices.")
DependenciesBridge.shared.deviceManager.setMightHaveUnknownLinkedDevice(
false,
transaction: transaction.asV2Write
)
}
deviceMessages.forEach { deviceMessage in
if let payloadId = messageSend.plaintextPayloadId, let recipientAci = messageSend.serviceId as? Aci {
let messageSendLog = SSKEnvironment.shared.messageSendLogRef
messageSendLog.recordPendingDelivery(
payloadId: payloadId,
recipientAci: recipientAci,
recipientDeviceId: deviceMessage.destinationDeviceId,
message: message,
tx: transaction
)
}
}
message.updateWithSentRecipient(messageSend.serviceId, wasSentByUD: wasSentByUD, transaction: transaction)
if let resendResponse = message as? OWSOutgoingResendResponse {
resendResponse.didPerformMessageSend(sentDeviceMessages, to: messageSend.serviceId, tx: transaction)
}
// If we've just delivered a message to a user, we know they have a valid
// Signal account. However, if we're sending a story, the server will
// always tell us the recipient is registered, so we can't use this as an
// affirmate indication for the existence of an account.
//
// This is low trust because we don't actually know for sure the fully
// qualified address is valid.
if !message.isStorySend {
let recipientFetcher = DependenciesBridge.shared.recipientFetcher
let recipient = recipientFetcher.fetchOrCreate(
serviceId: messageSend.serviceId,
tx: transaction.asV2Write
)
let recipientManager = DependenciesBridge.shared.recipientManager
recipientManager.markAsRegisteredAndSave(recipient, shouldUpdateStorageService: true, tx: transaction.asV2Write)
}
SSKEnvironment.shared.profileManagerRef.didSendOrReceiveMessage(
serviceId: messageSend.serviceId,
localIdentifiers: messageSend.localIdentifiers,
tx: transaction.asV2Write
)
}
return sentDeviceMessages
}
struct MismatchedDevices: Decodable {
let extraDevices: [Int8]
let missingDevices: [Int8]
fileprivate static func parse(_ responseData: Data) throws -> Self {
return try JSONDecoder().decode(Self.self, from: responseData)
}
}
struct StaleDevices: Decodable {
let staleDevices: [Int8]
fileprivate static func parse(_ responseData: Data) throws -> Self {
return try JSONDecoder().decode(Self.self, from: responseData)
}
}
private func messageSendDidFail(
_ messageSend: OWSMessageSend,
responseError: Error,
sealedSenderParameters: SealedSenderParameters?
) async throws -> [SentDeviceMessage] {
let message: TSOutgoingMessage = messageSend.message
Logger.warn("\(type(of: message)) to \(messageSend.serviceId), timestamp: \(message.timestamp), error: \(responseError)")
switch responseError.httpStatusCode {
case 401:
Logger.warn("Unable to send due to invalid credentials.")
throw MessageSendUnauthorizedError()
case 404:
try await failSendForUnregisteredRecipient(messageSend)
case 409:
let response = try MismatchedDevices.parse(responseError.httpResponseData ?? Data())
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
handleMismatchedDevices(
serviceId: messageSend.serviceId,
missingDevices: response.missingDevices,
extraDevices: response.extraDevices,
tx: tx
)
}
throw DeviceMessagesError.mismatchedDevices
case 410:
let response = try StaleDevices.parse(responseError.httpResponseData ?? Data())
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
handleStaleDevices(serviceId: messageSend.serviceId, staleDevices: response.staleDevices, tx: tx)
}
throw DeviceMessagesError.staleDevices
case 428:
// SPAM TODO: Only retry messages with -hasRenderableContent
Logger.warn("Server requested user complete spam challenge.")
try await SSKEnvironment.shared.spamChallengeResolverRef.tryToHandleSilently(
bodyData: responseError.httpResponseData,
retryAfter: responseError.httpRetryAfterDate
)
// The resolver has 10s to asynchronously resolve a challenge If it
// resolves, great! We'll let MessageSender auto-retry. Otherwise, it'll be
// marked as "pending"
throw responseError
default:
throw responseError
}
}
private func failSendForUnregisteredRecipient(_ messageSend: OWSMessageSend) async throws -> Never {
let message: TSOutgoingMessage = messageSend.message
if !message.isSyncMessage {
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { writeTx in
self.markAsUnregistered(
serviceId: messageSend.serviceId,
message: message,
thread: messageSend.thread,
transaction: writeTx
)
}
}
throw MessageSenderNoSuchSignalRecipientError()
}
// MARK: - Unregistered, Missing, & Stale Devices
func markAsUnregistered(
serviceId: ServiceId,
message: TSOutgoingMessage,
thread: TSThread,
transaction tx: SDSAnyWriteTransaction
) {
AssertNotOnMainThread()
if thread.isNonContactThread {
// Mark as "skipped" group members who no longer have signal accounts.
message.updateWithSkippedRecipient(SignalServiceAddress(serviceId), transaction: tx)
}
let recipientDatabaseTable = DependenciesBridge.shared.recipientDatabaseTable
guard let recipient = recipientDatabaseTable.fetchRecipient(serviceId: serviceId, transaction: tx.asV2Read) else {
return
}
let recipientManager = DependenciesBridge.shared.recipientManager
recipientManager.markAsUnregisteredAndSave(recipient, unregisteredAt: .now, shouldUpdateStorageService: true, tx: tx.asV2Write)
let tsAccountManager = DependenciesBridge.shared.tsAccountManager
guard let localIdentifiers = tsAccountManager.localIdentifiers(tx: tx.asV2Read) else {
Logger.warn("Can't split recipient because we're not registered.")
return
}
let recipientMerger = DependenciesBridge.shared.recipientMerger
recipientMerger.splitUnregisteredRecipientIfNeeded(
localIdentifiers: localIdentifiers,
unregisteredRecipient: recipient,
tx: tx.asV2Write
)
}
func handleMismatchedDevices(serviceId: ServiceId, missingDevices: [Int8], extraDevices: [Int8], tx: SDSAnyWriteTransaction) {
Logger.warn("Mismatched devices for \(serviceId): +\(missingDevices) -\(extraDevices)")
self.updateDevices(
serviceId: serviceId,
devicesToAdd: missingDevices.compactMap({ UInt32(exactly: $0) }),
devicesToRemove: extraDevices.compactMap({ UInt32(exactly: $0) }),
transaction: tx
)
}
func handleStaleDevices(serviceId: ServiceId, staleDevices: [Int8], tx: SDSAnyWriteTransaction) {
Logger.warn("Stale devices for \(serviceId): \(staleDevices)")
let sessionStore = DependenciesBridge.shared.signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore
for staleDeviceId in staleDevices.compactMap({ UInt32(exactly: $0) }) {
sessionStore.archiveSession(for: serviceId, deviceId: staleDeviceId, tx: tx.asV2Write)
}
}
func updateDevices(
serviceId: ServiceId,
devicesToAdd: [UInt32],
devicesToRemove: [UInt32],
transaction: SDSAnyWriteTransaction
) {
AssertNotOnMainThread()
owsAssertDebug(Set(devicesToAdd).isDisjoint(with: devicesToRemove))
let recipientFetcher = DependenciesBridge.shared.recipientFetcher
let recipient = recipientFetcher.fetchOrCreate(serviceId: serviceId, tx: transaction.asV2Write)
let recipientManager = DependenciesBridge.shared.recipientManager
recipientManager.modifyAndSave(
recipient,
deviceIdsToAdd: devicesToAdd,
deviceIdsToRemove: devicesToRemove,
shouldUpdateStorageService: true,
tx: transaction.asV2Write
)
if !devicesToRemove.isEmpty {
Logger.info("Archiving sessions for extra devices: \(devicesToRemove)")
let sessionStore = DependenciesBridge.shared.signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore
for deviceId in devicesToRemove {
sessionStore.archiveSession(for: serviceId, deviceId: deviceId, tx: transaction.asV2Write)
}
}
}
// MARK: - Encryption
private func encryptMessage(
plaintextContent plainText: Data,
serviceId: ServiceId,
deviceId: UInt32,
sealedSenderParameters: SealedSenderParameters?,
transaction: SDSAnyWriteTransaction
) throws -> DeviceMessage {
owsAssertDebug(!Thread.isMainThread)
guard try containsValidSession(for: serviceId, deviceId: deviceId, tx: transaction.asV2Write) else {
throw MessageSendEncryptionError(serviceId: serviceId, deviceId: deviceId)
}
let paddedPlaintext = plainText.paddedMessageBody
let serializedMessage: Data
let messageType: SSKProtoEnvelopeType
let identityManager = DependenciesBridge.shared.identityManager
let signalProtocolStore = DependenciesBridge.shared.signalProtocolStoreManager.signalProtocolStore(for: .aci)
let protocolAddress = ProtocolAddress(serviceId, deviceId: deviceId)
if let sealedSenderParameters {
let secretCipher = try SMKSecretSessionCipher(
sessionStore: signalProtocolStore.sessionStore,
preKeyStore: signalProtocolStore.preKeyStore,
signedPreKeyStore: signalProtocolStore.signedPreKeyStore,
kyberPreKeyStore: signalProtocolStore.kyberPreKeyStore,
identityStore: identityManager.libSignalStore(for: .aci, tx: transaction.asV2Write),
senderKeyStore: SSKEnvironment.shared.senderKeyStoreRef
)
serializedMessage = try secretCipher.encryptMessage(
for: serviceId,
deviceId: deviceId,
paddedPlaintext: paddedPlaintext,
contentHint: sealedSenderParameters.contentHint.signalClientHint,
groupId: sealedSenderParameters.envelopeGroupId(tx: transaction.asV2Read),
senderCertificate: sealedSenderParameters.senderCertificate,
protocolContext: transaction
)
messageType = .unidentifiedSender
} else {
let result = try signalEncrypt(
message: paddedPlaintext,
for: protocolAddress,
sessionStore: signalProtocolStore.sessionStore,
identityStore: identityManager.libSignalStore(for: .aci, tx: transaction.asV2Write),
context: transaction
)
switch result.messageType {
case .whisper:
messageType = .ciphertext
case .preKey:
messageType = .prekeyBundle
case .plaintext:
messageType = .plaintextContent
default:
owsFailDebug("Unrecognized message type")
messageType = .unknown
}
serializedMessage = Data(result.serialize())
}
// We had better have a session after encrypting for this recipient!
let session = try signalProtocolStore.sessionStore.loadSession(
for: protocolAddress,
context: transaction
)!
return DeviceMessage(
type: messageType,
destinationDeviceId: protocolAddress.deviceId,
destinationRegistrationId: try session.remoteRegistrationId(),
serializedMessage: serializedMessage
)
}
private func wrapPlaintextMessage(
plaintextContent rawPlaintext: Data,
serviceId: ServiceId,
deviceId: UInt32,
isResendRequestMessage: Bool,
sealedSenderParameters: SealedSenderParameters?,
transaction: SDSAnyWriteTransaction
) throws -> DeviceMessage {
owsAssertDebug(!Thread.isMainThread)
let identityManager = DependenciesBridge.shared.identityManager
let protocolAddress = ProtocolAddress(serviceId, deviceId: deviceId)
// Only resend request messages are allowed to use this codepath.
guard isResendRequestMessage else {
throw OWSAssertionError("Unexpected message type")
}
let plaintext = try PlaintextContent(bytes: rawPlaintext)
let serializedMessage: Data
let messageType: SSKProtoEnvelopeType
if let sealedSenderParameters {
let usmc = try UnidentifiedSenderMessageContent(
CiphertextMessage(plaintext),
from: sealedSenderParameters.senderCertificate,
contentHint: sealedSenderParameters.contentHint.signalClientHint,
groupId: sealedSenderParameters.envelopeGroupId(tx: transaction.asV2Read) ?? Data()
)
let outerBytes = try sealedSenderEncrypt(
usmc,
for: protocolAddress,
identityStore: identityManager.libSignalStore(for: .aci, tx: transaction.asV2Write),
context: transaction
)
serializedMessage = Data(outerBytes)
messageType = .unidentifiedSender
} else {
serializedMessage = Data(plaintext.serialize())
messageType = .plaintextContent
}
let sessionStore = DependenciesBridge.shared.signalProtocolStoreManager.signalProtocolStore(for: .aci).sessionStore
let session = try sessionStore.loadSession(for: protocolAddress, context: transaction)!
return DeviceMessage(
type: messageType,
destinationDeviceId: protocolAddress.deviceId,
destinationRegistrationId: try session.remoteRegistrationId(),
serializedMessage: serializedMessage
)
}
}