次の方法で共有


Python から Azure Queue Storage を使用する方法

概要

この記事では、Azure Queue Storage サービスを使用する一般的なシナリオについて説明します。 対象となるシナリオには、キュー メッセージの挿入、プレビュー、取得、削除があります。 キューを作成および削除するためのコードについても説明します。

この記事の例は Python で記述されており、Python 用の Azure Queue Storage クライアント ライブラリを使用しています。 キューの詳細については、「 次のステップ 」セクションを参照してください。

Queue Storage とは

Azure Queue Storage は、HTTP または HTTPS を使用して認証された呼び出しを介して世界中のどこからでもアクセスできる多数のメッセージを格納するためのサービスです。 1 つのキュー メッセージのサイズは最大 64 KB で、キューにはストレージ アカウントの合計容量制限まで、数百万のメッセージを含めることができます。 キュー ストレージは、非同期的に処理する作業のバックログを作成するためによく使用されます。

キューサービスの概念

Azure Queue サービスには、次のコンポーネントが含まれています。

Azure Queue サービス コンポーネント

  • #B0 ストレージ アカウント: #C1 Azure Storage へのすべてのアクセスは、ストレージ アカウントを介して行われます。 ストレージ アカウントの詳細については、「ストレージ アカウントの概要」を参照してください。

  • キュー: キューは、メッセージのセットを格納します。 すべてのメッセージはキューに 格納されている必要があります。 キュー名は小文字で入力する必要があります。 キューの名前付け規則については、「 Naming Queues and Metadata (キューとメタデータの名前付け規則)」を参照してください。

  • メッセージ: 形式を問わず、メッセージのサイズは最大で 64 KB です。 メッセージがキューに残ることができる最大時間は 7 日間です。 バージョン 2017-07-29 以降では、最大有効期間を任意の正の数にすることができます。また、-1 は、メッセージが期限切れにならないことを示します。 このパラメーターを省略すると、既定の有効期間は 7 日になります。

  • URL 形式: キューは、次の URL 形式を使用してアドレス指定できます: http://.queue.core.windows.net/<storage account>

    次の URL は、図内のキューをアドレス指定します。

    http://myaccount.queue.core.windows.net/incoming-orders

Azure Storage アカウントを作成する

最初の Azure ストレージ アカウントを作成する最も簡単な方法は、Azure portalを使用することです。 詳細については、「 ストレージ アカウントの作成」を参照してください。

Azure PowerShell 、Azure CLI 、または .NET Azure Storage Resource Provider使用して、Azure ストレージ アカウントを作成することもできます。

現時点で Azure でストレージ アカウントを作成しない場合は、Azurite ストレージ エミュレーターを使用して、ローカル環境でコードを実行してテストすることもできます。 詳細については、「ローカルの Azure Storage 開発に Azurite エミュレーターを使用する」を参照してください。

Azure Storage SDK for Python をダウンロードしてインストールする

Azure Storage SDK for Python には、Python v2.7、v3.3 以降が必要です。

PyPI 経由でインストールする

Python パッケージ インデックス (PyPI) を使用してインストールするには、次のように入力します。

pip install azure-storage-queue

Azure Storage SDK for Python v0.36 以前からアップグレードする場合は、最新のパッケージをインストールする前に、 pip uninstall azure-storage を使用して古い SDK をアンインストールします。

別のインストール方法については、 Azure SDK for Python に関するページを参照してください。

Azure Portal で資格情報をコピーする

サンプル アプリケーションから Azure Storage に対して要求を実行するときは、承認されている必要があります。 要求を承認するには、ストレージ アカウントの資格情報を接続文字列としてアプリケーションに追加します。 ストレージ アカウントの資格情報を表示するには、次の手順に従います。

  1. Azure portal にサインインします。

  2. 自分のストレージ アカウントを探します。

  3. ストレージ アカウント メニュー ウィンドウの [セキュリティとネットワーク] で、 [アクセス キー] を選択します。 ここで、アカウント アクセス キーと各キーの完全な接続文字列が確認できます。

    Azure portal 内のアクセス キー設定の場所を示すスクリーンショット

  4. [アクセス キー] ペインで、 [キーの表示] を選択します。

  5. [key1] セクションで、 [接続文字列] の値を見つけます。 [クリップボードにコピー] アイコンを選択して、接続文字列をコピーします。 次のセクションで、この接続文字列の値を環境変数に追加します。

    Azure portal から接続文字列をコピーする方法を示すスクリーンショット

ストレージ接続文字列を構成してください

接続文字列をコピーした後、アプリケーションを実行しているローカル マシンの新しい環境変数にそれを書き込みます。 環境変数を設定するには、コンソール ウィンドウを開いて、お使いのオペレーティング システムの手順に従います。 <yourconnectionstring> は、実際の接続文字列に置き換えてください。

setx AZURE_STORAGE_CONNECTION_STRING "<yourconnectionstring>"

Windows で環境変数を追加した後、コマンド ウィンドウの新しいインスタンスを開始する必要があります。

プログラムの再起動

環境変数を追加した後、環境変数の読み取りを必要とする実行中のプログラムをすべて再起動します。 たとえば、続行する前に、ご使用の開発環境またはエディターを再起動します。

Queue Storage にアクセスするようにアプリケーションを構成する

QueueClient オブジェクトを使用すると、キューを操作できます。 プログラムで Azure キューにアクセスする Python ファイルの先頭付近に、次のコードを追加します。

from azure.storage.queue import (
        QueueClient,
        BinaryBase64EncodePolicy,
        BinaryBase64DecodePolicy
)

import os, uuid

os パッケージは、環境変数を取得するためのサポートを提供します。 uuid パッケージは、キュー名に対する一意の識別子を生成するためのサポートを提供します。

キューを作成する

接続文字列は、前に設定した AZURE_STORAGE_CONNECTION_STRING 環境変数から取得されます。

次のコードは、ストレージ接続文字列を使用して QueueClient オブジェクトを作成します。

# Retrieve the connection string from an environment
# variable named AZURE_STORAGE_CONNECTION_STRING
connect_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")

# Create a unique name for the queue
q_name = "queue-" + str(uuid.uuid4())

# Instantiate a QueueClient object which will
# be used to create and manipulate the queue
print("Creating queue: " + q_name)
queue_client = QueueClient.from_connection_string(connect_str, q_name)

# Create the queue
queue_client.create_queue()

Azure のキュー メッセージはテキストとして格納されます。 バイナリ データを格納する場合は、キューにメッセージを配置する前に、Base64 エンコードおよびデコード関数を設定します。

クライアント オブジェクトの作成時に Base64 エンコードおよびデコード関数を構成します。

# Setup Base64 encoding and decoding functions
base64_queue_client = QueueClient.from_connection_string(
                            conn_str=connect_str, queue_name=q_name,
                            message_encode_policy = BinaryBase64EncodePolicy(),
                            message_decode_policy = BinaryBase64DecodePolicy()
                        )

キューにメッセージを挿入する

メッセージをキューに挿入するには、 send_message メソッドを使用します。

message = u"Hello World"
print("Adding message: " + message)
queue_client.send_message(message)

メッセージを覗く

peek_messages メソッドを呼び出せば、メッセージをキューから削除せずに覗くことができます。 既定では、このメソッドは 1 つのメッセージを対象としてピークします。

# Peek at the first message
messages = queue_client.peek_messages()

for peeked_message in messages:
    print("Peeked message: " + peeked_message.content)

キューに入ったメッセージの内容を変更する

キュー内のメッセージの内容をその場で変更できます。 メッセージがタスクを表している場合は、この機能を使用してタスクの状態を更新できます。

次のコードでは、update_message メソッドを使用してメッセージを更新します。 表示のタイムアウトは 0 に設定されています。これは、メッセージが即時に表示され、コンテンツが更新されることを示します。

messages = queue_client.receive_messages()
list_result = next(messages)

message = queue_client.update_message(
        list_result.id, list_result.pop_receipt,
        visibility_timeout=0, content=u'Hello World Again')

print("Updated message to: " + message.content)

キューの長さを取得する

キュー内のメッセージ数の見積もりを取得できます。

get_queue_properties メソッドは、approximate_message_countを含むキューのプロパティを返します。

properties = queue_client.get_queue_properties()
count = properties.approximate_message_count
print("Message count: " + str(count))

サービスが要求に応答した後にメッセージが追加または削除される可能性があるため、取得される結果はおおよその数を示しているだけです。

メッセージをキューから取り出す

2 つの手順でキューからメッセージを削除します。 コードでメッセージの処理に失敗した場合、この 2 段階のプロセスにより、同じメッセージを取得してもう一度やり直すことができます。 メッセージが正常に処理された後、 delete_message を呼び出します。

receive_messagesを呼び出すと、既定でキューに次のメッセージが表示されます。 receive_messages から返されたメッセージは、このキューからメッセージを読み取る他のコードから参照できなくなります。 既定では、このメッセージは 30 秒間非表示のままです。 また、キューからのメッセージの削除を完了するには、delete_message を呼び出す必要があります。

messages = queue_client.receive_messages()

for message in messages:
    print("Dequeueing message: " + message.content)
    queue_client.delete_message(message.id, message.pop_receipt)

キューからのメッセージの取得をカスタマイズする方法は 2 つあります。 1 つ目の方法では、(最大 32 個の) メッセージのバッチを取得できます。 2 つ目の方法では、コードで各メッセージを完全に処理できるように、非表示タイムアウトの設定を長くまたは短くすることができます。

次のコード例では、 receive_messages メソッドを使用してメッセージをバッチで取得します。 次に、入れ子になった for ループを使用して、各バッチ内の各メッセージを処理します。 また、各メッセージの非表示タイムアウトを 5 分に設定します。

messages = queue_client.receive_messages(messages_per_page=5, visibility_timeout=5*60)

for msg_batch in messages.by_page():
   for msg in msg_batch:
      print("Batch dequeue message: " + msg.content)
      queue_client.delete_message(msg)

キューを削除する

キューおよびキューに含まれているすべてのメッセージを削除するには、delete_queue メソッドを呼び出します。

print("Deleting queue: " + queue_client.queue_name)
queue_client.delete_queue()

ヒント

Microsoft Azure ストレージ エクスプローラーを試す

Microsoft Azure ストレージ エクスプローラーは、Windows、macOS、Linux で Azure Storage のデータを視覚的に操作できる Microsoft 製の無料のスタンドアロン アプリです。

次のステップ

Queue Storage の基本を学習したので、次のリンクをクリックして詳細を確認してください。

非推奨の Python バージョン 2 SDK を使用する関連コード サンプルについては、 Python バージョン 2 を使用したコード サンプルを参照してください。