◐ Shell
reader mode source ↗
Skip to content
Merged
Hide file tree
Changes from all commits
File filter
Conversations
Jump to
Diff view
Apply and reload
Show whitespace
Diff view
Apply and reload
19 changes: 11 additions & 8 deletions ingestion/src/main/java/feast/ingestion/boot/ImportJobModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;
import feast.ingestion.model.Specs;
import feast.ingestion.model.SpecsImpl;
import feast.ingestion.options.ImportJobOptions;
import feast.ingestion.service.CachedSpecService;
import feast.ingestion.service.CoreSpecService;
import feast.ingestion.service.FileSpecService;
import feast.ingestion.service.SpecService.Builder;
import feast.ingestion.service.SpecService.UnsupportedBuilder;
import feast.specs.ImportSpecProto.ImportSpec;
Expand All @@ -37,6 +34,8 @@
import feast.storage.service.ErrorsStoreService;
import feast.storage.service.ServingStoreService;
import feast.storage.service.WarehouseStoreService;

/** An ImportJobModule is a Guice module for creating dependency injection bindings. */
public class ImportJobModule extends AbstractModule {
Expand All @@ -54,23 +53,27 @@ protected void configure() {
bind(ImportJobOptions.class).toInstance(options);
bind(PipelineOptions.class).toInstance(options);
bind(ImportSpec.class).toInstance(importSpec);
bind(Specs.class).to(SpecsImpl.class);
}

@Provides
@Singleton
Builder provideSpecService(ImportJobOptions options) {
if (options.getCoreApiUri() != null) {
return new CachedSpecService.Builder(new CoreSpecService.Builder(options.getCoreApiUri()));
} else if (options.getCoreApiSpecPath() != null) {
return new CachedSpecService.Builder(
new FileSpecService.Builder(options.getCoreApiSpecPath()));
} else {
return new UnsupportedBuilder(
"Cannot initialise spec service as coreApiHost or specPath was not set.");
}
}

@Provides
@Singleton
List<WarehouseStore> provideWarehouseStores() {
Expand Down
124 changes: 112 additions & 12 deletions ingestion/src/main/java/feast/ingestion/model/Specs.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,127 @@

package feast.ingestion.model;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import feast.ingestion.service.SpecRetrievalException;
import feast.specs.EntitySpecProto.EntitySpec;
import feast.specs.FeatureSpecProto.FeatureSpec;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.specs.StorageSpecProto.StorageSpec;

public interface Specs extends Serializable {
FeatureSpec getFeatureSpec(String featureId);

List<FeatureSpec> getFeatureSpecByServingStoreId(String storeId) throws SpecRetrievalException;

EntitySpec getEntitySpec(String entityName) throws SpecRetrievalException;

ImportSpec getImportSpec() throws SpecRetrievalException;

Map<String, StorageSpec> getStorageSpecs() throws SpecRetrievalException;

StorageSpec getStorageSpec(String storeId);

String getJobName();
}
Loading
Toggle all file notes Toggle all file annotations