fix: Redundant feature materialization and premature incremental materialization timestamp updates by james-crabtree-sp · Pull Request #3789 · feast-dev/feast
@@ -1,11 +1,14 @@
import logging
import uuid
from datetime import datetime
from time import sleep
from typing import Callable, List, Literal, Sequence, Union
import yaml from kubernetes import client from kubernetes import config as k8s_config from kubernetes import utils from kubernetes.client.exceptions import ApiException from kubernetes.utils import FailToCreateError from pydantic import StrictStr from tqdm import tqdmExpand All
@@ -16,6 +19,7 @@
from feast.infra.materialization.batch_materialization_engine import (
BatchMaterializationEngine,
MaterializationJob,
MaterializationJobStatus,
MaterializationTask,
)
from feast.infra.offline_stores.offline_store import OfflineStore
Expand All
@@ -27,6 +31,8 @@
from .bytewax_materialization_job import BytewaxMaterializationJob
logger = logging.getLogger(__name__)
class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): """Batch Materialization Engine config for Bytewax"""Expand Down
Expand Up
@@ -65,11 +71,26 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
""" (optional) additional labels to append to kubernetes objects """
max_parallelism: int = 10 """ (optional) Maximum number of pods (default 10) allowed to run in parallel per job""" """ (optional) Maximum number of pods allowed to run in parallel"""
synchronous: bool = False """ (optional) If true, wait for materialization for one feature to complete before moving to the next """
retry_limit: int = 2 """ (optional) Maximum number of times to retry a materialization worker pod"""
mini_batch_size: int = 1000 """ (optional) Number of rows to process per write operation (default 1000)"""
active_deadline_seconds: int = 86400 """ (optional) Maximum amount of time a materialization job is allowed to run"""
job_batch_size: int = 100 """ (optional) Maximum number of pods to process per job. Only applies to synchronous materialization"""
print_pod_logs_on_failure: bool = True """(optional) Print pod logs on job failure. Only applies to synchronous materialization"""
class BytewaxMaterializationEngine(BatchMaterializationEngine): def __init__(Expand Down
Expand Up
@@ -173,8 +194,98 @@ def _materialize_one(
)
paths = offline_job.to_remote_storage() if self.batch_engine_config.synchronous: offset = 0 total_pods = len(paths) batch_size = self.batch_engine_config.job_batch_size if batch_size < 1: raise ValueError("job_batch_size must be a value greater than 0") if batch_size < self.batch_engine_config.max_parallelism: logger.warning( "job_batch_size is less than max_parallelism. Setting job_batch_size = max_parallelism" ) batch_size = self.batch_engine_config.max_parallelism
while True: next_offset = min(offset + batch_size, total_pods) job = self._await_path_materialization( paths[offset:next_offset], feature_view, offset, next_offset, total_pods, ) offset += batch_size if ( offset >= total_pods or job.status() == MaterializationJobStatus.ERROR ): break else: job_id = str(uuid.uuid4()) job = self._create_kubernetes_job(job_id, paths, feature_view)
return job
def _await_path_materialization( self, paths, feature_view, batch_start, batch_end, total_pods ): job_id = str(uuid.uuid4()) return self._create_kubernetes_job(job_id, paths, feature_view) job = self._create_kubernetes_job(job_id, paths, feature_view)
try: while job.status() in ( MaterializationJobStatus.WAITING, MaterializationJobStatus.RUNNING, ): logger.info( f"{feature_view.name} materialization for pods {batch_start}-{batch_end} " f"(of {total_pods}) running..." ) sleep(30) logger.info( f"{feature_view.name} materialization for pods {batch_start}-{batch_end} " f"(of {total_pods}) complete with status {job.status()}" ) except BaseException as e: logger.info(f"Deleting job {job.job_id()}") try: self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace) except ApiException as ae: logger.warning(f"Could not delete job due to API Error: {ae.body}") raise e finally: logger.info(f"Deleting configmap {self._configmap_name(job_id)}") try: self.v1.delete_namespaced_config_map( self._configmap_name(job_id), self.namespace ) except ApiException as ae: logger.warning( f"Could not delete configmap due to API Error: {ae.body}" )
if ( job.status() == MaterializationJobStatus.ERROR and self.batch_engine_config.print_pod_logs_on_failure ): self._print_pod_logs(job.job_id(), feature_view, batch_start)
return job
def _print_pod_logs(self, job_id, feature_view, offset=0): pods_list = self.v1.list_namespaced_pod( namespace=self.namespace, label_selector=f"job-name={job_id}", ).items for i, pod in enumerate(pods_list): logger.info(f"Logging output for {feature_view.name} pod {offset+i}") try: logger.info( self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace) ) except ApiException as e: logger.warning(f"Could not retrieve pod logs due to: {e.body}")
def _create_kubernetes_job(self, job_id, paths, feature_view): try:Expand Down
Expand Up
@@ -210,7 +321,7 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
"kind": "ConfigMap",
"apiVersion": "v1",
"metadata": {
"name": f"feast-{job_id}",
"name": self._configmap_name(job_id),
"labels": {**labels, **self.batch_engine_config.labels},
},
"data": {
Expand All
@@ -223,7 +334,10 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
body=configmap_manifest,
)
def _create_job_definition(self, job_id, namespace, pods, env): def _configmap_name(self, job_id): return f"feast-{job_id}"
def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0): """Create a kubernetes job definition.""" job_env = [ {"name": "RUST_BACKTRACE", "value": "full"},Expand Down
Expand Up
@@ -284,8 +398,10 @@ def _create_job_definition(self, job_id, namespace, pods, env):
},
"spec": {
"ttlSecondsAfterFinished": 3600,
"backoffLimit": self.batch_engine_config.retry_limit,
"completions": pods,
"parallelism": min(pods, self.batch_engine_config.max_parallelism),
"activeDeadlineSeconds": self.batch_engine_config.active_deadline_seconds,
"completionMode": "Indexed",
"template": {
"metadata": {
Expand Down
Expand Up
@@ -324,7 +440,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
},
{
"mountPath": "/var/feast/",
"name": f"feast-{job_id}",
"name": self._configmap_name(job_id),
},
],
}
Expand Down
Expand Up
@@ -355,7 +471,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
{"mountPath": "/etc/bytewax", "name": "hostfile"},
{
"mountPath": "/var/feast/",
"name": f"feast-{job_id}",
"name": self._configmap_name(job_id),
},
],
}
Expand All
@@ -365,13 +481,13 @@ def _create_job_definition(self, job_id, namespace, pods, env):
{
"configMap": {
"defaultMode": 420,
"name": f"feast-{job_id}",
"name": self._configmap_name(job_id),
},
"name": "python-files",
},
{
"configMap": {"name": f"feast-{job_id}"},
"name": f"feast-{job_id}",
"configMap": {"name": self._configmap_name(job_id)},
"name": self._configmap_name(job_id),
},
],
},
Expand Down
import yaml from kubernetes import client from kubernetes import config as k8s_config from kubernetes import utils from kubernetes.client.exceptions import ApiException from kubernetes.utils import FailToCreateError from pydantic import StrictStr from tqdm import tqdm
from .bytewax_materialization_job import BytewaxMaterializationJob
logger = logging.getLogger(__name__)
class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): """Batch Materialization Engine config for Bytewax"""
max_parallelism: int = 10 """ (optional) Maximum number of pods (default 10) allowed to run in parallel per job""" """ (optional) Maximum number of pods allowed to run in parallel"""
synchronous: bool = False """ (optional) If true, wait for materialization for one feature to complete before moving to the next """
retry_limit: int = 2 """ (optional) Maximum number of times to retry a materialization worker pod"""
mini_batch_size: int = 1000 """ (optional) Number of rows to process per write operation (default 1000)"""
active_deadline_seconds: int = 86400 """ (optional) Maximum amount of time a materialization job is allowed to run"""
job_batch_size: int = 100 """ (optional) Maximum number of pods to process per job. Only applies to synchronous materialization"""
print_pod_logs_on_failure: bool = True """(optional) Print pod logs on job failure. Only applies to synchronous materialization"""
class BytewaxMaterializationEngine(BatchMaterializationEngine): def __init__(
paths = offline_job.to_remote_storage() if self.batch_engine_config.synchronous: offset = 0 total_pods = len(paths) batch_size = self.batch_engine_config.job_batch_size if batch_size < 1: raise ValueError("job_batch_size must be a value greater than 0") if batch_size < self.batch_engine_config.max_parallelism: logger.warning( "job_batch_size is less than max_parallelism. Setting job_batch_size = max_parallelism" ) batch_size = self.batch_engine_config.max_parallelism
while True: next_offset = min(offset + batch_size, total_pods) job = self._await_path_materialization( paths[offset:next_offset], feature_view, offset, next_offset, total_pods, ) offset += batch_size if ( offset >= total_pods or job.status() == MaterializationJobStatus.ERROR ): break else: job_id = str(uuid.uuid4()) job = self._create_kubernetes_job(job_id, paths, feature_view)
return job
def _await_path_materialization( self, paths, feature_view, batch_start, batch_end, total_pods ): job_id = str(uuid.uuid4()) return self._create_kubernetes_job(job_id, paths, feature_view) job = self._create_kubernetes_job(job_id, paths, feature_view)
try: while job.status() in ( MaterializationJobStatus.WAITING, MaterializationJobStatus.RUNNING, ): logger.info( f"{feature_view.name} materialization for pods {batch_start}-{batch_end} " f"(of {total_pods}) running..." ) sleep(30) logger.info( f"{feature_view.name} materialization for pods {batch_start}-{batch_end} " f"(of {total_pods}) complete with status {job.status()}" ) except BaseException as e: logger.info(f"Deleting job {job.job_id()}") try: self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace) except ApiException as ae: logger.warning(f"Could not delete job due to API Error: {ae.body}") raise e finally: logger.info(f"Deleting configmap {self._configmap_name(job_id)}") try: self.v1.delete_namespaced_config_map( self._configmap_name(job_id), self.namespace ) except ApiException as ae: logger.warning( f"Could not delete configmap due to API Error: {ae.body}" )
if ( job.status() == MaterializationJobStatus.ERROR and self.batch_engine_config.print_pod_logs_on_failure ): self._print_pod_logs(job.job_id(), feature_view, batch_start)
return job
def _print_pod_logs(self, job_id, feature_view, offset=0): pods_list = self.v1.list_namespaced_pod( namespace=self.namespace, label_selector=f"job-name={job_id}", ).items for i, pod in enumerate(pods_list): logger.info(f"Logging output for {feature_view.name} pod {offset+i}") try: logger.info( self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace) ) except ApiException as e: logger.warning(f"Could not retrieve pod logs due to: {e.body}")
def _create_kubernetes_job(self, job_id, paths, feature_view): try:
def _create_job_definition(self, job_id, namespace, pods, env): def _configmap_name(self, job_id): return f"feast-{job_id}"
def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0): """Create a kubernetes job definition.""" job_env = [ {"name": "RUST_BACKTRACE", "value": "full"},