次の方法で共有


JSON メッセージ形式 - イベント ストリーミングの変更

適用対象: SQL Server 2025 (17.x) Azure SQL Database

この記事では、SQL Server 2025(17.x)およびAzure SQL Databaseで導入された Change Event Streaming(CES) 機能を使用した場合に、SQL ServerからAzure Event HubsへストリーミングされるCloudEventsメッセージのJSON形式について説明します。

変更イベント ストリーミングは、次の目的で現在 プレビュー段階 にあります。

プレビュー期間中、この機能は変更される可能性があります。 現在のサポート可能性については、「 制限事項」を参照してください。

概要

変更イベント ストリーミングによって生成されるイベントは CloudEvents 仕様に従い、イベント ドリブン システムと簡単に統合できます。 すべての CES CloudEvent には、11 個の属性 (フィールド) が含まれています。 CES は、CloudEvents を JSON (ネイティブ) または Avro バイナリとしてシリアル化するように構成できます。 この記事の以降のセクションでは、CES CloudEvent 属性やシリアル化など、メッセージ形式について詳しく説明します。

該当する場合、このセクションの説明は、追加の詳細を含む CloudEvent 仕様から取得されます。

属性

  • specversion:

    • データ型: 文字列
    • 必要な CloudEvent 属性
    • イベントが使用する CloudEvents 仕様のバージョン。 これにより、コンテキストの解釈が可能になります。
  • type

    • データ型: 文字列
    • 必要な CloudEvent 属性
    • 発生元の発生に関連するイベントの種類を表す値を格納します。 この形式はプロデューサーによって定義され、型のバージョンなどの情報が含まれる場合があります。 詳細については、「 CloudEvents のバージョン管理」を参照してください。
    • Change Event Streamingイベントのタイプは現在の: com.microsoft.SQL.CES.DML.V{n}で、 {n} はMicrosoft Change Event Streaming DMLイベントスキーマのバージョンを示します。
      • 現在の最新のスキーマバージョンは1です。
  • source

    • データ型: 文字列
    • 必要な CloudEvent 属性
    • イベントが発生したコンテキストを識別します。 ソース + ID は、イベントごとに一意である必要があります。 現在、このフィールドはSQLからストリーミングされるイベントで常に \/ として送信されています。
  • id

    • データ型: 文字列
    • 必要な CloudEvent 属性
    • イベントを識別します。 プロデューサーは、個別のイベントごとにソース + ID が一意であることを確認する必要があります。 重複するイベントが再送信された場合 (たとえば、ネットワーク エラーが原因)、同じ ID を持つことができます。 コンシューマーは、ソースと ID が同一のイベントが重複していると見なす場合があります。
  • logicalid

    • データ型: 文字列
    • 拡張属性
    • 分割メッセージ (Event Hubs メッセージ のサイズ制限のため) は、共有論理 ID によって識別されます。
  • time

    • データ型: タイムスタンプ
    • オプションの CloudEvent 属性
    • コミットが最初にストリームイベントをトリガーしたSQLトランザクション内で発生したUTCタイムスタンプ。
  • datacontenttype

    • データ型: 文字列
    • オプションの CloudEvent 属性
    • データ値のコンテンツ タイプ。 この属性を使用すると、データは任意の種類のコンテンツを保持できます。形式とエンコードは、選択したイベント形式とは異なる場合があります。 たとえば、JSON エンベロープ形式を使用してレンダリングされたイベントでは、データに XML ペイロードが含まれる可能性があり、コンシューマーには、この属性が "application/xml" に設定されたことが通知されます。 異なる datacontenttype 値に対するデータコンテンツのレンダリング方法のルールはイベントフォーマット仕様で定義されています
  • operation

    • データ型: 文字列
    • 拡張
    • 発生したSQL操作の種類を表しています:
      • インサート用INS
      • アップデート
      • 削除のためのDELです
  • segmentindex

    • データ型: 整数
    • 拡張属性
    • セグメント インデックス。論理メッセージ チャンク内のメッセージの位置を示します。 セグメント インデックスは、メッセージが論理メッセージ フラグメントのシーケンス内のどこに存在するかについての情報を提供します。 この場は常に存在します。 logalid + segmentindex + finalsegment fieldsを使って、複数のイベントに分割された大規模なSQLペイロードを表す入ってくるイベントをソートします。
  • finalsegment

    • データ型: Boolean
    • 拡張属性
    • このセグメントがシーケンスの最後のセグメントであるかどうかを示します。 このフィールドは常に存在し、設定済みの最大メッセージサイズを超えるSQLイベントがサブイベントに分割されたかどうかを特定するのに役立ちます。
  • data

    • データ型: 文字列
    • オプションの CloudEvent 属性
    • ドメイン固有のイベント データ。 CES の場合、データは JSON として解析できる文字列です。 この JSON では、データがどのように変更されたかについて説明します。 データ属性の形式はデータ 属性の形式です

例示

JSON メッセージの例 - 挿入

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "d43f09a6-d13b-4902-86d4-17bdb5edb872",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C00000300017C:00000000000000000001",
  "time": "2025-03-14T16:45:20.650Z",
  "datacontenttype": "application\/json",
  "operation": "INS",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{}\",\n    \"current\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"101\\\", \\\"product_name\\\": \\\"Game 2077\\\", \\\"price_per_item\\\": \\\"60\\\", \\\"quantity\\\": \\\"1\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\"\n  }\n}"
}

JSON メッセージの例 – 更新

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "c425575f-00bb-45cf-acec-c55fdc7d08cd",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C000003500004:00000000000000000001",
  "time": "2025-03-14T16:49:59.567Z",
  "datacontenttype": "application\/json",
  "operation": "UPD",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{}\",\n    \"current\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"100\\\", \\\"product_name\\\": \\\"Game 2066\\\", \\\"price_per_item\\\": \\\"50\\\", \\\"quantity\\\": \\\"2\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\"\n  }\n}"
}

JSON メッセージの例 - 削除

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "24fa0c2c-c45d-4abf-9a8d-fba04c29fc86",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C000003600019:00000000000000000001",
  "time": "2025-03-14T16:51:39.613Z",
  "datacontenttype": "application\/json",
  "operation": "DEL",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"100\\\", \\\"product_name\\\": \\\"Game 2066\\\", \\\"price_per_item\\\": \\\"50\\\", \\\"quantity\\\": \\\"2\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\",\n    \"current\": \"{}\"\n  }\n}"
}

データ属性の形式

データは、2 つの属性を含む文字列属性でラップされた JSON オブジェクトです。

  • eventSource
  • eventRow
"data": "{ "eventsource": {<eventSource>}, "eventdata": {<eventData>}}"

これら 2 つの属性の詳細については、次のセクションで詳しく説明します。

eventsource

イベントが発生したデータベースとテーブルに関するメタデータについて説明します。

  • db

    • データ型: 文字列
    • 説明: テーブルが配置されているデータベースの名前。
    • 例: cessqldb001
  • schema

    • データ型: 文字列
    • 説明: テーブルを含むデータベース スキーマ。
    • 例: dbo
  • tbl

    • データ型: 文字列
    • 説明: イベントが発生したテーブル。
    • 例: Purchases
  • cols

    • データ型: 配列
    • 説明: テーブル内の列の詳細を示す配列。
      • name (string): 列の名前。
      • type (string): 列のデータ型 (VARCHAR または INT)。
      • index (整数): テーブル内の列のインデックスまたは位置。
  • pkkey

    • データ型: 配列
    • 説明: 特定の行を識別するための主キー列とその値を表します。
      • columnname (string): 主キーで使用される列の名前。
      • value (string/int/etc): 主キーで使用される列の値は、行を一意に識別するのに役立ちます。

eventrow

行レベルの変更について説明し、レコード内のフィールドの古い値と現在の値を比較します。

  • old (文字列でラップされたオブジェクト): イベントの前の行の値を表します。
    • 各キーと値のペアは、次の要素で構成されます。
      • <column_name>: (文字列): 列の名前。
      • <column_value>: (string/int/etc):その列の前の値。
  • current (文字列でラップされたオブジェクト): イベントの後の行の更新された値を表します。
    • 古いオブジェクトと同様に、各キーと値のペアは次のように構造化されています。
      • <column_name> (文字列): 列の名前。
      • <column_value> (string/int/etc): その列の新しい値または現在の値。

CES CloudEvent JSON スキーマ

{
  "type": "record",
  "name": "ChangeEvent",
  "fields": [
    {
      "name": "specversion",
      "type": "string"
    },
    {
      "name": "type",
      "type": "string"
    },
    {
      "name": "source",
      "type": "string"
    },
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "logicalid",
      "type": "string"
    },
    {
      "name": "time",
      "type": "string"
    },
    {
      "name": "datacontenttype",
      "type": "string"
    },
    {
      "name": "operation",
      "type": "string"
    },
    {
      "name": "segmentindex",
      "type": "int"
    },
    {
      "name": "finalsegment",
      "type": "boolean"
    },
    {
      "name": "data",
      "type": "bytes"
    }
  ]
}

CES データ属性の JSON スキーマ

{
  "name": "Data",
  "type": "record",
  "fields": [
    {
      "name": "eventsource",
      "type": {
        "name": "EventSource",
        "type": "record",
        "fields": [
          {
            "name": "db",
            "type": "string"
          },
          {
            "name": "schema",
            "type": "string"
          },
          {
            "name": "tbl",
            "type": "string"
          },
          {
            "name": "cols",
            "type": {
              "type": "array",
              "items": {
                "name": "Column",
                "type": "record",
                "fields": [
                  {
                    "name": "name",
                    "type": "string"
                  },
                  {
                    "name": "type",
                    "type": "string"
                  },
                  {
                    "name": "index",
                    "type": "int"
                  }
                ]
              }
            }
          },
          {
            "name": "pkkey",
            "type": {
              "type": "array",
              "items": {
                "name": "PkKey",
                "type": "record",
                "fields": [
                  {
                    "name": "columnname",
                    "type": "string"
                  },
                  {
                    "name": "value",
                    "type": "string"
                  }
                ]
              }
            }
          },
          {
            "name": "transaction",
            "type": {
              "name": "Transaction",
              "type": "record",
              "fields": [
                {
                  "name": "commitlsn",
                  "type": "string"
                },
                {
                  "name": "beginlsn",
                  "type": "string"
                },
                {
                  "name": "sequencenumber",
                  "type": "int"
                },
                {
                  "name": "committime",
                  "type": "string"
                }
              ]
            }
          }
        ]
      }
    },
    {
      "name": "eventrow",
      "type": {
        "name": "EventRow",
        "type": "record",
        "fields": [
          {
            "name": "old",
            "type": "string"
          },
          {
            "name": "current",
            "type": "string"
          }
        ]
      }
    }
  ]
}