
feat: Implemented Tiling Support for Time-Windowed Aggregations by ntkathole · Pull Request #5724 · feast-dev/feast

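This PR implements tiling to compute time-windowed aggregations efficiently: data is pre-aggregated into small, time-bucketed units (tiles) that can be reused across multiple queries. Instead of scanning all raw events for each window, it builds cumulative tiles per entity at each hop interval and derives each window by subtracting tiles. Tiling is enabled per StreamFeatureView: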

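from datetime import timedelta

from feast.aggregation import Aggregation
from feast.stream_feature_view import StreamFeatureView

# `customer` (an Entity) and `kafka_source` (a stream source) are assumed to be defined elsewhere.
sales_features = StreamFeatureView(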
    name="sales_features",
    entities=[customer],
    source=kafka_source,
    aggregations=[
        Aggregation(
            column="amount",
            function="avg",
            time_window=timedelta(minutes=5),
        ),
        Aggregation(
            column="amount",
            function="sum",
            time_window=timedelta(minutes=5),
        ),
    ],
    timestamp_field="event_timestamp",
    enable_tiling=True,                      # Enable tiling optimization
    tiling_hop_size=timedelta(minutes=1),    # Generate tiles every 1 minute
)
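
To make the two tiling parameters concrete, here is a minimal sketch in plain Python. It is not Feast's implementation: it assumes tiles are aligned to multiples of the hop size and that each tile stores a (sum, count) pair, and the helper names `tile_start`, `build_tiles`, and `window_sum_avg` are hypothetical. With a 5-minute window and a 1-minute hop, each window is answered by merging 5 tiles rather than rescanning the raw events.

```python
from collections import defaultdict
from datetime import datetime, timedelta

HOP = timedelta(minutes=1)      # mirrors tiling_hop_size in the example above
WINDOW = timedelta(minutes=5)   # mirrors time_window in the example above
EPOCH = datetime(1970, 1, 1)


def tile_start(ts, hop=HOP):
    """Floor a timestamp to the start of its hop-aligned tile (assumed alignment)."""
    return ts - (ts - EPOCH) % hop


def build_tiles(events):
    """Pre-aggregate (timestamp, amount) events into per-tile [sum, count] partials."""
    tiles = defaultdict(lambda: [0.0, 0])
    for ts, amount in events:
        partial = tiles[tile_start(ts)]
        partial[0] += amount
        partial[1] += 1
    return dict(tiles)


def window_sum_avg(tiles, window_end, window=WINDOW, hop=HOP):
    """Merge the window // hop tiles that start in [window_end - window, window_end)."""
    total, count = 0.0, 0
    start = window_end - window
    for i in range(window // hop):
        tile_sum, tile_count = tiles.get(start + i * hop, (0.0, 0))
        total += tile_sum
        count += tile_count
    # avg is rebuilt from the (sum, count) partials, never from per-tile averages
    avg = total / count if count else None
    return total, avg


t0 = datetime(2024, 1, 1, 12, 0)
events = [
    (t0 + timedelta(seconds=10), 10.0),
    (t0 + timedelta(seconds=50), 20.0),
    (t0 + timedelta(minutes=2, seconds=5), 30.0),
    (t0 + timedelta(minutes=6), 40.0),
]
tiles = build_tiles(events)
first_window = window_sum_avg(tiles, t0 + timedelta(minutes=5))
later_window = window_sum_avg(tiles, t0 + timedelta(minutes=7))
```

Both windows reuse the same `tiles` dictionary, which is the point of pre-aggregation: the raw events are read once, and overlapping windows share the tiles they have in common.
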
┌─────────────────────────────────────────────────────────────┐
│  StreamFeatureView (enable_tiling=True)                     │
└────────────────────────┬────────────────────────────────────┘
                         │
         ┌───────────────┴───────────────┐
         │                               │
    ┌────▼────┐                    ┌────▼────┐
    │  Spark  │                    │   Ray   │
    │  Node   │                    │  Node   │
    └────┬────┘                    └────┬────┘
         │                               │
         │  Convert to pandas            │
         ▼                               ▼
┌─────────────────────────────────────────────────────────────┐
│  Tiling Logic                                               │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  orchestrator.apply_sawtooth_window_tiling()         │   │
│  │  - Group by entity + hop interval                    │   │
│  │  - Compute cumulative aggregations (tiles)           │   │
│  │  - Output: Cumulative tiles at each hop              │   │
│  └──────────────────────┬───────────────────────────────┘   │
│                         │                                    │
│  ┌──────────────────────▼───────────────────────────────┐   │
│  │  tile_subtraction.convert_cumulative_to_windowed()   │   │
│  │  - Subtract tiles: window = tile_T - tile_(T-W)      │   │
│  │  - Recompute holistic aggs from IRs                  │   │
│  │  - Output: Windowed aggregations                     │   │
│  └──────────────────────┬───────────────────────────────┘   │
│                         │                                    │
│  ┌──────────────────────▼───────────────────────────────┐   │
│  │  tile_subtraction.deduplicate_keep_latest()          │   │
│  │  - Keep latest window per entity                     │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────┬───────────────────────────────────┘
                          │  Convert back to engine format
                          ▼
                  ┌───────────────┐
                  │ Online Store  │
                  │               │
                  └───────────────┘
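
The three stages in the diagram can be sketched in pandas roughly as follows. This is an illustration under stated assumptions, not the PR's code: it does not call `apply_sawtooth_window_tiling()`, `convert_cumulative_to_windowed()`, or `deduplicate_keep_latest()`, and the helper names, the column names, and the input layout (one dense row per entity per hop, already holding that hop's sum and count) are all hypothetical.

```python
import pandas as pd

HOP = pd.Timedelta(minutes=1)
WINDOW = pd.Timedelta(minutes=5)


def cumulative_tiles(per_hop):
    """Running sum/count per entity, one row per hop (the cumulative tiles)."""
    out = per_hop.sort_values(["customer_id", "tile_end"]).copy()
    grouped = out.groupby("customer_id")
    out["cum_sum"] = grouped["amount_sum"].cumsum()
    out["cum_count"] = grouped["amount_count"].cumsum()
    return out


def cumulative_to_windowed(tiles):
    """window = tile_T - tile_(T-W); avg is recomputed from the sum and count IRs."""
    prev = tiles[["customer_id", "tile_end", "cum_sum", "cum_count"]].copy()
    prev["tile_end"] = prev["tile_end"] + WINDOW  # shift so tile_(T-W) lines up with T
    merged = tiles.merge(
        prev, on=["customer_id", "tile_end"], how="left", suffixes=("", "_prev")
    )
    # No tile at T-W means the window reaches back before the entity's first tile
    # (valid only under the dense-tiles assumption stated above).
    prev_cols = ["cum_sum_prev", "cum_count_prev"]
    merged[prev_cols] = merged[prev_cols].fillna(0)
    merged["amount_sum_5m"] = merged["cum_sum"] - merged["cum_sum_prev"]
    count = merged["cum_count"] - merged["cum_count_prev"]
    merged["amount_avg_5m"] = merged["amount_sum_5m"] / count.where(count > 0)
    return merged[["customer_id", "tile_end", "amount_sum_5m", "amount_avg_5m"]]


def keep_latest(windowed):
    """Keep only the most recent window per entity."""
    return (
        windowed.sort_values("tile_end")
        .drop_duplicates("customer_id", keep="last")
        .reset_index(drop=True)
    )


base = pd.Timestamp("2024-01-01 12:00:00")
per_hop = pd.DataFrame(
    {
        "customer_id": ["c1"] * 7 + ["c2"] * 2,
        "tile_end": [base + (i + 1) * HOP for i in range(7)]
        + [base + (i + 1) * HOP for i in range(2)],
        "amount_sum": [10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 5.0, 15.0],
        "amount_count": [1, 1, 1, 1, 1, 1, 1, 1, 3],
    }
)

latest = keep_latest(cumulative_to_windowed(cumulative_tiles(per_hop)))
```

The subtraction step costs one lookup per window regardless of how many hops the window spans. Sums and counts can be subtracted directly, while the average is rebuilt from those two intermediate values, since averages themselves cannot be subtracted.
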