diff --git a/external/maprdb/src/main/scala/com/mapr/db/spark/streaming/sink/MapRDBSink.scala b/external/maprdb/src/main/scala/com/mapr/db/spark/streaming/sink/MapRDBSink.scala index 9cbb02c4f1ae7..af1a29c081b06 100644 --- a/external/maprdb/src/main/scala/com/mapr/db/spark/streaming/sink/MapRDBSink.scala +++ b/external/maprdb/src/main/scala/com/mapr/db/spark/streaming/sink/MapRDBSink.scala @@ -1,10 +1,12 @@ package com.mapr.db.spark.streaming.sink -import com.mapr.db.spark.sql._ +import com.mapr.db.spark._ import com.mapr.db.spark.streaming.MapRDBSourceConfig import org.apache.spark.internal.Logging import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.plans.logical.{Command, LocalRelation, LogicalPlan, Union} import org.apache.spark.sql.execution.streaming.Sink import org.ojai.DocumentConstants @@ -26,9 +28,26 @@ private[streaming] class MapRDBSink(parameters: Map[String, String]) extends Sin val createTable = parameters.getOrElse(MapRDBSourceConfig.CreateTableOption, "false").toBoolean val bulkInsert = parameters.getOrElse(MapRDBSourceConfig.BulkModeOption, "false").toBoolean - data.saveToMapRDB(tablePath.get, idFieldPath, createTable, bulkInsert) + val logicalPlan: LogicalPlan = { + // For various commands (like DDL) and queries with side effects, we force query execution + // to happen right away to let these side effects take place eagerly. + data.queryExecution.analyzed match { + case c: Command => + LocalRelation(c.output, data.queryExecution.executedPlan.executeCollect()) + case u@Union(children) if children.forall(_.isInstanceOf[Command]) => + LocalRelation(u.output, data.queryExecution.executedPlan.executeCollect()) + case _ => + data.queryExecution.analyzed + } + } + + val encoder = RowEncoder(data.schema).resolveAndBind( + logicalPlan.output, + data.sparkSession.sessionState.analyzer) + data.queryExecution.toRdd.map(encoder.fromRow).saveToMapRDB(tablePath.get, createTable, bulkInsert, idFieldPath) latestBatchId = batchId } } + }