Commit

[Bug][seatunnel-connector-flink-KafkaSink] Use JsonRowSerializationSchema and DataStream to solve the problem that KafkaSink cannot output data; successfully compiled locally (apache#1598)
realdengziqi authored Mar 29, 2022
1 parent 5290c81 commit 1537758
Showing 1 changed file with 24 additions and 40 deletions.
@@ -22,21 +22,20 @@
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.stream.FlinkStreamSink;
-import org.apache.seatunnel.flink.util.SchemaUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.descriptors.FormatDescriptor;
-import org.apache.flink.table.descriptors.Json;
-import org.apache.flink.table.descriptors.Kafka;
-import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.types.Row;
+import org.apache.kafka.clients.producer.ProducerRecord;
 
 import javax.annotation.Nullable;
 
@@ -54,42 +53,27 @@ public class KafkaTable implements FlinkStreamSink {
     public DataStreamSink<Row> outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
         StreamTableEnvironment tableEnvironment = env.getStreamTableEnvironment();
         Table table = tableEnvironment.fromDataStream(dataStream);
-        insert(tableEnvironment, table);
-        return null;
-    }
-
-    private void insert(TableEnvironment tableEnvironment, Table table) {
         TypeInformation<?>[] types = table.getSchema().getFieldTypes();
         String[] fieldNames = table.getSchema().getFieldNames();
-        Schema schema = getSchema(types, fieldNames);
-        String uniqueTableName = SchemaUtil.getUniqueTableName();
-
-        tableEnvironment.connect(getKafkaConnect())
-                .withSchema(schema)
-                .withFormat(setFormat())
-                .inAppendMode()
-                .createTemporaryTable(uniqueTableName);
-        table.insertInto(uniqueTableName);
-    }
-
-    private Schema getSchema(TypeInformation<?>[] informations, String[] fieldNames) {
-        Schema schema = new Schema();
-        for (int i = 0; i < informations.length; i++) {
-            schema.field(fieldNames[i], informations[i]);
-        }
-        return schema;
-    }
-
-    private Kafka getKafkaConnect() {
-
-        Kafka kafka = new Kafka().version("universal");
-        kafka.topic(topic);
-        kafka.properties(kafkaParams);
-        return kafka;
-    }
-
-    private FormatDescriptor setFormat() {
-        return new Json().failOnMissingField(false).deriveSchema();
+        JsonRowSerializationSchema jsonRowSerializationSchema = JsonRowSerializationSchema
+                .builder()
+                .withTypeInfo(Types.ROW_NAMED(fieldNames, types))
+                .build();
+
+        dataStream.addSink(
+                new FlinkKafkaProducer<Row>(
+                        topic,
+                        new KafkaSerializationSchema<Row>() {
+                            @Override
+                            public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long timestamp) {
+                                return new ProducerRecord<>(topic, jsonRowSerializationSchema.serialize(row));
+                            }
+                        },
+                        kafkaParams,
+                        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
+                )
+        );
+        return null;
     }
 
     @Override
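To try the new wiring outside SeaTunnel, here is a minimal, self-contained sketch of the same pattern: JsonRowSerializationSchema turns named Rows into JSON bytes, and a FlinkKafkaProducer with a KafkaSerializationSchema writes them to Kafka. The field names, types, topic name, and broker address are invented for illustration; only the Flink and Kafka classes mirror the diff, and the transaction-timeout property is an assumption about running EXACTLY_ONCE against a broker with default settings.

import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

public final class KafkaRowSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical two-field row schema, standing in for whatever the upstream table produces.
        String[] fieldNames = {"name", "age"};
        TypeInformation<?>[] types = {Types.STRING, Types.INT};
        TypeInformation<Row> rowType = Types.ROW_NAMED(fieldNames, types);

        DataStream<Row> dataStream = env.fromCollection(
                Arrays.asList(Row.of("alice", 30), Row.of("bob", 25)), rowType);

        // Same serializer the commit introduces: a named Row becomes a JSON byte[] payload.
        JsonRowSerializationSchema jsonRowSerializationSchema = JsonRowSerializationSchema
                .builder()
                .withTypeInfo(rowType)
                .build();

        String topic = "output_topic";                                  // hypothetical topic
        Properties kafkaParams = new Properties();
        kafkaParams.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        // EXACTLY_ONCE rides on Kafka transactions; the producer's transaction timeout
        // must not exceed the broker's transaction.max.timeout.ms (15 minutes by default).
        kafkaParams.setProperty("transaction.timeout.ms", "600000");

        dataStream.addSink(
                new FlinkKafkaProducer<Row>(
                        topic,
                        new KafkaSerializationSchema<Row>() {
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long timestamp) {
                                return new ProducerRecord<>(topic, jsonRowSerializationSchema.serialize(row));
                            }
                        },
                        kafkaParams,
                        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("kafka-row-sink-sketch");
    }
}

Note that Semantic.EXACTLY_ONCE is taken straight from the diff; it commits Kafka transactions on checkpoint completion, so checkpointing must be enabled for read_committed consumers to see the data, and AT_LEAST_ONCE may be the simpler choice for a quick local test.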
