Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid serialization of pyarrow tables using PyArrowConvert in pre_transform_spec #111

Merged
merged 14 commits into from
May 19, 2022
Merged
3 changes: 3 additions & 0 deletions .github/workflows/build_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ jobs:
uses: actions/setup-node@v2
with:
node-version: '17'
- name: Install protoc
run: |
sudo snap install protobuf --classic
- name: Build package
working-directory: vegafusion-wasm/
run: |
Expand Down
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions python/vegafusion-jupyter/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* VegaFusion
* Copyright (C) 2022 Jon Mease
* Copyright (C) 2022 VegaFusion Technologies LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
Expand Down
24 changes: 18 additions & 6 deletions python/vegafusion/vegafusion/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
# this program the details of the active license.
import json
import psutil
from .transformer import to_arrow_ipc_bytes
import pyarrow as pa


class VegaFusionRuntime:
def __init__(self, cache_capacity, memory_limit, worker_threads):
Expand Down Expand Up @@ -71,9 +72,9 @@ def pre_transform_spec(self, spec, local_tz, row_limit=None, inline_datasets=Non
:param row_limit: Maximum number of dataset rows to include in the returned
specification. If exceeded, datasets will be truncated to this number of rows
and a RowLimitExceeded warning will be included in the resulting warnings list
:param inline_datasets: A dict from dataset names to pandas DataFrames. Inline
datasets may be referenced by the input specification using the following
url syntax 'vegafusion+dataset://{dataset_name}'.
:param inline_datasets: A dict from dataset names to pandas DataFrames or pyarrow
Tables. Inline datasets may be referenced by the input specification using
the following url syntax 'vegafusion+dataset://{dataset_name}'.
:return:
Two-element tuple:
0. A string containing the JSON representation of a Vega specification
Expand All @@ -88,14 +89,25 @@ def pre_transform_spec(self, spec, local_tz, row_limit=None, inline_datasets=Non
'Unsupported': No transforms in the provided Vega specification were
eligible for pre-transforming
"""
from .transformer import to_arrow_table

if self._grpc_channel:
raise ValueError("pre_transform_spec not yet supported over gRPC")
else:
# Preprocess inline_dataset
inline_datasets = inline_datasets or dict()
inline_datasets = {name: to_arrow_ipc_bytes(value, stream=True) for name, value in inline_datasets.items()}
inline_batches = dict()
for name, value in inline_datasets.items():
if isinstance(value, pa.Table):
table = value
else:
table = to_arrow_table(value)
schema = table.schema
batches = table.to_batches(max_chunksize=8096)
inline_batches[name] = (schema, batches)

new_spec, warnings = self.embedded_runtime.pre_transform_spec(
spec, local_tz=local_tz, row_limit=row_limit, inline_datasets=inline_datasets
spec, local_tz=local_tz, row_limit=row_limit, inline_datasets=inline_batches
)
warnings = json.loads(warnings)
return new_spec, warnings
Expand Down
25 changes: 19 additions & 6 deletions python/vegafusion/vegafusion/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
import pandas as pd


def to_arrow_ipc_bytes(data, stream=False):
def to_arrow_table(data):
"""
Helper to convert a Pandas DataFrame to the Arrow IPC binary format
Helper to convert a Pandas DataFrame to a PyArrow Table

:param data: Pandas DataFrame
:param stream: If True, write IPC Stream format. If False (default), write ipc file format.
:return: bytes
:return: pyarrow.Table
"""
import pyarrow as pa

Expand All @@ -36,7 +35,7 @@ def to_arrow_ipc_bytes(data, stream=False):
cat = data[col].cat
data[col] = cat.categories[cat.codes]

# Serialize DataFrame to bytes in the arrow file format
# Convert DataFrame to table
try:
table = pa.Table.from_pandas(data)
except pa.ArrowTypeError as e:
Expand All @@ -47,8 +46,22 @@ def to_arrow_ipc_bytes(data, stream=False):
if dtype.kind == "O":
mapping[col] = data[col].astype(str)
data = data.assign(**mapping)
# Try again, allowing exception to propagate
# Try again, allowing exception to propagate this time
table = pa.Table.from_pandas(data)
return table


def to_arrow_ipc_bytes(data, stream=False):
"""
Helper to convert a Pandas DataFrame to the Arrow IPC binary format

:param data: Pandas DataFrame
:param stream: If True, write IPC Stream format. If False (default), write ipc file format.
:return: bytes
"""
import pyarrow as pa

table = to_arrow_table(data)

# Next we write the Arrow table as a feather file (The Arrow IPC format on disk).
# Write it in memory first so we can hash the contents before touching disk.
Expand Down
1 change: 1 addition & 0 deletions vegafusion-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ license = "AGPL-3.0-or-later"

[features]
tonic_support = [ "tonic", "tonic-build",]
pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"]

[dependencies]
thiserror = "^1.0.29"
Expand Down
5 changes: 5 additions & 0 deletions vegafusion-python-embed/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ features = [ "pyo3",]

[dependencies.vegafusion-rt-datafusion]
path = "../vegafusion-rt-datafusion"
features = ["pyarrow"]

[dependencies.tokio]
version = "1.18.1"
Expand All @@ -29,3 +30,7 @@ features = [ "macros", "rt-multi-thread",]
[dependencies.pyo3]
version = "0.16.4"
features = [ "extension-module",]

[dependencies.mimalloc]
version = "*"
default-features = false
21 changes: 18 additions & 3 deletions vegafusion-python-embed/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,26 @@
*/
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyDict, PyString};
use pyo3::types::{PyBytes, PyDict, PyList, PyString, PyTuple};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::runtime::Runtime;
use vegafusion_core::error::ToExternalError;
use vegafusion_core::proto::gen::pretransform::pre_transform_warning::WarningType;
use vegafusion_core::proto::gen::services::pre_transform_result;
use vegafusion_rt_datafusion::task_graph::runtime::TaskGraphRuntime;

use serde::{Deserialize, Serialize};
use vegafusion_core::arrow::datatypes::Schema;
use vegafusion_core::arrow::pyarrow::PyArrowConvert;
use vegafusion_core::arrow::record_batch::RecordBatch;
use vegafusion_core::data::table::VegaFusionTable;

use mimalloc::MiMalloc;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

#[derive(Clone, Serialize, Deserialize)]
struct PreTransformWarning {
#[serde(rename = "type")]
Expand Down Expand Up @@ -76,10 +85,16 @@ impl PyTaskGraphRuntime {
.iter()
.map(|(name, table_bytes)| {
let name = name.cast_as::<PyString>()?;
let table_bytes = table_bytes.cast_as::<PyBytes>()?;
let tuple = table_bytes.cast_as::<PyTuple>()?;
let schema = Schema::from_pyarrow(tuple.get_item(0)?)?;
let list = tuple.get_item(1)?.cast_as::<PyList>()?;
let batches: Vec<_> = list
.iter()
.map(RecordBatch::from_pyarrow)
.collect::<PyResult<Vec<_>>>()?;
Ok((
name.to_string(),
VegaFusionTable::from_ipc_bytes(table_bytes.as_bytes())?,
VegaFusionTable::try_new(Arc::new(schema), batches)?,
))
})
.collect::<PyResult<HashMap<_, _>>>()?;
Expand Down
3 changes: 3 additions & 0 deletions vegafusion-rt-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ version = "0.4.0"
edition = "2021"
license = "AGPL-3.0-or-later"

[features]
pyarrow = ["vegafusion-core/pyarrow"]

[dependencies]
regex = "^1.5.5"
lazy_static = "^1.4.0"
Expand Down