
Add feature transformation support to FeatureView #4277

Open
franciscojavierarceo opened this issue Jun 13, 2024 · 5 comments
Labels
kind/feature New feature or request

Comments

@franciscojavierarceo
Member

Is your feature request related to a problem? Please describe.
FeatureViews should support transformations as well.

Describe the solution you'd like

@batch_feature_view(
    sources=[credit_data_batch],
    entities=[user],
    mode="python",
    batch_schedule=timedelta(days=1),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("current_balance", Float64)],
)
def user_last_balance(transactions):
    return transactions[["user_id", "timestamp", "current_balance"]]

Describe alternatives you've considered
N/A

Additional context
Should behave similarly to ODFVs and Stream Feature Views and support both Python and Pandas modes.
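
For reference, this is roughly what the existing on-demand transformation decorator looks like today; the request here is a batch analogue of the same pattern. A minimal sketch, assuming a pre-existing FeatureView named driver_hourly_stats with a conv_rate column (both hypothetical):

import pandas as pd

from feast import Field, on_demand_feature_view
from feast.types import Float64

# driver_hourly_stats is a placeholder for an existing FeatureView in the repo
@on_demand_feature_view(
    sources=[driver_hourly_stats],
    schema=[Field(name="conv_rate_adjusted", dtype=Float64)],
    mode="pandas",  # ODFVs also support mode="python"
)
def adjusted_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
    # Derive a new column from the source FeatureView's data at request time
    df = pd.DataFrame()
    df["conv_rate_adjusted"] = inputs["conv_rate"] * 1.1
    return df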

@franciscojavierarceo added the kind/feature (New feature or request) label on Jun 13, 2024
@tokoko
Collaborator

tokoko commented Jul 8, 2024

@franciscojavierarceo Just to make sure we're on the same page: you think we should have a separate BatchFeatureView class for this, just like ODFVs and stream feature views, right?

Also, I think we should probably implement it without scheduled materialization at first, because it's not obvious to me which Feast service could do the scheduling for this (the flight server seems most logical, or maybe we need to add another service altogether).
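
Purely for illustration, a rough sketch of what a separate class plus decorator could look like structurally; none of these names exist in Feast today, and the real BatchFeatureView may differ:

from datetime import timedelta
from typing import Callable, List, Optional

# Hypothetical: a BatchFeatureView-like container that carries a user
# transformation, mirroring how ODFVs and stream feature views wrap a UDF.
class TransformedBatchFeatureView:
    def __init__(self, *, name: str, sources: List, entities: List, mode: str,
                 schema: List, batch_schedule: Optional[timedelta], udf: Callable):
        self.name = name
        self.sources = sources
        self.entities = entities
        self.mode = mode                      # "python" or "pandas"
        self.schema = schema
        self.batch_schedule = batch_schedule  # unused until scheduling exists
        self.udf = udf                        # applied during materialization

def batch_feature_view(*, sources, entities, mode, schema, batch_schedule=None):
    # Decorator factory mirroring the ODFV / stream feature view decorators
    def decorator(user_function):
        return TransformedBatchFeatureView(
            name=user_function.__name__, sources=sources, entities=entities,
            mode=mode, schema=schema, batch_schedule=batch_schedule,
            udf=user_function,
        )
    return decorator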

@franciscojavierarceo
Member Author

I am starting to think that having a separate decorator per feature view type is weird.

Instead we could just do something like:

@transform(
    sources=[credit_data_batch],
    entities=[user],
    mode="python",
    batch_schedule=timedelta(days=1),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("current_balance", Float64)],
)
def user_last_balance(transactions):
    return some_computation(transactions[["user_id", "timestamp", "current_balance"]])

And maybe the decorator could have a way to indicate when the computation happens?

My view is that we need to provide clarity about when transformations happen (on demand, during a write, or in a stream, i.e., before a write), and I'm not sure the current approach makes this as obvious. Maybe it does.
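
One way to read "indicate when the computation happens" is an explicit argument on the decorator. A hypothetical sketch only; the when values and all parameter names below are invented for illustration and are not existing Feast API:

# Hypothetical: a decorator that records when the transformation runs
# ("on_read", "on_write", or "on_stream")
def transform(*, sources, entities, mode, schema, when="on_write", batch_schedule=None):
    def decorator(user_function):
        user_function.feast_transform_config = {
            "sources": sources,
            "entities": entities,
            "mode": mode,
            "schema": schema,
            "when": when,                      # execution point of the transformation
            "batch_schedule": batch_schedule,
        }
        return user_function
    return decorator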

@franciscojavierarceo
Member Author

This will be solved by #4376

@franciscojavierarceo
Member Author

Actually, this issue is more general: #4376 does not handle the batch transformation use case, so I'm keeping this open. The ODFV writes do help with a subset of these items.

@Vishnu-Rangiah

Vishnu-Rangiah commented Oct 21, 2024

Chaining transformations into a DAG that can be reused across FeatureViews would be nice to have. This functionality would reduce code duplication across the feature repo. Here is an example adapted from Tecton's transformation API: https://docs.tecton.ai/docs/defining-features/feature-views/transformations#a-feature-view-that-calls-a-pyspark-transformation-passing-two-pyspark-transformation-outputs

@transformation(mode="big_query")
def last_balance_time(transactions, max_user_transaction):
    return f"""SELECT t.user_id, t.current_balance, last_t.last_transaction_date as timestamp
                      FROM {transactions} t
                      INNER JOIN {max_user_transaction} last_t
                      ON t.user_id = last_t.user_id AND t.timestamp = last_t.last_transaction_date;"""

@transformation(mode="big_query")
def user_last_transaction_time(transactions):
    return f"""SELECT user_id, MAX(timestamp) AS last_transaction_date
                      FROM {transactions}
                      GROUP BY user_id"""

@feature_view(
    sources=[credit_data_batch],
    entities=[user],
    mode="pipeline", # creates a DAG from re-useable transformation functions
    batch_schedule=timedelta(days=1),
    schema=[Field("user_id", String), Field("timestamp", Timestamp), Field("current_balance", Float64)],
)
def user_last_balance(transactions):
    last_transaction_time = user_last_transaction_time(transactions)
    return last_balance_time(transactions, last_transaction_time)
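
A hypothetical sketch of how a "pipeline" mode could resolve such a DAG: each tagged transformation returns SQL, and upstream outputs are inlined as subqueries downstream. The decorator, function bodies, and table name below are illustrative only, not existing Feast API:

def transformation(mode: str):
    # Hypothetical decorator: tag a SQL-producing function with its dialect
    def decorator(fn):
        fn.feast_mode = mode
        return fn
    return decorator

@transformation(mode="big_query")
def user_last_transaction_time(transactions: str) -> str:
    return f"SELECT user_id, MAX(timestamp) AS last_transaction_date FROM {transactions} GROUP BY user_id"

@transformation(mode="big_query")
def last_balance_time(transactions: str, max_user_transaction: str) -> str:
    return (
        f"SELECT t.user_id, t.current_balance, last_t.last_transaction_date AS timestamp "
        f"FROM {transactions} t JOIN {max_user_transaction} last_t "
        f"ON t.user_id = last_t.user_id AND t.timestamp = last_t.last_transaction_date"
    )

def compile_user_last_balance(transactions_table: str) -> str:
    # Resolve the DAG: the upstream query is inlined as a subquery downstream
    upstream = f"({user_last_transaction_time(transactions_table)})"
    return last_balance_time(transactions_table, upstream)

print(compile_user_last_balance("project.dataset.transactions"))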
