fix: Add async refresh to prevent synchronous refresh in main thread by sudohainguyen · Pull Request #3812 · feast-dev/feast
@@ -1,4 +1,5 @@
import json
import threading
import traceback
import warnings
from typing import List, Optional
Expand Down
Expand Up
@@ -44,14 +45,37 @@ class MaterializeIncrementalRequest(BaseModel):
feature_views: Optional[List[str]] = None
def get_app(store: "feast.FeatureStore"): def get_app(store: "feast.FeatureStore", registry_ttl_sec: int = 5): proto_json.patch()
app = FastAPI() # Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down registry_proto = None shutting_down = False active_timer: Optional[threading.Timer] = None
async def get_body(request: Request): return await request.body()
def async_refresh(): store.refresh_registry() nonlocal registry_proto registry_proto = store.registry.proto() if shutting_down: return nonlocal active_timer active_timer = threading.Timer(registry_ttl_sec, async_refresh) active_timer.start()
@app.on_event("shutdown") def shutdown_event(): nonlocal shutting_down shutting_down = True if active_timer: active_timer.cancel()
async_refresh()
@app.post("/get-online-features") def get_online_features(body=Depends(get_body)): try:Expand Down
Expand Up
@@ -180,7 +204,10 @@ def materialize_incremental(body=Depends(get_body)):
class FeastServeApplication(gunicorn.app.base.BaseApplication): def __init__(self, store: "feast.FeatureStore", **options): self._app = get_app(store=store) self._app = get_app( store=store, registry_ttl_sec=options.get("registry_ttl_sec", 5), ) self._options = options super().__init__()
Expand All
@@ -202,11 +229,13 @@ def start_server(
no_access_log: bool,
workers: int,
keep_alive_timeout: int,
registry_ttl_sec: int = 5,
):
FeastServeApplication(
store=store,
bind=f"{host}:{port}",
accesslog=None if no_access_log else "-",
workers=workers,
keepalive=keep_alive_timeout,
registry_ttl_sec=registry_ttl_sec,
).run()
def get_app(store: "feast.FeatureStore"): def get_app(store: "feast.FeatureStore", registry_ttl_sec: int = 5): proto_json.patch()
app = FastAPI() # Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down registry_proto = None shutting_down = False active_timer: Optional[threading.Timer] = None
async def get_body(request: Request): return await request.body()
def async_refresh(): store.refresh_registry() nonlocal registry_proto registry_proto = store.registry.proto() if shutting_down: return nonlocal active_timer active_timer = threading.Timer(registry_ttl_sec, async_refresh) active_timer.start()
@app.on_event("shutdown") def shutdown_event(): nonlocal shutting_down shutting_down = True if active_timer: active_timer.cancel()
async_refresh()
@app.post("/get-online-features") def get_online_features(body=Depends(get_body)): try:
class FeastServeApplication(gunicorn.app.base.BaseApplication): def __init__(self, store: "feast.FeatureStore", **options): self._app = get_app(store=store) self._app = get_app( store=store, registry_ttl_sec=options.get("registry_ttl_sec", 5), ) self._options = options super().__init__()