JSON 消息格式 - 更改事件流式处理

适用于: SQL Server 2025 (17.x) Azure SQL 数据库

本文描述了在使用 SQL Server 2025(17.x)和 Azure SQL 数据库引入 的变更事件流(CES) 功能时,从 SQL Server 流式传输到 Azure Event Hubs 的 CloudEvents 消息的 JSON 格式。

注释

更改事件流目前以 预览 版提供:

在预览期间,此功能可能会更改。 有关当前可支持性,请参阅 “限制”。

概述

更改事件流发出的事件遵循 CloudEvents 规范,使其易于与事件驱动系统集成。 所有 CES CloudEvents 都包含 11 个属性(字段)。 CES 可配置为将 CloudEvents 序列化为 JSON(本机),或将 Avro 二进制文件序列化。 本文的以下部分详细介绍了消息格式,包括 CES CloudEvent 属性和序列化。

如果适用,本节中的说明取自 CloudEvent 规范,其中包括更多详细信息。

特性

  • specversion:

    • 数据类型:字符串
    • 必需的 CloudEvent 属性
    • 事件使用的 CloudEvents 规范的版本。 这样就可以解释上下文。
  • type

    • 数据类型:字符串
    • 必需的 CloudEvent 属性
    • 包含一个值,该值描述与发起事件相关的事件类型。 此格式由生成者定义,可能包含类型版本等信息。 有关详细信息,请查看 CloudEvents 的版本控制
    • 对于变更事件流事件,目前类型为: com.microsoft.SQL.CES.DML.V{n},其中 {n} 表示Microsoft变更事件流式DML事件模式的版本。
      • 当前最新的模式版本是1.
  • source

    • 数据类型:字符串
    • 必需的 CloudEvent 属性
    • 标识发生事件的上下文。 源 + ID 对于每个事件必须是唯一的。 目前,该字段总是像事件一样从SQL流式发送 \/
  • id

    • 数据类型:字符串
    • 必需的 CloudEvent 属性
    • 标识事件。 生成者必须确保源 + ID 对于每个非重复事件是唯一的。 如果重复事件重新发送(例如,由于网络错误),它可能具有相同的 ID。 使用者可能假定具有相同源和 ID 的事件是重复的。
  • logicalid

    • 数据类型:字符串
    • 扩展属性
    • 拆分消息(由于事件中心 msg 大小限制)由共享逻辑 ID 标识。
  • time

    • 数据类型:时间戳
    • 可选 CloudEvent 属性
    • 提交发生在最初触发流事件的SQL事务中的时间戳。
  • datacontenttype

    • 数据类型:字符串
    • 可选 CloudEvent 属性
    • 数据类型值。 此属性使数据能够携带任何类型的内容,其中格式和编码可能与所选事件格式的不同。 例如,使用 JSON 信封格式呈现的事件可能会在数据中携带 XML 有效负载,使用者会通过此属性设置为“application/xml”来通知使用者。 不同值的数据内容渲染 datacontenttype 规则在事件格式规范中定义
  • operation

    • 数据类型:字符串
    • 扩展
    • 表示发生的SQL作类型:
      • 插页的INS
      • 更新
      • 删除时的 DEL
  • segmentindex

    • 数据类型:整数
    • 扩展属性
    • 段索引,表示消息在逻辑消息区块中的位置。 段索引提供有关消息在逻辑消息片段序列中的位置的信息。 这个场始终存在。 使用 logicalid + segmentindex + finalsegment 字段来排序表示大型 SQL 负载的入事件,这些负载被拆分为多个事件。
  • finalsegment

    • 数据类型:布尔值
    • 扩展属性
    • 指示此段是否为序列的最终段。 该字段始终存在,有助于识别某个SQL事件是否因配置最大消息大小过大而被拆分为子事件。
  • data

    • 数据类型:字符串
    • 可选 CloudEvent 属性
    • 特定于域的事件数据。 对于 CES,数据是可以分析为 JSON 的字符串。 此 JSON 描述数据是如何更改的。 数据属性的格式采用 数据属性格式

例子

JSON 消息示例 - 插入

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "d43f09a6-d13b-4902-86d4-17bdb5edb872",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C00000300017C:00000000000000000001",
  "time": "2025-03-14T16:45:20.650Z",
  "datacontenttype": "application\/json",
  "operation": "INS",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{}\",\n    \"current\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"101\\\", \\\"product_name\\\": \\\"Game 2077\\\", \\\"price_per_item\\\": \\\"60\\\", \\\"quantity\\\": \\\"1\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\"\n  }\n}"
}

JSON 消息示例 - 已更新

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "c425575f-00bb-45cf-acec-c55fdc7d08cd",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C000003500004:00000000000000000001",
  "time": "2025-03-14T16:49:59.567Z",
  "datacontenttype": "application\/json",
  "operation": "UPD",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{}\",\n    \"current\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"100\\\", \\\"product_name\\\": \\\"Game 2066\\\", \\\"price_per_item\\\": \\\"50\\\", \\\"quantity\\\": \\\"2\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\"\n  }\n}"
}

JSON 消息示例 - 删除

{
  "specversion": "1.0",
  "type": "com.microsoft.SQL.CES.DML.V1",
  "source": "\/",
  "id": "24fa0c2c-c45d-4abf-9a8d-fba04c29fc86",
  "logicalid": "9c8d4ad2-bf54-4f10-a96f-038af496997f:0000002C000003600019:00000000000000000001",
  "time": "2025-03-14T16:51:39.613Z",
  "datacontenttype": "application\/json",
  "operation": "DEL",
  "splitindex": 0,
  "splittotalcnt": 0,
  "data": "{\n  \"eventsource\": {\n    \"db\": \"db1\",\n    \"schema\": \"dbo\",\n    \"tbl\": \"Purchases\",\n    \"cols\": [\n      {\n        \"name\": \"purchase_id\",\n        \"type\": \"int\",\n        \"index\": 0\n      },\n      {\n        \"name\": \"customer_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 1\n      },\n      {\n        \"name\": \"product_id\",\n        \"type\": \"int\",\n        \"index\": 2\n      },\n      {\n        \"name\": \"product_name\",\n        \"type\": \"varchar(100)\",\n        \"index\": 3\n      },\n      {\n        \"name\": \"price_per_item\",\n        \"type\": \"int\",\n        \"index\": 4\n      },\n      {\n        \"name\": \"quantity\",\n        \"type\": \"int\",\n        \"index\": 5\n      },\n      {\n        \"name\": \"purchase_date\",\n        \"type\": \"datetime\",\n        \"index\": 6\n      },\n      {\n        \"name\": \"payment_method\",\n        \"type\": \"varchar(50)\",\n        \"index\": 7\n      }\n    ],\n    \"pkkey\": [\n      {\n        \"columnname\": \"purchase_id\",\n        \"value\": \"105\"\n      }\n    ]\n  },\n  \"eventrow\": {\n    \"old\": \"{\\\"purchase_id\\\": \\\"105\\\", \\\"customer_name\\\": \\\"Anna Doe\\\", \\\"product_id\\\": \\\"100\\\", \\\"product_name\\\": \\\"Game 2066\\\", \\\"price_per_item\\\": \\\"50\\\", \\\"quantity\\\": \\\"2\\\", \\\"purchase_date\\\": \\\"2025-03-14 16:45:01.000\\\", \\\"payment_method\\\": \\\"Credit Card\\\"}\",\n    \"current\": \"{}\"\n  }\n}"
}

数据属性格式

数据是包装在字符串属性中的 JSON 对象,其中包含两个属性:

  • eventSource
  • eventRow
"data": "{ "eventsource": {<eventSource>}, "eventdata": {<eventData>}}"

以下各节将更详细地介绍这两个属性的详细信息:

eventsource

描述有关数据库和发生事件的表的元数据:

  • db

    • 数据类型:字符串
    • 说明:表所在的数据库的名称。
    • 示例:cessqldb001
  • schema

    • 数据类型:字符串
    • 说明:包含表的数据库架构。
    • 示例:dbo
  • tbl

    • 数据类型:字符串
    • 说明:发生事件的表。
    • 示例:Purchases
  • cols

    • 数据类型:数组
    • 说明:详细说明表中的列的数组。
      • name (字符串):列的名称。
      • type (string):列的数据类型(VARCHARINT)。
      • index (整数):表中列的索引或位置。
  • pkkey

    • 数据类型:数组
    • 说明:表示用于标识特定行的主键列及其值。
      • columnname (string):主键中使用的列的名称。
      • value (string/int/etc.):主键中使用的列的值有助于唯一标识行。

eventrow

描述行级更改,并比较记录中字段的旧值和当前值。

  • old (对象包装在字符串中):表示事件前行中的值。
    • 每个键值对包括:
      • <column_name>:(字符串):列的名称。
      • <column_value>:(string/int/etc.):该列的上一个值。
  • current (对象包装在字符串中):表示事件后行中的更新值。
    • 与旧对象类似,每个键值对的结构如下:
      • <column_name> (字符串):列的名称。
      • <column_value> (string/int/etc.):该列的新值或当前值。

CES CloudEvent JSON 架构

{
  "type": "record",
  "name": "ChangeEvent",
  "fields": [
    {
      "name": "specversion",
      "type": "string"
    },
    {
      "name": "type",
      "type": "string"
    },
    {
      "name": "source",
      "type": "string"
    },
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "logicalid",
      "type": "string"
    },
    {
      "name": "time",
      "type": "string"
    },
    {
      "name": "datacontenttype",
      "type": "string"
    },
    {
      "name": "operation",
      "type": "string"
    },
    {
      "name": "segmentindex",
      "type": "int"
    },
    {
      "name": "finalsegment",
      "type": "boolean"
    },
    {
      "name": "data",
      "type": "bytes"
    }
  ]
}

CES 数据属性 JSON 架构

{
  "name": "Data",
  "type": "record",
  "fields": [
    {
      "name": "eventsource",
      "type": {
        "name": "EventSource",
        "type": "record",
        "fields": [
          {
            "name": "db",
            "type": "string"
          },
          {
            "name": "schema",
            "type": "string"
          },
          {
            "name": "tbl",
            "type": "string"
          },
          {
            "name": "cols",
            "type": {
              "type": "array",
              "items": {
                "name": "Column",
                "type": "record",
                "fields": [
                  {
                    "name": "name",
                    "type": "string"
                  },
                  {
                    "name": "type",
                    "type": "string"
                  },
                  {
                    "name": "index",
                    "type": "int"
                  }
                ]
              }
            }
          },
          {
            "name": "pkkey",
            "type": {
              "type": "array",
              "items": {
                "name": "PkKey",
                "type": "record",
                "fields": [
                  {
                    "name": "columnname",
                    "type": "string"
                  },
                  {
                    "name": "value",
                    "type": "string"
                  }
                ]
              }
            }
          },
          {
            "name": "transaction",
            "type": {
              "name": "Transaction",
              "type": "record",
              "fields": [
                {
                  "name": "commitlsn",
                  "type": "string"
                },
                {
                  "name": "beginlsn",
                  "type": "string"
                },
                {
                  "name": "sequencenumber",
                  "type": "int"
                },
                {
                  "name": "committime",
                  "type": "string"
                }
              ]
            }
          }
        ]
      }
    },
    {
      "name": "eventrow",
      "type": {
        "name": "EventRow",
        "type": "record",
        "fields": [
          {
            "name": "old",
            "type": "string"
          },
          {
            "name": "current",
            "type": "string"
          }
        ]
      }
    }
  ]
}