Microsoft Fabric 数据库中的每个 Cosmos DB 都以开源 Delta Lake 格式镜像到 OneLake 中。 此功能不需要任何额外的配置或设置,并在创建数据库时自动启用。 这种紧密集成消除了 ETL(提取、转换、加载)管道的需求,并确保 Cosmos DB 数据始终准备好分析。
此自动镜像支持的场景包括但不限于:
- 使用 Transact SQL (T-SQL) 查询语言的即席查询
- 与 Apache Spark 集成
- 使用笔记本对实时数据进行分析
- 数据科学和机器学习工作流
镜像状态
可以通过导航到 Fabric 门户中数据库的复制部分来检查复制状态。 本部分包括有关复制的元数据,包括上次同步的状态。
SQL 分析端点查询
使用 SQL 分析终结点,可以使用 T-SQL 直接在 Fabric 门户中查询镜像的 Cosmos DB 数据。 可以随时在 NoSQL 数据资源管理器和 T-SQL 分析终结点之间切换。
运行基本查询
使用标准 T-SQL 语法查询镜像数据。 以下示例演示了一个简单的聚合查询:
SELECT
categoryName,
COUNT(*) AS quantity
FROM
[<database-name>].[<database-name>].[<container-name>] -- Replace with your database and container name
GROUP BY
categoryName
使用高级查询分析示例数据
对于更复杂的分析方案,可以运行合并多个指标并使用高级 T-SQL 功能的查询。 以下示例使用内置示例数据集来计算产品 KPI,并查看各个类别的见解。 有关示例数据集的详细信息,请参阅 Microsoft Fabric 中的 Cosmos DB 中的示例数据集。
-- Product performance analysis by category
WITH SampleData AS (
SELECT *
FROM [<database-name>].[<database-name>].[<container-name>] -- Replace with your database and container name
),
TopProducts AS (
SELECT
categoryName,
name,
ROW_NUMBER() OVER (PARTITION BY categoryName ORDER BY currentPrice DESC) AS rn
FROM SampleData
WHERE docType = 'product'
)
SELECT
c.categoryName,
COUNT(DISTINCT CASE WHEN c.docType = 'product' THEN c.productId END) AS totalProducts,
ROUND(AVG(CASE WHEN c.docType = 'review' THEN CAST(c.stars AS FLOAT) END), 2) AS avgRating,
tp.name AS topProduct,
SUM(CASE WHEN c.docType = 'product' THEN c.currentPrice * c.inventory END) AS totalInventoryValue
FROM SampleData AS c
LEFT JOIN TopProducts AS tp ON c.categoryName = tp.categoryName AND tp.rn = 1
GROUP BY c.categoryName, tp.name
ORDER BY avgRating DESC;
在查询编辑器中观察查询的结果:
[
{
"categoryName": "Devices, E-readers",
"totalProducts": "10",
"avgRating": "4.38",
"topProduct": "eReader Lumina Edge X7",
"totalInventoryValue": "890338.94"
},
{
"categoryName": "Devices, Smartwatches",
"totalProducts": "10",
"avgRating": "4.37",
"topProduct": "PulseSync Pro S7",
"totalInventoryValue": "750008.86"
},
// Ommitted for brevity
]
使用 OPENJSON 查询嵌套 JSON 数组
使用该 OPENJSON 函数分析和查询文档中的嵌套 JSON 数组。 以下示例演示如何分析 priceHistory 数组,以识别价格上涨幅度最大的产品,帮助你跟踪定价趋势并优化定价策略。
-- Identify products with significant price increases
WITH PriceChanges AS (
SELECT
p.productId,
p.name,
p.categoryName,
p.currentPrice,
ph.priceDate,
ph.historicalPrice,
p.currentPrice - ph.historicalPrice AS priceIncrease,
ROUND(((p.currentPrice - ph.historicalPrice) / ph.historicalPrice) * 100, 1) AS percentIncrease
FROM [<database-name>].[<database-name>].[<container-name>] AS p -- Replace with your database and container name
CROSS APPLY OPENJSON(p.priceHistory) WITH (
priceDate datetime2,
historicalPrice float '$.price'
) AS ph
WHERE p.docType = 'product'
)
SELECT TOP 10
name,
categoryName,
currentPrice,
priceIncrease,
percentIncrease
FROM PriceChanges
WHERE priceIncrease > 0
ORDER BY percentIncrease DESC;
在查询编辑器中观察查询的结果:
[
{
"name": "Resonova Elite360 Wireless ANC Headphones",
"categoryName": "Accessories, Premium Headphones",
"currentPrice": "523.66",
"priceIncrease": "224.66",
"percentIncrease": "75.1"
},
{
"name": "AuraLux VX Pro Leather Case",
"categoryName": "Accessories, Luxury Cases",
"currentPrice": "129.96",
"priceIncrease": "50.89",
"percentIncrease": "64.4"
},
// Ommitted for brevity
]