Spring Cloud Stream은 공유 메시징 시스템과 연결된 확장성이 뛰어난 이벤트 기반 마이크로 서비스를 빌드하기 위한 프레임워크입니다.
프레임워크는 이미 확립되고 친숙한 Spring 관용구 및 모범 사례를 기반으로 구축된 유연한 프로그래밍 모델을 제공합니다. 이러한 모범 사례에는 영구 게시/하위 의미 체계, 소비자 그룹 및 상태 저장 파티션에 대한 지원이 포함됩니다.
현재 바인더 구현은 다음과 같습니다.
-
spring-cloud-azure-stream-binder-eventhubs- 자세한 내용은 Azure Event Hubs Spring Cloud Stream Binder 참조하세요. - 자세한 내용은 Azure Service Bus Spring Cloud Stream Binder를 참조하세요.
Azure Event Hubs용 Spring Cloud Stream Binder
주요 개념
Azure Event Hubs용 Spring Cloud Stream Binder는 Spring Cloud Stream 프레임워크에 대한 바인딩 구현을 제공합니다.
이 구현에서는 Spring Integration Event Hubs 채널 어댑터를 기초로 사용합니다. 디자인의 관점에서 Event Hubs는 Kafka와 유사합니다. 또한 Kafka API를 통해 Event Hubs에 액세스할 수 있습니다. 프로젝트에 Kafka API에 대한 종속성이 엄격한 경우 Kafka API 샘플 사용하여 이벤트 허브를
소비자 그룹
Event Hubs는 Apache Kafka와 유사한 소비자 그룹을 지원하지만 논리는 약간 다릅니다. Kafka는 모든 커밋된 오프셋을 broker에 저장하지만 수동으로 처리되는 Event Hubs 메시지의 오프셋을 저장해야 합니다. Event Hubs SDK는 Azure Storage 내에 이러한 오프셋을 저장하는 함수를 제공합니다.
분할 지원
Event Hubs는 Kafka와 유사한 물리적 파티션 개념을 제공합니다. 그러나 소비자와 파티션 간의 Kafka 자동 재조정과 달리 Event Hubs는 일종의 선점 모드를 제공합니다. 스토리지 계정은 어떤 소비자가 어떤 파티션을 소유하는지 결정하는 임대 역할을 합니다. 새 소비자가 시작되면 가장 많이 로드된 소비자로부터 일부 파티션을 도용하여 워크로드 균형을 달성하려고 합니다.
부하 분산 전략을 지정하기 위해 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* 속성이 제공됩니다. 자세한 내용은 소비자 속성 섹션을 참조하세요.
Batch 소비자 지원
Spring Cloud Azure Stream Event Hubs 바인더는 Spring Cloud Stream Batch Consumer 기능지원합니다.
일괄 처리 소비자 모드를 사용하려면 spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode 속성을 true설정합니다. 사용하도록 설정하면 일괄 처리된 이벤트 목록의 페이로드가 포함된 메시지가 수신되고 Consumer 함수에 전달됩니다. 또한 각 메시지 헤더는 목록으로 변환되며, 그 중 콘텐츠는 각 이벤트에서 구문 분석된 연결된 헤더 값입니다. 파티션 ID, 검사점 및 마지막으로 큐에 추가된 속성의 공동 헤더는 이벤트의 전체 일괄 처리가 동일한 값을 공유하므로 단일 값으로 표시됩니다. 자세한 내용은 Spring Integration
메모
검사점 헤더는 MANUAL 검사점 모드를 사용하는 경우에만 존재합니다.
일괄 처리 소비자의 검사점은 BATCH 및 MANUAL두 가지 모드를 지원합니다.
BATCH 모드는 바인더가 이벤트를 받으면 전체 이벤트 일괄 처리의 검사점을 지정하는 자동 검사점 모드입니다.
MANUAL 모드는 사용자가 이벤트의 검사점을 지정하는 것입니다. 사용하는 경우 Checkpointer 메시지 헤더에 전달되고 사용자는 이를 사용하여 검사점을 수행할 수 있습니다.
접두사 max-size있는 max-wait-time 및 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch. 속성을 설정하여 일괄 처리 크기를 지정할 수 있습니다.
max-size 속성이 필요하며 max-wait-time 속성은 선택 사항입니다. 자세한 내용은 소비자 속성 섹션을 참조하세요.
종속성 설정
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
또는 Maven에 대한 다음 예제와 같이 Spring Cloud Azure Stream Event Hubs Starter를 사용할 수도 있습니다.
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
구성
바인더는 구성 옵션의 다음 세 부분을 제공합니다.
연결 구성 속성
이 섹션에는 Azure Event Hubs에 연결하는 데 사용되는 구성 옵션이 포함되어 있습니다.
메모
보안 주체를 사용하여 Azure 리소스에 액세스하기 위해 Microsoft Entra ID로 인증하고 권한을 부여하도록 선택한 경우 Microsoft Entra ID 액세스 권한 부여를 참조하여 보안 주체에게 Azure 리소스에 액세스할 수 있는 충분한 권한이 부여되었는지 확인합니다.
spring-cloud-azure-stream-binder-eventhubs의 연결 구성 가능한 속성:
| 재산 | 형 | 묘사 |
|---|---|---|
| spring.cloud.azure.eventhubs.enabled를 |
부울 | Azure Event Hubs를 사용할 수 있는지 여부입니다. |
| spring.cloud.azure.eventhubs.connection-string을 |
문자열 | Event Hubs 네임스페이스 연결 문자열 값입니다. |
| spring.cloud.azure.eventhubs.namespace를 |
문자열 | FQDN의 접두사인 Event Hubs 네임스페이스 값입니다. FQDN은 NamespaceName.DomainName으로 구성되어야 합니다. |
| spring.cloud.azure.eventhubs.domain-name을 |
문자열 | Azure Event Hubs 네임스페이스 값의 도메인 이름입니다. |
| spring.cloud.azure.eventhubs.custom-endpoint-address를 |
문자열 | 사용자 지정 엔드포인트 주소입니다. |
팁
일반적인 Azure 서비스 SDK 구성 옵션은 Spring Cloud Azure Stream Event Hubs 바인더에도 구성할 수 있습니다. 지원되는 구성 옵션은 Spring Cloud Azure 구성도입되었으며 통합 접두사 spring.cloud.azure. 또는 spring.cloud.azure.eventhubs.접두사로 구성할 수 있습니다.
바인더는 기본적으로 Spring Could Azure Resource Manager Data 관련 역할로 부여되지 않은 보안 주체를 사용하여 연결 문자열을 검색하는 방법에 대한 자세한 내용은 Spring Could Azure Resource ManagerBasic 사용량 섹션을 참조하세요.
검사점 구성 속성
이 섹션에는 파티션 소유권 및 검사점 정보를 유지하는 데 사용되는 Storage Blobs 서비스에 대한 구성 옵션이 포함되어 있습니다.
메모
버전 4.0.0부터 spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists 수동으로 사용하도록 설정되지 않은 경우 spring.cloud.stream.bindings.binding-name.destination이름으로 스토리지 컨테이너가 자동으로 만들어지지 않습니다.
spring-cloud-azure-stream-binder-eventhubs의 구성 가능한 속성 검사점:
| 재산 | 형 | 묘사 |
|---|---|---|
| spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists를 |
부울 | 컨테이너가 없는 경우 컨테이너를 만들 수 있도록 허용할지 여부입니다. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name을 |
문자열 | 스토리지 계정의 이름입니다. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key를 |
문자열 | 스토리지 계정 액세스 키입니다. |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name을 |
문자열 | 스토리지 컨테이너 이름입니다. |
팁
일반적인 Azure 서비스 SDK 구성 옵션은 Storage Blob 검사점 저장소에서도 구성할 수 있습니다. 지원되는 구성 옵션은 Spring Cloud Azure 구성도입되었으며 통합 접두사 spring.cloud.azure. 또는 spring.cloud.azure.eventhubs.processor.checkpoint-store접두사로 구성할 수 있습니다.
Azure Event Hubs 바인딩 구성 속성
다음 옵션은 소비자 속성, 고급 소비자 구성, 생산자 속성 및 고급 생산자 구성의 네 가지 섹션으로 나뉩니다.
소비자 속성
이러한 속성은 EventHubsConsumerProperties통해 노출됩니다.
메모
반복을 방지하기 위해 버전 4.17.0 및 5.11.0부터 Spring Cloud Azure Stream Binder Event Hubs는 모든 채널에 대한 값을 spring.cloud.stream.eventhubs.default.consumer.<property>=<value>형식으로 설정하도록 지원합니다.
spring-cloud-azure-stream-binder-eventhubs의 소비자 구성 가능한 속성:
| 재산 | 형 | 묘사 |
|---|---|---|
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.mode를 |
체크포인트 모드 | 소비자가 메시지를 검사점 지정하는 방법을 결정할 때 사용되는 검사점 모드 |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.count를 |
정수 | 각 파티션이 하나의 검사점을 수행할 메시지 양을 결정합니다.
PARTITION_COUNT 검사점 모드를 사용하는 경우에만 적용됩니다. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.checkpoint.interval을 |
기간 | 하나의 검사점을 수행할 시간 간격을 결정합니다.
TIME 검사점 모드를 사용하는 경우에만 적용됩니다. |
| spring.cloud.stream.eventhubs.bindings를 <.batch.max 크기 | 정수 | 일괄 처리의 최대 이벤트 수입니다. 일괄 처리 소비자 모드에 필요합니다. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time |
기간 | 일괄 처리 사용의 최대 기간입니다. 일괄 처리 소비자 모드를 사용하도록 설정하고 선택 사항인 경우에만 적용됩니다. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval을 |
기간 | 업데이트에 대한 간격 기간입니다. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy를 |
로드밸런싱 전략 | 부하 분산 전략입니다. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval을 |
기간 | 파티션 소유권이 만료되는 기간입니다. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties를 |
부울 | 이벤트 프로세서가 연결된 파티션의 마지막 큐에 포함된 이벤트에 대한 정보를 요청하고 이벤트가 수신될 때 해당 정보를 추적해야 하는지 여부입니다. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count를 |
정수 | 이벤트 허브 소비자가 적극적으로 수신하고 로컬로 큐에 대기하는 이벤트 수를 제어하기 위해 소비자가 사용하는 수입니다. |
| spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position을 |
키를 파티션 ID로 매핑하고 StartPositionProperties |
파티션의 검사점이 검사점 저장소에 없는 경우 각 파티션에 사용할 이벤트 위치를 포함하는 맵입니다. 이 맵은 파티션 ID에서 키가 지정됩니다. |
메모
initial-partition-event-position 구성은 각 이벤트 허브의 초기 위치를 지정하는 map 허용합니다. 따라서 해당 키는 파티션 ID이며 값은 오프셋, 시퀀스 번호, 큐에 포함된 날짜 시간 및 포함 여부의 속성을 포함하는 StartPositionProperties값입니다. 예를 들어 다음과 같이 설정할 수 있습니다.
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
고급 소비자 구성
위의 연결, 검사점및 일반적인 Azure SDK 클라이언트 구성은 접두사 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.구성할 수 있는 각 바인더 소비자에 대한 사용자 지정을 지원합니다.
생산자 속성
이러한 속성은 EventHubsProducerProperties통해 노출됩니다.
메모
반복을 방지하기 위해 버전 4.17.0 및 5.11.0부터 Spring Cloud Azure Stream Binder Event Hubs는 모든 채널에 대한 값을 spring.cloud.stream.eventhubs.default.producer.<property>=<value>형식으로 설정하도록 지원합니다.
spring-cloud-azure-stream-binder-eventhubs의 생산자 구성 가능한 속성:
| 재산 | 형 | 묘사 |
|---|---|---|
| spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync를 |
부울 | 생산자 동기화에 대한 스위치 플래그입니다. true이면 생산자는 보내기 작업 후 응답을 기다립니다. |
| spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout을 |
오래 | 보내기 작업 후 응답을 기다리는 시간입니다. 동기화 생산자를 사용하도록 설정한 경우에만 적용됩니다. |
고급 생산자 구성
위의 연결 및 일반적인 Azure SDK 클라이언트 구성은 접두사 spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.사용하여 구성할 수 있는 각 바인더 생산자에 대한 사용자 지정을 지원합니다.
기본 사용량
Event Hubs에서/받는 메시지 보내기 및 받기
구성 옵션을 자격 증명 정보로 채웁니다.
연결 문자열로 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL메모
사용 가능한 가장 안전한 인증 흐름을 사용하는 것이 권장됩니다. 데이터베이스, 캐시, 메시징 또는 AI 서비스와 같이 이 절차에서 설명하는 인증 흐름은 애플리케이션에 대한 신뢰 수준이 매우 높고 다른 흐름에 존재하지 않는 위험을 수반합니다. 암호 없는 연결 또는 키 없는 연결에 대한 관리 ID와 같은 더 안전한 옵션이 실행 가능하지 않은 경우에만 이 흐름을 사용합니다. 로컬 컴퓨터 작업의 경우 암호 없는 연결이나 키 없는 연결에 사용자 ID를 사용하는 것이 좋습니다.
서비스 주체 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
메모
tenant-id 허용되는 값은 common, organizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은
자격 증명을 관리 ID로 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
공급업체 및 소비자를 정의합니다.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
분할 지원
보낼 메시지에 대한 파티션 정보를 구성하기 위해 사용자가 제공한 파티션 정보가 포함된 PartitionSupplier 만들어집니다. 다음 순서도는 파티션 ID 및 키에 대해 다른 우선 순위를 가져오는 프로세스를 보여줍니다.
분할 지원 프로세스의 순서도를 보여 주는
Batch 소비자 지원
다음 예제와 같이 일괄 처리 구성 옵션을 제공합니다.
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed공급업체 및 소비자를 정의합니다.
BATCH검사점 모드의 경우 다음 코드를 사용하여 메시지를 보내고 일괄 처리로 사용할 수 있습니다.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }MANUAL검사점 모드의 경우 다음 코드를 사용하여 메시지를 보내고 일괄 처리로 사용/검사점을 사용할 수 있습니다.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
메모
일괄 처리 사용 모드에서 Spring Cloud Stream 바인더의 기본 콘텐츠 형식은 application/json메시지 페이로드가 콘텐츠 형식과 정렬되어 있는지 확인합니다. 예를 들어 기본 콘텐츠 형식의 application/json 사용하여 String 페이로드가 있는 메시지를 수신하는 경우 페이로드는 원래 JSON String 텍스트의 큰따옴표로 묶은 String합니다.
text/plain 콘텐츠 형식의 경우 String 개체일 수 있습니다. 자세한 내용은 Spring Cloud Stream 콘텐츠 형식 협상참조하세요.
오류 메시지 처리
아웃바운드 바인딩 오류 메시지 처리
기본적으로 Spring Integration은
errorChannel라는 전역 오류 채널을 만듭니다. 아웃바운드 바인딩 오류 메시지를 처리하도록 다음 메시지 엔드포인트를 구성합니다.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }인바운드 바인딩 오류 메시지 처리
Spring Cloud Stream Event Hubs Binder는 인바운드 메시지 바인딩에 대한 오류를 처리하는 하나의 솔루션인 오류 처리기를 지원합니다.
오류 처리기:
Spring Cloud Stream은
Consumer인스턴스를 허용하는ErrorMessage추가하여 사용자 지정 오류 처리기를 제공하는 메커니즘을 노출합니다. 자세한 내용은 Spring Cloud Stream 설명서에서 오류 메시지처리를 참조하세요. 바인딩 기본 오류 처리기
모든 인바운드 바인딩 오류 메시지를 사용하도록 단일
Consumerbean을 구성합니다. 다음 기본 함수는 각 인바운드 바인딩 오류 채널을 구독합니다.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }또한
spring.cloud.stream.default.error-handler-definition속성을 함수 이름으로 설정해야 합니다.바인딩 관련 오류 처리기
특정 인바운드 바인딩 오류 메시지를 사용하도록
Consumerbean을 구성합니다. 다음 함수는 특정 인바운드 바인딩 오류 채널을 구독하고 바인딩 기본 오류 처리기보다 우선 순위가 높습니다.@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }또한
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition속성을 함수 이름으로 설정해야 합니다.
Event Hubs 메시지 헤더
지원되는 기본 메시지 헤더는 Spring Integration대한
여러 바인더 지원
여러 Event Hubs 네임스페이스에 대한 연결도 여러 바인더를 사용하여 지원됩니다. 이 샘플에서는 연결 문자열을 예로 사용합니다. 서비스 주체 및 관리 ID의 자격 증명도 지원됩니다. 각 바인더의 환경 설정에서 관련 속성을 설정할 수 있습니다.
Event Hubs에서 여러 바인더를 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000메모
이전 애플리케이션 파일은 모든 바인딩에 애플리케이션에 대한 단일 기본 폴러를 구성하는 방법을 보여 줍니다. 특정 바인딩에 대해 폴러를 구성하려면
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000같은 구성을 사용할 수 있습니다.메모
사용 가능한 가장 안전한 인증 흐름을 사용하는 것이 권장됩니다. 데이터베이스, 캐시, 메시징 또는 AI 서비스와 같이 이 절차에서 설명하는 인증 흐름은 애플리케이션에 대한 신뢰 수준이 매우 높고 다른 흐름에 존재하지 않는 위험을 수반합니다. 암호 없는 연결 또는 키 없는 연결에 대한 관리 ID와 같은 더 안전한 옵션이 실행 가능하지 않은 경우에만 이 흐름을 사용합니다. 로컬 컴퓨터 작업의 경우 암호 없는 연결이나 키 없는 연결에 사용자 ID를 사용하는 것이 좋습니다.
두 개의 공급업체와 두 명의 소비자를 정의해야 합니다.
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
리소스 프로비저닝
Event Hubs 바인더는 이벤트 허브 및 소비자 그룹의 프로비저닝을 지원하며, 사용자는 다음 속성을 사용하여 프로비저닝을 사용하도록 설정할 수 있습니다.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
메모
tenant-id 허용되는 값은 common, organizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은
샘플
자세한 내용은 GitHub의 azure-spring-boot-samples 리포지토리를 참조하세요.
Azure Service Bus용 Spring Cloud Stream Binder
주요 개념
Azure Service Bus용 Spring Cloud Stream Binder는 Spring Cloud Stream Framework에 대한 바인딩 구현을 제공합니다. 이 구현에서는 Spring Integration Service Bus 채널 어댑터를 기초로 사용합니다.
예약된 메시지
이 바인더는 지연된 처리를 위해 토픽에 메시지 전송을 지원합니다. 사용자는 헤더 x-delay 메시지의 지연 시간을 밀리초 단위로 표현하여 예약된 메시지를 보낼 수 있습니다. 메시지는 x-delay 밀리초 후에 해당 토픽에 전달됩니다.
소비자 그룹
Service Bus 토픽은 Apache Kafka와 비슷한 소비자 그룹을 지원하지만 논리는 약간 다릅니다.
이 바인더는 소비자 그룹 역할을 하는 토픽의 Subscription 사용합니다.
종속성 설정
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
또는 Maven에 대한 다음 예제와 같이 Spring Cloud Azure Stream Service Bus Starter를 사용할 수도 있습니다.
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
구성
바인더는 구성 옵션의 다음 두 부분을 제공합니다.
연결 구성 속성
이 섹션에는 Azure Service Bus에 연결하는 데 사용되는 구성 옵션이 포함되어 있습니다.
메모
보안 주체를 사용하여 Azure 리소스에 액세스하기 위해 Microsoft Entra ID로 인증하고 권한을 부여하도록 선택한 경우 Microsoft Entra ID 액세스 권한 부여를 참조하여 보안 주체에게 Azure 리소스에 액세스할 수 있는 충분한 권한이 부여되었는지 확인합니다.
spring-cloud-azure-stream-binder-servicebus의 연결 구성 가능한 속성:
| 재산 | 형 | 묘사 |
|---|---|---|
| spring.cloud.azure.servicebus.enabled를 |
부울 | Azure Service Bus를 사용할 수 있는지 여부입니다. |
| spring.cloud.azure.servicebus.connection-string을 |
문자열 | Service Bus 네임스페이스 연결 문자열 값입니다. |
| spring.cloud.azure.servicebus.custom-endpoint-address를 |
문자열 | Service Bus에 연결할 때 사용할 사용자 지정 엔드포인트 주소입니다. |
| spring.cloud.azure.servicebus.namespace를 |
문자열 | FQDN의 접두사인 Service Bus 네임스페이스 값입니다. FQDN은 NamespaceName.DomainName으로 구성되어야 합니다. |
| spring.cloud.azure.servicebus.domain-name |
문자열 | Azure Service Bus 네임스페이스 값의 도메인 이름입니다. |
메모
일반적인 Azure 서비스 SDK 구성 옵션은 Spring Cloud Azure Stream Service Bus 바인더에도 구성할 수 있습니다. 지원되는 구성 옵션은 Spring Cloud Azure 구성도입되었으며 통합 접두사 spring.cloud.azure. 또는 spring.cloud.azure.servicebus.접두사로 구성할 수 있습니다.
바인더는 기본적으로 Spring Could Azure Resource Manager Data 관련 역할로 부여되지 않은 보안 주체를 사용하여 연결 문자열을 검색하는 방법에 대한 자세한 내용은 Spring Could Azure Resource ManagerBasic 사용량 섹션을 참조하세요.
Azure Service Bus 바인딩 구성 속성
다음 옵션은 소비자 속성, 고급 소비자 구성, 생산자 속성 및 고급 생산자 구성의 네 가지 섹션으로 나뉩니다.
소비자 속성
이러한 속성은 ServiceBusConsumerProperties통해 노출됩니다.
메모
반복을 방지하기 위해 버전 4.17.0 및 5.11.0부터 Spring Cloud Azure Stream Binder Service Bus는 모든 채널에 대한 값을 spring.cloud.stream.servicebus.default.consumer.<property>=<value>형식으로 설정하도록 지원합니다.
spring-cloud-azure-stream-binder-servicebus의 소비자 구성 가능한 속성:
| 재산 | 형 | 기본값 | 묘사 |
|---|---|---|---|
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected를 |
부울 | 거짓 | 실패한 메시지가 DLQ로 라우팅되는 경우 |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls |
정수 | 1 | Service Bus 프로세서 클라이언트에서 처리해야 하는 최대 동시 메시지입니다. 세션을 사용하도록 설정하면 각 세션에 적용됩니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions |
정수 | 영 | 지정된 시간에 처리할 최대 동시 세션 수입니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled를 |
부울 | 영 | 세션을 사용할 수 있는지 여부입니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer .session-idle-timeout을 | 기간 | 영 | 현재 활성 세션에 대해 메시지가 수신될 때까지 대기할 최대 시간(기간)을 설정합니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count를 |
정수 | 0 | Service Bus 프로세서 클라이언트의 프리페치 수입니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue를 |
하위 큐 | 없음 | 연결할 하위 큐의 형식입니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration을 |
기간 | 5분 | 잠금 자동 갱신을 계속할 시간입니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode를 |
ServiceBusReceiveMode | peek_lock | Service Bus 프로세서 클라이언트의 수신 모드입니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete를 |
부울 | 참 | 메시지를 자동으로 정정할지 여부입니다. false로 설정하면 개발자가 메시지를 수동으로 해결할 수 있도록 Checkpointer 메시지 헤더가 추가됩니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-size-in-megabytes를 |
길다 | 1024 | 큐/토픽의 최대 크기(큐/토픽에 할당된 메모리 크기)입니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.consumer.default-message-time-to-live |
기간 | P10675199DT2H48M5.4775807S입니다. (10675199 일, 2시간, 48분, 5초 및 477밀리초) | 메시지가 Service Bus로 전송되는 시점부터 시작하여 메시지가 만료되는 기간입니다. |
중요하다
ARM(Azure Resource Manager)을 사용하는 경우 spring.cloud.stream.servicebus.bindings.<binding-name>.consume.entity-type 속성을 구성해야 합니다. 자세한 내용은 GitHub의 servicebus-queue-binder-arm 샘플을 참조하세요.
고급 소비자 구성
위의 연결 및 일반적인 Azure SDK 클라이언트 구성은 접두사 spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.사용하여 구성할 수 있는 각 바인더 소비자에 대한 사용자 지정을 지원합니다.
생산자 속성
이러한 속성은 ServiceBusProducerProperties통해 노출됩니다.
메모
반복을 방지하기 위해 버전 4.17.0 및 5.11.0부터 Spring Cloud Azure Stream Binder Service Bus는 모든 채널에 대한 값을 spring.cloud.stream.servicebus.default.producer.<property>=<value>형식으로 설정하도록 지원합니다.
spring-cloud-azure-stream-binder-servicebus의 생산자 구성 가능한 속성:
| 재산 | 형 | 기본값 | 묘사 |
|---|---|---|---|
| spring.cloud.stream.servicebus.bindings.binding-name.producer.sync를 |
부울 | 거짓 | 생산자 동기화에 대한 스위치 플래그입니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout을 |
오래 | 1만 | 생산자를 보내기 위한 시간 제한 값입니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type을 |
ServiceBusEntityType | 영 | 바인딩 생산자에 필요한 생산자의 Service Bus 엔터티 형식입니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.max-size-in-megabytes를 |
길다 | 1024 | 큐/토픽의 최대 크기(큐/토픽에 할당된 메모리 크기)입니다. |
| spring.cloud.stream.servicebus.bindings.binding-name.producer.default-message-time-to-live를 |
기간 | P10675199DT2H48M5.4775807S입니다. (10675199 일, 2시간, 48분, 5초 및 477밀리초) | 메시지가 Service Bus로 전송되는 시점부터 시작하여 메시지가 만료되는 기간입니다. |
중요하다
바인딩 생산자를 사용하는 경우 spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type 속성을 구성해야 합니다.
고급 생산자 구성
위의 연결 및 일반적인 Azure SDK 클라이언트 구성은 접두사 spring.cloud.stream.servicebus.bindings.<binding-name>.producer.사용하여 구성할 수 있는 각 바인더 생산자에 대한 사용자 지정을 지원합니다.
기본 사용량
Service Bus에서/받는 메시지 보내기 및 받기
구성 옵션을 자격 증명 정보로 채웁니다.
연결 문자열로 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic메모
사용 가능한 가장 안전한 인증 흐름을 사용하는 것이 권장됩니다. 데이터베이스, 캐시, 메시징 또는 AI 서비스와 같이 이 절차에서 설명하는 인증 흐름은 애플리케이션에 대한 신뢰 수준이 매우 높고 다른 흐름에 존재하지 않는 위험을 수반합니다. 암호 없는 연결 또는 키 없는 연결에 대한 관리 ID와 같은 더 안전한 옵션이 실행 가능하지 않은 경우에만 이 흐름을 사용합니다. 로컬 컴퓨터 작업의 경우 암호 없는 연결이나 키 없는 연결에 사용자 ID를 사용하는 것이 좋습니다.
서비스 주체 자격 증명의 경우 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
메모
tenant-id 허용되는 값은 common, organizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은
자격 증명을 관리 ID로 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
공급업체 및 소비자를 정의합니다.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
파티션 키 지원
바인더는 메시지 헤더에서 파티션 키 및 세션 ID를 설정하여 Service Bus 분할 지원합니다. 이 섹션에서는 메시지의 파티션 키를 설정하는 방법을 소개합니다.
Spring Cloud Stream은 파티션 키 SpEL 식 속성 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression제공합니다. 예를 들어 이 속성을 "'partitionKey-' + headers[<message-header-key>]" 설정하고 message-header-key라는 헤더를 추가합니다. Spring Cloud Stream은 식을 평가할 때 이 헤더의 값을 사용하여 파티션 키를 할당합니다. 다음 코드는 예제 생산자를 제공합니다.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
세션 지원
바인더는 Service Bus의
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
메모
Service Bus 분할따라 세션 ID는 파티션 키보다 우선 순위가 높습니다. 따라서 ServiceBusMessageHeaders#SESSION_ID 헤더와 ServiceBusMessageHeaders#PARTITION_KEY 헤더가 모두 설정되면 결국 세션 ID 값이 파티션 키의 값을 덮어쓰는 데 사용됩니다.
오류 메시지 처리
아웃바운드 바인딩 오류 메시지 처리
기본적으로 Spring Integration은
errorChannel라는 전역 오류 채널을 만듭니다. 아웃바운드 바인딩 오류 메시지를 처리하도록 다음 메시지 엔드포인트를 구성합니다.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }인바운드 바인딩 오류 메시지 처리
Spring Cloud Stream Service Bus 바인더는 인바운드 메시지 바인딩에 대한 오류를 처리하는 두 가지 솔루션인 바인더 오류 처리기 및 처리기를 지원합니다.
바인더 오류 처리기:
기본 바인더 오류 처리기는 인바운드 바인딩을 처리합니다. 이 처리기를 사용하여
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected사용하도록 설정된 경우 실패한 메시지를 배달 못한 편지 큐로 보냅니다. 그렇지 않으면 실패한 메시지가 중단됩니다. 바인더 오류 처리기는 제공된 다른 오류 처리기와 함께 사용할 수 없습니다.오류 처리기:
Spring Cloud Stream은
Consumer인스턴스를 허용하는ErrorMessage추가하여 사용자 지정 오류 처리기를 제공하는 메커니즘을 노출합니다. 자세한 내용은 Spring Cloud Stream 설명서에서 오류 메시지처리를 참조하세요. 바인딩 기본 오류 처리기
모든 인바운드 바인딩 오류 메시지를 사용하도록 단일
Consumerbean을 구성합니다. 다음 기본 함수는 각 인바운드 바인딩 오류 채널을 구독합니다.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }또한
spring.cloud.stream.default.error-handler-definition속성을 함수 이름으로 설정해야 합니다.바인딩 관련 오류 처리기
특정 인바운드 바인딩 오류 메시지를 사용하도록
Consumerbean을 구성합니다. 다음 함수는 바인딩 기본 오류 처리기보다 우선 순위가 높은 특정 인바운드 바인딩 오류 채널을 구독합니다.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }또한
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition속성을 함수 이름으로 설정해야 합니다.
Service Bus 메시지 헤더
지원되는 기본 메시지 헤더는 Spring Integration대한
메모
파티션 키를 설정할 때 메시지 헤더의 우선 순위가 Spring Cloud Stream 속성보다 높습니다. 따라서 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressionServiceBusMessageHeaders#SESSION_ID 및 ServiceBusMessageHeaders#PARTITION_KEY 헤더가 구성되지 않은 경우에만 적용됩니다.
여러 바인더 지원
여러 Service Bus 네임스페이스에 대한 연결도 여러 바인더를 사용하여 지원됩니다. 이 샘플에서는 연결 문자열을 예로 사용합니다. 서비스 주체 및 관리 ID의 자격 증명도 지원되며, 사용자는 각 바인더의 환경 설정에서 관련 속성을 설정할 수 있습니다.
ServiceBus의 여러 바인더를 사용하려면 application.yml 파일에서 다음 속성을 구성합니다.
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000메모
이전 애플리케이션 파일은 모든 바인딩에 애플리케이션에 대한 단일 기본 폴러를 구성하는 방법을 보여 줍니다. 특정 바인딩에 대해 폴러를 구성하려면
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000같은 구성을 사용할 수 있습니다.메모
사용 가능한 가장 안전한 인증 흐름을 사용하는 것이 권장됩니다. 데이터베이스, 캐시, 메시징 또는 AI 서비스와 같이 이 절차에서 설명하는 인증 흐름은 애플리케이션에 대한 신뢰 수준이 매우 높고 다른 흐름에 존재하지 않는 위험을 수반합니다. 암호 없는 연결 또는 키 없는 연결에 대한 관리 ID와 같은 더 안전한 옵션이 실행 가능하지 않은 경우에만 이 흐름을 사용합니다. 로컬 컴퓨터 작업의 경우 암호 없는 연결이나 키 없는 연결에 사용자 ID를 사용하는 것이 좋습니다.
공급업체 2개와 소비자 2개를 정의해야 합니다.
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
리소스 프로비저닝
Service Bus 바인더는 큐, 토픽 및 구독의 프로비저닝을 지원하며, 사용자는 다음 속성을 사용하여 프로비저닝을 사용하도록 설정할 수 있습니다.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
메모
tenant-id 허용되는 값은 common, organizations, consumers또는 테넌트 ID입니다. 이러한 값에 대한 자세한 내용은
Service Bus 클라이언트 속성 사용자 지정
개발자는 AzureServiceClientBuilderCustomizer 사용하여 Service Bus 클라이언트 속성을 사용자 지정할 수 있습니다. 다음 예제에서는 sessionIdleTimeoutServiceBusClientBuilder 속성을 사용자 지정합니다.
@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}
샘플
자세한 내용은 GitHub의 azure-spring-boot-samples 리포지토리를 참조하세요.