From bef8eae8036e685464ce9af461b2f58f7cc012f1 Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Wed, 1 Nov 2023 17:26:49 +0800 Subject: [PATCH] [hotfix][python] Fix Kafka csv example --- .../examples/datastream/connectors/kafka_csv_format.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py index 4dbb243fcf984..39c134a8ed336 100644 --- a/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py +++ b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py @@ -21,8 +21,7 @@ from pyflink.common import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer -from pyflink.datastream.formats.csv import CsvRowSerializationSchema -from pyflink.datastream.formats.json import JsonRowDeserializationSchema +from pyflink.datastream.formats.csv import CsvRowSerializationSchema, CsvRowDeserializationSchema # Make sure that the Kafka cluster is started and the topic 'test_csv_topic' is @@ -46,9 +45,8 @@ def write_to_kafka(env): def read_from_kafka(env): - deserialization_schema = JsonRowDeserializationSchema.Builder() \ - .type_info(Types.ROW([Types.INT(), Types.STRING()])) \ - .build() + type_info = Types.ROW([Types.INT(), Types.STRING()]) + deserialization_schema = CsvRowDeserializationSchema.Builder(type_info).build() kafka_consumer = FlinkKafkaConsumer( topics='test_csv_topic',