Snowflake Transformation Pipelines - Building a Data Mart in Snowflake Using Streams, Tasks, and Dynamic Tables
Overview
When it comes to data transformation tools that pair well with Snowflake, there are quite a few options out there. SQL-centric transform tools such as DBT and Coalesce are great at what they do, but what if the company you work for can't afford to license such products? The good news is that Snowflake already gives you the tools you need to transform and load your data marts. Streams give us change data capture (CDC), Tasks give us a scheduler from which we can call stored procedures to load data, and Dynamic Tables now consolidate Streams and Tasks into a single feature. Best of all, we can create downstream dependencies on table loads, building the DAG from a single anchor so that every stage of the load happens in exactly the order we need.
Our approach will deal with the most common data mart (dimensional) modeling patterns. We will look at how to implement SCD Type 2 using MERGE and consider how to implement surrogate key relationships between dimensions and facts. We will then walk through the entire process with a demonstration.
Snowflake Streams, Tasks and Dynamic Tables
When we think about a data mart loading pattern, these are the basic steps we need to follow:
Move the raw data from your source applications to Snowflake using a tool like Fivetran.
Design the data mart, create your ERD, and review it with your stakeholders.
Implement a CDC solution on the source data. There are many ways to configure CDC, but we are going to focus on the most budget-friendly option. With Snowflake Streams we can enable CDC on a target table: when changes occur on that table, they are written to the Stream, where we can consume and load only our deltas. The Stream tracks all DML operations, so an update produces both a delete and an insert event, which we can use to flesh out our merge logic for SCD Type 2. Once a DML statement consumes the Stream, the Stream is cleared so it can start capturing the next delta.
Now that we know how to find our data changes, we need a process to load them into the target table. For this we create a Snowflake Task that calls a stored procedure to load the data from the Stream. The procedure is a MERGE and can support SCD Type 2+ loads.
Snowflake Dynamic Tables combine the functionality of Streams and Tasks into one feature, but they give us less control over how data is merged into the target table. We need to evaluate case by case which solution to implement. The good news is that Streams and Tasks can run alongside Dynamic Tables in the same DAG, so you have a lot of options here.
Now that we have a loading process, we need to create the dependencies that load data in the desired order. This is achieved by building a Task Graph (DAG) using the AFTER parameter in your Task definitions or, if you are using Dynamic Tables, by setting TARGET_LAG = DOWNSTREAM in the Dynamic Table definition. We want to start with an anchor task that acts as the control point for the rest of the data load.
When it comes to this step there is some art involved in the design. We will not go deep into every design consideration here; instead we provide a starting point for reference. Each load has different requirements and constraints, so it is up to you to figure out what works best for your organization. A minimal sketch of the anchoring pattern from the previous step follows.
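As a rough starting point, here is a minimal sketch of that anchoring pattern. The task, procedure, database, and warehouse names below are placeholders, not objects from this article: only the root task carries a schedule, and each child task declares its predecessor with AFTER so the whole graph runs in order.
-- Sketch only: the anchor (root) task owns the schedule and acts as the control point
CREATE TASK MY_DB.MY_SCHEMA.TSK_MART_ANCHOR
WAREHOUSE = MY_WAREHOUSE
SCHEDULE = '60 minute'
AS
SELECT 1; -- no-op; the children do the real work
-- Each child task names its predecessor with AFTER, which is what builds the Task Graph (DAG)
CREATE TASK MY_DB.MY_SCHEMA.TSK_LOAD_DIM_CUSTOMER
WAREHOUSE = MY_WAREHOUSE
AFTER MY_DB.MY_SCHEMA.TSK_MART_ANCHOR
AS
CALL MY_DB.MY_SCHEMA.DIM_CUSTOMER_MERGE();
CREATE TASK MY_DB.MY_SCHEMA.TSK_LOAD_FCT_SALES
WAREHOUSE = MY_WAREHOUSE
AFTER MY_DB.MY_SCHEMA.TSK_LOAD_DIM_CUSTOMER
AS
CALL MY_DB.MY_SCHEMA.FCT_SALES_MERGE();
-- Resume the child tasks first, then the root; suspending the root pauses the entire load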
Demonstration
This demo is set up so you can do the work in your own Snowflake instance; just change the target database and schema to ones where you'd like to test. We create all of the data for this demo, so all you need to do is copy the code and follow along at your own pace.
Now let's look at how to put some of these pieces together in practice. First, we are going to create our raw table, insert some data, and enable change tracking. This table represents the state of the data after it has been replicated from the source system using a tool like Fivetran; think of it as the stage or raw layer.
-- Setup the source table. This would be like the raw landing table for any replication in to Snowflake
CREATE OR REPLACE TABLE GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_RAW_CUSTOMER (
CUSTOMERID NUMBER(38,0),
CSDESCRIPTION VARCHAR(100),
CSNAME VARCHAR(50),
CSSTATE VARCHAR(2),
SALESREGION VARCHAR(50));
-- Populate the table
INSERT INTO GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_RAW_CUSTOMER ( CUSTOMERID, CSDESCRIPTION, CSNAME, CSSTATE, SALESREGION )
SELECT 1, 'Data Platform for the Cloud', 'Snowflake', 'MT', 'Northwest'
UNION
SELECT 2, 'CRM & Cloud Software Company', 'Salesforce', 'CA', 'Northwest'
UNION
SELECT 3, 'Soul Harvester', 'Amazon', 'WA', 'Northwest'
UNION
SELECT 4, 'Marketing Machine', 'Apple', 'CA', 'Northwest'
UNION
SELECT 5, 'Software Company', 'Microsoft', 'WA', 'Northwest';
-- SELECT * FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_RAW_CUSTOMER
------------------------------------------------------
-- Make sure change tracking is enabled on the table (creating the stream can also enable it implicitly, but being explicit avoids permission surprises)
ALTER TABLE GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_RAW_CUSTOMER SET CHANGE_TRACKING = TRUE;
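If you want to confirm the setting took, one quick check (not required for the demo) is the change_tracking column that SHOW TABLES returns:
-- The change_tracking column in the output should now show ON for this table
SHOW TABLES LIKE 'TST_RAW_CUSTOMER' IN SCHEMA GLOBAL_ANALYTICS.DBT_BHETT_REVOP;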
Now we want to create our Stream, view it, and then set up and load our history table. From the history table we need to be able to build an SCD Type 2 dimension. Since we don't have history tracking at the source, we build this step in before we move to the dimension load; if anything ever happens in the data mart that requires a full rebuild, we still have the complete history for the target dimension and can reload it.
-- Now we want to create the Snowflake stream. This is what enables CDC on the table so we can do differential loads
-- Create the stream on your target table
CREATE STREAM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_STRM_CUSTOMER ON TABLE GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_RAW_CUSTOMER;
-- View all streams
SHOW STREAMS;
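Another handy check while developing is SYSTEM$STREAM_HAS_DATA, which reports whether the stream currently holds change records; later, the same function can gate a Task through its WHEN clause so a run is skipped when there is nothing to process.
-- Returns TRUE when the stream contains change records
SELECT SYSTEM$STREAM_HAS_DATA('GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_STRM_CUSTOMER');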
-------------------------------------------------------------------------------------------------
-- Now we want to set up the history table; this is the step before data moves to the dimension.
-- This is only required for SCD Type 2+ dimensions
CREATE OR REPLACE TABLE GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER (
CUSTOMERID NUMBER(38,0) NOT NULL,
CSDESCRIPTION VARCHAR(100),
CSNAME VARCHAR(50),
CSSTATE VARCHAR(2),
SALESREGION VARCHAR(50),
VALIDFROMDATE TIMESTAMP_NTZ(9),
VALIDTODATE TIMESTAMP_NTZ(9),
ISACTIVERECORD BOOLEAN,
SCDHASH NUMBER(19,0), -- change-detection hash, e.g. HASH(CSNAME, SALESREGION); populate it during the load, since a column DEFAULT cannot reference other columns
CSLASTMODIFYDATE TIMESTAMP_NTZ(9) NOT NULL DEFAULT CURRENT_TIMESTAMP(),
CONSTRAINT CUSTDIM_SK PRIMARY KEY (CUSTOMERID, VALIDTODATE)
);
-- Populate the hist table so we can see change behavior from the merge
INSERT INTO GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER ( CUSTOMERID, CSDESCRIPTION, CSNAME, CSSTATE, SALESREGION, VALIDFROMDATE, VALIDTODATE, ISACTIVERECORD )
SELECT CUSTOMERID, CSDESCRIPTION, CSNAME, CSSTATE, SALESREGION, '2000-01-01 01:00:00.000', '2199-12-31 01:00:00.000', TRUE
FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_RAW_CUSTOMER;
-- SELECT * FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER
If we were going to load this history table to our target dimension, we might create a Stream on the TST_HIST_CUSTOMER table or perhaps we could create the dimension as a dynamic table that is fed from this table. There are a few different options here, but for now I want to focus on how we can capture the history data.
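As an illustration of the Stream option (a sketch only; the rest of the demo does not use this object), feeding the dimension would start with a second stream on the history table:
-- Sketch only: a stream on the history table that a dimension-load task could consume
CREATE STREAM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_STRM_CUSTHIST ON TABLE GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER;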
First we are going to make several changes to the raw data: an insert, a delete, an update, and an update where we want to track the history state of the record. Depending on your design you may want to capture history on all updated records, but more commonly we only want to capture history for rows where key attributes have changed, such as a region, role, or group change. Once our updates are complete we will look again at the raw table, the history table, and the Stream data.
For the example below, the history-tracked attributes are Sales Region and CS Name. We are going to update the Sales Region twice on the same record to demonstrate how this works: the Stream only ever holds the net change for a row, the original state and the most recent one, so when we consume the Stream we load the last recorded state of the record, 'Southwest'.
-- Any changes made to the table the stream is defined on are logged for CDC. Consuming the stream in a DML statement clears it (advances its offset).
----------------------------------------------------------------------------------
-- Now we want to change data in our raw table to show how that flows through the pipeline
UPDATE GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_RAW_CUSTOMER
SET CSDESCRIPTION = 'That plug wont work'
WHERE CUSTOMERID = 4;
UPDATE GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_RAW_CUSTOMER
SET SALESREGION = 'Southeast'
WHERE CUSTOMERID = 3;
UPDATE GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_RAW_CUSTOMER -- SECOND UPDATE TO SCD record
SET SALESREGION = 'Southwest'
WHERE CUSTOMERID = 3;
INSERT INTO GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_RAW_CUSTOMER ( CUSTOMERID, CSDESCRIPTION, CSNAME, CSSTATE, SALESREGION )
SELECT 6, 'Trolling Specialty', 'Tesla', 'TX', 'Southwest';
DELETE FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_RAW_CUSTOMER
WHERE CUSTOMERID = 5;
-- See what our raw table looks like now
SELECT * FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_RAW_CUSTOMER;
-- See what records we have in our stream. Note that customer 3 shows a single DELETE/INSERT pair
-- going straight from 'Northwest' to 'Southwest'; the intermediate 'Southeast' value is not retained.
SELECT * FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_STRM_CUSTOMER;
-- See what records we have in our history table
SELECT * FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER
ORDER BY CUSTOMERID;
Now we are ready to create the merge routine. This will run in a stored procedure and be called from a Task. The merge is built to capture history so we can support SCD Type 2 dimensions downstream.
Don't forget, a Stream is cleared once DML consumes it. It's important to open a transaction and roll it back if your routine hits an error; that way you won't lose your stream data.
-- Remember, an update appears in the Stream as a delete plus an insert - important for our merge logic
CREATE OR REPLACE PROCEDURE GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER_MERGE()
RETURNS varchar
LANGUAGE SQL AS
BEGIN
BEGIN TRANSACTION;
MERGE INTO GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER AS mt
USING (
SELECT -- Inserts, deletes, and the new version row for key-attribute (SCD2) changes
sc.CustomerId
,sc.CSDescription
,sc.CSName
,sc.CSState
,sc.SalesRegion
,sc.METADATA$ACTION
,sc.METADATA$ISUPDATE
,CASE WHEN sc.METADATA$ISUPDATE = TRUE THEN CURRENT_TIMESTAMP()
ELSE '2199-12-31 01:00:00.000'
END AS ValidToDate
,CASE WHEN sc.METADATA$ISUPDATE = FALSE AND sc.METADATA$ACTION = 'DELETE' THEN 'DELETE'
ELSE 'INSERT'
END AS OperationType
FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_STRM_CUSTOMER sc
LEFT JOIN GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER hc ON sc.CUSTOMERID = hc.CUSTOMERID
AND hc.ISACTIVERECORD = TRUE
WHERE ((sc.CSName <> hc.CSName OR sc.SalesRegion <> hc.SalesRegion) AND sc.METADATA$ISUPDATE = TRUE AND sc.METADATA$ACTION = 'INSERT')
OR (sc.METADATA$ISUPDATE = FALSE AND sc.METADATA$ACTION IN ('INSERT','DELETE'))
UNION
SELECT -- Updates: classify as Type 1 (overwrite in place) or Type 2 (close out the old row)
sc.CustomerId
,sc.CSDescription
,sc.CSName
,sc.CSState
,sc.SalesRegion
,sc.METADATA$ACTION
,sc.METADATA$ISUPDATE
,'2199-12-31 01:00:00.000' AS ValidToDate -- alias matches the first branch; the UNION takes its column names from the first SELECT
,CASE WHEN sc.CSName = hc.CSName AND sc.SalesRegion = hc.SalesRegion THEN 'UPDATE'
ELSE 'DELETE' END AS OperationType
FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_STRM_CUSTOMER sc
LEFT JOIN GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER hc ON sc.CUSTOMERID = hc.CUSTOMERID
AND hc.ISACTIVERECORD = TRUE
WHERE sc.METADATA$ISUPDATE = TRUE AND sc.METADATA$ACTION = 'INSERT'
) AS st
ON mt.CustomerId = st.CustomerId
AND mt.ValidToDate = st.ValidToDate
WHEN MATCHED AND st.OperationType = 'UPDATE' -- Type 1 change: overwrite non-key attributes on the active row
THEN UPDATE SET
mt.CSDescription = st.CSDescription
,mt.CSState = st.CSState
,mt.CSLastModifyDate = CAST(CURRENT_TIMESTAMP() AS TIMESTAMP_NTZ(9))
WHEN MATCHED AND st.OperationType = 'DELETE' -- Close out the active row (key-attribute change or source delete)
THEN UPDATE SET
mt.VALIDTODATE = CAST(CURRENT_TIMESTAMP() AS TIMESTAMP_NTZ(9))
,mt.IsActiveRecord = FALSE
,mt.CSLastModifyDate = CAST(CURRENT_TIMESTAMP() AS TIMESTAMP_NTZ(9))
WHEN NOT MATCHED --AND st.OperationType = 'INSERT' -- New customer, or the new active version of a changed customer
THEN INSERT (
CustomerId
,CSDescription
,CSName
,CSState
,SalesRegion
,ValidFromDate
,ValidToDate
,IsActiveRecord
,CSLastModifyDate
)
VALUES (
st.CustomerId
,st.CSDescription
,st.CSName
,st.CSState
,st.SalesRegion
,CAST(CURRENT_TIMESTAMP() AS TIMESTAMP_NTZ(9))
,'2199-12-31 01:00:00.000'
,TRUE
,CAST(CURRENT_TIMESTAMP() AS TIMESTAMP_NTZ(9))
);
COMMIT;
RETURN 'Merge complete - success.';
EXCEPTION
WHEN OTHER THEN ROLLBACK;
RAISE;
END;
--CALL GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER_MERGE()
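If you call the procedure yourself (or wait for the Task created below to call it), re-running the earlier SELECTs should show customer 3 with its original 'Northwest' row closed out and a new active 'Southwest' row, customer 4's description overwritten in place, customer 5 flagged inactive, customer 6 inserted, and an empty stream, since the MERGE consumed it. A quick way to eyeball the history:
-- Sanity check after the merge has run
SELECT CUSTOMERID, CSNAME, SALESREGION, VALIDFROMDATE, VALIDTODATE, ISACTIVERECORD
FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER
ORDER BY CUSTOMERID, VALIDFROMDATE;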
Now we are ready to add our Task to call the stored procedure. In the Task definition below we only provide the target (our stored procedure) and set the schedule. In a real data mart build this Task would likely reference another Task using the AFTER parameter, which is what enables the Task Graph (DAG). We can keep chaining Tasks together to build out the entire DAG of the data mart load, similar to how DBT builds its DAG, but without using DBT.
Set up the task and then make sure to enable it. Let it run and then have a look at the tables and your Stream.
--======================================================================
-- Create our task
CREATE TASK GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_TSK_CUSTHIST_LOAD
WAREHOUSE = REPORTING_WAREHOUSE
SCHEDULE = '2 minute'
AS
CALL GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER_MERGE();
-- * Note - We can include the AFTER clause in the task definition to target one or more predecessor tasks for the current task. This is how we build the DAG
-- After creating the task, you need to ALTER TASK ... RESUME before the task will run
ALTER TASK GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_TSK_CUSTHIST_LOAD RESUME; -- SUSPEND
-- SHOW TASKS;
-- DESCRIBE TASK GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_TSK_CUSTHIST_LOAD;
-- DROP TASK GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_TSK_CUSTHIST_LOAD;
-- View task history
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
ORDER BY SCHEDULED_TIME;
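To make the chaining concrete, here is a hedged sketch of a dependent task. The dimension-merge procedure named here is hypothetical and is not created in this article; the point is the AFTER clause, which hangs this task off the history load, and the optional WHEN clause, which skips the run when the history stream sketched earlier has nothing to process.
-- Sketch only: a child task chained to the history load. Suspend the predecessor before adding children to its graph.
CREATE TASK GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_TSK_CUSTDIM_LOAD
WAREHOUSE = REPORTING_WAREHOUSE
AFTER GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_TSK_CUSTHIST_LOAD
WHEN SYSTEM$STREAM_HAS_DATA('GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_STRM_CUSTHIST')
AS
CALL GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_DIM_CUSTOMER_MERGE(); -- hypothetical procedure, not created in this demo
ALTER TASK GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_TSK_CUSTDIM_LOAD RESUME;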
Now that we know how to create a Stream and a Task and how to build the DAG using Task dependencies, let's look at a Dynamic Table definition. Where we have complete control over the Stream and Task load, we have a little less control with Dynamic Tables. A Dynamic Table effectively materializes the query in its definition: change tracking on the underlying tables plays the role of the Stream, and the TARGET_LAG we set in the DDL plays the role of the Task schedule. Setting TARGET_LAG = DOWNSTREAM lets a Dynamic Table refresh whenever the Dynamic Tables that read from it need fresh data, while the dependencies themselves come from the tables referenced in the defining query. Dynamic Tables can also be combined with Streams and Tasks, so we have a lot of flexibility in building our DAG.
Below is an example of a Dynamic Table definition. It uses a fixed TARGET_LAG rather than DOWNSTREAM, but the dependency mechanism is simple: reference one Dynamic Table from another's query, optionally set TARGET_LAG = DOWNSTREAM on the upstream table, and the load order is constrained for you. A sketch of a dependent table follows the definition.
-------------------------------
-- Dynamic table definition
-- Sequences are not allowed in a dynamic table definition, so SCD2 history dims with sequence-based surrogate keys will need to go through streams and tasks
CREATE OR REPLACE DYNAMIC TABLE GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_DIMDT_CUSTOMER
TARGET_LAG = '5 MINUTE' -- Target freshness; effectively the refresh schedule
WAREHOUSE = REPORTING_WAREHOUSE
AS
SELECT
CONCAT(CUSTOMERID, '~', DATE_PART(EPOCH_MICROSECONDS, CSLASTMODIFYDATE)) AS CUSTOMER_SK
,CUSTOMERID AS CUSTOMERID_NK
,CSDESCRIPTION
,CSNAME
,CSSTATE
,SALESREGION
,CASE WHEN SALESREGION IN ('Northwest', 'Northeast') THEN 1
WHEN SALESREGION IN ('Southwest', 'Southeast') THEN 2
END AS SalesRegionGroupId
,VALIDFROMDATE
,VALIDTODATE
,ISACTIVERECORD
,CAST(CURRENT_TIMESTAMP() AS TIMESTAMP_NTZ(9)) AS DIMLASTMODIFYDATE -- note: non-deterministic functions such as CURRENT_TIMESTAMP() mean this table refreshes in full rather than incrementally
FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER
ORDER BY CUSTOMERID;
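For completeness, here is a hedged sketch of a second Dynamic Table hanging off the one above; the table name is illustrative. The dependency comes simply from referencing TST_DIMDT_CUSTOMER in the query, and if we also changed TST_DIMDT_CUSTOMER's TARGET_LAG to DOWNSTREAM, its refreshes would be driven by this consumer rather than by its own timer.
-- Sketch only: a downstream Dynamic Table that depends on TST_DIMDT_CUSTOMER
CREATE OR REPLACE DYNAMIC TABLE GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_DIMDT_CUSTOMER_CURRENT
TARGET_LAG = '10 MINUTE'
WAREHOUSE = REPORTING_WAREHOUSE
AS
SELECT CUSTOMER_SK, CUSTOMERID_NK, CSNAME, CSSTATE, SALESREGION
FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_DIMDT_CUSTOMER
WHERE ISACTIVERECORD = TRUE;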
At this point you should have all the tools you need to build your DAG using Snowflake-native features. This approach is easy to understand and simple to construct. There are also plenty of ways to monitor and alert on failed loads that we didn't cover here. SCD Type 2+ is fairly easy to implement using MERGE with Streams, and it provides great flexibility when building your data mart.
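As one small example of that monitoring, the same TASK_HISTORY table function used above can be filtered for failures:
-- Failed task runs over roughly the last day; adjust the window and add alerting as needed
SELECT NAME, STATE, ERROR_MESSAGE, SCHEDULED_TIME
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(SCHEDULED_TIME_RANGE_START => DATEADD('day', -1, CURRENT_TIMESTAMP())))
WHERE STATE = 'FAILED'
ORDER BY SCHEDULED_TIME DESC;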
One thing we did not talk about, and I know it's a contested subject, is surrogate keys: sequence/identity-based integer keys (MIISK) vs. hash keys. I won't go deep into the details here, as it can be a heady subject, but a few points are worth making. Snowflake sequences do not guarantee contiguous values, so an identity/sequence key can develop large gaps and may eventually prompt a rebuild; how much that matters depends on the size of your data, update frequency, and how much of the data is changing. There are trade-offs to weigh when choosing your SK, just don't get bogged down in thinking from the past. The Kimball gods will not smite you should you not adhere 100% to the doctrine, but do try to follow most of it :)
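For reference, the two approaches look something like this (a sketch; the sequence name is illustrative, and the hash variant is essentially what the Dynamic Table above does with its concatenated key):
-- Option 1 (sketch): sequence-backed integer surrogate key; values are unique but not gap-free
CREATE SEQUENCE GLOBAL_ANALYTICS.DBT_BHETT_REVOP.SEQ_DIM_CUSTOMER;
SELECT GLOBAL_ANALYTICS.DBT_BHETT_REVOP.SEQ_DIM_CUSTOMER.NEXTVAL AS CUSTOMER_SK, CUSTOMERID
FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER;
-- Option 2 (sketch): hash surrogate key derived from the natural key plus the version timestamp
SELECT HASH(CUSTOMERID, VALIDFROMDATE) AS CUSTOMER_SK, CUSTOMERID
FROM GLOBAL_ANALYTICS.DBT_BHETT_REVOP.TST_HIST_CUSTOMER;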
While all of this can save you licensing costs, tools like DBT and Coalesce still bring good value if your company can afford them. Either way, as long as your data platform is Snowflake, you can't go wrong.