Merge pull request apache#29 from bzhang02/update_indent
Update indent size
Sanket Chintapalli authored and GitHub Enterprise committed May 19, 2020
2 parents 35faa91 + c70d1be commit 1a8c6d9
Showing 4 changed files with 107 additions and 108 deletions.
11 changes: 6 additions & 5 deletions src/main/scala/com/yahoo/spark/starter/ScalaWordCount.scala
@@ -7,8 +7,8 @@ object ScalaWordCount {
  def main(args: Array[String]) {

    if (args.length < 2) {
      System.err.println("Usage: ScalaWordCount <inputFilesURI> <outputFilesUri>")
      System.exit(1)
    }

    val spark = SparkSession
@@ -36,9 +36,10 @@ object ScalaWordCount {
      logger.info("Partition Size: " + iterator.size)
    })

    val counts = textFile
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile(outputFilesUri)

    spark.stop()
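The restyled counts chain above is behavior-preserving; only the layout of the transformation pipeline changes. As a point of reference, a minimal local sketch of the same pipeline (illustrative only, not part of this commit; it assumes a local[*] master and an in-memory input in place of the URI arguments) would look roughly like this:

import org.apache.spark.sql.SparkSession

object ScalaWordCountLocalSketch {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration; the real example takes input/output URIs.
    val spark = SparkSession.builder().appName("WordCount sketch").master("local[*]").getOrCreate()

    // Stand-in for reading the input text from inputFilesURI
    val textFile = spark.sparkContext.parallelize(Seq("to be or not to be"))

    // Same transformation chain as the updated file
    val counts = textFile
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.collect().foreach(println) // e.g. (be,2), (to,2), (or,1), (not,1)
    spark.stop()
  }
}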
12 changes: 6 additions & 6 deletions src/main/scala/com/yahoo/spark/starter/SparkAvroExample.scala
@@ -3,11 +3,11 @@ package com.yahoo.spark.starter
import org.apache.spark.sql.{SparkSession, DataFrame}

object SparkAvroExample {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("Spark Avro Example")
      .getOrCreate()

    val inputDir = "avro_test/resources/"
    val outputDir = "avro_test/output/"
@@ -43,5 +43,5 @@ object SparkAvroExample {
    println(readAndWritePrimitive("randomString.avro").head().getString(0))
    println(readAndWritePrimitive("randomLongMap.avro").head().getAs[Map[String, Long]](0))
    println(readAndWritePrimitive("randomStringArray.avro").head().getAs[Array[String]](0))
  }
}
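The readAndWritePrimitive helper and the schema setup sit outside this hunk, so only the session builder and the final println calls are visible here. A round-trip of the same general shape, written as a sketch against the generic Avro data source rather than the file's actual helper, might look like:

import org.apache.spark.sql.{DataFrame, SparkSession}

object AvroRoundTripSketch {
  // Hypothetical helper in the spirit of readAndWritePrimitive: write a DataFrame
  // to Avro and read it back. Assumes the Avro data source is available under the
  // "avro" short name (built in since Spark 2.4; older versions need the spark-avro package).
  def roundTripAvro(spark: SparkSession, df: DataFrame, path: String): DataFrame = {
    df.write.mode("overwrite").format("avro").save(path)
    spark.read.format("avro").load(path)
  }
}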
87 changes: 43 additions & 44 deletions src/main/scala/com/yahoo/spark/starter/SparkClusterHBase.scala
@@ -10,50 +10,49 @@ import org.apache.spark.sql.SparkSession

// Simple example of accessing HBase from Spark
object SparkClusterHBase {
  def main(args: Array[String]) {

    if (args == null || args.length < 2) {
      System.err.println("Usage: SparkClusterHBase <nameSpace> <tableName>")
      System.exit(1)
    }

    // Use the new 2.0 API. If you are using 1.6.2 create the spark conf and context as in 1.6 examples.
    val spark = SparkSession.
      builder.
      appName("Spark HBase Example").
      getOrCreate()

    val hconf = HBaseConfiguration.create()
    val nameSpace = args(0)
    val tableName = args(1)
    val qualifiedTableName = nameSpace + ":" + tableName
    hconf.set(TableInputFormat.INPUT_TABLE, qualifiedTableName)
    val admin = new HBaseAdmin(hconf)

    // create the table if it does not exist
    if (!admin.isTableAvailable(qualifiedTableName)) {
      val tableDesc = new HTableDescriptor(qualifiedTableName)
      tableDesc.addFamily(new HColumnDescriptor("cf1".getBytes()))
      admin.createTable(tableDesc)
    }

    // put data into the table
    val myTable = new HTable(hconf, qualifiedTableName)
    for (i <- 0 to 5) {
      val p = new Put(new String("row" + i).getBytes())
      p.add("cf1".getBytes(), "column-1".getBytes(), new String("value " + i).getBytes())
      myTable.put(p)
    }
    myTable.flushCommits()

    // access the table through an RDD
    val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val count = hBaseRDD.count()
    print("HBase RDD count:" + count)

    spark.stop
  }
}
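The in-code comment above points at the pre-2.0 setup without showing it. For readers on 1.6.x, the equivalent of the SparkSession block is, roughly, the following sketch (not part of this commit):

import org.apache.spark.{SparkConf, SparkContext}

object SparkClusterHBasePre20Sketch {
  def main(args: Array[String]): Unit = {
    // Rough 1.6-style equivalent of the SparkSession block in the example above.
    val conf = new SparkConf().setAppName("Spark HBase Example")
    val sc = new SparkContext(conf)
    // The rest of the example would then call sc.newAPIHadoopRDD(...) directly
    // instead of going through spark.sparkContext.
    sc.stop()
  }
}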
105 changes: 52 additions & 53 deletions src/main/scala/com/yahoo/spark/starter/SparkClusterHBaseConnector.scala
@@ -11,67 +11,66 @@ case class DataRow(name: String, pet: String, horcruxes: Int, original_name: Str

// The example creates an HBase table and writes sample data to it using the connector,
// then reads the data back from the same table and displays it on stdout.
object SparkClusterHBaseConnector {
  def main(args: Array[String]) {

    if (args == null || args.length < 2) {
      System.err.println("Usage: SparkClusterHBaseConnector <nameSpace> <tableName>")
      System.exit(1)
    }

    // Use the new 2.0 API.
    val spark = SparkSession.
      builder.
      appName("Spark HBase Connector Example").
      getOrCreate()

    import spark.implicits._

    val nameSpace = s"${args(0)}"
    val tableName = s"${args(1)}"

    val df = Seq(DataRow("harry", "hedwig", 0, null), DataRow("voldy", "nagini", 7, "Tom"))
      .toDF("name", "pet", "horcruxes", "original_name")
    // print the original dataframe
    df.show

    // define the catalog
    def catalog = s"""{
      |"table":{"namespace":"${nameSpace}", "name":"${tableName}"},
      |"rowkey":"name",
      |"columns":{
      |"name":{"cf":"rowkey", "col":"name", "type":"string"},
      |"pet":{"cf":"cf1", "col":"pet", "type":"string"},
      |"horcruxes":{"cf":"cf2", "col":"horcruxes", "type":"int"},
      |"original_name":{"cf":"cf3", "col":"original_name", "type":"string"}
      |}
      |}""".stripMargin

    // write data to hbase
    hbaseWrite(df, catalog)

    // read the data from hbase
    val df2 = hbaseRead(spark, catalog)
    df2.show

    spark.stop
  }

  // Read data from an existing hbase table
  def hbaseRead(spark: SparkSession, catalog: String): DataFrame = {
    spark
      .read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()
  }

  // Write data to an hbase table
  def hbaseWrite(df: DataFrame, catalog: String): Unit = {
    df
      .write
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save
  }
}
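Because hbaseRead and hbaseWrite take the catalog as a plain string, they can be reused from other jobs. A small illustrative companion (not in the repository; it assumes it is compiled alongside SparkClusterHBaseConnector and receives the same catalog JSON as its first argument) that reads the table back and filters on one of the mapped columns:

import org.apache.spark.sql.SparkSession

object HBaseReadBackSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("HBase read-back sketch").getOrCreate()

    // Hypothetical: the same catalog JSON used by the example, passed in as args(0).
    val catalog = args(0)

    val df = SparkClusterHBaseConnector.hbaseRead(spark, catalog)
    df.filter("horcruxes > 0").show()

    spark.stop()
  }
}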
