Skip to content

Commit

Permalink
fix(datastore): Keep DataStore sync engine running even if models sub…
Browse files Browse the repository at this point in the history
…scriptions fail (#815)

* fix(datastore): keep DataStore sync engine running even if models subscriptions fail

* test(datastore): add sync engine start unit tests

* fix(datastore): move authorization errors handling responsibility to AWSModelReconciliationQueue

* fix(datastore): clear modelReconciliationQueueSinks if a subscription fail w/ unauthorized err
  • Loading branch information
diegocstn authored Oct 13, 2020
1 parent 01fa243 commit e264b67
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
private var reconciliationQueues: [String: ModelReconciliationQueue]
private var reconciliationQueueConnectionStatus: [String: Bool]
private var modelReconciliationQueueFactory: ModelReconciliationQueueFactory

private var isInitialized: Bool {
reconciliationQueueConnectionStatus.count == reconciliationQueues.count
}

init(modelTypes: [Model.Type],
api: APICategoryGraphQLBehavior,
Expand Down Expand Up @@ -107,10 +111,19 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
switch receiveValue {
case .mutationEvent(let event):
eventReconciliationQueueTopic.send(.mutationEvent(event))
case .connected(let modelName):
case .connected(modelName: let modelName):
connectionStatusSerialQueue.async {
self.reconciliationQueueConnectionStatus[modelName] = true
if self.reconciliationQueueConnectionStatus.count == self.reconciliationQueues.count {
if self.isInitialized {
self.eventReconciliationQueueTopic.send(.initialized)
}
}
case .disconnected(modelName: let modelName, reason: .unauthorized):
connectionStatusSerialQueue.async {
self.reconciliationQueues[modelName]?.cancel()
self.modelReconciliationQueueSinks[modelName]?.cancel()
self.reconciliationQueueConnectionStatus[modelName] = false
if self.isInitialized {
self.eventReconciliationQueueTopic.send(.initialized)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ final class AWSIncomingSubscriptionEventPublisher: IncomingSubscriptionEventPubl
}
}

// MARK: Resettable
@available(iOS 13.0, *)
extension AWSIncomingSubscriptionEventPublisher: Resettable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
self.enqueue(remoteModel)
})
case .connectionConnected:
modelReconciliationQueueSubject.send(.connected(modelName))
modelReconciliationQueueSubject.send(.connected(modelName: modelName))
}
}

Expand All @@ -164,6 +164,11 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
log.info("receivedCompletion: finished")
modelReconciliationQueueSubject.send(completion: .finished)
case .failure(let dataStoreError):
if case let .api(error, _) = dataStoreError,
case let APIError.operationError(_, _, underlyingError) = error, isUnauthorizedError(underlyingError) {
modelReconciliationQueueSubject.send(.disconnected(modelName: modelName, reason: .unauthorized))
return
}
log.error("receiveCompletion: error: \(dataStoreError)")
modelReconciliationQueueSubject.send(completion: .failure(dataStoreError))
}
Expand All @@ -173,6 +178,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
@available(iOS 13.0, *)
extension AWSModelReconciliationQueue: DefaultLogger { }

// MARK: Resettable
@available(iOS 13.0, *)
extension AWSModelReconciliationQueue: Resettable {

Expand Down Expand Up @@ -208,3 +214,26 @@ extension AWSModelReconciliationQueue: Resettable {
}

}

// MARK: Auth errors handling
@available(iOS 13.0, *)
extension AWSModelReconciliationQueue {
private typealias ResponseType = MutationSync<AnyModel>
private func graphqlErrors(from error: GraphQLResponseError<ResponseType>?) -> [GraphQLError]? {
if case let .error(errors) = error {
return errors
}
return nil
}

private func isUnauthorizedError(_ error: Error?) -> Bool {
if let responseError = error as? GraphQLResponseError<ResponseType>,
let graphQLError = graphqlErrors(from: responseError)?.first,
let extensions = graphQLError.extensions,
case let .string(errorTypeValue) = extensions["errorType"],
case .unauthorized = AppSyncErrorType(errorTypeValue) {
return true
}
return false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import Amplify
import AWSPluginsCore
import Combine

enum ModelConnectionDisconnectedReason {
case unauthorized
}

enum ModelReconciliationQueueEvent {
case started
case paused
case connected(String)
case connected(modelName: String)
case disconnected(modelName: String, reason: ModelConnectionDisconnectedReason)
case mutationEvent(MutationEvent)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ class AWSIncomingEventReconciliationQueueTests: XCTestCase {

apiPlugin = MockAPICategoryPlugin()

operationQueue = OperationQueue()
operationQueue.name = "com.amazonaws.DataStore.UnitTestQueue"
operationQueue.maxConcurrentOperationCount = 2
operationQueue.underlyingQueue = DispatchQueue.global()
operationQueue.isSuspended = true

}
var operationQueue: OperationQueue!

Expand All @@ -50,20 +56,14 @@ class AWSIncomingEventReconciliationQueueTests: XCTestCase {
case .initialized:
expectInitialized.fulfill()
default:
XCTFail("Should not expect any other state")
XCTFail("Should not expect any other state, received: \(event)")
}
})

operationQueue = OperationQueue()
operationQueue.name = "com.amazonaws.DataStore.UnitTestQueue"
operationQueue.maxConcurrentOperationCount = 2
operationQueue.underlyingQueue = DispatchQueue.global()
operationQueue.isSuspended = true

let reconciliationQueues = MockModelReconciliationQueue.mockModelReconciliationQueues
for (queueName, queue) in reconciliationQueues {
let cancellableOperation = CancelAwareBlockOperation {
queue.modelReconciliationQueueSubject.send(.connected(queueName))
queue.modelReconciliationQueueSubject.send(.connected(modelName: queueName))
}
operationQueue.addOperation(cancellableOperation)
}
Expand All @@ -73,4 +73,79 @@ class AWSIncomingEventReconciliationQueueTests: XCTestCase {
// Take action on the sink to prevent compiler warnings about unused variables.
sink.cancel()
}

func testSubscriptionFailedWithSingleModelUnauthorizedError() {
let expectInitialized = expectation(description: "eventQueue expected to send out initialized state")
let modelReconciliationQueueFactory
= MockModelReconciliationQueue.init(modelType:storageAdapter:api:auth:incomingSubscriptionEvents:)
let eventQueue = AWSIncomingEventReconciliationQueue(
modelTypes: [Post.self],
api: apiPlugin,
storageAdapter: storageAdapter,
modelReconciliationQueueFactory: modelReconciliationQueueFactory)
eventQueue.start()

let sink = eventQueue.publisher.sink(receiveCompletion: { _ in
XCTFail("Not expecting this to call")
}, receiveValue: { event in
switch event {
case .initialized:
expectInitialized.fulfill()
default:
XCTFail("Should not expect any other state, received: \(event)")
}
})

let reconciliationQueues = MockModelReconciliationQueue.mockModelReconciliationQueues
for (queueName, queue) in reconciliationQueues {
let cancellableOperation = CancelAwareBlockOperation {
queue.modelReconciliationQueueSubject.send(.disconnected(modelName: queueName, reason: .unauthorized))
}
operationQueue.addOperation(cancellableOperation)
}
operationQueue.isSuspended = false
waitForExpectations(timeout: 2)

sink.cancel()
}

// This test case tests that initialized event is received even if only one
// model subscriptions out of two failed - Post subscription will fail but Comment will succeed
func testSubscriptionFailedWithMultipleModels() {
let expectInitialized = expectation(description: "eventQueue expected to send out initialized state")
let modelReconciliationQueueFactory
= MockModelReconciliationQueue.init(modelType:storageAdapter:api:auth:incomingSubscriptionEvents:)
let eventQueue = AWSIncomingEventReconciliationQueue(
modelTypes: [Post.self, Comment.self],
api: apiPlugin,
storageAdapter: storageAdapter,
modelReconciliationQueueFactory: modelReconciliationQueueFactory)
eventQueue.start()

let sink = eventQueue.publisher.sink(receiveCompletion: { _ in
XCTFail("Not expecting this to call")
}, receiveValue: { event in
switch event {
case .initialized:
expectInitialized.fulfill()
default:
XCTFail("Should not expect any other state, received: \(event)")
}
})

let reconciliationQueues = MockModelReconciliationQueue.mockModelReconciliationQueues
for (queueName, queue) in reconciliationQueues {
let cancellableOperation = CancelAwareBlockOperation {
let event: ModelReconciliationQueueEvent = queueName == Post.modelName ?
.disconnected(modelName: queueName, reason: .unauthorized) :
.connected(modelName: queueName)
queue.modelReconciliationQueueSubject.send(event)
}
operationQueue.addOperation(cancellableOperation)
}
operationQueue.isSuspended = false
waitForExpectations(timeout: 2)

sink.cancel()
}
}

0 comments on commit e264b67

Please sign in to comment.