Merge branch 'apache:master' into SPARK-32915-followup
Victsm authored Jul 26, 2021
2 parents a590947 + 21450b3 commit f1db4dd
Showing 59 changed files with 2,249 additions and 870 deletions.
4 changes: 4 additions & 0 deletions .asf.yaml
@@ -27,3 +27,7 @@ github:
     - jdbc
     - sql
     - spark
+  enabled_merge_buttons:
+    merge: false
+    squash: true
+    rebase: true
16 changes: 9 additions & 7 deletions .github/workflows/build_and_test.yml
@@ -163,9 +163,10 @@ jobs:
     - name: Run tests
       env: ${{ fromJSON(needs.configure-jobs.outputs.envs) }}
       run: |
-        # Hive and SQL tests become flaky when running in parallel as it's too intensive.
-        if [[ "$MODULES_TO_TEST" == "hive" ]] || [[ "$MODULES_TO_TEST" == "sql" ]]; then export SERIAL_SBT_TESTS=1; fi
-        ./dev/run-tests --parallelism 2 --modules "$MODULES_TO_TEST" --included-tags "$INCLUDED_TAGS" --excluded-tags "$EXCLUDED_TAGS"
+        # Hive "other tests" test needs larger metaspace size based on experiment.
+        if [[ "$MODULES_TO_TEST" == "hive" ]] && [[ "$EXCLUDED_TAGS" == "org.apache.spark.tags.SlowHiveTest" ]]; then export METASPACE_SIZE=2g; fi
+        export SERIAL_SBT_TESTS=1
+        ./dev/run-tests --parallelism 1 --modules "$MODULES_TO_TEST" --included-tags "$INCLUDED_TAGS" --excluded-tags "$EXCLUDED_TAGS"
     - name: Upload test results to report
       if: always()
       uses: actions/upload-artifact@v2
@@ -205,6 +206,7 @@ jobs:
       GITHUB_PREV_SHA: ${{ github.event.before }}
       SPARK_LOCAL_IP: localhost
       SKIP_UNIDOC: true
+      METASPACE_SIZE: 128m
     steps:
     - name: Checkout Spark repository
       uses: actions/checkout@v2
@@ -250,7 +252,7 @@ jobs:
     - name: Run tests
       run: |
         export PATH=$PATH:$HOME/miniconda/bin
-        ./dev/run-tests --parallelism 2 --modules "$MODULES_TO_TEST"
+        ./dev/run-tests --parallelism 1 --modules "$MODULES_TO_TEST"
     - name: Upload test results to report
       if: always()
       uses: actions/upload-artifact@v2
@@ -316,7 +318,7 @@ jobs:
         # R issues at docker environment
         export TZ=UTC
         export _R_CHECK_SYSTEM_CLOCK_=FALSE
-        ./dev/run-tests --parallelism 2 --modules sparkr
+        ./dev/run-tests --parallelism 1 --modules sparkr
     - name: Upload test results to report
       if: always()
       uses: actions/upload-artifact@v2
@@ -413,7 +415,7 @@ jobs:
     - name: Java linter
       run: ./dev/lint-java
     - name: Python linter
-      run: ./dev/lint-python
+      run: PYTHON_EXECUTABLE=python3.9 ./dev/lint-python
     - name: R linter
       run: ./dev/lint-r
     - name: JS linter
@@ -715,7 +717,7 @@ jobs:
         ./buildContainerImage.sh -v 18.4.0 -x
     - name: Run tests
       run: |
-        ./dev/run-tests --parallelism 2 --modules docker-integration-tests --included-tags org.apache.spark.tags.DockerTest
+        ./dev/run-tests --parallelism 1 --modules docker-integration-tests --included-tags org.apache.spark.tags.DockerTest
     - name: Upload test results to report
       if: always()
       uses: actions/upload-artifact@v2
6 changes: 2 additions & 4 deletions build/sbt-launch-lib.bash
@@ -117,11 +117,9 @@ addDebugger () {
 # so they need not be dicked around with individually.
 get_mem_opts () {
   local mem=${1:-$sbt_default_mem}
-  local codecache=$(( $mem / 8 ))
-  (( $codecache > 128 )) || codecache=128
-  (( $codecache < 2048 )) || codecache=2048
+  local codecache=128
 
-  echo "-Xms${mem}m -Xmx${mem}m -XX:ReservedCodeCacheSize=${codecache}m"
+  echo "-Xms256m -Xmx${mem}m -XX:ReservedCodeCacheSize=${codecache}m"
 }
 
 require_arg () {
124 changes: 124 additions & 0 deletions common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java
@@ -0,0 +1,124 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.util;

import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Clock;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;

/**
 * A custom version of a {@link Timer} which allows for specifying a specific {@link TimeUnit} to
 * be used when accessing timing values via {@link #getSnapshot()}. Normally, though the
 * {@link #update(long, TimeUnit)} method requires a unit, the extraction methods on the snapshot
 * do not specify a unit, and always return nanoseconds. It can be useful to specify that a timer
 * should use a different unit for its snapshot. Note that internally, all values are still stored
 * with nanosecond-precision; it is only before being returned to the caller that the nanosecond
 * value is converted to the custom time unit.
 */
public class TimerWithCustomTimeUnit extends Timer {

  private final TimeUnit timeUnit;
  private final double nanosPerUnit;

  public TimerWithCustomTimeUnit(TimeUnit timeUnit) {
    this(timeUnit, Clock.defaultClock());
  }

  TimerWithCustomTimeUnit(TimeUnit timeUnit, Clock clock) {
    super(new ExponentiallyDecayingReservoir(), clock);
    this.timeUnit = timeUnit;
    this.nanosPerUnit = timeUnit.toNanos(1);
  }

  @Override
  public Snapshot getSnapshot() {
    return new SnapshotWithCustomTimeUnit(super.getSnapshot());
  }

  private double toUnit(double nanos) {
    // TimeUnit.convert() truncates (loses precision), so floating-point division is used instead
    return nanos / nanosPerUnit;
  }

  private long toUnit(long nanos) {
    return timeUnit.convert(nanos, TimeUnit.NANOSECONDS);
  }

  private class SnapshotWithCustomTimeUnit extends Snapshot {

    private final Snapshot wrappedSnapshot;

    SnapshotWithCustomTimeUnit(Snapshot wrappedSnapshot) {
      this.wrappedSnapshot = wrappedSnapshot;
    }

    @Override
    public double getValue(double v) {
      return toUnit(wrappedSnapshot.getValue(v));
    }

    @Override
    public long[] getValues() {
      long[] nanoValues = wrappedSnapshot.getValues();
      long[] customUnitValues = new long[nanoValues.length];
      for (int i = 0; i < nanoValues.length; i++) {
        customUnitValues[i] = toUnit(nanoValues[i]);
      }
      return customUnitValues;
    }

    @Override
    public int size() {
      return wrappedSnapshot.size();
    }

    @Override
    public long getMax() {
      return toUnit(wrappedSnapshot.getMax());
    }

    @Override
    public double getMean() {
      return toUnit(wrappedSnapshot.getMean());
    }

    @Override
    public long getMin() {
      return toUnit(wrappedSnapshot.getMin());
    }

    @Override
    public double getStdDev() {
      return toUnit(wrappedSnapshot.getStdDev());
    }

    @Override
    public void dump(OutputStream outputStream) {
      try (PrintWriter writer = new PrintWriter(outputStream)) {
        for (long value : getValues()) {
          writer.println(value);
        }
      }
    }
  }
}
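A minimal usage sketch of the new timer (illustrative only, not part of this commit; the class name TimerUsageSketch is invented for the example). Durations can be recorded in any unit and come back from the snapshot in the unit the timer was constructed with:

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;

import org.apache.spark.network.util.TimerWithCustomTimeUnit;

public class TimerUsageSketch {
  public static void main(String[] args) {
    // Record latencies in whatever unit is convenient at each call site.
    Timer timer = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
    timer.update(250, TimeUnit.MILLISECONDS);
    timer.update(1, TimeUnit.SECONDS);

    // The snapshot reports milliseconds, not the nanoseconds a plain Timer would return.
    Snapshot snapshot = timer.getSnapshot();
    System.out.println(snapshot.getMin()); // 250
    System.out.println(snapshot.getMax()); // 1000, not 1_000_000_000
  }
}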
109 changes: 109 additions & 0 deletions common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java
@@ -0,0 +1,109 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.util;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Clock;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import org.junit.Test;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

/** Tests for {@link TimerWithCustomTimeUnit} */
public class TimerWithCustomUnitSuite {

  private static final double EPSILON = 1.0 / 1_000_000_000;

  @Test
  public void testTimerWithMillisecondTimeUnit() {
    testTimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
  }

  @Test
  public void testTimerWithNanosecondTimeUnit() {
    testTimerWithCustomTimeUnit(TimeUnit.NANOSECONDS);
  }

  private void testTimerWithCustomTimeUnit(TimeUnit timeUnit) {
    Timer timer = new TimerWithCustomTimeUnit(timeUnit);
    Duration[] durations = {
      Duration.ofNanos(1),
      Duration.ofMillis(1),
      Duration.ofMillis(5),
      Duration.ofMillis(100),
      Duration.ofSeconds(10)
    };
    Arrays.stream(durations).forEach(timer::update);

    Snapshot snapshot = timer.getSnapshot();
    assertEquals(toTimeUnit(durations[0], timeUnit), snapshot.getMin());
    assertEquals(toTimeUnitFloating(durations[0], timeUnit), snapshot.getValue(0), EPSILON);
    assertEquals(toTimeUnitFloating(durations[2], timeUnit), snapshot.getMedian(), EPSILON);
    assertEquals(toTimeUnitFloating(durations[3], timeUnit), snapshot.get75thPercentile(), EPSILON);
    assertEquals(toTimeUnit(durations[4], timeUnit), snapshot.getMax());

    assertArrayEquals(Arrays.stream(durations).mapToLong(d -> toTimeUnit(d, timeUnit)).toArray(),
        snapshot.getValues());
    double total = Arrays.stream(durations).mapToDouble(d -> toTimeUnitFloating(d, timeUnit)).sum();
    assertEquals(total / durations.length, snapshot.getMean(), EPSILON);
  }

  @Test
  public void testTimingViaContext() {
    ManualClock clock = new ManualClock();
    Timer timer = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS, clock);
    Duration[] durations = { Duration.ofNanos(1), Duration.ofMillis(100), Duration.ofMillis(1000) };
    for (Duration d : durations) {
      Timer.Context context = timer.time();
      clock.advance(toTimeUnit(d, TimeUnit.NANOSECONDS));
      context.stop();
    }

    Snapshot snapshot = timer.getSnapshot();
    assertEquals(0, snapshot.getMin());
    assertEquals(100, snapshot.getMedian(), EPSILON);
    assertEquals(1000, snapshot.getMax(), EPSILON);
  }

  private static long toTimeUnit(Duration duration, TimeUnit timeUnit) {
    return timeUnit.convert(duration.toNanos(), TimeUnit.NANOSECONDS);
  }

  private static double toTimeUnitFloating(Duration duration, TimeUnit timeUnit) {
    return ((double) duration.toNanos()) / timeUnit.toNanos(1);
  }

  private static class ManualClock extends Clock {

    private long currTick = 1;

    void advance(long nanos) {
      currTick += nanos;
    }

    @Override
    public long getTick() {
      return currTick;
    }
  }
}
9 changes: 7 additions & 2 deletions common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.FileNotFoundException;
 import java.net.ConnectException;
 
 import com.google.common.base.Throwables;
@@ -82,8 +83,12 @@ class BlockPushErrorHandler implements ErrorHandler {
 
   @Override
   public boolean shouldRetryError(Throwable t) {
-    // If it is a connection time out or a connection closed exception, no need to retry.
-    if (t.getCause() != null && t.getCause() instanceof ConnectException) {
+    // If it is a connection time-out or a connection closed exception, no need to retry.
+    // If it is a FileNotFoundException originating from the client while pushing the shuffle
+    // blocks to the server, even then there is no need to retry. We will still log this exception
+    // once which helps with debugging.
+    if (t.getCause() != null && (t.getCause() instanceof ConnectException ||
+        t.getCause() instanceof FileNotFoundException)) {
       return false;
     }
     // If the block is too late, there is no need to retry it
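The practical effect, as a hedged sketch (not part of this diff; assumes the handler is constructible as declared above and that the push failure arrives wrapped in another exception, as the getCause() check implies):

ErrorHandler handler = new BlockPushErrorHandler();
Throwable wrapped = new RuntimeException(
    new FileNotFoundException("shuffle push data file was cleaned up"));
handler.shouldRetryError(wrapped); // false: a file missing on the client cannot reappear on retry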
14 changes: 10 additions & 4 deletions common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
@@ -25,6 +25,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import com.codahale.metrics.Gauge;
@@ -49,6 +50,7 @@
 import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.shuffle.protocol.*;
+import org.apache.spark.network.util.TimerWithCustomTimeUnit;
 import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 import org.apache.spark.network.util.TransportConf;
 
@@ -299,13 +301,17 @@ private void checkAuth(TransportClient client, String appId) {
   public class ShuffleMetrics implements MetricSet {
     private final Map<String, Metric> allMetrics;
     // Time latency for open block request in ms
-    private final Timer openBlockRequestLatencyMillis = new Timer();
+    private final Timer openBlockRequestLatencyMillis =
+        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
     // Time latency for executor registration latency in ms
-    private final Timer registerExecutorRequestLatencyMillis = new Timer();
+    private final Timer registerExecutorRequestLatencyMillis =
+        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
     // Time latency for processing fetch merged blocks meta request latency in ms
-    private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer();
+    private final Timer fetchMergedBlocksMetaLatencyMillis =
+        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
     // Time latency for processing finalize shuffle merge request latency in ms
-    private final Timer finalizeShuffleMergeLatencyMillis = new Timer();
+    private final Timer finalizeShuffleMergeLatencyMillis =
+        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
     // Block transfer rate in blocks per second
     private final Meter blockTransferRate = new Meter();
     // Block fetch message rate per second. When using non-batch fetches
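These metrics are named "...Millis", but a plain Timer snapshot always reports nanoseconds regardless of the unit passed to update(). A short illustration of why the replacement matters (sketch, not part of this diff; reuses the imports already present in this file):

Timer plain = new Timer();
plain.update(5, TimeUnit.MILLISECONDS);
plain.getSnapshot().getMax();  // 5_000_000: nanoseconds, despite the "Millis" metric name

Timer millis = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
millis.update(5, TimeUnit.MILLISECONDS);
millis.getSnapshot().getMax(); // 5: the unit the metric name advertises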
2 changes: 1 addition & 1 deletion common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
@@ -68,7 +68,7 @@ public boolean equals(Object other) {
     if (other != null && other instanceof FinalizeShuffleMerge) {
       FinalizeShuffleMerge o = (FinalizeShuffleMerge) other;
       return Objects.equal(appId, o.appId)
-        && appAttemptId == appAttemptId
+        && appAttemptId == o.appAttemptId
         && shuffleId == o.shuffleId;
     }
     return false;
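The old line compared appAttemptId with itself, which is always true, so two messages differing only in attempt ID were considered equal. A sketch of the behavior change (illustrative; a three-argument (appId, appAttemptId, shuffleId) constructor is assumed from the fields compared above):

FinalizeShuffleMerge first = new FinalizeShuffleMerge("app-1", 1, 0);
FinalizeShuffleMerge second = new FinalizeShuffleMerge("app-1", 2, 0);
first.equals(second); // true before this fix (attempt ID was ignored); false after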