你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
Azure Synapse Link for Azure Cosmos DB 使用户能够对 Azure Cosmos DB 中的操作数据运行准实时分析。 但是,有时需要聚合和丰富某些数据,以便为数据仓库用户提供服务。 只需通过笔记本中的几个单元格即可策展和导出 Azure Synapse Link 数据。
重要
现在可用与 Microsoft Fabric 的同步功能。 将数据镜像到 Fabric 提供 Azure Synapse Link 的所有功能,拥有更佳的分析性能,能够在 Fabric 中使用 OneLake 统一您的数据资产,并能以 Delta Parquet 格式开放数据访问权限。 使用 Fabric 镜像,而不是 Azure Synapse Link。
通过镜像传输到 Microsoft Fabric,您可以持续地将现有数据环境复制到 Fabric 中的 OneLake,其中包括来自 Cosmos DB、SQL Server 2016+、Azure SQL 数据库、Azure SQL 托管实例、Oracle、Snowflake 等的数据。
有关详细信息,请参阅 Microsoft Fabric 镜像数据库。
先决条件
- 为 Synapse 工作区预配以下内容:
- 为 Azure Cosmos DB 帐户预配包含数据的 HTAP 容器
- 将 Azure Cosmos DB HTAP 容器连接到工作区
- 使用正确的设置将数据从 Spark 导入专用 SQL 池
步骤
在本教程中,你将连接到分析存储,因此不会对事务存储产生任何影响(它不会消耗任何请求单位)。 我们将执行以下步骤:
- 将 Azure Cosmos DB HTAP 容器读入 Spark 数据帧
- 在新数据帧中聚合结果
- 将数据引入专用 SQL 池中
数据
在该示例中,我们使用名为“RetailSales”的 HTAP 容器。 它属于名为“ConnectedData”的链接服务,具有以下架构:
- _rid: string (nullable = true)
- _ts: long (nullable = true)
- logQuantity:double(nullable = true)
- productCode:字符串(nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id:string(nullable = true)
- advertising: long (nullable = true)
- storeId:long(nullable = true)
- weekStarting: long (nullable = true)
- _etag: string (nullable = true)
为了进行报告,我们将按“productCode”和“weekStarting”聚合销售额(数量、收入 [价格 x 数量])。 最后,我们会将该数据导入名为 dbo.productsales 的专用 SQL 池表。
配置 Spark 笔记本
创建一个 Spark 笔记本,以 Scala as Spark (Scala) 作为主要语言。 使用笔记本的默认会话设置。
读取 Spark 中的数据
在第一个单元格中,使用 Spark 将 Azure Cosmos DB HTAP 容器读取到数据帧中。
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
在新数据帧中聚合结果
在第二个单元格中,在将新数据帧加载到专用 SQL 池数据库之前,先运行它所需的转换和聚合。
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
将结果加载到专用 SQL 池中
在第三个单元格中,将数据加载到专用 SQL 池中。 它将自动创建一个临时外部表、外部数据源和外部文件格式,作业完成后会删除这些内容。
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
通过 SQL 查询结果
可以使用简单的 SQL 查询(例如以下 SQL 脚本)查询结果:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
