โ— Shell
clean mode source โ†—

BFV Compute-on-Read for get_historical_features() in SparkOfflineStore

Is your feature request related to a problem? Please describe.
When using @batch_feature_view with TransformationMode.PYTHON and the Spark offline store, calling store.get_historical_features() fails with UNRESOLVED_COLUMN errors. The point-in-time (PIT) join SQL template reads directly from the raw batch_source and expects the BFV's output columns (e.g., aggregated features) to already exist in the source data. The BFV's Python transformation is never invoked during offline reads; it runs only during feast materialize.
This forces users to either:

  • Maintain a separate ETL pipeline that pre-computes the same features the BFV defines
  • Use plain FeatureView pointing at pre-computed data, duplicating transformation logic

Either way, this breaks the "define once, use everywhere" promise of the feature store.

Describe the solution you'd like
In SparkOfflineStore.get_historical_features(), before building the PIT join SQL, detect BatchFeatureView instances that define a UDF. For each one:

  1. Read the raw source into a Spark DataFrame
  2. Invoke the BFV's udf() function (the same way SparkTransformationNode.execute() does during materialization)
  3. Register the transformed DataFrame as a Spark temp view
  4. Replace the table_subquery in the FeatureViewQueryContext with the temp view name

This keeps the entire pipeline in distributed Spark: raw read -> transformation -> PIT join -> training data. No duplicated transformation code is required.
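A minimal sketch of the four steps above, with Spark and Feast replaced by injected callables so the control flow can run on its own. QueryContext, apply_compute_on_read, read_source, register_temp_view, and the feast_bfv_ view-name prefix are hypothetical names used for illustration, not existing Feast APIs. The sketch also assumes feature view names are valid SQL identifiers.

```python
import uuid
from dataclasses import dataclass, replace
from typing import Any, Callable, List, Mapping


@dataclass(frozen=True)
class QueryContext:
    """Hypothetical stand-in for FeatureViewQueryContext (only the fields used here)."""
    name: str            # feature view name
    table_subquery: str  # what the PIT join SQL selects from


def apply_compute_on_read(
    contexts: List[QueryContext],
    feature_views: Mapping[str, Any],
    read_source: Callable[[Any], Any],
    register_temp_view: Callable[[Any, str], None],
) -> List[QueryContext]:
    """Return contexts whose table_subquery points at transformed temp views."""
    rewritten = []
    for ctx in contexts:
        fv = feature_views[ctx.name]
        udf = getattr(fv, "udf", None)
        if udf is None:
            # No UDF defined: keep reading the raw source as before.
            rewritten.append(ctx)
            continue
        raw_df = read_source(fv)                       # step 1: read raw source
        transformed_df = udf(raw_df)                   # step 2: run the BFV's UDF
        # Random suffix avoids collisions between concurrent retrievals.
        view_name = f"feast_bfv_{ctx.name}_{uuid.uuid4().hex[:8]}"
        register_temp_view(transformed_df, view_name)  # step 3: register temp view
        # step 4: point the PIT join at the transformed view
        rewritten.append(replace(ctx, table_subquery=view_name))
    return rewritten
```

With PySpark, register_temp_view could be as small as `lambda df, name: df.createOrReplaceTempView(name)`, and read_source could wrap `spark.sql(...)` over the original table_subquery. How this would hook into the offline store's existing query-context construction is left open here.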

Describe alternatives you've considered

  • Pre-compute features via an external Spark job and use plain FeatureView (works but duplicates logic)
  • Set offline=True on BFVs and rely on materialized offline parquet (requires running feast materialize before training, adds operational complexity)
  • Use on_demand_feature_view for transformations (doesn't support Spark-native aggregations like groupBy)
