When you perform data quality checks in Microsoft Purview Unified Catalog, you can configure and publish data quality error records for rule exceptions. This setup helps you review and fix these records directly in your managed storage, whether in Microsoft Fabric Lakehouse or Microsoft Azure Data Lake Storage Gen2.
This feature enables Microsoft Purview Data Quality users to identify, analyze, and address data quality issues. It helps ensure data accuracy, completeness, consistency, timeliness, and validity across systems and use cases.
Architecture
You need to set up your managed storage in Microsoft Purview to publish data quality error records for rule exceptions. The Data Governance Administrator for your organization is responsible for configuring the storage details on the Microsoft Purview administration page. The Data Quality Steward can enable the toggle either from the Data Quality Manage console or the data quality overview page at Unified Catalog > Health management > Data quality.
Configure storage location
- In Unified Catalog, go to Health management > Data quality.
- Select a governance domain from the list to open its details page.
- Select Manage, then select Settings.
- Select Azure Region from the dropdown list of all supported Azure regions.
- After selecting the Azure region, select New to configure storage details.
- Select Storage Type: Fabric or Data Lake Storage Gen2.
- Enter the Location URL.
- If you select Fabric as the storage type, enter the Workspace ID and Lakehouse ID. Grant Contributor access on your Fabric workspace to the Microsoft Purview managed service identity (MSI).
- If you select Azure Data Lake Storage Gen2 as the storage type, enter the Location URL. The URL is the primary Data Lake Storage endpoint of your storage account followed by the container name (for example, https://<storage-account>.dfs.core.windows.net/<container-name>). To find the endpoint:
  - Go to portal.azure.com.
  - Select your Data Lake Storage Gen2 storage account (Home > storage account).
  - Go to Settings > Endpoints and copy the primary Data Lake Storage endpoint.
- Grant Storage Blob Data Contributor access on your Data Lake Storage Gen2 container to the Microsoft Purview managed service identity (MSI).
- Test the connection.
- Save the configuration.
Note
- If your data products include assets from multiple Azure regions, you must create a region-specific data quality error records folder for each region to meet data residency requirements. Microsoft Purview Data Quality doesn't copy data between Azure regions.
- If you already created a storage location without specifying an Azure region, you must update it to include the region. Data quality failed/error records won't publish until a region is provided.
- If you configure multiple region-specific folders for storing data quality failed/error records, Microsoft Purview Data Quality will automatically route the error records to the correct regional folder during the scan.
Caution
- If you don't have storage and a connection set up to store data quality error records in the asset region (as mentioned in the preceding section), the data quality scan job still succeeds, but error row publishing is skipped.
- After you configure the storage and connection, you can enable the data quality error record feature either by going to Health management > Data quality > Manage > Settings for the selected governance domain, or from the Data Quality overview page for a data asset.
- To analyze the published error records, read them by using a recent version of Spark (3.x or later) on Synapse or Databricks.
- Virtual network support isn't yet available for data quality error records storage.
Data quality issue detection
Select the data asset and configure rules for the critical columns that need a Data Quality measurement. You can use out-of-the-box rules, create custom rules, or use the rule suggestion feature to recommend rules for a data quality scan. For more information, see data quality rules.
Enable error recording
To get error records, you need to enable the capture and storage of error records for every data quality job run by following these steps:
- Follow the instructions to start a data quality scan on a data asset.
- After selecting Run quality scan on the data asset's page, turn on the toggle for Enable publishing of failed rows.
You can also enable error recording at the governance domain level, for all data assets that have data quality rules associated with the domain's data products, by following these steps:
- In Unified Catalog, go to Health management > Data quality.
- Select a governance domain from the list to open its details page.
- Select Manage, then select Settings.
- Turn on the toggle for Enable publishing of failed rows.
Set up schedule to run
Set up the schedule to run your data quality job and publish the error records into the configured storage. For more information, see configure and run data quality scans.
Though you can run data quality scans ad hoc by selecting Run quality scan on a data asset's page, source data in production scenarios is typically updated continuously. Monitor data quality on a regular schedule so that issues are detected early; automating the scanning process keeps quality results current without manual effort.
Check the error records in the configured storage
- Find the location of the published error rows from the data quality overview page.
- Select View Scan report, in the top-right corner just below the Latest quality score.
- Go to the path shown in your Data Lake Storage Gen2 or Fabric Lakehouse folder, as seen in the following example:
Data Lake Storage Gen2 folder hierarchy
Data Lake Storage Gen2 container (for example, a folder named DEH):
- DataQualityAuditExtract
  - Governance domain (BusinessDomain)
    - DataProduct
      - DataAsset
        - RunDate
          - RunId
            - Purview DQRunId
              - Error record file
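You can also browse this hierarchy from a Synapse or Fabric notebook instead of the storage portal. The following is a minimal sketch that walks the folder tree with mssparkutils; the root path is a placeholder that you should replace with the path shown on your scan report.
#Sketch: walk the error-record folder hierarchy from a notebook.
#The root path is a placeholder; substitute the container and folder path from your scan report.
root = "abfss://<container>@<storage-account>.dfs.core.windows.net/<audit-extract-folder>"

def list_tree(path, depth=0, max_depth=7):
    #Recursively print folders, down to the error record files.
    for item in mssparkutils.fs.ls(path):
        print("  " * depth + item.name)
        if item.isDir and depth < max_depth:
            list_tree(item.path, depth + 1, max_depth)

list_tree(root)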
Read data quality error records from Data Lake Storage Gen2
You can use a Synapse or Databricks notebook to read data from the Data Lake Storage Gen2 container by using this PySpark script:
#For complete error records of all jobs run on the asset
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path)
display(df)
#For all runs on a particular date
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where("_purviewDQRunDate = '2025-08-06'")
display(df)
#For a specific run
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where("_purviewDQRunId = '647b8a8f-7140-4158-9914-9c2409a5ec67'")
display(df)
#For a specific rule of the run
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where("_purviewDQRunId = '647b8a8f-7140-4158-9914-9c2409a5ec67' AND _purviewDQRuleId = '32d57f8d-87dc-4862-90c9-c3bcad1051ad'")
display(df)
#For a specific rule across all runs
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where(" _purviewDQRuleId = '32d57f8d-87dc-4862-90c9-c3bcad1051ad'")
display(df)
#For comparing two runs
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where("_purviewDQRunId = '647b8a8f-7140-4158-9914-9c2409a5ec67' OR _purviewDQRunId = 'a508f960-254c-4eb1-9097-03307402475b'")
display(df)
Example dataset:
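As a follow-up to the read examples above, you can aggregate on the metadata columns to see how failure volume trends across runs. This is a sketch only; the path is a placeholder for your own asset folder.
#Sketch: trend of failed rows per run (replace the placeholder path with your asset folder path)
from pyspark.sql import functions as F

path = "abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/Files/<folder>/DataQualityAuditExtracts/<BusinessDomain_...>/<DataProduct_...>/<DataAsset_...>"
df = spark.read.format("delta").load(path)

trend = (df.groupBy("_purviewDQRunDate", "_purviewDQRunId")
           .agg(F.count("*").alias("failed_rows"))
           .orderBy("_purviewDQRunDate"))
display(trend)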
Read error records from Fabric Lakehouse
- Go to the path shown in your Lakehouse, and browse all fail records published to Fabric Lakehouse.
- Create a shortcut from Delta Parquet to Delta Table. Or you can use a Delta Parquet file to create a dashboard of your data quality error records, as seen in this example:
You can use a notebook to create shortcut tables of failed data asset records. For details, see the Script to create shortcut section later in this article.
Output data model
You can view failed records at the level of a governed data asset. For each new assessment run that succeeds on the data asset, the process adds a new set of failed records for each rule evaluation to the failed records table.
Format: Delta
Output schema: Each row of output contains all the columns of an input data asset row that failed a rule evaluation, along with a set of metadata columns that you can use to identify and analyze the failed row.
Primary key: Composite key of input data asset columns + _purviewDQRunId + _purviewDQRuleId
| Metadata columns | Data type | Description |
|---|---|---|
| _purviewDQRunId | string | A GUID that represents the Run ID of the assessment scan available to the user at the time of the run. Also a Delta partition column. |
| _purviewDQRunDate | string | The run date in YYYY-MM-DD format. Also a Delta partition column. |
| _purviewDQRuleId | string | The quality Rule ID that corresponds to the failed rule. Also a Delta partition column. |
| _purviewDQRuleName | string | The Rule Name at the time of job run. |
| _purviewDQRuleType | string | The Rule type (for example: UniqueCheck, NotNull, Regex). |
| _purviewDQGovernanceDomainId | string | The governance domain ID of the data asset that you ran. |
| _purviewDQDataProductId | string | The data product ID of the data asset that you ran. |
| _purviewDQDataAssetId | string | The data asset ID. |
| _purviewDQRunSubmitTimestamp | string | The run submission timestamp in UTC, in ISO format. |
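Because the primary key combines the input columns with _purviewDQRunId and _purviewDQRuleId, an input row that fails several rules appears once per rule in the output. The following sketch shows one way to collapse the output to distinct failed input rows for a single run; the path, the run ID, and the input column names (CustomerId, Email) are hypothetical placeholders.
#Sketch: reduce the output to distinct failed input rows for one run.
#The path and run ID are placeholders; CustomerId and Email are hypothetical input-asset columns.
path = "<path to the asset's DataQualityAuditExtracts folder>"
failed = spark.read.format("delta").load(path).where("_purviewDQRunId = '<run id>'")

distinct_failed_rows = failed.dropDuplicates(["CustomerId", "Email"])
print(distinct_failed_rows.count(), "distinct input rows failed at least one rule in this run")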
Create a Power BI dashboard for your data quality error records
Use the Business Domain ID to link data products and their associated data assets for reporting data quality error records.
- One business domain can link to multiple data products.
- Each data product can link to multiple data assets.
- Each data asset can have multiple rules.
- Each rule can generate multiple error records.
This image illustrates a data model to create a basic report of your data quality error record:
This image illustrates a sample report created with the data model shown in the preceding image:
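If you prefer to prepare the data in a notebook before connecting it to Power BI, a summary table that follows the same hierarchy (governance domain > data product > data asset > rule) is usually enough for a basic report. The sketch below aggregates error counts by those IDs; the path and the output table name dq_error_summary are assumptions, so adjust them to your environment.
#Sketch: build a summary table of error counts for reporting.
#The path and the output table name "dq_error_summary" are assumptions.
from pyspark.sql import functions as F

path = "<path to the asset's DataQualityAuditExtracts folder>"
df = spark.read.format("delta").load(path)

summary = (df.groupBy(
               "_purviewDQGovernanceDomainId",
               "_purviewDQDataProductId",
               "_purviewDQDataAssetId",
               "_purviewDQRuleId",
               "_purviewDQRuleName",
               "_purviewDQRunDate")
             .agg(F.count("*").alias("error_count")))

summary.write.mode("overwrite").format("delta").saveAsTable("dq_error_summary")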
Limitations
- A maximum of 100,000 failed records are published per rule for each run.
- Datasets up to 100 million rows with up to 40 rules have been benchmarked.
- Data Lake Storage Gen2 and Fabric storage in virtual network aren't supported.
- As data is stored in organizations' managed storage, the organization owns role-based access control for the data. Microsoft publishes data quality error rows to an organization's storage based on the organization's consent.
Script to create shortcut
You can automate the creation of table shortcuts from the data quality audit extract failed row data by using this script in a Fabric notebook. After you create the shortcut tables, error records from new job runs on the asset automatically appear in the shortcut tables in Fabric.
# NEW Script to automate the creation of table shortcuts from Data Quality audit extracts failed row data.
# Update these three values for your Fabric Workspace ID, Lakehouse ID and Purview BYOC Self-serve store path
workspaceId = "f28dc1c8-360c-4788-9f46-e69853b1c40d" #Example: f28dc1c8-360c-4788-9f46-e69853b1c40d
lakehouseId = "77d6df6b-64ab-4628-985f-9365591a85a8" #Example: 77d6df6b-64ab-4628-985f-9365591a85a8
dataqualityauditPath = "Files/SelfServeFolder/DataQualityAuditExtracts" #Example: "Files/DEH2/DataQualityAuditExtracts"
#Use for lakehouses with Lakehouse-Schema (Public preview feature)
DestinationShortcutUri = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/Tables/DataQualityAuditExtracts"
#Use for lakehouses without Lakehouse-Schema
#DestinationShortcutUri = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/Tables"
import sempy.fabric as fabric
from sempy.fabric.exceptions import FabricHTTPException
import fnmatch
import re
import os
from collections import deque
SourceUri = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/{dataqualityauditPath}"
request_headers = {
    "Authorization": "Bearer " + mssparkutils.credentials.getToken("pbi"),
    "Content-Type": "application/json"
}
# print(request_headers)  # Avoid printing the headers; they contain the bearer token.

def is_delta_table(uri: str):
    # A folder is a Delta table if it contains a _delta_log directory.
    delta_log_path = os.path.join(uri, "_delta_log")
    return mssparkutils.fs.exists(delta_log_path)

def extract_onelake_https_uri_components(uri):
    # Split an abfss:// OneLake URI into workspace ID, item ID, and relative path.
    pattern = re.compile(r"abfss://([^@]+)@[^/]+/([^/]+)/(.*)")
    match = pattern.search(uri)
    if match:
        workspace_id, item_id, path = match.groups()
        return workspace_id, item_id, path
    else:
        return None, None, None

def is_valid_onelake_uri(uri: str) -> bool:
    workspace_id, item_id, path = extract_onelake_https_uri_components(uri)
    if "abfss://" not in uri or workspace_id is None or item_id is None or path is None:
        return False
    return True

def get_onelake_shortcut(workspace_id: str, item_id: str, path: str, name: str):
    # Read back the shortcut definition from the Fabric REST API.
    shortcut_uri = (
        f"v1/workspaces/{workspace_id}/items/{item_id}/shortcuts/{path}/{name}"
    )
    result = client.get(shortcut_uri).json()
    return result

def get_last_path_segment(uri: str):
    path = uri.split("/")  # Split the entire URI by '/'
    return path[-1] if path else None

def create_onelake_shortcut(SourceUri: str, dest_uri: str, result: list):
    # Create a OneLake shortcut in the destination lakehouse that points to the source Delta folder.
    src_workspace_id, src_item_id, src_path = extract_onelake_https_uri_components(
        SourceUri
    )
    dest_workspace_id, dest_item_id, dest_path = extract_onelake_https_uri_components(
        dest_uri
    )
    name = get_last_path_segment(SourceUri)
    dest_uri_joined = os.path.join(dest_uri, name)

    # If the destination path already exists, return without creating the shortcut
    if mssparkutils.fs.exists(dest_uri_joined):
        print(f"Table already exists: {dest_uri_joined}")
        result.append(dest_uri_joined)
        return None

    request_body = {
        "name": name,
        "path": dest_path,
        "target": {
            "oneLake": {
                "itemId": src_item_id,
                "path": src_path,
                "workspaceId": src_workspace_id,
            }
        },
    }

    shortcut_uri = f"v1/workspaces/{dest_workspace_id}/items/{dest_item_id}/shortcuts"
    print(f"Creating shortcut: {shortcut_uri}/{name}..")
    try:
        client.post(shortcut_uri, json=request_body, headers=request_headers)
    except FabricHTTPException as e:
        print(e)
        print(e.error_reason)
        return None
    return get_onelake_shortcut(dest_workspace_id, dest_item_id, dest_path, name)

client = fabric.FabricRestClient()

# Breadth-first walk of the audit extract folder tree: create a shortcut for every Delta table found.
queue = deque([SourceUri])
result = []
while queue:
    current_uri = queue.popleft()
    if is_delta_table(current_uri):
        shortcut = create_onelake_shortcut(current_uri, DestinationShortcutUri, result)
        if shortcut is not None:
            result.append(shortcut)
        continue
    # List subitems in the current folder
    subitems = mssparkutils.fs.ls(current_uri)
    for item in subitems:
        if item.isDir:
            queue.append(item.path)

print(f"{len(result)}" + " error record shortcut tables present in the lakehouse: ")
for item in result:
    print(item)
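After the script finishes, you can query the shortcut tables it printed directly from the lakehouse. The following is a minimal example; the schema and table name are assumptions based on the destination path used above (a schema-enabled lakehouse with shortcuts under DataQualityAuditExtracts), so adjust the table reference to the names the script printed for your environment.
#Sketch: query a shortcut table created by the script above.
#The schema and table name are assumptions; use the names printed by the script.
df = spark.sql("""
    SELECT _purviewDQRuleName, _purviewDQRunDate, COUNT(*) AS failed_rows
    FROM DataQualityAuditExtracts.`DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b`
    GROUP BY _purviewDQRuleName, _purviewDQRunDate
    ORDER BY _purviewDQRunDate
""")
display(df)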
Related content
- Data quality for Fabric Lakehouse data
- Data quality for Fabric mirrored data sources
- Data quality for Fabric shortcut data sources
- Data quality for Azure Synapse serverless and data warehouses
- Data quality for Azure Databricks Unity Catalog
- Data quality for Snowflake data sources
- Data quality for Google BigQuery
- Data quality native support for Iceberg data