From 22838c8bf6e6fd0ca73e76f40d093ad8e97c7228 Mon Sep 17 00:00:00 2001 From: Terence Date: Wed, 18 Nov 2020 13:34:06 +0800 Subject: [PATCH 01/12] Refactor constants and add sphinx docs Signed-off-by: Terence --- sdk/python/docs/index.rst | 9 +- sdk/python/feast/cli.py | 6 +- sdk/python/feast/client.py | 125 +++++------ sdk/python/feast/config.py | 36 ++- sdk/python/feast/constants.py | 317 ++++++++++++++++----------- sdk/python/feast/grpc/auth.py | 43 ++-- sdk/python/feast/job_service.py | 8 +- sdk/python/feast/loaders/ingest.py | 7 - sdk/python/feast/pyspark/launcher.py | 88 +++----- sdk/python/feast/wait.py | 5 +- 10 files changed, 329 insertions(+), 315 deletions(-) diff --git a/sdk/python/docs/index.rst b/sdk/python/docs/index.rst index e4471ba366..f7b81ce62f 100644 --- a/sdk/python/docs/index.rst +++ b/sdk/python/docs/index.rst @@ -33,4 +33,11 @@ Feature .. automodule:: feast.feature :inherited-members: - :members: \ No newline at end of file + :members: + +Constants +================== + +.. automodule:: feast.constants + :members: + :exclude-members: AuthProvider diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 9d79c40518..e8aab55cec 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -23,7 +23,7 @@ from feast.client import Client from feast.config import Config -from feast.constants import CONFIG_SPARK_LAUNCHER +from feast.constants import ConfigOptions from feast.entity import Entity from feast.feature_table import FeatureTable from feast.job_service import start_job_service @@ -422,7 +422,7 @@ def stop_stream_to_online(feature_table: str): Stop stream to online sync job """ - spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER) + spark_launcher = Config().get(ConfigOptions.SPARK_LAUNCHER) if spark_launcher == "emr": import feast.pyspark.aws.jobs @@ -441,7 +441,7 @@ def list_jobs(): """ from tabulate import tabulate - spark_launcher = Config().get(CONFIG_SPARK_LAUNCHER) + spark_launcher = Config().get(ConfigOptions.SPARK_LAUNCHER) if spark_launcher == "emr": import feast.pyspark.aws.jobs diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 60b320b9f0..7c00ca8cc6 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -24,24 +24,7 @@ import pandas as pd from feast.config import Config -from feast.constants import ( - CONFIG_CORE_ENABLE_SSL_KEY, - CONFIG_CORE_SERVER_SSL_CERT_KEY, - CONFIG_CORE_URL_KEY, - CONFIG_ENABLE_AUTH_KEY, - CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY, - CONFIG_JOB_SERVICE_ENABLE_SSL_KEY, - CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY, - CONFIG_JOB_SERVICE_URL_KEY, - CONFIG_PROJECT_KEY, - CONFIG_SERVING_ENABLE_SSL_KEY, - CONFIG_SERVING_SERVER_SSL_CERT_KEY, - CONFIG_SERVING_URL_KEY, - CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT, - CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION, - CONFIG_SPARK_STAGING_LOCATION, - FEAST_DEFAULT_OPTIONS, -) +from feast.constants import ConfigOptions from feast.core.CoreService_pb2 import ( ApplyEntityRequest, ApplyEntityResponse, @@ -81,7 +64,6 @@ from feast.grpc import auth as feast_auth from feast.grpc.grpc import create_grpc_channel from feast.loaders.ingest import ( - BATCH_INGESTION_PRODUCTION_TIMEOUT, _check_field_mappings, _read_table_from_source, _upload_to_bq_source, @@ -158,7 +140,7 @@ def __init__(self, options: Optional[Dict[str, str]] = None, **kwargs): self._auth_metadata: Optional[grpc.AuthMetadataPlugin] = None # Configure Auth Metadata Plugin if auth is enabled - if self._config.getboolean(CONFIG_ENABLE_AUTH_KEY): + if self._config.getboolean(ConfigOptions.ENABLE_AUTH): self._auth_metadata = feast_auth.get_auth_metadata_plugin(self._config) @property @@ -170,12 +152,14 @@ def _core_service(self): """ if not self._core_service_stub: channel = create_grpc_channel( - url=self._config.get(CONFIG_CORE_URL_KEY), - enable_ssl=self._config.getboolean(CONFIG_CORE_ENABLE_SSL_KEY), - enable_auth=self._config.getboolean(CONFIG_ENABLE_AUTH_KEY), - ssl_server_cert_path=self._config.get(CONFIG_CORE_SERVER_SSL_CERT_KEY), + url=self._config.get(ConfigOptions.CORE_URL), + enable_ssl=self._config.getboolean(ConfigOptions.CORE_ENABLE_SSL), + enable_auth=self._config.getboolean(ConfigOptions.ENABLE_AUTH), + ssl_server_cert_path=self._config.get( + ConfigOptions.CORE_SERVER_SSL_CERT + ), auth_metadata_plugin=self._auth_metadata, - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), ) self._core_service_stub = CoreServiceStub(channel) return self._core_service_stub @@ -189,21 +173,24 @@ def _serving_service(self): """ if not self._serving_service_stub: channel = create_grpc_channel( - url=self._config.get(CONFIG_SERVING_URL_KEY), - enable_ssl=self._config.getboolean(CONFIG_SERVING_ENABLE_SSL_KEY), - enable_auth=self._config.getboolean(CONFIG_ENABLE_AUTH_KEY), + url=self._config.get(ConfigOptions.SERVING_URL), + enable_ssl=self._config.getboolean(ConfigOptions.SERVING_ENABLE_SSL), + enable_auth=self._config.getboolean(ConfigOptions.ENABLE_AUTH), ssl_server_cert_path=self._config.get( - CONFIG_SERVING_SERVER_SSL_CERT_KEY + ConfigOptions.SERVING_SERVER_SSL_CERT ), auth_metadata_plugin=self._auth_metadata, - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), ) self._serving_service_stub = ServingServiceStub(channel) return self._serving_service_stub @property def _use_job_service(self) -> bool: - return self._config.exists(CONFIG_JOB_SERVICE_URL_KEY) + return ( + self._config.exists(ConfigOptions.JOB_SERVICE_URL) + and ConfigOptions.JOB_SERVICE_URL != "" + ) @property def _job_service(self): @@ -218,21 +205,23 @@ def _job_service(self): if not self._job_service_stub: channel = create_grpc_channel( - url=self._config.get(CONFIG_JOB_SERVICE_URL_KEY), - enable_ssl=self._config.getboolean(CONFIG_JOB_SERVICE_ENABLE_SSL_KEY), - enable_auth=self._config.getboolean(CONFIG_ENABLE_AUTH_KEY), + url=self._config.get(ConfigOptions.JOB_SERVICE_URL), + enable_ssl=self._config.getboolean( + ConfigOptions.JOB_SERVICE_ENABLE_SSL + ), + enable_auth=self._config.getboolean(ConfigOptions.ENABLE_AUTH), ssl_server_cert_path=self._config.get( - CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY + ConfigOptions.JOB_SERVICE_SERVER_SSL_CERT ), auth_metadata_plugin=self._auth_metadata, - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), ) self._job_service_service_stub = JobServiceStub(channel) return self._job_service_service_stub def _extra_grpc_params(self) -> Dict[str, Any]: return dict( - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) @@ -244,7 +233,7 @@ def core_url(self) -> str: Returns: Feast Core URL string """ - return self._config.get(CONFIG_CORE_URL_KEY) + return self._config.get(ConfigOptions.CORE_URL) @core_url.setter def core_url(self, value: str): @@ -254,7 +243,7 @@ def core_url(self, value: str): Args: value: Feast Core URL """ - self._config.set(CONFIG_CORE_URL_KEY, value) + self._config.set(ConfigOptions.CORE_URL, value) @property def serving_url(self) -> str: @@ -264,7 +253,7 @@ def serving_url(self) -> str: Returns: Feast Serving URL string """ - return self._config.get(CONFIG_SERVING_URL_KEY) + return self._config.get(ConfigOptions.SERVING_URL) @serving_url.setter def serving_url(self, value: str): @@ -274,7 +263,7 @@ def serving_url(self, value: str): Args: value: Feast Serving URL """ - self._config.set(CONFIG_SERVING_URL_KEY, value) + self._config.set(ConfigOptions.SERVING_URL, value) @property def job_service_url(self) -> str: @@ -284,7 +273,7 @@ def job_service_url(self) -> str: Returns: Feast Job Service URL string """ - return self._config.get(CONFIG_JOB_SERVICE_URL_KEY) + return self._config.get(ConfigOptions.JOB_SERVICE_URL) @job_service_url.setter def job_service_url(self, value: str): @@ -294,7 +283,7 @@ def job_service_url(self, value: str): Args: value: Feast Job Service URL """ - self._config.set(CONFIG_JOB_SERVICE_URL_KEY, value) + self._config.set(ConfigOptions.JOB_SERVICE_URL, value) @property def core_secure(self) -> bool: @@ -304,7 +293,7 @@ def core_secure(self) -> bool: Returns: Whether client-side SSL/TLS is enabled """ - return self._config.getboolean(CONFIG_CORE_ENABLE_SSL_KEY) + return self._config.getboolean(ConfigOptions.CORE_ENABLE_SSL) @core_secure.setter def core_secure(self, value: bool): @@ -314,7 +303,7 @@ def core_secure(self, value: bool): Args: value: True to enable client-side SSL/TLS """ - self._config.set(CONFIG_CORE_ENABLE_SSL_KEY, value) + self._config.set(ConfigOptions.CORE_ENABLE_SSL, value) @property def serving_secure(self) -> bool: @@ -324,7 +313,7 @@ def serving_secure(self) -> bool: Returns: Whether client-side SSL/TLS is enabled """ - return self._config.getboolean(CONFIG_SERVING_ENABLE_SSL_KEY) + return self._config.getboolean(ConfigOptions.SERVING_ENABLE_SSL) @serving_secure.setter def serving_secure(self, value: bool): @@ -334,7 +323,7 @@ def serving_secure(self, value: bool): Args: value: True to enable client-side SSL/TLS """ - self._config.set(CONFIG_SERVING_ENABLE_SSL_KEY, value) + self._config.set(ConfigOptions.SERVING_ENABLE_SSL, value) @property def job_service_secure(self) -> bool: @@ -344,7 +333,7 @@ def job_service_secure(self) -> bool: Returns: Whether client-side SSL/TLS is enabled """ - return self._config.getboolean(CONFIG_JOB_SERVICE_ENABLE_SSL_KEY) + return self._config.getboolean(ConfigOptions.JOB_SERVICE_ENABLE_SSL) @job_service_secure.setter def job_service_secure(self, value: bool): @@ -354,7 +343,7 @@ def job_service_secure(self, value: bool): Args: value: True to enable client-side SSL/TLS """ - self._config.set(CONFIG_JOB_SERVICE_ENABLE_SSL_KEY, value) + self._config.set(ConfigOptions.JOB_SERVICE_ENABLE_SSL, value) def version(self): """ @@ -371,7 +360,7 @@ def version(self): if self.serving_url: serving_version = self._serving_service.GetFeastServingInfo( GetFeastServingInfoRequest(), - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ).version result["serving"] = {"url": self.serving_url, "version": serving_version} @@ -379,7 +368,7 @@ def version(self): if self.core_url: core_version = self._core_service.GetFeastCoreVersion( GetFeastCoreVersionRequest(), - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ).version result["core"] = {"url": self.core_url, "version": core_version} @@ -394,9 +383,9 @@ def project(self) -> str: Returns: Project name """ - if not self._config.get(CONFIG_PROJECT_KEY): + if not self._config.get(ConfigOptions.PROJECT): raise ValueError("No project has been configured.") - return self._config.get(CONFIG_PROJECT_KEY) + return self._config.get(ConfigOptions.PROJECT) def set_project(self, project: Optional[str] = None): """ @@ -406,8 +395,8 @@ def set_project(self, project: Optional[str] = None): project: Project to set as active. If unset, will reset to the default project. """ if project is None: - project = FEAST_DEFAULT_OPTIONS[CONFIG_PROJECT_KEY] - self._config.set(CONFIG_PROJECT_KEY, project) + project = ConfigOptions().PROJECT + self._config.set(ConfigOptions.PROJECT, project) def list_projects(self) -> List[str]: """ @@ -420,7 +409,7 @@ def list_projects(self) -> List[str]: response = self._core_service.ListProjects( ListProjectsRequest(), - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) # type: ListProjectsResponse return list(response.projects) @@ -435,7 +424,7 @@ def create_project(self, project: str): self._core_service.CreateProject( CreateProjectRequest(name=project), - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) # type: CreateProjectResponse @@ -452,7 +441,7 @@ def archive_project(self, project): try: self._core_service_stub.ArchiveProject( ArchiveProjectRequest(name=project), - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) # type: ArchiveProjectResponse except grpc.RpcError as e: @@ -460,7 +449,7 @@ def archive_project(self, project): # revert to the default project if self._project == project: - self._project = FEAST_DEFAULT_OPTIONS[CONFIG_PROJECT_KEY] + self._project = ConfigOptions().PROJECT def apply_entity(self, entities: Union[List[Entity], Entity], project: str = None): """ @@ -513,7 +502,7 @@ def _apply_entity(self, project: str, entity: Entity): try: apply_entity_response = self._core_service.ApplyEntity( ApplyEntityRequest(project=project, spec=entity_proto), # type: ignore - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) # type: ApplyEntityResponse except grpc.RpcError as e: @@ -625,7 +614,7 @@ def _apply_feature_table(self, project: str, feature_table: FeatureTable): try: apply_feature_table_response = self._core_service.ApplyFeatureTable( ApplyFeatureTableRequest(project=project, table_spec=feature_table_proto), # type: ignore - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) # type: ApplyFeatureTableResponse except grpc.RpcError as e: @@ -722,7 +711,7 @@ def ingest( project: str = None, chunk_size: int = 10000, max_workers: int = max(CPU_COUNT - 1, 1), - timeout: int = BATCH_INGESTION_PRODUCTION_TIMEOUT, + timeout: int = int(ConfigOptions().BATCH_INGESTION_PRODUCTION_TIMEOUT), ) -> None: """ Batch load feature data into a FeatureTable. @@ -849,7 +838,7 @@ def _get_grpc_metadata(self): Returns: Tuple of metadata to attach to each gRPC call """ - if self._config.getboolean(CONFIG_ENABLE_AUTH_KEY) and self._auth_metadata: + if self._config.getboolean(ConfigOptions.ENABLE_AUTH) and self._auth_metadata: return self._auth_metadata.get_signed_meta() return () @@ -895,7 +884,7 @@ def get_online_features( entity_rows=_infer_online_entity_rows(entity_rows), project=project if project is not None else self.project, ), - timeout=self._config.getint(CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY), + timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) except grpc.RpcError as e: @@ -954,10 +943,10 @@ def get_historical_features( if output_location is None: output_location = os.path.join( - self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION), + self._config.get(ConfigOptions.HISTORICAL_FEATURE_OUTPUT_LOCATION), str(uuid.uuid4()), ) - output_format = self._config.get(CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT) + output_format = self._config.get(ConfigOptions.HISTORICAL_FEATURE_OUTPUT_FORMAT) feature_sources = [ feature_table.batch_source for feature_table in feature_tables ] @@ -978,7 +967,9 @@ def get_historical_features( else: entity_source = stage_entities_to_fs( entity_source, - staging_location=self._config.get(CONFIG_SPARK_STAGING_LOCATION), + staging_location=self._config.get( + ConfigOptions.SPARK_STAGING_LOCATION + ), ) if self._use_job_service: diff --git a/sdk/python/feast/config.py b/sdk/python/feast/config.py index 1bbab4edbc..89c94b74c4 100644 --- a/sdk/python/feast/config.py +++ b/sdk/python/feast/config.py @@ -19,14 +19,7 @@ from os.path import expanduser, join from typing import Dict, Optional -from feast.constants import ( - CONFIG_FEAST_ENV_VAR_PREFIX, - CONFIG_FILE_DEFAULT_DIRECTORY, - CONFIG_FILE_NAME, - CONFIG_FILE_SECTION, - FEAST_CONFIG_FILE_ENV_KEY, -) -from feast.constants import FEAST_DEFAULT_OPTIONS as DEFAULTS +from feast.constants import ConfigOptions _logger = logging.getLogger(__name__) @@ -50,13 +43,13 @@ def _init_config(path: str): os.makedirs(os.path.dirname(config_dir), exist_ok=True) # Create the configuration file itself - config = ConfigParser(defaults=DEFAULTS) + config = ConfigParser(defaults=ConfigOptions().defaults()) if os.path.exists(path): config.read(path) # Store all configuration in a single section - if not config.has_section(CONFIG_FILE_SECTION): - config.add_section(CONFIG_FILE_SECTION) + if not config.has_section(ConfigOptions().CONFIG_FILE_SECTION): + config.add_section(ConfigOptions().CONFIG_FILE_SECTION) # Save the current configuration config.write(open(path, "w")) @@ -72,8 +65,10 @@ def _get_feast_env_vars(): """ feast_env_vars = {} for key in os.environ.keys(): - if key.upper().startswith(CONFIG_FEAST_ENV_VAR_PREFIX): - feast_env_vars[key[len(CONFIG_FEAST_ENV_VAR_PREFIX) :]] = os.environ[key] + if key.upper().startswith(ConfigOptions().CONFIG_FEAST_ENV_VAR_PREFIX): + feast_env_vars[ + key[len(ConfigOptions().CONFIG_FEAST_ENV_VAR_PREFIX) :] + ] = os.environ[key] return feast_env_vars @@ -105,9 +100,10 @@ def __init__( path = join( expanduser("~"), os.environ.get( - FEAST_CONFIG_FILE_ENV_KEY, CONFIG_FILE_DEFAULT_DIRECTORY, + ConfigOptions().FEAST_CONFIG_FILE_ENV, + ConfigOptions().CONFIG_FILE_DEFAULT_DIRECTORY, ), - CONFIG_FILE_NAME, + ConfigOptions().CONFIG_FILE_NAME, ) config = _init_config(path) @@ -130,7 +126,7 @@ def get(self, option): """ return self._config.get( - CONFIG_FILE_SECTION, + ConfigOptions().CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}, ) @@ -146,7 +142,7 @@ def getboolean(self, option): """ return self._config.getboolean( - CONFIG_FILE_SECTION, + ConfigOptions().CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}, ) @@ -162,7 +158,7 @@ def getint(self, option): """ return self._config.getint( - CONFIG_FILE_SECTION, + ConfigOptions().CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}, ) @@ -178,7 +174,7 @@ def getfloat(self, option): """ return self._config.getfloat( - CONFIG_FILE_SECTION, + ConfigOptions().CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}, ) @@ -190,7 +186,7 @@ def set(self, option, value): option: Option name to use as key value: Value to store under option """ - self._config.set(CONFIG_FILE_SECTION, option, value=str(value)) + self._config.set(ConfigOptions().CONFIG_FILE_SECTION, option, value=str(value)) def exists(self, option): """ diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 3f0614b097..688dd22b9e 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -21,135 +21,188 @@ class AuthProvider(Enum): OAUTH = "oauth" -DATETIME_COLUMN = "datetime" - -# Environmental variable to specify Feast configuration file location -FEAST_CONFIG_FILE_ENV_KEY = "FEAST_CONFIG" - -# Default prefix to Feast environmental variables -CONFIG_FEAST_ENV_VAR_PREFIX = "FEAST_" - -# Default directory to Feast configuration file -CONFIG_FILE_DEFAULT_DIRECTORY = ".feast" - -# Default Feast configuration file name -CONFIG_FILE_NAME = "config" - -# Default section in Feast configuration file to specify options -CONFIG_FILE_SECTION = "general" - -# Feast Configuration Options -CONFIG_PROJECT_KEY = "project" -CONFIG_CORE_URL_KEY = "core_url" -CONFIG_CORE_ENABLE_SSL_KEY = "core_enable_ssl" -CONFIG_ENABLE_AUTH_KEY = "enable_auth" -CONFIG_ENABLE_AUTH_TOKEN_KEY = "auth_token" -CONFIG_CORE_SERVER_SSL_CERT_KEY = "core_server_ssl_cert" -CONFIG_JOB_CONTROLLER_SERVER_KEY = "jobcontroller_url" -CONFIG_SERVING_URL_KEY = "serving_url" -CONFIG_SERVING_ENABLE_SSL_KEY = "serving_enable_ssl" -CONFIG_SERVING_SERVER_SSL_CERT_KEY = "serving_server_ssl_cert" -CONFIG_JOB_SERVICE_URL_KEY = "job_service_url" -CONFIG_JOB_SERVICE_ENABLE_SSL_KEY = "job_service_enable_ssl" -CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY = "job_service_server_ssl_cert" -CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP = "job_service_enable_control_loop" -CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY = "grpc_connection_timeout_default" -CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY = "grpc_connection_timeout_apply" -CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY = ( - "batch_feature_request_wait_time_seconds" -) -CONFIG_OAUTH_GRANT_TYPE_KEY = "oauth_grant_type" -CONFIG_OAUTH_CLIENT_ID_KEY = "oauth_client_id" -CONFIG_OAUTH_CLIENT_SECRET_KEY = "oauth_client_secret" -CONFIG_OAUTH_AUDIENCE_KEY = "oauth_audience" -CONFIG_OAUTH_TOKEN_REQUEST_URL_KEY = "oauth_token_request_url" -CONFIG_AUTH_PROVIDER = "auth_provider" - -CONFIG_TIMEOUT_KEY = "timeout" -CONFIG_MAX_WAIT_INTERVAL_KEY = "max_wait_interval" - -# Spark Job Config -CONFIG_SPARK_LAUNCHER = "spark_launcher" # standalone, dataproc, emr - -CONFIG_SPARK_STAGING_LOCATION = "spark_staging_location" - -CONFIG_SPARK_INGESTION_JOB_JAR = "spark_ingestion_jar" - -CONFIG_SPARK_STANDALONE_MASTER = "spark_standalone_master" -CONFIG_SPARK_HOME = "spark_home" - -CONFIG_SPARK_DATAPROC_CLUSTER_NAME = "dataproc_cluster_name" -CONFIG_SPARK_DATAPROC_PROJECT = "dataproc_project" -CONFIG_SPARK_DATAPROC_REGION = "dataproc_region" - -CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT = "historical_feature_output_format" -CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_LOCATION = "historical_feature_output_location" - -CONFIG_REDIS_HOST = "redis_host" -CONFIG_REDIS_PORT = "redis_port" -CONFIG_REDIS_SSL = "redis_ssl" - -CONFIG_STATSD_ENABLED = "statsd_enabled" -CONFIG_STATSD_HOST = "statsd_host" -CONFIG_STATSD_PORT = "statsd_port" - -CONFIG_DEADLETTER_PATH = "deadletter_path" -CONFIG_STENCIL_URL = "stencil_url" - -CONFIG_SPARK_EMR_REGION = "emr_region" -CONFIG_SPARK_EMR_CLUSTER_ID = "emr_cluster_id" -CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH = "emr_cluster_template_path" -CONFIG_SPARK_EMR_LOG_LOCATION = "emr_log_location" - -# Configuration option default values -FEAST_DEFAULT_OPTIONS = { - # Default Feast project to use - CONFIG_PROJECT_KEY: "default", - # Default Feast Core URL - CONFIG_CORE_URL_KEY: "localhost:6565", - # Enable or disable TLS/SSL to Feast Core - CONFIG_CORE_ENABLE_SSL_KEY: "False", - # Enable user authentication to Feast Core - CONFIG_ENABLE_AUTH_KEY: "False", - # Path to certificate(s) to secure connection to Feast Core - CONFIG_CORE_SERVER_SSL_CERT_KEY: "", - # Default Feast Job Controller URL - CONFIG_JOB_CONTROLLER_SERVER_KEY: "localhost:6570", - # Default Feast Serving URL - CONFIG_SERVING_URL_KEY: "localhost:6565", - # Enable or disable TLS/SSL to Feast Serving - CONFIG_SERVING_ENABLE_SSL_KEY: "False", - # Path to certificate(s) to secure connection to Feast Serving - CONFIG_SERVING_SERVER_SSL_CERT_KEY: "", - # Default connection timeout to Feast Serving, Feast Core, and Feast Job Service (in seconds) - CONFIG_GRPC_CONNECTION_TIMEOUT_DEFAULT_KEY: "10", - # Default gRPC connection timeout when sending an ApplyFeatureSet command to - # Feast Core (in seconds) - CONFIG_GRPC_CONNECTION_TIMEOUT_APPLY_KEY: "600", - # Time to wait for batch feature requests before timing out. - CONFIG_BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS_KEY: "600", - CONFIG_TIMEOUT_KEY: "21600", - CONFIG_MAX_WAIT_INTERVAL_KEY: "60", - # Authentication Provider - Google OpenID/OAuth - CONFIG_AUTH_PROVIDER: "google", - CONFIG_SPARK_LAUNCHER: "dataproc", - CONFIG_SPARK_INGESTION_JOB_JAR: "https://storage.googleapis.com/feast-jobs/spark/" - "ingestion/feast-ingestion-spark-develop.jar", - CONFIG_SPARK_STANDALONE_MASTER: "local[*]", - CONFIG_REDIS_HOST: "localhost", - CONFIG_REDIS_PORT: "6379", - CONFIG_REDIS_SSL: "False", - CONFIG_SPARK_HISTORICAL_FEATURE_OUTPUT_FORMAT: "parquet", - # Enable or disable TLS/SSL to Feast Service - CONFIG_JOB_SERVICE_ENABLE_SSL_KEY: "False", - # Path to certificate(s) to secure connection to Feast Job Service - CONFIG_JOB_SERVICE_SERVER_SSL_CERT_KEY: "", - # Disable control loop by default for now - CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP: "False", - CONFIG_STATSD_ENABLED: "False", - # IngestionJob DeadLetter Destination - CONFIG_DEADLETTER_PATH: "", - # ProtoRegistry Address (currently only Stencil Server is supported as registry) - # https://github.com/gojekfarm/stencil - CONFIG_STENCIL_URL: "", -} +class Option: + def __init__(self, name, default): + self._name = name + self._default = default + + def __get__(self, instance, owner): + if instance is None: + return self._name.lower() + + return self._default + + +class ConfigMeta(type): + def __new__(cls, name, bases, attrs): + keys = [ + k + for k, v in attrs.items() + if not k.startswith("_") and isinstance(v, (str, int, float)) + ] + attrs["__config_keys__"] = keys + attrs.update({k: Option(k, attrs[k]) for k in keys}) + return super().__new__(cls, name, bases, attrs) + + +class ConfigOptions(metaclass=ConfigMeta): + """ Feast Configuration Options """ + + #: Default datetime column name for point-in-time join + DATETIME_COLUMN: str = "datetime" + + #: Environmental variable to specify Feast configuration file location + FEAST_CONFIG_FILE_ENV: str = "FEAST_CONFIG" + + #: Default prefix to Feast environmental variables + CONFIG_FEAST_ENV_VAR_PREFIX: str = "FEAST_" + + #: Default directory to Feast configuration file + CONFIG_FILE_DEFAULT_DIRECTORY: str = ".feast" + + #: Default Feast configuration file name + CONFIG_FILE_NAME: str = "config" + + #: Default section in Feast configuration file to specify options + CONFIG_FILE_SECTION: str = "general" + + #: Feast project namespace to use + PROJECT: str = "default" + + #: Default Feast Core URL + CORE_URL: str = "localhost:6565" + + #: Enable or disable TLS/SSL to Feast Core + CORE_ENABLE_SSL: str = "False" + + #: Enable user authentication to Feast Core + ENABLE_AUTH: str = "False" + + #: Auth token for user authentication to Feast Core + ENABLE_AUTH_TOKEN: str = "" + + #: Path to certificate(s) to secure connection to Feast Core + CORE_SERVER_SSL_CERT: str = "" + + #: Default Feast Serving URL + SERVING_URL: str = "localhost:6566" + + #: Enable or disable TLS/SSL to Feast Serving + SERVING_ENABLE_SSL: str = "False" + + #: Path to certificate(s) to secure connection to Feast Serving + SERVING_SERVER_SSL_CERT: str = "" + + #: Default Feast Job Service URL + JOB_SERVICE_URL: str = "" + + #: Enable or disable TLS/SSL to Feast Job Service + JOB_SERVICE_ENABLE_SSL: str = "False" + + #: Path to certificate(s) to secure connection to Feast Job Service + JOB_SERVICE_SERVER_SSL_CERT: str = "" + + #: Enable or disable control loop for Feast Job Service + JOB_SERVICE_ENABLE_CONTROL_LOOP: str = "False" + + #: Default connection timeout to Feast Serving, Feast Core, and Feast Job Service (in seconds) + GRPC_CONNECTION_TIMEOUT: str = "10" + + #: Default gRPC connection timeout when sending an ApplyFeatureSet command to Feast Core (in seconds) + GRPC_CONNECTION_TIMEOUT_APPLY: str = "600" + + #: Default timeout when running batch ingestion + BATCH_INGESTION_PRODUCTION_TIMEOUT: str = "120" + + #: Time to wait for historical feature requests before timing out. + BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS: str = "600" + + #: Authentication Provider - Google OpenID/OAuth + AUTH_PROVIDER: str = "google" + + #: Spark Job launcher + SPARK_LAUNCHER: str = "dataproc" # standalone, dataproc, emr + + #: Feast Spark Job ingestion jobs staging location + SPARK_STAGING_LOCATION: str = "" + + #: Feast Spark Job ingestion jar file + SPARK_INGESTION_JAR: str = "https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-develop.jar" + + #: Spark resource manager master url + SPARK_STANDALONE_MASTER: str = "local[*]" + + #: Directory where Spark is installed + SPARK_HOME: str = "" + + #: Dataproc cluster to run Feast Spark Jobs in + DATAPROC_CLUSTER_NAME: str = "" + + #: Project of Dataproc cluster + DATAPROC_PROJECT: str = "" + + #: Region of Dataproc cluster + DATAPROC_REGION: str = "" + + #: File format of historical retrieval features + HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet" + + #: File location of historical retrieval features + HISTORICAL_FEATURE_OUTPUT_LOCATION: str = "" + + #: Default Redis host + REDIS_HOST: str = "localhost" + + #: Default Redis port + REDIS_PORT: str = "6379" + + #: Enable or disable TLS/SSL to Redis + REDIS_SSL: str = "False" + + #: Enable or disable StatsD + STATSD_ENABLED: str = "False" + + #: Default StatsD port + STATSD_HOST: str = "" + + #: Default StatsD port + STATSD_PORT: str = "" + + #: IngestionJob DeadLetter Destination + DEADLETTER_PATH: str = "" + + #: ProtoRegistry Address (currently only Stencil Server is supported as registry) + #: https://github.com/gojekfarm/stencil + STENCIL_URL: str = "" + + #: EMR cluster to run Feast Spark Jobs in + EMR_CLUSTER_ID: str = "" + + #: Region of EMR cluster + EMR_REGION: str = "" + + #: Template path of EMR cluster + EMR_CLUSTER_TEMPLATE_PATH: str = "" + + #: Log path of EMR cluster + EMR_LOG_LOCATION: str = "" + + #: Oauth grant type + OAUTH_GRANT_TYPE: str = "" + + #: Oauth client ID + OAUTH_CLIENT_ID: str = "" + + #: Oauth client secret + OAUTH_CLIENT_SECRET: str = "" + + #: Oauth intended recipients + OAUTH_AUDIENCE: str = "" + + #: Oauth token request url + OAUTH_TOKEN_REQUEST_URL: str = "" + + MAX_WAIT_INTERVAL: str = "60" + + def defaults(self): + return {k: getattr(self, k) for k in self.__config_keys__} diff --git a/sdk/python/feast/grpc/auth.py b/sdk/python/feast/grpc/auth.py index 9680607b8e..b8f130ae96 100644 --- a/sdk/python/feast/grpc/auth.py +++ b/sdk/python/feast/grpc/auth.py @@ -18,16 +18,7 @@ from google.auth.exceptions import DefaultCredentialsError from feast.config import Config -from feast.constants import ( - CONFIG_AUTH_PROVIDER, - CONFIG_ENABLE_AUTH_TOKEN_KEY, - CONFIG_OAUTH_AUDIENCE_KEY, - CONFIG_OAUTH_CLIENT_ID_KEY, - CONFIG_OAUTH_CLIENT_SECRET_KEY, - CONFIG_OAUTH_GRANT_TYPE_KEY, - CONFIG_OAUTH_TOKEN_REQUEST_URL_KEY, - AuthProvider, -) +from feast.constants import AuthProvider, ConfigOptions def get_auth_metadata_plugin(config: Config) -> grpc.AuthMetadataPlugin: @@ -44,9 +35,9 @@ def get_auth_metadata_plugin(config: Config) -> grpc.AuthMetadataPlugin: Args: config: Feast Configuration object """ - if AuthProvider(config.get(CONFIG_AUTH_PROVIDER)) == AuthProvider.GOOGLE: + if AuthProvider(config.get(ConfigOptions.AUTH_PROVIDER)) == AuthProvider.GOOGLE: return GoogleOpenIDAuthMetadataPlugin(config) - elif AuthProvider(config.get(CONFIG_AUTH_PROVIDER)) == AuthProvider.OAUTH: + elif AuthProvider(config.get(ConfigOptions.AUTH_PROVIDER)) == AuthProvider.OAUTH: return OAuthMetadataPlugin(config) else: raise RuntimeError( @@ -75,15 +66,15 @@ def __init__(self, config: Config): self._token = None # If provided, set a static token - if config.exists(CONFIG_ENABLE_AUTH_TOKEN_KEY): - self._static_token = config.get(CONFIG_ENABLE_AUTH_TOKEN_KEY) + if config.exists(ConfigOptions.ENABLE_AUTH_TOKEN): + self._static_token = config.get(ConfigOptions.ENABLE_AUTH_TOKEN) self._refresh_token(config) elif ( - config.exists(CONFIG_OAUTH_GRANT_TYPE_KEY) - and config.exists(CONFIG_OAUTH_CLIENT_ID_KEY) - and config.exists(CONFIG_OAUTH_CLIENT_SECRET_KEY) - and config.exists(CONFIG_OAUTH_AUDIENCE_KEY) - and config.exists(CONFIG_OAUTH_TOKEN_REQUEST_URL_KEY) + config.exists(ConfigOptions.OAUTH_GRANT_TYPE) + and config.exists(ConfigOptions.OAUTH_CLIENT_ID) + and config.exists(ConfigOptions.OAUTH_CLIENT_SECRET) + and config.exists(ConfigOptions.OAUTH_AUDIENCE) + and config.exists(ConfigOptions.OAUTH_TOKEN_REQUEST_URL) ): self._refresh_token(config) else: @@ -112,14 +103,14 @@ def _refresh_token(self, config: Config): headers_token = {"content-type": "application/json"} data_token = { - "grant_type": config.get(CONFIG_OAUTH_GRANT_TYPE_KEY), - "client_id": config.get(CONFIG_OAUTH_CLIENT_ID_KEY), - "client_secret": config.get(CONFIG_OAUTH_CLIENT_SECRET_KEY), - "audience": config.get(CONFIG_OAUTH_AUDIENCE_KEY), + "grant_type": config.get(ConfigOptions.OAUTH_GRANT_TYPE), + "client_id": config.get(ConfigOptions.OAUTH_CLIENT_ID), + "client_secret": config.get(ConfigOptions.OAUTH_CLIENT_SECRET), + "audience": config.get(ConfigOptions.OAUTH_AUDIENCE), } data_token = json.dumps(data_token) response_token = requests.post( - config.get(CONFIG_OAUTH_TOKEN_REQUEST_URL_KEY), + config.get(ConfigOptions.OAUTH_TOKEN_REQUEST_URL), headers=headers_token, data=data_token, ) @@ -171,8 +162,8 @@ def __init__(self, config: Config): self._token = None # If provided, set a static token - if config.exists(CONFIG_ENABLE_AUTH_TOKEN_KEY): - self._static_token = config.get(CONFIG_ENABLE_AUTH_TOKEN_KEY) + if config.exists(ConfigOptions.ENABLE_AUTH_TOKEN): + self._static_token = config.get(ConfigOptions.ENABLE_AUTH_TOKEN) self._request = requests.Request() self._refresh_token() diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index 1daf335337..6518db9acc 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -10,7 +10,7 @@ import grpc import feast -from feast.constants import CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP +from feast.constants import ConfigOptions from feast.core import JobService_pb2_grpc from feast.core.JobService_pb2 import ( CancelJobResponse, @@ -127,7 +127,9 @@ def StartStreamToOnlineIngestionJob( request.table_name, request.project ) - if self.client._config.getboolean(CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP): + if self.client._config.getboolean( + ConfigOptions.JOB_SERVICE_ENABLE_CONTROL_LOOP + ): # If the control loop is enabled, return existing stream ingestion job id instead of starting a new one params = get_stream_to_online_ingestion_params( self.client, request.project, feature_table, [] @@ -212,7 +214,7 @@ def start_job_service() -> None: client = feast.Client() - if client._config.getboolean(CONFIG_JOB_SERVICE_ENABLE_CONTROL_LOOP): + if client._config.getboolean(ConfigOptions.JOB_SERVICE_ENABLE_CONTROL_LOOP): # Start the control loop thread only if it's enabled from configs thread = threading.Thread(target=start_control_loop, daemon=True) thread.start() diff --git a/sdk/python/feast/loaders/ingest.py b/sdk/python/feast/loaders/ingest.py index dc87d5b32e..b4dc1e4239 100644 --- a/sdk/python/feast/loaders/ingest.py +++ b/sdk/python/feast/loaders/ingest.py @@ -11,13 +11,6 @@ from feast.staging.storage_client import get_staging_client -GRPC_CONNECTION_TIMEOUT_DEFAULT = 3 # type: int -GRPC_CONNECTION_TIMEOUT_APPLY = 300 # type: int -FEAST_SERVING_URL_ENV_KEY = "FEAST_SERVING_URL" # type: str -FEAST_CORE_URL_ENV_KEY = "FEAST_CORE_URL" # type: str -BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS = 300 -BATCH_INGESTION_PRODUCTION_TIMEOUT = 120 # type: int - def _check_field_mappings( column_names: List[str], diff --git a/sdk/python/feast/pyspark/launcher.py b/sdk/python/feast/pyspark/launcher.py index c73472b770..37f935b415 100644 --- a/sdk/python/feast/pyspark/launcher.py +++ b/sdk/python/feast/pyspark/launcher.py @@ -2,28 +2,7 @@ from typing import TYPE_CHECKING, List, Union from feast.config import Config -from feast.constants import ( - CONFIG_DEADLETTER_PATH, - CONFIG_REDIS_HOST, - CONFIG_REDIS_PORT, - CONFIG_REDIS_SSL, - CONFIG_SPARK_DATAPROC_CLUSTER_NAME, - CONFIG_SPARK_DATAPROC_PROJECT, - CONFIG_SPARK_DATAPROC_REGION, - CONFIG_SPARK_EMR_CLUSTER_ID, - CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH, - CONFIG_SPARK_EMR_LOG_LOCATION, - CONFIG_SPARK_EMR_REGION, - CONFIG_SPARK_HOME, - CONFIG_SPARK_INGESTION_JOB_JAR, - CONFIG_SPARK_LAUNCHER, - CONFIG_SPARK_STAGING_LOCATION, - CONFIG_SPARK_STANDALONE_MASTER, - CONFIG_STATSD_ENABLED, - CONFIG_STATSD_HOST, - CONFIG_STATSD_PORT, - CONFIG_STENCIL_URL, -) +from feast.constants import ConfigOptions from feast.data_source import BigQuerySource, DataSource, FileSource, KafkaSource from feast.feature_table import FeatureTable from feast.pyspark.abc import ( @@ -47,7 +26,8 @@ def _standalone_launcher(config: Config) -> JobLauncher: from feast.pyspark.launchers import standalone return standalone.StandaloneClusterLauncher( - config.get(CONFIG_SPARK_STANDALONE_MASTER), config.get(CONFIG_SPARK_HOME) + config.get(ConfigOptions.SPARK_STANDALONE_MASTER), + config.get(ConfigOptions.SPARK_HOME), ) @@ -55,10 +35,10 @@ def _dataproc_launcher(config: Config) -> JobLauncher: from feast.pyspark.launchers import gcloud return gcloud.DataprocClusterLauncher( - config.get(CONFIG_SPARK_DATAPROC_CLUSTER_NAME), - config.get(CONFIG_SPARK_STAGING_LOCATION), - config.get(CONFIG_SPARK_DATAPROC_REGION), - config.get(CONFIG_SPARK_DATAPROC_PROJECT), + config.get(ConfigOptions.DATAPROC_CLUSTER_NAME), + config.get(ConfigOptions.SPARK_STAGING_LOCATION), + config.get(ConfigOptions.DATAPROC_REGION), + config.get(ConfigOptions.DATAPROC_PROJECT), ) @@ -70,11 +50,13 @@ def _get_optional(option): return config.get(option) return aws.EmrClusterLauncher( - region=config.get(CONFIG_SPARK_EMR_REGION), - existing_cluster_id=_get_optional(CONFIG_SPARK_EMR_CLUSTER_ID), - new_cluster_template_path=_get_optional(CONFIG_SPARK_EMR_CLUSTER_TEMPLATE_PATH), - staging_location=config.get(CONFIG_SPARK_STAGING_LOCATION), - emr_log_location=config.get(CONFIG_SPARK_EMR_LOG_LOCATION), + region=config.get(ConfigOptions.EMR_REGION), + existing_cluster_id=_get_optional(ConfigOptions.EMR_CLUSTER_ID), + new_cluster_template_path=_get_optional( + ConfigOptions.EMR_CLUSTER_TEMPLATE_PATH + ), + staging_location=config.get(ConfigOptions.SPARK_STAGING_LOCATION), + emr_log_location=config.get(ConfigOptions.EMR_LOG_LOCATION), ) @@ -86,7 +68,7 @@ def _get_optional(option): def resolve_launcher(config: Config) -> JobLauncher: - return _launchers[config.get(CONFIG_SPARK_LAUNCHER)](config) + return _launchers[config.get(ConfigOptions.SPARK_LAUNCHER)](config) def _source_to_argument(source: DataSource): @@ -241,24 +223,24 @@ def start_offline_to_online_ingestion( return launcher.offline_to_online_ingestion( BatchIngestionJobParameters( - jar=client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR), + jar=client._config.get(ConfigOptions.SPARK_INGESTION_JAR), source=_source_to_argument(feature_table.batch_source), feature_table=_feature_table_to_argument(client, project, feature_table), start=start, end=end, - redis_host=client._config.get(CONFIG_REDIS_HOST), - redis_port=client._config.getint(CONFIG_REDIS_PORT), - redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL), + redis_host=client._config.get(ConfigOptions.REDIS_HOST), + redis_port=client._config.getint(ConfigOptions.REDIS_PORT), + redis_ssl=client._config.getboolean(ConfigOptions.REDIS_SSL), statsd_host=( - client._config.getboolean(CONFIG_STATSD_ENABLED) - and client._config.get(CONFIG_STATSD_HOST) + client._config.getboolean(ConfigOptions.STATSD_ENABLED) + and client._config.get(ConfigOptions.STATSD_HOST) ), statsd_port=( - client._config.getboolean(CONFIG_STATSD_ENABLED) - and client._config.getint(CONFIG_STATSD_PORT) + client._config.getboolean(ConfigOptions.STATSD_ENABLED) + and client._config.getint(ConfigOptions.STATSD_PORT) ), - deadletter_path=client._config.get(CONFIG_DEADLETTER_PATH), - stencil_url=client._config.get(CONFIG_STENCIL_URL), + deadletter_path=client._config.get(ConfigOptions.DEADLETTER_PATH), + stencil_url=client._config.get(ConfigOptions.STENCIL_URL), ) ) @@ -267,19 +249,19 @@ def get_stream_to_online_ingestion_params( client: "Client", project: str, feature_table: FeatureTable, extra_jars: List[str] ) -> StreamIngestionJobParameters: return StreamIngestionJobParameters( - jar=client._config.get(CONFIG_SPARK_INGESTION_JOB_JAR), + jar=client._config.get(ConfigOptions.SPARK_INGESTION_JAR), extra_jars=extra_jars, source=_source_to_argument(feature_table.stream_source), feature_table=_feature_table_to_argument(client, project, feature_table), - redis_host=client._config.get(CONFIG_REDIS_HOST), - redis_port=client._config.getint(CONFIG_REDIS_PORT), - redis_ssl=client._config.getboolean(CONFIG_REDIS_SSL), - statsd_host=client._config.getboolean(CONFIG_STATSD_ENABLED) - and client._config.get(CONFIG_STATSD_HOST), - statsd_port=client._config.getboolean(CONFIG_STATSD_ENABLED) - and client._config.getint(CONFIG_STATSD_PORT), - deadletter_path=client._config.get(CONFIG_DEADLETTER_PATH), - stencil_url=client._config.get(CONFIG_STENCIL_URL), + redis_host=client._config.get(ConfigOptions.REDIS_HOST), + redis_port=client._config.getint(ConfigOptions.REDIS_PORT), + redis_ssl=client._config.getboolean(ConfigOptions.REDIS_SSL), + statsd_host=client._config.getboolean(ConfigOptions.STATSD_ENABLED) + and client._config.get(ConfigOptions.STATSD_HOST), + statsd_port=client._config.getboolean(ConfigOptions.STATSD_ENABLED) + and client._config.getint(ConfigOptions.STATSD_PORT), + deadletter_path=client._config.get(ConfigOptions.DEADLETTER_PATH), + stencil_url=client._config.get(ConfigOptions.STENCIL_URL), ) diff --git a/sdk/python/feast/wait.py b/sdk/python/feast/wait.py index c32897606e..83dcb7084c 100644 --- a/sdk/python/feast/wait.py +++ b/sdk/python/feast/wait.py @@ -15,15 +15,14 @@ import time from typing import Any, Callable, Optional, Tuple -from feast.constants import CONFIG_MAX_WAIT_INTERVAL_KEY -from feast.constants import FEAST_DEFAULT_OPTIONS as defaults +from feast.constants import ConfigOptions def wait_retry_backoff( retry_fn: Callable[[], Tuple[Any, bool]], timeout_secs: int = 0, timeout_msg: Optional[str] = "Timeout while waiting for retry_fn() to return True", - max_interval_secs: int = int(defaults[CONFIG_MAX_WAIT_INTERVAL_KEY]), + max_interval_secs: int = int(ConfigOptions().MAX_WAIT_INTERVAL), ) -> Any: """ Repeatedly try calling given retry_fn until it returns a True boolean success flag. From 3834444e4755ab912e30db14b8ad6e1eef63f35c Mon Sep 17 00:00:00 2001 From: Terence Date: Wed, 18 Nov 2020 15:19:46 +0800 Subject: [PATCH 02/12] Set default jobservice to none Signed-off-by: Terence --- sdk/python/feast/client.py | 5 +---- sdk/python/feast/constants.py | 3 ++- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 7c00ca8cc6..33ca5a6f8a 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -187,10 +187,7 @@ def _serving_service(self): @property def _use_job_service(self) -> bool: - return ( - self._config.exists(ConfigOptions.JOB_SERVICE_URL) - and ConfigOptions.JOB_SERVICE_URL != "" - ) + return ConfigOptions.JOB_SERVICE_URL is not None @property def _job_service(self): diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 688dd22b9e..38ae0ff5bc 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -14,6 +14,7 @@ # limitations under the License. # from enum import Enum +from typing import Optional class AuthProvider(Enum): @@ -94,7 +95,7 @@ class ConfigOptions(metaclass=ConfigMeta): SERVING_SERVER_SSL_CERT: str = "" #: Default Feast Job Service URL - JOB_SERVICE_URL: str = "" + JOB_SERVICE_URL: Optional[str] = None #: Enable or disable TLS/SSL to Feast Job Service JOB_SERVICE_ENABLE_SSL: str = "False" From fc5c87b76591d71c2062dd402efb5315b87563d4 Mon Sep 17 00:00:00 2001 From: Terence Date: Wed, 18 Nov 2020 15:54:18 +0800 Subject: [PATCH 03/12] Fix auth tests Signed-off-by: Terence --- sdk/python/feast/constants.py | 4 ++-- sdk/python/feast/grpc/auth.py | 19 +++++++++++++++---- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 38ae0ff5bc..2974481ea6 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -79,8 +79,8 @@ class ConfigOptions(metaclass=ConfigMeta): #: Enable user authentication to Feast Core ENABLE_AUTH: str = "False" - #: Auth token for user authentication to Feast Core - ENABLE_AUTH_TOKEN: str = "" + #: Auth token for user authentication to Feast + AUTH_TOKEN: str = "" #: Path to certificate(s) to secure connection to Feast Core CORE_SERVER_SSL_CERT: str = "" diff --git a/sdk/python/feast/grpc/auth.py b/sdk/python/feast/grpc/auth.py index b8f130ae96..11911d9197 100644 --- a/sdk/python/feast/grpc/auth.py +++ b/sdk/python/feast/grpc/auth.py @@ -66,8 +66,11 @@ def __init__(self, config: Config): self._token = None # If provided, set a static token - if config.exists(ConfigOptions.ENABLE_AUTH_TOKEN): - self._static_token = config.get(ConfigOptions.ENABLE_AUTH_TOKEN) + if ( + config.exists(ConfigOptions.AUTH_TOKEN) + and config.get(ConfigOptions.AUTH_TOKEN) != "" + ): + self._static_token = config.get(ConfigOptions.AUTH_TOKEN) self._refresh_token(config) elif ( config.exists(ConfigOptions.OAUTH_GRANT_TYPE) @@ -75,6 +78,11 @@ def __init__(self, config: Config): and config.exists(ConfigOptions.OAUTH_CLIENT_SECRET) and config.exists(ConfigOptions.OAUTH_AUDIENCE) and config.exists(ConfigOptions.OAUTH_TOKEN_REQUEST_URL) + and config.get(ConfigOptions.OAUTH_GRANT_TYPE) != "" + and config.get(ConfigOptions.OAUTH_CLIENT_ID) != "" + and config.get(ConfigOptions.OAUTH_CLIENT_SECRET) != "" + and config.get(ConfigOptions.OAUTH_AUDIENCE) != "" + and config.get(ConfigOptions.OAUTH_TOKEN_REQUEST_URL) != "" ): self._refresh_token(config) else: @@ -162,8 +170,11 @@ def __init__(self, config: Config): self._token = None # If provided, set a static token - if config.exists(ConfigOptions.ENABLE_AUTH_TOKEN): - self._static_token = config.get(ConfigOptions.ENABLE_AUTH_TOKEN) + if ( + config.exists(ConfigOptions.AUTH_TOKEN) + and config.get(ConfigOptions.AUTH_TOKEN) != "" + ): + self._static_token = config.get(ConfigOptions.AUTH_TOKEN) self._request = requests.Request() self._refresh_token() From 6022ddc442662848800d7a847c781d5b16552a35 Mon Sep 17 00:00:00 2001 From: Terence Date: Wed, 18 Nov 2020 18:31:28 +0800 Subject: [PATCH 04/12] Some fixes Signed-off-by: Terence --- sdk/python/docs/index.rst | 2 +- sdk/python/feast/client.py | 2 +- sdk/python/feast/constants.py | 5 +++++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/python/docs/index.rst b/sdk/python/docs/index.rst index f7b81ce62f..782ec9c83e 100644 --- a/sdk/python/docs/index.rst +++ b/sdk/python/docs/index.rst @@ -40,4 +40,4 @@ Constants .. automodule:: feast.constants :members: - :exclude-members: AuthProvider + :exclude-members: AuthProvider, ConfigMeta diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 33ca5a6f8a..8ba16a7dc4 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -187,7 +187,7 @@ def _serving_service(self): @property def _use_job_service(self) -> bool: - return ConfigOptions.JOB_SERVICE_URL is not None + return self._config.get(ConfigOptions.JOB_SERVICE_URL) is not None @property def _job_service(self): diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 2974481ea6..db14e41430 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -35,6 +35,11 @@ def __get__(self, instance, owner): class ConfigMeta(type): + """ + Class factory which customizes ConfigOptions class instantiation. + Specifically, setting its name to lowercase of capitalized variable. + """ + def __new__(cls, name, bases, attrs): keys = [ k From fe6f4331515e88519fc6025ceb25d59dbdb28e26 Mon Sep 17 00:00:00 2001 From: Terence Date: Wed, 18 Nov 2020 19:38:05 +0800 Subject: [PATCH 05/12] Address comments Signed-off-by: Terence --- sdk/python/feast/cli.py | 6 +- sdk/python/feast/client.py | 111 +++++++++++++-------------- sdk/python/feast/config.py | 31 ++++---- sdk/python/feast/constants.py | 3 +- sdk/python/feast/grpc/auth.py | 51 ++++++------ sdk/python/feast/job_service.py | 8 +- sdk/python/feast/pyspark/launcher.py | 67 ++++++++-------- sdk/python/feast/wait.py | 4 +- 8 files changed, 131 insertions(+), 150 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index e8aab55cec..43f9c581a6 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -23,7 +23,7 @@ from feast.client import Client from feast.config import Config -from feast.constants import ConfigOptions +from feast.constants import ConfigOptions as opt from feast.entity import Entity from feast.feature_table import FeatureTable from feast.job_service import start_job_service @@ -422,7 +422,7 @@ def stop_stream_to_online(feature_table: str): Stop stream to online sync job """ - spark_launcher = Config().get(ConfigOptions.SPARK_LAUNCHER) + spark_launcher = Config().get(opt.SPARK_LAUNCHER) if spark_launcher == "emr": import feast.pyspark.aws.jobs @@ -441,7 +441,7 @@ def list_jobs(): """ from tabulate import tabulate - spark_launcher = Config().get(ConfigOptions.SPARK_LAUNCHER) + spark_launcher = Config().get(opt.SPARK_LAUNCHER) if spark_launcher == "emr": import feast.pyspark.aws.jobs diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 8ba16a7dc4..9d90cea9c5 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -24,7 +24,7 @@ import pandas as pd from feast.config import Config -from feast.constants import ConfigOptions +from feast.constants import ConfigOptions as opt from feast.core.CoreService_pb2 import ( ApplyEntityRequest, ApplyEntityResponse, @@ -140,7 +140,7 @@ def __init__(self, options: Optional[Dict[str, str]] = None, **kwargs): self._auth_metadata: Optional[grpc.AuthMetadataPlugin] = None # Configure Auth Metadata Plugin if auth is enabled - if self._config.getboolean(ConfigOptions.ENABLE_AUTH): + if self._config.getboolean(opt.ENABLE_AUTH): self._auth_metadata = feast_auth.get_auth_metadata_plugin(self._config) @property @@ -152,14 +152,12 @@ def _core_service(self): """ if not self._core_service_stub: channel = create_grpc_channel( - url=self._config.get(ConfigOptions.CORE_URL), - enable_ssl=self._config.getboolean(ConfigOptions.CORE_ENABLE_SSL), - enable_auth=self._config.getboolean(ConfigOptions.ENABLE_AUTH), - ssl_server_cert_path=self._config.get( - ConfigOptions.CORE_SERVER_SSL_CERT - ), + url=self._config.get(opt.CORE_URL), + enable_ssl=self._config.getboolean(opt.CORE_ENABLE_SSL), + enable_auth=self._config.getboolean(opt.ENABLE_AUTH), + ssl_server_cert_path=self._config.get(opt.CORE_SERVER_SSL_CERT), auth_metadata_plugin=self._auth_metadata, - timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), + timeout=self._config.getint(opt.GRPC_CONNECTION_TIMEOUT), ) self._core_service_stub = CoreServiceStub(channel) return self._core_service_stub @@ -173,21 +171,22 @@ def _serving_service(self): """ if not self._serving_service_stub: channel = create_grpc_channel( - url=self._config.get(ConfigOptions.SERVING_URL), - enable_ssl=self._config.getboolean(ConfigOptions.SERVING_ENABLE_SSL), - enable_auth=self._config.getboolean(ConfigOptions.ENABLE_AUTH), - ssl_server_cert_path=self._config.get( - ConfigOptions.SERVING_SERVER_SSL_CERT - ), + url=self._config.get(opt.SERVING_URL), + enable_ssl=self._config.getboolean(opt.SERVING_ENABLE_SSL), + enable_auth=self._config.getboolean(opt.ENABLE_AUTH), + ssl_server_cert_path=self._config.get(opt.SERVING_SERVER_SSL_CERT), auth_metadata_plugin=self._auth_metadata, - timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), + timeout=self._config.getint(opt.GRPC_CONNECTION_TIMEOUT), ) self._serving_service_stub = ServingServiceStub(channel) return self._serving_service_stub @property def _use_job_service(self) -> bool: - return self._config.get(ConfigOptions.JOB_SERVICE_URL) is not None + return ( + self._config.exists(opt.JOB_SERVICE_URL) + and self._config.get(opt.JOB_SERVICE_URL) != "" + ) @property def _job_service(self): @@ -202,23 +201,19 @@ def _job_service(self): if not self._job_service_stub: channel = create_grpc_channel( - url=self._config.get(ConfigOptions.JOB_SERVICE_URL), - enable_ssl=self._config.getboolean( - ConfigOptions.JOB_SERVICE_ENABLE_SSL - ), - enable_auth=self._config.getboolean(ConfigOptions.ENABLE_AUTH), - ssl_server_cert_path=self._config.get( - ConfigOptions.JOB_SERVICE_SERVER_SSL_CERT - ), + url=self._config.get(opt.JOB_SERVICE_URL), + enable_ssl=self._config.getboolean(opt.JOB_SERVICE_ENABLE_SSL), + enable_auth=self._config.getboolean(opt.ENABLE_AUTH), + ssl_server_cert_path=self._config.get(opt.JOB_SERVICE_SERVER_SSL_CERT), auth_metadata_plugin=self._auth_metadata, - timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), + timeout=self._config.getint(opt.GRPC_CONNECTION_TIMEOUT), ) self._job_service_service_stub = JobServiceStub(channel) return self._job_service_service_stub def _extra_grpc_params(self) -> Dict[str, Any]: return dict( - timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), + timeout=self._config.getint(opt.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) @@ -230,7 +225,7 @@ def core_url(self) -> str: Returns: Feast Core URL string """ - return self._config.get(ConfigOptions.CORE_URL) + return self._config.get(opt.CORE_URL) @core_url.setter def core_url(self, value: str): @@ -240,7 +235,7 @@ def core_url(self, value: str): Args: value: Feast Core URL """ - self._config.set(ConfigOptions.CORE_URL, value) + self._config.set(opt.CORE_URL, value) @property def serving_url(self) -> str: @@ -250,7 +245,7 @@ def serving_url(self) -> str: Returns: Feast Serving URL string """ - return self._config.get(ConfigOptions.SERVING_URL) + return self._config.get(opt.SERVING_URL) @serving_url.setter def serving_url(self, value: str): @@ -260,7 +255,7 @@ def serving_url(self, value: str): Args: value: Feast Serving URL """ - self._config.set(ConfigOptions.SERVING_URL, value) + self._config.set(opt.SERVING_URL, value) @property def job_service_url(self) -> str: @@ -270,7 +265,7 @@ def job_service_url(self) -> str: Returns: Feast Job Service URL string """ - return self._config.get(ConfigOptions.JOB_SERVICE_URL) + return self._config.get(opt.JOB_SERVICE_URL) @job_service_url.setter def job_service_url(self, value: str): @@ -280,7 +275,7 @@ def job_service_url(self, value: str): Args: value: Feast Job Service URL """ - self._config.set(ConfigOptions.JOB_SERVICE_URL, value) + self._config.set(opt.JOB_SERVICE_URL, value) @property def core_secure(self) -> bool: @@ -290,7 +285,7 @@ def core_secure(self) -> bool: Returns: Whether client-side SSL/TLS is enabled """ - return self._config.getboolean(ConfigOptions.CORE_ENABLE_SSL) + return self._config.getboolean(opt.CORE_ENABLE_SSL) @core_secure.setter def core_secure(self, value: bool): @@ -300,7 +295,7 @@ def core_secure(self, value: bool): Args: value: True to enable client-side SSL/TLS """ - self._config.set(ConfigOptions.CORE_ENABLE_SSL, value) + self._config.set(opt.CORE_ENABLE_SSL, value) @property def serving_secure(self) -> bool: @@ -310,7 +305,7 @@ def serving_secure(self) -> bool: Returns: Whether client-side SSL/TLS is enabled """ - return self._config.getboolean(ConfigOptions.SERVING_ENABLE_SSL) + return self._config.getboolean(opt.SERVING_ENABLE_SSL) @serving_secure.setter def serving_secure(self, value: bool): @@ -320,7 +315,7 @@ def serving_secure(self, value: bool): Args: value: True to enable client-side SSL/TLS """ - self._config.set(ConfigOptions.SERVING_ENABLE_SSL, value) + self._config.set(opt.SERVING_ENABLE_SSL, value) @property def job_service_secure(self) -> bool: @@ -330,7 +325,7 @@ def job_service_secure(self) -> bool: Returns: Whether client-side SSL/TLS is enabled """ - return self._config.getboolean(ConfigOptions.JOB_SERVICE_ENABLE_SSL) + return self._config.getboolean(opt.JOB_SERVICE_ENABLE_SSL) @job_service_secure.setter def job_service_secure(self, value: bool): @@ -340,7 +335,7 @@ def job_service_secure(self, value: bool): Args: value: True to enable client-side SSL/TLS """ - self._config.set(ConfigOptions.JOB_SERVICE_ENABLE_SSL, value) + self._config.set(opt.JOB_SERVICE_ENABLE_SSL, value) def version(self): """ @@ -357,7 +352,7 @@ def version(self): if self.serving_url: serving_version = self._serving_service.GetFeastServingInfo( GetFeastServingInfoRequest(), - timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), + timeout=self._config.getint(opt.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ).version result["serving"] = {"url": self.serving_url, "version": serving_version} @@ -365,7 +360,7 @@ def version(self): if self.core_url: core_version = self._core_service.GetFeastCoreVersion( GetFeastCoreVersionRequest(), - timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), + timeout=self._config.getint(opt.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ).version result["core"] = {"url": self.core_url, "version": core_version} @@ -380,9 +375,9 @@ def project(self) -> str: Returns: Project name """ - if not self._config.get(ConfigOptions.PROJECT): + if not self._config.get(opt.PROJECT): raise ValueError("No project has been configured.") - return self._config.get(ConfigOptions.PROJECT) + return self._config.get(opt.PROJECT) def set_project(self, project: Optional[str] = None): """ @@ -392,8 +387,8 @@ def set_project(self, project: Optional[str] = None): project: Project to set as active. If unset, will reset to the default project. """ if project is None: - project = ConfigOptions().PROJECT - self._config.set(ConfigOptions.PROJECT, project) + project = opt().PROJECT + self._config.set(opt.PROJECT, project) def list_projects(self) -> List[str]: """ @@ -406,7 +401,7 @@ def list_projects(self) -> List[str]: response = self._core_service.ListProjects( ListProjectsRequest(), - timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), + timeout=self._config.getint(opt.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) # type: ListProjectsResponse return list(response.projects) @@ -421,7 +416,7 @@ def create_project(self, project: str): self._core_service.CreateProject( CreateProjectRequest(name=project), - timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), + timeout=self._config.getint(opt.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) # type: CreateProjectResponse @@ -438,7 +433,7 @@ def archive_project(self, project): try: self._core_service_stub.ArchiveProject( ArchiveProjectRequest(name=project), - timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), + timeout=self._config.getint(opt.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) # type: ArchiveProjectResponse except grpc.RpcError as e: @@ -446,7 +441,7 @@ def archive_project(self, project): # revert to the default project if self._project == project: - self._project = ConfigOptions().PROJECT + self._project = opt().PROJECT def apply_entity(self, entities: Union[List[Entity], Entity], project: str = None): """ @@ -499,7 +494,7 @@ def _apply_entity(self, project: str, entity: Entity): try: apply_entity_response = self._core_service.ApplyEntity( ApplyEntityRequest(project=project, spec=entity_proto), # type: ignore - timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), + timeout=self._config.getint(opt.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) # type: ApplyEntityResponse except grpc.RpcError as e: @@ -611,7 +606,7 @@ def _apply_feature_table(self, project: str, feature_table: FeatureTable): try: apply_feature_table_response = self._core_service.ApplyFeatureTable( ApplyFeatureTableRequest(project=project, table_spec=feature_table_proto), # type: ignore - timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), + timeout=self._config.getint(opt.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) # type: ApplyFeatureTableResponse except grpc.RpcError as e: @@ -708,7 +703,7 @@ def ingest( project: str = None, chunk_size: int = 10000, max_workers: int = max(CPU_COUNT - 1, 1), - timeout: int = int(ConfigOptions().BATCH_INGESTION_PRODUCTION_TIMEOUT), + timeout: int = int(opt().BATCH_INGESTION_PRODUCTION_TIMEOUT), ) -> None: """ Batch load feature data into a FeatureTable. @@ -835,7 +830,7 @@ def _get_grpc_metadata(self): Returns: Tuple of metadata to attach to each gRPC call """ - if self._config.getboolean(ConfigOptions.ENABLE_AUTH) and self._auth_metadata: + if self._config.getboolean(opt.ENABLE_AUTH) and self._auth_metadata: return self._auth_metadata.get_signed_meta() return () @@ -881,7 +876,7 @@ def get_online_features( entity_rows=_infer_online_entity_rows(entity_rows), project=project if project is not None else self.project, ), - timeout=self._config.getint(ConfigOptions.GRPC_CONNECTION_TIMEOUT), + timeout=self._config.getint(opt.GRPC_CONNECTION_TIMEOUT), metadata=self._get_grpc_metadata(), ) except grpc.RpcError as e: @@ -940,10 +935,10 @@ def get_historical_features( if output_location is None: output_location = os.path.join( - self._config.get(ConfigOptions.HISTORICAL_FEATURE_OUTPUT_LOCATION), + self._config.get(opt.HISTORICAL_FEATURE_OUTPUT_LOCATION), str(uuid.uuid4()), ) - output_format = self._config.get(ConfigOptions.HISTORICAL_FEATURE_OUTPUT_FORMAT) + output_format = self._config.get(opt.HISTORICAL_FEATURE_OUTPUT_FORMAT) feature_sources = [ feature_table.batch_source for feature_table in feature_tables ] @@ -964,9 +959,7 @@ def get_historical_features( else: entity_source = stage_entities_to_fs( entity_source, - staging_location=self._config.get( - ConfigOptions.SPARK_STAGING_LOCATION - ), + staging_location=self._config.get(opt.SPARK_STAGING_LOCATION), ) if self._use_job_service: diff --git a/sdk/python/feast/config.py b/sdk/python/feast/config.py index 89c94b74c4..edcdd6cd66 100644 --- a/sdk/python/feast/config.py +++ b/sdk/python/feast/config.py @@ -19,7 +19,7 @@ from os.path import expanduser, join from typing import Dict, Optional -from feast.constants import ConfigOptions +from feast.constants import ConfigOptions as opt _logger = logging.getLogger(__name__) @@ -43,13 +43,13 @@ def _init_config(path: str): os.makedirs(os.path.dirname(config_dir), exist_ok=True) # Create the configuration file itself - config = ConfigParser(defaults=ConfigOptions().defaults()) + config = ConfigParser(defaults=opt().defaults()) if os.path.exists(path): config.read(path) # Store all configuration in a single section - if not config.has_section(ConfigOptions().CONFIG_FILE_SECTION): - config.add_section(ConfigOptions().CONFIG_FILE_SECTION) + if not config.has_section(opt().CONFIG_FILE_SECTION): + config.add_section(opt().CONFIG_FILE_SECTION) # Save the current configuration config.write(open(path, "w")) @@ -65,10 +65,10 @@ def _get_feast_env_vars(): """ feast_env_vars = {} for key in os.environ.keys(): - if key.upper().startswith(ConfigOptions().CONFIG_FEAST_ENV_VAR_PREFIX): - feast_env_vars[ - key[len(ConfigOptions().CONFIG_FEAST_ENV_VAR_PREFIX) :] - ] = os.environ[key] + if key.upper().startswith(opt().CONFIG_FEAST_ENV_VAR_PREFIX): + feast_env_vars[key[len(opt().CONFIG_FEAST_ENV_VAR_PREFIX) :]] = os.environ[ + key + ] return feast_env_vars @@ -100,10 +100,9 @@ def __init__( path = join( expanduser("~"), os.environ.get( - ConfigOptions().FEAST_CONFIG_FILE_ENV, - ConfigOptions().CONFIG_FILE_DEFAULT_DIRECTORY, + opt().FEAST_CONFIG_FILE_ENV, opt().CONFIG_FILE_DEFAULT_DIRECTORY, ), - ConfigOptions().CONFIG_FILE_NAME, + opt().CONFIG_FILE_NAME, ) config = _init_config(path) @@ -126,7 +125,7 @@ def get(self, option): """ return self._config.get( - ConfigOptions().CONFIG_FILE_SECTION, + opt().CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}, ) @@ -142,7 +141,7 @@ def getboolean(self, option): """ return self._config.getboolean( - ConfigOptions().CONFIG_FILE_SECTION, + opt().CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}, ) @@ -158,7 +157,7 @@ def getint(self, option): """ return self._config.getint( - ConfigOptions().CONFIG_FILE_SECTION, + opt().CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}, ) @@ -174,7 +173,7 @@ def getfloat(self, option): """ return self._config.getfloat( - ConfigOptions().CONFIG_FILE_SECTION, + opt().CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}, ) @@ -186,7 +185,7 @@ def set(self, option, value): option: Option name to use as key value: Value to store under option """ - self._config.set(ConfigOptions().CONFIG_FILE_SECTION, option, value=str(value)) + self._config.set(opt().CONFIG_FILE_SECTION, option, value=str(value)) def exists(self, option): """ diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index db14e41430..4c0931aef9 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -14,7 +14,6 @@ # limitations under the License. # from enum import Enum -from typing import Optional class AuthProvider(Enum): @@ -100,7 +99,7 @@ class ConfigOptions(metaclass=ConfigMeta): SERVING_SERVER_SSL_CERT: str = "" #: Default Feast Job Service URL - JOB_SERVICE_URL: Optional[str] = None + JOB_SERVICE_URL: str = "" #: Enable or disable TLS/SSL to Feast Job Service JOB_SERVICE_ENABLE_SSL: str = "False" diff --git a/sdk/python/feast/grpc/auth.py b/sdk/python/feast/grpc/auth.py index 11911d9197..6a14b8ae89 100644 --- a/sdk/python/feast/grpc/auth.py +++ b/sdk/python/feast/grpc/auth.py @@ -18,7 +18,8 @@ from google.auth.exceptions import DefaultCredentialsError from feast.config import Config -from feast.constants import AuthProvider, ConfigOptions +from feast.constants import AuthProvider +from feast.constants import ConfigOptions as opt def get_auth_metadata_plugin(config: Config) -> grpc.AuthMetadataPlugin: @@ -35,9 +36,9 @@ def get_auth_metadata_plugin(config: Config) -> grpc.AuthMetadataPlugin: Args: config: Feast Configuration object """ - if AuthProvider(config.get(ConfigOptions.AUTH_PROVIDER)) == AuthProvider.GOOGLE: + if AuthProvider(config.get(opt.AUTH_PROVIDER)) == AuthProvider.GOOGLE: return GoogleOpenIDAuthMetadataPlugin(config) - elif AuthProvider(config.get(ConfigOptions.AUTH_PROVIDER)) == AuthProvider.OAUTH: + elif AuthProvider(config.get(opt.AUTH_PROVIDER)) == AuthProvider.OAUTH: return OAuthMetadataPlugin(config) else: raise RuntimeError( @@ -66,23 +67,20 @@ def __init__(self, config: Config): self._token = None # If provided, set a static token - if ( - config.exists(ConfigOptions.AUTH_TOKEN) - and config.get(ConfigOptions.AUTH_TOKEN) != "" - ): - self._static_token = config.get(ConfigOptions.AUTH_TOKEN) + if config.exists(opt.AUTH_TOKEN) and config.get(opt.AUTH_TOKEN) != "": + self._static_token = config.get(opt.AUTH_TOKEN) self._refresh_token(config) elif ( - config.exists(ConfigOptions.OAUTH_GRANT_TYPE) - and config.exists(ConfigOptions.OAUTH_CLIENT_ID) - and config.exists(ConfigOptions.OAUTH_CLIENT_SECRET) - and config.exists(ConfigOptions.OAUTH_AUDIENCE) - and config.exists(ConfigOptions.OAUTH_TOKEN_REQUEST_URL) - and config.get(ConfigOptions.OAUTH_GRANT_TYPE) != "" - and config.get(ConfigOptions.OAUTH_CLIENT_ID) != "" - and config.get(ConfigOptions.OAUTH_CLIENT_SECRET) != "" - and config.get(ConfigOptions.OAUTH_AUDIENCE) != "" - and config.get(ConfigOptions.OAUTH_TOKEN_REQUEST_URL) != "" + config.exists(opt.OAUTH_GRANT_TYPE) + and config.exists(opt.OAUTH_CLIENT_ID) + and config.exists(opt.OAUTH_CLIENT_SECRET) + and config.exists(opt.OAUTH_AUDIENCE) + and config.exists(opt.OAUTH_TOKEN_REQUEST_URL) + and config.get(opt.OAUTH_GRANT_TYPE) != "" + and config.get(opt.OAUTH_CLIENT_ID) != "" + and config.get(opt.OAUTH_CLIENT_SECRET) != "" + and config.get(opt.OAUTH_AUDIENCE) != "" + and config.get(opt.OAUTH_TOKEN_REQUEST_URL) != "" ): self._refresh_token(config) else: @@ -111,14 +109,14 @@ def _refresh_token(self, config: Config): headers_token = {"content-type": "application/json"} data_token = { - "grant_type": config.get(ConfigOptions.OAUTH_GRANT_TYPE), - "client_id": config.get(ConfigOptions.OAUTH_CLIENT_ID), - "client_secret": config.get(ConfigOptions.OAUTH_CLIENT_SECRET), - "audience": config.get(ConfigOptions.OAUTH_AUDIENCE), + "grant_type": config.get(opt.OAUTH_GRANT_TYPE), + "client_id": config.get(opt.OAUTH_CLIENT_ID), + "client_secret": config.get(opt.OAUTH_CLIENT_SECRET), + "audience": config.get(opt.OAUTH_AUDIENCE), } data_token = json.dumps(data_token) response_token = requests.post( - config.get(ConfigOptions.OAUTH_TOKEN_REQUEST_URL), + config.get(opt.OAUTH_TOKEN_REQUEST_URL), headers=headers_token, data=data_token, ) @@ -170,11 +168,8 @@ def __init__(self, config: Config): self._token = None # If provided, set a static token - if ( - config.exists(ConfigOptions.AUTH_TOKEN) - and config.get(ConfigOptions.AUTH_TOKEN) != "" - ): - self._static_token = config.get(ConfigOptions.AUTH_TOKEN) + if config.exists(opt.AUTH_TOKEN) and config.get(opt.AUTH_TOKEN) != "": + self._static_token = config.get(opt.AUTH_TOKEN) self._request = requests.Request() self._refresh_token() diff --git a/sdk/python/feast/job_service.py b/sdk/python/feast/job_service.py index 6518db9acc..6742ff1493 100644 --- a/sdk/python/feast/job_service.py +++ b/sdk/python/feast/job_service.py @@ -10,7 +10,7 @@ import grpc import feast -from feast.constants import ConfigOptions +from feast.constants import ConfigOptions as opt from feast.core import JobService_pb2_grpc from feast.core.JobService_pb2 import ( CancelJobResponse, @@ -127,9 +127,7 @@ def StartStreamToOnlineIngestionJob( request.table_name, request.project ) - if self.client._config.getboolean( - ConfigOptions.JOB_SERVICE_ENABLE_CONTROL_LOOP - ): + if self.client._config.getboolean(opt.JOB_SERVICE_ENABLE_CONTROL_LOOP): # If the control loop is enabled, return existing stream ingestion job id instead of starting a new one params = get_stream_to_online_ingestion_params( self.client, request.project, feature_table, [] @@ -214,7 +212,7 @@ def start_job_service() -> None: client = feast.Client() - if client._config.getboolean(ConfigOptions.JOB_SERVICE_ENABLE_CONTROL_LOOP): + if client._config.getboolean(opt.JOB_SERVICE_ENABLE_CONTROL_LOOP): # Start the control loop thread only if it's enabled from configs thread = threading.Thread(target=start_control_loop, daemon=True) thread.start() diff --git a/sdk/python/feast/pyspark/launcher.py b/sdk/python/feast/pyspark/launcher.py index 37f935b415..4e783e90bc 100644 --- a/sdk/python/feast/pyspark/launcher.py +++ b/sdk/python/feast/pyspark/launcher.py @@ -2,7 +2,7 @@ from typing import TYPE_CHECKING, List, Union from feast.config import Config -from feast.constants import ConfigOptions +from feast.constants import ConfigOptions as opt from feast.data_source import BigQuerySource, DataSource, FileSource, KafkaSource from feast.feature_table import FeatureTable from feast.pyspark.abc import ( @@ -26,8 +26,7 @@ def _standalone_launcher(config: Config) -> JobLauncher: from feast.pyspark.launchers import standalone return standalone.StandaloneClusterLauncher( - config.get(ConfigOptions.SPARK_STANDALONE_MASTER), - config.get(ConfigOptions.SPARK_HOME), + config.get(opt.SPARK_STANDALONE_MASTER), config.get(opt.SPARK_HOME), ) @@ -35,10 +34,10 @@ def _dataproc_launcher(config: Config) -> JobLauncher: from feast.pyspark.launchers import gcloud return gcloud.DataprocClusterLauncher( - config.get(ConfigOptions.DATAPROC_CLUSTER_NAME), - config.get(ConfigOptions.SPARK_STAGING_LOCATION), - config.get(ConfigOptions.DATAPROC_REGION), - config.get(ConfigOptions.DATAPROC_PROJECT), + config.get(opt.DATAPROC_CLUSTER_NAME), + config.get(opt.SPARK_STAGING_LOCATION), + config.get(opt.DATAPROC_REGION), + config.get(opt.DATAPROC_PROJECT), ) @@ -50,13 +49,11 @@ def _get_optional(option): return config.get(option) return aws.EmrClusterLauncher( - region=config.get(ConfigOptions.EMR_REGION), - existing_cluster_id=_get_optional(ConfigOptions.EMR_CLUSTER_ID), - new_cluster_template_path=_get_optional( - ConfigOptions.EMR_CLUSTER_TEMPLATE_PATH - ), - staging_location=config.get(ConfigOptions.SPARK_STAGING_LOCATION), - emr_log_location=config.get(ConfigOptions.EMR_LOG_LOCATION), + region=config.get(opt.EMR_REGION), + existing_cluster_id=_get_optional(opt.EMR_CLUSTER_ID), + new_cluster_template_path=_get_optional(opt.EMR_CLUSTER_TEMPLATE_PATH), + staging_location=config.get(opt.SPARK_STAGING_LOCATION), + emr_log_location=config.get(opt.EMR_LOG_LOCATION), ) @@ -68,7 +65,7 @@ def _get_optional(option): def resolve_launcher(config: Config) -> JobLauncher: - return _launchers[config.get(ConfigOptions.SPARK_LAUNCHER)](config) + return _launchers[config.get(opt.SPARK_LAUNCHER)](config) def _source_to_argument(source: DataSource): @@ -223,24 +220,24 @@ def start_offline_to_online_ingestion( return launcher.offline_to_online_ingestion( BatchIngestionJobParameters( - jar=client._config.get(ConfigOptions.SPARK_INGESTION_JAR), + jar=client._config.get(opt.SPARK_INGESTION_JAR), source=_source_to_argument(feature_table.batch_source), feature_table=_feature_table_to_argument(client, project, feature_table), start=start, end=end, - redis_host=client._config.get(ConfigOptions.REDIS_HOST), - redis_port=client._config.getint(ConfigOptions.REDIS_PORT), - redis_ssl=client._config.getboolean(ConfigOptions.REDIS_SSL), + redis_host=client._config.get(opt.REDIS_HOST), + redis_port=client._config.getint(opt.REDIS_PORT), + redis_ssl=client._config.getboolean(opt.REDIS_SSL), statsd_host=( - client._config.getboolean(ConfigOptions.STATSD_ENABLED) - and client._config.get(ConfigOptions.STATSD_HOST) + client._config.getboolean(opt.STATSD_ENABLED) + and client._config.get(opt.STATSD_HOST) ), statsd_port=( - client._config.getboolean(ConfigOptions.STATSD_ENABLED) - and client._config.getint(ConfigOptions.STATSD_PORT) + client._config.getboolean(opt.STATSD_ENABLED) + and client._config.getint(opt.STATSD_PORT) ), - deadletter_path=client._config.get(ConfigOptions.DEADLETTER_PATH), - stencil_url=client._config.get(ConfigOptions.STENCIL_URL), + deadletter_path=client._config.get(opt.DEADLETTER_PATH), + stencil_url=client._config.get(opt.STENCIL_URL), ) ) @@ -249,19 +246,19 @@ def get_stream_to_online_ingestion_params( client: "Client", project: str, feature_table: FeatureTable, extra_jars: List[str] ) -> StreamIngestionJobParameters: return StreamIngestionJobParameters( - jar=client._config.get(ConfigOptions.SPARK_INGESTION_JAR), + jar=client._config.get(opt.SPARK_INGESTION_JAR), extra_jars=extra_jars, source=_source_to_argument(feature_table.stream_source), feature_table=_feature_table_to_argument(client, project, feature_table), - redis_host=client._config.get(ConfigOptions.REDIS_HOST), - redis_port=client._config.getint(ConfigOptions.REDIS_PORT), - redis_ssl=client._config.getboolean(ConfigOptions.REDIS_SSL), - statsd_host=client._config.getboolean(ConfigOptions.STATSD_ENABLED) - and client._config.get(ConfigOptions.STATSD_HOST), - statsd_port=client._config.getboolean(ConfigOptions.STATSD_ENABLED) - and client._config.getint(ConfigOptions.STATSD_PORT), - deadletter_path=client._config.get(ConfigOptions.DEADLETTER_PATH), - stencil_url=client._config.get(ConfigOptions.STENCIL_URL), + redis_host=client._config.get(opt.REDIS_HOST), + redis_port=client._config.getint(opt.REDIS_PORT), + redis_ssl=client._config.getboolean(opt.REDIS_SSL), + statsd_host=client._config.getboolean(opt.STATSD_ENABLED) + and client._config.get(opt.STATSD_HOST), + statsd_port=client._config.getboolean(opt.STATSD_ENABLED) + and client._config.getint(opt.STATSD_PORT), + deadletter_path=client._config.get(opt.DEADLETTER_PATH), + stencil_url=client._config.get(opt.STENCIL_URL), ) diff --git a/sdk/python/feast/wait.py b/sdk/python/feast/wait.py index 83dcb7084c..4eee21c0d3 100644 --- a/sdk/python/feast/wait.py +++ b/sdk/python/feast/wait.py @@ -15,14 +15,14 @@ import time from typing import Any, Callable, Optional, Tuple -from feast.constants import ConfigOptions +from feast.constants import ConfigOptions as opt def wait_retry_backoff( retry_fn: Callable[[], Tuple[Any, bool]], timeout_secs: int = 0, timeout_msg: Optional[str] = "Timeout while waiting for retry_fn() to return True", - max_interval_secs: int = int(ConfigOptions().MAX_WAIT_INTERVAL), + max_interval_secs: int = int(opt().MAX_WAIT_INTERVAL), ) -> Any: """ Repeatedly try calling given retry_fn until it returns a True boolean success flag. From d310f171bbb0132b8136e9b828d1fec3b941fb2b Mon Sep 17 00:00:00 2001 From: Terence Date: Wed, 18 Nov 2020 19:53:48 +0800 Subject: [PATCH 06/12] Address comments Signed-off-by: Terence --- sdk/python/feast/config.py | 33 ++++++++++++++++++--------------- sdk/python/feast/constants.py | 29 +++++++++++++++-------------- 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/sdk/python/feast/config.py b/sdk/python/feast/config.py index edcdd6cd66..a3e3f1bd0b 100644 --- a/sdk/python/feast/config.py +++ b/sdk/python/feast/config.py @@ -19,6 +19,13 @@ from os.path import expanduser, join from typing import Dict, Optional +from feast.constants import ( + CONFIG_FEAST_ENV_VAR_PREFIX, + CONFIG_FILE_DEFAULT_DIRECTORY, + CONFIG_FILE_NAME, + CONFIG_FILE_SECTION, + FEAST_CONFIG_FILE_ENV, +) from feast.constants import ConfigOptions as opt _logger = logging.getLogger(__name__) @@ -48,8 +55,8 @@ def _init_config(path: str): config.read(path) # Store all configuration in a single section - if not config.has_section(opt().CONFIG_FILE_SECTION): - config.add_section(opt().CONFIG_FILE_SECTION) + if not config.has_section(CONFIG_FILE_SECTION): + config.add_section(CONFIG_FILE_SECTION) # Save the current configuration config.write(open(path, "w")) @@ -65,10 +72,8 @@ def _get_feast_env_vars(): """ feast_env_vars = {} for key in os.environ.keys(): - if key.upper().startswith(opt().CONFIG_FEAST_ENV_VAR_PREFIX): - feast_env_vars[key[len(opt().CONFIG_FEAST_ENV_VAR_PREFIX) :]] = os.environ[ - key - ] + if key.upper().startswith(CONFIG_FEAST_ENV_VAR_PREFIX): + feast_env_vars[key[len(CONFIG_FEAST_ENV_VAR_PREFIX) :]] = os.environ[key] return feast_env_vars @@ -99,10 +104,8 @@ def __init__( if not path: path = join( expanduser("~"), - os.environ.get( - opt().FEAST_CONFIG_FILE_ENV, opt().CONFIG_FILE_DEFAULT_DIRECTORY, - ), - opt().CONFIG_FILE_NAME, + os.environ.get(FEAST_CONFIG_FILE_ENV, CONFIG_FILE_DEFAULT_DIRECTORY,), + CONFIG_FILE_NAME, ) config = _init_config(path) @@ -125,7 +128,7 @@ def get(self, option): """ return self._config.get( - opt().CONFIG_FILE_SECTION, + CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}, ) @@ -141,7 +144,7 @@ def getboolean(self, option): """ return self._config.getboolean( - opt().CONFIG_FILE_SECTION, + CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}, ) @@ -157,7 +160,7 @@ def getint(self, option): """ return self._config.getint( - opt().CONFIG_FILE_SECTION, + CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}, ) @@ -173,7 +176,7 @@ def getfloat(self, option): """ return self._config.getfloat( - opt().CONFIG_FILE_SECTION, + CONFIG_FILE_SECTION, option, vars={**_get_feast_env_vars(), **self._options}, ) @@ -185,7 +188,7 @@ def set(self, option, value): option: Option name to use as key value: Value to store under option """ - self._config.set(opt().CONFIG_FILE_SECTION, option, value=str(value)) + self._config.set(CONFIG_FILE_SECTION, option, value=str(value)) def exists(self, option): """ diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 4c0931aef9..aeb0d5d3dc 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -50,26 +50,27 @@ def __new__(cls, name, bases, attrs): return super().__new__(cls, name, bases, attrs) -class ConfigOptions(metaclass=ConfigMeta): - """ Feast Configuration Options """ +#: Default datetime column name for point-in-time join +DATETIME_COLUMN: str = "datetime" + +#: Environmental variable to specify Feast configuration file location +FEAST_CONFIG_FILE_ENV: str = "FEAST_CONFIG" - #: Default datetime column name for point-in-time join - DATETIME_COLUMN: str = "datetime" +#: Default prefix to Feast environmental variables +CONFIG_FEAST_ENV_VAR_PREFIX: str = "FEAST_" - #: Environmental variable to specify Feast configuration file location - FEAST_CONFIG_FILE_ENV: str = "FEAST_CONFIG" +#: Default directory to Feast configuration file +CONFIG_FILE_DEFAULT_DIRECTORY: str = ".feast" - #: Default prefix to Feast environmental variables - CONFIG_FEAST_ENV_VAR_PREFIX: str = "FEAST_" +#: Default Feast configuration file name +CONFIG_FILE_NAME: str = "config" - #: Default directory to Feast configuration file - CONFIG_FILE_DEFAULT_DIRECTORY: str = ".feast" +#: Default section in Feast configuration file to specify options +CONFIG_FILE_SECTION: str = "general" - #: Default Feast configuration file name - CONFIG_FILE_NAME: str = "config" - #: Default section in Feast configuration file to specify options - CONFIG_FILE_SECTION: str = "general" +class ConfigOptions(metaclass=ConfigMeta): + """ Feast Configuration Options """ #: Feast project namespace to use PROJECT: str = "default" From 469f38ce12878a2026562f42a8df3a3820c73674 Mon Sep 17 00:00:00 2001 From: Terence Date: Wed, 18 Nov 2020 21:39:47 +0800 Subject: [PATCH 07/12] Test none default Signed-off-by: Terence --- sdk/python/feast/constants.py | 20 +++++++++++++------- sdk/python/feast/grpc/auth.py | 9 ++------- sdk/python/tests/grpc/test_auth.py | 3 ++- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index aeb0d5d3dc..710048b04c 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -14,6 +14,7 @@ # limitations under the License. # from enum import Enum +from typing import Optional class AuthProvider(Enum): @@ -40,10 +41,11 @@ class ConfigMeta(type): """ def __new__(cls, name, bases, attrs): + NoneType = type(None) keys = [ k for k, v in attrs.items() - if not k.startswith("_") and isinstance(v, (str, int, float)) + if not k.startswith("_") and isinstance(v, (str, int, float, NoneType)) ] attrs["__config_keys__"] = keys attrs.update({k: Option(k, attrs[k]) for k in keys}) @@ -194,21 +196,25 @@ class ConfigOptions(metaclass=ConfigMeta): EMR_LOG_LOCATION: str = "" #: Oauth grant type - OAUTH_GRANT_TYPE: str = "" + OAUTH_GRANT_TYPE: Optional[str] = None #: Oauth client ID - OAUTH_CLIENT_ID: str = "" + OAUTH_CLIENT_ID: Optional[str] = None #: Oauth client secret - OAUTH_CLIENT_SECRET: str = "" + OAUTH_CLIENT_SECRET: Optional[str] = None #: Oauth intended recipients - OAUTH_AUDIENCE: str = "" + OAUTH_AUDIENCE: Optional[str] = None #: Oauth token request url - OAUTH_TOKEN_REQUEST_URL: str = "" + OAUTH_TOKEN_REQUEST_URL: Optional[str] = None MAX_WAIT_INTERVAL: str = "60" def defaults(self): - return {k: getattr(self, k) for k in self.__config_keys__} + return { + k: getattr(self, k) + for k in self.__config_keys__ + if getattr(self, k) is not None + } diff --git a/sdk/python/feast/grpc/auth.py b/sdk/python/feast/grpc/auth.py index 6a14b8ae89..8614015f45 100644 --- a/sdk/python/feast/grpc/auth.py +++ b/sdk/python/feast/grpc/auth.py @@ -67,7 +67,7 @@ def __init__(self, config: Config): self._token = None # If provided, set a static token - if config.exists(opt.AUTH_TOKEN) and config.get(opt.AUTH_TOKEN) != "": + if config.exists(opt.AUTH_TOKEN): self._static_token = config.get(opt.AUTH_TOKEN) self._refresh_token(config) elif ( @@ -76,11 +76,6 @@ def __init__(self, config: Config): and config.exists(opt.OAUTH_CLIENT_SECRET) and config.exists(opt.OAUTH_AUDIENCE) and config.exists(opt.OAUTH_TOKEN_REQUEST_URL) - and config.get(opt.OAUTH_GRANT_TYPE) != "" - and config.get(opt.OAUTH_CLIENT_ID) != "" - and config.get(opt.OAUTH_CLIENT_SECRET) != "" - and config.get(opt.OAUTH_AUDIENCE) != "" - and config.get(opt.OAUTH_TOKEN_REQUEST_URL) != "" ): self._refresh_token(config) else: @@ -168,7 +163,7 @@ def __init__(self, config: Config): self._token = None # If provided, set a static token - if config.exists(opt.AUTH_TOKEN) and config.get(opt.AUTH_TOKEN) != "": + if config.exists(opt.AUTH_TOKEN): self._static_token = config.get(opt.AUTH_TOKEN) self._request = requests.Request() diff --git a/sdk/python/tests/grpc/test_auth.py b/sdk/python/tests/grpc/test_auth.py index 7f023aabcf..f5b9204f82 100644 --- a/sdk/python/tests/grpc/test_auth.py +++ b/sdk/python/tests/grpc/test_auth.py @@ -14,6 +14,7 @@ # limitations under the License. import json +from configparser import NoOptionError from http import HTTPStatus from unittest.mock import call, patch @@ -141,7 +142,7 @@ def test_get_auth_metadata_plugin_oauth_should_raise_when_response_is_not_200( def test_get_auth_metadata_plugin_oauth_should_raise_when_config_is_incorrect( config_with_missing_variable, ): - with raises(RuntimeError): + with raises(NoOptionError): get_auth_metadata_plugin(config_with_missing_variable) From 36bc636753792e8840a19e0f6eed88eb8c954e98 Mon Sep 17 00:00:00 2001 From: Terence Date: Wed, 18 Nov 2020 21:59:29 +0800 Subject: [PATCH 08/12] Set default as none Signed-off-by: Terence --- sdk/python/feast/client.py | 5 +---- sdk/python/feast/constants.py | 28 ++++++++++++++-------------- sdk/python/tests/grpc/test_auth.py | 2 +- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index 9d90cea9c5..231230a155 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -183,10 +183,7 @@ def _serving_service(self): @property def _use_job_service(self) -> bool: - return ( - self._config.exists(opt.JOB_SERVICE_URL) - and self._config.get(opt.JOB_SERVICE_URL) != "" - ) + return self._config.exists(opt.JOB_SERVICE_URL) @property def _job_service(self): diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 710048b04c..a07318608b 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -87,7 +87,7 @@ class ConfigOptions(metaclass=ConfigMeta): ENABLE_AUTH: str = "False" #: Auth token for user authentication to Feast - AUTH_TOKEN: str = "" + AUTH_TOKEN: Optional[str] = None #: Path to certificate(s) to secure connection to Feast Core CORE_SERVER_SSL_CERT: str = "" @@ -102,7 +102,7 @@ class ConfigOptions(metaclass=ConfigMeta): SERVING_SERVER_SSL_CERT: str = "" #: Default Feast Job Service URL - JOB_SERVICE_URL: str = "" + JOB_SERVICE_URL: Optional[str] = None #: Enable or disable TLS/SSL to Feast Job Service JOB_SERVICE_ENABLE_SSL: str = "False" @@ -132,7 +132,7 @@ class ConfigOptions(metaclass=ConfigMeta): SPARK_LAUNCHER: str = "dataproc" # standalone, dataproc, emr #: Feast Spark Job ingestion jobs staging location - SPARK_STAGING_LOCATION: str = "" + SPARK_STAGING_LOCATION: Optional[str] = None #: Feast Spark Job ingestion jar file SPARK_INGESTION_JAR: str = "https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-develop.jar" @@ -141,22 +141,22 @@ class ConfigOptions(metaclass=ConfigMeta): SPARK_STANDALONE_MASTER: str = "local[*]" #: Directory where Spark is installed - SPARK_HOME: str = "" + SPARK_HOME: Optional[str] = None #: Dataproc cluster to run Feast Spark Jobs in - DATAPROC_CLUSTER_NAME: str = "" + DATAPROC_CLUSTER_NAME: Optional[str] = None #: Project of Dataproc cluster - DATAPROC_PROJECT: str = "" + DATAPROC_PROJECT: Optional[str] = None #: Region of Dataproc cluster - DATAPROC_REGION: str = "" + DATAPROC_REGION: Optional[str] = None #: File format of historical retrieval features HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet" #: File location of historical retrieval features - HISTORICAL_FEATURE_OUTPUT_LOCATION: str = "" + HISTORICAL_FEATURE_OUTPUT_LOCATION: Optional[str] = None #: Default Redis host REDIS_HOST: str = "localhost" @@ -171,10 +171,10 @@ class ConfigOptions(metaclass=ConfigMeta): STATSD_ENABLED: str = "False" #: Default StatsD port - STATSD_HOST: str = "" + STATSD_HOST: Optional[str] = None #: Default StatsD port - STATSD_PORT: str = "" + STATSD_PORT: Optional[str] = None #: IngestionJob DeadLetter Destination DEADLETTER_PATH: str = "" @@ -184,16 +184,16 @@ class ConfigOptions(metaclass=ConfigMeta): STENCIL_URL: str = "" #: EMR cluster to run Feast Spark Jobs in - EMR_CLUSTER_ID: str = "" + EMR_CLUSTER_ID: Optional[str] = None #: Region of EMR cluster - EMR_REGION: str = "" + EMR_REGION: Optional[str] = None #: Template path of EMR cluster - EMR_CLUSTER_TEMPLATE_PATH: str = "" + EMR_CLUSTER_TEMPLATE_PATH: Optional[str] = None #: Log path of EMR cluster - EMR_LOG_LOCATION: str = "" + EMR_LOG_LOCATION: Optional[str] = None #: Oauth grant type OAUTH_GRANT_TYPE: Optional[str] = None diff --git a/sdk/python/tests/grpc/test_auth.py b/sdk/python/tests/grpc/test_auth.py index f5b9204f82..29f781cbed 100644 --- a/sdk/python/tests/grpc/test_auth.py +++ b/sdk/python/tests/grpc/test_auth.py @@ -142,7 +142,7 @@ def test_get_auth_metadata_plugin_oauth_should_raise_when_response_is_not_200( def test_get_auth_metadata_plugin_oauth_should_raise_when_config_is_incorrect( config_with_missing_variable, ): - with raises(NoOptionError): + with raises((RuntimeError, NoOptionError)): get_auth_metadata_plugin(config_with_missing_variable) From 889c0cd585dc7fa7ef7e8ca2fd877615989161df Mon Sep 17 00:00:00 2001 From: Terence Date: Wed, 18 Nov 2020 22:42:16 +0800 Subject: [PATCH 09/12] Update docs Signed-off-by: Terence --- sdk/python/feast/constants.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index a07318608b..303c2627de 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -86,7 +86,7 @@ class ConfigOptions(metaclass=ConfigMeta): #: Enable user authentication to Feast Core ENABLE_AUTH: str = "False" - #: Auth token for user authentication to Feast + #: JWT Auth token for user authentication to Feast AUTH_TOKEN: Optional[str] = None #: Path to certificate(s) to secure connection to Feast Core @@ -116,7 +116,7 @@ class ConfigOptions(metaclass=ConfigMeta): #: Default connection timeout to Feast Serving, Feast Core, and Feast Job Service (in seconds) GRPC_CONNECTION_TIMEOUT: str = "10" - #: Default gRPC connection timeout when sending an ApplyFeatureSet command to Feast Core (in seconds) + #: Default gRPC connection timeout when sending an ApplyFeatureTable command to Feast Core (in seconds) GRPC_CONNECTION_TIMEOUT_APPLY: str = "600" #: Default timeout when running batch ingestion @@ -126,12 +126,16 @@ class ConfigOptions(metaclass=ConfigMeta): BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS: str = "600" #: Authentication Provider - Google OpenID/OAuth + #: + #: Options: "google" / "oauth" AUTH_PROVIDER: str = "google" #: Spark Job launcher SPARK_LAUNCHER: str = "dataproc" # standalone, dataproc, emr #: Feast Spark Job ingestion jobs staging location + #: + #: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file://data/subfolder/ SPARK_STAGING_LOCATION: Optional[str] = None #: Feast Spark Job ingestion jar file @@ -177,6 +181,8 @@ class ConfigOptions(metaclass=ConfigMeta): STATSD_PORT: Optional[str] = None #: IngestionJob DeadLetter Destination + #: + #: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file://data/subfolder/ DEADLETTER_PATH: str = "" #: ProtoRegistry Address (currently only Stencil Server is supported as registry) From 0cca6f8b8c9068de17603cb1f21872284b9a3ff8 Mon Sep 17 00:00:00 2001 From: Terence Date: Thu, 19 Nov 2020 10:23:46 +0800 Subject: [PATCH 10/12] Cleanup spark launcher constant Signed-off-by: Terence --- sdk/python/feast/constants.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 303c2627de..21c8300749 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -41,11 +41,8 @@ class ConfigMeta(type): """ def __new__(cls, name, bases, attrs): - NoneType = type(None) keys = [ - k - for k, v in attrs.items() - if not k.startswith("_") and isinstance(v, (str, int, float, NoneType)) + k for k, v in attrs.items() if not k.startswith("_") and not callable(v) ] attrs["__config_keys__"] = keys attrs.update({k: Option(k, attrs[k]) for k in keys}) @@ -131,7 +128,9 @@ class ConfigOptions(metaclass=ConfigMeta): AUTH_PROVIDER: str = "google" #: Spark Job launcher - SPARK_LAUNCHER: str = "dataproc" # standalone, dataproc, emr + #: + #: Options: "standalone", "dataproc", "emr" + SPARK_LAUNCHER: Optional[str] = None #: Feast Spark Job ingestion jobs staging location #: From cf063087a948e55765bfdbdba340a6220f848675 Mon Sep 17 00:00:00 2001 From: Terence Date: Thu, 19 Nov 2020 11:03:35 +0800 Subject: [PATCH 11/12] Update constants and clarify docstring Signed-off-by: Terence --- sdk/python/feast/constants.py | 9 ++++++--- sdk/python/feast/wait.py | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 21c8300749..363bcb2a60 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -37,7 +37,7 @@ def __get__(self, instance, owner): class ConfigMeta(type): """ Class factory which customizes ConfigOptions class instantiation. - Specifically, setting its name to lowercase of capitalized variable. + Specifically, setting configuration option's name to lowercase of capitalized variable. """ def __new__(cls, name, bases, attrs): @@ -67,6 +67,9 @@ def __new__(cls, name, bases, attrs): #: Default section in Feast configuration file to specify options CONFIG_FILE_SECTION: str = "general" +# Maximum interval(secs) to wait between retries for retry function +MAX_WAIT_INTERVAL: str = "60" + class ConfigOptions(metaclass=ConfigMeta): """ Feast Configuration Options """ @@ -138,6 +141,8 @@ class ConfigOptions(metaclass=ConfigMeta): SPARK_STAGING_LOCATION: Optional[str] = None #: Feast Spark Job ingestion jar file + #: + #: Eg. gs://some-bucket/some-jarfile, s3://some-bucket/some-jarfile, file://data/some-jarfile SPARK_INGESTION_JAR: str = "https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-develop.jar" #: Spark resource manager master url @@ -215,8 +220,6 @@ class ConfigOptions(metaclass=ConfigMeta): #: Oauth token request url OAUTH_TOKEN_REQUEST_URL: Optional[str] = None - MAX_WAIT_INTERVAL: str = "60" - def defaults(self): return { k: getattr(self, k) diff --git a/sdk/python/feast/wait.py b/sdk/python/feast/wait.py index 4eee21c0d3..daa0b0b7dc 100644 --- a/sdk/python/feast/wait.py +++ b/sdk/python/feast/wait.py @@ -15,14 +15,14 @@ import time from typing import Any, Callable, Optional, Tuple -from feast.constants import ConfigOptions as opt +from feast.constants import MAX_WAIT_INTERVAL def wait_retry_backoff( retry_fn: Callable[[], Tuple[Any, bool]], timeout_secs: int = 0, timeout_msg: Optional[str] = "Timeout while waiting for retry_fn() to return True", - max_interval_secs: int = int(opt().MAX_WAIT_INTERVAL), + max_interval_secs: int = int(MAX_WAIT_INTERVAL), ) -> Any: """ Repeatedly try calling given retry_fn until it returns a True boolean success flag. From 4d7be0ea5576fd98c82ef4c3a6d12388a0848da5 Mon Sep 17 00:00:00 2001 From: Terence Date: Thu, 19 Nov 2020 11:54:03 +0800 Subject: [PATCH 12/12] Polish docstrings Signed-off-by: Terence --- sdk/python/feast/constants.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 363bcb2a60..1ed416a34b 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -130,19 +130,19 @@ class ConfigOptions(metaclass=ConfigMeta): #: Options: "google" / "oauth" AUTH_PROVIDER: str = "google" - #: Spark Job launcher + #: Spark Job launcher. The choice of storage is connected to the choice of SPARK_LAUNCHER. #: #: Options: "standalone", "dataproc", "emr" SPARK_LAUNCHER: Optional[str] = None - #: Feast Spark Job ingestion jobs staging location + #: Feast Spark Job ingestion jobs staging location. The choice of storage is connected to the choice of SPARK_LAUNCHER. #: #: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file://data/subfolder/ SPARK_STAGING_LOCATION: Optional[str] = None - #: Feast Spark Job ingestion jar file + #: Feast Spark Job ingestion jar file. The choice of storage is connected to the choice of SPARK_LAUNCHER. #: - #: Eg. gs://some-bucket/some-jarfile, s3://some-bucket/some-jarfile, file://data/some-jarfile + #: Eg. "dataproc" (http and gs), "emr" (http and s3), "standalone" (http and file) SPARK_INGESTION_JAR: str = "https://storage.googleapis.com/feast-jobs/spark/ingestion/feast-ingestion-spark-develop.jar" #: Spark resource manager master url @@ -184,7 +184,7 @@ class ConfigOptions(metaclass=ConfigMeta): #: Default StatsD port STATSD_PORT: Optional[str] = None - #: IngestionJob DeadLetter Destination + #: Ingestion Job DeadLetter Destination. The choice of storage is connected to the choice of SPARK_LAUNCHER. #: #: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file://data/subfolder/ DEADLETTER_PATH: str = ""