Skip to content

Commit

Permalink
feat: Add the pubsub lite source reader (#13)
Browse files Browse the repository at this point in the history
* SourceReader

* fix pom version

* deflake test

* add comments

* formatting
  • Loading branch information
palmere-google authored Jul 20, 2021
1 parent e4649d5 commit 2c407e9
Show file tree
Hide file tree
Showing 9 changed files with 550 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-utils</artifactId>
<version>1.13.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.reader;

import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Consumer;

public class CheckpointCursorCommitter {
@GuardedBy("this")
private final Set<SubscriptionPartitionSplit> finished = new HashSet<>();

@GuardedBy("this")
private final LinkedHashMap<Long, List<SubscriptionPartitionSplit>> checkpoints =
new LinkedHashMap<>();

private final Consumer<SubscriptionPartitionSplit> cursorCommitter;

public CheckpointCursorCommitter(Consumer<SubscriptionPartitionSplit> cursorCommitter) {
this.cursorCommitter = cursorCommitter;
}

public synchronized void addCheckpoint(
long checkpointId, Collection<SubscriptionPartitionSplit> checkpoint) {
checkpoints.put(
checkpointId,
ImmutableList.<SubscriptionPartitionSplit>builder()
.addAll(checkpoint)
.addAll(finished)
.build());
}

public synchronized void notifyCheckpointComplete(long checkpointId) {
if (!checkpoints.containsKey(checkpointId)) {
return;
}
// Commit offsets corresponding to this checkpoint.
List<SubscriptionPartitionSplit> splits = checkpoints.get(checkpointId);
splits.forEach(cursorCommitter);
// Prune all checkpoints created before the one we just committed.
Iterator<Entry<Long, List<SubscriptionPartitionSplit>>> iter =
checkpoints.entrySet().iterator();
while (iter.hasNext()) {
long id = iter.next().getKey();
iter.remove();
if (id == checkpointId) {
break;
}
}
}

public synchronized void notifySplitFinished(Collection<SubscriptionPartitionSplit> splits) {
finished.addAll(splits);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.reader;

import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplitState;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;

/**
* The pubsub lite record emitter emits records to the source output and also tracks the position of
* the source reader within the split by updating the current offset on the split state.
*/
public class PubsubLiteRecordEmitter<T>
implements RecordEmitter<Record<T>, T, SubscriptionPartitionSplitState> {
@Override
public void emitRecord(
Record<T> record,
SourceOutput<T> sourceOutput,
SubscriptionPartitionSplitState subscriptionPartitionSplitState) {
if (record.value().isPresent()) {
sourceOutput.collect(record.value().get(), record.timestamp().toEpochMilli());
}
// Update the position of the source reader within the split.
subscriptionPartitionSplitState.setCurrent(record.offset());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.reader;

import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplitState;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;

public class PubsubLiteSourceReader<T>
extends SingleThreadMultiplexSourceReaderBase<
Record<T>, T, SubscriptionPartitionSplit, SubscriptionPartitionSplitState> {
private final CheckpointCursorCommitter checkpointCursorCommitter;

public PubsubLiteSourceReader(
RecordEmitter<Record<T>, T, SubscriptionPartitionSplitState> recordEmitter,
Consumer<SubscriptionPartitionSplit> cursorCommitter,
Supplier<SplitReader<Record<T>, SubscriptionPartitionSplit>> splitReaderSupplier,
Configuration config,
SourceReaderContext context) {
super(splitReaderSupplier, recordEmitter, config, context);
this.checkpointCursorCommitter = new CheckpointCursorCommitter(cursorCommitter);
}

@Override
public List<SubscriptionPartitionSplit> snapshotState(long checkpointId) {
// When a checkpoint is started we intercept the checkpoint call and save the checkpoint.
// Once the checkpoint has been committed (notifyCheckpointComplete is called) we will propagate
// the cursors to pubsub lite.
List<SubscriptionPartitionSplit> checkpoint = super.snapshotState(checkpointId);
checkpointCursorCommitter.addCheckpoint(checkpointId, checkpoint);
return checkpoint;
}

@Override
public void notifyCheckpointComplete(long checkpointId) {
checkpointCursorCommitter.notifyCheckpointComplete(checkpointId);
}

@Override
protected SubscriptionPartitionSplitState initializedState(
SubscriptionPartitionSplit sourceSplit) {
return new SubscriptionPartitionSplitState(sourceSplit);
}

@Override
protected SubscriptionPartitionSplit toSplitType(
String splitState, SubscriptionPartitionSplitState state) {
return state.toSplit();
}

@Override
protected void onSplitFinished(Map<String, SubscriptionPartitionSplitState> map) {
// When splits are saved, we track the final position of the split so we can commit it to
// pubsub lite. This is necessary because the split will not appear in future checkpoints.
checkpointCursorCommitter.notifySplitFinished(
map.values().stream()
.map(SubscriptionPartitionSplitState::toSplit)
.collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.split;

import com.google.cloud.pubsublite.Offset;

public final class SubscriptionPartitionSplitState {
SubscriptionPartitionSplit split;
Offset current;

public SubscriptionPartitionSplitState(SubscriptionPartitionSplit split) {
this.split = split;
this.current = split.start();
}

public void setCurrent(Offset offset) {
current = offset;
}

public SubscriptionPartitionSplit toSplit() {
return SubscriptionPartitionSplit.create(split.subscriptionPath(), split.partition(), current);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.reader;

import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.examplePartition;
import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.exampleSubscriptionPath;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
import com.google.common.collect.ImmutableList;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class CheckpointCursorCommitterTest {
@Mock Consumer<SubscriptionPartitionSplit> mockConsumer;
CheckpointCursorCommitter cursorCommitter;

@Before
public void setUp() {
cursorCommitter = new CheckpointCursorCommitter(mockConsumer);
}

public static SubscriptionPartitionSplit splitFromPartition(Partition partition) {
return SubscriptionPartitionSplit.create(exampleSubscriptionPath(), partition, Offset.of(0));
}

@Test
public void testFinishedSplits() {
SubscriptionPartitionSplit split = splitFromPartition(examplePartition());
cursorCommitter.notifySplitFinished(ImmutableList.of(split));
cursorCommitter.addCheckpoint(1, ImmutableList.of());
cursorCommitter.notifyCheckpointComplete(1);
verify(mockConsumer).accept(split);
}

@Test
public void testCheckpointCommitted() {
SubscriptionPartitionSplit split = splitFromPartition(examplePartition());
cursorCommitter.addCheckpoint(1, ImmutableList.of(split));
cursorCommitter.notifyCheckpointComplete(1);
verify(mockConsumer).accept(split);
}

@Test
public void testUnknownCheckpoint() {
SubscriptionPartitionSplit split = splitFromPartition(examplePartition());
cursorCommitter.addCheckpoint(1, ImmutableList.of(split));
cursorCommitter.notifyCheckpointComplete(4);
verifyNoInteractions(mockConsumer);
}

@Test
public void testIntermediateCheckpointSkipped() {
SubscriptionPartitionSplit split1 = splitFromPartition(Partition.of(1));
SubscriptionPartitionSplit split2 = splitFromPartition(Partition.of(2));
// The numeric ids of the checkpoints don't matter, just the order they're taken in.
cursorCommitter.addCheckpoint(2, ImmutableList.of(split2));
cursorCommitter.addCheckpoint(1, ImmutableList.of(split1));

// Checkpoint 1 is committed, removing checkpoint 2
cursorCommitter.notifyCheckpointComplete(1);
verify(mockConsumer).accept(split1);
cursorCommitter.notifyCheckpointComplete(2);
verifyNoMoreInteractions(mockConsumer);
}
}
Loading

0 comments on commit 2c407e9

Please sign in to comment.