support job parallelism for both tags and edges (vesoft-inc#165)
Nicole00 committed Nov 16, 2023
1 parent c03ce3e commit 4d27095
Showing 3 changed files with 36 additions and 45 deletions.
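In essence, the commit replaces the two sequential tag and edge loops with a single parallel pass over their common supertype; the same edit is applied to the Exchange entry point of each supported Spark version. A condensed sketch of the new control flow, using the names from the diff below, with the per-config job bodies elided into hypothetical processTag/processEdge helpers:

```scala
import scala.collection.mutable.ListBuffer

// Tags and edges share the SchemaConfigEntry supertype, so both kinds of
// config can live in one buffer.
val schemaConfigs: ListBuffer[SchemaConfigEntry] = new ListBuffer[SchemaConfigEntry]
schemaConfigs.append(configs.tagsConfig: _*)
schemaConfigs.append(configs.edgesConfig: _*)

// .par converts the buffer to a parallel collection backed by a ForkJoinPool,
// so the Spark jobs for tags and edges are submitted concurrently instead of
// one after another.
schemaConfigs.par.foreach {
  case tagConfig: TagConfigEntry   => processTag(tagConfig)   // stand-in for the inlined tag body
  case edgeConfig: EdgeConfigEntry => processEdge(edgeConfig) // stand-in for the inlined edge body
}
```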
@@ -6,14 +6,15 @@
 package com.vesoft.nebula.exchange
 
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
-import java.io.File
 
+import java.io.File
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
 import com.vesoft.exchange.common.config.{
   ClickHouseConfigEntry,
   Configs,
   DataSourceConfigEntry,
+  EdgeConfigEntry,
   FileBaseSourceConfigEntry,
   HBaseSourceConfigEntry,
   HiveSourceConfigEntry,
@@ -26,8 +27,10 @@ import com.vesoft.exchange.common.config.{
   OracleConfigEntry,
   PostgreSQLSourceConfigEntry,
   PulsarSourceConfigEntry,
+  SchemaConfigEntry,
   SinkCategory,
   SourceCategory,
+  TagConfigEntry,
   UdfConfigEntry
 }
 import com.vesoft.nebula.exchange.reader.{
@@ -145,9 +148,12 @@ object Exchange {
     // record the failed batch number
     var failures: Long = 0L
 
-    // import tags
-    if (configs.tagsConfig.nonEmpty) {
-      for (tagConfig <- configs.tagsConfig) {
+    var schemaConfigs: ListBuffer[SchemaConfigEntry] = new ListBuffer[SchemaConfigEntry]
+    schemaConfigs.append(configs.tagsConfig: _*)
+    schemaConfigs.append(configs.edgesConfig: _*)
+
+    schemaConfigs.par.foreach {
+      case tagConfig: TagConfigEntry =>
         LOG.info(s">>>>>> Processing Tag ${tagConfig.name}")
         spark.sparkContext.setJobGroup(tagConfig.name, s"Tag: ${tagConfig.name}")
 
@@ -210,14 +216,8 @@ object Exchange {
             totalSstRecordFailure += recordFailure.value
           }
         }
-      }
-    } else {
-      LOG.warn(">>>>>> Tag is not defined")
-    }
 
-    // import edges
-    if (configs.edgesConfig.nonEmpty) {
-      for (edgeConfig <- configs.edgesConfig) {
+      case edgeConfig: EdgeConfigEntry =>
         LOG.info(s">>>>>> Processing Edge ${edgeConfig.name}")
         spark.sparkContext.setJobGroup(edgeConfig.name, s"Edge: ${edgeConfig.name}")
 
@@ -285,11 +285,9 @@ object Exchange {
             totalSstRecordFailure += recordFailure.value
           }
         }
-      }
-    } else {
-      LOG.warn(">>>>>> Edge is not defined")
-    }
 
 
     // reimport for failed tags and edges
     val errorPath = s"${configs.errorConfig.errorPath}/${SparkEnv.get.blockManager.conf.getAppId}"
     if (failures > 0 && ErrorHandler.existError(errorPath)) {
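One knob the change leaves at its default: a Scala parallel collection sizes its ForkJoinPool from the number of available cores. If the cluster should not receive that many concurrent import jobs, the pool can be bounded by hand. A minimal sketch, assuming Scala 2.12 (where `.par` still ships in the standard library); this is not code from the commit:

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

val pool            = new ForkJoinPool(4) // hypothetical cap of 4 concurrent imports
val parallelConfigs = schemaConfigs.par   // schemaConfigs as built in the diff above
parallelConfigs.tasksupport = new ForkJoinTaskSupport(pool)
parallelConfigs.foreach(cfg => LOG.info(s"processing ${cfg.name}")) // per-config work goes here
pool.shutdown()
```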
@@ -6,14 +6,15 @@
 package com.vesoft.nebula.exchange
 
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
-import java.io.File
 
+import java.io.File
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
 import com.vesoft.exchange.common.config.{
   ClickHouseConfigEntry,
   Configs,
   DataSourceConfigEntry,
+  EdgeConfigEntry,
   FileBaseSourceConfigEntry,
   HBaseSourceConfigEntry,
   HiveSourceConfigEntry,
@@ -26,8 +27,10 @@ import com.vesoft.exchange.common.config.{
   OracleConfigEntry,
   PostgreSQLSourceConfigEntry,
   PulsarSourceConfigEntry,
+  SchemaConfigEntry,
   SinkCategory,
   SourceCategory,
+  TagConfigEntry,
   UdfConfigEntry
 }
 import com.vesoft.nebula.exchange.reader.{
@@ -144,9 +147,12 @@ object Exchange {
     // record the failed batch number
     var failures: Long = 0L
 
-    // import tags
-    if (configs.tagsConfig.nonEmpty) {
-      for (tagConfig <- configs.tagsConfig) {
+    var schemaConfigs: ListBuffer[SchemaConfigEntry] = new ListBuffer[SchemaConfigEntry]
+    schemaConfigs.append(configs.tagsConfig: _*)
+    schemaConfigs.append(configs.edgesConfig: _*)
+
+    schemaConfigs.par.foreach {
+      case tagConfig: TagConfigEntry =>
         LOG.info(s">>>>> Processing Tag ${tagConfig.name}")
         spark.sparkContext.setJobGroup(tagConfig.name, s"Tag: ${tagConfig.name}")
 
@@ -210,14 +216,7 @@ object Exchange {
             totalSstRecordFailure += recordFailure.value
           }
         }
-      }
-    } else {
-      LOG.warn(">>>>>> Tag is not defined")
-    }
 
-    // import edges
-    if (configs.edgesConfig.nonEmpty) {
-      for (edgeConfig <- configs.edgesConfig) {
+      case edgeConfig: EdgeConfigEntry =>
         LOG.info(s">>>>> Processing Edge ${edgeConfig.name}")
         spark.sparkContext.setJobGroup(edgeConfig.name, s"Edge: ${edgeConfig.name}")
 
@@ -285,9 +284,6 @@ object Exchange {
             totalSstRecordFailure += recordFailure.value
          }
         }
-      }
-    } else {
-      LOG.warn(">>>>> Edge is not defined")
-    }
 
     // reimport for failed tags and edges
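A caveat when reading the parallel version: shared counters such as `failures` and `totalSstRecordFailure` are plain `var`s now incremented from multiple threads, and `+=` on a `Long` is not atomic on the JVM. If those updates can interleave, the usual fix is an atomic accumulator. A sketch of that alternative (`batchFailure` stands in for the per-job Spark accumulator; this is not what the commit does):

```scala
import java.util.concurrent.atomic.LongAdder

val failures = new LongAdder // would replace: var failures: Long = 0L

schemaConfigs.par.foreach { schemaConfig =>
  // ... submit the Spark job for schemaConfig as in the diff above ...
  failures.add(batchFailure.value) // thread-safe increment from any worker
}
LOG.info(s">>>>> total failed batches: ${failures.sum}")
```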
@@ -6,14 +6,15 @@
 package com.vesoft.nebula.exchange
 
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
-import java.io.File
 
+import java.io.File
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
 import com.vesoft.exchange.common.config.{
   ClickHouseConfigEntry,
   Configs,
   DataSourceConfigEntry,
+  EdgeConfigEntry,
   FileBaseSourceConfigEntry,
   HBaseSourceConfigEntry,
   HiveSourceConfigEntry,
@@ -26,8 +27,10 @@ import com.vesoft.exchange.common.config.{
   OracleConfigEntry,
   PostgreSQLSourceConfigEntry,
   PulsarSourceConfigEntry,
+  SchemaConfigEntry,
   SinkCategory,
   SourceCategory,
+  TagConfigEntry,
   UdfConfigEntry
 }
 import com.vesoft.nebula.exchange.reader.{
@@ -146,9 +149,12 @@ object Exchange {
     // record the failed batch number
     var failures: Long = 0L
 
-    // import tags
-    if (configs.tagsConfig.nonEmpty) {
-      for (tagConfig <- configs.tagsConfig) {
+    var schemaConfigs: ListBuffer[SchemaConfigEntry] = new ListBuffer[SchemaConfigEntry]
+    schemaConfigs.append(configs.tagsConfig: _*)
+    schemaConfigs.append(configs.edgesConfig: _*)
+
+    schemaConfigs.par.foreach {
+      case tagConfig: TagConfigEntry =>
         LOG.info(s">>>>> Processing Tag ${tagConfig.name}")
         spark.sparkContext.setJobGroup(tagConfig.name, s"Tag: ${tagConfig.name}")
 
@@ -211,14 +217,7 @@ object Exchange {
             totalSstRecordFailure += recordFailure.value
           }
         }
-      }
-    } else {
-      LOG.warn(">>>>> Tag is not defined")
-    }
 
-    // import edges
-    if (configs.edgesConfig.nonEmpty) {
-      for (edgeConfig <- configs.edgesConfig) {
+      case edgeConfig: EdgeConfigEntry =>
         LOG.info(s">>>>> Processing Edge ${edgeConfig.name}")
         spark.sparkContext.setJobGroup(edgeConfig.name, s"Edge: ${edgeConfig.name}")
 
@@ -286,9 +285,6 @@ object Exchange {
             totalSstRecordFailure += recordFailure.value
           }
         }
-      }
-    } else {
-      LOG.warn(">>>>> Edge is not defined")
-    }
 
     // reimport for failed tags and edges
@@ -361,7 +357,8 @@ object Exchange {
       Some(reader.read())
     case SourceCategory.KAFKA => {
       val kafkaConfig = config.asInstanceOf[KafkaSourceConfigEntry]
-      LOG.info(s""">>>>> Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""")
+      LOG.info(
+        s""">>>>> Loading from Kafka ${kafkaConfig.server} and subscribe ${kafkaConfig.topic}""")
       val reader = new KafkaReader(session, kafkaConfig, fields)
       Some(reader.read())
     }
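A detail that makes the `setJobGroup` calls pair well with `.par`: Spark stores the job group in a thread-local property of the SparkContext, so each parallel worker tags only the jobs it submits, and every tag or edge appears under its own group in the Spark UI. Job groups also allow targeted cancellation; an illustrative use, with a made-up group name:

```scala
// Tag all jobs submitted from this thread with the group "player".
spark.sparkContext.setJobGroup("player", "Tag: player")

// ... run the import for this tag ...

// Cancel only the jobs in that group, leaving other imports running.
spark.sparkContext.cancelJobGroup("player")
```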
