◐ Shell
reader mode source ↗
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
File filter
Conversations
Jump to
Diff view
Apply and reload
Show whitespace
Diff view
Apply and reload
Original file line number Diff line number Diff line change
@@ -25,5 +25,5 @@ COPY README.md README.md
# git dir to infer the version of feast we're installing.
# https://github.com/pypa/setuptools_scm#usage-from-docker
# This also assumes the Docker build context is the root of the repository, since `.git` is bind-mounted from it.
RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir -e '.[aws,gcp,bytewax]'

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pyarrow.parquet as pq
from bytewax.dataflow import Dataflow # type: ignore
from bytewax.execution import cluster_main
from bytewax.inputs import ManualInputConfig, distribute
from bytewax.outputs import ManualOutputConfig
from tqdm import tqdm

Expand All @@ -21,11 +21,13 @@ def __init__(
config: RepoConfig,
feature_view: FeatureView,
paths: List[str],
):
self.config = config
self.feature_store = FeatureStore(config=config)

self.feature_view = feature_view
self.paths = paths

self._run_dataflow()
Expand All @@ -40,11 +42,7 @@ def process_path(self, path):
return batches

def input_builder(self, worker_index, worker_count, _state):
    """Yield the input items for a single dataflow worker.

    ``self.paths`` is handed to ``distribute`` along with the worker's
    index and the total worker count; every path that call returns is
    emitted as a ``(None, path)`` tuple.

    Args:
        worker_index: Forwarded to ``distribute`` to select this worker's paths.
        worker_count: Forwarded to ``distribute`` alongside ``worker_index``.
        _state: Accepted for the builder signature; not used here.

    Yields:
        ``(None, path)`` for each path returned by ``distribute``.
    """
    for assigned_path in distribute(self.paths, worker_index, worker_count):
        yield None, assigned_path

def output_builder(self, worker_index, worker_count):
def yield_batch(iterable, batch_size):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import uuid
from datetime import datetime
from typing import Callable, List, Literal, Sequence, Union

import yaml
from kubernetes import client
from kubernetes import config as k8s_config
from kubernetes import utils
from kubernetes.utils import FailToCreateError
from pydantic import StrictStr
from tqdm import tqdm
Expand All @@ -16,6 +19,7 @@
from feast.infra.materialization.batch_materialization_engine import (
BatchMaterializationEngine,
MaterializationJob,
MaterializationTask,
)
from feast.infra.offline_stores.offline_store import OfflineStore
Expand All @@ -27,6 +31,8 @@

from .bytewax_materialization_job import BytewaxMaterializationJob


class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
"""Batch Materialization Engine config for Bytewax"""
Expand Down Expand Up @@ -65,11 +71,26 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
""" (optional) additional labels to append to kubernetes objects """

max_parallelism: int = 10
""" (optional) Maximum number of pods (default 10) allowed to run in parallel per job"""

mini_batch_size: int = 1000
""" (optional) Number of rows to process per write operation (default 1000)"""


class BytewaxMaterializationEngine(BatchMaterializationEngine):
def __init__(
Expand Down Expand Up @@ -173,8 +194,98 @@ def _materialize_one(
)

paths = offline_job.to_remote_storage()
job_id = str(uuid.uuid4())
return self._create_kubernetes_job(job_id, paths, feature_view)

def _create_kubernetes_job(self, job_id, paths, feature_view):
try:
Expand Up @@ -210,7 +321,7 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
"kind": "ConfigMap",
"apiVersion": "v1",
"metadata": {
"name": f"feast-{job_id}",
"labels": {**labels, **self.batch_engine_config.labels},
},
"data": {
Expand All @@ -223,7 +334,10 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
body=configmap_manifest,
)

def _create_job_definition(self, job_id, namespace, pods, env):
"""Create a kubernetes job definition."""
job_env = [
{"name": "RUST_BACKTRACE", "value": "full"},
Expand Down Expand Up @@ -284,8 +398,10 @@ def _create_job_definition(self, job_id, namespace, pods, env):
},
"spec": {
"ttlSecondsAfterFinished": 3600,
"completions": pods,
"parallelism": min(pods, self.batch_engine_config.max_parallelism),
"completionMode": "Indexed",
"template": {
"metadata": {
Expand Down Expand Up @@ -324,7 +440,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
},
{
"mountPath": "/var/feast/",
"name": f"feast-{job_id}",
},
],
}
Expand Down Expand Up @@ -355,7 +471,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
{"mountPath": "/etc/bytewax", "name": "hostfile"},
{
"mountPath": "/var/feast/",
"name": f"feast-{job_id}",
},
],
}
Expand All @@ -365,13 +481,13 @@ def _create_job_definition(self, job_id, namespace, pods, env):
{
"configMap": {
"defaultMode": 420,
"name": f"feast-{job_id}",
},
"name": "python-files",
},
{
"configMap": {"name": f"feast-{job_id}"},
"name": f"feast-{job_id}",
},
],
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ def status(self):
if job_status.completion_time is None:
return MaterializationJobStatus.RUNNING
elif job_status.failed is not None:
return MaterializationJobStatus.ERROR
elif job_status.active is None and job_status.succeeded is not None:
if job_status.conditions[0].type == "Complete":
return MaterializationJobStatus.SUCCEEDED

def should_be_retried(self):
    """Report whether this job should be retried.

    Returns:
        Always ``False``.
    """
    return False
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import yaml

from feast import FeatureStore, RepoConfig
@@ -19,4 +21,5 @@
config,
store.get_feature_view(bytewax_config["feature_view"]),
bytewax_config["paths"],
)
Toggle all file notes Toggle all file annotations