313 lines
13 KiB
Swift
313 lines
13 KiB
Swift
//
|
|
// Copyright 2019 Signal Messenger, LLC
|
|
// SPDX-License-Identifier: AGPL-3.0-only
|
|
//
|
|
|
|
public extension Notification.Name {
|
|
static let incomingContactSyncDidComplete = Notification.Name("IncomingContactSyncDidComplete")
|
|
}
|
|
|
|
public class IncomingContactSyncJobQueue: NSObject {
|
|
public enum Constants {
|
|
public static let insertedThreads = "insertedThreads"
|
|
}
|
|
|
|
private let jobQueueRunner: JobQueueRunner<
|
|
JobRecordFinderImpl<IncomingContactSyncJobRecord>,
|
|
IncomingContactSyncJobRunnerFactory
|
|
>
|
|
private var jobSerializer = CompletionSerializer()
|
|
|
|
public init(appReadiness: AppReadiness, db: any DB, reachabilityManager: SSKReachabilityManager) {
|
|
self.jobQueueRunner = JobQueueRunner(
|
|
canExecuteJobsConcurrently: false,
|
|
db: db,
|
|
jobFinder: JobRecordFinderImpl(db: db),
|
|
jobRunnerFactory: IncomingContactSyncJobRunnerFactory(appReadiness: appReadiness)
|
|
)
|
|
super.init()
|
|
self.jobQueueRunner.listenForReachabilityChanges(reachabilityManager: reachabilityManager)
|
|
}
|
|
|
|
public func start(appContext: AppContext) {
|
|
jobQueueRunner.start(shouldRestartExistingJobs: appContext.isMainApp)
|
|
}
|
|
|
|
public func add(
|
|
cdnNumber: UInt32,
|
|
cdnKey: String,
|
|
encryptionKey: Data,
|
|
digest: Data,
|
|
plaintextLength: UInt32?,
|
|
isComplete: Bool,
|
|
tx: SDSAnyWriteTransaction
|
|
) {
|
|
let jobRecord = IncomingContactSyncJobRecord(
|
|
cdnNumber: cdnNumber,
|
|
cdnKey: cdnKey,
|
|
encryptionKey: encryptionKey,
|
|
digest: digest,
|
|
plaintextLength: plaintextLength,
|
|
isCompleteContactSync: isComplete
|
|
)
|
|
jobRecord.anyInsert(transaction: tx)
|
|
jobSerializer.addOrderedSyncCompletion(tx: tx.asV2Write) {
|
|
self.jobQueueRunner.addPersistedJob(jobRecord)
|
|
}
|
|
}
|
|
}
|
|
|
|
private class IncomingContactSyncJobRunnerFactory: JobRunnerFactory {
|
|
|
|
private let appReadiness: AppReadiness
|
|
|
|
init(appReadiness: AppReadiness) {
|
|
self.appReadiness = appReadiness
|
|
}
|
|
|
|
func buildRunner() -> IncomingContactSyncJobRunner {
|
|
return IncomingContactSyncJobRunner(appReadiness: appReadiness)
|
|
}
|
|
}
|
|
|
|
private class IncomingContactSyncJobRunner: JobRunner {
|
|
private enum Constants {
|
|
static let maxRetries: UInt = 4
|
|
}
|
|
|
|
private let appReadiness: AppReadiness
|
|
|
|
init(appReadiness: AppReadiness) {
|
|
self.appReadiness = appReadiness
|
|
}
|
|
|
|
func runJobAttempt(_ jobRecord: IncomingContactSyncJobRecord) async -> JobAttemptResult {
|
|
return await JobAttemptResult.executeBlockWithDefaultErrorHandler(
|
|
jobRecord: jobRecord,
|
|
retryLimit: Constants.maxRetries,
|
|
db: DependenciesBridge.shared.db,
|
|
block: { try await _runJob(jobRecord) }
|
|
)
|
|
}
|
|
|
|
func didFinishJob(_ jobRecordId: JobRecord.RowId, result: JobResult) async {}
|
|
|
|
private func _runJob(_ jobRecord: IncomingContactSyncJobRecord) async throws {
|
|
let fileUrl: URL
|
|
switch jobRecord.downloadInfo {
|
|
case .invalid:
|
|
owsFailDebug("Invalid contact sync job!")
|
|
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
|
|
jobRecord.anyRemove(transaction: tx)
|
|
}
|
|
return
|
|
case .transient(let downloadMetadata):
|
|
fileUrl = try await DependenciesBridge.shared.attachmentDownloadManager.downloadTransientAttachment(
|
|
metadata: downloadMetadata
|
|
).awaitable()
|
|
}
|
|
|
|
let insertedThreads = try await firstly(on: DispatchQueue.global()) {
|
|
try self.processContactSync(decryptedFileUrl: fileUrl, isComplete: jobRecord.isCompleteContactSync)
|
|
}.awaitable()
|
|
await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in
|
|
jobRecord.anyRemove(transaction: tx)
|
|
}
|
|
NotificationCenter.default.post(name: .incomingContactSyncDidComplete, object: self, userInfo: [
|
|
IncomingContactSyncJobQueue.Constants.insertedThreads: insertedThreads
|
|
])
|
|
}
|
|
|
|
// MARK: - Private
|
|
|
|
private func processContactSync(
|
|
decryptedFileUrl fileUrl: URL,
|
|
isComplete: Bool
|
|
) throws -> [(threadUniqueId: String, sortOrder: UInt32)] {
|
|
|
|
var insertedThreads = [(threadUniqueId: String, sortOrder: UInt32)]()
|
|
try Data(contentsOf: fileUrl, options: .mappedIfSafe).withUnsafeBytes { bufferPtr in
|
|
if let baseAddress = bufferPtr.baseAddress, bufferPtr.count > 0 {
|
|
let pointer = baseAddress.assumingMemoryBound(to: UInt8.self)
|
|
let inputStream = ChunkedInputStream(forReadingFrom: pointer, count: bufferPtr.count)
|
|
let contactStream = ContactsInputStream(inputStream: inputStream)
|
|
|
|
// We use batching to avoid long-running write transactions
|
|
// and to place an upper bound on memory usage.
|
|
var allPhoneNumbers = [E164]()
|
|
while try processBatch(
|
|
contactStream: contactStream,
|
|
insertedThreads: &insertedThreads,
|
|
processedPhoneNumbers: &allPhoneNumbers
|
|
) {}
|
|
|
|
if isComplete {
|
|
try pruneContacts(exceptThoseReceivedFromCompleteSync: allPhoneNumbers)
|
|
}
|
|
|
|
SSKEnvironment.shared.databaseStorageRef.write { transaction in
|
|
// Always fire just one identity change notification, rather than potentially
|
|
// once per contact. It's possible that *no* identities actually changed,
|
|
// but we have no convenient way to track that.
|
|
let identityManager = DependenciesBridge.shared.identityManager
|
|
identityManager.fireIdentityStateChangeNotification(after: transaction.asV2Write)
|
|
}
|
|
}
|
|
}
|
|
return insertedThreads
|
|
}
|
|
|
|
// Returns false when there are no more contacts to process.
|
|
private func processBatch(
|
|
contactStream: ContactsInputStream,
|
|
insertedThreads: inout [(threadUniqueId: String, sortOrder: UInt32)],
|
|
processedPhoneNumbers: inout [E164]
|
|
) throws -> Bool {
|
|
try autoreleasepool {
|
|
// We use batching to avoid long-running write transactions.
|
|
guard let contactBatch = try Self.buildBatch(contactStream: contactStream) else {
|
|
return false
|
|
}
|
|
guard !contactBatch.isEmpty else {
|
|
owsFailDebug("Empty batch.")
|
|
return false
|
|
}
|
|
try SSKEnvironment.shared.databaseStorageRef.write { tx in
|
|
for contact in contactBatch {
|
|
if let phoneNumber = try processContactDetails(contact, insertedThreads: &insertedThreads, tx: tx) {
|
|
processedPhoneNumbers.append(phoneNumber)
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
}
|
|
|
|
private static func buildBatch(contactStream: ContactsInputStream) throws -> [ContactDetails]? {
|
|
let batchSize = 8
|
|
var contacts = [ContactDetails]()
|
|
while contacts.count < batchSize, let contact = try contactStream.decodeContact() {
|
|
contacts.append(contact)
|
|
}
|
|
guard !contacts.isEmpty else {
|
|
return nil
|
|
}
|
|
return contacts
|
|
}
|
|
|
|
private func processContactDetails(
|
|
_ contactDetails: ContactDetails,
|
|
insertedThreads: inout [(threadUniqueId: String, sortOrder: UInt32)],
|
|
tx: SDSAnyWriteTransaction
|
|
) throws -> E164? {
|
|
let tsAccountManager = DependenciesBridge.shared.tsAccountManager
|
|
guard let localIdentifiers = tsAccountManager.localIdentifiers(tx: tx.asV2Read) else {
|
|
throw OWSGenericError("Not registered.")
|
|
}
|
|
|
|
let recipientFetcher = DependenciesBridge.shared.recipientFetcher
|
|
let recipientManager = DependenciesBridge.shared.recipientManager
|
|
let recipientMerger = DependenciesBridge.shared.recipientMerger
|
|
|
|
let recipient: SignalRecipient
|
|
if let aci = contactDetails.aci {
|
|
recipient = recipientMerger.applyMergeFromContactSync(
|
|
localIdentifiers: localIdentifiers,
|
|
aci: aci,
|
|
phoneNumber: contactDetails.phoneNumber,
|
|
tx: tx.asV2Write
|
|
)
|
|
// Mark as registered only if we have a UUID (we always do in this branch).
|
|
// If we don't have a UUID, contacts can't be registered.
|
|
recipientManager.markAsRegisteredAndSave(recipient, shouldUpdateStorageService: false, tx: tx.asV2Write)
|
|
} else if let phoneNumber = contactDetails.phoneNumber {
|
|
recipient = recipientFetcher.fetchOrCreate(phoneNumber: phoneNumber, tx: tx.asV2Write)
|
|
} else {
|
|
throw OWSAssertionError("No identifier in ContactDetails.")
|
|
}
|
|
|
|
let address = recipient.address
|
|
|
|
let contactThread: TSContactThread
|
|
let isNewThread: Bool
|
|
if let existingThread = TSContactThread.getWithContactAddress(address, transaction: tx) {
|
|
contactThread = existingThread
|
|
isNewThread = false
|
|
} else {
|
|
let newThread = TSContactThread(contactAddress: address)
|
|
newThread.shouldThreadBeVisible = true
|
|
|
|
contactThread = newThread
|
|
isNewThread = true
|
|
}
|
|
|
|
if isNewThread {
|
|
contactThread.anyInsert(transaction: tx)
|
|
let inboxSortOrder = contactDetails.inboxSortOrder ?? UInt32.max
|
|
insertedThreads.append((threadUniqueId: contactThread.uniqueId, sortOrder: inboxSortOrder))
|
|
}
|
|
|
|
let disappearingMessageToken = VersionedDisappearingMessageToken.token(
|
|
forProtoExpireTimerSeconds: contactDetails.expireTimer,
|
|
version: contactDetails.expireTimerVersion
|
|
)
|
|
GroupManager.remoteUpdateDisappearingMessages(
|
|
contactThread: contactThread,
|
|
disappearingMessageToken: disappearingMessageToken,
|
|
changeAuthor: nil,
|
|
localIdentifiers: localIdentifiers,
|
|
transaction: tx
|
|
)
|
|
|
|
return contactDetails.phoneNumber
|
|
}
|
|
|
|
/// Clear ``SignalAccount``s that weren't part of a complete sync.
|
|
///
|
|
/// Although "system contact" details (represented by a ``SignalAccount``)
|
|
/// are synced via StorageService rather than contact sync messages, any
|
|
/// contacts not included in a complete contact sync are not present on the
|
|
/// primary device and should there be removed from this linked device.
|
|
///
|
|
/// In theory, StorageService updates should handle removing these contacts.
|
|
/// However, there's no periodic sync check our state against
|
|
/// StorageService, so this job continues to fulfill that role. In the
|
|
/// future, if you're removing this method, you should first ensure that
|
|
/// periodic full syncs of contact details happen with StorageService.
|
|
private func pruneContacts(exceptThoseReceivedFromCompleteSync phoneNumbers: [E164]) throws {
|
|
try SSKEnvironment.shared.databaseStorageRef.write { transaction in
|
|
// Every contact sync includes your own address. However, we shouldn't
|
|
// create a SignalAccount for your own address. (If you're a primary, this
|
|
// is handled by FetchedSystemContacts.phoneNumbers(…).)
|
|
let tsAccountManager = DependenciesBridge.shared.tsAccountManager
|
|
guard let localIdentifiers = tsAccountManager.localIdentifiers(tx: transaction.asV2Read) else {
|
|
throw OWSGenericError("Not registered.")
|
|
}
|
|
let setOfPhoneNumbers = Set(phoneNumbers.lazy.filter { !localIdentifiers.contains(phoneNumber: $0) })
|
|
|
|
// Rather than collecting SignalAccount objects, collect their unique IDs.
|
|
// This operation can run in the memory-constrainted NSE, so trade off a
|
|
// bit of speed to save memory.
|
|
var uniqueIdsToRemove = [String]()
|
|
SignalAccount.anyEnumerate(transaction: transaction, batchingPreference: .batched(8)) { signalAccount, _ in
|
|
if let phoneNumber = E164(signalAccount.recipientPhoneNumber), setOfPhoneNumbers.contains(phoneNumber) {
|
|
// This contact was received in this batch, so don't remove it.
|
|
return
|
|
}
|
|
uniqueIdsToRemove.append(signalAccount.uniqueId)
|
|
}
|
|
Logger.info("Removing \(uniqueIdsToRemove.count) contacts during contact sync")
|
|
for uniqueId in uniqueIdsToRemove {
|
|
autoreleasepool {
|
|
guard let signalAccount = SignalAccount.anyFetch(uniqueId: uniqueId, transaction: transaction) else {
|
|
return
|
|
}
|
|
signalAccount.anyRemove(transaction: transaction)
|
|
}
|
|
}
|
|
if !uniqueIdsToRemove.isEmpty {
|
|
SSKEnvironment.shared.contactManagerImplRef.didUpdateSignalAccounts(transaction: transaction)
|
|
}
|
|
}
|
|
}
|
|
}
|