fix: Integration tests for async sdk method by breno-costa · Pull Request #4201 · feast-dev/feast
@@ -1,3 +1,4 @@
import asyncio
import datetime
import os
import time
Expand All
@@ -12,6 +13,7 @@
import requests
from botocore.exceptions import BotoCoreError
from feast import FeatureStore from feast.entity import Entity from feast.errors import FeatureNameCollisionError from feast.feature_service import FeatureServiceExpand Down
Expand Up
@@ -400,19 +402,15 @@ def test_online_retrieval_with_shared_batch_source(environment, universal_data_s
)
@pytest.mark.integration @pytest.mark.universal_online_stores @pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) def test_online_retrieval_with_event_timestamps( environment, universal_data_sources, full_feature_names ): fs = environment.feature_store def setup_feature_store_universal_feature_views( environment, universal_data_sources ) -> FeatureStore: fs: FeatureStore = environment.feature_store entities, datasets, data_sources = universal_data_sources feature_views = construct_universal_feature_views(data_sources)
fs.apply([driver(), feature_views.driver, feature_views.global_fv])
# fake data to ingest into Online Store data = { "driver_id": [1, 2], "conv_rate": [0.5, 0.3],Expand All
@@ -429,18 +427,11 @@ def test_online_retrieval_with_event_timestamps(
}
df_ingest = pd.DataFrame(data)
# directly ingest data into the Online Store fs.write_to_online_store("driver_stats", df_ingest) return fs
response = fs.get_online_features( features=[ "driver_stats:avg_daily_trips", "driver_stats:acc_rate", "driver_stats:conv_rate", ], entity_rows=[{"driver_id": 1}, {"driver_id": 2}], ) df = response.to_df(True)
def assert_feature_store_universal_feature_views_response(df: pd.DataFrame): assertpy.assert_that(len(df)).is_equal_to(2) assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1) assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2)Expand All
@@ -464,6 +455,50 @@ def test_online_retrieval_with_event_timestamps(
)
@pytest.mark.integration @pytest.mark.universal_online_stores def test_online_retrieval_with_event_timestamps(environment, universal_data_sources): fs = setup_feature_store_universal_feature_views( environment, universal_data_sources )
response = fs.get_online_features( features=[ "driver_stats:avg_daily_trips", "driver_stats:acc_rate", "driver_stats:conv_rate", ], entity_rows=[{"driver_id": 1}, {"driver_id": 2}], ) df = response.to_df(True)
assert_feature_store_universal_feature_views_response(df)
@pytest.mark.integration @pytest.mark.universal_online_stores(only=["redis"]) def test_async_online_retrieval_with_event_timestamps( environment, universal_data_sources ): fs = setup_feature_store_universal_feature_views( environment, universal_data_sources )
response = asyncio.run( fs.get_online_features_async( features=[ "driver_stats:avg_daily_trips", "driver_stats:acc_rate", "driver_stats:conv_rate", ], entity_rows=[{"driver_id": 1}, {"driver_id": 2}], ) ) df = response.to_df(True)
assert_feature_store_universal_feature_views_response(df)
@pytest.mark.integration @pytest.mark.universal_online_stores(only=["redis"]) def test_online_store_cleanup(environment, universal_data_sources):Expand Down
from feast import FeatureStore from feast.entity import Entity from feast.errors import FeatureNameCollisionError from feast.feature_service import FeatureService
@pytest.mark.integration @pytest.mark.universal_online_stores @pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) def test_online_retrieval_with_event_timestamps( environment, universal_data_sources, full_feature_names ): fs = environment.feature_store def setup_feature_store_universal_feature_views( environment, universal_data_sources ) -> FeatureStore: fs: FeatureStore = environment.feature_store entities, datasets, data_sources = universal_data_sources feature_views = construct_universal_feature_views(data_sources)
fs.apply([driver(), feature_views.driver, feature_views.global_fv])
# fake data to ingest into Online Store data = { "driver_id": [1, 2], "conv_rate": [0.5, 0.3],
# directly ingest data into the Online Store fs.write_to_online_store("driver_stats", df_ingest) return fs
response = fs.get_online_features( features=[ "driver_stats:avg_daily_trips", "driver_stats:acc_rate", "driver_stats:conv_rate", ], entity_rows=[{"driver_id": 1}, {"driver_id": 2}], ) df = response.to_df(True)
def assert_feature_store_universal_feature_views_response(df: pd.DataFrame): assertpy.assert_that(len(df)).is_equal_to(2) assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1) assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2)
@pytest.mark.integration @pytest.mark.universal_online_stores def test_online_retrieval_with_event_timestamps(environment, universal_data_sources): fs = setup_feature_store_universal_feature_views( environment, universal_data_sources )
response = fs.get_online_features( features=[ "driver_stats:avg_daily_trips", "driver_stats:acc_rate", "driver_stats:conv_rate", ], entity_rows=[{"driver_id": 1}, {"driver_id": 2}], ) df = response.to_df(True)
assert_feature_store_universal_feature_views_response(df)
@pytest.mark.integration @pytest.mark.universal_online_stores(only=["redis"]) def test_async_online_retrieval_with_event_timestamps( environment, universal_data_sources ): fs = setup_feature_store_universal_feature_views( environment, universal_data_sources )
response = asyncio.run( fs.get_online_features_async( features=[ "driver_stats:avg_daily_trips", "driver_stats:acc_rate", "driver_stats:conv_rate", ], entity_rows=[{"driver_id": 1}, {"driver_id": 2}], ) ) df = response.to_df(True)
assert_feature_store_universal_feature_views_response(df)
@pytest.mark.integration @pytest.mark.universal_online_stores(only=["redis"]) def test_online_store_cleanup(environment, universal_data_sources):