Skip to content

Commit

Permalink
refactors of ETL, first steps to improve logging and error handling
Browse files Browse the repository at this point in the history
- removes system out in favor of OLogManager
- adds specialized logging config file to etl scripts (bat/sh)
- renames classes adding OETL prefix to all
- refactors tests to use one in memory db for each test method

refs #6872
  • Loading branch information
robfrank committed Dec 28, 2016
1 parent a317b8f commit 5e69391
Show file tree
Hide file tree
Showing 73 changed files with 606 additions and 477 deletions.
4 changes: 4 additions & 0 deletions distribution/src/main/assembly/archive.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@
<directory>${basedir}/../server/config</directory>
<outputDirectory>config</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../etl/config</directory>
<outputDirectory>config</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../tools/config</directory>
<outputDirectory>config</outputDirectory>
Expand Down
46 changes: 46 additions & 0 deletions etl/config/orientdb-etl-log.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# /*
# * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
# *
# * Licensed under the Apache License, Version 2.0 (the "License");
# * you may not use this file except in compliance with the License.
# * You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# *
# * For more information: http://www.orientechnologies.com
# */
#

# Specify the handlers to create in the root logger
# (all loggers are children of the root logger)
# The following creates two handlers
handlers = java.util.logging.ConsoleHandler, java.util.logging.FileHandler

# Set the default logging level for the root logger
.level = INFO
com.orientechnologies.level = INFO
com.orientechnologies.orient.server.distributed.level = INFO

# Set the default logging level for new ConsoleHandler instances
java.util.logging.ConsoleHandler.level = INFO
# Set the default formatter for new ConsoleHandler instances
java.util.logging.ConsoleHandler.formatter = com.orientechnologies.common.log.OAnsiLogFormatter

# Set the default logging level for new FileHandler instances
java.util.logging.FileHandler.level = INFO
# Naming style for the output file
java.util.logging.FileHandler.pattern=../log/orient-etl.log
# Set the default formatter for new FileHandler instances
java.util.logging.FileHandler.formatter = com.orientechnologies.common.log.OLogFormatter
# Limiting size of output file in bytes:
java.util.logging.FileHandler.limit=10000000
# Number of output files to cycle through, by appending an
# integer to the base file name:
java.util.logging.FileHandler.count=10
4 changes: 3 additions & 1 deletion etl/script/oetl.bat
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ goto END
set JAVA_MAX_DIRECT=-XX:MaxDirectMemorySize=512g
:END

if NOT exist "%CONFIG_FILE%" set CONFIG_FILE=%ORIENTDB_HOME%/config/orientdb-etl-config.xml


set KEYSTORE=%ORIENTDB_HOME%\config\cert\orientdb-console.ks
set KEYSTORE_PASS=password
Expand All @@ -45,6 +47,6 @@ set TRUSTSTORE_PASS=password
set SSL_OPTS="-Dclient.ssl.enabled=false -Djavax.net.ssl.keyStore=%KEYSTORE% -Djavax.net.ssl.keyStorePassword=%KEYSTORE_PASS% -Djavax.net.ssl.trustStore=%TRUSTSTORE% -Djavax.net.ssl.trustStorePassword=%TRUSTSTORE_PASS%"

set ORIENTDB_SETTINGS=%JAVA_MAX_DIRECT% -Xmx512m -Djava.util.logging.config.file="%ORIENTDB_HOME%\config\orientdb-client-log.properties" -Djava.awt.headless=true
call %JAVA% -server %SSL_OPTS% %ORIENTDB_SETTINGS% -Dfile.encoding=utf-8 -Dorientdb.build.number="@BUILD@" -cp "%ORIENTDB_HOME%\lib\*;" com.orientechnologies.orient.etl.OETLProcessor %CMD_LINE_ARGS%
call %JAVA% -server %SSL_OPTS% %ORIENTDB_SETTINGS% -Djava.util.logging.config.file="%LOG_FILE%" -Dfile.encoding=utf-8 -Dorientdb.build.number="@BUILD@" -cp "%ORIENTDB_HOME%\lib\*;" com.orientechnologies.orient.etl.OETLProcessor %CMD_LINE_ARGS%

:end
12 changes: 11 additions & 1 deletion etl/script/oetl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ PRGDIR=`dirname "$PRG"`
[ -f "$ORIENTDB_HOME"/lib/orientdb-etl-@VERSION@.jar ] || ORIENTDB_HOME=`cd "$PRGDIR/.." ; pwd`
export ORIENTDB_HOME

if [ -z "$ORIENTDB_ETL_LOG_CONF" ] ; then
ORIENTDB_ETL_LOG_CONF=$ORIENTDB_HOME/config/orientdb-etl-log.properties
fi


# Set JavaHome if it exists
if [ -f "${JAVA_HOME}/bin/java" ]; then
JAVA=${JAVA_HOME}/bin/java
Expand All @@ -42,4 +47,9 @@ TRUSTSTORE=$ORIENTDB_HOME/config/cert/orientdb-console.ts
TRUSTSTORE_PASS=password
SSL_OPTS="-Dclient.ssl.enabled=false -Djavax.net.ssl.keyStore=$KEYSTORE -Djavax.net.ssl.keyStorePassword=$KEYSTORE_PASS -Djavax.net.ssl.trustStore=$TRUSTSTORE -Djavax.net.ssl.trustStorePassword=$TRUSTSTORE_PASS"

$JAVA -server $JAVA_OPTS $ORIENTDB_SETTINGS $SSL_OPTS -Dfile.encoding=utf-8 -Dorientdb.build.number="@BUILD@" -cp "$ORIENTDB_HOME/lib/*" com.orientechnologies.orient.etl.OETLProcessor $*
$JAVA -server $JAVA_OPTS \
$ORIENTDB_SETTINGS \
$SSL_OPTS \
-Djava.util.logging.config.file="$ORIENTDB_ETL_LOG_CONF" \
-Dfile.encoding=utf-8 -Dorientdb.build.number="@BUILD@" \
-cp "$ORIENTDB_HOME/lib/*" com.orientechnologies.orient.etl.OETLProcessor $*
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,27 @@

package com.orientechnologies.orient.etl;

import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.common.parser.OVariableParser;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.exception.OConfigurationException;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.sql.filter.OSQLFilter;
import com.orientechnologies.orient.core.sql.filter.OSQLPredicate;
import com.orientechnologies.orient.etl.OETLProcessor.LOG_LEVELS;

import java.util.logging.Level;

import static com.orientechnologies.common.parser.OSystemVariableResolver.VAR_BEGIN;
import static com.orientechnologies.common.parser.OSystemVariableResolver.VAR_END;

/**
* ETL abstract component.
*/
public abstract class OAbstractETLComponent implements OETLComponent {
public abstract class OETLAbstractComponent implements OETLComponent {
protected OETLProcessor processor;
protected OCommandContext context;
protected LOG_LEVELS logLevel = LOG_LEVELS.INFO;
protected Level logLevel = Level.INFO;
protected String output;
protected String ifExpression;
protected ODocument configuration;
Expand All @@ -56,7 +58,7 @@ public void configure(final ODocument iConfiguration, final OCommandContext iCon
@Override
public void begin() {
if (configuration.containsField("log"))
logLevel = LOG_LEVELS.valueOf(configuration.field("log").toString().toUpperCase());
logLevel = Level.parse(configuration.field("log").toString().toUpperCase());
else
logLevel = processor.getLogLevel();

Expand Down Expand Up @@ -91,7 +93,7 @@ protected boolean skip(final Object input) {
if (ifFilter != null) {
final ODocument doc = input instanceof OIdentifiable ? (ODocument) ((OIdentifiable) input).getRecord() : null;

log(LOG_LEVELS.DEBUG, "Evaluating conditional expression if=%s...", ifFilter);
log(Level.FINE, "Evaluating conditional expression if=%s...", ifFilter);

final Object result = ifFilter.evaluate(doc, null, context);
if (!(result instanceof Boolean))
Expand All @@ -109,15 +111,23 @@ protected OSQLFilter getIfFilter() {
return null;
}

protected void log(final LOG_LEVELS iLevel, String iText, final Object... iArgs) {
if (logLevel.ordinal() >= iLevel.ordinal()) {
final Long extractedNum = context != null ? (Long) context.getVariable("extractedNum") : null;
if (extractedNum != null)
System.out.println("[" + extractedNum + ":" + getName() + "] " + iLevel + " " + String.format(iText, iArgs));
else
System.out.println("[" + getName() + "] " + iLevel + " " + String.format(iText, iArgs));
protected void log(final Level iLevel, String iText, final Object... iArgs) {
// if (logLevel.ordinal() >= iLevel.ordinal()) {
final Long extractedNum = context != null ? (Long) context.getVariable("extractedNum") : null;
if (extractedNum != null) {
// System.out.println("[" + extractedNum + ":" + getName() + "] " + iLevel + " " + String.format(iText, iArgs));

OLogManager.instance()
.log(this, iLevel, "[" + extractedNum + ":" + getName() + "] " + iLevel + " " + iText, null, iArgs);

} else {
// System.out.println("[" + getName() + "] " + iLevel + " " + String.format(iText, iArgs));

OLogManager.instance()
.log(this, iLevel, "[" + getName() + "] " + iLevel + " " + iText, null, iArgs);
}

// }
}

protected String stringArray2Json(final Object[] iObject) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* ETL Pipeline abstract component.
*/
public abstract class OAbstractETLPipelineComponent extends OAbstractETLComponent implements OETLPipelineComponent {
public abstract class OETLAbstractPipelineComponent extends OETLAbstractComponent implements OETLPipelineComponent {
protected OETLDatabaseProvider databaseProvider;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
package com.orientechnologies.orient.etl;

import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.etl.block.OBlock;
import com.orientechnologies.orient.etl.block.OCodeBlock;
import com.orientechnologies.orient.etl.block.OConsoleBlock;
import com.orientechnologies.orient.etl.block.OLetBlock;
import com.orientechnologies.orient.etl.block.OETLBlock;
import com.orientechnologies.orient.etl.block.OETLCodeBlock;
import com.orientechnologies.orient.etl.block.OETLConsoleBlock;
import com.orientechnologies.orient.etl.block.OETLLetBlock;
import com.orientechnologies.orient.etl.extractor.*;
import com.orientechnologies.orient.etl.loader.OLoader;
import com.orientechnologies.orient.etl.loader.OOrientDBLoader;
import com.orientechnologies.orient.etl.loader.OOutputLoader;
import com.orientechnologies.orient.etl.loader.OETLLoader;
import com.orientechnologies.orient.etl.loader.OETLOrientDBLoader;
import com.orientechnologies.orient.etl.loader.OETLOutputLoader;
import com.orientechnologies.orient.etl.source.*;
import com.orientechnologies.orient.etl.transformer.*;

Expand All @@ -39,45 +39,45 @@
* @author Luca Garulli (l.garulli--(at)--orientdb.com) (l.garulli-at-orientdb.com)
*/
public class OETLComponentFactory {
protected final Map<String, Class<? extends OSource>> sources = new HashMap<String, Class<? extends OSource>>();
protected final Map<String, Class<? extends OBlock>> blocks = new HashMap<String, Class<? extends OBlock>>();
protected final Map<String, Class<? extends OExtractor>> extractors = new HashMap<String, Class<? extends OExtractor>>();
protected final Map<String, Class<? extends OTransformer>> transformers = new HashMap<String, Class<? extends OTransformer>>();
protected final Map<String, Class<? extends OLoader>> loaders = new HashMap<String, Class<? extends OLoader>>();
protected final Map<String, Class<? extends OETLSource>> sources = new HashMap<String, Class<? extends OETLSource>>();
protected final Map<String, Class<? extends OETLBlock>> blocks = new HashMap<String, Class<? extends OETLBlock>>();
protected final Map<String, Class<? extends OETLExtractor>> extractors = new HashMap<String, Class<? extends OETLExtractor>>();
protected final Map<String, Class<? extends OETLTransformer>> transformers = new HashMap<String, Class<? extends OETLTransformer>>();
protected final Map<String, Class<? extends OETLLoader>> loaders = new HashMap<String, Class<? extends OETLLoader>>();

public OETLComponentFactory() {
registerSource(OFileSource.class);
registerSource(OHttpSource.class);
registerSource(OInputSource.class);
registerSource(OContentSource.class);

registerBlock(OCodeBlock.class);
registerBlock(OLetBlock.class);
registerBlock(OConsoleBlock.class);

registerExtractor(OJDBCExtractor.class);
registerExtractor(ORowExtractor.class);
registerExtractor(OJsonExtractor.class);
registerExtractor(OXmlExtractor.class);
registerExtractor(OCSVExtractor.class);

registerTransformer(OBlockTransformer.class);
registerTransformer(OCodeTransformer.class);
registerTransformer(OCommandTransformer.class);
registerTransformer(OEdgeTransformer.class);
registerTransformer(OFieldTransformer.class);
registerTransformer(OJSONTransformer.class);
registerTransformer(OLinkTransformer.class);
registerTransformer(OLogTransformer.class);
registerTransformer(OMergeTransformer.class);
registerTransformer(OFlowTransformer.class);
registerTransformer(OVertexTransformer.class);

registerLoader(OOrientDBLoader.class);
registerLoader(OOutputLoader.class);
registerSource(OETLFileSource.class);
registerSource(OETLHttpSource.class);
registerSource(OETLInputSource.class);
registerSource(OETLContentSource.class);

registerBlock(OETLCodeBlock.class);
registerBlock(OETLLetBlock.class);
registerBlock(OETLConsoleBlock.class);

registerExtractor(OETLJDBCExtractor.class);
registerExtractor(OETLRowExtractor.class);
registerExtractor(OETLJsonExtractor.class);
registerExtractor(OETLXmlExtractor.class);
registerExtractor(OETLCSVExtractor.class);

registerTransformer(OETLBlockTransformer.class);
registerTransformer(OETLCodeTransformer.class);
registerTransformer(OETLCommandTransformer.class);
registerTransformer(OETLEdgeTransformer.class);
registerTransformer(OETLFieldTransformer.class);
registerTransformer(OETLJSONTransformer.class);
registerTransformer(OETLLinkTransformer.class);
registerTransformer(OETLLogTransformer.class);
registerTransformer(OETLMergeTransformer.class);
registerTransformer(OETLFlowTransformer.class);
registerTransformer(OETLVertexTransformer.class);

registerLoader(OETLOrientDBLoader.class);
registerLoader(OETLOutputLoader.class);
}

public OETLComponentFactory registerSource(final Class<? extends OSource> iComponent) {
public OETLComponentFactory registerSource(final Class<? extends OETLSource> iComponent) {
try {
sources.put(iComponent.newInstance().getName(), iComponent);
} catch (Exception e) {
Expand All @@ -86,7 +86,7 @@ public OETLComponentFactory registerSource(final Class<? extends OSource> iCompo
return this;
}

public OETLComponentFactory registerBlock(final Class<? extends OBlock> iComponent) {
public OETLComponentFactory registerBlock(final Class<? extends OETLBlock> iComponent) {
try {
blocks.put(iComponent.newInstance().getName(), iComponent);
} catch (Exception e) {
Expand All @@ -95,7 +95,7 @@ public OETLComponentFactory registerBlock(final Class<? extends OBlock> iCompone
return this;
}

public OETLComponentFactory registerExtractor(final Class<? extends OExtractor> iComponent) {
public OETLComponentFactory registerExtractor(final Class<? extends OETLExtractor> iComponent) {
try {
extractors.put(iComponent.newInstance().getName(), iComponent);
} catch (Exception e) {
Expand All @@ -104,7 +104,7 @@ public OETLComponentFactory registerExtractor(final Class<? extends OExtractor>
return this;
}

public OETLComponentFactory registerTransformer(final Class<? extends OTransformer> iComponent) {
public OETLComponentFactory registerTransformer(final Class<? extends OETLTransformer> iComponent) {
try {
transformers.put(iComponent.newInstance().getName(), iComponent);
} catch (Exception e) {
Expand All @@ -113,7 +113,7 @@ public OETLComponentFactory registerTransformer(final Class<? extends OTransform
return this;
}

public OETLComponentFactory registerLoader(final Class<? extends OLoader> iComponent) {
public OETLComponentFactory registerLoader(final Class<? extends OETLLoader> iComponent) {
try {
loaders.put(iComponent.newInstance().getName(), iComponent);
} catch (Exception e) {
Expand All @@ -122,36 +122,36 @@ public OETLComponentFactory registerLoader(final Class<? extends OLoader> iCompo
return this;
}

public OExtractor getExtractor(final String iName) throws IllegalAccessException, InstantiationException {
final Class<? extends OExtractor> cls = extractors.get(iName);
public OETLExtractor getExtractor(final String iName) throws IllegalAccessException, InstantiationException {
final Class<? extends OETLExtractor> cls = extractors.get(iName);
if (cls == null)
throw new IllegalArgumentException("Extractor '" + iName + "' not found");
return cls.newInstance();
}

public OTransformer getTransformer(final String iName) throws IllegalAccessException, InstantiationException {
final Class<? extends OTransformer> cls = transformers.get(iName);
public OETLTransformer getTransformer(final String iName) throws IllegalAccessException, InstantiationException {
final Class<? extends OETLTransformer> cls = transformers.get(iName);
if (cls == null)
throw new IllegalArgumentException("Transformer '" + iName + "' not found");
return cls.newInstance();
}

public OBlock getBlock(final String iName) throws IllegalAccessException, InstantiationException {
final Class<? extends OBlock> cls = blocks.get(iName);
public OETLBlock getBlock(final String iName) throws IllegalAccessException, InstantiationException {
final Class<? extends OETLBlock> cls = blocks.get(iName);
if (cls == null)
throw new IllegalArgumentException("Block '" + iName + "' not found");
return cls.newInstance();
}

public OLoader getLoader(final String iName) throws IllegalAccessException, InstantiationException {
final Class<? extends OLoader> cls = loaders.get(iName);
public OETLLoader getLoader(final String iName) throws IllegalAccessException, InstantiationException {
final Class<? extends OETLLoader> cls = loaders.get(iName);
if (cls == null)
throw new IllegalArgumentException("Loader '" + iName + "' not found");
return cls.newInstance();
}

public OSource getSource(final String iName) throws IllegalAccessException, InstantiationException {
final Class<? extends OSource> cls = sources.get(iName);
public OETLSource getSource(final String iName) throws IllegalAccessException, InstantiationException {
final Class<? extends OETLSource> cls = sources.get(iName);
if (cls == null)
throw new IllegalArgumentException("Source '" + iName + "' not found");
return cls.newInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@
/**
* Immutable Object representing extracted item.
*/
public class OExtractedItem {
public class OETLExtractedItem {
public final long num;
public final Object payload;
public final boolean finished;

public OExtractedItem(final long iCurrent, final Object iPayload) {
public OETLExtractedItem(final long iCurrent, final Object iPayload) {
num = iCurrent;
payload = iPayload;
finished = false;
}

public OExtractedItem(boolean iFinished) {
public OETLExtractedItem(boolean iFinished) {
num = 0;
payload = null;
finished = iFinished;
Expand Down
Loading

0 comments on commit 5e69391

Please sign in to comment.