Skip to content

Commit

Permalink
Optimize getSplits() during compaction
Browse files: browse the repository at this point in the history
  • Loading branch information
Indhumathi27 committed Nov 22, 2019
1 parent bc5c2e5 commit d9297ea
Showing 1 changed file with 37 additions and 32 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -359,6 +359,8 @@ class CarbonMergerRDD[K, V](
loadMetadataDetails = SegmentStatusManager
.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath))
}

val validSegIds: java.util.List[String] = new util.ArrayList[String]()
// for each valid segment.
for (eachSeg <- carbonMergerMapping.validSegments) {
// In case of range column get the size for calculation of number of ranges
Expand All @@ -369,44 +371,47 @@ class CarbonMergerRDD[K, V](
}
}
}
validSegIds.add(eachSeg.getSegmentNo)
}

// map for keeping the relation of a task and its blocks.
job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg.getSegmentNo)
// map for keeping the relation of a task and its blocks.
job.getConfiguration
.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, validSegIds.asScala.mkString(","))

if (updateStatusManager.getUpdateStatusDetails.length != 0) {
updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg.getSegmentNo)
}

val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
// get splits
val splits = format.getSplits(job)
val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
// get splits
val splits = format.getSplits(job)

// keep on assigning till last one is reached.
if (null != splits && splits.size > 0) {
splitsOfLastSegment = splits.asScala
.map(_.asInstanceOf[CarbonInputSplit])
.filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
}
val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry =>
val blockInfo = new TableBlockInfo(entry.getFilePath,
entry.getStart, entry.getSegmentId,
entry.getLocations, entry.getLength, entry.getVersion,
updateStatusManager.getDeleteDeltaFilePath(
entry.getFilePath,
Segment.toSegment(entry.getSegmentId).getSegmentNo)
)
(!updated || (updated && (!CarbonUtil
.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
updateDetails, updateStatusManager)))) &&
FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
}
if (rangeColumn != null) {
totalTaskCount = totalTaskCount +
CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
// keep on assigning till last one is reached.
if (null != splits && splits.size > 0) {
splitsOfLastSegment = splits.asScala
.map(_.asInstanceOf[CarbonInputSplit])
.filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
}
val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { entry =>
val segmentId = Segment.toSegment(entry.getSegmentId).getSegmentNo
val blockInfo = new TableBlockInfo(entry.getFilePath,
entry.getStart, entry.getSegmentId,
entry.getLocations, entry.getLength, entry.getVersion,
updateStatusManager.getDeleteDeltaFilePath(
entry.getFilePath,
segmentId)
)
if (updateStatusManager.getUpdateStatusDetails.length != 0) {
updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId)
}
carbonInputSplits ++:= filteredSplits
allSplits.addAll(filteredSplits.asJava)
(!updated || (updated && (!CarbonUtil
.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
updateDetails, updateStatusManager)))) &&
FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
}
if (rangeColumn != null) {
totalTaskCount = totalTaskCount +
CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
}
carbonInputSplits ++:= filteredSplits
allSplits.addAll(filteredSplits.asJava)
totalTaskCount = totalTaskCount / carbonMergerMapping.validSegments.size
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
var allRanges: Array[Object] = new Array[Object](0)
Expand Down

0 comments on commit d9297ea

Please sign in to comment.