Improved handling of Globus uploads (experimental async framework) #10781

Merged (36 commits, Sep 25, 2024)
Commits (36)
65ec69f
a quick experimental AddReplaceFileHelper implementation of adding Gl…
landreev Aug 5, 2024
0495160
no need to try to calculate checksums if this globus storage isn't da…
landreev Aug 6, 2024
ba66138
more globus mods (work in progress). #10623
landreev Aug 12, 2024
dac5302
new class files that weren't included in the last commit #10623
landreev Aug 12, 2024
4080341
fixing some bad changes that got committed earlier #10623
landreev Aug 12, 2024
e086a60
cleanup #10623
landreev Aug 13, 2024
35ce7ef
more testing/debugging #10623
landreev Aug 14, 2024
d4b9bac
this is a working, but still work-in-progress state of things - needs…
landreev Aug 17, 2024
9c62b81
refined logging #10623
landreev Aug 19, 2024
8cdff8d
Added notifications for various failure cases. #10623
landreev Aug 19, 2024
531e25c
Config guide entry. #10623
landreev Aug 20, 2024
007d715
Added a few more doc notes. #10623
landreev Aug 20, 2024
6fcb285
typo #10623
landreev Aug 20, 2024
f6882df
cut-and-paste error #10623
landreev Aug 20, 2024
4ae3ee6
(#10623)
landreev Aug 20, 2024
9cf4e1b
some minor cleanup changes #10623
landreev Aug 21, 2024
45fb938
cosmetic #10623
landreev Aug 21, 2024
b3f79fe
extra L in SUCCESSFUL (#10623)
landreev Aug 21, 2024
1acae68
better Globus service availability checks #10623
landreev Aug 21, 2024
5ba2888
better Globus service availability checks #10623
landreev Aug 21, 2024
2512eab
removed an unnecessary @todo (#10623)
landreev Aug 21, 2024
6b06d94
cosmetic #10623
landreev Aug 21, 2024
6d06927
more changes per feedback. (saving the api token in the GlobusTask en…
landreev Aug 22, 2024
69cfe29
changed the polling interval default in the new TaskMonitoringService…
landreev Aug 23, 2024
d223a8f
more changes/refinements per review feedback (#10623)
landreev Aug 23, 2024
0ca5e62
added an upfront locks check to the /addGlobusFiles api #10623
landreev Aug 23, 2024
23d0f6c
added an upfront locks check to the /addGlobusFiles api #10623
landreev Aug 23, 2024
9d8bf0e
Merge branch 'develop' into 10623-globus-improvements
landreev Sep 4, 2024
5dc386f
Merge branch 'develop' into 10623-globus-improvements
landreev Sep 10, 2024
7b6f81e
remove tabs to make reviewdog happy. woof! #10623
pdurbin Sep 23, 2024
2baf62e
globus doc tweaks #10623
pdurbin Sep 23, 2024
5d8c760
Merge branch 'develop' into 10623-globus-improvements #10623
pdurbin Sep 23, 2024
6c5194f
Merge branch 'develop' into 10623-globus-improvements
pdurbin Sep 23, 2024
d610094
Updated the docs to reflect the new name of a JVM option (#10623)
landreev Sep 25, 2024
bf91ce2
Merge branch '10623-globus-improvements' of https://github.com/IQSS/d…
landreev Sep 25, 2024
682c89f
improve release note #10623
pdurbin Sep 25, 2024
1 change: 1 addition & 0 deletions doc/release-notes/10623-globus-improvements.md
@@ -0,0 +1 @@
A new alternative implementation of Globus polling during upload data transfers has been added in this release. This experimental framework does not rely on the instance staying up continuously for the duration of the transfer and saves the state information about Globus upload requests in the database. See the `globus-use-experimental-async-framework` feature flag in the Configuration guide.
2 changes: 2 additions & 0 deletions doc/sphinx-guides/source/developers/big-data-support.rst
@@ -187,3 +187,5 @@ As described in that document, Globus transfers can be initiated by choosing the
An overview of the control and data transfer interactions between components was presented at the 2022 Dataverse Community Meeting and can be viewed in the `Integrations and Tools Session Video <https://youtu.be/3ek7F_Dxcjk?t=5289>`_ around the 1 hr 28 min mark.

See also :ref:`Globus settings <:GlobusSettings>`.

An alternative, experimental implementation of Globus polling of ongoing upload transfers has been added in v6.4. This framework does not rely on the instance staying up continuously for the duration of the transfer and saves the state information about Globus upload requests in the database. Due to its experimental nature it is not enabled by default. See the ``globus-use-experimental-async-framework`` feature flag and the JVM option ``dataverse.globus.taskMonitoringServer`` described in the Configuration guide.
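As a sketch of what enabling this might look like on the node designated for polling (the environment-variable spelling below is an assumption derived from the MicroProfile Config naming convention, not copied from the release):

```shell
# Assumed spelling, following the MicroProfile convention for feature flags
# (dataverse.feature.<flag-name>, upper-cased with underscores):
export DATAVERSE_FEATURE_GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK=1

# Designate this instance as the dedicated Globus task-monitoring server:
./asadmin create-jvm-options '-Ddataverse.globus.taskMonitoringServer=true'
```

In a multi-node installation, only the one node acting as the task-monitoring server should get the JVM option.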
2 changes: 2 additions & 0 deletions doc/sphinx-guides/source/developers/globus-api.rst
@@ -185,6 +185,8 @@ As the transfer can take significant time and the API call is asynchronous, the

Once the transfer completes, Dataverse will remove the write permission for the principal.

An alternative, experimental implementation of Globus polling of ongoing upload transfers has been added in v6.4. This new framework does not rely on the instance staying up continuously for the duration of the transfer and saves the state information about Globus upload requests in the database. Due to its experimental nature it is not enabled by default. See the ``globus-use-experimental-async-framework`` feature flag and the JVM option ``dataverse.globus.taskMonitoringServer`` described in the Configuration guide.

Note that when using a managed endpoint that uses the Globus S3 Connector, the checksum should be correct as Dataverse can validate it. For file-based endpoints, the checksum should be included if available but Dataverse cannot verify it.

In the remote/reference case, where there is no transfer to monitor, the standard /addFiles API call (see :ref:`direct-add-to-dataset-api`) is used instead. There are no changes for the Globus case.
10 changes: 10 additions & 0 deletions doc/sphinx-guides/source/installation/config.rst
@@ -3259,6 +3259,13 @@ The email for your institution that you'd like to appear in bag-info.txt. See :r

Can also be set via *MicroProfile Config API* sources, e.g. the environment variable ``DATAVERSE_BAGIT_SOURCEORG_EMAIL``.

.. _dataverse.globus.taskMonitoringServer:

dataverse.globus.taskMonitoringServer
+++++++++++++++++++++++++++++++++++++

This JVM option is required in conjunction with the ``globus-use-experimental-async-framework`` feature flag. Setting it to ``true`` designates this Dataverse instance as the dedicated polling server, which is what allows the new framework to be used in a multi-node installation (only one node should poll Globus).

.. _feature-flags:

Feature Flags
@@ -3294,6 +3301,9 @@ please find all known feature flags below. Any of these flags can be activated u
* - disable-return-to-author-reason
- Removes the reason field in the `Publish/Return To Author` dialog that was added as a required field in v6.2 and makes the reason an optional parameter in the :ref:`return-a-dataset` API call.
- ``Off``
* - globus-use-experimental-async-framework
- Activates a new experimental implementation of Globus polling of ongoing remote data transfers that does not rely on the instance staying up continuously for the duration of the transfers and saves the state information about Globus upload requests in the database. Added in v6.4. Note that the JVM option ``dataverse.globus.taskMonitoringServer`` described above must also be enabled on one (and only one, in a multi-node installation) Dataverse instance.
- ``Off``


**Note:** Feature flags can be set via any `supported MicroProfile Config API source`_, e.g. the environment variable
11 changes: 10 additions & 1 deletion src/main/java/edu/harvard/iq/dataverse/DatasetServiceBean.java
@@ -412,12 +412,20 @@ public boolean checkDatasetLock(Long datasetId) {
List<DatasetLock> lock = lockCounter.getResultList();
return lock.size()>0;
}


public List<DatasetLock> getLocksByDatasetId(Long datasetId) {
TypedQuery<DatasetLock> locksQuery = em.createNamedQuery("DatasetLock.getLocksByDatasetId", DatasetLock.class);
locksQuery.setParameter("datasetId", datasetId);
return locksQuery.getResultList();
}

public List<DatasetLock> getDatasetLocksByUser( AuthenticatedUser user) {

return listLocks(null, user);
}

// @todo: we'll be better off getting rid of this method and using the other
// version of addDatasetLock() (that uses datasetId instead of Dataset).
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public DatasetLock addDatasetLock(Dataset dataset, DatasetLock lock) {
lock.setDataset(dataset);
Expand Down Expand Up @@ -467,6 +475,7 @@ public DatasetLock addDatasetLock(Long datasetId, DatasetLock.Reason reason, Lon
* is {@code aReason}.
* @param dataset the dataset whose locks (for {@code aReason}) will be removed.
* @param aReason The reason of the locks that will be removed.
* @todo this should probably take dataset_id, not a dataset
*/
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public void removeDatasetLocks(Dataset dataset, DatasetLock.Reason aReason) {
8 changes: 6 additions & 2 deletions src/main/java/edu/harvard/iq/dataverse/EditDatafilesPage.java
@@ -2121,8 +2121,12 @@ public void handleFileUpload(FileUploadEvent event) throws IOException {
}

/**
* Using information from the DropBox choose, ingest the chosen files
* https://www.dropbox.com/developers/dropins/chooser/js
* External, aka "Direct" Upload.
* The file(s) have been uploaded to physical storage (such as S3) directly,
* this call is to create and add the DataFiles to the Dataset on the Dataverse
* side. The method does NOT finalize saving the datafiles in the database -
* that will happen when the user clicks 'Save', similar to how the "normal"
* uploads are handled.
*
* @param event
*/
src/main/java/edu/harvard/iq/dataverse/ExternalFileUploadInProgress.java (new file)
@@ -0,0 +1,114 @@
/*
* Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license
* Click nbfs://nbhost/SystemFileSystem/Templates/Classes/Class.java to edit this template
*/
package edu.harvard.iq.dataverse;

import jakarta.persistence.Column;
import jakarta.persistence.Index;
import jakarta.persistence.NamedQueries;
import jakarta.persistence.NamedQuery;
import jakarta.persistence.Table;
import java.io.Serializable;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;

/**
*
* @author landreev
*
* The name of the class is provisional. I'm open to better-sounding alternatives,
* if anyone can think of any.
* But I wanted to avoid having the word "Globus" in the entity name. I'm adding
* it specifically for the Globus use case. But I'm guessing there's a chance
* this setup may come in handy for other types of datafile uploads that happen
* externally. (?)
*/
@NamedQueries({
@NamedQuery( name="ExternalFileUploadInProgress.deleteByTaskId",

🚫 [reviewdog] <com.puppycrawl.tools.checkstyle.checks.whitespace.FileTabCharacterCheck> reported by reviewdog 🐶
File contains tab characters (this is the first instance).

query="DELETE FROM ExternalFileUploadInProgress f WHERE f.taskId=:taskId"),
@NamedQuery(name = "ExternalFileUploadInProgress.findByTaskId",
query = "SELECT f FROM ExternalFileUploadInProgress f WHERE f.taskId=:taskId")})
@Entity
@Table(indexes = {@Index(columnList="taskid")})
public class ExternalFileUploadInProgress implements Serializable {

private static final long serialVersionUID = 1L;
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

/**
* Rather than saving various individual fields defining the datafile,
* which would essentially replicate the DataFile table, we are simply
* storing the full json record as passed to the API here.
*/
@Column(columnDefinition = "TEXT", nullable=false)
private String fileInfo;

/**
* This is the Globus-specific task id associated with the upload in progress.
*/
@Column(nullable=false)
private String taskId;

public ExternalFileUploadInProgress() {
}

public ExternalFileUploadInProgress(String taskId, String fileInfo) {
this.taskId = taskId;
this.fileInfo = fileInfo;
}

public String getFileInfo() {
return fileInfo;
}

public void setFileInfo(String fileInfo) {
this.fileInfo = fileInfo;
}

public String getTaskId() {
return taskId;
}

public void setTaskId(String taskId) {
this.taskId = taskId;
}

@Override
public int hashCode() {
int hash = 0;
hash += (id != null ? id.hashCode() : 0);
return hash;
}

@Override
public boolean equals(Object object) {
// TODO: Warning - this method won't work in the case the id fields are not set
if (!(object instanceof ExternalFileUploadInProgress)) {
return false;
}
ExternalFileUploadInProgress other = (ExternalFileUploadInProgress) object;
if ((this.id == null && other.id != null) || (this.id != null && !this.id.equals(other.id))) {
return false;
}
return true;
}

@Override
public String toString() {
return "edu.harvard.iq.dataverse.ExternalFileUploadInProgress[ id=" + id + " ]";
}

}
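As a plain-Java sketch of what the two named queries above do against this table (JPA and the EntityManager are omitted; the idea that several rows can share one task id is an inference from the bulk ``deleteByTaskId`` DELETE, not something the PR states):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the ExternalFileUploadInProgress table: one row pairs a Globus
// task id with the raw fileInfo JSON; findByTaskId/deleteByTaskId mirror the
// named queries defined on the entity.
public class UploadInProgressDemo {
    record Row(String taskId, String fileInfo) {}

    // "ExternalFileUploadInProgress.findByTaskId" as a stream filter:
    static List<Row> findByTaskId(List<Row> table, String taskId) {
        return table.stream().filter(r -> r.taskId().equals(taskId)).toList();
    }

    // "ExternalFileUploadInProgress.deleteByTaskId" as a bulk removal:
    static void deleteByTaskId(List<Row> table, String taskId) {
        table.removeIf(r -> r.taskId().equals(taskId));
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        table.add(new Row("task-1", "{\"label\":\"file1.txt\"}"));
        table.add(new Row("task-1", "{\"label\":\"file2.txt\"}"));
        System.out.println(findByTaskId(table, "task-1").size()); // 2
        deleteByTaskId(table, "task-1");
        System.out.println(table.size()); // 0
    }
}
```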
27 changes: 26 additions & 1 deletion src/main/java/edu/harvard/iq/dataverse/MailServiceBean.java
@@ -623,6 +623,7 @@ public String getMessageTextBasedOnNotification(UserNotification userNotificatio
comment
)) ;
return downloadCompletedMessage;

case GLOBUSUPLOADCOMPLETEDWITHERRORS:
dataset = (Dataset) targetObject;
messageText = BundleUtil.getStringFromBundle("notification.email.greeting.html");
@@ -633,8 +634,30 @@
comment
)) ;
return uploadCompletedWithErrorsMessage;

case GLOBUSUPLOADREMOTEFAILURE:
dataset = (Dataset) targetObject;
messageText = BundleUtil.getStringFromBundle("notification.email.greeting.html");
String uploadFailedRemotelyMessage = messageText + BundleUtil.getStringFromBundle("notification.mail.globus.upload.failedRemotely", Arrays.asList(
systemConfig.getDataverseSiteUrl(),
dataset.getGlobalId().asString(),
dataset.getDisplayName(),
comment
)) ;
return uploadFailedRemotelyMessage;

case GLOBUSUPLOADLOCALFAILURE:
dataset = (Dataset) targetObject;
messageText = BundleUtil.getStringFromBundle("notification.email.greeting.html");
String uploadFailedLocallyMessage = messageText + BundleUtil.getStringFromBundle("notification.mail.globus.upload.failedLocally", Arrays.asList(
systemConfig.getDataverseSiteUrl(),
dataset.getGlobalId().asString(),
dataset.getDisplayName(),
comment
)) ;
return uploadFailedLocallyMessage;

case GLOBUSDOWNLOADCOMPLETEDWITHERRORS:
dataset = (Dataset) targetObject;
messageText = BundleUtil.getStringFromBundle("notification.email.greeting.html");
String downloadCompletedWithErrorsMessage = messageText + BundleUtil.getStringFromBundle("notification.mail.globus.download.completedWithErrors", Arrays.asList(
@@ -763,6 +786,8 @@ public Object getObjectOfNotification (UserNotification userNotification){
return versionService.find(userNotification.getObjectId());
case GLOBUSUPLOADCOMPLETED:
case GLOBUSUPLOADCOMPLETEDWITHERRORS:
case GLOBUSUPLOADREMOTEFAILURE:
case GLOBUSUPLOADLOCALFAILURE:
case GLOBUSDOWNLOADCOMPLETED:
case GLOBUSDOWNLOADCOMPLETEDWITHERRORS:
return datasetService.find(userNotification.getObjectId());
3 changes: 2 additions & 1 deletion src/main/java/edu/harvard/iq/dataverse/UserNotification.java
@@ -39,7 +39,8 @@ public enum Type {
CHECKSUMIMPORT, CHECKSUMFAIL, CONFIRMEMAIL, APIGENERATED, INGESTCOMPLETED, INGESTCOMPLETEDWITHERRORS,
PUBLISHFAILED_PIDREG, WORKFLOW_SUCCESS, WORKFLOW_FAILURE, STATUSUPDATED, DATASETCREATED, DATASETMENTIONED,
GLOBUSUPLOADCOMPLETED, GLOBUSUPLOADCOMPLETEDWITHERRORS,
GLOBUSDOWNLOADCOMPLETED, GLOBUSDOWNLOADCOMPLETEDWITHERRORS, REQUESTEDFILEACCESS;
GLOBUSDOWNLOADCOMPLETED, GLOBUSDOWNLOADCOMPLETEDWITHERRORS, REQUESTEDFILEACCESS,
GLOBUSUPLOADREMOTEFAILURE, GLOBUSUPLOADLOCALFAILURE;

public String getDescription() {
return BundleUtil.getStringFromBundle("notification.typeDescription." + this.name());
4 changes: 4 additions & 0 deletions src/main/java/edu/harvard/iq/dataverse/api/ApiConstants.java
@@ -17,4 +17,8 @@ private ApiConstants() {
public static final String DS_VERSION_LATEST = ":latest";
public static final String DS_VERSION_DRAFT = ":draft";
public static final String DS_VERSION_LATEST_PUBLISHED = ":latest-published";

// addFiles call
public static final String API_ADD_FILES_COUNT_PROCESSED = "Total number of files";
public static final String API_ADD_FILES_COUNT_SUCCESSFULL = "Number of files successfully added";
}
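Since the response keys are now centralized in these constants (note the double-L in ``SUCCESSFULL`` is the spelling the code actually uses), a client matching on them must use the exact strings. A minimal sketch of the resulting ``/addFiles`` response fragment (the helper class and counts are illustrative, not part of the PR):

```java
// Illustrative only: shows the shape of the JSON fragment that addFiles()
// builds from the two constants defined above.
public class AddFilesResponseDemo {
    static final String API_ADD_FILES_COUNT_PROCESSED = "Total number of files";
    static final String API_ADD_FILES_COUNT_SUCCESSFULL = "Number of files successfully added";

    // Assembles the same key/value pairs the API adds via JsonObjectBuilder.
    static String buildJson(int total, int successful) {
        return String.format("{\"%s\": %d, \"%s\": %d}",
                API_ADD_FILES_COUNT_PROCESSED, total,
                API_ADD_FILES_COUNT_SUCCESSFULL, successful);
    }

    public static void main(String[] args) {
        System.out.println(buildJson(3, 2));
        // {"Total number of files": 3, "Number of files successfully added": 2}
    }
}
```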
9 changes: 8 additions & 1 deletion src/main/java/edu/harvard/iq/dataverse/api/Datasets.java
@@ -4009,6 +4009,7 @@ public Response addGlobusFilesToDataset(@Context ContainerRequestContext crc,
logger.info(" ==== (api addGlobusFilesToDataset) jsonData ====== " + jsonData);

if (!systemConfig.isHTTPUpload()) {
// @todo why isHTTPUpload()? - shouldn't it be checking isGlobusUpload() here?
return error(Response.Status.SERVICE_UNAVAILABLE, BundleUtil.getStringFromBundle("file.api.httpDisabled"));
}

@@ -4034,6 +4035,8 @@
return wr.getResponse();
}

// @todo check if the dataset is already locked!
Member:
Makes sense - could there be a constraint when trying to create a lock, e.g. only one lock of a given time per dataset rather than a separate check?

Contributor Author:
You meant "of a given type", right? addDatasetLock() already checks for an existing lock of the same type (it simply returns the existing lock instead of creating a new one).
As for implementing this as a hard constraint - are we positive we'll never want multiple locks of the same type - some workflows, maybe?
I addressed this specific situation with a separate lock check. We don't check for locks consistently in our APIs, presumably on the assumption that it will be done when the relevant commands are executed. But in this specific case there is a potential for doing a huge amount of work before UpdateDatasetVersionCommand is finally called.


JsonObject jsonObject = null;
try {
jsonObject = JsonUtil.getJsonObject(jsonData);
@@ -4075,7 +4078,11 @@ public Response addGlobusFilesToDataset(@Context ContainerRequestContext crc,
String requestUrl = SystemConfig.getDataverseSiteUrlStatic();

// Async Call
globusService.globusUpload(jsonObject, token, dataset, requestUrl, authUser);
try {
globusService.globusUpload(jsonObject, token, dataset, requestUrl, authUser);
} catch (IllegalArgumentException ex) {
return badRequest("Invalid parameters: "+ex.getMessage());
}

return ok("Async call to Globus Upload started ");

@@ -528,6 +528,8 @@ public void displayNotification() {
case GLOBUSUPLOADCOMPLETEDWITHERRORS:
case GLOBUSDOWNLOADCOMPLETED:
case GLOBUSDOWNLOADCOMPLETEDWITHERRORS:
case GLOBUSUPLOADREMOTEFAILURE:
case GLOBUSUPLOADLOCALFAILURE:
userNotification.setTheObject(datasetService.find(userNotification.getObjectId()));
break;

@@ -2139,9 +2139,9 @@ public Response addFiles(String jsonData, Dataset dataset, User authUser) {
logger.log(Level.WARNING, "Dataset not locked for EditInProgress ");
} else {
datasetService.removeDatasetLocks(dataset, DatasetLock.Reason.EditInProgress);
logger.log(Level.INFO, "Removed EditInProgress lock ");
logger.log(Level.INFO, "Removed EditInProgress lock "+eipLock.getId());
}

try {
Command<Dataset> cmd = new UpdateDatasetVersionCommand(dataset, dvRequest, clone);
((UpdateDatasetVersionCommand) cmd).setValidateLenient(true);
@@ -2167,8 +2167,8 @@
}

JsonObjectBuilder result = Json.createObjectBuilder()
.add("Total number of files", totalNumberofFiles)
.add("Number of files successfully added", successNumberofFiles);
.add(ApiConstants.API_ADD_FILES_COUNT_PROCESSED, totalNumberofFiles)
.add(ApiConstants.API_ADD_FILES_COUNT_SUCCESSFULL, successNumberofFiles);


return Response.ok().entity(Json.createObjectBuilder()
@@ -14,6 +14,7 @@
import edu.harvard.iq.dataverse.engine.command.RequiredPermissions;
import edu.harvard.iq.dataverse.engine.command.exception.CommandException;
import edu.harvard.iq.dataverse.engine.command.exception.IllegalCommandException;
import edu.harvard.iq.dataverse.util.BundleUtil;
import edu.harvard.iq.dataverse.util.DatasetFieldUtil;
import edu.harvard.iq.dataverse.util.FileMetadataUtil;

@@ -102,7 +103,10 @@ public Dataset execute(CommandContext ctxt) throws CommandException {
}

Dataset theDataset = getDataset();
ctxt.permissions().checkUpdateDatasetVersionLock(theDataset, getRequest(), this);
//ctxt.permissions().checkUpdateDatasetVersionLock(theDataset, getRequest(), this);
// this is an experiment (probably temporary)
checkUpdateDatasetVersionLock(ctxt);

Dataset savedDataset = null;

DatasetVersion persistedVersion = clone;
@@ -297,5 +301,23 @@ public boolean onSuccess(CommandContext ctxt, Object r) {
ctxt.index().asyncIndexDataset((Dataset) r, true);
return true;
}


private void checkUpdateDatasetVersionLock(CommandContext ctxt) throws IllegalCommandException {
List<DatasetLock> locks = ctxt.datasets().getLocksByDatasetId(getDataset().getId());
//locks.forEach(lock -> {
for (DatasetLock lock : locks) {
// Ingest lock is ok:
if (DatasetLock.Reason.Ingest != lock.getReason()) {
// with Workflow lock *some* users can edit;
// any other kind of lock - nope
if (DatasetLock.Reason.Workflow != lock.getReason()
|| !ctxt.permissions().isMatchingWorkflowLock(getDataset(),
getUser().getIdentifier(),
getRequest().getWFInvocationId())) {
throw new IllegalCommandException(
BundleUtil.getStringFromBundle("dataset.message.locked.editNotAllowed"), this);
}
}
}
}
}
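The lock rules in ``checkUpdateDatasetVersionLock()`` above can be condensed into a standalone sketch (the enum and record are stand-ins for the real ``DatasetLock`` entity, and the ``isMatchingWorkflowLock()`` call is reduced to a boolean):

```java
import java.util.List;

// Mirrors the rule in checkUpdateDatasetVersionLock(): Ingest locks are
// tolerated; a Workflow lock is tolerated only for the matching workflow
// invocation; any other lock blocks the edit.
public class LockCheckDemo {
    enum Reason { Ingest, Workflow, EditInProgress }
    record DatasetLock(Reason reason) {}

    static boolean editAllowed(List<DatasetLock> locks, boolean matchingWorkflow) {
        for (DatasetLock lock : locks) {
            if (lock.reason() == Reason.Ingest) continue;          // Ingest lock is ok
            if (lock.reason() == Reason.Workflow && matchingWorkflow) continue;
            return false;                                          // any other lock - nope
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(editAllowed(List.of(new DatasetLock(Reason.Ingest)), false));         // true
        System.out.println(editAllowed(List.of(new DatasetLock(Reason.EditInProgress)), false)); // false
    }
}
```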