Data quality error records

When you perform data quality checks in Microsoft Purview Unified Catalog, you can configure and publish data quality error records for rule exceptions. This setup helps you review and fix these records directly in your managed storage, whether in Microsoft Fabric Lakehouse or Microsoft Azure Data Lake Storage Gen2.

This feature enables Microsoft Purview Data Quality users to identify, analyze, and address data quality issues. It helps ensure data accuracy, completeness, consistency, timeliness, and validity across systems and use cases.

Architecture

You need to set up your managed storage in Microsoft Purview to publish data quality error records for rule exceptions. The Data Governance Administrator for your organization is responsible for configuring the storage details on the Microsoft Purview administration page. The Data Quality Steward can enable the toggle either from the Data Quality Manage console or the data quality overview page at Unified Catalog > Health management > Data quality.

Diagram that shows the data quality error record architecture.

Configure the storage location

  • In Unified Catalog, go to Health management > Data quality.
  • Select a governance domain from the list to open its details page.
  • Select Manage, then select Settings.
  • Select Azure Region from the dropdown list of all supported Azure regions.
  • After selecting the Azure region, select New to configure storage details.
  • Select Storage Type: Fabric or Data Lake Storage Gen2.
  • Enter the Location URL.
  • If you select Fabric as the storage type, enter the Workspace ID and Lakehouse ID. Grant Contributor access on your Fabric workspace to the Microsoft Purview Managed Service Identity (MSI).
  • If you select Azure Data Lake Storage Gen2 as the storage type, enter the Location URL. The URL is the Data Lake Storage Gen2 primary endpoint followed by "/<container name>" (see the example after these steps). To find the endpoint:
    • Go to portal.azure.com.
    • Select your Data Lake Storage Gen2 storage account (Home > your storage account).
    • Go to Settings > Endpoints and select the primary endpoint of your data lake storage.
      • Grant Storage Blob Data Contributor access on your Data Lake Storage Gen2 container to the Microsoft Purview Managed Service Identity (MSI).
  • Test the connection.
  • Save the configuration.
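
For example, here's a minimal sketch of how the Data Lake Storage Gen2 location URL is composed. The storage account and container names are hypothetical placeholders; substitute your own values.

# Illustrative only: hypothetical storage account and container names.
storage_account = "contosodatalake"
container = "dqerrorrecords"

# The location URL is the Data Lake Storage Gen2 primary endpoint plus "/<container name>".
location_url = f"https://{storage_account}.dfs.core.windows.net/{container}"
print(location_url)  # https://contosodatalake.dfs.core.windows.net/dqerrorrecords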

Note

  • If your data products include assets from multiple Azure regions, you must create a region-specific data quality error records folder for each region to meet data residency requirements. Microsoft Purview Data Quality doesn't copy data between Azure regions.
  • If you already created a storage location without specifying an Azure region, you must update it to include the region. Data quality failed/error records won't publish until a region is provided.
  • If you configure multiple region-specific folders for storing data quality failed/error records, Microsoft Purview Data Quality will automatically route the error records to the correct regional folder during the scan.

Data quality error record setting

Caution

  • If you don't have storage and a connection set up to store data quality error records in the asset region (as mentioned in the preceding section), the data quality scan job still succeeds, but error row publishing is skipped.
  • After you configure the storage and connection, you can enable the data quality error record feature either by going to Health management > Data quality > Manage > Settings for the selected governance domain, or from the Data Quality overview page for a data asset.
  • Read the published error records for any analysis by using a recent version of Spark (3.x or later) on Synapse or Databricks.
  • Virtual network support isn't yet available for data quality error records storage.

Data quality issue detection

Select the data asset and configure rules for the critical columns that need a Data Quality measurement. You can use out-of-the-box rules, create custom rules, or use the rule suggestion feature to recommend rules for a data quality scan. For more information, see data quality rules.

Enable error recording

To get error records, you need to enable the capture and storage of error records for every data quality job run by following these steps:

  1. Follow the instructions to start a data quality scan on a data asset.
  2. After selecting Run quality scan on the data asset's page, turn on the toggle for Enable publishing of failed rows.

Screenshot that shows how to activate the error record feature from the data asset page.

You can also enable error recording at the governance domain level, for all data assets that have data quality rules and are associated with the domain's data products, by following these steps:

  • In Unified Catalog, go to Health management > Data quality.
  • Select a governance domain from the list to open its details page.
  • Select Manage, then select Settings.
  • Turn on the toggle for Enable publishing of failed rows.

Set up schedule to run

Set up the schedule to run your data quality job and publish the error records into the configured storage. For more information, see configure and run data quality scans.

Although you can run data quality scans on an ad hoc basis by selecting Run quality scan on a data asset page, in production scenarios the source data is typically updated continuously. It's best to monitor data quality regularly so you can detect issues early. Automating the scanning process helps you keep quality scans current as the data changes.

Check the error records in the configured storage

  1. Find the location of the published error rows from the data quality overview page.
  2. Select View Scan report in the top-right corner, just below the Latest quality score.
  3. Go to the path shown in your Data Lake Storage Gen2 or Fabric Lakehouse folder, as seen in the following example:

Screenshot that shows error records in the Data Lake Storage Gen2 container.

Data Lake Storage Gen2 folder hierarchy

Data Lake Storage Gen2 container (for example, a container named DEH) uses the following folder hierarchy; an illustrative path that follows it appears after the list:

  1. DataQualityAuditExtract
  2. Governance domain (BusinessDomain)
  3. DataProduct
  4. DataAsset
  5. RunDate
  6. RunId
  7. Purview DQRunId
  8. Error record file
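
For example, here's a minimal sketch of a path that follows this hierarchy down to the data asset level, loaded as a Delta table. The container name, storage account name, and GUIDs are hypothetical placeholders; use the path shown in your own scan report.

# Illustrative only: hypothetical container, storage account, and GUIDs.
path = (
    "abfss://deh@contosodatalake.dfs.core.windows.net/"
    "DataQualityAuditExtract/"
    "BusinessDomain_<governance-domain-guid>/"
    "DataProduct_<data-product-guid>/"
    "DataAsset_<data-asset-guid>"
)
df = spark.read.format("delta").load(path)
display(df)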

Read data quality error records from Data Lake Storage Gen2

You can use a Synapse or Databricks notebook to read data from the Data Lake Storage Gen2 container by using this PySpark script:


#For complete error records of all jobs run on the asset
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path)
display(df)

#For All Runs on a particular Date
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where("_purviewDQRunDate = '2025-08-06'")
display(df)

#For a Specific run
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where("_purviewDQRunId = '647b8a8f-7140-4158-9914-9c2409a5ec67'")
display(df)

#For a specific rule of the run
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where("_purviewDQRunId = '647b8a8f-7140-4158-9914-9c2409a5ec67' AND _purviewDQRuleId = '32d57f8d-87dc-4862-90c9-c3bcad1051ad'")
display(df) 

#For a specific rule across all runs
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where(" _purviewDQRuleId = '32d57f8d-87dc-4862-90c9-c3bcad1051ad'")
display(df) 

#For comparing two runs
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where("_purviewDQRunId = '647b8a8f-7140-4158-9914-9c2409a5ec67' OR _purviewDQRunId = 'a508f960-254c-4eb1-9097-03307402475b'")
display(df) 


Example dataset:

Screenshot that shows a sample of error records.

Read error records from Fabric Lakehouse

  1. Go to the path shown in your Lakehouse and browse all failed records published to the Fabric Lakehouse.
  2. Create a shortcut from the Delta Parquet files to a Delta table, or use a Delta Parquet file to create a dashboard of your data quality error records, as seen in this example:

Screenshot that shows error records published in the Fabric Lakehouse.

You can use a notebook to create shortcut tables of failed data asset records. See the notebook script later in this article.
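
After the shortcut tables exist, you can query them with Spark SQL in a Fabric notebook. This minimal sketch assumes a schema-enabled lakehouse where the shortcuts land under a DataQualityAuditExtracts schema, and it reuses the example data asset ID from the earlier path examples; substitute your own shortcut table name.

# Summarize failed rows per rule and run date from a shortcut table (table name is illustrative).
df = spark.sql("""
    SELECT _purviewDQRuleName, _purviewDQRunDate, COUNT(*) AS failedRowCount
    FROM DataQualityAuditExtracts.`DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b`
    GROUP BY _purviewDQRuleName, _purviewDQRunDate
""")
display(df)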

Output data model

You can view failed records at the level of a governed data asset. For each new assessment run that succeeds on the data asset, the process adds a new set of failed records for each rule evaluation to the failed records table.

Format: Delta

Output schema: Each row of output contains all the columns of an input data asset row that failed a rule evaluation, along with a set of metadata columns that you can use to identify and analyze the failed row.

Primary key: Composite key of input data asset columns + _purviewDQRunId + _purviewDQRuleId

Metadata column | Data type | Description
_purviewDQRunId | string | A GUID that represents the Run ID of the assessment scan, available to the user at the time of the run. Also a Delta partition column.
_purviewDQRunDate | string | The run date in YYYY-MM-DD format. Also a Delta partition column.
_purviewDQRuleId | string | The quality Rule ID that corresponds to the failed rule. Also a Delta partition column.
_purviewDQRuleName | string | The rule name at the time of the job run.
_purviewDQRuleType | string | The rule type (for example: UniqueCheck, NotNull, Regex).
_purviewDQGovernanceDomainId | string | The governance domain ID of the data asset that you ran.
_purviewDQDataProductId | string | The data product ID of the data asset that you ran.
_purviewDQDataAssetId | string | The data asset ID.
_purviewDQRunSubmitTimestamp | string | The timestamp of the run submission, in the default UTC time zone, in ISO format.
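
For example, the metadata columns make it easy to summarize failures per rule and run. This minimal sketch uses an illustrative path; point it at your own error records location.

# Illustrative path; replace with the location of your published error records.
path = "abfss://<container>@<account>.dfs.core.windows.net/DataQualityAuditExtract/BusinessDomain_<id>/DataProduct_<id>/DataAsset_<id>"
df = spark.read.format("delta").load(path)

# Count failed rows for each rule in each run.
failures_per_rule = (
    df.groupBy("_purviewDQRunDate", "_purviewDQRunId", "_purviewDQRuleId", "_purviewDQRuleName")
      .count()
      .withColumnRenamed("count", "failedRowCount")
      .orderBy("_purviewDQRunDate", "_purviewDQRuleName")
)
display(failures_per_rule)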

Create a Power BI dashboard for your data quality error records

Use the Business Domain ID to link data products and their associated data assets for reporting data quality error records.

  • One business domain can link to multiple data products.
  • Each data product can link to multiple data assets.
  • Each data asset can have multiple rules.
  • Each rule can generate multiple error records (see the roll-up sketch after this list).
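
For example, here's a minimal sketch that rolls the error records up through this hierarchy and writes a summary Delta table that a Power BI report can use. The input path and the output table name (dq_error_summary) are hypothetical.

# Illustrative path; replace with the location of your published error records.
path = "abfss://<container>@<account>.dfs.core.windows.net/DataQualityAuditExtract/BusinessDomain_<id>/DataProduct_<id>/DataAsset_<id>"
df = spark.read.format("delta").load(path)

# Count failed rows per governance domain, data product, data asset, and rule.
rollup = (
    df.groupBy(
        "_purviewDQGovernanceDomainId",
        "_purviewDQDataProductId",
        "_purviewDQDataAssetId",
        "_purviewDQRuleId",
        "_purviewDQRuleName",
    )
    .count()
    .withColumnRenamed("count", "failedRowCount")
)

# Write the roll-up as a Delta table (hypothetical name) for Power BI to read.
rollup.write.format("delta").mode("overwrite").saveAsTable("dq_error_summary")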

This image illustrates a data model to create a basic report of your data quality error record:

Diagram that shows the data model for reporting data quality error records.

This image illustrates a sample report created with the data model shown in the preceding image:

Screenshot that shows a sample report built from the data model.

Limitations

  • Up to 100,000 failed records are published per rule for each run.
  • Datasets up to 100 million rows with up to 40 rules have been benchmarked.
  • Data Lake Storage Gen2 and Fabric storage in virtual network aren't supported.
  • As data is stored in organizations' managed storage, the organization owns role-based access control for the data. Microsoft publishes data quality error rows to an organization's storage based on the organization's consent.

Script to create shortcut

You can automate the creation of table shortcuts from the data quality audit extract failed-row data by using this script in a Fabric notebook. After you create the shortcut tables, error records from new job runs on the asset automatically appear in the shortcut tables in Fabric.


# Script to automate the creation of table shortcuts from data quality audit extract failed-row data.

# Update these three values for your Fabric Workspace ID, Lakehouse ID and Purview BYOC Self-serve store path
workspaceId = "f28dc1c8-360c-4788-9f46-e69853b1c40d" #Example: f28dc1c8-360c-4788-9f46-e69853b1c40d
lakehouseId = "77d6df6b-64ab-4628-985f-9365591a85a8" #Example: 77d6df6b-64ab-4628-985f-9365591a85a8
dataqualityauditPath = "Files/SelfServeFolder/DataQualityAuditExtracts" #Example: "Files/DEH2/DataQualityAuditExtracts"

#Use for lakehouses with Lakehouse-Schema (Public preview feature)
DestinationShortcutUri = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/Tables/DataQualityAuditExtracts" 

#Use for lakehouses without Lakehouse-Schema
#DestinationShortcutUri = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/Tables" 

import sempy.fabric as fabric
from sempy.fabric.exceptions import FabricHTTPException
import fnmatch
import re
import os
from collections import deque

SourceUri = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/{dataqualityauditPath}"

request_headers = {
    "Authorization": "Bearer " + mssparkutils.credentials.getToken("pbi"),
    "Content-Type": "application/json",
}


def is_delta_table(uri: str):
    #print("Checking for uri:" + uri)
    delta_log_path = os.path.join(uri, "_delta_log")
    return mssparkutils.fs.exists(delta_log_path)

def extract_onelake_https_uri_components(uri):
    pattern = re.compile(r"abfss://([^@]+)@[^/]+/([^/]+)/(.*)")
    match = pattern.search(uri)
    if match:
        workspace_id, item_id, path = match.groups()
        return workspace_id, item_id, path
    else:
        return None, None, None

def is_valid_onelake_uri(uri: str) -> bool:
    workspace_id, item_id, path = extract_onelake_https_uri_components(uri)
    if "abfss://" not in uri or workspace_id is None or item_id is None or path is None:
        return False
    return True

def get_onelake_shortcut(workspace_id: str, item_id: str, path: str, name: str):
    # Retrieve the shortcut definition from the Fabric REST API.
    shortcut_uri = (
        f"v1/workspaces/{workspace_id}/items/{item_id}/shortcuts/{path}/{name}"
    )
    result = client.get(shortcut_uri).json()
    return result

def get_last_path_segment(uri: str):
    path = uri.split("/")  # Split the entire URI by '/'
    return path[-1] if path else None

def create_onelake_shortcut(SourceUri: str, dest_uri: str, result: list):
    # Create a OneLake shortcut in the destination lakehouse that points to the source Delta table.
    src_workspace_id, src_item_id, src_path = extract_onelake_https_uri_components(
        SourceUri
    )

    dest_workspace_id, dest_item_id, dest_path = extract_onelake_https_uri_components(
        dest_uri
    )

    name = get_last_path_segment(SourceUri)
    dest_uri_joined = os.path.join(dest_uri, name)

    # If the destination path already exists, return without creating shortcut
    if mssparkutils.fs.exists(dest_uri_joined):
        print(f"Table already exists: {dest_uri_joined}")
        result.append(dest_uri_joined)
        return None

    request_body = {
        "name": name,
        "path": dest_path,
        "target": {
            "oneLake": {
                "itemId": src_item_id,
                "path": src_path,
                "workspaceId": src_workspace_id,
            }
        },
    }
    #print(request_body)

    shortcut_uri = f"v1/workspaces/{dest_workspace_id}/items/{dest_item_id}/shortcuts"
    print(f"Creating shortcut: {shortcut_uri}/{name}..")
    try:
        client.post(shortcut_uri, json=request_body, headers=request_headers)
    except FabricHTTPException as e:
        print(e)
        print(e.error_reason)
        return None

    return get_onelake_shortcut(dest_workspace_id, dest_item_id, dest_path, name)


client = fabric.FabricRestClient()

queue = deque([SourceUri])
result = []

while queue:
    current_uri = queue.popleft()

    #print(current_uri)
    
    if is_delta_table(current_uri):
        # Create a shortcut for each Delta table found under the audit extract path.
        shortcut = create_onelake_shortcut(current_uri, DestinationShortcutUri, result)
        if shortcut is not None:
            result.append(shortcut)
        continue
    
    # List subitems in the current folder
    subitems = mssparkutils.fs.ls(current_uri)   
    for item in subitems:
        if item.isDir:
            queue.append(item.path)   


print(f"{len(result)}" + " error record shortcut tables present in the lakehouse: ")
for item in result:
    print(item)