TM-SGNL-iOS/SignalServiceKit/Network/Receiving/GroupsV2MessageProcessor.swift
TeleMessage developers dde0620daf initial commit
2025-05-03 12:28:28 -07:00

1109 lines
41 KiB
Swift

//
// Copyright 2020 Signal Messenger, LLC
// SPDX-License-Identifier: AGPL-3.0-only
//
import Foundation
public import LibSignalClient
private struct IncomingGroupsV2MessageJobInfo {
let job: IncomingGroupsV2MessageJob
var envelope: SSKProtoEnvelope?
var groupContext: SSKProtoGroupContextV2?
var groupContextInfo: GroupV2ContextInfo?
}
// MARK: -
class IncomingGroupsV2MessageQueue: MessageProcessingPipelineStage {
private let appReadiness: AppReadiness
private let finder = GRDBGroupsV2MessageJobFinder()
init(appReadiness: AppReadiness) {
self.appReadiness = appReadiness
SwiftSingletons.register(self)
observeNotifications()
// Start processing.
drainQueueWhenReady()
}
private func observeNotifications() {
let nc = NotificationCenter.default
nc.addObserver(
self,
selector: #selector(applicationWillEnterForeground),
name: .OWSApplicationWillEnterForeground,
object: nil
)
nc.addObserver(
self,
selector: #selector(applicationDidEnterBackground),
name: .OWSApplicationDidEnterBackground,
object: nil
)
nc.addObserver(
self,
selector: #selector(registrationStateDidChange),
name: .registrationStateDidChange,
object: nil
)
nc.addObserver(
self,
selector: #selector(chatConnectionStateDidChange),
name: OWSChatConnection.chatConnectionStateDidChange,
object: nil
)
nc.addObserver(
self,
selector: #selector(reachabilityChanged),
name: SSKReachability.owsReachabilityDidChange,
object: nil
)
appReadiness.runNowOrWhenAppDidBecomeReadySync {
SSKEnvironment.shared.messagePipelineSupervisorRef.register(pipelineStage: self)
}
}
// MARK: - Notifications
@objc
func applicationWillEnterForeground() {
AssertIsOnMainThread()
drainQueueWhenReady()
}
@objc
func applicationDidEnterBackground() {
AssertIsOnMainThread()
drainQueueWhenReady()
}
@objc
func registrationStateDidChange() {
AssertIsOnMainThread()
drainQueueWhenReady()
}
@objc
func chatConnectionStateDidChange() {
AssertIsOnMainThread()
drainQueueWhenReady()
}
@objc
func reachabilityChanged() {
AssertIsOnMainThread()
drainQueueWhenReady()
}
func supervisorDidResumeMessageProcessing(_ supervisor: MessagePipelineSupervisor) {
DispatchQueue.main.async {
self.drainQueueWhenReady()
}
}
// MARK: -
fileprivate func enqueue(
envelopeData: Data,
plaintextData: Data,
groupId: Data,
wasReceivedByUD: Bool,
serverDeliveryTimestamp: UInt64,
tx: SDSAnyWriteTransaction
) {
// We need to persist the decrypted envelope data ASAP to prevent data loss.
finder.addJob(
envelopeData: envelopeData,
plaintextData: plaintextData,
groupId: groupId,
wasReceivedByUD: wasReceivedByUD,
serverDeliveryTimestamp: serverDeliveryTimestamp,
transaction: tx.unwrapGrdbWrite
)
}
fileprivate func drainQueueWhenReady() {
guard CurrentAppContext().shouldProcessIncomingMessages else {
return
}
appReadiness.runNowOrWhenAppDidBecomeReadySync {
DispatchQueue.global().async {
self.drainQueues()
}
}
}
private let unfairLock = UnfairLock()
private var groupIdsBeingProcessed: Set<Data> = Set()
fileprivate var isActivelyProcessing: Bool {
return unfairLock.withLock {
return groupIdsBeingProcessed.isEmpty.negated
}
}
// At any given time, we need to ensure that there is exactly
// one GroupsMessageProcessor for each group that needs to
// process incoming messages.
private func drainQueues() {
owsAssertDebug(!Thread.isMainThread)
guard appReadiness.isAppReady || CurrentAppContext().isRunningTests else {
owsFailDebug("App is not ready.")
return
}
let canProcess = (
SSKEnvironment.shared.messagePipelineSupervisorRef.isMessageProcessingPermitted &&
DependenciesBridge.shared.tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered
)
guard canProcess else {
// Don't process queues.
return
}
// Obtain the list of groups that currently need processing.
let groupIdsWithJobs = Set(SSKEnvironment.shared.databaseStorageRef.read { transaction in
self.finder.allEnqueuedGroupIds(transaction: transaction.unwrapGrdbRead)
})
guard !groupIdsWithJobs.isEmpty else {
NotificationCenter.default.postNotificationNameAsync(GroupsV2MessageProcessor.didFlushGroupsV2MessageQueue, object: nil)
return
}
let messageProcessors: [GroupsMessageProcessor] = unfairLock.withLock {
let groupIdsToProcess = groupIdsWithJobs.subtracting(groupIdsBeingProcessed)
groupIdsBeingProcessed.formUnion(groupIdsToProcess)
return groupIdsToProcess.map { GroupsMessageProcessor(groupId: $0, appReadiness: appReadiness) }
}
for processor in messageProcessors {
firstly(on: DispatchQueue.global()) { () -> Promise<Void> in
processor.promise
}.ensure(on: DispatchQueue.global()) {
self.unfairLock.withLock {
_ = self.groupIdsBeingProcessed.remove(processor.groupId)
}
self.drainQueues()
}.catch(on: DispatchQueue.global()) { error in
owsFailDebug("Error: \(error)")
}
}
}
func hasPendingJobs(tx: SDSAnyReadTransaction) -> Bool {
self.finder.jobCount(transaction: tx) > 0
}
func pendingJobCount(tx: SDSAnyReadTransaction) -> UInt {
self.finder.jobCount(transaction: tx)
}
}
// MARK: -
// The entity tries to process all pending jobs for a given group.
//
// * It retries with exponential backoff.
// * It retries immediately if reachability, etc. change.
//
// It's promise is fulfilled when all jobs are processed _or_
// we give up.
internal class GroupsMessageProcessor: MessageProcessingPipelineStage {
private let appReadiness: AppReadiness
fileprivate let groupId: Data
private let finder = GRDBGroupsV2MessageJobFinder()
fileprivate let promise: Promise<Void>
private let future: Future<Void>
internal init(groupId: Data, appReadiness: AppReadiness) {
self.appReadiness = appReadiness
self.groupId = groupId
let (promise, future) = Promise<Void>.pending()
self.promise = promise
self.future = future
observeNotifications()
tryToProcess()
}
private func observeNotifications() {
let nc = NotificationCenter.default
nc.addObserver(
self,
selector: #selector(applicationWillEnterForeground),
name: .OWSApplicationWillEnterForeground,
object: nil
)
nc.addObserver(
self,
selector: #selector(applicationDidEnterBackground),
name: .OWSApplicationDidEnterBackground,
object: nil
)
nc.addObserver(
self,
selector: #selector(registrationStateDidChange),
name: .registrationStateDidChange,
object: nil
)
nc.addObserver(
self,
selector: #selector(chatConnectionStateDidChange),
name: OWSChatConnection.chatConnectionStateDidChange,
object: nil
)
nc.addObserver(
self,
selector: #selector(reachabilityChanged),
name: SSKReachability.owsReachabilityDidChange,
object: nil
)
appReadiness.runNowOrWhenAppDidBecomeReadySync {
SSKEnvironment.shared.messagePipelineSupervisorRef.register(pipelineStage: self)
}
}
// MARK: - Notifications
@objc
func applicationWillEnterForeground() {
AssertIsOnMainThread()
tryToProcess()
}
@objc
func applicationDidEnterBackground() {
AssertIsOnMainThread()
tryToProcess()
}
@objc
func registrationStateDidChange() {
AssertIsOnMainThread()
tryToProcess()
}
@objc
func chatConnectionStateDidChange() {
AssertIsOnMainThread()
tryToProcess()
}
@objc
func reachabilityChanged() {
AssertIsOnMainThread()
tryToProcess()
}
// MARK: -
private let isDrainingQueue = AtomicBool(false, lock: .sharedGlobal)
private func tryToProcess(retryDelayAfterFailure: TimeInterval = 1.0) {
guard isDrainingQueue.tryToSetFlag() else {
// Batch already in flight.
return
}
processWorkStep(retryDelayAfterFailure: retryDelayAfterFailure)
}
private typealias BatchCompletionBlock = ([IncomingGroupsV2MessageJob], Bool, SDSAnyWriteTransaction) -> Void
private func processWorkStep(retryDelayAfterFailure: TimeInterval = 1.0) {
owsAssertDebug(isDrainingQueue.get())
let canProcess = (
SSKEnvironment.shared.messagePipelineSupervisorRef.isMessageProcessingPermitted &&
DependenciesBridge.shared.tsAccountManager.registrationStateWithMaybeSneakyTransaction.isRegistered
)
guard canProcess else {
Logger.warn("Cannot process.")
future.resolve()
return
}
// We want a value that is just high enough to yield perf benefits.
let kIncomingMessageBatchSize: UInt = 16
// If the app is in the background, use batch size of 1.
// This reduces the cost of being interrupted and rolled back if
// app is suspended.
let batchSize: UInt = CurrentAppContext().isInBackground() ? 1 : kIncomingMessageBatchSize
let batchJobs = SSKEnvironment.shared.databaseStorageRef.read { transaction in
self.finder.nextJobs(forGroupId: self.groupId, batchSize: batchSize, transaction: transaction.unwrapGrdbRead)
}
guard !batchJobs.isEmpty else {
future.resolve()
return
}
var backgroundTask: OWSBackgroundTask? = OWSBackgroundTask(label: "\(#function)")
let completion: BatchCompletionBlock = { (processedJobs, shouldWaitBeforeRetrying, transaction) in
// NOTE: This transaction is the same transaction as the transaction
// passed to processJobs() in the "sync" case but is a different
// transaction in the "async" case.
if shouldWaitBeforeRetrying {
Logger.warn("shouldWaitBeforeRetrying")
}
let processedUniqueIds = processedJobs.map { $0.uniqueId }
self.finder.removeJobs(withUniqueIds: processedUniqueIds, transaction: transaction.unwrapGrdbWrite)
transaction.addAsyncCompletionOffMain {
assert(backgroundTask != nil)
backgroundTask = nil
if shouldWaitBeforeRetrying {
// After successfully processing a batch drainQueueWorkStep()
// calls itself to process the next batch, if any.
// The isDrainingQueue flag is cleared when all batches have
// been processed.
//
// After failures, we clear isDrainingQueue immediately and
// call drainQueue(), not drainQueueWorkStep() after a delay.
// That allows us to kick off another batch immediately if
// reachability changes, etc.
if !self.isDrainingQueue.tryToClearFlag() {
self.future.reject(OWSAssertionError("Couldn't clear flag."))
return
}
DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + retryDelayAfterFailure) {
self.tryToProcess(retryDelayAfterFailure: retryDelayAfterFailure * 2)
}
} else {
// Wait always a bit in hopes of increasing the size of the next batch.
// This delay won't affect the first message to arrive when this queue is idle,
// so by definition we're receiving more than one message and can benefit from
// batching.
let batchSpacingSeconds: TimeInterval = 0.5
DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + batchSpacingSeconds) {
self.processWorkStep()
}
}
}
}
SSKEnvironment.shared.databaseStorageRef.write { tx in
self.processJobs(jobs: batchJobs, tx: tx, completion: completion)
}
}
private static func jobInfo(
forJob job: IncomingGroupsV2MessageJob,
tx: SDSAnyReadTransaction
) -> IncomingGroupsV2MessageJobInfo {
var jobInfo = IncomingGroupsV2MessageJobInfo(job: job)
guard let envelope = job.envelope else {
owsFailDebug("Missing envelope.")
return jobInfo
}
jobInfo.envelope = envelope
guard let plaintextData = job.plaintextData,
let groupContext = GroupsV2MessageProcessor.groupContextV2(fromPlaintextData: plaintextData) else {
owsFailDebug("Missing group context.")
return jobInfo
}
jobInfo.groupContext = groupContext
do {
jobInfo.groupContextInfo = try GroupV2ContextInfo.deriveFrom(masterKeyData: groupContext.masterKey ?? Data())
} catch {
owsFailDebug("Invalid group context: \(error).")
return jobInfo
}
return jobInfo
}
internal static func discardMode(
forMessageFrom sourceAci: Aci,
groupContext: SSKProtoGroupContextV2,
tx: SDSAnyReadTransaction
) -> GroupsV2MessageProcessor.DiscardMode {
guard groupContext.hasRevision else {
Logger.info("Missing revision in group context")
return .discard
}
let groupContextInfo: GroupV2ContextInfo
do {
groupContextInfo = try GroupV2ContextInfo.deriveFrom(masterKeyData: groupContext.masterKey ?? Data())
} catch {
owsFailDebug("Invalid group context: \(error).")
return .discard
}
return GroupsV2MessageProcessor.discardMode(
forMessageFrom: sourceAci,
groupId: groupContextInfo.groupId,
tx: tx
)
}
private static func discardMode(
forJobInfo jobInfo: IncomingGroupsV2MessageJobInfo,
hasGroupBeenUpdated: Bool,
tx: SDSAnyReadTransaction
) -> GroupsV2MessageProcessor.DiscardMode {
guard let envelope = jobInfo.envelope else {
owsFailDebug("Missing envelope.")
return .discard
}
guard let groupContextInfo = jobInfo.groupContextInfo else {
owsFailDebug("Missing groupContextInfo.")
return .discard
}
guard let sourceAci = Aci.parseFrom(aciString: envelope.sourceServiceID) else {
owsFailDebug("Invalid source address.")
return .discard
}
return GroupsV2MessageProcessor.discardMode(
forMessageFrom: sourceAci,
groupId: groupContextInfo.groupId,
shouldCheckGroupModel: hasGroupBeenUpdated,
tx: tx
)
}
// Like non-v2 group messages, we want to do batch processing
// wherever possible for perf reasons (to reduce view updates).
// We should be able to mostly do that. However, in some cases
// we need to update the group before we can process the
// message. Such messages should be processed in a batch
// of their own.
//
// Therefore, when trying to process we try to process either:
//
// * The first N messages that can be processed "without update".
// * The first message, which has to be processed "with update".
//
// Which type of batch we try to process is determined by the
// message at the head of the queue.
private func canJobBeProcessedWithoutUpdate(
jobInfo: IncomingGroupsV2MessageJobInfo,
tx: SDSAnyReadTransaction
) -> Bool {
if .discard == Self.discardMode(forJobInfo: jobInfo, hasGroupBeenUpdated: false, tx: tx) {
return true
}
guard let groupContext = jobInfo.groupContext else {
owsFailDebug("Missing groupContext.")
return true
}
guard let groupContextInfo = jobInfo.groupContextInfo else {
owsFailDebug("Missing groupContextInfo.")
return true
}
return GroupsV2MessageProcessor.canContextBeProcessedWithoutUpdate(
groupContext: groupContext,
groupContextInfo: groupContextInfo,
tx: tx
)
}
// NOTE: This method might do its work synchronously (in the "no update" case)
// or asynchronously (in the "update" case). It may only process
// a subset of the jobs.
private func processJobs(
jobs: [IncomingGroupsV2MessageJob],
tx: SDSAnyWriteTransaction,
completion: @escaping BatchCompletionBlock
) {
// 1. Gather info for each job.
// 2. Decide whether we'll process 1 "update" job or N "no update" jobs.
//
// "Update" jobs may require interaction with the service, namely
// fetching group changes or latest group state.
var isUpdateBatch = false
var jobInfos = [IncomingGroupsV2MessageJobInfo]()
for job in jobs {
let jobInfo = Self.jobInfo(forJob: job, tx: tx)
let canJobBeProcessedWithoutUpdate = self.canJobBeProcessedWithoutUpdate(jobInfo: jobInfo, tx: tx)
if !canJobBeProcessedWithoutUpdate {
if jobInfos.count > 0 {
// Can't add "update" job to "no update" batch, abort and process jobs
// already added to batch.
break
}
// Update batches should only contain a single job.
isUpdateBatch = true
jobInfos.append(jobInfo)
break
}
jobInfos.append(jobInfo)
}
if isUpdateBatch {
assert(jobInfos.count == 1)
guard let jobInfo = jobInfos.first else {
owsFailDebug("Missing job")
completion([], false, tx)
return
}
Task {
await updateGroupAndProcessJob(jobInfo: jobInfo, completion: completion)
}
} else {
let processedJobs = performLocalProcessingSync(jobInfos: jobInfos, tx: tx)
completion(processedJobs, false, tx)
}
}
private func performLocalProcessingSync(
jobInfos: [IncomingGroupsV2MessageJobInfo],
tx: SDSAnyWriteTransaction
) -> [IncomingGroupsV2MessageJob] {
guard jobInfos.count > 0 else {
owsFailDebug("Missing jobInfos.")
return []
}
var processedJobs = [IncomingGroupsV2MessageJob]()
for jobInfo in jobInfos {
let job = jobInfo.job
let discardMode = Self.discardMode(forJobInfo: jobInfo, hasGroupBeenUpdated: true, tx: tx)
if discardMode == .discard {
// Do nothing.
} else {
// The forced unwraps are checked in `discardMode`, so they can't fail.
// TODO: Refactor so that the compiler enforces the above statement.
SSKEnvironment.shared.messageReceiverRef.processEnvelope(
jobInfo.envelope!,
plaintextData: job.plaintextData!,
wasReceivedByUD: job.wasReceivedByUD,
serverDeliveryTimestamp: job.serverDeliveryTimestamp,
shouldDiscardVisibleMessages: discardMode == .discardVisibleMessages,
localIdentifiers: DependenciesBridge.shared.tsAccountManager.localIdentifiers(tx: tx.asV2Read)!,
tx: tx
)
}
processedJobs.append(job)
if CurrentAppContext().isInBackground() {
// If the app is in the background, stop processing this batch.
//
// Since this check is done after processing jobs, we'll continue
// to process jobs in batches of 1. This reduces the cost of
// being interrupted and rolled back if app is suspended.
break
}
}
return processedJobs
}
private enum UpdateOutcome {
case successShouldProcess
case failureShouldDiscard
case failureShouldRetry
case failureShouldFailoverToService
}
private func updateGroupAndProcessJob(
jobInfo: IncomingGroupsV2MessageJobInfo,
completion: @escaping BatchCompletionBlock
) async {
do {
let updateOutcome = await updateGroup(jobInfo: jobInfo)
switch updateOutcome {
case .successShouldProcess:
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
let processedJobs = self.performLocalProcessingSync(jobInfos: [jobInfo], tx: tx)
completion(processedJobs, false, tx)
}
case .failureShouldDiscard:
throw GroupsV2Error.shouldDiscard
case .failureShouldRetry:
throw GroupsV2Error.shouldRetry
case .failureShouldFailoverToService:
owsFailDebug("Invalid embeddedUpdateOutcome: .failureShouldFailoverToService.")
throw GroupsV2Error.shouldDiscard
}
} catch {
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
if self.isRetryableError(error) {
Logger.warn("Error: \(error)")
// Retry
// _Do not_ include the job in the processed jobs.
// _Do_ wait before retrying.
completion([], true, tx)
} else {
// This should only occur if we no longer have access to group state,
// e.g. a) we were kicked out of the group. b) our invite was revoked.
// c) our request to join via group invite link was denied.
Logger.warn("Discarding unprocess-able message: \(error)")
// Do not retry
// _Do_ include the job in the processed jobs.
// It will be discarded.
// _Do not_ wait before retrying.
completion([jobInfo.job], false, tx)
}
}
}
}
private func updateGroup(jobInfo: IncomingGroupsV2MessageJobInfo) async -> UpdateOutcome {
// First, we try to update the group locally using changes embedded in
// the group context (if any).
let embeddedUpdateOutcome = await updateUsingEmbeddedGroupUpdate(jobInfo: jobInfo)
if embeddedUpdateOutcome == .failureShouldFailoverToService {
return await tryToUpdateUsingService(jobInfo: jobInfo)
} else {
return embeddedUpdateOutcome
}
}
// We only try to apply one embedded update per batch.
//
// If applying the embedded update fails, we fail
// over to fetching latest state from service.
//
// This method should:
//
// * Return .successShouldProcess if message processing should proceed. Either:
// * ...the group didn't need to be updated (some other component beat us to it).
// * ...the group was successfully updated to the target revision.
// * Return .failureShouldFailoverToService if the group could not be updated
// to the target revision and we should fail over to fetching group changes
// and/or latest group state from the service.
// * Return .failureShouldDiscard if this message should be discarded.
//
// This method should never return .failureShouldRetry.
private func updateUsingEmbeddedGroupUpdate(
jobInfo: IncomingGroupsV2MessageJobInfo
) async -> UpdateOutcome {
guard
let groupContextInfo = jobInfo.groupContextInfo,
let groupContext = jobInfo.groupContext
else {
owsFailDebug("Missing jobInfo properties.")
return .failureShouldDiscard
}
let groupId = groupContextInfo.groupId
guard GroupManager.isValidGroupId(groupId, groupsVersion: .V2) else {
owsFailDebug("Invalid groupId.")
return .failureShouldDiscard
}
let thread = SSKEnvironment.shared.databaseStorageRef.read { tx in
return TSGroupThread.fetch(groupId: groupId, transaction: tx)
}
guard let groupThread = thread else {
// We might be learning of a group for the first time
// in which case we should fetch current group state from the
// service.
return .failureShouldFailoverToService
}
guard let oldGroupModel = groupThread.groupModel as? TSGroupModelV2 else {
owsFailDebug("Invalid group model.")
return .failureShouldDiscard
}
guard groupContext.hasRevision else {
owsFailDebug("Missing revision.")
return .failureShouldDiscard
}
let contextRevision = groupContext.revision
guard contextRevision > oldGroupModel.revision else {
// Group is already updated.
// No need to apply embedded change from the group context; it is obsolete.
// This can happen due to races.
return .successShouldProcess
}
guard contextRevision == oldGroupModel.revision + 1 else {
// We can only apply embedded changes if we're behind exactly
// one revision.
return .failureShouldFailoverToService
}
guard let changeProtoData = groupContext.groupChange else {
// No embedded group change.
return .failureShouldFailoverToService
}
do {
let changeProto = try GroupsProtoGroupChange(serializedData: changeProtoData)
guard changeProto.changeEpoch <= GroupManager.changeProtoEpoch else {
throw OWSAssertionError("Invalid embedded change proto epoch: \(changeProto.changeEpoch).")
}
// We need to verify the signatures because these protos came from
// another client, not the service.
let changeActionsProto = try GroupsV2Protos.parseGroupChangeProto(changeProto, verificationOperation: .verifySignature(groupId: groupId))
guard changeActionsProto.revision == contextRevision else {
throw OWSAssertionError("Embedded change proto revision doesn't match context revision.")
}
let spamReportingMetadata: GroupUpdateSpamReportingMetadata = {
guard let serverGuid = jobInfo.envelope?.serverGuid else { return .unreportable }
return .reportable(serverGuid: serverGuid)
}()
let updatedGroupThread = try await SSKEnvironment.shared.groupsV2Ref.updateGroupWithChangeActions(
groupId: oldGroupModel.groupId,
spamReportingMetadata: spamReportingMetadata,
changeActionsProto: changeActionsProto,
groupSecretParams: try oldGroupModel.secretParams()
)
guard let updatedGroupModel = updatedGroupThread.groupModel as? TSGroupModelV2 else {
owsFailDebug("Invalid group model.")
return .failureShouldFailoverToService
}
guard updatedGroupModel.revision >= contextRevision else {
owsFailDebug("Invalid revision.")
return .failureShouldFailoverToService
}
guard updatedGroupModel.revision == contextRevision else {
// We expect the embedded changes to update us to the target
// revision. If we update past that, assert but proceed in production.
owsFailDebug("Unexpected revision.")
return .successShouldProcess
}
} catch {
if self.isRetryableError(error) {
Logger.warn("Error: \(error)")
return .failureShouldRetry
} else {
if case GroupsV2Error.cantApplyChangesToPlaceholder = error {
Logger.warn("Error: \(error)")
} else {
owsFailDebug("Error: \(error)")
}
return .failureShouldFailoverToService
}
}
Logger.info("Successfully applied embedded change proto from group context.")
return .successShouldProcess
}
private func tryToUpdateUsingService(jobInfo: IncomingGroupsV2MessageJobInfo) async -> UpdateOutcome {
guard
let groupContextInfo = jobInfo.groupContextInfo,
let groupContext = jobInfo.groupContext
else {
owsFailDebug("Missing jobInfo properties.")
return .failureShouldDiscard
}
// See GroupV2UpdatesImpl.
// This will try to update the group using incremental "changes" but
// failover to using a "snapshot".
let spamReportingMetadata: GroupUpdateSpamReportingMetadata = {
guard let serverGuid = jobInfo.envelope?.serverGuid else { return .unreportable }
return .reportable(serverGuid: serverGuid)
}()
let groupUpdateMode = GroupUpdateMode.upToSpecificRevisionImmediately(upToRevision: groupContext.revision)
do {
try await SSKEnvironment.shared.groupV2UpdatesRef.tryToRefreshV2GroupThread(
groupId: groupContextInfo.groupId,
spamReportingMetadata: spamReportingMetadata,
groupSecretParams: groupContextInfo.groupSecretParams,
groupUpdateMode: groupUpdateMode
)
return .successShouldProcess
} catch {
if self.isRetryableError(error) {
Logger.warn("error: \(type(of: error)) \(error)")
return .failureShouldRetry
}
if case GroupsV2Error.localUserNotInGroup = error {
// This should only occur if we no longer have access to group state,
// e.g. a) we were kicked out of the group. b) our invite was revoked.
// c) our request to join via group invite link was denied.
Logger.warn("Error: \(type(of: error)) \(error)")
} else {
owsFailDebugUnlessNetworkFailure(error)
}
return .failureShouldDiscard
}
}
private func isRetryableError(_ error: Error) -> Bool {
if error.isNetworkFailureOrTimeout {
return true
}
if let statusCode = error.httpStatusCode {
if statusCode == 401 ||
(500 <= statusCode && statusCode <= 599) {
return true
}
}
switch error {
case GroupsV2Error.timeout, GroupsV2Error.shouldRetry:
return true
default:
return false
}
}
}
// MARK: -
public class GroupsV2MessageProcessor: NSObject {
public static let didFlushGroupsV2MessageQueue = Notification.Name("didFlushGroupsV2MessageQueue")
private let processingQueue: IncomingGroupsV2MessageQueue
public init(appReadiness: AppReadiness) {
self.processingQueue = IncomingGroupsV2MessageQueue(appReadiness: appReadiness)
super.init()
SwiftSingletons.register(self)
processingQueue.drainQueueWhenReady()
}
// MARK: -
public func enqueue(
envelopeData: Data,
plaintextData: Data,
wasReceivedByUD: Bool,
serverDeliveryTimestamp: UInt64,
tx: SDSAnyWriteTransaction
) {
guard !envelopeData.isEmpty else {
owsFailDebug("Empty envelope.")
return
}
guard let groupId = groupId(fromPlaintextData: plaintextData) else {
owsFailDebug("Missing or invalid group id")
return
}
// We need to persist the decrypted envelope data ASAP to prevent data loss.
processingQueue.enqueue(
envelopeData: envelopeData,
plaintextData: plaintextData,
groupId: groupId,
wasReceivedByUD: wasReceivedByUD,
serverDeliveryTimestamp: serverDeliveryTimestamp,
tx: tx
)
if DebugFlags.internalLogging {
let jobCount = processingQueue.pendingJobCount(tx: tx)
Logger.info("jobCount: \(jobCount)")
}
// The new envelope won't be visible to the finder until this transaction commits,
// so drainQueue in the transaction completion.
tx.addAsyncCompletionOffMain {
self.processingQueue.drainQueueWhenReady()
}
}
private func groupId(fromPlaintextData plaintextData: Data) -> Data? {
guard let groupContext = GroupsV2MessageProcessor.groupContextV2(fromPlaintextData: plaintextData) else {
owsFailDebug("Invalid content.")
return nil
}
do {
let groupContextInfo = try GroupV2ContextInfo.deriveFrom(masterKeyData: groupContext.masterKey ?? Data())
return groupContextInfo.groupId
} catch {
owsFailDebug("Invalid group context: \(error).")
return nil
}
}
public class func isGroupsV2Message(plaintextData: Data) -> Bool {
return groupContextV2(fromPlaintextData: plaintextData) != nil
}
public class func canContextBeProcessedImmediately(
groupContext: SSKProtoGroupContextV2,
tx: SDSAnyReadTransaction
) -> Bool {
let groupContextInfo: GroupV2ContextInfo
do {
groupContextInfo = try GroupV2ContextInfo.deriveFrom(masterKeyData: groupContext.masterKey ?? Data())
} catch {
owsFailDebug("Invalid group context: \(error).")
return false
}
// We can only process GV2 messages immediately if:
// 1. We don't have any other messages queued for this thread
// 2. The message can be processed without updates
guard !GRDBGroupsV2MessageJobFinder().existsJob(forGroupId: groupContextInfo.groupId, transaction: tx.unwrapGrdbRead) else {
Logger.warn("Cannot immediately process GV2 message because there are messages queued")
return false
}
return canContextBeProcessedWithoutUpdate(
groupContext: groupContext,
groupContextInfo: groupContextInfo,
tx: tx
)
}
fileprivate class func canContextBeProcessedWithoutUpdate(
groupContext: SSKProtoGroupContextV2,
groupContextInfo: GroupV2ContextInfo,
tx: SDSAnyReadTransaction
) -> Bool {
guard let groupThread = TSGroupThread.fetch(groupId: groupContextInfo.groupId, transaction: tx) else {
return false
}
guard let groupModel = groupThread.groupModel as? TSGroupModelV2 else {
Logger.warn("Invalid group model; possibly needs to be migrated.")
return false
}
let messageRevision = groupContext.revision
let modelRevision = groupModel.revision
if messageRevision <= modelRevision {
return true
}
// The incoming message indicates that there is a new group revision.
// We'll update our group model in a standalone batch using either
// the change proto embedded in the group context or by fetching
// latest state from the service.
return false
}
public class func groupContextV2(fromPlaintextData plaintextData: Data) -> SSKProtoGroupContextV2? {
guard !plaintextData.isEmpty else {
return nil
}
let contentProto: SSKProtoContent
do {
contentProto = try SSKProtoContent(serializedData: plaintextData)
} catch {
owsFailDebug("could not parse proto: \(error)")
return nil
}
return groupContextV2(from: contentProto)
}
public class func groupContextV2(from contentProto: SSKProtoContent) -> SSKProtoGroupContextV2? {
if let groupV2 = contentProto.dataMessage?.groupV2 {
return groupV2
}
if let groupV2 = contentProto.syncMessage?.sent?.message?.groupV2 {
return groupV2
}
return nil
}
public func isActivelyProcessing() -> Bool {
return processingQueue.isActivelyProcessing
}
public func hasPendingJobs(tx: SDSAnyReadTransaction) -> Bool {
processingQueue.hasPendingJobs(tx: tx)
}
public func pendingJobCount(tx: SDSAnyReadTransaction) -> UInt {
processingQueue.pendingJobCount(tx: tx)
}
public enum DiscardMode {
/// Do not process this envelope.
case discard
/// Process this envelope.
case doNotDiscard
/// Process this envelope, but discard any "renderable" content,
/// e.g. calls or messages in the chat history.
case discardVisibleMessages
}
/// Returns whether a group message from the given user should be discarded.
///
/// If `shouldCheckGroupModel` is false, only checks whether the sender or group is blocked.
public static func discardMode(
forMessageFrom sourceAci: Aci,
groupId: Data,
shouldCheckGroupModel: Bool = true,
tx: SDSAnyReadTransaction
) -> DiscardMode {
// We want to discard asap to avoid problems with batching.
let blockingManager = SSKEnvironment.shared.blockingManagerRef
let isBlocked: Bool = (
blockingManager.isAddressBlocked(SignalServiceAddress(sourceAci), transaction: tx)
|| blockingManager.isGroupIdBlocked(groupId, transaction: tx)
)
if isBlocked {
Logger.info("Discarding blocked envelope.")
return .discard
}
// We need to pre-process all incoming envelopes; they might change
// our group state.
//
// But we should only pass envelopes to the MessageManager for
// processing if they correspond to v2 groups of which we are a
// non-pending member.
if shouldCheckGroupModel {
guard let localAddress = DependenciesBridge.shared.tsAccountManager.localIdentifiers(tx: tx.asV2Read)?.aciAddress else {
owsFailDebug("Missing localAddress.")
return .discard
}
guard let groupThread = TSGroupThread.fetch(groupId: groupId, transaction: tx) else {
// The user might have just deleted the thread
// but this race should be extremely rare.
// Usually this should indicate a bug.
owsFailDebug("Missing thread.")
return .discard
}
guard groupThread.groupModel.groupMembership.isFullMember(localAddress) else {
// * Local user might have just left the group.
// * Local user may have just learned that we were removed from the group.
// * Local user might be a pending member with an invite.
Logger.warn("Discarding envelope; local user is not an active group member.")
return .discard
}
guard groupThread.groupModel.groupMembership.isFullMember(SignalServiceAddress(sourceAci)) else {
// * The sender might have just left the group.
Logger.warn("Discarding envelope; sender is not an active group member.")
return .discard
}
if let groupModel = groupThread.groupModel as? TSGroupModelV2 {
if groupModel.isAnnouncementsOnly {
guard groupThread.groupModel.groupMembership.isFullMemberAndAdministrator(SignalServiceAddress(sourceAci)) else {
// * Only administrators can send "renderable" messages to a "announcement-only" group.
Logger.warn("Discarding renderable content in envelope; sender is not an active group member.")
return .discardVisibleMessages
}
}
} else {
owsFailDebug("Invalid group model.")
}
}
return .doNotDiscard
}
}