diff --git a/pom.xml b/pom.xml
index 74daccf9..44336450 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,16 @@
flink-metrics-core
1.13.0
+
+ org.apache.flink
+ flink-streaming-java_2.11
+ 1.13.0
+
+
+ org.apache.flink
+ flink-runtime_2.11
+ 1.13.0
+
junit
junit
diff --git a/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSerializationSchema.java b/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSerializationSchema.java
index eca7c0af..d9926302 100644
--- a/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSerializationSchema.java
+++ b/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSerializationSchema.java
@@ -22,6 +22,7 @@
import java.sql.Date;
import java.time.Instant;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
public interface PubsubLiteSerializationSchema extends Serializable {
static PubsubLiteSerializationSchema dataOnly(SerializationSchema schema) {
@@ -42,6 +43,18 @@ public Message serialize(T value, Instant timestamp) {
};
}
+ static PubsubLiteSerializationSchema messageSchema() {
+ return new PubsubLiteSerializationSchema() {
+ @Override
+ public void open(InitializationContext context) {}
+
+ @Override
+ public Message serialize(Message value, Instant timestamp) {
+ return value;
+ }
+ };
+ }
+
void open(SerializationSchema.InitializationContext context) throws Exception;
Message serialize(T value, Instant timestamp);
diff --git a/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSink.java b/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSink.java
new file mode 100644
index 00000000..703847cd
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSink.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.pubsublite.flink;
+
+import com.google.cloud.Tuple;
+import com.google.cloud.pubsublite.flink.sink.BulkWaitPublisher;
+import com.google.cloud.pubsublite.flink.sink.MessagePublisher;
+import com.google.cloud.pubsublite.flink.sink.PerServerPublisherCache;
+import com.google.cloud.pubsublite.flink.sink.SerializingPublisher;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import java.time.Instant;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+public class PubsubLiteSink extends RichSinkFunction implements CheckpointedFunction {
+ private final PubsubLiteSinkSettings settings;
+
+ @GuardedBy("this")
+ private transient BulkWaitPublisher> publisher;
+
+ public PubsubLiteSink(PubsubLiteSinkSettings settings) {
+ this.settings = settings;
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext)
+ throws Exception {}
+
+ @Override
+ public synchronized void snapshotState(FunctionSnapshotContext functionSnapshotContext)
+ throws Exception {
+ publisher.waitUntilNoOutstandingPublishes();
+ }
+
+ @Override
+ public synchronized void invoke(T value, Context context) throws Exception {
+ Long timestamp = context.timestamp();
+ if (timestamp == null) {
+ timestamp = context.currentProcessingTime();
+ }
+ publisher.publish(Tuple.of(value, Instant.ofEpochMilli(timestamp)));
+ }
+
+ @Override
+ public synchronized void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ publisher =
+ new SerializingPublisher<>(
+ new MessagePublisher(
+ PerServerPublisherCache.getOrCreate(settings.getPublisherConfig())),
+ settings.serializationSchema());
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ publisher.waitUntilNoOutstandingPublishes();
+ }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSinkSettings.java b/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSinkSettings.java
new file mode 100644
index 00000000..750c4e84
--- /dev/null
+++ b/src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSinkSettings.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.pubsublite.flink;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.flink.sink.PublisherOptions;
+import java.io.Serializable;
+
+@AutoValue
+public abstract class PubsubLiteSinkSettings implements Serializable {
+ // Create a builder which will accept messages of type InputT and serialize them using the
+ // provided serialization schema.
+ public static Builder builder(PubsubLiteSerializationSchema schema) {
+ return new AutoValue_PubsubLiteSinkSettings.Builder().setSerializationSchema(schema);
+ }
+
+ // Create a sink which will accept already serialized pubsub messages/
+ public static Builder messagesBuilder() {
+ return builder(PubsubLiteSerializationSchema.messageSchema());
+ }
+
+ // Required. The path of the topic to publish messages to.
+ public abstract TopicPath topicPath();
+
+ // Internal.
+ abstract PubsubLiteSerializationSchema serializationSchema();
+
+ PublisherOptions getPublisherConfig() {
+ return PublisherOptions.create(topicPath());
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ // Required.
+ public abstract Builder setTopicPath(TopicPath value);
+
+ // Internal.
+ abstract Builder setSerializationSchema(PubsubLiteSerializationSchema value);
+
+ public abstract PubsubLiteSinkSettings build();
+ }
+}
diff --git a/src/main/java/com/google/cloud/pubsublite/flink/sink/MessagePublisher.java b/src/main/java/com/google/cloud/pubsublite/flink/sink/MessagePublisher.java
index f6c8cc20..df86acde 100644
--- a/src/main/java/com/google/cloud/pubsublite/flink/sink/MessagePublisher.java
+++ b/src/main/java/com/google/cloud/pubsublite/flink/sink/MessagePublisher.java
@@ -32,8 +32,6 @@ public class MessagePublisher implements BulkWaitPublisher {
public MessagePublisher(Publisher publisher) {
this.publisher = publisher;
this.publishes = new ArrayList<>();
- this.publisher.startAsync();
- this.publisher.awaitRunning();
}
@Override