Commit eaa5d84

Added mechanisms for attempting to ingest a file for which the file type was not properly identified previously.

Added extra diagnostics (ref #4865)
landreev committed Aug 1, 2018
1 parent 1e3cf6c commit eaa5d84
Showing 4 changed files with 124 additions and 14 deletions.
27 changes: 21 additions & 6 deletions src/main/java/edu/harvard/iq/dataverse/api/Files.java
@@ -332,12 +332,17 @@ public Response uningestDatafile(@PathParam("id") String id) {

}

- // Note, ingestAsTabular attempts to queue an *existing* (non-tabular) DataFile
- // for tabular ingest.
+ // reingest attempts to queue an *existing* DataFile for tabular ingest.
+ // It can be used on a non-tabular datafile: to retry a file that has
+ // previously failed ingest, or to ingest a file of a type for which
+ // ingest was not previously supported.
+ // It will also be possible to *force* reingest of a datafile that is
+ // already ingested as Tabular data; for example, to address a bug that
+ // has been found in an ingest plugin.

@Path("{id}/ingestAsTabular")
@Path("{id}/reingest")
@POST
- public Response ingestDatafileAsTabular(@PathParam("id") String id) {
+ public Response reingest(@PathParam("id") String id) {

AuthenticatedUser u;
try {
@@ -361,7 +366,7 @@ public Response ingestDatafileAsTabular(@PathParam("id") String id) {
}

if (dataFile.isTabularData()) {
- return error(Response.Status.BAD_REQUEST, "Datafile already ingested as Tabular.");
+ return error(Response.Status.BAD_REQUEST, "Datafile already ingested as Tabular (use the \"force\" option to override).");
}

Dataset dataset = dataFile.getOwner();
@@ -383,8 +388,18 @@ public Response ingestDatafileAsTabular(@PathParam("id") String id) {
dataFile.SetIngestScheduled();

// queue the data ingest job for asynchronous execution:
- ingestService.startIngestJobs(dataset, u);
+ String status = ingestService.startIngestJobForSingleFile(dataFile, u);

+ if (status != null) {
+     // A non-null status most likely indicates some sort of problem
+     // (for example, the ingest job was not put on the JMS queue
+     // because of the size of the file). But we still return the OK
+     // status, because from the point of view of the API this is a
+     // success: we have gone through the process of trying to
+     // schedule the ingest job...
+
+     return ok(status);
+ }
return ok("Datafile " + id + " queued for ingest");

}
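For context, a client-side sketch of how the new endpoint might be invoked (not part of this commit; the /api/files/{id}/reingest path, host, file id, and X-Dataverse-key API-token header are assumptions based on the @Path annotation above). Java 11's built-in HTTP client keeps it self-contained:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ReingestClient {
    public static void main(String[] args) throws Exception {
        String fileId = "1234";                                   // placeholder DataFile database id
        String apiToken = "00000000-0000-0000-0000-000000000000"; // placeholder API token

        // POST /api/files/{id}/reingest queues the file for (re)ingest.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/api/files/" + fileId + "/reingest"))
                .header("X-Dataverse-key", apiToken)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // A 200 means the scheduling attempt itself succeeded; the body may
        // still carry a diagnostic, e.g. that the file was skipped because
        // of the tabular ingest size limit.
        System.out.println(response.statusCode() + " " + response.body());
    }
}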
13 changes: 11 additions & 2 deletions src/main/java/edu/harvard/iq/dataverse/ingest/IngestMessage.java
@@ -42,7 +42,6 @@ public IngestMessage() {

public IngestMessage(int messageLevel) {
this.messageLevel = messageLevel;
- //dataFiles = new ArrayList<DataFile>();
datafile_ids = new ArrayList<Long>();
}

@@ -52,8 +51,8 @@ public IngestMessage(int messageLevel) {
private Long datasetVersionId;
private String versionNote;
private String datasetVersionNumber;
- //private List<DataFile> dataFiles;
private List<Long> datafile_ids;
+ private Boolean forceTypeCheck;

public String getVersionNote() {
return versionNote;
@@ -115,4 +114,14 @@ public void addFileId(Long file_id) {
datafile_ids.add(file_id);
}

+ public void setForceTypeCheck(boolean forceTypeCheck) {
+     this.forceTypeCheck = forceTypeCheck;
+ }
+
+ public boolean isForceTypeCheck() {
+     if (forceTypeCheck != null) {
+         return forceTypeCheck;
+     }
+     return false;
+ }
}
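A usage sketch of the new flag (illustrative, not from the commit): because forceTypeCheck is a nullable Boolean rather than a primitive, messages created by code paths that never call the setter, including any serialized before this change, still report false:

// Fragment, assumed to run where IngestMessage is visible.
IngestMessage msg = new IngestMessage(IngestMessage.INGEST_MESAGE_LEVEL_INFO);
msg.addFileId(42L);                           // hypothetical DataFile id
System.out.println(msg.isForceTypeCheck());   // false: the Boolean is still null
msg.setForceTypeCheck(true);                  // reingest requests opt in explicitly
System.out.println(msg.isForceTypeCheck());   // true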
src/main/java/edu/harvard/iq/dataverse/ingest/IngestMessageBean.java
@@ -68,15 +68,13 @@ public void onMessage(Message message) {

Iterator iter = ingestMessage.getFileIds().iterator();
datafile_id = null;
- // TODO:
- // is it going to work if multiple files are submitted for ingest?
- // -- L.A. Aug. 13 2014

while (iter.hasNext()) {
datafile_id = (Long) iter.next();

logger.fine("Start ingest job;");
try {
- if (ingestService.ingestAsTabular(datafile_id)) {
+ if (ingestService.ingestAsTabular(datafile_id, ingestMessage.isForceTypeCheck())) {
//Thread.sleep(10000);
logger.fine("Finished ingest job;");
} else {
src/main/java/edu/harvard/iq/dataverse/ingest/IngestServiceBean.java
@@ -503,6 +503,90 @@ public int compare(DataFile d1, DataFile d2) {
}
}

+ public String startIngestJobForSingleFile(DataFile dataFile, AuthenticatedUser user) {
+
+     IngestMessage ingestMessage = null;
+
+     if (dataFile.isIngestScheduled()) {
+
+         // refresh the copy of the DataFile:
+         dataFile = fileService.find(dataFile.getId());
+
+         long ingestSizeLimit = -1;
+         try {
+             ingestSizeLimit = systemConfig.getTabularIngestSizeLimit(getTabDataReaderByMimeType(dataFile.getContentType()).getFormatName());
+         } catch (IOException ioex) {
+             logger.warning("IO Exception trying to retrieve the ingestable format identifier from the plugin for type " + dataFile.getContentType() + " (non-fatal);");
+         }
+
+         if (ingestSizeLimit == -1 || dataFile.getFilesize() < ingestSizeLimit) {
+             dataFile.SetIngestInProgress();
+             dataFile = fileService.save(dataFile);
+         } else {
+             dataFile.setIngestDone();
+             dataFile = fileService.save(dataFile);
+
+             String message = "Skipping tabular ingest of the file " + dataFile.getFileMetadata().getLabel() + ", because of the size limit (set to " + ingestSizeLimit + " bytes).";
+             logger.info(message);
+             return message;
+         }
+     } else {
+         return "(Re)ingest queueing request submitted on a file not scheduled for ingest! (" + dataFile.getFileMetadata().getLabel() + ")";
+     }
+
+     String message = "Attempting to queue the file " + dataFile.getFileMetadata().getLabel() + " for ingest, for dataset: " + dataFile.getOwner().getGlobalIdString();
+     logger.info(message);
+
+     datasetService.addDatasetLock(dataFile.getOwner().getId(),
+             DatasetLock.Reason.Ingest,
+             (user != null) ? user.getId() : null,
+             message);
+
+     ingestMessage = new IngestMessage(IngestMessage.INGEST_MESAGE_LEVEL_INFO);
+
+     ingestMessage.addFileId(dataFile.getId());
+     ingestMessage.setForceTypeCheck(true);
+
+     QueueConnection conn = null;
+     QueueSession session = null;
+     QueueSender sender = null;
+     String statusMessage = null;
+
+     try {
+         conn = factory.createQueueConnection();
+         session = conn.createQueueSession(false, 0);
+         sender = session.createSender(queue);
+
+         Message queueMessage = session.createObjectMessage(ingestMessage);
+
+         sender.send(queueMessage);
+
+     } catch (JMSException ex) {
+         logger.warning("Caught a JMSException trying to queue the (re)ingest job; stack trace below.");
+         ex.printStackTrace();
+         statusMessage = "Failed to queue the (re)ingest job for DataFile (JMS Exception): " + (ex.getMessage() != null ? ex.getMessage() : "");
+     } finally {
+         try {
+
+             if (sender != null) {
+                 sender.close();
+             }
+             if (session != null) {
+                 session.close();
+             }
+             if (conn != null) {
+                 conn.close();
+             }
+         } catch (Exception ex) {
+             logger.warning("Caught exception trying to close connections after starting a (re)ingest job in the JMS queue! Stack trace below.");
+             ex.printStackTrace();
+         }
+     }
+
+     return statusMessage;
+ }


public void produceSummaryStatistics(DataFile dataFile, File generatedTabularFile) throws IOException {
/*
logger.info("Skipping summary statistics and UNF.");
@@ -646,7 +730,7 @@ public void sendFailNotification(Long dataset_id) {
}


- public boolean ingestAsTabular(Long datafile_id) { //DataFile dataFile) throws IOException {
+ public boolean ingestAsTabular(Long datafile_id, boolean forceTypeCheck) {
DataFile dataFile = fileService.find(datafile_id);
boolean ingestSuccessful = false;

@@ -656,7 +740,11 @@ public boolean ingestAsTabular(Long datafile_id) { //DataFile dataFile) throws I
TabularDataFileReader ingestPlugin = getTabDataReaderByMimeType(dataFile.getContentType());
logger.fine("Using ingest plugin " + ingestPlugin.getClass());

- if (ingestPlugin == null) {
+ if (!forceTypeCheck && ingestPlugin == null) {
+     // If this is a reingest request, we'll still have a chance
+     // to find an ingest plugin for this file, once we try
+     // to identify the file type again.
+
dataFile.SetIngestProblem();
FileUtil.createIngestFailureReport(dataFile, "No ingest plugin found for file type "+dataFile.getContentType());
dataFile = fileService.save(dataFile);
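Taken together, startIngestJobForSingleFile has a simple contract: a null return means the job was placed on the JMS queue, and any non-null string is a human-readable diagnostic. A caller-side sketch (mirroring the Files.java change above; the logger is an assumed local):

String status = ingestService.startIngestJobForSingleFile(dataFile, user);
if (status == null) {
    // The IngestMessage was queued with forceTypeCheck=true, so ingestAsTabular
    // will try to identify the file type again instead of failing fast when no
    // ingest plugin matches the stored content type.
    logger.info("Reingest queued for DataFile " + dataFile.getId());
} else {
    logger.warning(status); // e.g. the file exceeded the tabular ingest size limit
}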
