orc data source support
zhzhan authored and liancheng committed May 16, 2015
1 parent 578bfee commit 305418c
Showing 10 changed files with 1,883 additions and 0 deletions.
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.orc

import scala.collection.JavaConversions._

import org.apache.hadoop.hive.common.`type`.HiveVarchar
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive._

import org.apache.spark.sql.catalyst.expressions.{MutableRow, Row}
import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}

/**
* Converts between Hive ObjectInspector values and Catalyst rows.
* TableReader.unwrappers and HiveInspectors.wrapperFor could eventually be consolidated
* to use this class.
*/
private[hive] object HadoopTypeConverter extends HiveInspectors {
/**
* Builds specific unwrappers ahead of time according to object inspector
* types to avoid pattern matching and branching costs per row.
*/
def unwrappers(fieldRefs: Seq[StructField]): Seq[(Any, MutableRow, Int) => Unit] = fieldRefs.map {
_.getFieldObjectInspector match {
case oi: BooleanObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
case oi: ByteObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
case oi: ShortObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
case oi: IntObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
case oi: LongObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
case oi: FloatObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
case oi: DoubleObjectInspector =>
(value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
case oi =>
(value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrap(value, oi)
}
}

/**
* Wraps with Hive types based on object inspector.
*/
def wrappers(oi: ObjectInspector): Any => Any = oi match {
case _: JavaHiveVarcharObjectInspector =>
(o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size)

case _: JavaHiveDecimalObjectInspector =>
(o: Any) => HiveShim.createDecimal(o.asInstanceOf[BigDecimal].underlying())

case soi: StandardStructObjectInspector =>
val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
(o: Any) => {
val struct = soi.create()
(soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row].toSeq).zipped.foreach {
(field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
}
struct
}

case loi: ListObjectInspector =>
val wrapper = wrapperFor(loi.getListElementObjectInspector)
(o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper))

case moi: MapObjectInspector =>
val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
(o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
keyWrapper(key) -> valueWrapper(value)
})

case _ =>
identity[Any]
}
}
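For context, here is a rough usage sketch, not part of this commit, of how the precomputed unwrappers could populate a reusable row during an ORC scan; the helper name fillRow and its arguments are hypothetical. Building one closure per field up front means the per-row loop does no type dispatch, which is the point of the unwrappers method above.

import scala.collection.JavaConversions._

import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.spark.sql.catalyst.expressions.MutableRow

// Sketch only: copy one ORC struct into a reusable MutableRow using unwrappers
// precomputed from the reader's object inspector (hypothetical helper, not in this commit).
def fillRow(inspector: StructObjectInspector, orcStruct: AnyRef, row: MutableRow): MutableRow = {
  val fieldRefs = inspector.getAllStructFieldRefs
  val unwrappers = HadoopTypeConverter.unwrappers(fieldRefs)
  fieldRefs.zipWithIndex.foreach { case (ref, ordinal) =>
    val value = inspector.getStructFieldData(orcStruct, ref)
    if (value == null) row.setNullAt(ordinal) else unwrappers(ordinal)(value, row, ordinal)
  }
  row
}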
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.orc

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType

private[orc] object OrcFileOperator extends Logging {

def getFileReader(pathStr: String, config: Option[Configuration] = None): Reader = {
val conf = config.getOrElse(new Configuration)
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
val orcFiles = listOrcFiles(pathStr, conf)
OrcFile.createReader(fs, orcFiles.head)
}

def readSchema(path: String, conf: Option[Configuration]): StructType = {
val reader = getFileReader(path, conf)
val readerInspector: StructObjectInspector = reader.getObjectInspector
.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
}

def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = {
val reader = getFileReader(path, conf)
val readerInspector: StructObjectInspector = reader.getObjectInspector
.asInstanceOf[StructObjectInspector]
readerInspector
}

def deletePath(pathStr: String, conf: Configuration): Unit = {
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
try {
fs.delete(fspath, true)
} catch {
case e: IOException =>
throw new IOException(
s"Unable to clear output directory ${fspath.toString} prior"
+ s" to InsertIntoOrcTable:\n${e.toString}")
}
}

def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
val path = origPath.makeQualified(fs)
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
.filterNot(_.isDir)
.map(_.getPath)
.filterNot(_.getName.startsWith("_"))
.filterNot(_.getName.startsWith("."))

if (paths == null || paths.isEmpty) {
throw new IllegalArgumentException(
s"orcFileOperator: path $path does not have valid orc files matching the pattern")
}
logInfo("Qualified file list:")
paths.foreach(file => logInfo(file.toString))
paths
}
}
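As a quick usage sketch (the path below is a hypothetical example), this helper infers the Catalyst schema from the first ORC file under a directory and lists the data files while skipping hidden and metadata files:

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Hypothetical input path; any directory containing ORC files would do.
val path = "hdfs:///tmp/orc_table"
// Schema is read from the first ORC file found under the path.
val schema = OrcFileOperator.readSchema(path, Some(conf))
// Data files only: names starting with "_" or "." are filtered out.
val files = OrcFileOperator.listOrcFiles(path, conf)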
117 changes: 117 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.orc

import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
import org.apache.spark.Logging
import org.apache.spark.sql.sources._

private[sql] object OrcFilters extends Logging {

def createFilter(expr: Array[Filter]): Option[SearchArgument] = {
if (expr == null || expr.isEmpty) return None
val builder = SearchArgument.FACTORY.newBuilder().startAnd()
val sarg = expr.foldLeft(Option(builder)) {
  case (Some(b), filter) => createFilter(filter, b)
  case (None, _) => None
}
sarg.map(_.end().build())
}

def createFilter(expression: Filter, builder: Builder): Option[Builder] = {
expression match {
case And(left, right) =>
  createFilter(left, builder.startAnd())
    .flatMap(createFilter(right, _))
    .map(_.end())
case Or(left, right) =>
  createFilter(left, builder.startOr())
    .flatMap(createFilter(right, _))
    .map(_.end())
case Not(child) =>
  createFilter(child, builder.startNot()).map(_.end())
case EqualTo(attribute, value) =>
  Some(builder.equals(attribute, value))
case LessThan(attribute, value) =>
  Some(builder.lessThan(attribute, value))
case LessThanOrEqual(attribute, value) =>
  Some(builder.lessThanEquals(attribute, value))
case GreaterThan(attribute, value) =>
  Some(builder.startNot().lessThanEquals(attribute, value).end())
case GreaterThanOrEqual(attribute, value) =>
  Some(builder.startNot().lessThan(attribute, value).end())
case IsNull(attribute) =>
  Some(builder.isNull(attribute))
case In(attribute, values) =>
  Some(builder.in(attribute, values: _*))
// EqualNullSafe (builder.nullSafeEquals) is not supported as a push-down filter
case _ => None
}
}
}
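A short sketch (illustrative filter values, not part of this commit) of how a relation could push its data source filters through this helper:

import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

// Hypothetical predicates, as the data source API would hand them to the relation.
val pushed: Array[Filter] = Array(EqualTo("state", "CA"), GreaterThan("age", 30))
// Convertible filters are AND-ed into a single ORC SearchArgument; the result is None
// if any filter in the array cannot be expressed, in which case nothing is pushed down.
val sarg = OrcFilters.createFilter(pushed)
sarg.foreach(s => println(s"Pushed ORC predicate: $s"))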