本教程演示了使用示例数据的常见 Delta 表作。 Delta Lake 是优化的存储层,可为 Databricks 上的表提供基础。 除非另有指定,否则 Databricks 上的所有表都是 Delta 表。
在您开始之前
若要完成本教程,需要:
- 使用现有计算资源或创建新的计算资源的权限。 请参阅 计算。
- Unity Catalog 权限:
USE CATALOG、USE SCHEMA和CREATE TABLE在workspace目录。 若要设置这些权限,请联系 Databricks 管理员或参阅 Unity Catalog 特权和安全对象。
这些示例依赖于名为 “合成人员记录:10K 到 10M 记录”的数据集。 此数据集包含人的虚构记录,包括其名字和姓氏、性别和年龄。
首先,下载本教程的数据集。
- 访问 Kaggle 上的合成人员记录:10K 到 10M 记录 页面。
- 单击“ 下载 ”,然后将 数据集下载为 zip。 这会下载一个名为
archive.zip的文件到您的本地计算机。 - 请从
archive文件中提取archive.zip文件夹。
接下来,将 person_10000.csv 数据集上传到 Azure Databricks 工作区中的 Unity 目录 卷 。 Azure Databricks 建议将数据上传到 Unity 目录卷,因为卷提供访问、存储、管理和组织文件的功能。
- 单击
打开目录资源管理器。边栏中的目录。
- 在目录资源管理器中,单击
添加数据并创建卷。 - 将卷
my-volume命名,然后选择托管卷 作为卷类型。 - 选择
workspace目录和default架构,然后单击“ 创建”。 - 打开
my-volume并单击“ 上传到此卷”。 - 在本地计算机的
person_10000.csv文件夹中,拖放或浏览以选择archive文件。 - 单击“上载” 。
最后,创建用于运行示例代码的笔记本。
- 单击边栏中的
“新建 ”。 - 单击
以创建笔记本的新笔记本。
- 为笔记本选择语言。
创建表
创建一个新的 Unity 目录托管表,该表命名 workspace.default.people_10k 自 person_10000.csv. Delta Lake 是 Azure Databricks 中所有表创建、读取和写入命令的默认值。
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")
# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Scala(编程语言)
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
val df = spark.read
.format("csv")
.option("header", true)
.schema(schema)
.load("/Volumes/workspace/default/my-volume/person_10000.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")
// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)
SQL
-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
person_id AS id,
firstname,
lastname,
gender,
age
FROM read_files(
'/Volumes/workspace/default/my-volume/person_10000.csv',
format => 'csv',
header => true
);
-- View the new table.
SELECT * FROM workspace.default.people_10k;
可通过多种不同的方法来创建或克隆表。 有关详细信息,请参阅 CREATE TABLE。
在 Databricks Runtime 13.3 LTS 及更高版本中,可用于 CREATE TABLE LIKE 创建一个新的空 Delta 表,该表复制源 Delta 表的架构和表属性。 将表从开发环境提升到生产环境时,这非常有用。
CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k
重要
此功能目前以公共预览版提供。
使用用于 DeltaTableBuilder 和 Scala 的 API 创建空表。
DataFrameWriter与和DataFrameWriterV2相比,DeltaTableBuilderAPI 可以更轻松地指定其他信息,如列注释、表属性和生成的列。
Python
from delta.tables import DeltaTable
(
DeltaTable.createIfNotExists(spark)
.tableName("workspace.default.people_10k_2")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("lastName", "STRING", comment="surname")
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
)
display(spark.read.table("workspace.default.people_10k_2"))
Scala(编程语言)
import io.delta.tables.DeltaTable
DeltaTable.createOrReplace(spark)
.tableName("workspace.default.people_10k")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build()
)
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
display(spark.read.table("workspace.default.people_10k"))
在表中更新插入
使用名为upsert的操作来修改表中的现有记录或添加新记录。 若要将一组更新和插入合并到现有 Delta 表中,请使用 DeltaTable.mergePython 和 ScalaMERGE INTO 中的方法和 SQL 中的语句。
例如,将数据从源表 people_10k_updates 合并到目标 Delta 表 workspace.default.people_10k。 如果两个表中有一个匹配行,Delta Lake 会使用给定的表达式更新数据列。 如果没有匹配行,Delta Lake 会添加一个新行。
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10001, 'Billy', 'Luppitt', 'M', 55),
(10002, 'Mary', 'Smith', 'F', 98),
(10003, 'Elias', 'Leadbetter', 'M', 48),
(10004, 'Jane', 'Doe', 'F', 30),
(10005, 'Joshua', '', 'M', 90),
(10006, 'Ginger', '', 'F', 16),
]
# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')
(deltaTable.alias("people_10k")
.merge(
people_10k_updates.alias("people_10k_updates"),
"people_10k.id = people_10k_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)
Scala(编程语言)
import org.apache.spark.sql.types._
import io.delta.tables._
// Define schema
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
// Create data as Seq of Tuples
val data = Seq(
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16)
)
// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
"id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.as("people_10k")
.merge(
people_10k_updates.as("people_10k_updates"),
"people_10k.id = people_10k_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)
SQL
-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16);
-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *;
-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001
在 SQL 中 * ,运算符更新或插入目标表中的所有列,假定源表的列与目标表具有相同的列。 如果目标表没有相同的列,查询将引发分析错误。 此外,在执行插入作时,必须为表中的每一列指定一个值。 列值可以是空的, ''例如。 执行插入作时,无需更新所有值。
读取表
使用表名或路径访问 Delta 表中的数据。 若要访问 Unity Catalog 管理的表,请使用完全限定的表名称。 卷和外部表仅支持基于路径的访问,而不支持托管表。 有关详细信息,请参阅 Unity 目录卷中的路径规则和访问权限。
Python
people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
Scala(编程语言)
val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
SQL
SELECT * FROM workspace.default.people_10k;
写入到表
Delta Lake 使用标准语法将数据写入表。 若要向现有 Delta 表添加新数据,请使用追加模式。 与增改插入不同,写入表不会检查重复记录。
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10007, 'Miku', 'Hatsune', 'F', 25)
]
# Create the new data.
df = spark.createDataFrame(data, schema)
# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")
# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)
Scala(编程语言)
// Create the new data.
val data = Seq(
(10007, "Miku", "Hatsune", "F", 25)
)
val df = spark.createDataFrame(data)
.toDF("id", "firstName", "lastName", "gender", "age")
// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")
// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)
SQL
CREATE OR REPLACE TABLE workspace.default.people_10k_new (
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
(10007, 'Miku', 'Hatsune', 'F', 25);
-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;
-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;
Databricks 笔记本单元输出最多显示 10,000 行或 2 MB,以较低者为准。 由于 workspace.default.people_10k 包含超过 10,000 行,因此在笔记本输出 display(df)中只显示前 10,000 行。 表中存在其他行,但由于此限制,不会在笔记本输出中呈现。 可以通过专门筛选额外的行来查看它们。
若要替换表中的所有数据,请使用覆盖模式。
Python
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
Scala(编程语言)
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
SQL
INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2
更新表
基于谓词更新 Delta 表中的数据。 例如,将 gender 列中的值从 Female 更改为 F,从 Male 更改为 M,从 Other 更改为 O。
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'Female'",
set = { "gender": "'F'" }
)
# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'Male',
set = { 'gender': lit('M') }
)
deltaTable.update(
condition = col('gender') == 'Other',
set = { 'gender': lit('O') }
)
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Scala(编程语言)
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'Female'",
Map("gender" -> "'F'")
)
// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
col("gender") === "Male",
Map("gender" -> lit("M")));
deltaTable.update(
col("gender") === "Other",
Map("gender" -> lit("O")));
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
从表中删除
从 Delta 表中删除与谓词匹配的数据。 例如,下面的代码演示了两个删除作:首先删除年龄小于 18 的行,然后删除年龄小于 21 的行。
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
Scala(编程语言)
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
SQL
-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
重要
删除会从最新版本的 Delta 表中删除数据,但在显式清空旧版本之前,不会将其从物理存储中删除。 有关详细信息,请参阅 真空。
显示表历史记录
使用 DeltaTable.history 方法在 Python 和 Scala 中以及 SQL 中的 DESCRIBE HISTORY 语句来查看每次写入表的来源信息。
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
Scala(编程语言)
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
SQL
DESCRIBE HISTORY workspace.default.people_10k
通过时间旅行功能来查询表的早期版本
使用 Delta Lake 的时间旅行功能查询 Delta 表的较旧快照。 若要查询特定版本,请使用表的版本号或时间戳。 例如,表历史记录中的查询版本 0 或时间戳 2026-01-05T23:09:47.000+00:00 。
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()
# Query using the version number.
display(deltaHistory.where("version == 0"))
# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
Scala(编程语言)
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()
// Query using the version number.
display(deltaHistory.where("version == 0"))
// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
SQL
-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
对于时间戳,仅接受日期或时间戳字符串。 例如,字符串的格式必须为 "2026-01-05T22:43:15.000+00:00" 或 "2026-01-05 22:43:15"。
使用 DataFrameReader 选项从固定到某个特定版本或时间戳的 Delta 表中创建一个数据帧。
Python
# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")
# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")
display(df)
Scala(编程语言)
// Query using the version number.
val dfVersion = spark.read
.option("versionAsOf", 0)
.table("workspace.default.people_10k")
// Query using the timestamp.
val dfTimestamp = spark.read
.option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
.table("workspace.default.people_10k")
display(dfVersion)
display(dfTimestamp)
SQL
-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;
有关详细信息,请参阅 表历史记录的处理。
优化表
对表的多个更改可以创建多个小型文件,这会降低读取查询性能。 使用优化操作将小文件合并为较大的文件来提高速度。 请参阅 OPTIMIZE。
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
Scala(编程语言)
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE workspace.default.people_10k
注意
如果启用了预测优化,则无需手动优化。 预测优化会自动管理维护任务。 有关详细信息,请参阅 Unity 目录托管表的预测优化。
按列进行 Z 排序
按 Z 顺序排列数据并进一步提高读取性能,请在操作中指定需要排序的列。 例如,按高基数列 firstName 进行排列。 有关 z 排序的详细信息,请参阅 “数据跳过”。
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
Scala(编程语言)
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
SQL
OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)
通过真空操作删除快照
Delta Lake 具有读取快照隔离,这意味着在其他用户或作业查询表的同时可以安全地运行优化操作。 但是,最终应该清理旧快照,因为这样做会降低存储成本,提高查询性能,并确保数据符合性。 运行VACUUM操作来清理旧快照。 请参阅 VACUUM。
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
Scala(编程语言)
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
SQL
VACUUM workspace.default.people_10k
有关如何有效使用真空操作的详细信息,请参阅 使用清理操作删除未使用的数据文件。