[SPARK-22142][BUILD][STREAMING] Move Flume support behind a profile #19365

Closed · wants to merge 1 commit
4 changes: 2 additions & 2 deletions dev/create-release/release-build.sh
@@ -84,9 +84,9 @@ MVN="build/mvn --force"
# Hive-specific profiles for some builds
HIVE_PROFILES="-Phive -Phive-thriftserver"
# Profiles for publishing snapshots and release to Maven Central
-PUBLISH_PROFILES="-Pmesos -Pyarn $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
+PUBLISH_PROFILES="-Pmesos -Pyarn -Pflume $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
# Profiles for building binary releases
-BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Psparkr"
+BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Pflume -Psparkr"
# Scala 2.11 only profiles for some builds
SCALA_2_11_PROFILES="-Pkafka-0-8"
# Scala 2.12 only profiles for some builds
2 changes: 1 addition & 1 deletion dev/mima
@@ -24,7 +24,7 @@ set -e
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
cd "$FWDIR"

-SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pflume -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"

1 change: 1 addition & 0 deletions dev/scalastyle
@@ -25,6 +25,7 @@ ERRORS=$(echo -e "q\n" \
-Pmesos \
-Pkafka-0-8 \
-Pyarn \
+-Pflume \
-Phive \
-Phive-thriftserver \
scalastyle test:scalastyle \
20 changes: 19 additions & 1 deletion dev/sparktestsupport/modules.py
@@ -279,6 +279,12 @@ def __hash__(self):
source_file_regexes=[
"external/flume-sink",
],
+build_profile_flags=[
+"-Pflume",
+],
+environ={
+"ENABLE_FLUME_TESTS": "1"
+},
sbt_test_goals=[
"streaming-flume-sink/test",
]
@@ -291,6 +297,12 @@ def __hash__(self):
source_file_regexes=[
"external/flume",
],
+build_profile_flags=[
+"-Pflume",
+],
+environ={
+"ENABLE_FLUME_TESTS": "1"
+},
sbt_test_goals=[
"streaming-flume/test",
]
@@ -302,7 +314,13 @@ def __hash__(self):
dependencies=[streaming_flume, streaming_flume_sink],
source_file_regexes=[
"external/flume-assembly",
-]
+],
+build_profile_flags=[
+"-Pflume",
+],
+environ={
+"ENABLE_FLUME_TESTS": "1"
+}
)
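
For context, an editorial sketch (not code from this PR) of how a test runner consumes the `build_profile_flags` and `environ` fields added above: the profile flags go onto the Maven/SBT command line, and the environ entries are exported so that gated suites actually run. `collect_build_setup` and the stand-in `Module` below are hypothetical; the real logic lives in dev/run-tests.py and dev/sparktestsupport.

    import os
    from collections import namedtuple

    # Minimal stand-in for sparktestsupport's Module class (illustration only).
    Module = namedtuple("Module", ["build_profile_flags", "environ"])

    def collect_build_setup(modules):
        """Gather -P flags and env vars from the modules selected for testing."""
        flags, env = [], {}
        for m in modules:
            flags.extend(m.build_profile_flags)   # e.g. ["-Pflume"]
            env.update(m.environ)                 # e.g. {"ENABLE_FLUME_TESTS": "1"}
        return flags, env

    flume = Module(["-Pflume"], {"ENABLE_FLUME_TESTS": "1"})
    flags, env = collect_build_setup([flume])
    os.environ.update(env)   # exported before the sbt/python test goals run
    print(flags)             # ['-Pflume'] gets appended to the build command

This is what ties the three pieces of the PR together: the profile builds the Flume modules, and the environment variable re-enables the Python suites that are skipped by default.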


2 changes: 1 addition & 1 deletion dev/test-dependencies.sh
@@ -29,7 +29,7 @@ export LC_ALL=C
# TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution.

# NOTE: These should match those in the release publishing script
-HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Phive"
+HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Pflume -Phive"
MVN="build/mvn"
HADOOP_PROFILES=(
hadoop-2.6
6 changes: 6 additions & 0 deletions docs/building-spark.md
@@ -100,6 +100,12 @@ Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.

Kafka 0.10 support is still automatically built.

+## Building with Flume support
+
+Apache Flume support must be explicitly enabled with the `flume` profile.
+
+    ./build/mvn -Pflume -DskipTests clean package
+
## Building submodules individually

It's possible to build Spark sub-modules using the `mvn -pl` option.
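
An editorial aside, not part of the diff: once Spark is built with `-Pflume`, the Flume receiver becomes usable from PySpark again. A minimal sketch, assuming a Flume agent with an Avro sink pointed at localhost:41414 (placeholders) and the spark-streaming-flume-assembly jar passed to spark-submit via `--jars`:

    # Sketch: word counts over events from a Flume Avro sink.
    # Each element of the stream is a (headers, body) pair.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils

    sc = SparkContext(appName="FlumeWordCount")
    ssc = StreamingContext(sc, 2)  # 2-second batches

    events = FlumeUtils.createStream(ssc, "localhost", 41414)
    counts = (events.map(lambda e: e[1])          # keep the event body
                    .flatMap(lambda b: b.split())
                    .map(lambda w: (w, 1))
                    .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()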
13 changes: 10 additions & 3 deletions pom.xml
@@ -98,15 +98,13 @@
<module>sql/core</module>
<module>sql/hive</module>
<module>assembly</module>
-<module>external/flume</module>
-<module>external/flume-sink</module>
-<module>external/flume-assembly</module>
<module>examples</module>
<module>repl</module>
<module>launcher</module>
<module>external/kafka-0-10</module>
<module>external/kafka-0-10-assembly</module>
<module>external/kafka-0-10-sql</module>
+<!-- See additional modules enabled by profiles below -->
</modules>

<properties>
@@ -2584,6 +2582,15 @@
</dependencies>
</profile>

+<profile>
+<id>flume</id>
+<modules>
+<module>external/flume</module>
+<module>external/flume-sink</module>
+<module>external/flume-assembly</module>
+</modules>
+</profile>

<!-- Ganglia integration is not included by default due to LGPL-licensed code -->
<profile>
<id>spark-ganglia-lgpl</id>
17 changes: 9 additions & 8 deletions project/SparkBuild.scala
@@ -43,11 +43,8 @@ object BuildCommons {
"catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10"
).map(ProjectRef(buildLocation, _))

-val streamingProjects@Seq(
-streaming, streamingFlumeSink, streamingFlume, streamingKafka010
-) = Seq(
-"streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-10"
-).map(ProjectRef(buildLocation, _))
+val streamingProjects@Seq(streaming, streamingKafka010) =
+Seq("streaming", "streaming-kafka-0-10").map(ProjectRef(buildLocation, _))

val allProjects@Seq(
core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, _*
@@ -56,9 +53,13 @@ object BuildCommons {
"tags", "sketch", "kvstore"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects

-val optionallyEnabledProjects@Seq(mesos, yarn, streamingKafka, sparkGangliaLgpl,
-streamingKinesisAsl, dockerIntegrationTests, hadoopCloud) =
-Seq("mesos", "yarn", "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
+val optionallyEnabledProjects@Seq(mesos, yarn,
+streamingFlumeSink, streamingFlume,
+streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
+dockerIntegrationTests, hadoopCloud) =
+Seq("mesos", "yarn",
+"streaming-flume-sink", "streaming-flume",
+"streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
"docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _))

val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) =
16 changes: 13 additions & 3 deletions python/pyspark/streaming/tests.py
@@ -1478,7 +1478,7 @@ def search_kafka_assembly_jar():
("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
"You need to build Spark with "
"'build/sbt assembly/package streaming-kafka-0-8-assembly/assembly' or "
"'build/mvn package' before running this test.")
"'build/mvn -Pkafka-0-8 package' before running this test.")
Review comment (member, author): Not strictly related, but I noticed this when making a similar modification in the message below.

elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please "
"remove all but one") % (", ".join(jars)))
@@ -1495,7 +1495,7 @@ def search_flume_assembly_jar():
("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
"You need to build Spark with "
"'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
"'build/mvn package' before running this test.")
"'build/mvn -Pflume package' before running this test.")
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please "
"remove all but one") % (", ".join(jars)))
@@ -1516,6 +1516,9 @@ def search_kinesis_asl_assembly_jar():
return jars[0]


+# Must be same as the variable and condition defined in modules.py
+flume_test_environ_var = "ENABLE_FLUME_TESTS"
+are_flume_tests_enabled = os.environ.get(flume_test_environ_var) == '1'
# Must be same as the variable and condition defined in modules.py
kafka_test_environ_var = "ENABLE_KAFKA_0_8_TESTS"
are_kafka_tests_enabled = os.environ.get(kafka_test_environ_var) == '1'
@@ -1538,9 +1541,16 @@

os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
-FlumeStreamTests, FlumePollingStreamTests,
StreamingListenerTests]

+if are_flume_tests_enabled:
+testcases.append(FlumeStreamTests)
+testcases.append(FlumePollingStreamTests)
+else:
+sys.stderr.write(
+"Skipped test_flume_stream (enable by setting environment variable %s=1)"
+% flume_test_environ_var)

if are_kafka_tests_enabled:
testcases.append(KafkaStreamTests)
else:
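
Editorial illustration, not part of the diff: with the gate above, the Flume suites run only when ENABLE_FLUME_TESTS=1 is set, matching the environ entry added in modules.py. A sketch of a local invocation, assuming a checkout where Spark was built with the flume profile and the assembly jars exist:

    # Sketch: run the PySpark streaming tests with the Flume suites enabled.
    import os
    import subprocess

    env = dict(os.environ, ENABLE_FLUME_TESTS="1")  # same flag modules.py exports
    subprocess.check_call(
        ["python", "python/pyspark/streaming/tests.py"],
        env=env)

Unset the variable (or run as-is) to exercise the skip path, which writes the "Skipped test_flume_stream" notice to stderr instead.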