OnDemandFeatureViews with a RequestSource return different columns for online and offline feature retrieval
Context
I'm using an ODFV with a PandasTransformation and a RequestSource input. I'm also using the Postgres offline and online stores.
Expected Behavior
I would expect online retrieval (get_online_features_async, or get_online_features as used in the reproduction below) and offline retrieval (get_historical_features) to return the same set of columns. Currently they do not: get_historical_features includes the RequestSource input (REQUEST_FLOAT) in its output, while online retrieval leaves it out.
I'm not sure which behavior is intended, but I think both approaches should return the same columns.
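To make the difference concrete, the column lists printed by inference.py in the reproduction below can be compared with a plain set difference. This is a small sketch that only works on the reported output (copied by hand); it does not call Feast.

```python
# Column lists copied from the output of inference.py (pandas feature service).
offline_columns = [
    "ENTITY_ID", "REQUEST_FLOAT", "event_timestamp", "ENTITY_FLOAT",
    "ENTITY_FLOAT_TRANSFORMED_PANDAS", "ENTITY_FLOAT_PLUS_REQUEST_SOURCE",
]
online_columns = [
    "ENTITY_ID", "ENTITY_FLOAT",
    "ENTITY_FLOAT_TRANSFORMED_PANDAS", "ENTITY_FLOAT_PLUS_REQUEST_SOURCE",
]


def column_diff(offline, online):
    """Return (only_offline, only_online) as sorted lists of column names."""
    return sorted(set(offline) - set(online)), sorted(set(online) - set(offline))


only_offline, only_online = column_diff(offline_columns, online_columns)
print("only offline:", only_offline)
print("only online:", only_online)
```

event_timestamp showing up only offline is expected, since get_historical_features requires it in entity_df. REQUEST_FLOAT is the column this issue is about.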
Current Behavior
When calling get_online_features_async (or get_online_features), the input from the RequestSource is not included in the response, although I would expect it to be there.
One could also argue that it should not be in the response, in which case get_historical_features should drop it too. Either way, I would expect online and offline retrieval to return the same columns.
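Until the intended behavior is settled, one client-side workaround is to copy the request values back into the online response. The sketch below uses a hypothetical helper, add_request_columns (not part of Feast), and assumes the online to_dict() result holds one list per column with values in the same order as entity_rows.

```python
from typing import Any


def add_request_columns(
    online: dict[str, list[Any]],
    entity_rows: list[dict[str, Any]],
    request_fields: list[str],
) -> dict[str, list[Any]]:
    """Hypothetical helper: copy request-source inputs into an online response dict.

    Assumes the response lists values in the same order as entity_rows.
    """
    result = dict(online)  # shallow copy so the caller's dict is left untouched
    for field in request_fields:
        if field not in result:
            result[field] = [row.get(field) for row in entity_rows]
    return result


entity_rows = [
    {"ENTITY_ID": "11111111", "REQUEST_FLOAT": 1.0},
    {"ENTITY_ID": "22222222", "REQUEST_FLOAT": 1.0},
]
# Shape of a to_dict() response; the feature values are placeholders.
online = {"ENTITY_ID": ["11111111", "22222222"], "ENTITY_FLOAT": [1.111, 2.22]}
print(sorted(add_request_columns(online, entity_rows, ["REQUEST_FLOAT"])))
```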
Steps to reproduce
docker-compose.yml
```yaml
---
version: "3"
services:
  offline_store:
    image: postgres:16-alpine
    container_name: offline_store
    ports:
      - "6543:5432"
    environment:
      - POSTGRES_DB=offline_store
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    volumes:
      - ./postgres_init:/docker-entrypoint-initdb.d
  online_store:
    image: postgres:16-alpine
    container_name: online_store
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=online_store
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
```
feature_store.yaml
```yaml
project: feast_tryout
provider: local
registry:
  registry_type: sql
  path: postgresql+psycopg2://postgres:postgres@0.0.0.0:5432/online_store
  cache_ttl_seconds: 60
online_store:
  type: postgres
  host: 0.0.0.0
  port: 5432
  database: online_store
  db_schema: online
  user: postgres
  password: postgres
offline_store:
  type: postgres
  host: 0.0.0.0
  port: 6543
  database: offline_store
  db_schema: offline
  user: postgres
  password: postgres
entity_key_serialization_version: 2
```
Insert into offline store (postgres)
postgres_init/create-offline-store-database.sql
```sql
CREATE SCHEMA offline;

CREATE TABLE offline.features (
    "ENTITY_ID" VARCHAR,
    "EVENT_TIMESTAMP" TIMESTAMP,
    "ENTITY_FLOAT" FLOAT
);

INSERT INTO offline.features ("ENTITY_ID", "EVENT_TIMESTAMP", "ENTITY_FLOAT")
VALUES
    ('11111111', '2024-01-01 13:00:00'::TIMESTAMP, 1.1),
    ('11111111', '2024-01-01 14:00:00'::TIMESTAMP, 1.11),
    ('11111111', '2024-01-01 15:00:00'::TIMESTAMP, 1.111),
    ('22222222', '2024-01-01 13:00:00'::TIMESTAMP, 2.2),
    ('22222222', '2024-01-01 14:00:00'::TIMESTAMP, 2.22),
    ('33333333', '2024-01-01 13:00:00'::TIMESTAMP, 3.3),
    ('44444444', '2024-01-02 22:00:00'::TIMESTAMP, 4.4);
```
bootstrap.py
```python
from datetime import timedelta
from typing import Any

import pandas as pd
from feast import (
    Entity,
    FeatureService,
    FeatureStore,
    FeatureView,
    Field,
    RequestSource,
    ValueType,
)
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
    PostgreSQLSource as PostgresSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64

feature_store = FeatureStore()

features_entity = Entity(
    name="entity_id",
    join_keys=["ENTITY_ID"],
    value_type=ValueType.STRING,
)

features_source = PostgresSource(
    name="features",
    timestamp_field="EVENT_TIMESTAMP",
    table="offline.features",
)

features_feature_view = FeatureView(
    name="features_feature_view",
    entities=[features_entity],
    ttl=timedelta(days=0),
    schema=[Field(name="ENTITY_FLOAT", dtype=Float32)],
    online=True,
    source=features_source,
)

request_source = RequestSource(
    name="request_feature",
    schema=[Field(name="REQUEST_FLOAT", dtype=Float32)],
)


@on_demand_feature_view(
    sources=[features_feature_view, request_source],
    schema=[
        Field(name="ENTITY_FLOAT_TRANSFORMED_PANDAS", dtype=Float64),
        Field(name="ENTITY_FLOAT_PLUS_REQUEST_SOURCE", dtype=Float64),
    ],
    mode="pandas",
)
def odfv_pandas(input: pd.DataFrame) -> pd.DataFrame:
    output = pd.DataFrame()
    output["ENTITY_FLOAT_TRANSFORMED_PANDAS"] = input["ENTITY_FLOAT"] * 2
    output["ENTITY_FLOAT_PLUS_REQUEST_SOURCE"] = (
        input["ENTITY_FLOAT"] * input["REQUEST_FLOAT"]
    )
    return output


@on_demand_feature_view(
    sources=[features_feature_view, request_source],
    schema=[Field(name="ENTITY_FLOAT_TRANSFORMED_PYTHON", dtype=Float64)],
    mode="python",
)
def odfv_python(input: dict[str, Any]) -> dict[str, Any]:
    output = {}
    output["ENTITY_FLOAT_TRANSFORMED_PYTHON"] = [
        value * 2 for value in input["ENTITY_FLOAT"]
    ]
    output["ENTITY_FLOAT_PLUS_REQUEST_SOURCE_PYTHON"] = [
        e + r for e, r in zip(input["ENTITY_FLOAT"], input["REQUEST_FLOAT"])
    ]
    return output


features_feature_service_pandas = FeatureService(
    name="features_feature_service_pandas",
    features=[features_feature_view, odfv_pandas],
)

features_feature_service_python = FeatureService(
    name="features_feature_service_python",
    features=[features_feature_view, odfv_python],
)

feature_store.apply(
    [
        features_entity,
        features_source,
        features_feature_view,
        odfv_pandas,
        odfv_python,
        features_feature_service_pandas,
        features_feature_service_python,
    ]
)
```
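A side observation on the python-mode view in bootstrap.py: its function body returns ENTITY_FLOAT_PLUS_REQUEST_SOURCE_PYTHON, but its schema only declares ENTITY_FLOAT_TRANSFORMED_PYTHON. That appears consistent with only the declared column showing up in the "online with python" output. The sketch below copies the same logic into an undecorated function (odfv_python_body is a hypothetical name) so it can be called on a plain dict and compared with the declared schema names.

```python
from typing import Any


# Same logic as odfv_python in bootstrap.py, without the Feast decorator.
def odfv_python_body(inputs: dict[str, Any]) -> dict[str, Any]:
    output = {}
    output["ENTITY_FLOAT_TRANSFORMED_PYTHON"] = [
        value * 2 for value in inputs["ENTITY_FLOAT"]
    ]
    output["ENTITY_FLOAT_PLUS_REQUEST_SOURCE_PYTHON"] = [
        e + r for e, r in zip(inputs["ENTITY_FLOAT"], inputs["REQUEST_FLOAT"])
    ]
    return output


# Field names declared in the odfv_python schema.
declared = {"ENTITY_FLOAT_TRANSFORMED_PYTHON"}
sample = {"ENTITY_FLOAT": [1.5, 2.5], "REQUEST_FLOAT": [1.0, 1.0]}
produced = set(odfv_python_body(sample))
print("produced but not declared:", sorted(produced - declared))
```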
materialize.py
```python
from datetime import datetime

from feast import FeatureStore

feature_store = FeatureStore()

feature_store.materialize(
    start_date=datetime(1900, 1, 1),
    end_date=datetime(9999, 1, 1),
    feature_views=["features_feature_view"],
)
```
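For reference when reading the online results, here is a sketch of which ENTITY_FLOAT value each entity should end up with in the online store. It assumes that materialization keeps the most recent row per entity within the requested window. The rows are the ones inserted by create-offline-store-database.sql, and latest_per_entity is a hypothetical helper.

```python
from datetime import datetime

# (ENTITY_ID, EVENT_TIMESTAMP, ENTITY_FLOAT) rows from the SQL init script.
rows = [
    ("11111111", datetime(2024, 1, 1, 13), 1.1),
    ("11111111", datetime(2024, 1, 1, 14), 1.11),
    ("11111111", datetime(2024, 1, 1, 15), 1.111),
    ("22222222", datetime(2024, 1, 1, 13), 2.2),
    ("22222222", datetime(2024, 1, 1, 14), 2.22),
    ("33333333", datetime(2024, 1, 1, 13), 3.3),
    ("44444444", datetime(2024, 1, 2, 22), 4.4),
]


def latest_per_entity(rows):
    """Keep the value with the most recent timestamp for each entity."""
    latest = {}
    for entity_id, ts, value in rows:
        if entity_id not in latest or ts > latest[entity_id][0]:
            latest[entity_id] = (ts, value)
    return {entity_id: value for entity_id, (_, value) in latest.items()}


online_values = latest_per_entity(rows)
print(online_values)
```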
inference.py
"""Inference example.""" import pandas as pd from feast import FeatureStore feature_store = FeatureStore() feature_service_pandas = feature_store.get_feature_service( name="features_feature_service_pandas" ) feature_service_python = feature_store.get_feature_service( name="features_feature_service_python" ) entity_rows = [ {"ENTITY_ID": "11111111", "REQUEST_FLOAT": 1.0}, {"ENTITY_ID": "22222222", "REQUEST_FLOAT": 1.0}, ] entity_df = pd.DataFrame(entity_rows) entity_df["event_timestamp"] = pd.to_datetime("now", utc=True) print("offline with pandas") offline_features = feature_store.get_historical_features( entity_df=entity_df, features=feature_service_pandas, ).to_df() print(list(offline_features.to_dict().keys())) print("online with pandas") online_features = feature_store.get_online_features( entity_rows=entity_rows, features=feature_service_pandas, ).to_dict() print(list(online_features.keys())) print("online with python") online_features = feature_store.get_online_features( entity_rows=entity_rows, features=feature_service_python, ).to_dict() print(list(online_features.keys())) ## OUTPUT: # offline with pandas # ['ENTITY_ID', 'REQUEST_FLOAT', 'event_timestamp', 'ENTITY_FLOAT', 'ENTITY_FLOAT_TRANSFORMED_PANDAS', 'ENTITY_FLOAT_PLUS_REQUEST_SOURCE'] # online with pandas # ['ENTITY_ID', 'ENTITY_FLOAT', 'ENTITY_FLOAT_TRANSFORMED_PANDAS', 'ENTITY_FLOAT_PLUS_REQUEST_SOURCE'] # online with python # ['ENTITY_ID', 'ENTITY_FLOAT', 'ENTITY_FLOAT_TRANSFORMED_PYTHON'] # not possible to do offline transformation with python mode.
Specifications
Version: 0.36.0
Platform: macOS - M1
Subsystem: Sonoma 14.1.1