Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23103][core] Ensure correct sort order for negative values in LevelDB. #20284

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ byte[] toKey(Object value, byte prefix) {
byte[] key = new byte[bytes * 2 + 2];
long longValue = ((Number) value).longValue();
key[0] = prefix;
key[1] = longValue > 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
key[1] = longValue >= 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;

for (int i = 0; i < key.length - 2; i++) {
int masked = (int) ((longValue >>> (4 * i)) & 0xF);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ default BaseComparator reverse() {
private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key);
private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id);
private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> t1.name.compareTo(t2.name);
private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> t1.num - t2.num;
private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> {
return Integer.valueOf(t1.num).compareTo(t2.num);
};
private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> t1.child.compareTo(t2.child);

/**
Expand Down Expand Up @@ -112,7 +114,8 @@ public void setup() throws Exception {
t.key = "key" + i;
t.id = "id" + i;
t.name = "name" + RND.nextInt(MAX_ENTRIES);
t.num = RND.nextInt(MAX_ENTRIES);
// Force one item to have an integer value of zero to test the fix for SPARK-23103.
t.num = (i != 0) ? (int) RND.nextLong() : 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the chang from RND.nextInt(MAX_ENTRIES)? this seems fine, just seemed like you were more likely to stress collision on this index before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nextInt doesn't return negative values. I can add a mod operator here, but the code already explicitly generates some clashes a few lines below.

t.child = "child" + (i % MIN_ENTRIES);
allEntries.add(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.commons.io.FileUtils;
import org.iq80.leveldb.DBIterator;
Expand Down Expand Up @@ -74,11 +76,7 @@ public void testReopenAndVersionCheckDb() throws Exception {

@Test
public void testObjectWriteReadDelete() throws Exception {
CustomType1 t = new CustomType1();
t.key = "key";
t.id = "id";
t.name = "name";
t.child = "child";
CustomType1 t = createCustomType1(1);

try {
db.read(CustomType1.class, t.key);
Expand Down Expand Up @@ -106,17 +104,9 @@ public void testObjectWriteReadDelete() throws Exception {

@Test
public void testMultipleObjectWriteReadDelete() throws Exception {
CustomType1 t1 = new CustomType1();
t1.key = "key1";
t1.id = "id";
t1.name = "name1";
t1.child = "child1";

CustomType1 t2 = new CustomType1();
t2.key = "key2";
t2.id = "id";
t2.name = "name2";
t2.child = "child2";
CustomType1 t1 = createCustomType1(1);
CustomType1 t2 = createCustomType1(2);
t2.id = t1.id;

db.write(t1);
db.write(t2);
Expand All @@ -142,11 +132,7 @@ public void testMultipleObjectWriteReadDelete() throws Exception {

@Test
public void testMultipleTypesWriteReadDelete() throws Exception {
CustomType1 t1 = new CustomType1();
t1.key = "1";
t1.id = "id";
t1.name = "name1";
t1.child = "child1";
CustomType1 t1 = createCustomType1(1);

IntKeyType t2 = new IntKeyType();
t2.key = 2;
Expand Down Expand Up @@ -188,10 +174,7 @@ public void testMultipleTypesWriteReadDelete() throws Exception {
public void testMetadata() throws Exception {
assertNull(db.getMetadata(CustomType1.class));

CustomType1 t = new CustomType1();
t.id = "id";
t.name = "name";
t.child = "child";
CustomType1 t = createCustomType1(1);

db.setMetadata(t);
assertEquals(t, db.getMetadata(CustomType1.class));
Expand All @@ -202,11 +185,7 @@ public void testMetadata() throws Exception {

@Test
public void testUpdate() throws Exception {
CustomType1 t = new CustomType1();
t.key = "key";
t.id = "id";
t.name = "name";
t.child = "child";
CustomType1 t = createCustomType1(1);

db.write(t);

Expand All @@ -222,13 +201,7 @@ public void testUpdate() throws Exception {
@Test
public void testSkip() throws Exception {
for (int i = 0; i < 10; i++) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
t.id = "id" + i;
t.name = "name" + i;
t.child = "child" + i;

db.write(t);
db.write(createCustomType1(i));
}

KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator();
Expand All @@ -240,6 +213,36 @@ public void testSkip() throws Exception {
assertFalse(it.hasNext());
}

@Test
public void testNegativeIndexValues() throws Exception {
List<Integer> expected = Arrays.asList(-100, -50, 0, 50, 100);

expected.stream().forEach(i -> {
try {
db.write(createCustomType1(i));
} catch (Exception e) {
throw new RuntimeException(e);
}
});

List<Integer> results = StreamSupport
.stream(db.view(CustomType1.class).index("int").spliterator(), false)
.map(e -> e.num)
.collect(Collectors.toList());

assertEquals(expected, results);
}

private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
t.id = "id" + i;
t.name = "name" + i;
t.num = i;
t.child = "child" + i;
return t;
}

private int countKeys(Class<?> type) throws Exception {
byte[] prefix = db.getTypeInfo(type).keyPrefix();
int count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,15 +894,19 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
val dropped = stages.drop(1).head

// Cache some quantiles by calling AppStatusStore.taskSummary(). For quantiles to be
// calculcated, we need at least one finished task.
// calculated, we need at least one finished task. The code in AppStatusStore uses
// `executorRunTime` to detect valid tasks, so that metric needs to be updated in the
// task end event.
time += 1
val task = createTasks(1, Array("1")).head
listener.onTaskStart(SparkListenerTaskStart(dropped.stageId, dropped.attemptId, task))

time += 1
task.markFinished(TaskState.FINISHED, time)
val metrics = TaskMetrics.empty
metrics.setExecutorRunTime(42L)
listener.onTaskEnd(SparkListenerTaskEnd(dropped.stageId, dropped.attemptId,
"taskType", Success, task, null))
"taskType", Success, task, metrics))

new AppStatusStore(store)
.taskSummary(dropped.stageId, dropped.attemptId, Array(0.25d, 0.50d, 0.75d))
Expand Down