Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task/rfr 1188 handle upload conflict #179

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Cyface GmbH
* Copyright 2022-2025 Cyface GmbH
*
* This file is part of the Cyface Data Collector.
*
Expand All @@ -18,18 +18,21 @@
*/
package de.cyface.collector.handler

import de.cyface.collector.handler.HTTPStatus.HTTP_CONFLICT
import de.cyface.collector.handler.HTTPStatus.NOT_FOUND
import de.cyface.collector.handler.HTTPStatus.SERVER_ERROR
import de.cyface.collector.handler.exception.UnexpectedContentRange
import de.cyface.collector.storage.exception.UploadAlreadyExists
import io.vertx.core.Handler
import io.vertx.ext.web.RoutingContext
import org.slf4j.LoggerFactory

/**
* A handler to process exception occuring during the reception of new measurements.
* A handler to process exception occurring during the reception of new measurements.
*
* @author Klemens Muthmann
* @version 1.0.0
* @author Armin Schnabel
* @version 1.1.0
* @property ctx The failed routing context, containing the error information.
*/
class UploadFailureHandler(private val ctx: RoutingContext) : Handler<Throwable> {
Expand All @@ -40,6 +43,11 @@ class UploadFailureHandler(private val ctx: RoutingContext) : Handler<Throwable>
ctx.response().setStatusCode(NOT_FOUND).end()
}

is UploadAlreadyExists -> {
// Android client interprets this error code as "UPLOAD_SUCCESSFUL" and continues with the next upload.
ctx.response().setStatusCode(HTTP_CONFLICT)
}

else -> {
LOGGER.debug(event.localizedMessage)
ctx.fail(SERVER_ERROR, event)
Expand Down
11 changes: 10 additions & 1 deletion src/main/kotlin/de/cyface/collector/storage/cloud/Database.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2024 Cyface GmbH
* Copyright 2022-2025 Cyface GmbH
*
* This file is part of the Serialization.
*
Expand All @@ -19,12 +19,14 @@
package de.cyface.collector.storage.cloud

import de.cyface.collector.storage.UploadMetaData
import io.vertx.core.CompositeFuture
import io.vertx.core.Future

/**
* The interface to a database storing the metadata of some uploaded data.
*
* @author Klemens Muthmann
* @author Armin Schnabel
*/
interface Database {
/**
Expand All @@ -47,4 +49,11 @@ interface Database {
* @return A [Future] that is called upon successful or failed completion of this operation.
*/
fun exists(deviceIdentifier: String, measurementIdentifier: Long, attachmentId: Long): Future<Boolean>

/**
* Creates indices required by this application asynchronously, if they do not yet exist.
*
* @return A [Future] that is notified of the success or failure of this application, upon completion.
*/
fun createIndices(): Future<CompositeFuture>
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2024 Cyface GmbH
* Copyright 2022-2025 Cyface GmbH
*
* This file is part of the Cyface Data Collector.
*
Expand All @@ -20,12 +20,14 @@

package de.cyface.collector.storage.cloud

import com.mongodb.MongoWriteException
import de.cyface.collector.storage.CleanupOperation
import de.cyface.collector.storage.DataStorageService
import de.cyface.collector.storage.Status
import de.cyface.collector.storage.StatusType
import de.cyface.collector.storage.UploadMetaData
import de.cyface.collector.storage.exception.ContentRangeNotMatchingFileSize
import de.cyface.collector.storage.exception.UploadAlreadyExists
import io.vertx.core.Future
import io.vertx.core.Promise
import io.vertx.core.Vertx
Expand All @@ -47,6 +49,7 @@ import java.util.concurrent.Callable
* [here](https://cloud.google.com/docs/authentication/application-default-credentials).
*
* @author Klemens Muthmann
* @author Armin Schnabel
* @property dao The data access object to write an uploads' metadata.
* @property vertx A Vertx instance of the current Vertx environment.
* @property cloudStorageFactory A factory to create a [GoogleCloudStorage] instance on demand.
Expand Down Expand Up @@ -154,8 +157,32 @@ class GoogleCloudStorageService(
.onSuccess {
resultPromise.complete(Status(uploadIdentifier, StatusType.COMPLETE, bytesUploaded))
}
.onFailure {
resultPromise.fail(it)
.onFailure { daoFailure ->
// This probably happens when a file is uploaded, but while it's transferred to Google Cloud the
// client connection is interrupted, so the client is allowed to re-upload the file while the
// `google` collection document was not yet written. [RFR-1188]
// When the second upload writes the file to Google Cloud and tries to create the `google`
// collection document on success, this fails here due to the unique index on the `google`
// collection, but the Google Cloud file still exists. Thus, we clean this file here:
clean(uploadIdentifier)
.onSuccess {
if (daoFailure is MongoWriteException && daoFailure.code == DUPLICATE_KEY) {
resultPromise.fail(UploadAlreadyExists(daoFailure))
} else {
resultPromise.fail(daoFailure)
}
}
.onFailure {
logger.error(
String.format(
Locale.getDefault(),
"Failed to clean %s after dao failure: %s",
uploadIdentifier,
it.message
)
)
resultPromise.fail(it)
}
}
} else {
resultPromise.complete(Status(uploadIdentifier, StatusType.INCOMPLETE, bytesUploaded))
Expand Down Expand Up @@ -210,6 +237,13 @@ class GoogleCloudStorageService(
*/
return dao.exists(deviceId, measurementId, attachmentId)
}

companion object {
/**
* Mongo Database error code for duplicate key.
*/
private const val DUPLICATE_KEY = 11000
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2024 Cyface GmbH
* Copyright 2022-2025 Cyface GmbH
*
* This file is part of the Serialization.
*
Expand Down Expand Up @@ -39,7 +39,7 @@ import io.vertx.core.Vertx
* (https://github.com/googleapis/google-auth-library-java/blob/040acefec507f419f6e4ec4eab9645a6e3888a15/samples/snippets/src/main/java/AuthenticateExplicit.java).
* @property projectIdentifier The Google Cloud project identifier used by the service created from this builder.
* @property bucketName The name of the Google Cloud Storage bucket used by the created builder to store data.
* @property dao A data access object to store an uploads metadata.
* @property dao A data access object for storing uploads' metadata.
* @property vertx The Vertx instance this application runs on.
* @property bufferSize The size of the internal data buffer in bytes.
* This is the amount of bytes the system accumulates before sending data to Google.
Expand All @@ -58,8 +58,12 @@ class GoogleCloudStorageServiceBuilder(
override fun create(): Future<DataStorageService> {
val ret = Promise.promise<DataStorageService>()
vertx.runOnContext {
val cloudStorageFactory = GoogleCloudStorageFactory(credentials, projectIdentifier, bucketName)
ret.complete(GoogleCloudStorageService(dao, vertx, cloudStorageFactory, bufferSize))
dao.createIndices().onSuccess {
val cloudStorageFactory = GoogleCloudStorageFactory(credentials, projectIdentifier, bucketName)
ret.complete(GoogleCloudStorageService(dao, vertx, cloudStorageFactory, bufferSize))
}.onFailure {
ret.fail(it)
}
}
return ret.future()
}
Expand Down
38 changes: 35 additions & 3 deletions src/main/kotlin/de/cyface/collector/storage/cloud/MongoDatabase.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2024 Cyface GmbH
* Copyright 2022-2025 Cyface GmbH
*
* This file is part of the Serialization.
*
Expand All @@ -21,9 +21,11 @@ package de.cyface.collector.storage.cloud
import de.cyface.collector.model.FormAttributes
import de.cyface.collector.storage.UploadMetaData
import de.cyface.collector.storage.exception.DuplicatesInDatabase
import io.vertx.core.CompositeFuture
import io.vertx.core.Future
import io.vertx.core.Promise
import io.vertx.core.json.JsonObject
import io.vertx.ext.mongo.IndexOptions
import io.vertx.ext.mongo.MongoClient
import org.slf4j.LoggerFactory
import java.time.Instant
Expand Down Expand Up @@ -84,7 +86,7 @@ class MongoDatabase(private val mongoClient: MongoClient, private val collection
val queryCall = mongoClient.find(collectionName, query)
queryCall.onSuccess { ids ->
try {
if (ids.size > 1) {
if (ids.size > 1) { // Should not be possible anymore with collection index
logger.error(
"More than one measurement found for did {} mid {}",
deviceIdentifier,
Expand Down Expand Up @@ -129,7 +131,7 @@ class MongoDatabase(private val mongoClient: MongoClient, private val collection
val queryCall = mongoClient.find(collectionName, query)
queryCall.onSuccess { ids ->
try {
if (ids.size > 1) {
if (ids.size > 1) { // Should not be possible anymore with collection index
logger.error(
"More than one attachment found for did {} mid {} aid {}",
deviceIdentifier,
Expand Down Expand Up @@ -162,6 +164,36 @@ class MongoDatabase(private val mongoClient: MongoClient, private val collection
return ret.future()
}

override fun createIndices(): Future<CompositeFuture> {
val ret = Promise.promise<CompositeFuture>()

// Create indices
val unique = IndexOptions().unique(true)
val measurementIndex = JsonObject().put("properties.deviceId", 1).put("properties.measurementId", 1)
val measurementFilter = JsonObject().put("properties.attachmentId", JsonObject().put("\$exists", false))
val measurementIndexCreation = mongoClient.createIndexWithOptions(
collectionName,
measurementIndex,
unique.partialFilterExpression(measurementFilter)
)
val attachmentIndex = JsonObject()
.put("properties.deviceId", 1)
.put("properties.measurementId", 1)
.put("properties.attachmentId", 1)
val attachmentFilter = JsonObject().put("properties.attachmentId", JsonObject().put("\$exists", true))
val attachmentIndexCreation = mongoClient.createIndexWithOptions(
collectionName,
attachmentIndex,
unique.partialFilterExpression(attachmentFilter)
)

Future.all(measurementIndexCreation, attachmentIndexCreation).onComplete {
ret.complete(it.result())
}

return ret.future()
}

companion object {
/**
* The field name for the database entry which contains the user id.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Cyface GmbH
* Copyright 2022-2025 Cyface GmbH
*
* This file is part of the Cyface Data Collector.
*
Expand All @@ -21,10 +21,10 @@ package de.cyface.collector.storage.exception
import java.lang.Exception

/**
* An `Exception` thrown when a measurement to upload already exists in the data store.
* An `Exception` thrown when a file to upload already more than once in the data store.
*
* @author Klemens Muthmann
* @version 1.0.0
* @version 1.0.1
* @param message A user readable message to display as part of the stack trace.
*/
class DuplicatesInDatabase(message: String) : Exception(message)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2025 Cyface GmbH
*
* This file is part of the Cyface Data Collector.
*
* The Cyface Data Collector is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* The Cyface Data Collector is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with the Cyface Data Collector. If not, see <http://www.gnu.org/licenses/>.
*/
package de.cyface.collector.storage.exception

/**
* An `Exception` thrown when a file to upload already exists in the data store.
*
* @author Armin Schnabel
* @version 1.0.0
* @param e The cause leading to this exception.
*/
class UploadAlreadyExists(e: Throwable) : Exception(e)
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ class CollectorApiVerticle(
override fun start(startPromise: Promise<Void>) {
logger.info("Starting collector API!")

// Start http server

val mongoClient = MongoClient.createShared(
vertx,
mongoDatabaseConfiguration,
Expand Down