Skip to content

Commit

Permalink
feat: Add SplitDiscovery for use in split enumerator (#14)
Browse files Browse the repository at this point in the history
* SplitDiscovery

updated

* respond to comments

* test fix and formattiong

* fix comments
  • Loading branch information
palmere-google authored Jul 23, 2021
1 parent 2c407e9 commit b68d2b4
Show file tree
Hide file tree
Showing 3 changed files with 326 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.enumerator;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class SingleSubscriptionSplitDiscovery implements SplitDiscovery {
private final AdminClient adminClient;
private final CursorClient cursorClient;
private final TopicPath topicPath;
private final SubscriptionPath subscriptionPath;
private long partitionCount;

private SingleSubscriptionSplitDiscovery(
AdminClient adminClient,
CursorClient cursorClient,
TopicPath topicPath,
SubscriptionPath subscriptionPath,
long partitionCount) {
this.adminClient = adminClient;
this.cursorClient = cursorClient;
this.topicPath = topicPath;
this.subscriptionPath = subscriptionPath;
this.partitionCount = partitionCount;
}

static SingleSubscriptionSplitDiscovery create(
AdminClient adminClient,
CursorClient cursorClient,
TopicPath topicPath,
SubscriptionPath subscriptionPath) {
return new SingleSubscriptionSplitDiscovery(
adminClient, cursorClient, topicPath, subscriptionPath, 0L);
}

static SingleSubscriptionSplitDiscovery fromCheckpoint(
SplitEnumeratorCheckpoint.Discovery proto,
Collection<SubscriptionPartitionSplit> currentSplits,
AdminClient adminClient,
CursorClient cursorClient) {
SubscriptionPath subscriptionPath = SubscriptionPath.parse(proto.getSubscription());
TopicPath topicPath = TopicPath.parse(proto.getTopic());
Set<Long> partitions = new TreeSet<>();
for (SubscriptionPartitionSplit s : currentSplits) {
if (!s.subscriptionPath().equals(subscriptionPath)) {
throw new IllegalStateException(
"Split discovery configured with subscription "
+ subscriptionPath
+ " but current splits contains a split from subscription "
+ s);
}
partitions.add(s.partition().value());
}
long partitionCount = partitions.size();
for (long p = 0; p < partitions.size(); p++) {
if (!partitions.contains(p)) {
throw new IllegalStateException(
"Split set is not continuous, missing split for partition " + p + " " + currentSplits);
}
}
return new SingleSubscriptionSplitDiscovery(
adminClient, cursorClient, topicPath, subscriptionPath, partitionCount);
}

public synchronized List<SubscriptionPartitionSplit> discoverNewSplits() throws ApiException {
try {
List<SubscriptionPartitionSplit> newSplits = new ArrayList<>();
long newPartitionCount = adminClient.getTopicPartitionCount(topicPath).get();
if (newPartitionCount == partitionCount) {
return newSplits;
}
Map<Partition, Offset> cursorMap = cursorClient.listPartitionCursors(subscriptionPath).get();
for (long p = partitionCount; p < newPartitionCount; p++) {
Partition partition = Partition.of(p);
Offset offset = cursorMap.getOrDefault(partition, Offset.of(0));
newSplits.add(SubscriptionPartitionSplit.create(subscriptionPath, partition, offset));
}
partitionCount = newPartitionCount;
return newSplits;
} catch (Throwable t) {
throw ExtractStatus.toCanonical(t).underlying;
}
}

public synchronized SplitEnumeratorCheckpoint.Discovery checkpoint() {
return SplitEnumeratorCheckpoint.Discovery.newBuilder()
.setSubscription(subscriptionPath.toString())
.setTopic(topicPath.toString())
.build();
}

@Override
public synchronized void close() {
try (AdminClient a = adminClient;
CursorClient c = cursorClient) {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.enumerator;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
import java.util.List;

interface SplitDiscovery extends AutoCloseable {
List<SubscriptionPartitionSplit> discoverNewSplits() throws ApiException;

SplitEnumeratorCheckpoint.Discovery checkpoint();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.enumerator;

import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.exampleSubscriptionPath;
import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.exampleTopicPath;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class SingleSubscriptionSplitDiscoveryTest {

@Mock CursorClient mockCursorClient;
@Mock AdminClient mockAdminClient;

SplitDiscovery discovery;

@Before
public void setUp() {
discovery =
SingleSubscriptionSplitDiscovery.create(
mockAdminClient, mockCursorClient, exampleTopicPath(), exampleSubscriptionPath());
}

@Test
public void testDiscovery() {
when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
.thenReturn(ApiFutures.immediateFuture(2L));
when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
.thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(1), Offset.of(2))));
List<SubscriptionPartitionSplit> splits = discovery.discoverNewSplits();
assertThat(splits)
.containsExactly(
SubscriptionPartitionSplit.create(
exampleSubscriptionPath(), Partition.of(0), Offset.of(0)),
SubscriptionPartitionSplit.create(
exampleSubscriptionPath(), Partition.of(1), Offset.of(2)));
}

@Test
public void testDiscovery_Incremental() {
when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
.thenReturn(ApiFutures.immediateFuture(2L))
.thenReturn(ApiFutures.immediateFuture(3L))
.thenReturn(ApiFutures.immediateFuture(3L));
when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
.thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(1), Offset.of(2))));
assertThat(discovery.discoverNewSplits()).hasSize(2);
assertThat(discovery.discoverNewSplits()).hasSize(1);
assertThat(discovery.discoverNewSplits()).hasSize(0);
}

@Test
public void testDiscovery_AdminFailure() {
when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
.thenReturn(
ApiFutures.immediateFailedFuture(
new CheckedApiException("", StatusCode.Code.INTERNAL)));
assertThrows(ApiException.class, () -> discovery.discoverNewSplits());
}

@Test
public void testDiscovery_CursorFailure() {
when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
.thenReturn(ApiFutures.immediateFuture(2L));
when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
.thenReturn(
ApiFutures.immediateFailedFuture(
new CheckedApiException("", StatusCode.Code.INTERNAL)));
assertThrows(ApiException.class, () -> discovery.discoverNewSplits());
}

@Test
public void testCheckpoint() {
SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();
assertThat(proto.getSubscription()).isEqualTo(exampleSubscriptionPath().toString());
assertThat(proto.getTopic()).isEqualTo(exampleTopicPath().toString());
}

@Test
public void testCheckpointRestore() {
SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();

List<SubscriptionPartitionSplit> splits =
ImmutableList.of(
SubscriptionPartitionSplit.create(
exampleSubscriptionPath(), Partition.of(0), Offset.of(4)),
SubscriptionPartitionSplit.create(
exampleSubscriptionPath(), Partition.of(1), Offset.of(4)),
SubscriptionPartitionSplit.create(
exampleSubscriptionPath(), Partition.of(2), Offset.of(4)));
SplitDiscovery restored =
SingleSubscriptionSplitDiscovery.fromCheckpoint(
proto, splits, mockAdminClient, mockCursorClient);

when(mockAdminClient.getTopicPartitionCount(exampleTopicPath()))
.thenReturn(ApiFutures.immediateFuture(4L));
when(mockCursorClient.listPartitionCursors(exampleSubscriptionPath()))
.thenReturn(ApiFutures.immediateFuture(ImmutableMap.of(Partition.of(3), Offset.of(2))));
assertThat(restored.discoverNewSplits()).hasSize(1);
}

@Test
public void testCheckpointRestore_SubscriptionMismatch() {
SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();

List<SubscriptionPartitionSplit> splits =
ImmutableList.of(
SubscriptionPartitionSplit.create(
SubscriptionPath.parse(exampleSubscriptionPath().toString() + "-other"),
Partition.of(0),
Offset.of(4)));
assertThrows(
IllegalStateException.class,
() -> {
SingleSubscriptionSplitDiscovery.fromCheckpoint(
proto, splits, mockAdminClient, mockCursorClient);
});
}

@Test
public void testCheckpointRestore_NonContinuousPartitions() {
SplitEnumeratorCheckpoint.Discovery proto = discovery.checkpoint();

List<SubscriptionPartitionSplit> splits =
ImmutableList.of(
SubscriptionPartitionSplit.create(
exampleSubscriptionPath(), Partition.of(1), Offset.of(4)));
assertThrows(
IllegalStateException.class,
() -> {
SingleSubscriptionSplitDiscovery.fromCheckpoint(
proto, splits, mockAdminClient, mockCursorClient);
});
}

@Test
public void testClose() throws Exception {
discovery.close();
verify(mockAdminClient).close();
verify(mockCursorClient).close();
}
}

0 comments on commit b68d2b4

Please sign in to comment.