feat: Make arrow primary interchange for offline ODFV execution by tokoko · Pull Request #4083 · feast-dev/feast
if self.on_demand_feature_views: # TODO(adchia): Fix requirement to specify dependent feature views in feature_refs for odfv in self.on_demand_feature_views: if odfv.mode not in {"pandas", "substrait"}: raise Exception( f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.' ) features_df = features_df.join( odfv.get_transformed_features_df( features_df, self.full_feature_names, ) )
if validation_reference: if not flags_helper.is_test(): warnings.warn( "Dataset validation is an experimental feature. " "This API is unstable and it could and most probably will be changed in the future. " "We do not guarantee that future changes will maintain backward compatibility.", RuntimeWarning, )
validation_result = validation_reference.profile.validate(features_df) if not validation_result.is_success: raise ValidationFailed(validation_result)
return features_df return ( self.to_arrow(validation_reference=validation_reference, timeout=timeout) .to_pandas() .reset_index(drop=True) )
def to_arrow( self,
features_df = self._to_df_internal(timeout=timeout) features_table = self._to_arrow_internal(timeout=timeout) if self.on_demand_feature_views: for odfv in self.on_demand_feature_views: if odfv.mode not in {"pandas", "substrait"}: raise Exception( f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.' ) features_df = features_df.join( odfv.get_transformed_features_df( features_df, self.full_feature_names, ) transformed_arrow = odfv.transform_arrow( features_table, self.full_feature_names )
for col in transformed_arrow.column_names: if col.startswith("__index"): continue features_table = features_table.append_column( col, transformed_arrow[col] )
if validation_reference: if not flags_helper.is_test(): warnings.warn(
validation_result = validation_reference.profile.validate(features_df) validation_result = validation_reference.profile.validate( features_table.to_pandas() ) if not validation_result.is_success: raise ValidationFailed(validation_result)
return pyarrow.Table.from_pandas(features_df) return features_table
def to_sql(self) -> str: """