[CARBONDATA-882] Add SORT_COLUMNS support in dataframe writer #737

Merged: 1 commit, Apr 18, 2017
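This change adds a `sort_columns` option to the CarbonData DataFrame writer, so a caller can pick the sort columns (or disable sorting with an empty string) when saving a DataFrame as a CarbonData table. A minimal usage sketch, adapted from the tests added in this PR; the table and column names come from those tests and are illustrative only:

```scala
import org.apache.spark.sql.SaveMode

// Sort the loaded data by c2 and c3; pass "" to skip sorting entirely.
df.write
  .format("carbondata")
  .option("tableName", "carbon3")
  .option("sort_columns", "c2, c3")
  .mode(SaveMode.Overwrite)
  .save()
```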
@@ -40,7 +40,6 @@
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil;

/**
@@ -139,22 +138,6 @@ public SegmentTaskIndexWrapper get(TableSegmentUniqueIdentifier tableSegmentUniq
lruCache.remove(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier());
}

/**
* returns block timestamp value from the given task
* @param taskKey
* @param listOfUpdatedFactFiles
* @return
*/
private String getTimeStampValueFromBlock(String taskKey, List<String> listOfUpdatedFactFiles) {
for (String blockName : listOfUpdatedFactFiles) {
if (taskKey.equals(CarbonTablePath.DataFileUtil.getTaskNo(blockName))) {
blockName = blockName.substring(blockName.lastIndexOf('-') + 1, blockName.lastIndexOf('.'));
return blockName;
}
}
return null;
}

/**
* Below method will be used to load the segment of segments
* One segment may have multiple task , so table segment will be loaded

Large diffs are not rendered by default.

@@ -562,6 +562,7 @@ public static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata
}
}


/**
* This API will return the update status file name.
* @param segmentList
@@ -582,22 +583,6 @@ public String getUpdateStatusFileName(LoadMetadataDetails[] segmentList) {
return "";
}

/**
* getting the task numbers present in the segment.
* @param segmentId
* @return
*/
public List<String> getUpdatedTasksDetailsForSegment(String segmentId, SegmentUpdateStatusManager
updateStatusManager) {
List<String> taskList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<String> list = updateStatusManager.getUpdateDeltaFiles(segmentId);
for (String eachFileName : list) {
taskList.add(CarbonTablePath.DataFileUtil.getTaskNo(eachFileName));
}
return taskList;
}


public static class ValidAndInvalidSegmentsInfo {
private final List<String> listOfValidSegments;
private final List<String> listOfValidUpdatedSegments;
@@ -266,6 +266,7 @@ object CompareTest {
.option("tempCSV", "false")
.option("single_pass", "true")
.option("dictionary_exclude", "id") // id is high cardinality column
.option("sort_columns", "")
.mode(SaveMode.Overwrite)
.save()
}
@@ -278,6 +279,7 @@
val loadParquetTime = loadParquetTable(spark, df)
val loadCarbonV3Time = loadCarbonTable(spark, df, version = "3")
println(s"load completed, time: $loadParquetTime, $loadCarbonV3Time")
df.unpersist()
spark.read.parquet(parquetTableName).registerTempTable(parquetTableName)
}

@@ -231,14 +231,14 @@ public String getBucketId() {
}

// Comparing the time task id of the file to other
// if both the task id of the file is same then we need to compare the
// offset of
// the file
// if both the task id of the file is same then we need to compare the offset of the file
String filePath1 = this.getPath().getName();
String filePath2 = other.getPath().getName();
if (CarbonTablePath.isCarbonDataFile(filePath1)) {
int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1));
int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2));
int firstTaskId =
Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1).split("_")[0]);
int otherTaskId =
Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2).split("_")[0]);
if (firstTaskId != otherTaskId) {
return firstTaskId - otherTaskId;
}
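The comparator change above now parses only the leading numeric part of the task number (`split("_")[0]`), presumably because the task number embedded in a carbondata file name can carry a suffix after an underscore. A small illustrative sketch; the sample values are hypothetical and not taken from the PR:

```scala
// Compare two files by the numeric part of their task numbers,
// mirroring the split("_")[0] logic introduced above.
val taskNo1 = "3_0"    // hypothetical task-number strings
val taskNo2 = "12_1"
val cmp = taskNo1.split("_")(0).toInt - taskNo2.split("_")(0).toInt  // 3 - 12 = -9
```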
@@ -111,6 +111,34 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
)
}

test("test load dataframe without sort") {
df.write
.format("carbondata")
.option("tableName", "carbon3")
.option("sort_columns", "")
.mode(SaveMode.Overwrite)
.save()
sql("select count(*) from carbon3 where c3 > 400").show
df.registerTempTable("temp")
sql("select count(*) from temp where c3 > 400").show
//sql("select * from carbon3 where c3 > 500").show
checkAnswer(
sql("select count(*) from carbon3 where c3 > 500"), Row(500)
)
}

test("test load dataframe using sort_columns") {
df.write
.format("carbondata")
.option("tableName", "carbon3")
.option("sort_columns", "c2, c3")
.mode(SaveMode.Overwrite)
.save()
checkAnswer(
sql("select count(*) from carbon3 where c3 > 500"), Row(500)
)
}

test("test decimal values for dataframe load"){
dataFrame.write
.format("carbondata")
@@ -57,5 +57,7 @@ class CarbonOption(options: Map[String, String]) {
def isBucketingEnabled: Boolean = options.contains("bucketcolumns") &&
options.contains("bucketnumber")

def sortColumns: Option[String] = options.get("sort_columns")

def toMap: Map[String, String] = options
}
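The new `sortColumns` accessor simply reads the raw writer option from the option map. A quick behavioural sketch with illustrative option values:

```scala
// CarbonOption wraps the writer's options; sort_columns is optional.
val withSort = new CarbonOption(Map("tableName" -> "carbon3", "sort_columns" -> "c2, c3"))
withSort.sortColumns  // Some("c2, c3")

val noSort = new CarbonOption(Map("tableName" -> "carbon3"))
noSort.sortColumns    // None
```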
@@ -17,6 +17,8 @@

package org.apache.spark.sql

import scala.collection.mutable

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.execution.command.LoadTable
Expand All @@ -33,8 +35,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {

def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
// create a new table using dataframe's schema and write its content into the table
sqlContext.sparkSession.sql(
makeCreateTableString(dataFrame.schema, new CarbonOption(parameters)))
val sqlString = makeCreateTableString(dataFrame.schema, new CarbonOption(parameters))
sqlContext.sparkSession.sql(sqlString)
writeToCarbonFile(parameters)
}

@@ -84,7 +86,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB")

try {
sqlContext.sql(makeLoadString(tempCSVFolder, options))
val sqlString = makeLoadString(tempCSVFolder, options)
sqlContext.sql(sqlString)
} finally {
fs.delete(tempCSVPath, true)
}
@@ -164,7 +167,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
}
val property = Map(
"DICTIONARY_INCLUDE" -> options.dictionaryInclude,
"DICTIONARY_EXCLUDE" -> options.dictionaryExclude
"DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
"SORT_COLUMNS" -> options.sortColumns
).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
s"""
| CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
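In `makeCreateTableString`, `SORT_COLUMNS` joins the property map and survives the `isDefined` filter only when the caller actually set the option, so the generated CREATE TABLE statement carries it as a TBLPROPERTIES entry. A sketch of that filtering step with only `sort_columns` supplied (values illustrative):

```scala
// Mirrors the property-map filtering above; unset options are dropped.
val property = Map(
  "DICTIONARY_INCLUDE" -> None,
  "DICTIONARY_EXCLUDE" -> None,
  "SORT_COLUMNS"       -> Some("c2, c3")
).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
// property == "'SORT_COLUMNS' = 'c2, c3'"
```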
@@ -853,12 +853,12 @@ public static String getValidSegments(List<LoadMetadataDetails> loadMetadataDeta
* @return
*/
public static List<String> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier)
throws IOException {
throws IOException {

SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null;
try {
validAndInvalidSegments =
new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
} catch (IOException e) {
LOGGER.error("Error while getting valid segment list for a table identifier");
throw new IOException();
@@ -913,9 +913,9 @@ private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnIUD(

private static boolean isSegmentValid(LoadMetadataDetails seg) {
return seg.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
|| seg.getLoadStatus()
.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg
.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE);
|| seg.getLoadStatus()
.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg
.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE);
}

/**
@@ -1200,16 +1200,16 @@ public static Boolean startCompactionDeleteDeltaFiles(List<String> deleteDeltaFi
CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
try {
deleteDeltaBlockDetails =
dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName);
dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName);
} catch (Exception e) {
String blockFilePath = fullBlockFilePath
.substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
.substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
LOGGER.error("Error while getting the delete delta blocks in path " + blockFilePath);
throw new IOException();
}
CarbonDeleteDeltaWriterImpl carbonDeleteWriter =
new CarbonDeleteDeltaWriterImpl(fullBlockFilePath,
FileFactory.getFileType(fullBlockFilePath));
new CarbonDeleteDeltaWriterImpl(fullBlockFilePath,
FileFactory.getFileType(fullBlockFilePath));
try {
carbonDeleteWriter.write(deleteDeltaBlockDetails);
} catch (IOException e) {
@@ -1220,11 +1220,11 @@ public static Boolean updateStatusFile(
}

public static Boolean updateStatusFile(
List<CarbonDataMergerUtilResult> updateDataMergerDetailsList, CarbonTable table,
String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) {
List<CarbonDataMergerUtilResult> updateDataMergerDetailsList, CarbonTable table,
String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) {

List<SegmentUpdateDetails> segmentUpdateDetails =
new ArrayList<SegmentUpdateDetails>(updateDataMergerDetailsList.size());
new ArrayList<SegmentUpdateDetails>(updateDataMergerDetailsList.size());


// Check the list output.
@@ -1235,10 +1235,10 @@ public static Boolean updateStatusFile(
tempSegmentUpdateDetails.setBlockName(carbonDataMergerUtilResult.getBlockName());

for (SegmentUpdateDetails origDetails : segmentUpdateStatusManager
.getUpdateStatusDetails()) {
.getUpdateStatusDetails()) {
if (origDetails.getBlockName().equalsIgnoreCase(carbonDataMergerUtilResult.getBlockName())
&& origDetails.getSegmentName()
.equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) {
&& origDetails.getSegmentName()
.equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) {

tempSegmentUpdateDetails.setDeletedRowsInBlock(origDetails.getDeletedRowsInBlock());
tempSegmentUpdateDetails.setStatus(origDetails.getStatus());
@@ -1247,9 +1247,9 @@ public static Boolean updateStatusFile(
}

tempSegmentUpdateDetails.setDeleteDeltaStartTimestamp(
carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp());
carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp());
tempSegmentUpdateDetails
.setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp());
.setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp());

segmentUpdateDetails.add(tempSegmentUpdateDetails);
} else return false;
@@ -1262,8 +1262,8 @@ public static Boolean updateStatusFile(
AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();

CarbonTablePath carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());
.getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());

String tableStatusPath = carbonTablePath.getTableStatusFilePath();

@@ -1277,38 +1277,38 @@ public static Boolean updateStatusFile(
lockStatus = carbonLock.lockWithRetries();
if (lockStatus) {
LOGGER.info(
"Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+ " for table status updation");
"Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+ " for table status updation");

LoadMetadataDetails[] listOfLoadFolderDetailsArray =
segmentStatusManager.readLoadMetadata(metaDataFilepath);
segmentStatusManager.readLoadMetadata(metaDataFilepath);

for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
loadMetadata.setUpdateStatusFileName(
CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
}
}
try {
segmentStatusManager
.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
} catch (IOException e) {
return false;
}
} else {
LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
.getDatabaseName() + "." + table.getFactTableName());
.getDatabaseName() + "." + table.getFactTableName());
}
} finally {
if (lockStatus) {
if (carbonLock.unlock()) {
LOGGER.info(
"Table unlocked successfully after table status updation" + table.getDatabaseName()
+ "." + table.getFactTableName());
"Table unlocked successfully after table status updation" + table.getDatabaseName()
+ "." + table.getFactTableName());
} else {
LOGGER.error(
"Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
.getFactTableName() + " during table status updation");
"Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
.getFactTableName() + " during table status updation");
}
}
}
@@ -1326,7 +1326,7 @@ public static void updateMajorCompactionPropertyInSegment(CarbonLoadModel model,

String metadataPath = model.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
AbsoluteTableIdentifier absoluteTableIdentifier =
model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metadataPath);
List<LoadMetadataDetails> originalList = Arrays.asList(details);
@@ -1340,24 +1340,24 @@ public static void updateMajorCompactionPropertyInSegment(CarbonLoadModel model,


ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(),
LockUsage.TABLE_STATUS_LOCK);
model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(),
LockUsage.TABLE_STATUS_LOCK);

try {
if (carbonTableStatusLock.lockWithRetries()) {
LOGGER.info(
"Acquired lock for the table " + model.getDatabaseName() + "." + model.getTableName()
+ " for table status updation ");
+ " for table status updation ");
CarbonTablePath carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());
.getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());

segmentStatusManager.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(),
originalList.toArray(new LoadMetadataDetails[originalList.size()]));
originalList.toArray(new LoadMetadataDetails[originalList.size()]));
} else {
LOGGER.error(
"Could not able to obtain lock for table" + model.getDatabaseName() + "." + model
.getTableName() + "for table status updation");
"Could not able to obtain lock for table" + model.getDatabaseName() + "." + model
.getTableName() + "for table status updation");
throw new Exception("Failed to update the MajorCompactionStatus.");
}
} catch (IOException e) {
@@ -1366,11 +1366,11 @@ public static void updateMajorCompactionPropertyInSegment(CarbonLoadModel model,
} finally {
if (carbonTableStatusLock.unlock()) {
LOGGER.info(
"Table unlocked successfully after table status updation" + model.getDatabaseName()
+ "." + model.getTableName());
"Table unlocked successfully after table status updation" + model.getDatabaseName()
+ "." + model.getTableName());
} else {
LOGGER.error("Unable to unlock Table lock for table" + model.getDatabaseName() + "." + model
.getTableName() + " during table status updation");
.getTableName() + " during table status updation");
}
}
