Change Data Capture Using Snowflake Streams

Introduction

A Snowflake table stream creates a change table that shows what has changed, at a row-level, between two transactional points of time. Streams are like processing queues and can be queried just like a table. Table streams make it easy to grab the new data in a table so that one can have more efficient processing. Streams do that by taking a snapshot of all rows in a table at a point in time and only storing an offset for the source table. In that way, a stream can return the change data capture records by leveraging the versioning history.
To help provide an in-depth understanding of Snowflake streams, let’s go through an example. We’ll be creating a table for a bank branch that stores the Branch ID, the city, and the associated dollar amount of deposits for a particular day.

Prep Work

To prepare for the example, we’ll create a new database, schema, and table. We’ll insert some values to get us started. We are going to use the insert command in our example, but typically one wouldn’t be using insert statements to load data into your Snowflake instance. Instead, you’d probably use something like a Snowpipe. To keep things simple, we’ll be inserting only a few records at a time. We’ll use the insert command to do so.

Here are the statements to set things up for our Snowflake stream example.

USE ROLE SYSADMIN;
CREATE DATABASE DB_PRACTICE;
CREATE SCHEMA BANKING;
CREATE TABLE BRANCH (ID varchar, City varchar, Amount number (20,2));
INSERT INTO BRANCH (ID, City, Amount) values
(3001, ‘Arlington’, 4587.98),
(3002, ‘Bakersfield’, 33378.01),
(3003, ‘Carlswell’, 9796.54);

Before moving on, you can pause and view the records in the table by using the following statement:

SELECT * FROM BRANCH;

Creating Streams and Inserting New Records

We’ll create two streams and then use the SHOW STREAMS command to see the created streams.

CREATE STREAM STREAM_A ON TABLE BRANCH;
CREATE STREAM STREAM_B ON TABLE BRANCH;
SHOW STREAMS;

If you run SELECT * statements on these streams, you will see that both are empty. What happens if we insert some new records into the table? We expect that the streams would then have records.

INSERT INTO BRANCH (ID, City, Amount) values
(3004, ‘Denton’, 1252.65),
(3005, ‘Evansville’, 31175.98),
(3006, ‘Fresno’, 566689.11);

Now trying executing each of the SELECT * statements, one at a time, for the Branch table and the two streams.

SELECT * FROM BRANCH;
SELECT * FROM STREAM_A;
SELECT * FROM STREAM_B;

What you should see is that there are now six records in the BRANCH table and three records each in the streams. If we add another stream, STREAM_C, we will expect that there would be no records in that stream.

CREATE STREAM STREAM_C ON TABLE BRANCH;

Now, let’s go ahead and insert some records for a third time.

INSERT INTO BRANCH (ID, City, Amount) values
(3007, ‘Gainsville’, 453127.44),
(3008, ‘Houston’, 80970.84);

If you run SELECT * statements on the table and the three streams, you’ll see that the table has 8 records, streams A and B each have 5, and stream C has 2 records.

Let’s do one more thing. Let’s recreate stream B to see what happens in the next section.

CREATE OR REPLACE STREAM STREAM_B ON TABLE BRANCH;

At this point, stream B will have zero records.

Impact of Deleting Records

Let’s go ahead and delete the first record from each of the previous inserts.

DELETE FROM BRANCH WHERE ID = 3001;
DELETE FROM BRANCH WHERE ID = 3004;
DELETE FROM BRANCH WHERE ID = 3007;

If we run SELECT * on the table and all three streams, we should see five records in the BRANCH table (Figure 1), four records in stream A, three records in stream B, and three records in stream C.

Change Data Capture Using Snowflake Figure 1
Figure 1 [Results of SELECT * on the BRANCH Table]

Stream A results

After stream A was created, there were three records entered: 3004, 3005, and 3006. Then two more records were entered: 3007 and 3008. When records 3004 and 3007 were deleted, they were removed from Stream A. When record 3001 was deleted, this shows up as a new entry in stream A because record 3001 didn’t previously exist. In total there should now be four records in stream A (Figure 2).

Change Data Capture Using Snowflake Figure 2
Figure 2 [Results of SELECT * on the Stream A]

Stream B results

As a reminder, stream B was recreated. From the time, stream B was recreated, there were no new records inserted. When records 3001, 3004, and 3007 were deleted, they all appeared in stream B because they had not appeared there previously. Thus, there should now be three records in stream B (Figure 3).

Change Data Capture Using Snowflake Figure 3
Figure 3 [Results of SELECT * on Stream B]

Stream C results

After stream C was created, two records were entered: 3007 and 3008. When record 3007 was deleted, it was removed from stream C. When records 3001 and 3004 were deleted, they showed up as new entries in stream C because those records didn’t previously exist. In total there should now be three records in stream C (Figure 4).

Change Data Capture Using Snowflake Figure 4
Figure 4 [Results of SELECT * on Stream C]

Impact of Updating Records

Our examples demonstrate the impact of inserting and deleting records and, we can see the results in the METADATA$ACTION column in Figure 2 through 4. But what happens when we update a record? Let’s update the city to Fremont where the Branch ID equals 3006.

UPDATE BRANCH
SET
City = ‘Fremont’
WHERE
ID = 3006;

As expected, the Branch table shows the updated city (Figure 5).

Change Data Capture Using Snowflake Figure 5
Figure 5 [Results of SELECT * on the Branch Table]

Stream A results

Record 3006 already existed in Stream A and, thus, no new entry was needed. The value was simply updated in the stream (Figure 6).

Change Data Capture Using Snowflake Figure 6
Figure 6 [Results of SELECT * on the Stream A]

Stream B results

Record 3006 did not previously exist in stream B. Therefore, we see that there is an entry for the deletion of record 3006 with the city of Fresno and then an entry for the insertion of the new 3006 record with the value of Fremont. You can see that the values for those two entries show as having a value of TRUE in the METADATA$ISUPDATE column. With the addition of those two new entries, stream B now has five records (Figure 7).

Change Data Capture Using Snowflake Figure 7
Figure 7 [Results of SELECT * on the Stream B]

Stream C results

Record 3006 did not previously exist in stream C. Therefore, we see that there is an entry for the deletion of record 3006 with the city of Fresno and then an entry for the insertion of the new 3006 record with the value of Fremont. You can see that the values for those two entries show as having a value of TRUE in the METADATA$ISUPDATE column. With the addition of those two new entries, stream C now has five records (Figure 8).

Change Data Capture Using Snowflake Figure 8
Figure 8 [Results of SELECT * on the Stream C]

Interestingly, you’ll notice that streams B and C both have five records at this point, but they don’t have the same five entries.
You can easily remove the practice database in your Snowflake instance, along with the table and all streams with the following command.

DROP DATABASE DB_PRACTICE;

Summary

As you can see in the example presented in this article, Snowflake streams are a powerful way to handle changing data sets. In Snowflake, one of the most important reasons for using table streams is to keep the staging table and production table in sync. Using a staging table, along with streams, helps to protect undesired changes from being made to the production table.
Snowflake table streams are also often used in conjunction with other features, such as Snowflake pipeline or Snowflake tasks. Be sure to check out our other blog posts and demos for more details about Snowflake table streams.

Thought Leadership

Snowflake Summit 2022: Register to join us live at Snowflake Summit 2022