Skip to content

Commit

Permalink
feat: Adding query timeout to to_df and to_arrow retrieval methods (
Browse files Browse the repository at this point in the history
#3505)

* feat: Adding query timeout to `to_df` and `to_arrow` retrieval methods

Signed-off-by: gbmarc1 <marcantoine.belanger@shopify.com>

* feat: add regex requirement for trino

Signed-off-by: gbmarc1 <marcantoine.belanger@shopify.com>

* build: lock deps with new regex requirement

Signed-off-by: Marc-Antoine Belanger <mbelanger@explorance.com>

---------

Signed-off-by: gbmarc1 <marcantoine.belanger@shopify.com>
Signed-off-by: Marc-Antoine Belanger <mbelanger@explorance.com>
Co-authored-by: Marc-Antoine Belanger <mbelanger@explorance.com>
  • Loading branch information
gbmarc1 and Marc-Antoine Belanger authored Mar 2, 2023
1 parent 0c81431 commit bab6644
Show file tree
Hide file tree
Showing 19 changed files with 437 additions and 121 deletions.
14 changes: 8 additions & 6 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,11 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
with self._query_generator() as query:
df = self._execute_query(query).to_dataframe(create_bqstorage_client=True)
df = self._execute_query(query=query, timeout=timeout).to_dataframe(
create_bqstorage_client=True
)
return df

def to_sql(self) -> str:
Expand Down Expand Up @@ -507,15 +509,15 @@ def to_bigquery(
print(f"Done writing to '{dest}'.")
return str(dest)

def _to_arrow_internal(self) -> pyarrow.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
with self._query_generator() as query:
q = self._execute_query(query=query)
q = self._execute_query(query=query, timeout=timeout)
assert q
return q.to_arrow()

@log_exceptions_and_usage
def _execute_query(
self, query, job_config=None, timeout: int = 1800
self, query, job_config=None, timeout: Optional[int] = None
) -> Optional[bigquery.job.query.QueryJob]:
bq_job = self.client.query(query, job_config=job_config)

Expand All @@ -525,7 +527,7 @@ def _execute_query(
)
return None

block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
block_until_done(client=self.client, bq_job=bq_job, timeout=timeout or 1800)
return bq_job

def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def get_temp_table_dml_header(
return temp_table_dml_header

@log_exceptions_and_usage
def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
with self._query_generator() as query:
temp_table_name = "_" + str(uuid.uuid4()).replace("-", "")
temp_external_location = self.get_temp_s3_path()
Expand All @@ -392,7 +392,7 @@ def _to_df_internal(self) -> pd.DataFrame:
)

@log_exceptions_and_usage
def _to_arrow_internal(self) -> pa.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
with self._query_generator() as query:
temp_table_name = "_" + str(uuid.uuid4()).replace("-", "")
temp_external_location = self.get_temp_s3_path()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def __init__(
engine: Engine,
config: MsSqlServerOfflineStoreConfig,
full_feature_names: bool,
on_demand_feature_views: Optional[List[OnDemandFeatureView]],
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
metadata: Optional[RetrievalMetadata] = None,
drop_columns: Optional[List[str]] = None,
):
Expand All @@ -347,10 +347,10 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

def _to_df_internal(self) -> pandas.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pandas.DataFrame:
return pandas.read_sql(self.query, con=self.engine).fillna(value=np.nan)

def _to_arrow_internal(self) -> pyarrow.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
result = pandas.read_sql(self.query, con=self.engine).fillna(value=np.nan)
return pyarrow.Table.from_pandas(result)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def __init__(
query: Union[str, Callable[[], ContextManager[str]]],
config: RepoConfig,
full_feature_names: bool,
on_demand_feature_views: Optional[List[OnDemandFeatureView]],
on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
metadata: Optional[RetrievalMetadata] = None,
):
if not isinstance(query, str):
Expand All @@ -267,15 +267,15 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
# We use arrow format because it gives better control of the table schema
return self._to_arrow_internal().to_pandas()

def to_sql(self) -> str:
with self._query_generator() as query:
return query

def _to_arrow_internal(self) -> pa.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
with self._query_generator() as query:
with _get_conn(self.config.offline_store) as conn, conn.cursor() as cur:
conn.set_session(readonly=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,13 +336,13 @@ def to_spark_df(self) -> pyspark.sql.DataFrame:
*_, last = map(self.spark_session.sql, statements)
return last

def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
"""Return dataset as Pandas DataFrame synchronously"""
return self.to_spark_df().toPandas()

def _to_arrow_internal(self) -> pyarrow.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
"""Return dataset as pyarrow Table synchronously"""
return pyarrow.Table.from_pandas(self._to_df_internal())
return pyarrow.Table.from_pandas(self._to_df_internal(timeout=timeout))

def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
"""Return dataset as Pandas DataFrame synchronously including on demand transforms"""
results = self._client.execute_query(query_text=self._query)
self.pyarrow_schema = results.pyarrow_schema
return results.to_dataframe()

def _to_arrow_internal(self) -> pyarrow.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
"""Return payrrow dataset as synchronously including on demand transforms"""
return pyarrow.Table.from_pandas(
self._to_df_internal(), schema=self.pyarrow_schema
self._to_df_internal(timeout=timeout), schema=self.pyarrow_schema
)

def to_sql(self) -> str:
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

@log_exceptions_and_usage
def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function().compute()
df = df.reset_index(drop=True)
return df

@log_exceptions_and_usage
def _to_arrow_internal(self):
def _to_arrow_internal(self, timeout: Optional[int] = None):
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function().compute()
return pyarrow.Table.from_pandas(df)
Expand Down
24 changes: 17 additions & 7 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ class RetrievalJob(ABC):
"""A RetrievalJob manages the execution of a query to retrieve data from the offline store."""

def to_df(
self, validation_reference: Optional["ValidationReference"] = None
self,
validation_reference: Optional["ValidationReference"] = None,
timeout: Optional[int] = None,
) -> pd.DataFrame:
"""
Synchronously executes the underlying query and returns the result as a pandas dataframe.
Expand All @@ -72,8 +74,9 @@ def to_df(
Args:
validation_reference (optional): The validation to apply against the retrieved dataframe.
timeout (optional): The query timeout if applicable.
"""
features_df = self._to_df_internal()
features_df = self._to_df_internal(timeout=timeout)

if self.on_demand_feature_views:
# TODO(adchia): Fix requirement to specify dependent feature views in feature_refs
Expand Down Expand Up @@ -101,7 +104,9 @@ def to_df(
return features_df

def to_arrow(
self, validation_reference: Optional["ValidationReference"] = None
self,
validation_reference: Optional["ValidationReference"] = None,
timeout: Optional[int] = None,
) -> pyarrow.Table:
"""
Synchronously executes the underlying query and returns the result as an arrow table.
Expand All @@ -111,11 +116,12 @@ def to_arrow(
Args:
validation_reference (optional): The validation to apply against the retrieved dataframe.
timeout (optional): The query timeout if applicable.
"""
if not self.on_demand_feature_views and not validation_reference:
return self._to_arrow_internal()
return self._to_arrow_internal(timeout=timeout)

features_df = self._to_df_internal()
features_df = self._to_df_internal(timeout=timeout)
if self.on_demand_feature_views:
for odfv in self.on_demand_feature_views:
features_df = features_df.join(
Expand Down Expand Up @@ -147,20 +153,24 @@ def to_sql(self) -> str:
pass

@abstractmethod
def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
"""
Synchronously executes the underlying query and returns the result as a pandas dataframe.
        timeout: RetrievalJob implementations may implement a timeout.
Does not handle on demand transformations or dataset validation. For either of those,
`to_df` should be used.
"""
pass

@abstractmethod
def _to_arrow_internal(self) -> pyarrow.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
"""
Synchronously executes the underlying query and returns the result as an arrow table.
        timeout: RetrievalJob implementations may implement a timeout.
Does not handle on demand transformations or dataset validation. For either of those,
`to_arrow` should be used.
"""
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

@log_exceptions_and_usage
def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
with self._query_generator() as query:
return aws_utils.unload_redshift_query_to_df(
self._redshift_client,
Expand All @@ -414,7 +414,7 @@ def _to_df_internal(self) -> pd.DataFrame:
)

@log_exceptions_and_usage
def _to_arrow_internal(self) -> pa.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
with self._query_generator() as query:
return aws_utils.unload_redshift_query_to_pa(
self._redshift_client,
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
return self._on_demand_feature_views

def _to_df_internal(self) -> pd.DataFrame:
def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
with self._query_generator() as query:

df = execute_snowflake_statement(
Expand All @@ -419,7 +419,7 @@ def _to_df_internal(self) -> pd.DataFrame:

return df

def _to_arrow_internal(self) -> pyarrow.Table:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
with self._query_generator() as query:

pa_table = execute_snowflake_statement(
Expand Down
Loading

0 comments on commit bab6644

Please sign in to comment.