Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dataproc and Standalone Cluster Spark Job launcher #1022

Merged
merged 10 commits into from
Oct 13, 2020

Conversation

khorshuheng
Copy link
Collaborator

What this PR does / why we need it:
This PR depends on #1021. Two types of Spark Job launchers have been included: Dataproc cluster and standalone cluster. Launchers for Yarn cluster, Kubernetes, Amazon EMR will be implemented at a later stage, in separate PRs.

Users are not expected to use these launchers directly. Instead, Feask SDK will use this launchers to submit batch feature retrieval job. This will be implemented in another PR.

Which issue(s) this PR fixes:

Fixes #

Does this PR introduce a user-facing change?:

NONE

@khorshuheng khorshuheng changed the title (WIP) Dataproc and Standalone Cluster Spark Job launcher Dataproc and Standalone Cluster Spark Job launcher Oct 9, 2020
@khorshuheng khorshuheng added the kind/feature New feature or request label Oct 9, 2020
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>
Configuration for the retrieval job, in json format. Sample configuration as follows:
Configuration for the retrieval job, in json format.
entity_df (DataFrame):
Optional. If provided, the entity will be used directly and conf["entity"] will be ignored.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does entity will be used directly mean. Not clear why entity_df can be optional.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's mainly to support the scenario below:
client.get_historical_features_df(["rating"], entity_df)

Where a user passed in a Spark dataframe / Pandas dataframe directly as entity, instead of using FileSource , BigQuerySource.

If we mandate user to always use FileSource or BigQuerySource, then we don't need this additional argument.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont understand why its part of this method. Doesnt the entity_df get uploaded prior to calling this method, meaning it should be configured through the conf?

spark = SparkSession.builder.getOrCreate()
parser = argparse.ArgumentParser(description="Retrieval job arguments")
parser.add_argument(
"config_json", type=str, help="Configuration in json string format"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is a json blob right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will be a json string, similar approach to the Spark jobs written by @pyalex . Originally i chose to use an actual config file, but it makes the job launcher much more complicated due to uploading and retrieval of configuration file.

Copy link
Collaborator

@pyalex pyalex Oct 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of ingestion job there're several json argument and I suggest you can split your config as well. It's just easier to work with job that has separated parameters, that one big json.
So in ingestion-job there are
--feature-table - json with project, name, entities, features
--source separate source to be able to run custom source
I assume in historical retrieval you would also need
--entity-source

Here's example of arguments for ingestion job

--source {"kafka":{"bootstrapServers":"10.200.219.238:6668","topic":"supply-aggregator-sauron","classpath":"com.gojek.esb.aggregate.supply.AggregatedSupplyMessage","mapping":{},"timestampColumn":"event_timestamp"}}
--feature-table {"name":"fs","project":"default","entities":[{"name":"customer","type":"STRING"}],"features":[{"name":"feature1","type":"INT32"}]}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that our jobs arguments will be completely the same. Historical job receives several feature tables for example. But at least we can try to keep it similar

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--source separate source to be able to run custom source

@pyalex How would the custom source applies in the context of historical feature retrieval? Is it something like a user input to override the Feature Table source registered on Feast Core?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do you mean that the source of the feature tables should be it's own parameter (--source), rather than as a field under --feature-table ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to throw in another idea, for feature_table and entity we could also pass the protobuf in JSON format. You'd still to have to manually parse it, but on the caller side it would be easier as we could replace custom serialization code with e.g. json.dumps(MessageToDict(feature_table.to_proto()))

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@khorshuheng I just provided example of ingestion job args. It may be not very relevant to Historical directly. I'm just saying we can try to converge arguments format.

Copy link
Collaborator

@pyalex pyalex Oct 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oavdeev we can move towards protobuf-like format. But it's already can't be 100% compatible. FeatureTable, for example, has entities as strings. Whereas most jobs need type as well. So some massaging would still be required - mostly denormalization

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, didn't realize that. Let's just go with bespoke json then like in the ingestion job

bucket = client.get_bucket(self.staging_bucket)
blob_path = os.path.join(
self.remote_path,
"temp",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why temp?

self.region = region
self.job_client = dataproc_v1.JobControllerClient(
client_options={
"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use f-strings please

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>
@khorshuheng
Copy link
Collaborator Author

/test test-end-to-end

options: Dict[str, str] = {}


class FeatureTableSource(NamedTuple):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we have two sources, nor why created_timestamp is needed for the FeatureTableSource

…es for Feast SDK

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>
… FileSource and BQSource

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

class FeatureTableDataframe(NamedTuple):
"""
Feature table dataframe with specification.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment here that describes this class. This comment says nearly nothing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont understand why we need this class

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class contains information from both FeatureTable and Source, excluding the location of the feature table. In my original implementation, both timestamp columns and feature table specification are within the same level of configuration. But for the implementation in ingestion job, timestamp columns is in Source, whereas feature table specification is in FeatureTable.

I can see two choices here:

  1. Eliminate FeatureTableDataframe, and pass both Source and FeatureTable into as_of_join and join_entity_to_feature_tables, eventhough the two methods don't exactly need to know the format and path of the Feature Table source.

  2. Move timestamp columns to FeatureTable instead of Source, but that would mean the configuration is different from the Ingestion Job.

Copy link
Member

@woop woop Oct 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class contains information from both FeatureTable and Source, excluding the location of the feature table. In my original implementation, both timestamp columns and feature table specification are within the same level of configuration. But for the implementation in ingestion job, timestamp columns is in Source, whereas feature table specification is in FeatureTable.

Ok, but it's not clear what this class is for. You're talking about your past implementations and implementation details but what problem are you trying to solve. This class is called FeatureTableDataframe, could it be replaced by using a FeatureTable and a DataFrame?

I can see two choices here:

  1. Eliminate FeatureTableDataframe, and pass both Source and FeatureTable into as_of_join and join_entity_to_feature_tables, eventhough the two methods don't exactly need to know the format and path of the Feature Table source.
  2. Move timestamp columns to FeatureTable instead of Source, but that would mean the configuration is different from the Ingestion Job.

You are presenting solutions but I dont know what problem you are trying to solve.

All class creation should be justified. So when I see a class like this I want to know why its essential complexity. Why can't it be a function?

Copy link
Collaborator Author

@khorshuheng khorshuheng Oct 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feature table class doesn't contain timestamp column and created timestamp column. If it does, then yes, I don't need this class and I can easily replace this with a FeatureTable and a Dataframe, which is my preferred solution. The only reason why I haven't done this is because of the previous discussion where we want to use the configuration to be similar to ingestion job.

The problem I am trying to solve here: I want neither as_of_join method nor join_entity_features method to have any argument that contains file format and file path information, so that means I can't pass Source as an argument. But timestamp column information is required. Without creating a new class, that means my function arguments for as of join methods, will need to have list of Feature Table, List or dictionary of event timestamp columns, and the dataframes. The new class acts as a container that store all these attributes together.

Ok, let me just come out with alternative solution that doesn't involve this new class and still keep the existing configuration, and see how it goes.

Source for an entity or feature dataframe.

Attributes:
timestamp_column (str): Column representing the event timestamp.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make this event_timestamp_column?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer event_timestamp_column as well, though there are other instances in Feast SDK where timestamp_column is used instead (Datasource, BatchSource, for example).

I am happy to make the change though. Should i change it only for historical_feature_retrieval_job?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets just make the change here please.

timestamp_column (str): Column representing the event timestamp.
created_timestamp_column (str): Column representing the creation timestamp. Required
only if the source corresponds to a feature table.
mapping (Optional[Dict[str, str]]): If present, the source column will be renamed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be field_map?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with making this field_map, but currently Oleksi's ingestion job is using mapping instead. Which one should i use then?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mapping seems a bit vague. The type is a map, so its already a mapping. @pyalex

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to field_mapping instead because that's what being used in the protobuf and the other part of the Feast SDK right now.

path: str


class EntityDataframe(NamedTuple):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont understand the point of this class.

Args:
spark (SparkSession): Spark session.
entity_source (Source): Entity data source.
feature_tables_sources (Source): List of feature tables data sources.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo on source

`max_age` is in seconds, and determines the lower bound of the timestamp of the retrieved feature.
If not specified, this would be unbounded.
Returns:
DataFrame: Join result.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is join result sufficient documentation?

raise NotImplementedError


class StandaloneCluster(JobLauncher):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be StandaloneClusterJobLauncher?

pyspark_script (str): Local file path to the pyspark script for historical feature
retrieval.
entity_source_conf (List[Dict]): Entity data source configuration.
feature_tables_sources_conf (Dict): List of feature tables data sources configurations.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where have you documented where feature_tables_sources_conf will be used vs sources found in feature_tables_conf

Copy link
Collaborator Author

@khorshuheng khorshuheng Oct 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sources would not be part of feature_tables_conf , as per the design of configuration found here: https://github.com/feast-dev/feast/blob/master/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala#L74-L79

Though, i understand that the above behaviour is actually inconsistent with the way we currently define our FeatureTable proto.

Should i revamp the configuration (for historical retrieval job) such that Source is part of Feature Table conf? In which case, it would not be necessary to have both feature_tables_conf and feature_tables_sources_conf.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can tweak the format in a separate diff? Either way works, but above everything else i'd def prefer things to be consistent between this and @pyalex ingestion job.

`options` is optional. If present, the options will be used when reading / writing the input / output.
Args:
spark (SparkSession): Spark session.
entity_source (Source): Entity data source.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think "entity data source" is a concept that has been used before in the codebase, maybe worth adding an explanation what it is. Or maybe call it entity_df_source? It is somewhat confusing that "entities" is used interchangeably for the entity objects themselves and the dataframe for point-in-time joins.

"--master",
self.master_url,
"--name",
job_id,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is job_id externally configurable here? (as opposed to generating one inside JobLauncher)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feast client will be the one generating the job_id, rather than user specified. As to why the job id is generated in Feast Client, rather than JobLauncher:

Prior to launching the job, JobLauncher will be doing some preparatory task, such as uploading Pandas dataframe to GCS / S3 (if user input is a pandas dataframe rather than a uri pointing to the file) , and define the output path based on Feast client configuration. In both cases, we can use job id as part of the GCS / S3 path name, which would be useful for tracking purposes.

That being said, i am fine with moving the job id generation within JobLauncher instead, if there is a good reason as to why it's better to have the job id generation within Job Launcher instead of Feast Client?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I don't feel super strongly either way, but you could say that job id generation is a part of JobLauncher internal logic. That way the cloud-specific implementation would be able to use a specific jobid format for convenience. There might be cloud-specific limitations on job id length / charset that Feast Client is not aware of.

It may come in handy given that we don't plan to store jobs in a central database, they will have to be stored as some attribute or tag in dataproc/EMR job metadata itself.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this job_id the internal job_id or the external one? They might be different. The latter should be encapsulated, the former is internal to Feast and would be exposed to users. Or are we going to make them one and the same?

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>
@feast-ci-bot
Copy link
Collaborator

feast-ci-bot commented Oct 13, 2020

@khorshuheng: The following test failed, say /retest to rerun them all:

Test name Commit Details Rerun command
test-end-to-end-batch 9aed5d8a57cb70c957441433fd6e577894ba1818 link /test test-end-to-end-batch

Full PR test history

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.

…configuration

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>
@feast-ci-bot
Copy link
Collaborator

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: khorshuheng, woop

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>
@oavdeev
Copy link
Collaborator

oavdeev commented Oct 13, 2020

/lgtm

@feast-ci-bot feast-ci-bot merged commit ffaf8c5 into feast-dev:master Oct 13, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants