Skip to content

Commit

Permalink
improves logging on errors
Browse files Browse the repository at this point in the history
refs #6872
  • Loading branch information
robfrank committed Dec 28, 2016
1 parent 01574e2 commit 5cb5908
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,16 @@
import java.util.Collection;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;

/**
* ETL processor class.
*
* @author Luca Garulli (l.garulli-at-orientechnologies.com)
*/
public class OETLProcessor {
public enum LOG_LEVELS {
NONE, ERROR, INFO, DEBUG
}

protected final OETLComponentFactory factory = new OETLComponentFactory();
protected final OETLProcessorStats stats = new OETLProcessorStats();
protected List<OBlock> beginBlocks;
Expand Down Expand Up @@ -103,28 +94,6 @@ public OETLProcessor(final List<OBlock> iBeginBlocks, final OSource iSource, fin
init();
}

protected void init() {
final String cfgLog = (String) context.getVariable("log");
if (cfgLog != null)
logLevel = LOG_LEVELS.valueOf(cfgLog.toUpperCase());

final Boolean cfgHaltOnError = (Boolean) context.getVariable("haltOnError");
if (cfgHaltOnError != null)
haltOnError = cfgHaltOnError;

final Object parallelSetting = context.getVariable("parallel");
if (parallelSetting != null)
parallel = (Boolean) parallelSetting;

if (parallel) {
final int cores = Runtime.getRuntime().availableProcessors();

if (cores >= 2)
workers = cores - 1;
}

}

public OETLProcessor() {
}

Expand Down Expand Up @@ -182,6 +151,28 @@ protected static OCommandContext createDefaultContext() {
return context;
}

protected void init() {
final String cfgLog = (String) context.getVariable("log");
if (cfgLog != null)
logLevel = LOG_LEVELS.valueOf(cfgLog.toUpperCase());

final Boolean cfgHaltOnError = (Boolean) context.getVariable("haltOnError");
if (cfgHaltOnError != null)
haltOnError = cfgHaltOnError;

final Object parallelSetting = context.getVariable("parallel");
if (parallelSetting != null)
parallel = (Boolean) parallelSetting;

if (parallel) {
final int cores = Runtime.getRuntime().availableProcessors();

if (cores >= 2)
workers = cores - 1;
}

}

public OETLProcessor parse(final ODocument cfg, final OCommandContext iContext) {
return parse(cfg.<Collection<ODocument>>field("begin"),
cfg.<ODocument>field("source"),
Expand All @@ -202,6 +193,7 @@ public OETLProcessor parse(final ODocument cfg, final OCommandContext iContext)
* @param iLoader Loader component configuration
* @param iEndBlocks List of Block configurations to execute at the end of processing
* @param iContext Execution Context
*
* @return Current OETProcessor instance
**/
public OETLProcessor parse(final Collection<ODocument> iBeginBlocks,
Expand Down Expand Up @@ -594,6 +586,25 @@ else if (iClassName.equals("OrientEdge"))
return inClass;
}

public enum LOG_LEVELS {
NONE(Level.OFF),
ERROR(Level.SEVERE),
INFO(Level.INFO),
DEBUG(Level.FINE);

private final Level julLevel;

LOG_LEVELS(Level julLevel) {

this.julLevel = julLevel;
}

public Level toJulLevel() {
return julLevel;
}

}

private static final class OETLPipelineWorker implements Callable<Boolean> {

private final BlockingQueue<OExtractedItem> queue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
public abstract class OAbstractTransformer extends OAbstractETLPipelineComponent implements OTransformer {
@Override
public Object transform(final Object input) {
public Object transform(final Object input) throws Exception {
log(OETLProcessor.LOG_LEVELS.DEBUG, "Transformer input: %s", input);

if (input == null)
Expand All @@ -47,5 +47,5 @@ public Object transform(final Object input) {
return input;
}

protected abstract Object executeTransform(final Object input);
protected abstract Object executeTransform(final Object input) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,25 @@ public String getName() {
}

@Override
public Object executeTransform(final Object input) {
public Object executeTransform(final Object input) throws Exception {
if (input == null)
return null;

params.put("input", input);
if (input instanceof OIdentifiable)
params.put("record", ((OIdentifiable) input).getRecord());

Object result = cmd.executeInContext(context, params);
try {
Object result = cmd.executeInContext(context, params);

log(OETLProcessor.LOG_LEVELS.DEBUG, "executed code=%s, result=%s", cmd, result);
log(OETLProcessor.LOG_LEVELS.DEBUG, "input=%s executed code=%s, result=%s", input, cmd, result);

return result;
return result;
} catch (Exception e) {

log(OETLProcessor.LOG_LEVELS.ERROR, "exception=%s - input=%s - command=%s ", e.getMessage(), input, cmd);

throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public String getName() {
}

@Override
public Object executeTransform(final Object input) {
public Object executeTransform(final Object input) throws Exception {
String runtimeCommand = (String) resolve(command);
final OCommandRequest cmd;
if (language.equals("sql")) {
Expand All @@ -67,8 +67,19 @@ public Object executeTransform(final Object input) {
cmd = new OCommandScript(language, runtimeCommand);
}
cmd.setContext(context);
Object result = pipeline.getDocumentDatabase().command(cmd).execute();
log(OETLProcessor.LOG_LEVELS.DEBUG, "executed command=%s, result=%s", cmd, result);
return result;

try {
Object result = pipeline.getDocumentDatabase().command(cmd).execute();
log(OETLProcessor.LOG_LEVELS.DEBUG, "executed command=%s, result=%s", cmd, result);

return result;
} catch (Exception e) {

log(OETLProcessor.LOG_LEVELS.ERROR, "exception=%s - input=%s - command=%s ", e.getMessage(), input, cmd);

throw e;
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@
*/
public interface OTransformer extends OETLPipelineComponent {

Object transform(final Object input);
Object transform(final Object input) throws Exception;

}

0 comments on commit 5cb5908

Please sign in to comment.