From e264b67afbea3d45dd5e2203ac018cd5b719bdc5 Mon Sep 17 00:00:00 2001 From: Diego Costantino Date: Tue, 13 Oct 2020 15:28:04 -0700 Subject: [PATCH] fix(datastore): Keep DataStore sync engine running even if models subscriptions fail (#815) * fix(datastore): keep DataStore sync engine running even if models subscriptions fail * test(datastore): add sync engine start unit tests * fix(datastore): move authorization errors handling responsibility to AWSModelReconciliationQueue * fix(datastore): clear modelReconciliationQueueSinks if a subscription fail w/ unauthorized err --- .../AWSIncomingEventReconciliationQueue.swift | 17 +++- ...WSIncomingSubscriptionEventPublisher.swift | 1 + .../AWSModelReconciliationQueue.swift | 31 ++++++- .../ModelReconciliationQueue.swift | 7 +- ...ncomingEventReconciliationQueueTests.swift | 91 +++++++++++++++++-- 5 files changed, 135 insertions(+), 12 deletions(-) diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift index 28027d7a05..26254892b5 100644 --- a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift @@ -41,6 +41,10 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu private var reconciliationQueues: [String: ModelReconciliationQueue] private var reconciliationQueueConnectionStatus: [String: Bool] private var modelReconciliationQueueFactory: ModelReconciliationQueueFactory + + private var isInitialized: Bool { + reconciliationQueueConnectionStatus.count == reconciliationQueues.count + } init(modelTypes: [Model.Type], api: APICategoryGraphQLBehavior, @@ -107,10 +111,19 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu switch receiveValue { case .mutationEvent(let event): eventReconciliationQueueTopic.send(.mutationEvent(event)) - case .connected(let modelName): + case .connected(modelName: let modelName): connectionStatusSerialQueue.async { self.reconciliationQueueConnectionStatus[modelName] = true - if self.reconciliationQueueConnectionStatus.count == self.reconciliationQueues.count { + if self.isInitialized { + self.eventReconciliationQueueTopic.send(.initialized) + } + } + case .disconnected(modelName: let modelName, reason: .unauthorized): + connectionStatusSerialQueue.async { + self.reconciliationQueues[modelName]?.cancel() + self.modelReconciliationQueueSinks[modelName]?.cancel() + self.reconciliationQueueConnectionStatus[modelName] = false + if self.isInitialized { self.eventReconciliationQueueTopic.send(.initialized) } } diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift index 29758cd7e6..1ecbc1a21e 100644 --- a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift @@ -58,6 +58,7 @@ final class AWSIncomingSubscriptionEventPublisher: IncomingSubscriptionEventPubl } } +// MARK: Resettable @available(iOS 13.0, *) extension AWSIncomingSubscriptionEventPublisher: Resettable { diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift index 658e770990..003800371c 100644 --- a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift @@ -154,7 +154,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue { self.enqueue(remoteModel) }) case .connectionConnected: - modelReconciliationQueueSubject.send(.connected(modelName)) + modelReconciliationQueueSubject.send(.connected(modelName: modelName)) } } @@ -164,6 +164,11 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue { log.info("receivedCompletion: finished") modelReconciliationQueueSubject.send(completion: .finished) case .failure(let dataStoreError): + if case let .api(error, _) = dataStoreError, + case let APIError.operationError(_, _, underlyingError) = error, isUnauthorizedError(underlyingError) { + modelReconciliationQueueSubject.send(.disconnected(modelName: modelName, reason: .unauthorized)) + return + } log.error("receiveCompletion: error: \(dataStoreError)") modelReconciliationQueueSubject.send(completion: .failure(dataStoreError)) } @@ -173,6 +178,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue { @available(iOS 13.0, *) extension AWSModelReconciliationQueue: DefaultLogger { } +// MARK: Resettable @available(iOS 13.0, *) extension AWSModelReconciliationQueue: Resettable { @@ -208,3 +214,26 @@ extension AWSModelReconciliationQueue: Resettable { } } + +// MARK: Auth errors handling +@available(iOS 13.0, *) +extension AWSModelReconciliationQueue { + private typealias ResponseType = MutationSync + private func graphqlErrors(from error: GraphQLResponseError?) -> [GraphQLError]? { + if case let .error(errors) = error { + return errors + } + return nil + } + + private func isUnauthorizedError(_ error: Error?) -> Bool { + if let responseError = error as? GraphQLResponseError, + let graphQLError = graphqlErrors(from: responseError)?.first, + let extensions = graphQLError.extensions, + case let .string(errorTypeValue) = extensions["errorType"], + case .unauthorized = AppSyncErrorType(errorTypeValue) { + return true + } + return false + } +} diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ModelReconciliationQueue.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ModelReconciliationQueue.swift index ae7f117336..87fcf3de79 100644 --- a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ModelReconciliationQueue.swift +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/ModelReconciliationQueue.swift @@ -9,10 +9,15 @@ import Amplify import AWSPluginsCore import Combine +enum ModelConnectionDisconnectedReason { + case unauthorized +} + enum ModelReconciliationQueueEvent { case started case paused - case connected(String) + case connected(modelName: String) + case disconnected(modelName: String, reason: ModelConnectionDisconnectedReason) case mutationEvent(MutationEvent) } diff --git a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueueTests.swift b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueueTests.swift index 4e57e0ccb8..7fb2a25d00 100644 --- a/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueueTests.swift +++ b/AmplifyPlugins/DataStore/AWSDataStoreCategoryPluginTests/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueueTests.swift @@ -25,6 +25,12 @@ class AWSIncomingEventReconciliationQueueTests: XCTestCase { apiPlugin = MockAPICategoryPlugin() + operationQueue = OperationQueue() + operationQueue.name = "com.amazonaws.DataStore.UnitTestQueue" + operationQueue.maxConcurrentOperationCount = 2 + operationQueue.underlyingQueue = DispatchQueue.global() + operationQueue.isSuspended = true + } var operationQueue: OperationQueue! @@ -50,20 +56,14 @@ class AWSIncomingEventReconciliationQueueTests: XCTestCase { case .initialized: expectInitialized.fulfill() default: - XCTFail("Should not expect any other state") + XCTFail("Should not expect any other state, received: \(event)") } }) - operationQueue = OperationQueue() - operationQueue.name = "com.amazonaws.DataStore.UnitTestQueue" - operationQueue.maxConcurrentOperationCount = 2 - operationQueue.underlyingQueue = DispatchQueue.global() - operationQueue.isSuspended = true - let reconciliationQueues = MockModelReconciliationQueue.mockModelReconciliationQueues for (queueName, queue) in reconciliationQueues { let cancellableOperation = CancelAwareBlockOperation { - queue.modelReconciliationQueueSubject.send(.connected(queueName)) + queue.modelReconciliationQueueSubject.send(.connected(modelName: queueName)) } operationQueue.addOperation(cancellableOperation) } @@ -73,4 +73,79 @@ class AWSIncomingEventReconciliationQueueTests: XCTestCase { // Take action on the sink to prevent compiler warnings about unused variables. sink.cancel() } + + func testSubscriptionFailedWithSingleModelUnauthorizedError() { + let expectInitialized = expectation(description: "eventQueue expected to send out initialized state") + let modelReconciliationQueueFactory + = MockModelReconciliationQueue.init(modelType:storageAdapter:api:auth:incomingSubscriptionEvents:) + let eventQueue = AWSIncomingEventReconciliationQueue( + modelTypes: [Post.self], + api: apiPlugin, + storageAdapter: storageAdapter, + modelReconciliationQueueFactory: modelReconciliationQueueFactory) + eventQueue.start() + + let sink = eventQueue.publisher.sink(receiveCompletion: { _ in + XCTFail("Not expecting this to call") + }, receiveValue: { event in + switch event { + case .initialized: + expectInitialized.fulfill() + default: + XCTFail("Should not expect any other state, received: \(event)") + } + }) + + let reconciliationQueues = MockModelReconciliationQueue.mockModelReconciliationQueues + for (queueName, queue) in reconciliationQueues { + let cancellableOperation = CancelAwareBlockOperation { + queue.modelReconciliationQueueSubject.send(.disconnected(modelName: queueName, reason: .unauthorized)) + } + operationQueue.addOperation(cancellableOperation) + } + operationQueue.isSuspended = false + waitForExpectations(timeout: 2) + + sink.cancel() + } + + // This test case tests that initialized event is received even if only one + // model subscriptions out of two failed - Post subscription will fail but Comment will succeed + func testSubscriptionFailedWithMultipleModels() { + let expectInitialized = expectation(description: "eventQueue expected to send out initialized state") + let modelReconciliationQueueFactory + = MockModelReconciliationQueue.init(modelType:storageAdapter:api:auth:incomingSubscriptionEvents:) + let eventQueue = AWSIncomingEventReconciliationQueue( + modelTypes: [Post.self, Comment.self], + api: apiPlugin, + storageAdapter: storageAdapter, + modelReconciliationQueueFactory: modelReconciliationQueueFactory) + eventQueue.start() + + let sink = eventQueue.publisher.sink(receiveCompletion: { _ in + XCTFail("Not expecting this to call") + }, receiveValue: { event in + switch event { + case .initialized: + expectInitialized.fulfill() + default: + XCTFail("Should not expect any other state, received: \(event)") + } + }) + + let reconciliationQueues = MockModelReconciliationQueue.mockModelReconciliationQueues + for (queueName, queue) in reconciliationQueues { + let cancellableOperation = CancelAwareBlockOperation { + let event: ModelReconciliationQueueEvent = queueName == Post.modelName ? + .disconnected(modelName: queueName, reason: .unauthorized) : + .connected(modelName: queueName) + queue.modelReconciliationQueueSubject.send(event) + } + operationQueue.addOperation(cancellableOperation) + } + operationQueue.isSuspended = false + waitForExpectations(timeout: 2) + + sink.cancel() + } }