重要
這項功能處於公開預覽狀態。
本文說明如何讀取和寫入 XML 檔案。
可延伸標記語言 (XML) 是一種標記語言,用於格式化、儲存及共用文字格式的數據。 它會定義一組規則,以串行化從檔到任意數據結構的數據。
原生 XML 檔案格式支援可讓您擷取、查詢和剖析 XML 數據,以進行批處理或串流。 它可以自動推斷和演進架構和數據類型、支援 SQL 運算式,例如 from_xml,而且可以產生 XML 檔。 它不需要外部資料庫,且可順暢地與自動載入器、read_files 和COPY INTO搭配運作。 您可以選擇性地針對 XML 架構定義 (XSD) 驗證每個資料列層級 XML 記錄。
需求
Databricks Runtime 14.3 和更新版本
剖析 XML 記錄
XML 規格規定格式正確的結構。 不過,此規格不會立即對應至表格格式。 您必須指定 rowTag 選項,以指出對應至 DataFrameRow的 XML 專案。 元素 rowTag 會變成最上層 struct。 的子項目 rowTag 會成為最上層 struct的欄位。
您可以指定此記錄的架構,或讓它自動推斷。 因為剖析器只會檢查 rowTag 元素,因此會篩選掉 DTD 和外部實體。
下列範例說明使用不同 rowTag 選項剖析 XML 檔案的架構推斷和剖析:
Python
xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)
程式語言 Scala
val xmlString = """
<books>
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
</book>
<book id="bk104">
<author>Corets, Eva</author>
<title>Oberon's Legacy</title>
</book>
</books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)
以 「books」 選項讀取 XML 檔案 rowTag :
Python
df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)
程式語言 Scala
val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)
輸出:
root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)
+------------------------------------------------------------------------------+
|book |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+
以 「book」 讀取 XML rowTag 檔案:
Python
df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:
程式語言 Scala
val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:
輸出:
root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)
+-----+-----------+---------------+
|_id |author |title |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+
數據源選項
XML 的數據來源選項可以指定下列方式:
-
.option/.options下列方法:- DataFrameReader
- DataFrameWriter
- DataStreamReader
- DataStreamWriter
- 下列內建函式:
-
OPTIONS的 CREATE TABLE 子句
如需選項清單,請參閱 自動載入器選項。
XSD 支援
您可以選擇性地驗證 XML 架構定義 (XSD) 的每個資料列層級 XML 記錄。 在選項中 rowValidationXSDPath 指定 XSD 檔案。 XSD 不會以其他方式影響已提供或推斷的架構。 驗證失敗的記錄會標示為「損毀」,並根據選項區段中所述的損毀記錄處理模式選項進行處理。
您可以使用 XSDToSchema 從 XSD 檔案擷取 Spark DataFrame 架構。 它只支持簡單、複雜和循序類型,而且只支援基本的 XSD 功能。
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path
val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="book">
<xs:complexType>
<xs:sequence>
<xs:element name="author" type="xs:string" />
<xs:element name="title" type="xs:string" />
<xs:element name="genre" type="xs:string" />
<xs:element name="price" type="xs:decimal" />
<xs:element name="publish_date" type="xs:date" />
<xs:element name="description" type="xs:string" />
</xs:sequence>
<xs:attribute name="id" type="xs:string" use="required" />
</xs:complexType>
</xs:element>
</xs:schema>"""
dbutils.fs.put(xsdPath, xsdString, true)
val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))
下表顯示 XSD 資料類型轉換成 Spark 資料類型:
| XSD 數據類型 | Spark 資料類型 |
|---|---|
boolean |
BooleanType |
decimal |
DecimalType |
unsignedLong |
DecimalType(38, 0) |
double |
DoubleType |
float |
FloatType |
byte |
ByteType |
short、unsignedByte |
ShortType |
integer、、negativeIntegernonNegativeInteger、nonPositiveInteger、、positiveInteger、unsignedShort |
IntegerType |
long、unsignedInt |
LongType |
date |
DateType |
dateTime |
TimestampType |
Others |
StringType |
剖析巢狀 XML
現有 DataFrame 中字串值資料行中的 XML 數據可以剖析, schema_of_xml 並以 from_xml 新 struct 數據行傳回架構和剖析的結果。 做為自變數傳遞至 schema_of_xml 的 XML 數據,而且 from_xml 必須是單一格式正確的 XML 記錄。
XML 結構模式
語法
schema_of_xml(xmlStr [, options] )
引數
-
xmlStr:指定單一格式正確的 XML 記錄的 STRING 運算式。 -
options:指定指示詞的選擇性常MAP<STRING,STRING>值。
傳回
STRING,包含具有 n 個字串字段的結構定義,其中數據行名稱衍生自 XML 元素和屬性名稱。 域值會保存衍生的格式化 SQL 類型。
從 XML 匯入
語法
from_xml(xmlStr, schema [, options])
引數
-
xmlStr:指定單一格式正確的 XML 記錄的 STRING 運算式。 -
schema:函式的schema_of_xmlSTRING 運算式或調用。 -
options:指定指示詞的選擇性常MAP<STRING,STRING>值。
傳回
結構體,其字段名稱和類型符合架構定義。 架構必須定義為逗號分隔的資料列名稱和資料類型群組,例如 CREATE TABLE 資料來源選項中顯示的 大部分選項 都適用下列例外狀況:
-
rowTag:因為只有一個 XML 記錄,因此rowTag選項不適用。 -
mode(預設值: ):PERMISSIVE允許在剖析期間處理損毀記錄的模式。-
PERMISSIVE:當它符合損毀的記錄時,會將格式錯誤的字串放入 所設定columnNameOfCorruptRecord的欄位,並將格式不正確的欄位設定為null。 若要保留損毀的記錄,您可以在使用者定義的架構中設定名為columnNameOfCorruptRecord的字串類型字段。 如果結構沒有該欄位,則在解析過程中會忽略損壞的記錄。 推斷架構時,它會隱含地在輸出架構中新增columnNameOfCorruptRecord字段。 -
FAILFAST:遇到損毀的記錄時擲回例外狀況。
-
結構轉換
由於 DataFrame 和 XML 之間的結構差異,因此有一些從 XML 數據到 DataFrameDataFrame XML 數據的轉換規則。 請注意,使用 選項 excludeAttribute可以停用處理屬性。
從 XML 轉換成 DataFrame
屬性:屬性會轉換成標題前置詞attributePrefix為 的欄位。
<one myOneAttrib="AAAA">
<two>two</two>
<three>three</three>
</one>
會產生下列架構:
root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)
包含 attribute(s) 或子元素的專案中的字元數據: 這些數據會剖析成 valueTag 欄位。 如果有多個字元數據出現,欄位 valueTag 會 array 轉換成類型。
<one>
<two myTwoAttrib="BBBBB">two</two>
some value between elements
<three>three</three>
some other value between elements
</one>
會產生下列架構:
root
|-- _VALUE: array (nullable = true)
| |-- element: string (containsNull = true)
|-- two: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)
從 DataFrame 轉換成 XML
元素做為陣列中的陣列:撰寫 XML 檔案 DataFrame 時,將具有具有其元素的欄位 ArrayType ,如同 ArrayType 為專案加上額外的巢狀字段。 這不會發生在讀取和寫入 XML 數據,而是從其他來源寫入 DataFrame 讀取時發生。 因此,讀取和寫入 XML 檔案的往返都有相同的結構,但從其他來源寫入 DataFrame 讀取可能會有不同的結構。
具有下列架構的 DataFrame:
|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
以及下列資料:
+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+
會產生下列 XML 檔案:
<a>
<item>aa</item>
</a>
<a>
<item>bb</item>
</a>
中 DataFrame 未命名陣列的元素名稱是由 選項 arrayElementName 指定 (預設值: item)。
已獲救的數據列
已獲救的數據行可確保您在 ETL 期間永遠不會遺失或錯過數據。 您可以啟用已獲救的數據行來擷取未剖析的任何數據,因為記錄中的一或多個字段有下列其中一個問題:
- 不存在提供的架構
- 不符合所提供架構的數據類型
- 與所提供架構中的功能變數名稱不符
已修復的資料行會以 JSON 文件的形式傳回,其中包含已修復的資料行,以及記錄的來源檔案路徑。 若要從已獲救的數據行中移除來源檔案路徑,您可以設定下列 SQL 組態:
Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
程式語言 Scala
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").
您可以在讀取資料時將選項 rescuedDataColumn 設定為資料行名稱(例如使用 _rescued_data 的 spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)),以啟用已修復的資料行。
剖析記錄時,XML 剖析器支援三種模式: PERMISSIVE、 DROPMALFORMED和 FAILFAST。 搭配使用 rescuedDataColumn 時,資料類型不符會導致記錄在 DROPMALFORMED 模式中捨棄或以 FAILFAST 模式擲回錯誤。 只有損毀的記錄(不完整或格式不正確的 XML)會卸除或擲回錯誤。
自動載入器中的架構推斷和演進
如需本主題和適用選項的詳細討論,請參閱 設定自動載入器中的架構推斷和演進。 您可以設定自動載入器來自動偵測已載入之 XML 數據的架構,讓您不需要明確宣告數據架構並隨著新數據行匯入而演進數據表架構,即可初始化數據表。 這樣就不需要在一段時間內手動追蹤和套用架構變更。
依預設,Auto Loader 架構推斷會主動避免因類型不匹配而導致的架構演化問題。 對於未編碼數據類型的格式(JSON、CSV 和 XML),自動載入器會將所有數據行推斷為字串,包括 XML 檔案中的巢狀字段。 Apache Spark DataFrameReader 會針對架構推斷使用不同的行為,根據範例數據選取 XML 來源中數據行的數據類型。 若要使用自動載入器開啟此行為,請將 選項 cloudFiles.inferColumnTypes 設定為 true。
自動加載器在處理您的資料時會偵測到新增新資料行的情況。 當自動載入器偵測到新的數據行時,數據流會以 UnknownFieldException停止。 在資料流出現此錯誤之前,自動載入器會先對最新的微批次資料執行架構推斷,並以最新的架構更新架構位置,將新欄位合併至架構結尾。 現有數據行的數據類型保持不變。 自動載入器支援架構演進的不同模式,您可以在 選項 cloudFiles.schemaEvolutionMode中設定。
您可以使用 架構提示 來強制執行所知道且預期於推斷架構上的架構資訊。 當您知道資料行是特定資料類型,或者如果您想要選擇更一般數據類型(例如雙精度浮點數而非整數),您可以使用 SQL 架構規格語法,為數據行數據類型提供任意數目的提示作為字元串。 啟用獲救的數據行時,在架構以外的案例中命名的欄位會載入至數據 _rescued_data 行。 您可以將 選項 readerCaseSensitive 設定為 false來變更此行為,在此情況下,自動載入器會以不區分大小寫的方式讀取數據。
範例
本節中的範例會使用可在 Apache Spark GitHub 存放庫中下載的 XML 檔案。
讀取和寫入 XML
Python
df = (spark.read
.format('xml')
.options(rowTag='book')
.load(xmlPath)) # books.xml
selected_data = df.select("author", "_id")
(selected_data.write
.options(rowTag='book', rootTag='books')
.xml('newbooks.xml'))
程式語言 Scala
val df = spark.read
.option("rowTag", "book")
.xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
R
df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
讀取資料時,您可以手動指定架構:
Python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
custom_schema = StructType([
StructField("_id", StringType(), True),
StructField("author", StringType(), True),
StructField("description", StringType(), True),
StructField("genre", StringType(), True),
StructField("price", DoubleType(), True),
StructField("publish_date", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)
selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')
程式語言 Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml
val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")
R
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")
SQL API
XML 資料來源可以推斷資料型態:
DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;
您也可以在 DDL 中指定資料行名稱和類型。 在此情況下,不會自動推斷架構。
DROP TABLE IF EXISTS books;
CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");
使用 COPY INTO 載入 XML
DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;
COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');
讀取具有數據列驗證的 XML
Python
df = (spark.read
.format("xml")
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.load(inputPath))
df.printSchema()
程式語言 Scala
val df = spark.read
.option("rowTag", "book")
.option("rowValidationXSDPath", xsdPath)
.xml(inputPath)
df.printSchema
剖析巢狀 XML (from_xml 和 schema_of_xml)
Python
from pyspark.sql.functions import from_xml, schema_of_xml, lit, col
xml_data = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>
"""
df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()
程式語言 Scala
import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}
val xmlData = """
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>""".stripMargin
val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()
使用 SQL API from_xml和schema_of_xml
SELECT from_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>',
schema_of_xml('
<book id="bk103">
<author>Corets, Eva</author>
<title>Maeve Ascendant</title>
<genre>Fantasy</genre>
<price>5.95</price>
<publish_date>2000-11-17</publish_date>
</book>')
);
使用自動載入器載入 XML
Python
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", True)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(availableNow=True)
.toTable("table_name")
)
程式語言 Scala
val query = spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "xml")
.option("rowTag", "book")
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaLocation", schemaPath)
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load(inputPath)
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", checkPointPath)
.trigger(Trigger.AvailableNow()
.toTable("table_name")
)