Using DBT for building a Medallion Lakehouse architecture (Azure Databricks + Delta + DBT)

There’s a lot of buzz going around the data build tool (DBT). It’s gaining a lot of traction in the data community, and I’ve seen several customers and projects where DBT had already been chosen as the tool to use. But what is it? Where does DBT fit in your data landscape? What does it do and how does it work?

This blog post focuses on answering these questions. I’ll demonstrate how DBT works by building a Databricks medallion architecture. Be warned! It will be a long read. Before configuring anything, let’s first look at DBT.

What is data build tool?

DBT comes in two flavors: “DBT Core”, the open-source CLI version, and a paid version, “DBT Cloud”. In this blog I’ll use the free CLI version. We’ll use it after running our extraction pipeline in Azure Data Factory (ADF). From that point onward, we’ll transform data using DBT.

DBT’s strength lies in defining transformations using templates. The syntax is close to plain SQL SELECT statements. In addition, the whole flow is assembled into a directed acyclic graph (DAG), which drives the generated documentation, including data lineage.

DBT differs from other tools because it’s template-based and CLI-driven. Instead of visually designing ETL or writing code in notebooks, you configure your transformations using SQL boilerplate templates. A benefit of this approach is that you aren’t strongly dependent on the underlying database, so you can more easily switch from one database vendor to another. In addition, you don’t have to learn many database dialects: DBT generates the database-specific code needed to transform your data.
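To make this concrete: a DBT transformation is simply a SELECT statement in a .sql file, with Jinja placeholders where table references go. A minimal, hypothetical model could look like this (the model and source names here are illustrative, not part of this tutorial’s project):

```sql
-- models/staging/stg_customer.sql (illustrative example)
select
    CustomerId,
    FirstName,
    LastName
from {{ source('saleslt', 'customer') }}  -- DBT resolves this reference at compile time
```

At compile time, DBT replaces the {{ source() }} reference with the fully qualified table name for the target platform, and uses these references between models to build the DAG.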

Use Case

Prerequisites

It’s recommended to deploy these services in the order above. When you do, your Databricks service principal will automatically have access to Key Vault.

Configuration — Container creation

Some background on the structure.

The bronze container will be used for capturing all raw ingested data. We’ll use Parquet files, because no versioning is required. We’ll use a YYYYMMDD partition scheme for adding newly loaded data to this container.

The silver container will be used for slightly transformed and standardized data. The file format for silver is Delta. For the design, we’ll develop slowly changing dimensions using DBT.

Finally, there’s gold, which will hold the final, integrated data. Again, we’ll use DBT to join the different datasets together. The file format for gold is Delta as well.

Azure Data Factory — New pipeline

Before creating a new pipeline, ensure your ADLS and Azure SQL services are added as linked services: https://learn.microsoft.com/en-us/azure/data-factory/concepts-linked-services?tabs=data-factory

After you’ve configured your linked services, start your project by creating a new pipeline in ADF. The end result is displayed in the image below. For the pipeline, you need the following activities: Lookup, ForEach, CopyTables and Notebook.

In the next sections, we’ll discuss how to onboard data from the AdventureWorks sample database. For building the Lookup and ForEach activities, I recommend watching this video: https://www.youtube.com/watch?v=KsO2FHQdILs. For the entire flow, three datasets are needed: TableList, SQLTable, ParquetTable.

The first dataset uses the linked service to Azure SQL. It is meant for listing all tables. You could, for example, call it TableList. No specific configuration is required for this dataset, see below:

The second dataset uses the same Linked Service to Azure SQL. This dataset is meant for copying all tables. You can, for example, call this SQLTable. For dynamically configuring schema and table names use the following two parameters:

@dataset().SchemaName
@dataset().TableName

Also ensure that the parameters are set accordingly:

After configuring the Azure SQL datasets, head back to your pipeline. Drag in the Lookup activity and select the dataset for listing all tables. We’ll use a query for fetching all relevant tables. Also ensure you uncheck ‘First row only’.

SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_SCHEMA = 'SalesLT'

Validate that everything works by clicking the Preview data button. After that, drag in a ForEach activity. For iterating over the table metadata returned from SQL Server, add the following to Items:

@activity('List Tables').output.value

It’s important that the activity name in this expression exactly matches the name of the previous activity. So in this example it should point back to the List Tables activity. When correctly configured, the schema and table names extracted from the query result are passed as arguments into the ForEach loop.
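To picture what the ForEach receives: the Lookup activity returns a JSON payload whose value property holds the rows from INFORMATION_SCHEMA.TABLES. A small Python sketch (the sample payload below is made up) of how the expressions resolve:

```python
# Hypothetical sample of what @activity('List Tables').output contains
lookup_output = {
    "count": 2,
    "value": [
        {"TABLE_SCHEMA": "SalesLT", "TABLE_NAME": "Customer", "TABLE_TYPE": "BASE TABLE"},
        {"TABLE_SCHEMA": "SalesLT", "TABLE_NAME": "Address", "TABLE_TYPE": "BASE TABLE"},
    ],
}

# @activity('List Tables').output.value selects the list the ForEach iterates over
items = lookup_output["value"]

# Inside the loop, @item().table_schema / @item().table_name resolve per row.
# (ADF expression property access is case-insensitive, which is why the
# lowercase names work even though the SQL columns are uppercase.)
tables = [f"{item['TABLE_SCHEMA']}.{item['TABLE_NAME']}" for item in items]
```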

Next, open the ForEach activity and drag in a CopyTables activity. As the source, select the SQLTable dataset that holds the input parameters TableName and SchemaName. Add the following values:

@item().table_name
@item().table_schema

Head over to the Sink and add a new dataset. This dataset can, for example, be called ParquetTable. Select Parquet as the file format. In the Parameters dialog, add two properties: FileName and FilePath.

Head over to the Connection tab. Select the Browse button. Navigate to your bronze container. For this demo, we don’t use any application sub-folders. Extend the File path location with two parameters:

@dataset().FilePath
@dataset().FileName

Go back to the pipeline, and the properties of the Sink. For the FileName and FilePath, use the expressions below.

@concat(item().table_schema,'.',item().table_name,'.parquet')
@formatDateTime(utcnow(), 'yyyyMMdd')
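As a sanity check on those two expressions, here’s a small Python sketch (not part of the pipeline) of the blob path they produce together:

```python
from datetime import datetime, timezone

def bronze_blob_path(table_schema: str, table_name: str, run_time: datetime) -> str:
    # @formatDateTime(utcnow(), 'yyyyMMdd') -> daily partition folder
    file_path = run_time.strftime("%Y%m%d")
    # @concat(item().table_schema, '.', item().table_name, '.parquet')
    file_name = f"{table_schema}.{table_name}.parquet"
    return f"bronze/{file_path}/{file_name}"

# Example: a run on 2023-01-15 writing the SalesLT.Customer table
print(bronze_blob_path("SalesLT", "Customer", datetime(2023, 1, 15, tzinfo=timezone.utc)))
# bronze/20230115/SalesLT.Customer.parquet
```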

If everything is configured, hit the ‘Add trigger’ button and monitor the process. If everything works as expected, you should see a folder within your bronze container, named with today’s date.

Each folder holds a set of tables in the Parquet file format. The benefit of this approach is that we historize all data using a daily time window. In the next steps we’ll complete the pipeline by adding the schemas to Databricks, so that each time the pipeline runs, the schemas are updated to point to the latest data. Let’s move over to Databricks to continue our journey.

Configuration — Databricks connectivity

pip install databricks-cli

To access Databricks, I’ll use an access token. You can manage these under your user settings: click on your username in the top right, click User Settings, then click Generate new token. Add a name and save the token. You’ll need this token several times, so I recommend keeping it somewhere safe.

Next, go back to your command prompt and run the following command:

databricks configure --token

You’ll be prompted to answer two questions. For the host, use the https address of your workspace; in my case: https://adb-7945672859019303.3.azuredatabricks.net/. After pasting in the full address, you’ll be asked to paste in the token. After that, you should be connected. You can validate this by typing the following command:

databricks fs ls

If the command successfully runs, you should see folder names, such as databricks-results and user.

Configuration — Databricks and Key Vault

To set up a connection between Key Vault and Databricks, you’ll need to create a secret scope. You can do this by adding #secrets/createScope at the end of your workspace URL, so in my case: https://adb-7945672859019303.3.azuredatabricks.net/?o=7945672859019303#secrets/createScope.

For the secret scope: enter a scope name, and add the DNS name and resource ID of your Key Vault. You can find these in the Azure Portal: select Properties and copy the Vault URI and Resource ID.

After you’re done, validate the scope is listed using the command prompt. Type the following:

databricks secrets list-scopes

While we’re still configuring Key Vault, move over to Key Vault Secrets and add a new secret for accessing the storage account. Create a new secret and enter a name, such as blobAccountKey.

The secret value must be the Access key from your Storage Account. You can find this under your Storage Account under Access Keys.

After creating your secret, ensure that your Databricks service principal has Get and List permissions on your secrets. You can configure this under Access policies.

To validate that everything works as expected, create a new Databricks notebook and type in the following:

abcd = dbutils.secrets.get('dbtScope','blobAccountKey')
print(abcd)

Run the notebook. If you see [REDACTED] as the output, you know that your Databricks workspace is able to read secrets from Key Vault.

Next, I recommend adding mount points for your storage containers. Create another notebook and execute the following code to mount bronze, silver and gold:

#mount bronze
dbutils.fs.mount(
  source = 'wasbs://bronze@dbtdemo.blob.core.windows.net/',
  mount_point = '/mnt/bronze',
  extra_configs = {'fs.azure.account.key.dbtdemo.blob.core.windows.net': dbutils.secrets.get('dbtScope', 'blobAccountKey')}
)

#mount silver
dbutils.fs.mount(
  source = 'wasbs://silver@dbtdemo.blob.core.windows.net/',
  mount_point = '/mnt/silver',
  extra_configs = {'fs.azure.account.key.dbtdemo.blob.core.windows.net': dbutils.secrets.get('dbtScope', 'blobAccountKey')}
)

#mount gold
dbutils.fs.mount(
  source = 'wasbs://gold@dbtdemo.blob.core.windows.net/',
  mount_point = '/mnt/gold',
  extra_configs = {'fs.azure.account.key.dbtdemo.blob.core.windows.net': dbutils.secrets.get('dbtScope', 'blobAccountKey')}
)

dbutils.fs.ls("/mnt/bronze")
dbutils.fs.ls("/mnt/silver")
dbutils.fs.ls("/mnt/gold")

Please note that your storage locations will look slightly different, so update the above code with your own storage account name.

Azure Data Factory — Databricks schemas

Ensure that your Databricks linked service works by testing the connection using the button on the right. Next, head back to your pipeline. In the ForEach activity, add a new Databricks Notebook activity. Before configuring it, we first need to create a dynamic script in Databricks itself. The script that I propose has the following code:

#fetch parameters from Azure Data Factory
table_schema = dbutils.widgets.get("table_schema")
table_name = dbutils.widgets.get("table_name")
filePath = dbutils.widgets.get("filePath")

#create database
spark.sql(f'create database if not exists {table_schema}')

#create or replace the external table using the latest datetime location
ddl_query = f"""CREATE OR REPLACE TABLE {table_schema}.{table_name}
USING PARQUET
LOCATION '/mnt/bronze/{filePath}/{table_schema}.{table_name}.parquet'
"""

#execute query
spark.sql(ddl_query)

The script fetches the parameters from ADF and uses them to replace any existing tables, recreating them against the latest YYYYMMDD location. After creating the script, head back to your ADF pipeline and the Notebook activity.

Under Settings of your Notebook activity, select your newly created script. Add three base parameters for table_schema, table_name and filePath. You can use the text from below:

@item().table_schema
@item().table_name
@formatDateTime(utcnow(), 'yyyyMMdd')

After adding this step, hit the Add trigger button again. Now head over to Databricks and watch the Data section. If everything works as expected, your new table names should appear under the saleslt database.

With all of the steps above we’ve laid the foundation for further processing. Each day, when we run the pipeline, new folders are created, raw data is copied in, and the Databricks tables are replaced to point at the latest data. With the foundation in place, let’s explore what DBT has to offer.

DBT — Installation and configuration

python3 -m venv dbt-env

Next, activate the environment by running:

dbt-env\Scripts\activate

Then, in the same command prompt, install dbt together with the Databricks adapter:

pip install dbt-databricks

After installation, validate that DBT is working. Type the following:

dbt --version

DBT — Connect DBT to Databricks

Next, create a new project by using the following command:

dbt init

Answer the questions by choosing databricks, adding your host and http_path, answering no for Unity Catalog, and default as the schema name. After you’ve configured everything, validate your connection by heading over to your newly created project folder and running dbt debug:

As you can see, everything works as expected. Let’s start building your first transformation logic! In case something went wrong or you want to change your token or configuration, the DBT config file is stored in your profile folder. So, in my case: C:\Users\pstrengholt\.dbt\profiles.yml
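For reference, a dbt-databricks profile in profiles.yml typically looks roughly like this. All values below are placeholders; swap them for your own workspace details:

```yaml
# ~/.dbt/profiles.yml (placeholder values)
dbt_demo:
  target: dev
  outputs:
    dev:
      type: databricks
      schema: default
      host: adb-1234567890123456.7.azuredatabricks.net
      http_path: /sql/protocolv1/o/1234567890123456/0000-000000-abcdefgh
      token: dapiXXXXXXXXXXXXXXXXXXXXXXXXXXXX
```

The top-level key is your profile name; it must match the profile referenced in your project’s dbt_project.yml.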

DBT — Building slowly changing dimensions

Before we can model or historize any data, you first need to define the existing tables that sit in Databricks. Create a folder under models called staging. In the staging folder, create a file called sources.yml and add the following code:

version: 2

sources:
  - name: saleslt
    schema: saleslt
    description: This is the adventureworks database loaded into bronze
    tables:
      - name: address
      - name: customer
      - name: customeraddress

For this demo, we’ll only use three tables: address, customer and customeraddress. The sources point to data that already sits in Databricks. Remember, these are the bronze tables we created earlier, which point to the most recent Parquet files in the lake.

Besides the models folder, you also see a snapshots folder. The files created in this folder generate data that looks back in time, which makes them great for historizing data. Inside this folder, create a new file called customer.sql:

{% snapshot customer_snapshot %}

{{
    config(
      file_format = "delta",
      location_root = "/mnt/silver/customer",
      target_schema = 'snapshots',
      invalidate_hard_deletes = True,
      unique_key = 'CustomerId',
      strategy = 'check',
      check_cols = 'all'
    )
}}

with source_data as (
    select
        CustomerId,
        NameStyle,
        Title,
        FirstName,
        MiddleName,
        LastName,
        Suffix,
        CompanyName,
        SalesPerson,
        EmailAddress,
        Phone,
        PasswordHash,
        PasswordSalt
    from {{ source('saleslt', 'customer') }}
)

select *
from source_data

{% endsnapshot %}

If you inspect the file, you see that we start by defining the snapshot section. We provide parameters for using the Delta format and the location for storing the data, in our case /mnt/silver/customer. Additionally, we provide information about how changes must be detected. After that, we select all the relevant data from the saleslt.customer source. Your environment should look like the screenshot below.

After you’ve created your staging sources and snapshot script, it’s time to head back to the terminal and test. Type the following:

dbt snapshot

Let’s move over to Databricks to inspect what we did. If you look under Data, you’ll see a newly created database called “snapshots”. If everything was successful, you’ll also see a new table called customer_snapshot.

Scroll down, or all the way to the right. You’ll see new columns like dbt_valid_from and dbt_valid_to. These columns are used to track changes over time. Each time a record changes, a new version is added to the table. For older versions, dbt_valid_to is populated with the date until which the record was valid. All current records have an empty dbt_valid_to value.

I recommend also having a look at your silver container in the data lake. You’ll see a newly created folder customer, which holds files in the Delta format.

To complete the historization part, add two more snapshot files, address.sql and customeraddress.sql, following the same pattern:

After adding all files, run the snapshot process again. You should have three new tables by now.

DBT — Building integration logic for gold layer

Create a new folder under your models directory called “marts”. Inside, create two new files. The first is called dim_customers.sql:

{{
    config(
      materialized = "table",
      file_format = "delta",
      location_root = "/mnt/gold/customers"
    )
}}

with address_snapshot as (
    select
        AddressID,
        AddressLine1,
        AddressLine2,
        City,
        StateProvince,
        CountryRegion,
        PostalCode
    from {{ ref('address_snapshot') }}
    where dbt_valid_to is null
),

customeraddress_snapshot as (
    select
        CustomerId,
        AddressId,
        AddressType
    from {{ ref('customeraddress_snapshot') }}
    where dbt_valid_to is null
),

customer_snapshot as (
    select
        CustomerId,
        -- use concat() to combine first, middle and last names
        concat(ifnull(FirstName,' '), ' ', ifnull(MiddleName,' '), ' ', ifnull(LastName,' ')) as FullName
    from {{ ref('customer_snapshot') }}
    where dbt_valid_to is null
),

transformed as (
    select
        row_number() over (order by customer_snapshot.CustomerId) as customer_sk, -- auto-incrementing surrogate key
        customer_snapshot.CustomerId,
        customer_snapshot.FullName,
        customeraddress_snapshot.AddressID,
        customeraddress_snapshot.AddressType,
        address_snapshot.AddressLine1,
        address_snapshot.City,
        address_snapshot.StateProvince,
        address_snapshot.CountryRegion,
        address_snapshot.PostalCode
    from customer_snapshot
    inner join customeraddress_snapshot on customer_snapshot.CustomerId = customeraddress_snapshot.CustomerId
    inner join address_snapshot on customeraddress_snapshot.AddressID = address_snapshot.AddressID
)

select *
from transformed

If you inspect the new file, you’ll quickly see that we use Delta again; this time we output the data to our gold container. You also see that we apply some simple business logic. For example, I concatenate three columns to build a FullName.
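The FullName expression is easy to mirror in plain Python to see how the null handling behaves. ifnull() substitutes a space for missing name parts, so the SQL above never concatenates a NULL:

```python
def full_name(first, middle, last):
    # Mirrors: concat(ifnull(FirstName,' '), ' ', ifnull(MiddleName,' '), ' ', ifnull(LastName,' '))
    ifnull = lambda v: v if v is not None else " "
    return f"{ifnull(first)} {ifnull(middle)} {ifnull(last)}"

print(full_name("John", None, "Doe"))  # 'John   Doe' (missing parts leave extra spaces)
```

Note that a missing middle name leaves extra whitespace in the result; trimming that would be an easy refinement of the business logic.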

Also create another file called: dim_customers.yml

version: 2

models:
  - name: dim_customers
    columns:
      - name: customer_sk
        description: The surrogate key of the customer
        tests:
          - unique
          - not_null

      - name: customerid
        description: The natural key of the customer
        tests:
          - not_null
          - unique

      - name: fullname
        description: The customer name. Adopted as customer_fullname when person name is not null.

      - name: AddressId
        tests:
          - not_null
      - name: AddressType
      - name: AddressLine1
      - name: City
      - name: StateProvince
      - name: CountryRegion
      - name: PostalCode

This file holds the schema information of the newly integrated dataset. It documents the primary keys and runs some tests, for example on which columns must be unique.
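Conceptually, those schema tests reduce to simple predicates over the column values. A Python sketch of what ‘unique’ and ‘not_null’ verify (dbt actually compiles each test into a SQL query that must return zero rows):

```python
def not_null(values):
    # dbt's not_null test: fails if any value is missing
    return all(v is not None for v in values)

def unique(values):
    # dbt's unique test: fails if any non-null value occurs more than once
    non_null = [v for v in values if v is not None]
    return len(non_null) == len(set(non_null))

customer_sks = [1, 2, 3, 4]
assert not_null(customer_sks) and unique(customer_sks)
assert not unique([1, 2, 2])  # a duplicate surrogate key would fail the test
```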

We’re set, so let’s find out if the gold layer can be correctly processed. Head back to the command prompt. This time run:

dbt run

After the run process completes, go back to your Databricks workspace and look under Data. You’ll find your newly created table called dim_customers. This table, as you can see below, contains all the integrated data.

Well done! You’ve successfully copied data from your Azure SQL database and processed it through several stages.

DBT — Documentation and Lineage

dbt docs generate
dbt docs serve

Wait for your browser to open, or manually navigate to http://localhost:8080, to see all the documentation for your project. To see the lineage, click on the green button at the bottom right of your screen. Below is an example of what you created in this tutorial.

Wrapping up

That’s it! In this post we built a complete medallion lakehouse: ADF copies raw data from Azure SQL into the bronze container, DBT snapshots historize it as slowly changing dimensions in silver, and DBT models integrate it into gold, with schema tests, documentation and lineage on top.
