Now that we know the basics, let's make something more useful.

For brevity, the full connections YAML is included below for you to copy, swapping in your own keys. We will, however, go through each step thoroughly to make sure it is clear.

In this example, I need to take two tables, ['clients', 'sales'], from our production MySQL (replace with your favourite RDBMS) and load them into Snowflake (replace with your favourite cloud DWH). Sample data for MySQL is included at the bottom.

We are going to load our data into a VARIANT column in Snowflake, as this pattern is very fault tolerant: the raw JSON lands as-is, so changes to the source schema don't break the load.
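
This also keeps the Snowflake side simple: each target table needs only a single VARIANT column, created once before the flow runs. Here is a minimal one-off setup sketch, assuming the snowflake-connector-python package and placeholder credentials (nothing in it is part of the Typhoon flow itself):

import snowflake.connector  # pip install snowflake-connector-python

# Placeholder credentials -- replace with your own.
conn = snowflake.connector.connect(
    account="your_account",
    user="your_user",
    password="your_password",
    warehouse="your_warehouse",
    database="your_database",
    schema="your_schema",
)
cur = conn.cursor()
for table in ("clients", "sales"):
    # One VARIANT column per table: every source row lands as a single JSON document.
    cur.execute(f"CREATE TABLE IF NOT EXISTS {table} (data VARIANT)")
cur.close()
conn.close()

The external stage referenced later (stagetestcorpdatalake, pointing at our S3 bucket) also needs to exist; see Snowflake's CREATE STAGE documentation for that part.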

Overview of what we want to do:

  1. List the tables to extract (we will move these to Typhoon variables later).
  2. Extract each table from MySQL.
  3. Write each table to S3 as JSON:
    1. Creating a function to output to JSON.
    2. Switching our connection to S3.
  4. Copy the files into Snowflake.

DAG: list_tables → extract_tables → write_data_S3 → copy_to_snowflake_stage

We will learn how to sample and test the flow as we build. Here is the final YAML we are working towards:

name: dwh_flow
schedule_interval: rate(1 hour)
granularity: hour

tasks:
  list_tables:
    function: typhoon.flow_control.branch
    args:
      branches:
        - clients
        - sales

  extract_tables:
    input: list_tables
    function: typhoon.relational.execute_query
    args:
      hook: !Hook transaction_db
      batch_size: 10
      query_params:
        interval_start: !Py $DAG_CONTEXT.interval_start
        interval_end: !Py $DAG_CONTEXT.interval_end
      metadata:
        table_name: !Py $BATCH
      query: !MultiStep
        - !Py table_name=$BATCH
        - !Py f"SELECT * FROM {table_name} where creation_timestamp between %(interval_start)s and %(interval_end)s"

  write_data_S3:
    input: extract_tables
    function: typhoon.filesystem.write_data
    args:
      hook: !Py $HOOK.data_lake
      data: !Py transformations.data.list_of_tuples_to_json(list($BATCH.batch), $BATCH.columns)
      path: !Py f"data_{$BATCH.metadata['table_name']}_batch_num_" + str($BATCH_NUM) + "_" + str($DAG_CONTEXT.interval_end).replace(":", "_") + ".json"
      metadata: !Py $BATCH.metadata

  copy_to_snowflake_stage:
    input: write_data_S3
    function: typhoon.snowflake.copy_into
    args:
      hook: !Py $HOOK.snowflake
      table: !Py $BATCH.metadata['table_name']
      stage_name: stagetestcorpdatalake
      s3_path: ''
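
The write_data_S3 task relies on transformations.data.list_of_tuples_to_json, a function we will write ourselves in step 3. As a rough sketch of what it needs to do (it lives in our own project's transformations module, so the exact implementation is up to us): take the batch of row tuples plus the column names and return newline-delimited JSON, which suits a COPY into a VARIANT column.

import json
from typing import Iterable, List, Tuple


def list_of_tuples_to_json(batch: Iterable[Tuple], columns: List[str]) -> str:
    # One JSON document per row; default=str keeps datetimes and Decimals serialisable.
    return "\n".join(
        json.dumps(dict(zip(columns, row)), default=str)
        for row in batch
    )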

Let's create a file in the /my_project_root/dags folder called dwh_flow.yml.

1. List_tables

name: dwh_flow
schedule_interval: rate(1 hour)
granularity: hour

tasks:
  list_tables:
    function: typhoon.flow_control.branch
    args:
      branches:
        - clients
        - sales

  echo_my_tables:
    input: list_tables
    function: typhoon.debug.echo
    args:
      mydata: !Py $BATCH

Life is simple when you only have two tables - if only!

Let's extract these hourly. We will improve this with variables later.

Let's run typhoon status to check everything is in order, then build and run the DAG:

typhoon status

typhoon dag build dwh_flow

typhoon dag run --dag-name dwh_flow

[Screenshot: console output of the run, echoing the clients and sales tables]

You can see we are simply outputting the tables in the list.

2. Extracting from MySQL