Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
O Link do Synapse para Cosmos DB não tem mais suporte para novos projetos. Não use esse recurso.
Use o Espelhamento do Azure Cosmos DB para Microsoft Fabric, que agora está em Disponibilidade Geral. O espelhamento fornece os mesmos benefícios de ETL zero e é totalmente integrado ao Microsoft Fabric. Saiba mais na Visão Geral do Espelhamento do Cosmos DB.
Este artigo é relevante para arquivos e contêineres Parquet no Link do Azure Synapse para Azure Cosmos DB. Você pode usar o Spark ou a linguagem SQL para a leitura ou a transformação de dados por meio de esquemas complexos, tal como as matrizes ou estruturas aninhadas. O exemplo seguinte é concluído com um único documento, mas pode ser facilmente dimensionado a bilhões de documentos por meio do Spark ou SQL. O código incluso neste artigo usa PySpark (Python).
Caso de uso
Os tipos de dados complexos estão cada vez mais comuns e representam um desafio para os engenheiros de dados. Analisando o esquema de Modelo de Dados de Entidade aninhado e as matrizes pode implicar em consultas SQL e complexas que exigem tempo. Além disso, pode ser difícil renomear ou converter o tipo de dados das colunas aninhadas. Além disso, quando você estiver trabalhando com os objetos profundamente aninhados, poderá encontrar problemas de desempenho.
Os engenheiros de dados precisam reconhecer o processo de eficiência dos tipos de dados complexos e torná-los facilmente acessíveis a todos. No exemplo a seguir, você usa o Spark no Azure Synapse Analytics para leitura e transformação dos objetos em uma estrutura plana por meio de quadros de dados. Você usa o modulo sem servidor do SQL no Azure Synapse Analytics para consultar esses objetos diretamente e retornar a esse conjunto de resultados como um guia normal.
O que são matrizes e estruturas aninhadas?
O objeto SQL Server a seguir vem de Visual Studio Online Application insights. Nesse objeto, existem estruturas aninhadas e matrizes que contêm estruturas aninhadas.
{
"id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
"context": {
"data": {
"eventTime": "2020-06-10T13:43:34.553Z",
"samplingRate": "100.0",
"isSynthetic": "false"
},
"session": {
"isFirst": "false",
"id": "38619c14-7a23-4687-8268-95862c5326b1"
},
"custom": {
"dimensions": [
{
"customerInfo": {
"ProfileType": "ExpertUser",
"RoomName": "",
"CustomerName": "diamond",
"UserName": "XXXX@yahoo.com"
}
},
{
"customerInfo": {
"ProfileType": "Novice",
"RoomName": "",
"CustomerName": "topaz",
"UserName": "XXXX@outlook.com"
}
}
]
}
}
}
O exemplo de esquema de matrizes e estruturas aninhadas
Quando você estiver imprimindo o esquema do quadro de dados do objeto (chamado de DF) com o comando df.printschema, você presenciará da seguinte representação:
- O amarelo representa as estruturas aninhadas.
- O verde representa uma matriz com dois elementos.
_rid, _ts e _etag foram acrescentados ao sistema, visto que o documento foi ingerido no armazenamento transacional no Azure Cosmos DB.
O quadro de dados anterior conta simplesmente na quantidade de 5 colunas e 1 linha. Após a transformação, o quadro de dados coletado terá 13 colunas e 2 linhas, em um formato tabelar.
Nivelar as estruturas aninhadas e detalhar as matrizes
Com o Spark no Azure Synapse Analytics, é fácil transformar as estruturas aninhadas em colunas e os elementos de matriz em múltiplas linhas. Utilize as etapas a seguir para a implementação.
Definir uma função para nivelar o esquema aninhado
Você poderá usar essa função sem alteração. Criar uma célula em um notebook da PySpark com a seguinte função:
from pyspark.sql.functions import col
def flatten_df(nested_df):
stack = [((), nested_df)]
columns = []
while len(stack) > 0:
parents, df = stack.pop()
flat_cols = [
col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
for c in df.dtypes
if c[1][:6] != "struct"
]
nested_cols = [
c[0]
for c in df.dtypes
if c[1][:6] == "struct"
]
columns.extend(flat_cols)
for nested_col in nested_cols:
projected_df = df.select(nested_col + ".*")
stack.append((parents + (nested_col,), projected_df))
return nested_df.select(columns)
Definir a função para nivelar o esquema aninhado
Nesta etapa, você nivela o esquema aninhado do quadro de dados (DF) em um novo quadro de dados ( df_flat ):
from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))
A função de linguagem desenvolvida deve tabelar 10 colunas e 1 linha. A matriz e os elementos aninhados estão lá.
Transformar as matrizes
Aqui, você transforma a matriz,context_custom_dimensions, no quadro de dados df_flatnum novo quadro de dados df_flat_explode. No código seguinte, você também define qual coluna selecionar:
from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))
A função de linguagem desenvolvida deve tabelar 10 colunas e 2 linhas. A próxima etapa é nivelar os esquemas aninhados com a função definida na etapa 1.
Definir a função para nivelar o esquema aninhado
Por fim, você utiliza a função para nivelar o quadro de dados do esquema aninhado df_flat_explode em um novo quadro de dados, df_flat_explode_flat :
df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))
A função de linguagem exibida deve tabelar 13 colunas e 2 linhas.
A funçãoprintSchemado quadro de dadosdf_flat_explode_flat retorna o seguinte resultado:
Ler matrizes e estruturas aninhadas diretamente
Com o módulo sem servidor do SQL, você pode consultar e criar exibições e tabelas em tais objetos.
Primeiro, dependendo de como os dados foram armazenados, os usuários devem usar a taxonomia seguinte. Tudo o que é mostrado em letras maiúsculas é específico ao caso de uso:
| Em massa | Formatar |
|---|---|
| https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet | ' Parquet '(ADLSg2) |
| N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/;account=ACCOUNTNAME;database=DATABASENAME;collection=COLLECTIONNAME;region=REGIONTOQUERY', SECRET='YOURSECRET' | ' CosmosDB ' (Azure Synapse) |
Substitua cada campo como a seguir:
- 'YOUR BULK ABOVE' é a cadeia de conexão da fonte de dados à qual você se conecta.
- 'YOUR TYPE ABOVE' é o formato que você usa para se conectar à fonte de dados.
select *
FROM
openrowset(
BULK 'YOUR BULK ABOVE',
FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
contextdataeventTime varchar(50) '$.context.data.eventTime',
contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
contextsessionisFirst varchar(50) '$.context.session.isFirst',
contextsessionid varchar(50) '$.context.session.id',
contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q
cross apply openjson (contextcustomdimensions)
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
RoomName varchar(50) '$.customerInfo.RoomName',
CustomerName varchar(50) '$.customerInfo.CustomerName',
UserName varchar(50) '$.customerInfo.UserName'
)
Há dois tipos diferentes de operações:
O primeiro tipo de operação é indicado na linha de código seguinte,que define a coluna chamada
contextdataeventTimeque se refere ao elemento aninhado,Context.Data.eventTime.contextdataeventTime varchar(50) '$.context.data.eventTime'Esta linha define a coluna chamada
contextdataeventTimeque se refere ao elemento aninhado,Context>Data>eventTime.O segundo tipo de operação usa
cross applypara criar novas linhas para cada elemento sob a matriz. Em seguida, ele define cada objeto aninhado.cross apply openjson (contextcustomdimensions) with ( ProfileType varchar(50) '$.customerInfo.ProfileType',Se a matriz tivesse 5 elementos com 4 estruturas aninhadas, o modelo sem servidor do SQL tabelará 5 linhas e 4 colunas. O modelo do SQL sem servidor pode ser consultado no local, mapear a matriz em 2 linhas e exibir todas as estruturas aninhadas em colunas.