Delen via


Een Java-toepassing maken die gebruikmaakt van Azure Cosmos DB voor NoSQL en processor voor wijzigingenfeeds

Azure Cosmos DB is een volledig beheerde NoSQL-databaseservice die wordt geleverd door Microsoft. Hiermee kunt u eenvoudig wereldwijd gedistribueerde en zeer schaalbare toepassingen bouwen. Deze handleiding begeleidt u bij het maken van een Java-toepassing die gebruikmaakt van de Azure Cosmos DB for NoSQL-database en implementeert de wijzigingenfeedprocessor voor realtime gegevensverwerking. De Java-toepassing communiceert met Azure Cosmos DB for NoSQL met behulp van Azure Cosmos DB Java SDK v4.

Belangrijk

Deze zelfstudie is alleen bedoeld voor Azure Cosmos DB Java SDK v4. Bekijk de releaseopmerkingen voor Azure Cosmos DB Java SDK v4, Maven-opslagplaats, wijzigingenfeedprocessor in Azure Cosmos DB en de probleemoplossingsgids voor Azure Cosmos DB Java SDK v4 voor meer informatie. Als u momenteel een oudere versie dan v4 gebruikt, raadpleegt u de gids Migreren naar Azure Cosmos DB Java SDK v4 voor hulp om te upgraden naar v4.

Vereiste voorwaarden

  • Azure Cosmos DB-account: u kunt het maken vanuit Azure Portal of u kunt ook Azure Cosmos DB Emulator gebruiken.

  • Java Development Environment: Zorg ervoor dat u Java Development Kit (JDK) op uw computer hebt geïnstalleerd met ten minste 8 versies.

  • Azure Cosmos DB Java SDK V4: biedt de benodigde functies voor interactie met Azure Cosmos DB.

Achtergrond

De Azure Cosmos DB-wijzigingenfeed biedt een gebeurtenisgestuurde interface voor het activeren van acties als reactie op documentinvoeging die veel wordt gebruikt.

Het werk van het beheren van wijzigingenfeedgebeurtenissen wordt grotendeels verzorgd door de processorbibliotheek voor wijzigingenfeeds die is ingebouwd in de SDK. Deze bibliotheek is krachtig genoeg om wijzigingenfeed-gebeurtenissen over meerdere werknemers te distribueren, indien gewenst. U hoeft alleen een callback op te geven aan de wijzigingenfeed-bibliotheek.

Dit eenvoudige voorbeeld van een Java-toepassing demonstreert realtime gegevensverwerking met Azure Cosmos DB en de wijzigingenfeedprocessor. De toepassing voegt voorbeelddocumenten in een 'feedcontainer' in om een gegevensstroom te simuleren. De wijzigingenfeedprocessor, gebonden aan de feedcontainer, verwerkt binnenkomende wijzigingen en registreert de documentinhoud. De processor beheert automatisch leases voor parallelle verwerking.

Broncode

U kunt de SDK-voorbeeldopslagplaats klonen en dit voorbeeld vinden in SampleChangeFeedProcessor.java:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

Stapsgewijze handleiding

  1. Configureer de ChangeFeedProcessorOptions in een Java-toepassing met behulp van Azure Cosmos DB en Azure Cosmos DB Java SDK V4. De ChangeFeedProcessorOptions biedt essentiële instellingen voor het beheren van het gedrag van de verwerker van wijzigingenfeeds tijdens gegevensverwerking.

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);
    
  2. Initialiseer ChangeFeedProcessor met relevante configuraties, waaronder de hostnaam, feedcontainer, leasecontainer en logica voor gegevensverwerking. Met de methode start() wordt de gegevensverwerking gestart, waardoor gelijktijdige en realtime verwerking van binnenkomende gegevenswijzigingen vanuit de feedcontainer mogelijk wordt.

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .options(options)
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. Specificeer de delegate die inkomende gegevenswijzigingen afhandelt aan de hand van de handleChanges() methode. De methode verwerkt de ontvangen JsonNode-documenten uit de wijzigingenfeed. Als ontwikkelaar heb je twee opties voor het afhandelen van het JsonNode-document dat door de Change Feed is geleverd. Een optie is om het document te bewerken in de vorm van een JsonNode. Dit is vooral handig als u niet één uniform gegevensmodel voor alle documenten hebt. De tweede optie: transformeer de JsonNode naar een POJO met dezelfde structuur als de JsonNode. Vervolgens kunt u aan de POJO werken.

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Bouw en voer de Java-toepassing uit. De toepassing start de processor van de wijzigingenfeed, voegt voorbeelddocumenten in de feedcontainer in en verwerkt de binnenkomende wijzigingen.

Conclusion

In deze handleiding hebt u geleerd hoe u een Java-toepassing maakt met behulp van Azure Cosmos DB Java SDK V4 die gebruikmaakt van de Azure Cosmos DB for NoSQL-database en de wijzigingenfeedprocessor gebruikt voor realtime gegevensverwerking. U kunt deze toepassing uitbreiden om complexere gebruiksvoorbeelden af te handelen en robuuste, schaalbare en wereldwijd gedistribueerde toepassingen te bouwen met behulp van Azure Cosmos DB.

Aanvullende bronnen

Volgende stappen

U kunt nu meer te weten komen over de change feed estimator in de volgende artikelen: