Skip to content

Commit

Permalink
Parquet export IR (#183)
Browse files Browse the repository at this point in the history
* Parquet export IR

Exporting data through a CSV intermediary is subject to loss
of specificity and type info. This is particularly noticeable
for read_pandas, where the resulting dataframe has every column
of type `object` and NULLs are indistinguishable from zero values.

I used a small hack to export data from Dolt into a DataFrame using parquet
instead of CSV. This requires the pyarrow dependency.

I left TODOs for improvements on the Dolt side that would make
this code cleaner and Dolt issues for the associated features.

There is one bug with NULL datetime values that I added a Dolt issue
for.

* fix fmt

* revert python changes, use early pyarrow
  • Loading branch information
max-hoffman authored Jun 1, 2022
1 parent f3c83cc commit 0043e7e
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 14 deletions.
37 changes: 35 additions & 2 deletions doltpy/cli/read.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import logging
from pathlib import Path
import random
import string
from tempfile import TemporaryDirectory
from typing import List, Mapping
import io


from doltcli import Dolt # type: ignore
from doltcli.utils import ( # type: ignore
get_read_table_asof_query,
Expand All @@ -13,6 +18,12 @@
)
import pandas as pd # type: ignore

# Maps user-facing read format names (including the "pq" alias for parquet)
# to the file extensions used for the export intermediary.
# NOTE(review): no consumer of this mapping is visible in this hunk —
# presumably used by a reader/writer helper elsewhere; verify before relying on it.
read_formats = {
    "parquet": ".parquet",
    "pq": ".parquet",
    "csv": ".csv",
}

logger = logging.getLogger(__name__)


Expand All @@ -24,5 +35,27 @@ def read_pandas_sql(dolt: Dolt, sql: str) -> pd.DataFrame:
return read_table_sql(dolt, sql, result_parser=parse_to_pandas)


def read_pandas(dolt: Dolt, table: str, as_of: str = None) -> pd.DataFrame:
return read_pandas_sql(dolt, get_read_table_asof_query(table, as_of))
def read_pandas_parquet(dolt: "Dolt", table: str, asof: str = None) -> pd.DataFrame:
    """Export ``table`` to a temporary parquet file and load it into pandas.

    Unlike the CSV intermediary, parquet round-trips column types, so the
    resulting DataFrame keeps real dtypes and NULL is distinguishable from
    zero/empty values.

    :param dolt: Dolt repository handle; its checked-out branch is temporarily
        switched to a throwaway branch and restored afterwards.
    :param table: name of the table to export.
    :param asof: optional commit ref to read as of, used as the start point of
        the temporary branch; defaults to the current head.
    :return: DataFrame containing the table's rows.
    """
    # TODO: either dolt export should support as of, or sql query should
    # support parquet output format
    original_branch = dolt.active_branch
    # Random throwaway branch name to avoid colliding with existing branches.
    tmp_branch = "".join(random.choice(string.ascii_lowercase) for _ in range(10))
    try:
        # `dolt table export` has no as-of flag, so check out `asof` on a
        # temporary branch and export from there.
        dolt.checkout(tmp_branch, checkout_branch=True, start_point=asof)
        with TemporaryDirectory() as tmpdir:
            fpath = Path(tmpdir) / "tmp.parquet"
            dolt.table_export(table, filename=str(fpath))
            return pd.read_parquet(fpath)
    finally:
        # Always restore the caller's branch and drop the temporary one.
        dolt.checkout(original_branch)
        dolt.branch(tmp_branch, delete=True)


def read_pandas(dolt: "Dolt", table: str, as_of: str = None, fmt: str = "csv") -> pd.DataFrame:
    """Read a Dolt table into a pandas DataFrame.

    :param dolt: Dolt repository handle to read from.
    :param table: name of the table to read.
    :param as_of: optional commit ref/hash to read the table as of; defaults
        to the current working set.
    :param fmt: intermediate export format — "csv" (default) or
        "parquet"/"pq".  The parquet path preserves column types and NULLs
        that the CSV path loses.
    :return: DataFrame containing the table's rows.
    :raises RuntimeError: if ``fmt`` is not a supported format.
    """
    if fmt == "csv":
        return read_pandas_sql(dolt, get_read_table_asof_query(table, as_of))
    elif fmt in ("parquet", "pq"):
        return read_pandas_parquet(dolt, table, as_of)
    else:
        raise RuntimeError(f"unexpected read format: {fmt}; expected: 'parquet' or 'csv'")
65 changes: 57 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ wcwidth = "0.2.5"
more-itertools = "^8.6.0"
mysql-connector-python = "^8.0.20"
doltcli = "^0.1.14"
pyarrow = ">=6.0.0"

[tool.poetry.dev-dependencies]
black = "^20.8b1"
Expand Down
17 changes: 13 additions & 4 deletions tests/cli/test_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
TEST_TABLE = 'characters'
TEST_DATA_INITIAL = [
{'name': 'Anna', 'adjective': 'tragic', 'id': 1, 'date_of_death': '1877-01-01'},
{'name': 'Vronksy', 'adjective': 'honorable', 'id': 2, 'date_of_death': ''},
{'name': 'Oblonksy', 'adjective': 'buffoon', 'id': 3, 'date_of_death': ''},
{'name': 'Vronksy', 'adjective': 'honorable', 'id': 2, 'date_of_death': '1877-1-1'},
{'name': 'Oblonksy', 'adjective': 'buffoon', 'id': 3, 'date_of_death': '1877'},
]

TEST_DATA_UPDATE = [
{'name': 'Levin', 'adjective': 'tiresome', 'id': 4, 'date_of_death': ''}
{'name': 'Levin', 'adjective': 'tiresome', 'id': 4, 'date_of_death': '1877-01-01'}
]

TEST_DATA_COMBINED = TEST_DATA_INITIAL + TEST_DATA_UPDATE
Expand All @@ -37,11 +37,20 @@ def _write_helper(dolt: Dolt, data: List[dict], update_type: str):
return dolt, commit_hash


def test_read_pandas(with_initial_test_data):
def test_read_pandas_csv(with_initial_test_data):
    """Default (CSV) read path: reading as of each commit returns the rows
    that existed at that commit."""
    dolt, first_commit = with_initial_test_data
    second_commit = update_test_data(dolt)
    # As of the first commit, only the initial rows are visible.
    first_write = read_pandas(dolt, TEST_TABLE, first_commit)
    compare_rows(TEST_DATA_INITIAL, first_write.to_dict('records'), "id")
    # As of the second commit, the updated row set is visible.
    second_write = read_pandas(dolt, TEST_TABLE, second_commit)
    compare_rows(TEST_DATA_COMBINED, second_write.to_dict('records'), "id")


def test_read_pandas_pq(with_initial_test_data):
    """Parquet read path: both the "parquet" and "pq" format aliases return
    the same as-of row sets as the CSV path."""
    dolt, first_commit = with_initial_test_data
    second_commit = update_test_data(dolt)
    # Long-form alias.
    first_write = read_pandas(dolt, TEST_TABLE, first_commit, fmt="parquet")
    compare_rows(TEST_DATA_INITIAL, first_write.to_dict('records'), "id")
    # Short-form alias.
    second_write = read_pandas(dolt, TEST_TABLE, second_commit, fmt="pq")
    compare_rows(TEST_DATA_COMBINED, second_write.to_dict('records'), "id")

0 comments on commit 0043e7e

Please sign in to comment.