From 74de21f649cfa47c062583c8c7cbb27a894797ba Mon Sep 17 00:00:00 2001
From: Abhishek Kumar
Date: Mon, 8 Jul 2019 11:27:01 +0400
Subject: [PATCH] Make compatible with Spark 2.4

---
 build.sbt                                          |  6 +-
 .../spark/sql/hive/llap/CountDataReader.java       |  4 +-
 .../sql/hive/llap/CountDataReaderFactory.java      |  8 +-
 .../spark/sql/hive/llap/HWConf.java                |  3 +-
 .../llap/HiveStreamingDataSourceWriter.java        | 13 ++--
 .../hive/llap/HiveStreamingDataWriter.java         |  4 +-
 .../llap/HiveStreamingDataWriterFactory.java       |  7 +-
 .../hive/llap/HiveWarehouseDataReader.java         |  6 +-
 .../llap/HiveWarehouseDataReaderFactory.java       | 10 +--
 .../llap/HiveWarehouseDataSourceReader.java        | 74 ++++++++++---------
 .../llap/HiveWarehouseDataSourceWriter.java        |  6 +-
 .../hive/llap/HiveWarehouseDataWriter.java         |  4 +-
 .../llap/HiveWarehouseDataWriterFactory.java       |  4 +-
 .../HiveStreamingDataSourceWriter.java             |  6 +-
 14 files changed, 78 insertions(+), 77 deletions(-)

diff --git a/build.sbt b/build.sbt
index 96f82b4..3aa79c2 100644
--- a/build.sbt
+++ b/build.sbt
@@ -10,10 +10,10 @@ organization := "com.hortonworks.hive"
 scalaVersion := "2.11.8"
 val scalatestVersion = "2.2.6"
 
-sparkVersion := sys.props.getOrElse("spark.version", "2.3.1")
+sparkVersion := sys.props.getOrElse("spark.version", "2.4.0")
 
-val hadoopVersion = sys.props.getOrElse("hadoop.version", "3.1.0")
-val hiveVersion = sys.props.getOrElse("hive.version", "3.0.0")
+val hadoopVersion = sys.props.getOrElse("hadoop.version", "3.2.0")
+val hiveVersion = sys.props.getOrElse("hive.version", "3.1.1")
 val log4j2Version = sys.props.getOrElse("log4j2.version", "2.4.1")
 val tezVersion = sys.props.getOrElse("tez.version", "0.9.1")
 val thriftVersion = sys.props.getOrElse("thrift.version", "0.9.3")
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/CountDataReader.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/CountDataReader.java
index 0ba36bb..3ed02b7 100644
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/CountDataReader.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/CountDataReader.java
@@ -3,11 +3,11 @@
 import java.io.IOException;
 
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.types.DataTypes;
 
-public class CountDataReader implements DataReader<ColumnarBatch> {
+public class CountDataReader implements InputPartitionReader<ColumnarBatch> {
   private long numRows;
 
   public CountDataReader(long numRows) {
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/CountDataReaderFactory.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/CountDataReaderFactory.java
index 8ef7a52..ec37053 100644
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/CountDataReaderFactory.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/CountDataReaderFactory.java
@@ -1,18 +1,16 @@
 package com.hortonworks.spark.sql.hive.llap;
 
-import org.apache.spark.sql.sources.v2.reader.DataReader;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
-public class CountDataReaderFactory implements DataReaderFactory<ColumnarBatch> {
+public class CountDataReaderFactory implements InputPartition<ColumnarBatch> {
   private long numRows;
 
   public CountDataReaderFactory(long numRows) {
     this.numRows = numRows;
   }
 
-  @Override
-  public DataReader<ColumnarBatch> createDataReader() {
+  public InputPartitionReader<ColumnarBatch> createPartitionReader() {
     return new CountDataReader(numRows);
   }
 }
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/HWConf.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/HWConf.java
index 9a03a15..122b3d0 100644
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/HWConf.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/HWConf.java
@@ -53,7 +53,8 @@ static String warehouseKey(String keySuffix) {
   private static Logger LOG = LoggerFactory.getLogger(HWConf.class);
 
   public static final String HIVESERVER2_CREDENTIAL_ENABLED = "spark.security.credentials.hiveserver2.enabled";
-  public static final String HIVESERVER2_JDBC_URL_PRINCIPAL = "spark.sql.hive.hiveserver2.jdbc.url.principal";
+  public static final String HIVESERVER2_JDBC_URL_PRINCIPAL = "" +
+      "spark.sql.hive.hiveserver2.jdbc.url.principal";
   public static final String HIVESERVER2_JDBC_URL = "spark.sql.hive.hiveserver2.jdbc.url";
 
   public void setString(HiveWarehouseSessionState state, String value) {
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataSourceWriter.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataSourceWriter.java
index af2b647..eda5957 100644
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataSourceWriter.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataSourceWriter.java
@@ -4,13 +4,13 @@
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HiveStreamingDataSourceWriter implements SupportsWriteInternalRow {
+public class HiveStreamingDataSourceWriter implements DataSourceWriter {
   private static Logger LOG = LoggerFactory.getLogger(HiveStreamingDataSourceWriter.class);
 
   private String jobId;
@@ -23,7 +23,7 @@ public class HiveStreamingDataSourceWriter implements SupportsWriteInternalRow {
   private String metastoreKrbPrincipal;
 
   public HiveStreamingDataSourceWriter(String jobId, StructType schema, long commitIntervalRows, String db,
-      String table, List<String> partition, final String metastoreUri, final String metastoreKrbPrincipal) {
+                                       String table, List<String> partition, final String metastoreUri, final String metastoreKrbPrincipal) {
     this.jobId = jobId;
     this.schema = schema;
     this.commitIntervalRows = commitIntervalRows;
@@ -35,9 +35,9 @@ public HiveStreamingDataSourceWriter(String jobId, StructType schema, long commi
   }
 
   @Override
-  public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
+  public DataWriterFactory<InternalRow> createWriterFactory() {
     return new HiveStreamingDataWriterFactory(jobId, schema, commitIntervalRows, db, table, partition, metastoreUri,
-        metastoreKrbPrincipal);
+      metastoreKrbPrincipal);
   }
 
   @Override
@@ -49,5 +49,4 @@ public void commit(WriterCommitMessage[] messages) {
   public void abort(WriterCommitMessage[] messages) {
     LOG.info("Abort job {}", jobId);
   }
-}
-
+}
\ No newline at end of file
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataWriter.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataWriter.java
index ce7d8d5..29fa050 100644
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataWriter.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataWriter.java
@@ -25,7 +25,7 @@ public class HiveStreamingDataWriter implements DataWriter<InternalRow> {
   private String jobId;
   private StructType schema;
   private int partitionId;
-  private int attemptNumber;
+  private long attemptNumber;
   private String db;
   private String table;
   private List<String> partition;
@@ -35,7 +35,7 @@ public class HiveStreamingDataWriter implements DataWriter<InternalRow> {
   private long rowsWritten = 0;
   private String metastoreKrbPrincipal;
 
-  public HiveStreamingDataWriter(String jobId, StructType schema, long commitAfterNRows, int partitionId, int
+  public HiveStreamingDataWriter(String jobId, StructType schema, long commitAfterNRows, int partitionId, long
       attemptNumber, String db, String table, List<String> partition, final String metastoreUri,
       final String metastoreKrbPrincipal) {
     this.jobId = jobId;
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataWriterFactory.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataWriterFactory.java
index 2f0655a..97884c1 100644
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataWriterFactory.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveStreamingDataWriterFactory.java
@@ -21,7 +21,7 @@ public class HiveStreamingDataWriterFactory implements DataWriterFactory<InternalRow>
   private String metastoreKrbPrincipal;
 
   public HiveStreamingDataWriterFactory(String jobId, StructType schema, long commitIntervalRows, String db,
-      String table, List<String> partition, final String metastoreUri, final String metastoreKrbPrincipal) {
+                                        String table, List<String> partition, final String metastoreUri, final String metastoreKrbPrincipal) {
     this.jobId = jobId;
     this.schema = schema;
     this.db = db;
@@ -33,16 +33,15 @@ public HiveStreamingDataWriterFactory(String jobId, StructType schema, long comm
 
   @Override
-  public DataWriter<InternalRow> createDataWriter(int partitionId, int attemptNumber) {
+  public DataWriter<InternalRow> createDataWriter(int partitionId, long attemptNumber, long epochId) {
     ClassLoader restoredClassloader = Thread.currentThread().getContextClassLoader();
     ClassLoader isolatedClassloader = HiveIsolatedClassLoader.isolatedClassLoader();
     try {
       Thread.currentThread().setContextClassLoader(isolatedClassloader);
       return new HiveStreamingDataWriter(jobId, schema, commitIntervalRows, partitionId, attemptNumber, db,
-          table, partition, metastoreUri, metastoreKrbPrincipal);
+        table, partition, metastoreUri, metastoreKrbPrincipal);
     } finally {
       Thread.currentThread().setContextClassLoader(restoredClassloader);
     }
   }
 }
-
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataReader.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataReader.java
index c1058d1..29e5999 100755
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataReader.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataReader.java
@@ -7,7 +7,7 @@
 import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.vectorized.ArrowColumnVector;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -28,7 +28,7 @@
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
 
-public class HiveWarehouseDataReader implements DataReader<ColumnarBatch> {
+public class HiveWarehouseDataReader implements InputPartitionReader<ColumnarBatch> {
 
   private RecordReader<?, ArrowWrapperWritable> reader;
   private ArrowWrapperWritable wrapperWritable = new ArrowWrapperWritable();
@@ -67,7 +67,7 @@ protected RecordReader<?, ArrowWrapperWritable> getRecordReader(LlapInputSplit s
         attemptId,
         childAllocatorReservation,
         arrowAllocatorMax);
-    LlapBaseInputFormat input = new LlapBaseInputFormat(true, allocator);
+    LlapBaseInputFormat input = new LlapBaseInputFormat(true, arrowAllocatorMax);
     return input.getRecordReader(split, conf, null);
   }
 
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataReaderFactory.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataReaderFactory.java
index eb84c4a..1301581 100755
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataReaderFactory.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataReaderFactory.java
@@ -3,8 +3,8 @@
 import org.apache.hadoop.hive.llap.LlapInputSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 import java.io.ByteArrayInputStream;
@@ -12,7 +12,7 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 
-public class HiveWarehouseDataReaderFactory implements DataReaderFactory<ColumnarBatch> {
+public class HiveWarehouseDataReaderFactory implements InputPartition<ColumnarBatch> {
   private byte[] splitBytes;
   private byte[] confBytes;
   private transient InputSplit split;
@@ -51,7 +51,7 @@ public String[] preferredLocations() {
   }
 
   @Override
-  public DataReader<ColumnarBatch> createDataReader() {
+  public InputPartitionReader<ColumnarBatch> createPartitionReader() {
     LlapInputSplit llapInputSplit = new LlapInputSplit();
     ByteArrayInputStream splitByteArrayStream = new ByteArrayInputStream(splitBytes);
     ByteArrayInputStream confByteArrayStream = new ByteArrayInputStream(confBytes);
@@ -67,7 +67,7 @@ public DataReader<ColumnarBatch> createDataReader() {
     }
   }
 
-  protected DataReader<ColumnarBatch> getDataReader(LlapInputSplit split, JobConf jobConf, long arrowAllocatorMax)
+  protected InputPartitionReader<ColumnarBatch> getDataReader(LlapInputSplit split, JobConf jobConf, long arrowAllocatorMax)
       throws Exception {
     return new HiveWarehouseDataReader(split, jobConf, arrowAllocatorMax);
   }
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataSourceReader.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataSourceReader.java
index 6f5fc06..471ba4b 100755
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataSourceReader.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataSourceReader.java
@@ -6,7 +6,7 @@
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
 import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
@@ -44,7 +44,7 @@
  * 5. Spark pulls factories, where factory/task are 1:1 -> createBatchDataReaderFactories(..)
 */
 public class HiveWarehouseDataSourceReader
-    implements DataSourceReader, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch, SupportsPushDownFilters {
+        implements DataSourceReader, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch, SupportsPushDownFilters {
 
   //The pruned schema
   StructType schema = null;
@@ -95,27 +95,27 @@ protected StructType getTableSchema() throws Exception {
     replaceSparkHiveDriver();
 
     StatementType queryKey = getQueryType();
-      String query;
-      if (queryKey == StatementType.FULL_TABLE_SCAN) {
-        String dbName = HWConf.DEFAULT_DB.getFromOptionsMap(options);
-        SchemaUtil.TableRef tableRef = SchemaUtil.getDbTableNames(dbName, options.get("table"));
-        query = selectStar(tableRef.databaseName, tableRef.tableName);
-      } else {
-        query = options.get("query");
-      }
-      LlapBaseInputFormat llapInputFormat = null;
-      try {
-        JobConf conf = JobUtil.createJobConf(options, query);
-        llapInputFormat = new LlapBaseInputFormat(false, Long.MAX_VALUE);
-        InputSplit[] splits = llapInputFormat.getSplits(conf, 0);
-        LlapInputSplit schemaSplit = (LlapInputSplit) splits[0];
-        Schema schema = schemaSplit.getSchema();
-        return SchemaUtil.convertSchema(schema);
-      } finally {
-        if(llapInputFormat != null) {
-          close();
-        }
+    String query;
+    if (queryKey == StatementType.FULL_TABLE_SCAN) {
+      String dbName = HWConf.DEFAULT_DB.getFromOptionsMap(options);
+      SchemaUtil.TableRef tableRef = SchemaUtil.getDbTableNames(dbName, options.get("table"));
+      query = selectStar(tableRef.databaseName, tableRef.tableName);
+    } else {
+      query = options.get("query");
+    }
+    LlapBaseInputFormat llapInputFormat = null;
+    try {
+      JobConf conf = JobUtil.createJobConf(options, query);
+      llapInputFormat = new LlapBaseInputFormat(false, Long.MAX_VALUE);
+      InputSplit[] splits = llapInputFormat.getSplits(conf, 0);
+      LlapInputSplit schemaSplit = (LlapInputSplit) splits[0];
+      Schema schema = schemaSplit.getSchema();
+      return SchemaUtil.convertSchema(schema);
+    } finally {
+      if(llapInputFormat != null) {
+        close();
       }
+    }
   }
 
   @Override public StructType readSchema() {
@@ -134,12 +134,12 @@ protected StructType getTableSchema() throws Exception {
   //"returns unsupported filters."
   @Override public Filter[] pushFilters(Filter[] filters) {
     pushedFilters = Arrays.stream(filters).
-        filter((filter) -> FilterPushdown.buildFilterExpression(baseSchema, filter).isDefined()).
-        toArray(Filter[]::new);
+      filter((filter) -> FilterPushdown.buildFilterExpression(baseSchema, filter).isDefined()).
+      toArray(Filter[]::new);
 
     return Arrays.stream(filters).
-        filter((filter) -> !FilterPushdown.buildFilterExpression(baseSchema, filter).isDefined()).
-        toArray(Filter[]::new);
+      filter((filter) -> !FilterPushdown.buildFilterExpression(baseSchema, filter).isDefined()).
+      toArray(Filter[]::new);
   }
 
   @Override public Filter[] pushedFilters() {
@@ -150,11 +150,11 @@ protected StructType getTableSchema() throws Exception {
     this.schema = requiredSchema;
   }
 
-  @Override public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
+  public List<InputPartition<ColumnarBatch>> createBatchDataReaderFactories() {
    try {
       boolean countStar = this.schema.length() == 0;
       String queryString = getQueryString(SchemaUtil.columnNames(schema), pushedFilters);
-      List<DataReaderFactory<ColumnarBatch>> factories = new ArrayList<>();
+      List<InputPartition<ColumnarBatch>> factories = new ArrayList<>();
       if (countStar) {
         LOG.info("Executing count with query: {}", queryString);
         factories.addAll(getCountStarFactories(queryString));
@@ -167,8 +167,12 @@ protected StructType getTableSchema() throws Exception {
     }
   }
 
-  protected List<DataReaderFactory<ColumnarBatch>> getSplitsFactories(String query) {
-    List<DataReaderFactory<ColumnarBatch>> tasks = new ArrayList<>();
+  @Override public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
+    return createBatchDataReaderFactories();
+  }
+
+  protected List<InputPartition<ColumnarBatch>> getSplitsFactories(String query) {
+    List<InputPartition<ColumnarBatch>> tasks = new ArrayList<>();
     try {
       JobConf jobConf = JobUtil.createJobConf(options, query);
       LlapBaseInputFormat llapInputFormat = new LlapBaseInputFormat(false, Long.MAX_VALUE);
@@ -184,12 +188,12 @@ protected List<DataReaderFactory<ColumnarBatch>> getSplitsFactories(String query
     return tasks;
   }
 
-  protected DataReaderFactory<ColumnarBatch> getDataReaderFactory(InputSplit split, JobConf jobConf, long arrowAllocatorMax) {
+  protected InputPartition<ColumnarBatch> getDataReaderFactory(InputSplit split, JobConf jobConf, long arrowAllocatorMax) {
     return new HiveWarehouseDataReaderFactory(split, jobConf, arrowAllocatorMax);
   }
 
-  private List<DataReaderFactory<ColumnarBatch>> getCountStarFactories(String query) {
-    List<DataReaderFactory<ColumnarBatch>> tasks = new ArrayList<>(100);
+  private List<InputPartition<ColumnarBatch>> getCountStarFactories(String query) {
+    List<InputPartition<ColumnarBatch>> tasks = new ArrayList<>(100);
     long count = getCount(query);
     String numTasksString = HWConf.COUNT_TASKS.getFromOptionsMap(options);
     int numTasks = Integer.parseInt(numTasksString);
@@ -205,7 +209,7 @@ private List<DataReaderFactory<ColumnarBatch>> getCountStarFactories(String quer
   protected long getCount(String query) {
     try(Connection conn = getConnection()) {
       DriverResultSet rs = DefaultJDBCWrapper.executeStmt(conn, HWConf.DEFAULT_DB.getFromOptionsMap(options), query,
-          Long.parseLong(HWConf.MAX_EXEC_RESULTS.getFromOptionsMap(options)));
+        Long.parseLong(HWConf.MAX_EXEC_RESULTS.getFromOptionsMap(options)));
       return rs.getData().get(0).getLong(0);
     } catch (SQLException e) {
       LOG.error("Failed to connect to HS2", e);
@@ -239,4 +243,4 @@ public void close() {
     }
   }
 
-}
+}
\ No newline at end of file
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataSourceWriter.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataSourceWriter.java
index 6e9a00a..e42383f 100755
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataSourceWriter.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataSourceWriter.java
@@ -23,7 +23,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
@@ -35,7 +35,7 @@
 
 import static com.hortonworks.spark.sql.hive.llap.util.HiveQlUtil.loadInto;
 
-public class HiveWarehouseDataSourceWriter implements SupportsWriteInternalRow {
+public class HiveWarehouseDataSourceWriter implements DataSourceWriter {
   protected String jobId;
   protected StructType schema;
   protected Path path;
@@ -52,7 +52,7 @@ public HiveWarehouseDataSourceWriter(Map<String, String> options, String jobId,
     this.conf = conf;
   }
 
-  @Override public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
+  @Override public DataWriterFactory<InternalRow> createWriterFactory() {
     return new HiveWarehouseDataWriterFactory(jobId, schema, path, new SerializableHadoopConfiguration(conf));
   }
 
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataWriter.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataWriter.java
index 3099589..0294650 100755
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataWriter.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataWriter.java
@@ -23,13 +23,13 @@ public class HiveWarehouseDataWriter implements DataWriter<InternalRow> {
   private String jobId;
   private StructType schema;
   private int partitionId;
-  private int attemptNumber;
+  private long attemptNumber;
   private FileSystem fs;
   private Path filePath;
   private OutputWriter out;
 
   public HiveWarehouseDataWriter(Configuration conf, String jobId, StructType schema,
-      int partitionId, int attemptNumber, FileSystem fs, Path filePath) {
+      int partitionId, long attemptNumber, FileSystem fs, Path filePath) {
     this.jobId = jobId;
     this.schema = schema;
     this.partitionId = partitionId;
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataWriterFactory.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataWriterFactory.java
index 63ae001..5229ccf 100755
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataWriterFactory.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/HiveWarehouseDataWriterFactory.java
@@ -29,7 +29,7 @@ public HiveWarehouseDataWriterFactory(String jobId, StructType schema,
     this.conf = conf;
   }
 
-  @Override public DataWriter<InternalRow> createDataWriter(int partitionId, int attemptNumber) {
+  @Override public DataWriter<InternalRow> createDataWriter(int partitionId, long attemptNumber, long epochId) {
     Path filePath = new Path(this.path, String.format("%s_%s_%s", jobId, partitionId, attemptNumber));
     FileSystem fs = null;
     try {
@@ -42,7 +42,7 @@ public HiveWarehouseDataWriterFactory(String jobId, StructType schema,
   }
 
   protected DataWriter<InternalRow> getDataWriter(Configuration conf, String jobId,
-      StructType schema, int partitionId, int attemptNumber,
+      StructType schema, int partitionId, long attemptNumber,
       FileSystem fs, Path filePath) {
     return new HiveWarehouseDataWriter(conf, jobId, schema, partitionId, attemptNumber, fs, filePath);
   }
diff --git a/src/main/java/com/hortonworks/spark/sql/hive/llap/streaming/HiveStreamingDataSourceWriter.java b/src/main/java/com/hortonworks/spark/sql/hive/llap/streaming/HiveStreamingDataSourceWriter.java
index c20fe5f..ae01c6a 100644
--- a/src/main/java/com/hortonworks/spark/sql/hive/llap/streaming/HiveStreamingDataSourceWriter.java
+++ b/src/main/java/com/hortonworks/spark/sql/hive/llap/streaming/HiveStreamingDataSourceWriter.java
@@ -4,7 +4,7 @@
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
 import org.apache.spark.sql.types.StructType;
@@ -13,7 +13,7 @@
 import com.hortonworks.spark.sql.hive.llap.HiveStreamingDataWriterFactory;
 
-public class HiveStreamingDataSourceWriter implements SupportsWriteInternalRow, StreamWriter {
+public class HiveStreamingDataSourceWriter implements DataSourceWriter, StreamWriter {
   private static Logger LOG = LoggerFactory.getLogger(HiveStreamingDataSourceWriter.class);
 
   private String jobId;
@@ -36,7 +36,7 @@ public HiveStreamingDataSourceWriter(String jobId, StructType schema, String db,
   }
 
   @Override
-  public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
+  public DataWriterFactory<InternalRow> createWriterFactory() {
     // for the streaming case, commit transaction happens on task commit() (atleast-once), so interval is set to -1
     return new HiveStreamingDataWriterFactory(jobId, schema, -1, db, table, partition, metastoreUri,
       metastoreKerberosPrincipal);