From b435a7d9a9f52da83cff8e9dd0454ad3027f07ac Mon Sep 17 00:00:00 2001 From: ChristoGrab Date: Fri, 21 Feb 2025 10:19:37 -0800 Subject: [PATCH 1/2] fix: update csv parser for builder compatibility --- .../sources/declarative/decoders/composite_raw_decoder.py | 3 ++- .../declarative/parsers/model_to_component_factory.py | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 2cb618175..b8e8e3315 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -126,7 +126,8 @@ def parse( """ text_data = TextIOWrapper(data, encoding=self.encoding) # type: ignore reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",") - yield from reader + for row in reader: + yield row @dataclass diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d9cc3e3ef..57837e77f 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2091,10 +2091,10 @@ def create_dynamic_schema_loader( def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> Decoder: return JsonDecoder(parameters={}) - @staticmethod - def create_csv_decoder(model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder: + def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: Any) -> Decoder: return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + parser=ModelToComponentFactory._get_parser(model, config), + stream_response=False if self._emit_connector_builder_messages else True, ) @staticmethod From fa13d8fb93eb19d118b2e30ee386deb4f524c398 Mon Sep 17 00:00:00 2001 From: ChristoGrab Date: Fri, 21 Feb 2025 10:40:35 -0800 Subject: [PATCH 2/2] chore: update gzip decoder to not emit messages on builder read --- .../declarative/parsers/model_to_component_factory.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 57837e77f..0ad66e1b0 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2103,10 +2103,12 @@ def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any parser=ModelToComponentFactory._get_parser(model, config), stream_response=True ) - @staticmethod - def create_gzip_decoder(model: GzipDecoderModel, config: Config, **kwargs: Any) -> Decoder: + def create_gzip_decoder( + self, model: GzipDecoderModel, config: Config, **kwargs: Any + ) -> Decoder: return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + parser=ModelToComponentFactory._get_parser(model, config), + stream_response=False if self._emit_connector_builder_messages else True, ) @staticmethod