Processing data within modern data platforms, such as data lakehouses, is a complicated task. The variety and number of sources quickly make data pipelines chaotic and difficult to manage. One solution to this problem is to follow the mantra ‘code once, use often’ by making your pipelines dynamic and configurable.
In this blog post I want to demonstrate how Azure Synapse Analytics and Azure Purview can be integrated to build a framework for intelligently processing data. Azure Synapse is positioned as the analytics platform and Azure Purview as the foundation for metadata management. This is a long read, but by the end you will see how Spark is used for data processing, data quality validation and building up historical datasets (slowly changing dimensions). Purview, in turn, holds all the metadata that dynamically drives these data pipelines.
To get started, you need the following services configured:
- Azure Purview, including a service principal for making REST API calls: https://piethein.medium.com/use-azure-purviews-rest-apis-for-creating-custom-lineage-ad8efacc6230
- Azure Synapse, including a Spark Pool.
- Azure SQL Server, including the AdventureWorks sample database.
Configuration: Create a Pipeline Using the Copy Data Tool
The first step when designing a data pipeline is using a connector for collecting data from your source systems. We will make use of Azure Synapse Pipelines because it supports a wide range of databases and systems. In the Integrate section of Azure Synapse, the menu entry Copy Data Tool is the starting point for designing your first pipeline. We will use Azure SQL Server hosting the AdventureWorks sample database as the source for our project.
Use the “New connection” option on the right to connect to the source. If you have granted yourself access and the connection works as expected, you should see all tables. Select all SalesLT.* tables from the AdventureWorks database and click Next.
In the next pane, configure the output location for the exported data. For the demo, use the default ADLS Gen2 storage account that comes with Synapse. We use CSV as the file type because it is a common file format. Note that modern data pipelines typically handle many different file formats and connection types. Also note that I added the database name to the folder path of the output location. A typical data pipeline uses either containers or folders to keep data from different source systems separated.
After completing the wizard, open the pipeline details from the Pipelines section on the left. At the top, click Add Trigger -> Trigger Now.
If everything works as expected, you should see the output data in the location you selected. This simple pipeline is the starting point for our demo. In the next sections we will extend it with more intelligence.
Azure Purview: Registration of our source metadata
To improve our data pipelines, we use Azure Purview to store the metadata that catalogs incoming data. To achieve this, you first need to create new type definitions that support Synapse Pipelines and Notebooks. This is needed because custom attributes for collections aren’t supported yet; a future release is expected to solve this.
To create the custom type definitions, I’m using Postman to submit them to the Apache Atlas API of Azure Purview. Authenticate with your service principal and submit the resulting token with your request to the endpoint below:
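To make the idea of a custom type definition more concrete, here is a minimal sketch of a typedef payload in the Apache Atlas v2 format. It assumes hypothetical type names (custom_table, custom_column) and custom attributes (operationType, primaryKey, dataType); the post's own definitions are not reproduced here. A payload like this would be POSTed to the Atlas typedefs endpoint of your Purview account, for example https://<your-account>.purview.azure.com/catalog/api/atlas/v2/types/typedefs.

```python
import json

# Hypothetical type names: the original definitions are not shown in the post.
TABLE_TYPE = "custom_table"
COLUMN_TYPE = "custom_column"


def string_attribute(name: str) -> dict:
    """An optional, single-valued string attribute in Atlas typedef format."""
    return {
        "name": name,
        "typeName": "string",
        "isOptional": True,
        "cardinality": "SINGLE",
        "isUnique": False,
        "isIndexable": False,
    }


def build_typedefs() -> dict:
    """Build a typedef payload: a table type carrying pipeline metadata
    (operationType, primaryKey) and a column type carrying its data type."""
    return {
        "entityDefs": [
            {
                "category": "ENTITY",
                "name": TABLE_TYPE,
                "superTypes": ["DataSet"],
                "attributeDefs": [
                    string_attribute("operationType"),  # e.g. Merge, Append
                    string_attribute("primaryKey"),     # e.g. "CustomerID"
                ],
            },
            {
                "category": "ENTITY",
                "name": COLUMN_TYPE,
                "superTypes": ["DataSet"],
                "attributeDefs": [string_attribute("dataType")],  # e.g. INT
            },
        ],
        "relationshipDefs": [
            {
                "category": "RELATIONSHIP",
                "name": f"{TABLE_TYPE}_columns",
                "relationshipCategory": "COMPOSITION",
                # A table contains a set of columns; each column belongs to one table.
                "endDef1": {"type": TABLE_TYPE, "name": "columns",
                            "isContainer": True, "cardinality": "SET"},
                "endDef2": {"type": COLUMN_TYPE, "name": "table",
                            "isContainer": False, "cardinality": "SINGLE"},
            }
        ],
    }


if __name__ == "__main__":
    print(json.dumps(build_typedefs(), indent=2))
```

Keeping the schema on separate column entities, rather than in one text attribute, lets each column carry its own data type for the validation step later on.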
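The authentication step that Postman performs can be sketched in plain Python. This assumes the Azure AD client-credentials flow against the v1 token endpoint with the resource https://purview.azure.net, as used in Purview REST API walkthroughs; the tenant id, client id and secret are placeholders you supply yourself.

```python
import json
import urllib.parse
import urllib.request

# Resource identifier used when requesting a token for Purview's APIs.
PURVIEW_RESOURCE = "https://purview.azure.net"


def build_token_request(tenant_id: str, client_id: str,
                        client_secret: str) -> urllib.request.Request:
    """Build (but do not send) the OAuth2 client-credentials token request."""
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    body = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "resource": PURVIEW_RESOURCE,
    }).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )


def fetch_token(request: urllib.request.Request) -> str:
    """Send the request and return the bearer token from the JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)["access_token"]


def auth_headers(access_token: str) -> dict:
    """Headers to attach to subsequent Atlas REST calls."""
    return {"Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"}
```

Usage would be `auth_headers(fetch_token(build_token_request(tenant, client, secret)))`. Keep the client secret out of source code, for example in Azure Key Vault.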
After you’ve submitted the types using the Atlas REST API for Purview, you can catalog your database and tables. For registering the AdventureWorks database and tables you can use the endpoint and body below:
It is important that the database and table names match the configuration in AdventureWorks, including column names and data types such as STRING, INT, DATE, and so on. These will be used for data quality validation at a later stage. Also pay attention to the additional parameters. I defined the primary key of the dataset and added the operation type Merge, indicating that we will build up a slowly changing dimension by comparing the current and previous incoming datasets on the primary key. Modern data pipelines typically support different processing types, such as merge, append and overwrite.
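As an illustration of such a registration body, the sketch below builds an Atlas bulk-entity payload for one table and its columns, carrying the primary key and the Merge operation type. It assumes the hypothetical custom_table/custom_column types with a table-to-columns relationship and a made-up qualifiedName convention (database/table#column). A payload like this would be POSTed to .../catalog/api/atlas/v2/entity/bulk.

```python
import json

TABLE_TYPE = "custom_table"    # hypothetical type name
COLUMN_TYPE = "custom_column"  # hypothetical type name


def build_table_entities(database, table, columns, primary_key,
                         operation_type="Merge"):
    """Build a bulk-entity payload for a table plus its (name, dataType) columns."""
    table_qn = f"{database}/{table}"  # hypothetical qualifiedName convention
    entities = [{
        "typeName": TABLE_TYPE,
        "guid": "-1",  # negative GUIDs are placeholders resolved on create
        "attributes": {
            "qualifiedName": table_qn,
            "name": table,
            "primaryKey": primary_key,
            "operationType": operation_type,
        },
    }]
    for index, (name, data_type) in enumerate(columns, start=2):
        entities.append({
            "typeName": COLUMN_TYPE,
            "guid": f"-{index}",
            "attributes": {
                "qualifiedName": f"{table_qn}#{name}",
                "name": name,
                "dataType": data_type,  # e.g. STRING, INT, DATE
            },
            # Link the column to its table through the (hypothetical) relationship.
            "relationshipAttributes": {
                "table": {"guid": "-1", "typeName": TABLE_TYPE}
            },
        })
    return {"entities": entities}


if __name__ == "__main__":
    payload = build_table_entities(
        "AdventureWorks", "SalesLT.Customer",
        [("CustomerID", "INT"), ("FirstName", "STRING"), ("ModifiedDate", "DATE")],
        primary_key="CustomerID",
    )
    print(json.dumps(payload, indent=2))
```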
If everything works as expected you should be able to see your data landing zone in Purview under Custom source types.
Azure Synapse: Spark Pools Configuration
To process our data we use Apache Spark pools, which provide open-source big data compute. The benefit of this framework is that you can easily extend it by loading additional modules. In this demo we convert the CSV files to Delta tables, which can be queried directly and also store all history.
For data quality we use Great Expectations, an open-source library for validating data. I’m using the Wheel package format, which can be downloaded from https://pypi.org/project/great-expectations/#files. The package can be uploaded to Synapse using the Workspace packages section (see below).
Next you need to modify the Spark Pool configuration. Select the pool and adjust the configuration using the Packages option. Ensure that great_expectations is selected.
Next, extend the Spark pool with additional packages that are installed when the pool starts. I’m using jmespath, a query language for easily extracting data from JSON documents, and PyApacheAtlas, which is used as the SDK for Azure Purview. To install these two packages, create a requirements.txt file that lists both of them:
Upload the requirements.txt file using the same Packages options dialog. See below:
In this demo I’m mixing Python artifacts (uploaded packages) with packages that are installed every time a Spark instance is created. If your environment is hardened, relies on self-built packages, or you don’t want to be confronted with unexpected changes, I recommend uploading the packages as artifacts. If you want to install the latest versions of all packages each time a Spark instance is created, I recommend the requirements.txt approach.
Code Once Use Often: Dynamic Spark Notebook
The next step is to build a configurable Apache Spark notebook that is initialized with details of the source, file type, transformation type, target destination, and so on. The notebook is invoked by Synapse Pipelines and receives its arguments from the pipeline we use to export all data. It also reads its metadata from Azure Purview to determine which data quality controls must be applied before any data is transformed. Finally, the newly loaded data is processed into a slowly changing dimension, which retains all history. Note that I only added a small number of controls; to scale up further, you could extend the notebook with additional validations. For example, you might want to check for data ownership, or validate that a business term is present for each technical data attribute, and reject the incoming data if not.
For our demo pipeline, create a new notebook and start with a parameters cell that defines the paths for storing data and the credentials for Purview. The cw_database and cw_table parameters are pre-populated for this demo, but they are overwritten at runtime by the database and table names passed in from the pipeline.
The next code block imports the required libraries and reads the CSV data from the landing zone location. Possible improvements are to parameterize the location and to support additional file types.
The following code block queries Azure Purview using the database and table names. It reads the operation type and the primary key from the custom attributes, along with the schema metadata (column names and data types). The script exits when no metadata is found.
A future improvement could be to also read the classifications and apply them within your pipeline. For example, to adhere to GDPR, you could dynamically mask incoming sensitive data.
The next code block performs a number of simple data quality checks using the Great Expectations package. The metadata from Purview is used to validate the correctness of the data. After validation, the data frame is cast to the correct schema and stored as a Delta table. I want to emphasize that these validations are simple and for demonstration purposes only.
When no data is present yet, the following code block treats all incoming data as new. This code is only executed when a new source is onboarded.
The last code block reads the previous data, combines it with the newly loaded data, compares the two using a hash and adds the changed data to a Delta table.
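A minimal sketch of what such a parameters cell could look like. Only cw_database and cw_table come from the post; the storage account, container and the landingzone folder name are hypothetical placeholders, while validatedzone and changedzone are the output folders mentioned further on. The zone_path helper is likewise hypothetical.

```python
# --- Parameters cell (mark it as the parameters cell in the Synapse notebook) ---
cw_database = "AdventureWorks"               # overwritten by the pipeline
cw_table = "SalesLT.Customer"                # overwritten by the pipeline
storage_account = "<your-storage-account>"   # hypothetical placeholder
container = "<your-container>"               # hypothetical placeholder
purview_account = "<your-purview-account>"   # hypothetical placeholder
# Prefer reading secrets from Azure Key Vault (e.g. mssparkutils.credentials.getSecret)
# over hard-coding Purview credentials in this cell.


def zone_path(zone: str, database: str, table: str) -> str:
    """Build an ADLS Gen2 (abfss) path for a zone/database/table folder."""
    return (f"abfss://{container}@{storage_account}.dfs.core.windows.net/"
            f"{zone}/{database}/{table}")


landing_path = zone_path("landingzone", cw_database, cw_table)    # hypothetical zone name
validated_path = zone_path("validatedzone", cw_database, cw_table)
changed_path = zone_path("changedzone", cw_database, cw_table)
```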
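One way to approach the "support additional file types" improvement is a small reader registry keyed by file type. In the notebook itself you would use Spark readers (for example `spark.read.format(file_type).option("header", "true").load(path)`); the sketch below swaps in the standard library so the dispatch idea can be shown in isolation. The function and registry names are hypothetical.

```python
import csv
import io
import json


def read_csv(text: str) -> list:
    """Parse CSV text with a header row into a list of dicts."""
    return list(csv.DictReader(io.StringIO(text)))


def read_json_lines(text: str) -> list:
    """Parse JSON Lines text (one object per line), skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Register a reader per file type; adding a type means adding one entry.
READERS = {"csv": read_csv, "jsonl": read_json_lines}


def read_landing_file(text: str, file_type: str) -> list:
    """Dispatch to the reader registered for the given file type."""
    try:
        reader = READERS[file_type.lower()]
    except KeyError:
        raise ValueError(f"Unsupported file type: {file_type}") from None
    return reader(text)
```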
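The metadata extraction can be sketched independently of the client library. The post uses PyApacheAtlas and jmespath; this stand-in works on the parsed JSON of an Atlas entity lookup, assuming the table entity carries hypothetical operationType and primaryKey attributes (primaryKey as a comma-separated string) and its columns are listed in referredEntities. The exit message mirrors the error shown later in the post.

```python
def extract_metadata(response: dict) -> dict:
    """Pull operation type, primary key and column data types from an
    Atlas entity response; exit the notebook if nothing usable is found."""
    entity = (response or {}).get("entity")
    if not entity:
        raise SystemExit("No metadata has been found in Purview")

    attributes = entity.get("attributes", {})
    referred = response.get("referredEntities", {})

    # Resolve each column reference to its name and (custom) dataType attribute.
    columns = {}
    for ref in entity.get("relationshipAttributes", {}).get("columns", []):
        column = referred.get(ref.get("guid"), {}).get("attributes", {})
        if "name" in column:
            columns[column["name"]] = column.get("dataType", "STRING")
    if not columns:
        raise SystemExit("No metadata has been found in Purview")

    primary_key = [key.strip() for key in attributes.get("primaryKey", "").split(",")
                   if key.strip()]
    return {
        "operation_type": attributes.get("operationType"),
        "primary_key": primary_key,
        "columns": columns,
    }
```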
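A minimal sketch of classification-driven masking, assuming you have already read a mapping from column name to classification names out of Purview. The classification label "PII.Email" in the usage is hypothetical; use the classifications that exist in your catalog. Note that salted hashing is pseudonymization, not full anonymization.

```python
import hashlib


def mask_value(value, salt: str = ""):
    """Replace a value with a salted SHA-256 digest; keep empty values as-is."""
    if value is None or value == "":
        return value
    return hashlib.sha256((salt + str(value)).encode("utf-8")).hexdigest()


def mask_rows(rows, column_classifications, sensitive_classifications, salt=""):
    """Mask every column that carries at least one sensitive classification."""
    sensitive = set(sensitive_classifications)
    to_mask = {column for column, labels in column_classifications.items()
               if set(labels) & sensitive}
    return [
        {column: (mask_value(value, salt) if column in to_mask else value)
         for column, value in row.items()}
        for row in rows
    ]


# Example usage with a hypothetical classification label.
masked = mask_rows(
    [{"CustomerID": "1", "EmailAddress": "orlando0@adventure-works.com"}],
    {"EmailAddress": ["PII.Email"]},
    {"PII.Email"},
)
```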
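To show what metadata-driven checks can look like, here is a small standard-library stand-in for the Great Expectations checks. It mirrors the spirit of expectations such as expect_column_to_exist and expect_column_values_to_not_be_null, but it is not the Great Expectations API. It assumes the column types from Purview are STRING, INT, DECIMAL or DATE and that values arrive as CSV strings.

```python
from datetime import date
from decimal import Decimal, InvalidOperation


def _parses(parser, value) -> bool:
    """True if parser(value) succeeds."""
    try:
        parser(value)
        return True
    except (ValueError, InvalidOperation):
        return False


# Purview data type -> check on the raw CSV string value.
TYPE_CHECKS = {
    "STRING": lambda v: True,
    "INT": lambda v: _parses(int, v),
    "DECIMAL": lambda v: _parses(Decimal, v),
    # Only the leading yyyy-mm-dd part is checked, so timestamps pass too.
    "DATE": lambda v: _parses(lambda s: date.fromisoformat(s[:10]), v),
}


def validate(rows, columns, primary_key):
    """Return a list of error messages; an empty list means the data passed."""
    errors = []
    header = set(rows[0]) if rows else set()
    for col, data_type in columns.items():
        if col not in header:
            errors.append(f"missing column: {col}")
        if data_type.upper() not in TYPE_CHECKS:
            errors.append(f"unsupported data type {data_type} for column {col}")
    for i, row in enumerate(rows):
        for col in primary_key:
            if row.get(col) in (None, ""):
                errors.append(f"row {i}: primary key {col} is empty")
        for col, data_type in columns.items():
            value = row.get(col)
            check = TYPE_CHECKS.get(data_type.upper())
            # Empty values are allowed outside the primary key;
            # unsupported types were already reported above.
            if value in (None, "") or check is None:
                continue
            if not check(value):
                errors.append(f"row {i}: {col}={value!r} is not a valid {data_type}")
    return errors
```

If the list is non-empty, the notebook can reject the delivery before anything is written to the validated zone.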
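For a first delivery, every incoming row simply becomes the current version. A minimal sketch on plain Python rows, assuming a hypothetical rowHash column next to the effectiveDate and endDate columns the post uses; the hash covers only the business columns so it can be compared with later deliveries.

```python
import hashlib
import json

# Technical columns are excluded from the hash; rowHash is a hypothetical name.
TECHNICAL_COLUMNS = ("rowHash", "effectiveDate", "endDate")


def row_hash(row: dict) -> str:
    """SHA-256 over the business columns, in a stable (sorted) order."""
    business = {k: v for k, v in row.items() if k not in TECHNICAL_COLUMNS}
    encoded = json.dumps(business, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def initial_load(rows, load_date: str):
    """First delivery: every row is the current version (endDate is open)."""
    return [
        {**row, "rowHash": row_hash(row), "effectiveDate": load_date, "endDate": None}
        for row in rows
    ]
```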
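The hash comparison behind a slowly changing dimension (type 2) can be sketched on plain Python rows. This is an illustration of the logic, not the post's Spark code (in Spark you might express the same idea with a join or a Delta Lake merge). It assumes the hypothetical rowHash column plus effectiveDate/endDate, treats endDate as the date a version was superseded, and does not handle deleted keys.

```python
import hashlib
import json

TECHNICAL_COLUMNS = ("rowHash", "effectiveDate", "endDate")


def row_hash(row: dict) -> str:
    """SHA-256 over the business columns only."""
    business = {k: v for k, v in row.items() if k not in TECHNICAL_COLUMNS}
    encoded = json.dumps(business, sort_keys=True, default=str).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def scd2_merge(history, incoming, primary_key, load_date):
    """Close changed current versions and append new versions for new or
    changed keys; unchanged rows are left untouched."""
    def key(row):
        return tuple(row[col] for col in primary_key)

    incoming_by_key = {key(r): {**r, "rowHash": row_hash(r)} for r in incoming}
    current_keys = {key(r) for r in history if r["endDate"] is None}
    changed_keys = set()

    result = []
    for old in history:
        row = dict(old)
        k = key(row)
        new = incoming_by_key.get(k)
        if row["endDate"] is None and new is not None and new["rowHash"] != row["rowHash"]:
            row["endDate"] = load_date  # close the previous version
            changed_keys.add(k)
        result.append(row)

    for k, new in incoming_by_key.items():
        if k not in current_keys or k in changed_keys:
            result.append({**new, "effectiveDate": load_date, "endDate": None})
    return result
```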
After saving the notebook, I encourage you to test it using the AdventureWorks data. When everything works correctly, you should see the changedzone and validatedzone folders in the default storage account of your Synapse workspace. Each folder should contain a folder named after the database, AdventureWorks, and inside each database folder there should be a table folder: SalesLT.
Adjusting the pipeline
If everything works as expected, we can modify the AdventureWorks pipeline. Go back to the original Copy Data Tool pipeline, open the ForEachTable step and add an additional Notebook step. Select the newly created notebook in the Settings pane at the bottom.
Next, adjust the pipeline to pass in the parameters required by the script. Go to Settings and add two new base parameters:
- cw_database, holding the database name
- cw_table, holding the dynamically provided schema and table names
If everything is configured, trigger the pipeline again. The SalesLT table should be processed correctly. All other tables will fail because their metadata is missing in Purview, with the following error: “message”: “SystemExit: No metadata has been found in Purview Traceback (most recent call last): SystemExit: No metadata has been found in Purview”
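Because cw_table carries both the schema and the table name, the notebook may need to split it, for example when building lookups. A small hypothetical helper, assuming names arrive as schema.table, optionally with SQL Server brackets, and defaulting to the dbo schema when no schema is given:

```python
def split_table_name(cw_table: str, default_schema: str = "dbo"):
    """Split 'SalesLT.Customer' or '[SalesLT].[Customer]' into (schema, table)."""
    parts = [part.strip().strip("[]") for part in cw_table.split(".")]
    if len(parts) == 1:
        return default_schema, parts[0]
    if len(parts) == 2:
        return parts[0], parts[1]
    raise ValueError(f"Expected 'schema.table', got {cw_table!r}")
```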
Examine the results
When the processing script works as expected, it also merges and versions rows using the effectiveDate and endDate columns. Below you see an example of data that has been modified and processed correctly. You can query these Delta tables directly using a Serverless SQL Pool.
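Reading the versioned data comes down to filtering on effectiveDate and endDate. The sketch assumes that endDate is set to the load date of the version that replaced it (so a version is valid from effectiveDate up to, but not including, endDate) and that dates are ISO-8601 strings; in a Serverless SQL Pool you would express the same filter in a WHERE clause.

```python
def current_rows(history):
    """The latest version of every key: rows whose endDate is still open."""
    return [r for r in history if r["endDate"] is None]


def rows_as_of(history, when: str):
    """Versions valid on `when`: effectiveDate <= when < endDate.
    ISO-8601 date strings sort correctly with plain string comparison."""
    return [
        r for r in history
        if r["effectiveDate"] <= when and (r["endDate"] is None or when < r["endDate"])
    ]
```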
Further steps for improvement
The script from this demo could act as a boilerplate template for repetitive processing steps across different data pipelines. You implement it once and reuse it for onboarding different source systems. This reduces maintenance and increases delivery speed. To scale up further, I encourage you to look at the following recommendations:
- Use YAML templates instead of passing in arguments.
- Store the configuration of your pipelines in version control, such as Git.
- Support multiple file types, such as Parquet, TXT, CSV, XML, JSON, etc.
- Support different processing types like merge, append and overwrite.
- Exclude (technical) columns when comparing with previously delivered data, so you only validate the actual business data.
- Dynamically load additional hooks or scripts for database- or table-specific processing, such as additional data quality checks or enrichments for consumption in analytics and reporting.
- Extend the framework with data lineage creation and with controls for classifications and for missing data ownership or definitions.
- Automatically deploy (secure) views for consumption, for example in Synapse Serverless SQL Pools.
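Several of these recommendations come together in a configuration-driven dispatch on processing type. The sketch below is hypothetical: the configuration is a plain dict standing in for a YAML file kept in Git (which you could load with PyYAML's yaml.safe_load), and merge here is a simple upsert on the primary key without history; versioning would use the slowly changing dimension logic instead.

```python
def _key(row, primary_key):
    return tuple(row[col] for col in primary_key)


def overwrite(existing, incoming, primary_key):
    """Replace all existing data with the new delivery."""
    return list(incoming)


def append(existing, incoming, primary_key):
    """Add the new delivery to the existing data."""
    return list(existing) + list(incoming)


def merge(existing, incoming, primary_key):
    """Upsert on the primary key: incoming rows replace rows with the same key."""
    merged = {_key(r, primary_key): r for r in existing}
    merged.update({_key(r, primary_key): r for r in incoming})
    return list(merged.values())


PROCESSORS = {"overwrite": overwrite, "append": append, "merge": merge}


def process(config, existing, incoming):
    """Pick the processing function based on the configured operation type."""
    handler = PROCESSORS.get(config.get("operationType", "").lower())
    if handler is None:
        raise ValueError(f"Unsupported operation type: {config.get('operationType')!r}")
    return handler(existing, incoming, config.get("primaryKey", []))


# Hypothetical per-table configuration, e.g. loaded from a YAML file in Git.
EXAMPLE_CONFIG = {
    "database": "AdventureWorks",
    "table": "SalesLT.Customer",
    "operationType": "Merge",
    "primaryKey": ["CustomerID"],
}
```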
Data pipelines are the backbone of any data platform. They ingest, transform and move data, and enable teams to distribute and consume data at scale. For scalability, I encourage you to design a framework that embraces automation and metadata management. As you’ve seen, Azure Synapse Analytics and Azure Purview can be a great step towards an integrated solution for managing all your data pipelines. They integrate easily and complement each other.