◐ Shell
reader mode source ↗
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
File filter
Conversations
Jump to
Diff view
Apply and reload
Show whitespace
Diff view
Apply and reload
39 changes: 23 additions & 16 deletions sdk/python/feast/infra/offline_stores/file_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typeguard import typechecked

from feast import type_map
from feast.data_format import FileFormat, ParquetFormat
from feast.data_source import DataSource
from feast.feature_logging import LoggingDestination
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
Expand Up @@ -157,24 +157,31 @@ def get_table_column_names_and_types(
filesystem, path = FileSource.create_filesystem_and_path(
self.path, self.file_options.s3_endpoint_override
)
# Adding support for different file format path
# based on S3 filesystem
if filesystem is None:
kwargs = (
{"use_legacy_dataset": False}
if version.parse(pyarrow.__version__) < version.parse("15.0.0")
else {}
)

schema = ParquetDataset(path, **kwargs).schema
if hasattr(schema, "names") and hasattr(schema, "types"):
# Newer versions of pyarrow doesn't have this method,
# but this field is good enough.
pass
else:
schema = schema.to_arrow_schema()
else:
schema = ParquetDataset(path, filesystem=filesystem).schema

return zip(schema.names, map(str, schema.types))

Expand Down
74 changes: 51 additions & 23 deletions sdk/python/feast/infra/offline_stores/ibis.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from ibis.expr.types import Table
from pytz import utc

from feast.data_source import DataSource
from feast.errors import SavedDatasetLocationAlreadyExists
from feast.feature_logging import LoggingConfig, LoggingSource
Expand Down @@ -105,6 +106,15 @@ def _generate_row_id(

return entity_table

@staticmethod
def get_historical_features(
config: RepoConfig,
Expand Down Expand Up @@ -137,7 +147,9 @@ def get_historical_features(
def read_fv(
feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool
) -> Tuple:
fv_table: Table = ibis.read_parquet(feature_view.batch_source.name)

for old_name, new_name in feature_view.batch_source.field_mapping.items():
if old_name in fv_table.columns:
Expand Down @@ -227,7 +239,7 @@ def pull_all_from_table_or_query(
start_date = start_date.astimezone(tz=utc)
end_date = end_date.astimezone(tz=utc)

table = ibis.read_parquet(data_source.path)

table = table.select(*fields)

Expand Down Expand Up @@ -260,10 +272,9 @@ def write_logged_features(
destination = logging_config.destination
assert isinstance(destination, FileLoggingDestination)

if isinstance(data, Path):
table = ibis.read_parquet(data)
else:
table = ibis.memtable(data)

if destination.partition_by:
kwargs = {"partition_by": destination.partition_by}
Expand Down Expand Up @@ -294,12 +305,21 @@ def offline_write_batch(
)

file_options = feature_view.batch_source.file_options
prev_table = ibis.read_parquet(file_options.uri).to_pyarrow()
if table.schema != prev_table.schema:
table = table.cast(prev_table.schema)
new_table = pyarrow.concat_tables([table, prev_table])

ibis.memtable(new_table).to_parquet(file_options.uri)


class IbisRetrievalJob(RetrievalJob):
Expand Down Expand Up @@ -338,20 +358,28 @@ def persist(
if not allow_overwrite and os.path.exists(storage.file_options.uri):
raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri)

filesystem, path = FileSource.create_filesystem_and_path(
storage.file_options.uri,
storage.file_options.s3_endpoint_override,
)

if path.endswith(".parquet"):
pyarrow.parquet.write_table(
self.to_arrow(), where=path, filesystem=filesystem
)
else:
# otherwise assume destination is directory
pyarrow.parquet.write_to_dataset(
self.to_arrow(), root_path=path, filesystem=filesystem
)

@property
def metadata(self) -> Optional[RetrievalMetadata]:
Expand Down
47 changes: 26 additions & 21 deletions sdk/python/requirements/py3.10-ci-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ bidict==0.23.1
# via ibis-framework
bleach==6.1.0
# via nbconvert
boto3==1.34.85
# via
# feast (setup.py)
# moto
botocore==1.34.85
# via
# boto3
# moto
Expand Down Expand Up @@ -134,11 +134,11 @@ cryptography==42.0.5
# snowflake-connector-python
# types-pyopenssl
# types-redis
dask[array,dataframe]==2024.4.1
# via
# dask-expr
# feast (setup.py)
dask-expr==1.0.11
# via dask
db-dtypes==1.2.0
# via google-cloud-bigquery
@@ -148,6 +148,8 @@ decorator==5.1.1
# via ipython
defusedxml==0.7.1
# via nbconvert
dill==0.3.8
# via feast (setup.py)
distlib==0.3.8
Expand All @@ -158,15 +160,15 @@ docker==7.0.0
# testcontainers
docutils==0.19
# via sphinx
duckdb==0.10.1
# via
# duckdb-engine
# ibis-framework
duckdb-engine==0.11.5
# via ibis-framework
entrypoints==0.4
# via altair
exceptiongroup==1.2.0
# via
# anyio
# ipython
Expand All @@ -175,7 +177,7 @@ execnet==2.1.1
# via pytest-xdist
executing==2.0.1
# via stack-data
fastapi==0.110.1
# via feast (setup.py)
fastjsonschema==2.19.1
# via nbformat
Expand Down Expand Up @@ -263,7 +265,7 @@ greenlet==3.0.3
# via sqlalchemy
grpc-google-iam-v1==0.13.0
# via google-cloud-bigtable
grpcio==1.62.1
# via
# feast (setup.py)
# google-api-core
Expand All @@ -275,15 +277,15 @@ grpcio==1.62.1
# grpcio-status
# grpcio-testing
# grpcio-tools
grpcio-health-checking==1.62.1
# via feast (setup.py)
grpcio-reflection==1.62.1
# via feast (setup.py)
grpcio-status==1.62.1
# via google-api-core
grpcio-testing==1.62.1
# via feast (setup.py)
grpcio-tools==1.62.1
# via feast (setup.py)
gunicorn==22.0.0 ; platform_system != "Windows"
# via feast (setup.py)
Expand Down Expand Up @@ -482,13 +484,13 @@ nest-asyncio==1.6.0
# via ipykernel
nodeenv==1.8.0
# via pre-commit
notebook==7.1.2
# via great-expectations
notebook-shim==0.2.4
# via
# jupyterlab
# notebook
numpy==1.24.4
# via
# altair
# dask
Expand Down Expand Up @@ -615,12 +617,15 @@ pyarrow==15.0.2
# via
# dask-expr
# db-dtypes
# feast (setup.py)
# google-cloud-bigquery
# ibis-framework
# snowflake-connector-python
pyarrow-hotfix==0.6
# via ibis-framework
pyasn1==0.6.0
# via
# pyasn1-modules
Expand Down Expand Up @@ -692,7 +697,7 @@ pytest-ordering==0.6
# via feast (setup.py)
pytest-timeout==1.4.2
# via feast (setup.py)
pytest-xdist==3.5.0
# via feast (setup.py)
python-dateutil==2.9.0.post0
# via
Expand Down Expand Up @@ -728,7 +733,7 @@ pyyaml==6.0.1
# pre-commit
# responses
# uvicorn
pyzmq==26.0.0
# via
# ipykernel
# jupyter-client
Expand Down Expand Up @@ -785,7 +790,7 @@ rsa==4.9
# via google-auth
ruamel-yaml==0.17.17
# via great-expectations
ruff==0.3.7
# via feast (setup.py)
s3transfer==0.10.1
# via boto3
Expand All @@ -812,7 +817,7 @@ sniffio==1.3.1
# httpx
snowballstemmer==2.2.0
# via sphinx
snowflake-connector-python[pandas]==3.8.1
# via feast (setup.py)
sortedcontainers==2.4.0
# via snowflake-connector-python
Expand Down Expand Up @@ -895,7 +900,7 @@ tqdm==4.66.2
# via
# feast (setup.py)
# great-expectations
traitlets==5.14.2
# via
# comm
# ipykernel
Expand Down
Loading
Toggle all file notes Toggle all file annotations