-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathSQLTableTransformation.scala
54 lines (38 loc) · 1.23 KB
/
SQLTableTransformation.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package diamond.transform.table
import common.utility.stringFunctions
import diamond.transform.TransformationContext
import org.apache.spark.sql.DataFrame
/**
  * A table transformation whose output is produced by running a Spark SQL
  * query string (`sql`) against the incoming DataFrame.
  *
  * The query may refer to the incoming DataFrame (e.g. to project columns
  * from it) through the temporary table name `tableName`, and to values
  * supplied by the TransformationContext under the "sqlparams" key.
  *
  * Created by markmo on 16/12/2015.
  */
trait SQLTableTransformation extends TableTransformation {

  // Brings the `template` operation used on `sql` below into scope
  // (presumably an implicit String extension - confirm in stringFunctions).
  import stringFunctions._

  /** Name under which the incoming DataFrame is registered as a temp table. */
  val tableName: String

  /** Query text; passed through `template` with the context's "sqlparams" before execution. */
  val sql: String

  /**
    * Registers `df` as a temporary table called `tableName`, then executes
    * the templated `sql` on the SQLContext that owns `df`.
    *
    * @param df  the incoming DataFrame, made visible to the query as `tableName`
    * @param ctx transformation context; its "sqlparams" entry is cast to
    *            `Map[String, String]` and handed to `template`
    * @return the DataFrame returned by `SQLContext.sql` for the templated query
    */
  def apply(df: DataFrame, ctx: TransformationContext): DataFrame = {
    // Make the input addressable from the query text.
    df.registerTempTable(tableName)

    // Unchecked cast: because of type erasure only "is a Map" can be verified
    // here, so entries that are not Strings would surface later, not on this line.
    // NOTE(review): behaviour when "sqlparams" is absent depends on
    // TransformationContext.apply, which is not visible here - confirm.
    val bindings = ctx("sqlparams").asInstanceOf[Map[String, String]]

    val engine = df.sqlContext
    val query = sql.template(bindings)
    engine.sql(query)
  }
}
/**
  * Factory for anonymous [[SQLTableTransformation]] instances.
  */
object SQLTableTransformation {

  /**
    * Builds a SQLTableTransformation from a name, a temp-table name and a
    * query string, and exposes `op` through the instance's `append` method.
    *
    * The result type is intentionally left inferred so that the `append`
    * member defined on the anonymous instance remains part of the returned type.
    *
    * NOTE(review): `op` is only wired to `append`; `apply` is inherited from
    * the trait and runs the SQL query. Whether `append` is ever invoked
    * depends on TableTransformation, which is not visible here - confirm.
    *
    * @param name      value for the instance's `name` member
    * @param tableName temp-table name the incoming DataFrame is registered under
    * @param sql       query text for the instance's `sql` member
    * @param op        function the instance's `append` method delegates to
    * @return a new anonymous SQLTableTransformation carrying the given values
    */
  def apply(name: String,
            tableName: String,
            sql: String
           )(op: (DataFrame, TransformationContext) => DataFrame) = {
    // Aliases are required: inside the anonymous class below, `val name = name`
    // would refer to the member being defined rather than to this parameter.
    val (givenName, givenTableName, givenSql) = (name, tableName, sql)

    new SQLTableTransformation {
      val name = givenName
      val tableName = givenTableName
      val sql = givenSql

      def append(df: DataFrame, ctx: TransformationContext) = op(df, ctx)
    }
  }
}