◐ Shell
reader mode source ↗
Skip to content
Merged
Show file tree
Changes from all commits
File filter
Conversations
Jump to
Diff view
Apply and reload
Show whitespace
Diff view
Apply and reload
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,29 @@

import com.google.protobuf.InvalidProtocolBufferException;
import feast.types.FeatureRowProto.FeatureRow;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Kafka {@link Deserializer} that decodes the raw bytes of a record into a
 * {@link FeatureRow} Protocol Buffers message.
 */
public class FeatureRowDeserializer implements Deserializer<FeatureRow> {

  /**
   * Does nothing; this deserializer reads no configuration.
   *
   * @param configs deserializer configuration (ignored)
   * @param isKey whether the deserializer is used for record keys (ignored)
   */
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
  }

  /**
   * Parses {@code data} as a serialized {@link FeatureRow}.
   *
   * @param topic topic the record was read from (unused)
   * @param data serialized message bytes; may be {@code null}
   * @return the parsed message, or {@code null} when {@code data} is {@code null}
   * @throws SerializationException if {@code data} is not a valid FeatureRow message
   */
  @Override
  public FeatureRow deserialize(String topic, byte[] data) {
    // Kafka hands over null for records that carry no payload; pass that
    // through instead of letting parseFrom fail with a NullPointerException.
    if (data == null) {
      return null;
    }
    try {
      return FeatureRow.parseFrom(data);
    } catch (InvalidProtocolBufferException e) {
      throw new SerializationException("Error deserializing FeatureRow from Protobuf message", e);
    }
  }

  /** Does nothing; this deserializer holds no resources. */
  @Override
  public void close() {
  }
}
Original file line number Diff line number Diff line change
@@ -1,33 +1,31 @@
package feast.ingestion.deserializer;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.types.FeatureRowProto.*;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Kafka {@link Deserializer} that decodes the raw bytes of a record key into a
 * {@link FeatureRowKey} Protocol Buffers message.
 */
public class FeatureRowKeyDeserializer implements Deserializer<FeatureRowKey> {

  /**
   * Does nothing; this deserializer reads no configuration.
   *
   * @param configs deserializer configuration (ignored)
   * @param isKey whether the deserializer is used for record keys (ignored)
   */
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
  }

  /**
   * Parses {@code data} as a serialized {@link FeatureRowKey}.
   *
   * @param topic topic the record was read from (unused)
   * @param data serialized key bytes; may be {@code null}
   * @return the parsed key, or {@code null} when {@code data} is {@code null}
   * @throws SerializationException if {@code data} is not a valid FeatureRowKey message
   */
  @Override
  public FeatureRowKey deserialize(String topic, byte[] data) {
    // Records published without a key arrive here as null; pass that through
    // instead of letting parseFrom fail with a NullPointerException.
    if (data == null) {
      return null;
    }
    try {
      return FeatureRowKey.parseFrom(data);
    } catch (InvalidProtocolBufferException e) {
      throw new SerializationException("Error deserializing FeatureRowKey from Protobuf message", e);
    }
  }

  /** Does nothing; this deserializer holds no resources. */
  @Override
  public void close() {
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@

import com.google.auto.service.AutoService;
import java.util.Collections;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.metrics.MetricsSink;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.sdk.options.Validation.Required;

public interface ImportJobOptions extends PipelineOptions, FlinkPipelineOptions, GcpOptions {
@Description("Import spec yaml file path")
@Required(groups = {"importSpec"})
String getImportSpecYamlFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,84 +17,81 @@

package feast.ingestion.transform;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import feast.ingestion.deserializer.FeatureRowDeserializer;
import feast.ingestion.deserializer.FeatureRowKeyDeserializer;
import feast.options.OptionsParser;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FeatureRowProto.FeatureRowKey;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;

/**
 * Factory for transforms that read {@link FeatureRow} messages from Kafka.
 */
public class FeatureRowKafkaIO {

  /** Import spec type handled by this IO. */
  static final String KAFKA_TYPE = "kafka";

  /**
   * Creates a transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow}
   * proto messages from one or more Kafka topics.
   *
   * @param importSpec import spec whose type is {@code "kafka"} and whose options hold the
   *     {@code "server"} (bootstrap server) and {@code "topics"} (comma-separated topic names)
   *     entries; these are validated when the transform is expanded
   * @return the read transform
   */
  public static Read read(ImportSpec importSpec) {
    return new Read(importSpec);
  }

  /** Transform that reads Kafka records and emits their {@link FeatureRow} values. */
  public static class Read extends FeatureIO.Read {

    private final ImportSpec importSpec;

    private Read(ImportSpec importSpec) {
      this.importSpec = importSpec;
    }

    /**
     * Validates the import spec, reads the configured topics and outputs the value of every
     * record that has one.
     *
     * @throws IllegalArgumentException if the import spec type is not {@code "kafka"}, or the
     *     {@code "server"} or {@code "topics"} option is missing or empty
     */
    @Override
    public PCollection<FeatureRow> expand(PInput input) {
      checkArgument(
          KAFKA_TYPE.equals(importSpec.getType()),
          "import spec type must be '%s' but was '%s'",
          KAFKA_TYPE,
          importSpec.getType());

      String bootstrapServer = importSpec.getOptionsMap().get("server");
      checkArgument(
          !Strings.isNullOrEmpty(bootstrapServer), "kafka bootstrap server must be set");

      String topics = importSpec.getOptionsMap().get("topics");
      checkArgument(!Strings.isNullOrEmpty(topics), "kafka topic(s) must be set");

      List<String> topicsList = parseTopics(topics);
      // A value such as "," or " , " is non-empty yet names no topic at all.
      checkArgument(!topicsList.isEmpty(), "kafka topic(s) must be set");

      KafkaIO.Read<FeatureRowKey, FeatureRow> kafkaIOReader =
          KafkaIO.<FeatureRowKey, FeatureRow>read()
              .withBootstrapServers(bootstrapServer)
              .withTopics(topicsList)
              .withKeyDeserializer(FeatureRowKeyDeserializer.class)
              .withValueDeserializer(FeatureRowDeserializer.class);

      PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> featureRowRecord =
          input.getPipeline().apply(kafkaIOReader);

      // Only the record value is of interest downstream; key and metadata are dropped.
      return featureRowRecord.apply(
          ParDo.of(
              new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() {
                @ProcessElement
                public void processElement(ProcessContext processContext) {
                  FeatureRow value = processContext.element().getKV().getValue();
                  // A record without a value carries no feature row to emit.
                  if (value != null) {
                    processContext.output(value);
                  }
                }
              }));
    }

    /**
     * Splits a comma-separated topic list, trimming surrounding whitespace from each name and
     * dropping blank entries, so that {@code "a, b"} yields {@code [a, b]}.
     */
    private static List<String> parseTopics(String topics) {
      List<String> parsed = new ArrayList<>();
      for (String rawTopic : Arrays.asList(topics.split(","))) {
        String topic = rawTopic.trim();
        if (!topic.isEmpty()) {
          parsed.add(topic);
        }
      }
      return parsed;
    }
  }
}
Loading
Toggle all file notes Toggle all file annotations