rebased with master
anubhav100 committed May 28, 2017
1 parent ad3d24e commit a8138ba
Showing 3 changed files with 76 additions and 51 deletions.
@@ -16,7 +16,6 @@
*/
package org.apache.carbondata.hive;


import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
@@ -41,7 +40,11 @@
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -61,7 +64,7 @@ public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
private CarbonObjectInspector objInspector;

public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
InputSplit inputSplit, JobConf jobConf) throws IOException {
InputSplit inputSplit, JobConf jobConf) throws IOException {
super(queryModel, readSupport);
initialize(inputSplit, jobConf);
}
@@ -78,16 +81,16 @@ public void initialize(InputSplit inputSplit, Configuration conf) throws IOException {
}
List<TableBlockInfo> tableBlockInfoList = CarbonHiveInputSplit.createBlocks(splitList);
queryModel.setTableBlockInfos(tableBlockInfoList);
readSupport.initialize(queryModel.getProjectionColumns(),
queryModel.getAbsoluteTableIdentifier());
readSupport
.initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
try {
carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
} catch (QueryExecutionException e) {
throw new IOException(e.getMessage(), e.getCause());
}
if (valueObj == null) {
valueObj = new ArrayWritable(Writable.class,
new Writable[queryModel.getProjectionColumns().length]);
valueObj =
new ArrayWritable(Writable.class, new Writable[queryModel.getProjectionColumns().length]);
}

final TypeInfo rowTypeInfo;
@@ -108,20 +111,23 @@ public void initialize(InputSplit inputSplit, Configuration conf) throws IOException {
} else {
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
}
if (!colIds.equals("")) {
String[] arraySelectedColId = colIds.split(",");
List<TypeInfo> reqColTypes = new ArrayList<TypeInfo>();

String[] arraySelectedColId = colIds.split(",");
List<TypeInfo> reqColTypes = new ArrayList<TypeInfo>();

for (String anArrayColId : arraySelectedColId) {
reqColTypes.add(columnTypes.get(Integer.parseInt(anArrayColId)));
for (String anArrayColId : arraySelectedColId) {
reqColTypes.add(columnTypes.get(Integer.parseInt(anArrayColId)));
}
// Create row related objects
rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, reqColTypes);
this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
} else {
rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
}
// Create row related objects
rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, reqColTypes);
this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
}

@Override
public boolean next(Void aVoid, ArrayWritable value) throws IOException {
@Override public boolean next(Void aVoid, ArrayWritable value) throws IOException {
if (carbonIterator.hasNext()) {
Object obj = readSupport.readRow(carbonIterator.next());
ArrayWritable tmpValue = null;
@@ -138,11 +144,12 @@ public boolean next(Void aVoid, ArrayWritable value) throws IOException {
System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length);
} else {
if (arrValue.length != arrCurrent.length) {
throw new IOException("CarbonHiveInput : size of object differs. Value" +
" size : " + arrValue.length + ", Current Object size : " + arrCurrent.length);
throw new IOException(
"CarbonHiveInput : size of object differs. Value" + " size : " + arrValue.length
+ ", Current Object size : " + arrCurrent.length);
} else {
throw new IOException("CarbonHiveInput can not support RecordReaders that" +
" don't return same key & value & value is null");
throw new IOException("CarbonHiveInput can not support RecordReaders that"
+ " don't return same key & value & value is null");
}
}
}
@@ -156,23 +163,19 @@ public ArrayWritable createArrayWritable(Object obj) throws SerDeException {
return createStruct(obj, objInspector);
}

@Override
public Void createKey() {
@Override public Void createKey() {
return null;
}

@Override
public ArrayWritable createValue() {
@Override public ArrayWritable createValue() {
return valueObj;
}

@Override
public long getPos() throws IOException {
@Override public long getPos() throws IOException {
return 0;
}

@Override
public float getProgress() throws IOException {
@Override public float getProgress() throws IOException {
return 0;
}

@@ -190,7 +193,7 @@ public ArrayWritable createStruct(Object obj, StructObjectInspector inspector)
}

private ArrayWritable createArray(Object obj, ListObjectInspector inspector)
throws SerDeException {
throws SerDeException {
List sourceArray = inspector.getList(obj);
ObjectInspector subInspector = inspector.getListElementObjectInspector();
List array = new ArrayList();
@@ -208,7 +211,7 @@ private ArrayWritable createArray(Object obj, ListObjectInspector inspector)
ArrayWritable subArray = new ArrayWritable(((Writable) array.get(0)).getClass(),
(Writable[]) array.toArray(new Writable[array.size()]));

return new ArrayWritable(Writable.class, new Writable[]{subArray});
return new ArrayWritable(Writable.class, new Writable[] { subArray });
}
return null;
}
@@ -236,8 +239,8 @@ private Writable createPrimitive(Object obj, PrimitiveObjectInspector inspector)
case STRING:
return new Text(obj.toString());
case DECIMAL:
return new HiveDecimalWritable(HiveDecimal.create(
((org.apache.spark.sql.types.Decimal) obj).toJavaBigDecimal()));
return new HiveDecimalWritable(
HiveDecimal.create(((org.apache.spark.sql.types.Decimal) obj).toJavaBigDecimal()));
}
throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory());
}
@@ -253,4 +256,4 @@ private Writable createObject(Object obj, ObjectInspector inspector) throws SerDeException {
}
throw new SerDeException("Unknown data type" + inspector.getCategory());
}
}
}
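
The record reader above now tolerates an empty projection: the new branch in initialize builds the object inspector from every column type when the comma-separated column-id string is empty, instead of unconditionally splitting it. A minimal standalone sketch of that selection rule follows; the class and method names are illustrative and not part of the commit, and it assumes columnNames already holds the names matching the requested columns.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

final class RowTypeInfoSketch {

  // Builds the row StructTypeInfo that the reader wraps in a CarbonObjectInspector.
  // colIds is a comma-separated list of column indexes (e.g. "0,2"); when it is
  // empty or absent, every column type is kept, which is the case this commit adds.
  static StructTypeInfo buildRowTypeInfo(List<String> columnNames,
      List<TypeInfo> columnTypes, String colIds) {
    if (colIds == null || colIds.equals("")) {
      return (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    }
    List<TypeInfo> reqColTypes = new ArrayList<TypeInfo>();
    for (String colId : colIds.split(",")) {
      reqColTypes.add(columnTypes.get(Integer.parseInt(colId)));
    }
    return (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, reqColTypes);
  }
}

The next diff below applies the matching guard to the Hive SerDe's initialize method.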
@@ -102,7 +102,7 @@ public void initialize(@Nullable Configuration configuration, Properties tbl)
} else {
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
}
if (colIds != null) {
if (colIds != null && !colIds.equals("")) {
reqColNames = new ArrayList<String>();

String[] arraySelectedColId = colIds.split(",");
@@ -114,7 +114,8 @@ public void initialize(@Nullable Configuration configuration, Properties tbl)
// Create row related objects
rowTypeInfo = TypeInfoFactory.getStructTypeInfo(reqColNames, reqColTypes);
this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
} else {
}
else {
// Create row related objects
rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
@@ -135,7 +136,7 @@ public Class<? extends Writable> getSerializedClass() {
public Writable serialize(Object obj, ObjectInspector objectInspector) throws SerDeException {
if (!objInspector.getCategory().equals(ObjectInspector.Category.STRUCT)) {
throw new SerDeException("Cannot serialize " + objInspector.getCategory()
+ ". Can only serialize a struct");
+ ". Can only serialize a struct");
}
serializedSize += ((StructObjectInspector) objInspector).getAllStructFieldRefs().size();
status = LAST_OPERATION.SERIALIZE;
@@ -156,7 +157,7 @@ public ArrayWritable createStruct(Object obj, StructObjectInspector inspector)
}

private ArrayWritable createArray(Object obj, ListObjectInspector inspector)
throws SerDeException {
throws SerDeException {
List sourceArray = inspector.getList(obj);
ObjectInspector subInspector = inspector.getListElementObjectInspector();
List array = new ArrayList();
@@ -180,7 +181,7 @@ private ArrayWritable createArray(Object obj, ListObjectInspector inspector)
}

private Writable createPrimitive(Object obj, PrimitiveObjectInspector inspector)
throws SerDeException {
throws SerDeException {
if (obj == null) {
return null;
}
@@ -246,4 +247,4 @@ public Object deserialize(Writable writable) throws SerDeException {
public ObjectInspector getObjectInspector() throws SerDeException {
return objInspector;
}
}
}
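
On the SerDe side the change is just the tightened condition above: a projection is applied only when the column-id property is both present and non-empty; otherwise the inspector is built over all table columns. Expressed as a tiny hypothetical helper (the class and method names are not in the commit):

final class ProjectionGuardSketch {
  // Mirrors the new condition colIds != null && !colIds.equals(""):
  // an empty column-id list is treated the same as an absent one.
  static boolean hasExplicitProjection(String colIds) {
    return colIds != null && !colIds.equals("");
  }
}

The third file below extends MapredCarbonInputFormat so that an empty projection is expanded to all table columns before the query model is created.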
@@ -21,6 +21,8 @@

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
@@ -41,28 +43,26 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;


public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
implements InputFormat<Void, ArrayWritable>, CombineHiveInputFormat.AvoidSplitCombination {

@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
@Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
List<org.apache.hadoop.mapreduce.InputSplit> splitList = super.getSplits(jobContext);
InputSplit[] splits = new InputSplit[splitList.size()];
CarbonInputSplit split = null;
for (int i = 0; i < splitList.size(); i++) {
split = (CarbonInputSplit) splitList.get(i);
splits[i] = new CarbonHiveInputSplit(split.getSegmentId(), split.getPath(),
split.getStart(), split.getLength(), split.getLocations(),
split.getNumberOfBlocklets(), split.getVersion(), split.getBlockStorageIdMap());
splits[i] = new CarbonHiveInputSplit(split.getSegmentId(), split.getPath(), split.getStart(),
split.getLength(), split.getLocations(), split.getNumberOfBlocklets(), split.getVersion(),
split.getBlockStorageIdMap());
}
return splits;
}

@Override
public RecordReader<Void, ArrayWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf,
Reporter reporter) throws IOException {
Reporter reporter) throws IOException {
QueryModel queryModel = getQueryModel(jobConf);
CarbonReadSupport<ArrayWritable> readSupport = getReadSupportClass(jobConf);
return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, jobConf);
@@ -72,17 +72,39 @@ public QueryModel getQueryModel(Configuration configuration) throws IOException
CarbonTable carbonTable = getCarbonTable(configuration);
// getting the table absoluteTableIdentifier from the carbonTable
// to avoid unnecessary deserialization

String colNames = "";
AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();

// query plan includes projection column

String projection = getColumnProjection(configuration);
if (projection == null) {
projection = configuration.get("hive.io.file.readcolumn.names");
}
if (projection.equals("")) {
List<CarbonDimension> carbonDimensionList = carbonTable.getAllDimensions();
List<CarbonMeasure> carbonMeasureList = carbonTable.getAllMeasures();

for (CarbonDimension aCarbonDimensionList : carbonDimensionList) {
colNames = (colNames + (aCarbonDimensionList.getColName())) + ",";
}
if (carbonMeasureList.size() < 1) {
colNames = colNames.substring(0, colNames.lastIndexOf(","));
}
for (int index = 0; index < carbonMeasureList.size(); index++) {
if (!carbonMeasureList.get(index).getColName().equals("default_dummy_measure")) {
if (index == carbonMeasureList.size() - 1) {
colNames = (colNames + (carbonMeasureList.get(index).getColName()));
} else {
colNames = (colNames + (carbonMeasureList.get(index).getColName())) + ",";
}
}
}
projection = colNames.trim();
configuration.set("hive.io.file.readcolumn.names", colNames);
}
CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);

// set the filter to the query model in order to filter blocklet before scan
Expression filter = getFilterPredicates(configuration);
CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
@@ -92,8 +114,7 @@ public QueryModel getQueryModel(Configuration configuration) throws IOException
return queryModel;
}

@Override
public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
@Override public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
return true;
}
}
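
The main functional change in MapredCarbonInputFormat is the fallback in getQueryModel: when hive.io.file.readcolumn.names yields an empty projection, the column list is rebuilt from every dimension and measure of the CarbonTable, the internal default_dummy_measure column is skipped, and the result is written back into the configuration. Below is a condensed, dependency-free sketch of that rule; the class and method names are illustrative, and the committed code concatenates the string with explicit loops over CarbonDimension and CarbonMeasure objects rather than plain name lists.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DefaultProjectionSketch {

  // Joins dimension and measure column names into the comma-separated projection
  // string expected in hive.io.file.readcolumn.names, dropping CarbonData's
  // internal "default_dummy_measure" column.
  static String buildDefaultProjection(List<String> dimensionNames, List<String> measureNames) {
    List<String> cols = new ArrayList<String>(dimensionNames);
    for (String measure : measureNames) {
      if (!"default_dummy_measure".equals(measure)) {
        cols.add(measure);
      }
    }
    return String.join(",", cols);
  }

  public static void main(String[] args) {
    List<String> dims = Arrays.asList("id", "name");
    List<String> msrs = Arrays.asList("salary", "default_dummy_measure");
    System.out.println(buildDefaultProjection(dims, msrs)); // prints: id,name,salary
  }
}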
