diff --git a/src/main/kotlin/de/cyface/collector/handler/UploadFailureHandler.kt b/src/main/kotlin/de/cyface/collector/handler/UploadFailureHandler.kt index a4e58292..ac4ecc0a 100644 --- a/src/main/kotlin/de/cyface/collector/handler/UploadFailureHandler.kt +++ b/src/main/kotlin/de/cyface/collector/handler/UploadFailureHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022 Cyface GmbH + * Copyright 2022-2025 Cyface GmbH * * This file is part of the Cyface Data Collector. * @@ -18,18 +18,21 @@ */ package de.cyface.collector.handler +import de.cyface.collector.handler.HTTPStatus.HTTP_CONFLICT import de.cyface.collector.handler.HTTPStatus.NOT_FOUND import de.cyface.collector.handler.HTTPStatus.SERVER_ERROR import de.cyface.collector.handler.exception.UnexpectedContentRange +import de.cyface.collector.storage.exception.UploadAlreadyExists import io.vertx.core.Handler import io.vertx.ext.web.RoutingContext import org.slf4j.LoggerFactory /** - * A handler to process exception occuring during the reception of new measurements. + * A handler to process exception occurring during the reception of new measurements. * * @author Klemens Muthmann - * @version 1.0.0 + * @author Armin Schnabel + * @version 1.1.0 * @property ctx The failed routing context, containing the error information. */ class UploadFailureHandler(private val ctx: RoutingContext) : Handler { @@ -40,6 +43,11 @@ class UploadFailureHandler(private val ctx: RoutingContext) : Handler ctx.response().setStatusCode(NOT_FOUND).end() } + is UploadAlreadyExists -> { + // Android client interprets this error code as "UPLOAD_SUCCESSFUL" and continues with the next upload. + ctx.response().setStatusCode(HTTP_CONFLICT) + } + else -> { LOGGER.debug(event.localizedMessage) ctx.fail(SERVER_ERROR, event) diff --git a/src/main/kotlin/de/cyface/collector/storage/cloud/Database.kt b/src/main/kotlin/de/cyface/collector/storage/cloud/Database.kt index 42347a97..c71598fb 100644 --- a/src/main/kotlin/de/cyface/collector/storage/cloud/Database.kt +++ b/src/main/kotlin/de/cyface/collector/storage/cloud/Database.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2024 Cyface GmbH + * Copyright 2022-2025 Cyface GmbH * * This file is part of the Serialization. * @@ -19,12 +19,14 @@ package de.cyface.collector.storage.cloud import de.cyface.collector.storage.UploadMetaData +import io.vertx.core.CompositeFuture import io.vertx.core.Future /** * The interface to a database storing the metadata of some uploaded data. * * @author Klemens Muthmann + * @author Armin Schnabel */ interface Database { /** @@ -47,4 +49,11 @@ interface Database { * @return A [Future] that is called upon successful or failed completion of this operation. */ fun exists(deviceIdentifier: String, measurementIdentifier: Long, attachmentId: Long): Future + + /** + * Creates indices required by this application asynchronously, if they do not yet exist. + * + * @return A [Future] that is notified of the success or failure of this application, upon completion. + */ + fun createIndices(): Future } diff --git a/src/main/kotlin/de/cyface/collector/storage/cloud/GoogleCloudStorageService.kt b/src/main/kotlin/de/cyface/collector/storage/cloud/GoogleCloudStorageService.kt index ad956e93..1f46ab31 100644 --- a/src/main/kotlin/de/cyface/collector/storage/cloud/GoogleCloudStorageService.kt +++ b/src/main/kotlin/de/cyface/collector/storage/cloud/GoogleCloudStorageService.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2024 Cyface GmbH + * Copyright 2022-2025 Cyface GmbH * * This file is part of the Cyface Data Collector. * @@ -20,12 +20,14 @@ package de.cyface.collector.storage.cloud +import com.mongodb.MongoWriteException import de.cyface.collector.storage.CleanupOperation import de.cyface.collector.storage.DataStorageService import de.cyface.collector.storage.Status import de.cyface.collector.storage.StatusType import de.cyface.collector.storage.UploadMetaData import de.cyface.collector.storage.exception.ContentRangeNotMatchingFileSize +import de.cyface.collector.storage.exception.UploadAlreadyExists import io.vertx.core.Future import io.vertx.core.Promise import io.vertx.core.Vertx @@ -47,6 +49,7 @@ import java.util.concurrent.Callable * [here](https://cloud.google.com/docs/authentication/application-default-credentials). * * @author Klemens Muthmann + * @author Armin Schnabel * @property dao The data access object to write an uploads' metadata. * @property vertx A Vertx instance of the current Vertx environment. * @property cloudStorageFactory A factory to create a [GoogleCloudStorage] instance on demand. @@ -154,8 +157,32 @@ class GoogleCloudStorageService( .onSuccess { resultPromise.complete(Status(uploadIdentifier, StatusType.COMPLETE, bytesUploaded)) } - .onFailure { - resultPromise.fail(it) + .onFailure { daoFailure -> + // This probably happens when a file is uploaded, but while it's transferred to Google Cloud the + // client connection is interrupted, so the client is allowed to re-upload the file while the + // `google` collection document was not yet written. [RFR-1188] + // When the second upload writes the file to Google Cloud and tries to create the `google` + // collection document on success, this fails here due to the unique index on the `google` + // collection, but the Google Cloud file still exists. Thus, we clean this file here: + clean(uploadIdentifier) + .onSuccess { + if (daoFailure is MongoWriteException && daoFailure.code == DUPLICATE_KEY) { + resultPromise.fail(UploadAlreadyExists(daoFailure)) + } else { + resultPromise.fail(daoFailure) + } + } + .onFailure { + logger.error( + String.format( + Locale.getDefault(), + "Failed to clean %s after dao failure: %s", + uploadIdentifier, + it.message + ) + ) + resultPromise.fail(it) + } } } else { resultPromise.complete(Status(uploadIdentifier, StatusType.INCOMPLETE, bytesUploaded)) @@ -210,6 +237,13 @@ class GoogleCloudStorageService( */ return dao.exists(deviceId, measurementId, attachmentId) } + + companion object { + /** + * Mongo Database error code for duplicate key. + */ + private const val DUPLICATE_KEY = 11000 + } } /** diff --git a/src/main/kotlin/de/cyface/collector/storage/cloud/GoogleCloudStorageServiceBuilder.kt b/src/main/kotlin/de/cyface/collector/storage/cloud/GoogleCloudStorageServiceBuilder.kt index 90520e27..6322aa04 100644 --- a/src/main/kotlin/de/cyface/collector/storage/cloud/GoogleCloudStorageServiceBuilder.kt +++ b/src/main/kotlin/de/cyface/collector/storage/cloud/GoogleCloudStorageServiceBuilder.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2024 Cyface GmbH + * Copyright 2022-2025 Cyface GmbH * * This file is part of the Serialization. * @@ -39,7 +39,7 @@ import io.vertx.core.Vertx * (https://github.com/googleapis/google-auth-library-java/blob/040acefec507f419f6e4ec4eab9645a6e3888a15/samples/snippets/src/main/java/AuthenticateExplicit.java). * @property projectIdentifier The Google Cloud project identifier used by the service created from this builder. * @property bucketName The name of the Google Cloud Storage bucket used by the created builder to store data. - * @property dao A data access object to store an uploads metadata. + * @property dao A data access object for storing uploads' metadata. * @property vertx The Vertx instance this application runs on. * @property bufferSize The size of the internal data buffer in bytes. * This is the amount of bytes the system accumulates before sending data to Google. @@ -58,8 +58,12 @@ class GoogleCloudStorageServiceBuilder( override fun create(): Future { val ret = Promise.promise() vertx.runOnContext { - val cloudStorageFactory = GoogleCloudStorageFactory(credentials, projectIdentifier, bucketName) - ret.complete(GoogleCloudStorageService(dao, vertx, cloudStorageFactory, bufferSize)) + dao.createIndices().onSuccess { + val cloudStorageFactory = GoogleCloudStorageFactory(credentials, projectIdentifier, bucketName) + ret.complete(GoogleCloudStorageService(dao, vertx, cloudStorageFactory, bufferSize)) + }.onFailure { + ret.fail(it) + } } return ret.future() } diff --git a/src/main/kotlin/de/cyface/collector/storage/cloud/MongoDatabase.kt b/src/main/kotlin/de/cyface/collector/storage/cloud/MongoDatabase.kt index 645933eb..170cc1be 100644 --- a/src/main/kotlin/de/cyface/collector/storage/cloud/MongoDatabase.kt +++ b/src/main/kotlin/de/cyface/collector/storage/cloud/MongoDatabase.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2024 Cyface GmbH + * Copyright 2022-2025 Cyface GmbH * * This file is part of the Serialization. * @@ -21,9 +21,11 @@ package de.cyface.collector.storage.cloud import de.cyface.collector.model.FormAttributes import de.cyface.collector.storage.UploadMetaData import de.cyface.collector.storage.exception.DuplicatesInDatabase +import io.vertx.core.CompositeFuture import io.vertx.core.Future import io.vertx.core.Promise import io.vertx.core.json.JsonObject +import io.vertx.ext.mongo.IndexOptions import io.vertx.ext.mongo.MongoClient import org.slf4j.LoggerFactory import java.time.Instant @@ -84,7 +86,7 @@ class MongoDatabase(private val mongoClient: MongoClient, private val collection val queryCall = mongoClient.find(collectionName, query) queryCall.onSuccess { ids -> try { - if (ids.size > 1) { + if (ids.size > 1) { // Should not be possible anymore with collection index logger.error( "More than one measurement found for did {} mid {}", deviceIdentifier, @@ -129,7 +131,7 @@ class MongoDatabase(private val mongoClient: MongoClient, private val collection val queryCall = mongoClient.find(collectionName, query) queryCall.onSuccess { ids -> try { - if (ids.size > 1) { + if (ids.size > 1) { // Should not be possible anymore with collection index logger.error( "More than one attachment found for did {} mid {} aid {}", deviceIdentifier, @@ -162,6 +164,36 @@ class MongoDatabase(private val mongoClient: MongoClient, private val collection return ret.future() } + override fun createIndices(): Future { + val ret = Promise.promise() + + // Create indices + val unique = IndexOptions().unique(true) + val measurementIndex = JsonObject().put("properties.deviceId", 1).put("properties.measurementId", 1) + val measurementFilter = JsonObject().put("properties.attachmentId", JsonObject().put("\$exists", false)) + val measurementIndexCreation = mongoClient.createIndexWithOptions( + collectionName, + measurementIndex, + unique.partialFilterExpression(measurementFilter) + ) + val attachmentIndex = JsonObject() + .put("properties.deviceId", 1) + .put("properties.measurementId", 1) + .put("properties.attachmentId", 1) + val attachmentFilter = JsonObject().put("properties.attachmentId", JsonObject().put("\$exists", true)) + val attachmentIndexCreation = mongoClient.createIndexWithOptions( + collectionName, + attachmentIndex, + unique.partialFilterExpression(attachmentFilter) + ) + + Future.all(measurementIndexCreation, attachmentIndexCreation).onComplete { + ret.complete(it.result()) + } + + return ret.future() + } + companion object { /** * The field name for the database entry which contains the user id. diff --git a/src/main/kotlin/de/cyface/collector/storage/exception/DuplicatesInDatabase.kt b/src/main/kotlin/de/cyface/collector/storage/exception/DuplicatesInDatabase.kt index 861ea3aa..9140c78a 100644 --- a/src/main/kotlin/de/cyface/collector/storage/exception/DuplicatesInDatabase.kt +++ b/src/main/kotlin/de/cyface/collector/storage/exception/DuplicatesInDatabase.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022 Cyface GmbH + * Copyright 2022-2025 Cyface GmbH * * This file is part of the Cyface Data Collector. * @@ -21,10 +21,10 @@ package de.cyface.collector.storage.exception import java.lang.Exception /** - * An `Exception` thrown when a measurement to upload already exists in the data store. + * An `Exception` thrown when a file to upload already more than once in the data store. * * @author Klemens Muthmann - * @version 1.0.0 + * @version 1.0.1 * @param message A user readable message to display as part of the stack trace. */ class DuplicatesInDatabase(message: String) : Exception(message) diff --git a/src/main/kotlin/de/cyface/collector/storage/exception/UploadAlreadyExists.kt b/src/main/kotlin/de/cyface/collector/storage/exception/UploadAlreadyExists.kt new file mode 100644 index 00000000..94fc6e78 --- /dev/null +++ b/src/main/kotlin/de/cyface/collector/storage/exception/UploadAlreadyExists.kt @@ -0,0 +1,28 @@ +/* + * Copyright 2025 Cyface GmbH + * + * This file is part of the Cyface Data Collector. + * + * The Cyface Data Collector is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The Cyface Data Collector is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the Cyface Data Collector. If not, see . + */ +package de.cyface.collector.storage.exception + +/** + * An `Exception` thrown when a file to upload already exists in the data store. + * + * @author Armin Schnabel + * @version 1.0.0 + * @param e The cause leading to this exception. + */ +class UploadAlreadyExists(e: Throwable) : Exception(e) diff --git a/src/main/kotlin/de/cyface/collector/verticle/CollectorApiVerticle.kt b/src/main/kotlin/de/cyface/collector/verticle/CollectorApiVerticle.kt index e01256d1..c7b68b8a 100644 --- a/src/main/kotlin/de/cyface/collector/verticle/CollectorApiVerticle.kt +++ b/src/main/kotlin/de/cyface/collector/verticle/CollectorApiVerticle.kt @@ -71,8 +71,6 @@ class CollectorApiVerticle( override fun start(startPromise: Promise) { logger.info("Starting collector API!") - // Start http server - val mongoClient = MongoClient.createShared( vertx, mongoDatabaseConfiguration,