次の方法で共有


データ品質エラー レコード

Microsoft Purview 統合カタログでデータ品質チェックを実行すると、ルールの例外に対してデータ品質エラー レコードを構成して発行できます。 このセットアップは、Microsoft Fabric Lakehouse または Microsoft Azure Data Lake Storage Gen2のいずれであっても、マネージド ストレージ内のこれらのレコードを直接確認して修正するのに役立ちます。

この機能を使用すると、Microsoft Purview データ品質ユーザーはデータ品質の問題を特定、分析、対処できます。 これは、システムとユース ケース全体でデータの正確性、完全性、一貫性、タイムライン、有効性を確保するのに役立ちます。

アーキテクチャ

ルール例外のデータ品質エラー レコードを公開するには、Microsoft Purview でマネージド ストレージを設定する必要があります。 organizationのデータ ガバナンス管理者は、Microsoft Purview 管理ページでストレージの詳細を構成する役割を担います。 Data Quality Steward では、Data Quality Manage コンソールまたはデータ品質の概要ページの 統合カタログ>Health Management>Data quality の切り替えを有効にできます。

dq エラー レコード アーキテクチャ

構成ストレージの場所

  • 統合カタログで、[正常性管理>データ品質] に移動します。
  • 一覧からガバナンス ドメインを選択して詳細ページを開きます。
  • [ 管理] を選択し、[ 設定] を選択します。
  • サポートされているすべてのAzureリージョンのドロップダウン リストから [Azure リージョン] を選択します。
  • Azure リージョンを選択したら、[新規] を選択してストレージの詳細を構成します。
  • [ストレージの種類: ファブリック] または [Data Lake Storage Gen2] を選択します。
  • 場所 URL を入力します
  • ストレージの種類として [Fabric ] を選択した場合は、[ ワークスペース ID ] と [ Lakehouse ID] を入力します。 Microsoft Purview Manage Service Identity (MSI) への共同作成者アクセスを Fabric ワークスペースに付与します。
  • ストレージの種類として [Azure Data Lake Storage Gen2] を選択した場合は、次の詳細を入力します。
    • サブスクリプションAzure追加します。 Azureサブスクリプション名を見つけるには、[portal.azure.com] に移動します。
    • リソースの追加。 リソース名を見つけるには、[ホーム] を選択し>リソースの一覧から [リソース] を選択します。
    • コンテナー名を追加します。 コンテナー名を見つけるには、 の概要ページから [Data Lake Storage] を選択します。portal.azure.com
    • フォルダー パスを追加します。 これは省略可能な情報です。
    • Microsoft Purview Manage Service Identity (MSI) へのストレージ BLOB データ共同作成者のアクセス権を、Data Lake Storage Gen2 コンテナーに付与します。
  • 接続をテストします。
  • 構成を保存します。
  • 入力した内容を失わないよう、必要な情報がすべてない場合は 、下書きとして保存 できます。

コンピューティングは、プライベート エンドポイントまたは仮想ネットワーク ストレージのセットアップの前提条件です。 プライベート エンドポイント承認要求の場合は、[ マネージド V-Net を有効にする ] チェック ボックスをオンにします。 承認要求がAzure portalに送信され、仮想ネットワーク アクセスを有効にするには承認する必要があります。 このプロセスは、データ品質接続の設定に似ています。 コンピューティングは、ストレージ アカウントと同じAzure リージョンにデプロイする必要があります。

仮想ネットワークまたはプライベート リンクで実行するように Fabric テナントが構成されている場合は、[ストレージの構成] ページで [マネージド V-NET を有効にする] チェック ボックスをオンにする必要があります。 次に、次に示すようにPrivate Linkリソース ID を追加する必要があります。

/subscriptions/07d669d6-83f2-4f15-8b5b 4a4b31a7050e/resourceGroups/pdgbugbashfabricvnet/providers/Microsoft.PowerBI/priva teLinkServicesForPowerBI/fabricvnetpl。

仮想ネットワークのコンピューティングを設定するには、「マネージド仮想ネットワーク のデータ品質を設定する」を参照してください。 Fabric テナントのプライベート リンクを設定して使用するには、データ品質の仮想ネットワーク接続とコンピューティング割り当てを構成する前に、 Fabric テナントのプライベート リンクを設定 する手順に従います。

注:

  • データ製品に複数のAzureリージョンの資産が含まれている場合は、データ所在地の要件を満たすために、リージョン固有のデータ品質エラー レコード フォルダーをリージョンごとに作成する必要があります。 Microsoft Purview データ品質は、Azure リージョン間でデータをコピーしません。
  • Azureリージョンを指定せずにストレージの場所を既に作成している場合は、リージョンを含むように更新する必要があります。 データ品質の失敗/エラー レコードは、リージョンが指定されるまで発行されません。
  • データ品質の失敗/エラー レコードを格納するように複数のリージョン固有のフォルダーを構成した場合、Microsoft Purview データ品質はスキャン中にエラー レコードを正しいリージョン フォルダーに自動的にルーティングします。
  • 仮想ネットワークを使用している場合は、接続をテストできません。 [ テスト接続 ] タブが無効になっています。
  • 仮想ネットワーク接続でもジョブを実行している場合にのみ、仮想ネットワークの背後でエラー シンクを使用できます。 そうでない場合、エラー行の発行はスキップされます。

非仮想ネットワーク ストレージのデータ品質エラー レコードのストレージ設定 - Fabric: データ品質エラー レコード設定

仮想ストレージのデータ品質エラー レコードストレージ設定 - Fabric: 仮想ネットワーク ストレージのデータ品質エラー レコード設定

非仮想ストレージのデータ品質エラー レコードのストレージ設定 - AdlsG2: データ品質エラー レコードの設定 Azure Data Lake Storage Gen2

仮想ネットワーク ストレージのデータ品質エラー レコードストレージ設定 - AdlsG2: 仮想ネットワーク ストレージのデータ品質エラー レコード設定 Azure Data Lake Storage Gen2

注意

  • ストレージがなく、前のセクションで説明したように、資産リージョンにデータ品質エラー レコードを格納するように接続が設定されていない場合、データ品質スキャン ジョブは引き続き成功しますが、エラー行の発行はスキップされます。
  • ストレージと接続を構成したら、選択したガバナンス ドメインの 正常性管理>Data quality>Manage>Settings に移動するか、データ資産の [データ品質の概要] ページからデータ品質エラー レコード機能を有効にすることができます。
  • Synapse または Databricks で最新バージョンの Spark (>= 3.xx) を使用する分析では、発行された誤ったレコードを読み取る必要があります。

データ品質の問題の検出

データ資産を選択し、データ品質測定が必要な重要な列のルールを構成します。 既定のルールを使用したり、カスタム ルールを作成したり、ルール提案機能を使用してデータ品質スキャンのルールを推奨したりできます。 詳細については、「 データ品質ルール」を参照してください。

エラー記録を有効にする

エラー レコードを取得するには、次の手順に従って、すべてのデータ品質ジョブのエラー レコードのキャプチャとストレージを有効にする必要があります。

  1. 指示に従って、 データ資産のデータ品質スキャンを開始します。
  2. データ資産のページで [品質スキャンの実行 ] を選択した後、[ 失敗した行の発行を有効にする] のトグルをオンにします

データ資産ページからエラー レコード機能をアクティブ化する

また、次の手順に従って、 ガバナンス ドメイン レベルのデータ製品に関連付けられたデータ品質ルールを使用して、すべてのデータ資産に対してエラー記録を有効にすることもできます。

  • 統合カタログで、[正常性管理>データ品質] に移動します。
  • 一覧からガバナンス ドメインを選択して詳細ページを開きます。
  • [ 管理] を選択し、[ 設定] を選択します。
  • [ 失敗した行の発行を有効にする] のトグルをオンにします。

実行するスケジュールを設定する

データ品質ジョブを実行し、構成されたストレージにエラー レコードを発行するようにスケジュールを設定します。 詳細については、「 データ品質スキャンを構成して実行する」を参照してください。

データ資産ページで [ 品質スキャンの実行 ] を選択することで、データ品質スキャンをアドホックに実行できますが、運用シナリオでは、ソース データが常に更新される可能性があります。 問題を検出するために、データ品質を定期的に監視することをお勧めします。 スキャン プロセスを自動化すると、品質スキャンの定期的な更新を管理できます。

構成されたストレージ内のエラー レコードを確認する

  1. 公開されたエラー行の場所は、データ品質の概要ページから見つけます。
  2. [最新の品質スコア] のすぐ下の右上隅にある [スキャン レポートの表示] を選択します。
  3. 次の例に示すように、Data Lake Storage Gen2または Fabric Lakehouse フォルダーに表示されているパスに移動します。

adlsG2 のエラー レコード

フォルダー階層のData Lake Storage Gen2

コンテナー Data Lake Storage Gen2 (たとえば、フォルダー名は DEH)。

  1. DataQualityAuditExtract
  2. ガバナンス ドメイン (BusinessDomain)
  3. DataProduct
  4. DataAsset
  5. RunDate
  6. RunId
  7. Purview DQRunId
  8. エラー レコード ファイル

Data Lake Storage Gen2からデータ品質エラー レコードを読み取る

Synapse または Databricks ノートブックを使用して、次の PySpark スクリプトを使用してData Lake Storage Gen2 コンテナーからデータを読み取ることができます。


#For complete error records of all jobs run on the asset
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path)
display(df)

#For All Runs on a particular Date
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where("_purviewDQRunDate = '2025-08-06'")
display(df)

#For a Specific run
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where("_purviewDQRunId = '647b8a8f-7140-4158-9914-9c2409a5ec67'")
display(df)

#For a specific rule of the run
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where("_purviewDQRunId = '647b8a8f-7140-4158-9914-9c2409a5ec67' AND _purviewDQRuleId = '32d57f8d-87dc-4862-90c9-c3bcad1051ad'")
display(df) 

#For a specific rule across all runs
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where(" _purviewDQRuleId = '32d57f8d-87dc-4862-90c9-c3bcad1051ad'")
display(df) 

#For comparing two runs
path = "abfss://TestWSPDGBB@onelake.dfs.fabric.microsoft.com/TestSelfServeLakehouse.Lakehouse/Files/SelfServeFolder/DataQualityAuditExtracts/BusinessDomain_577b8e54-e534-478d-bbc6-19a3221fc71e/DataProduct_a04c82a2-2372-4b9e-9e0a-4dfd8959ee85/DataAsset_4b2b2644-c94a-447f-9b2e-32961ac0170b"
df = spark.read.format("delta").load(path).where("_purviewDQRunId = '647b8a8f-7140-4158-9914-9c2409a5ec67' OR _purviewDQRunId = 'a508f960-254c-4eb1-9097-03307402475b'")
display(df) 


データセットの例:

エラー レコードのサンプル

Fabric Lakehouse からエラー レコードを読み取る

  1. Lakehouse に表示されているパスに移動し、Fabric Lakehouse に発行されたすべての失敗レコードを参照します。
  2. Delta Parquet から Delta Table へのショートカットを作成します。 または、Delta Parquet ファイルを使用して、次の例に示すように、データ品質エラー レコードのダッシュボードを作成できます。

Lakehouse で発行されたエラー レコード

ノートブックを使用して、失敗したデータ資産レコードのショートカット テーブルを作成できます。 ノートブック スクリプトを参照してください。

出力データ モデル

失敗したレコードは、管理されたデータ資産のレベルで表示できます。 データ資産で成功する新しい評価の実行ごとに、各ルール評価の失敗したレコードの新しいセットが失敗したレコード テーブルに追加されます。

形式: Delta

出力スキーマ: 出力の各行には、ルールの評価に失敗した入力データ資産行のすべての列と、失敗した行を識別および分析するために使用できるメタデータ列のセットが含まれています。

主キー: 入力データ資産列の複合キー + _purviewDQRunId + _purviewDQRuleId

メタデータ列 データ型 説明
_purviewDQRunId string 実行時にユーザーが使用できる評価スキャンの実行 ID を表す GUID。 また、Delta パーティション列。
_purviewDQRunDate string YYYY-MM-DD 形式の実行日。 また、Delta パーティション列。
_purviewDQRuleId string 失敗したルールに対応する品質規則 ID。 また、Delta パーティション列。
_purviewDQRuleName string ジョブの実行時のルール名。
_purviewDQRuleType string 規則の種類 (UniqueCheck、NotNull、Regex など)。
_purviewDQGovernanceDomainId string 実行したデータ資産のガバナンス ドメイン ID。
_purviewDQDataProductId string 実行したデータ資産のデータ製品。
_purviewDQDataAssetId string データ資産 ID。
_purviewDQRunSubmitTimestamp string ISO 形式ごとの既定の UTC タイム ゾーンでの実行送信時刻の正確なタイムスタンプ。

データ品質エラー レコードの Power BI ダッシュボードを作成する

ビジネス ドメイン ID を使用して、データ製品とそれに関連するデータ資産をリンクして、データ品質エラー レコードを報告します。

  • 1 つのビジネス ドメインが複数のデータ製品にリンクできます。
  • 各データ製品は、複数のデータ資産にリンクできます。
  • 各データ資産には複数のルールを設定できます。
  • 各ルールは、複数のエラー レコードを生成できます。

この画像は、データ品質エラー レコードの基本的なレポートを作成するデータ モデルを示しています。

データ モデルを報告するエラー レコード

次の図は、前の図に示したデータ モデルを使用して作成されたサンプル レポートを示しています。

エラー レコードのサンプル レポート

制限事項

  • 失敗したレコードは、実行ごとにルールごとに 100,000 件発行されます。
  • 最大 40 個のルールを含む最大 1 億行のデータセットがベンチマークされています。
  • 仮想ネットワーク内の Data Lake Storage Gen2 と Fabric ストレージはサポートされていません。
  • データが組織のマネージド ストレージに格納されると、organizationはデータのロールベースのアクセス制御を所有します。 Microsoft は、organizationの同意に基づいて、データ品質エラー行をorganizationのストレージに公開します。

ショートカットを作成するためのスクリプト

データ品質監査からテーブル ショートカットの作成を自動化するには、Fabric ノートブックでこのスクリプトを使用して、失敗した行データを抽出します。 ショートカット テーブルを作成すると、新しいジョブのエラー レコードが資産ショートカット テーブルで実行され、Fabric で自動的に更新されます。


# NEW Script to automate the creation of table shortcuts from Data Quality audit extracts failed row data.

# Update these three values for your Fabric Workspace ID, Lakehouse ID and Purview BYOC Self-serve store path
workspaceId = "f28dc1c8-360c-4788-9f46-e69853b1c40d" #Example: f28dc1c8-360c-4788-9f46-e69853b1c40d
lakehouseId = "77d6df6b-64ab-4628-985f-9365591a85a8" #Example: 77d6df6b-64ab-4628-985f-9365591a85a8
dataqualityauditPath = "Files/SelfServeFolder/DataQualityAuditExtracts" #Example: "Files/DEH2/DataQualityAuditExtracts"

#Use for lakehouses with Lakehouse-Schema (Public preview feature)
DestinationShortcutUri = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/Tables/DataQualityAuditExtracts" 

#Use for lakehouses without Lakehouse-Schema
#DestinationShortcutUri = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/Tables" 

import sempy.fabric as fabric
from sempy.fabric.exceptions import FabricHTTPException
import fnmatch
import re
import os
from collections import deque

SourceUri = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{lakehouseId}/{dataqualityauditPath}"

request_headers = {
            "Authorization": "Bearer " + mssparkutils.credentials.getToken("pbi"),
            "Content-Type": "application/json"
        }
print(request_headers)


def is_delta_table(uri: str):
    #print("Checking for uri:" + uri)
    delta_log_path = os.path.join(uri, "_delta_log")
    return mssparkutils.fs.exists(delta_log_path)

def extract_onelake_https_uri_components(uri):
    pattern = re.compile(r"abfss://([^@]+)@[^/]+/([^/]+)/(.*)")
    match = pattern.search(uri)
    if match:
        workspace_id, item_id, path = match.groups()
        return workspace_id, item_id, path
    else:
        return None, None, None

def is_valid_onelake_uri(uri: str) -> bool:
    workspace_id, item_id, path = extract_onelake_https_uri_components(uri)
    if "abfss://" not in uri or workspace_id is None or item_id is None or path is None:
        return False
    return True

def get_onelake_shorcut(workspace_id: str, item_id: str, path: str, name: str):
    shortcut_uri = (
        f"v1/workspaces/{workspace_id}/items/{item_id}/shortcuts/{path}/{name}"
    )
    result = client.get(shortcut_uri).json()
    return result

def get_last_path_segment(uri: str):
    path = uri.split("/")  # Split the entire URI by '/'
    return path[-1] if path else None

def create_onelake_shorcut(SourceUri: str, dest_uri: str, result: list):
    src_workspace_id, src_item_id, src_path = extract_onelake_https_uri_components(
        SourceUri
    )

    dest_workspace_id, dest_item_id, dest_path = extract_onelake_https_uri_components(
        dest_uri
    )

    name = get_last_path_segment(SourceUri)
    dest_uri_joined = os.path.join(dest_uri, name)

    # If the destination path already exists, return without creating shortcut
    if mssparkutils.fs.exists(dest_uri_joined):
        print(f"Table already exists: {dest_uri_joined}")
        result.append(dest_uri_joined)
        return None

    request_body = {
        "name": name,
        "path": dest_path,
        "target": {
            "oneLake": {
                "itemId": src_item_id,
                "path": src_path,
                "workspaceId": src_workspace_id,
            }
        },
    }
    #print(request_body)

    shortcut_uri = f"v1/workspaces/{dest_workspace_id}/items/{dest_item_id}/shortcuts"
    print(f"Creating shortcut: {shortcut_uri}/{name}..")
    try:
        client.post(shortcut_uri, json=request_body, headers=request_headers)
    except FabricHTTPException as e:
        print(e)
        print(e.error_reason)
        return None

    return get_onelake_shorcut(dest_workspace_id, dest_item_id, dest_path, name)


client = fabric.FabricRestClient()

queue = deque([SourceUri])
result = []

while queue:
    current_uri = queue.popleft()

    #print(current_uri)
    
    if  is_delta_table(os.path.join("", current_uri)):
        #print(current_uri)
        shortcut = create_onelake_shorcut(os.path.join("", current_uri), DestinationShortcutUri, result)
        if shortcut is not None:
            result.append(shortcut)
        continue;
    
    # List subitems in the current folder
    subitems = mssparkutils.fs.ls(current_uri)   
    for item in subitems:
        if item.isDir:
            queue.append(item.path)   


print(f"{len(result)}" + " error record shortcut tables present in the lakehouse: ")
for item in result:
    print(item)