feat: Add delta format to `FileSource`, add support for it in ibis/duckdb by tokoko · Pull Request #4123 · feast-dev/feast
from feast.data_format import DeltaFormat, ParquetFormat from feast.data_source import DataSource from feast.errors import SavedDatasetLocationAlreadyExists from feast.feature_logging import LoggingConfig, LoggingSource
return entity_table
@staticmethod def _read_data_source(data_source: DataSource) -> Table: assert isinstance(data_source, FileSource)
if isinstance(data_source.file_format, ParquetFormat): return ibis.read_parquet(data_source.path) elif isinstance(data_source.file_format, DeltaFormat): return ibis.read_delta(data_source.path)
@staticmethod def get_historical_features( config: RepoConfig,
for old_name, new_name in feature_view.batch_source.field_mapping.items(): if old_name in fv_table.columns:
table = ibis.read_parquet(data_source.path) table = IbisOfflineStore._read_data_source(data_source)
table = table.select(*fields)
if isinstance(data, Path): table = ibis.read_parquet(data) else: table = ibis.memtable(data) table = ( ibis.read_parquet(data) if isinstance(data, Path) else ibis.memtable(data) )
if destination.partition_by: kwargs = {"partition_by": destination.partition_by}
file_options = feature_view.batch_source.file_options prev_table = ibis.read_parquet(file_options.uri).to_pyarrow() if table.schema != prev_table.schema: table = table.cast(prev_table.schema) new_table = pyarrow.concat_tables([table, prev_table])
ibis.memtable(new_table).to_parquet(file_options.uri) if isinstance(feature_view.batch_source.file_format, ParquetFormat): prev_table = ibis.read_parquet(file_options.uri).to_pyarrow() if table.schema != prev_table.schema: table = table.cast(prev_table.schema) new_table = pyarrow.concat_tables([table, prev_table])
ibis.memtable(new_table).to_parquet(file_options.uri) elif isinstance(feature_view.batch_source.file_format, DeltaFormat): from deltalake import DeltaTable
prev_schema = DeltaTable(file_options.uri).schema().to_pyarrow() if table.schema != prev_schema: table = table.cast(prev_schema) ibis.memtable(table).to_delta(file_options.uri, mode="append")
class IbisRetrievalJob(RetrievalJob):
filesystem, path = FileSource.create_filesystem_and_path( storage.file_options.uri, storage.file_options.s3_endpoint_override, )
if path.endswith(".parquet"): pyarrow.parquet.write_table( self.to_arrow(), where=path, filesystem=filesystem if isinstance(storage.file_options.file_format, ParquetFormat): filesystem, path = FileSource.create_filesystem_and_path( storage.file_options.uri, storage.file_options.s3_endpoint_override, ) else: # otherwise assume destination is directory pyarrow.parquet.write_to_dataset( self.to_arrow(), root_path=path, filesystem=filesystem
if path.endswith(".parquet"): pyarrow.parquet.write_table( self.to_arrow(), where=path, filesystem=filesystem ) else: # otherwise assume destination is directory pyarrow.parquet.write_to_dataset( self.to_arrow(), root_path=path, filesystem=filesystem ) elif isinstance(storage.file_options.file_format, DeltaFormat): mode = ( "overwrite" if allow_overwrite and os.path.exists(storage.file_options.uri) else "error" ) self.table.to_delta(storage.file_options.uri, mode=mode)
@property def metadata(self) -> Optional[RetrievalMetadata]: