Skip to content

Commit

Permalink
Adds support for arrays in snowflake
Browse files Browse the repository at this point in the history
Signed-off-by: john.lemmon <john.lemmon@medely.com>
  • Loading branch information
JohnLemmonMedely committed Jan 8, 2024
1 parent 052182b commit e419e68
Show file tree
Hide file tree
Showing 9 changed files with 402 additions and 12 deletions.
29 changes: 29 additions & 0 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import json
import os
import uuid
import warnings
Expand Down Expand Up @@ -51,6 +52,16 @@
)
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.types import (
Array,
Bool,
Float32,
Float64,
Int32,
Int64,
String,
UnixTimestamp,
)
from feast.usage import log_exceptions_and_usage

try:
Expand Down Expand Up @@ -320,6 +331,7 @@ def query_generator() -> Iterator[str]:
on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
feature_refs, project, registry
),
feature_views=feature_views,
metadata=RetrievalMetadata(
features=feature_refs,
keys=list(entity_schema.keys() - {entity_df_event_timestamp_col}),
Expand Down Expand Up @@ -398,9 +410,12 @@ def __init__(
config: RepoConfig,
full_feature_names: bool,
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
feature_views: Optional[List[FeatureView]] = None,
metadata: Optional[RetrievalMetadata] = None,
):

if feature_views is None:
feature_views = []
if not isinstance(query, str):
self._query_generator = query
else:
Expand All @@ -416,6 +431,7 @@ def query_generator() -> Iterator[str]:
self.config = config
self._full_feature_names = full_feature_names
self._on_demand_feature_views = on_demand_feature_views or []
self._feature_views = feature_views
self._metadata = metadata
self.export_path: Optional[str]
if self.config.offline_store.blob_export_location:
Expand All @@ -436,6 +452,19 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
self.snowflake_conn, self.to_sql()
).fetch_pandas_all()

for feature_view in self._feature_views:
for feature in feature_view.features:
if feature.dtype in [
Array(String),
Array(Int32),
Array(Int64),
Array(UnixTimestamp),
Array(Float64),
Array(Float32),
Array(Bool),
]:
df[feature.name] = [json.loads(x) for x in df[feature.name]]

return df

def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/snowflake_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ def get_table_column_names_and_types(
else:
row["snowflake_type"] = "NUMBERwSCALE"

elif row["type_code"] in [5, 9, 10, 12]:
elif row["type_code"] in [5, 9, 12]:
error = snowflake_unsupported_map[row["type_code"]]
raise NotImplementedError(
f"The following Snowflake Data Type is not supported: {error}"
)
elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 11, 13]:
elif row["type_code"] in [1, 2, 3, 4, 6, 7, 8, 10, 11, 13]:
row["snowflake_type"] = snowflake_type_code_map[row["type_code"]]
else:
raise NotImplementedError(
Expand All @@ -305,14 +305,14 @@ def get_table_column_names_and_types(
6: "TIMESTAMP_LTZ",
7: "TIMESTAMP_TZ",
8: "TIMESTAMP_NTZ",
10: "ARRAY",
11: "BINARY",
13: "BOOLEAN",
}

snowflake_unsupported_map = {
5: "VARIANT -- Try converting to VARCHAR",
9: "OBJECT -- Try converting to VARCHAR",
10: "ARRAY -- Try converting to VARCHAR",
12: "TIME -- Try converting to VARCHAR",
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,62 @@ CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_varchar_to_string_pro
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_varchar_to_string_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- Array UDF registrations: each statement below declares a Python UDF that
-- takes a Snowflake ARRAY and returns BINARY, delegating to the handler of the
-- same name in feast.infra.utils.snowflake.snowpark.snowflake_udfs.
-- NOTE(review): PROJECT_NAME and STAGE_HOLDER look like placeholders that are
-- substituted before these statements are executed -- confirm against the caller.

-- ARRAY -> BINARY via the array_bytes_to_list_bytes_proto handler.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_bytes_to_list_bytes_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- ARRAY -> BINARY via the array_varchar_to_list_string_proto handler.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_varchar_to_list_string_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- ARRAY -> BINARY via the array_number_to_list_int32_proto handler.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int32_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- ARRAY -> BINARY via the array_number_to_list_int64_proto handler.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_number_to_list_int64_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- ARRAY -> BINARY via the array_float_to_list_double_proto handler.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_float_to_list_double_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- ARRAY -> BINARY via the array_boolean_to_list_bool_proto handler.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_boolean_to_list_bool_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

-- ARRAY -> BINARY via the array_timestamp_to_list_unix_timestamp_proto handler.
CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto'
IMPORTS = ('@STAGE_HOLDER/feast.zip');

CREATE FUNCTION IF NOT EXISTS feast_PROJECT_NAME_snowflake_number_to_int32_proto(df NUMBER)
RETURNS BINARY
LANGUAGE PYTHON
Expand Down
192 changes: 192 additions & 0 deletions sdk/python/feast/infra/utils/snowflake/snowpark/snowflake_udfs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import sys
from binascii import unhexlify
from datetime import datetime

import numpy as np
import pandas
from _snowflake import vectorized

Expand All @@ -13,6 +15,13 @@
)
from feast.value_type import ValueType


def _cast_array_list_elements_to(array: np.ndarray, mapper):
for row in array:
for i, elem in enumerate(row):
row[i] = mapper(elem)


"""
CREATE OR REPLACE FUNCTION feast_snowflake_binary_to_bytes_proto(df BINARY)
RETURNS BINARY
Expand Down Expand Up @@ -59,6 +68,189 @@ def feast_snowflake_varchar_to_string_proto(df):
return df


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_bytes_to_list_bytes_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_bytes_to_list_bytes_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""
# Produces ValueType.BYTES_LIST values
@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_bytes_to_list_bytes_proto(df):
    """Serialize a column of byte arrays into BYTES_LIST ``ValueProto`` bytes.

    Args:
        df: Input batch as a pandas DataFrame; column ``0`` is read as the
            array column.

    Returns:
        A list with the ``SerializeToString()`` output of each proto produced
        by ``python_values_to_proto_values`` for ``ValueType.BYTES_LIST``.
    """
    # Record "feast" in the partner-attribution list kept in sys._xoptions.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    numpy_arrays = df[0].to_numpy()
    # Sometimes bytes come in as strings so we need to convert back to bytes.
    # Elements that are not str are passed through unchanged: calling
    # str.encode on an existing bytes object would raise TypeError.
    _cast_array_list_elements_to(
        numpy_arrays,
        lambda elem: elem.encode() if isinstance(elem, str) else elem,
    )

    return [
        proto.SerializeToString()
        for proto in python_values_to_proto_values(
            numpy_arrays, ValueType.BYTES_LIST
        )
    ]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_varchar_to_list_string_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_varchar_to_list_string_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_varchar_to_list_string_proto(df):
    """Serialize a column of string arrays into STRING_LIST ``ValueProto`` bytes.

    Column ``0`` of ``df`` is converted with ``python_values_to_proto_values``
    and each resulting proto is returned in serialized form, as a list.
    """
    # Record "feast" in the partner-attribution list kept in sys._xoptions.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    string_lists = df[0].to_numpy()
    protos = python_values_to_proto_values(string_lists, ValueType.STRING_LIST)
    return [proto.SerializeToString() for proto in protos]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_number_to_list_int32_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int32_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_number_to_list_int32_proto(df):
    """Serialize a column of number arrays into INT32_LIST ``ValueProto`` bytes.

    Column ``0`` of ``df`` is converted with ``python_values_to_proto_values``
    and each resulting proto is returned in serialized form, as a list.
    """
    # Record "feast" in the partner-attribution list kept in sys._xoptions.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    int_lists = df[0].to_numpy()
    protos = python_values_to_proto_values(int_lists, ValueType.INT32_LIST)
    return [proto.SerializeToString() for proto in protos]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_number_to_list_int64_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_number_to_list_int64_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_number_to_list_int64_proto(df):
    """Serialize a column of number arrays into INT64_LIST ``ValueProto`` bytes.

    Column ``0`` of ``df`` is converted with ``python_values_to_proto_values``
    and each resulting proto is returned in serialized form, as a list.
    """
    # Record "feast" in the partner-attribution list kept in sys._xoptions.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    int_lists = df[0].to_numpy()
    protos = python_values_to_proto_values(int_lists, ValueType.INT64_LIST)
    return [proto.SerializeToString() for proto in protos]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_float_to_list_double_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_float_to_list_double_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_float_to_list_double_proto(df):
    """Serialize a column of float arrays into DOUBLE_LIST ``ValueProto`` bytes.

    Every element of every row in column ``0`` is first passed through
    ``float`` (in place), then the rows are converted with
    ``python_values_to_proto_values`` and returned as a list of serialized
    protos.
    """
    # Record "feast" in the partner-attribution list kept in sys._xoptions.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    float_lists = df[0].to_numpy()
    # Elements can show up as ints; coerce each one to float before building
    # the protos.
    _cast_array_list_elements_to(float_lists, float)

    protos = python_values_to_proto_values(float_lists, ValueType.DOUBLE_LIST)
    return [proto.SerializeToString() for proto in protos]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_boolean_to_list_bool_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_boolean_to_list_bool_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_boolean_to_list_bool_proto(df):
    """Serialize a column of boolean arrays into BOOL_LIST ``ValueProto`` bytes.

    Column ``0`` of ``df`` is converted with ``python_values_to_proto_values``
    and each resulting proto is returned in serialized form, as a list.
    """
    # Record "feast" in the partner-attribution list kept in sys._xoptions.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    bool_lists = df[0].to_numpy()
    protos = python_values_to_proto_values(bool_lists, ValueType.BOOL_LIST)
    return [proto.SerializeToString() for proto in protos]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_array_timestamp_to_list_unix_timestamp_proto(df ARRAY)
RETURNS BINARY
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('protobuf', 'pandas')
HANDLER = 'feast.infra.utils.snowflake.snowpark.snowflake_udfs.feast_snowflake_array_timestamp_to_list_unix_timestamp_proto'
IMPORTS = ('@feast_stage/feast.zip');
"""


@vectorized(input=pandas.DataFrame)
def feast_snowflake_array_timestamp_to_list_unix_timestamp_proto(df):
    """Serialize a column of timestamp arrays into UNIX_TIMESTAMP_LIST protos.

    Args:
        df: Input batch as a pandas DataFrame; column ``0`` is read as the
            array column. Elements are expected to be timestamp strings in
            ``%Y-%m-%d %H:%M:%S.%f`` form (the fractional part may be absent)
            or ``datetime`` objects.

    Returns:
        A list with the ``SerializeToString()`` output of each proto produced
        by ``python_values_to_proto_values`` for
        ``ValueType.UNIX_TIMESTAMP_LIST``.

    Raises:
        ValueError: If a string element matches neither accepted format.
    """
    # Record "feast" in the partner-attribution list kept in sys._xoptions.
    sys._xoptions["snowflake_partner_attribution"].append("feast")

    def _to_datetime(value):
        # Values that are already datetimes need no parsing.
        if isinstance(value, datetime):
            return value
        try:
            return datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f")
        except ValueError:
            # "%f" requires a fractional part; accept whole-second strings too.
            return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")

    numpy_arrays = df[0].to_numpy()
    # Timestamps are coming in as strings so we should convert to timestamps
    _cast_array_list_elements_to(numpy_arrays, _to_datetime)

    # Serialize the converted rows directly instead of re-reading df[0], so the
    # result does not depend on the in-place mutation being visible through a
    # second to_numpy() call.
    return [
        proto.SerializeToString()
        for proto in python_values_to_proto_values(
            numpy_arrays, ValueType.UNIX_TIMESTAMP_LIST
        )
    ]


"""
CREATE OR REPLACE FUNCTION feast_snowflake_number_to_int32_proto(df NUMBER)
RETURNS BINARY
Expand Down
Loading

0 comments on commit e419e68

Please sign in to comment.