From 101ef1b132b301102e2e2a095efcdab414762aef Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 18 Oct 2023 16:36:05 -0700 Subject: [PATCH] Qualify table name in relation when fetching extra option Signed-off-by: Chen Dai --- .../flint/spark/mv/FlintSparkMaterializedView.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index abe78b13b..019cc7aa5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -84,7 +84,7 @@ case class FlintSparkMaterializedView( aggregate.copy(child = watermark(timeCol, aggregate.child)) case relation: UnresolvedRelation if !relation.isStreaming => - relation.copy(isStreaming = true, options = optionsWithExtra(relation)) + relation.copy(isStreaming = true, options = optionsWithExtra(spark, relation)) } logicalPlanToDataFrame(spark, streamingPlan) } @@ -98,9 +98,12 @@ case class FlintSparkMaterializedView( EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child) } - private def optionsWithExtra(relation: UnresolvedRelation): CaseInsensitiveStringMap = { + private def optionsWithExtra( + spark: SparkSession, + relation: UnresolvedRelation): CaseInsensitiveStringMap = { val originalOptions = relation.options.asCaseSensitiveMap - val extraOptions = options.extraSourceOptions(relation.tableName).asJava + val tableName = qualifyTableName(spark, relation.tableName) + val extraOptions = options.extraSourceOptions(tableName).asJava new CaseInsensitiveStringMap((originalOptions ++ extraOptions).asJava) }