Modern Data Pipelines with Azure Synapse Analytics and Azure Purview

Processing data within modern data platforms, for example data lakehouses, is a complicated task. The variety and multitude of sources quickly make data pipelines chaotic and difficult to handle. A solution for this problem is to follow the mantra of ‘code once use often’ by making your pipelines dynamic and configurable.

With this blogpost I want to demonstrate how Azure Synapse Analytics and Azure Purview can be integrated to build a framework for intelligently processing data. Azure Synapse is positioned as the analytics platform and Azure Purview as the foundation for metadata management. This blogpost will be a long read, but at the end you will see how Spark is used for data processing, data quality validation and building up historical datasets (slowly changing dimensions). Purview, in this setup, holds all metadata for dynamically driving these data pipelines.


Configuration — Create Pipeline using Copy Data Tool

You can use the “New connection” option at the right. If you have granted yourself access and the connection works as expected, you should be able to see all tables. Select all SalesLT.* tables from the AdventureWorks database, and hit next.

In the next pane you need to configure the output location for the exported data. For the demo you should use the default ADLS Gen2 storage account that comes with Synapse. We will use .CSV as the file type because it is a common file format. Note that modern data pipelines typically handle different file formats and connection types. Also note that I added the database name to the location folder path. A typical data pipeline uses either containers or folders to separate data from different source systems.

After completing the wizard, go to the Pipeline details using the left pipeline section. At the top hit Add Trigger -> Trigger Now.

If everything works as expected you should see the output data in the location you selected. This simple pipeline will be the starting point for our demo. In the next sections we will extend it with more intelligence.

Azure Purview: Registration of our source metadata

For creating the custom type definitions, I’m using Postman to submit data to the Apache Atlas API ecosystem of Azure Purview. You need a service principal: authenticate with it, then submit the type definitions together with your access token to the endpoint below:

POST https://{{catalog_end_point}}/api/atlas/v2/types/typedefs
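If you prefer a script over Postman, the two calls could be prepared in Python roughly as follows. This is a minimal sketch that only builds the requests and does not send them. The type name landingzone_table, its schema and parameters attributes, and all helper names are hypothetical and chosen for illustration. The token request assumes the usual Azure AD client-credentials flow for a service principal.

```python
import json
import urllib.parse
import urllib.request


def build_token_request(tenant_id, client_id, client_secret):
    """Prepare the client-credentials token request for a service principal."""
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    form = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "resource": "https://purview.azure.net",
    })
    return urllib.request.Request(url, data=form.encode("utf-8"), method="POST")


def attribute_def(name, type_name="string"):
    """Describe one optional, single-valued attribute of a custom Atlas type."""
    return {
        "name": name,
        "typeName": type_name,
        "isOptional": True,
        "cardinality": "SINGLE",
        "isUnique": False,
        "isIndexable": False,
    }


# Hypothetical custom type; the type and attribute names are illustrative only.
TYPEDEFS = {
    "entityDefs": [
        {
            "category": "ENTITY",
            "name": "landingzone_table",
            "superTypes": ["DataSet"],
            "attributeDefs": [
                attribute_def("schema", "map<string,string>"),
                attribute_def("parameters", "map<string,string>"),
            ],
        }
    ]
}


def build_typedefs_request(catalog_end_point, token, typedefs=TYPEDEFS):
    """Prepare the POST that registers the custom type definitions."""
    url = f"https://{catalog_end_point}/api/atlas/v2/types/typedefs"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    body = json.dumps(typedefs).encode("utf-8")
    return urllib.request.Request(url, data=body, headers=headers, method="POST")
```

Passing either request to urllib.request.urlopen would send it. The token response is JSON, and its access_token field is the value to pass as token.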

After you’ve submitted the types using the Atlas REST API for Purview, you can catalog your database and tables. For registering the AdventureWorks database and tables you can use the endpoint and body below:

POST https://{{catalog_end_point}}/api/atlas/v2/entity/

It is important that the database and table names match the configuration from AdventureWorks, including column names and data types like STRING, INT, DATE, and so on. These will be used for data quality validation at a later stage. Also pay attention to the additional parameters. I defined the Primary Key of the dataset and added an operation type Merge, indicating that we will build up a slowly changing dimension by comparing current and previous incoming datasets using the primary key. Modern data pipelines typically support different processing types like merge, append, overwrite, and so on.
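To make the shape of such a registration concrete, here is a minimal sketch of a request body for one table. The type name landingzone_table, the schema and parameters attributes, the qualifiedName pattern, and the SalesLT.Address example with three columns are all assumptions for illustration, not the exact body used in this demo.

```python
import json


def build_table_entity(database, table, schema, primary_key, operation_type="merge"):
    """Build the request body for registering one table in the catalog."""
    return {
        "entity": {
            "typeName": "landingzone_table",  # hypothetical custom type
            "guid": "-1",  # negative placeholder guid for an entity that does not exist yet
            "attributes": {
                "name": table,
                "qualifiedName": f"landingzone://{database}/{table}",
                # Column name -> data type, later used for data quality validation.
                "schema": schema,
                # Additional parameters that drive the processing logic.
                "parameters": {
                    "primary_key": primary_key,
                    "operation_type": operation_type,
                },
            },
        }
    }


body = build_table_entity(
    database="AdventureWorks",
    table="SalesLT.Address",
    schema={"AddressID": "INT", "City": "STRING", "ModifiedDate": "DATE"},
    primary_key="AddressID",
)
print(json.dumps(body, indent=2))
```

Keeping the primary key and operation type in a separate parameters map means new processing options can be added later without changing the type definition.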

If everything works as expected you should be able to see your data landing zone in Purview under Custom source types.

Azure Synapse: Spark Pools Configuration

Python packages

Next you need to modify the Spark Pool configuration. Select the pool and adjust the configuration using the Packages option. Ensure that great_expectations is selected.

Next, you need to extend the Spark pool with some additional packages by adjusting the configuration of required packages to be loaded at startup. I’m using jmespath, which is a query language for easily reading JSON documents. PyApacheAtlas is used as the SDK for Azure Purview. To use these two packages, create a requirements.txt file that lists jmespath and pyapacheatlas, each on its own line.


Upload the requirements.txt file using the same Packages options dialog.

In this demo I’m blending Python artifacts and packages that are installed every time a Spark instance is created. If your environment is hardened, relies on self-created packages, or you don’t want to be confronted with unexpected changes, I encourage you to upload artifacts. If you want to install the latest versions of all packages each time a Spark instance is created, I encourage you to use the requirements.txt approach.

Code Once Use Often: Dynamic Spark Notebook

Spark notebook

The next code block imports the required libraries and reads the CSV data from the landing zone location. Further improvements could be to parameterize the location and support additional file types.
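As a rough sketch of how the location could be parameterized, the helpers below build the landing zone path from its parts and read the file with Spark. The landingzone folder, the one-file-per-table naming, the storage account and container names, and the presence of a header row are all assumptions. The spark session is passed in, so nothing runs against Spark until you call read_landing_csv inside a notebook.

```python
def landing_path(account, container, database, table, extension="csv"):
    """Build the ADLS Gen2 path of one exported table (folder layout is an assumption)."""
    return (
        f"abfss://{container}@{account}.dfs.core.windows.net/"
        f"landingzone/{database}/{table}.{extension}"
    )


def read_landing_csv(spark, path):
    """Read a CSV file with a header row; without a schema, all columns arrive as strings."""
    return spark.read.option("header", "true").csv(path)


# Hypothetical storage account and container names.
path = landing_path("mystorageaccount", "mycontainer", "AdventureWorks", "SalesLT.Address")
print(path)
```

In a Synapse notebook you would then call read_landing_csv(spark, path). Supporting another file type would mean switching the reader based on the extension.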

The following code block queries Azure Purview using the database and table name. It reads the operation type and the primary key from the parameters, and fetches the schema metadata. The script will exit when no metadata is found.
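The extraction step can be sketched without a live Purview connection. Below, a plain dictionary stands in for the entity returned by the catalog, and plain dictionary access is used where the notebook would use jmespath. The attribute names schema and parameters are hypothetical and depend on the custom type definitions you registered.

```python
import sys


def extract_metadata(entity):
    """Return (operation_type, primary_key, schema) from a catalog entity.

    Exits the notebook when no entity (or no attributes) is available.
    """
    if not entity or "attributes" not in entity:
        sys.exit("No metadata has been found in Purview")
    attributes = entity["attributes"]
    parameters = attributes.get("parameters", {})
    return (
        parameters.get("operation_type"),
        parameters.get("primary_key"),
        attributes.get("schema", {}),
    )


# Stand-in for the entity that a Purview lookup would return.
sample_entity = {
    "attributes": {
        "name": "SalesLT.Address",
        "schema": {"AddressID": "INT", "City": "STRING", "ModifiedDate": "DATE"},
        "parameters": {"primary_key": "AddressID", "operation_type": "merge"},
    }
}

operation_type, primary_key, schema = extract_metadata(sample_entity)
print(operation_type, primary_key, schema)
```

Calling sys.exit raises SystemExit, which is why a pipeline run for a table without metadata reports a SystemExit message.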

A future improvement could be to also read the classifications and apply these within your pipeline. For example, when you want to adhere to GDPR, you could think of dynamically masking incoming sensitive data.

The next code block performs a number of simple data quality checks using the great expectations package. In this demo, metadata from Purview is used for validating the correctness of all data. After validation, the data frame is converted using the correct schema and stored as a DELTA table. I would like to emphasize that the validations are simple and for demonstration purposes.
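To illustrate the idea of metadata-driven validation, here is a small plain-Python stand-in. It does not use the great_expectations API and does not write a DELTA table. It only shows how a column-to-type mapping from the catalog can drive both the checks and the conversion. The type names, the sample rows, and the ISO date format are assumptions.

```python
from datetime import date

# Converters for the data types registered in the catalog (assumed names).
CASTERS = {"STRING": str, "INT": int, "DATE": date.fromisoformat}


def validate_rows(rows, schema):
    """Return a list of human-readable problems; an empty list means valid."""
    problems = []
    for index, row in enumerate(rows):
        for column, type_name in schema.items():
            if column not in row:
                problems.append(f"row {index}: missing column {column}")
                continue
            try:
                CASTERS[type_name](row[column])
            except (ValueError, TypeError):
                problems.append(f"row {index}: {column} is not a valid {type_name}")
    return problems


def cast_rows(rows, schema):
    """Convert validated string values to the types declared in the schema."""
    return [
        {column: CASTERS[type_name](row[column]) for column, type_name in schema.items()}
        for row in rows
    ]


schema = {"AddressID": "INT", "City": "STRING", "ModifiedDate": "DATE"}
rows = [
    {"AddressID": "9", "City": "Bothell", "ModifiedDate": "2006-07-01"},
    {"AddressID": "abc", "City": "Seattle", "ModifiedDate": "2006-07-01"},
]
print(validate_rows(rows, schema))
```

Only rows that pass validation should be handed to cast_rows, because the conversion raises on the same values that validation reports.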

The following code block treats the incoming data as new data when no data is present yet. This code is only executed when a new source is onboarded.

The last code block reads the previous data, combines it with the newly created data, compares all data using a hash and adds the changed data to a DELTA table.
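The hash comparison itself can be sketched in plain Python, independent of Spark and Delta. Each row is reduced to a hash over the compared columns, and an incoming row counts as changed when its primary key is new or its hash differs from the previous delivery. The function names and the sample rows are hypothetical.

```python
import hashlib
import json


def row_hash(row, columns):
    """Hash the values of the compared columns in a fixed order."""
    serialized = json.dumps([str(row[column]) for column in columns])
    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()


def detect_changes(previous, incoming, primary_key, columns):
    """Return incoming rows that are new or differ from the previous delivery."""
    previous_hashes = {row[primary_key]: row_hash(row, columns) for row in previous}
    changed = []
    for row in incoming:
        # A missing key yields None, so new rows are always reported.
        if previous_hashes.get(row[primary_key]) != row_hash(row, columns):
            changed.append(row)
    return changed


previous = [
    {"AddressID": 1, "City": "Bothell"},
    {"AddressID": 2, "City": "Seattle"},
]
incoming = [
    {"AddressID": 1, "City": "Bothell"},   # unchanged
    {"AddressID": 2, "City": "Redmond"},   # changed
    {"AddressID": 3, "City": "Dallas"},    # new
]
changed = detect_changes(previous, incoming, "AddressID", ["AddressID", "City"])
print(changed)
```

With an empty previous list every incoming row is returned, which matches the first-load case for a newly onboarded source. Passing only business columns in columns keeps technical columns out of the comparison.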

After the notebook has been saved, I encourage you to validate the notebook using the AdventureWorks data. When everything works correctly, you should see the changedzone and validatedzone folders within the default storage account of your Synapse workspace. Each folder should hold another folder named after the database: AdventureWorks. Inside each database folder there should be a table folder: SalesLT.

Adjusting the pipeline

Next we will adjust the pipeline by providing the parameters required by the script. Go to Settings and add two new base parameters:

  1. cw_database holding the database name
  2. cw_table holding the dynamically provided schema and table names:

If everything is configured you can trigger the pipeline again. The SalesLT table should be processed correctly. All other tables will fail, because metadata in Purview is missing: “message”: “SystemExit: No metadata has been found in Purview Traceback (most recent call last): SystemExit: No metadata has been found in Purview”

Examine the results

Further steps for improvement

  • Use YAML templates, instead of passing in arguments.
  • Store the configuration of your pipelines in version control, such as Git.
  • Support multiple file types, such as Parquet, TXT, CSV, XML, JSON, etc.
  • Support different processing types like merge, append and overwrite.
  • Exclude (technical) columns when performing a comparison with previously delivered data, so you only validate on the actual business data.
  • Dynamically load additional hooks or scripts for database- or table-specific processing, for example additional data quality checks or enrichments for analytics and reporting.
  • Extend the framework with data lineage creation, controls for classifications, missing data ownership or definitions.
  • Automatic deployment of (secure) views for consumption, for example, in Synapse Serverless SQL Pools.




