Now that we know the basics, let's make something more useful.
For brevity, the full connections YAML is included below for you to copy, swapping in your own keys. We will still go through each step thoroughly to make sure everything is clear.
In this example, I need to move two tables, clients and sales, from our production MySQL (replace with your favourite RDBMS) to Snowflake (replace with your favourite cloud DWH). Sample data for MySQL is included at the bottom.
We are going to load our data into a VARIANT field in Snowflake, as this pattern is very fault-tolerant.
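If you want to create the landing tables up front, a minimal sketch is below. Treat it as an assumption for illustration: the connection values are placeholders, and a single raw VARIANT column with an audit timestamp is just one common layout for this pattern.

```python
# Hypothetical one-off setup for the Snowflake landing tables.
# Assumes snowflake-connector-python; all connection values are placeholders.
import snowflake.connector

DDL = """
CREATE TABLE IF NOT EXISTS {table} (
    raw       VARIANT,                                  -- whole source row as semi-structured JSON
    loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP() -- load audit column
)
"""

with snowflake.connector.connect(
    account="your_account", user="your_user", password="your_password",
    warehouse="your_wh", database="your_db", schema="your_schema",
) as conn:
    cursor = conn.cursor()
    for table in ("clients", "sales"):
        cursor.execute(DDL.format(table=table))
```

Because everything lands in one VARIANT column, schema changes upstream do not break the load; you can reshape the JSON downstream at query time.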
Overview of what we want to do:
DAG: list_tables → extract_tables → write_data_S3 → copy_to_snowflake_stage
We will learn how to sample and test the flow as we build it. Here is the final DAG we are aiming for:
name: dwh_flow
schedule_interval: rate(1 hours)
granularity: hour

tasks:
  list_tables:
    function: typhoon.flow_control.branch
    args:
      branches:
        - clients
        - sales

  extract_tables:
    input: list_tables
    function: typhoon.relational.execute_query
    args:
      hook: !Hook transaction_db
      batch_size: 10
      query_params:
        interval_start: !Py $DAG_CONTEXT.interval_start
        interval_end: !Py $DAG_CONTEXT.interval_end
      metadata:
        table_name: !Py $BATCH
      query: !MultiStep
        - !Py table_name=$BATCH
        - !Py f"SELECT * FROM {table_name} where creation_timestamp between %(interval_start)s and %(interval_end)s"

  write_data_S3:
    input: extract_tables
    function: typhoon.filesystem.write_data
    args:
      hook: !Py $HOOK.data_lake
      data: !Py transformations.data.list_of_tuples_to_json(list($BATCH.batch), $BATCH.columns)
      path: !Py f"data_{$BATCH.metadata['table_name']}_batch_num_" + str($BATCH_NUM) + "_" + str($DAG_CONTEXT.interval_end).replace(":", "_") + ".json"
      metadata: !Py $BATCH.metadata

  copy_to_snowflake_stage:
    input: write_data_S3
    function: typhoon.snowflake.copy_into
    args:
      hook: !Py $HOOK.snowflake
      table: !Py $BATCH.metadata['table_name']
      stage_name: stagetestcorpdatalake
      s3_path: ''
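Note that write_data_S3 calls transformations.data.list_of_tuples_to_json. In a Typhoon project, transformations are plain Python modules that live alongside your DAGs. A minimal sketch of what that function might look like is below; the signature is inferred from the call in the YAML, so treat it as an assumption rather than the exact implementation.

```python
# transformations/data.py (hypothetical sketch; signature inferred from the DAG above)
import json
from typing import Any, List, Sequence, Tuple

def list_of_tuples_to_json(batch: List[Tuple[Any, ...]], columns: Sequence[str]) -> str:
    """Turn DB-API rows plus their column names into newline-delimited JSON."""
    return "\n".join(
        json.dumps(dict(zip(columns, row)), default=str)  # default=str copes with dates and decimals
        for row in batch
    )
```

Newline-delimited JSON is convenient here because Snowflake's COPY INTO can load each line straight into the VARIANT column.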
Let's create a file in the /my_project_root/dags folder called dwh_flow.yml:
name: dwh_flow
schedule_interval: rate(1 hours)
granularity: hour

tasks:
  list_tables:
    function: typhoon.flow_control.branch
    args:
      branches:
        - clients
        - sales

  echo_my_tables:
    input: list_tables
    function: typhoon.debug.echo
    args:
      mydata: !Py $BATCH
Life is simple when you only have two tables - if only!
Let's extract these hourly. We will improve this with variables later.
Let's run typhoon status, then build and run the DAG:
typhoon dag build dwh_flow
typhoon dag run --dag-name dwh_flow
You can see we are simply outputting the tables in the list.
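As a next step we will swap echo_my_tables for the real extract_tables task. To make the !MultiStep query less magical, here is what it boils down to for a single branch. This is an assumption for illustration: it calls PyMySQL directly with placeholder credentials and interval values, whereas in the DAG, typhoon.relational.execute_query and the transaction_db hook do this binding for you.

```python
# Hypothetical stand-alone version of the extract_tables query for one batch.
# Assumes PyMySQL; credentials and interval values are placeholders.
import pymysql

table_name = "clients"  # the value of $BATCH for this branch
query = f"SELECT * FROM {table_name} where creation_timestamp between %(interval_start)s and %(interval_end)s"

conn = pymysql.connect(host="localhost", user="etl", password="your_password", database="prod")
with conn.cursor() as cursor:
    # %(name)s placeholders are bound by the driver, mirroring the
    # query_params block ($DAG_CONTEXT.interval_start / interval_end).
    cursor.execute(query, {
        "interval_start": "2021-01-01 00:00:00",
        "interval_end": "2021-01-01 01:00:00",
    })
    columns = [col[0] for col in cursor.description]
    rows = cursor.fetchall()
conn.close()
```

The table name can be interpolated with an f-string because it only ever comes from our own branches list, while the interval values go through proper parameter binding.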