Skip to content

Commit

Permalink
refactor main method for #92 #93
Browse files Browse the repository at this point in the history
parameters passed on command line now overrides global config
begin block are executed only one time
  • Loading branch information
robfrank committed Sep 18, 2015
1 parent 76e8532 commit 1323842
Show file tree
Hide file tree
Showing 36 changed files with 320 additions and 187 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>2.2.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.orientechnologies</groupId>
<artifactId>orientdb-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.orientechnologies.common.parser.OSystemVariableResolver;
import com.orientechnologies.common.parser.OVariableParser;
import com.orientechnologies.common.parser.OVariableParserListener;
import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.exception.OConfigurationException;
import com.orientechnologies.orient.core.record.impl.ODocument;
Expand All @@ -33,13 +33,13 @@
*/
public abstract class OAbstractETLComponent implements OETLComponent {
protected OETLProcessor processor;
protected OBasicCommandContext context;
protected OCommandContext context;
protected OETLProcessor.LOG_LEVELS logLevel;
protected String output = null;
protected String ifExpression;

@Override
public void configure(final OETLProcessor iProcessor, final ODocument iConfiguration, final OBasicCommandContext iContext) {
public void configure(final OETLProcessor iProcessor, final ODocument iConfiguration, final OCommandContext iContext) {
processor = iProcessor;
context = iContext;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package com.orientechnologies.orient.etl;

import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.record.impl.ODocument;

/**
Expand All @@ -27,7 +27,7 @@
public interface OETLComponent {
ODocument getConfiguration();

void configure(OETLProcessor iProcessor, ODocument iConfiguration, OBasicCommandContext iSettings);
void configure(OETLProcessor iProcessor, ODocument iConfiguration, OCommandContext iSettings);

void begin();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.orientechnologies.common.concur.ONeedRetryException;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.etl.loader.OLoader;
import com.orientechnologies.orient.etl.transformer.OTransformer;
Expand All @@ -37,7 +38,7 @@ public class OETLPipeline {
protected final OETLProcessor processor;
protected final List<OTransformer> transformers;
protected final OLoader loader;
protected final OBasicCommandContext context;
protected final OCommandContext context;
protected final OETLProcessor.LOG_LEVELS logLevel;
protected boolean haltOnError = true;
protected final int maxRetries;
Expand Down Expand Up @@ -97,7 +98,7 @@ public OETLPipeline setGraphDatabase(final OrientBaseGraph iGraph) {
return this;
}

public OBasicCommandContext getContext() {
public OCommandContext getContext() {
return context;
}

Expand Down
116 changes: 53 additions & 63 deletions src/main/java/com/orientechnologies/orient/etl/OETLProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import com.orientechnologies.orient.core.OConstants;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.exception.OConfigurationException;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.serialization.serializer.OStringSerializerHelper;
import com.orientechnologies.orient.etl.block.OBlock;
import com.orientechnologies.orient.etl.extractor.OExtractor;
import com.orientechnologies.orient.etl.loader.OLoader;
Expand All @@ -46,7 +46,7 @@

/**
* ETL processor class.
*
*
* @author Luca Garulli (l.garulli-at-orientechnologies.com)
*/
public class OETLProcessor {
Expand All @@ -57,7 +57,7 @@ public class OETLProcessor {
protected OExtractor extractor;
protected OLoader loader;
protected List<OTransformer> transformers;
protected OBasicCommandContext context;
protected OCommandContext context;
protected long startTime;
protected long elapsed;
protected OETLProcessorStats stats = new OETLProcessorStats();
Expand All @@ -68,29 +68,9 @@ public class OETLProcessor {
protected int maxRetries = 10;
private Thread[] threads;

public enum LOG_LEVELS {
NONE, ERROR, INFO, DEBUG
}

public class OETLProcessorStats {
public long lastExtractorProgress = 0;
public long lastLoaderProgress = 0;
public long lastLap = 0;
public AtomicLong warnings = new AtomicLong();
public AtomicLong errors = new AtomicLong();

public long incrementWarnings() {
return warnings.incrementAndGet();
}

public long incrementErrors() {
return errors.incrementAndGet();
}
}

/**
* Creates an ETL processor by setting all the components on construction.
*
*
* @param iBeginBlocks
* List of Blocks to execute at the beginning of processing
* @param iSource
Expand All @@ -107,8 +87,7 @@ public long incrementErrors() {
* Execution Context
*/
public OETLProcessor(final List<OBlock> iBeginBlocks, final OSource iSource, final OExtractor iExtractor,
final List<OTransformer> iTransformers, final OLoader iLoader, final List<OBlock> iEndBlocks,
final OBasicCommandContext iContext) {
final List<OTransformer> iTransformers, final OLoader iLoader, final List<OBlock> iEndBlocks, final OCommandContext iContext) {
beginBlocks = iBeginBlocks;
source = iSource;
extractor = iExtractor;
Expand All @@ -123,7 +102,6 @@ public OETLProcessor() {
}

public static void main(final String[] args) {
ODocument cfgGlobal = null;

System.out.println("OrientDB etl v." + OConstants.getVersion() + " " + OConstants.ORIENT_URL);
if (args.length == 0) {
Expand All @@ -132,57 +110,50 @@ public static void main(final String[] args) {
System.exit(1);
}

final OBasicCommandContext context = createDefaultContext();
final OETLProcessor processor = parseConfigAndParameters(args);

processor.execute();
}

ODocument configuration = null;
protected static OETLProcessor parseConfigAndParameters(String[] args) {
final OCommandContext context = createDefaultContext();

ODocument configuration = new ODocument().fromJSON("{}");
for (final String arg : args) {
if (arg.charAt(0) == '-') {
final String[] parts = arg.substring(1).split("=");
context.setVariable(parts[0].toUpperCase(), parts[1]);
} else {
if (arg.charAt(0) != '-') {
try {
final String config = OIOUtils.readFileAsString(new File(arg));
configuration = new ODocument().fromJSON(config, "noMap");
cfgGlobal = configuration.field("config");
ODocument cfgGlobal = configuration.field("config");
if (cfgGlobal != null) {
for (String f : cfgGlobal.fieldNames()) {
context.setVariable(f, cfgGlobal.field(f));
}
}
} catch (IOException e) {
throw new OConfigurationException("Error on loading config file: " + arg);
throw new OConfigurationException("Error on loading config file: " + arg, e);
}
}
}

if (cfgGlobal != null) {
// INIT ThE CONTEXT WITH GLOBAL CONFIGURATION
for (String f : cfgGlobal.fieldNames()) {
context.setVariable(f, cfgGlobal.field(f));
// override with args passes by command line
for (final String arg : args) {
if (arg.charAt(0) == '-') {
final String[] parts = arg.substring(1).split("=");
context.setVariable(parts[0], parts[1]);
}
}

new OETLProcessor().parse(configuration, context).execute();
return new OETLProcessor().parse(configuration, context);
}

protected static OBasicCommandContext createDefaultContext() {
final OBasicCommandContext context = new OBasicCommandContext();
protected static OCommandContext createDefaultContext() {
final OCommandContext context = new OBasicCommandContext();
context.setVariable("dumpEveryMs", 1000);
return context;
}

protected static Collection<ODocument> parseTransformers(final String value) {
final Collection<ODocument> cfgTransformers = new ArrayList<ODocument>();
if (!value.isEmpty()) {
if (value.charAt(0) == '{') {
cfgTransformers.add(new ODocument().fromJSON(value, "noMap"));
} else if (value.charAt(0) == '[') {
final List<String> items = new ArrayList<String>();
OStringSerializerHelper.getCollection(value, 0, items);
for (String item : items)
cfgTransformers.add(new ODocument().fromJSON(item, "noMap"));
}
}
return cfgTransformers;
}

public OETLProcessor parse(final ODocument cfg, final OBasicCommandContext iContext) {
public OETLProcessor parse(final ODocument cfg, final OCommandContext iContext) {
return parse((Collection<ODocument>) cfg.field("begin"), (ODocument) cfg.field("source"), (ODocument) cfg.field("extractor"),
(Collection<ODocument>) cfg.field("transformers"), (ODocument) cfg.field("loader"),
(Collection<ODocument>) cfg.field("end"), iContext);
Expand All @@ -205,12 +176,11 @@ public OETLProcessor parse(final ODocument cfg, final OBasicCommandContext iCont
* List of Block configurations to execute at the end of processing
* @param iContext
* Execution Context
*
* @return Current OETProcessor instance
**/
public OETLProcessor parse(final Collection<ODocument> iBeginBlocks, final ODocument iSource, final ODocument iExtractor,
final Collection<ODocument> iTransformers, final ODocument iLoader, final Collection<ODocument> iEndBlocks,
final OBasicCommandContext iContext) {
final OCommandContext iContext) {
if (iExtractor == null)
throw new IllegalArgumentException("No Extractor configured");

Expand All @@ -228,7 +198,7 @@ public OETLProcessor parse(final Collection<ODocument> iBeginBlocks, final ODocu
final OBlock b = factory.getBlock(name);
beginBlocks.add(b);
configureComponent(b, (ODocument) block.field(name), iContext);
b.execute();
// b.execute();
}

if (iSource != null) {
Expand Down Expand Up @@ -317,7 +287,7 @@ public LOG_LEVELS getLogLevel() {
return logLevel;
}

public OBasicCommandContext getContext() {
public OCommandContext getContext() {
return context;
}

Expand Down Expand Up @@ -495,7 +465,7 @@ protected void executeSequentially() {
}
}

protected void configureComponent(final OETLComponent iComponent, final ODocument iCfg, final OBasicCommandContext iContext) {
protected void configureComponent(final OETLComponent iComponent, final ODocument iCfg, final OCommandContext iContext) {
iComponent.configure(this, iCfg, iContext);
}

Expand Down Expand Up @@ -628,4 +598,24 @@ protected void init() {
}
}
}

public enum LOG_LEVELS {
NONE, ERROR, INFO, DEBUG
}

public class OETLProcessorStats {
public long lastExtractorProgress = 0;
public long lastLoaderProgress = 0;
public long lastLap = 0;
public AtomicLong warnings = new AtomicLong();
public AtomicLong errors = new AtomicLong();

public long incrementWarnings() {
return warnings.incrementAndGet();
}

public long incrementErrors() {
return errors.incrementAndGet();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package com.orientechnologies.orient.etl.block;

import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.etl.OAbstractETLComponent;

/**
Expand All @@ -33,7 +33,7 @@ public Object execute() {
}

@Override
public void setContext(final OBasicCommandContext iContext) {
public void setContext(final OCommandContext iContext) {
context = iContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package com.orientechnologies.orient.etl.block;

import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.etl.OETLComponent;

/**
Expand All @@ -27,5 +27,5 @@
public interface OBlock extends OETLComponent {
Object execute();

void setContext(OBasicCommandContext context);
void setContext(OCommandContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package com.orientechnologies.orient.etl.block;

import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.command.script.OCommandExecutorScript;
import com.orientechnologies.orient.core.command.script.OCommandScript;
import com.orientechnologies.orient.core.record.impl.ODocument;
Expand All @@ -43,7 +43,7 @@ public ODocument getConfiguration() {
}

@Override
public void configure(OETLProcessor iProcessor, final ODocument iConfiguration, OBasicCommandContext iContext) {
public void configure(OETLProcessor iProcessor, final ODocument iConfiguration, OCommandContext iContext) {
super.configure(iProcessor, iConfiguration, iContext);
if (iConfiguration.containsField("language"))
language = iConfiguration.field("language");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package com.orientechnologies.orient.etl.block;

import com.orientechnologies.orient.console.OConsoleDatabaseApp;
import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.exception.OConfigurationException;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.etl.OETLProcessor;
Expand All @@ -42,7 +42,7 @@ public ODocument getConfiguration() {
}

@Override
public void configure(OETLProcessor iProcessor, final ODocument iConfiguration, OBasicCommandContext iContext) {
public void configure(OETLProcessor iProcessor, final ODocument iConfiguration, OCommandContext iContext) {
super.configure(iProcessor, iConfiguration, iContext);
if (iConfiguration.containsField("file"))
file = iConfiguration.field("file");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package com.orientechnologies.orient.etl.block;

import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.sql.filter.OSQLFilter;
import com.orientechnologies.orient.etl.OETLProcessor;
Expand All @@ -36,7 +36,7 @@ public ODocument getConfiguration() {
}

@Override
public void configure(OETLProcessor iProcessor, final ODocument iConfiguration, final OBasicCommandContext iContext) {
public void configure(OETLProcessor iProcessor, final ODocument iConfiguration, final OCommandContext iContext) {
super.configure(iProcessor, iConfiguration, iContext);

name = iConfiguration.field("name");
Expand Down
Loading

0 comments on commit 1323842

Please sign in to comment.