◐ Shell
clean mode source ↗

Kafka IO fixes by tims · Pull Request #23 · feast-dev/feast

Expand Up @@ -17,84 +17,81 @@
package feast.ingestion.transform;
import com.google.common.base.Preconditions; import com.google.common.base.Strings; import static com.google.common.base.Preconditions.checkArgument;
import feast.ingestion.deserializer.FeatureRowDeserializer; import feast.ingestion.deserializer.FeatureRowKeyDeserializer; import feast.options.Options; import feast.options.OptionsParser; import feast.specs.ImportSpecProto.ImportSpec; import feast.types.FeatureRowProto.FeatureRow; import feast.types.FeatureRowProto.FeatureRowKey; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import javax.validation.constraints.NotEmpty; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.io.kafka.KafkaRecord; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput;
import java.util.ArrayList; import java.util.Arrays; import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
public class FeatureRowKafkaIO { static final String KAFKA_TYPE = "kafka";
// Import spec type handled by this IO; Read.expand checks importSpec.getType() against it.
static final String KAFKA_TYPE = "kafka";

/**
 * Creates a transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} proto
 * messages from one or more Kafka topics.
 *
 * @param importSpec import spec passed unchanged to the {@link Read} constructor
 * @return a new {@link Read} transform wrapping {@code importSpec}
 */
public static Read read(ImportSpec importSpec) {
  return new Read(importSpec);
}
/**
 * Options for the Kafka read transform. Populated in {@code Read.expand} via
 * {@code OptionsParser.parse(importSpec.getOptionsMap(), KafkaReadOptions.class)}.
 *
 * <p>NOTE(review): the {@code @NotEmpty} constraints are presumably enforced by
 * {@code OptionsParser} during parsing; confirm against its implementation.
 */
public static class KafkaReadOptions implements Options {
  // Handed to KafkaIO's withBootstrapServers(...).
  @NotEmpty public String server;
  // Comma-separated topic names; split on "," and handed to KafkaIO's withTopics(...).
  @NotEmpty public String topics;
}
/** * Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} * proto messages from kafka one or more kafka topics. * */ public static Read read(ImportSpec importSpec) { return new Read(importSpec); } public static class Read extends FeatureIO.Read {
public static class Read extends FeatureIO.Read { private ImportSpec importSpec;
private ImportSpec importSpec;
/**
 * Stores the import spec for later use by {@code expand}. Private: instances are obtained through
 * {@link FeatureRowKafkaIO#read(ImportSpec)}.
 *
 * @param importSpec import spec kept in the {@code importSpec} field
 */
private Read(ImportSpec importSpec) {
  this.importSpec = importSpec;
}
@Override public PCollection<FeatureRow> expand(PInput input) {
checkArgument(importSpec.getType().equals(KAFKA_TYPE));
String bootstrapServer = importSpec.getOptionsMap().get("server");
Preconditions.checkArgument( !Strings.isNullOrEmpty(bootstrapServer), "kafka bootstrap server must be set");
String topics = importSpec.getOptionsMap().get("topics");
Preconditions.checkArgument( !Strings.isNullOrEmpty(topics), "kafka topic(s) must be set");
List<String> topicsList = new ArrayList<>(Arrays.asList(topics.split(",")));
KafkaIO.Read<FeatureRowKey, FeatureRow> kafkaIOReader = KafkaIO.<FeatureRowKey, FeatureRow>read() .withBootstrapServers(bootstrapServer) .withTopics(topicsList) .withKeyDeserializer(FeatureRowKeyDeserializer.class) .withValueDeserializer(FeatureRowDeserializer.class);
PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> featureRowRecord = input.getPipeline().apply(kafkaIOReader); private Read(ImportSpec importSpec) { this.importSpec = importSpec; }
PCollection<FeatureRow> featureRow = featureRowRecord.apply( ParDo.of( new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() { @ProcessElement public void processElement(ProcessContext processContext) { KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element(); processContext.output(record.getKV().getValue()); } })); return featureRow; } @Override public PCollection<FeatureRow> expand(PInput input) {
checkArgument(importSpec.getType().equals(KAFKA_TYPE));
KafkaReadOptions options = OptionsParser.parse(importSpec.getOptionsMap(), KafkaReadOptions.class);
List<String> topicsList = new ArrayList<>(Arrays.asList(options.topics.split(",")));
KafkaIO.Read<FeatureRowKey, FeatureRow> kafkaIOReader = KafkaIO.<FeatureRowKey, FeatureRow>read() .withBootstrapServers(options.server) .withTopics(topicsList) .withKeyDeserializer(FeatureRowKeyDeserializer.class) .withValueDeserializer(FeatureRowDeserializer.class);
PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> featureRowRecord = input.getPipeline().apply(kafkaIOReader);
PCollection<FeatureRow> featureRow = featureRowRecord.apply( ParDo.of( new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() { @ProcessElement public void processElement(ProcessContext processContext) { KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element(); processContext.output(record.getKV().getValue()); } })); return featureRow; } } }