From eaa5d849bd086b17dc30a545fc01a91cecb328f1 Mon Sep 17 00:00:00 2001 From: Leonid Andreev Date: Wed, 1 Aug 2018 11:13:42 -0400 Subject: [PATCH] Added mechanisms for attempting to ingest a file for which the file type was not properly identified previously. Added extra diagnostics (ref #4865) --- .../edu/harvard/iq/dataverse/api/Files.java | 27 ++++-- .../iq/dataverse/ingest/IngestMessage.java | 13 ++- .../dataverse/ingest/IngestMessageBean.java | 6 +- .../dataverse/ingest/IngestServiceBean.java | 92 ++++++++++++++++++- 4 files changed, 124 insertions(+), 14 deletions(-) diff --git a/src/main/java/edu/harvard/iq/dataverse/api/Files.java b/src/main/java/edu/harvard/iq/dataverse/api/Files.java index c74a5a8854c..88f24001129 100644 --- a/src/main/java/edu/harvard/iq/dataverse/api/Files.java +++ b/src/main/java/edu/harvard/iq/dataverse/api/Files.java @@ -332,12 +332,17 @@ public Response uningestDatafile(@PathParam("id") String id) { } - // Note, ingestAsTabular attempts to queue an *existing* (non-tabular) DataFile - // for tabular ingest. + // reingest attempts to queue an *existing* DataFile + // for tabular ingest. It can be used on non-tabular datafiles; to try to + // ingest a file that has previously failed ingest, or to ingest a file of a + // type for which ingest was not previously supported. + // It will also be possible to *force* reingest of a datafile that's already + // ingested as Tabular data; for example, to address a bug that has been + // found in an ingest plugin. 
- @Path("{id}/ingestAsTabular") + @Path("{id}/reingest") @POST - public Response ingestDatafileAsTabular(@PathParam("id") String id) { + public Response reingest(@PathParam("id") String id) { AuthenticatedUser u; try { @@ -361,7 +366,7 @@ public Response ingestDatafileAsTabular(@PathParam("id") String id) { } if (dataFile.isTabularData()) { - return error(Response.Status.BAD_REQUEST, "Datafile already ingested as Tabular."); + return error(Response.Status.BAD_REQUEST, "Datafile already ingested as Tabular (use the \"force\" option to override)."); } Dataset dataset = dataFile.getOwner(); @@ -383,8 +388,18 @@ public Response ingestDatafileAsTabular(@PathParam("id") String id) { dataFile.SetIngestScheduled(); // queue the data ingest job for asynchronous execution: - ingestService.startIngestJobs(dataset, u); + String status = ingestService.startIngestJobForSingleFile(dataFile, u); + if (status != null) { + // This most likely indicates some sort of a problem (for example, + // the ingest job was not put on the JMS queue because of the size + // of the file). But we are still returning the OK status - because + // from the point of view of the API, it's a success - we have + // successfully gone through the process of trying to schedule the + // ingest job... 
+ + return ok(status); + } return ok("Datafile " + id + " queued for ingest"); } diff --git a/src/main/java/edu/harvard/iq/dataverse/ingest/IngestMessage.java b/src/main/java/edu/harvard/iq/dataverse/ingest/IngestMessage.java index 4242ca5bcdd..2a64957df1b 100644 --- a/src/main/java/edu/harvard/iq/dataverse/ingest/IngestMessage.java +++ b/src/main/java/edu/harvard/iq/dataverse/ingest/IngestMessage.java @@ -42,7 +42,6 @@ public IngestMessage() { public IngestMessage(int messageLevel) { this.messageLevel = messageLevel; - //dataFiles = new ArrayList(); datafile_ids = new ArrayList(); } @@ -52,8 +51,8 @@ public IngestMessage(int messageLevel) { private Long datasetVersionId; private String versionNote; private String datasetVersionNumber; - //private List dataFiles; private List datafile_ids; + private Boolean forceTypeCheck; public String getVersionNote() { return versionNote; @@ -115,4 +114,14 @@ public void addFileId(Long file_id) { datafile_ids.add(file_id); } + public void setForceTypeCheck(boolean forceTypeCheck) { + this.forceTypeCheck = forceTypeCheck; + } + + public boolean isForceTypeCheck() { + if (forceTypeCheck != null) { + return forceTypeCheck; + } + return false; + } } diff --git a/src/main/java/edu/harvard/iq/dataverse/ingest/IngestMessageBean.java b/src/main/java/edu/harvard/iq/dataverse/ingest/IngestMessageBean.java index f90e37a0581..66035b424e4 100644 --- a/src/main/java/edu/harvard/iq/dataverse/ingest/IngestMessageBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/ingest/IngestMessageBean.java @@ -68,15 +68,13 @@ public void onMessage(Message message) { Iterator iter = ingestMessage.getFileIds().iterator(); datafile_id = null; - // TODO: - // is it going to work if multiple files are submitted for ingest? - // -- L.A. Aug. 
13 2014 + while (iter.hasNext()) { datafile_id = (Long) iter.next(); logger.fine("Start ingest job;"); try { - if (ingestService.ingestAsTabular(datafile_id)) { + if (ingestService.ingestAsTabular(datafile_id, ingestMessage.isForceTypeCheck())) { //Thread.sleep(10000); logger.fine("Finished ingest job;"); } else { diff --git a/src/main/java/edu/harvard/iq/dataverse/ingest/IngestServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/ingest/IngestServiceBean.java index 9ea3ce8db5f..f6bc05404d1 100644 --- a/src/main/java/edu/harvard/iq/dataverse/ingest/IngestServiceBean.java +++ b/src/main/java/edu/harvard/iq/dataverse/ingest/IngestServiceBean.java @@ -503,6 +503,90 @@ public int compare(DataFile d1, DataFile d2) { } } + public String startIngestJobForSingleFile(DataFile dataFile, AuthenticatedUser user) { + + IngestMessage ingestMessage = null; + + if (dataFile.isIngestScheduled()) { + + // refresh the copy of the DataFile: + dataFile = fileService.find(dataFile.getId()); + + long ingestSizeLimit = -1; + try { + ingestSizeLimit = systemConfig.getTabularIngestSizeLimit(getTabDataReaderByMimeType(dataFile.getContentType()).getFormatName()); + } catch (IOException ioex) { + logger.warning("IO Exception trying to retrieve the ingestable format identifier from the plugin for type " + dataFile.getContentType() + " (non-fatal);"); + } + + if (ingestSizeLimit == -1 || dataFile.getFilesize() < ingestSizeLimit) { + dataFile.SetIngestInProgress(); + dataFile = fileService.save(dataFile); + } else { + dataFile.setIngestDone(); + dataFile = fileService.save(dataFile); + + String message = "Skipping tabular ingest of the file " + dataFile.getFileMetadata().getLabel() + ", because of the size limit (set to " + ingestSizeLimit + " bytes)."; + logger.info(message); + return message; + } + } else { + return "(Re)ingest queueing request submitted on a file not scheduled for ingest! 
("+dataFile.getFileMetadata().getLabel()+")"; + } + + String message = "Attempting to queue the file " + dataFile.getFileMetadata().getLabel() + " for ingest, for dataset: " + dataFile.getOwner().getGlobalIdString(); + logger.info(message); + + datasetService.addDatasetLock(dataFile.getOwner().getId(), + DatasetLock.Reason.Ingest, + (user != null) ? user.getId() : null, + message); + + ingestMessage = new IngestMessage(IngestMessage.INGEST_MESAGE_LEVEL_INFO); + + ingestMessage.addFileId(dataFile.getId()); + ingestMessage.setForceTypeCheck(true); + + QueueConnection conn = null; + QueueSession session = null; + QueueSender sender = null; + String statusMessage = null; + + try { + conn = factory.createQueueConnection(); + session = conn.createQueueSession(false, 0); + sender = session.createSender(queue); + + Message queueMessage = session.createObjectMessage(ingestMessage); + + sender.send(queueMessage); + + } catch (JMSException ex) { + ex.printStackTrace(); + logger.warning("Caught exception trying to close connections after starting a (re)ingest job in the JMS queue! Stack trace below."); + statusMessage = "Failed to queue the (re)ingest job for DataFile (JMS Exception)" + (ex.getMessage() != null ? ex.getMessage() : ""); + } finally { + try { + + if (sender != null) { + sender.close(); + } + if (session != null) { + session.close(); + } + if (conn != null) { + conn.close(); + } + } catch (Exception ex) { + logger.warning("Caught exception trying to close connections after starting a (re)ingest job in the JMS queue! 
Stack trace below."); + ex.printStackTrace(); + } + } + + return statusMessage; + } + + public void produceSummaryStatistics(DataFile dataFile, File generatedTabularFile) throws IOException { /* logger.info("Skipping summary statistics and UNF."); @@ -646,7 +730,7 @@ public void sendFailNotification(Long dataset_id) { } - public boolean ingestAsTabular(Long datafile_id) { //DataFile dataFile) throws IOException { + public boolean ingestAsTabular(Long datafile_id, boolean forceTypeCheck) { DataFile dataFile = fileService.find(datafile_id); boolean ingestSuccessful = false; @@ -656,7 +740,11 @@ public boolean ingestAsTabular(Long datafile_id) { //DataFile dataFile) throws I TabularDataFileReader ingestPlugin = getTabDataReaderByMimeType(dataFile.getContentType()); logger.fine("Using ingest plugin " + ingestPlugin.getClass()); - if (ingestPlugin == null) { + if (!forceTypeCheck && ingestPlugin == null) { + // If this is a reingest request, we'll still have a chance + // to find an ingest plugin for this file, once we try + // to identify the file type again. + dataFile.SetIngestProblem(); FileUtil.createIngestFailureReport(dataFile, "No ingest plugin found for file type "+dataFile.getContentType()); dataFile = fileService.save(dataFile);