Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(datastore): keep DataStore sync engine running even if model subscriptions fail #815

Merged
merged 4 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
private var reconciliationQueues: [String: ModelReconciliationQueue]
private var reconciliationQueueConnectionStatus: [String: Bool]
private var modelReconciliationQueueFactory: ModelReconciliationQueueFactory

private var isInitialized: Bool {
reconciliationQueueConnectionStatus.count == reconciliationQueues.count
}

init(modelTypes: [Model.Type],
api: APICategoryGraphQLBehavior,
Expand Down Expand Up @@ -107,10 +111,19 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu
switch receiveValue {
case .mutationEvent(let event):
eventReconciliationQueueTopic.send(.mutationEvent(event))
case .connected(let modelName):
case .connected(modelName: let modelName):
connectionStatusSerialQueue.async {
self.reconciliationQueueConnectionStatus[modelName] = true
if self.reconciliationQueueConnectionStatus.count == self.reconciliationQueues.count {
if self.isInitialized {
self.eventReconciliationQueueTopic.send(.initialized)
}
}
case .disconnected(modelName: let modelName, reason: .unauthorized):
connectionStatusSerialQueue.async {
self.reconciliationQueues[modelName]?.cancel()
self.modelReconciliationQueueSinks[modelName]?.cancel()
self.reconciliationQueueConnectionStatus[modelName] = false
if self.isInitialized {
self.eventReconciliationQueueTopic.send(.initialized)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ final class AWSIncomingSubscriptionEventPublisher: IncomingSubscriptionEventPubl
}
}

// MARK: Resettable
@available(iOS 13.0, *)
extension AWSIncomingSubscriptionEventPublisher: Resettable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
self.enqueue(remoteModel)
})
case .connectionConnected:
modelReconciliationQueueSubject.send(.connected(modelName))
modelReconciliationQueueSubject.send(.connected(modelName: modelName))
}
}

Expand All @@ -164,6 +164,11 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
log.info("receivedCompletion: finished")
modelReconciliationQueueSubject.send(completion: .finished)
case .failure(let dataStoreError):
if case let .api(error, _) = dataStoreError,
case let APIError.operationError(_, _, underlyingError) = error, isUnauthorizedError(underlyingError) {
modelReconciliationQueueSubject.send(.disconnected(modelName: modelName, reason: .unauthorized))
return
}
log.error("receiveCompletion: error: \(dataStoreError)")
modelReconciliationQueueSubject.send(completion: .failure(dataStoreError))
}
Expand All @@ -173,6 +178,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue {
@available(iOS 13.0, *)
extension AWSModelReconciliationQueue: DefaultLogger { }

// MARK: Resettable
@available(iOS 13.0, *)
extension AWSModelReconciliationQueue: Resettable {

Expand Down Expand Up @@ -208,3 +214,26 @@ extension AWSModelReconciliationQueue: Resettable {
}

}

// MARK: Auth errors handling
@available(iOS 13.0, *)
extension AWSModelReconciliationQueue {
private typealias ResponseType = MutationSync<AnyModel>
private func graphqlErrors(from error: GraphQLResponseError<ResponseType>?) -> [GraphQLError]? {
if case let .error(errors) = error {
return errors
}
return nil
}

private func isUnauthorizedError(_ error: Error?) -> Bool {
if let responseError = error as? GraphQLResponseError<ResponseType>,
let graphQLError = graphqlErrors(from: responseError)?.first,
let extensions = graphQLError.extensions,
case let .string(errorTypeValue) = extensions["errorType"],
case .unauthorized = AppSyncErrorType(errorTypeValue) {
return true
}
return false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import Amplify
import AWSPluginsCore
import Combine

enum ModelConnectionDisconnectedReason {
case unauthorized
}

enum ModelReconciliationQueueEvent {
case started
case paused
case connected(String)
case connected(modelName: String)
case disconnected(modelName: String, reason: ModelConnectionDisconnectedReason)
case mutationEvent(MutationEvent)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ class AWSIncomingEventReconciliationQueueTests: XCTestCase {

apiPlugin = MockAPICategoryPlugin()

operationQueue = OperationQueue()
operationQueue.name = "com.amazonaws.DataStore.UnitTestQueue"
operationQueue.maxConcurrentOperationCount = 2
operationQueue.underlyingQueue = DispatchQueue.global()
operationQueue.isSuspended = true

}
var operationQueue: OperationQueue!

Expand All @@ -50,20 +56,14 @@ class AWSIncomingEventReconciliationQueueTests: XCTestCase {
case .initialized:
expectInitialized.fulfill()
default:
XCTFail("Should not expect any other state")
XCTFail("Should not expect any other state, received: \(event)")
}
})

operationQueue = OperationQueue()
operationQueue.name = "com.amazonaws.DataStore.UnitTestQueue"
operationQueue.maxConcurrentOperationCount = 2
operationQueue.underlyingQueue = DispatchQueue.global()
operationQueue.isSuspended = true

let reconciliationQueues = MockModelReconciliationQueue.mockModelReconciliationQueues
for (queueName, queue) in reconciliationQueues {
let cancellableOperation = CancelAwareBlockOperation {
queue.modelReconciliationQueueSubject.send(.connected(queueName))
queue.modelReconciliationQueueSubject.send(.connected(modelName: queueName))
}
operationQueue.addOperation(cancellableOperation)
}
Expand All @@ -73,4 +73,79 @@ class AWSIncomingEventReconciliationQueueTests: XCTestCase {
// Take action on the sink to prevent compiler warnings about unused variables.
sink.cancel()
}

func testSubscriptionFailedWithSingleModelUnauthorizedError() {
let expectInitialized = expectation(description: "eventQueue expected to send out initialized state")
let modelReconciliationQueueFactory
= MockModelReconciliationQueue.init(modelType:storageAdapter:api:auth:incomingSubscriptionEvents:)
let eventQueue = AWSIncomingEventReconciliationQueue(
modelTypes: [Post.self],
api: apiPlugin,
storageAdapter: storageAdapter,
modelReconciliationQueueFactory: modelReconciliationQueueFactory)
eventQueue.start()

let sink = eventQueue.publisher.sink(receiveCompletion: { _ in
XCTFail("Not expecting this to call")
}, receiveValue: { event in
switch event {
case .initialized:
expectInitialized.fulfill()
default:
XCTFail("Should not expect any other state, received: \(event)")
}
})

let reconciliationQueues = MockModelReconciliationQueue.mockModelReconciliationQueues
for (queueName, queue) in reconciliationQueues {
let cancellableOperation = CancelAwareBlockOperation {
queue.modelReconciliationQueueSubject.send(.disconnected(modelName: queueName, reason: .unauthorized))
}
operationQueue.addOperation(cancellableOperation)
}
operationQueue.isSuspended = false
waitForExpectations(timeout: 2)

sink.cancel()
}

// This test case tests that initialized event is received even if only one
// model subscriptions out of two failed - Post subscription will fail but Comment will succeed
func testSubscriptionFailedWithMultipleModels() {
let expectInitialized = expectation(description: "eventQueue expected to send out initialized state")
let modelReconciliationQueueFactory
= MockModelReconciliationQueue.init(modelType:storageAdapter:api:auth:incomingSubscriptionEvents:)
let eventQueue = AWSIncomingEventReconciliationQueue(
modelTypes: [Post.self, Comment.self],
api: apiPlugin,
storageAdapter: storageAdapter,
modelReconciliationQueueFactory: modelReconciliationQueueFactory)
eventQueue.start()

let sink = eventQueue.publisher.sink(receiveCompletion: { _ in
XCTFail("Not expecting this to call")
}, receiveValue: { event in
switch event {
case .initialized:
expectInitialized.fulfill()
default:
XCTFail("Should not expect any other state, received: \(event)")
}
})

let reconciliationQueues = MockModelReconciliationQueue.mockModelReconciliationQueues
for (queueName, queue) in reconciliationQueues {
let cancellableOperation = CancelAwareBlockOperation {
let event: ModelReconciliationQueueEvent = queueName == Post.modelName ?
.disconnected(modelName: queueName, reason: .unauthorized) :
.connected(modelName: queueName)
queue.modelReconciliationQueueSubject.send(event)
}
operationQueue.addOperation(cancellableOperation)
}
operationQueue.isSuspended = false
waitForExpectations(timeout: 2)

sink.cancel()
}
}