diff --git a/pom.xml b/pom.xml
index 7ec07850..d918bd05 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,12 +120,6 @@
${flink.version}
provided
-
- org.apache.flink
- flink-runtime
- ${flink.version}
- provided
-
junit
junit
diff --git a/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSerializationSchema.java b/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSerializationSchema.java
index 0f58a7bf..a94c76eb 100644
--- a/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSerializationSchema.java
+++ b/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSerializationSchema.java
@@ -15,11 +15,10 @@
*/
package com.google.cloud.pubsublite.flink;
-import com.google.cloud.Timestamp;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.protobuf.ByteString;
+import com.google.protobuf.util.Timestamps;
import java.io.Serializable;
-import java.sql.Date;
import java.time.Instant;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -36,7 +35,7 @@ public void open(SerializationSchema.InitializationContext context) throws Excep
public PubSubMessage serialize(T value, Instant timestamp) {
return PubSubMessage.newBuilder()
.setData(ByteString.copyFrom(schema.serialize(value)))
- .setEventTime(Timestamp.of(Date.from(timestamp)).toProto())
+ .setEventTime(Timestamps.fromMillis(timestamp.toEpochMilli()))
.build();
}
};
diff --git a/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSink.java b/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSink.java
index b471bd79..74627d82 100644
--- a/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSink.java
+++ b/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSink.java
@@ -15,67 +15,35 @@
*/
package com.google.cloud.pubsublite.flink;
-import com.google.cloud.Tuple;
-import com.google.cloud.pubsublite.flink.internal.sink.BulkWaitPublisher;
import com.google.cloud.pubsublite.flink.internal.sink.MessagePublisher;
import com.google.cloud.pubsublite.flink.internal.sink.PerServerPublisherCache;
-import com.google.cloud.pubsublite.flink.internal.sink.SerializingPublisher;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
-import java.time.Instant;
-import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import com.google.cloud.pubsublite.flink.internal.sink.PubsubLiteSinkWriter;
+import java.io.IOException;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
-public class PubsubLiteSink extends RichSinkFunction implements CheckpointedFunction {
+public class PubsubLiteSink implements Sink {
private static final long serialVersionUID = 849752028745098L;
private final PubsubLiteSinkSettings settings;
- @GuardedBy("this")
- private transient BulkWaitPublisher> publisher;
-
public PubsubLiteSink(PubsubLiteSinkSettings settings) {
this.settings = settings;
}
@Override
- public void initializeState(FunctionInitializationContext functionInitializationContext) {}
-
- @Override
- public synchronized void snapshotState(FunctionSnapshotContext functionSnapshotContext)
- throws Exception {
- publisher.waitUntilNoOutstandingPublishes();
- }
-
- @Override
- public synchronized void invoke(T value, Context context) throws Exception {
- Long timestamp = context.timestamp();
- if (timestamp == null) {
- timestamp = context.currentProcessingTime();
+ public SinkWriter createWriter(InitContext initContext) throws IOException {
+ PubsubLiteSerializationSchema schema = settings.serializationSchema();
+ try {
+ schema.open(initContext.asSerializationSchemaInitializationContext());
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new IOException(e);
}
- publisher.publish(Tuple.of(value, Instant.ofEpochMilli(timestamp)));
- }
-
- @Override
- public synchronized void open(Configuration parameters) throws Exception {
- super.open(parameters);
- settings
- .serializationSchema()
- .open(
- RuntimeContextInitializationContextAdapters.serializationAdapter(
- getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
- publisher =
- new SerializingPublisher<>(
- new MessagePublisher(
- PerServerPublisherCache.getOrCreate(settings), settings.maxBytesOutstanding()),
- settings.serializationSchema());
- }
-
- @Override
- public synchronized void close() throws Exception {
- publisher.waitUntilNoOutstandingPublishes();
+ return new PubsubLiteSinkWriter<>(
+ new MessagePublisher(
+ PerServerPublisherCache.getOrCreate(settings), settings.maxBytesOutstanding()),
+ schema);
}
}
diff --git a/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/BulkWaitPublisher.java b/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/BulkWaitPublisher.java
index ce2658e5..5f419214 100644
--- a/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/BulkWaitPublisher.java
+++ b/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/BulkWaitPublisher.java
@@ -15,12 +15,11 @@
*/
package com.google.cloud.pubsublite.flink.internal.sink;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import java.io.Flushable;
-// Thread-compatible.
-public interface BulkWaitPublisher {
+/** A publisher that can be flushed to wait on all outstanding messages. */
+public interface BulkWaitPublisher extends Flushable {
- void publish(T message) throws InterruptedException;
-
- void waitUntilNoOutstandingPublishes() throws CheckedApiException;
+ void publish(PubSubMessage message) throws InterruptedException;
}
diff --git a/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/MessagePublisher.java b/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/MessagePublisher.java
index 8b4df90b..6c2410d3 100644
--- a/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/MessagePublisher.java
+++ b/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/MessagePublisher.java
@@ -21,7 +21,6 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
@@ -31,7 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MessagePublisher implements BulkWaitPublisher {
+public class MessagePublisher implements BulkWaitPublisher {
private static final Logger LOG = LoggerFactory.getLogger(MessagePublisher.class);
private final Publisher publisher;
private final List> publishes;
@@ -69,12 +68,12 @@ public void publish(PubSubMessage message) throws InterruptedException {
}
@Override
- public void waitUntilNoOutstandingPublishes() throws CheckedApiException {
+ public void flush() {
try {
ApiFutures.allAsList(publishes).get();
publishes.clear();
} catch (Exception e) {
- throw ExtractStatus.toCanonical(e);
+ throw ExtractStatus.toCanonical(e).underlying;
}
}
}
diff --git a/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/PubsubLiteSinkWriter.java b/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/PubsubLiteSinkWriter.java
new file mode 100644
index 00000000..50626030
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/PubsubLiteSinkWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.pubsublite.flink.internal.sink;
+
+import com.google.cloud.pubsublite.flink.PubsubLiteSerializationSchema;
+import java.io.IOException;
+import java.time.Instant;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+
+public class PubsubLiteSinkWriter implements SinkWriter {
+ private final BulkWaitPublisher publisher;
+ private final PubsubLiteSerializationSchema schema;
+
+ public PubsubLiteSinkWriter(
+ BulkWaitPublisher publisher, PubsubLiteSerializationSchema schema) {
+ this.publisher = publisher;
+ this.schema = schema;
+ }
+
+ @Override
+ public void write(T value, Context context) throws InterruptedException {
+ Long timestamp = context.timestamp();
+ if (timestamp == null) {
+ timestamp = context.currentWatermark();
+ }
+ if (timestamp == TimestampAssigner.NO_TIMESTAMP) {
+ timestamp = System.currentTimeMillis();
+ }
+ publisher.publish(schema.serialize(value, Instant.ofEpochMilli(timestamp)));
+ }
+
+ @Override
+ public void flush(boolean endOfInput) throws IOException {
+ publisher.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ publisher.flush();
+ }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/SerializingPublisher.java b/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/SerializingPublisher.java
deleted file mode 100644
index 96ddb848..00000000
--- a/src/main/java/com/google/cloud/pubsublite/flink/internal/sink/SerializingPublisher.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2021 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.pubsublite.flink.internal.sink;
-
-import com.google.cloud.Tuple;
-import com.google.cloud.pubsublite.flink.PubsubLiteSerializationSchema;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.proto.PubSubMessage;
-import java.time.Instant;
-
-public class SerializingPublisher implements BulkWaitPublisher> {
- private final BulkWaitPublisher inner;
- private final PubsubLiteSerializationSchema schema;
-
- public SerializingPublisher(
- BulkWaitPublisher inner, PubsubLiteSerializationSchema schema) {
- this.inner = inner;
- this.schema = schema;
- }
-
- @Override
- public void publish(Tuple message) throws InterruptedException {
- inner.publish(schema.serialize(message.x(), message.y()));
- }
-
- @Override
- public void waitUntilNoOutstandingPublishes() throws CheckedApiException {
- inner.waitUntilNoOutstandingPublishes();
- }
-}
diff --git a/src/test/java/com/google/cloud/pubsublite/flink/ITSourceAndSinkTest.java b/src/test/java/com/google/cloud/pubsublite/flink/ITSourceAndSinkTest.java
index 5dc3f9d4..f36dafec 100644
--- a/src/test/java/com/google/cloud/pubsublite/flink/ITSourceAndSinkTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/flink/ITSourceAndSinkTest.java
@@ -240,7 +240,7 @@ public void testSourceWithFailure() throws Exception {
public void testSink() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(INTEGER_STRINGS)
- .addSink(new PubsubLiteSink<>(sinkSettings()))
+ .sinkTo(new PubsubLiteSink<>(sinkSettings()))
.name("PSL Sink");
env.fromSource(
@@ -281,7 +281,7 @@ public void testSinkWithFailure() throws Exception {
RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.MILLISECONDS)));
env.fromCollection(INTEGER_STRINGS)
- .addSink(new PubsubLiteSink<>(sinkSettings()))
+ .sinkTo(new PubsubLiteSink<>(sinkSettings()))
.name("PSL Sink");
env.fromSource(
diff --git a/src/test/java/com/google/cloud/pubsublite/flink/internal/sink/MessagePublisherTest.java b/src/test/java/com/google/cloud/pubsublite/flink/internal/sink/MessagePublisherTest.java
index 9c3027c9..e935da46 100644
--- a/src/test/java/com/google/cloud/pubsublite/flink/internal/sink/MessagePublisherTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/flink/internal/sink/MessagePublisherTest.java
@@ -24,6 +24,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
@@ -63,7 +64,7 @@ public void testPublish() throws Exception {
messagePublisher.publish(message1);
- messagePublisher.waitUntilNoOutstandingPublishes();
+ messagePublisher.flush();
verify(fakeInnerPublisher).publish(Message.fromProto(message1));
}
@@ -77,8 +78,7 @@ public void testSinglePublishFailure() throws Exception {
messagePublisher.publish(message1);
verify(fakeInnerPublisher).publish(Message.fromProto(message1));
- assertThrows(
- CheckedApiException.class, () -> messagePublisher.waitUntilNoOutstandingPublishes());
+ assertThrows(ApiException.class, () -> messagePublisher.flush());
}
@Test
@@ -94,7 +94,7 @@ public void testCheckpointWithOutstandingPublish() throws Exception {
.submit(
() -> {
try {
- messagePublisher.waitUntilNoOutstandingPublishes();
+ messagePublisher.flush();
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/src/test/java/com/google/cloud/pubsublite/flink/internal/sink/SerializingPublisherTest.java b/src/test/java/com/google/cloud/pubsublite/flink/internal/sink/PubsubLiteSinkWriterTest.java
similarity index 65%
rename from src/test/java/com/google/cloud/pubsublite/flink/internal/sink/SerializingPublisherTest.java
rename to src/test/java/com/google/cloud/pubsublite/flink/internal/sink/PubsubLiteSinkWriterTest.java
index cd3bcbe7..46d804db 100644
--- a/src/test/java/com/google/cloud/pubsublite/flink/internal/sink/SerializingPublisherTest.java
+++ b/src/test/java/com/google/cloud/pubsublite/flink/internal/sink/PubsubLiteSinkWriterTest.java
@@ -18,11 +18,11 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.google.cloud.Tuple;
import com.google.cloud.pubsublite.flink.PubsubLiteSerializationSchema;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.protobuf.ByteString;
import java.time.Instant;
+import org.apache.flink.api.connector.sink2.SinkWriter.Context;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -30,29 +30,41 @@
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
-public class SerializingPublisherTest {
- @Mock BulkWaitPublisher mockPublisher;
+public class PubsubLiteSinkWriterTest {
+ @Mock BulkWaitPublisher mockPublisher;
@Mock PubsubLiteSerializationSchema mockSchema;
- SerializingPublisher publisher;
+ PubsubLiteSinkWriter writer;
@Before
public void setUp() {
- publisher = new SerializingPublisher<>(mockPublisher, mockSchema);
+ writer = new PubsubLiteSinkWriter<>(mockPublisher, mockSchema);
}
@Test
- public void testWaitUntilNoOutstandingPublishes() throws Exception {
- publisher.waitUntilNoOutstandingPublishes();
- verify(mockPublisher).waitUntilNoOutstandingPublishes();
+ public void testFlush() throws Exception {
+ writer.flush(false);
+ verify(mockPublisher).flush();
}
@Test
public void testPublish() throws Exception {
- Instant timestamp = Instant.ofEpochMilli(1000);
+ long timestamp = 10000;
PubSubMessage message =
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("data")).build();
- when(mockSchema.serialize("message", timestamp)).thenReturn(message);
- publisher.publish(Tuple.of("message", timestamp));
+ when(mockSchema.serialize("message", Instant.ofEpochMilli(timestamp))).thenReturn(message);
+ writer.write(
+ "message",
+ new Context() {
+ @Override
+ public long currentWatermark() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public Long timestamp() {
+ return timestamp;
+ }
+ });
verify(mockPublisher).publish(message);
}
}