Skip to content

Commit

Permalink
feat: add the pubsublite sink (#21)
Browse files Browse the repository at this point in the history
* sink

sink stuff

schema changes

remove serialized supplier

stuff

fixes from rebasing

synchronize

formatting

* improvements

* formatting

* address comments
  • Loading branch information
palmere-google authored Jul 29, 2021
1 parent 5ecfa6b commit 186c93b
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 2 deletions.
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@
<artifactId>flink-metrics-core</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.Date;
import java.time.Instant;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;

public interface PubsubLiteSerializationSchema<T> extends Serializable {
static <T> PubsubLiteSerializationSchema<T> dataOnly(SerializationSchema<T> schema) {
Expand All @@ -42,6 +43,18 @@ public Message serialize(T value, Instant timestamp) {
};
}

/**
 * Returns a schema for inputs that are already {@link Message}s.
 *
 * <p>The returned schema's {@code open} does nothing, and its {@code serialize} hands back the
 * input message unchanged; the timestamp argument is ignored.
 */
static PubsubLiteSerializationSchema<Message> messageSchema() {
  final PubsubLiteSerializationSchema<Message> passThrough =
      new PubsubLiteSerializationSchema<Message>() {
        @Override
        public void open(InitializationContext context) {
          // Nothing to initialize for a pass-through schema.
        }

        @Override
        public Message serialize(Message value, Instant timestamp) {
          // The value is already a Message, so no conversion is needed.
          return value;
        }
      };
  return passThrough;
}

/**
 * Initialization hook for the schema.
 *
 * <p>NOTE(review): presumably invoked once before the first call to {@code serialize}, mirroring
 * Flink's {@code SerializationSchema#open} — confirm against the caller.
 *
 * @param context the Flink serialization-schema initialization context
 * @throws Exception if initialization fails
 */
void open(SerializationSchema.InitializationContext context) throws Exception;

/**
 * Converts {@code value} into a {@link Message}.
 *
 * @param value the element to convert
 * @param timestamp a timestamp associated with {@code value} (presumably the record's event
 *     time or the processing time chosen by the sink — confirm against the caller)
 * @return the message representing {@code value}
 */
Message serialize(T value, Instant timestamp);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink;

import com.google.cloud.Tuple;
import com.google.cloud.pubsublite.flink.sink.BulkWaitPublisher;
import com.google.cloud.pubsublite.flink.sink.MessagePublisher;
import com.google.cloud.pubsublite.flink.sink.PerServerPublisherCache;
import com.google.cloud.pubsublite.flink.sink.SerializingPublisher;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.time.Instant;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * A Flink sink function that pairs each incoming record with a timestamp and hands it to a
 * {@link BulkWaitPublisher} built from the supplied {@link PubsubLiteSinkSettings}.
 *
 * <p>The publisher is created in {@link #open(Configuration)} rather than in the constructor; the
 * field is {@code transient}, so it is not part of the serialized function. Both
 * {@link #snapshotState} and {@link #close()} call {@code waitUntilNoOutstandingPublishes()} on
 * the publisher before returning.
 *
 * <p>Every method that touches the publisher is synchronized on this instance.
 *
 * @param <T> the type of the records accepted by this sink
 */
public class PubsubLiteSink<T> extends RichSinkFunction<T> implements CheckpointedFunction {
  private final PubsubLiteSinkSettings<T> settings;

  // Assigned in open(); stays null if open() was never called or failed before the assignment.
  @GuardedBy("this")
  private transient BulkWaitPublisher<Tuple<T, Instant>> publisher;

  /**
   * Creates a sink that will publish according to {@code settings}.
   *
   * @param settings the sink settings (publisher configuration and serialization schema)
   */
  public PubsubLiteSink(PubsubLiteSinkSettings<T> settings) {
    this.settings = settings;
  }

  /** No-op: this sink does not register any Flink-managed state. */
  @Override
  public void initializeState(FunctionInitializationContext functionInitializationContext)
      throws Exception {}

  /**
   * Calls {@code waitUntilNoOutstandingPublishes()} on the publisher as part of the checkpoint.
   *
   * @throws Exception if the publisher's wait call throws
   */
  @Override
  public synchronized void snapshotState(FunctionSnapshotContext functionSnapshotContext)
      throws Exception {
    publisher.waitUntilNoOutstandingPublishes();
  }

  /**
   * Publishes {@code value} together with its timestamp.
   *
   * @param value the record to publish
   * @param context the sink context, used to obtain the record timestamp or, when absent, the
   *     current processing time
   * @throws Exception if handing the record to the publisher fails
   */
  @Override
  public synchronized void invoke(T value, Context context) throws Exception {
    Long timestamp = context.timestamp();
    if (timestamp == null) {
      // The record carries no timestamp; fall back to the current processing time.
      timestamp = context.currentProcessingTime();
    }
    publisher.publish(Tuple.of(value, Instant.ofEpochMilli(timestamp)));
  }

  /**
   * Builds the publisher chain: a {@link SerializingPublisher} using the configured schema,
   * wrapping a {@link MessagePublisher} over the publisher obtained from
   * {@link PerServerPublisherCache}.
   *
   * @throws Exception if the superclass hook or publisher construction fails
   */
  @Override
  public synchronized void open(Configuration parameters) throws Exception {
    super.open(parameters);
    publisher =
        new SerializingPublisher<>(
            new MessagePublisher(
                PerServerPublisherCache.getOrCreate(settings.getPublisherConfig())),
            settings.serializationSchema());
  }

  /**
   * Calls {@code waitUntilNoOutstandingPublishes()} on the publisher, if one was created, and
   * then delegates to the superclass.
   *
   * @throws Exception if the publisher's wait call or the superclass hook throws
   */
  @Override
  public synchronized void close() throws Exception {
    try {
      // close() may run after a failed open(); skip the wait so a NullPointerException does not
      // mask the original failure.
      if (publisher != null) {
        publisher.waitUntilNoOutstandingPublishes();
      }
    } finally {
      super.close();
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.pubsublite.flink;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.flink.sink.PublisherOptions;
import java.io.Serializable;

/**
 * Settings for the Pub/Sub Lite sink: the topic to publish to and the schema used to convert
 * {@code InputT} values into messages.
 *
 * @param <InputT> the type of the values the sink accepts
 */
@AutoValue
public abstract class PubsubLiteSinkSettings<InputT> implements Serializable {
  /**
   * Creates a builder for a sink which will accept messages of type {@code InputT} and serialize
   * them using the provided serialization schema.
   *
   * @param schema the schema used to convert {@code InputT} values into messages
   * @return a builder with the serialization schema already set
   */
  public static <InputT> Builder<InputT> builder(PubsubLiteSerializationSchema<InputT> schema) {
    return new AutoValue_PubsubLiteSinkSettings.Builder<InputT>().setSerializationSchema(schema);
  }

  /**
   * Creates a builder for a sink which will accept already serialized Pub/Sub Lite messages.
   *
   * @return a builder using {@link PubsubLiteSerializationSchema#messageSchema()}
   */
  public static Builder<Message> messagesBuilder() {
    return builder(PubsubLiteSerializationSchema.messageSchema());
  }

  /** Required. The path of the topic to publish messages to. */
  public abstract TopicPath topicPath();

  /** Internal. The schema used to convert inputs into messages. */
  abstract PubsubLiteSerializationSchema<InputT> serializationSchema();

  /** Internal. Derives the publisher options from {@link #topicPath()}. */
  PublisherOptions getPublisherConfig() {
    return PublisherOptions.create(topicPath());
  }

  /**
   * Builder for {@link PubsubLiteSinkSettings}.
   *
   * <p>Declared public so that callers outside this package can invoke the setters and
   * {@link #build()} on the builder returned by the public factory methods.
   */
  @AutoValue.Builder
  public abstract static class Builder<InputT> {
    /** Required. Sets the path of the topic to publish messages to. */
    public abstract Builder<InputT> setTopicPath(TopicPath value);

    /** Internal. Set by the factory methods on {@link PubsubLiteSinkSettings}. */
    abstract Builder<InputT> setSerializationSchema(PubsubLiteSerializationSchema<InputT> value);

    /** Builds the settings object. */
    public abstract PubsubLiteSinkSettings<InputT> build();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public class MessagePublisher implements BulkWaitPublisher<Message> {
/**
 * Wraps {@code publisher} and starts with an empty list of tracked publishes.
 *
 * <p>NOTE(review): as displayed, the constructor also calls {@code startAsync()} and then
 * {@code awaitRunning()} on the publisher. The hunk header for this file (-32,8 +32,6), together
 * with the commit's "2 deletions", suggests those two calls are the lines this commit removes.
 * Confirm against the repository before relying on either behavior.
 *
 * @param publisher the underlying publisher to delegate to
 */
public MessagePublisher(Publisher<MessageMetadata> publisher) {
this.publisher = publisher;
this.publishes = new ArrayList<>();
this.publisher.startAsync();
this.publisher.awaitRunning();
}

@Override
Expand Down

0 comments on commit 186c93b

Please sign in to comment.