diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index 571cc1d26b..7871cea02c 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -441,9 +441,11 @@ def full_feature_names(self) -> bool:
     def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
         return self._on_demand_feature_views
 
-    def _to_df_internal(self) -> pd.DataFrame:
+    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         with self._query_generator() as query:
-            df = self._execute_query(query).to_dataframe(create_bqstorage_client=True)
+            df = self._execute_query(query=query, timeout=timeout).to_dataframe(
+                create_bqstorage_client=True
+            )
         return df
 
     def to_sql(self) -> str:
@@ -507,15 +509,15 @@ def to_bigquery(
         print(f"Done writing to '{dest}'.")
         return str(dest)
 
-    def _to_arrow_internal(self) -> pyarrow.Table:
+    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
         with self._query_generator() as query:
-            q = self._execute_query(query=query)
+            q = self._execute_query(query=query, timeout=timeout)
             assert q
             return q.to_arrow()
 
     @log_exceptions_and_usage
     def _execute_query(
-        self, query, job_config=None, timeout: int = 1800
+        self, query, job_config=None, timeout: Optional[int] = None
     ) -> Optional[bigquery.job.query.QueryJob]:
         bq_job = self.client.query(query, job_config=job_config)
 
@@ -525,7 +527,7 @@ def _execute_query(
             )
             return None
 
-        block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
+        block_until_done(client=self.client, bq_job=bq_job, timeout=timeout or 1800)
         return bq_job
 
     def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
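The BigQuery change above makes `timeout` optional end to end: `_execute_query` now defaults it to `None`, and `block_until_done` receives `timeout or 1800`, preserving the previous 1800-second default. A minimal sketch of that fallback (illustrative only; `resolve_timeout` is a hypothetical helper, not part of this patch):

    def resolve_timeout(timeout=None):
        # None falls back to the old 1800 s default. Note an explicit 0 would
        # also fall back, since `or` tests truthiness rather than `is None`.
        return timeout or 1800

    assert resolve_timeout() == 1800
    assert resolve_timeout(30) == 30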
diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py
index e3bb4e8cca..2e1fc0d983 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py
@@ -375,7 +375,7 @@ def get_temp_table_dml_header(
         return temp_table_dml_header
 
     @log_exceptions_and_usage
-    def _to_df_internal(self) -> pd.DataFrame:
+    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         with self._query_generator() as query:
             temp_table_name = "_" + str(uuid.uuid4()).replace("-", "")
             temp_external_location = self.get_temp_s3_path()
@@ -392,7 +392,7 @@ def _to_df_internal(self) -> pd.DataFrame:
             )
 
     @log_exceptions_and_usage
-    def _to_arrow_internal(self) -> pa.Table:
+    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
         with self._query_generator() as query:
             temp_table_name = "_" + str(uuid.uuid4()).replace("-", "")
             temp_external_location = self.get_temp_s3_path()
diff --git a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py
index 8dc5f6c654..5849105869 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py
@@ -327,7 +327,7 @@ def __init__(
         engine: Engine,
         config: MsSqlServerOfflineStoreConfig,
         full_feature_names: bool,
-        on_demand_feature_views: Optional[List[OnDemandFeatureView]],
+        on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
         metadata: Optional[RetrievalMetadata] = None,
         drop_columns: Optional[List[str]] = None,
     ):
@@ -347,10 +347,10 @@ def full_feature_names(self) -> bool:
     def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
         return self._on_demand_feature_views
 
-    def _to_df_internal(self) -> pandas.DataFrame:
+    def _to_df_internal(self, timeout: Optional[int] = None) -> pandas.DataFrame:
         return pandas.read_sql(self.query, con=self.engine).fillna(value=np.nan)
 
-    def _to_arrow_internal(self) -> pyarrow.Table:
+    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
         result = pandas.read_sql(self.query, con=self.engine).fillna(value=np.nan)
         return pyarrow.Table.from_pandas(result)
diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py
index ada41c023b..837b9091e7 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py
@@ -241,7 +241,7 @@ def __init__(
         query: Union[str, Callable[[], ContextManager[str]]],
         config: RepoConfig,
         full_feature_names: bool,
-        on_demand_feature_views: Optional[List[OnDemandFeatureView]],
+        on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
         metadata: Optional[RetrievalMetadata] = None,
     ):
         if not isinstance(query, str):
@@ -267,7 +267,7 @@ def full_feature_names(self) -> bool:
     def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
         return self._on_demand_feature_views
 
-    def _to_df_internal(self) -> pd.DataFrame:
+    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         # We use arrow format because it gives better control of the table schema
         return self._to_arrow_internal().to_pandas()
 
@@ -275,7 +275,7 @@ def to_sql(self) -> str:
         with self._query_generator() as query:
             return query
 
-    def _to_arrow_internal(self) -> pa.Table:
+    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
         with self._query_generator() as query:
             with _get_conn(self.config.offline_store) as conn, conn.cursor() as cur:
                 conn.set_session(readonly=True)
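With `on_demand_feature_views` now defaulting to `None` in the MsSql and Postgres job constructors above, callers can omit the argument entirely. A minimal sketch, assuming an existing `RepoConfig` named `repo_config` whose offline store points at PostgreSQL (the query string is a placeholder):

    job = PostgreSQLRetrievalJob(
        query="SELECT * FROM driver_stats",  # placeholder query
        config=repo_config,
        full_feature_names=False,
        # on_demand_feature_views is no longer required; it defaults to None
    )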
diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
index 665a65fec5..f51bd810ea 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py
@@ -336,13 +336,13 @@ def to_spark_df(self) -> pyspark.sql.DataFrame:
         *_, last = map(self.spark_session.sql, statements)
         return last
 
-    def _to_df_internal(self) -> pd.DataFrame:
+    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         """Return dataset as Pandas DataFrame synchronously"""
         return self.to_spark_df().toPandas()
 
-    def _to_arrow_internal(self) -> pyarrow.Table:
+    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
         """Return dataset as pyarrow Table synchronously"""
-        return pyarrow.Table.from_pandas(self._to_df_internal())
+        return pyarrow.Table.from_pandas(self._to_df_internal(timeout=timeout))
 
     def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
         """
diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py
index a5a51311eb..7a7afa1665 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py
@@ -85,16 +85,16 @@ def full_feature_names(self) -> bool:
     def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
         return self._on_demand_feature_views
 
-    def _to_df_internal(self) -> pd.DataFrame:
+    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         """Return dataset as Pandas DataFrame synchronously including on demand transforms"""
         results = self._client.execute_query(query_text=self._query)
         self.pyarrow_schema = results.pyarrow_schema
         return results.to_dataframe()
 
-    def _to_arrow_internal(self) -> pyarrow.Table:
+    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
         """Return payrrow dataset as synchronously including on demand transforms"""
         return pyarrow.Table.from_pandas(
-            self._to_df_internal(), schema=self.pyarrow_schema
+            self._to_df_internal(timeout=timeout), schema=self.pyarrow_schema
         )
 
     def to_sql(self) -> str:
diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py
index 15e614a5a3..44c67a56c7 100644
--- a/sdk/python/feast/infra/offline_stores/file.py
+++ b/sdk/python/feast/infra/offline_stores/file.py
@@ -76,14 +76,14 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
         return self._on_demand_feature_views
 
     @log_exceptions_and_usage
-    def _to_df_internal(self) -> pd.DataFrame:
+    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
         df = self.evaluation_function().compute()
         df = df.reset_index(drop=True)
         return df
 
     @log_exceptions_and_usage
-    def _to_arrow_internal(self):
+    def _to_arrow_internal(self, timeout: Optional[int] = None):
         # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
         df = self.evaluation_function().compute()
         return pyarrow.Table.from_pandas(df)
diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py
index 9331b75ec2..27a98a120f 100644
--- a/sdk/python/feast/infra/offline_stores/offline_store.py
+++ b/sdk/python/feast/infra/offline_stores/offline_store.py
@@ -62,7 +62,9 @@ class RetrievalJob(ABC):
     """A RetrievalJob manages the execution of a query to retrieve data from the offline store."""
 
     def to_df(
-        self, validation_reference: Optional["ValidationReference"] = None
+        self,
+        validation_reference: Optional["ValidationReference"] = None,
+        timeout: Optional[int] = None,
     ) -> pd.DataFrame:
         """
         Synchronously executes the underlying query and returns the result as a pandas dataframe.
@@ -72,8 +74,9 @@ def to_df(
 
         Args:
             validation_reference (optional): The validation to apply against the retrieved dataframe.
+            timeout (optional): The query timeout if applicable.
         """
-        features_df = self._to_df_internal()
+        features_df = self._to_df_internal(timeout=timeout)
 
         if self.on_demand_feature_views:
             # TODO(adchia): Fix requirement to specify dependent feature views in feature_refs
@@ -101,7 +104,9 @@ def to_df(
         return features_df
 
     def to_arrow(
-        self, validation_reference: Optional["ValidationReference"] = None
+        self,
+        validation_reference: Optional["ValidationReference"] = None,
+        timeout: Optional[int] = None,
    ) -> pyarrow.Table:
         """
         Synchronously executes the underlying query and returns the result as an arrow table.
@@ -111,11 +116,12 @@ def to_arrow(
 
         Args:
             validation_reference (optional): The validation to apply against the retrieved dataframe.
+            timeout (optional): The query timeout if applicable.
         """
         if not self.on_demand_feature_views and not validation_reference:
-            return self._to_arrow_internal()
+            return self._to_arrow_internal(timeout=timeout)
 
-        features_df = self._to_df_internal()
+        features_df = self._to_df_internal(timeout=timeout)
         if self.on_demand_feature_views:
             for odfv in self.on_demand_feature_views:
                 features_df = features_df.join(
@@ -147,20 +153,24 @@ def to_sql(self) -> str:
         pass
 
     @abstractmethod
-    def _to_df_internal(self) -> pd.DataFrame:
+    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         """
         Synchronously executes the underlying query and returns the result as a pandas dataframe.
 
+        timeout: RetrievalJob implementations may implement a timeout.
+
         Does not handle on demand transformations or dataset validation. For either of those,
         `to_df` should be used.
         """
         pass
 
     @abstractmethod
-    def _to_arrow_internal(self) -> pyarrow.Table:
+    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
         """
         Synchronously executes the underlying query and returns the result as an arrow table.
 
+        timeout: RetrievalJob implementations may implement a timeout.
+
         Does not handle on demand transformations or dataset validation. For either of those,
         `to_arrow` should be used.
         """
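Taken together, the `RetrievalJob` changes above expose a per-call timeout on the public API. A usage sketch, assuming an existing `FeatureStore` named `store`, an entity dataframe `entity_df`, and a hypothetical feature reference:

    job = store.get_historical_features(
        entity_df=entity_df,
        features=["driver_stats:conv_rate"],  # hypothetical feature reference
    )
    df = job.to_df(timeout=60)  # seconds; honored where the store implements it
    table = job.to_arrow(timeout=60)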
diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py
index 82b5150eaf..ffa30ba015 100644
--- a/sdk/python/feast/infra/offline_stores/redshift.py
+++ b/sdk/python/feast/infra/offline_stores/redshift.py
@@ -400,7 +400,7 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
         return self._on_demand_feature_views
 
     @log_exceptions_and_usage
-    def _to_df_internal(self) -> pd.DataFrame:
+    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         with self._query_generator() as query:
             return aws_utils.unload_redshift_query_to_df(
                 self._redshift_client,
@@ -414,7 +414,7 @@ def _to_df_internal(self) -> pd.DataFrame:
             )
 
     @log_exceptions_and_usage
-    def _to_arrow_internal(self) -> pa.Table:
+    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
         with self._query_generator() as query:
             return aws_utils.unload_redshift_query_to_pa(
                 self._redshift_client,
diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py
index e126b05934..2401458be7 100644
--- a/sdk/python/feast/infra/offline_stores/snowflake.py
+++ b/sdk/python/feast/infra/offline_stores/snowflake.py
@@ -410,7 +410,7 @@ def full_feature_names(self) -> bool:
     def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
         return self._on_demand_feature_views
 
-    def _to_df_internal(self) -> pd.DataFrame:
+    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
         with self._query_generator() as query:
 
             df = execute_snowflake_statement(
@@ -419,7 +419,7 @@ def _to_df_internal(self) -> pd.DataFrame:
 
             return df
 
-    def _to_arrow_internal(self) -> pyarrow.Table:
+    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
         with self._query_generator() as query:
 
             pa_table = execute_snowflake_statement(
diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt
index 3db3b74314..274a0dbc25 100644
--- a/sdk/python/requirements/py3.10-ci-requirements.txt
+++ b/sdk/python/requirements/py3.10-ci-requirements.txt
@@ -33,8 +33,6 @@ anyio==3.6.2
     #   watchfiles
 appdirs==1.4.4
     # via fissix
-appnope==0.1.3
-    # via ipython
 asn1crypto==1.5.1
     # via
     #   oscrypto
@@ -236,9 +234,9 @@ google-api-core[grpc]==2.11.0
     #   google-cloud-datastore
     #   google-cloud-firestore
     #   google-cloud-storage
-google-api-python-client==2.78.0
+google-api-python-client==2.79.0
     # via firebase-admin
-google-auth==2.16.0
+google-auth==2.16.1
     # via
     #   gcsfs
     #   google-api-core
@@ -267,7 +265,7 @@ google-cloud-core==2.3.2
     #   google-cloud-storage
 google-cloud-datastore==2.13.2
     # via feast (setup.py)
-google-cloud-firestore==2.9.1
+google-cloud-firestore==2.10.0
     # via firebase-admin
 google-cloud-storage==2.7.0
     # via
@@ -292,7 +290,7 @@ greenlet==2.0.2
     # via sqlalchemy
 grpc-google-iam-v1==0.12.6
     # via google-cloud-bigtable
-grpcio==1.51.1
+grpcio==1.51.3
     # via
     #   feast (setup.py)
     #   google-api-core
@@ -303,13 +301,13 @@ grpcio==1.51.1
     #   grpcio-status
     #   grpcio-testing
     #   grpcio-tools
-grpcio-reflection==1.51.1
+grpcio-reflection==1.51.3
     # via feast (setup.py)
-grpcio-status==1.51.1
+grpcio-status==1.51.3
     # via google-api-core
-grpcio-testing==1.51.1
+grpcio-testing==1.51.3
     # via feast (setup.py)
-grpcio-tools==1.51.1
+grpcio-tools==1.51.3
     # via feast (setup.py)
 h11==0.14.0
     # via
@@ -499,7 +497,7 @@ portalocker==2.7.0
     # via msal-extensions
 pre-commit==3.0.4
     # via feast (setup.py)
-prompt-toolkit==3.0.36
+prompt-toolkit==3.0.37
     # via ipython
 proto-plus==1.22.2
     # via
@@ -651,6 +649,8 @@ pyyaml==6.0
     #   uvicorn
 redis==4.2.2
     # via feast (setup.py)
+regex==2022.10.31
+    # via feast (setup.py)
 requests==2.28.2
     # via
     #   adal
@@ -693,7 +693,7 @@ s3fs==2022.1.0
     # via feast (setup.py)
 s3transfer==0.5.2
     # via boto3
-scipy==1.10.0
+scipy==1.10.1
     # via great-expectations
 six==1.16.0
     # via
@@ -783,33 +783,33 @@ trino==0.321.0
     # via feast (setup.py)
 typeguard==2.13.3
     # via feast (setup.py)
-types-docutils==0.19.1.4
+types-docutils==0.19.1.6
     # via types-setuptools
 types-protobuf==3.19.22
     # via
     #   feast (setup.py)
     #   mypy-protobuf
-types-pymysql==1.0.19.3
+types-pymysql==1.0.19.5
     # via feast (setup.py)
-types-pyopenssl==23.0.0.3
+types-pyopenssl==23.0.0.4
     # via types-redis
-types-python-dateutil==2.8.19.7
+types-python-dateutil==2.8.19.8
     # via feast (setup.py)
-types-pytz==2022.7.1.0
+types-pytz==2022.7.1.2
     # via feast (setup.py)
-types-pyyaml==6.0.12.6
+types-pyyaml==6.0.12.8
     # via feast (setup.py)
-types-redis==4.5.1.1
+types-redis==4.5.1.3
     # via feast (setup.py)
-types-requests==2.28.11.13
+types-requests==2.28.11.14
     # via feast (setup.py)
-types-setuptools==67.3.0.1
+types-setuptools==67.4.0.1
     # via feast (setup.py)
-types-tabulate==0.9.0.0
+types-tabulate==0.9.0.1
     # via feast (setup.py)
-types-toml==0.10.8.4
+types-toml==0.10.8.5
     # via responses
-types-urllib3==1.26.25.6
+types-urllib3==1.26.25.7
     # via types-requests
 typing-extensions==4.5.0
     # via
@@ -870,7 +870,7 @@ xmltodict==0.13.0
     # via moto
 yarl==1.8.2
     # via aiohttp
-zipp==3.13.0
+zipp==3.14.0
     # via importlib-metadata
 
 # The following packages are considered to be unsafe in a requirements file:
diff --git a/sdk/python/requirements/py3.10-requirements.txt b/sdk/python/requirements/py3.10-requirements.txt
index 53812b8f6e..451ffa3253 100644
--- a/sdk/python/requirements/py3.10-requirements.txt
+++ b/sdk/python/requirements/py3.10-requirements.txt
@@ -51,11 +51,11 @@ fsspec==2023.1.0
     # via dask
 greenlet==2.0.2
     # via sqlalchemy
-grpcio==1.51.1
+grpcio==1.51.3
     # via
     #   feast (setup.py)
     #   grpcio-reflection
-grpcio-reflection==1.51.1
+grpcio-reflection==1.51.3
     # via feast (setup.py)
 h11==0.14.0
     # via
@@ -84,7 +84,7 @@ mmh3==3.0.0
     # via feast (setup.py)
 moreorless==0.4.0
     # via bowler
-mypy==1.0.0
+mypy==1.0.1
     # via sqlalchemy
 mypy-extensions==1.0.0
     # via mypy
diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt
index 5edd948335..4e94a67023 100644
--- a/sdk/python/requirements/py3.8-ci-requirements.txt
+++ b/sdk/python/requirements/py3.8-ci-requirements.txt
@@ -33,8 +33,6 @@ anyio==3.6.2
     #   watchfiles
 appdirs==1.4.4
     # via fissix
-appnope==0.1.3
-    # via ipython
 asn1crypto==1.5.1
     # via
     #   oscrypto
@@ -240,9 +238,9 @@ google-api-core[grpc]==2.11.0
     #   google-cloud-datastore
     #   google-cloud-firestore
     #   google-cloud-storage
-google-api-python-client==2.78.0
+google-api-python-client==2.79.0
     # via firebase-admin
-google-auth==2.16.0
+google-auth==2.16.1
     # via
     #   gcsfs
     #   google-api-core
@@ -271,7 +269,7 @@ google-cloud-core==2.3.2
     #   google-cloud-storage
 google-cloud-datastore==2.13.2
     # via feast (setup.py)
-google-cloud-firestore==2.9.1
+google-cloud-firestore==2.10.0
     # via firebase-admin
 google-cloud-storage==2.7.0
     # via
@@ -296,7 +294,7 @@ greenlet==2.0.2
     # via sqlalchemy
 grpc-google-iam-v1==0.12.6
     # via google-cloud-bigtable
-grpcio==1.51.1
+grpcio==1.51.3
     # via
     #   feast (setup.py)
     #   google-api-core
@@ -307,13 +305,13 @@ grpcio==1.51.1
     #   grpcio-status
     #   grpcio-testing
     #   grpcio-tools
-grpcio-reflection==1.51.1
+grpcio-reflection==1.51.3
     # via feast (setup.py)
-grpcio-status==1.51.1
+grpcio-status==1.51.3
     # via google-api-core
-grpcio-testing==1.51.1
+grpcio-testing==1.51.3
     # via feast (setup.py)
-grpcio-tools==1.51.1
+grpcio-tools==1.51.3
     # via feast (setup.py)
 h11==0.14.0
     # via
@@ -348,7 +346,7 @@ importlib-metadata==6.0.0
     # via
     #   great-expectations
     #   sphinx
-importlib-resources==5.10.2
+importlib-resources==5.12.0
     # via jsonschema
 iniconfig==2.0.0
     # via pytest
@@ -509,7 +507,7 @@ portalocker==2.7.0
     # via msal-extensions
 pre-commit==3.0.4
     # via feast (setup.py)
-prompt-toolkit==3.0.36
+prompt-toolkit==3.0.37
     # via ipython
 proto-plus==1.22.2
     # via
@@ -661,6 +659,8 @@ pyyaml==6.0
     #   uvicorn
 redis==4.2.2
     # via feast (setup.py)
+regex==2022.10.31
+    # via feast (setup.py)
 requests==2.28.2
     # via
     #   adal
@@ -705,7 +705,7 @@ s3fs==2022.1.0
     # via feast (setup.py)
 s3transfer==0.5.2
     # via boto3
-scipy==1.10.0
+scipy==1.10.1
     # via great-expectations
 six==1.16.0
     # via
@@ -795,33 +795,33 @@ trino==0.321.0
     # via feast (setup.py)
 typeguard==2.13.3
     # via feast (setup.py)
-types-docutils==0.19.1.4
+types-docutils==0.19.1.6
     # via types-setuptools
 types-protobuf==3.19.22
     # via
     #   feast (setup.py)
     #   mypy-protobuf
-types-pymysql==1.0.19.3
+types-pymysql==1.0.19.5
     # via feast (setup.py)
-types-pyopenssl==23.0.0.3
+types-pyopenssl==23.0.0.4
     # via types-redis
-types-python-dateutil==2.8.19.7
+types-python-dateutil==2.8.19.8
     # via feast (setup.py)
-types-pytz==2022.7.1.0
+types-pytz==2022.7.1.2
     # via feast (setup.py)
-types-pyyaml==6.0.12.6
+types-pyyaml==6.0.12.8
     # via feast (setup.py)
-types-redis==4.5.1.1
+types-redis==4.5.1.3
     # via feast (setup.py)
-types-requests==2.28.11.13
+types-requests==2.28.11.14
     # via feast (setup.py)
-types-setuptools==67.3.0.1
+types-setuptools==67.4.0.1
     # via feast (setup.py)
-types-tabulate==0.9.0.0
+types-tabulate==0.9.0.1
     # via feast (setup.py)
-types-toml==0.10.8.4
+types-toml==0.10.8.5
     # via responses
-types-urllib3==1.26.25.6
+types-urllib3==1.26.25.7
     # via types-requests
 typing-extensions==4.5.0
     # via
@@ -885,7 +885,7 @@ xmltodict==0.13.0
     # via moto
 yarl==1.8.2
     # via aiohttp
-zipp==3.13.0
+zipp==3.14.0
     # via
     #   importlib-metadata
     #   importlib-resources
diff --git a/sdk/python/requirements/py3.8-requirements.txt b/sdk/python/requirements/py3.8-requirements.txt
index 7477ea06d3..0429a8313c 100644
--- a/sdk/python/requirements/py3.8-requirements.txt
+++ b/sdk/python/requirements/py3.8-requirements.txt
@@ -51,11 +51,11 @@ fsspec==2023.1.0
     # via dask
 greenlet==2.0.2
     # via sqlalchemy
-grpcio==1.51.1
+grpcio==1.51.3
     # via
     #   feast (setup.py)
     #   grpcio-reflection
-grpcio-reflection==1.51.1
+grpcio-reflection==1.51.3
     # via feast (setup.py)
 h11==0.14.0
     # via
@@ -72,7 +72,7 @@ idna==3.4
     #   anyio
     #   requests
     #   rfc3986
-importlib-resources==5.10.2
+importlib-resources==5.12.0
     # via jsonschema
 jinja2==3.1.2
     # via feast (setup.py)
@@ -86,7 +86,7 @@ mmh3==3.0.0
     # via feast (setup.py)
 moreorless==0.4.0
     # via bowler
-mypy==1.0.0
+mypy==1.0.1
     # via sqlalchemy
 mypy-extensions==1.0.0
     # via mypy
@@ -189,5 +189,5 @@ watchfiles==0.18.1
     # via uvicorn
 websockets==10.4
     # via uvicorn
-zipp==3.13.0
+zipp==3.14.0
     # via importlib-resources
diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt
index 78355555b2..df472dd6b0 100644
--- a/sdk/python/requirements/py3.9-ci-requirements.txt
+++ b/sdk/python/requirements/py3.9-ci-requirements.txt
@@ -33,8 +33,6 @@ anyio==3.6.2
     #   watchfiles
 appdirs==1.4.4
     # via fissix
-appnope==0.1.3
-    # via ipython
 asn1crypto==1.5.1
     # via
     #   oscrypto
@@ -236,9 +234,9 @@ google-api-core[grpc]==2.11.0
     #   google-cloud-datastore
     #   google-cloud-firestore
     #   google-cloud-storage
-google-api-python-client==2.78.0
+google-api-python-client==2.79.0
     # via firebase-admin
-google-auth==2.16.0
+google-auth==2.16.1
     # via
     #   gcsfs
     #   google-api-core
@@ -267,7 +265,7 @@ google-cloud-core==2.3.2
     #   google-cloud-storage
 google-cloud-datastore==2.13.2
     # via feast (setup.py)
-google-cloud-firestore==2.9.1
+google-cloud-firestore==2.10.0
     # via firebase-admin
 google-cloud-storage==2.7.0
     # via
@@ -292,7 +290,7 @@ greenlet==2.0.2
     # via sqlalchemy
 grpc-google-iam-v1==0.12.6
     # via google-cloud-bigtable
-grpcio==1.51.1
+grpcio==1.51.3
     # via
     #   feast (setup.py)
     #   google-api-core
@@ -303,13 +301,13 @@ grpcio==1.51.1
     #   grpcio-status
     #   grpcio-testing
     #   grpcio-tools
-grpcio-reflection==1.51.1
+grpcio-reflection==1.51.3
     # via feast (setup.py)
-grpcio-status==1.51.1
+grpcio-status==1.51.3
     # via google-api-core
-grpcio-testing==1.51.1
+grpcio-testing==1.51.3
     # via feast (setup.py)
-grpcio-tools==1.51.1
+grpcio-tools==1.51.3
     # via feast (setup.py)
 h11==0.14.0
     # via
@@ -501,7 +499,7 @@ portalocker==2.7.0
     # via msal-extensions
 pre-commit==3.0.4
     # via feast (setup.py)
-prompt-toolkit==3.0.36
+prompt-toolkit==3.0.37
     # via ipython
 proto-plus==1.22.2
     # via
@@ -653,6 +651,8 @@ pyyaml==6.0
     #   uvicorn
 redis==4.2.2
     # via feast (setup.py)
+regex==2022.10.31
+    # via feast (setup.py)
 requests==2.28.2
     # via
     #   adal
@@ -697,7 +697,7 @@ s3fs==2022.1.0
     # via feast (setup.py)
 s3transfer==0.5.2
     # via boto3
-scipy==1.10.0
+scipy==1.10.1
     # via great-expectations
 six==1.16.0
     # via
@@ -787,33 +787,33 @@ trino==0.321.0
     # via feast (setup.py)
 typeguard==2.13.3
     # via feast (setup.py)
-types-docutils==0.19.1.4
+types-docutils==0.19.1.6
     # via types-setuptools
 types-protobuf==3.19.22
     # via
     #   feast (setup.py)
     #   mypy-protobuf
-types-pymysql==1.0.19.3
+types-pymysql==1.0.19.5
     # via feast (setup.py)
-types-pyopenssl==23.0.0.3
+types-pyopenssl==23.0.0.4
     # via types-redis
-types-python-dateutil==2.8.19.7
+types-python-dateutil==2.8.19.8
     # via feast (setup.py)
-types-pytz==2022.7.1.0
+types-pytz==2022.7.1.2
     # via feast (setup.py)
-types-pyyaml==6.0.12.6
+types-pyyaml==6.0.12.8
     # via feast (setup.py)
-types-redis==4.5.1.1
+types-redis==4.5.1.3
     # via feast (setup.py)
-types-requests==2.28.11.13
+types-requests==2.28.11.14
     # via feast (setup.py)
-types-setuptools==67.3.0.1
+types-setuptools==67.4.0.1
     # via feast (setup.py)
-types-tabulate==0.9.0.0
+types-tabulate==0.9.0.1
     # via feast (setup.py)
-types-toml==0.10.8.4
+types-toml==0.10.8.5
     # via responses
-types-urllib3==1.26.25.6
+types-urllib3==1.26.25.7
     # via types-requests
 typing-extensions==4.5.0
     # via
@@ -877,7 +877,7 @@ xmltodict==0.13.0
     # via moto
 yarl==1.8.2
     # via aiohttp
-zipp==3.13.0
+zipp==3.14.0
     # via importlib-metadata
 
 # The following packages are considered to be unsafe in a requirements file:
diff --git a/sdk/python/requirements/py3.9-requirements.txt b/sdk/python/requirements/py3.9-requirements.txt
index 48bd178f2e..c33622325c 100644
--- a/sdk/python/requirements/py3.9-requirements.txt
+++ b/sdk/python/requirements/py3.9-requirements.txt
@@ -51,11 +51,11 @@ fsspec==2023.1.0
     # via dask
 greenlet==2.0.2
     # via sqlalchemy
-grpcio==1.51.1
+grpcio==1.51.3
     # via
     #   feast (setup.py)
     #   grpcio-reflection
-grpcio-reflection==1.51.1
+grpcio-reflection==1.51.3
     # via feast (setup.py)
 h11==0.14.0
     # via
@@ -84,7 +84,7 @@ mmh3==3.0.0
     # via feast (setup.py)
 moreorless==0.4.0
     # via bowler
-mypy==1.0.0
+mypy==1.0.1
     # via sqlalchemy
 mypy-extensions==1.0.0
     # via mypy
diff --git a/sdk/python/tests/unit/infra/offline_stores/test_bigquery.py b/sdk/python/tests/unit/infra/offline_stores/test_bigquery.py
new file mode 100644
index 0000000000..662be20b31
--- /dev/null
+++ b/sdk/python/tests/unit/infra/offline_stores/test_bigquery.py
@@ -0,0 +1,84 @@
+from unittest.mock import Mock, patch
+
+import pandas as pd
+import pyarrow
+import pytest
+
+from feast.infra.offline_stores.bigquery import (
+    BigQueryOfflineStoreConfig,
+    BigQueryRetrievalJob,
+)
+from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
+from feast.repo_config import RepoConfig
+
+
+@pytest.fixture
+def pandas_dataframe():
+    return pd.DataFrame(
+        data={
+            "key": [1, 2, 3],
+            "value": ["a", None, "c"],
+        }
+    )
+
+
+@pytest.fixture
+def big_query_result(pandas_dataframe):
+    class BigQueryResult:
+        def to_dataframe(self, **kwargs):
+            return pandas_dataframe
+
+        def to_arrow(self, **kwargs):
+            return pyarrow.Table.from_pandas(pandas_dataframe)
+
+        def exception(self, timeout=None):
+            return None
+
+    return BigQueryResult()
+
+
+class TestBigQueryRetrievalJob:
+    query = "SELECT * FROM bigquery"
+    client = Mock()
+    retrieval_job = BigQueryRetrievalJob(
+        query=query,
+        client=client,
+        config=RepoConfig(
+            registry="gs://ml-test/repo/registry.db",
+            project="test",
+            provider="gcp",
+            online_store=SqliteOnlineStoreConfig(type="sqlite"),
+            offline_store=BigQueryOfflineStoreConfig(type="bigquery", dataset="feast"),
+        ),
+        full_feature_names=True,
+        on_demand_feature_views=[],
+    )
+
+    def test_to_sql(self):
+        assert self.retrieval_job.to_sql() == self.query
+
+    def test_to_df(self, big_query_result, pandas_dataframe):
+        self.client.query.return_value = big_query_result
+        actual = self.retrieval_job.to_df()
+        pd.testing.assert_frame_equal(actual, pandas_dataframe)
+
+    def test_to_df_timeout(self, big_query_result):
+        self.client.query.return_value = big_query_result
+        with patch.object(self.retrieval_job, "_execute_query"):
+            self.retrieval_job.to_df(timeout=30)
+            self.retrieval_job._execute_query.assert_called_once_with(
+                query=self.query, timeout=30
+            )
+    def test_to_arrow(self, big_query_result, pandas_dataframe):
+        self.client.query.return_value = big_query_result
+        actual = self.retrieval_job.to_arrow()
+        pd.testing.assert_frame_equal(actual.to_pandas(), pandas_dataframe)
+
+    def test_to_arrow_timeout(self, big_query_result):
+        self.client.query.return_value = big_query_result
+        with patch.object(self.retrieval_job, "_execute_query"):
+            self.retrieval_job.to_arrow(timeout=30)
+            self.retrieval_job._execute_query.assert_called_once_with(
+                query=self.query, timeout=30
+            )
diff --git a/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py b/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py
new file mode 100644
index 0000000000..53e9d061ad
--- /dev/null
+++ b/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py
@@ -0,0 +1,220 @@
+from typing import List, Optional
+from unittest.mock import MagicMock, patch
+
+import pandas as pd
+import pyarrow
+import pytest
+
+from feast.infra.offline_stores.contrib.athena_offline_store.athena import (
+    AthenaOfflineStoreConfig,
+    AthenaRetrievalJob,
+)
+from feast.infra.offline_stores.contrib.mssql_offline_store.mssql import (
+    MsSqlServerOfflineStoreConfig,
+    MsSqlServerRetrievalJob,
+)
+from feast.infra.offline_stores.contrib.postgres_offline_store.postgres import (
+    PostgreSQLOfflineStoreConfig,
+    PostgreSQLRetrievalJob,
+)
+from feast.infra.offline_stores.contrib.spark_offline_store.spark import (
+    SparkOfflineStoreConfig,
+    SparkRetrievalJob,
+)
+from feast.infra.offline_stores.contrib.trino_offline_store.trino import (
+    TrinoRetrievalJob,
+)
+from feast.infra.offline_stores.file import FileRetrievalJob
+from feast.infra.offline_stores.offline_store import RetrievalJob, RetrievalMetadata
+from feast.infra.offline_stores.redshift import (
+    RedshiftOfflineStoreConfig,
+    RedshiftRetrievalJob,
+)
+from feast.infra.offline_stores.snowflake import (
+    SnowflakeOfflineStoreConfig,
+    SnowflakeRetrievalJob,
+)
+from feast.on_demand_feature_view import OnDemandFeatureView
+from feast.saved_dataset import SavedDatasetStorage
+
+
+class MockRetrievalJob(RetrievalJob):
+    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
+        """
+        Synchronously executes the underlying query and returns the result as a pandas dataframe.
+
+        Does not handle on demand transformations or dataset validation. For either of those,
+        `to_df` should be used.
+        """
+        pass
+
+    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
+        """
+        Synchronously executes the underlying query and returns the result as an arrow table.
+
+        Does not handle on demand transformations or dataset validation. For either of those,
+        `to_arrow` should be used.
+        """
+        pass
+
+    @property
+    def full_feature_names(self) -> bool:
+        """Returns True if full feature names should be applied to the results of the query."""
+        pass
+
+    @property
+    def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
+        """Returns a list containing all the on demand feature views to be handled."""
+        pass
+
+    def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
+        """
+        Synchronously executes the underlying query and persists the result in the same offline store
+        at the specified destination.
+
+        Args:
+            storage: The saved dataset storage object specifying where the result should be persisted.
+            allow_overwrite: If True, a pre-existing location (e.g. table or file) can be overwritten.
+                Currently not all individual offline store implementations make use of this parameter.
+ """ + pass + + @property + def metadata(self) -> Optional[RetrievalMetadata]: + """Returns metadata about the retrieval job.""" + pass + + +# Since RetreivalJob are not really tested for subclasses we add some tests here. +@pytest.fixture( + params=[ + MockRetrievalJob, + FileRetrievalJob, + RedshiftRetrievalJob, + SnowflakeRetrievalJob, + AthenaRetrievalJob, + MsSqlServerRetrievalJob, + PostgreSQLRetrievalJob, + SparkRetrievalJob, + TrinoRetrievalJob, + ] +) +def retrieval_job(request, environment): + if request.param is FileRetrievalJob: + return FileRetrievalJob(lambda: 1, full_feature_names=False) + elif request.param is RedshiftRetrievalJob: + offline_store_config = RedshiftOfflineStoreConfig( + cluster_id="feast-integration-tests", + region="us-west-2", + user="admin", + database="feast", + s3_staging_location="s3://feast-integration-tests/redshift/tests/ingestion", + iam_role="arn:aws:iam::402087665549:role/redshift_s3_access_role", + ) + environment.test_repo_config.offline_store = offline_store_config + return RedshiftRetrievalJob( + query="query", + redshift_client="", + s3_resource="", + config=environment.test_repo_config, + full_feature_names=False, + ) + elif request.param is SnowflakeRetrievalJob: + offline_store_config = SnowflakeOfflineStoreConfig( + type="snowflake.offline", + account="snow", + user="snow", + password="snow", + role="snow", + warehouse="snow", + database="FEAST", + schema="OFFLINE", + storage_integration_name="FEAST_S3", + blob_export_location="s3://feast-snowflake-offload/export", + ) + environment.test_repo_config.offline_store = offline_store_config + environment.test_repo_config.project = "project" + return SnowflakeRetrievalJob( + query="query", + snowflake_conn=MagicMock(), + config=environment.test_repo_config, + full_feature_names=False, + ) + elif request.param is AthenaRetrievalJob: + offline_store_config = AthenaOfflineStoreConfig( + data_source="athena", + region="athena", + database="athena", + workgroup="athena", + s3_staging_location="athena", + ) + + environment.test_repo_config.offline_store = offline_store_config + return AthenaRetrievalJob( + query="query", + athena_client="client", + s3_resource="", + config=environment.test_repo_config.offline_store, + full_feature_names=False, + ) + elif request.param is MsSqlServerRetrievalJob: + + return MsSqlServerRetrievalJob( + query="query", + engine=MagicMock(), + config=MsSqlServerOfflineStoreConfig( + connection_string="str" + ), # TODO: this does not match the RetrievalJob pattern. 
diff --git a/setup.py b/setup.py
index 0d564ca265..581f04dcfb 100644
--- a/setup.py
+++ b/setup.py
@@ -110,7 +110,7 @@
 ]
 
 TRINO_REQUIRED = [
-    "trino>=0.305.0,<0.400.0",
+    "trino>=0.305.0,<0.400.0", "regex"
 ]
 
 POSTGRES_REQUIRED = [