共用方式為


使用雙重寫入 Proxy 和 Apache Spark,將資料從 Apache Cassandra 即時移轉至 Azure Cosmos DB for Apache Cassandra

Azure Cosmos DB 中適用於 Cassandra 的 API 是針對在 Apache Cassandra 上執行的企業工作負載的絕佳選擇,原因有各種原因:

  • 沒有管理和監控的額外負擔: 它可以消除在操作系統、Java 虛擬機及 yaml 檔案中,管理和監控無數設定及其互動的額外負擔。
  • 節省大量成本: 您可以使用 Azure Cosmos DB 來節省成本,其中包括虛擬機、頻寬和任何適用的授權成本。 您不需要管理資料中心、伺服器、SSD 記憶體、網路和電力成本。
  • 可使用現有的程式碼和工具:Azure Cosmos DB 提供與現有 Cassandra SDK 和工具相容的有線通訊協定層級。 此相容性可確保您可以透過 Azure Cosmos DB for Apache Cassandra 使用現有程式碼基底執行瑣碎的變更。

Azure Cosmos DB 不支援原生 Apache Cassandra 八卦通訊協議進行複寫。 如果零停機時間是移轉的必要條件,則需要不同的方法。 本教學課程描述如何使用雙重寫入 ProxyApache Spark,將資料從原生 Apache Cassandra 叢集即時移轉至 Azure Cosmos DB for Apache Cassandra。

下方為該模式的圖解。 雙寫入代理可用來擷取即時變更。 歷程記錄數據會使用 Apache Spark 大量複製。 Proxy 可以接受來自應用程式程式碼的連線,而不需要變更任何設定。 它會將所有的要求路由傳送到您的來源資料庫,並在進行大量複製時,異步地將寫入路由到 Cassandra 的 API。

動畫顯示將數據即時移轉至適用於 Apache Cassandra 的 Azure 受控執行個體。

必要條件

重要

如果您需要在移轉期間保留 Apache Cassandrawritetime,則在建立資料表時必須設定下列旗標:

with cosmosdb_cell_level_timestamp=true and cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true

例如:

CREATE KEYSPACE IF NOT EXISTS migrationkeyspace WITH REPLICATION= {'class': 'org.apache.> cassandra.locator.SimpleStrategy', 'replication_factor' : '1'};
CREATE TABLE IF NOT EXISTS migrationkeyspace.users (
 name text,
 userID int,
 address text,
 phone int,
 PRIMARY KEY ((name), userID)) with cosmosdb_cell_level_timestamp=true and > cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true;

佈建 Spark 叢集

建議您使用 Azure Databricks。 使用支援 Spark 3.0 或更高版本的執行階段。

重要

您必須確保您的 Azure Databricks 帳戶與來源 Apache Cassandra 叢集具有網路連線能力。 此設定可能需要虛擬網路插入。 如需詳細資訊,請參閱 在 Azure 虛擬網路中部署 Azure Databricks。

顯示尋找 Azure Databricks 執行時間版本的螢幕快照。

新增 Spark 相依性

將 Apache Spark Cassandra 連接器程式庫新增至您的叢集,以連線至原生和 Azure Cosmos DB Cassandra 端點。 在您的叢集中,選取 [程式庫] > [安裝新的] > [Maven],然後在 Maven 座標中新增 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0

重要

如果您需要在移轉期間針對每個資料列保留 Apache Cassandra writetime,我們建議使用此範例。 此範例中的相依性 JAR 也包含 Spark 連接器,因此您應該安裝此版本,而不是先前所述的連接器元件。

如果您想要在記錄資料載入完成後在來源與目標之間執行資料列比較驗證,此範例也能幫上忙。 如需詳細資訊,請參閱 執行歷程記錄數據載入驗證來源和目標

顯示搜尋 Azure Databricks 中 Maven 套件的螢幕快照。

選取 [安裝],然後在安裝完成時重新啟動叢集。

注意

安裝 Cassandra 連接器程式庫之後,請務必重新啟動 Azure Databricks 叢集。

安裝雙重寫入 Proxy

為了在雙重寫入期間獲得最佳效能,建議您在來源 Cassandra 叢集中的所有節點上安裝 Proxy。

#assuming you do not have git already installed
sudo apt-get install git 

#assuming you do not have maven already installed
sudo apt install maven

#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git

#change directory
cd cassandra-proxy

#compile the proxy
mvn package

啟動雙重寫入 Proxy

建議您在來源 Cassandra 叢集中的所有節點上安裝 Proxy。 至少要執行下列命令,以在每個節點上啟動 Proxy。 將 <target-server> 取代為目標叢集中其中一個節點的 IP 或伺服器位址。 以本機 .jks 檔案的路徑取代 <path to JKS file>,並以對應的密碼取代 <keystore password>

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>

以這種方式啟動 Proxy 便代表假設符合了下列條件:

  • 來源和目標端點具有相同的使用者名稱和密碼。
  • 來源和目標端點使用的是安全通訊端層 (SSL)。

如果您的來源和目標端點無法符合這些條件,請繼續閱讀以取得進一步的設定選項。

設定 SSL

針對 SSL,您可以實作現有的金鑰存放區,例如來源叢集所使用的 keytool金鑰存放區,或使用 建立自我簽署憑證:

keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048

如果來源或目標端點未使用 SSL,您也可以停用 SSL。 使用 --disable-source-tls--disable-target-tls 旗標:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> \
  --target-password <password> --disable-source-tls true  --disable-target-tls true 

注意

當您透過 Proxy 建置與資料庫的 SSL 連線時,請確定用戶端應用程式使用與雙寫入 Proxy 所使用的金鑰存放區和密碼相同。

設定認證和連接埠

根據預設,用戶端應用程式會傳遞來源認證。 Proxy 會使用認證來連線到來源和目標叢集。 如先前所述,此程式會假設來源的認證和目標的認證相同。 啟動 Proxy 時,您必須為 Cassandra 端點的目標 API 指定不同的使用者名稱和密碼:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> \
  --target-username <username> --target-password <password>

未指定時的預設來源和目標埠為9042。 在此情況下,適用於 Cassandra 的 API 會在埠 10350上執行。 使用 --source-port--target-port 指定埠號碼:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

遠端部署 Proxy

在某些情況下,您可能不想在叢集節點上安裝 Proxy。 您可能會想要將它安裝在個別的電腦上。 在該情境中,指定<source-server>的IP位址:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>

警告

在個別計算機上安裝和執行 proxy,而不是在源 Apache Cassandra 叢集的每個節點上這樣做,會影響即時移轉時的效能。 雖然此設定運作正常,但用戶端驅動程式無法開啟與叢集內所有節點的連線。 用戶端依賴安裝 Proxy 的單一協調器節點來進行連線。

毫不允許應用程式程式碼變更

根據預設,Proxy 會在連接埠 29042 上接聽。 將應用程式程式代碼變更為指向此埠。 您可以改為變更 Proxy 接聽的埠。 如果您要藉由下列方式排除應用程式程式代碼變更,您可以進行這項變更:

  • 讓來源 Cassandra 伺服器在不同的連接埠上執行。
  • 讓 Proxy 在標準 Cassandra 連接埠 9042 上執行。
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042

注意

在叢集節點上安裝 Proxy 不需要重新啟動節點。 如果您有許多應用程式用戶端,並偏好在標準 Cassandra 連接埠 9042 上執行 Proxy,以消除應用層級程式代碼變更,請變更 Apache Cassandra 預設埠。 接著,您必須重新啟動叢集中的節點,並將來源埠設定為您為來源 Cassandra 叢集定義的新埠。

在下列範例中,我們將來源 Cassandra 叢集變更為在連接埠 3074 上執行,然後在連接埠 9042 上啟動叢集:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server \
 --proxy-port 9042 --source-port 3074

強制通訊協定

Proxy 具有強制通訊協定的功能,在來源端點比目標更先進或不受支援時將可能需要使用功能。 在這種情況下,您可以指定 --protocol-version--cql-version 來強制通訊協定與目標相符:

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server \
  --protocol-version 4 --cql-version 3.11

執行雙寫入 Proxy 之後,您必須變更應用程式用戶端上的埠,然後重新啟動。 或者,如果您選擇該方法,請變更 Cassandra 連接埠並重新啟動叢集。 Proxy 會開始將寫入轉送至目標端點。 如需詳細資訊,請參閱 監視和計量

執行歷程資料載入

若要載入該資料,請在您的 Azure Databricks 帳戶中建立 Scala 筆記本。 將您的來源和目標 Cassandra 設定取代為對應的認證,以及來源和目標 keyspace 和資料表。 在下列範例中,視需要為每個資料表新增更多變數,然後執行。 在應用程式開始將要求傳送到雙重寫入 Proxy 之後,您即能開始轉移歷程記錄資料。

重要

在移轉數據之前,請將容器輸送量提高到應用程式快速移轉所需的數量。 開始移轉前調整輸送量可協助您在較少的時間內移轉數據。 為了協助防止歷程記錄數據載入期間的速率限制,您可以在適用於 Cassandra 的 API 中啟用伺服器端重試 (SSR)。 如需如何啟用 SSR 和詳細資訊的指示,請參閱 針對 Apache Cassandra 作業防止 Azure Cosmos DB 的速率限制錯誤

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .option("writetime", timestamp)
  .mode(SaveMode.Append)
  .save

注意

在上述 Scala 範例中,您會注意到 timestamp 在讀取源數據表中的所有數據之前,已設定為目前的時間。 然後,writetime 會設定為追溯的時間戳記。 此方法確保從歷史數據載入到目標端點的記錄,不會覆寫來自雙重寫入代理的更新,當這些更新具有較新的時間戳且正在讀取歷史數據時。

重要

如果您基於任何原因而需要保留「確切」的時間戳記,您應該採用歷程資料移轉方法來保留時間戳記,例如此範例。 範例中的相依性 JAR 也包含 Spark 連接器,因此您不需要安裝先前必要條件中所述的 Spark 連接器元件。 在您的Spark叢集中安裝這兩者會造成衝突。

驗證來源和目標

在歷程資料載入完成後,您的資料庫應該已同步完成並準備好進行完全移轉。 我們建議您在最終切換之前,先驗證來源和目標,以確保它們匹配。

注意

如果您使用先前所述的 Cassandra 移轉程式範例來保留 writetime,此範例會包含藉由根據特定容錯比較來源和目標中的數據驗證移轉的功能。

後續步驟