配置变更事件流式处理(预览版)

适用于: SQL Server 2025 (17.x) Azure SQL 数据库

本文介绍如何配置 SQL Server 2025(17.x)和 Azure SQL 数据库中引入的 更改事件流式处理(CES )功能。

注释

变更事件流目前处于预览版阶段:

在预览期间,此功能可能会更改。 有关当前可支持性,请参阅 “限制”。

概述

若要配置和使用更改事件流式处理,请执行以下步骤:

  1. 使用现有或创建新的 Azure 事件中心 命名空间和事件中心实例。 事件中心实例接收事件。
  2. 为用户数据库启用更改事件流式处理。
  3. 创建事件流组。 使用此组,配置目标、凭据、消息大小限制和分区架构。
  4. 将一个或多个表添加到事件流组。

本文的以下部分详细介绍了每个步骤。

先决条件

若要配置更改事件流式处理,需要以下资源、权限和配置:

配置 Azure 事件中心

若要了解如何创建 Azure 事件中心,请查看 使用 Azure 门户创建事件中心

Azure 事件中心访问控制

将 SQL 资源的访问控制配置为 Azure 事件中心。 Microsoft Entra 身份验证是最安全的方法,但目前仅受 Azure SQL Database for CES 支持。 虽然 Azure SQL 数据库和 SQL Server 2025 都支持使用共享访问策略,但如果Microsoft Entra 身份验证不是选项,则仅在 Azure SQL 数据库中使用它。

共享访问策略 向 Azure 事件中心提供身份验证和授权。 每个共享访问策略都需要名称、访问级别(ManageSendListen和资源绑定(事件中心命名空间或特定事件中心实例)。 实例级别策略通过遵循最低特权原则来提供更高的安全性。 SQL Server 2025 和 Azure SQL 数据库都支持此方法。 但是,尽可能使用 Azure SQL 数据库使用 Microsoft Entra 身份验证,因为它提供更好的安全性。

如果使用共享访问策略进行身份验证和授权,则向 Azure 事件中心发送数据的客户端需要提供要使用的策略的名称,以及从策略或策略的服务密钥生成的 SAS 令牌

SAS 令牌在服务密钥上具有安全优势:如果客户端遭到入侵,则 SAS 令牌仅在过期前有效,并且被入侵的客户端无法创建新的 SAS 令牌。 相比之下,服务密钥不会自动过期。 具有服务密钥的已泄露客户端可以使用密钥生成新的 SAS 令牌。

若要使用 AMQP 协议(默认的本机 Azure 事件中心协议)配置流式传输到 Azure 事件中心,请使用 发送 权限创建或重复使用共享访问策略并生成 SAS 令牌。 可以使用任何编程或脚本语言以编程方式生成令牌。 本文中的示例演示如何使用 PowerShell 脚本从新的或现有策略生成 SAS 令牌。

注释

为提高安全性,强烈建议尽可能使用基于 Microsoft Entra 的访问控制。 如果无法使用 Microsoft Entra 访问控制,并且您使用共享访问策略,请尽可能使用 SAS 令牌身份验证,而不是服务密钥认证。 SAS 令牌的最佳做法包括定义适当的最低要求访问范围、设置较短的到期日期以及定期轮换 SAS 密钥。 对于基于密钥的身份验证,请定期轮换密钥。 使用 Azure Key Vault 或类似服务安全地存储所有机密。

安装所需的模块

若要使用 PowerShell 脚本管理 Azure 事件中心资源,需要以下模块:

  • Az PowerShell 模块
  • Az.EventHub PowerShell 模块

以下脚本安装所需的模块:

Install-Module -Name Az -AllowClobber -Scope CurrentUser -Repository PSGallery -Force
Install-Module -Name Az.EventHub -Scope CurrentUser -Force

如果已有所需的模块,并且想要将其更新到最新版本,请运行以下脚本:

Update-Module -Name Az -Force
Update-Module -Name Az.EventHub -Force

连接到 Azure

可以使用 Azure Cloud Shell 或登录并设置订阅上下文。

若要使用 Azure Cloud Shell 运行,请查看 “登录到 Azure”。

定义策略

若要创建 SAS 令牌,需要具有 发送 权限的策略。 您可以选择:

  • 创建新策略

  • 使用现有策略

注释

策略必须具有 “发送” 权限。

为新策略或现有策略创建 SAS 令牌

创建新策略时,请确保它具有 “发送 ”权限。 如果使用现有策略,请检查它是否具有 “发送 ”权限。

以下脚本创建新的策略或获取现有策略,然后以 HTTP 授权标头格式生成完整的 SAS 令牌。

将尖括号 (<value>) 中的值替换为环境的值。

function Generate-SasToken {
$subscriptionId = "<Azure-Subscription-ID>"
$resourceGroupName = "<Resource-group-name>"
$namespaceName = "<Azure-Event-Hub-Namespace-name>"
$eventHubName = "<Azure-Event-Hubs-instance-name>"
$policyName = "<Policy-name>"
$tokenExpireInDays = "<number-of-days-token-will-be-valid>"

# Modifying the rest of the script is not necessary.

# Login to Azure and set Azure Subscription.
Connect-AzAccount

# Get current context and check subscription
$currentContext = Get-AzContext
if ($currentContext.Subscription.Id -ne $subscriptionId) {
    Write-Host "Current subscription is $($currentContext.Subscription.Id), switching to $subscriptionId..."
    Set-AzContext -SubscriptionId $subscriptionId | Out-Null
} else {
    Write-Host "Already using subscription $subscriptionId."
}

# Try to get the authorization policy (it should have Send rights)
$rights = @("Send")
$policy = Get-AzEventHubAuthorizationRule -ResourceGroupName $resourceGroupName -NamespaceName $namespaceName -EventHubName $eventHubName -AuthorizationRuleName $policyName -ErrorAction SilentlyContinue

# If the policy does not exist, create it
if (-not $policy) {
    Write-Output "Policy '$policyName' does not exist. Creating it now..."

    # Create a new policy with the Manage, Send and Listen rights
    $policy = New-AzEventHubAuthorizationRule -ResourceGroupName $resourceGroupName -NamespaceName $namespaceName -EventHubName $eventHubName -AuthorizationRuleName $policyName -Rights $rights
    if (-not $policy) {
        throw "Error. Policy was not created."
    }
    Write-Output "Policy '$policyName' created successfully."
} else {
    Write-Output "Policy '$policyName' already exists."
}

if ("Send" -in $policy.Rights) {
    Write-Host "Authorization rule has required right: Send."
} else {
    throw "Authorization rule is missing Send right."
}

$keys = Get-AzEventHubKey -ResourceGroupName $resourceGroupName -NamespaceName $namespaceName -EventHubName $eventHubName -AuthorizationRuleName $policyName

if (-not $keys) {
    throw "Could not obtain Azure Event Hub Key. Script failed and will end now."
}
if (-not $keys.PrimaryKey) {
    throw "Could not obtain Primary Key. Script failed and will end now."
}

# Get the Primary Key of the Shared Access Policy
$primaryKey = ($keys.PrimaryKey)
Write-Host $primaryKey

## Check that the primary key is not empty.

# Define a function to create a SAS token (similar to the C# code provided)
function Create-SasToken {
    param (
        [string]$resourceUri, [string]$keyName, [string]$key
    )

$sinceEpoch = [datetime]::UtcNow - [datetime]"1970-01-01"
    $expiry = [int]$sinceEpoch.TotalSeconds + ((60 * 60 * 24) * [int]$tokenExpireInDays) # seconds since Unix epoch
    $stringToSign = [System.Web.HttpUtility]::UrlEncode($resourceUri) + "`n" + $expiry
    $hmac = New-Object System.Security.Cryptography.HMACSHA256
    $hmac.Key = [Text.Encoding]::UTF8.GetBytes($key)
    $signature = [Convert]::ToBase64String($hmac.ComputeHash([Text.Encoding]::UTF8.GetBytes($stringToSign)))
    $sasToken = "SharedAccessSignature sr=$([System.Web.HttpUtility]::UrlEncode($resourceUri))&sig=$([System.Web.HttpUtility]::UrlEncode($signature))&se=$expiry&skn=$keyName"
    return $sasToken
}

# Construct the resource URI for the SAS token
$resourceUri = "https://$namespaceName.servicebus.windows.net/$eventHubName"

# Generate the SAS token using the primary key from the new policy
$sasToken = Create-SasToken -resourceUri $resourceUri -keyName $policyName -key $primaryKey

# Output the SAS token
Write-Output @"
-- Generated SAS Token --
$sasToken
-- End of generated SAS Token --
"@
}

Generate-SasToken

上一命令的输出应类似于以下文本:

-- Generated SAS Token --
SharedAccessSignature sr=https%3a%2f%YourEventHubNamespace.servicebus.windows.net%2fYourEventHub&sig=xxxxxxxxxxxxxxxxxxxxxxx&se=2059133074&skn=SharedKeyNameIsHERE
-- End of generated SAS Token --

复制整行以SharedAccessSignature开头的 SAS 令牌值,以在配置 CES 时使用,例如如下示例:

SharedAccessSignature sr=https%3a%2f%YourEventHubNamespace.servicebus.windows.net%2fYourEventHub&sig=xxxxxxxxxxxxxxxxxxxxxxx&se=2059133074&skn=SharedKeyNameIsHERE

启用和配置变更事件流式处理

若要启用和配置更改事件流式处理,请将数据库上下文更改为用户数据库,然后执行以下步骤:

  1. 如果尚未配置,请将数据库设置为 完整恢复模式
  2. 创建主密钥和数据库范围的凭据。
  3. 启用事件流式处理。
  4. 创建事件流组。
  5. 将一个或多个表添加到事件流组。

本节中的示例演示如何为 AMQP 协议和 Apache Kafka 协议启用 CES:

下面是本部分中示例的示例参数值:

  • @stream_group_name = N'myStreamGroup'

  • @destination_location = N'myEventHubsNamespace.servicebus.windows.net/myEventHubsInstance' - 此值是特定 Azure 事件中心和实例名称的 FQDN。

  • @partition_key_scheme = N'None' - (默认)分区使用轮询机制选择。 其他可能的选项包括:

    • StreamGroup - 按流组进行分区
    • Table - 按表分区
    • Column - 按列分区
  • [可选,如果使用服务密钥的共享访问策略]

    • 从共享访问策略获取的主密钥或辅助密钥值: Secret = 'BVFnT3baC/K6I8xNZzio4AeoFt6nHeK0i+ZErNGsxiw='
  • EXEC sys.sp_add_object_to_event_stream_group N'myStreamGroup', N'dbo.myTable'

  • @max_message_size_kb = 256 - 256 KB 是默认的最大消息大小,但此值应与目标 Azure 事件中心的限制保持一致。

示例:通过 AMQP 协议流式传输到 Azure 事件中心

本部分中的示例演示如何使用 AMQP 协议将更改事件流式传输到 Azure 事件中心。 AMQP 是默认的本机 Azure 事件中心协议。

本部分中的示例使用 SAS 令牌通过 AMQP 协议向 Azure 事件中心实例进行身份验证。 如果Microsoft Entra 身份验证不可用,请使用 SAS 令牌而不是服务密钥值来提高安全性。

将尖括号 (<value>) 中的值替换为环境的值。

USE <database name>;

-- Create the Master Key with a password.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<Password>';

CREATE DATABASE SCOPED CREDENTIAL <CredentialName>
    WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
    SECRET = '<Generated SAS Token>' -- Be sure to copy the entire token. The SAS token starts with "SharedAccessSignature sr="

EXEC sys.sp_enable_event_stream

EXEC sys.sp_create_event_stream_group
    @stream_group_name =      N'<EventStreamGroupName>',
    @destination_type =       N'AzureEventHubsAmqp',
    @destination_location =   N'<AzureEventHubsHostName>/<EventHubsInstance>',
    @destination_credential = <CredentialName>,
    @max_message_size_kb =    <MaxMessageSize>,
    @partition_key_scheme =   N'<PartitionKeyScheme>'

EXEC sys.sp_add_object_to_event_stream_group
    N'<EventStreamGroupName>',
    N'<SchemaName>.<TableName>'

示例:通过 Apache Kafka 协议流式传输到 Azure 事件中心

本部分中的示例演示如何使用 Apache Kafka 协议将更改事件流式传输到 Azure 事件中心。

本部分中的示例使用 SAS 令牌通过 Apache Kafka 协议向 Azure 事件中心实例进行身份验证。 如果Microsoft Entra 身份验证不可用,请使用 SAS 令牌而不是服务密钥值来提高安全性。

将尖括号 (<value>) 中的值替换为环境的值。

USE <database name>

-- Create the Master Key with a password.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<Password>'

CREATE DATABASE SCOPED CREDENTIAL <CredentialName>
    WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
    SECRET = '<Event Hubs Namespace – Primary or Secondary connection string>'

EXEC sys.sp_enable_event_stream

EXEC sys.sp_create_event_stream_group
    @stream_group_name =      N'<EventStreamGroupName>',
    @destination_type =       N'AzureEventHubsApacheKafka',
    @destination_location =   N'<AzureEventHubsHostName>:<port>/<EventHubsInstance>', -- myEventHubsNamespace.servicebus.windows.net:9093/myEventHubsInstance
    @destination_credential = <CredentialName>,
    @max_message_size_kb =    <MaxMessageSize>,
    @partition_key_scheme =   N'<PatitionKeyScheme>'

EXEC sys.sp_add_object_to_event_stream_group
    N'<EventStreamGroupName>',
    N'<SchemaName>.<TableName>'

查看 CES 配置和功能

sys.databases 中, is_event_stream_enabled = 1 指示为数据库启用了更改事件流式处理。

以下查询返回启用了更改事件流式处理的所有数据库:

SELECT *
FROM sys.databases
WHERE is_event_stream_enabled = 1;

sys.tables 中, is_replicated = 1 指示表已流式传输, sp_help_change_feed_table 提供有关更改事件流式处理表组和表元数据的信息。

以下查询返回启用了更改事件流式处理的所有表,并提供元数据信息:

SELECT name,
       is_replicated
FROM sys.tables;

EXECUTE sp_help_change_feed_table
    @source_schema = '<schema name>',
    @source_name = '<table name>';

CES 存储过程、系统函数和 DMV

下表列出了可用于配置、禁用和监视更改事件流的存储过程、系统函数和 DMV:

System 对象 DESCRIPTION
配置 CES
sys.sp_enable_event_stream 为当前用户数据库启用 CES。
sys.sp_create_event_stream_group 创建流组,该流组是一组表的流配置。 流组还定义了目标和相关详细信息(例如身份验证、消息大小、分区)。 该过程完成后,会自动为最终用户生成并显示stream_group_id。
sys.sp_add_object_to_event_stream_group 将表添加到流组。
禁用 CES
sys.sp_remove_object_from_event_stream_group 从流组中删除表。
sys.sp_drop_event_stream_group 删除流组。 流组必须未被使用。
sys.sp_disable_event_stream 禁用当前用户数据集的 CES。
显示器 CES
sys.dm_change_feed_errors 返回交付错误。
sys.dm_change_feed_log_scan_sessions 返回有关日志扫描活动的信息。
sys.sp_help_change_feed_settings 提供配置的更改事件流式处理的状态和信息。
sys.sp_help_change_feed 监视变更流的当前配置。
sys.sp_help_change_feed_table_groups 返回用于配置更改事件流式处理组的元数据。
sys.sp_help_change_feed_table 提供用于更改事件流的流式处理组和表元数据的状态和信息。

局限性

变更事件流(CES)具有以下限制:

Azure SQL 数据库

将 CES 与 Azure SQL 数据库配合使用时,以下限制适用:

  • Azure SQL 数据库中的扩展事件(xEvent)调试当前不可用。

服务器级和常规限制

  • Linux 上的 SQL Server 2025 (17.x) 不支持 CES。
  • CES 仅针对来自INSERTUPDATEDELETE DML 语句的数据更改发出事件。
  • CES 不会处理架构更改(DDL 操作),这意味着它不会为 DDL 操作生成事件。 但是,DDL 操作不会被阻止,因此,如果执行它们,后续 DML 事件的架构将反映更新的表结构。 你需要顺利地处理具有已更新架构的事件。
  • 目前,CES 不会对在启用 CES 之前表中已有的数据进行流传输。 启用 CES 时,不会对现有数据进行初始化或作为快照文件发送。
  • 当 JSON 是指定的输出格式时,大型事件消息可能会拆分为大约 25% 每个流组配置的最大消息大小。 此限制不适用于二进制输出类型。
  • 如果消息超出 Azure 事件中心消息大小限制,则目前只能通过扩展事件观察到失败。 CES xEvent 目前仅在 SQL Server 2025 中可用,而不是 Azure SQL 数据库。
  • 不支持重命名 CES 配置的表和列。 重命名表或列失败。 允许数据库重命名。
  • MICROSOFT CES 的 Entra 身份验证目前在 SQL Server 2025 中不可用。

数据库级限制

表级限制

  • 表只能属于一个流式处理组。 不能将同一个表流式传输到多个目标。

  • 只能为 CES 配置用户表。 CES 不支持流处理系统表。

  • 最多可以配置 4,096 个流组。 每个流组最多可包含 40,000 个表。

  • 不支持联机索引操作

  • 在表上启用 CES 时,不能在该表上添加或删除主键约束。

  • ALTER TABLE SWITCH PARTITION 不支持在针对 CES 进行配置的表中使用。

  • TRUNCATE TABLE 在为 CES 启用的表上不受支持。

  • CES 不支持使用以下任何功能的表:

    • 聚集列存储索引
    • 临时历史记录表或账本历史记录表
    • 始终加密 (Always Encrypted)
    • 内存内 OLTP(内存优化表)
    • 图形表
    • 外部表

列级限制

  • CES 不支持以下数据类型。 流式处理跳过这些类型的列:
    • geography
    • geometry
    • image
    • json
    • rowversion / 时间戳
    • sql_variant
    • 文本 / ntext
    • 矢量
    • xml
    • 用户定义的类型 (UDT)

源数据库中的权限

  • 对于行级安全性,CES 会发送所有行的更改,而不考虑用户权限。
  • 动态数据掩码不适用于通过 CES 发送的数据。 数据以未掩码形式流式传输,即使已配置掩码。
  • CES 不会发出与对象级权限更改相关的事件(例如,向特定列授予权限)。