Fixing issues in winutils on Windows (Azure#34206)
* Fixing issues in winutils on Windows

* Update external_dependencies.txt
FabianMeiswinkel authored Mar 25, 2023
1 parent 3de9b49 commit d48c536
Showing 12 changed files with 74 additions and 10 deletions.
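For context: on Windows, Hadoop's default local filesystem (RawLocalFileSystem) shells out to winutils.exe for permission handling, so Spark tests fail on machines without it. This commit wires GlobalMentor's pure-Java hadoop-bare-naked-local-fs into the test Spark sessions instead. A minimal sketch of the idea, using the LocalJavaFileSystem helper introduced at the bottom of this commit (the object name, app name, and output path are illustrative, not from this commit):

    import org.apache.spark.sql.SparkSession

    object WinutilsFreeSmokeTest {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("winutils-free smoke test")
          .master("local")
          .getOrCreate()

        // Route local file access through the pure-Java filesystem registered
        // under the localfs:/// scheme (see the Spark trait changes below).
        LocalJavaFileSystem.applyToSparkSession(spark)

        // On Windows without winutils, the equivalent write via the default
        // file:/// scheme would fail inside RawLocalFileSystem.
        spark.range(10).write.mode("overwrite").parquet("localfs:///tmp/smoke-test")
        spark.stop()
      }
    }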
1 change: 1 addition & 0 deletions eng/versioning/external_dependencies.txt
@@ -353,6 +353,7 @@ cosmos_org.scalatest:scalatest_2.12;3.2.2
cosmos_org.scalatest:scalatest-flatspec_2.12;3.2.3
cosmos_org.scalactic:scalactic_2.12;3.2.3
cosmos_org.scalamock:scalamock_2.12;5.0.0
cosmos_com.globalmentor:hadoop-bare-naked-local-fs;0.1.0

# Maven Tools for Cosmos Spark connector only
cosmos_org.scalatest:scalatest-maven-plugin;2.0.2
7 changes: 7 additions & 0 deletions sdk/cosmos/azure-cosmos-spark_3_2-12/pom.xml
@@ -110,6 +110,12 @@
</dependency>

<!-- Test -->
<dependency>
<groupId>com.globalmentor</groupId>
<artifactId>hadoop-bare-naked-local-fs</artifactId>
<version>0.1.0</version> <!-- {x-version-update;cosmos_com.globalmentor:hadoop-bare-naked-local-fs;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -334,6 +340,7 @@
<configuration>
<source>11</source>
<target>11</target>
<scalaVersion>2.12.10</scalaVersion>
</configuration>
<executions>
<execution>
@@ -52,7 +52,7 @@ class CosmosCatalogBase

/**
* Called to initialize configuration.
* <p>
* <br/>
* This method is called once, just after the provider is instantiated.
*
* @param name the name used to identify and load this catalog
@@ -91,7 +91,7 @@

/**
* List top-level namespaces from the catalog.
* <p>
* <br/>
* If an object such as a table, view, or function exists, its parent namespaces must also exist
* and must be returned by this discovery method. For example, if table a.t exists, this method
* must return ["a"] in the result array.
@@ -128,7 +128,7 @@

/**
* List namespaces in a namespace.
* <p>
* <br/>
* Cosmos supports only single-depth databases. Hence we always return an empty list of namespaces,
* or throw if the root namespace doesn't exist.
*/
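As a usage illustration of the flat-namespace behavior described above, a hedged sketch (testCatalog matches the catalog registered in the integration test below; someDatabase is hypothetical):

    // Cosmos databases have no hierarchy, so only top-level namespaces exist;
    // listing inside a namespace yields an empty result, or an error if the
    // namespace itself doesn't exist.
    spark.sql("SHOW NAMESPACES IN testCatalog").show()
    spark.sql("SHOW NAMESPACES IN testCatalog.someDatabase").show()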
@@ -31,6 +31,8 @@ abstract class CosmosCatalogITestBase extends IntegrationSpec with CosmosClient
.enableHiveSupport()
.getOrCreate()

LocalJavaFileSystem.applyToSparkSession(spark)

spark.conf.set(s"spark.sql.catalog.testCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set(s"spark.sql.catalog.testCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set(s"spark.sql.catalog.testCatalog.spark.cosmos.accountKey", cosmosMasterKey)
@@ -33,6 +33,9 @@ object SampleCosmosCatalogE2EMain {
.appName("spark connector sample")
.master("local")
.getOrCreate()

LocalJavaFileSystem.applyToSparkSession(spark)

spark.conf.set(s"spark.sql.catalog.mycatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set(s"spark.sql.catalog.mycatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set(s"spark.sql.catalog.mycatalog.spark.cosmos.accountKey", cosmosMasterKey)
@@ -34,6 +34,8 @@ object SampleE2EMain {
.master("local")
.getOrCreate()

LocalJavaFileSystem.applyToSparkSession(spark)

// scalastyle:off underscore.import
// scalastyle:off import.grouping
import spark.implicits._
@@ -37,6 +37,8 @@ object SampleReadE2EMain {
.master("local")
.getOrCreate()

LocalJavaFileSystem.applyToSparkSession(spark)

val df = spark.read.format("cosmos.oltp").options(cfg).load()
df.show(numRows = 10)

@@ -31,6 +31,8 @@ object SampleStructuredStreamingE2EMain {
.master("local")
.getOrCreate()

LocalJavaFileSystem.applyToSparkSession(spark)

spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("Query started: " + queryStarted.id)
@@ -26,6 +26,8 @@ class SparkE2EConfigResolutionITest extends IntegrationSpec with CosmosClient wi
.config(sparkConfig)
.getOrCreate()

LocalJavaFileSystem.applyToSparkSession(spark)

// scalastyle:off underscore.import
// scalastyle:off import.grouping
import spark.implicits._
@@ -76,11 +76,15 @@ abstract class SparkE2EQueryITestBase
item.getAs[String]("id") shouldEqual id

assertMetrics(meterRegistry, "cosmos.client.op.latency", expectedToFind = true)

// Gateway requests don't always happen - but they can
//assertMetrics(meterRegistry, "cosmos.client.req.gw", expectedToFind = true)

assertMetrics(meterRegistry, "cosmos.client.req.rntbd", expectedToFind = true)
assertMetrics(meterRegistry, "cosmos.client.rntbd", expectedToFind = true)
assertMetrics(meterRegistry, "cosmos.client.rntbd.addressResolution", expectedToFind = true)

// Address resolution requests can happen but are not guaranteed - they are optional
// assertMetrics(meterRegistry, "cosmos.client.rntbd.addressResolution", expectedToFind = true)
}
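When deciding which of these sometimes-emitted metrics are safe to assert on, it can help to dump what the Micrometer registry actually recorded in a given run; a hedged debugging sketch (not part of this commit) using Micrometer's public API:

    import scala.collection.JavaConverters._

    // Print every meter name currently registered - useful for spotting metrics
    // that only show up on some code paths (gateway, address resolution).
    meterRegistry.getMeters.asScala.map(_.getId.getName).sorted.foreach(println)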

private def insertDummyValue() : Unit = {
@@ -5,19 +5,18 @@ package com.azure.cosmos.spark
import com.azure.cosmos.CosmosAsyncContainer
import com.azure.cosmos.implementation.{TestConfigurations, Utils}
import com.azure.cosmos.models.{ModelBridgeInternal, PartitionKey, ThroughputProperties}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQueryListener, Trigger}
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.spark.udf.GetFeedRangeForPartitionKeyValue
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.sql.streaming.{StreamingQueryListener, Trigger}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.Retries
import org.scalatest.tagobjects.Retryable

import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import java.util.regex.Pattern

class SparkE2EStructuredStreamingITest
@@ -780,6 +779,8 @@ class SparkE2EStructuredStreamingITest
.master("local")
.getOrCreate()

LocalJavaFileSystem.applyToSparkSession(spark)

spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {}
@@ -7,15 +7,18 @@ import com.azure.cosmos.models.{ChangeFeedPolicy, CosmosBulkOperations, CosmosCo
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
import com.azure.cosmos.{CosmosAsyncClient, CosmosClientBuilder, CosmosException}
import com.fasterxml.jackson.databind.node.ObjectNode
import com.globalmentor.apache.hadoop.fs.{BareLocalFileSystem, NakedLocalFileSystem}
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.composite.CompositeMeterRegistry
import org.apache.commons.lang3.RandomStringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}
import reactor.core.publisher.Sinks
import reactor.core.scala.publisher.SMono.PimpJFlux

import java.net.URI
import java.time.Duration
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
@@ -45,6 +48,8 @@ trait Spark extends BeforeAndAfterAll with BasicLoggingTrait {
.master("local")
.getOrCreate()

LocalJavaFileSystem.applyToSparkSession(spark)

spark
}

@@ -79,6 +84,8 @@ trait SparkWithDropwizardAndSlf4jMetrics extends Spark {
.config("spark.cosmos.metrics.intervalInSeconds", "10")
.getOrCreate()

LocalJavaFileSystem.applyToSparkSession(spark)

spark
}
}
@@ -97,6 +104,8 @@ trait SparkWithJustDropwizardAndNoSlf4jMetrics extends Spark {
.config("spark.cosmos.metrics.slf4j.enabled", "false")
.getOrCreate()

LocalJavaFileSystem.applyToSparkSession(spark)

spark
}
}
@@ -372,3 +381,32 @@ object Platform {
+ " and the Spark version, or unknown version, attempts to access DirectByteBuffer via reflection.")
}
}

class NakedLocalJavaFileSystem() extends NakedLocalFileSystem {

// The NakedLocalFileSystem requires the file:/// scheme to be used - which conflicts
// with some Spark code paths where that scheme would automatically trigger winutils
// usage. Overriding the scheme here allows using NakedLocalFileSystem instead of winutils.
override def getUri: URI = {
LocalJavaFileSystem.NAME
}

// Path validation is inherited unchanged; this override only delegates to the base class.
override def checkPath(path: Path): Unit = {
super.checkPath(path)
}
}

// Just a wrapper to allow injecting the NakedLocalFileSystem with the modified scheme
class BareLocalJavaFileSystem() extends BareLocalFileSystem(new NakedLocalJavaFileSystem()) {
}

object LocalJavaFileSystem {

val NAME = URI.create("localfs:///")

def applyToSparkSession(spark: SparkSession): Unit = {
spark.sparkContext.hadoopConfiguration.set("fs.defaultFS", "localfs:///")
spark.sparkContext.hadoopConfiguration.setClass(
"fs.localfs.impl", classOf[BareLocalJavaFileSystem], classOf[FileSystem])
}
}
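The helper above is Spark-specific, but the same registration works against a plain Hadoop Configuration; a minimal sketch, assuming hadoop-bare-naked-local-fs and the classes above are on the classpath (the directory path is illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    // Map the localfs:/// scheme to the pure-Java local filesystem and make it
    // the default, mirroring what applyToSparkSession does for Spark sessions.
    conf.set("fs.defaultFS", "localfs:///")
    conf.setClass("fs.localfs.impl", classOf[BareLocalJavaFileSystem], classOf[FileSystem])

    val fs = FileSystem.get(conf)
    fs.mkdirs(new Path("localfs:///tmp/no-winutils"))  // no winutils.exe involved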
