Skip to content

Commit

Permalink
update readme and io
Browse files Browse the repository at this point in the history
  • Loading branch information
cody-scott committed Sep 19, 2024
1 parent 27c0af8 commit d69cc27
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 96 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ defs = Definitions(

```python
from dagster import asset, Definitions
from dagster_mssql_bcp import PolarsBCPIOManager
from dagster_mssql_bcp import PandasBCPIOManager
import pandas as pd

io_manager = PolarsBCPIOManager(
io_manager = PandasBCPIOManager(
host="my_mssql_server",
database="my_database",
user="username",
Expand All @@ -101,11 +101,11 @@ io_manager = PolarsBCPIOManager(
}
)
def my_polars_asset(context):
return pl.DataFrame({"id": [1, 2, 3]})
return pd.DataFrame({"id": [1, 2, 3]})


defs = Definitions(
assets=[my_polars_asset],
assets=[my_pandas_asset],
io_managers={
"io_manager": io_manager,
},
Expand Down
121 changes: 73 additions & 48 deletions dagster_mssql_bcp_tests/bcp_pandas/test_io_manager.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@

from dagster_mssql_bcp.bcp_pandas import pandas_mssql_io_manager
import os

from contextlib import contextmanager
from sqlalchemy import create_engine, URL, text
from dagster import build_output_context, asset, DailyPartitionsDefinition, StaticPartitionsDefinition, materialize
from dagster import (
build_output_context,
asset,
DailyPartitionsDefinition,
StaticPartitionsDefinition,
materialize,
)
import pandas as pd


Expand Down Expand Up @@ -50,9 +55,7 @@ def io(self):
query_props={
"TrustServerCertificate": "yes",
},
bcp_arguments={
'-u': ''
},
bcp_arguments={"-u": ""},
bcp_path="/opt/mssql-tools18/bin/bcp",
)

Expand Down Expand Up @@ -174,8 +177,8 @@ def test_handle_output_basic(self):
io_manager.handle_output(ctx, data)

def test_handle_output_time_partition(self):
schema = 'test_pandas_bcp_schema'
table = 'my_pandas_asset_time_part'
schema = "test_pandas_bcp_schema"
table = "my_pandas_asset_time_part"
drop = f"""DROP TABLE IF EXISTS {schema}.{table}"""

with self.connect_mssql() as connection:
Expand All @@ -190,20 +193,20 @@ def test_handle_output_time_partition(self):

@asset(
name=table,
key_prefix=['data'],
key_prefix=["data"],
metadata={
"asset_schema": asset_schema,
"schema": schema,
"partition_expr": 'b'
"partition_expr": "b",
},
partitions_def=DailyPartitionsDefinition(
start_date="2021-01-01", end_date="2021-01-03"
)
),
)
def my_asset(context):
return data
# original structure

# original structure
data = pd.DataFrame(
{
"a": [1, 1],
Expand Down Expand Up @@ -231,7 +234,7 @@ def my_asset(context):
data = pd.DataFrame(
{
"a": [2, 2, 2],
"b": ["2021-01-01", "2021-01-01", '2021-01-02'],
"b": ["2021-01-01", "2021-01-01", "2021-01-02"],
}
)
materialize(
Expand All @@ -241,8 +244,8 @@ def my_asset(context):
)

def test_handle_output_static_partition(self):
schema = 'test_pandas_bcp_schema'
table = 'my_pandas_asset_static_part'
schema = "test_pandas_bcp_schema"
table = "my_pandas_asset_static_part"
drop = f"""DROP TABLE IF EXISTS {schema}.{table}"""

with self.connect_mssql() as connection:
Expand All @@ -257,20 +260,19 @@ def test_handle_output_static_partition(self):

@asset(
name=table,
key_prefix=['data'],
key_prefix=["data"],
metadata={
"asset_schema": asset_schema,
"schema": schema,
"partition_expr": 'b'
"partition_expr": "b",
},
partitions_def=StaticPartitionsDefinition(
['a', 'b']
)
partitions_def=StaticPartitionsDefinition(["a", "b"]),
)
def my_asset(context):
return data

# original structure

# original structure

data = pd.DataFrame(
{
"a": [1, 1],
Expand All @@ -297,7 +299,7 @@ def my_asset(context):
data = pd.DataFrame(
{
"a": [1, 1, 2],
"b": ["a", "a", 'a'],
"b": ["a", "a", "a"],
}
)
materialize(
Expand All @@ -306,11 +308,9 @@ def my_asset(context):
resources={"io_manager": io_manager},
)


def test_basic_no_extras(self):

schema = 'test_pandas_bcp_schema'
table = 'basic_no_extra'
schema = "test_pandas_bcp_schema"
table = "basic_no_extra"
drop = f"""DROP TABLE IF EXISTS {schema}.{table}"""

with self.connect_mssql() as connection:
Expand All @@ -327,15 +327,16 @@ def test_basic_no_extras(self):
name=table,
metadata={
"asset_schema": asset_schema,
'add_row_hash': False,
'add_load_datetime': False,
'add_load_uuid': False
"add_row_hash": False,
"add_load_datetime": False,
"add_load_uuid": False,
},
)
def my_asset(context):
return data

# original structure

# original structure

data = pd.DataFrame(
{
"a": [1, 1],
Expand All @@ -348,9 +349,8 @@ def my_asset(context):
)

# def test_geo(self):

# schema = 'dbo'
# table = 'geo_table'
# schema = "dbo"
# table = "geo_table"
# drop = f"""DROP TABLE IF EXISTS {schema}.{table}"""

# with self.connect_mssql() as connection:
Expand All @@ -361,35 +361,60 @@ def my_asset(context):
# asset_schema = [
# {"name": "a", "alias": "a", "type": "INT", "identity": True},
# {"name": "b", "type": "VARBINARY"},
# {"name": "c", "type": "VARBINARY"},
# ]

# @asset(
# name=table,
# metadata={
# "asset_schema": asset_schema,
# 'add_row_hash': False,
# 'add_load_datetime': False,
# 'add_load_uuid': False
# "add_row_hash": False,
# "add_load_datetime": False,
# "add_load_uuid": False,
# },
# )
# def my_asset(context):
# import geopandas as gpd
# from shapely.geometry import Point
# d = {'geo': ['name1', 'name2'], 'geometry': [Point(1, 2), Point(2, 1)]}
# gdf = gpd.GeoDataFrame(d, crs="EPSG:4326")
# gdf['b'] = gdf['geometry'].to_wkb(True)
# from shapely.geometry import LineString, Polygon

# d = {
# "geo": ["name1"],
# "b": [
# Polygon(
# [
# [-80.54962058626626, 43.45142912346685],
# [-80.54962058626626, 43.39711241629678],
# [-80.41053208968418, 43.39711241629678],
# [-80.41053208968418, 43.45142912346685],
# [-80.54962058626626, 43.45142912346685],
# ]
# )
# ],
# "c": [
# LineString(
# [
# [-80.62480125364407, 43.42751074871268],
# [-80.61613488881885, 43.47504704269912],
# [-80.48882864676696, 43.518998328579784],
# [-80.39489789141057, 43.48407197511389],
# ]
# ),
# ],
# }
# gdf = gpd.GeoDataFrame(d)
# gdf["b"] = gdf.set_geometry('b').to_wkb(True)['b']
# gdf["c"] = gdf.set_geometry('c').to_wkb(True)['c']
# return gdf

# @asset(
# deps=[my_asset]
# )

# @asset(deps=[my_asset])
# def convert_geo(context):
# with self.connect_mssql() as conn:
# sql = f'SELECT b, geography::STGeomFromWKB(b, 4326) FROM {schema}.{table}'
# sql = (
# f"SELECT b, geography::STGeomFromWKB(b, 4326) FROM {schema}.{table}"
# )
# print(sql)
# conn.exec_driver_sql(sql)


# materialize(
# assets=[my_asset, convert_geo],
# resources={"io_manager": io_manager},
Expand Down
Loading

0 comments on commit d69cc27

Please sign in to comment.