Leveraging the Power of Snowflake: Real-Time Loading from GCS

Continuing our series “Leveraging the Power of Snowflake”, this blog looks at how to load data into Snowflake as soon as it appears in Google Cloud Storage (GCS). The previous posts covered Snowflake’s unique architecture and features like zero-copy data cloning, dynamic caching, and data sharing in Leveraging the Power of Snowflake: Key Benefits and Architecture, and how easy it is to bulk load data from Google Cloud Storage into Snowflake in Leveraging the Power of Snowflake: Bulk Loading from Google Cloud Storage. If you are all caught up, let’s get started!

What is Snowpipe?

Snowpipe is Snowflake’s continuous data ingestion service. Snowpipe loads data within minutes after files are added to a stage and submitted for ingestion.

A pipe is a named, first-class Snowflake object that contains a COPY statement used by Snowpipe. The COPY statement identifies the source location of the data files (i.e., a stage) and a target table.

With Snowpipe’s serverless compute model, Snowflake manages load capacity, ensuring optimal compute resources to meet demand. In short, Snowpipe provides a “pipeline” for loading fresh data in micro-batches as soon as it’s available.

Bulk Load vs Snowpipe

Bulk loading requires a user-managed virtual warehouse to execute the COPY commands, whereas Snowpipe uses Snowflake-provisioned (serverless) compute resources. Bulk loads are therefore billed per second for each active warehouse, while Snowpipe is billed only for the compute resources actually used while loading files.
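For a quick look at what Snowpipe actually costs, credit consumption can be reviewed per pipe from the ACCOUNT_USAGE schema. A minimal sketch, with the 30-day window purely illustrative:

-- snowpipe credits consumed per pipe over the last 30 days
select pipe_name, sum(credits_used) as snowpipe_credits
from snowflake.account_usage.pipe_usage_history
where start_time >= dateadd('day', -30, current_timestamp())
group by pipe_name
order by snowpipe_credits desc;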

Snowpipe for Google Cloud Storage

Setting up Snowpipe for GCS is a 2-step process.

    1. Configuring Secure Access to Cloud Storage

This step consists of:

    1. Creating a storage integration
    2. Retrieving the service account
    3. Granting permissions to the service account in GCP

These steps were covered in detail in Leveraging the Power of Snowflake: Bulk Loading from Google Cloud Storage.
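For quick reference, that setup looks roughly like the sketch below; the integration name, bucket, and path are placeholders:

create storage integration gcs_int
  type = external_stage
  storage_provider = 'GCS'
  enabled = true
  storage_allowed_locations = ('gcs://<bucket_name>/<path>/');

-- the GCP service account to authorize appears in the STORAGE_GCP_SERVICE_ACCOUNT property
desc storage integration gcs_int;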

    2. Configuring Automated Snowpipe Using GCS Pub/Sub

Automating Snowpipe requires some additional setup in the GCP account.

  1. Create Pub/Sub topic

Pub/Sub notifications allow you to track changes to your Cloud Storage objects. Execute the following command to create the topic and enable it to listen for activity in the specified GCS bucket:

gsutil notification create -t <topic_name> -f json -p <prefix> gs://<bucket_name>

  2. Create a Pub/Sub subscription

gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_ID

Creating a topic and subscription can be accomplished from the GCP console too.

  3. Retrieve the subscription ID

Copy the subscription name from the GCP console.

  4. Create a notification integration in Snowflake

A notification integration is created with the CREATE NOTIFICATION INTEGRATION command; only users with the ACCOUNTADMIN role, or a role with the global CREATE INTEGRATION privilege, can execute it. The gcp_pubsub_subscription_name value is the subscription name copied in the previous step.

create notification integration pipe_notification_int
type = QUEUE
notification_provider = GCP_PUBSUB
enabled = true
gcp_pubsub_subscription_name = '<subscription_name>';

  5. Retrieve Cloud Storage service account

Snowflake creates a service account as part of the previous step. A project editor must grant this service account access to the Pub/Sub subscription. To retrieve the service account name, run the following command:

desc notification integration pipe_notification_int;

The service account name is returned in the GCP_PUBSUB_SERVICE_ACCOUNT property of the output.

  6. Grant permission in Google Cloud

Under Big Data » Pub/Sub » Subscriptions in the Cloud console, add the service account retrieved above in the Add members field so that it can access the subscription.

Also, in the IAM & Admin section, grant the service account the “Monitoring Viewer” role.

  7. Create a Pipe with Auto-Ingest Enabled

Create a pipe using the CREATE PIPE command. The pipe defines the COPY INTO <table> statement used by Snowpipe to load data from the ingestion queue into the target table.

create pipe gcs_snowpipe
auto_ingest = true
integration = 'PIPE_NOTIFICATION_INT'
as
copy into mydb.public.snowpipe_table
from @mydb.public.pipe_stage;
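Note that the pipe references an external stage and a target table that must already exist. A minimal sketch of creating them, assuming the storage integration sketched earlier (here called gcs_int) and illustrative table columns:

create or replace table mydb.public.snowpipe_table (
  event_id number,
  event_name varchar,
  amount number(10,2)
);

create or replace stage mydb.public.pipe_stage
  url = 'gcs://<bucket_name>/<path>/'
  storage_integration = gcs_int
  file_format = (type = 'csv');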

Once the pipe is configured with auto-ingest, new data files added to the GCS bucket generate event messages that inform Snowpipe to load them into the target table defined in the pipe.

But what about historical data?

A backlog of data files can be loaded by running the ALTER PIPE … REFRESH statement. This statement can only load files that were staged within the last 7 days, so it should be treated as an interim fix when a pipe breaks rather than a long-term solution. Data files staged more than 7 days ago can be copied into Snowflake using other options, such as a bulk COPY INTO.
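For example, refreshing the pipe created above might look like this; the path prefix is a hypothetical folder used to limit the scan:

-- load any files already staged (within the last 7 days) that the pipe has not ingested
alter pipe gcs_snowpipe refresh;

-- optionally limit the refresh to a specific path under the stage
alter pipe gcs_snowpipe refresh prefix = 'historical/';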

Managing Snowpipe

A JSON representation of the pipe status can be retrieved with the command below:

select SYSTEM$PIPE_STATUS('gcs_snowpipe');
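To see which individual files were loaded or rejected, the COPY_HISTORY table function can also be queried. A sketch against the target table used in the pipe above (the table name resolves against the current database and schema):

select file_name, status, row_count, first_error_message
from table(information_schema.copy_history(
  table_name => 'SNOWPIPE_TABLE',
  start_time => dateadd('hour', -1, current_timestamp())));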

PIPE_USAGE_HISTORY can be used to query the history of data loaded into Snowflake tables using Snowpipe within a specified date range. This function returns pipe activity within the last 14 days.

select *
from table(information_schema.pipe_usage_history(
date_range_start=>dateadd('hour',-72,current_timestamp()),
pipe_name=>'mydb.public.gcs_snowpipe'));

The output includes the pipe name along with the credits used, bytes inserted, and files inserted over each interval in the range.
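While troubleshooting, a pipe can also be paused and resumed without dropping it. A quick sketch:

-- pause automatic loading for the pipe
alter pipe gcs_snowpipe set pipe_execution_paused = true;

-- resume automatic loading
alter pipe gcs_snowpipe set pipe_execution_paused = false;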

Snowpipe Best Practices

Snowflake recommends against loading files larger than 100 GB. To optimize the number of parallel load operations, aim for compressed data files roughly 100-250 MB (or larger) in size. Aggregate smaller files to minimize processing overhead, and split very large files into a greater number of smaller files to distribute the load across the available compute resources.

Creating one data file per minute generally strikes a good balance between cost and load latency; staging files more frequently than once per minute adds overhead without guaranteeing that the data becomes available in Snowflake any faster.

Data Transformations

Snowflake supports simple transformations while data is being loaded. This avoids the need for temporary tables to store pre-transformed data when, for example, reordering columns during a load (short sketches follow each list below).

Transformations allowed on CSV data include:

  • Loading a subset of data
  • Reordering CSV columns during load
  • Converting data types
  • Including Sequence columns
  • Including AutoIncrement/Identity columns
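For instance, a COPY statement that loads a subset of CSV columns, reorders them, and casts a data type might look like the sketch below; the target columns and the positional references ($1, $2, $3) are placeholders:

-- reorder and cast csv columns while loading
copy into mydb.public.snowpipe_table (event_id, event_name, amount)
from (
  select t.$2, t.$1, t.$3::number(10,2)
  from @mydb.public.pipe_stage t
)
file_format = (type = 'csv');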

Transformations allowed on Semi-Structured data include:

  • Loading Semi-structured data into separate columns
  • Flattening Semi-structured data
  • Splitting Semi-structured elements into separate columns
  • Loading PARQUET data into separate columns
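Similarly, attributes of a staged JSON document can be split into separate relational columns during the load. A sketch that assumes the same stage and table, with illustrative attribute names:

-- extract json attributes into separate columns while loading
copy into mydb.public.snowpipe_table (event_id, event_name, amount)
from (
  select $1:id::number, $1:name::varchar, $1:amount::number(10,2)
  from @mydb.public.pipe_stage
)
file_format = (type = 'json');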

Summary

Snowpipe automates near real-time data loading from Google Cloud Storage in just a few simple steps: it picks up data files from cloud storage as they arrive and loads them into Snowflake. Additionally, transformations can be applied during the load to complete an extract, transform, load (ETL) pipeline.

Snowflake’s documentation is the best source for additional information. As always, the best way to learn is to get your hands dirty; Snowflake provides a free 30-day trial with $400 worth of usage for hands-on learning.

For more information on how SpringML can help you implement Snowflake, email us at [email protected] or fill out the contact form.