2429 lines
108 KiB
Swift
2429 lines
108 KiB
Swift
//
|
|
// Copyright 2019 Signal Messenger, LLC
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
//
|
|
|
|
import Foundation
|
|
import LibSignalClient
|
|
public import SignalRingRTC
|
|
import SwiftProtobuf
|
|
|
|
/// Interface for recording local changes destined for Storage Service and for
/// backing up, restoring, and rotating the Storage Service manifest.
public protocol StorageServiceManager {
    typealias ManifestRotationMode = StorageServiceManagerManifestRotationMode

    // MARK: Identity

    /// Updates the local user's identity.
    ///
    /// Called during app launch, registration, and change number.
    func setLocalIdentifiers(_ localIdentifiers: LocalIdentifiers)

    // MARK: Manifest state

    /// The version of the latest known Storage Service manifest.
    func currentManifestVersion(tx: DBReadTransaction) -> UInt64

    /// Whether the latest-known Storage Service manifest contains a `recordIkm`.
    func currentManifestHasRecordIkm(tx: DBReadTransaction) -> Bool

    // MARK: Recording pending updates

    /// Records that the recipients with the given unique IDs have pending updates.
    func recordPendingUpdates(updatedRecipientUniqueIds: [RecipientUniqueId])

    /// Records that the recipients at the given addresses have pending updates.
    func recordPendingUpdates(updatedAddresses: [SignalServiceAddress])

    /// Records that the V2 groups with the given serialized master keys have
    /// pending updates.
    func recordPendingUpdates(updatedGroupV2MasterKeys: [Data])

    /// Records that the story distribution lists with the given identifiers
    /// have pending updates.
    func recordPendingUpdates(updatedStoryDistributionListIds: [Data])

    /// Records that the call links with the given root keys have pending updates.
    func recordPendingUpdates(callLinkRootKeys: [CallLinkRootKey])

    /// Records that the local account has pending updates.
    func recordPendingLocalAccountUpdates()

    // MARK: Actions

    /// Requests a backup of any pending changes, authenticating as `authedDevice`.
    func backupPendingChanges(authedDevice: AuthedDevice)

    /// Requests a restore-or-create of the manifest, authenticating as
    /// `authedDevice`. The returned Promise may be ignored.
    @discardableResult
    func restoreOrCreateManifestIfNecessary(authedDevice: AuthedDevice) -> Promise<Void>

    /// Requests a manifest rotation using the given mode, authenticating as
    /// `authedDevice`.
    func rotateManifest(
        mode: ManifestRotationMode,
        authedDevice: AuthedDevice
    ) async throws

    /// Wipes all local state related to Storage Service, without mutating
    /// remote state.
    ///
    /// - Note: The expected behavior after calling this method is that the
    /// next time we perform a backup we will create a brand-new manifest with
    /// version 1, as we have no local manifest version. However, since we
    /// still (probably) have a remote manifest this backup will be rejected,
    /// and we'll merge in the remote manifest, then re-attempt our backup.
    ///
    /// This is a weird behavior to specifically want, and new callers who are
    /// interested in forcing a manifest recreation should probably prefer
    /// ``rotateManifest`` instead.
    func resetLocalData(transaction: DBWriteTransaction)

    /// Waits for pending restores to finish.
    ///
    /// When this is resolved, it means the current device has the latest state
    /// available on storage service.
    ///
    /// If this device believes there's new state available on storage service
    /// but the request to fetch it has failed, this Promise will be rejected.
    ///
    /// If the local device doesn't believe storage service has new state, this
    /// will resolve without performing any network requests.
    ///
    /// Due to the asynchronous nature of network requests, it's possible for
    /// another device to write to storage service at the same time the returned
    /// Promise resolves. Therefore, the precise behavior of this method is best
    /// described as: "if this device has knowledge that storage service has new
    /// state at the time this method is invoked, the returned Promise will be
    /// resolved after that state has been fetched".
    func waitForPendingRestores() -> Promise<Void>
}
|
|
|
|
extension StorageServiceManager {
    /// Records a pending update for the given group, identified by its
    /// serialized V2 master key.
    ///
    /// Nothing is recorded (and a debug failure is raised) if the model is not
    /// a `TSGroupModelV2` or if its master key cannot be obtained.
    public func recordPendingUpdates(groupModel: TSGroupModel) {
        guard let v2Model = groupModel as? TSGroupModelV2 else {
            owsFailDebug("How did we end up with pending updates to a V1 group?")
            return
        }

        let masterKey: GroupMasterKey
        do {
            masterKey = try v2Model.masterKey()
        } catch {
            owsFailDebug("Missing master key: \(error)")
            return
        }

        recordPendingUpdates(updatedGroupV2MasterKeys: [masterKey.serialize().asData])
    }
}
|
|
|
|
/// The ways in which a Storage Service manifest can be recreated.
public enum StorageServiceManagerManifestRotationMode {
    /// Recreate the manifest, preserving its contained data related to records.
    /// Since the record data is preserved, such as their identifiers and the
    /// `recordIkm`, the manifest can be inexpensively recreated in place
    /// leaving records untouched.
    ///
    /// - Note: This mode is only applicable if we have previously migrated to
    /// using a `recordIkm`. If not, this mode is treated like
    /// `.alsoRotatingRecords`.
    case preservingRecordsIfPossible

    /// Recreate the manifest and all records, using local data as the source of
    /// truth for creating records. This deletes all existing records, replacing
    /// them with new ones with newly-generated identifiers; if we are capable,
    /// the records will be encrypted using a newly-generated `recordIkm`.
    case alsoRotatingRecords

    /// Ranks the cases; a larger number means the case is more significant.
    private var precedenceOrder: Int {
        switch self {
        case .preservingRecordsIfPossible: 0
        case .alsoRotatingRecords: 1
        }
    }

    /// Returns whichever of `self` and `other` has the higher precedence.
    /// On a tie, `self` is returned.
    fileprivate func mergeByPrecedence(_ other: Self) -> Self {
        return other.precedenceOrder > precedenceOrder ? other : self
    }
}
|
|
|
|
// MARK: -
|
|
|
|
public class StorageServiceManagerImpl: NSObject, StorageServiceManager {
|
|
|
|
private let appReadiness: AppReadiness
|
|
|
|
/// Creates the manager, registers it as a singleton, and (only when the
/// current app context has UI) schedules the launch-time Storage Service work.
init(appReadiness: AppReadiness) {
    self.appReadiness = appReadiness
    super.init()

    SwiftSingletons.register(self)

    // Nothing below is scheduled for contexts without UI.
    guard CurrentAppContext().hasUI else {
        return
    }

    appReadiness.runNowOrWhenAppWillBecomeReady {
        self.cleanUpUnknownData()
    }

    appReadiness.runNowOrWhenAppDidBecomeReadySync {
        NotificationCenter.default.addObserver(
            self,
            selector: #selector(self.willResignActive),
            name: .OWSApplicationWillResignActive,
            object: nil
        )
    }

    appReadiness.runNowOrWhenMainAppDidBecomeReadyAsync {
        let registrationState = DependenciesBridge.shared.tsAccountManager.registrationStateWithMaybeSneakyTransaction
        guard registrationState.isRegistered else {
            return
        }

        // Schedule a restore. This will do nothing unless we've never
        // registered a manifest before.
        self.restoreOrCreateManifestIfNecessary(authedDevice: .implicit)

        // If we have any pending changes since we last launched, back them up now.
        self.backupPendingChanges(authedDevice: .implicit)
    }

    appReadiness.runNowOrWhenMainAppDidBecomeReadyAsync {
        Task { await self.cleanUpDeletedCallLinks() }
    }
}
|
|
|
|
/// Notification handler for `.OWSApplicationWillResignActive`.
///
/// Kicks off a backup right away so that Storage Service doesn't go stale.
/// Should the backup not complete while we're in the background, it will be
/// attempted again on the next app launch.
@objc
private func willResignActive() {
    self.backupPendingChanges(authedDevice: .implicit)
}
|
|
|
|
/// Stores the local user's identifiers in the manager state, where they are
/// later used to build operations that rely on implicit auth.
public func setLocalIdentifiers(_ localIdentifiers: LocalIdentifiers) {
    updateManagerState { $0.localIdentifiers = localIdentifiers }
}
|
|
|
|
// MARK: -
|
|
|
|
/// Reads the persisted operation state and returns its manifest version.
public func currentManifestVersion(tx: DBReadTransaction) -> UInt64 {
    let persistedState = StorageServiceOperation.State.current(
        transaction: SDSDB.shimOnlyBridge(tx)
    )
    return persistedState.manifestVersion
}
|
|
|
|
/// Reads the persisted operation state and reports whether it carries a
/// manifest `recordIkm`.
public func currentManifestHasRecordIkm(tx: DBReadTransaction) -> Bool {
    let persistedState = StorageServiceOperation.State.current(
        transaction: SDSDB.shimOnlyBridge(tx)
    )
    return persistedState.manifestRecordIkm != nil
}
|
|
|
|
// MARK: -
|
|
|
|
/// In-memory bookkeeping for the manager: which kinds of work have been
/// requested but not yet started, and whether an operation is running now.
private struct ManagerState {
    /// The local user's identifiers. In the future, this should be provided
    /// when this class is initialized. For now, it's an Optional to handle the
    /// window between initialization and when the database is loaded.
    var localIdentifiers: LocalIdentifiers?

    // MARK: Manifest rotation

    /// A requested manifest rotation, along with everyone waiting on it.
    struct PendingManifestRotation {
        var authedDevice: AuthedDevice
        /// Continuations of the `rotateManifest` callers awaiting this rotation.
        var continuations: [CheckedContinuation<Void, Error>]
        var mode: ManifestRotationMode
    }
    var pendingManifestRotation: PendingManifestRotation?

    // MARK: Cleanup

    /// Whether a cleanup of unknown data has been requested.
    var hasPendingCleanup = false

    // MARK: Backup

    /// A requested backup.
    struct PendingBackup {
        // Ideally, we instead have the entire StorageServiceManager class be
        // instantiated with the necessary context to make authenticated requests.
        // This is a middle ground between the current world (implicit auth we grab
        // from tsAccountManager) and explicit auth management.
        var authedDevice: AuthedDevice
    }
    var pendingBackup: PendingBackup?
    /// The timer that will request a backup once it fires, if one is scheduled.
    var pendingBackupTimer: Timer?

    // MARK: Restore

    /// A requested restore, along with the futures to settle when it finishes.
    struct PendingRestore {
        var authedDevice: AuthedDevice
        var futures: [Future<Void>]
    }
    var pendingRestore: PendingRestore?

    // MARK: Mutations

    /// Local changes that haven't yet been written to the persisted state.
    var pendingMutations = PendingMutations()

    // MARK: Restore results

    /// If set, contains the Error from the most recent restore request. If
    /// it's nil, we've either (a) not yet attempted a restore in this
    /// process; or (b) completed the most recent restore successfully.
    var mostRecentRestoreError: Error?
    /// Futures handed out by `waitForPendingRestores` that are yet to be settled.
    var pendingRestoreCompletionFutures = [Future<Void>]()

    // MARK: Scheduling

    /// Whether an operation is currently in flight.
    var isRunningOperation = false
}
|
|
|
|
/// The manager's mutable state. Mutations are expected to go through
/// `updateManagerState(block:)`.
private let managerState = AtomicValue(ManagerState(), lock: .init())

/// Applies `block` to a copy of the current state, gives the scheduler a
/// chance to start the next operation based on the result, and stores the
/// resulting state back.
private func updateManagerState(block: (inout ManagerState) -> Void) {
    managerState.map { currentState in
        var updatedState = currentState
        block(&updatedState)
        startNextOperationIfNeeded(&updatedState)
        return updatedState
    }
}
|
|
|
|
/// Starts the next pending operation, unless one is already running or there
/// is nothing to do.
///
/// The operation runs in a new `Task`. Once it finishes, `finishOperation` is
/// invoked with the operation's cleanup block (if any), which receives the
/// error the operation threw, or `nil` if it succeeded.
private func startNextOperationIfNeeded(_ managerState: inout ManagerState) {
    if managerState.isRunningOperation {
        // Already running an operation -- we'll start the next when it finishes.
        return
    }
    guard let (operation, operationCleanup) = popNextOperation(&managerState) else {
        // There's nothing we need to do, so don't start any operation.
        return
    }

    // Run the operation & check again when it's done.
    managerState.isRunningOperation = true

    Task {
        let operationError: (any Error)?
        do {
            try await operation()
            operationError = nil
        } catch {
            operationError = error
        }

        self.finishOperation(cleanupBlock: { finishedState in
            operationCleanup?(&finishedState, operationError)
        })
    }
}
|
|
|
|
/// Removes the next piece of pending work from `managerState` and returns it
/// as a runnable operation.
///
/// Pending work is considered in this order: manifest rotation, pending
/// mutations, unknown-data cleanup, restore, settling the promises handed out
/// by `waitForPendingRestores`, and finally backup. If the operation for a
/// given kind of work can't be built, that work is dropped and the next kind
/// is considered.
///
/// - Returns: `nil` if there is nothing to run. Otherwise, the operation plus
///   an optional cleanup block. The cleanup block is run when the operation
///   finishes, with the error the operation threw (or `nil` on success).
private func popNextOperation(_ managerState: inout ManagerState) -> (() async throws -> Void, ((inout ManagerState, (any Error)?) -> Void)?)? {
    /// Used to reject restore promises whose operation could not be built.
    struct RestoreOperationUnavailableError: Error {}

    if let pendingManifestRotation = managerState.pendingManifestRotation {
        managerState.pendingManifestRotation = nil

        func resumeContinuations(_ error: Error?) {
            for continuation in pendingManifestRotation.continuations {
                if let error {
                    continuation.resume(throwing: error)
                } else {
                    continuation.resume()
                }
            }
        }

        if let rotateManifestOperation = buildOperation(
            managerState: managerState,
            mode: .rotateManifest(mode: pendingManifestRotation.mode),
            authedDevice: pendingManifestRotation.authedDevice
        ) {
            let cleanupBlock: ((inout ManagerState, (any Error)?) -> Void) = { _, error in
                resumeContinuations(error)
            }

            return (rotateManifestOperation, cleanupBlock)
        } else {
            /// Resume the continuations, but don't return `nil` since there
            /// may be other operations we can pop instead.
            resumeContinuations(OWSAssertionError("Failed to build rotate manifest operation!"))
        }
    }

    if managerState.pendingMutations.hasChanges {
        let pendingMutations = managerState.pendingMutations
        managerState.pendingMutations = PendingMutations()

        return (StorageServiceOperation.recordPendingMutations(pendingMutations), nil)
    }

    if managerState.hasPendingCleanup {
        managerState.hasPendingCleanup = false

        let cleanUpOperation = buildOperation(
            managerState: managerState,
            mode: .cleanUpUnknownData,
            authedDevice: .implicit
        )
        if let cleanUpOperation {
            return (cleanUpOperation, nil)
        }
    }

    if let pendingRestore = managerState.pendingRestore {
        managerState.pendingRestore = nil
        managerState.mostRecentRestoreError = nil

        let restoreOperation = buildOperation(
            managerState: managerState,
            mode: .restoreOrCreate,
            authedDevice: pendingRestore.authedDevice
        )
        if let restoreOperation {
            return ({
                do {
                    try await restoreOperation()
                    pendingRestore.futures.forEach { $0.resolve() }
                } catch {
                    pendingRestore.futures.forEach { $0.reject(error) }
                    throw error
                }
            }, { $0.mostRecentRestoreError = $1 })
        } else if !pendingRestore.futures.isEmpty {
            // The restore can't run, but callers are holding promises for it.
            // Reject them so they don't wait forever. As with the other
            // promises in this method, they're settled from within an
            // operation rather than inline while the state is being mutated.
            let restoreFutures = pendingRestore.futures
            return ({
                let unavailableError = RestoreOperationUnavailableError()
                restoreFutures.forEach { $0.reject(unavailableError) }
            }, nil)
        }
    }

    if !managerState.pendingRestoreCompletionFutures.isEmpty {
        let pendingRestoreCompletionFutures = managerState.pendingRestoreCompletionFutures
        managerState.pendingRestoreCompletionFutures = []

        let mostRecentRestoreError = managerState.mostRecentRestoreError

        return ({
            pendingRestoreCompletionFutures.forEach {
                if let mostRecentRestoreError {
                    $0.reject(mostRecentRestoreError)
                } else {
                    $0.resolve(())
                }
            }
        }, nil)
    }

    if let pendingBackup = managerState.pendingBackup {
        managerState.pendingBackup = nil

        let backupOperation = buildOperation(
            managerState: managerState,
            mode: .backup,
            authedDevice: pendingBackup.authedDevice
        )
        if let backupOperation {
            return (backupOperation, nil)
        }
    }

    return nil
}
|
|
|
|
/// Builds a closure that runs a `StorageServiceOperation` in the given mode.
///
/// With explicit auth, the identifiers and primary-device flag are taken from
/// `authedDevice`. With implicit auth, they come from `managerState` and the
/// account manager's registration state.
///
/// - Returns: The closure, or `nil` if implicit auth is requested but we
///   aren't registered, have no local identifiers, or don't know whether this
///   is the primary device.
private func buildOperation(
    managerState: ManagerState,
    mode: StorageServiceOperation.Mode,
    authedDevice: AuthedDevice
) -> (() async throws -> Void)? {
    let operationIdentifiers: LocalIdentifiers
    let operationIsPrimaryDevice: Bool

    switch authedDevice {
    case .explicit(let explicitAuth):
        operationIdentifiers = explicitAuth.localIdentifiers
        operationIsPrimaryDevice = explicitAuth.isPrimaryDevice

    case .implicit:
        // Under the new reg flow, we will sync kbs keys before being fully ready with
        // ts account manager auth set up. skip if so.
        let registrationState = DependenciesBridge.shared.tsAccountManager.registrationStateWithMaybeSneakyTransaction
        guard registrationState.isRegistered else {
            Logger.info("Skipping storage service operation with implicit auth during registration.")
            return nil
        }

        // The `isRegisteredAndReady` property only returns true when
        // `LocalIdentifiers` are ready on `TSAccountManager`. These should have
        // been provided to this object before we reach this point.
        guard let storedIdentifiers = managerState.localIdentifiers else {
            owsFailDebug("Trying to perform storage service operation without any identifiers.")
            return nil
        }
        operationIdentifiers = storedIdentifiers

        guard let registeredIsPrimaryDevice = registrationState.isPrimaryDevice else {
            owsFailDebug("Trying to perform storage service operation without isPrimaryDevice.")
            return nil
        }
        operationIsPrimaryDevice = registeredIsPrimaryDevice
    }

    return {
        let operation = StorageServiceOperation(
            mode: mode,
            localIdentifiers: operationIdentifiers,
            isPrimaryDevice: operationIsPrimaryDevice,
            authedDevice: authedDevice
        )
        try await operation.run()
    }
}
|
|
|
|
/// Runs `cleanupBlock` against the manager state and then marks the running
/// operation as finished. Because this goes through `updateManagerState`, the
/// next pending operation (if any) is started afterwards.
private func finishOperation(cleanupBlock: (inout ManagerState) -> Void) {
    updateManagerState { finishedState in
        cleanupBlock(&finishedState)
        finishedState.isRunningOperation = false
    }
}
|
|
|
|
// MARK: - Pending Mutations
|
|
|
|
/// Applies `block` to the pending mutations and, if there are now changes and
/// no backup timer is scheduled yet, schedules one.
private func updatePendingMutations(block: (inout PendingMutations) -> Void) {
    updateManagerState { managerState in
        block(&managerState.pendingMutations)

        guard managerState.pendingMutations.hasChanges else {
            return
        }
        guard managerState.pendingBackupTimer == nil else {
            return
        }

        // We've made changes, so schedule a backup for the near future. This
        // provides an interval during which pending mutations can be coalesced.
        managerState.pendingBackupTimer = startBackupTimer()
    }
}
|
|
|
|
/// Adds the given recipient unique IDs to the pending mutations. Does nothing
/// (and logs nothing) for an empty list.
public func recordPendingUpdates(updatedRecipientUniqueIds: [RecipientUniqueId]) {
    guard !updatedRecipientUniqueIds.isEmpty else {
        return
    }
    Logger.info("Recording pending update for recipientUniqueIds: \(updatedRecipientUniqueIds)")

    updatePendingMutations { pendingMutations in
        pendingMutations.updatedRecipientUniqueIds.formUnion(updatedRecipientUniqueIds)
    }
}
|
|
|
|
/// Adds the service IDs of the given addresses to the pending mutations.
/// Addresses without a service ID are ignored. Does nothing (and logs nothing)
/// for an empty list.
public func recordPendingUpdates(updatedAddresses: [SignalServiceAddress]) {
    guard !updatedAddresses.isEmpty else {
        return
    }
    Logger.info("Recording pending update for addresses: \(updatedAddresses)")

    updatePendingMutations { pendingMutations in
        pendingMutations.updatedServiceIds.formUnion(updatedAddresses.lazy.compactMap({ $0.serviceId }))
    }
}
|
|
|
|
/// Adds the given serialized V2 group master keys to the pending mutations.
@objc
public func recordPendingUpdates(updatedGroupV2MasterKeys: [Data]) {
    updatePendingMutations { pendingMutations in
        pendingMutations.updatedGroupV2MasterKeys.formUnion(updatedGroupV2MasterKeys)
    }
}
|
|
|
|
/// Adds the given story distribution list identifiers to the pending mutations.
@objc
public func recordPendingUpdates(updatedStoryDistributionListIds: [Data]) {
    updatePendingMutations { pendingMutations in
        pendingMutations.updatedStoryDistributionListIds.formUnion(updatedStoryDistributionListIds)
    }
}
|
|
|
|
/// Adds the bytes of the given call link root keys to the pending mutations.
public func recordPendingUpdates(callLinkRootKeys: [CallLinkRootKey]) {
    updatePendingMutations { pendingMutations in
        pendingMutations.updatedCallLinkRootKeys.formUnion(callLinkRootKeys.lazy.map(\.bytes))
    }
}
|
|
|
|
/// Flags the local account as updated in the pending mutations.
public func recordPendingLocalAccountUpdates() {
    Logger.info("Recording pending local account updates")

    updatePendingMutations { pendingMutations in
        pendingMutations.updatedLocalAccount = true
    }
}
|
|
|
|
// MARK: - Actions
|
|
|
|
/// Requests a restore (or creation) of the manifest.
///
/// The request is merged into any restore that's already pending: the new
/// future is appended, and `authedDevice` is combined with the pending one via
/// `orIfImplicitUse`.
///
/// - Returns: A Promise backed by the future that was queued with the restore.
@discardableResult
public func restoreOrCreateManifestIfNecessary(authedDevice: AuthedDevice) -> Promise<Void> {
    let (promise, future) = Promise<Void>.pending()

    updateManagerState { managerState in
        var restore = managerState.pendingRestore
            ?? ManagerState.PendingRestore(authedDevice: .implicit, futures: [])
        restore.authedDevice = authedDevice.orIfImplicitUse(restore.authedDevice)
        restore.futures.append(future)
        managerState.pendingRestore = restore
    }

    return promise
}
|
|
|
|
/// Requests a manifest rotation and suspends until the rotation operation has
/// finished (or could not be built).
///
/// The request is merged into any rotation that's already pending: the
/// caller's continuation is appended, `authedDevice` is combined with the
/// pending one via `orIfImplicitUse`, and the higher-precedence mode wins.
///
/// - Throws: The error the rotation finished with, if any.
public func rotateManifest(
    mode: ManifestRotationMode,
    authedDevice: AuthedDevice
) async throws {
    try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
        updateManagerState { managerState in
            var rotation = managerState.pendingManifestRotation
                ?? ManagerState.PendingManifestRotation(
                    authedDevice: .implicit,
                    continuations: [],
                    mode: mode
                )
            rotation.mode = rotation.mode.mergeByPrecedence(mode)
            rotation.authedDevice = authedDevice.orIfImplicitUse(rotation.authedDevice)
            rotation.continuations.append(continuation)

            managerState.pendingManifestRotation = rotation
        }
    }
}
|
|
|
|
/// Requests a backup, merging `authedDevice` with that of any backup that's
/// already pending, and cancels the scheduled backup timer (if any) since the
/// backup it would have requested is now pending.
public func backupPendingChanges(authedDevice: AuthedDevice) {
    updateManagerState { managerState in
        let previousAuthedDevice: AuthedDevice = managerState.pendingBackup?.authedDevice ?? .implicit
        managerState.pendingBackup = ManagerState.PendingBackup(
            authedDevice: authedDevice.orIfImplicitUse(previousAuthedDevice)
        )

        if let scheduledTimer = managerState.pendingBackupTimer {
            managerState.pendingBackupTimer = nil
            // The timer is added to a run loop from the main queue, so it's
            // invalidated from the main queue as well.
            DispatchQueue.main.async { scheduledTimer.invalidate() }
        }
    }
}
|
|
|
|
/// Queues a future that is settled once the work ahead of it (including any
/// pending restore) has been handled: rejected with the most recent restore
/// error if there is one, resolved otherwise.
public func waitForPendingRestores() -> Promise<Void> {
    let (promise, future) = Promise<Void>.pending()

    updateManagerState { $0.pendingRestoreCompletionFutures.append(future) }

    return promise
}
|
|
|
|
/// Removes everything from the Storage Service operation's key-value store.
public func resetLocalData(transaction: DBWriteTransaction) {
    Logger.info("Resetting local storage service data.")

    let operationStore = StorageServiceOperation.keyValueStore
    operationStore.removeAll(transaction: transaction)
}
|
|
|
|
/// Requests a cleanup of unknown data by flagging it in the manager state.
private func cleanUpUnknownData() {
    updateManagerState { $0.hasPendingCleanup = true }
}
|
|
|
|
// MARK: - Backup Scheduling
|
|
|
|
/// How long, in seconds, to wait after the first pending change before
/// requesting a backup.
private static var backupDebounceInterval: TimeInterval = 0.2

/// Schedules a one-time backup. By default, this will happen
/// `backupDebounceInterval` seconds after the first pending change is recorded.
///
/// - Returns: The timer, which is added to a run loop asynchronously from the
///   main queue.
private func startBackupTimer() -> Timer {
    let debounceTimer = Timer(
        timeInterval: Self.backupDebounceInterval,
        target: self,
        selector: #selector(backupTimerFired(_:)),
        userInfo: nil,
        repeats: false
    )

    DispatchQueue.main.async {
        RunLoop.current.add(debounceTimer, forMode: .default)
    }

    return debounceTimer
}
|
|
|
|
/// Timer callback: requests a backup with implicit auth. Must run on the
/// main thread.
@objc
private func backupTimerFired(_ timer: Timer) {
    AssertIsOnMainThread()

    self.backupPendingChanges(authedDevice: .implicit)
}
|
|
|
|
// MARK: - Cleanup
|
|
|
|
/// Deletes call links whose admin-deletion timestamp is older than the
/// message queue time, then records pending updates for their root keys.
///
/// Errors are reported via `owsFailDebug` rather than thrown.
private func cleanUpDeletedCallLinks() async {
    let callLinkStore = DependenciesBridge.shared.callLinkStore
    let deletionThresholdMs = Date.ows_millisecondTimestamp() - RemoteConfig.current.messageQueueTimeMs

    do {
        let expiredRecords = try SSKEnvironment.shared.databaseStorageRef.read { tx in
            try callLinkStore.fetchWhere(adminDeletedAtTimestampMsIsLessThan: deletionThresholdMs, tx: tx.asV2Read)
        }
        guard !expiredRecords.isEmpty else {
            return
        }

        Logger.info("Cleaning up \(expiredRecords.count) call links that were deleted a while ago.")
        try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            for expiredRecord in expiredRecords {
                try callLinkStore.delete(expiredRecord, tx: tx.asV2Write)
            }
        }
        recordPendingUpdates(callLinkRootKeys: expiredRecords.map(\.rootKey))
    } catch {
        owsFailDebug("Couldn't clean up deleted call links: \(error)")
    }
}
|
|
}
|
|
|
|
// MARK: - PendingMutations
|
|
|
|
/// The set of local changes that have been reported to the manager but not
/// yet recorded in the persisted operation state.
private struct PendingMutations {
    var updatedRecipientUniqueIds = Set<RecipientUniqueId>()
    var updatedServiceIds = Set<ServiceId>()
    var updatedGroupV2MasterKeys = Set<Data>()
    var updatedStoryDistributionListIds = Set<Data>()
    var updatedCallLinkRootKeys = Set<Data>()
    var updatedLocalAccount = false

    /// Whether anything at all has been marked as updated.
    var hasChanges: Bool {
        if updatedLocalAccount {
            return true
        }
        let nothingElseChanged = (
            updatedRecipientUniqueIds.isEmpty
            && updatedServiceIds.isEmpty
            && updatedGroupV2MasterKeys.isEmpty
            && updatedStoryDistributionListIds.isEmpty
            && updatedCallLinkRootKeys.isEmpty
        )
        return !nothingElseChanged
    }
}
|
|
|
|
// MARK: -
|
|
|
|
class StorageServiceOperation {
|
|
|
|
/// Key-value store for the "StorageServiceMigration" collection.
private static let migrationStore = KeyValueStore(collection: "StorageServiceMigration")
/// Key string "Version"; presumably used with `migrationStore` -- confirm at
/// its use sites.
private static let versionKey = "Version"

/// Key-value store for the operation's own collection. A new store value is
/// created on each access.
fileprivate static var keyValueStore: KeyValueStore {
    KeyValueStore(collection: "kOWSStorageServiceOperation_IdentifierMap")
}
|
|
|
|
// MARK: -
|
|
|
|
/// The kind of work a single operation performs when it runs.
fileprivate enum Mode {
    /// Recreate the manifest using the given rotation mode.
    case rotateManifest(mode: StorageServiceManager.ManifestRotationMode)

    /// Back up pending changes.
    case backup

    /// Restore the manifest, or create it, if necessary.
    case restoreOrCreate

    /// Clean up unknown data.
    case cleanUpUnknownData
}
|
|
/// What this operation does when run.
private let mode: Mode
/// The identifiers of the local user this operation runs for.
private let localIdentifiers: LocalIdentifiers
/// Whether this device is the primary device.
private let isPrimaryDevice: Bool
/// The auth to use for this operation's requests.
private let authedDevice: AuthedDevice
/// The account-level auth derived from `authedDevice`.
private var authedAccount: AuthedAccount {
    return authedDevice.authedAccount
}

/// Creates an operation; nothing happens until `run()` is called.
fileprivate init(
    mode: Mode,
    localIdentifiers: LocalIdentifiers,
    isPrimaryDevice: Bool,
    authedDevice: AuthedDevice
) {
    self.mode = mode
    self.localIdentifiers = localIdentifiers
    self.isPrimaryDevice = isPrimaryDevice
    self.authedDevice = authedDevice
}
|
|
|
|
// MARK: - Run
|
|
|
|
/// Runs the operation via `Retry.performWithBackoff`, allowing up to four
/// attempts.
func run() async throws {
    try await Retry.performWithBackoff(maxAttempts: 4) {
        try await self._run()
    }
}
|
|
|
|
/// Performs one attempt of the operation. Called on every retry, this is
/// where the bulk of the operation's work should go.
///
/// Does nothing if the storage service key isn't available. Otherwise,
/// dispatches on `mode`.
///
/// - Throws: An `OWSAssertionError` when asked to rotate the manifest from a
///   non-primary device, plus anything thrown by the work for the mode.
private func _run() async throws {
    // The current state is only needed (and therefore only read) when
    // rotating the manifest.
    let (
        isKeyAvailable,
        currentStateIfRotatingManifest
    ): (
        Bool,
        State?
    ) = SSKEnvironment.shared.databaseStorageRef.read { tx in
        let isKeyAvailable = DependenciesBridge.shared.svrKeyDeriver.isKeyAvailable(.storageService, tx: tx.asV2Read)

        switch mode {
        case .rotateManifest:
            return (isKeyAvailable, State.current(transaction: tx))
        case .backup, .restoreOrCreate, .cleanUpUnknownData:
            return (isKeyAvailable, nil)
        }
    }

    // We don't have backup keys, do nothing. We'll try a
    // fresh restore once the keys are set.
    guard isKeyAvailable else {
        return
    }

    switch mode {
    case .rotateManifest(let rotationMode):
        guard isPrimaryDevice else {
            throw OWSAssertionError("Can only rotate manifest from primary device!")
        }
        // The state is always read above for this mode, so this shouldn't
        // fail; throw instead of crashing if it somehow does.
        guard let currentState = currentStateIfRotatingManifest else {
            throw OWSAssertionError("Missing current state while rotating manifest!")
        }

        let nextManifestVersion = currentState.manifestVersion + 1

        switch rotationMode {
        case .preservingRecordsIfPossible:
            try await createNewManifestPreservingRecords(version: nextManifestVersion)
        case .alsoRotatingRecords:
            try await createNewManifestAndRecords(version: nextManifestVersion)
        }
    case .backup:
        try await backupPendingChanges()
    case .restoreOrCreate:
        try await restoreOrCreateManifestIfNecessary()
    case .cleanUpUnknownData:
        await cleanUpUnknownData()
    }
}
|
|
|
|
// MARK: - Mark Pending Changes
|
|
|
|
/// Returns a closure that, when run, records the given mutations in the
/// persisted state inside a write transaction.
fileprivate static func recordPendingMutations(_ pendingMutations: PendingMutations) -> (() async -> Void) {
    return {
        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            recordPendingMutations(pendingMutations, transaction: tx)
        }
    }
}
|
|
|
|
/// Loads the current state, applies the given mutations to it, and saves it
/// back, all within `transaction`.
private static func recordPendingMutations(
    _ pendingMutations: PendingMutations,
    transaction: SDSAnyWriteTransaction
) {
    var persistedState = State.current(transaction: transaction)
    recordPendingMutations(pendingMutations, in: &persistedState, transaction: transaction)
    persistedState.save(transaction: transaction)
}
|
|
|
|
/// Marks everything mentioned in `pendingMutations` as `.updated` in `state`.
///
/// Service IDs are first resolved to recipient unique IDs (fetching or
/// creating the recipient as needed) so that contacts are tracked in a single
/// change map.
private static func recordPendingMutations(
    _ pendingMutations: PendingMutations,
    in state: inout State,
    transaction tx: SDSAnyWriteTransaction
) {
    // Coalesce addresses to account IDs. There may be duplicates among the
    // addresses and account IDs, which the Set takes care of.
    var allRecipientUniqueIds = pendingMutations.updatedRecipientUniqueIds

    let recipientFetcher = DependenciesBridge.shared.recipientFetcher
    allRecipientUniqueIds.formUnion(pendingMutations.updatedServiceIds.lazy.compactMap { (serviceId: ServiceId) -> String? in
        return recipientFetcher.fetchOrCreate(serviceId: serviceId, tx: tx.asV2Write).uniqueId
    })

    // Then, update State with all these pending mutations.
    Logger.info(
        """
        Recording pending mutations (\
        Account: \(pendingMutations.updatedLocalAccount); \
        Contacts: \(allRecipientUniqueIds.count); \
        GV2: \(pendingMutations.updatedGroupV2MasterKeys.count); \
        DLists: \(pendingMutations.updatedStoryDistributionListIds.count); \
        CLinks: \(pendingMutations.updatedCallLinkRootKeys.count))
        """
    )

    if pendingMutations.updatedLocalAccount {
        state.localAccountChangeState = .updated
    }

    for recipientUniqueId in allRecipientUniqueIds {
        state.accountIdChangeMap[recipientUniqueId] = .updated
    }

    for masterKey in pendingMutations.updatedGroupV2MasterKeys {
        state.groupV2ChangeMap[masterKey] = .updated
    }

    for distributionListId in pendingMutations.updatedStoryDistributionListIds {
        state.storyDistributionListChangeMap[distributionListId] = .updated
    }

    for rootKey in pendingMutations.updatedCallLinkRootKeys {
        state.callLinkRootKeyChangeMap[rootKey] = .updated
    }
}
|
|
|
|
/// If the local user's own recipient was marked as changed, moves that change
/// from the contact change map over to the local account change state.
private func normalizePendingMutations(in state: inout State, transaction: SDSAnyReadTransaction) {
    // If we didn't change any AccountIds, then there's nothing that could
    // match the local recipient & we can avoid the query.
    guard !state.accountIdChangeMap.isEmpty else {
        return
    }

    let recipientIdFinder = DependenciesBridge.shared.recipientIdFinder
    let localRecipientUniqueId = try? recipientIdFinder.recipientUniqueId(for: localIdentifiers.aci, tx: transaction.asV2Read)?.get()
    guard let localRecipientUniqueId else {
        return
    }

    // If we updated a recipient, and if that recipient is ourselves, move the
    // update over to the Account record type.
    let didRemoveLocalRecipient = state.accountIdChangeMap.removeValue(forKey: localRecipientUniqueId) != nil
    if didRemoveLocalRecipient {
        state.localAccountChangeState = .updated
    }
}
|
|
|
|
// MARK: - Backup
|
|
|
|
private func backupPendingChanges() async throws {
|
|
var updatedItems: [StorageService.StorageItem] = []
|
|
var deletedIdentifiers: [StorageService.StorageIdentifier] = []
|
|
|
|
func updateRecord<StateUpdater: StorageServiceStateUpdater>(
|
|
state: inout State,
|
|
localId: StateUpdater.IdType,
|
|
changeState: State.ChangeState,
|
|
stateUpdater: StateUpdater,
|
|
needsInterceptForMigration: Bool,
|
|
transaction: SDSAnyReadTransaction
|
|
) {
|
|
let recordUpdater = stateUpdater.recordUpdater
|
|
|
|
let newRecord: StateUpdater.RecordType?
|
|
|
|
switch changeState {
|
|
case .unchanged:
|
|
return
|
|
case .updated:
|
|
// We need to preserve the unknown fields (if any) so we don't blow away
|
|
// data written by newer versions of the app.
|
|
let recordWithUnknownFields = stateUpdater.recordWithUnknownFields(for: localId, in: state)
|
|
let unknownFields = recordWithUnknownFields.flatMap { recordUpdater.unknownFields(for: $0) }
|
|
newRecord = recordUpdater.buildRecord(
|
|
for: localId,
|
|
unknownFields: unknownFields,
|
|
transaction: transaction
|
|
)
|
|
case .deleted:
|
|
newRecord = nil
|
|
}
|
|
|
|
// Note: We might not have a `newRecord` even if the status is `.updated`.
|
|
// The local value may have been deleted before this operation started.
|
|
|
|
// If there is an existing identifier for this record, mark it for
|
|
// deletion. We generate a fresh identifier every time a record changes, so
|
|
// we always start by deleting the old record.
|
|
if let oldStorageIdentifier = stateUpdater.storageIdentifier(for: localId, in: state) {
|
|
deletedIdentifiers.append(oldStorageIdentifier)
|
|
}
|
|
// Clear out all of the state for the old record. We'll re-add the state if
|
|
// we have a new record to save.
|
|
stateUpdater.setStorageIdentifier(nil, for: localId, in: &state)
|
|
stateUpdater.setRecordWithUnknownFields(nil, for: localId, in: &state)
|
|
|
|
// We've deleted the old record. If we don't have a `newRecord`, stop.
|
|
guard var newRecord else {
|
|
return
|
|
}
|
|
|
|
if needsInterceptForMigration {
|
|
newRecord = StorageServiceUnknownFieldMigrator.interceptLocalManifestBeforeUploading(
|
|
record: newRecord,
|
|
tx: transaction
|
|
)
|
|
}
|
|
|
|
if recordUpdater.unknownFields(for: newRecord) != nil {
|
|
stateUpdater.setRecordWithUnknownFields(newRecord, for: localId, in: &state)
|
|
}
|
|
|
|
let storageItem = recordUpdater.buildStorageItem(for: newRecord)
|
|
stateUpdater.setStorageIdentifier(storageItem.identifier, for: localId, in: &state)
|
|
updatedItems.append(storageItem)
|
|
}
|
|
|
|
func updateRecords<StateUpdater: StorageServiceStateUpdater>(
|
|
state: inout State,
|
|
stateUpdater: StateUpdater,
|
|
needsInterceptForMigration: Bool,
|
|
transaction: SDSAnyReadTransaction
|
|
) {
|
|
stateUpdater.resetAndEnumerateChangeStates(in: &state) { mutableState, localId, changeState in
|
|
updateRecord(
|
|
state: &mutableState,
|
|
localId: localId,
|
|
changeState: changeState,
|
|
stateUpdater: stateUpdater,
|
|
needsInterceptForMigration: needsInterceptForMigration,
|
|
transaction: transaction
|
|
)
|
|
}
|
|
}
|
|
|
|
var state: State = SSKEnvironment.shared.databaseStorageRef.read { transaction in
|
|
var state = State.current(transaction: transaction)
|
|
|
|
normalizePendingMutations(in: &state, transaction: transaction)
|
|
|
|
let needsInterceptForMigration =
|
|
StorageServiceUnknownFieldMigrator.shouldInterceptLocalManifestBeforeUploading(tx: transaction)
|
|
|
|
updateRecords(
|
|
state: &state,
|
|
stateUpdater: buildAccountUpdater(),
|
|
needsInterceptForMigration: needsInterceptForMigration,
|
|
transaction: transaction
|
|
)
|
|
updateRecords(
|
|
state: &state,
|
|
stateUpdater: buildContactUpdater(),
|
|
needsInterceptForMigration: needsInterceptForMigration,
|
|
transaction: transaction
|
|
)
|
|
updateRecords(
|
|
state: &state,
|
|
stateUpdater: buildGroupV1Updater(),
|
|
needsInterceptForMigration: needsInterceptForMigration,
|
|
transaction: transaction
|
|
)
|
|
updateRecords(
|
|
state: &state,
|
|
stateUpdater: buildGroupV2Updater(),
|
|
needsInterceptForMigration: needsInterceptForMigration,
|
|
transaction: transaction
|
|
)
|
|
updateRecords(
|
|
state: &state,
|
|
stateUpdater: buildStoryDistributionListUpdater(),
|
|
needsInterceptForMigration: needsInterceptForMigration,
|
|
transaction: transaction
|
|
)
|
|
updateRecords(
|
|
state: &state,
|
|
stateUpdater: buildCallLinkUpdater(),
|
|
needsInterceptForMigration: needsInterceptForMigration,
|
|
transaction: transaction
|
|
)
|
|
|
|
return state
|
|
}
|
|
|
|
// If we have no pending changes, we have nothing left to do
|
|
guard !deletedIdentifiers.isEmpty || !updatedItems.isEmpty else {
|
|
return
|
|
}
|
|
|
|
// If we have invalid identifiers, we intentionally exclude them from the
|
|
// prior check. We've already ignored them, so we can clean them up as part
|
|
// of the next unrelated change.
|
|
let invalidIdentifiers = state.invalidIdentifiers
|
|
state.invalidIdentifiers = []
|
|
|
|
// Bump the manifest version
|
|
state.manifestVersion += 1
|
|
|
|
let manifest = buildManifestRecord(
|
|
manifestVersion: state.manifestVersion,
|
|
manifestRecordIkm: state.manifestRecordIkm,
|
|
identifiers: state.allIdentifiers
|
|
)
|
|
|
|
Logger.info(
|
|
"""
|
|
Backing up pending changes with proposed manifest version \(state.manifestVersion) (\
|
|
New: \(updatedItems.count), \
|
|
Deleted: \(deletedIdentifiers.count), \
|
|
Invalid/Missing: \(invalidIdentifiers.count), \
|
|
Total: \(state.allIdentifiers.count))
|
|
"""
|
|
)
|
|
|
|
switch await StorageService.updateManifest(
|
|
manifest,
|
|
newItems: updatedItems,
|
|
deletedIdentifiers: deletedIdentifiers + invalidIdentifiers,
|
|
deleteAllExistingRecords: false,
|
|
chatServiceAuth: authedAccount.chatServiceAuth
|
|
) {
|
|
case .success:
|
|
break
|
|
case .conflictingManifest(let conflictingManifest):
|
|
// Throw away all our work, resolve conflicts, and try again.
|
|
try await self.mergeLocalManifest(withRemoteManifest: conflictingManifest, backupAfterSuccess: true)
|
|
return
|
|
case
|
|
.error(.manifestDecryptionFailed(let conflictingVersion)) where isPrimaryDevice,
|
|
.error(.manifestProtoDeserializationFailed(let conflictingVersion)) where isPrimaryDevice:
|
|
/// The remote manifest is invalid and conflicting, which is
|
|
/// blocking us from doing a backup. Overwrite it.
|
|
try await createNewManifestAndRecords(version: conflictingVersion + 1)
|
|
return
|
|
case .error(let storageError):
|
|
throw storageError
|
|
}
|
|
|
|
Logger.info("Successfully updated to manifest version: \(state.manifestVersion)")
|
|
|
|
// Successfully updated, store our changes.
|
|
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
|
|
state.save(clearConsecutiveConflicts: true, transaction: transaction)
|
|
StorageServiceUnknownFieldMigrator.didWriteToStorageService(tx: transaction)
|
|
}
|
|
|
|
// Notify our other devices that the storage manifest has changed.
|
|
await SSKEnvironment.shared.syncManagerRef.sendFetchLatestStorageManifestSyncMessage()
|
|
}
|
|
|
|
/// Assembles a manifest record proto from the given version, optional
/// `recordIkm`, and storage identifiers.
///
/// - Parameters:
///   - manifestVersion: Version to stamp on the manifest.
///   - manifestRecordIkm: When non-nil, written into the manifest. A
///     debug assertion checks that it has the expected length.
///   - identifiers: Identifiers to list as manifest keys. They are passed
///     through `StorageIdentifier.deduplicate` first.
/// - Returns: The built manifest record.
private func buildManifestRecord(
    manifestVersion: UInt64,
    manifestRecordIkm: Data?,
    identifiers rawIdentifiers: [StorageService.StorageIdentifier]
) -> StorageServiceProtoManifestRecord {
    let keyRecords = StorageService.StorageIdentifier
        .deduplicate(rawIdentifiers)
        .map { $0.buildRecord() }

    var builder = StorageServiceProtoManifestRecord.builder(version: manifestVersion)

    if let recordIkm = manifestRecordIkm {
        owsAssertDebug(
            recordIkm.count == StorageService.ManifestRecordIkm.expectedLength,
            "Found manifest recordIkm with unexpected length! Who generated it?"
        )
        builder.setRecordIkm(recordIkm)
    }

    builder.setKeys(keyRecords)

    // Tag the manifest with the device id stored for this device.
    let sourceDeviceId = DependenciesBridge.shared.tsAccountManager.storedDeviceIdWithMaybeTransaction
    builder.setSourceDevice(sourceDeviceId)

    return builder.buildInfallibly()
}
|
|
|
|
// MARK: - Restore
|
|
|
|
/// Fetches the latest remote manifest and reacts to the outcome: creates a
/// first manifest when none exists, merges a newer one, or recovers from an
/// unreadable one.
///
/// - Throws: Any storage error that is not handled by one of the recovery
///   paths below, plus anything thrown while creating or merging a manifest.
private func restoreOrCreateManifestIfNecessary() async throws {
    let state: State = SSKEnvironment.shared.databaseStorageRef.read { State.current(transaction: $0) }

    // When flagged to refetch the latest manifest we deliberately omit our
    // current version; otherwise the server may answer with nothing because
    // we claimed to have already parsed it.
    let greaterThanVersion: UInt64? = state.refetchLatestManifest ? nil : state.manifestVersion

    let fetchResult = await StorageService.fetchLatestManifest(
        greaterThanVersion: greaterThanVersion,
        chatServiceAuth: authedAccount.chatServiceAuth
    )

    switch fetchResult {
    case .noExistingManifest:
        // Nothing exists remotely yet, so start from version 1.
        try await createNewManifestAndRecords(version: 1)

    case .noNewerManifest:
        // The server has nothing newer than what we already hold.
        break

    case .latestManifest(let manifest):
        // The remote copy is ahead of ours; merge it in.
        try await mergeLocalManifest(withRemoteManifest: manifest, backupAfterSuccess: false)

    case .error(.manifestDecryptionFailed(let manifestVersion)):
        // The manifest was fetched but couldn't be decrypted, which likely
        // means our keys changed.
        guard isPrimaryDevice else {
            // A linked device gives up and asks the primary for the latest
            // storage service key.
            Logger.warn("Manifest decryption failed on linked device, clearing storage service keys.")

            await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
                // The key is no longer valid. Clearing it prevents further
                // backup attempts until the sync response arrives.
                DependenciesBridge.shared.svr.clearSyncedStorageServiceKey(transaction: transaction.asV2Write)
                SSKEnvironment.shared.syncManagerRef.sendKeysSyncRequestMessage(transaction: transaction)
            }
            return
        }

        // The primary discards the remote data and re-encrypts the social
        // graph with the keys it has locally.
        Logger.warn("Manifest decryption failed on primary, recreating manifest.")
        try await createNewManifestAndRecords(version: manifestVersion + 1)

    case .error(.manifestProtoDeserializationFailed(let manifestVersion)) where isPrimaryDevice:
        /// Storage Service holds byte garbage. The only recourse is to
        /// throw it away and recreate it from local data.
        Logger.warn("Manifest deserialization failed on primary, recreating manifest.")
        try await createNewManifestAndRecords(version: manifestVersion + 1)

    case .error(let storageError):
        throw storageError
    }
}
|
|
|
|
// MARK: - Creating new manifests
|
|
|
|
/// Uploads a new manifest at `version` that lists the identifiers already
/// known locally, without uploading or deleting any records.
///
/// Falls back to `createNewManifestAndRecords` when there is no local
/// `recordIkm`, or when the upload reports a conflicting manifest version.
///
/// - Parameter version: The version for the new manifest.
/// - Throws: `StorageService.StorageError` from the underlying upload.
private func createNewManifestPreservingRecords(version: UInt64) async throws(StorageService.StorageError) {
    owsPrecondition(isPrimaryDevice)

    var state: State = SSKEnvironment.shared.databaseStorageRef.read { State.current(transaction: $0) }
    state.manifestVersion = version

    guard let manifestRecordIkm = state.manifestRecordIkm else {
        /// Preserving records only makes sense when they are encrypted
        /// differently from the manifest, i.e. with a `recordIkm`. Without
        /// one, the manifest must be created alongside all-new records.
        Logger.warn("Missing manifest recordIkm while trying to create new manifest preserving records. Pivoting to creating new manifest and records.")
        try await createNewManifestAndRecords(version: version)
        return
    }

    let manifest = buildManifestRecord(
        manifestVersion: version,
        manifestRecordIkm: manifestRecordIkm,
        identifiers: state.allIdentifiers
    )

    let conflictingManifestVersion = try await createNewManifestAndSaveState(
        manifest,
        state: &state,
        newItems: [],
        deletedIdentifiers: [],
        deleteAllExistingRecords: false
    )

    guard let conflictingManifestVersion else {
        // Upload succeeded and state was saved by the helper.
        return
    }

    /// After a conflict we can't be confident that the records we wanted
    /// to preserve can still be preserved. This indicates devices racing
    /// with unfortunate timing, so it should be a niche case. Since a new
    /// manifest is needed regardless, recover by recreating both the
    /// manifest and the records.
    Logger.warn("Got conflicting manifest version while trying to create new manifest preserving records. Pivoting to creating new manifest and records.")
    try await createNewManifestAndRecords(version: conflictingManifestVersion + 1)
}
|
|
|
|
/// Rebuilds every record from local data and uploads them together with a
/// brand-new manifest at `version`.
///
/// If the upload reports a conflicting manifest version, the upload is
/// retried once at `conflictingVersion + 1`. A second conflict is treated as
/// an assertion failure.
///
/// - Parameter version: The version for the new manifest.
/// - Throws: `StorageService.StorageError` from the upload, or `.assertion`
///   after repeated conflicts.
private func createNewManifestAndRecords(version: UInt64) async throws(StorageService.StorageError) {
    owsPrecondition(isPrimaryDevice)

    var state = State()
    state.manifestVersion = version

    var allItems: [StorageService.StorageItem] = []

    SSKEnvironment.shared.databaseStorageRef.read { transaction in
        let isRecordIkmCapable = DependenciesBridge.shared.storageServiceRecordIkmCapabilityStore
            .isRecordIkmCapable(tx: transaction.asV2Read)
        if isRecordIkmCapable {
            /// A `recordIkm`-capable device generates a fresh value each
            /// time it creates a new manifest. The records recreated
            /// alongside this manifest are encrypted with that value.
            state.manifestRecordIkm = StorageService.ManifestRecordIkm.generateForNewManifest()
        }

        let shouldInterceptForMigration =
            StorageServiceUnknownFieldMigrator.shouldInterceptLocalManifestBeforeUploading(tx: transaction)

        /// Builds the record for `localId`, registers its storage
        /// identifier in `state`, and queues the item for upload. Does
        /// nothing when the record updater yields no record.
        func createRecord<StateUpdater>(
            localId: StateUpdater.IdType,
            stateUpdater: StateUpdater
        ) where StateUpdater: StorageServiceStateUpdater {
            let recordUpdater = stateUpdater.recordUpdater

            guard
                var record = recordUpdater.buildRecord(
                    for: localId,
                    unknownFields: nil,
                    transaction: transaction
                )
            else {
                return
            }

            if shouldInterceptForMigration {
                record = StorageServiceUnknownFieldMigrator.interceptLocalManifestBeforeUploading(
                    record: record,
                    tx: transaction
                )
            }

            let storageItem = recordUpdater.buildStorageItem(for: record)
            stateUpdater.setStorageIdentifier(storageItem.identifier, for: localId, in: &state)
            allItems.append(storageItem)
        }

        // Account + contacts
        let accountUpdater = buildAccountUpdater()
        let contactUpdater = buildContactUpdater()
        SignalRecipient.anyEnumerate(transaction: transaction) { recipient, _ in
            // Only one recipient can match our ACI (the column has a UNIQUE
            // constraint). If our PNI or phone number somehow shows up
            // elsewhere, we'll try to create a contact record for that
            // identifier and fail because it's our own. Feeding *every*
            // match for a local identifier into the account updater could
            // create multiple account records.
            let isLocalRecipient = self.localIdentifiers.aci == recipient.aci
            if isLocalRecipient {
                createRecord(localId: (), stateUpdater: accountUpdater)
            } else {
                createRecord(localId: recipient.uniqueId, stateUpdater: contactUpdater)
            }
        }

        // Groups (v2) + private stories
        let groupV2Updater = buildGroupV2Updater()
        let storyDistributionListUpdater = buildStoryDistributionListUpdater()
        TSThread.anyEnumerate(transaction: transaction) { thread, _ in
            if
                let groupThread = thread as? TSGroupThread,
                let groupModel = groupThread.groupModel as? TSGroupModelV2
            {
                do {
                    let masterKey = try groupModel.masterKey()
                    createRecord(localId: masterKey.serialize().asData, stateUpdater: groupV2Updater)
                } catch {
                    owsFailDebug("Invalid group model \(error).")
                }
            } else if let storyThread = thread as? TSPrivateStoryThread {
                if let distributionListId = storyThread.distributionListIdentifier {
                    createRecord(localId: distributionListId, stateUpdater: storyDistributionListUpdater)
                } else {
                    owsFailDebug("Missing distribution list id for story thread \(thread.uniqueId)")
                }
            }
        }

        // Deleted Private Stories
        let deletedDistributionListIdentifiers = DependenciesBridge.shared.privateStoryThreadDeletionManager
            .allDeletedIdentifiers(tx: transaction.asV2Read)
        for deletedDistributionListIdentifier in deletedDistributionListIdentifiers {
            createRecord(
                localId: deletedDistributionListIdentifier,
                stateUpdater: storyDistributionListUpdater
            )
        }

        // Call links
        let callLinkUpdater = buildCallLinkUpdater()
        let callLinkStore = callLinkUpdater.recordUpdater.callLinkStore
        do {
            for callLink in try callLinkStore.fetchAll(tx: transaction.asV2Read) {
                createRecord(localId: callLink.rootKey.bytes, stateUpdater: callLinkUpdater)
            }
        } catch {
            owsFailDebug("Couldn't add CallLinks to manifest: \(error)")
        }
    }

    let manifest = buildManifestRecord(
        manifestVersion: state.manifestVersion,
        manifestRecordIkm: state.manifestRecordIkm,
        identifiers: allItems.map(\.identifier)
    )

    // Only request a purge when absolutely necessary: it's an expensive
    // query on the server, which will look up and delete orphaned records
    // when this flag is set.
    let shouldDeletePreviousRecords = version > 1

    let firstConflict = try await createNewManifestAndSaveState(
        manifest,
        state: &state,
        newItems: allItems,
        deletedIdentifiers: [],
        deleteAllExistingRecords: shouldDeletePreviousRecords
    )

    guard let conflictingManifestVersion = firstConflict else {
        // First attempt succeeded.
        return
    }

    /// We affirmatively want a new manifest built from this device's data,
    /// so on conflict bump the version and try again, thereby overwriting
    /// whatever we conflicted with.
    let newManifestVersion = conflictingManifestVersion + 1
    state.manifestVersion = newManifestVersion

    var retryBuilder = manifest.asBuilder()
    retryBuilder.setVersion(newManifestVersion)
    let retryManifest = retryBuilder.buildInfallibly()

    let secondConflict = try await createNewManifestAndSaveState(
        retryManifest,
        state: &state,
        newItems: allItems,
        deletedIdentifiers: [],
        deleteAllExistingRecords: true
    )

    if secondConflict != nil {
        owsFailDebug("Repeated conflicts trying to create a new manifest; giving up. What's going on?")
        throw .assertion
    }
}
|
|
|
|
/// Uploads `manifest` (with the given item changes) and, when the upload
/// succeeds, notifies other devices and persists `state`.
///
/// - Parameters:
///   - manifest: The manifest to upload.
///   - state: The state to save after a successful upload.
///   - newItems: Items to upload alongside the manifest.
///   - deletedIdentifiers: Identifiers to delete alongside the upload.
///   - deleteAllExistingRecords: Forwarded to `StorageService.updateManifest`.
/// - Returns:
/// `nil` on success, or the version of the current remote manifest when the
/// upload hit a version conflict (including a conflicting manifest that
/// could not be decrypted or deserialized).
/// - Throws: Any other `StorageService.StorageError` from the upload.
private func createNewManifestAndSaveState(
    _ manifest: StorageServiceProtoManifestRecord,
    state: inout State,
    newItems: [StorageService.StorageItem],
    deletedIdentifiers: [StorageService.StorageIdentifier],
    deleteAllExistingRecords: Bool
) async throws(StorageService.StorageError) -> UInt64? {
    owsPrecondition(isPrimaryDevice)

    Logger.info("Creating a new manifest with manifest version: \(manifest.version).")

    let updateResult = await StorageService.updateManifest(
        manifest,
        newItems: newItems,
        deletedIdentifiers: deletedIdentifiers,
        deleteAllExistingRecords: deleteAllExistingRecords,
        chatServiceAuth: authedAccount.chatServiceAuth
    )

    switch updateResult {
    case .success:
        /// A new manifest now exists, so tell our other devices to go
        /// fetch it.
        await SSKEnvironment.shared.syncManagerRef.sendFetchLatestStorageManifestSyncMessage()

        /// Persist the state that corresponds to the uploaded manifest.
        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
            state.save(clearConsecutiveConflicts: true, transaction: transaction)
            StorageServiceUnknownFieldMigrator.didWriteToStorageService(tx: transaction)
        }

        return nil

    case .conflictingManifest(let conflictingManifest):
        /// This is odd: a new manifest is generally only created when the
        /// existing one is known to be broken, yet between that discovery
        /// and now it became readable.
        ///
        /// That should never happen. Rather than merging the conflicting
        /// manifest and handling the errors that could follow (such as
        /// fetching and decrypting storage items that may still be
        /// broken), report the conflicting version so callers overwrite
        /// the mysteriously-fixed manifest.
        return conflictingManifest.version

    case
        .error(.manifestDecryptionFailed(let unreadableManifestVersion)),
        .error(.manifestProtoDeserializationFailed(let unreadableManifestVersion)):
        /// A conflicting remote manifest exists that we couldn't read. For
        /// example, we may be creating a new manifest after rotating keys
        /// on this (primary) device while another device updated the
        /// manifest using old keys.
        ///
        /// Its contents can't be recovered either way, so report the
        /// conflicting version and let callers overwrite it.
        return unreadableManifestVersion

    case .error(let storageError):
        throw storageError
    }
}
|
|
|
|
// MARK: - Conflict Resolution
|
|
|
|
/// Merges a remote manifest into the local Storage Service state.
///
/// The steps, in order:
/// 1. Increment and persist the consecutive-conflict counter. If it exceeds
///    `StorageServiceOperation.maxConsecutiveConflicts`, reset it and throw.
/// 2. Fetch and merge the account record first, if it changed.
/// 3. Fetch and merge all remaining new/updated items in batches.
/// 4. Record the remote manifest version and `recordIkm`, track invalid
///    identifiers, mark orphaned local records as updated, and save.
///
/// - Parameters:
///   - manifest: The remote manifest to merge in.
///   - backupAfterSuccess: When `true`, `backupPendingChanges` is invoked on
///     the storage service manager once the merged state has been saved.
/// - Throws: `OWSAssertionError` when the conflict limit is exceeded or the
///   fetched account item has an unexpected type. A `StorageService.StorageError`
///   is rethrown unless it is an item decryption/deserialization failure on
///   the primary device, in which case the manifest and records are recreated.
private func mergeLocalManifest(
    withRemoteManifest manifest: StorageServiceProtoManifestRecord,
    backupAfterSuccess: Bool
) async throws {
    var state: State = await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
        var state = State.current(transaction: transaction)

        normalizePendingMutations(in: &state, transaction: transaction)

        // Increment our conflict count.
        state.consecutiveConflicts += 1
        state.save(transaction: transaction)

        return state
    }

    // If we've tried many times in a row to resolve conflicts, something weird
    // is happening (potentially a bug on the service or a race with another
    // app). Give up and wait until the next backup runs.
    guard state.consecutiveConflicts <= StorageServiceOperation.maxConsecutiveConflicts else {
        owsFailDebug("unexpectedly have had numerous repeated conflicts")

        // Clear out the consecutive conflicts count so we can try again later.
        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
            state.save(clearConsecutiveConflicts: true, transaction: transaction)
        }

        throw OWSAssertionError("exceeded max consecutive conflicts, creating a new manifest")
    }

    let allManifestItems: Set<StorageService.StorageIdentifier> = Set(manifest.keys.lazy.map {
        .init(data: $0.data, type: $0.type)
    })

    // Calculate new or updated items by looking up the ids of any items we
    // don't know about locally. Since a new id is always generated after a
    // change, this reflects changes made since the last manifest version.
    var newOrUpdatedItems = Array(allManifestItems.subtracting(state.allIdentifiers))

    // We also want to refetch any identifiers that we didn't know how to parse
    // before but now do know how to parse. These might not have gotten
    // updated, so we need to add them explicitly.
    for (keyType, unknownIdentifiers) in state.unknownIdentifiersTypeMap {
        guard Self.isKnownKeyType(keyType) else { continue }
        newOrUpdatedItems.append(contentsOf: unknownIdentifiers)
    }

    let localKeysCount = state.allIdentifiers.count

    Logger.info("\(manifest.logDescription); merging \(newOrUpdatedItems.count); \(localKeysCount) local; \(allManifestItems.count) remote")

    do {
        // First, fetch the local account record if it has been updated. We give this record
        // priority over all other records as it contains things like the user's configuration
        // that we want to update ASAP, especially when restoring after linking.
        try await {
            if let storageIdentifier = state.localAccountIdentifier, allManifestItems.contains(storageIdentifier) {
                return
            }

            let localAccountIdentifiers = newOrUpdatedItems.filter { $0.type == .account }
            assert(localAccountIdentifiers.count <= 1)

            guard let newLocalAccountIdentifier = localAccountIdentifiers.first else {
                owsFailDebug("remote manifest is missing local account, mark it for update")
                state.localAccountChangeState = .updated
                return
            }

            Logger.info("\(manifest.logDescription); merging account record")

            let item: StorageService.StorageItem?
            switch await StorageService.fetchItems(
                for: [newLocalAccountIdentifier],
                manifest: manifest,
                chatServiceAuth: authedAccount.chatServiceAuth
            ) {
            case .success(let storageItems):
                item = storageItems.first
            case .error(let storageError):
                throw storageError
            }

            guard let item else {
                // This can happen in normal use if between fetching the manifest and starting the item
                // fetch a linked device has updated the manifest.
                state.localAccountChangeState = .updated
                return
            }

            guard let accountRecord = item.accountRecord else {
                throw OWSAssertionError("unexpected item type for account identifier")
            }

            await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
                self.mergeRecord(
                    accountRecord,
                    identifier: item.identifier,
                    state: &state,
                    stateUpdater: self.buildAccountUpdater(),
                    transaction: transaction
                )
                state.save(transaction: transaction)
            }

            // Remove any account record identifiers from the new or updated basket. We've processed them.
            newOrUpdatedItems.removeAll { localAccountIdentifiers.contains($0) }
        }()

        // Clean up our unknown identifiers type map to only reflect identifiers
        // that still exist in the manifest. If we find more unknown identifiers in
        // any batch, we'll add them in `fetchAndMergeItemsInBatches`.
        state.unknownIdentifiersTypeMap = state.unknownIdentifiersTypeMap
            .mapValues { unknownIdentifiers in Array(allManifestItems.intersection(unknownIdentifiers)) }
            .filter { (recordType, unknownIdentifiers) in !unknownIdentifiers.isEmpty }

        // Then, fetch the remaining items in the manifest and resolve any conflicts as appropriate.
        try await self.fetchAndMergeItemsInBatches(identifiers: newOrUpdatedItems, manifest: manifest, state: &state)

        let storageServiceManager = SSKEnvironment.shared.storageServiceManagerRef
        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
            // Update the manifest version to reflect the remote version we just restored to
            state.manifestVersion = manifest.version

            /// Sanity-check the remote `recordIkm` against the one we held
            /// locally. This must run *before* the local value is
            /// overwritten below; otherwise both sides of the comparison
            /// are the same value and the assertion can never fire.
            if
                isPrimaryDevice,
                let localManifestRecordIkm = state.manifestRecordIkm,
                let remoteManifestRecordIkm = manifest.recordIkm
            {
                owsAssertDebug(
                    localManifestRecordIkm == remoteManifestRecordIkm,
                    "Primary unexpectedly found a remote manifest recordIkm that doesn't match the local one. Who rotated it?"
                )
            }

            /// Update the manifest `recordIkm` to reflect the remote one we
            /// just merged in. We need to save this, since it should only
            /// change if we are fully recreating the manifest and
            /// reuploading all records.
            state.manifestRecordIkm = manifest.recordIkm

            // We just did a successful manifest fetch and restore, so we no longer need to refetch it
            state.refetchLatestManifest = false

            // We fetched all the previously unknown identifiers, so we don't need to
            // fetch them again in the future unless they're updated.
            state.unknownIdentifiersTypeMap = state.unknownIdentifiersTypeMap
                .filter { (keyType, _) in !Self.isKnownKeyType(keyType) }

            // Save invalid identifiers to remove during the write operation.
            //
            // We don't remove them immediately because we've already ignored them, and
            // we want to avoid fighting against another device that may put them back
            // when we remove them. Instead, we simply keep track of them so that we
            // can delete them during our next mutation.
            //
            // We may have invalid identifiers for three reasons:
            //
            // (1) We got back an .invalid merge result, meaning we didn't process a
            // storage item. As a result, our local state won't reference it.
            //
            // (2) There are two storage items (with different storage identifiers)
            // whose contents refer to the same thing (eg, group, story). In this case,
            // the latter will replace the former, and the former will be orphaned.
            //
            // (3) The identifier is present in the manifest, but the corresponding
            // item can't be fetched. When this happens, the most likely explanation is
            // that our manifest is out of date. The next time we try to write, we'll
            // get a conflict, merge the latest manifest, see that it no longer
            // references this identifier, and remove it from `invalidIdentifiers`. (In
            // the less common case where the latest manifest does refer to a
            // non-existent identifier, this device will take care of fixing up the
            // manifest to remove the reference.)

            state.invalidIdentifiers = allManifestItems.subtracting(state.allIdentifiers)
            let invalidIdentifierCount = state.invalidIdentifiers.count

            // Mark any orphaned records as pending update so we re-add them to the manifest.

            var orphanedGroupV2Count = 0
            for (groupMasterKey, identifier) in state.groupV2MasterKeyToIdentifierMap where !allManifestItems.contains(identifier) {
                state.groupV2ChangeMap[groupMasterKey] = .updated
                orphanedGroupV2Count += 1
            }

            var orphanedStoryDistributionListCount = 0
            for (dlistIdentifier, storageIdentifier) in state.storyDistributionListIdentifierToStorageIdentifierMap where !allManifestItems.contains(storageIdentifier) {
                state.storyDistributionListChangeMap[dlistIdentifier] = .updated
                orphanedStoryDistributionListCount += 1
            }

            var orphanedCallLinkRootKeyCount = 0
            for (callLinkRootKeyData, storageIdentifier) in state.callLinkRootKeyToStorageIdentifierMap where !allManifestItems.contains(storageIdentifier) {
                // If another client removes a deleted call link, allow it.
                let callLinkStore = DependenciesBridge.shared.callLinkStore
                guard
                    let callLinkRootKey = try? CallLinkRootKey(callLinkRootKeyData),
                    let callLinkRecord = try? callLinkStore.fetch(roomId: callLinkRootKey.deriveRoomId(), tx: transaction.asV2Read),
                    callLinkRecord.adminPasskey != nil
                else {
                    continue
                }
                state.callLinkRootKeyChangeMap[callLinkRootKeyData] = .updated
                orphanedCallLinkRootKeyCount += 1
            }

            var orphanedAccountCount = 0
            let currentDate = Date()
            for (recipientUniqueId, identifier) in state.accountIdToIdentifierMap where !allManifestItems.contains(identifier) {
                // Only consider registered recipients as orphaned. If another client
                // removes an unregistered recipient, allow it.
                guard
                    let storageServiceContact = StorageServiceContact.fetch(for: recipientUniqueId, tx: transaction),
                    storageServiceContact.shouldBeInStorageService(currentDate: currentDate, remoteConfig: .current),
                    storageServiceContact.registrationStatus(currentDate: currentDate, remoteConfig: .current) == .registered
                else {
                    continue
                }
                state.accountIdChangeMap[recipientUniqueId] = .updated
                orphanedAccountCount += 1
            }

            let pendingChangesCount = (
                state.accountIdChangeMap.count
                + state.groupV2ChangeMap.count
                + state.storyDistributionListChangeMap.count
                + state.callLinkRootKeyChangeMap.count
            )

            Logger.info(
                """
                \(manifest.logDescription) finished; \
                \(pendingChangesCount) pending updates; \
                \(invalidIdentifierCount) missing/invalid ids; \
                \(orphanedAccountCount) orphaned accounts; \
                \(orphanedGroupV2Count) orphaned gv2; \
                \(orphanedStoryDistributionListCount) orphaned dlists; \
                \(orphanedCallLinkRootKeyCount) orphaned clinks
                """
            )

            state.save(clearConsecutiveConflicts: true, transaction: transaction)

            if backupAfterSuccess {
                storageServiceManager.backupPendingChanges(authedDevice: self.authedDevice)
            }
        }
    } catch let storageError as StorageService.StorageError {
        // If we succeeded to fetch the records but were unable to decrypt any of them,
        // it likely means our keys changed.
        if case .itemDecryptionFailed = storageError {
            // If this is the primary device, throw everything away and re-encrypt
            // the social graph with the keys we have locally.
            if self.isPrimaryDevice {
                Logger.warn("Item decryption failed, recreating manifest.")
                try await self.createNewManifestAndRecords(version: manifest.version + 1)
                return
            }

            Logger.warn("Item decryption failed, clearing storage service keys.")

            // If this is a linked device, give up and request the latest storage
            // service key from the primary device.
            await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in
                // Clear out the key, it's no longer valid. This will prevent us
                // from trying to backup again until the sync response is received.
                DependenciesBridge.shared.svr.clearSyncedStorageServiceKey(transaction: transaction.asV2Write)
                SSKEnvironment.shared.syncManagerRef.sendKeysSyncRequestMessage(transaction: transaction)
            }
        } else if
            case .itemProtoDeserializationFailed = storageError,
            self.isPrimaryDevice
        {
            // If decryption succeeded but proto deserialization failed, we somehow ended up with
            // byte garbage in storage service. Our only recourse is to throw everything away and
            // re-encrypt the social graph with data we have locally.
            Logger.warn("Item deserialization failed, recreating manifest.")
            try await self.createNewManifestAndRecords(version: manifest.version + 1)
            return
        }

        // Not recovered above (this includes the linked-device decryption
        // failure path): surface the error to the caller.
        throw storageError
    }
}
|
|
|
|
/// Chunk size used when fetching and merging storage items in batches:
/// 256 when the current app context reports `isNSE`, otherwise 1024.
private static var itemsBatchSize: Int {
    if CurrentAppContext().isNSE {
        return 256
    }
    return 1024
}
|
|
/// Fetches the items for `identifiers` in chunks of `itemsBatchSize` and
/// merges them into `state`, one write transaction per chunk.
///
/// Contact records for which
/// `StorageServiceContactRecordUpdater.shouldDeferMerge` returns `true` are
/// held back and merged only after every fetched batch has been processed.
///
/// - Parameters:
///   - identifiers: Identifiers of the items to fetch.
///   - manifest: The manifest the items belong to.
///   - state: The state that merged items are applied to.
/// - Throws: The storage error reported by a failed fetch.
private func fetchAndMergeItemsInBatches(
    identifiers: [StorageService.StorageIdentifier],
    manifest: StorageServiceProtoManifestRecord,
    state: inout State
) async throws {
    var deferredItems: [StorageService.StorageItem] = []

    for identifierBatch in identifiers.chunked(by: Self.itemsBatchSize) {
        let fetchResult = await StorageService.fetchItems(
            for: Array(identifierBatch),
            manifest: manifest,
            chatServiceAuth: self.authedAccount.chatServiceAuth
        )

        let fetchedItems: [StorageService.StorageItem]
        switch fetchResult {
        case .error(let storageError):
            throw storageError
        case .success(let items):
            fetchedItems = items
        }

        // Contacts with ACIs are processed before those without, so that
        // split operations are handled first. Otherwise we'd likely try to
        // re-populate the ACI based on our local state.
        var batchItems: [StorageService.StorageItem] = []
        for fetchedItem in fetchedItems {
            let shouldDefer = fetchedItem.contactRecord
                .map { StorageServiceContactRecordUpdater.shouldDeferMerge($0) } ?? false
            if shouldDefer {
                deferredItems.append(fetchedItem)
            } else {
                batchItems.append(fetchedItem)
            }
        }
        // Every fetched item lands in exactly one of the two buckets.
        let batchDeferredItemCount = fetchedItems.count - batchItems.count

        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            self.mergeItems(batchItems, state: &state, tx: tx)
        }
        Logger.info("\(manifest.logDescription); fetched \(identifierBatch.count) items; processed \(batchItems.count); deferred \(batchDeferredItemCount)")
    }

    // Now merge everything that was held back, again in chunks.
    for deferredBatch in deferredItems.chunked(by: Self.itemsBatchSize) {
        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            self.mergeItems(deferredBatch, state: &state, tx: tx)
        }
        Logger.info("\(manifest.logDescription); processed \(deferredBatch.count) deferred items")
    }
}
|
|
|
|
/// Merges each fetched storage item into `state` using the updater that
/// matches its record type, then saves `state`.
///
/// Items whose record type isn't recognized are remembered in
/// `state.unknownIdentifiersTypeMap`. Account items are not expected here
/// and trigger a debug failure.
///
/// - Parameters:
///   - items: The fetched storage items to merge.
///   - state: The state that merge results are applied to and that is saved.
///   - tx: The write transaction used for merging and saving.
private func mergeItems(_ items: some Sequence<StorageService.StorageItem>, state: inout State, tx: SDSAnyWriteTransaction) {
    let contactUpdater = buildContactUpdater()
    let groupV1Updater = buildGroupV1Updater()
    let groupV2Updater = buildGroupV2Updater()
    let storyDistributionListUpdater = buildStoryDistributionListUpdater()
    let callLinkUpdater = buildCallLinkUpdater()

    /// Forwards one record to `mergeRecord` with the shared state and
    /// transaction.
    func merge<StateUpdater>(
        _ record: StateUpdater.RecordType,
        identifier: StorageService.StorageIdentifier,
        stateUpdater: StateUpdater
    ) where StateUpdater: StorageServiceStateUpdater {
        self.mergeRecord(
            record,
            identifier: identifier,
            state: &state,
            stateUpdater: stateUpdater,
            transaction: tx
        )
    }

    for item in items {
        let identifier = item.identifier

        if let contact = item.contactRecord {
            merge(contact, identifier: identifier, stateUpdater: contactUpdater)
        } else if let groupV1 = item.groupV1Record {
            merge(groupV1, identifier: identifier, stateUpdater: groupV1Updater)
        } else if let groupV2 = item.groupV2Record {
            merge(groupV2, identifier: identifier, stateUpdater: groupV2Updater)
        } else if let storyDistributionList = item.storyDistributionListRecord {
            merge(storyDistributionList, identifier: identifier, stateUpdater: storyDistributionListUpdater)
        } else if let callLink = item.callLinkRecord {
            merge(callLink, identifier: identifier, stateUpdater: callLinkUpdater)
        } else if case .account = identifier.type {
            owsFailDebug("unexpectedly found account record in remaining items")
        } else {
            // A record type we don't understand yet. Remember its
            // identifier so we can skip fetching it in the future and don't
            // accidentally blow it away when we push an update.
            state.unknownIdentifiersTypeMap[identifier.type, default: []].append(identifier)
        }
    }

    // This save stores the new storage identifiers against the *old*
    // manifest version. That lets us work through a manifest's changes
    // incrementally: if the update fails part way, we keep trying to apply
    // the changes we haven't received yet, because overall we still know
    // we're on an older version.
    state.save(clearConsecutiveConflicts: true, transaction: tx)
}
|
|
|
|
// MARK: - Clean Up
|
|
|
|
/// Loads the persisted state, runs the clean-up passes over it, and then
/// applies any one-off migrations gated on the stored migration version.
private func cleanUpUnknownData() async {
    Logger.info("")

    let snapshot = SSKEnvironment.shared.databaseStorageRef.read { tx in
        var loadedState = State.current(transaction: tx)
        normalizePendingMutations(in: &loadedState, transaction: tx)
        let storedVersion = Self.migrationStore.getInt(Self.versionKey, defaultValue: 0, transaction: tx.asV2Read)
        return (loadedState, storedVersion)
    }
    var state = snapshot.0
    let migrationVersion = snapshot.1

    await self.cleanUpUnknownIdentifiers(in: &state)
    await self.cleanUpRecordsWithUnknownFields(in: &state)
    await self.cleanUpOrphanedAccounts(in: &state)

    // Migration ladder: each case runs its step, bumps the stored version,
    // and falls through to the next one.
    switch migrationVersion {
    case 0:
        await self.recordPendingMutationsForContactsWithPNIs(in: &state)
        await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
            Self.migrationStore.setInt(1, key: Self.versionKey, transaction: tx.asV2Write)
        }
        fallthrough
    default:
        break
    }
}
|
|
|
|
/// Reports whether `keyType` is a manifest record key type this version of
/// the app handles.
///
/// - Parameter keyType: The key type to classify; may be `nil`.
/// - Returns: `true` for contact, groupv1, groupv2, account, story
///   distribution list and call link keys; `false` for `.unknown`,
///   `.UNRECOGNIZED` and `nil`.
private static func isKnownKeyType(_ keyType: StorageServiceProtoManifestRecordKeyType?) -> Bool {
    // No `default:` on purpose, so a newly added key type fails to compile
    // until it is classified here.
    switch keyType {
    case .contact, .groupv1, .groupv2, .account, .storyDistributionList, .callLink:
        return true
    case .unknown, .UNRECOGNIZED, nil:
        return false
    }
}
|
|
|
|
/// Requests a refetch of the latest manifest when any identifier we stored
/// as "unknown" now belongs to a key type we know how to parse.
///
/// - Parameter state: The in-memory state; flagged and saved when a refetch
///   is needed, otherwise left untouched.
private func cleanUpUnknownIdentifiers(in state: inout State) async {
    // An entry is parseable only if we know its type *and* it actually holds
    // at least one identifier.
    let hasParseableIdentifiers = state.unknownIdentifiersTypeMap.contains { entry in
        Self.isKnownKeyType(entry.key) && !entry.value.isEmpty
    }
    guard hasParseableIdentifiers else {
        return
    }

    // We may have learned of new record types. If so, we should refetch the
    // latest manifest so that we can merge these items.
    await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
        state.refetchLatestManifest = true
        state.save(transaction: tx)
    }
}
|
|
|
|
/// Re-examines cached records that carried unknown fields, at most once per
/// app version, running one-time unknown-field migrations and re-merging the
/// cached records with local data.
///
/// - Parameter state: The in-memory state; updated and saved when the
///   clean-up runs.
private func cleanUpRecordsWithUnknownFields(in state: inout State) async {
    let currentAppVersion = AppVersionImpl.shared.currentAppVersion

    var needsCleanUp = state.unknownFieldLastCheckedAppVersion != currentAppVersion
    #if DEBUG
    // Debug builds don't have proper version numbers but we do want to run
    // these migrations on them.
    if !needsCleanUp {
        needsCleanUp = SSKEnvironment.shared.databaseStorageRef.read(block: { tx in
            StorageServiceUnknownFieldMigrator.needsAnyUnknownFieldsMigrations(tx: tx)
        })
    }
    #endif
    guard needsCleanUp else {
        return
    }
    state.unknownFieldLastCheckedAppVersion = currentAppVersion

    /// Collects the cached unknown-field records of one updater that can take
    /// part in one-time migrations.
    func migrateableRecords(
        from stateUpdater: some StorageServiceStateUpdater
    ) -> [any MigrateableStorageServiceRecordType] {
        return stateUpdater.recordsWithUnknownFields(in: state).compactMap { entry in
            entry.1 as? (any MigrateableStorageServiceRecordType)
        }
    }

    /// For any cached records with unknown fields, optimistically try to merge
    /// with our local data to see if we now understand those fields. It's
    /// possible and expected that we might understand some of the previously
    /// unknown fields but not all of them. Even if we can't fully merge any
    /// values, we might partially merge all the values.
    func remergeRecords(
        using stateUpdater: some StorageServiceStateUpdater,
        tx: SDSAnyWriteTransaction
    ) {
        let pendingRecords = stateUpdater.recordsWithUnknownFields(in: state)
        guard !pendingRecords.isEmpty else {
            return
        }

        let updaterName = "\(type(of: stateUpdater.recordUpdater))"
        for (localId, pendingRecord) in pendingRecords {
            if let storageIdentifier = stateUpdater.storageIdentifier(for: localId, in: state) {
                self.mergeRecord(
                    pendingRecord,
                    identifier: storageIdentifier,
                    state: &state,
                    stateUpdater: stateUpdater,
                    transaction: tx
                )
            } else {
                // Without a storage identifier there is nothing to merge
                // against, so drop the cached record.
                owsFailDebug("Unknown fields: Missing identifier for \(updaterName)")
                stateUpdater.setRecordWithUnknownFields(nil, for: localId, in: &state)
            }
        }

        let remainingCount = stateUpdater.recordsWithUnknownFields(in: state).count
        let resolvedCount = pendingRecords.count - remainingCount
        Logger.info("Unknown fields: Resolved \(resolvedCount) records (\(remainingCount) remaining) for \(updaterName)")
    }

    await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
        let stateUpdaters: [any StorageServiceStateUpdater] = [
            buildAccountUpdater(),
            buildContactUpdater(),
            buildGroupV2Updater(),
            buildStoryDistributionListUpdater(),
            buildCallLinkUpdater(),
        ]

        if StorageServiceUnknownFieldMigrator.needsAnyUnknownFieldsMigrations(tx: tx) {
            // First accumulate records to run one-time migrations on.
            var records: [any MigrateableStorageServiceRecordType] = []
            for stateUpdater in stateUpdaters {
                records.append(contentsOf: migrateableRecords(from: stateUpdater))
            }

            // Note: we run even if there are no records with "unknown fields".
            // Fields with default values (e.g. a bool set to false) don't show
            // up in the serialized proto at all. So if an unknown field was
            // sent to us with a default value, we won't even know it's there,
            // and it won't appear in "records with unknown fields". We should
            // still run migrations, which should assume the default value was
            // set for any records not passed in.
            StorageServiceUnknownFieldMigrator.runMigrationsForRecordsWithUnknownFields(
                records: records,
                tx: tx
            )
        }

        for stateUpdater in stateUpdaters {
            remergeRecords(using: stateUpdater, tx: tx)
        }
        Logger.info("Resolved unknown fields using manifest version \(state.manifestVersion)")
        state.save(transaction: tx)
    }
}
|
|
|
|
/// Marks as mutated every tracked contact that should no longer be in
/// Storage Service, including contacts that can't be fetched locally.
///
/// - Parameter state: The in-memory state; updated and saved when any
///   contact qualifies.
private func cleanUpOrphanedAccounts(in state: inout State) async {
    // We don't keep unregistered accounts in storage service after a certain
    // amount of time. We may also have records for accounts that no longer
    // exist, e.g. that SignalRecipient was merged with another recipient. We
    // try to proactively delete these records from storage service, but there
    // was a period of time we didn't, and we need to clean up after ourselves.

    let now = Date()
    let remoteConfig = RemoteConfig.current
    await recordPendingAccountMutations(in: &state, shouldUpdate: { contact in
        // A missing contact (`nil`) also counts as "should not be stored".
        let shouldBeStored = contact?.shouldBeInStorageService(currentDate: now, remoteConfig: remoteConfig)
        return shouldBeStored != true
    })
}
|
|
|
|
/// One-off migration: marks every tracked contact that has a PNI as mutated.
///
/// - Parameter state: The in-memory state; updated and saved when any
///   contact qualifies.
private func recordPendingMutationsForContactsWithPNIs(in state: inout State) async {
    // We stored invalid PNIs, so run a one-off migration to fix them.
    await recordPendingAccountMutations(in: &state, shouldUpdate: { contact in
        contact?.pni != nil
    })
}
|
|
|
|
/// Records a pending update for every tracked contact accepted by
/// `shouldUpdate`, then saves the state.
///
/// - Parameters:
///   - state: The in-memory state whose `accountIdToIdentifierMap` keys are
///     examined; mutated and saved when at least one contact matches.
///   - caller: Name of the calling function, used only for logging.
///   - shouldUpdate: Predicate given the fetched contact for each tracked id.
private func recordPendingAccountMutations(
    in state: inout State,
    caller: String = #function,
    shouldUpdate: (StorageServiceContact?) -> Bool
) async {
    let matchingUniqueIds = SSKEnvironment.shared.databaseStorageRef.read { tx in
        state.accountIdToIdentifierMap.keys.filter { uniqueId in
            shouldUpdate(StorageServiceContact.fetch(for: uniqueId, tx: tx))
        }
    }

    // Nothing matched: skip the write transaction entirely.
    guard !matchingUniqueIds.isEmpty else {
        return
    }

    Logger.info("Marking \(matchingUniqueIds.count) contact records as mutated via \(caller)")

    await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
        var mutations = PendingMutations()
        mutations.updatedRecipientUniqueIds.formUnion(matchingUniqueIds)
        Self.recordPendingMutations(mutations, in: &state, transaction: tx)
        state.save(transaction: tx)
    }
}
|
|
|
|
// MARK: - Record Merge
|
|
|
|
/// Merges a single remote record into local data and updates the
/// bookkeeping in `state` for the local item it maps to.
///
/// - Parameters:
///   - record: The record to merge.
///   - identifier: The storage identifier the record was stored under.
///   - state: The in-memory state to update. This method does not save it.
///   - stateUpdater: Supplies the record updater and the `State` accessors
///     for this record type.
///   - transaction: The write transaction used for the merge.
private func mergeRecord<StateUpdater: StorageServiceStateUpdater>(
    _ record: StateUpdater.RecordType,
    identifier: StorageService.StorageIdentifier,
    state: inout State,
    stateUpdater: StateUpdater,
    transaction: SDSAnyWriteTransaction
) {
    // First apply any migrations.
    let recordToMerge: StateUpdater.RecordType
    if StorageServiceUnknownFieldMigrator.shouldInterceptRemoteManifestBeforeMerging(tx: transaction) {
        recordToMerge = StorageServiceUnknownFieldMigrator.interceptRemoteManifestBeforeMerging(
            record: record,
            tx: transaction
        )
    } else {
        recordToMerge = record
    }

    let recordUpdater = stateUpdater.recordUpdater

    switch recordUpdater.mergeRecord(recordToMerge, transaction: transaction) {
    case .invalid:
        // This record doesn't have a valid identifier. We can't fix it, so we
        // have no choice but to delete it.
        return

    case .merged(needsUpdate: let needsUpdate, let localId):
        // Mark that our local state matches the state from storage service.
        stateUpdater.setStorageIdentifier(identifier, for: localId, in: &state)

        // If we have local changes that need to be synced, mark the state as
        // `.updated`. Otherwise, our local state and storage service state
        // match, so we can clear out any pending sync request.
        let pendingChange: State.ChangeState? = needsUpdate ? .updated : nil
        stateUpdater.setChangeState(pendingChange, for: localId, in: &state)

        // If the record has unknown fields, we need to hold on to it. This
        // allows future versions of the app to interpret those fields.
        let retainedRecord: StateUpdater.RecordType?
        if recordUpdater.unknownFields(for: recordToMerge) != nil {
            retainedRecord = recordToMerge
        } else {
            retainedRecord = nil
        }
        stateUpdater.setRecordWithUnknownFields(retainedRecord, for: localId, in: &state)
    }
}
|
|
|
|
// MARK: - Record Updaters
|
|
|
|
/// Builds the state updater for the local account record, wired to the
/// `localAccount*` properties of `State`.
private func buildAccountUpdater() -> SingleElementStateUpdater<StorageServiceAccountRecordUpdater> {
    let accountRecordUpdater = StorageServiceAccountRecordUpdater(
        localIdentifiers: localIdentifiers,
        isPrimaryDevice: isPrimaryDevice,
        authedAccount: authedAccount,
        backupSubscriptionManager: DependenciesBridge.shared.backupSubscriptionManager,
        dmConfigurationStore: DependenciesBridge.shared.disappearingMessagesConfigurationStore,
        linkPreviewSettingStore: DependenciesBridge.shared.linkPreviewSettingStore,
        localUsernameManager: DependenciesBridge.shared.localUsernameManager,
        paymentsHelper: SSKEnvironment.shared.paymentsHelperRef,
        phoneNumberDiscoverabilityManager: DependenciesBridge.shared.phoneNumberDiscoverabilityManager,
        pinnedThreadManager: DependenciesBridge.shared.pinnedThreadManager,
        preferences: SSKEnvironment.shared.preferencesRef,
        profileManager: SSKEnvironment.shared.profileManagerImplRef,
        receiptManager: SSKEnvironment.shared.receiptManagerRef,
        registrationStateChangeManager: DependenciesBridge.shared.registrationStateChangeManager,
        storageServiceManager: SSKEnvironment.shared.storageServiceManagerRef,
        systemStoryManager: SSKEnvironment.shared.systemStoryManagerRef,
        tsAccountManager: DependenciesBridge.shared.tsAccountManager,
        typingIndicators: SSKEnvironment.shared.typingIndicatorsRef,
        udManager: SSKEnvironment.shared.udManagerRef,
        usernameEducationManager: DependenciesBridge.shared.usernameEducationManager
    )

    return SingleElementStateUpdater(
        recordUpdater: accountRecordUpdater,
        changeState: \.localAccountChangeState,
        storageIdentifier: \.localAccountIdentifier,
        recordWithUnknownFields: \.localAccountRecordWithUnknownFields
    )
}
|
|
|
|
/// Builds the state updater for contact records, wired to the `accountId*`
/// maps of `State`.
private func buildContactUpdater() -> MultipleElementStateUpdater<StorageServiceContactRecordUpdater> {
    let contactRecordUpdater = StorageServiceContactRecordUpdater(
        localIdentifiers: localIdentifiers,
        isPrimaryDevice: isPrimaryDevice,
        authedAccount: authedAccount,
        blockingManager: SSKEnvironment.shared.blockingManagerRef,
        contactsManager: SSKEnvironment.shared.contactManagerImplRef,
        identityManager: DependenciesBridge.shared.identityManager,
        nicknameManager: DependenciesBridge.shared.nicknameManager,
        profileFetcher: SSKEnvironment.shared.profileFetcherRef,
        profileManager: SSKEnvironment.shared.profileManagerImplRef,
        recipientManager: DependenciesBridge.shared.recipientManager,
        recipientMerger: DependenciesBridge.shared.recipientMerger,
        recipientHidingManager: DependenciesBridge.shared.recipientHidingManager,
        remoteConfigProvider: SSKEnvironment.shared.remoteConfigManagerRef,
        signalServiceAddressCache: SSKEnvironment.shared.signalServiceAddressCacheRef,
        tsAccountManager: DependenciesBridge.shared.tsAccountManager,
        usernameLookupManager: DependenciesBridge.shared.usernameLookupManager
    )

    return MultipleElementStateUpdater(
        recordUpdater: contactRecordUpdater,
        changeState: \.accountIdChangeMap,
        storageIdentifier: \.accountIdToIdentifierMap,
        recordWithUnknownFields: \.accountIdToRecordWithUnknownFields
    )
}
|
|
|
|
/// Builds the state updater for group v1 records, wired to the `groupV1*`
/// maps of `State`.
private func buildGroupV1Updater() -> MultipleElementStateUpdater<StorageServiceGroupV1RecordUpdater> {
    let groupV1RecordUpdater = StorageServiceGroupV1RecordUpdater()

    return MultipleElementStateUpdater(
        recordUpdater: groupV1RecordUpdater,
        changeState: \.groupV1ChangeMap,
        storageIdentifier: \.groupV1IdToIdentifierMap,
        recordWithUnknownFields: \.groupV1IdToRecordWithUnknownFields
    )
}
|
|
|
|
/// Builds the state updater for group v2 records, wired to the `groupV2*`
/// maps of `State`.
private func buildGroupV2Updater() -> MultipleElementStateUpdater<StorageServiceGroupV2RecordUpdater> {
    let groupV2RecordUpdater = StorageServiceGroupV2RecordUpdater(
        authedAccount: authedAccount,
        blockingManager: SSKEnvironment.shared.blockingManagerRef,
        groupsV2: SSKEnvironment.shared.groupsV2Ref,
        profileManager: SSKEnvironment.shared.profileManagerRef
    )

    return MultipleElementStateUpdater(
        recordUpdater: groupV2RecordUpdater,
        changeState: \.groupV2ChangeMap,
        storageIdentifier: \.groupV2MasterKeyToIdentifierMap,
        recordWithUnknownFields: \.groupV2MasterKeyToRecordWithUnknownFields
    )
}
|
|
|
|
/// Builds the state updater for story distribution list records, wired to
/// the `storyDistributionList*` properties of `State`.
private func buildStoryDistributionListUpdater() -> MultipleElementStateUpdater<StorageServiceStoryDistributionListRecordUpdater> {
    let storyDistributionListRecordUpdater = StorageServiceStoryDistributionListRecordUpdater(
        privateStoryThreadDeletionManager: DependenciesBridge.shared.privateStoryThreadDeletionManager,
        threadRemover: DependenciesBridge.shared.threadRemover
    )

    return MultipleElementStateUpdater(
        recordUpdater: storyDistributionListRecordUpdater,
        changeState: \.storyDistributionListChangeMap,
        storageIdentifier: \.storyDistributionListIdentifierToStorageIdentifierMap,
        recordWithUnknownFields: \.storyDistributionListIdentifierToRecordWithUnknownFields
    )
}
|
|
|
|
/// Builds the state updater for call link records, wired to the
/// `callLinkRootKey*` properties of `State`.
private func buildCallLinkUpdater() -> MultipleElementStateUpdater<StorageServiceCallLinkRecordUpdater> {
    let callLinkRecordUpdater = StorageServiceCallLinkRecordUpdater(
        callLinkStore: DependenciesBridge.shared.callLinkStore,
        callRecordDeleteManager: DependenciesBridge.shared.callRecordDeleteManager,
        callRecordStore: DependenciesBridge.shared.callRecordStore
    )

    return MultipleElementStateUpdater(
        recordUpdater: callLinkRecordUpdater,
        changeState: \.callLinkRootKeyChangeMap,
        storageIdentifier: \.callLinkRootKeyToStorageIdentifierMap,
        recordWithUnknownFields: \.callLinkRootKeyToRecordWithUnknownFields
    )
}
|
|
|
|
// MARK: - State
|
|
|
|
/// NOTE(review): presumably the limit that `State.consecutiveConflicts` is
/// compared against; the code that reads this value is outside this section
/// of the file — confirm before relying on it.
private static var maxConsecutiveConflicts = 3
|
|
|
|
/// Locally persisted bookkeeping for Storage Service, stored as JSON under a
/// single key in the key-value store.
///
/// Stored property names double as the synthesized `Codable` keys, so they
/// must not be renamed without a migration. Optional `_`-prefixed backing
/// properties with computed accessors default to an empty value when absent.
struct State: Codable {
    fileprivate var manifestVersion: UInt64 = 0

    private var _refetchLatestManifest: Bool?
    fileprivate var refetchLatestManifest: Bool {
        get {
            return _refetchLatestManifest ?? false
        }
        set {
            _refetchLatestManifest = newValue
        }
    }

    /// Input Keying Material (IKM) used to encrypt records tracked by the
    /// current manifest.
    fileprivate var manifestRecordIkm: Data?

    fileprivate var consecutiveConflicts: Int = 0

    // MARK: Local account

    fileprivate var localAccountIdentifier: StorageService.StorageIdentifier?
    fileprivate var localAccountRecordWithUnknownFields: StorageServiceProtoAccountRecord?

    // MARK: Contacts

    @BidirectionalLegacyDecoding fileprivate var accountIdToIdentifierMap: [RecipientUniqueId: StorageService.StorageIdentifier] = [:]
    private var _accountIdToRecordWithUnknownFields: [RecipientUniqueId: StorageServiceProtoContactRecord]?
    var accountIdToRecordWithUnknownFields: [RecipientUniqueId: StorageServiceProtoContactRecord] {
        get {
            return _accountIdToRecordWithUnknownFields ?? [:]
        }
        set {
            _accountIdToRecordWithUnknownFields = newValue
        }
    }

    // MARK: Group v1

    @BidirectionalLegacyDecoding fileprivate var groupV1IdToIdentifierMap: [Data: StorageService.StorageIdentifier] = [:]
    private var _groupV1IdToRecordWithUnknownFields: [Data: StorageServiceProtoGroupV1Record]?
    var groupV1IdToRecordWithUnknownFields: [Data: StorageServiceProtoGroupV1Record] {
        get {
            return _groupV1IdToRecordWithUnknownFields ?? [:]
        }
        set {
            _groupV1IdToRecordWithUnknownFields = newValue
        }
    }

    // MARK: Group v2

    @BidirectionalLegacyDecoding fileprivate var groupV2MasterKeyToIdentifierMap: [Data: StorageService.StorageIdentifier] = [:]
    private var _groupV2MasterKeyToRecordWithUnknownFields: [Data: StorageServiceProtoGroupV2Record]?
    var groupV2MasterKeyToRecordWithUnknownFields: [Data: StorageServiceProtoGroupV2Record] {
        get {
            return _groupV2MasterKeyToRecordWithUnknownFields ?? [:]
        }
        set {
            _groupV2MasterKeyToRecordWithUnknownFields = newValue
        }
    }

    // MARK: Story distribution lists

    private var _storyDistributionListIdentifierToStorageIdentifierMap: [Data: StorageService.StorageIdentifier]?
    fileprivate var storyDistributionListIdentifierToStorageIdentifierMap: [Data: StorageService.StorageIdentifier] {
        get {
            return _storyDistributionListIdentifierToStorageIdentifierMap ?? [:]
        }
        set {
            _storyDistributionListIdentifierToStorageIdentifierMap = newValue
        }
    }
    private var _storyDistributionListIdentifierToRecordWithUnknownFields: [Data: StorageServiceProtoStoryDistributionListRecord]?
    fileprivate var storyDistributionListIdentifierToRecordWithUnknownFields: [Data: StorageServiceProtoStoryDistributionListRecord] {
        get {
            return _storyDistributionListIdentifierToRecordWithUnknownFields ?? [:]
        }
        set {
            _storyDistributionListIdentifierToRecordWithUnknownFields = newValue
        }
    }

    // MARK: Unknown and invalid identifiers

    /// Identifiers of record types we could not parse, grouped by key type.
    fileprivate var unknownIdentifiersTypeMap: [StorageServiceProtoManifestRecordKeyType: [StorageService.StorageIdentifier]] = [:]
    /// Every identifier in `unknownIdentifiersTypeMap`, flattened.
    fileprivate var unknownIdentifiers: [StorageService.StorageIdentifier] {
        return unknownIdentifiersTypeMap.values.flatMap { $0 }
    }

    /// Invalid identifiers from the most recent merge that should be removed
    /// during the next mutation.
    fileprivate var invalidIdentifiers: Set<StorageService.StorageIdentifier> {
        get {
            return _invalidIdentifiers ?? Set()
        }
        set {
            // An empty set is stored as `nil`.
            _invalidIdentifiers = newValue.isEmpty ? nil : newValue
        }
    }
    fileprivate var _invalidIdentifiers: Set<StorageService.StorageIdentifier>?

    /// The app version from the last time we checked unknown fields. We can
    /// only transition unknown fields to known fields via an update, so we only
    /// need to check once per app version.
    fileprivate var unknownFieldLastCheckedAppVersion: String?

    // MARK: Change tracking

    enum ChangeState: Int, Codable {
        case unchanged = 0
        case updated = 1

        /// This is mostly vestigial, but even when we no longer assign this status
        /// in new versions of the application, we'll still need to support reading
        /// it (for times when it was written by prior versions of the application).
        case deleted = 2
    }

    fileprivate var localAccountChangeState: ChangeState = .unchanged
    fileprivate var accountIdChangeMap: [RecipientUniqueId: ChangeState] = [:]
    fileprivate var groupV2ChangeMap: [Data: ChangeState] = [:]

    /// We will no longer update this value, and want to also ignore this
    /// value in any previously-persisted state.
    @EmptyForCodable fileprivate var groupV1ChangeMap: [Data: ChangeState] = [:]

    private var _storyDistributionListChangeMap: [Data: ChangeState]?
    fileprivate var storyDistributionListChangeMap: [Data: ChangeState] {
        get {
            return _storyDistributionListChangeMap ?? [:]
        }
        set {
            _storyDistributionListChangeMap = newValue
        }
    }

    // MARK: Call links

    private var _callLinkRootKeyChangeMap: [Data: ChangeState]?
    fileprivate var callLinkRootKeyChangeMap: [Data: ChangeState] {
        get {
            return _callLinkRootKeyChangeMap ?? [:]
        }
        set {
            _callLinkRootKeyChangeMap = newValue
        }
    }
    private var _callLinkRootKeyToStorageIdentifierMap: [Data: StorageService.StorageIdentifier]?
    fileprivate var callLinkRootKeyToStorageIdentifierMap: [Data: StorageService.StorageIdentifier] {
        get {
            return _callLinkRootKeyToStorageIdentifierMap ?? [:]
        }
        set {
            _callLinkRootKeyToStorageIdentifierMap = newValue
        }
    }
    private var _callLinkRootKeyToRecordWithUnknownFields: [Data: StorageServiceProtoCallLinkRecord]?
    fileprivate var callLinkRootKeyToRecordWithUnknownFields: [Data: StorageServiceProtoCallLinkRecord] {
        get {
            return _callLinkRootKeyToRecordWithUnknownFields ?? [:]
        }
        set {
            _callLinkRootKeyToRecordWithUnknownFields = newValue
        }
    }

    // MARK: Aggregates

    /// Every storage identifier tracked by this state: the local account (if
    /// any), all per-type maps, and all unknown identifiers.
    fileprivate var allIdentifiers: [StorageService.StorageIdentifier] {
        var identifiers: [StorageService.StorageIdentifier] = []
        if let localAccountIdentifier {
            identifiers.append(localAccountIdentifier)
        }

        identifiers.append(contentsOf: accountIdToIdentifierMap.values)
        identifiers.append(contentsOf: groupV1IdToIdentifierMap.values)
        identifiers.append(contentsOf: groupV2MasterKeyToIdentifierMap.values)
        identifiers.append(contentsOf: storyDistributionListIdentifierToStorageIdentifierMap.values)
        identifiers.append(contentsOf: callLinkRootKeyToStorageIdentifierMap.values)

        // We must persist any unknown identifiers, as they are potentially
        // associated with valid records that this version of the app doesn't
        // yet understand how to parse. Otherwise we'd ping-pong with newer
        // apps: they back up new types of records, and we then delete them.
        identifiers.append(contentsOf: unknownIdentifiers)

        return identifiers
    }

    // MARK: Persistence

    private static let stateKey = "state"

    /// Loads the persisted state, or a fresh `State()` when nothing is stored
    /// or the stored data fails to decode.
    fileprivate static func current(transaction: SDSAnyReadTransaction) -> State {
        guard let stateData = keyValueStore.getData(stateKey, transaction: transaction.asV2Read) else {
            return State()
        }
        do {
            return try JSONDecoder().decode(State.self, from: stateData)
        } catch {
            owsFailDebug("failed to decode state data")
            return State()
        }
    }

    /// Encodes this state as JSON and writes it to the key-value store.
    ///
    /// - Parameters:
    ///   - clearConsecutiveConflicts: When `true`, resets
    ///     `consecutiveConflicts` to zero before encoding.
    ///   - transaction: The write transaction used for the store.
    fileprivate mutating func save(clearConsecutiveConflicts: Bool = false, transaction: SDSAnyWriteTransaction) {
        if clearConsecutiveConflicts {
            consecutiveConflicts = 0
        }

        let stateData: Data
        do {
            stateData = try JSONEncoder().encode(self)
        } catch {
            owsFailDebug("failed to encode state data")
            return
        }
        keyValueStore.setData(stateData, key: State.stateKey, transaction: transaction.asV2Write)
    }

}
|
|
}
|
|
|
|
// MARK: - State Updaters
|
|
|
|
/// Pairs a record updater with accessors for the three pieces of per-item
/// bookkeeping kept in `StorageServiceOperation.State`: the change state, the
/// storage identifier, and the cached record with unknown fields.
protocol StorageServiceStateUpdater {
    associatedtype RecordUpdaterType: StorageServiceRecordUpdater

    typealias IdType = RecordUpdaterType.IdType
    typealias RecordType = RecordUpdaterType.RecordType
    typealias State = StorageServiceOperation.State

    /// The updater that merges records of `RecordType` with local data.
    var recordUpdater: RecordUpdaterType { get }

    // MARK: Change state

    /// Returns the change state tracked for `localId`, if any.
    func changeState(for localId: IdType, in state: State) -> State.ChangeState?

    /// Sets, or clears when `nil`, the change state tracked for `localId`.
    func setChangeState(_ changeState: State.ChangeState?, for localId: IdType, in state: inout State)

    /// Clears the tracked change states, then calls `block` with each change
    /// state that was present before the reset.
    func resetAndEnumerateChangeStates(in state: inout State, block: (inout State, IdType, State.ChangeState) -> Void)

    // MARK: Storage identifier

    /// Returns the storage identifier tracked for `localId`, if any.
    func storageIdentifier(for localId: IdType, in state: State) -> StorageService.StorageIdentifier?

    /// Sets, or clears when `nil`, the storage identifier tracked for `localId`.
    func setStorageIdentifier(_ storageIdentifier: StorageService.StorageIdentifier?, for localId: IdType, in state: inout State)

    // MARK: Records with unknown fields

    /// Returns the cached record with unknown fields for `localId`, if any.
    func recordWithUnknownFields(for localId: IdType, in state: State) -> RecordType?

    /// Sets, or clears when `nil`, the cached record with unknown fields for
    /// `localId`.
    func setRecordWithUnknownFields(_ recordWithUnknownFields: RecordType?, for localId: IdType, in state: inout State)

    /// Returns every cached record with unknown fields, paired with its local id.
    func recordsWithUnknownFields(in state: State) -> [(IdType, RecordType)]
}
|
|
|
|
/// State updater for a record type that has exactly one instance
/// (`IdType == Void`). Each piece of bookkeeping is a single `State`
/// property reached through a key path, so the `localId` arguments are ignored.
private struct SingleElementStateUpdater<RecordUpdaterType: StorageServiceRecordUpdater>: StorageServiceStateUpdater where RecordUpdaterType.IdType == Void {
    typealias IdType = RecordUpdaterType.IdType
    typealias RecordType = RecordUpdaterType.RecordType
    typealias State = StorageServiceOperation.State

    let recordUpdater: RecordUpdaterType

    private let changeStateKeyPath: WritableKeyPath<State, State.ChangeState>
    private let storageIdentifierKeyPath: WritableKeyPath<State, StorageService.StorageIdentifier?>
    private let recordWithUnknownFieldsKeyPath: WritableKeyPath<State, RecordType?>

    /// - Parameters:
    ///   - recordUpdater: Merges records of this type with local data.
    ///   - changeState: Key path to the change state property.
    ///   - storageIdentifier: Key path to the storage identifier property.
    ///   - recordWithUnknownFields: Key path to the cached record property.
    init(
        recordUpdater: RecordUpdaterType,
        changeState: WritableKeyPath<State, State.ChangeState>,
        storageIdentifier: WritableKeyPath<State, StorageService.StorageIdentifier?>,
        recordWithUnknownFields: WritableKeyPath<State, RecordType?>
    ) {
        self.recordUpdater = recordUpdater
        self.changeStateKeyPath = changeState
        self.storageIdentifierKeyPath = storageIdentifier
        self.recordWithUnknownFieldsKeyPath = recordWithUnknownFields
    }

    func changeState(for localId: IdType, in state: State) -> State.ChangeState? {
        return state[keyPath: changeStateKeyPath]
    }

    func setChangeState(_ changeState: State.ChangeState?, for localId: IdType, in state: inout State) {
        // The backing property is non-optional, so "cleared" means `.unchanged`.
        let newChangeState = changeState ?? .unchanged
        state[keyPath: changeStateKeyPath] = newChangeState
    }

    func resetAndEnumerateChangeStates(in state: inout State, block: (inout State, IdType, State.ChangeState) -> Void) {
        let previousChangeState = state[keyPath: changeStateKeyPath]
        state[keyPath: changeStateKeyPath] = .unchanged
        block(&state, (), previousChangeState)
    }

    func storageIdentifier(for localId: IdType, in state: State) -> StorageService.StorageIdentifier? {
        return state[keyPath: storageIdentifierKeyPath]
    }

    func setStorageIdentifier(_ storageIdentifier: StorageService.StorageIdentifier?, for localId: IdType, in state: inout State) {
        state[keyPath: storageIdentifierKeyPath] = storageIdentifier
    }

    func recordWithUnknownFields(for localId: IdType, in state: State) -> RecordType? {
        return state[keyPath: recordWithUnknownFieldsKeyPath]
    }

    func setRecordWithUnknownFields(_ recordWithUnknownFields: RecordType?, for localId: IdType, in state: inout State) {
        state[keyPath: recordWithUnknownFieldsKeyPath] = recordWithUnknownFields
    }

    func recordsWithUnknownFields(in state: State) -> [(IdType, RecordType)] {
        if let cachedRecord = state[keyPath: recordWithUnknownFieldsKeyPath] {
            return [((), cachedRecord)]
        }
        return []
    }
}
|
|
|
|
/// State updater for a record type with many instances keyed by a hashable
/// local id. Each piece of bookkeeping is a dictionary property of `State`
/// reached through a key path.
private struct MultipleElementStateUpdater<RecordUpdaterType: StorageServiceRecordUpdater>: StorageServiceStateUpdater where RecordUpdaterType.IdType: Hashable {
    typealias IdType = RecordUpdaterType.IdType
    typealias RecordType = RecordUpdaterType.RecordType
    typealias State = StorageServiceOperation.State

    let recordUpdater: RecordUpdaterType
    private let changeStateKeyPath: WritableKeyPath<State, [IdType: State.ChangeState]>
    private let storageIdentifierKeyPath: WritableKeyPath<State, [IdType: StorageService.StorageIdentifier]>
    private let recordWithUnknownFieldsKeyPath: WritableKeyPath<State, [IdType: RecordType]>

    /// - Parameters:
    ///   - recordUpdater: Merges records of this type with local data.
    ///   - changeState: Key path to the id → change state dictionary.
    ///   - storageIdentifier: Key path to the id → storage identifier dictionary.
    ///   - recordWithUnknownFields: Key path to the id → cached record dictionary.
    init(
        recordUpdater: RecordUpdaterType,
        changeState: WritableKeyPath<State, [IdType: State.ChangeState]>,
        storageIdentifier: WritableKeyPath<State, [IdType: StorageService.StorageIdentifier]>,
        recordWithUnknownFields: WritableKeyPath<State, [IdType: RecordType]>
    ) {
        self.recordUpdater = recordUpdater
        self.changeStateKeyPath = changeState
        self.storageIdentifierKeyPath = storageIdentifier
        self.recordWithUnknownFieldsKeyPath = recordWithUnknownFields
    }

    func changeState(for localId: IdType, in state: State) -> State.ChangeState? {
        return state[keyPath: changeStateKeyPath][localId]
    }

    func setChangeState(_ changeState: State.ChangeState?, for localId: IdType, in state: inout State) {
        // Assigning `nil` removes the entry for `localId`.
        state[keyPath: changeStateKeyPath][localId] = changeState
    }

    func resetAndEnumerateChangeStates(in state: inout State, block: (inout State, IdType, State.ChangeState) -> Void) {
        // Snapshot first so `block` sees the pre-reset entries while working
        // against an already-emptied dictionary.
        let previousChangeStates = state[keyPath: changeStateKeyPath]
        state[keyPath: changeStateKeyPath] = [:]
        for (localId, previousChangeState) in previousChangeStates {
            block(&state, localId, previousChangeState)
        }
    }

    func storageIdentifier(for localId: IdType, in state: State) -> StorageService.StorageIdentifier? {
        return state[keyPath: storageIdentifierKeyPath][localId]
    }

    func setStorageIdentifier(_ storageIdentifier: StorageService.StorageIdentifier?, for localId: IdType, in state: inout State) {
        state[keyPath: storageIdentifierKeyPath][localId] = storageIdentifier
    }

    func recordWithUnknownFields(for localId: IdType, in state: State) -> RecordType? {
        return state[keyPath: recordWithUnknownFieldsKeyPath][localId]
    }

    func setRecordWithUnknownFields(_ recordWithUnknownFields: RecordType?, for localId: IdType, in state: inout State) {
        state[keyPath: recordWithUnknownFieldsKeyPath][localId] = recordWithUnknownFields
    }

    func recordsWithUnknownFields(in state: State) -> [(IdType, RecordType)] {
        let cachedRecords = state[keyPath: recordWithUnknownFieldsKeyPath]
        return cachedRecords.map { entry in (entry.key, entry.value) }
    }
}
|
|
|
|
// MARK: - Legacy Codable
|
|
|
|
// Declares that `Dictionary` conforms to `EmptyInitializable`.
// NOTE(review): presumably required by the `@EmptyForCodable` wrapper applied
// to `State.groupV1ChangeMap`; the wrapper is defined elsewhere — confirm.
extension Dictionary: EmptyInitializable {}
|
|
|
|
/// Property wrapper that decodes its value directly and, if that fails with a
/// missing key or a type mismatch, retries by reading the value from the
/// `forwardDictionary` key, in case it was previously stored as a
/// BidirectionalDictionary.
///
/// Encoding always writes the plain wrapped value.
@propertyWrapper
private struct BidirectionalLegacyDecoding<Value: Codable>: Codable {
    enum BidirectionalDictionaryCodingKeys: String, CodingKey {
        case forwardDictionary
        case backwardDictionary
    }

    var wrappedValue: Value

    init(wrappedValue: Value) {
        self.wrappedValue = wrappedValue
    }

    init(from decoder: Swift.Decoder) throws {
        let decodedValue: Value
        do {
            // First, try to decode as the plain value.
            decodedValue = try Value(from: decoder)
        } catch DecodingError.keyNotFound, DecodingError.typeMismatch {
            // That shape didn't match, so try the legacy layout and take its
            // forward dictionary. Any other error is rethrown.
            let legacyContainer = try decoder.container(keyedBy: BidirectionalDictionaryCodingKeys.self)
            decodedValue = try legacyContainer.decode(Value.self, forKey: .forwardDictionary)
        }
        self.wrappedValue = decodedValue
    }

    func encode(to encoder: Encoder) throws {
        try wrappedValue.encode(to: encoder)
    }
}
|
|
|
|
// MARK: - StorageServiceProtoManifestRecord
|
|
|
|
private extension StorageServiceProtoManifestRecord {
    /// Short label for log messages, built from the manifest's `version` and
    /// `sourceDevice`.
    var logDescription: String {
        return "v[\(version)].\(sourceDevice)"
    }
}
|