Skip to content

Commit

Permalink
fix: Change to use the sinkv2 interfaces (#186)
Browse files Browse the repository at this point in the history
* fix: Change to use the sinkv2 interfaces

* fix: Change to use the sinkv2 interfaces

* fix: Change to use the sinkv2 interfaces

* fix: Change to use the sinkv2 interfaces

* fix: Change to use the sinkv2 interfaces

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
dpcollins-google and gcf-owl-bot[bot] authored Mar 3, 2023
1 parent f6153c3 commit e142808
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 128 deletions.
6 changes: 0 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,6 @@
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
*/
package com.google.cloud.pubsublite.flink;

import com.google.cloud.Timestamp;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.io.Serializable;
import java.sql.Date;
import java.time.Instant;
import org.apache.flink.api.common.serialization.SerializationSchema;

Expand All @@ -36,7 +35,7 @@ public void open(SerializationSchema.InitializationContext context) throws Excep
public PubSubMessage serialize(T value, Instant timestamp) {
return PubSubMessage.newBuilder()
.setData(ByteString.copyFrom(schema.serialize(value)))
.setEventTime(Timestamp.of(Date.from(timestamp)).toProto())
.setEventTime(Timestamps.fromMillis(timestamp.toEpochMilli()))
.build();
}
};
Expand Down
66 changes: 17 additions & 49 deletions src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,67 +15,35 @@
*/
package com.google.cloud.pubsublite.flink;

import com.google.cloud.Tuple;
import com.google.cloud.pubsublite.flink.internal.sink.BulkWaitPublisher;
import com.google.cloud.pubsublite.flink.internal.sink.MessagePublisher;
import com.google.cloud.pubsublite.flink.internal.sink.PerServerPublisherCache;
import com.google.cloud.pubsublite.flink.internal.sink.SerializingPublisher;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.time.Instant;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import com.google.cloud.pubsublite.flink.internal.sink.PubsubLiteSinkWriter;
import java.io.IOException;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

public class PubsubLiteSink<T> extends RichSinkFunction<T> implements CheckpointedFunction {
public class PubsubLiteSink<T> implements Sink<T> {
private static final long serialVersionUID = 849752028745098L;

private final PubsubLiteSinkSettings<T> settings;

@GuardedBy("this")
private transient BulkWaitPublisher<Tuple<T, Instant>> publisher;

public PubsubLiteSink(PubsubLiteSinkSettings<T> settings) {
this.settings = settings;
}

@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) {}

@Override
public synchronized void snapshotState(FunctionSnapshotContext functionSnapshotContext)
throws Exception {
publisher.waitUntilNoOutstandingPublishes();
}

@Override
public synchronized void invoke(T value, Context context) throws Exception {
Long timestamp = context.timestamp();
if (timestamp == null) {
timestamp = context.currentProcessingTime();
public SinkWriter<T> createWriter(InitContext initContext) throws IOException {
PubsubLiteSerializationSchema<T> schema = settings.serializationSchema();
try {
schema.open(initContext.asSerializationSchemaInitializationContext());
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
}
publisher.publish(Tuple.of(value, Instant.ofEpochMilli(timestamp)));
}

@Override
public synchronized void open(Configuration parameters) throws Exception {
super.open(parameters);
settings
.serializationSchema()
.open(
RuntimeContextInitializationContextAdapters.serializationAdapter(
getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
publisher =
new SerializingPublisher<>(
new MessagePublisher(
PerServerPublisherCache.getOrCreate(settings), settings.maxBytesOutstanding()),
settings.serializationSchema());
}

@Override
public synchronized void close() throws Exception {
publisher.waitUntilNoOutstandingPublishes();
return new PubsubLiteSinkWriter<>(
new MessagePublisher(
PerServerPublisherCache.getOrCreate(settings), settings.maxBytesOutstanding()),
schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
*/
package com.google.cloud.pubsublite.flink.internal.sink;

import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import java.io.Flushable;

// Thread-compatible.
public interface BulkWaitPublisher<T> {
/** A publisher that can be flushed to wait on all outstanding messages. */
public interface BulkWaitPublisher extends Flushable {

void publish(T message) throws InterruptedException;

void waitUntilNoOutstandingPublishes() throws CheckedApiException;
void publish(PubSubMessage message) throws InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
Expand All @@ -31,7 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessagePublisher implements BulkWaitPublisher<PubSubMessage> {
public class MessagePublisher implements BulkWaitPublisher {
private static final Logger LOG = LoggerFactory.getLogger(MessagePublisher.class);
private final Publisher<MessageMetadata> publisher;
private final List<ApiFuture<MessageMetadata>> publishes;
Expand Down Expand Up @@ -69,12 +68,12 @@ public void publish(PubSubMessage message) throws InterruptedException {
}

@Override
public void waitUntilNoOutstandingPublishes() throws CheckedApiException {
public void flush() {
try {
ApiFutures.allAsList(publishes).get();
publishes.clear();
} catch (Exception e) {
throw ExtractStatus.toCanonical(e);
throw ExtractStatus.toCanonical(e).underlying;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink.internal.sink;

import com.google.cloud.pubsublite.flink.PubsubLiteSerializationSchema;
import java.io.IOException;
import java.time.Instant;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.connector.sink2.SinkWriter;

public class PubsubLiteSinkWriter<T> implements SinkWriter<T> {
private final BulkWaitPublisher publisher;
private final PubsubLiteSerializationSchema<T> schema;

public PubsubLiteSinkWriter(
BulkWaitPublisher publisher, PubsubLiteSerializationSchema<T> schema) {
this.publisher = publisher;
this.schema = schema;
}

@Override
public void write(T value, Context context) throws InterruptedException {
Long timestamp = context.timestamp();
if (timestamp == null) {
timestamp = context.currentWatermark();
}
if (timestamp == TimestampAssigner.NO_TIMESTAMP) {
timestamp = System.currentTimeMillis();
}
publisher.publish(schema.serialize(value, Instant.ofEpochMilli(timestamp)));
}

@Override
public void flush(boolean endOfInput) throws IOException {
publisher.flush();
}

@Override
public void close() throws IOException {
publisher.flush();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void testSourceWithFailure() throws Exception {
public void testSink() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(INTEGER_STRINGS)
.addSink(new PubsubLiteSink<>(sinkSettings()))
.sinkTo(new PubsubLiteSink<>(sinkSettings()))
.name("PSL Sink");

env.fromSource(
Expand Down Expand Up @@ -281,7 +281,7 @@ public void testSinkWithFailure() throws Exception {
RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.MILLISECONDS)));

env.fromCollection(INTEGER_STRINGS)
.addSink(new PubsubLiteSink<>(sinkSettings()))
.sinkTo(new PubsubLiteSink<>(sinkSettings()))
.name("PSL Sink");

env.fromSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void testPublish() throws Exception {

messagePublisher.publish(message1);

messagePublisher.waitUntilNoOutstandingPublishes();
messagePublisher.flush();

verify(fakeInnerPublisher).publish(Message.fromProto(message1));
}
Expand All @@ -77,8 +78,7 @@ public void testSinglePublishFailure() throws Exception {
messagePublisher.publish(message1);
verify(fakeInnerPublisher).publish(Message.fromProto(message1));

assertThrows(
CheckedApiException.class, () -> messagePublisher.waitUntilNoOutstandingPublishes());
assertThrows(ApiException.class, () -> messagePublisher.flush());
}

@Test
Expand All @@ -94,7 +94,7 @@ public void testCheckpointWithOutstandingPublish() throws Exception {
.submit(
() -> {
try {
messagePublisher.waitUntilNoOutstandingPublishes();
messagePublisher.flush();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,53 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.cloud.Tuple;
import com.google.cloud.pubsublite.flink.PubsubLiteSerializationSchema;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.protobuf.ByteString;
import java.time.Instant;
import org.apache.flink.api.connector.sink2.SinkWriter.Context;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class SerializingPublisherTest {
@Mock BulkWaitPublisher<PubSubMessage> mockPublisher;
public class PubsubLiteSinkWriterTest {
@Mock BulkWaitPublisher mockPublisher;
@Mock PubsubLiteSerializationSchema<String> mockSchema;
SerializingPublisher<String> publisher;
PubsubLiteSinkWriter<String> writer;

@Before
public void setUp() {
publisher = new SerializingPublisher<>(mockPublisher, mockSchema);
writer = new PubsubLiteSinkWriter<>(mockPublisher, mockSchema);
}

@Test
public void testWaitUntilNoOutstandingPublishes() throws Exception {
publisher.waitUntilNoOutstandingPublishes();
verify(mockPublisher).waitUntilNoOutstandingPublishes();
public void testFlush() throws Exception {
writer.flush(false);
verify(mockPublisher).flush();
}

@Test
public void testPublish() throws Exception {
Instant timestamp = Instant.ofEpochMilli(1000);
long timestamp = 10000;
PubSubMessage message =
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("data")).build();
when(mockSchema.serialize("message", timestamp)).thenReturn(message);
publisher.publish(Tuple.of("message", timestamp));
when(mockSchema.serialize("message", Instant.ofEpochMilli(timestamp))).thenReturn(message);
writer.write(
"message",
new Context() {
@Override
public long currentWatermark() {
return System.currentTimeMillis();
}

@Override
public Long timestamp() {
return timestamp;
}
});
verify(mockPublisher).publish(message);
}
}

0 comments on commit e142808

Please sign in to comment.