Skip to content

Commit

Permalink
[CARBONDATA-842]implement CarbonRowDataWriterProcessorStepImpl for no…
Browse files Browse the repository at this point in the history
… SORT_COLUMN This closes #741
  • Loading branch information
jackylk committed Apr 7, 2017
2 parents 5783718 + 9c9af81 commit 12bf455
Show file tree
Hide file tree
Showing 8 changed files with 361 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public static boolean updateLoadMetadataIUDUpdateDeltaMergeStatus(
boolean updateLockStatus = false;
boolean tableLockStatus = false;

String timestamp = carbonLoadModel.getFactTimeStamp();
String timestamp = "" + carbonLoadModel.getFactTimeStamp();

List<String> updatedDeltaFilesList =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class CarbonLoadModel implements Serializable {
/**
* new load start time
*/
private String factTimeStamp;
private long factTimeStamp;
/**
* load Id
*/
Expand Down Expand Up @@ -590,15 +590,15 @@ public void setTaskNo(String taskNo) {
/**
* @return
*/
public String getFactTimeStamp() {
public long getFactTimeStamp() {
return factTimeStamp;
}

/**
* @param factTimeStamp
*/
public void setFactTimeStamp(long factTimeStamp) {
this.factTimeStamp = factTimeStamp + "";
this.factTimeStamp = factTimeStamp;
}

public String[] getDelimiters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.processing.model.CarbonLoadModel;
import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.newflow.steps.CarbonRowDataWriterProcessorStepImpl;
import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl;
import org.apache.carbondata.processing.newflow.steps.DataWriterBatchProcessorStepImpl;
import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
import org.apache.carbondata.processing.newflow.steps.NoSortProcessorStepImpl;
import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;

Expand All @@ -59,7 +59,9 @@ public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String sto
CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
CarbonDataLoadConfiguration configuration =
createConfiguration(loadModel, storeLocation);
if (configuration.getBucketingInfo() != null) {
if (!configuration.isSortTable()) {
return buildInternalForNoSort(inputIterators, configuration);
} else if (configuration.getBucketingInfo() != null) {
return buildInternalForBucketing(inputIterators, configuration);
} else if (batchSort) {
return buildInternalForBatchSort(inputIterators, configuration);
Expand All @@ -77,16 +79,30 @@ private AbstractDataLoadProcessorStep buildInternal(CarbonIterator[] inputIterat
// data types and configurations.
AbstractDataLoadProcessorStep converterProcessorStep =
new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
// 3. Sorts the data by SortColumn or not
AbstractDataLoadProcessorStep sortProcessorStep = configuration.isSortTable() ?
new SortProcessorStepImpl(configuration, converterProcessorStep) :
new NoSortProcessorStepImpl(configuration, converterProcessorStep);
// 3. Sorts the data by SortColumn
AbstractDataLoadProcessorStep sortProcessorStep =
new SortProcessorStepImpl(configuration, converterProcessorStep);
// 4. Writes the sorted data in carbondata format.
AbstractDataLoadProcessorStep writerProcessorStep =
new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
return writerProcessorStep;
}

private AbstractDataLoadProcessorStep buildInternalForNoSort(CarbonIterator[] inputIterators,
CarbonDataLoadConfiguration configuration) {
// 1. Reads the data input iterators and parses the data.
AbstractDataLoadProcessorStep inputProcessorStep =
new InputProcessorStepImpl(configuration, inputIterators);
// 2. Converts the data like dictionary or non dictionary or complex objects depends on
// data types and configurations.
AbstractDataLoadProcessorStep converterProcessorStep =
new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
// 3. Writes the sorted data in carbondata format.
AbstractDataLoadProcessorStep writerProcessorStep =
new CarbonRowDataWriterProcessorStepImpl(configuration, converterProcessorStep);
return writerProcessorStep;
}

private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[] inputIterators,
CarbonDataLoadConfiguration configuration) {
// 1. Reads the data input iterators and parses the data.
Expand All @@ -97,9 +113,8 @@ private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[]
AbstractDataLoadProcessorStep converterProcessorStep =
new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
// 3. Sorts the data by SortColumn or not
AbstractDataLoadProcessorStep sortProcessorStep = configuration.isSortTable() ?
new SortProcessorStepImpl(configuration, converterProcessorStep) :
new NoSortProcessorStepImpl(configuration, converterProcessorStep);
AbstractDataLoadProcessorStep sortProcessorStep =
new SortProcessorStepImpl(configuration, converterProcessorStep);
// 4. Writes the sorted data in carbondata format.
AbstractDataLoadProcessorStep writerProcessorStep =
new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep);
Expand All @@ -116,9 +131,8 @@ private AbstractDataLoadProcessorStep buildInternalForBucketing(CarbonIterator[]
AbstractDataLoadProcessorStep converterProcessorStep =
new DataConverterProcessorWithBucketingStepImpl(configuration, inputProcessorStep);
// 3. Sorts the data by SortColumn or not
AbstractDataLoadProcessorStep sortProcessorStep = configuration.isSortTable() ?
new SortProcessorStepImpl(configuration, converterProcessorStep) :
new NoSortProcessorStepImpl(configuration, converterProcessorStep);
AbstractDataLoadProcessorStep sortProcessorStep =
new SortProcessorStepImpl(configuration, converterProcessorStep);
// 4. Writes the sorted data in carbondata format.
AbstractDataLoadProcessorStep writerProcessorStep =
new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
Expand Down
Loading

0 comments on commit 12bf455

Please sign in to comment.