Add segment with path command
This closes apache#31
ravipesala authored and jackylk committed Jul 8, 2019
1 parent e59fda7 commit 1d7004b
Showing 7 changed files with 307 additions and 4 deletions.
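
This commit wires a new SQL command into CarbonData for registering an already-written data folder as a segment of a transactional table. A minimal usage sketch, mirroring the test case included in this commit (the path is illustrative):

    add segment on table addsegment1 with path '/tmp/addsegtest' options('format'='carbon')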
29 changes: 29 additions & 0 deletions core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -74,6 +74,16 @@ public class Segment implements Serializable, Writable {
*/
private boolean isCacheable = true;

/**
* Path where the segment data exists
*/
private transient String segmentPath;

/**
* Properties of the segment.
*/
private transient Map<String, String> options;

public Segment() {

}
@@ -106,6 +116,13 @@ public Segment(String segmentNo, String segmentFileName) {
}
}

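/**
* Constructor for a segment whose data exists at an explicit path, carrying the
* options supplied when the segment was added.
*/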
public Segment(String segmentNo, String segmentFileName, String segmentPath,
Map<String, String> options) {
this(segmentNo, segmentFileName);
this.segmentPath = segmentPath;
this.options = options;
}

/**
*
* @param segmentNo
@@ -295,6 +312,18 @@ public void setCacheable(boolean cacheable) {
isCacheable = cacheable;
}

public String getSegmentPath() {
return segmentPath;
}

public Map<String, String> getOptions() {
return options;
}

public void setOptions(Map<String, String> options) {
this.options = options;
}

@Override public void write(DataOutput out) throws IOException {
out.writeUTF(segmentNo);
boolean writeSegmentFileName = segmentFileName != null;
BlockDataMap.java
@@ -126,8 +126,11 @@ public class BlockDataMap extends CoarseGrainDataMap
// structure
byte[] filePath = null;
boolean isPartitionTable = blockletDataMapInfo.getCarbonTable().isHivePartitionTable();
if (isPartitionTable || !blockletDataMapInfo.getCarbonTable().isTransactionalTable() ||
blockletDataMapInfo.getCarbonTable().isSupportFlatFolder() ||
// if the segment data is written outside the table path, the whole parent path of the
// file has to be stored; for files under the table path it can be left out.
!blockletDataMapInfo.getFilePath().startsWith(
blockletDataMapInfo.getCarbonTable().getTablePath())) {
filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
isFilePathStored = true;
}
SegmentFileStore.java
@@ -142,6 +142,52 @@ public static String writeSegmentFile(CarbonTable carbonTable, String segmentId,
return writeSegmentFile(carbonTable, segmentId, UUID, null);
}

/**
* Write segment file to the metadata folder of the table.
*
* @param carbonTable CarbonTable
* @param segment segment
* @return true if the segment file was written, false if no index files were found.
*/
public static boolean writeSegmentFile(CarbonTable carbonTable, Segment segment)
throws IOException {
String tablePath = carbonTable.getTablePath();
CarbonFile segmentFolder = FileFactory.getCarbonFile(segment.getSegmentPath());
CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile file) {
return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
}
});
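// write a segment file only when the folder actually contains index or merge-index
// files; otherwise fall through and report failure to the caller.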
if (indexFiles != null && indexFiles.length > 0) {
SegmentFile segmentFile = new SegmentFile();
segmentFile.setOptions(segment.getOptions());
FolderDetails folderDetails = new FolderDetails();
folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
folderDetails.setRelative(false);
segmentFile.addPath(segment.getSegmentPath(), folderDetails);
for (CarbonFile file : indexFiles) {
if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
folderDetails.setMergeFileName(file.getName());
} else {
folderDetails.getFiles().add(file.getName());
}
}
String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath);
CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
if (!carbonFile.exists()) {
carbonFile.mkdirs(segmentFileFolder);
}
// write segment info to new file.
writeSegmentFile(segmentFile,
segmentFileFolder + File.separator + segment.getSegmentFileName());

return true;
}
return false;
}

/**
* Write segment file to the metadata folder of the table selecting only the current load files
*
@@ -205,7 +251,6 @@ public static String writeSegmentFile(CarbonTable carbonTable, String segmentId,
return null;
}

/**
* Move the loaded data from source folder to destination folder.
*/
@@ -298,6 +343,18 @@ public static String getSegmentFilePath(String tablePath, String segmentFileName
*/
public static boolean updateSegmentFile(CarbonTable carbonTable, String segmentId,
String segmentFile, String tableId, SegmentFileStore segmentFileStore) throws IOException {
return updateSegmentFile(carbonTable, segmentId, segmentFile, tableId, segmentFileStore, null);
}

/**
* This API will update the segment file of the passed segment and, if a segment
* status is supplied, set that status on the corresponding table status entry.
*
* @return boolean which determines whether the status update succeeded.
* @throws IOException
*/
public static boolean updateSegmentFile(CarbonTable carbonTable, String segmentId,
String segmentFile, String tableId, SegmentFileStore segmentFileStore,
SegmentStatus segmentStatus) throws IOException {
boolean status = false;
String tablePath = carbonTable.getTablePath();
String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
@@ -327,6 +384,9 @@ public static boolean updateSegmentFile(CarbonTable carbonTable, String segmentId,
detail.setSegmentFile(segmentFile);
detail.setIndexSize(String.valueOf(CarbonUtil
.getCarbonIndexSize(segmentFileStore, segmentFileStore.getLocationMap())));
if (segmentStatus != null) {
detail.setSegmentStatus(segmentStatus);
}
break;
}
}
@@ -1024,6 +1084,11 @@ public static class SegmentFile implements Serializable {
*/
private Map<String, FolderDetails> locationMap;

/**
* Segment option properties
*/
private Map<String, String> options;

SegmentFile() {
locationMap = new HashMap<>();
}
@@ -1059,6 +1124,13 @@ void addPath(String path, FolderDetails details) {
locationMap.put(path, details);
}

public Map<String, String> getOptions() {
return options;
}

public void setOptions(Map<String, String> options) {
this.options = options;
}
}

/**
AddSegmentTestCase.scala
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.addsegment

import java.io.File
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.spark.rdd.CarbonScanRDD

class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {

override def beforeAll {
dropTable

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")

}

test("Test add segment ") {

sql(
"""
| CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
| workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
| projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
| utilization int,salary int, empno int)
| STORED BY 'org.apache.carbondata.format'
""".stripMargin)

sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")

sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")

sql("select count(*) from addsegment1").show()
val table = CarbonEnv.getCarbonTable(None, "addsegment1")(sqlContext.sparkSession)
val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
val newPath = storeLocation + "/" + "addsegtest"
move(path, newPath)
sql("delete from table addsegment1 where segment.id in (1)")
sql("clean files for table addsegment1")
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))

sql(s"add segment on table addsegment1 with path '$newPath' options('format'='carbon')").show()
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
sql("select * from addsegment1").show()
FileFactory.deleteAllFilesOfDir(new File(newPath))
}

def move(oldLoc: String, newLoc: String): Unit = {
val oldFolder = FileFactory.getCarbonFile(oldLoc)
FileFactory.mkdirs(newLoc, FileFactory.getConfiguration)
val oldFiles = oldFolder.listFiles
for (file <- oldFiles) {
Files.copy(Paths.get(file.getParentFile.getPath, file.getName), Paths.get(newLoc, file.getName))
}
}


override def afterAll = {
dropTable
}

def dropTable = {
sql("drop table if exists addsegment1")
}

}
CarbonDDLSqlParser.scala
@@ -192,6 +192,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
protected val STREAMS = carbonKeyWord("STREAMS")
protected val STMPROPERTIES = carbonKeyWord("STMPROPERTIES")
protected val CARBONCLI = carbonKeyWord("CARBONCLI")
protected val PATH = carbonKeyWord("PATH")

protected val JSON = carbonKeyWord("JSON")

CarbonAddLoadCommand.scala
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command.management

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.execution.command.{Checker, MetadataCommand}
import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.util.CarbonLoaderUtil


/**
* Lets the user add an external data folder as a segment to a transactional table.
* For an external carbon data folder there is no need to specify the format in the
* options, but for other formats such as parquet the user must specify
* format=parquet in the options.
*/
case class CarbonAddLoadCommand(
databaseNameOp: Option[String],
tableName: String,
segmentPath: String,
options: Option[Map[String, String]])
extends MetadataCommand {

override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
setAuditTable(carbonTable)
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}

// if insert overwrite in progress, do not allow add segment
if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
throw new ConcurrentOperationException(carbonTable, "insert overwrite", "add segment")
}

val model = new CarbonLoadModel
model.setCarbonTransactionalTable(true)
model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
model.setDatabaseName(carbonTable.getDatabaseName)
model.setTableName(carbonTable.getTableName)

CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(model, false)

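// build a Segment pointing at the external folder; the generated segment file name
// embeds the segment id and a nanotime suffix so each added load gets a unique file.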
val segment = new Segment(model.getSegmentId,
SegmentFileStore.genSegmentFileName(
model.getSegmentId,
System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT,
segmentPath,
options.map(o => new util.HashMap[String, String](o.asJava)).getOrElse(new util.HashMap()))
val isSuccess =
SegmentFileStore.writeSegmentFile(carbonTable, segment)

if (isSuccess) {
SegmentFileStore.updateSegmentFile(
carbonTable,
model.getSegmentId,
segment.getSegmentFileName,
carbonTable.getCarbonTableIdentifier.getTableId,
new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName),
SegmentStatus.SUCCESS)
} else {
throw new AnalysisException("Adding segment with path failed.")
}
Seq.empty
}

override protected def opName: String = "ADD SEGMENT WITH PATH"
}
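
As the command's scaladoc notes, a carbon-format folder needs no format option, while other formats must declare one. A hypothetical invocation for a parquet folder (table name and path are illustrative):

    add segment on table sales with path '/data/external/sales' options('format'='parquet')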