Azure Cosmos DB 是由 Microsoft 提供的全託管 NoSQL 資料庫服務。 它讓你能輕鬆打造全球分散且高度可擴展的應用程式。 這份操作指南將引導你了解如何建立一個使用 Azure Cosmos 資料庫來管理 NoSQL 資料庫,並實作變更資料處理器以實現即時資料處理的 Java 應用程式。 Java 應用程式使用 Azure Cosmos DB Java SDK v4 與 Azure Cosmos DB for NoSQL 通訊。
這很重要
這個教學只針對 Azure Cosmos DB Java SDK v4。 請參閱 Azure Cosmos DB Java SDK v4 版本釋出說明、 Maven 倉庫、 Azure Cosmos DB 中的變更訂閱處理器,以及 Azure Cosmos DB Java SDK v4 故障排除指南 以獲取更多資訊。 如果您目前使用的版本比 v4 舊,請參閱遷移至 Azure Cosmos DB Java SDK v4 指南,以取得升級至 v4 的協助。
先決條件
Azure Cosmos 資料庫帳號:你可以從 Azure 入口網站 建立,或者也可以使用 Azure Cosmos DB 模擬器 。
Java 開發環境:確保你的電腦安裝了至少 8 版本的 Java Development Kit(JDK)。
Azure Cosmos DB Java SDK V4:提供與 Azure Cosmos DB 互動所需的功能。
背景
Azure Cosmos DB 變更源提供事件驅動的介面,以觸發對文件新增的回應,其應用範圍廣泛。
管理變更導向事件的工作,主要由內建於 SDK 中的變更導向處理器函式庫負責。 這個函式庫足夠強大,可以將變更動態事件分配給多個工作者,如果需要的話。 你只要給變更資料流庫一個回呼函數即可。
這個簡單的 Java 應用程式範例展示了使用 Azure Cosmos DB 與變更資料處理器進行即時資料處理。 該應用程式會將範例文件插入「饋入容器」中,以模擬資料串流。 綁定於資訊容器的變更導流處理器處理新入的變更並記錄文件內容。 處理器會自動管理租約以進行平行處理。
原始程式碼
你可以複製 SDK 範例庫,並在以下 SampleChangeFeedProcessor.java資料夾中找到這個範例:
git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/
操作指南
用 Azure Cosmos DB 和 Azure Cosmos DB Java SDK V4 在 Java 應用程式中配置
ChangeFeedProcessorOptions。ChangeFeedProcessorOptions提供關鍵設定,以控制變更提要處理器在資料處理過程中的行為。options = new ChangeFeedProcessorOptions(); options.setStartFromBeginning(false); options.setLeasePrefix("myChangeFeedDeploymentUnit"); options.setFeedPollDelay(Duration.ofSeconds(5)); options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);初始化 ChangeFeedProcessor 時,包含相關設定,包括主機名稱、饋送容器、租約容器及資料處理邏輯。 start() 方法啟動資料處理,使得來自饋線容器的輸入資料變更能同時且即時地處理。
logger.info("Start Change Feed Processor on worker (handles changes asynchronously)"); ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder() .hostName("SampleHost_1") .feedContainer(feedContainer) .leaseContainer(leaseContainer) .handleChanges(handleChanges()) .options(options) .buildChangeFeedProcessor(); changeFeedProcessorInstance.start() .subscribeOn(Schedulers.boundedElastic()) .subscribe();指定代理使用
handleChanges()方法來處理收到的資料變更。 該方法處理從變更資料饋送中接收的 JsonNode 文件。 作為開發者,你有兩種方式來處理 Change Feed 提供的 JsonNode 文件。 一種選擇是以 JsonNode 的形式操作文件。 這點非常好,尤其是當你沒有統一資料模型涵蓋所有文件時。 第二個選項是將 JsonNode 轉換成與 JsonNode 相同結構的 POJO。 然後你就可以在POJO上操作。private static Consumer<List<JsonNode>> handleChanges() { return (List<JsonNode> docs) -> { logger.info("Start handleChanges()"); for (JsonNode document : docs) { try { //Change Feed hands the document to you in the form of a JsonNode //As a developer you have two options for handling the JsonNode document provided to you by Change Feed //One option is to operate on the document in the form of a JsonNode, as shown below. This is great //especially if you do not have a single uniform data model for all documents. logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter() .writeValueAsString(document)); //You can also transform the JsonNode to a POJO having the same structure as the JsonNode, //as shown below. Then you can operate on the POJO. CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class); logger.info("id: " + pojo_doc.getId()); } catch (JsonProcessingException e) { e.printStackTrace(); } } isWorkCompleted = true; logger.info("End handleChanges()"); }; }建置並執行 Java 應用程式。 應用程式啟動變更資料處理器,將樣本文件插入資料容器,並處理收到的變更。
Conclusion
在本指南中,你學會了如何使用 Azure Cosmos DB Java SDK V4 建立一個 Java 應用程式,該應用程式使用 Azure Cosmos DB 用於 NoSQL 資料庫,並使用 Change Feed 處理器進行即時資料處理。 你可以擴充此應用程式以處理更複雜的使用案例,並透過 Azure Cosmos DB 建立穩健、可擴展且全球分散的應用程式。
其他資源
後續步驟
你現在可以繼續閱讀以下文章,了解更多關於變換飼料預估器的資訊: