Configurable materialization destination for view in BigQuerySource #1201

Merged
8 changes: 8 additions & 0 deletions sdk/python/feast/constants.py
@@ -154,6 +154,14 @@ class ConfigOptions(metaclass=ConfigMeta):
#: Directory where Spark is installed
SPARK_HOME: Optional[str] = None

#: Project id where the materialized view of a BigQuerySource will be created.
#: By default, the same project as the view itself is used.
SPARK_BQ_MATERIALIZATION_PROJECT: Optional[str] = None

#: Dataset id where the materialized view of a BigQuerySource will be created.
#: By default, the same dataset as the view itself is used.
SPARK_BQ_MATERIALIZATION_DATASET: Optional[str] = None

#: Dataproc cluster to run Feast Spark Jobs in
DATAPROC_CLUSTER_NAME: Optional[str] = None

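Taken together, the two options route the temporary materialization of BigQuery views to a dedicated destination. A minimal sketch of setting them, assuming the SDK's usual convention that ConfigOptions attributes map to lower-cased Client keyword options; the project and dataset values are placeholders:

from feast import Client

# Both options must be set for the destination to take effect
# (see the launcher change below); values are placeholders.
client = Client(
    spark_bq_materialization_project="scratch-project",
    spark_bq_materialization_dataset="scratch_dataset",
)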
15 changes: 14 additions & 1 deletion sdk/python/feast/pyspark/historical_feature_retrieval_job.py
@@ -123,6 +123,8 @@ class BigQuerySource(Source):
event_timestamp_column (str): Column representing the event timestamp.
created_timestamp_column (str): Column representing the creation timestamp. Required
only if the source corresponds to a feature table.
materialization (Dict[str, str]): Optional. Destination for the materialized view,
e.g. dict(project="...", dataset="...").
"""

def __init__(
@@ -133,13 +135,15 @@ def __init__(
    event_timestamp_column: str,
    created_timestamp_column: Optional[str],
    field_mapping: Optional[Dict[str, str]],
    materialization: Optional[Dict[str, str]] = None,
):
    super().__init__(
        event_timestamp_column, created_timestamp_column, field_mapping
    )
    self.project = project
    self.dataset = dataset
    self.table = table
    self.materialization = materialization

@property
def spark_format(self) -> str:
@@ -151,7 +155,15 @@ def spark_path(self) -> str:

@property
def spark_read_options(self) -> Dict[str, str]:
return {**super().spark_read_options, "viewsEnabled": "true"}
opts = {**super().spark_read_options, "viewsEnabled": "true"}
if self.materialization:
    opts.update(
        {
            "materializationProject": self.materialization["project"],
            "materializationDataset": self.materialization["dataset"],
        }
    )
return opts
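To illustrate the effect, a short sketch of the read options produced when a destination is supplied; the constructor arguments follow the signature above, and all concrete values are placeholders:

source = BigQuerySource(
    project="my-project",
    dataset="my_dataset",
    table="my_view",
    event_timestamp_column="event_timestamp",
    created_timestamp_column=None,
    field_mapping={},
    materialization=dict(project="scratch-project", dataset="scratch_dataset"),
)
print(source.spark_read_options)
# {..., "viewsEnabled": "true",
#  "materializationProject": "scratch-project",
#  "materializationDataset": "scratch_dataset"}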


def _source_from_dict(dct: Dict) -> Source:
@@ -174,6 +186,7 @@ def _source_from_dict(dct: Dict) -> Source:
    field_mapping=dct["bq"].get("field_mapping", {}),
    event_timestamp_column=dct["bq"]["event_timestamp_column"],
    created_timestamp_column=dct["bq"].get("created_timestamp_column"),
    materialization=dct["bq"].get("materialization"),
)
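For reference, the serialized form consumed above would look like the following; the project/dataset/table keys are assumed to follow the same pattern as the fields shown, and all values are placeholders:

source_conf = {
    "bq": {
        "project": "my-project",
        "dataset": "my_dataset",
        "table": "my_view",
        "event_timestamp_column": "event_timestamp",
        "materialization": {"project": "scratch-project", "dataset": "scratch_dataset"},
    }
}
source = _source_from_dict(source_conf)
assert source.materialization == {"project": "scratch-project", "dataset": "scratch_dataset"}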


23 changes: 16 additions & 7 deletions sdk/python/feast/pyspark/launcher.py
@@ -71,7 +71,7 @@ def resolve_launcher(config: Config) -> JobLauncher:
return _launchers[config.get(opt.SPARK_LAUNCHER)](config)


def _source_to_argument(source: DataSource):
def _source_to_argument(source: DataSource, config: Config):
common_properties = {
    "field_mapping": dict(source.field_mapping),
    "event_timestamp_column": source.event_timestamp_column,
@@ -94,6 +94,14 @@ def _source_to_argument(source: DataSource):
properties["project"] = project
properties["dataset"] = dataset
properties["table"] = table
if config.exists(opt.SPARK_BQ_MATERIALIZATION_PROJECT) and config.exists(
opt.SPARK_BQ_MATERIALIZATION_DATASET
):
properties["materialization"] = dict(
project=config.get(opt.SPARK_BQ_MATERIALIZATION_PROJECT),
dataset=config.get(opt.SPARK_BQ_MATERIALIZATION_DATASET),
)

return {"bq": properties}

if isinstance(source, KafkaSource):
@@ -141,9 +149,9 @@ def start_historical_feature_retrieval_spark_session(
spark_session = SparkSession.builder.getOrCreate()
return retrieve_historical_features(
    spark=spark_session,
    entity_source_conf=_source_to_argument(entity_source),
    entity_source_conf=_source_to_argument(entity_source, client._config),
    feature_tables_sources_conf=[
        _source_to_argument(feature_table.batch_source)
        _source_to_argument(feature_table.batch_source, client._config)
        for feature_table in feature_tables
    ],
    feature_tables_conf=[
@@ -164,14 +172,15 @@ def start_historical_feature_retrieval_job(
launcher = resolve_launcher(client._config)
feature_sources = [
    _source_to_argument(
        replace_bq_table_with_joined_view(feature_table, entity_source)
        replace_bq_table_with_joined_view(feature_table, entity_source),
        client._config,
    )
    for feature_table in feature_tables
]

return launcher.historical_feature_retrieval(
    RetrievalJobParameters(
        entity_source=_source_to_argument(entity_source),
        entity_source=_source_to_argument(entity_source, client._config),
        feature_tables_sources=feature_sources,
        feature_tables=[
            _feature_table_to_argument(client, project, feature_table)
@@ -224,7 +233,7 @@ def start_offline_to_online_ingestion(
return launcher.offline_to_online_ingestion(
    BatchIngestionJobParameters(
        jar=client._config.get(opt.SPARK_INGESTION_JAR),
        source=_source_to_argument(feature_table.batch_source),
        source=_source_to_argument(feature_table.batch_source, client._config),
        feature_table=_feature_table_to_argument(client, project, feature_table),
        start=start,
        end=end,
@@ -251,7 +260,7 @@ def get_stream_to_online_ingestion_params(
return StreamIngestionJobParameters(
    jar=client._config.get(opt.SPARK_INGESTION_JAR),
    extra_jars=extra_jars,
    source=_source_to_argument(feature_table.stream_source),
    source=_source_to_argument(feature_table.stream_source, client._config),
    feature_table=_feature_table_to_argument(client, project, feature_table),
    redis_host=client._config.get(opt.REDIS_HOST),
    redis_port=client._config.getint(opt.REDIS_PORT),
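Note that _source_to_argument attaches a materialization destination only when both config options are present; setting one without the other leaves the source untouched. A hedged sketch of the resulting argument, with hypothetical client and feature_table objects and placeholder values:

config = client._config  # both SPARK_BQ_MATERIALIZATION_* options set as above
conf = _source_to_argument(feature_table.batch_source, config)
assert conf["bq"]["materialization"] == dict(
    project="scratch-project", dataset="scratch_dataset"
)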
@@ -59,14 +59,17 @@ case class FileSource(
  override val datePartitionColumn: Option[String] = None
) extends BatchSource

case class BQMaterializationConfig(project: String, dataset: String)

case class BQSource(
  project: String,
  dataset: String,
  table: String,
  override val fieldMapping: Map[String, String],
  override val eventTimestampColumn: String,
  override val createdTimestampColumn: Option[String] = None,
  override val datePartitionColumn: Option[String] = None
  override val datePartitionColumn: Option[String] = None,
  materialization: Option[BQMaterializationConfig] = None
) extends BatchSource

case class KafkaSource(
@@ -30,9 +30,20 @@ object BigQueryReader {
    start: DateTime,
    end: DateTime
): DataFrame = {
  sqlContext.read
  val reader = sqlContext.read
    .format("bigquery")
    .option("viewsEnabled", "true")

  source.materialization match {
    case Some(materializationConfig) =>
      reader
        .option("materializationProject", materializationConfig.project)
        .option("materializationDataset", materializationConfig.dataset)

    case _ => ()
  }

  reader
    .load(s"${source.project}.${source.dataset}.${source.table}")
    .filter(col(source.eventTimestampColumn) >= new Timestamp(start.getMillis))
    .filter(col(source.eventTimestampColumn) < new Timestamp(end.getMillis))
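The pattern match above relies on Spark's DataFrameReader being a mutable builder: .option() updates the reader and returns it, so discarding the result in the Some branch still leaves both options applied. A rough PySpark equivalent of the same read, with spark, the source fields, and materialization as placeholder names:

reader = spark.read.format("bigquery").option("viewsEnabled", "true")
if materialization is not None:
    # Same spark-bigquery-connector options the Scala reader sets above.
    reader = reader.option("materializationProject", materialization["project"])
    reader = reader.option("materializationDataset", materialization["dataset"])
df = reader.load(f"{project}.{dataset}.{table}")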