Skip to content

Commit

Permalink
Fix character replacements
Browse files Browse the repository at this point in the history
  • Loading branch information
cody-scott committed Dec 17, 2024
1 parent 5879892 commit 050136f
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 42 deletions.
136 changes: 94 additions & 42 deletions dagster_mssql_bcp_tests/bcp_polars/test_bcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ def polars_io(self):
query_props={
"TrustServerCertificate": "yes",
},
bcp_arguments={
'-u': '',
'-b': 20
},
bcp_arguments={"-u": "", "-b": 20},
bcp_path="/opt/mssql-tools18/bin/bcp",
)

@contextmanager
def connect_mssql(self):
config = self.get_database_connection()
Expand Down Expand Up @@ -100,7 +97,7 @@ def test_bcp_load(self, polars_io: polars_mssql_bcp.PolarsBCP):
)
polars_io.load_bcp(df, schema, table, asset_schema)

df = df.with_columns(e = pl.Series([1, 2, 3]))
df = df.with_columns(e=pl.Series([1, 2, 3]))
polars_io.load_bcp(df, schema, table, asset_schema)

with self.connect_mssql() as con:
Expand All @@ -123,16 +120,13 @@ def test_bcp_load_alternative_column_names(self):
query_props={
"TrustServerCertificate": "yes",
},
bcp_arguments={
'-u': '',
'-b': 20
},
bcp_arguments={"-u": "", "-b": 20},
bcp_path="/opt/mssql-tools18/bin/bcp",
load_datetime_column_name='dt_col',
load_uuid_column_name='uuid_col',
row_hash_column_name='hash_col',
load_datetime_column_name="dt_col",
load_uuid_column_name="uuid_col",
row_hash_column_name="hash_col",
)

schema = "test"
table = "table_data_alt_cols"
with self.connect_mssql() as con:
Expand Down Expand Up @@ -160,7 +154,7 @@ def test_bcp_load_alternative_column_names(self):
)
polars_io.load_bcp(df, schema, table, asset_schema)

df = df.with_columns(e = pl.Series([1, 2, 3]))
df = df.with_columns(e=pl.Series([1, 2, 3]))
polars_io.load_bcp(df, schema, table, asset_schema)

with self.connect_mssql() as con:
Expand All @@ -173,7 +167,6 @@ def test_bcp_load_alternative_column_names(self):
)
polars_io.load_bcp(df, schema, table, asset_schema)


def test_validate_columns(self, polars_io):
result = polars_io._validate_columns(
["a", "b", "c"], ["a", "b", "c"], ["a", "b", "c"]
Expand Down Expand Up @@ -249,7 +242,12 @@ def test_replace_values(self, polars_io):
{"a": ["1,000", "2", "3"], "b": [4000, 5, 6], "c": ["a", "b", "c"]}
)
expected = df = pl.DataFrame(
{"a": ["1000", "2", "3"], "b": [4000, 5, 6], "c": ["a", "b", "c"], 'should_process_replacements': [0, 0, 0]}
{
"a": ["1000", "2", "3"],
"b": [4000, 5, 6],
"c": ["a", "b", "c"],
"should_process_replacements": [0, 0, 0],
}
)
schema = polars_mssql_bcp.AssetSchema(
[
Expand All @@ -262,26 +260,35 @@ def test_replace_values(self, polars_io):
pl_testing.assert_frame_equal(df, expected)

df = pl.DataFrame(
{"c": ["nan", "NAN", "c", "abc\tdef", "abc\t\ndef", "abc\ndef", "nan", "somenanthing"]}
{
"c": [
"nan",
"NAN",
"c",
"abc\tdef",
"abc\t\ndef",
"abc\ndef",
"nan",
"somenanthing",
]
}
)
expected = pl.DataFrame(
{
"c": [
"",
"",
'',
'',
"c",
"abc__TAB__def",
"abc__TAB____NEWLINE__def",
"abc__NEWLINE__def",
"",
"somenanthing"
'',
"somenanthing",
],
'should_process_replacements': [
0, 0, 0, 1, 1, 1, 0, 0
]
"should_process_replacements": [0, 0, 0, 1, 1, 1, 0, 0],
}
)
schema = polars_mssql_bcp.AssetSchema(
)
schema = polars_mssql_bcp.AssetSchema(
[
{"name": "c", "type": "NVARCHAR", "length": 50},
]
Expand All @@ -307,7 +314,7 @@ def test_replace_values(self, polars_io):
# "2021-01-01 00:00:00-05:00",
],
"d": ["2021-01-01 00:00:00-05:00", "2021-01-01 00:00:00-05:00"],
"should_process_replacements": [0, 0]
"should_process_replacements": [0, 0],
}
)

Expand All @@ -326,6 +333,14 @@ def test_replace_values(self, polars_io):
expected = pl.DataFrame({"a": [1, 1, 0]})
pl_testing.assert_frame_equal(df, expected)

schema = polars_mssql_bcp.AssetSchema(
[{"name": "a", "type": "NVARCHAR", "length": 20}]
)
df = pl.DataFrame({"a": ['""', "a", '', '"adsf"']})
expected = pl.DataFrame({"a": ['', "a", '', '"adsf"'], "should_process_replacements": [0,0,0,0]})
df = polars_io._replace_values(df, schema)
pl_testing.assert_frame_equal(df, expected)

def test_process_datetime(self, polars_io: polars_mssql_bcp.PolarsBCP):
input = pl.DataFrame(
{
Expand Down Expand Up @@ -373,20 +388,52 @@ def test_process_datetime(self, polars_io: polars_mssql_bcp.PolarsBCP):
df = polars_io._process_datetime(input, schema).collect()
pl_testing.assert_frame_equal(df, expected)


schema = polars_mssql_bcp.AssetSchema(
[
{'name': 'a', 'type': 'DATETIME2'},
{'name': 'b', 'type': 'DATETIME2'}
]
[{"name": "a", "type": "DATETIME2"}, {"name": "b", "type": "DATETIME2"}]
)

input = (
pl.datetime_range(
datetime.datetime(2021, 1, 1), datetime.datetime(2021, 1, 3), eager=True
)
.alias("a")
.to_frame()
)
input = input.with_columns(pl.lit(None).alias("b"))
input = input.lazy()
df = polars_io._process_datetime(input, schema).collect()
expected = pl.DataFrame(
{
"a": [
"2021-01-01 00:00:00+00:00",
"2021-01-02 00:00:00+00:00",
"2021-01-03 00:00:00+00:00",
],
"b": [None, None, None],
}
)
pl_testing.assert_frame_equal(df, expected)

input = pl.datetime_range(datetime.datetime(2021,1,1), datetime.datetime(2021,1,3), eager=True).alias('a').to_frame()
input = input.with_columns(pl.lit(None).alias('b'))
input = (
pl.date_range(
datetime.date(2021, 1, 1), datetime.date(2021, 1, 3), eager=True
)
.alias("a")
.to_frame()
)
input = input.with_columns(pl.lit(None).alias("b"))
input = input.lazy()

df = polars_io._process_datetime(input, schema).collect()
expected = pl.DataFrame(
{'a': ["2021-01-01 00:00:00+00:00", "2021-01-02 00:00:00+00:00", "2021-01-03 00:00:00+00:00"], 'b': [None, None, None]}
{
"a": [
"2021-01-01 00:00:00+00:00",
"2021-01-02 00:00:00+00:00",
"2021-01-03 00:00:00+00:00",
],
"b": [None, None, None],
}
)
pl_testing.assert_frame_equal(df, expected)

Expand Down Expand Up @@ -420,11 +467,15 @@ def test_get_sql_columns(self, polars_io):
)
"""
conn.execute(text(sql))
columns = polars_io._get_sql_columns(conn, "test", "polars_test_sql_columns")
columns = polars_io._get_sql_columns(
conn, "test", "polars_test_sql_columns"
)
assert columns == ["a", "b", "c"]
self.cleanup_table(conn, "test", "polars_test_sql_columns")

columns = polars_io._get_sql_columns(conn, "test", "polars_test_sql_columns")
columns = polars_io._get_sql_columns(
conn, "test", "polars_test_sql_columns"
)
assert columns is None

def test_create_table(self, polars_io):
Expand Down Expand Up @@ -461,7 +512,9 @@ def test_create_table(self, polars_io):
schema = polars_mssql_bcp.AssetSchema(base_schema.schema[:])
schema.add_column(load_uuid_col)
schema.add_column(load_datetime_col)
polars_io._create_table(conn, "test", "pandas_test_create_table", schema.get_sql_columns())
polars_io._create_table(
conn, "test", "pandas_test_create_table", schema.get_sql_columns()
)
columns = polars_io._get_sql_columns(
conn,
"test",
Expand All @@ -470,11 +523,10 @@ def test_create_table(self, polars_io):
assert columns == ["a", "b", "c", "load_uuid", "load_datetime"]
self.cleanup_table(conn, "test", "pandas_test_create_table")


def test_row_hash(self, polars_io: polars_mssql_bcp.PolarsBCP):
with self.connect_mssql() as conn:
schema = 'test'
table = 'polars_test_row_hash'
schema = "test"
table = "polars_test_row_hash"

create_schema_sql = f"""
IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE name = '{schema}')
Expand Down
12 changes: 12 additions & 0 deletions src/dagster_mssql_bcp/bcp_polars/polars_mssql_bcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def _replace_values(self, data: pl.LazyFrame, asset_schema: AssetSchema):
.str.replace_all("\n", "__NEWLINE__")
.str.replace_all("^nan$", "")
.str.replace_all("^NAN$", "")
.str.replace_all('^""$', "")
for _ in string_cols
if _ not in number_columns_that_are_strings
]
Expand All @@ -87,6 +88,7 @@ def _replace_values(self, data: pl.LazyFrame, asset_schema: AssetSchema):
.str.replace_all(",", "")
.str.replace_all("^nan$", "")
.str.replace_all("^NAN$", "")
.str.replace_all('^""$', "")
for _ in number_columns_that_are_strings
]
+ [
Expand All @@ -95,6 +97,8 @@ def _replace_values(self, data: pl.LazyFrame, asset_schema: AssetSchema):
]
)

# data = data.with_columns(pl.col(pl.String).replace("", None))

return data

def _process_datetime(
Expand Down Expand Up @@ -129,6 +133,14 @@ def _process_datetime(
date_cols = data.select(cs.date()).collect_schema().names()
data = data.with_columns([pl.col(_) for _ in date_cols])

# change date/time to datetime
data = data.with_columns(
[
pl.col(_).cast(pl.Datetime)
for _ in data.select(cs.date()).collect_schema().names()
]
)

dt_columns_in_tz = (
data.select(cs.datetime(time_zone="*")).collect_schema().names()
)
Expand Down

0 comments on commit 050136f

Please sign in to comment.