你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure Event Hubs client library for JavaScript - version 6.0.3

Azure Event Hubs 是一个高度可扩展的发布-订阅服务,能够每秒接收数百万事件并向多个用户流式传输。 这样,便可以处理和分析连接的设备和应用程序生成的大量数据。 如果您想了解更多关于Azure事件中心的信息,可以查看: 什么是事件中心

Azure Event Hubs 客户端库允许您在 Node.js 应用中发送和接收事件。

关键链接:

注意:如果您使用的是版本 2.1.0 或更低,并希望迁移到最新版本,请查看我们的 迁移指南,帮助您从 EventHubs V2 迁移到 EventHubs V5

v2的样本和文档仍可在此处获取:

v2.1.0 | 的源代码v2.1.0 (npm) | 包v2.1.0 的示例

入门指南

安装软件包

使用 npm 安装 Azure Event Hubs 客户端库

npm install @azure/event-hubs

当前支持的环境

有关更多详细信息,请参阅我们的支持政策

先决条件

配置TypeScript

TypeScript 用户需要安装 Node 类型定义:

npm install @types/node

你还需要在 tsconfig.json中启用 compilerOptions.allowSyntheticDefaultImports 。 注意,如果你启用 compilerOptions.esModuleInterop了,默认 allowSyntheticDefaultImports 是启用的。 更多信息请参见 TypeScript 的编译器选项手册

JavaScript 捆绑包

若要在浏览器中使用此客户端库,首先需要使用捆绑程序。 有关如何执行此作的详细信息,请参阅我们的 捆绑文档

除了上述内容外,该库还需要为以下 NodeJS 核心内置模块添加额外的多边填充,以便在浏览器中正常工作:

  • buffer
  • os
  • path
  • process

与Webpack捆绑

如果你使用的是 Webpack v5,你可以安装以下开发依赖

  • npm install --save-dev os-browserify path-browserify

然后把以下内容加入你的 webpack.config.js

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

与Rollup捆绑

如果你用的是Rollup Bundler,请安装以下开发依赖

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

然后在 rollup.config.js 中包含以下内容

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

请查阅您喜欢的捆绑商的文档,了解更多关于聚酯填充的使用信息。

React Native 支持

和浏览器类似,React Native 不支持这个 SDK 库中使用的某些 JavaScript API,所以你需要为它们提供多重填充。 详情请参阅 Messaging React Native示例

对客户端进行身份验证

与事件中心的交互可以从 EventHubConsumerClient 类的实例或 EventHubProducerClient 类的实例开始。 有构造函数过载支持以下所示的不同实例化这些类的方法:

事件中心命名空间使用连接字符串

其中一个构造器过载会将表单 Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key; 和实体名的连接字符串连接到你的事件中心实例。 你可以创建一个消费者组,并从 Azure门户获取连接字符串和实体名称。

import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";

const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub",
);

在事件中心使用连接字符串作为策略

另一个构造器过载会直接在事件中心实例上(而不是事件中心命名空间)直接定义的共享访问策略对应的连接字符串。 该连接串的形式为 Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name。 连接字符串格式与之前构造函数过载的关键区别是 ;EntityPath=my-event-hub-name

import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";

const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path",
);

使用Event Hubs命名空间和Azure身份

这种构造器过载会获取你事件中心实例的主机名和实体名,以及实现 TokenCredential 接口的凭证。 这允许你使用 Azure Active Directory 主体进行认证。 TokenCredential包中有该接口的实现。 主持人名称为格式 <yournamespace>.servicebus.windows.net。 使用Azure Active Directory时,您的委托人必须被分配一个角色,允许访问事件中心,例如Azure事件中心的数据所有者角色。 有关使用 Azure Active Directory 授权与事件中心合作的更多信息,请参阅 相关文档

import { DefaultAzureCredential } from "@azure/identity";
import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";

const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential,
);

重要概念

  • Event Hub 生产者是遥测数据、诊断信息、使用日志或其他日志数据的来源,作为嵌入式设备解决方案、移动设备应用、在主机或其他设备上运行的游戏、基于客户端或服务器的商业解决方案,或网站的一部分。

  • 事件中心的消费者从事件中心获取并处理这些信息。 处理可能涉及聚合、复杂计算和过滤。 处理也可能涉及以原始或转换方式分发或存储信息。 Event Hub的客户通常是强大且规模较高的平台基础设施部分,内置分析功能,如Azure Stream Analytics、Apache Spark或Apache Storm。

  • 分区是事件中心中有序的事件序列。 分区是一种与事件消费者所需的并行性相关的数据组织方式。 Azure Event Hubs 通过分区消费者模式提供消息流,每个消费者只读取消息流的特定子集或分区。 随着新事件的到来,它们会被添加到该序列的结尾。 分区数量在事件中心创建时指定,且不可更改。

  • 消费者组是整个活动中心的视图。 消费者组使多个消费应用各自拥有独立的事件流视图,并能以自己的节奏和位置独立读取流。 每个消费者组最多可以同时有5个读卡器;但建议在给定分区和消费者组配对下,只有一个活跃的消费者。 每个活跃读者从其分区接收所有事件;如果同一分区有多个读者,它们会接收重复事件。

更多概念和深入讨论,请参见: 活动中心功能

重试指导

EventHubConsumerClientEventHubProducerClient 接受 options 中,你可以设置 retryOptions ,这样可以调整 SDK 如何处理瞬态错误。 暂时错误的例子包括临时的网络或服务问题。

在消耗事件时重试

如果在 SDK 接收事件时遇到瞬态错误(例如暂时网络问题),它会根据传递给 EventHubConsumerClient. 的重试选项重新尝试接收事件。 如果重试次数用尽, processError 该函数将被调用。

你可以使用重试设置来控制你被告知临时问题(比如网络连接问题)的速度。 例如,如果你需要立即知道网络问题,可以降低 和 maxRetries的值retryDelayInMs

执行 processError 该函数后,只要错误是可重试的,客户端仍会继续接收来自分区的事件。 否则,客户端调用用户提供的 processClose 函数。 当你停止订阅,或者客户端停止读取当前分区的事件(因为该事件被你的应用的其他实例作为负载均衡的一部分)时,也会调用。

processClose 函数提供了必要时更新检查点的机会。 执行 processClose后,客户端(或在负载均衡情况下,来自你应用其他实例的客户端)将调用用户提供的 processInitialize 函数,恢复同一分区最后更新检查点的读取事件。

如果你想停止尝试读取事件,你必须调用close()该方法返回subscription的邮件subscribe

例子

以下章节提供了代码片段,涵盖了使用 Azure 事件中心的一些常见任务

检查活动中心

许多事件中心作都在特定分区的范围内进行。 由于分区归事件中心所有,它们的名称是在创建时分配的。 要了解可用的分区,您可以使用两个客户端中的任一查询事件中心: EventHubProducerClientEventHubConsumerClient

在下面的例子中,我们使用了一个。EventHubProducerClient

import { EventHubProducerClient } from "@azure/event-hubs";

const client = new EventHubProducerClient("connectionString", "eventHubName");

const partitionIds = await client.getPartitionIds();

await client.close();

将活动发布到活动中心

要发布事件,你需要创建一个 EventHubProducerClient. 虽然下面的示例展示了一种创建客户端的方法,但请参见 “认证客户端 ”部分,了解其他实例化客户端的方法。

你可以将事件发布到特定分区,或者让事件中心服务决定应发布到哪些分区。 当事件发布需要高度可用或事件数据应均匀分布在各个分区时,建议使用自动路由。 在下面的示例中,我们将利用自动路由。

  • 使用 EventDataBatch 创建对象
  • 使用 tryAdd 方法将事件添加到批处理中。 你可以一直这样做,直到达到最大批量数量上限,或者你完成添加你喜欢的事件数量,以先到者为准。 该方法返回 false 时表示,由于批次大小已达最大,不能再添加更多事件到批次中。
  • 使用 sendBatch 方法发送事件批次。

在下面的示例中,我们尝试向 Azure 事件中心发送 10 个事件。

import { EventHubProducerClient } from "@azure/event-hubs";

const producerClient = new EventHubProducerClient("connectionString", "eventHubName");

const eventDataBatch = await producerClient.createBatch();
let numberOfEventsToSend = 10;

while (numberOfEventsToSend > 0) {
  const wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
  if (!wasAdded) {
    break;
  }
  numberOfEventsToSend--;
}

await producerClient.sendBatch(eventDataBatch);
await producerClient.close();

你可以在不同阶段传递选项,以控制向 Azure 事件中心发送事件的过程。

  • EventHubProducerClient构造函数会选用一个类型EventHubClientOptions为可选参数,你可以用来指定重试次数等选项。
  • createBatch 方法会选用一个类型的 CreateBatchOptions 参数,你可以用它来确定被创建批次支持的最大批次大小。
  • sendBatch 方法会选择一个类型的 SendBatchOptions 可选参数,你可以用它来指定 abortSignal 取消当前作。
  • 如果你想发送到某个特定分区,方法的过载 sendBatch 允许你传递该分区的ID,以便发送事件。 上面的 “检查事件中心 ”示例展示了如何获取可用分区的 ID。

注意:使用 Azure Stream Analytics 时,发送的事件主体也应是 JSON 对象。 例如:body: { "message": "Hello World" }

从活动中心消费活动

要从事件中心实例消费事件,你还需要知道你想针对哪个消费者群体。 一旦你了解了这一点,就可以创建 EventHubConsumerClient了。 虽然下面的示例展示了一种创建客户端的方法,但请参见 “认证客户端 ”部分,了解其他实例化客户端的方法。

subscribe客户端的方法具有超载,结合构造函数,可以满足多种事件消耗方式:

subscribe 方法会使用一个可选的参数类型 SubscriptionOptions ,你可以用它来指定诸如 maxBatchSize(等待事件数量)和 maxWaitTimeInSeconds(等待 maxBatchSize 事件到达的时间)。

在单一进程中消耗事件

先创建一个实例, EventHubConsumerClient然后调用 subscribe() 该实例开始调用事件。

subscribe 方法通过回调处理来自 Azure 事件中心的事件。 要停止接收事件,你可以调用 close() 方法返回的 subscribe() 对象。

import { EventHubConsumerClient, earliestEventPosition } from "@azure/event-hubs";

const client = new EventHubConsumerClient("my-consumer-group", "connectionString", "eventHubName");

// In this sample, we use the position of earliest available event to start from
// Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
const subscriptionOptions = {
  startPosition: earliestEventPosition,
};

const subscription = client.subscribe(
  {
    processEvents: async (events, context) => {
      // event processing code goes here
    },
    processError: async (err, context) => {
      // error reporting/handling code here
    },
  },
  subscriptionOptions,
);

// Wait for a few seconds to receive events before closing
setTimeout(async () => {
  await subscription.close();
  await client.close();
  console.log(`Exiting sample`);
}, 3 * 1000);

在多个进程间负载均衡的事件消耗

Azure Event Hubs 能够处理每秒数百万事件。 为了扩展你的处理应用,你可以运行多个实例,让它们彼此平衡负载。

首先,利用一个构造子超载创建一个 EventHubConsumerClient 实例,该超载需要 , CheckpointStore然后调用 subscribe() 该方法开始消耗事件。 检查点存储将使用户组内的订阅者能够协调多个应用实例之间的处理。

在这个例子中,我们将使用 BlobCheckpointStore from the @azure/eventhubs-checkpointstore-blob package 实现了通过 Azure Blob Storage 实现持久存储所需的读写作。

subscribe 方法通过回调处理来自 Azure 事件中心的事件。 要停止接收事件,你可以调用 close() 方法返回的 subscribe() 对象。

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

if (!(await blobContainerClient.exists())) {
  await blobContainerClient.create();
}

const checkpointStore = new BlobCheckpointStore(blobContainerClient);
const consumerClient = new EventHubConsumerClient(
  consumerGroup,
  eventHubConnectionString,
  eventHubName,
  checkpointStore,
);

const subscription = consumerClient.subscribe({
  processEvents: async (events, context) => {
    // event processing code goes here
    if (events.length === 0) {
      // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
      // will pass you an empty array.
      return;
    }

    // Checkpointing will allow your service to pick up from
    // where it left off when restarting.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(events[events.length - 1]);
  },
  processError: async (err, context) => {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
  },
});

// Wait for a few seconds to receive events before closing
await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

await subscription.close();
await consumerClient.close();
console.log(`Exiting sample`);

请参阅“ 在您的应用程序多个实例间平衡分区负载 ”以了解更多信息。

从单一分区中消耗事件

先创建一个实例, EventHubConsumerClient然后调用 subscribe() 该实例开始调用事件。 把你想指向的分区的 id 传递给 subscribe() 只从该分区消费的方法。

在下面的例子中,我们使用了第一个划分。

subscribe 方法通过回调处理来自 Azure 事件中心的事件。 要停止接收事件,你可以调用 close() 方法返回的 subscribe() 对象。

import { EventHubConsumerClient, earliestEventPosition } from "@azure/event-hubs";

const client = new EventHubConsumerClient("my-consumer-group", "connectionString", "eventHubName");
const partitionIds = await client.getPartitionIds();

// In this sample, we use the position of earliest available event to start from
// Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
const subscriptionOptions = {
  startPosition: earliestEventPosition,
};

const subscription = client.subscribe(
  partitionIds[0],
  {
    processEvents: async (events, context) => {
      // event processing code goes here
    },
    processError: async (err, context) => {
      // error reporting/handling code here
    },
  },
  subscriptionOptions,
);

// Wait for a few seconds to receive events before closing
setTimeout(async () => {
  await subscription.close();
  await client.close();
  console.log(`Exiting sample`);
}, 3 * 1000);

使用 EventHubConsumerClient 来配合 IotHub

你也可以用 EventHubConsumerClient 来作 IotHub。 这对于从关联的EventHub接收IotHub的遥测数据非常有用。 关联的连接字符串不会有发送声明,因此无法发送事件。

  • 请注意,连接字符串必须是 事件中心兼容端点 (例如“Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name“)
import { EventHubConsumerClient } from "@azure/event-hubs";

const client = new EventHubConsumerClient(
  "my-consumer-group",
  "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name",
);

await client.getEventHubProperties();

// retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
const partitionId = "0";
await client.getPartitionProperties(partitionId);

await client.close();

Troubleshooting

AMQP依赖关系

事件枢纽库依赖于 RHEA-promise 库,用于管理连接、通过 AMQP 协议发送和接收事件。

伐木业

你可以设置 AZURE_LOG_LEVEL 环境变量以启用日志 stderr

export AZURE_LOG_LEVEL=verbose

或者,可以通过在 setLogLevel中调用 @azure/logger 在运行时启用日志记录:

import { setLogLevel } from "@azure/logger";

setLogLevel("info");

有关如何启用日志的更详细说明,可以查看 @azure/记录器包文档

也可以使用此库设置 DEBUG 环境变量以获取日志。 如果还希望从依赖项 rhea-promiserhea 发出日志,这非常有用。

注意: AZURE_LOG_LEVEL(如果设置)优先于 DEBUG。 在同时指定AZURE_LOG_LEVEL或调用 setLogLevel 时,请勿通过 DEBUG 指定任何 azure 库。

  • 只从Event Hubs SDK获取信息层级的调试日志。
export DEBUG=azure:*:info
  • 从 Event Hubs SDK 和协议级库获取调试日志。
export DEBUG=azure*,rhea*
  • 如果 对查看原始事件数据(消耗大量控制台/磁盘空间)不感兴趣,则可以按如下所示设置 DEBUG 环境变量:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
  • 如果仅对 错误感兴趣, 和 SDK 警告,则可以设置 DEBUG 环境变量,如下所示:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

后续步骤

更多示例代码

请查看 示例 目录,详细了解如何使用该库发送和接收 事件

Contributing

若要参与此库,请阅读 参与指南 ,详细了解如何生成和测试代码。