◐ Shell
clean mode source ↗

fix: Add async refresh to prevent synchronous refresh in main thread by sudohainguyen · Pull Request #3812 · feast-dev/feast

@@ -1,4 +1,5 @@ import json import threading import traceback import warnings from typing import List, Optional Expand Down Expand Up @@ -44,14 +45,37 @@ class MaterializeIncrementalRequest(BaseModel): feature_views: Optional[List[str]] = None

def get_app(store: "feast.FeatureStore"): def get_app(store: "feast.FeatureStore", registry_ttl_sec: int = 5): proto_json.patch()
app = FastAPI() # Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down registry_proto = None shutting_down = False active_timer: Optional[threading.Timer] = None
async def get_body(request: Request): return await request.body()
def async_refresh(): store.refresh_registry() nonlocal registry_proto registry_proto = store.registry.proto() if shutting_down: return nonlocal active_timer active_timer = threading.Timer(registry_ttl_sec, async_refresh) active_timer.start()
@app.on_event("shutdown") def shutdown_event(): nonlocal shutting_down shutting_down = True if active_timer: active_timer.cancel()
async_refresh()
@app.post("/get-online-features") def get_online_features(body=Depends(get_body)): try: Expand Down Expand Up @@ -180,7 +204,10 @@ def materialize_incremental(body=Depends(get_body)):
class FeastServeApplication(gunicorn.app.base.BaseApplication): def __init__(self, store: "feast.FeatureStore", **options): self._app = get_app(store=store) self._app = get_app( store=store, registry_ttl_sec=options.get("registry_ttl_sec", 5), ) self._options = options super().__init__()
Expand All @@ -202,11 +229,13 @@ def start_server( no_access_log: bool, workers: int, keep_alive_timeout: int, registry_ttl_sec: int = 5, ): FeastServeApplication( store=store, bind=f"{host}:{port}", accesslog=None if no_access_log else "-", workers=workers, keepalive=keep_alive_timeout, registry_ttl_sec=registry_ttl_sec, ).run()