Skip to content

Commit

Permalink
new framework for stopping harvest jobs (#7940)
Browse files Browse the repository at this point in the history
  • Loading branch information
landreev committed Nov 22, 2022
1 parent 948c0d0 commit 76eca87
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ public void setId(Long id) {
this.id = id;
}

public enum RunResultType { SUCCESS, FAILURE, INPROGRESS };
public enum RunResultType { SUCCESS, FAILURE, INPROGRESS, INTERRUPTED };

private static String RESULT_LABEL_SUCCESS = "SUCCESS";
private static String RESULT_LABEL_FAILURE = "FAILED";
private static String RESULT_LABEL_INPROGRESS = "IN PROGRESS";
private static String RESULT_DELETE_IN_PROGRESS = "DELETE IN PROGRESS";
private static String RESULT_LABEL_INTERRUPTED = "INTERRUPTED";

@ManyToOne
@JoinColumn(nullable = false)
Expand Down Expand Up @@ -76,6 +77,8 @@ public String getResultLabel() {
return RESULT_LABEL_FAILURE;
} else if (isInProgress()) {
return RESULT_LABEL_INPROGRESS;
} else if (isInterrupted()) {
return RESULT_LABEL_INTERRUPTED;
}
return null;
}
Expand All @@ -84,8 +87,8 @@ public String getDetailedResultLabel() {
if (harvestingClient != null && harvestingClient.isDeleteInProgress()) {
return RESULT_DELETE_IN_PROGRESS;
}
if (isSuccess()) {
String resultLabel = RESULT_LABEL_SUCCESS;
if (isSuccess() || isInterrupted()) {
String resultLabel = getResultLabel();

resultLabel = resultLabel.concat("; "+harvestedDatasetCount+" harvested, ");
resultLabel = resultLabel.concat(deletedDatasetCount+" deleted, ");
Expand Down Expand Up @@ -128,6 +131,14 @@ public void setInProgress() {
harvestResult = RunResultType.INPROGRESS;
}

public boolean isInterrupted() {
    // True iff this run was cut short by an external stop request.
    return harvestResult == RunResultType.INTERRUPTED;
}

public void setInterrupted() {
    // Mark this run as stopped by an external request.
    this.harvestResult = RunResultType.INTERRUPTED;
}

// Time of this harvest attempt:
@Temporal(value = TemporalType.TIMESTAMP)
private Date startTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class HarvesterServiceBean {
public static final String HARVEST_RESULT_FAILED="failed";
public static final String DATAVERSE_PROPRIETARY_METADATA_FORMAT="dataverse_json";
public static final String DATAVERSE_PROPRIETARY_METADATA_API="/api/datasets/export?exporter="+DATAVERSE_PROPRIETARY_METADATA_FORMAT+"&persistentId=";
public static final String DATAVERSE_HARVEST_STOP_FILE="/var/run/stopharvest_";

public HarvesterServiceBean() {

Expand Down Expand Up @@ -144,7 +145,6 @@ public void doHarvest(DataverseRequest dataverseRequest, Long harvestingClientId

Dataverse harvestingDataverse = harvestingClientConfig.getDataverse();

MutableBoolean harvestErrorOccurred = new MutableBoolean(false);
String logTimestamp = logFormatter.format(new Date());
Logger hdLogger = Logger.getLogger("edu.harvard.iq.dataverse.harvest.client.HarvesterServiceBean." + harvestingDataverse.getAlias() + logTimestamp);
String logFileName = "../logs" + File.separator + "harvest_" + harvestingClientConfig.getName() + "_" + logTimestamp + ".log";
Expand All @@ -155,20 +155,14 @@ public void doHarvest(DataverseRequest dataverseRequest, Long harvestingClientId
PrintWriter importCleanupLog = new PrintWriter(new FileWriter( "../logs/harvest_cleanup_" + harvestingClientConfig.getName() + "_" + logTimestamp+".txt"));


List<Long> harvestedDatasetIds = null;

List<Long> harvestedDatasetIdsThisBatch = new ArrayList<Long>();

List<Long> harvestedDatasetIds = new ArrayList<Long>();
List<String> failedIdentifiers = new ArrayList<String>();
List<String> deletedIdentifiers = new ArrayList<String>();

Date harvestStartTime = new Date();

try {
boolean harvestingNow = harvestingClientConfig.isHarvestingNow();

if (harvestingNow) {
harvestErrorOccurred.setValue(true);
if (harvestingClientConfig.isHarvestingNow()) {
hdLogger.log(Level.SEVERE, "Cannot begin harvesting, Dataverse " + harvestingDataverse.getName() + " is currently being harvested.");

} else {
Expand All @@ -177,7 +171,7 @@ public void doHarvest(DataverseRequest dataverseRequest, Long harvestingClientId


if (harvestingClientConfig.isOai()) {
harvestedDatasetIds = harvestOAI(dataverseRequest, harvestingClientConfig, hdLogger, importCleanupLog, harvestErrorOccurred, failedIdentifiers, deletedIdentifiers, harvestedDatasetIdsThisBatch);
harvestOAI(dataverseRequest, harvestingClientConfig, hdLogger, importCleanupLog, failedIdentifiers, deletedIdentifiers, harvestedDatasetIds);

} else {
throw new IOException("Unsupported harvest type");
Expand All @@ -187,8 +181,11 @@ public void doHarvest(DataverseRequest dataverseRequest, Long harvestingClientId
hdLogger.log(Level.INFO, "Datasets created/updated: " + harvestedDatasetIds.size() + ", datasets deleted: " + deletedIdentifiers.size() + ", datasets failed: " + failedIdentifiers.size());

}
} catch (StopHarvestException she) {
hdLogger.log(Level.INFO, "HARVEST INTERRUPTED BY EXTERNAL REQUEST");
harvestingClientService.setPartiallyCompleted(harvestingClientId, new Date(), harvestedDatasetIds.size(), failedIdentifiers.size(), deletedIdentifiers.size());
} catch (Throwable e) {
harvestErrorOccurred.setValue(true);
// Any other exception should be treated as a complete failure
String message = "Exception processing harvest, server= " + harvestingClientConfig.getHarvestingUrl() + ",format=" + harvestingClientConfig.getMetadataPrefix() + " " + e.getClass().getName() + " " + e.getMessage();
hdLogger.log(Level.SEVERE, message);
logException(e, hdLogger);
Expand All @@ -215,12 +212,11 @@ public void doHarvest(DataverseRequest dataverseRequest, Long harvestingClientId
* @param harvestErrorOccurred have we encountered any errors during harvest?
* @param failedIdentifiers Study Identifiers for failed "GetRecord" requests
*/
private List<Long> harvestOAI(DataverseRequest dataverseRequest, HarvestingClient harvestingClient, Logger hdLogger, PrintWriter importCleanupLog, MutableBoolean harvestErrorOccurred, List<String> failedIdentifiers, List<String> deletedIdentifiers, List<Long> harvestedDatasetIdsThisBatch)
throws IOException, ParserConfigurationException, SAXException, TransformerException {
private void harvestOAI(DataverseRequest dataverseRequest, HarvestingClient harvestingClient, Logger hdLogger, PrintWriter importCleanupLog, List<String> failedIdentifiers, List<String> deletedIdentifiers, List<Long> harvestedDatasetIds)
throws IOException, ParserConfigurationException, SAXException, TransformerException, StopHarvestException {

logBeginOaiHarvest(hdLogger, harvestingClient);

List<Long> harvestedDatasetIds = new ArrayList<Long>();
OaiHandler oaiHandler;
HttpClient httpClient = null;

Expand All @@ -243,6 +239,10 @@ private List<Long> harvestOAI(DataverseRequest dataverseRequest, HarvestingClien

try {
for (Iterator<Header> idIter = oaiHandler.runListIdentifiers(); idIter.hasNext();) {
// Before each iteration, check if this harvesting job needs to be aborted:
if (checkIfStoppingJob(harvestingClient, harvestedDatasetIds.size())) {
throw new StopHarvestException("Harvesting stopped by external request");
}

Header h = idIter.next();
String identifier = h.getIdentifier();
Expand All @@ -265,18 +265,11 @@ private List<Long> harvestOAI(DataverseRequest dataverseRequest, HarvestingClien

if (datasetId != null) {
harvestedDatasetIds.add(datasetId);

if ( harvestedDatasetIdsThisBatch == null ) {
harvestedDatasetIdsThisBatch = new ArrayList<Long>();
}
harvestedDatasetIdsThisBatch.add(datasetId);

}

if (getRecordErrorOccurred.booleanValue() == true) {
failedIdentifiers.add(identifier);
harvestErrorOccurred.setValue(true);
//temporary:
//can be uncommented out for testing failure handling:
//throw new IOException("Exception occured, stopping harvest");
}
}
Expand All @@ -286,8 +279,6 @@ private List<Long> harvestOAI(DataverseRequest dataverseRequest, HarvestingClien

logCompletedOaiHarvest(hdLogger, harvestingClient);

return harvestedDatasetIds;

}

private Long processRecord(DataverseRequest dataverseRequest, Logger hdLogger, PrintWriter importCleanupLog, OaiHandler oaiHandler, String identifier, MutableBoolean recordErrorOccurred, List<String> deletedIdentifiers, Date dateStamp, HttpClient httpClient) {
Expand Down Expand Up @@ -410,6 +401,13 @@ private void deleteHarvestedDatasetIfExists(String persistentIdentifier, Dataver
}
hdLogger.info("No dataset found for " + persistentIdentifier + ", skipping delete. ");
}

private boolean checkIfStoppingJob(HarvestingClient harvestingClient, int howmany) {
    // A stop is requested externally by creating a marker file named after the
    // client and the current JVM pid, i.e. /var/run/stopharvest_<client>.<pid>.
    // NOTE(review): the howmany argument is currently unused — confirm whether it
    // was intended to gate this check (e.g. only test the file every N records).
    long pid = ProcessHandle.current().pid();
    String stopFilePath = DATAVERSE_HARVEST_STOP_FILE + harvestingClient.getName() + "." + pid;
    return new File(stopFilePath).isFile();
}

private void logBeginOaiHarvest(Logger hdLogger, HarvestingClient harvestingClient) {
hdLogger.log(Level.INFO, "BEGIN HARVEST, oaiUrl="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,22 +204,46 @@ public void setHarvestFailure(Long hcId, Date currentTime) {
currentRun.setFailed();
currentRun.setFinishTime(currentTime);
}
}
}

/**
 * Records that the in-progress harvest run for the given client was stopped
 * before completion, marking it with result type {@code INTERRUPTED} and
 * saving the dataset counts accumulated up to that point.
 * Runs in its own transaction (REQUIRES_NEW) so the status is persisted even
 * if the caller's transaction is rolled back.
 *
 * @param hcId           database id of the HarvestingClient whose run is being recorded
 * @param finishTime     timestamp to record as the run's finish time
 * @param harvestedCount number of datasets harvested before the interruption
 * @param failedCount    number of datasets that failed before the interruption
 * @param deletedCount   number of datasets deleted before the interruption
 */
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public void setPartiallyCompleted(Long hcId, Date finishTime, int harvestedCount, int failedCount, int deletedCount) {
recordHarvestJobStatus(hcId, finishTime, harvestedCount, failedCount, deletedCount, ClientHarvestRun.RunResultType.INTERRUPTED);
}

public void recordHarvestJobStatus(Long hcId, Date finishTime, int harvestedCount, int failedCount, int deletedCount, ClientHarvestRun.RunResultType result) {
    // Look up the client; nothing to record if it no longer exists.
    HarvestingClient client = em.find(HarvestingClient.class, hcId);
    if (client == null) {
        return;
    }
    // Re-read current state from the database before touching the run.
    em.refresh(client);

    ClientHarvestRun lastRun = client.getLastRun();

    // Only an in-progress run can be finalized; otherwise leave everything as-is.
    if (lastRun == null || !lastRun.isInProgress()) {
        return;
    }

    lastRun.setResult(result);
    lastRun.setFinishTime(finishTime);
    lastRun.setHarvestedDatasetCount(Long.valueOf(harvestedCount));
    lastRun.setFailedDatasetCount(Long.valueOf(failedCount));
    lastRun.setDeletedDatasetCount(Long.valueOf(deletedCount));
}

public Long getNumberOfHarvestedDatasetByClients(List<HarvestingClient> clients) {
String dvs = null;
String clientIds = null;
for (HarvestingClient client: clients) {
if (dvs == null) {
dvs = client.getDataverse().getId().toString();
if (clientIds == null) {
clientIds = client.getId().toString();
} else {
dvs = dvs.concat(","+client.getDataverse().getId().toString());
clientIds = clientIds.concat(","+client.getId().toString());
}
}

try {
return (Long) em.createNativeQuery("SELECT count(d.id) FROM dataset d, "
+ " dvobject o WHERE d.id = o.id AND o.owner_id in ("
+ dvs + ")").getSingleResult();
return (Long) em.createNativeQuery("SELECT count(d.id) FROM dataset d "
+ " WHERE d.harvestingclient_id in ("
+ clientIds + ")").getSingleResult();

} catch (Exception ex) {
logger.info("Warning: exception trying to count harvested datasets by clients: " + ex.getMessage());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package edu.harvard.iq.dataverse.harvest.client;

/**
*
* @author landreev
*/

/**
 * Checked exception thrown when a running harvest job detects an external
 * request to stop (see HarvesterServiceBean.checkIfStoppingJob), so the job
 * can be wound down gracefully and recorded as INTERRUPTED rather than FAILED.
 */
public class StopHarvestException extends Exception {

    // Exception is Serializable; declare an explicit version id so the
    // serialized form is stable across recompilation (Effective Java, Item 87).
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of why the harvest was stopped
     */
    public StopHarvestException(String message) {
        super(message);
    }

    /**
     * @param message description of why the harvest was stopped
     * @param cause   underlying cause, preserved for logging/diagnosis
     */
    public StopHarvestException(String message, Throwable cause) {
        super(message, cause);
    }
}
2 changes: 1 addition & 1 deletion src/main/java/propertyFiles/Bundle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ harvestclients.btn.add=Add Client
harvestclients.tab.header.name=Nickname
harvestclients.tab.header.url=URL
harvestclients.tab.header.lastrun=Last Run
harvestclients.tab.header.lastresults=Last Results
harvestclients.tab.header.lastresults=Last Result
harvestclients.tab.header.action=Actions
harvestclients.tab.header.action.btn.run=Run Harvesting
harvestclients.tab.header.action.btn.edit=Edit
Expand Down
2 changes: 1 addition & 1 deletion src/main/webapp/dashboard.xhtml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#{dashboardPage.numberOfHarvestedDatasets}
<p class="small text-muted">
<h:outputFormat value="#{bundle['dashboard.card.harvestingclients.datasets']}">
<f:param value="#{dashboardPage.numberOfOaiSets}"/>
<f:param value="#{dashboardPage.numberOfHarvestedDatasets}"/>
</h:outputFormat>
</p>
</div>
Expand Down

0 comments on commit 76eca87

Please sign in to comment.