Commit 65e6791

[CARBONDATA-863] Support creation and deletion of dictionary files through RDD during alter add and drop

This closes #733

ravipesala committed Apr 6, 2017
2 parents 4a7adfa + b5ba4c6
Showing 5 changed files with 286 additions and 74 deletions.
ManageDictionary.java

@@ -18,7 +18,6 @@
 package org.apache.carbondata.core.cache.dictionary;
 
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -30,8 +29,7 @@
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -51,62 +49,59 @@ public class ManageDictionary
   /**
    * This method will delete the dictionary files for the given column IDs and
    * clear the dictionary cache
    *
-   * @param dictionaryColumns
-   * @param carbonTable
+   * @param columnSchema schema of the column whose dictionary files are to be deleted
+   * @param carbonTableIdentifier identifier of the table the column belongs to
+   * @param storePath carbon store path
    */
-  public static void deleteDictionaryFileAndCache(List<CarbonColumn> dictionaryColumns,
-      CarbonTable carbonTable) {
-    if (!dictionaryColumns.isEmpty()) {
-      CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
-      CarbonTablePath carbonTablePath =
-          CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath(), carbonTableIdentifier);
-      String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath();
-      CarbonFile metadataDir = FileFactory
-          .getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath));
-      for (final CarbonColumn column : dictionaryColumns) {
-        // sort index file is created with dictionary size appended to it. So all the files
-        // with a given column ID need to be listed
-        CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() {
-          @Override public boolean accept(CarbonFile path) {
-            if (path.getName().startsWith(column.getColumnId())) {
-              return true;
-            }
-            return false;
-          }
-        });
-        for (CarbonFile file : listFiles) {
-          // try catch is inside for loop because even if one deletion fails, other files
-          // still need to be deleted
-          try {
-            FileFactory.deleteFile(file.getCanonicalPath(),
-                FileFactory.getFileType(file.getCanonicalPath()));
-          } catch (IOException e) {
-            LOGGER.error(
-                "Failed to delete dictionary or sortIndex file for column " + column.getColName()
-                    + "with column ID " + column.getColumnId());
-          }
-        }
-        // remove dictionary cache
-        removeDictionaryColumnFromCache(carbonTable, column.getColumnId());
-      }
-    }
-  }
+  public static void deleteDictionaryFileAndCache(final ColumnSchema columnSchema,
+      CarbonTableIdentifier carbonTableIdentifier, String storePath) {
+    CarbonTablePath carbonTablePath =
+        CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier);
+    String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath();
+    CarbonFile metadataDir = FileFactory
+        .getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath));
+    // the sort index file is created with the dictionary size appended to its name, so all
+    // files starting with the column ID need to be listed
+    CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile path) {
+        return path.getName().startsWith(columnSchema.getColumnUniqueId());
+      }
+    });
+    for (CarbonFile file : listFiles) {
+      // the try-catch is inside the loop so that the remaining files are still deleted
+      // even if one deletion fails
+      try {
+        FileFactory
+            .deleteFile(file.getCanonicalPath(), FileFactory.getFileType(file.getCanonicalPath()));
+      } catch (IOException e) {
+        LOGGER.error("Failed to delete dictionary or sortIndex file for column "
+            + columnSchema.getColumnName() + " with column ID "
+            + columnSchema.getColumnUniqueId());
+      }
+    }
+    // remove the column from the dictionary cache
+    removeDictionaryColumnFromCache(carbonTableIdentifier, storePath,
+        columnSchema.getColumnUniqueId());
+  }
 
   /**
    * This method will remove dictionary cache from driver for both reverse and forward dictionary
    *
-   * @param carbonTable
+   * @param carbonTableIdentifier identifier of the table the column belongs to
+   * @param storePath carbon store path
    * @param columnId
    */
-  public static void removeDictionaryColumnFromCache(CarbonTable carbonTable, String columnId) {
-    Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache = CacheProvider.getInstance()
-        .createCache(CacheType.REVERSE_DICTIONARY, carbonTable.getStorePath());
+  public static void removeDictionaryColumnFromCache(CarbonTableIdentifier carbonTableIdentifier,
+      String storePath, String columnId) {
+    Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
+        CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, storePath);
     DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
-        new DictionaryColumnUniqueIdentifier(carbonTable.getCarbonTableIdentifier(),
+        new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
            new ColumnIdentifier(columnId, null, null));
     dictCache.invalidate(dictionaryColumnUniqueIdentifier);
-    dictCache = CacheProvider.getInstance()
-        .createCache(CacheType.FORWARD_DICTIONARY, carbonTable.getStorePath());
+    dictCache = CacheProvider.getInstance().createCache(CacheType.FORWARD_DICTIONARY, storePath);
     dictCache.invalidate(dictionaryColumnUniqueIdentifier);
   }
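After this refactor the two methods take only a CarbonTableIdentifier and a store path instead of a fully loaded CarbonTable, which is what allows executor-side tasks (the new RDDs below) to call them. A minimal driver-side sketch of the new signature follows; the table identity, store path, and ColumnSchema setter calls are illustrative assumptions, not part of this commit:

    import org.apache.carbondata.core.cache.dictionary.ManageDictionary
    import org.apache.carbondata.core.metadata.CarbonTableIdentifier
    import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema

    // hypothetical table identity and store path
    val identifier = new CarbonTableIdentifier("default", "t1", "tableId-1")
    val dropped = new ColumnSchema()
    dropped.setColumnName("c1")
    dropped.setColumnUniqueId("columnId-1")
    // deletes every metadata file whose name starts with the column ID
    // (dictionary, dictionary metadata, sort index) and then invalidates the
    // reverse and forward dictionary cache entries for that column
    ManageDictionary.deleteDictionaryFileAndCache(dropped, identifier, "/tmp/carbonstore")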
AlterTableAddColumnRDD.scala (new file)

@@ -0,0 +1,110 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.spark.rdd

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.AlterTableAddColumnsModel

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.spark.util.GlobalDictionaryUtil

/**
 * Partition carrying the schema of one newly added column, so that dictionary
 * generation runs as one task per column
 *
 * @param rddId id of the RDD the partition belongs to
 * @param idx partition index
 * @param schema schema of the newly added column
 */
class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Partition {
  override def index: Int = idx

  override def hashCode(): Int = 41 * (41 + rddId) + idx

  val columnSchema = schema
}

/**
 * This RDD generates the dictionary files for the newly added columns
 */
class AlterTableAddColumnRDD[K, V](sc: SparkContext,
    @transient newColumns: Seq[ColumnSchema],
    alterTableModel: AlterTableAddColumnsModel,
    carbonTableIdentifier: CarbonTableIdentifier,
    carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {

  override def getPartitions: Array[Partition] = {
    newColumns.zipWithIndex.map { column =>
      new AddColumnPartition(id, column._2, column._1)
    }.toArray
  }

  override def compute(split: Partition,
      context: TaskContext): Iterator[(Int, String)] = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
    val iter = new Iterator[(Int, String)] {
      try {
        val columnSchema = split.asInstanceOf[AddColumnPartition].columnSchema
        // create a dictionary file only if it is a dictionary column
        if (columnSchema.hasEncoding(Encoding.DICTIONARY) &&
            !columnSchema.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
          val carbonTablePath = CarbonStorePath
            .getCarbonTablePath(carbonStorePath, carbonTableIdentifier)
          var rawData: String = null
          if (null != columnSchema.getDefaultValue) {
            rawData = new String(columnSchema.getDefaultValue,
              CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
          }
          // write the default value, if any, as the initial dictionary content
          GlobalDictionaryUtil
            .loadDefaultDictionaryValueForNewColumn(carbonTablePath,
              columnSchema,
              carbonTableIdentifier,
              carbonStorePath,
              rawData)
        }
      } catch {
        case ex: Exception =>
          LOGGER.error(ex, ex.getMessage)
          throw ex
      }

      var finished = false

      // one-element iterator: hasNext is true exactly once
      override def hasNext: Boolean = {
        if (!finished) {
          finished = true
          true
        } else {
          false
        }
      }

      override def next(): (Int, String) = {
        (split.index, status)
      }
    }
    iter
  }
}
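The RDD creates one partition per added column, so dictionary generation for N new columns runs as N parallel tasks, each returning its partition index and a load status. The driver only has to materialize the RDD; this mirrors the call made in the AlterTableProcessor hunk further down, with newCols, alterTableModel, tableIdentifier, and storePath assumed in scope as they are there:

    // one dictionary-generation task per added column
    val results: Array[(Int, String)] =
      new AlterTableAddColumnRDD(sc,
        newCols,
        alterTableModel,
        tableIdentifier,
        storePath).collect()
    // each element is (partition index, STORE_LOADSTATUS_SUCCESS)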
AlterTableDropColumnRDD.scala (new file)

@@ -0,0 +1,96 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.spark.rdd

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.dictionary.ManageDictionary
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema

/**
 * Partition carrying the schema of one dropped column, so that dictionary-file
 * deletion runs as one task per column
 *
 * @param rddId id of the RDD the partition belongs to
 * @param idx partition index
 * @param schema schema of the dropped column
 */
class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Partition {
  override def index: Int = idx

  override def hashCode(): Int = 41 * (41 + rddId) + idx

  val columnSchema = schema
}

/**
 * This RDD deletes the dictionary files of the dropped columns and invalidates
 * their entries in the dictionary cache
 */
class AlterTableDropColumnRDD[K, V](sc: SparkContext,
    @transient droppedColumns: Seq[ColumnSchema],
    carbonTableIdentifier: CarbonTableIdentifier,
    carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {

  override def getPartitions: Array[Partition] = {
    droppedColumns.zipWithIndex.map { column =>
      new DropColumnPartition(id, column._2, column._1)
    }.toArray
  }

  override def compute(split: Partition,
      context: TaskContext): Iterator[(Int, String)] = {
    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
    val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
    val iter = new Iterator[(Int, String)] {
      try {
        val columnSchema = split.asInstanceOf[DropColumnPartition].columnSchema
        // only dictionary columns (excluding direct-dictionary ones) have files to delete
        if (columnSchema.hasEncoding(Encoding.DICTIONARY) &&
            !columnSchema.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
          ManageDictionary
            .deleteDictionaryFileAndCache(columnSchema, carbonTableIdentifier, carbonStorePath)
        }
      } catch {
        case ex: Exception =>
          LOGGER.error(ex, ex.getMessage)
          throw ex
      }

      var finished = false

      // one-element iterator: hasNext is true exactly once
      override def hasNext: Boolean = {
        if (!finished) {
          finished = true
          true
        } else {
          false
        }
      }

      override def next(): (Int, String) = {
        (split.index, status)
      }
    }
    iter
  }
}
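The drop-side call site is not among the diffs captured on this page, but mirroring the add path it would look roughly like the sketch below, with droppedColumns, tableIdentifier, and storePath assumed to be in scope; compute() filters internally, so columns without a (non-direct) dictionary encoding simply become no-op tasks:

    // hypothetical drop-side driver code; the actual caller lives in a file not shown here
    new AlterTableDropColumnRDD(sc,
      droppedColumns,
      tableIdentifier,
      storePath).collect()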
carbonTableSchemaCommon.scala

@@ -23,6 +23,7 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 
@@ -42,6 +43,7 @@ import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.load.FailureCauses
 import org.apache.carbondata.spark.merger.CompactionType
+import org.apache.carbondata.spark.rdd.AlterTableAddColumnRDD
 import org.apache.carbondata.spark.util.{DataTypeConverterUtil, GlobalDictionaryUtil}
 
 case class TableModel(
@@ -156,7 +158,7 @@ class AlterTableProcessor(
     tableInfo: TableInfo,
     carbonTablePath: CarbonTablePath,
     tableIdentifier: CarbonTableIdentifier,
-    storePath: String) {
+    storePath: String, sc: SparkContext) {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
@@ -227,7 +229,6 @@
         tablePropertiesMap.put(x._1, x._2)
       }
     }
-
     // This part will create dictionary file for all newly added dictionary columns
     // if valid default value is provided,
     // then that value will be included while creating dictionary file
@@ -251,17 +252,13 @@
         }
       }
     }
-      if (col.getEncodingList.contains(Encoding.DICTIONARY) &&
-        !col.getEncodingList.contains(Encoding.DIRECT_DICTIONARY)) {
-        GlobalDictionaryUtil
-          .loadDefaultDictionaryValueForNewColumn(carbonTablePath,
-            col,
-            tableIdentifier,
-            storePath,
-            rawData)
-      }
     }
-
+    // generate dictionary files for the newly added columns
+    new AlterTableAddColumnRDD(sc,
+      newCols,
+      alterTableModel,
+      tableIdentifier,
+      storePath).collect()
     tableSchema.setListOfColumns(allColumns.asJava)
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
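For reference, the path changed above is the one reached by ALTER TABLE ADD COLUMNS. A hedged end-to-end example, assuming CarbonData's documented alter-table DDL and a Carbon-enabled session named spark; the table and column names are made up, and DEFAULT.VALUE.<column> is the table property whose bytes arrive in rawData above:

    // hypothetical DDL; session setup and names are assumptions, not from this diff
    spark.sql("ALTER TABLE t1 ADD COLUMNS (country STRING) " +
      "TBLPROPERTIES('DICTIONARY_INCLUDE'='country', 'DEFAULT.VALUE.country'='India')")
    // AlterTableAddColumnRDD writes 'India' into the new column's dictionary file,
    // so existing rows can be read back without rewriting any data files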