Feast dataframe abstraction
FeastDataFrame Implementation Plan
Core Insights from Analysis
- RetrievalJob is Central: Feast uses RetrievalJob as the lazy execution pattern. FeastDataFrame should integrate with this, not replace it.
- Arrow is the Bridge: Arrow Tables are already used as the interchange format between different components.
- Minimal Container: FeastDataFrame should be a thin wrapper that holds any DataFrame and routes it to the right engine.
Implementation Architecture
            ┌─────────────────────────┐
            │     FeastDataFrame      │
            │   ┌─────────────────┐   │
            │   │ data: Any       │   │
            │   │ engine: str     │   │
            │   │ metadata: dict  │   │
            │   └─────────────────┘   │
            └───────────┬─────────────┘
                        │
        ┌───────────────┴───────────────┐
        │                               │
┌───────▼────────┐             ┌────────▼────────┐
│ RetrievalJob   │             │ Compute Engines │
│ returns        │             │ + Backends      │
│ FeastDataFrame │             │ (Local/Spark/   │
└────────────────┘             │  Ray/etc.)      │
                               └─────────────────┘
Phase 1: Core Container Implementation
# feast/dataframe.py
#
# NOTE: engine detection below is name-based, so this module does not need to
# import pandas, pyarrow, or any optional engine at import time.
from enum import Enum
from typing import Any, Dict, Optional, Union


class DataFrameEngine(str, Enum):
    """Supported DataFrame engines."""

    PANDAS = "pandas"
    SPARK = "spark"
    DASK = "dask"
    RAY = "ray"
    ARROW = "arrow"
    # Referenced by the BigQuery retrieval-job example in Phase 3.
    BIGQUERY = "bigquery"
    UNKNOWN = "unknown"


# Root package of a type's defining module -> engine.
# The root package is matched exactly rather than as a substring of the module
# path: "ray" is a substring of "pyarrow" and "pandas" of "pyspark.pandas", so
# substring checks misroute Arrow tables to RAY and pandas-on-Spark to PANDAS.
_ENGINE_BY_ROOT_PACKAGE: Dict[str, DataFrameEngine] = {
    "pandas": DataFrameEngine.PANDAS,
    "pyspark": DataFrameEngine.SPARK,
    "dask": DataFrameEngine.DASK,
    "dask_expr": DataFrameEngine.DASK,
    "ray": DataFrameEngine.RAY,
}


class FeastDataFrame:
    """
    A lightweight container for DataFrame-like objects in Feast.

    This class wraps any DataFrame implementation and provides metadata
    about the engine type for proper routing in Feast's processing pipeline.
    """

    def __init__(
        self,
        data: Any,
        engine: Optional[DataFrameEngine] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize a FeastDataFrame.

        Args:
            data: The wrapped DataFrame object (pandas, Spark, Dask, etc.)
            engine: Explicitly specify the engine type (auto-detected if None).
                A plain string such as "spark" is coerced to DataFrameEngine.
            metadata: Additional metadata (schema hints, etc.)

        Raises:
            ValueError: If ``engine`` is a value that is not a DataFrameEngine.
        """
        self.data = data
        self.metadata = metadata or {}
        # Coerce so that `engine` is always a real enum member (repr relies on
        # `.value`); a falsy engine still triggers auto-detection as before.
        self._engine = DataFrameEngine(engine) if engine else self._detect_engine()

    def _detect_engine(self) -> DataFrameEngine:
        """
        Auto-detect the DataFrame engine from the type of ``self.data``.

        Walks the type's MRO so that subclasses of a known DataFrame type
        (defined in another package) are still recognised, and compares the
        root package of each class's module against the known engines.
        Only a class named ``Table`` from the ``pyarrow`` package is treated
        as ARROW. Returns UNKNOWN when nothing matches.
        """
        for cls in type(self.data).__mro__:
            module = getattr(cls, "__module__", "") or ""
            root_package = module.partition(".")[0]
            engine = _ENGINE_BY_ROOT_PACKAGE.get(root_package)
            if engine is not None:
                return engine
            if root_package == "pyarrow" and cls.__name__ == "Table":
                return DataFrameEngine.ARROW
        return DataFrameEngine.UNKNOWN

    @property
    def engine(self) -> DataFrameEngine:
        """Get the detected or specified engine type."""
        return self._engine

    def __repr__(self):
        # Use `.value`: f-string formatting of a str-mixin Enum member is not
        # the same across Python versions.
        return (
            f"FeastDataFrame(engine={self.engine.value}, "
            f"type={type(self.data).__name__})"
        )

    # Helper methods for common operations
    @property
    def is_lazy(self) -> bool:
        """Check if the underlying DataFrame is lazy (Spark, Dask)."""
        return self.engine in (DataFrameEngine.SPARK, DataFrameEngine.DASK)

    def ensure_materialized(self) -> "FeastDataFrame":
        """
        Ensure the DataFrame is materialized (for lazy DataFrames).

        Replaces ``self.data`` in place with the result of ``cache()`` (Spark)
        or ``persist()`` (Dask) when the wrapped object offers that method,
        and returns self. Eager DataFrames are returned untouched.

        NOTE(review): Spark's ``cache()`` is itself lazy, so this may only
        mark the DataFrame for caching rather than compute it - confirm that
        is the intended meaning of "materialized".
        """
        if self.engine == DataFrameEngine.SPARK and hasattr(self.data, "cache"):
            # Cache Spark DataFrame
            self.data = self.data.cache()
        elif self.engine == DataFrameEngine.DASK and hasattr(self.data, "persist"):
            # Persist Dask DataFrame
            self.data = self.data.persist()
        return self
Phase 2: RetrievalJob Integration
# Update feast/infra/offline_stores/offline_store.py
class RetrievalJob(ABC):
    """A RetrievalJob manages the execution of a query to retrieve data from the offline store."""

    def to_feast_df(
        self,
        validation_reference: Optional["ValidationReference"] = None,
        timeout: Optional[int] = None,
    ) -> FeastDataFrame:
        """
        Execute the query and return result as FeastDataFrame.

        This is the new primary method that returns FeastDataFrame.

        Args:
            validation_reference: Forwarded unchanged to ``to_arrow``.
            timeout: Forwarded unchanged to ``to_arrow``.

        Returns:
            A FeastDataFrame wrapping the Arrow table, tagged with the ARROW
            engine. Its metadata carries this job's ``features`` and
            ``on_demand_feature_views`` (an empty list for either attribute
            the job does not define).
        """
        # Get Arrow table as before
        arrow_table = self.to_arrow(validation_reference, timeout)

        # Wrap in FeastDataFrame with Arrow engine
        return FeastDataFrame(
            data=arrow_table,
            engine=DataFrameEngine.ARROW,
            metadata={
                "features": getattr(self, "features", []),
                "on_demand_feature_views": getattr(
                    self, "on_demand_feature_views", []
                ),
            },
        )

    def to_df(
        self,
        validation_reference: Optional["ValidationReference"] = None,
        timeout: Optional[int] = None,
    ) -> pd.DataFrame:
        """
        Legacy method - returns pandas DataFrame for backward compatibility.

        The parameters are spelled out (rather than elided) because the body
        forwards both of them to ``to_feast_df``.
        """
        feast_df = self.to_feast_df(validation_reference, timeout)

        if feast_df.engine == DataFrameEngine.PANDAS:
            return feast_df.data
        if feast_df.engine == DataFrameEngine.ARROW:
            return feast_df.data.to_pandas()
        # Handle other types (e.g. a subclass returned a native engine frame)
        return self._to_df_internal(timeout)
Phase 3: Engine-Specific RetrievalJob Implementations
# Example: BigQuery RetrievalJob update
class BigQueryRetrievalJob(RetrievalJob):
    def to_feast_df(
        self,
        validation_reference: Optional["ValidationReference"] = None,
        timeout: Optional[int] = None,
    ) -> FeastDataFrame:
        """
        Override to return native format when possible.

        When the offline-store config sets ``return_native_dataframe``, the
        result wraps the query string and client instead of fetched data.
        Otherwise the base-class Arrow path is used.
        """
        # If user wants BigQuery DataFrame (future feature).
        # Read with a default so configs that predate the option keep working.
        wants_native = getattr(
            self.config.offline_store, "return_native_dataframe", False
        )
        if not wants_native:
            # Default to Arrow
            return super().to_feast_df(validation_reference, timeout)

        # NOTE(review): the return sits inside the `with`, so the query
        # generator's context exits before the query is ever executed -
        # confirm the query is still valid after that exit.
        with self._query_generator() as query:
            # Return query for lazy execution
            return FeastDataFrame(
                data={'query': query, 'client': self.client},
                engine=DataFrameEngine.BIGQUERY,
                metadata={'query': query},
            )


# Example: Spark offline store
class SparkRetrievalJob(RetrievalJob):
    def to_feast_df(
        self,
        validation_reference: Optional["ValidationReference"] = None,
        timeout: Optional[int] = None,
    ) -> FeastDataFrame:
        """
        Return Spark DataFrame directly for Spark processing.

        Falls back to the base-class Arrow path when this job has no
        ``_spark_df`` (attribute missing or set to None).
        """
        spark_df = getattr(self, "_spark_df", None)
        if spark_df is not None:
            return FeastDataFrame(data=spark_df, engine=DataFrameEngine.SPARK)
        # Fallback to Arrow
        return super().to_feast_df(validation_reference, timeout)
Phase 4: Update Core APIs
# Update feast/feature_store.py
class FeatureStore:
    def get_historical_features(
        self,
        entity_df: Optional[Union[pd.DataFrame, str]] = None,
        features: Optional[Union[List[str], FeatureService]] = None,
        full_feature_names: bool = False,
        **kwargs,
    ) -> RetrievalJob:
        """
        User API remains unchanged - accepts pandas DataFrame, Spark DataFrame, str, etc.
        FeastDataFrame wrapping happens internally.

        Omitting ``features`` behaves like passing an empty list; ``None`` is
        used as the default so that no list object is shared between calls.
        """
        features = [] if features is None else features

        # Keep existing behavior: the existing logic that resolves `provider`,
        # `feature_views` and `_feature_refs` is elided from this excerpt.

        # Pass to provider as-is
        job = provider.get_historical_features(
            self.config,
            feature_views,
            _feature_refs,
            entity_df,  # Pass original entity_df
            self._registry,
            self.project,
            full_feature_names,
            **kwargs,
        )
        return job


# Update feast/infra/provider.py
class Provider(ABC):
    def get_historical_features(
        self,
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Optional[Union[pd.DataFrame, str]],
        registry: BaseRegistry,
        project: str,
        full_feature_names: bool,
        **kwargs,
    ) -> RetrievalJob:
        """
        Provider wraps the entity_df in FeastDataFrame internally.

        ``None``, SQL strings, and values that are already a FeastDataFrame
        are forwarded untouched; anything else is wrapped exactly once.
        """
        if entity_df is None or isinstance(entity_df, (str, FeastDataFrame)):
            # Keep SQL strings as-is; never wrap a FeastDataFrame a second
            # time (the offline store only unwraps one level).
            feast_entity_df = entity_df
        else:
            # Auto-wrap any DataFrame in FeastDataFrame
            feast_entity_df = FeastDataFrame(entity_df)

        # Pass to offline store with wrapped DataFrame
        return self.offline_store.get_historical_features(
            config=config,
            feature_views=feature_views,
            feature_refs=feature_refs,
            entity_df=feast_entity_df,  # Now wrapped
            registry=registry,
            project=project,
            full_feature_names=full_feature_names,
            **kwargs,
        )


# Update offline store implementations
class BigQueryOfflineStore(OfflineStore):
    @staticmethod
    def get_historical_features(
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Union[FeastDataFrame, pd.DataFrame, str],
        registry: BaseRegistry,
        project: str,
        full_feature_names: bool = False,
        **kwargs,
    ) -> RetrievalJob:
        """
        Offline stores handle FeastDataFrame internally.

        The parameters after ``entity_df`` mirror the keyword arguments that
        Provider.get_historical_features passes to this method.
        """
        # Extract actual DataFrame if wrapped
        if isinstance(entity_df, FeastDataFrame):
            actual_df = entity_df.data
            engine = entity_df.engine
        else:
            actual_df = entity_df
            engine = None

        # Use actual_df for processing
        # Existing logic remains the same
        ...
Phase 5: FeastDataFrame Integration with Transformations
Transformations are user-facing, so there are no changes to the Transformation API. The key is to ensure that a FeastDataFrame is unwrapped before it is passed to transformation functions.
Phase 6: Compute Engine Integration (Materialization)
The key insight is that Compute Engines (not offline stores) handle materialization in Feast:
- LocalComputeEngine - uses backends (pandas, polars)
- SparkComputeEngine - uses Spark DataFrames
- RayComputeEngine - uses Ray Datasets
# Update feast/infra/compute_engines/local/backends/factory.py
class BackendFactory:
    @staticmethod
    def infer_from_entity_df(entity_df) -> Optional[DataFrameBackend]:
        """
        Pick a local backend from the type of ``entity_df``.

        A FeastDataFrame is unwrapped first. ``None``, Arrow tables and
        pandas DataFrames map to the pandas backend, then Polars and Dask are
        tried. Returns None when no backend matches.

        ``entity_df`` is compared with ``is None`` instead of being tested
        for truthiness, because evaluating ``not df`` on a pandas DataFrame
        raises ValueError before any isinstance check can run.
        """
        # Enhanced to handle FeastDataFrame
        if isinstance(entity_df, FeastDataFrame):
            # Use the wrapped DataFrame to infer backend
            entity_df = entity_df.data

        # Handle missing entity_df, pandas DataFrames and Arrow Tables
        if entity_df is None or isinstance(
            entity_df, (pyarrow.Table, pd.DataFrame)
        ):
            return PandasBackend()

        # Handle Polars DataFrames
        if BackendFactory._is_polars(entity_df):
            return BackendFactory._get_polars_backend()

        # Handle Dask DataFrames
        if BackendFactory._is_dask(entity_df):
            return BackendFactory._get_dask_backend()

        return None


# Update feast/infra/compute_engines/local/compute.py
class LocalComputeEngine(ComputeEngine):
    def _get_backend(self, context: ExecutionContext) -> DataFrameBackend:
        """
        Return the configured backend, or infer one from ``context.entity_df``.

        Raises:
            ValueError: If no backend is configured and none can be inferred.
        """
        if self._backend:
            return self._backend

        # Enhanced to handle FeastDataFrame from context.entity_df
        backend = BackendFactory.infer_from_entity_df(context.entity_df)
        if backend is not None:
            return backend

        raise ValueError("Could not infer backend from context.entity_df")


# Update feast/infra/compute_engines/spark/compute.py
class SparkComputeEngine(ComputeEngine):
    def _materialize_one(
        self,
        registry: BaseRegistry,
        task: MaterializationTask,
        from_offline_store: bool = False,
        **kwargs,
    ) -> MaterializationJob:
        """
        Enhanced to preserve Spark DataFrames throughout processing.
        """
        context = self.get_execution_context(registry, task)

        # If entity_df is FeastDataFrame with Spark engine, use it directly
        entity_df = context.entity_df
        if (
            isinstance(entity_df, FeastDataFrame)
            and entity_df.engine == DataFrameEngine.SPARK
        ):
            # Use Spark DataFrame directly - no conversion needed
            spark_df = entity_df.data
            # Continue with Spark processing

        # Build and execute the Spark DAG
        builder = SparkFeatureBuilder(registry, task)
        plan = builder.build()

        # Execute with proper DataFrame type preservation
        ...
# Update feast/infra/compute_engines/ray/compute.py
class RayComputeEngine(ComputeEngine):
    def _materialize_one(
        self,
        registry: BaseRegistry,
        task: MaterializationTask,
        **kwargs,
    ) -> MaterializationJob:
        """
        Enhanced to preserve Ray Datasets throughout processing.

        Builds the execution context for ``task``, picks up the wrapped Ray
        Dataset when the context's entity_df is a Ray-engine FeastDataFrame,
        then builds the Ray DAG plan. The execution step is elided here.
        """
        context = self.get_execution_context(registry, task)

        # A Ray-engine FeastDataFrame already holds a Ray Dataset, so it can
        # be used directly - no conversion needed.
        wrapped_ray = (
            isinstance(context.entity_df, FeastDataFrame)
            and context.entity_df.engine == DataFrameEngine.RAY
        )
        if wrapped_ray:
            ray_dataset = context.entity_df.data
            # Continue with Ray processing

        # Build and execute the Ray DAG
        plan = RayFeatureBuilder(registry, task).build()

        # Execute with proper DataFrame type preservation
        ...
Key Insight: ExecutionContext Enhancement
The critical piece is enhancing ExecutionContext to carry FeastDataFrame:
# Update feast/infra/compute_engines/dag/context.py
class ExecutionContext:
    """Holds the project, config, stores, entity definitions and optional
    entity_df that are handed to a compute-engine run."""

    def __init__(
        self,
        project: str,
        repo_config: RepoConfig,
        offline_store: OfflineStore,
        online_store: OnlineStore,
        entity_defs: List[Entity],
        entity_df: Union[FeastDataFrame, pd.DataFrame, Any, None] = None,  # Enhanced type
    ):
        """Store every argument on the instance under the same name."""
        # Project and configuration
        self.project, self.repo_config = project, repo_config
        # Storage backends
        self.offline_store, self.online_store = offline_store, online_store
        # Entities
        self.entity_defs = entity_defs
        self.entity_df = entity_df  # Can now be FeastDataFrame
Materialization Flow with FeastDataFrame
User provides entity_df (pandas/Spark/Dask/Ray)
↓
Provider.get_historical_features() wraps it in FeastDataFrame
↓
ComputeEngine.get_execution_context() receives FeastDataFrame
↓
FeatureBuilder detects engine type from FeastDataFrame
↓
Processing happens in native engine (Spark stays Spark, Ray stays Ray)
↓
Results written to online/offline stores using appropriate writers
Key Benefits of This Approach
- Minimal Changes: FeastDataFrame is a simple container that doesn't break existing code
- Engine Preservation: DataFrames stay in their native format until necessary
- Backward Compatible: Existing to_df() methods still work
- Extensible: Easy to add new engines
- Performance: No unnecessary conversions
Migration Strategy
- Phase 1: Implement FeastDataFrame and update RetrievalJob base class
- Phase 2: Update offline store implementations to support to_feast_df()
- Phase 3: Update transformation engine to work with FeastDataFrame
- Phase 4: Update get_historical_features to accept FeastDataFrame
- Phase 5: Update materialization to use FeastDataFrame
- Phase 6: Deprecate direct pandas returns in favor of FeastDataFrame
Example Usage After Implementation
# Example 1: Simple pandas usage (unchanged from current Feast)
pandas_df = pd.DataFrame({'user_id': [1, 2, 3], 'timestamp': [...]})
job = store.get_historical_features(pandas_df, features=[...])
result_df = job.to_df()  # Still returns pandas for compatibility

# Example 2: Spark DataFrame (user doesn't need to know about FeastDataFrame)
spark_df = spark.read.parquet("s3://...")
# Just pass the Spark DataFrame
job = store.get_historical_features(spark_df, features=[...])
result_df = job.to_df()  # Returns pandas for compatibility
feast_df = job.to_feast_df()  # New method returns FeastDataFrame
print(feast_df.engine)  # DataFrameEngine.SPARK (if using Spark offline store)

# Example 3: Dask DataFrame
dask_df = dd.read_parquet("s3://...")
# Just pass the Dask DataFrame
job = store.get_historical_features(dask_df, features=[...])
result_df = job.to_df()  # Returns pandas for compatibility

# Example 4: SQL query (unchanged)
job = store.get_historical_features(
    entity_df="SELECT * FROM users WHERE created_at > '2024-01-01'",
    features=[...],
)


# Example 5: Internal usage - transformations with engine preservation
# This happens inside Feast, not exposed to users
@spark_transformation
def my_transform(df):
    return df.groupBy('user_id').agg(...)


# Inside Feast's transformation pipeline:
# 1. RetrievalJob returns FeastDataFrame with Spark engine
# 2. Transformation runs on Spark directly
# 3. Result stays in Spark until materialization
Key Design Principle
Users never see FeastDataFrame - they continue using:
- store.get_historical_features(pandas_df, ...)
- store.get_historical_features(spark_df, ...)
- store.get_historical_features(dask_df, ...)
FeastDataFrame is purely an internal implementation detail that enables:
- Intelligent routing to compute engines
- Avoiding unnecessary conversions
- Preserving lazy evaluation semantics
This implementation provides the minimal container functionality while enabling intelligent routing to processing engines.