◐ Shell
reader mode source ↗
Skip to content

feat: Compute Engine Initial Implementation#5223

Merged
franciscojavierarceo merged 21 commits into
masterfrom
compute-engine
Apr 16, 2025
Merged

feat: Compute Engine Initial Implementation#5223
franciscojavierarceo merged 21 commits into
masterfrom
compute-engine

Conversation

@HaoXuAI

@HaoXuAI HaoXuAI commented Apr 4, 2025

Copy link
Copy Markdown
Collaborator

What this PR does / why we need it:

Introduce the ComputeEngine component that runs current materialization and get_historical_features on a compute engine such as Spark, Flink, etc.
This will hand over heavy computing work from online/offline stores to the engines. And let the online/offline deal with IO only.
The compute engine builds the execution plan in a DAG format named FeatureBuilder. It derives feature generation from Feature View definitiions including:

  1. Transformation (via Transformation API)
  2. Aggregation (via Aggregation API)
  3. Join (join with an entity, adding Customized JOIN and join with another Feature View to do in the future)
  4. Filter (Point in time filter, ttl filter, filter by custom expression)
  5. ...

User facing API, e.g:

def transform_feature(df: DataFrame) -> DataFrame:
    df = df.withColumn("sum_conv_rate", df["sum_conv_rate"] * 2)
    df = df.withColumn("avg_acc_rate", df["avg_acc_rate"] * 2)
    return df

driver_stats_fv = BatchFeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    mode="python",
    batch_engine=Field(...), # Not supported yet.
    aggregations=[
        Aggregation(column="conv_rate", function="sum"),
        Aggregation(column="acc_rate", function="avg"),
    ],
    join=[...], # Not supported yet
    udf=transform_feature,
    udf_string="transform_feature",
    ttl=timedelta(days=3), # Not supported for materialization yet
    filter="filter_expr", # Not fully support
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
        Field(name="driver_id", dtype=Int32),
    ],
    online=True,
    offline=False, # not supported yet
    source=data_source,
)

Feature build flow:

  1. join with entity first if necessary
  2. filter data based on entity event timestamp, ttl, and filter
  3. feature aggregation based on provided aggregations
  4. run transformation based on provided udf or feature_transformations
  5. write data to online or offline

Current use case:

# Create retrieval task
task = MaterializationTask(
    project=spark_environment.project,
    feature_view=driver_stats_fv,
    start_time=now - timedelta(days=1),
    end_time=now,
    tqdm_builder=tqdm_builder,
)

# Create SparkComputeEngine
engine = SparkComputeEngine(
    repo_config=spark_environment.config,
    offline_store=SparkOfflineStore(),
    online_store=MagicMock(),
    registry=registry,
)

spark_materialize_job = engine.materialize(task)

TODOs:

  • Update Batch/Stream feature view to support batch_engine and stream_engine configurations.
  • Support offline write
  • Change materialization read node to the engine, and merge the flow with get_historical_features
  • Support filter by filter_expr in feature view
  • Use engine APIs in feature_store API.
  • Support compute engine in repo yaml config:
  • Join with Feature View or Data Source.
  • Update get_online_feature to be able to handle aggregated and transformed features
  • Research the Transformation, Aggregation, and Filter to merge into single API.

Which issue(s) this PR fixes:

Misc

HaoXuAI added 3 commits April 4, 2025 14:58
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
@HaoXuAI HaoXuAI requested a review from a team as a code owner April 4, 2025 23:38
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

@franciscojavierarceo franciscojavierarceo left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hide comment

Nice! Left some small nits on naming.

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
@franciscojavierarceo

Copy link
Copy Markdown
Member

@HaoXuAI is this ready to review? Or still draft?

@HaoXuAI

HaoXuAI commented Apr 7, 2025

Copy link
Copy Markdown
Collaborator Author

Not yet. Still needs some work but should be ready soon

HaoXuAI added 9 commits April 7, 2025 23:41
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
@HaoXuAI HaoXuAI changed the title feat: Compute engine DRAFT Apr 13, 2025
HaoXuAI added 3 commits April 12, 2025 23:07
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
17 hidden items Load more…

@franciscojavierarceo franciscojavierarceo left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hide comment

in this:

def transform_feature(df: DataFrame) -> DataFrame:
    df = df.withColumn("sum_conv_rate", df["sum_conv_rate"] * 2)
    df = df.withColumn("avg_acc_rate", df["avg_acc_rate"] * 2)
    return df

driver_stats_fv = BatchFeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    mode="python",
    batch_engine=Field(...), # Not supported yet.
    aggregations=[
        Aggregation(column="conv_rate", function="sum"),
        Aggregation(column="acc_rate", function="avg"),
    ],
    join=[...], # Not supported yet
    udf=transform_feature,
    udf_string="transform_feature",
    ttl=timedelta(days=3), # Not supported for materialization yet
    filter="filter_expr", # Not fully support
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
        Field(name="driver_id", dtype=Int32),
    ],
    online=True,
    offline=False, # not supported yet
    source=data_source,
)

What is the point of the Aggregations in the Feature View?

    aggregations=[
        Aggregation(column="conv_rate", function="sum"),
        Aggregation(column="acc_rate", function="avg"),
    ],

@HaoXuAI

HaoXuAI commented Apr 15, 2025

Copy link
Copy Markdown
Collaborator Author

in this:

def transform_feature(df: DataFrame) -> DataFrame:
    df = df.withColumn("sum_conv_rate", df["sum_conv_rate"] * 2)
    df = df.withColumn("avg_acc_rate", df["avg_acc_rate"] * 2)
    return df

driver_stats_fv = BatchFeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    mode="python",
    batch_engine=Field(...), # Not supported yet.
    aggregations=[
        Aggregation(column="conv_rate", function="sum"),
        Aggregation(column="acc_rate", function="avg"),
    ],
    join=[...], # Not supported yet
    udf=transform_feature,
    udf_string="transform_feature",
    ttl=timedelta(days=3), # Not supported for materialization yet
    filter="filter_expr", # Not fully support
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
        Field(name="driver_id", dtype=Int32),
    ],
    online=True,
    offline=False, # not supported yet
    source=data_source,
)

What is the point of the Aggregations in the Feature View?

    aggregations=[
        Aggregation(column="conv_rate", function="sum"),
        Aggregation(column="acc_rate", function="avg"),
    ],

aggregation from the Design perspective is to simplify defining aggregation features with OOTB API. There are some benefits listed in Tecton: https://docs.tecton.ai/docs/beta/defining-features/feature-views/aggregation-engine. What I think important is the lifetime window support. For the Aggregation you can define
Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1)) where the compute will handle the window logic for you. For stream compute it is pretty helpful since the time window concept is bit complex for end users to understand such as the Flink window (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/) we used before

HaoXuAI added 2 commits April 15, 2025 15:31
Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
@franciscojavierarceo

Copy link
Copy Markdown
Member

For the Aggregation you can define
Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1)) where the compute will handle the window logic for you. For stream compute it is pretty helpful since the time window concept is bit complex for end users to understand such as the Flink window (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/) we used before

Yeah that makes a lot of sense, I'm good with this.

@HaoXuAI

HaoXuAI commented Apr 16, 2025

Copy link
Copy Markdown
Collaborator Author

For the Aggregation you can define
Aggregation(column="conv_rate", function="sum", time_window=timedelta(days=1)) where the compute will handle the window logic for you. For stream compute it is pretty helpful since the time window concept is bit complex for end users to understand such as the Flink window (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/) we used before

Yeah that makes a lot of sense, I'm good with this.

This is helpful and a really good discussion. And I agree the interface is still immature and definitely subject to change.

@HaoXuAI HaoXuAI changed the title feat: Compute Engine Apr 16, 2025

@franciscojavierarceo franciscojavierarceo left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hide comment

Let's go! 🚀🚀🚀

@franciscojavierarceo franciscojavierarceo merged commit 64bdafd into master Apr 16, 2025
jfw-ppi pushed a commit to jfw-ppi/feast that referenced this pull request Jun 7, 2025
* add compute engine

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* fix linting

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* fix linting

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* fix linting

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* fix linting

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* add doc

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* add test

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* add integration test

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* update API

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* update API

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* update API

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* update API

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* update API

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* fix linting

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* update doc

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* update doc

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* update test

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

* update doc

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>

---------

Signed-off-by: HaoXuAI <sduxuhao@gmail.com>
Signed-off-by: Jacob Weinhold <29459386+j-wine@users.noreply.github.com>
jfw-ppi pushed a commit to jfw-ppi/feast that referenced this pull request Jun 7, 2025
# [0.49.0](feast-dev/feast@v0.48.0...v0.49.0) (2025-04-29)

### Bug Fixes

* Adding brackets to unit tests ([c46fea3](feast-dev@c46fea3))
* Adding logic back for a step ([2bb240b](feast-dev@2bb240b))
* Adjustment for unit test action ([a6f78ae](feast-dev@a6f78ae))
* Allow get_historical_features with only On Demand Feature View ([feast-dev#5256](feast-dev#5256)) ([0752795](feast-dev@0752795))
* CI adjustment ([3850643](feast-dev@3850643))
* Embed Query configuration breaks when switching between DataFrame and SQL ([feast-dev#5257](feast-dev#5257)) ([32375a5](feast-dev@32375a5))
* Fix for proto issue in utils ([1b291b2](feast-dev@1b291b2))
* Fix milvus online_read ([feast-dev#5233](feast-dev#5233)) ([4b91f26](feast-dev@4b91f26))
* Fix tests ([431d9b8](feast-dev@431d9b8))
* Fixed Permissions object parameter in example ([feast-dev#5259](feast-dev#5259)) ([045c100](feast-dev@045c100))
* Java CI [feast-dev#12](feast-dev#12) ([d7e44ac](feast-dev@d7e44ac))
* Java PR [feast-dev#15](feast-dev#15) ([a5da3bb](feast-dev@a5da3bb))
* Java PR [feast-dev#16](feast-dev#16) ([e0320fe](feast-dev@e0320fe))
* Java PR [feast-dev#17](feast-dev#17) ([49da810](feast-dev@49da810))
* Materialization logs ([feast-dev#5243](feast-dev#5243)) ([4aa2f49](feast-dev@4aa2f49))
* Moving to custom github action for checking skip tests ([caf312e](feast-dev@caf312e))
* Operator - remove default replicas setting from Feast Deployment ([feast-dev#5294](feast-dev#5294)) ([e416d01](feast-dev@e416d01))
* Patch java pr [feast-dev#14](feast-dev#14) ([592526c](feast-dev@592526c))
* Patch update for test ([a3e8967](feast-dev@a3e8967))
* Remove conditional from steps ([995307f](feast-dev@995307f))
* Remove misleading HTTP prefix from gRPC endpoints in logs and doc ([feast-dev#5280](feast-dev#5280)) ([0ee3a1e](feast-dev@0ee3a1e))
* removing id ([268ade2](feast-dev@268ade2))
* Renaming workflow file ([5f46279](feast-dev@5f46279))
* Resolve `no pq wrapper` import issue ([feast-dev#5240](feast-dev#5240)) ([d5906f1](feast-dev@d5906f1))
* Update actions to remove check skip tests ([feast-dev#5275](feast-dev#5275)) ([b976f27](feast-dev@b976f27))
* Update docling demo ([446efea](feast-dev@446efea))
* Update java pr [feast-dev#13](feast-dev#13) ([fda7db7](feast-dev@fda7db7))
* Update java_pr ([fa138f4](feast-dev@fa138f4))
* Update repo_config.py ([6a59815](feast-dev@6a59815))
* Update unit tests workflow ([06486a0](feast-dev@06486a0))
* Updated docs for docling demo ([768e6cc](feast-dev@768e6cc))
* Updating action for unit tests ([0996c28](feast-dev@0996c28))
* Updating github actions to filter at job level ([0a09622](feast-dev@0a09622))
* Updating Java CI ([c7c3a3c](feast-dev@c7c3a3c))
* Updating java pr to skip tests ([e997dd9](feast-dev@e997dd9))
* Updating workflows ([c66bcd2](feast-dev@c66bcd2))

### Features

* Add date_partition_column_format for spark source ([feast-dev#5273](feast-dev#5273)) ([7a61d6f](feast-dev@7a61d6f))
* Add Milvus tutorial with Feast integration ([feast-dev#5292](feast-dev#5292)) ([a1388a5](feast-dev@a1388a5))
* Add pgvector tutorial with PostgreSQL integration ([feast-dev#5290](feast-dev#5290)) ([bb1cbea](feast-dev@bb1cbea))
* Add ReactFlow visualization for Feast registry metadata ([feast-dev#5297](feast-dev#5297)) ([9768970](feast-dev@9768970))
* Add retrieve online documents v2 method into  pgvector  ([feast-dev#5253](feast-dev#5253)) ([6770ee6](feast-dev@6770ee6))
* Compute Engine Initial Implementation ([feast-dev#5223](feast-dev#5223)) ([64bdafd](feast-dev@64bdafd))
* Enable write node for compute engine ([feast-dev#5287](feast-dev#5287)) ([f9baf97](feast-dev@f9baf97))
* Local compute engine ([feast-dev#5278](feast-dev#5278)) ([8e06dfe](feast-dev@8e06dfe))
* Make transform on writes configurable for ingestion ([feast-dev#5283](feast-dev#5283)) ([ecad170](feast-dev@ecad170))
* Offline store update pull_all_from_table_or_query to make timestampfield optional ([feast-dev#5281](feast-dev#5281)) ([4b94608](feast-dev@4b94608))
* Serialization version 2 deprecation notice ([feast-dev#5248](feast-dev#5248)) ([327d99d](feast-dev@327d99d))
* Vector length definition moved to Feature View from Config  ([feast-dev#5289](feast-dev#5289)) ([d8f1c97](feast-dev@d8f1c97))

Signed-off-by: Jacob Weinhold <29459386+j-wine@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants