[SPARKNLP-940] Adding changes to correctly copy cluster index storage when defined (#14167)

* [SPARKNLP-940] Adding changes to correctly copy cluster index storage when defined

* [SPARKNLP-940] Moving local mode control to its right place

* [SPARKNLP-940] Refactoring sendToCluster method
danilojsl authored Feb 11, 2024
1 parent 0e9b54d commit 219fc19
Showing 4 changed files with 47 additions and 20 deletions.
6 changes: 5 additions & 1 deletion build.sbt
@@ -144,13 +144,17 @@ lazy val utilDependencies = Seq(
exclude ("com.fasterxml.jackson.core", "jackson-annotations")
exclude ("com.fasterxml.jackson.core", "jackson-databind")
exclude ("com.fasterxml.jackson.core", "jackson-core")
exclude ("com.fasterxml.jackson.dataformat", "jackson-dataformat-cbor")
exclude ("commons-configuration", "commons-configuration"),
liblevenshtein
exclude ("com.google.guava", "guava")
exclude ("org.apache.commons", "commons-lang3")
exclude ("com.google.code.findbugs", "annotations")
exclude ("org.slf4j", "slf4j-api"),
-   gcpStorage,
+   gcpStorage
+     exclude ("com.fasterxml.jackson.core", "jackson-core")
+     exclude ("com.fasterxml.jackson.dataformat", "jackson-dataformat-cbor")
+   ,
greex,
azureIdentity,
azureStorage)
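For context, the mechanism this change relies on: sbt's exclude(organization, name) on a ModuleID drops one transitive artifact, which is how the duplicate Jackson classes are kept off the classpath here. A minimal sketch with a hypothetical com.example coordinate (the build above uses the same call in postfix position, e.g. gcpStorage exclude (...), which is equivalent):

// build.sbt sketch; "com.example" is illustrative, not from this build
libraryDependencies += ("com.example" %% "example-client" % "1.0.0")
  .exclude("com.fasterxml.jackson.core", "jackson-core")
  .exclude("com.fasterxml.jackson.dataformat", "jackson-dataformat-cbor")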
4 changes: 3 additions & 1 deletion examples/util/Load_Model_from_GCP_Storage.ipynb
@@ -80,7 +80,8 @@
"1. GCP connector: You need to identify your hadoop version and set the required dependency in `spark.jars.packages`\n",
"2. ADC credentials: After following the instructions to setup ADC, you will have a JSON file that holds your authenticiation information. This file is setup in `spark.hadoop.google.cloud.auth.service.account.json.keyfile`\n",
"3. Hadoop File System: You also need to setup the Hadoop implementation to work with GCP Storage as file system. This is define in `spark.hadoop.fs.gs.impl`\n",
"3. Finally, to mitigate conflicts between Spark's dependencies and user dependencies. You must define `spark.driver.userClassPathFirst` as true. You may also need to define `spark.executor.userClassPathFirst` as true.\n",
"4. To mitigate conflicts between Spark's dependencies and user dependencies. You must define `spark.driver.userClassPathFirst` as true. You may also need to define `spark.executor.userClassPathFirst` as true.\n",
"5. Additonaly, to avoid conflict errors whe need to exclude the following dependency: `com.fasterxml.jackson.core:jackson-core`\n",
"\n"
]
},
@@ -128,6 +129,7 @@
" \"spark.jars.packages\": \"com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.8\",\n",
" \"spark.hadoop.fs.gs.impl\": \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem\",\n",
" \"spark.driver.userClassPathFirst\": \"true\",\n",
" \"spark.jars.excludes\": \"com.fasterxml.jackson.core:jackson-core\",\n",
" \"spark.hadoop.google.cloud.auth.service.account.json.keyfile\": json_keyfile,\n",
" \"spark.jsl.settings.gcp.project_id\": PROJECT_ID,\n",
" \"spark.jsl.settings.pretrained.cache_folder\": CACHE_FOLDER\n",
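For readers working from Scala instead of the notebook's Python, the session settings above can be sketched roughly as follows; the keyfile path, project id, and cache folder are placeholders standing in for the notebook's json_keyfile, PROJECT_ID, and CACHE_FOLDER variables:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("SparkNLP-GCP")
  .master("local[*]")
  .config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.8")
  .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
  .config("spark.driver.userClassPathFirst", "true")
  .config("spark.jars.excludes", "com.fasterxml.jackson.core:jackson-core")
  .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/path/to/adc-keyfile.json") // placeholder
  .config("spark.jsl.settings.gcp.project_id", "my-gcp-project") // placeholder
  .config("spark.jsl.settings.pretrained.cache_folder", "gs://my-bucket/cache") // placeholder
  .getOrCreate()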
8 changes: 4 additions & 4 deletions src/main/scala/com/johnsnowlabs/storage/RocksDBConnection.scala
@@ -42,9 +42,9 @@ final class RocksDBConnection private (path: String) extends AutoCloseable {
}

def findLocalIndex: String = {
-   val localPath = RocksDBConnection.getLocalPath(path)
-   if (new File(localPath).exists()) {
-     localPath
+   val tmpIndexStorageLocalPath = RocksDBConnection.getTmpIndexStorageLocalPath(path)
+   if (new File(tmpIndexStorageLocalPath).exists()) {
+     tmpIndexStorageLocalPath
} else if (new File(path).exists()) {
path
} else {
@@ -135,7 +135,7 @@ object RocksDBConnection {
def getOrCreate(database: Database.Name, refName: String): RocksDBConnection =
getOrCreate(database.toString, refName)

-   def getLocalPath(fileName: String): String = {
+   def getTmpIndexStorageLocalPath(fileName: String): String = {
Path
.mergePaths(new Path(SparkFiles.getRootDirectory()), new Path("/storage/" + fileName))
.toString
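To make the rename above concrete, here is a minimal sketch of what getTmpIndexStorageLocalPath resolves to: the SparkFiles root directory of the running application plus a /storage/ prefix. It assumes an active Spark context, and "my_index" is a hypothetical index name.

import org.apache.hadoop.fs.Path
import org.apache.spark.SparkFiles

// Mirrors the method body shown above.
val tmpIndexStorageLocalPath = Path
  .mergePaths(new Path(SparkFiles.getRootDirectory()), new Path("/storage/" + "my_index"))
  .toString
// e.g. /tmp/spark-<app-uuid>/userFiles-<uuid>/storage/my_index
println(tmpIndexStorageLocalPath)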
49 changes: 35 additions & 14 deletions src/main/scala/com/johnsnowlabs/storage/StorageHelper.scala
@@ -84,13 +84,38 @@ object StorageHelper {
sparkContext: SparkContext): Unit = {
destinationScheme match {
case "file" => {
-       val destination = new Path(RocksDBConnection.getLocalPath(clusterFileName))
-       copyIndexToLocal(source, destination, sparkContext)
+       val sourceFileSystemScheme = source.getFileSystem(sparkContext.hadoopConfiguration)
+       val tmpIndexStorageLocalPath =
+         RocksDBConnection.getTmpIndexStorageLocalPath(clusterFileName)
+       sourceFileSystemScheme.getScheme match {
+         case "file" => {
+           if (!doesDirectoryExistJava(tmpIndexStorageLocalPath) ||
+             !doesDirectoryExistHadoop(tmpIndexStorageLocalPath, sparkContext)) {
+             copyIndexToLocal(source, new Path(tmpIndexStorageLocalPath), sparkContext)
+           }
+         }
+         case "s3a" =>
+           copyIndexToLocal(source, new Path(tmpIndexStorageLocalPath), sparkContext)
+         case _ => copyIndexToCluster(source, clusterFilePath, sparkContext)
+       }
}
-     case _ => {
-       copyIndexToCluster(source, clusterFilePath, sparkContext)
-     }
+     case _ => copyIndexToCluster(source, clusterFilePath, sparkContext)
}
}

+   private def doesDirectoryExistJava(path: String): Boolean = {
+     val directory = new File(path)
+     directory.exists && directory.isDirectory
+   }
+
+   private def doesDirectoryExistHadoop(path: String, sparkContext: SparkContext): Boolean = {
+     val localPath = new Path(path)
+     val fileSystem = localPath.getFileSystem(sparkContext.hadoopConfiguration)
+     fileSystem.exists(localPath)
+   }

private def copyIndexToCluster(
sourcePath: Path,
dst: Path,
@@ -129,21 +154,17 @@ object StorageHelper {
val fileSystemDestination = destination.getFileSystem(sparkContext.hadoopConfiguration)
val fileSystemSource = source.getFileSystem(sparkContext.hadoopConfiguration)

-     if (fileSystemDestination.exists(destination)) {
-       return
-     }
-
-     if (fileSystemSource.getScheme == "s3a" && fileSystemDestination.getScheme == "file") {
+     if (fileSystemSource.getScheme == "file") {
+       fileSystemDestination.copyFromLocalFile(false, true, source, destination)
+     } else {
CloudResources.downloadBucketToLocalTmp(
source.toString,
destination.toString,
isIndex = true)
-       sparkContext.addFile(destination.toString, recursive = true)
-       return
-     }
-
-     if (fileSystemDestination.getScheme != "s3a") {
-       fileSystemDestination.copyFromLocalFile(false, true, source, destination)
+       val isLocalMode = sparkContext.master.startsWith("local")
+       if (isLocalMode) {
+         sparkContext.addFile(destination.toString, recursive = true)
+       }
}
}

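The branching added above keys off Hadoop's filesystem scheme resolution: each path's URI resolves to a FileSystem, and getScheme distinguishes "file" from "s3a" sources. A minimal sketch of that probe with an illustrative path (resolving an s3a:// path would additionally require the hadoop-aws module on the classpath, and would report "s3a", routing through CloudResources.downloadBucketToLocalTmp instead):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()
// A file:// path resolves to the local filesystem, so getScheme returns "file"
// and StorageHelper takes the local-copy branch.
println(new Path("file:///tmp/storage/my_index").getFileSystem(conf).getScheme)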
