◐ Shell
clean mode source ↗

feat: Add delta format to `FileSource`, add support for it in ibis/duckdb by tokoko · Pull Request #4123 · feast-dev/feast

Expand Up @@ -13,6 +13,7 @@ from ibis.expr.types import Table from pytz import utc
from feast.data_format import DeltaFormat, ParquetFormat from feast.data_source import DataSource from feast.errors import SavedDatasetLocationAlreadyExists from feast.feature_logging import LoggingConfig, LoggingSource Expand Down Expand Up @@ -105,6 +106,15 @@ def _generate_row_id(
return entity_table
@staticmethod def _read_data_source(data_source: DataSource) -> Table: assert isinstance(data_source, FileSource)
if isinstance(data_source.file_format, ParquetFormat): return ibis.read_parquet(data_source.path) elif isinstance(data_source.file_format, DeltaFormat): return ibis.read_delta(data_source.path)
@staticmethod def get_historical_features( config: RepoConfig, Expand Down Expand Up @@ -137,7 +147,9 @@ def get_historical_features( def read_fv( feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool ) -> Tuple: fv_table: Table = ibis.read_parquet(feature_view.batch_source.name) fv_table: Table = IbisOfflineStore._read_data_source( feature_view.batch_source )
for old_name, new_name in feature_view.batch_source.field_mapping.items(): if old_name in fv_table.columns: Expand Down Expand Up @@ -227,7 +239,7 @@ def pull_all_from_table_or_query( start_date = start_date.astimezone(tz=utc) end_date = end_date.astimezone(tz=utc)
table = ibis.read_parquet(data_source.path) table = IbisOfflineStore._read_data_source(data_source)
table = table.select(*fields)
Expand Down Expand Up @@ -260,10 +272,9 @@ def write_logged_features( destination = logging_config.destination assert isinstance(destination, FileLoggingDestination)
if isinstance(data, Path): table = ibis.read_parquet(data) else: table = ibis.memtable(data) table = ( ibis.read_parquet(data) if isinstance(data, Path) else ibis.memtable(data) )
if destination.partition_by: kwargs = {"partition_by": destination.partition_by} Expand Down Expand Up @@ -294,12 +305,21 @@ def offline_write_batch( )
file_options = feature_view.batch_source.file_options prev_table = ibis.read_parquet(file_options.uri).to_pyarrow() if table.schema != prev_table.schema: table = table.cast(prev_table.schema) new_table = pyarrow.concat_tables([table, prev_table])
ibis.memtable(new_table).to_parquet(file_options.uri) if isinstance(feature_view.batch_source.file_format, ParquetFormat): prev_table = ibis.read_parquet(file_options.uri).to_pyarrow() if table.schema != prev_table.schema: table = table.cast(prev_table.schema) new_table = pyarrow.concat_tables([table, prev_table])
ibis.memtable(new_table).to_parquet(file_options.uri) elif isinstance(feature_view.batch_source.file_format, DeltaFormat): from deltalake import DeltaTable
prev_schema = DeltaTable(file_options.uri).schema().to_pyarrow() if table.schema != prev_schema: table = table.cast(prev_schema) ibis.memtable(table).to_delta(file_options.uri, mode="append")

class IbisRetrievalJob(RetrievalJob): Expand Down Expand Up @@ -338,20 +358,28 @@ def persist( if not allow_overwrite and os.path.exists(storage.file_options.uri): raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri)
filesystem, path = FileSource.create_filesystem_and_path( storage.file_options.uri, storage.file_options.s3_endpoint_override, )
if path.endswith(".parquet"): pyarrow.parquet.write_table( self.to_arrow(), where=path, filesystem=filesystem if isinstance(storage.file_options.file_format, ParquetFormat): filesystem, path = FileSource.create_filesystem_and_path( storage.file_options.uri, storage.file_options.s3_endpoint_override, ) else: # otherwise assume destination is directory pyarrow.parquet.write_to_dataset( self.to_arrow(), root_path=path, filesystem=filesystem
if path.endswith(".parquet"): pyarrow.parquet.write_table( self.to_arrow(), where=path, filesystem=filesystem ) else: # otherwise assume destination is directory pyarrow.parquet.write_to_dataset( self.to_arrow(), root_path=path, filesystem=filesystem ) elif isinstance(storage.file_options.file_format, DeltaFormat): mode = ( "overwrite" if allow_overwrite and os.path.exists(storage.file_options.uri) else "error" ) self.table.to_delta(storage.file_options.uri, mode=mode)
@property def metadata(self) -> Optional[RetrievalMetadata]: Expand Down