Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CSV options to the CSV parser #28491

Merged
merged 73 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
f744e2c
remove invalid legacy option
girarda Jul 19, 2023
fb5a57d
remove unused option
girarda Jul 19, 2023
3230205
the tests pass but this is quite messy
girarda Jul 19, 2023
f6a67db
very slight clean up
girarda Jul 19, 2023
d01200b
Add skip options to csv format
girarda Jul 19, 2023
b271a9e
fix some of the typing issues
girarda Jul 19, 2023
7add1c7
fixme comment
girarda Jul 19, 2023
e8c88be
remove extra log message
girarda Jul 19, 2023
9e73b51
fix typing issues
girarda Jul 19, 2023
84cabeb
merge
girarda Jul 25, 2023
79f7748
skip before header
girarda Jul 25, 2023
0ae95da
skip after header
girarda Jul 25, 2023
6324257
format
girarda Jul 25, 2023
0fd42ca
add another test
girarda Jul 25, 2023
8b54aff
Automated Commit - Formatting Changes
girarda Jul 25, 2023
b9a4a71
auto generate column names
girarda Jul 26, 2023
9982834
Merge branch 'alex/csv_options' of github.com:airbytehq/airbyte into …
girarda Jul 26, 2023
32844ce
delete dead code
girarda Jul 26, 2023
cd48738
update title and description
girarda Jul 26, 2023
43ce434
true and false values
girarda Jul 26, 2023
df47586
Update the tests
girarda Jul 26, 2023
ce9a672
Add comment
girarda Jul 26, 2023
2c03349
missing test
girarda Jul 26, 2023
c445b02
rename
girarda Jul 26, 2023
ff8f5d4
update expected spec
girarda Jul 26, 2023
87a3bcb
move to method
girarda Jul 26, 2023
9a1954f
Update comment
girarda Jul 26, 2023
72caf7d
fix typo
girarda Jul 26, 2023
ecea4e0
remove unused import
girarda Jul 26, 2023
cf298b7
Add a comment
girarda Jul 26, 2023
8cd05a4
None records do not pass the WaitForDiscoverPolicy
girarda Jul 26, 2023
d1fb6ae
format
girarda Jul 26, 2023
124cfcf
remove second branch to ensure we always go through the same processing
girarda Jul 26, 2023
a9ee16b
Raise an exception if the record is None
girarda Jul 26, 2023
a629ef0
reset
girarda Jul 26, 2023
f11a551
Update tests
girarda Jul 26, 2023
da274bc
handle unquoted newlines
girarda Jul 26, 2023
b373221
Automated Commit - Formatting Changes
girarda Jul 26, 2023
ce51b3d
Update test case so the quoting is explicit
girarda Jul 26, 2023
f8d76a1
Merge branch 'alex/csv_options' of github.com:airbytehq/airbyte into …
girarda Jul 26, 2023
b857737
Update comment
girarda Jul 26, 2023
e8609c4
Automated Commit - Formatting Changes
girarda Jul 27, 2023
59f00be
Fail validation if skipping rows before header and header is autogene…
girarda Jul 27, 2023
1cdaf60
Merge branch 'alex/csv_options' of github.com:airbytehq/airbyte into …
girarda Jul 27, 2023
d1c4036
always fail if a record cannot be parsed
girarda Aug 1, 2023
903074a
merge
girarda Aug 1, 2023
bdfccee
format
girarda Aug 1, 2023
355d596
set write line_no in error message
girarda Aug 1, 2023
0426b4c
remove none check
girarda Aug 2, 2023
8b7d519
Merge branch 'master' into alex/csv_options
girarda Aug 2, 2023
8a7bcf7
Automated Commit - Formatting Changes
girarda Aug 2, 2023
9252651
enable autogenerate test
girarda Aug 2, 2023
06157dc
Merge branch 'alex/csv_options' of github.com:airbytehq/airbyte into …
girarda Aug 2, 2023
e5a1c0e
remove duplicate test
girarda Aug 2, 2023
9c9dc72
missing unit tests
girarda Aug 2, 2023
146680a
Update
girarda Aug 2, 2023
4cfd721
remove branching
girarda Aug 2, 2023
6f10047
remove unused none check
girarda Aug 2, 2023
e4986e8
Merge branch 'master' into alex/csv_options
girarda Aug 2, 2023
1f57507
Update tests
girarda Aug 2, 2023
0441c28
remove branching
girarda Aug 2, 2023
c2b3a37
format
girarda Aug 2, 2023
d8538f9
extract to function
girarda Aug 2, 2023
16df89d
comment
girarda Aug 2, 2023
7d7f6dd
Merge branch 'master' into alex/csv_options
girarda Aug 2, 2023
cef6a41
missing type
girarda Aug 2, 2023
cec32dc
Merge branch 'alex/csv_options' of github.com:airbytehq/airbyte into …
girarda Aug 2, 2023
05067a7
type annotation
girarda Aug 2, 2023
bdbd413
use set
girarda Aug 3, 2023
bf525b4
Document that the strings are case-sensitive
girarda Aug 3, 2023
d32a94f
public -> private
girarda Aug 3, 2023
69240b0
add unit test
girarda Aug 3, 2023
bfe4d47
newline
girarda Aug 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
skip before header
  • Loading branch information
girarda committed Jul 25, 2023
commit 79f77488f2788a90aa8c48a862f71dba7c22c147
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import json
import logging
from distutils.util import strtobool
from typing import Any, Dict, Iterable, Mapping, Optional, Set, List
from io import IOBase
from typing import Any, Dict, Iterable, Mapping, Optional, Set, List, IO

from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat, QuotingBehavior
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
Expand Down Expand Up @@ -52,6 +53,7 @@ async def infer_schema(
# todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
# sources will likely require one. Rather than modify the interface now we can wait until the real use case
reader = csv.DictReader(fp, dialect=dialect_name) # type: ignore
self._skip_rows_before_header(fp, config_format.skip_rows_before_header)
schema = {field.strip(): {"type": "string"} for field in next(reader)}
csv.unregister_dialect(dialect_name)
return schema
Expand Down Expand Up @@ -87,16 +89,18 @@ def parse_records(
with stream_reader.open_file(file) as fp:
# todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
# sources will likely require one. Rather than modify the interface now we can wait until the real use case
self._skip_rows_before_header(fp, config_format.skip_rows_before_header)
reader = csv.DictReader(fp, dialect=dialect_name) # type: ignore
yield from self._read_and_cast_types(reader, schema, null_values, logger)

yield from self._read_and_cast_types(reader, schema, null_values, config_format.skip_rows_before_header, logger)
else:
with stream_reader.open_file(file) as fp:
reader = csv.DictReader(fp) # type: ignore
yield from self._read_and_cast_types(reader, schema, [], logger)
yield from self._read_and_cast_types(reader, schema, [], 0, logger)

@staticmethod
def _read_and_cast_types(
reader: csv.DictReader, schema: Optional[Mapping[str, Any]], null_values: List[str], logger: logging.Logger # type: ignore
reader: csv.DictReader, schema: Optional[Mapping[str, Any]], null_values: List[str], skip_rows_before_header: int, logger: logging.Logger # type: ignore
) -> Iterable[Optional[Dict[str, Any]]]:
"""
If the user provided a schema, attempt to cast the record values to the associated type.
Expand All @@ -121,6 +125,11 @@ def _to_nullable(row: Mapping[str, str], null_values: List[str]) -> Optional[Dic
nullable = row | {k: None if v in null_values else v for k, v in row.items()}
return nullable

@staticmethod
def _skip_rows_before_header(fp: IOBase, rows_to_skip: int) -> None:
for _ in range(rows_to_skip):
fp.readline()

def cast_types(row: Dict[str, str], property_types: Dict[str, Any], logger: logging.Logger) -> Dict[str, Any]:
"""
Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2185,7 +2185,7 @@
)
).build()

csv_simple = (
csv_simple_scenario = (
TestScenarioBuilder()
.set_name("csv_simple")
.set_config(
Expand Down Expand Up @@ -2213,9 +2213,9 @@
{
"a.csv": {
"contents": [
'''col1,col2''',
'''val11,val12''',
'''val21,val22''',
("col1", "col2"),
("val11", "val12"),
("val21", "val22"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
}
Expand Down Expand Up @@ -2260,3 +2260,76 @@
]
)
).build()

csv_skip_before_header_scenario = (
TestScenarioBuilder()
.set_name("csv_skip_before_header")
.set_config(
{
"streams": [
{
"name": "stream1",
"file_type": "csv",
"globs": ["*"],
"validation_policy": "emit_record",
"format": {
"csv": {
"filetype": "csv",
"skip_rows_before_header": 2
}
}
}
],
"start_date": "2023-06-04T03:54:07Z"
}
)
.set_files(
{
"a.csv": {
"contents": [
("skip_this", "skip_this"),
("skip_this_too", "skip_this_too"),
("col1", "col2"),
("val11", "val12"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
}
}
)
.set_file_type("csv")
.set_expected_catalog(
{
"streams": [
{
"default_cursor_field": ["_ab_source_file_last_modified"],
"json_schema": {
"type": "object",
"properties": {
"col1": {
"type": "string"
},
"col2": {
"type": "string"
},
"_ab_source_file_last_modified": {
"type": "string"
},
"_ab_source_file_url": {
"type": "string"
},
},
},
"name": "stream1",
"source_defined_cursor": True,
"supported_sync_modes": ["full_refresh", "incremental"],
}
]
}
)
.set_expected_records(
[
{"data": {"col1": "val11", "col2": "val12", "_ab_source_file_last_modified": "2023-06-05T03:54:07Z",
"_ab_source_file_url": "a.csv"}, "stream": "stream1"},
]
)
).build()
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,13 @@
csv_double_quote_is_set_scenario,
csv_custom_delimiter_with_escape_char_scenario,
csv_custom_delimiter_in_double_quotes_scenario,
csv_simple,
csv_simple_scenario,
single_avro_scenario,
avro_all_types_scenario,
multiple_avro_combine_schema_scenario,
multiple_streams_avro_scenario,
avro_file_with_decimal_as_float_scenario,
csv_skip_before_header_scenario,
]


Expand Down