feat: MongoDB offline store by caseyclements · Pull Request #6138 · feast-dev/feast
caseyclements
changed the title
Feast offline store intpython 297
feat: MongoDB offline stores
- MongoDBSource: DataSource backed by a MongoDB collection, schema sampled via \ aggregation (default N=100) - MongoDBOfflineStoreConfig: connection_string + default database - MongoDBOfflineStore: delegates to ibis PIT join engine via in-memory memtable approach - SavedDatasetMongoDBStorage: persist training datasets to MongoDB - _build_data_source_reader/_build_data_source_writer closures capture config (connection_string, database) for MongoDB access Signed-off-by: Casey Clements <casey.clements@mongodb.com>
- Update copyright headers to 2026
- Move mongodb_to_feast_value_type to feast/type_map.py, consistent
with pg_type_to_feast_value_type and cb_columnar_type_to_feast_value_type
- Add docstrings to MongoDBOptions.to_proto/from_proto, MongoDBSource
class, and get_table_column_names_and_types
- Replace dead 'assert name' with cast(str, ...) for type-checker safety
- Add explanatory comment to validate() stub
- Remove module-level warnings.simplefilter('once', RuntimeWarning),
which was a process-wide side effect; per-call warnings.warn is enough
- Convert all assert isinstance(data_source, MongoDBSource) guards to
ValueError with descriptive messages in both public API methods and
the reader/writer closures
- Fix bug: add tz_aware=True to MongoClient in the writer closure,
matching the reader, to ensure consistent timezone-aware datetime
handling across read and write paths
Signed-off-by: Casey Clements <casey.clements@mongodb.com>
- Eliminate -based PIT join which scaled poorly (O(n×m)) - Use single query to fetch all matching feature data - Batch entity_ids into chunks of 1000 for large queries - Flatten features subdoc with pd.json_normalize - Apply pd.merge_asof for efficient PIT join per FeatureView - Handle TTL filtering in pandas instead of MQL \ - Remove unused _ttl_to_ms and _build_ttl_gte_expr helpers Performance improvement: - Before: 10k rows in ~188s (53 rows/s) - After: 10k rows in ~7.4s (1,354 rows/s) - Now competitive with Ibis implementation Signed-off-by: Casey Clements <casey.clements@mongodb.com>
…tity_df - Add CHUNK_SIZE (5000) for entity_df processing to bound memory usage - Extract _run_single helper function for processing each chunk - Add _chunk_dataframe generator for yielding DataFrame slices - Preserve original row ordering via _row_idx column - Exclude internal columns (prefixed with _) from entity key serialization - Concat chunk results and restore ordering at the end This allows processing arbitrarily large entity_df while keeping memory bounded by processing in 5000-row chunks. Signed-off-by: Casey Clements <casey.clements@mongodb.com>
… sizes Performance optimizations: - Reuse MongoClient across chunks (was creating new client per chunk) - Increase CHUNK_SIZE from 5,000 to 50,000 rows - Increase MONGO_BATCH_SIZE from 1,000 to 10,000 entity_ids - Pass collection to _run_single instead of creating client each time - Make index creation idempotent (check for existing index) Results (100k rows): - Before: 21.7s - After: 5.2s (4.2x faster) Results (1M rows): - Before: 1664s (28 min) - After: 212s (3.5 min) (7.8x faster) Signed-off-by: Casey Clements <casey.clements@mongodb.com>
The Native implementation now lives exclusively in mongodb_native.py with the single-collection schema. This removes the confusing duplicate that used the Ibis collection-per-FV schema. Signed-off-by: Casey Clements <casey.clements@mongodb.com>
- Move MongoDBSource, MongoDBOptions, SavedDatasetMongoDBStorage into mongodb.py - Move _infer_python_type_str helper into mongodb.py - Update imports in tests and benchmarks - Remove mongodb_source.py This consolidates the collection-per-FV implementation into a single file, making the codebase easier to navigate. Signed-off-by: Casey Clements <casey.clements@mongodb.com>
- Rename module: mongodb_offline_store/ → mongodb/ - Rename files: mongodb.py → mongodb_many.py, mongodb_native.py → mongodb_one.py Class renames: - MongoDBSource → MongoDBSourceMany - MongoDBOptions → MongoDBOptionsMany - SavedDatasetMongoDBStorage → SavedDatasetMongoDBStorageMany - MongoDBOfflineStoreIbis → MongoDBOfflineStoreMany - MongoDBOfflineStoreIbisConfig → MongoDBOfflineStoreManyConfig - MongoDBSourceNative → MongoDBSourceOne - MongoDBOfflineStoreNative → MongoDBOfflineStoreOne - MongoDBOfflineStoreNativeConfig → MongoDBOfflineStoreOneConfig - MongoDBNativeRetrievalJob → MongoDBOneRetrievalJob The One/Many naming reflects the core architectural difference: - One: Single shared collection for all FeatureViews - Many: One collection per FeatureView Signed-off-by: Casey Clements <casey.clements@mongodb.com>
- Rename module: mongodb/ → mongodb_offline_store/ (follows naming convention) - Move tests to mongodb_offline_store/ subdirectory: - test_mongodb_offline_retrieval.py → mongodb_offline_store/test_many.py - test_mongodb_offline_retrieval_native.py → mongodb_offline_store/test_one.py - benchmark_mongodb_offline_stores.py → mongodb_offline_store/benchmark.py - Update all imports to use mongodb_offline_store path Signed-off-by: Casey Clements <casey.clements@mongodb.com>
The scoring_path detection checked entity uniqueness against the union of all feature view join keys. When FVs have different join key sets (e.g. FV_A on user_id, FV_B on (user_id, device_id)), entity_df can be unique on the union but have duplicate entity_ids for FVs with fewer keys. The $group stage would then discard valid older documents, causing silent NULL results. Move the scoring_path decision inside the per-FV loop, checking uniqueness on each FV's serialized entity_id column. Each FV now independently picks the scoring or training path based on its own key cardinality. Add test_mixed_join_key_cardinality covering the exact scenario. Signed-off-by: Casey Clements <casey.clements@mongodb.com>
…lization offline_write_batch used mapped (aliased) join key names when serializing entity keys, while get_historical_features uses original names. When a join_key_map is configured, the entity_id bytes would differ between write and read, causing all features to silently return NULL. Use ec.name (original) instead of the mapped name so bytes match. Signed-off-by: Casey Clements <casey.clements@mongodb.com>
pull_latest_from_table_or_query and pull_all_from_table_or_query only returned serialized entity_id bytes, event_timestamp, and feature columns — but not the actual join key columns (e.g. driver_id) that the base class contract requires. Add _expand_entity_id_column helper that deserializes entity_id bytes back into individual join key columns using deserialize_entity_key. Both methods now call this helper before returning the DataFrame. Add assertions in existing pull_latest and pull_all tests to verify join key columns are present in the output. Signed-off-by: Casey Clements <casey.clements@mongodb.com>
The scoring path ($group $first) selects the latest doc per entity up to max_ts — the maximum request timestamp across ALL entities in the chunk. When entities have different request timestamps, $group may pick a doc that post-dates a specific entity's request time. The Python future_mask would null it, but the valid older doc was already discarded by $group. Add a second condition: scoring_path is only used when all entity request timestamps in the chunk are identical, which is the common real-time scoring case (all requests at 'now'). When timestamps differ, fall back to the training path (merge_asof) which handles per-row PIT correctness. Add test_heterogeneous_timestamps_fall_back_to_training_path. Signed-off-by: Casey Clements <casey.clements@mongodb.com>
…ties The training path (merge_asof) sorted fv_df only by event_timestamp. When multiple docs for the same entity share the same event_timestamp but differ in created_at, merge_asof picks the last row in sorted order. Without a created_at sort, that order depends on MongoDB's undefined document return order — making the result non-deterministic. Sort by [event_timestamp, created_at] so merge_asof consistently picks the doc with the highest created_at among ties, matching the scoring path's behavior ($sort + $group $first). Add test_training_path_created_at_tiebreaker. Signed-off-by: Casey Clements <casey.clements@mongodb.com>
…e/Agg Remove leftover references to earlier naming iterations (MongoDBSourceOne, 'agg offline store', 'Improves on MongoDBOfflineStoreOne') from class and method docstrings. Signed-off-by: Casey Clements <casey.clements@mongodb.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters