Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrieval Tool: Indicates "complete" before all transfers finish #185

Merged
merged 2 commits into from
Oct 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package org.duracloud.retrieval.mgmt;

import java.io.File;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -40,6 +42,7 @@ public class RetrievalManager implements Runnable {
private boolean createSpaceDir;
private boolean applyTimestamps;
private boolean complete;
private Phaser phaser;

public RetrievalManager(RetrievalSource source,
File contentDir,
Expand All @@ -57,6 +60,7 @@ public RetrievalManager(RetrievalSource source,
this.outWriter = outWriter;
this.createSpaceDir = createSpaceDir;
this.applyTimestamps = applyTimestamps;
this.phaser = new Phaser();

// Create thread pool for retrieval workers
workerPool =
Expand All @@ -72,6 +76,7 @@ public RetrievalManager(RetrievalSource source,
* Begins the content retrieval process
*/
public void run() {
phaser.register();

try {
while (!complete) {
Expand Down Expand Up @@ -113,9 +118,12 @@ private boolean retrieveContent(ContentItem contentItem) {
outWriter,
createSpaceDir,
applyTimestamps);
workerPool.execute(worker);
phaser.register();
CompletableFuture.runAsync(worker, workerPool)
.thenRun(phaser::arriveAndDeregister);
return true;
} catch (RejectedExecutionException e) {
phaser.arriveAndDeregister();
return false;
}
}
Expand All @@ -128,11 +136,8 @@ public void shutdown() {
logger.info("Closing Retrieval Manager");
workerPool.shutdown();

try {
workerPool.awaitTermination(30, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// Exit wait on interruption
}
logger.info("Waiting for retrievals to complete, this may take some time...");
phaser.arriveAndAwaitAdvance();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you wouldn't mind adding a log message before arriveAndAwaitAdvance indicating that the application is waiting for the last retrievals to complete (and that it may take a few minutes depending on the size of the retrieval and the connection speed), I think that would be an improvment..


complete = true;
}
Expand Down