Skip to content

Commit

Permalink
Merge pull request apache#5 from foryou2030/onePass_lionx
Browse files Browse the repository at this point in the history
One pass lionx
  • Loading branch information
lion-x authored Nov 22, 2016
2 parents 9c4ced1 + e5e657f commit b6b2ea9
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) thr
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
DictionaryKey key = (DictionaryKey) e.getMessage();
System.out.println(key.getData());
dictKeyQueue.offer(key);
super.messageReceived(ctx, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class IncrementalColumnDictionaryGenerator

private CarbonDimension dimension;

private CarbonDictionaryWriter dictionaryWriter = null;

public IncrementalColumnDictionaryGenerator(CarbonDimension dimension, int maxValue) {
this.maxDictionary = maxValue;
this.dimension = dimension;
Expand Down Expand Up @@ -100,6 +102,7 @@ public IncrementalColumnDictionaryGenerator(CarbonDimension dimension, int maxVa
if (dict == null) {
dict = ++maxDictionary;
incrementalCache.put(value, dict);
reverseIncrementalCache.put(dict, value);
}
return dict;
}
Expand Down Expand Up @@ -137,6 +140,8 @@ public IncrementalColumnDictionaryGenerator(CarbonDimension dimension, int maxVa
// write sort index
writeSortIndex(distinctValues, dictionary,
dictionaryService, tableIdentifier, columnIdentifier, storePath);
// update Meta Data
updateMetaData();
}

/**
Expand Down Expand Up @@ -175,7 +180,7 @@ private List<String> writeDictionary(Dictionary dictionary,
String storePath,
CarbonTablePath carbonTablePath) throws IOException {
List<String> distinctValues = new ArrayList<>();
CarbonDictionaryWriter dictionaryWriter = null;
// CarbonDictionaryWriter dictionaryWriter = null;
try {
dictionaryWriter = dictionaryService
.getDictionaryWriter(dictIdentifier.getCarbonTableIdentifier(),
Expand All @@ -184,24 +189,27 @@ private List<String> writeDictionary(Dictionary dictionary,
dictionaryWriter.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL);
distinctValues.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL);
}
if (incrementalCache.size() > 0) {
// TODO: map need sort first
for (String key : incrementalCache.keySet()) {
reverseIncrementalCache.put(incrementalCache.get(key), key);
}
for (int i = 2; i < reverseIncrementalCache.size() + 2; i++) {
String parsedValue = DataTypeUtil
.normalizeColumnValueForItsDataType(reverseIncrementalCache.get(i), dimension);
if (null != parsedValue) {
if (!isDictionaryExists(dictIdentifier, carbonTablePath)) {
// write value to dictionary file
if (reverseIncrementalCache.size() >= 1) {
if (isDictionaryExists(dictIdentifier, carbonTablePath)) {
for (int i = 2; i < reverseIncrementalCache.size() + 2; i++) {
String value = reverseIncrementalCache.get(i);
String parsedValue = DataTypeUtil
.normalizeColumnValueForItsDataType(value, dimension);
if (null != parsedValue && dictionary.getSurrogateKey(parsedValue) ==
CarbonCommonConstants.INVALID_SURROGATE_KEY) {
dictionaryWriter.write(parsedValue);
distinctValues.add(parsedValue);
}
}
} else {
for (int i = 2; i < reverseIncrementalCache.size() + 2; i++) {
String value = reverseIncrementalCache.get(i);
String parsedValue = DataTypeUtil
.normalizeColumnValueForItsDataType(value, dimension);
if (null != parsedValue) {
dictionaryWriter.write(parsedValue);
distinctValues.add(parsedValue);
} else {
if (dictionary.getSurrogateKey(parsedValue) ==
CarbonCommonConstants.INVALID_SURROGATE_KEY) {
dictionaryWriter.write(parsedValue);
distinctValues.add(parsedValue);
}
}
}
}
Expand Down Expand Up @@ -256,4 +264,13 @@ private void writeSortIndex(List<String> distinctValues,
}
}
}

/**
* update dictionary metadata
*/
private void updateMetaData() throws IOException{
if (null != dictionaryWriter) {
dictionaryWriter.commit();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,22 @@ object CarbonExample {
// in each node to do data loading, another uses a multi-thread framework without Kettle (See
// AbstractDataLoadProcessorStep)
// Load data with Kettle
// cc.sql(s"""
// LOAD DATA LOCAL INPATH '$testData' into table t3
// """)
//
// // Perform a query
// cc.sql("""
// SELECT country, count(salary) AS amount
// FROM t3
// WHERE country IN ('china','france')
// GROUP BY country
// """).show()
cc.sql(s"""
LOAD DATA LOCAL INPATH '$testData' into table t3
""")

// Perform a query
cc.sql("""
SELECT country, count(salary) AS amount
FROM t3
WHERE country IN ('china','france')
GROUP BY country
""").show()

// Load data without kettle
cc.sql(s"""
LOAD DATA LOCAL INPATH '$testData' into table t3
OPTIONS('USE_KETTLE'='false', 'USEONEPASS'='true')
OPTIONS('USE_KETTLE'='false')
""")

// Perform a query
Expand Down

0 comments on commit b6b2ea9

Please sign in to comment.