◐ Shell
clean mode source ↗

feat: Kickoff Transformation implementation by HaoXuAI · Pull Request #5130 · feast-dev/feast

@HaoXuAI

What this PR does / why we need it:

Created a Transformation interface. it still works with the current pandas_transformation, python_transformation etc.

The next step is refactor the BatchMaterializationEngine to make it works for both Materialization and Transformation.

Which issue(s) this PR fixes:

#4584
#4277 (comment)
#4696

Misc

franciscojavierarceo

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice!! 🚀🚀🚀

def get_feature_transformation(self) -> Optional[Transformation]:
if not self.udf:
return None
if self.mode == TransformationMode.pandas or self.mode == "pandas":

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably can just do a dictionary mapping

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure will add that

udf_string: str = "",
tags: Optional[Dict[str, str]] = None,
description: str = "",
owner: str = "",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably can add the singleton parameter too



class TransformationMode(Enum):
PYTHON = "python"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

@HaoXuAI HaoXuAI changed the title [DRAFT] Transformation skeleton feat: Transformation skeleton [DRAFT]

Mar 10, 2025

franciscojavierarceo

self,
*,
name: str,
mode: Union[TransformationMode, str],

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

franciscojavierarceo

class TransformationMode(Enum):
PYTHON = "python"
PANDAS = "pandas"
spark = "spark"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark = "spark"
SPARK = "spark"

@HaoXuAI HaoXuAI changed the title feat: Transformation skeleton [DRAFT] feat: Transformation skeleton

Mar 12, 2025

@HaoXuAI HaoXuAI changed the title feat: Transformation skeleton feat: Kickoff Transformation implementation

Mar 12, 2025

@franciscojavierarceo

Do you think we can scope one type of batch transformation as our MVP? If so, which one would you feel most comfortable doing?

@HaoXuAI

Do you think we can scope one type of batch transformation as our MVP? If so, which one would you feel most comfortable doing?

yep, maybe Spark SQL. that should be easy to implement, and we can get it live soon.

@franciscojavierarceo

Do you think we can scope one type of batch transformation as our MVP? If so, which one would you feel most comfortable doing?

yep, maybe Spark SQL. that should be easy to implement, and we can get it live soon.

LFG!

@franciscojavierarceo

FYI @HaoXuAI a very selfish goal I have is to use a Spark batch transformation to handle scaling embedding documents.

So in an ideal world we could do something like:

def create_embeddings(partitionData):  
  tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")  
  model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")  
  for row in partitionData:  
      document = str(row.document)  
      inputs = tokenizer(document, padding=True, truncation=True, return_tensors="pt", max_length=512)  
      result = model(**inputs)  
      embeddings = result.last_hidden_state[:, 0, :].cpu().detach().numpy()  
      lst = embeddings.flatten().tolist()  
      yield [row.id, lst, "", "{}", None]

And

embeddings = dataset_df.rdd.mapPartitions(create_embeddings)  

Used in a batch feature view and an on demand feature view (on write) to support offline and online processing of docs for RAG. The key thing here is that we'd be able to really scale RAG offline embedding and make the transition seamless to online.

This example is from the Pinecone docs here: https://docs.pinecone.io/integrations/databricks#3-create-the-vector-embeddings

@HaoXuAI

FYI @HaoXuAI a very selfish goal I have is to use a Spark batch transformation to handling scaling embedding documents.

So in an ideal world we could do something like:

def create_embeddings(partitionData):  
  tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")  
  model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")  
  for row in partitionData:  
      document = str(row.document)  
      inputs = tokenizer(document, padding=True, truncation=True, return_tensors="pt", max_length=512)  
      result = model(**inputs)  
      embeddings = result.last_hidden_state[:, 0, :].cpu().detach().numpy()  
      lst = embeddings.flatten().tolist()  
      yield [row.id, lst, "", "{}", None]

And

embeddings = dataset_df.rdd.mapPartitions(create_embeddings)  

Used in a batch feature view and an on demand feature view (on write) to support offline and online processing of docs for RAG. The key thing here is that we'd be able to really scale RAG offline embedding and make the transition seamless to online.

This example is from the Pinecone docs here: https://docs.pinecone.io/integrations/databricks#3-create-the-vector-embeddings

This seems to be a really good example of the BatchFeatureView + transformation

@HaoXuAI

PR diverged, cherrypick at #5181