Got rid of some duplicate code in IngestServiceBean. (#4865)
landreev committed Aug 3, 2018
1 parent 94f886a commit f443963
Showing 7 changed files with 59 additions and 123 deletions.
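IngestServiceBean previously carried two near-duplicate queueing paths — startIngestJobs(Dataset, AuthenticatedUser) and startIngestJobForSingleFile(DataFile, AuthenticatedUser) — each with its own copy of the size-limit check, dataset locking, and JMS plumbing. This commit collapses them into two complementary methods: startIngestJobsForDataset(Dataset, AuthenticatedUser), which merely collects the dataset's ingest-scheduled files, and a list-based startIngestJobs(List<DataFile>, AuthenticatedUser), which owns all of the shared logic and returns accumulated warnings as a status string. A condensed sketch of the two resulting call patterns (taken from the diffs below; surrounding code omitted):

    // Dataset-level callers (DatasetPage, SWORD deposits, the
    // step_100_startIngestJobs() helper, etc.) queue every
    // ingest-scheduled file in the dataset:
    ingestService.startIngestJobsForDataset(dataset, (AuthenticatedUser) session.getUser());

    // The single-file reingest API wraps the file in a one-element list
    // and reuses the list-based method; a non-empty return value carries
    // warnings accumulated while queueing:
    String status = ingestService.startIngestJobs(
            new ArrayList<DataFile>(Arrays.asList(dataFile)), u);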
2 changes: 1 addition & 1 deletion src/main/java/edu/harvard/iq/dataverse/DatasetPage.java
@@ -2671,7 +2671,7 @@ public String save() {

// Call Ingest Service one more time, to
// queue the data ingest jobs for asynchronous execution:
- ingestService.startIngestJobs(dataset, (AuthenticatedUser) session.getUser());
+ ingestService.startIngestJobsForDataset(dataset, (AuthenticatedUser) session.getUser());

//After dataset saved, then persist prov json data
if(systemConfig.isProvCollectionEnabled()) {
@@ -1347,7 +1347,7 @@ public String save() {
// Call Ingest Service one more time, to
// queue the data ingest jobs for asynchronous execution:
if (mode == FileEditMode.UPLOAD) {
- ingestService.startIngestJobs(dataset, (AuthenticatedUser) session.getUser());
+ ingestService.startIngestJobsForDataset(dataset, (AuthenticatedUser) session.getUser());
}

if (mode == FileEditMode.SINGLE && fileMetadatas.size() > 0) {
3 changes: 2 additions & 1 deletion src/main/java/edu/harvard/iq/dataverse/api/Files.java
@@ -33,6 +33,7 @@
import edu.harvard.iq.dataverse.util.FileUtil;
import edu.harvard.iq.dataverse.util.SystemConfig;
import java.io.InputStream;
+ import java.util.ArrayList;
import java.util.Arrays;
import java.util.ResourceBundle;
import java.util.logging.Level;
@@ -390,7 +391,7 @@ public Response reingest(@PathParam("id") String id) {
dataFile = fileService.save(dataFile);

// queue the data ingest job for asynchronous execution:
- String status = ingestService.startIngestJobForSingleFile(dataFile, u);
+ String status = ingestService.startIngestJobs(new ArrayList<DataFile>(Arrays.asList(dataFile)), u);

if (status != null) {
// This most likely indicate some sort of a problem (for example,
@@ -345,7 +345,7 @@ DepositReceipt replaceOrAddFiles(String uri, Deposit deposit, AuthCredentials au
throw returnEarly("EJBException: " + sb.toString());
}

- ingestService.startIngestJobs(dataset, user);
+ ingestService.startIngestJobsForDataset(dataset, user);

ReceiptGenerator receiptGenerator = new ReceiptGenerator();
String baseUrl = urlManager.getHostnamePlusBaseUrlPath(uri);
@@ -1808,7 +1808,7 @@ private boolean step_100_startIngestJobs(){
// start the ingest!
//

- ingestService.startIngestJobs(dataset, dvRequest.getAuthenticatedUser());
+ ingestService.startIngestJobsForDataset(dataset, dvRequest.getAuthenticatedUser());

msg("post ingest start");
return true;
168 changes: 50 additions & 118 deletions src/main/java/edu/harvard/iq/dataverse/ingest/IngestServiceBean.java
@@ -398,56 +398,72 @@ public boolean accept(Path file) throws IOException {
// TODO: consider creating a version of this method that would take
// datasetversion as the argument.
// -- L.A. 4.6
- // @Asynchronous - just an experiment...
- public void startIngestJobs(Dataset dataset, AuthenticatedUser user) {
- int count = 0;
+ public void startIngestJobsForDataset(Dataset dataset, AuthenticatedUser user) {
+ List<DataFile> scheduledFiles = new ArrayList<>();

- IngestMessage ingestMessage = null;


for (DataFile dataFile : dataset.getFiles()) {
if (dataFile.isIngestScheduled()) {
// todo: investigate why when calling save with the file object
// gotten from the loop, the roles assignment added at create is removed
// (switching to refinding via id resolves that)
dataFile = fileService.find(dataFile.getId());

- long ingestSizeLimit = -1;
+ scheduledFiles.add(dataFile);
+ }
+ }

+ startIngestJobs(scheduledFiles, user);
+ }

+ public String startIngestJobs(List<DataFile> dataFiles, AuthenticatedUser user) {

+ IngestMessage ingestMessage = null;
+ StringBuilder sb = new StringBuilder();

+ List<DataFile> scheduledFiles = new ArrayList<>();
+ for (DataFile dataFile : dataFiles) {
+ if (dataFile.isIngestScheduled()) {

+ // refresh the copy of the DataFile:
+ dataFile = fileService.find(dataFile.getId());

+ long ingestSizeLimit = -1;
try {
ingestSizeLimit = systemConfig.getTabularIngestSizeLimit(getTabDataReaderByMimeType(dataFile.getContentType()).getFormatName());
} catch (IOException ioex) {
logger.warning("IO Exception trying to retrieve the ingestable format identifier from the plugin for type "+dataFile.getContentType()+" (non-fatal);");
logger.warning("IO Exception trying to retrieve the ingestable format identifier from the plugin for type " + dataFile.getContentType() + " (non-fatal);");
}

- if (ingestSizeLimit == -1 || dataFile.getFilesize() < ingestSizeLimit) {
- dataFile.SetIngestInProgress();
- dataFile = fileService.save(dataFile);

+ if (ingestSizeLimit == -1 || dataFile.getFilesize() < ingestSizeLimit) {
+ dataFile.SetIngestInProgress();
+ scheduledFiles.add(dataFile);

logger.fine("Attempting to queue the file " + dataFile.getFileMetadata().getLabel() + " for ingest, for dataset: " + dataset.getGlobalIdString());
count++;
} else {
dataFile.setIngestDone();
- dataFile = fileService.save(dataFile);

logger.info("Skipping tabular ingest of the file " + dataFile.getFileMetadata().getLabel() + ", because of the size limit (set to "+ ingestSizeLimit +" bytes).");
String message = "Skipping tabular ingest of the file " + dataFile.getFileMetadata().getLabel() + ", because of the size limit (set to " + ingestSizeLimit + " bytes); ";
logger.info(message);
sb.append(message);
}
dataFile = fileService.save(dataFile);
} else {
String message = "(Re)ingest queueing request submitted on a file not scheduled for ingest! (" + dataFile.getFileMetadata().getLabel() + "); ";
logger.warning(message);
sb.append(message);
}
}

+ int count = scheduledFiles.size();

if (count > 0) {
String info = "Ingest of " + count + " tabular data file(s) is in progress.";
logger.info(info);
- datasetService.addDatasetLock(dataset.getId(),
- DatasetLock.Reason.Ingest,
- (user!=null)?user.getId():null,
+ datasetService.addDatasetLock(scheduledFiles.get(0).getOwner().getId(),
+ DatasetLock.Reason.Ingest,
+ (user != null) ? user.getId() : null,
info);


- // Sort ingest jobs by file size:
DataFile[] scheduledFilesArray = (DataFile[])scheduledFiles.toArray(new DataFile[count]);
scheduledFiles = null;

+ // Sort ingest jobs by file size:
Arrays.sort(scheduledFilesArray, new Comparator<DataFile>() {
@Override
public int compare(DataFile d1, DataFile d2) {
@@ -456,34 +472,29 @@ public int compare(DataFile d1, DataFile d2) {
return Long.valueOf(a).compareTo(b);
}
});

ingestMessage = new IngestMessage(IngestMessage.INGEST_MESAGE_LEVEL_INFO);

for (int i = 0; i < count; i++) {
ingestMessage.addFileId(scheduledFilesArray[i].getId());
logger.fine("Sorted order: "+i+" (size="+scheduledFilesArray[i].getFilesize()+")");
}

QueueConnection conn = null;
QueueSession session = null;
QueueSender sender = null;

try {
conn = factory.createQueueConnection();
session = conn.createQueueSession(false, 0);
sender = session.createSender(queue);

- //ingestMessage.addFile(new File(tempFileLocation));
- Message message = session.createObjectMessage(ingestMessage);
+ Message queueMessage = session.createObjectMessage(ingestMessage);

- //try {
- sender.send(message);
- //} catch (JMSException ex) {
- // ex.printStackTrace();
- //}
+ sender.send(queueMessage);

} catch (JMSException ex) {
ex.printStackTrace();
- //throw new IOException(ex.getMessage());
+ logger.warning("Caught exception trying to close connections after starting a (re)ingest job in the JMS queue! Stack trace below.");
+ sb.append("Failed to queue the (re)ingest job for DataFile (JMS Exception)" + (ex.getMessage() != null ? ex.getMessage() : ""));
} finally {
try {

Expand All @@ -496,93 +507,14 @@ public int compare(DataFile d1, DataFile d2) {
if (conn != null) {
conn.close();
}
- } catch (JMSException ex) {
+ } catch (Exception ex) {
logger.warning("Caught exception trying to close connections after starting a (re)ingest job in the JMS queue! Stack trace below.");
ex.printStackTrace();
}
}
}
}

- public String startIngestJobForSingleFile(DataFile dataFile, AuthenticatedUser user) {

- IngestMessage ingestMessage = null;

- if (dataFile.isIngestScheduled()) {

- // refresh the copy of the DataFile:
- dataFile = fileService.find(dataFile.getId());

- long ingestSizeLimit = -1;
- try {
- ingestSizeLimit = systemConfig.getTabularIngestSizeLimit(getTabDataReaderByMimeType(dataFile.getContentType()).getFormatName());
- } catch (IOException ioex) {
- logger.warning("IO Exception trying to retrieve the ingestable format identifier from the plugin for type " + dataFile.getContentType() + " (non-fatal);");
- }

- if (ingestSizeLimit == -1 || dataFile.getFilesize() < ingestSizeLimit) {
- dataFile.SetIngestInProgress();
- dataFile = fileService.save(dataFile);
- } else {
- dataFile.setIngestDone();
- dataFile = fileService.save(dataFile);

- String message = "Skipping tabular ingest of the file " + dataFile.getFileMetadata().getLabel() + ", because of the size limit (set to " + ingestSizeLimit + " bytes).";
- logger.info(message);
- return message;
- }
- } else {
- return "(Re)ingest queueing request submitted on a file not scheduled for ingest! ("+dataFile.getFileMetadata().getLabel()+")";
- }

- String message = "Attempting to queue the file " + dataFile.getFileMetadata().getLabel() + " for ingest, for dataset: " + dataFile.getOwner().getGlobalIdString();
- logger.info(message);

- datasetService.addDatasetLock(dataFile.getOwner().getId(),
- DatasetLock.Reason.Ingest,
- (user != null) ? user.getId() : null,
- message);

- ingestMessage = new IngestMessage(IngestMessage.INGEST_MESAGE_LEVEL_INFO);

- ingestMessage.addFileId(dataFile.getId());

- QueueConnection conn = null;
- QueueSession session = null;
- QueueSender sender = null;
- String statusMessage = null;

- try {
- conn = factory.createQueueConnection();
- session = conn.createQueueSession(false, 0);
- sender = session.createSender(queue);

- Message queueMessage = session.createObjectMessage(ingestMessage);

- sender.send(queueMessage);

- } catch (JMSException ex) {
- ex.printStackTrace();
- logger.warning("Caught exception trying to close connections after starting a (re)ingest job in the JMS queue! Stack trace below.");
- statusMessage = "Failed to queue the (re)ingest job for DataFile (JMS Exception)" + (ex.getMessage() != null ? ex.getMessage() : "");
- } finally {
- try {

- if (sender != null) {
- sender.close();
- }
- if (session != null) {
- session.close();
- }
- if (conn != null) {
- conn.close();
- }
- } catch (Exception ex) {
- logger.warning("Caught exception trying to close connections after starting a (re)ingest job in the JMS queue! Stack trace below.");
- ex.printStackTrace();
- }
- }

- return statusMessage;
+ return sb.toString();
}


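One wrinkle worth noting: the list-based method no longer receives the Dataset itself, so the ingest lock is now taken via scheduledFiles.get(0).getOwner() — which assumes that all files passed in a single call belong to the same dataset. The JMS send-and-close pattern, now living in only one place, is otherwise unchanged; a minimal sketch of it (javax.jms API as used above; factory, queue, and a populated Serializable ingestMessage assumed to exist as in the bean):

    QueueConnection conn = null;
    QueueSession session = null;
    QueueSender sender = null;
    try {
        conn = factory.createQueueConnection();
        session = conn.createQueueSession(false, 0); // non-transacted
        sender = session.createSender(queue);
        // the IngestMessage is Serializable, so it travels as an ObjectMessage:
        Message queueMessage = session.createObjectMessage(ingestMessage);
        sender.send(queueMessage);
    } catch (JMSException ex) {
        // queueing failed: reported through the returned status string, not rethrown
    } finally {
        try {
            if (sender != null) sender.close();
            if (session != null) session.close();
            if (conn != null) conn.close();
        } catch (Exception ex) {
            // failure to close after the job was queued: logged as a warning, not fatal
        }
    }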
@@ -6,20 +6,23 @@
import javax.xml.parsers.ParserConfigurationException;
import org.junit.Assert;
import static org.junit.Assert.assertTrue;
+ import org.junit.Ignore;
import org.junit.Test;
import org.xml.sax.SAXException;

public class XmlValidatorTest {

private static final Logger logger = Logger.getLogger(XmlValidatorTest.class.getCanonicalName());

+ @Ignore
@Test
public void testValidateXml() throws IOException, SAXException, ParserConfigurationException {
assertTrue(XmlValidator.validateXmlSchema("src/test/java/edu/harvard/iq/dataverse/util/xml/sendToDataCite.xml", new URL("https://schema.datacite.org/meta/kernel-3/metadata.xsd")));
// FIXME: Make sure the DDI we export is valid: https://github.com/IQSS/dataverse/issues/3648
// assertTrue(XmlValidator.validateXml("src/test/java/edu/harvard/iq/dataverse/export/ddi/dataset-finch1.xml", new URL("http://www.ddialliance.org/Specification/DDI-Codebook/2.5/XMLSchema/codebook.xsd")));
}

+ @Ignore
@Test
public void testWellFormedXml() {

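Separately from the IngestServiceBean cleanup, both XmlValidatorTest tests are disabled with @Ignore; each validates against a schema fetched over the network (schema.datacite.org, and ddialliance.org in the commented-out FIXME case), so this looks like a test-stability measure.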
