Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction Race Corruption #201

Merged
merged 2 commits into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ extension DiskPersistence {
}

if let parent {
/// If the transaction is read-only, stop here without applying anything to the parent.
guard !options.contains(.readOnly) else {
assert(entryMutations.isEmpty, "Entries were mutated in a read-only transaction!")
assert(createdRootObjects.isEmpty, "Root objects were created in a read-only transaction!")
assert(createdIndexes.isEmpty, "Indexes were created in a read-only transaction!")
assert(createdPages.isEmpty, "Pages were created in a read-only transaction!")
assert(deletedRootObjects.isEmpty, "Root objects were deleted in a read-only transaction!")
assert(deletedIndexes.isEmpty, "Indexes were deleted in a read-only transaction!")
assert(deletedPages.isEmpty, "Pages were deleted in a read-only transaction!")
return
}
try await parent.apply(
rootObjects: rootObjects,
entryMutations: entryMutations,
Expand Down Expand Up @@ -219,6 +230,7 @@ extension DiskPersistence {
options: UnsafeTransactionOptions,
handler: @escaping (_ transaction: Transaction, _ isDurable: Bool) async throws -> T
) async -> (Transaction, Task<T, Error>) {
assert(!self.options.contains(.readOnly) || options.contains(.readOnly), "A child transaction was declared read-write, even though its parent was read-only!")
let transaction = Transaction(
persistence: persistence,
parent: self,
Expand Down Expand Up @@ -323,6 +335,7 @@ extension DiskPersistence.Transaction: DatastoreInterfaceProtocol {
}

func apply(descriptor: DatastoreDescriptor, for datastoreKey: DatastoreKey) async throws {
assert(!options.contains(.readOnly), "apply(descriptor:for:) called on a read-only transaction!")
try checkIsActive()

if let existingRootObject = try await rootObject(for: datastoreKey) {
Expand Down Expand Up @@ -868,6 +881,7 @@ extension DiskPersistence.Transaction {
existingIndex: DiskPersistence.Datastore.Index?,
datastoreKey: DatastoreKey
) async throws {
assert(!options.contains(.readOnly), "persist(entry:...) called on a read-only transaction!")
guard let existingIndex
else { throw DatastoreInterfaceError.indexNotFound }

Expand Down Expand Up @@ -937,6 +951,7 @@ extension DiskPersistence.Transaction {
cursor: some InsertionCursorProtocol,
datastoreKey: DatastoreKey
) async throws {
assert(!options.contains(.readOnly), "persistPrimaryIndexEntry(...) called on a read-only transaction!")
try checkIsActive()

guard let existingRootObject = try await rootObject(for: datastoreKey)
Expand All @@ -963,6 +978,7 @@ extension DiskPersistence.Transaction {
existingIndex: DiskPersistence.Datastore.Index?,
datastoreKey: DatastoreKey
) async throws {
assert(!options.contains(.readOnly), "delete(...) called on a read-only transaction!")
guard let existingIndex
else { throw DatastoreInterfaceError.indexNotFound }

Expand Down Expand Up @@ -1026,6 +1042,7 @@ extension DiskPersistence.Transaction {
cursor: some InstanceCursorProtocol,
datastoreKey: DatastoreKey
) async throws {
assert(!options.contains(.readOnly), "deletePrimaryIndexEntry(...) called on a read-only transaction!")
try checkIsActive()

guard let existingRootObject = try await rootObject(for: datastoreKey)
Expand All @@ -1042,6 +1059,7 @@ extension DiskPersistence.Transaction {
func resetPrimaryIndex(
datastoreKey: DatastoreKey
) async throws {
assert(!options.contains(.readOnly), "resetPrimaryIndex(...) called on a read-only transaction!")
try checkIsActive()

guard let existingRootObject = try await rootObject(for: datastoreKey)
Expand Down Expand Up @@ -1103,6 +1121,7 @@ extension DiskPersistence.Transaction {
indexName: IndexName,
datastoreKey: DatastoreKey
) async throws {
assert(!options.contains(.readOnly), "persistDirectIndexEntry(...) called on a read-only transaction!")
try checkIsActive()

guard let existingRootObject = try await rootObject(for: datastoreKey)
Expand All @@ -1129,6 +1148,7 @@ extension DiskPersistence.Transaction {
indexName: IndexName,
datastoreKey: DatastoreKey
) async throws {
assert(!options.contains(.readOnly), "deleteDirectIndexEntry(...) called on a read-only transaction!")
try checkIsActive()

guard let existingRootObject = try await rootObject(for: datastoreKey)
Expand All @@ -1146,6 +1166,7 @@ extension DiskPersistence.Transaction {
indexName: IndexName,
datastoreKey: DatastoreKey
) async throws {
assert(!options.contains(.readOnly), "deleteDirectIndex(...) called on a read-only transaction!")
try checkIsActive()

guard let existingRootObject = try await rootObject(for: datastoreKey)
Expand Down Expand Up @@ -1185,6 +1206,7 @@ extension DiskPersistence.Transaction {
indexName: IndexName,
datastoreKey: DatastoreKey
) async throws {
assert(!options.contains(.readOnly), "persistSecondaryIndexEntry(...) called on a read-only transaction!")
try checkIsActive()

guard let existingRootObject = try await rootObject(for: datastoreKey)
Expand All @@ -1209,6 +1231,7 @@ extension DiskPersistence.Transaction {
indexName: IndexName,
datastoreKey: DatastoreKey
) async throws {
assert(!options.contains(.readOnly), "deleteSecondaryIndexEntry(...) called on a read-only transaction!")
try checkIsActive()

guard let existingRootObject = try await rootObject(for: datastoreKey)
Expand All @@ -1226,6 +1249,7 @@ extension DiskPersistence.Transaction {
indexName: IndexName,
datastoreKey: DatastoreKey
) async throws {
assert(!options.contains(.readOnly), "deleteSecondaryIndex(...) called on a read-only transaction!")
try checkIsActive()

guard let existingRootObject = try await rootObject(for: datastoreKey)
Expand Down Expand Up @@ -1298,6 +1322,7 @@ extension DiskPersistence.Transaction {
event: ObservedEvent<IdentifierType, ObservationEntry>,
datastoreKey: DatastoreKey
) async throws {
assert(!options.contains(.readOnly), "emit(event:...) called on a read-only transaction!")
try checkIsActive()

guard try await hasObservers(for: datastoreKey) else { return }
Expand Down
60 changes: 60 additions & 0 deletions Tests/CodableDatastoreTests/DiskPersistenceDatastoreTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -694,4 +694,64 @@ final class DiskPersistenceDatastoreTests: XCTestCase {
print("Scanning \(iteratedCount) instances: \((1000*(now - start)).rounded()/1000)s")
XCTAssertEqual(count, iteratedCount)
}

func testNestedTransactions() async throws {
struct TestFormat: DatastoreFormat {
enum Version: Int, CaseIterable {
case zero
}

struct Instance: Codable, Identifiable {
var id: Int
var value: String
}

static let defaultKey: DatastoreKey = "test"
static let currentVersion = Version.zero
}

let persistence = try DiskPersistence(readWriteURL: temporaryStoreURL)
try await persistence.createPersistenceIfNecessary()

let datastore = Datastore.JSONStore(
persistence: persistence,
format: TestFormat.self,
migrations: [
.zero: { data, decoder in
try decoder.decode(TestFormat.Instance.self, from: data)
}
]
)

let valueBank = [
"Hello, World!",
"My name is Dimitri",
"Writen using CodableDatastore",
"Swift is better than Objective-C, there, I said it",
"Twenty Three is Number One"
]

try await persistence.perform {
for id in 0..<10 {
try await datastore.persist(.init(id: id, value: valueBank.randomElement()!))
}
}

try await persistence.perform {
let allInstances = datastore.load(...)
try await datastore.persist(.init(id: 10, value: valueBank.randomElement()!))
try await persistence.perform {
/// Resolve and close out the previous child transaction, which should not corrupt the parent.
let resolvedInstances = try await allInstances.reduce(into: []) { $0.append($1) }
XCTAssertEqual(resolvedInstances.count, 10)

/// Allow corruption to occur if they will.
try await Task.sleep(for: .seconds(1))

/// Check to make sure that we are reading the last written to root object, not the one that just got applied.
let lastAddedInstance = try await datastore.load(10)
XCTAssertNotNil(lastAddedInstance)
}
}
}
}
Loading