◐ Shell
clean mode source ↗

revert: Verify the existence of Registry tables in snowflake… by etirelli · Pull Request #3907 · feast-dev/feast

Expand Up @@ -124,19 +124,15 @@ def __init__( f'"{self.registry_config.database}"."{self.registry_config.schema_}"' )
if not self._verify_registry_database(): # Verify the existing resitry database schema from snowflake. If any table names and column types is wrong, run table recreation SQL. with GetSnowflakeConnection(self.registry_config) as conn: sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql" with open(sql_function_file, "r") as file: sqlFile = file.read()
sqlCommands = sqlFile.split(";") for command in sqlCommands: query = command.replace( "REGISTRY_PATH", f"{self.registry_path}" ) execute_snowflake_statement(conn, query) with GetSnowflakeConnection(self.registry_config) as conn: sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql" with open(sql_function_file, "r") as file: sqlFile = file.read()
sqlCommands = sqlFile.split(";") for command in sqlCommands: query = command.replace("REGISTRY_PATH", f"{self.registry_path}") execute_snowflake_statement(conn, query)
self.cached_registry_proto = self.proto() proto_registry_utils.init_project_metadata(self.cached_registry_proto, project) Expand All @@ -149,55 +145,6 @@ def __init__( ) self.project = project
def _verify_registry_database( self, ) -> bool: """Verify the records in registry database. To check: 1, the 11 tables are existed. 2, the column types are correct.
Example return from snowflake's cursor.describe("SELECT * FROM a_table") command: [ResultMetadata(name='ENTITY_NAME', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=False), ResultMetadata(name='PROJECT_ID', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=False), ResultMetadata(name='LAST_UPDATED_TIMESTAMP', type_code=6, display_size=None, internal_size=None, precision=0, scale=9, is_nullable=False), ResultMetadata(name='ENTITY_PROTO', type_code=11, display_size=None, internal_size=8388608, precision=None, scale=None, is_nullable=False)]
Returns: True if the necessary 11 tables are existed in Snowflake and schema of each table is correct. False if failure happens. """
from feast.infra.utils.snowflake.registry.snowflake_registry_table import ( snowflake_registry_table_names_and_column_types as expect_tables, )
res = True
try: with GetSnowflakeConnection(self.registry_config) as conn: for table_name in expect_tables: result_metadata_list = conn.cursor().describe( f"SELECT * FROM {table_name}" ) for col in result_metadata_list: if ( expect_tables[table_name][col.name]["type_code"] != col.type_code ): res = False break except Exception as e: res = False # Set to False for all errors. logger.debug( f"Failed to verify Registry tables and columns types with exception: {e}." ) finally: # The implementation in snowflake_utils.py will cache the established connection without re-connection logic. # conn.close() pass
return res
def refresh(self, project: Optional[str] = None): if project: project_metadata = proto_registry_utils.get_project_metadata( Expand Down