次の方法で共有


Azure Service Bus キューとの間でメッセージを送受信する (Go)

このチュートリアルでは、Go プログラミング言語を使用して、Azure Service Bus キューとの間でメッセージを送受信する方法について説明します。

Azure Service Bus は、メッセージ キューと発行/サブスクライブ機能を備えたフル マネージドのエンタープライズ メッセージ ブローカーです。 Service Bus は、アプリケーションとサービスを互いに切り離すために使用され、分散型で信頼性が高く、高パフォーマンスのメッセージ トランスポートを提供します。

Azure SDK for Go の azservicebus パッケージを使用すると、Azure Service Bus と Go プログラミング言語を使用してメッセージを送受信できます。

このチュートリアルを終了すると、1 つのメッセージまたはメッセージのバッチをキューに送信したり、メッセージを受信したり、処理されない配信不能メッセージを受信したりできます。

[前提条件]

サンプル アプリを作成する

開始するには、新しい Go モジュールを作成します。

  1. service-bus-go-how-to-use-queuesという名前のモジュールの新しいディレクトリを作成します。

  2. azservicebus ディレクトリで、モジュールを初期化し、必要なパッケージをインストールします。

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. main.go という名前で新しいファイルを作成します。

クライアントの認証と作成

main.go ファイルで、GetClientという名前の新しい関数を作成し、次のコードを追加します。

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

GetClient関数は、Azure Service Bus 名前空間と資格情報を使用して作成された新しいazservicebus.Client オブジェクトを返します。 名前空間は、 AZURE_SERVICEBUS_HOSTNAME 環境変数によって提供されます。 資格情報は、 azidentity.NewDefaultAzureCredential 関数を使用して作成されます。

ローカル開発では、 DefaultAzureCredential は Azure CLI からのアクセス トークンを使用しました。これは、 az login コマンドを実行して Azure に対して認証することで作成できます。

ヒント

接続文字列で認証するには、 NewClientFromConnectionString 関数を使用します。

キューにメッセージを送信する

main.go ファイルで、SendMessageという名前の新しい関数を作成し、次のコードを追加します。

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage は、メッセージ文字列と azservicebus.Client オブジェクトの 2 つのパラメーターを受け取ります。 次に、新しい azservicebus.Sender オブジェクトを作成し、メッセージをキューに送信します。 一括メッセージを送信するには、 SendMessageBatch 関数を main.go ファイルに追加します。

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch は、メッセージのスライスと azservicebus.Client オブジェクトの 2 つのパラメーターを受け取ります。 次に、新しい azservicebus.Sender オブジェクトを作成し、メッセージをキューに送信します。

キューからメッセージを受信する

キューにメッセージを送信した後は、 azservicebus.Receiver の種類でメッセージを受信できます。 キューからメッセージを受信するには、 GetMessage 関数を main.go ファイルに追加します。

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage は、 azservicebus.Client オブジェクトを受け取り、新しい azservicebus.Receiver オブジェクトを作成します。 その後、キューからメッセージを受信します。 Receiver.ReceiveMessages関数は、コンテキストと受信するメッセージの数という 2 つのパラメーターを受け取ります。 Receiver.ReceiveMessages関数は、azservicebus.ReceivedMessage オブジェクトのスライスを返します。

次に、 for ループがメッセージを反復処理し、メッセージ本文を出力します。 その後、メッセージを完了するために CompleteMessage 関数が呼び出され、キューから削除されます。

長さの制限を超えたメッセージ、無効なキューに送信されたメッセージ、または正常に処理されないメッセージは、配信不能キューに送信できます。 配信不能キューにメッセージを送信するには、 SendDeadLetterMessage 関数を main.go ファイルに追加します。

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage は、 azservicebus.Client オブジェクトと azservicebus.ReceivedMessage オブジェクトを受け取ります。 次に、配信不能キューにメッセージを送信します。 この関数は、コンテキストと azservicebus.DeadLetterOptions オブジェクトの 2 つのパラメーターを受け取ります。 メッセージが配信不能キューに送信されない場合、 Receiver.DeadLetterMessage 関数はエラーを返します。

配信不能キューからメッセージを受信するには、ReceiveDeadLetterMessage ファイルにmain.go関数を追加します。

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage は、azservicebus.Client オブジェクトを受け取り、デッドレターキューのオプションを含む新しい azservicebus.Receiver オブジェクトを作成します。 その後、デッドレターキューからメッセージを受信します。 関数はその後、デッドレターキューから1つのメッセージを受信します。 その後、そのメッセージの配信不能の理由と説明を出力します。

サンプル コード

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

コードの実行

コードを実行する前に、 AZURE_SERVICEBUS_HOSTNAMEという名前の環境変数を作成します。 環境変数の値を Service Bus 名前空間に設定します。

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

次に、次の go run コマンドを実行してアプリを実行します。

go run main.go

次のステップ

詳細については、次のリンクを参照してください。