Add CSV options to the CSV parser #28491

Merged 73 commits on Aug 3, 2023
Changes from 1 commit
f744e2c
remove invalid legacy option
girarda Jul 19, 2023
fb5a57d
remove unused option
girarda Jul 19, 2023
3230205
the tests pass but this is quite messy
girarda Jul 19, 2023
f6a67db
very slight clean up
girarda Jul 19, 2023
d01200b
Add skip options to csv format
girarda Jul 19, 2023
b271a9e
fix some of the typing issues
girarda Jul 19, 2023
7add1c7
fixme comment
girarda Jul 19, 2023
e8c88be
remove extra log message
girarda Jul 19, 2023
9e73b51
fix typing issues
girarda Jul 19, 2023
84cabeb
merge
girarda Jul 25, 2023
79f7748
skip before header
girarda Jul 25, 2023
0ae95da
skip after header
girarda Jul 25, 2023
6324257
format
girarda Jul 25, 2023
0fd42ca
add another test
girarda Jul 25, 2023
8b54aff
Automated Commit - Formatting Changes
girarda Jul 25, 2023
b9a4a71
auto generate column names
girarda Jul 26, 2023
9982834
Merge branch 'alex/csv_options' of github.com:airbytehq/airbyte into …
girarda Jul 26, 2023
32844ce
delete dead code
girarda Jul 26, 2023
cd48738
update title and description
girarda Jul 26, 2023
43ce434
true and false values
girarda Jul 26, 2023
df47586
Update the tests
girarda Jul 26, 2023
ce9a672
Add comment
girarda Jul 26, 2023
2c03349
missing test
girarda Jul 26, 2023
c445b02
rename
girarda Jul 26, 2023
ff8f5d4
update expected spec
girarda Jul 26, 2023
87a3bcb
move to method
girarda Jul 26, 2023
9a1954f
Update comment
girarda Jul 26, 2023
72caf7d
fix typo
girarda Jul 26, 2023
ecea4e0
remove unused import
girarda Jul 26, 2023
cf298b7
Add a comment
girarda Jul 26, 2023
8cd05a4
None records do not pass the WaitForDiscoverPolicy
girarda Jul 26, 2023
d1fb6ae
format
girarda Jul 26, 2023
124cfcf
remove second branch to ensure we always go through the same processing
girarda Jul 26, 2023
a9ee16b
Raise an exception if the record is None
girarda Jul 26, 2023
a629ef0
reset
girarda Jul 26, 2023
f11a551
Update tests
girarda Jul 26, 2023
da274bc
handle unquoted newlines
girarda Jul 26, 2023
b373221
Automated Commit - Formatting Changes
girarda Jul 26, 2023
ce51b3d
Update test case so the quoting is explicit
girarda Jul 26, 2023
f8d76a1
Merge branch 'alex/csv_options' of github.com:airbytehq/airbyte into …
girarda Jul 26, 2023
b857737
Update comment
girarda Jul 26, 2023
e8609c4
Automated Commit - Formatting Changes
girarda Jul 27, 2023
59f00be
Fail validation if skipping rows before header and header is autogene…
girarda Jul 27, 2023
1cdaf60
Merge branch 'alex/csv_options' of github.com:airbytehq/airbyte into …
girarda Jul 27, 2023
d1c4036
always fail if a record cannot be parsed
girarda Aug 1, 2023
903074a
merge
girarda Aug 1, 2023
bdfccee
format
girarda Aug 1, 2023
355d596
set write line_no in error message
girarda Aug 1, 2023
0426b4c
remove none check
girarda Aug 2, 2023
8b7d519
Merge branch 'master' into alex/csv_options
girarda Aug 2, 2023
8a7bcf7
Automated Commit - Formatting Changes
girarda Aug 2, 2023
9252651
enable autogenerate test
girarda Aug 2, 2023
06157dc
Merge branch 'alex/csv_options' of github.com:airbytehq/airbyte into …
girarda Aug 2, 2023
e5a1c0e
remove duplicate test
girarda Aug 2, 2023
9c9dc72
missing unit tests
girarda Aug 2, 2023
146680a
Update
girarda Aug 2, 2023
4cfd721
remove branching
girarda Aug 2, 2023
6f10047
remove unused none check
girarda Aug 2, 2023
e4986e8
Merge branch 'master' into alex/csv_options
girarda Aug 2, 2023
1f57507
Update tests
girarda Aug 2, 2023
0441c28
remove branching
girarda Aug 2, 2023
c2b3a37
format
girarda Aug 2, 2023
d8538f9
extract to function
girarda Aug 2, 2023
16df89d
comment
girarda Aug 2, 2023
7d7f6dd
Merge branch 'master' into alex/csv_options
girarda Aug 2, 2023
cef6a41
missing type
girarda Aug 2, 2023
cec32dc
Merge branch 'alex/csv_options' of github.com:airbytehq/airbyte into …
girarda Aug 2, 2023
05067a7
type annotation
girarda Aug 2, 2023
bdbd413
use set
girarda Aug 3, 2023
bf525b4
Document that the strings are case-sensitive
girarda Aug 3, 2023
d32a94f
public -> private
girarda Aug 3, 2023
69240b0
add unit test
girarda Aug 3, 2023
bfe4d47
newline
girarda Aug 3, 2023
format
girarda committed Jul 25, 2023
commit 6324257d8c7e0577c4d326268d3ee41982200edf
@@ -4,7 +4,7 @@

import codecs
from enum import Enum
-from typing import Optional, List
+from typing import List, Optional

from pydantic import BaseModel, Field, validator
from typing_extensions import Literal
@@ -49,17 +49,15 @@ class CsvFormat(BaseModel):
null_values: List[str] = Field(
title="Null Values",
default=[],
-description="A set of strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field."
+description="A set of strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
)
skip_rows_before_header: int = Field(
title="Skip Rows Before Header",
default=0,
description="The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.",
)
skip_rows_after_header: int = Field(
-title="Skip Rows After Header",
-default=0,
-description="The number of rows to skip after the header row."
+title="Skip Rows After Header", default=0, description="The number of rows to skip after the header row."
)

# Noting that the existing S3 connector had a config option newlines_in_values. This was only supported by pyarrow and not
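
To make the semantics of `skip_rows_before_header`, `skip_rows_after_header`, and `null_values` concrete, here is a minimal sketch using only the standard `csv` module. It is not the CDK's implementation: the helper `read_rows_sketch` and the sample data are hypothetical, and it assumes that rows before the header are skipped as raw lines while rows after the header are skipped as parsed CSV records.

```python
import csv
import io
from typing import Dict, Iterator, List, Optional


def read_rows_sketch(
    text: str,
    skip_rows_before_header: int = 0,
    skip_rows_after_header: int = 0,
    null_values: Optional[List[str]] = None,
) -> Iterator[Dict[str, Optional[str]]]:
    """Hypothetical helper illustrating the three options; not the CDK parser."""
    nulls = set(null_values or [])
    fp = io.StringIO(text)
    # Assumption: rows before the header are dropped as raw lines.
    for _ in range(skip_rows_before_header):
        fp.readline()
    # DictReader treats the next line it reads as the header row.
    reader = csv.DictReader(fp)
    # Assumption: rows after the header are dropped as parsed records.
    for _ in range(skip_rows_after_header):
        next(reader, None)
    for row in reader:
        # Exact (case-sensitive) match against the configured null strings.
        yield {key: (None if value in nulls else value) for key, value in row.items()}


# Header is on the 3rd line, followed by one separator row to discard.
sample = "exported by tool\nversion 2\nid,name\n---,---\n1,alice\n2,NA\n"
rows = list(
    read_rows_sketch(
        sample,
        skip_rows_before_header=2,
        skip_rows_after_header=1,
        null_values=["NA"],
    )
)
```

In this sample the header sits on the 3rd line, so `skip_rows_before_header=2`, which matches the example given in the field description.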
@@ -7,11 +7,11 @@
import logging
from distutils.util import strtobool
from io import IOBase
-from typing import Any, Dict, Iterable, Mapping, Optional, Set, List, IO
+from typing import Any, Dict, Iterable, List, Mapping, Optional

from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat, QuotingBehavior
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
-from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
+from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
@@ -27,7 +27,6 @@
}


-
class CsvParser(FileTypeParser):
async def infer_schema(
self,
@@ -132,6 +131,7 @@ def _skip_rows_before_header(fp: IOBase, rows_to_skip: int) -> None:
for _ in range(rows_to_skip):
fp.readline()

+
def cast_types(row: Dict[str, str], property_types: Dict[str, Any], logger: logging.Logger) -> Dict[str, Any]:
"""
Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.
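
The body of `cast_types` is not shown in this hunk. As a rough illustration of what casting a CSV row against JSON-schema property types can look like, here is a minimal sketch. The function name `cast_row_sketch`, the accepted boolean strings, the shape of `property_types` (property name mapped to a dict with a `type` key), and the fallback of keeping the original string and logging a warning are all assumptions, not the PR's actual behavior.

```python
import logging
from typing import Any, Dict

# Assumed, case-sensitive boolean literals; the real parser may accept others.
TRUE_STRINGS = {"true", "1"}
FALSE_STRINGS = {"false", "0"}


def _to_bool(value: str) -> bool:
    if value in TRUE_STRINGS:
        return True
    if value in FALSE_STRINGS:
        return False
    raise ValueError(f"not a boolean: {value!r}")


# Hypothetical casters keyed by JSON-schema type name.
CASTERS = {"integer": int, "number": float, "boolean": _to_bool}


def cast_row_sketch(
    row: Dict[str, str], property_types: Dict[str, Any], logger: logging.Logger
) -> Dict[str, Any]:
    result: Dict[str, Any] = {}
    for key, value in row.items():
        json_type = property_types.get(key, {}).get("type")
        # Only single string type names are handled in this sketch.
        caster = CASTERS.get(json_type) if isinstance(json_type, str) else None
        if caster is None:
            # Strings and unknown types pass through unchanged.
            result[key] = value
            continue
        try:
            result[key] = caster(value)
        except ValueError:
            # Assumption: keep the raw string and warn instead of failing.
            logger.warning("Could not cast %r to %s for key %r", value, json_type, key)
            result[key] = value
    return result


schema = {
    "id": {"type": "integer"},
    "price": {"type": "number"},
    "active": {"type": "boolean"},
}
cast = cast_row_sketch(
    {"id": "7", "price": "1.5", "active": "true", "name": "x"},
    schema,
    logging.getLogger("sketch"),
)
```

Columns without an entry in `property_types`, such as `name` here, are left as strings in this sketch.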
@@ -4,10 +4,7 @@

import asyncio
import itertools
-import logging
import traceback
-from configparser import ParsingError
-import json
from functools import cache
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Union

@@ -19,7 +16,7 @@
InvalidSchemaError,
MissingSchemaError,
SchemaInferenceError,
-StopSyncPerValidationPolicy, RecordParseError,
+StopSyncPerValidationPolicy,
)
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import merge_schemas, schemaless_schema
@@ -106,7 +103,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping
level=Level.ERROR,
message=f"{FileBasedSourceError.ERROR_PARSING_RECORD.value} stream={self.name} file={file.uri} line_no={line_no} n_skipped={n_skipped}",
stack_trace=traceback.format_exc(),
-)
+),
)
return
if self.config.schemaless:
@@ -129,7 +126,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Mapping
)
break

-except Exception as e:
+except Exception:
yield AirbyteMessage(
type=MessageType.LOG,
log=AirbyteLogMessage(
30 changes: 27 additions & 3 deletions airbyte-cdk/python/unit_tests/sources/file_based/test_scenarios.py
@@ -33,8 +33,33 @@
success_multi_stream_scenario,
success_user_provided_schema_scenario,
)
-from unit_tests.sources.file_based.scenarios.csv_scenarios import *
-
+from unit_tests.sources.file_based.scenarios.csv_scenarios import (
+csv_custom_delimiter_in_double_quotes_scenario,
+csv_custom_delimiter_with_escape_char_scenario,
+csv_custom_format_scenario,
+csv_double_quote_is_set_scenario,
+csv_escape_char_is_set_scenario,
+csv_legacy_format_scenario,
+csv_multi_stream_scenario,
+csv_newline_in_values_quoted_value_scenario,
+csv_simple_scenario,
+csv_single_stream_scenario,
+csv_skip_after_header_scenario,
+csv_skip_before_header_scenario,
+csv_string_can_be_null_with_input_schemas_scenario,
+csv_string_not_null_if_no_null_values_scenario,
+csv_strings_can_be_null_not_quoted_scenario,
+empty_schema_inference_scenario,
+invalid_csv_scenario,
+multi_csv_scenario,
+multi_csv_stream_n_file_exceeds_limit_for_inference,
+multi_stream_custom_format,
+schemaless_csv_multi_stream_scenario,
+schemaless_csv_scenario,
+schemaless_with_user_input_schema_fails_connection_check_multi_stream_scenario,
+schemaless_with_user_input_schema_fails_connection_check_scenario,
+single_csv_scenario,
+)
from unit_tests.sources.file_based.scenarios.incremental_scenarios import (
multi_csv_different_timestamps_scenario,
multi_csv_include_missing_files_within_history_range,
@@ -245,7 +270,6 @@ def _verify_read_output(output: Dict[str, Any], scenario: TestScenario) -> None:
elif "state" in actual:
assert actual["state"]["data"] == expected

-
if scenario.expected_logs:
read_logs = scenario.expected_logs.get("read")
assert len(logs) == (len(read_logs) if read_logs else 0)