Snowflake is a cloud data platform delivered as a service and, in the company’s own words, “powerful but simple to use.” Its multi-cluster shared data architecture allows for secure, real-time data sharing and for data warehousing that scales well in both size and concurrency. It is available on AWS, Azure, and Google Cloud Platform (GCP), and supports data sharing across cloud platforms and regions. This cross-cloud approach, together with robust performance and the reduced cost of near-zero infrastructure management, has made Snowflake one of the fastest-growing cloud-based data service providers today.
Here at SpringML, we decided to check out what Snowflake is all about and how it can be configured to allow high-performance data loading from Google Cloud Storage (GCS).
Access to Snowflake
Snowflake encourages its users to begin with a 30-day free trial that includes $400 in usage credits and a virtual hands-on lab. We selected GCP as our cloud provider during a quick and easy sign-up, which gave us Snowflake up and running on its user-friendly console, where we can work with ANSI-standard SQL commands. We can also connect to Snowflake in a number of other ways, be it through its CLI client, SnowSQL, or through its several connectors and drivers. We installed and used the Snowflake Connector for Python.
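As a rough illustration of connecting through the Snowflake Connector for Python, here is a minimal sketch. The SNOWFLAKE_* environment variable names and the two helper functions are our own assumptions for the example, not something Snowflake prescribes; only snowflake.connector.connect and its user, password, and account arguments come from the connector itself.

```python
import os


def connection_params(env=os.environ):
    """Collect connect() keyword arguments from environment variables.

    The SNOWFLAKE_* variable names are hypothetical; use whatever names
    your own deployment defines.
    """
    required = ("SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD", "SNOWFLAKE_ACCOUNT")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {
        "user": env["SNOWFLAKE_USER"],
        "password": env["SNOWFLAKE_PASSWORD"],
        "account": env["SNOWFLAKE_ACCOUNT"],
    }


def open_connection(env=os.environ):
    """Open a Snowflake connection using the parameters above."""
    # Imported lazily so this module still loads where the connector
    # (pip install snowflake-connector-python) is not installed.
    import snowflake.connector

    return snowflake.connector.connect(**connection_params(env))
```

Keeping the credentials out of the code and in the environment is a design choice for the sketch; a secrets manager would serve the same purpose.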
Creating GCS Integration
We can now set up bulk loading of data files from GCS into our Snowflake environment.
- First, we allow Snowflake to securely access data files stored in a GCS bucket by configuring a Cloud Storage integration.
- storage_allowed_locations and storage_blocked_locations specify the buckets that the integration is allowed to access during the creation of stages.
create storage integration springml_demo_int
  type = external_stage
  storage_provider = gcs
  enabled = true
  storage_allowed_locations = ('gcs://springml_demo_snowflake/sample_sales_data/')
  storage_blocked_locations = ('gcs://springml_demo_snowflake/surplus_data/');
- On creating the integration, Snowflake creates a Cloud Storage service account whose ID can be retrieved with the desc storage integration springml_demo_int command
- After running the above command, locate the value under the property storage_gcp_service_account. The service account will look something like this: <service-account>@gcpuscentral1-1dfa.iam.gserviceaccount.com
- Snowflake provides a single Cloud Storage service account for each Snowflake account, which means that all Cloud Storage integrations for a single Snowflake account will use the same service account.
- We grant the following permissions on our GCS bucket to the service account obtained above:
storage.buckets.get
storage.objects.create
storage.objects.delete
storage.objects.get
storage.objects.list
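The integration steps above can be sketched in Python as two small helpers: one that assembles the create storage integration statement from lists of bucket paths, and one that picks the service account out of the desc storage integration output. Both function names are hypothetical, and the second assumes that each returned row starts with the property name, its type, and its value, which is worth verifying against your own output.

```python
def quote_list(locations):
    """Render strings as a parenthesised SQL list of string literals."""
    quoted = ["'" + loc.replace("'", "''") + "'" for loc in locations]
    return "(" + ", ".join(quoted) + ")"


def storage_integration_sql(name, allowed, blocked=()):
    """Build a create storage integration statement for GCS."""
    lines = [
        f"create storage integration {name}",
        "  type = external_stage",
        "  storage_provider = gcs",
        "  enabled = true",
        f"  storage_allowed_locations = {quote_list(allowed)}",
    ]
    if blocked:
        lines.append(f"  storage_blocked_locations = {quote_list(blocked)}")
    return "\n".join(lines)


def service_account_from_desc(rows):
    """Return the service account from `desc storage integration` rows.

    Assumption: each row begins with (property, property_type,
    property_value). Returns None if the property is absent.
    """
    for row in rows:
        if str(row[0]).upper() == "STORAGE_GCP_SERVICE_ACCOUNT":
            return row[2]
    return None


statement = storage_integration_sql(
    "springml_demo_int",
    allowed=["gcs://springml_demo_snowflake/sample_sales_data/"],
    blocked=["gcs://springml_demo_snowflake/surplus_data/"],
)
print(statement)
```

With a live cursor, the rows would come from executing desc storage integration springml_demo_int and calling fetchall() on the cursor.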
Setting Up the Snowflake Environment
We require a warehouse to load data, as well as a database and schema in which we will create tables to load our data into. Warehouses in Snowflake can be configured to auto-suspend and auto-resume, but can also be manually controlled with the ALTER command.
create warehouse if not exists springml_demo_warehouse;
create database if not exists springml_demo_database;
use database springml_demo_database;
create schema if not exists springml_demo_schema;
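To make the auto-suspend, auto-resume, and manual ALTER control mentioned above more concrete, here is a small sketch that builds the corresponding statements. The helper names, the 60-second timeout, and the XSMALL size are illustrative assumptions rather than the settings we used.

```python
def create_warehouse_sql(name, auto_suspend_seconds=60):
    """Build a create warehouse statement with auto-suspend and auto-resume.

    The size and timeout defaults are example values only.
    """
    return (
        f"create warehouse if not exists {name} "
        "with warehouse_size = 'XSMALL' "
        f"auto_suspend = {int(auto_suspend_seconds)} "
        "auto_resume = true"
    )


def set_warehouse_state_sql(name, running):
    """Build the alter warehouse statement for manual control."""
    action = "resume if suspended" if running else "suspend"
    return f"alter warehouse {name} {action}"


print(create_warehouse_sql("springml_demo_warehouse"))
print(set_warehouse_state_sql("springml_demo_warehouse", running=False))
```

A short auto-suspend timeout trades a brief resume delay on the next query for lower credit usage while the warehouse is idle.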
If our current role does not have the create stage privilege on the schema and the usage privilege on the storage integration, an account administrator (the accountadmin role) must grant these privileges to that role. We used the accountadmin role for convenience, and set the session to use the warehouse, database, and schema we created.
use role accountadmin;
use warehouse springml_demo_warehouse;
use database springml_demo_database;
use schema springml_demo_schema;
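If a role other than accountadmin is used, the two grants described above could be scripted roughly as follows. The role name springml_demo_role is hypothetical, and run_statements simply assumes a cursor-like object with an execute method, such as the one returned by the Python connector.

```python
def stage_grants_sql(role, schema, integration):
    """Build the grants a role needs before it can create an external stage."""
    return [
        f"grant create stage on schema {schema} to role {role}",
        f"grant usage on integration {integration} to role {role}",
    ]


def run_statements(cursor, statements):
    """Execute statements one at a time, in order, on a cursor-like object."""
    for statement in statements:
        cursor.execute(statement)


grants = stage_grants_sql(
    role="springml_demo_role",  # hypothetical role name
    schema="springml_demo_database.springml_demo_schema",
    integration="springml_demo_int",
)
for grant in grants:
    print(grant)
```

Sending the statements one by one keeps each execute call to a single statement, which is the simplest way to drive the connector.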
We then create the table that the copy command will populate from an external stage.
create or replace table springml_demo_table (
  region string,
  country string,
  item_type string,
  sales_channel string,
  order_priority char(1),
  order_date string,
  order_id integer,
  ship_date string,
  units_sold integer,
  unit_price float,
  unit_cost float,
  total_revenue float,
  total_cost float,
  total_profit float
);
Loading the Data
Once our user session has set a database and schema for use, we can create a stage that references the GCS bucket and folder paths where we want to load or unload data files. We also specify the storage integration supporting the GCS account as well as the file format of the data files. Here, the file format springml_demo_csv_format was previously defined with the desired parameters: (type = csv field_delimiter = ',' skip_header = 1).
create or replace stage springml_demo_stage
  url = 'gcs://springml_demo_snowflake/sample_sales_data/2020/06/12/small_sample.csv'
  storage_integration = springml_demo_int
  file_format = (format_name = 'springml_demo_csv_format');
We can list the files that the stage points to by using list @springml_demo_stage.
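Since the stage depends on a file format that has to exist first, the ordering can be sketched as a short list of statements built in Python. The helper names are our own, and the stage references the named format through format_name; treat this as one possible arrangement rather than the exact script we ran.

```python
def csv_file_format_sql(name, delimiter=",", skip_header=1):
    """Build a named CSV file format with the parameters described above."""
    return (
        f"create or replace file format {name} "
        f"type = csv field_delimiter = '{delimiter}' "
        f"skip_header = {int(skip_header)}"
    )


def stage_sql(name, url, integration, file_format):
    """Build an external stage that uses a storage integration."""
    return (
        f"create or replace stage {name} url = '{url}' "
        f"storage_integration = {integration} "
        f"file_format = (format_name = '{file_format}')"
    )


# Order matters: the file format must exist before the stage refers to it.
stage_setup = [
    csv_file_format_sql("springml_demo_csv_format"),
    stage_sql(
        "springml_demo_stage",
        "gcs://springml_demo_snowflake/sample_sales_data/2020/06/12/small_sample.csv",
        "springml_demo_int",
        "springml_demo_csv_format",
    ),
    "list @springml_demo_stage",
]
for statement in stage_setup:
    print(statement)
```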
We can now proceed to load files from the stage into our tables. Snowflake allows the loading of files either from an external stage or directly from the bucket or folder path. If we choose the latter, Snowflake copies all the data files in that path; however, we can specify which files we wish to copy into the table using the files option or pattern matching. The following copies from the external stage we created, using a previously defined file format.
Note: By default, Snowflake keeps track of the files it has already loaded, so a copy command loads only the new files in a given path. Alternatively, we can organize our files in date-separated folders (assuming new files will be ingested every day) and load everything in a given day's folder. To load all the files in a path regardless of whether they were loaded before, we use the force=true flag in the copy command; note that reloading a file this way can duplicate rows in the table.
copy into springml_demo_table
  from @springml_demo_stage
  file_format = (format_name = 'springml_demo_csv_format')
  force = true;
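The choices discussed above (a date-separated folder, a files list or a pattern, and the force flag) can be combined in a small statement builder. This is only a sketch: the function names are hypothetical, it assumes a stage rooted at a folder that contains year/month/day subfolders, and it treats files and pattern as mutually exclusive purely to keep the example simple.

```python
from datetime import date


def dated_prefix(day):
    """Return the year/month/day folder for a given date."""
    return day.strftime("%Y/%m/%d/")


def copy_into_sql(table, stage, path="", pattern=None, files=None, force=False):
    """Build a copy into <table> statement for an external stage."""
    if pattern and files:
        raise ValueError("pass either pattern or files, not both")
    sql = f"copy into {table} from @{stage}/{path}"
    if files:
        sql += " files = (" + ", ".join(f"'{name}'" for name in files) + ")"
    if pattern:
        sql += f" pattern = '{pattern}'"
    if force:
        sql += " force = true"
    return sql


print(
    copy_into_sql(
        "springml_demo_table",
        "springml_demo_stage",
        path=dated_prefix(date(2020, 6, 12)),
        pattern=".*[.]csv",
        force=True,
    )
)
```

Leaving force at its default lets Snowflake skip files it has already loaded, which is usually the safer choice for a daily job.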
To unload data from Snowflake into our GCS bucket, we can create a new storage integration or reuse an existing one, as long as the unload path is among its storage_allowed_locations. We then create an external stage using that integration and unload data from our tables in Snowflake in the following way.
create or replace stage springml_demo_unload_stage
  url = 'gcs://springml_demo_snowflake/unload/'
  storage_integration = springml_demo_int
  file_format = (format_name = 'springml_demo_unload_csv_format');
copy into @springml_demo_unload_stage/d1 from springml_demo_table;
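For completeness, the unload statement can be built the same way. The sketch below adds the header and overwrite copy options as an illustration; we did not necessarily set them in our own run, and the function name is hypothetical.

```python
def unload_sql(table, stage, prefix, header=True, overwrite=False):
    """Build a copy into <location> statement that unloads a table to a stage.

    `prefix` becomes the leading part of the unloaded file names.
    """
    return (
        f"copy into @{stage}/{prefix} from {table} "
        f"header = {str(header).lower()} "
        f"overwrite = {str(overwrite).lower()}"
    )


print(unload_sql("springml_demo_table", "springml_demo_unload_stage", "d1"))
```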
Integrating with Airflow
As mentioned earlier, we used Snowflake’s Python Connector library to write Python code that encapsulates the SQL commands necessary to connect to and use Snowflake. We then integrated that code into Airflow so that the data loading can be scheduled as a workflow task.
- The Python code/function can be run through a Python operator in an Airflow DAG.
- This way, we can automate the process of loading the data from GCS into Snowflake.
- Furthermore, the DAG can also be scheduled, configured for retries, and be monitored for failures.
- Thus, incorporating the SQL commands in a Python function and running it through a simple DAG will give us more control over the data loading and unloading, and help create a robust and repeatable process.
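The points above can be sketched as a task function plus a DAG factory. This is not our production DAG: the DAG id, task id, schedule, and retry settings are illustrative, the import paths and the schedule_interval argument assume Airflow 2.x (newer releases use schedule instead), and connect stands for any zero-argument function that returns an open Snowflake connection.

```python
from datetime import datetime, timedelta

COPY_SQL = "copy into springml_demo_table from @springml_demo_stage force = true"


def load_gcs_to_snowflake(connect, sql=COPY_SQL):
    """Task body: open a connection, run the copy, and always close."""
    conn = connect()
    try:
        conn.cursor().execute(sql)
    finally:
        conn.close()


def build_dag(connect):
    """Wrap the load in a daily DAG with retries (assumes Airflow 2.x)."""
    # Imported lazily so the module still loads where Airflow is absent.
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    def run_load():
        load_gcs_to_snowflake(connect)

    dag = DAG(
        dag_id="springml_demo_snowflake_load",  # hypothetical name
        start_date=datetime(2020, 6, 12),
        schedule_interval="@daily",
        catchup=False,
        default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
    )
    PythonOperator(task_id="load_gcs_to_snowflake", python_callable=run_load, dag=dag)
    return dag
```

For Airflow to pick the DAG up, the DAG file would assign the result of build_dag to a module-level variable, passing in a function that opens the Snowflake connection.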
Below is the DAG as it looks in the Airflow UI.
This blog assumes that the user manually uploads new files into a GCS path, from which they are loaded into a Snowflake table. However, it would be interesting to explore and design a solution where the data extraction is also handled by an ETL tool like Google Cloud Dataflow before the data is written to a GCS bucket and then loaded. We also hope to further explore the various features within Snowflake and continue this series of blogs.
Please check out SpringML’s video, where we demonstrate how a user can integrate Snowflake’s data warehouse service with Google Cloud Storage (GCS). The video is also available directly on YouTube.