-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EMR launcher #1061
EMR launcher #1061
Conversation
/kind feature |
"--entity-df-path", | ||
"-e", | ||
help="Path to entity df in CSV format. It is assumed to have event_timestamp column and a header.", | ||
required=True, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if it might be better if the users are expected to provide a uri that is recognizable by the Spark Launcher, such as s3:// for EMR, gs:// for Dataproc, and file:// for standalone cluster launchers running locally. That way, we skip the process of reading to Pandas dataframe and convert the file again.
Staged panda dataframe is still a useful method to have though, because we plan to add support to pandas dataframe as input argument for historical feature retrieval method, for Feast Client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, it is mostly for convenience/testing for now, to reduce a # of steps that someone need to do to see if historical retrieval works. I wouldn't expect people to normally use local CSV for entity dfs. I'd tweak this interface in later PRs though.
@@ -773,7 +773,7 @@ def _feature_table_from_dict(dct: Dict[str, Any]) -> FeatureTable: | |||
spark = SparkSession.builder.getOrCreate() | |||
args = _get_args() | |||
feature_tables_conf = json.loads(args.feature_tables) | |||
feature_tables_sources_conf = json.loads(args.feature_tables_source) | |||
feature_tables_sources_conf = json.loads(args.feature_tables_sources) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching the typo
self, df: pandas.DataFrame, event_timestamp: str, created_timestamp_column: str | ||
) -> FileSource: | ||
with tempfile.NamedTemporaryFile() as f: | ||
df.to_parquet(f) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pyalex As of Feast 0.7 the dataframe is typically converted to pyarrow / Avro format. I am wondering, if we should make parquet the standard staging format as oppose to pyarrow / Avro? Since Spark doesn't support Avro format out of the box.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As of Feast 0.7 the dataframe is typically converted to pyarrow / Avro format
In which instances do you see that? Uploading to BQ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we already write parquet in client.ingest
but with DataFormat PR we'll have to support all formats that user may specify as Batch Source format, since client.ingest
writes to existing batch source
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pyalex In the case of historical feature retrieval, where the user input is Panda dataframe, do we actually want to make the user specify the format of the staged file? Or simply standardized to parquet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't we previously agree to only support parquet?
entity_df["event_timestamp"] = pandas.to_datetime(entity_df["event_timestamp"]) | ||
|
||
uploaded_df = client.stage_dataframe( | ||
entity_df, "event_timestamp", "created_timestamp" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a heads up: "created_timestamp" is actually supposed to be optional for the entity, so it's a bug that we need to resolve in another PR.
) -> SparkJob: | ||
return start_offline_to_online_ingestion(feature_table, start, end, self) # type: ignore | ||
|
||
def stage_dataframe( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We actually already have method in the Feast Client that takes dataframe and put it in offline storage. Currently it called ingest
but I guess we'll rename it. Anyway, this shouldn't be a part of spark interop, since it's not related to spark
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my mind this is somewhat different from ingest. It is not intended for permanent storage, this is a convenience method "take this dataframe, put it in some temp location where Spark can access it". I agree launcher might not be the best place for it, just gotta be some code that can read staging_location from the config to construct the temp path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it's some upload-to-temp-location function - it's probably shouldn't be part of Feast Client API. maybe contrib?
or just keep it internally. What's the user use case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just the user convenience - if you're getting started with feast and want to run historical retrieval, we can upload your pandas entity dataframe to S3 so you don't have to think about how to upload it and what bucket to use. We'll just put it in staging location for you. Basically trying to remove an extra friction point in onboarding and tutorials. Right now it is only used for CLI historical-retrieval command.
I agree it may not be the best place for it, but at the same time it needs to have access to the config to figure out where to upload the dataframe. So i can't make it completely detached from the client (that has the config object)
return h.hexdigest() | ||
|
||
|
||
def _s3_upload( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be moved to staging client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, if it is ok i'd do it in a separate PR though. It is not exactly 1-1 with the current staging client interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I submitted a PR here: https://github.com/feast-dev/feast/pull/1063/files
e33c71b
to
dfccccb
Compare
Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: oavdeev, pyalex The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
We agreed with @oavdeev to refactor staging client in separate PR and merge this to unblock further development. |
/lgtm |
What this PR does / why we need it:
This adds a EMR launcher in line with the new launcher interface. It does historical retrieval and offline-to-online retrieval. I'll do stream ingestion next, figured this is large enough.
A few notable bits (aside from just moving things around):
Which issue(s) this PR fixes:
Fixes #
Does this PR introduce a user-facing change?: